Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bigtable: implement managed backup feature #2524

Merged
merged 2 commits into from
Jul 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
225 changes: 225 additions & 0 deletions bigtable/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -1424,3 +1424,228 @@ func UpdateInstanceAndSyncClusters(ctx context.Context, iac *InstanceAdminClient

return results, nil
}

liubonan marked this conversation as resolved.
Show resolved Hide resolved
// RestoreTable creates a table from a backup. The table will be created in the same cluster as the backup.
func (ac *AdminClient) RestoreTable(ctx context.Context, table, cluster, backup string) error {
ctx = mergeOutgoingMetadata(ctx, ac.md)
prefix := ac.instancePrefix()
backupPath := prefix + "/clusters/" + cluster + "/backups/" + backup

req := &btapb.RestoreTableRequest{
Parent: prefix,
TableId: table,
Source: &btapb.RestoreTableRequest_Backup{backupPath},
}
op, err := ac.tClient.RestoreTable(ctx, req)
if err != nil {
return err
}
resp := btapb.Table{}
return longrunning.InternalNewOperation(ac.lroClient, op).Wait(ctx, &resp)
}

// CreateBackup creates a new backup in the specified cluster from the
// specified source table with the user-provided expire time.
func (ac *AdminClient) CreateBackup(ctx context.Context, table, cluster, backup string, expireTime time.Time) error {
ctx = mergeOutgoingMetadata(ctx, ac.md)
prefix := ac.instancePrefix()

parsedExpireTime, err := ptypes.TimestampProto(expireTime)
if err != nil {
return err
}

req := &btapb.CreateBackupRequest{
Parent: prefix + "/clusters/" + cluster,
BackupId: backup,
Backup: &btapb.Backup{
ExpireTime: parsedExpireTime,
SourceTable: prefix + "/tables/" + table,
},
}

op, err := ac.tClient.CreateBackup(ctx, req)
if err != nil {
return err
}
resp := btapb.Backup{}
return longrunning.InternalNewOperation(ac.lroClient, op).Wait(ctx, &resp)
}

// Backups returns a BackupIterator for iterating over the backups in a cluster.
// To list backups across all of the clusters in the instance specify "-" as the cluster.
func (ac *AdminClient) Backups(ctx context.Context, cluster string) *BackupIterator {
ctx = mergeOutgoingMetadata(ctx, ac.md)
prefix := ac.instancePrefix()
clusterPath := prefix + "/clusters/" + cluster

it := &BackupIterator{}
req := &btapb.ListBackupsRequest{
Parent: clusterPath,
}

fetch := func(pageSize int, pageToken string) (string, error) {
req.PageToken = pageToken
if pageSize > math.MaxInt32 {
req.PageSize = math.MaxInt32
} else {
req.PageSize = int32(pageSize)
}

var resp *btapb.ListBackupsResponse
err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
var err error
resp, err = ac.tClient.ListBackups(ctx, req)
return err
}, retryOptions...)
if err != nil {
return "", err
}
for _, s := range resp.Backups {
backupInfo, err := newBackupInfo(s)
if err != nil {
return "", fmt.Errorf("failed to parse backup proto %v", err)
}
it.items = append(it.items, backupInfo)
}
return resp.NextPageToken, nil
}
bufLen := func() int { return len(it.items) }
takeBuf := func() interface{} { b := it.items; it.items = nil; return b }

it.pageInfo, it.nextFunc = iterator.NewPageInfo(fetch, bufLen, takeBuf)

return it
}

// newBackupInfo creates a BackupInfo struct from a btapb.Backup protocol buffer.
func newBackupInfo(backup *btapb.Backup) (*BackupInfo, error) {
nameParts := strings.Split(backup.Name, "/")
name := nameParts[len(nameParts)-1]
tablePathParts := strings.Split(backup.SourceTable, "/")
tableID := tablePathParts[len(tablePathParts)-1]

startTime, err := ptypes.Timestamp(backup.StartTime)
if err != nil {
return nil, fmt.Errorf("invalid startTime: %v", err)
}

endTime, err := ptypes.Timestamp(backup.EndTime)
if err != nil {
return nil, fmt.Errorf("invalid endTime: %v", err)
}

expireTime, err := ptypes.Timestamp(backup.ExpireTime)
if err != nil {
return nil, fmt.Errorf("invalid expireTime: %v", err)
}

return &BackupInfo{
Name: name,
SourceTable: tableID,
SizeBytes: backup.SizeBytes,
StartTime: startTime,
EndTime: endTime,
ExpireTime: expireTime,
State: backup.State.String(),
}, nil
}

