Skip to content

Commit

Permalink
feat(bigtable): add change stream config to create and update table (#…
Browse files Browse the repository at this point in the history
…8180)

Co-authored-by: Mattie Fu <mattiefu@google.com>
  • Loading branch information
tonytanger and mutianf committed Jul 5, 2023
1 parent f41d56f commit 32897ce
Show file tree
Hide file tree
Showing 3 changed files with 224 additions and 26 deletions.
71 changes: 51 additions & 20 deletions bigtable/admin.go
Expand Up @@ -41,6 +41,7 @@ import (
btapb "google.golang.org/genproto/googleapis/bigtable/admin/v2"
"google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc/metadata"
"google.golang.org/protobuf/types/known/durationpb"
field_mask "google.golang.org/protobuf/types/known/fieldmaskpb"
)

Expand Down Expand Up @@ -211,6 +212,11 @@ func (ac *AdminClient) Tables(ctx context.Context) ([]string, error) {
return names, nil
}

// ChangeStreamRetention indicates how long bigtable should retain change data.
// Minimum is 1 day. Maximum is 7. nil to not change the retention period. 0 to
// disable change stream retention.
type ChangeStreamRetention optional.Duration

// DeletionProtection indicates whether the table is protected against data loss
// i.e. when set to protected, deleting the table, the column families in the table,
// and the instance containing the table would be prohibited.
Expand All @@ -233,13 +239,14 @@ type TableConf struct {
Families map[string]GCPolicy
// DeletionProtection can be none, protected or unprotected
// set to protected to make the table protected against data loss
DeletionProtection DeletionProtection
DeletionProtection DeletionProtection
ChangeStreamRetention ChangeStreamRetention
}

// CreateTable creates a new table in the instance.
// This method may return before the table's creation is complete.
func (ac *AdminClient) CreateTable(ctx context.Context, table string) error {
return ac.CreateTableFromConf(ctx, &TableConf{TableID: table, DeletionProtection: None})
return ac.CreateTableFromConf(ctx, &TableConf{TableID: table, ChangeStreamRetention: nil, DeletionProtection: None})
}

