From c25092892dfc55c86784f77222d2bd96e40bd2d1 Mon Sep 17 00:00:00 2001 From: Steven Niemitz Date: Fri, 8 Mar 2024 15:55:58 -0500 Subject: [PATCH] feat: Add aggregate support to the bigtable emulator (#9530) Change-Id: I3348aea3101eb87bab43c219a5af59b654491486 --- bigtable/admin.go | 62 +++++++++++++++- bigtable/bigtable.go | 15 ++++ bigtable/bigtable_test.go | 63 +++++++++++++++++ bigtable/bttest/inmem.go | 102 +++++++++++++++++++++------ bigtable/bttest/inmem_test.go | 90 ++++++++++++++++++++++++ bigtable/go.mod | 4 +- bigtable/go.sum | 8 +-- bigtable/type.go | 129 ++++++++++++++++++++++++++++++++++ bigtable/type_test.go | 87 +++++++++++++++++++++++ 9 files changed, 528 insertions(+), 32 deletions(-) create mode 100644 bigtable/type.go create mode 100644 bigtable/type_test.go diff --git a/bigtable/admin.go b/bigtable/admin.go index b988b7075090..b4315d5455a7 100644 --- a/bigtable/admin.go +++ b/bigtable/admin.go @@ -234,12 +234,23 @@ const ( Unprotected ) -// TableConf contains all of the information necessary to create a table with column families. +// Family represents a column family with its optional GC policy and value type. +type Family struct { + GCPolicy GCPolicy + ValueType Type +} + +// TableConf contains all the information necessary to create a table with column families. type TableConf struct { TableID string SplitKeys []string - // Families is a map from family name to GCPolicy + // DEPRECATED: Use ColumnFamilies instead. + // Families is a map from family name to GCPolicy. + // Only one of Families or ColumnFamilies may be set. Families map[string]GCPolicy + // ColumnFamilies is a map from family name to family configuration. + // Only one of Families or ColumnFamilies may be set. + ColumnFamilies map[string]Family // DeletionProtection can be none, protected or unprotected // set to protected to make the table protected against data loss DeletionProtection DeletionProtection @@ -283,7 +294,28 @@ func (ac *AdminClient) CreateTableFromConf(ctx context.Context, conf *TableConf) tbl.ChangeStreamConfig = &btapb.ChangeStreamConfig{} tbl.ChangeStreamConfig.RetentionPeriod = durationpb.New(conf.ChangeStreamRetention.(time.Duration)) } - if conf.Families != nil { + if conf.Families != nil && conf.ColumnFamilies != nil { + return errors.New("only one of Families or ColumnFamilies may be set, not both") + } + + if conf.ColumnFamilies != nil { + tbl.ColumnFamilies = make(map[string]*btapb.ColumnFamily) + for fam, config := range conf.ColumnFamilies { + var gcPolicy *btapb.GcRule + if config.GCPolicy != nil { + gcPolicy = config.GCPolicy.proto() + } else { + gcPolicy = &btapb.GcRule{} + } + + var typeProto *btapb.Type = nil + if config.ValueType != nil { + typeProto = config.ValueType.proto() + } + + tbl.ColumnFamilies[fam] = &btapb.ColumnFamily{GcRule: gcPolicy, ValueType: typeProto} + } + } else if conf.Families != nil { tbl.ColumnFamilies = make(map[string]*btapb.ColumnFamily) for fam, policy := range conf.Families { tbl.ColumnFamilies[fam] = &btapb.ColumnFamily{GcRule: policy.proto()} @@ -316,6 +348,30 @@ func (ac *AdminClient) CreateColumnFamily(ctx context.Context, table, family str return err } +// CreateColumnFamilyWithConfig creates a new column family in a table with an optional GC policy and value type. +func (ac *AdminClient) CreateColumnFamilyWithConfig(ctx context.Context, table, family string, config Family) error { + ctx = mergeOutgoingMetadata(ctx, ac.md) + prefix := ac.instancePrefix() + + cf := &btapb.ColumnFamily{} + if config.GCPolicy != nil { + cf.GcRule = config.GCPolicy.proto() + } + if config.ValueType != nil { + cf.ValueType = config.ValueType.proto() + } + + req := &btapb.ModifyColumnFamiliesRequest{ + Name: prefix + "/tables/" + table, + Modifications: []*btapb.ModifyColumnFamiliesRequest_Modification{{ + Id: family, + Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Create{Create: cf}, + }}, + } + _, err := ac.tClient.ModifyColumnFamilies(ctx, req) + return err +} + // UpdateTableConf contains all of the information necessary to update a table with column families. type UpdateTableConf struct { tableID string diff --git a/bigtable/bigtable.go b/bigtable/bigtable.go index 147d8f36e9c5..fc39e8f43c3a 100644 --- a/bigtable/bigtable.go +++ b/bigtable/bigtable.go @@ -960,6 +960,21 @@ func (m *Mutation) DeleteRow() { m.ops = append(m.ops, &btpb.Mutation{Mutation: &btpb.Mutation_DeleteFromRow_{DeleteFromRow: &btpb.Mutation_DeleteFromRow{}}}) } +// AddIntToCell adds an int64 value to a cell in an aggregate column family. The column family must +// have an input type of Int64 or this mutation will fail. +func (m *Mutation) AddIntToCell(family, column string, ts Timestamp, value int64) { + m.addToCell(family, column, ts, &btpb.Value{Kind: &btpb.Value_IntValue{IntValue: value}}) +} + +func (m *Mutation) addToCell(family, column string, ts Timestamp, value *btpb.Value) { + m.ops = append(m.ops, &btpb.Mutation{Mutation: &btpb.Mutation_AddToCell_{AddToCell: &btpb.Mutation_AddToCell{ + FamilyName: family, + ColumnQualifier: &btpb.Value{Kind: &btpb.Value_RawValue{RawValue: []byte(column)}}, + Timestamp: &btpb.Value{Kind: &btpb.Value_RawTimestampMicros{RawTimestampMicros: int64(ts.TruncateToMilliseconds())}}, + Input: value, + }}}) +} + // entryErr is a container that combines an entry with the error that was returned for it. // Err may be nil if no error was returned for the Entry, or if the Entry has not yet been processed. type entryErr struct { diff --git a/bigtable/bigtable_test.go b/bigtable/bigtable_test.go index 0bea28e72eef..0f63b6346f16 100644 --- a/bigtable/bigtable_test.go +++ b/bigtable/bigtable_test.go @@ -17,7 +17,9 @@ limitations under the License. package bigtable import ( + "bytes" "context" + "encoding/binary" "reflect" "testing" "time" @@ -748,3 +750,64 @@ func TestHeaderPopulatedWithAppProfile(t *testing.T) { t.Errorf("Incorrect value in resourcePrefixHeader. Got %s, want %s", got, want) } } + +func TestMutateRowsWithAggregates(t *testing.T) { + testEnv, err := NewEmulatedEnv(IntegrationTestConfig{}) + if err != nil { + t.Fatalf("NewEmulatedEnv failed: %v", err) + } + conn, err := grpc.Dial(testEnv.server.Addr, grpc.WithInsecure(), grpc.WithBlock(), + grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(100<<20), grpc.MaxCallRecvMsgSize(100<<20)), + ) + if err != nil { + t.Fatalf("grpc.Dial failed: %v", err) + } + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + adminClient, err := NewAdminClient(ctx, testEnv.config.Project, testEnv.config.Instance, option.WithGRPCConn(conn)) + if err != nil { + t.Fatalf("NewClient failed: %v", err) + } + defer adminClient.Close() + + tableConf := &TableConf{ + TableID: testEnv.config.Table, + ColumnFamilies: map[string]Family{ + "f": { + ValueType: AggregateType{ + Input: Int64Type{}, + Aggregator: SumAggregator{}, + }, + }, + }, + } + if err := adminClient.CreateTableFromConf(ctx, tableConf); err != nil { + t.Fatalf("CreateTable(%v) failed: %v", testEnv.config.Table, err) + } + + client, err := NewClient(ctx, testEnv.config.Project, testEnv.config.Instance, option.WithGRPCConn(conn)) + if err != nil { + t.Fatalf("NewClient failed: %v", err) + } + defer client.Close() + table := client.Open(testEnv.config.Table) + + m := NewMutation() + m.AddIntToCell("f", "q", 0, 1000) + err = table.Apply(ctx, "row1", m) + if err != nil { + t.Fatalf("Apply failed: %v", err) + } + + m = NewMutation() + m.AddIntToCell("f", "q", 0, 2000) + err = table.Apply(ctx, "row1", m) + if err != nil { + t.Fatalf("Apply failed: %v", err) + } + + row, err := table.ReadRow(ctx, "row1") + if !bytes.Equal(row["f"][0].Value, binary.BigEndian.AppendUint64([]byte{}, 3000)) { + t.Error() + } +} diff --git a/bigtable/bttest/inmem.go b/bigtable/bttest/inmem.go index 4114901d8e3b..556abc2a8555 100644 --- a/bigtable/bttest/inmem.go +++ b/bigtable/bttest/inmem.go @@ -299,11 +299,7 @@ func (s *server) ModifyColumnFamilies(ctx context.Context, req *btapb.ModifyColu if _, ok := tbl.families[mod.Id]; ok { return nil, status.Errorf(codes.AlreadyExists, "family %q already exists", mod.Id) } - newcf := &columnFamily{ - name: req.Name + "/columnFamilies/" + mod.Id, - order: tbl.counter, - gcRule: create.GcRule, - } + newcf := newColumnFamily(req.Name+"/columnFamilies/"+mod.Id, tbl.counter, create) tbl.counter++ tbl.families[mod.Id] = newcf } else if mod.GetDrop() { @@ -324,10 +320,7 @@ func (s *server) ModifyColumnFamilies(ctx context.Context, req *btapb.ModifyColu if _, ok := tbl.families[mod.Id]; !ok { return nil, fmt.Errorf("no such family %q", mod.Id) } - newcf := &columnFamily{ - name: req.Name + "/columnFamilies/" + mod.Id, - gcRule: modify.GcRule, - } + newcf := newColumnFamily(req.Name+"/columnFamilies/"+mod.Id, 0, modify) // assume that we ALWAYS want to replace by the new setting // we may need partial update through tbl.families[mod.Id] = newcf @@ -1085,7 +1078,8 @@ func applyMutations(tbl *table, r *row, muts []*btpb.Mutation, fs map[string]*co return fmt.Errorf("can't handle mutation type %T", mut) case *btpb.Mutation_SetCell_: set := mut.SetCell - if _, ok := fs[set.FamilyName]; !ok { + var cf, ok = fs[set.FamilyName] + if !ok { return fmt.Errorf("unknown family %q", set.FamilyName) } ts := set.TimestampMicros @@ -1100,7 +1094,36 @@ func applyMutations(tbl *table, r *row, muts []*btpb.Mutation, fs map[string]*co newCell := cell{ts: ts, value: set.Value} f := r.getOrCreateFamily(fam, fs[fam].order) - f.cells[col] = appendOrReplaceCell(f.cellsByColumn(col), newCell) + f.cells[col] = appendOrReplaceCell(f.cellsByColumn(col), newCell, cf) + case *btpb.Mutation_AddToCell_: + add := mut.AddToCell + var cf, ok = fs[add.FamilyName] + if !ok { + return fmt.Errorf("unknown family %q", add.FamilyName) + } + if cf.valueType == nil || cf.valueType.GetAggregateType() == nil { + return fmt.Errorf("illegal attempt to use AddToCell on non-aggregate cell") + } + ts := add.Timestamp.GetRawTimestampMicros() + if ts < 0 { + return fmt.Errorf("AddToCell must set timestamp >= 0") + } + + fam := add.FamilyName + col := string(add.GetColumnQualifier().GetRawValue()) + + var value []byte + switch v := add.Input.Kind.(type) { + case *btpb.Value_IntValue: + value = binary.BigEndian.AppendUint64(value, uint64(v.IntValue)) + default: + return fmt.Errorf("only int64 values are supported") + } + + newCell := cell{ts: ts, value: value} + f := r.getOrCreateFamily(fam, fs[fam].order) + f.cells[col] = appendOrReplaceCell(f.cellsByColumn(col), newCell, cf) + case *btpb.Mutation_DeleteFromColumn_: del := mut.DeleteFromColumn if _, ok := fs[del.FamilyName]; !ok { @@ -1176,16 +1199,18 @@ func newTimestamp() int64 { return ts } -func appendOrReplaceCell(cs []cell, newCell cell) []cell { +func appendOrReplaceCell(cs []cell, newCell cell, cf *columnFamily) []cell { replaced := false for i, cell := range cs { if cell.ts == newCell.ts { + newCell.value = cf.updateFn(cs[i].value, newCell.value) cs[i] = newCell replaced = true break } } if !replaced { + newCell.value = cf.initFn(newCell.value) cs = append(cs, newCell) } sort.Sort(byDescTS(cs)) @@ -1212,7 +1237,8 @@ func (s *server) ReadModifyWriteRow(ctx context.Context, req *btpb.ReadModifyWri // Assume all mutations apply to the most recent version of the cell. // TODO(dsymonds): Verify this assumption and document it in the proto. for _, rule := range req.Rules { - if _, ok := fs[rule.FamilyName]; !ok { + var cf, ok = fs[rule.FamilyName] + if !ok { return nil, fmt.Errorf("unknown family %q", rule.FamilyName) } @@ -1255,7 +1281,7 @@ func (s *server) ReadModifyWriteRow(ctx context.Context, req *btpb.ReadModifyWri } // Store the new cell - f.cells[col] = appendOrReplaceCell(f.cellsByColumn(col), newCell) + f.cells[col] = appendOrReplaceCell(f.cellsByColumn(col), newCell, cf) // Store a copy for the result row resultFamily := resultRow.getOrCreateFamily(fam, fs[fam].order) @@ -1378,11 +1404,7 @@ func newTable(ctr *btapb.CreateTableRequest) *table { c := uint64(0) if ctr.Table != nil { for id, cf := range ctr.Table.ColumnFamilies { - fams[id] = &columnFamily{ - name: ctr.Parent + "/columnFamilies/" + id, - order: c, - gcRule: cf.GcRule, - } + fams[id] = newColumnFamily(ctr.Parent+"/columnFamilies/"+id, c, cf) c++ } } @@ -1681,15 +1703,49 @@ func (b byDescTS) Len() int { return len(b) } func (b byDescTS) Swap(i, j int) { b[i], b[j] = b[j], b[i] } func (b byDescTS) Less(i, j int) bool { return b[i].ts > b[j].ts } +func newColumnFamily(name string, order uint64, cf *btapb.ColumnFamily) *columnFamily { + var updateFn = func(_, newVal []byte) []byte { + return newVal + } + if cf.ValueType != nil { + switch v := cf.ValueType.Kind.(type) { + case *btapb.Type_AggregateType: + switch v.AggregateType.Aggregator.(type) { + case *btapb.Type_Aggregate_Sum_: + updateFn = func(existing, newVal []byte) []byte { + existingInt := int64(binary.BigEndian.Uint64(existing)) + newInt := int64(binary.BigEndian.Uint64(newVal)) + return binary.BigEndian.AppendUint64([]byte{}, uint64(existingInt+newInt)) + } + } + default: + } + } + return &columnFamily{ + name: name, + order: order, + gcRule: cf.GcRule, + valueType: cf.ValueType, + updateFn: updateFn, + initFn: func(newVal []byte) []byte { + return newVal + }, + } +} + type columnFamily struct { - name string - order uint64 // Creation order of column family - gcRule *btapb.GcRule + name string + order uint64 // Creation order of column family + gcRule *btapb.GcRule + valueType *btapb.Type + updateFn func(existing, newVal []byte) []byte + initFn func(newVal []byte) []byte } func (c *columnFamily) proto() *btapb.ColumnFamily { return &btapb.ColumnFamily{ - GcRule: c.gcRule, + GcRule: c.gcRule, + ValueType: c.valueType, } } diff --git a/bigtable/bttest/inmem_test.go b/bigtable/bttest/inmem_test.go index 1072d7e91844..671299b8855c 100644 --- a/bigtable/bttest/inmem_test.go +++ b/bigtable/bttest/inmem_test.go @@ -17,6 +17,7 @@ package bttest import ( "bytes" "context" + "encoding/binary" "fmt" "math" "math/rand" @@ -1792,6 +1793,95 @@ func TestFilters(t *testing.T) { } } +func TestMutateRowsAggregate(t *testing.T) { + ctx := context.Background() + + s := &server{ + tables: make(map[string]*table), + } + + tblInfo, err := populateTable(ctx, s) + if err != nil { + t.Fatal(err) + } + + _, err = s.ModifyColumnFamilies(ctx, &btapb.ModifyColumnFamiliesRequest{ + Name: tblInfo.Name, + Modifications: []*btapb.ModifyColumnFamiliesRequest_Modification{{ + Id: "sum", + Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Create{&btapb.ColumnFamily{ + ValueType: &btapb.Type{ + Kind: &btapb.Type_AggregateType{ + AggregateType: &btapb.Type_Aggregate{ + InputType: &btapb.Type{ + Kind: &btapb.Type_Int64Type{}, + }, + Aggregator: &btapb.Type_Aggregate_Sum_{ + Sum: &btapb.Type_Aggregate_Sum{}, + }, + }, + }, + }, + }, + }}, + }}) + + if err != nil { + t.Fatal(err) + } + + _, err = s.MutateRow(ctx, &btpb.MutateRowRequest{ + TableName: tblInfo.GetName(), + RowKey: []byte("row1"), + Mutations: []*btpb.Mutation{{ + Mutation: &btpb.Mutation_AddToCell_{AddToCell: &btpb.Mutation_AddToCell{ + FamilyName: "sum", + ColumnQualifier: &btpb.Value{Kind: &btpb.Value_RawValue{RawValue: []byte("col1")}}, + Timestamp: &btpb.Value{Kind: &btpb.Value_RawTimestampMicros{RawTimestampMicros: 0}}, + Input: &btpb.Value{Kind: &btpb.Value_IntValue{IntValue: 1}}, + }}, + }}, + }) + + if err != nil { + t.Fatal(err) + } + + _, err = s.MutateRow(ctx, &btpb.MutateRowRequest{ + TableName: tblInfo.GetName(), + RowKey: []byte("row1"), + Mutations: []*btpb.Mutation{{ + Mutation: &btpb.Mutation_AddToCell_{AddToCell: &btpb.Mutation_AddToCell{ + FamilyName: "sum", + ColumnQualifier: &btpb.Value{Kind: &btpb.Value_RawValue{RawValue: []byte("col1")}}, + Timestamp: &btpb.Value{Kind: &btpb.Value_RawTimestampMicros{RawTimestampMicros: 0}}, + Input: &btpb.Value{Kind: &btpb.Value_IntValue{IntValue: 2}}, + }}, + }}, + }) + + if err != nil { + t.Fatal(err) + } + + mock := &MockReadRowsServer{} + err = s.ReadRows(&btpb.ReadRowsRequest{ + TableName: tblInfo.GetName(), + Rows: &btpb.RowSet{ + RowKeys: [][]byte{ + []byte("row1"), + }, + }}, mock) + if err != nil { + t.Fatal(err) + } + got := mock.responses[0] + + if !bytes.Equal(got.Chunks[0].Value, binary.BigEndian.AppendUint64([]byte{}, 3)) { + t.Error() + } +} + func Test_Mutation_DeleteFromColumn(t *testing.T) { ctx := context.Background() diff --git a/bigtable/go.mod b/bigtable/go.mod index 4f26dab7091c..e8416bd3ce8d 100644 --- a/bigtable/go.mod +++ b/bigtable/go.mod @@ -11,7 +11,7 @@ require ( github.com/googleapis/cloud-bigtable-clients-test v0.0.2 github.com/googleapis/gax-go/v2 v2.12.2 google.golang.org/api v0.167.0 - google.golang.org/genproto v0.0.0-20240213162025-012b6fc9bca9 + google.golang.org/genproto v0.0.0-20240308144416-29370a3891b7 google.golang.org/genproto/googleapis/rpc v0.0.0-20240304161311-37d4d3c04a78 google.golang.org/grpc v1.62.0 google.golang.org/protobuf v1.32.0 @@ -50,5 +50,5 @@ require ( golang.org/x/text v0.14.0 // indirect golang.org/x/time v0.5.0 // indirect google.golang.org/appengine v1.6.8 // indirect - google.golang.org/genproto/googleapis/api v0.0.0-20240221002015-b0ce06bbee7c // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20240228224816-df926f6c8641 // indirect ) diff --git a/bigtable/go.sum b/bigtable/go.sum index 287c212f3737..c4808006d32a 100644 --- a/bigtable/go.sum +++ b/bigtable/go.sum @@ -173,10 +173,10 @@ google.golang.org/appengine v1.6.8/go.mod h1:1jJ3jBArFh5pcgW8gCtRJnepW8FzD1V44FJ google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= -google.golang.org/genproto v0.0.0-20240213162025-012b6fc9bca9 h1:9+tzLLstTlPTRyJTh+ah5wIMsBW5c4tQwGTN3thOW9Y= -google.golang.org/genproto v0.0.0-20240213162025-012b6fc9bca9/go.mod h1:mqHbVIp48Muh7Ywss/AD6I5kNVKZMmAa/QEW58Gxp2s= -google.golang.org/genproto/googleapis/api v0.0.0-20240221002015-b0ce06bbee7c h1:9g7erC9qu44ks7UK4gDNlnk4kOxZG707xKm4jVniy6o= -google.golang.org/genproto/googleapis/api v0.0.0-20240221002015-b0ce06bbee7c/go.mod h1:5iCWqnniDlqZHrd3neWVTOwvh/v6s3232omMecelax8= +google.golang.org/genproto v0.0.0-20240308144416-29370a3891b7 h1:5cmXPmmYZddhZs05mvqVzGwPsoE/uq+1YBCeRmBDyMo= +google.golang.org/genproto v0.0.0-20240308144416-29370a3891b7/go.mod h1:yA7a1bW1kwl459Ol0m0lV4hLTfrL/7Bkk4Mj2Ir1mWI= +google.golang.org/genproto/googleapis/api v0.0.0-20240228224816-df926f6c8641 h1:SO1wX9btGFrwj9EzH3ocqfwiPVOxfv4ggAJajzlHA5s= +google.golang.org/genproto/googleapis/api v0.0.0-20240228224816-df926f6c8641/go.mod h1:wLupoVsUfYPgOMwjzhYFbaVklw/INms+dqTp0tc1fv8= google.golang.org/genproto/googleapis/rpc v0.0.0-20240304161311-37d4d3c04a78 h1:Xs9lu+tLXxLIfuci70nG4cpwaRC+mRQPUL7LoIeDJC4= google.golang.org/genproto/googleapis/rpc v0.0.0-20240304161311-37d4d3c04a78/go.mod h1:UCOku4NytXMJuLQE5VuqA5lX3PcHCBo8pxNyvkf4xBs= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= diff --git a/bigtable/type.go b/bigtable/type.go new file mode 100644 index 000000000000..a390e8fd44b2 --- /dev/null +++ b/bigtable/type.go @@ -0,0 +1,129 @@ +/* +Copyright 2024 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package bigtable + +import btapb "google.golang.org/genproto/googleapis/bigtable/admin/v2" + +// Type wraps the protobuf representation of a type. See the protobuf definition +// for more details on types. +type Type interface { + proto() *btapb.Type +} + +// BytesEncoding represents the encoding of a Bytes type. +type BytesEncoding interface { + proto() *btapb.Type_Bytes_Encoding +} + +// RawBytesEncoding represents a Bytes encoding with no additional encodings. +type RawBytesEncoding struct { +} + +func (encoding RawBytesEncoding) proto() *btapb.Type_Bytes_Encoding { + return &btapb.Type_Bytes_Encoding{ + Encoding: &btapb.Type_Bytes_Encoding_Raw_{ + Raw: &btapb.Type_Bytes_Encoding_Raw{}}} +} + +// BytesType represents a string of bytes. +type BytesType struct { + Encoding BytesEncoding +} + +func (bytes BytesType) proto() *btapb.Type { + var encoding *btapb.Type_Bytes_Encoding + if bytes.Encoding != nil { + encoding = bytes.Encoding.proto() + } else { + encoding = RawBytesEncoding{}.proto() + } + return &btapb.Type{Kind: &btapb.Type_BytesType{BytesType: &btapb.Type_Bytes{Encoding: encoding}}} +} + +// Int64Encoding represents the encoding of an Int64 type. +type Int64Encoding interface { + proto() *btapb.Type_Int64_Encoding +} + +// BigEndianBytesEncoding represents an Int64 encoding where the value is encoded +// as an 8-byte big-endian value. The byte representation may also have further encoding +// via Bytes. +type BigEndianBytesEncoding struct { + Bytes BytesType +} + +func (beb BigEndianBytesEncoding) proto() *btapb.Type_Int64_Encoding { + return &btapb.Type_Int64_Encoding{ + Encoding: &btapb.Type_Int64_Encoding_BigEndianBytes_{ + BigEndianBytes: &btapb.Type_Int64_Encoding_BigEndianBytes{ + BytesType: beb.Bytes.proto().GetBytesType(), + }, + }, + } +} + +// Int64Type represents an 8-byte integer. +type Int64Type struct { + Encoding Int64Encoding +} + +func (it Int64Type) proto() *btapb.Type { + var encoding *btapb.Type_Int64_Encoding + if it.Encoding != nil { + encoding = it.Encoding.proto() + } else { + // default encoding to BigEndianBytes + encoding = BigEndianBytesEncoding{}.proto() + } + + return &btapb.Type{ + Kind: &btapb.Type_Int64Type{ + Int64Type: &btapb.Type_Int64{ + Encoding: encoding, + }, + }, + } +} + +// Aggregator represents an aggregation function for an aggregate type. +type Aggregator interface { + fillProto(proto *btapb.Type_Aggregate) +} + +// SumAggregator is an aggregation function that sums inputs together into its +// accumulator. +type SumAggregator struct{} + +func (sum SumAggregator) fillProto(proto *btapb.Type_Aggregate) { + proto.Aggregator = &btapb.Type_Aggregate_Sum_{Sum: &btapb.Type_Aggregate_Sum{}} +} + +// AggregateType represents an aggregate. See types.proto for more details +// on aggregate types. +type AggregateType struct { + Input Type + Aggregator Aggregator +} + +func (agg AggregateType) proto() *btapb.Type { + protoAgg := &btapb.Type_Aggregate{ + InputType: agg.Input.proto(), + } + + agg.Aggregator.fillProto(protoAgg) + return &btapb.Type{Kind: &btapb.Type_AggregateType{AggregateType: protoAgg}} +} diff --git a/bigtable/type_test.go b/bigtable/type_test.go new file mode 100644 index 000000000000..4f688384ad3c --- /dev/null +++ b/bigtable/type_test.go @@ -0,0 +1,87 @@ +/* +Copyright 2024 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package bigtable + +import ( + "testing" + + btapb "google.golang.org/genproto/googleapis/bigtable/admin/v2" + "google.golang.org/protobuf/proto" +) + +func TestInt64Proto(t *testing.T) { + want := &btapb.Type{ + Kind: &btapb.Type_Int64Type{ + Int64Type: &btapb.Type_Int64{ + Encoding: &btapb.Type_Int64_Encoding{ + Encoding: &btapb.Type_Int64_Encoding_BigEndianBytes_{ + BigEndianBytes: &btapb.Type_Int64_Encoding_BigEndianBytes{ + BytesType: &btapb.Type_Bytes{ + Encoding: &btapb.Type_Bytes_Encoding{ + Encoding: &btapb.Type_Bytes_Encoding_Raw_{ + Raw: &btapb.Type_Bytes_Encoding_Raw{}, + }, + }, + }, + }, + }, + }, + }, + }, + } + + got := Int64Type{}.proto() + if !proto.Equal(got, want) { + t.Errorf("got type %v, want: %v", got, want) + } +} + +func TestAggregateProto(t *testing.T) { + want := &btapb.Type{ + Kind: &btapb.Type_AggregateType{ + AggregateType: &btapb.Type_Aggregate{ + InputType: &btapb.Type{ + Kind: &btapb.Type_Int64Type{ + Int64Type: &btapb.Type_Int64{ + Encoding: &btapb.Type_Int64_Encoding{ + Encoding: &btapb.Type_Int64_Encoding_BigEndianBytes_{ + BigEndianBytes: &btapb.Type_Int64_Encoding_BigEndianBytes{ + BytesType: &btapb.Type_Bytes{ + Encoding: &btapb.Type_Bytes_Encoding{ + Encoding: &btapb.Type_Bytes_Encoding_Raw_{ + Raw: &btapb.Type_Bytes_Encoding_Raw{}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + Aggregator: &btapb.Type_Aggregate_Sum_{ + Sum: &btapb.Type_Aggregate_Sum{}, + }, + }, + }, + } + + got := AggregateType{Input: Int64Type{}, Aggregator: SumAggregator{}}.proto() + if !proto.Equal(got, want) { + t.Errorf("got type %v, want: %v", got, want) + } +}