// BackupIterator is an EntryIterator that iterates over log entries.
type BackupIterator struct {
items []*BackupInfo
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was comparing this with ProfileIterator above, which uses the AppProfile struct from the proto directly rather than wrapping with an info struct as you do here. Is there a reason for the different approach (it seems fine to me, just curious)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I agree there were some inconsistency, but the backup implementation was following the table APIs which use TableInfo. Probably we need a separate cleanup PR which get to a consistent naming.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yeah. Unfortunately we can't make breaking changes like that without releasing a new major version of the library, but it's something to keep in mind. This seems fine to me in any case.

pageInfo *iterator.PageInfo
nextFunc func() error
}

// PageInfo supports pagination. See https://godoc.org/google.golang.org/api/iterator package for details.
func (it *BackupIterator) PageInfo() *iterator.PageInfo {
return it.pageInfo
}

// Next returns the next result. Its second return value is iterator.Done
// (https://godoc.org/google.golang.org/api/iterator) if there are no more
// results. Once Next returns Done, all subsequent calls will return Done.
func (it *BackupIterator) Next() (*BackupInfo, error) {
if err := it.nextFunc(); err != nil {
return nil, err
}
item := it.items[0]
it.items = it.items[1:]
return item, nil
}

// BackupInfo contains backup metadata. This struct is read-only.
type BackupInfo struct {
Name string
SourceTable string
SizeBytes int64
StartTime time.Time
EndTime time.Time
ExpireTime time.Time
State string
}

// BackupInfo gets backup metadata.
func (ac *AdminClient) BackupInfo(ctx context.Context, cluster, backup string) (*BackupInfo, error) {
ctx = mergeOutgoingMetadata(ctx, ac.md)
prefix := ac.instancePrefix()
clusterPath := prefix + "/clusters/" + cluster
backupPath := clusterPath + "/backups/" + backup

req := &btapb.GetBackupRequest{
Name: backupPath,
}

var resp *btapb.Backup
err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
var err error
resp, err = ac.tClient.GetBackup(ctx, req)
return err
}, retryOptions...)
if err != nil {
return nil, err
}

return newBackupInfo(resp)
}

// DeleteBackup deletes a backup in a cluster.
func (ac *AdminClient) DeleteBackup(ctx context.Context, cluster, backup string) error {
ctx = mergeOutgoingMetadata(ctx, ac.md)
prefix := ac.instancePrefix()
clusterPath := prefix + "/clusters/" + cluster
backupPath := clusterPath + "/backups/" + backup

req := &btapb.DeleteBackupRequest{
Name: backupPath,
}
_, err := ac.tClient.DeleteBackup(ctx, req)
return err
}

// UpdateBackup updates the backup metadata in a cluster. The API only supports updating expire time.
func (ac *AdminClient) UpdateBackup(ctx context.Context, cluster, backup string, expireTime time.Time) error {
ctx = mergeOutgoingMetadata(ctx, ac.md)
prefix := ac.instancePrefix()
clusterPath := prefix + "/clusters/" + cluster
backupPath := clusterPath + "/backups/" + backup

expireTimestamp, err := ptypes.TimestampProto(expireTime)
if err != nil {
return err
}

updateMask := &field_mask.FieldMask{}
updateMask.Paths = append(updateMask.Paths, "expire_time")

req := &btapb.UpdateBackupRequest{
Backup: &btapb.Backup{
Name: backupPath,
ExpireTime: expireTimestamp,
},
UpdateMask: updateMask,
}
_, err = ac.tClient.UpdateBackup(ctx, req)
return err
}
132 changes: 132 additions & 0 deletions bigtable/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1971,6 +1971,138 @@ func TestIntegration_InstanceUpdate(t *testing.T) {
}
}

