/
table_client.go
127 lines (103 loc) · 3.03 KB
/
table_client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package gcp
import (
"context"
"time"
"google.golang.org/grpc/codes"
"cloud.google.com/go/bigtable"
"google.golang.org/grpc/status"
"github.com/pkg/errors"
"github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/storage/stores/series/index"
)
type tableClient struct {
cfg Config
client *bigtable.AdminClient
tableInfo map[string]*bigtable.TableInfo
tableExpiration time.Time
}
// NewTableClient returns a new TableClient.
func NewTableClient(ctx context.Context, cfg Config) (index.TableClient, error) {
dialOpts, err := cfg.GRPCClientConfig.DialOption(bigtableInstrumentation())
if err != nil {
return nil, err
}
client, err := bigtable.NewAdminClient(ctx, cfg.Project, cfg.Instance, toOptions(dialOpts)...)
if err != nil {
return nil, err
}
return &tableClient{
cfg: cfg,
client: client,
tableInfo: map[string]*bigtable.TableInfo{},
}, nil
}
// ListTables lists all of the correctly specified cortex tables in bigtable
func (c *tableClient) ListTables(ctx context.Context) ([]string, error) {
tables, err := c.client.Tables(ctx)
if err != nil {
return nil, errors.Wrap(err, "client.Tables")
}
if c.tableExpiration.Before(time.Now()) {
c.tableInfo = map[string]*bigtable.TableInfo{}
c.tableExpiration = time.Now().Add(c.cfg.TableCacheExpiration)
}
output := make([]string, 0, len(tables))
for _, table := range tables {
info, exists := c.tableInfo[table]
if !c.cfg.TableCacheEnabled || !exists {
info, err = c.client.TableInfo(ctx, table)
if err != nil {
return nil, errors.Wrap(err, "client.TableInfo")
}
}
// Check each table has the right column family. If not, omit it.
if hasColumnFamily(info.FamilyInfos) {
output = append(output, table)
c.tableInfo[table] = info
}
}
return output, nil
}
func hasColumnFamily(infos []bigtable.FamilyInfo) bool {
for _, family := range infos {
if family.Name == columnFamily {
return true
}
}
return false
}
func (c *tableClient) CreateTable(ctx context.Context, desc config.TableDesc) error {
if err := c.client.CreateTable(ctx, desc.Name); err != nil {
if !alreadyExistsError(err) {
return errors.Wrap(err, "client.CreateTable")
}
}
if err := c.client.CreateColumnFamily(ctx, desc.Name, columnFamily); err != nil {
if !alreadyExistsError(err) {
return errors.Wrap(err, "client.CreateColumnFamily")
}
}
return nil
}
func alreadyExistsError(err error) bool {
serr, ok := status.FromError(err)
return ok && serr.Code() == codes.AlreadyExists
}
func (c *tableClient) DeleteTable(ctx context.Context, name string) error {
if err := c.client.DeleteTable(ctx, name); err != nil {
return errors.Wrap(err, "client.DeleteTable")
}
delete(c.tableInfo, name)
return nil
}
func (c *tableClient) DescribeTable(_ context.Context, name string) (desc config.TableDesc, isActive bool, err error) {
return config.TableDesc{
Name: name,
}, true, nil
}
func (c *tableClient) UpdateTable(_ context.Context, _, _ config.TableDesc) error {
return nil
}
func (c *tableClient) Stop() {
c.client.Close()
}