// CreatePresplitTable creates a new table in the instance.
Expand All @@ -248,7 +255,7 @@ func (ac *AdminClient) CreateTable(ctx context.Context, table string) error {
// spanning the key ranges: [, s1), [s1, s2), [s2, ).
// This method may return before the table's creation is complete.
func (ac *AdminClient) CreatePresplitTable(ctx context.Context, table string, splitKeys []string) error {
return ac.CreateTableFromConf(ctx, &TableConf{TableID: table, SplitKeys: splitKeys, DeletionProtection: None})
return ac.CreateTableFromConf(ctx, &TableConf{TableID: table, SplitKeys: splitKeys, ChangeStreamRetention: nil, DeletionProtection: None})
}

// CreateTableFromConf creates a new table in the instance from the given configuration.
Expand All @@ -269,6 +276,10 @@ func (ac *AdminClient) CreateTableFromConf(ctx context.Context, conf *TableConf)
} else if conf.DeletionProtection == Unprotected {
tbl.DeletionProtection = false
}
if conf.ChangeStreamRetention != nil && conf.ChangeStreamRetention.(time.Duration) != 0 {
tbl.ChangeStreamConfig = &btapb.ChangeStreamConfig{}
tbl.ChangeStreamConfig.RetentionPeriod = durationpb.New(conf.ChangeStreamRetention.(time.Duration))
}
if conf.Families != nil {
tbl.ColumnFamilies = make(map[string]*btapb.ColumnFamily)
for fam, policy := range conf.Families {
Expand Down Expand Up @@ -307,12 +318,23 @@ type UpdateTableConf struct {
tableID string
// deletionProtection can be unset, true or false
// set to true to make the table protected against data loss
deletionProtection DeletionProtection
deletionProtection DeletionProtection
changeStreamRetention ChangeStreamRetention
}

// UpdateTableDisableChangeStream updates a table to disable change stream for table ID.
func (ac *AdminClient) UpdateTableDisableChangeStream(ctx context.Context, tableID string) error {
return ac.updateTableWithConf(ctx, &UpdateTableConf{tableID, None, time.Duration(0)})
}

// UpdateTableWithChangeStream updates a table to with the given table ID and change stream config.
func (ac *AdminClient) UpdateTableWithChangeStream(ctx context.Context, tableID string, changeStreamRetention ChangeStreamRetention) error {
return ac.updateTableWithConf(ctx, &UpdateTableConf{tableID, None, changeStreamRetention})
}

// UpdateTableWithDeletionProtection updates a table with the given table ID and deletion protection parameter.
func (ac *AdminClient) UpdateTableWithDeletionProtection(ctx context.Context, tableID string, deletionProtection DeletionProtection) error {
return ac.updateTableWithConf(ctx, &UpdateTableConf{tableID, deletionProtection})
return ac.updateTableWithConf(ctx, &UpdateTableConf{tableID, deletionProtection, nil})
}

// updateTableWithConf updates a table in the instance from the given configuration.
Expand All @@ -323,30 +345,34 @@ func (ac *AdminClient) updateTableWithConf(ctx context.Context, conf *UpdateTabl
return errors.New("TableID is required")
}

if conf.deletionProtection == None {
return errors.New("deletion protection is required")
}

ctx = mergeOutgoingMetadata(ctx, ac.md)

updateMask := &field_mask.FieldMask{
Paths: []string{
"deletion_protection",
},
}

deletionProtection := true
if conf.deletionProtection == Unprotected {
deletionProtection = false
Paths: []string{},
}
prefix := ac.instancePrefix()
req := &btapb.UpdateTableRequest{
Table: &btapb.Table{
Name: prefix + "/tables/" + conf.tableID,
DeletionProtection: deletionProtection,
Name: prefix + "/tables/" + conf.tableID,
},
UpdateMask: updateMask,
}

if conf.deletionProtection != None {
updateMask.Paths = append(updateMask.Paths, "deletion_protection")
req.Table.DeletionProtection = conf.deletionProtection != Unprotected
}

if conf.changeStreamRetention != nil {
if conf.changeStreamRetention.(time.Duration) == time.Duration(0) {
updateMask.Paths = append(updateMask.Paths, "change_stream_config")
} else {
updateMask.Paths = append(updateMask.Paths, "change_stream_config.retention_period")
req.Table.ChangeStreamConfig = &btapb.ChangeStreamConfig{}
req.Table.ChangeStreamConfig.RetentionPeriod = durationpb.New(conf.changeStreamRetention.(time.Duration))
}
}

lro, err := ac.tClient.UpdateTable(ctx, req)
if err != nil {
return fmt.Errorf("error from update: %w", err)
Expand Down Expand Up @@ -394,7 +420,8 @@ type TableInfo struct {
// DeletionProtection indicates whether the table is protected against data loss
// DeletionProtection could be None depending on the table view
// for example when using NAME_ONLY, the response does not contain DeletionProtection and the value should be None
DeletionProtection DeletionProtection
DeletionProtection DeletionProtection
ChangeStreamRetention ChangeStreamRetention
}

// FamilyInfo represents information about a column family.
Expand Down Expand Up @@ -450,6 +477,10 @@ func (ac *AdminClient) TableInfo(ctx context.Context, table string) (*TableInfo,
} else {
ti.DeletionProtection = Unprotected
}
if res.ChangeStreamConfig != nil && res.ChangeStreamConfig.RetentionPeriod != nil {
ti.ChangeStreamRetention = res.ChangeStreamConfig.RetentionPeriod.AsDuration()
}

return ti, nil
}

Expand Down
92 changes: 86 additions & 6 deletions bigtable/admin_test.go
Expand Up @@ -19,6 +19,7 @@ import (
"errors"
"fmt"
"testing"
"time"

longrunning "cloud.google.com/go/longrunning/autogen/longrunningpb"
"github.com/google/go-cmp/cmp"
Expand Down Expand Up @@ -101,6 +102,45 @@ func TestTableAdmin_CreateTableFromConf_DeletionProtection_Unprotected(t *testin
}
}

func TestTableAdmin_CreateTableFromConf_ChangeStream_Valid(t *testing.T) {
mock := &mockTableAdminClock{}
c := setupTableClient(t, mock)

changeStreamRetention, err := time.ParseDuration("24h")
if err != nil {
t.Fatalf("ChangeStreamRetention not valid: %v", err)
}
err = c.CreateTableFromConf(context.Background(), &TableConf{TableID: "My-table", ChangeStreamRetention: changeStreamRetention})
if err != nil {
t.Fatalf("CreateTableFromConf failed: %v", err)
}
createTableReq := mock.createTableReq
if !cmp.Equal(createTableReq.TableId, "My-table") {
t.Errorf("Unexpected table ID: %v, expected %v", createTableReq.TableId, "My-table")
}
if !cmp.Equal(createTableReq.Table.ChangeStreamConfig.RetentionPeriod.Seconds, int64(changeStreamRetention.Seconds())) {
t.Errorf("Unexpected table change stream retention: %v, expected %v", createTableReq.Table.ChangeStreamConfig.RetentionPeriod.Seconds, changeStreamRetention.Seconds())
}
}

func TestTableAdmin_CreateTableFromConf_ChangeStream_Disable(t *testing.T) {
mock := &mockTableAdminClock{}
c := setupTableClient(t, mock)

changeStreamRetention := time.Duration(0)
err := c.CreateTableFromConf(context.Background(), &TableConf{TableID: "My-table", ChangeStreamRetention: changeStreamRetention})
if err != nil {
t.Fatalf("CreateTableFromConf failed: %v", err)
}
createTableReq := mock.createTableReq
if !cmp.Equal(createTableReq.TableId, "My-table") {
t.Errorf("Unexpected table ID: %v, expected %v", createTableReq.TableId, "My-table")
}
if createTableReq.Table.ChangeStreamConfig != nil {
t.Errorf("Unexpected table change stream retention: %v should be empty", createTableReq.Table.ChangeStreamConfig)
}
}

func TestTableAdmin_UpdateTableWithDeletionProtection(t *testing.T) {
mock := &mockTableAdminClock{}
c := setupTableClient(t, mock)
Expand All @@ -118,6 +158,9 @@ func TestTableAdmin_UpdateTableWithDeletionProtection(t *testing.T) {
if !cmp.Equal(updateTableReq.Table.DeletionProtection, true) {
t.Errorf("UpdateTableRequest does not match, TableID: %v", updateTableReq.Table.Name)
}
if !cmp.Equal(len(updateTableReq.UpdateMask.Paths), 1) {
t.Errorf("UpdateTableRequest does not match, UpdateMask has length of %d, expected 1", len(updateTableReq.UpdateMask.Paths))
}
if !cmp.Equal(updateTableReq.UpdateMask.Paths[0], "deletion_protection") {
t.Errorf("UpdateTableRequest does not match, TableID: %v", updateTableReq.Table.Name)
}
Expand Down Expand Up @@ -148,16 +191,53 @@ func TestTableAdmin_UpdateTable_TableID_NotProvided(t *testing.T) {
}
}

func TestTableAdmin_UpdateTable_DeletionProtection_NotProvided(t *testing.T) {
func TestTableAdmin_UpdateTableWithChangeStreamRetention(t *testing.T) {
mock := &mockTableAdminClock{}
c := setupTableClient(t, mock)
deletionProtection := None
changeStreamRetention, err := time.ParseDuration("24h")
if err != nil {
t.Fatalf("ChangeStreamRetention not valid: %v", err)
}

// Check if the update fails when deletion protection is not provided
err := c.UpdateTableWithDeletionProtection(context.Background(), "My-table", deletionProtection)
err = c.UpdateTableWithChangeStream(context.Background(), "My-table", changeStreamRetention)
if err != nil {
t.Fatalf("UpdateTableWithChangeStream failed: %v", err)
}
updateTableReq := mock.updateTableReq
if !cmp.Equal(updateTableReq.Table.Name, "projects/my-cool-project/instances/my-cool-instance/tables/My-table") {
t.Errorf("UpdateTableRequest does not match, TableID: %v", updateTableReq.Table.Name)
}
if !cmp.Equal(updateTableReq.Table.ChangeStreamConfig.RetentionPeriod.Seconds, int64(changeStreamRetention.Seconds())) {
t.Errorf("UpdateTableRequest does not match, ChangeStreamConfig: %v", updateTableReq.Table.ChangeStreamConfig)
}
if !cmp.Equal(len(updateTableReq.UpdateMask.Paths), 1) {
t.Errorf("UpdateTableRequest does not match, UpdateMask has length of %d, expected 1", len(updateTableReq.UpdateMask.Paths))
}
if !cmp.Equal(updateTableReq.UpdateMask.Paths[0], "change_stream_config.retention_period") {
t.Errorf("UpdateTableRequest does not match, UpdateMask: %v", updateTableReq.UpdateMask.Paths[0])
}
}

if fmt.Sprint(err) != "deletion protection is required" {
t.Fatalf("UpdateTable failed: %v", err)
func TestTableAdmin_UpdateTableDisableChangeStream(t *testing.T) {
mock := &mockTableAdminClock{}
c := setupTableClient(t, mock)

err := c.UpdateTableDisableChangeStream(context.Background(), "My-table")
if err != nil {
t.Fatalf("UpdateTableDisableChangeStream failed: %v", err)
}
updateTableReq := mock.updateTableReq
if !cmp.Equal(updateTableReq.Table.Name, "projects/my-cool-project/instances/my-cool-instance/tables/My-table") {
t.Errorf("UpdateTableRequest does not match, TableID: %v", updateTableReq.Table.Name)
}
if updateTableReq.Table.ChangeStreamConfig != nil {
t.Errorf("UpdateTableRequest does not match, ChangeStreamConfig: %v should be empty", updateTableReq.Table.ChangeStreamConfig)
}
if !cmp.Equal(len(updateTableReq.UpdateMask.Paths), 1) {
t.Errorf("UpdateTableRequest does not match, UpdateMask has length of %d, expected 1", len(updateTableReq.UpdateMask.Paths))
}
if !cmp.Equal(updateTableReq.UpdateMask.Paths[0], "change_stream_config") {
t.Errorf("UpdateTableRequest does not match, UpdateMask: %v", updateTableReq.UpdateMask.Paths[0])
}
}

Expand Down
87 changes: 87 additions & 0 deletions bigtable/integration_test.go
Expand Up @@ -1535,6 +1535,93 @@ func TestIntegration_TableDeletionProtection(t *testing.T) {
}
}

// testing if change stream works properly i.e. can create table with change
// stream and disable change stream on existing table and delete fails if change
// stream is enabled.
func TestIntegration_EnableChangeStream(t *testing.T) {
testEnv, err := NewIntegrationEnv()
if err != nil {
t.Fatalf("IntegrationEnv: %v", err)
}
defer testEnv.Close()

timeout := 2 * time.Second
if testEnv.Config().UseProd {
timeout = 5 * time.Minute
}
ctx, _ := context.WithTimeout(context.Background(), timeout)

adminClient, err := testEnv.NewAdminClient()
if err != nil {
t.Fatalf("NewAdminClient: %v", err)
}
defer adminClient.Close()

changeStreamRetention, err := time.ParseDuration("24h")
if err != nil {
t.Fatalf("ChangeStreamRetention not valid: %v", err)
}

tableConf := TableConf{
TableID: myTableName,
Families: map[string]GCPolicy{
"fam1": MaxVersionsPolicy(1),
"fam2": MaxVersionsPolicy(2),
},
ChangeStreamRetention: changeStreamRetention,
}

if err := adminClient.CreateTableFromConf(ctx, &tableConf); err != nil {
t.Fatalf("Create table from config: %v", err)
}

table, err := adminClient.TableInfo(ctx, myTableName)
if err != nil {
t.Fatalf("Getting table info: %v", err)
}

if table.ChangeStreamRetention != changeStreamRetention {
t.Errorf("Expect table change stream to be enabled for table: %v has info: %v", myTableName, table)
}

// Update retention
changeStreamRetention, err = time.ParseDuration("70h")
if err != nil {
t.Fatalf("ChangeStreamRetention not valid: %v", err)
}

if err := adminClient.UpdateTableWithChangeStream(ctx, myTableName, changeStreamRetention); err != nil {
t.Fatalf("Update table from config: %v", err)
}

table, err = adminClient.TableInfo(ctx, myTableName)
if err != nil {
t.Fatalf("Getting table info: %v", err)
}

if table.ChangeStreamRetention != changeStreamRetention {
t.Errorf("Expect table change stream to be enabled for table: %v has info: %v", myTableName, table)
}

// Disable change stream
if err := adminClient.UpdateTableDisableChangeStream(ctx, myTableName); err != nil {
t.Fatalf("Update table from config: %v", err)
}

table, err = adminClient.TableInfo(ctx, myTableName)
if err != nil {
t.Fatalf("Getting table info: %v", err)
}

if table.ChangeStreamRetention != nil {
t.Errorf("Expect table change stream to be disabled for table: %v has info: %v", myTableName, table)
}

if err = adminClient.DeleteTable(ctx, tableConf.TableID); err != nil {
t.Errorf("Deleting the table failed when change stream is disabled: %v", err)
}
}

func TestIntegration_Admin(t *testing.T) {
testEnv, err := NewIntegrationEnv()
if err != nil {
Expand Down

0 comments on commit 32897ce

Please sign in to comment.