func TestIntegration_AdminBackup(t *testing.T) {
testEnv, err := NewIntegrationEnv()
if err != nil {
t.Fatalf("IntegrationEnv: %v", err)
}
defer testEnv.Close()

if !testEnv.Config().UseProd {
t.Skip("emulator doesn't support backups")
}

timeout := 5 * time.Minute
ctx, _ := context.WithTimeout(context.Background(), timeout)

adminClient, err := testEnv.NewAdminClient()
if err != nil {
t.Fatalf("NewAdminClient: %v", err)
}
defer adminClient.Close()

table := testEnv.Config().Table
cluster := testEnv.Config().Cluster

// Delete the table at the end of the test. Schedule ahead of time
// in case the client fails
defer adminClient.DeleteTable(ctx, table)

list := func(cluster string) ([]*BackupInfo, error) {
infos := []*BackupInfo(nil)

it := adminClient.Backups(ctx, cluster)
for {
s, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
return nil, err
}
infos = append(infos, s)
}
return infos, err
}

if err := adminClient.CreateTable(ctx, table); err != nil {
t.Fatalf("Creating table: %v", err)
}

// Precondition: no backups
backups, err := list(cluster)
if err != nil {
t.Fatalf("Initial backup list: %v", err)
}
if got, want := len(backups), 0; got != want {
t.Fatalf("Initial backup list len: %d, want: %d", got, want)
}

// Create backup
defer adminClient.DeleteBackup(ctx, cluster, "mybackup")

if err = adminClient.CreateBackup(ctx, table, cluster, "mybackup", time.Now().Add(8*time.Hour)); err != nil {
t.Fatalf("Creating backup: %v", err)
}

// List backup
backups, err = list(cluster)
if err != nil {
t.Fatalf("Listing backups: %v", err)
}
if got, want := len(backups), 1; got != want {
t.Fatalf("Listing backup count: %d, want: %d", got, want)
}
if got, want := backups[0].Name, "mybackup"; got != want {
t.Fatalf("Backup name: %s, want: %s", got, want)
}
if got, want := backups[0].SourceTable, table; got != want {
t.Fatalf("Backup SourceTable: %s, want: %s", got, want)
}
if got, want := backups[0].ExpireTime, backups[0].StartTime.Add(8*time.Hour); math.Abs(got.Sub(want).Minutes()) > 1 {
t.Fatalf("Backup ExpireTime: %s, want: %s", got, want)
}

// Get backup
backup, err := adminClient.BackupInfo(ctx, cluster, "mybackup")
if err != nil {
t.Fatalf("BackupInfo: %v", backup)
}
if got, want := *backup, *backups[0]; got != want {
t.Fatalf("BackupInfo: %v, want: %v", got, want)
}

// Update backup
newExpireTime := time.Now().Add(10 * time.Hour)
err = adminClient.UpdateBackup(ctx, cluster, "mybackup", newExpireTime)
if err != nil {
t.Fatalf("UpdateBackup failed: %v", err)
}

// Check that updated backup has the correct expire time
updatedBackup, err := adminClient.BackupInfo(ctx, cluster, "mybackup")
if err != nil {
t.Fatalf("BackupInfo: %v", err)
}
backup.ExpireTime = newExpireTime
// Server clock and local clock may not be perfectly sync'ed.
if got, want := *updatedBackup, *backup; got.ExpireTime.Sub(want.ExpireTime) > time.Minute {
t.Fatalf("BackupInfo: %v, want: %v", got, want)
}

// Restore backup
restoredTable := table + "-restored"
defer adminClient.DeleteTable(ctx, restoredTable)
if err = adminClient.RestoreTable(ctx, restoredTable, cluster, "mybackup"); err != nil {
t.Fatalf("RestoreTable: %v", err)
}
if _, err := adminClient.TableInfo(ctx, restoredTable); err != nil {
t.Fatalf("Restored TableInfo: %v", err)
}

// Delete backup
if err = adminClient.DeleteBackup(ctx, cluster, "mybackup"); err != nil {
t.Fatalf("DeleteBackup: %v", err)
}
backups, err = list(cluster)
if err != nil {
t.Fatalf("List after Delete: %v", err)
}
if got, want := len(backups), 0; got != want {
t.Fatalf("List after delete len: %d, want: %d", got, want)
}
}

func setupIntegration(ctx context.Context, t *testing.T) (_ *Client, _ *AdminClient, table *Table, tableName string, cleanup func(), _ error) {
testEnv, err := NewIntegrationEnv()
if err != nil {
Expand Down