From 997b45cef6553d4d4eaf40cade968ca2798dc4de Mon Sep 17 00:00:00 2001 From: Bonan Liu Date: Wed, 20 May 2020 14:38:14 -0400 Subject: [PATCH] bigtable: implement managed backup feature --- bigtable/admin.go | 225 +++++++++++++++++++++++++++++++++++ bigtable/integration_test.go | 132 ++++++++++++++++++++ 2 files changed, 357 insertions(+) diff --git a/bigtable/admin.go b/bigtable/admin.go index c8e625e4e5b4..0b6ed8daf1ca 100644 --- a/bigtable/admin.go +++ b/bigtable/admin.go @@ -1395,3 +1395,228 @@ func UpdateInstanceAndSyncClusters(ctx context.Context, iac *InstanceAdminClient return results, nil } + +// RestoreTable creates a table from a backup. The table will be created in the same cluster as the backup. +func (ac *AdminClient) RestoreTable(ctx context.Context, table, cluster, backup string) error { + ctx = mergeOutgoingMetadata(ctx, ac.md) + prefix := ac.instancePrefix() + backupPath := prefix + "/clusters/" + cluster + "/backups/" + backup + + req := &btapb.RestoreTableRequest{ + Parent: prefix, + TableId: table, + Source: &btapb.RestoreTableRequest_Backup{backupPath}, + } + op, err := ac.tClient.RestoreTable(ctx, req) + if err != nil { + return err + } + resp := btapb.Table{} + return longrunning.InternalNewOperation(ac.lroClient, op).Wait(ctx, &resp) +} + +// CreateBackup creates a new backup in the specified cluster from the +// specified source table with the user-provided expire time. +func (ac *AdminClient) CreateBackup(ctx context.Context, table, cluster, backup string, expireTime time.Time) error { + ctx = mergeOutgoingMetadata(ctx, ac.md) + prefix := ac.instancePrefix() + + parsedExpireTime, err := ptypes.TimestampProto(expireTime) + if err != nil { + return err + } + + req := &btapb.CreateBackupRequest{ + Parent: prefix + "/clusters/" + cluster, + BackupId: backup, + Backup: &btapb.Backup{ + ExpireTime: parsedExpireTime, + SourceTable: prefix + "/tables/" + table, + }, + } + + op, err := ac.tClient.CreateBackup(ctx, req) + if err != nil { + return err + } + resp := btapb.Backup{} + return longrunning.InternalNewOperation(ac.lroClient, op).Wait(ctx, &resp) +} + +// Backups returns a BackupIterator for iterating over the backups in a cluster. +// To list backups across all of the clusters in the instance specify "-" as the cluster. +func (ac *AdminClient) Backups(ctx context.Context, cluster string) *BackupIterator { + ctx = mergeOutgoingMetadata(ctx, ac.md) + prefix := ac.instancePrefix() + clusterPath := prefix + "/clusters/" + cluster + + it := &BackupIterator{} + req := &btapb.ListBackupsRequest{ + Parent: clusterPath, + } + + fetch := func(pageSize int, pageToken string) (string, error) { + req.PageToken = pageToken + if pageSize > math.MaxInt32 { + req.PageSize = math.MaxInt32 + } else { + req.PageSize = int32(pageSize) + } + + var resp *btapb.ListBackupsResponse + err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error { + var err error + resp, err = ac.tClient.ListBackups(ctx, req) + return err + }, retryOptions...) + if err != nil { + return "", err + } + for _, s := range resp.Backups { + backupInfo, err := newBackupInfo(s) + if err != nil { + return "", fmt.Errorf("failed to parse backup proto %v", err) + } + it.items = append(it.items, backupInfo) + } + return resp.NextPageToken, nil + } + bufLen := func() int { return len(it.items) } + takeBuf := func() interface{} { b := it.items; it.items = nil; return b } + + it.pageInfo, it.nextFunc = iterator.NewPageInfo(fetch, bufLen, takeBuf) + + return it +} + +// newBackupInfo creates a BackupInfo struct from a btapb.Backup protocol buffer. +func newBackupInfo(backup *btapb.Backup) (*BackupInfo, error) { + nameParts := strings.Split(backup.Name, "/") + name := nameParts[len(nameParts)-1] + tablePathParts := strings.Split(backup.SourceTable, "/") + tableID := tablePathParts[len(tablePathParts)-1] + + startTime, err := ptypes.Timestamp(backup.StartTime) + if err != nil { + return nil, fmt.Errorf("invalid startTime: %v", err) + } + + endTime, err := ptypes.Timestamp(backup.EndTime) + if err != nil { + return nil, fmt.Errorf("invalid endTime: %v", err) + } + + expireTime, err := ptypes.Timestamp(backup.ExpireTime) + if err != nil { + return nil, fmt.Errorf("invalid expireTime: %v", err) + } + + return &BackupInfo{ + Name: name, + SourceTable: tableID, + SizeBytes: backup.SizeBytes, + StartTime: startTime, + EndTime: endTime, + ExpireTime: expireTime, + State: backup.State.String(), + }, nil +} + +// BackupIterator is an EntryIterator that iterates over log entries. +type BackupIterator struct { + items []*BackupInfo + pageInfo *iterator.PageInfo + nextFunc func() error +} + +// PageInfo supports pagination. See https://godoc.org/google.golang.org/api/iterator package for details. +func (it *BackupIterator) PageInfo() *iterator.PageInfo { + return it.pageInfo +} + +// Next returns the next result. Its second return value is iterator.Done +// (https://godoc.org/google.golang.org/api/iterator) if there are no more +// results. Once Next returns Done, all subsequent calls will return Done. +func (it *BackupIterator) Next() (*BackupInfo, error) { + if err := it.nextFunc(); err != nil { + return nil, err + } + item := it.items[0] + it.items = it.items[1:] + return item, nil +} + +// BackupInfo contains backup metadata. This struct is read-only. +type BackupInfo struct { + Name string + SourceTable string + SizeBytes int64 + StartTime time.Time + EndTime time.Time + ExpireTime time.Time + State string +} + +// BackupInfo gets backup metadata. +func (ac *AdminClient) BackupInfo(ctx context.Context, cluster, backup string) (*BackupInfo, error) { + ctx = mergeOutgoingMetadata(ctx, ac.md) + prefix := ac.instancePrefix() + clusterPath := prefix + "/clusters/" + cluster + backupPath := clusterPath + "/backups/" + backup + + req := &btapb.GetBackupRequest{ + Name: backupPath, + } + + var resp *btapb.Backup + err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error { + var err error + resp, err = ac.tClient.GetBackup(ctx, req) + return err + }, retryOptions...) + if err != nil { + return nil, err + } + + return newBackupInfo(resp) +} + +// DeleteBackup deletes a backup in a cluster. +func (ac *AdminClient) DeleteBackup(ctx context.Context, cluster, backup string) error { + ctx = mergeOutgoingMetadata(ctx, ac.md) + prefix := ac.instancePrefix() + clusterPath := prefix + "/clusters/" + cluster + backupPath := clusterPath + "/backups/" + backup + + req := &btapb.DeleteBackupRequest{ + Name: backupPath, + } + _, err := ac.tClient.DeleteBackup(ctx, req) + return err +} + +// UpdateBackup updates the backup metadata in a cluster. The API only supports updating expire time. +func (ac *AdminClient) UpdateBackup(ctx context.Context, cluster, backup string, expireTime time.Time) error { + ctx = mergeOutgoingMetadata(ctx, ac.md) + prefix := ac.instancePrefix() + clusterPath := prefix + "/clusters/" + cluster + backupPath := clusterPath + "/backups/" + backup + + expireTimestamp, err := ptypes.TimestampProto(expireTime) + if err != nil { + return err + } + + updateMask := &field_mask.FieldMask{} + updateMask.Paths = append(updateMask.Paths, "expire_time") + + req := &btapb.UpdateBackupRequest{ + Backup: &btapb.Backup{ + Name: backupPath, + ExpireTime: expireTimestamp, + }, + UpdateMask: updateMask, + } + _, err = ac.tClient.UpdateBackup(ctx, req) + return err +} diff --git a/bigtable/integration_test.go b/bigtable/integration_test.go index 0a68f45108c4..1e8524c9d200 100644 --- a/bigtable/integration_test.go +++ b/bigtable/integration_test.go @@ -1865,6 +1865,138 @@ func TestIntegration_InstanceUpdate(t *testing.T) { } } +func TestIntegration_AdminBackup(t *testing.T) { + testEnv, err := NewIntegrationEnv() + if err != nil { + t.Fatalf("IntegrationEnv: %v", err) + } + defer testEnv.Close() + + if !testEnv.Config().UseProd { + t.Skip("emulator doesn't support backups") + } + + timeout := 5 * time.Minute + ctx, _ := context.WithTimeout(context.Background(), timeout) + + adminClient, err := testEnv.NewAdminClient() + if err != nil { + t.Fatalf("NewAdminClient: %v", err) + } + defer adminClient.Close() + + table := testEnv.Config().Table + cluster := testEnv.Config().Cluster + + // Delete the table at the end of the test. Schedule ahead of time + // in case the client fails + defer adminClient.DeleteTable(ctx, table) + + list := func(cluster string) ([]*BackupInfo, error) { + infos := []*BackupInfo(nil) + + it := adminClient.Backups(ctx, cluster) + for { + s, err := it.Next() + if err == iterator.Done { + break + } + if err != nil { + return nil, err + } + infos = append(infos, s) + } + return infos, err + } + + if err := adminClient.CreateTable(ctx, table); err != nil { + t.Fatalf("Creating table: %v", err) + } + + // Precondition: no backups + backups, err := list(cluster) + if err != nil { + t.Fatalf("Initial backup list: %v", err) + } + if got, want := len(backups), 0; got != want { + t.Fatalf("Initial backup list len: %d, want: %d", got, want) + } + + // Create backup + defer adminClient.DeleteBackup(ctx, cluster, "mybackup") + + if err = adminClient.CreateBackup(ctx, table, cluster, "mybackup", time.Now().Add(8*time.Hour)); err != nil { + t.Fatalf("Creating backup: %v", err) + } + + // List backup + backups, err = list(cluster) + if err != nil { + t.Fatalf("Listing backups: %v", err) + } + if got, want := len(backups), 1; got != want { + t.Fatalf("Listing backup count: %d, want: %d", got, want) + } + if got, want := backups[0].Name, "mybackup"; got != want { + t.Fatalf("Backup name: %s, want: %s", got, want) + } + if got, want := backups[0].SourceTable, table; got != want { + t.Fatalf("Backup SourceTable: %s, want: %s", got, want) + } + if got, want := backups[0].ExpireTime, backups[0].StartTime.Add(8*time.Hour); math.Abs(got.Sub(want).Minutes()) > 1 { + t.Fatalf("Backup ExpireTime: %s, want: %s", got, want) + } + + // Get backup + backup, err := adminClient.BackupInfo(ctx, cluster, "mybackup") + if err != nil { + t.Fatalf("BackupInfo: %v", backup) + } + if got, want := *backup, *backups[0]; got != want { + t.Fatalf("BackupInfo: %v, want: %v", got, want) + } + + // Update backup + newExpireTime := time.Now().Add(10 * time.Hour) + err = adminClient.UpdateBackup(ctx, cluster, "mybackup", newExpireTime) + if err != nil { + t.Fatalf("UpdateBackup failed: %v", err) + } + + // Check that updated backup has the correct expire time + updatedBackup, err := adminClient.BackupInfo(ctx, cluster, "mybackup") + if err != nil { + t.Fatalf("BackupInfo: %v", err) + } + backup.ExpireTime = newExpireTime + // Server clock and local clock may not be perfectly sync'ed. + if got, want := *updatedBackup, *backup; got.ExpireTime.Sub(want.ExpireTime) > time.Minute { + t.Fatalf("BackupInfo: %v, want: %v", got, want) + } + + // Restore backup + restoredTable := table + "-restored" + defer adminClient.DeleteTable(ctx, restoredTable) + if err = adminClient.RestoreTable(ctx, restoredTable, cluster, "mybackup"); err != nil { + t.Fatalf("RestoreTable: %v", err) + } + if _, err := adminClient.TableInfo(ctx, restoredTable); err != nil { + t.Fatalf("Restored TableInfo: %v", err) + } + + // Delete backup + if err = adminClient.DeleteBackup(ctx, cluster, "mybackup"); err != nil { + t.Fatalf("DeleteBackup: %v", err) + } + backups, err = list(cluster) + if err != nil { + t.Fatalf("List after Delete: %v", err) + } + if got, want := len(backups), 0; got != want { + t.Fatalf("List after delete len: %d, want: %d", got, want) + } +} + func setupIntegration(ctx context.Context, t *testing.T) (_ *Client, _ *AdminClient, table *Table, tableName string, cleanup func(), _ error) { testEnv, err := NewIntegrationEnv() if err != nil {