forked from grafana/loki
/
table_client.go
126 lines (102 loc) · 2.98 KB
/
table_client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package gcp
import (
"context"
"time"
"google.golang.org/grpc/codes"
"cloud.google.com/go/bigtable"
"google.golang.org/grpc/status"
"github.com/pkg/errors"
"github.com/frelon/loki/v2/pkg/storage/chunk"
)
// tableClient manages Bigtable tables through a bigtable.AdminClient and keeps
// a time-limited, in-memory cache of table metadata.
type tableClient struct {
	// cfg is the configuration the client was built with.
	cfg Config
	// client is the Bigtable admin client used for every table operation.
	client *bigtable.AdminClient

	// tableInfo caches table metadata keyed by table name. ListTables adds
	// entries to it and DeleteTable removes them.
	tableInfo map[string]*bigtable.TableInfo
	// tableExpiration is the instant after which ListTables discards
	// tableInfo and starts a fresh cache.
	tableExpiration time.Time
}
// NewTableClient builds a chunk.TableClient backed by a Bigtable admin client.
//
// The gRPC dial options are derived from cfg.GRPCClientConfig together with
// bigtableInstrumentation(), and the admin client is created for cfg.Project
// and cfg.Instance. Any error from either step is returned unchanged.
func NewTableClient(ctx context.Context, cfg Config) (chunk.TableClient, error) {
	opts, err := cfg.GRPCClientConfig.DialOption(bigtableInstrumentation())
	if err != nil {
		return nil, err
	}

	admin, err := bigtable.NewAdminClient(ctx, cfg.Project, cfg.Instance, toOptions(opts)...)
	if err != nil {
		return nil, err
	}

	// tableExpiration is left at its zero value, so the first ListTables call
	// treats the cache as expired and initialises the expiry time.
	tc := &tableClient{
		cfg:       cfg,
		client:    admin,
		tableInfo: make(map[string]*bigtable.TableInfo),
	}
	return tc, nil
}
// ListTables returns the names of all tables in the Bigtable instance that
// contain the expected column family; tables without it are omitted.
//
// When cfg.TableCacheEnabled is set, table metadata for matching tables is
// cached and reused until cfg.TableCacheExpiration has elapsed, at which point
// the whole cache is dropped. When the cache is disabled, metadata is fetched
// for every table on every call and nothing is stored.
//
// Errors from the admin client are wrapped with the name of the failing call.
func (c *tableClient) ListTables(ctx context.Context) ([]string, error) {
	tables, err := c.client.Tables(ctx)
	if err != nil {
		return nil, errors.Wrap(err, "client.Tables")
	}

	// Take a single timestamp so the expiry check and the new deadline agree.
	if now := time.Now(); c.tableExpiration.Before(now) {
		c.tableInfo = map[string]*bigtable.TableInfo{}
		c.tableExpiration = now.Add(c.cfg.TableCacheExpiration)
	}

	cacheEnabled := c.cfg.TableCacheEnabled

	output := make([]string, 0, len(tables))
	for _, table := range tables {
		var (
			info   *bigtable.TableInfo
			cached bool
		)
		if cacheEnabled {
			info, cached = c.tableInfo[table]
		}
		if !cached {
			info, err = c.client.TableInfo(ctx, table)
			if err != nil {
				return nil, errors.Wrap(err, "client.TableInfo")
			}
		}

		// Check each table has the right column family. If not, omit it.
		if !hasColumnFamily(info.FamilyInfos) {
			continue
		}
		output = append(output, table)

		// Only store freshly fetched info, and only when the cache is in use;
		// a disabled cache would otherwise hold entries that are never read.
		if cacheEnabled && !cached {
			c.tableInfo[table] = info
		}
	}

	return output, nil
}
// hasColumnFamily reports whether any entry in infos is named columnFamily.
// It returns false for an empty or nil slice.
func hasColumnFamily(infos []bigtable.FamilyInfo) bool {
	for i := range infos {
		if infos[i].Name == columnFamily {
			return true
		}
	}
	return false
}
// CreateTable creates the table named desc.Name and then adds the
// columnFamily column family to it.
//
// An "already exists" error from either step is ignored, so the call succeeds
// when the table and/or the column family are already present. Any other error
// is wrapped with the name of the failing admin call.
func (c *tableClient) CreateTable(ctx context.Context, desc chunk.TableDesc) error {
	err := c.client.CreateTable(ctx, desc.Name)
	if err != nil && !alreadyExistsError(err) {
		return errors.Wrap(err, "client.CreateTable")
	}

	err = c.client.CreateColumnFamily(ctx, desc.Name, columnFamily)
	if err != nil && !alreadyExistsError(err) {
		return errors.Wrap(err, "client.CreateColumnFamily")
	}

	return nil
}
// alreadyExistsError reports whether err converts to a gRPC status whose code
// is codes.AlreadyExists.
func alreadyExistsError(err error) bool {
	st, ok := status.FromError(err)
	if !ok {
		return false
	}
	return st.Code() == codes.AlreadyExists
}
// DeleteTable deletes the named table and, on success, removes its entry from
// the table-info cache. A failure from the admin client is returned wrapped
// and leaves the cache untouched.
func (c *tableClient) DeleteTable(ctx context.Context, name string) error {
	err := c.client.DeleteTable(ctx, name)
	if err != nil {
		return errors.Wrap(err, "client.DeleteTable")
	}

	delete(c.tableInfo, name)
	return nil
}
// DescribeTable returns a description containing only the given name and
// always reports the table as active with a nil error. It makes no calls to
// the admin client.
func (c *tableClient) DescribeTable(ctx context.Context, name string) (desc chunk.TableDesc, isActive bool, err error) {
	desc.Name = name
	return desc, true, nil
}
// UpdateTable is a no-op: it ignores both descriptions and always returns nil.
func (c *tableClient) UpdateTable(ctx context.Context, current, expected chunk.TableDesc) error {
	// Nothing is changed on the Bigtable side here.
	return nil
}
// Stop closes the underlying Bigtable admin client.
func (c *tableClient) Stop() {
	// Stop has no return value, so the error from Close is deliberately
	// discarded.
	_ = c.client.Close()
}