
Commit 4c12b42

shollyman and tbpg authored
feat(bigquery): add support for snapshot/restore (#4112)
* feat(bigquery): add support for snapshot/restore

  Secondary changes: this complicated some internal wrapping functions for TableMetadata. Because TableMetadata can now contain a Table reference to another table, we need to pass around an additional instance of the Client, as a user may want to do something like call Metadata() on the embedded reference to the base table in a snapshot definition.

* add comments to exported operation types

* doc SnapshotDefinition

* address reviewer comments

Co-authored-by: Tyler Bui-Palsulich <26876514+tbpg@users.noreply.github.com>
1 parent 83337b8 commit 4c12b42
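As a rough usage sketch of the new API surface (illustrative only, not part of the commit): the snippet below creates a snapshot of a base table and then restores it into a regular table via the Copier's new OperationType field. The project, dataset, and table IDs ("my-project", "mydataset", "base", "snap", "restored") and the runAndWait helper are placeholders.

    package main

    import (
        "context"
        "log"

        "cloud.google.com/go/bigquery"
    )

    func main() {
        ctx := context.Background()
        // Placeholder project, dataset, and table names.
        client, err := bigquery.NewClient(ctx, "my-project")
        if err != nil {
            log.Fatal(err)
        }
        defer client.Close()
        ds := client.Dataset("mydataset")

        // Snapshot: copy the base table into an immutable snapshot table.
        snap := ds.Table("snap").CopierFrom(ds.Table("base"))
        snap.OperationType = bigquery.SnapshotOperation
        runAndWait(ctx, snap)

        // Restore: copy the snapshot back into a regular, writable table.
        restore := ds.Table("restored").CopierFrom(ds.Table("snap"))
        restore.OperationType = bigquery.RestoreOperation
        runAndWait(ctx, restore)
    }

    // runAndWait is a small illustrative helper (not part of the library) that
    // runs a copier and fails fast on any job error.
    func runAndWait(ctx context.Context, c *bigquery.Copier) {
        job, err := c.Run(ctx)
        if err != nil {
            log.Fatal(err)
        }
        status, err := job.Wait(ctx)
        if err != nil {
            log.Fatal(err)
        }
        if err := status.Err(); err != nil {
            log.Fatal(err)
        }
    }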

File tree (5 files changed, +180 −4 lines changed):

  bigquery/copy.go
  bigquery/copy_test.go
  bigquery/integration_test.go
  bigquery/table.go
  bigquery/table_test.go

Diff for: bigquery/copy.go

+19
@@ -20,6 +20,19 @@ import (
     bq "google.golang.org/api/bigquery/v2"
 )

+// TableCopyOperationType is used to indicate the type of operation performed by a BigQuery
+// copy job.
+type TableCopyOperationType string
+
+var (
+    // CopyOperation indicates normal table to table copying.
+    CopyOperation TableCopyOperationType = "COPY"
+    // SnapshotOperation indicates creating a snapshot from a regular table.
+    SnapshotOperation TableCopyOperationType = "SNAPSHOT"
+    // RestoreOperation indicates creating/restoring a table from a snapshot.
+    RestoreOperation TableCopyOperationType = "RESTORE"
+)
+
 // CopyConfig holds the configuration for a copy job.
 type CopyConfig struct {
     // Srcs are the tables from which data will be copied.
@@ -41,6 +54,10 @@ type CopyConfig struct {

     // Custom encryption configuration (e.g., Cloud KMS keys).
     DestinationEncryptionConfig *EncryptionConfig
+
+    // One of the supported operation types when executing a Table Copy jobs. By default this
+    // copies tables, but can also be set to perform snapshot or restore operations.
+    OperationType TableCopyOperationType
 }

 func (c *CopyConfig) toBQ() *bq.JobConfiguration {
@@ -56,6 +73,7 @@ func (c *CopyConfig) toBQ() *bq.JobConfiguration {
             DestinationTable:                   c.Dst.toBQ(),
             DestinationEncryptionConfiguration: c.DestinationEncryptionConfig.toBQ(),
             SourceTables:                       ts,
+            OperationType:                      string(c.OperationType),
         },
     }
 }
@@ -67,6 +85,7 @@ func bqToCopyConfig(q *bq.JobConfiguration, c *Client) *CopyConfig {
         WriteDisposition:            TableWriteDisposition(q.Copy.WriteDisposition),
         Dst:                         bqToTable(q.Copy.DestinationTable, c),
         DestinationEncryptionConfig: bqToEncryptionConfig(q.Copy.DestinationEncryptionConfiguration),
+        OperationType:               TableCopyOperationType(q.Copy.OperationType),
     }
     for _, t := range q.Copy.SourceTables {
         cc.Srcs = append(cc.Srcs, bqToTable(t, c))
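Because bqToCopyConfig now maps OperationType back from the underlying job configuration, the operation type can also be read off a job after the fact. A minimal sketch, assuming a *bigquery.Job named job obtained from a copier as in the earlier sketch, plus fmt and log imports:

    // The operation type survives the round-trip through bqToCopyConfig,
    // so it can be read back from the job's configuration.
    config, err := job.Config()
    if err != nil {
        log.Fatal(err)
    }
    if copyCfg, ok := config.(*bigquery.CopyConfig); ok {
        fmt.Println("operation type:", copyCfg.OperationType) // e.g. "SNAPSHOT"
    }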

Diff for: bigquery/copy_test.go

+22
@@ -137,6 +137,28 @@ func TestCopy(t *testing.T) {
                 return j
             }(),
         },
+        {
+            dst: &Table{
+                ProjectID: "d-project-id",
+                DatasetID: "d-dataset-id",
+                TableID:   "d-table-id",
+            },
+            srcs: []*Table{
+                {
+                    ProjectID: "s-project-id",
+                    DatasetID: "s-dataset-id",
+                    TableID:   "s-table-id",
+                },
+            },
+            config: CopyConfig{
+                OperationType: SnapshotOperation,
+            },
+            want: func() *bq.Job {
+                j := defaultCopyJob()
+                j.Configuration.Copy.OperationType = "SNAPSHOT"
+                return j
+            }(),
+        },
     }
     c := &Client{projectID: "client-project-id"}
     for i, tc := range testCases {

Diff for: bigquery/integration_test.go

+90
@@ -438,6 +438,96 @@ func TestIntegration_TableMetadata(t *testing.T) {

 }

+func TestIntegration_SnapshotAndRestore(t *testing.T) {
+
+    if client == nil {
+        t.Skip("Integration tests skipped")
+    }
+    ctx := context.Background()
+
+    // instantiate a base table via a CTAS
+    baseTableID := tableIDs.New()
+    qualified := fmt.Sprintf("`%s`.%s.%s", testutil.ProjID(), dataset.DatasetID, baseTableID)
+    sql := fmt.Sprintf(`
+        CREATE TABLE %s
+        (
+            sample_value INT64,
+            groupid STRING,
+        )
+        AS
+        SELECT
+            CAST(RAND() * 100 AS INT64),
+            CONCAT("group", CAST(CAST(RAND()*10 AS INT64) AS STRING))
+        FROM
+            UNNEST(GENERATE_ARRAY(0,999))
+        `, qualified)
+    if err := runQueryJob(ctx, sql); err != nil {
+        t.Fatalf("couldn't instantiate base table: %v", err)
+    }
+
+    // Create a snapshot.  We'll select our snapshot time explicitly to validate the snapshot time is the same.
+    targetTime := time.Now()
+    snapshotID := tableIDs.New()
+    copier := dataset.Table(snapshotID).CopierFrom(dataset.Table(fmt.Sprintf("%s@%d", baseTableID, targetTime.UnixNano()/1e6)))
+    copier.OperationType = SnapshotOperation
+    job, err := copier.Run(ctx)
+    if err != nil {
+        t.Fatalf("couldn't run snapshot: %v", err)
+    }
+    status, err := job.Wait(ctx)
+    if err != nil {
+        t.Fatalf("polling snapshot failed: %v", err)
+    }
+    if status.Err() != nil {
+        t.Fatalf("snapshot failed in error: %v", status.Err())
+    }
+
+    // verify metadata on the snapshot
+    meta, err := dataset.Table(snapshotID).Metadata(ctx)
+    if err != nil {
+        t.Fatalf("couldn't get metadata from snapshot: %v", err)
+    }
+    if meta.Type != Snapshot {
+        t.Errorf("expected snapshot table type, got %s", meta.Type)
+    }
+    want := &SnapshotDefinition{
+        BaseTableReference: dataset.Table(baseTableID),
+        SnapshotTime:       targetTime,
+    }
+    if diff := testutil.Diff(meta.SnapshotDefinition, want, cmp.AllowUnexported(Table{}), cmpopts.IgnoreUnexported(Client{}), cmpopts.EquateApproxTime(time.Millisecond)); diff != "" {
+        t.Fatalf("SnapshotDefinition differs. got=-, want=+:\n%s", diff)
+    }
+
+    // execute a restore using the snapshot.
+    restoreID := tableIDs.New()
+    restorer := dataset.Table(restoreID).CopierFrom(dataset.Table(snapshotID))
+    restorer.OperationType = RestoreOperation
+    job, err = restorer.Run(ctx)
+    if err != nil {
+        t.Fatalf("couldn't run restore: %v", err)
+    }
+    status, err = job.Wait(ctx)
+    if err != nil {
+        t.Fatalf("polling restore failed: %v", err)
+    }
+    if status.Err() != nil {
+        t.Fatalf("restore failed in error: %v", status.Err())
+    }
+
+    restoreMeta, err := dataset.Table(restoreID).Metadata(ctx)
+    if err != nil {
+        t.Fatalf("couldn't get restored table metadata: %v", err)
+    }
+
+    if meta.NumBytes != restoreMeta.NumBytes {
+        t.Errorf("bytes mismatch. snap had %d bytes, restore had %d bytes", meta.NumBytes, restoreMeta.NumBytes)
+    }
+    if meta.NumRows != restoreMeta.NumRows {
+        t.Errorf("row counts mismatch. snap had %d rows, restore had %d rows", meta.NumRows, restoreMeta.NumRows)
+    }
+
+}
+
 func TestIntegration_HourTimePartitioning(t *testing.T) {
     if client == nil {
         t.Skip("Integration tests skipped")
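The test above pins the snapshot to an explicit point in time by appending a snapshot decorator, table@<milliseconds since the Unix epoch>, to the source table ID (hence the UnixNano()/1e6). A minimal helper sketching the same pattern (illustrative only, not part of the commit); the dataset handle and table IDs are supplied by the caller, and the usual bigquery, context, fmt, and time imports are assumed:

    // snapshotAsOf snapshots srcTable as it existed at time t by using the
    // table@<millis> decorator on the copy source, mirroring the integration test.
    func snapshotAsOf(ctx context.Context, ds *bigquery.Dataset, srcTable, dstTable string, t time.Time) (*bigquery.Job, error) {
        decorated := fmt.Sprintf("%s@%d", srcTable, t.UnixNano()/1e6)
        copier := ds.Table(dstTable).CopierFrom(ds.Table(decorated))
        copier.OperationType = bigquery.SnapshotOperation
        return copier.Run(ctx)
    }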

Diff for: bigquery/table.go

+48-3
@@ -120,6 +120,10 @@ type TableMetadata struct {
     // This does not include data that is being buffered during a streaming insert.
     NumRows uint64

+    // SnapshotDefinition contains additional information about the provenance of a
+    // given snapshot table.
+    SnapshotDefinition *SnapshotDefinition
+
     // Contains information regarding this table's streaming buffer, if one is
     // present. This field will be nil if the table is not being streamed to or if
     // there is no data in the streaming buffer.
@@ -177,6 +181,9 @@ const (
     // MaterializedView represents a managed storage table that's derived from
     // a base table.
     MaterializedView TableType = "MATERIALIZED_VIEW"
+    // Snapshot represents an immutable point in time snapshot of some other
+    // table.
+    Snapshot TableType = "SNAPSHOT"
 )

 // MaterializedViewDefinition contains information for materialized views.
@@ -223,6 +230,42 @@ func bqToMaterializedViewDefinition(q *bq.MaterializedViewDefinition) *Materiali
     }
 }

+// SnapshotDefinition provides metadata related to the origin of a snapshot.
+type SnapshotDefinition struct {
+
+    // BaseTableReference describes the ID of the table that this snapshot
+    // came from.
+    BaseTableReference *Table
+
+    // SnapshotTime indicates when the base table was snapshot.
+    SnapshotTime time.Time
+}
+
+func (sd *SnapshotDefinition) toBQ() *bq.SnapshotDefinition {
+    if sd == nil {
+        return nil
+    }
+    return &bq.SnapshotDefinition{
+        BaseTableReference: sd.BaseTableReference.toBQ(),
+        SnapshotTime:       sd.SnapshotTime.Format(time.RFC3339),
+    }
+}
+
+func bqToSnapshotDefinition(q *bq.SnapshotDefinition, c *Client) *SnapshotDefinition {
+    if q == nil {
+        return nil
+    }
+    sd := &SnapshotDefinition{
+        BaseTableReference: bqToTable(q.BaseTableReference, c),
+    }
+    // It's possible we could fail to populate SnapshotTime if we fail to parse
+    // the backend representation.
+    if t, err := time.Parse(time.RFC3339, q.SnapshotTime); err == nil {
+        sd.SnapshotTime = t
+    }
+    return sd
+}
+
 // TimePartitioningType defines the interval used to partition managed data.
 type TimePartitioningType string

@@ -496,6 +539,7 @@ func (tm *TableMetadata) toBQ() (*bq.Table, error) {
     t.RangePartitioning = tm.RangePartitioning.toBQ()
     t.Clustering = tm.Clustering.toBQ()
     t.RequirePartitionFilter = tm.RequirePartitionFilter
+    t.SnapshotDefinition = tm.SnapshotDefinition.toBQ()

     if !validExpiration(tm.ExpirationTime) {
         return nil, fmt.Errorf("invalid expiration time: %v.\n"+
@@ -554,10 +598,10 @@ func (t *Table) Metadata(ctx context.Context) (md *TableMetadata, err error) {
     if err != nil {
         return nil, err
     }
-    return bqToTableMetadata(table)
+    return bqToTableMetadata(table, t.c)
 }

-func bqToTableMetadata(t *bq.Table) (*TableMetadata, error) {
+func bqToTableMetadata(t *bq.Table, c *Client) (*TableMetadata, error) {
     md := &TableMetadata{
         Description: t.Description,
         Name:        t.FriendlyName,
@@ -574,6 +618,7 @@ func bqToTableMetadata(t *bq.Table) (*TableMetadata, error) {
         ETag:                   t.Etag,
         EncryptionConfig:       bqToEncryptionConfig(t.EncryptionConfiguration),
         RequirePartitionFilter: t.RequirePartitionFilter,
+        SnapshotDefinition:     bqToSnapshotDefinition(t.SnapshotDefinition, c),
     }
     if t.MaterializedView != nil {
         md.MaterializedView = bqToMaterializedViewDefinition(t.MaterializedView)
@@ -652,7 +697,7 @@ func (t *Table) Update(ctx context.Context, tm TableMetadataToUpdate, etag strin
     }); err != nil {
         return nil, err
     }
-    return bqToTableMetadata(res)
+    return bqToTableMetadata(res, t.c)
 }

 func (tm *TableMetadataToUpdate) toBQ() (*bq.Table, error) {
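With the new Snapshot table type and SnapshotDefinition metadata, a caller can inspect where a snapshot came from. A minimal sketch (illustrative only, not part of the commit), assuming the usual bigquery, context, fmt, and time imports:

    // describeSnapshot reports the provenance of a snapshot table using the
    // TableMetadata fields added in this commit.
    func describeSnapshot(ctx context.Context, tbl *bigquery.Table) error {
        meta, err := tbl.Metadata(ctx)
        if err != nil {
            return err
        }
        if meta.Type != bigquery.Snapshot || meta.SnapshotDefinition == nil {
            fmt.Println("not a snapshot table")
            return nil
        }
        base := meta.SnapshotDefinition.BaseTableReference
        fmt.Printf("snapshot of %s.%s.%s taken at %s\n",
            base.ProjectID, base.DatasetID, base.TableID,
            meta.SnapshotDefinition.SnapshotTime.Format(time.RFC3339))
        return nil
    }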

Diff for: bigquery/table_test.go

+1-1
@@ -113,7 +113,7 @@ func TestBQToTableMetadata(t *testing.T) {
             },
         },
     } {
-        got, err := bqToTableMetadata(test.in)
+        got, err := bqToTableMetadata(test.in, &Client{})
         if err != nil {
             t.Fatal(err)
         }
