From 868363cbc68c655d4c1f8959280cf1acba5073a7 Mon Sep 17 00:00:00 2001 From: shollyman Date: Tue, 1 Mar 2022 15:13:35 -0800 Subject: [PATCH] feat(bigquery): add job timeout support (#5707) This feature allows callers to configure a job timeout. If a BigQuery job runs longer than this duration, BigQuery may cancel the job. --- bigquery/copy.go | 13 +++++++++++++ bigquery/copy_test.go | 3 +++ bigquery/extract.go | 13 +++++++++++++ bigquery/extract_test.go | 3 +++ bigquery/load.go | 15 +++++++++++++++ bigquery/load_test.go | 4 ++++ bigquery/query.go | 21 +++++++++++++++++++-- bigquery/query_test.go | 2 ++ 8 files changed, 72 insertions(+), 2 deletions(-) diff --git a/bigquery/copy.go b/bigquery/copy.go index 38d4eee83985..aaec924113f7 100644 --- a/bigquery/copy.go +++ b/bigquery/copy.go @@ -16,6 +16,7 @@ package bigquery import ( "context" + "time" bq "google.golang.org/api/bigquery/v2" ) @@ -58,6 +59,16 @@ type CopyConfig struct { // One of the supported operation types when executing a Table Copy jobs. By default this // copies tables, but can also be set to perform snapshot or restore operations. OperationType TableCopyOperationType + + // Sets a best-effort deadline on a specific job. If job execution exceeds this + // timeout, BigQuery may attempt to cancel this work automatically. + // + // This deadline cannot be adjusted or removed once the job is created. Consider + // using Job.Cancel in situations where you need more dynamic behavior. + // + // Experimental: this option is experimental and may be modified or removed in future versions, + // regardless of any other documented package stability guarantees. + JobTimeout time.Duration } func (c *CopyConfig) toBQ() *bq.JobConfiguration { @@ -75,6 +86,7 @@ func (c *CopyConfig) toBQ() *bq.JobConfiguration { SourceTables: ts, OperationType: string(c.OperationType), }, + JobTimeoutMs: c.JobTimeout.Milliseconds(), } } @@ -86,6 +98,7 @@ func bqToCopyConfig(q *bq.JobConfiguration, c *Client) *CopyConfig { Dst: bqToTable(q.Copy.DestinationTable, c), DestinationEncryptionConfig: bqToEncryptionConfig(q.Copy.DestinationEncryptionConfiguration), OperationType: TableCopyOperationType(q.Copy.OperationType), + JobTimeout: time.Duration(q.JobTimeoutMs) * time.Millisecond, } for _, t := range q.Copy.SourceTables { cc.Srcs = append(cc.Srcs, bqToTable(t, c)) diff --git a/bigquery/copy_test.go b/bigquery/copy_test.go index 9b16ebf2d3a0..368c4fd58ce5 100644 --- a/bigquery/copy_test.go +++ b/bigquery/copy_test.go @@ -16,6 +16,7 @@ package bigquery import ( "testing" + "time" "cloud.google.com/go/internal/testutil" "github.com/google/go-cmp/cmp/cmpopts" @@ -152,10 +153,12 @@ func TestCopy(t *testing.T) { }, config: CopyConfig{ OperationType: SnapshotOperation, + JobTimeout: 6 * time.Second, }, want: func() *bq.Job { j := defaultCopyJob() j.Configuration.Copy.OperationType = "SNAPSHOT" + j.Configuration.JobTimeoutMs = 6000 return j }(), }, diff --git a/bigquery/extract.go b/bigquery/extract.go index 468392541ad3..ee4cbc4953c8 100644 --- a/bigquery/extract.go +++ b/bigquery/extract.go @@ -16,6 +16,7 @@ package bigquery import ( "context" + "time" "cloud.google.com/go/internal/trace" bq "google.golang.org/api/bigquery/v2" @@ -46,6 +47,16 @@ type ExtractConfig struct { // an integer column annotated with the appropriate timestamp-micros/millis annotation // in the resulting Avro files. UseAvroLogicalTypes bool + + // Sets a best-effort deadline on a specific job. If job execution exceeds this + // timeout, BigQuery may attempt to cancel this work automatically. + // + // This deadline cannot be adjusted or removed once the job is created. Consider + // using Job.Cancel in situations where you need more dynamic behavior. + // + // Experimental: this option is experimental and may be modified or removed in future versions, + // regardless of any other documented package stability guarantees. + JobTimeout time.Duration } func (e *ExtractConfig) toBQ() *bq.JobConfiguration { @@ -65,6 +76,7 @@ func (e *ExtractConfig) toBQ() *bq.JobConfiguration { PrintHeader: printHeader, UseAvroLogicalTypes: e.UseAvroLogicalTypes, }, + JobTimeoutMs: e.JobTimeout.Milliseconds(), } if e.Src != nil { cfg.Extract.SourceTable = e.Src.toBQ() @@ -93,6 +105,7 @@ func bqToExtractConfig(q *bq.JobConfiguration, c *Client) *ExtractConfig { Src: bqToTable(qe.SourceTable, c), SrcModel: bqToModel(qe.SourceModel, c), UseAvroLogicalTypes: qe.UseAvroLogicalTypes, + JobTimeout: time.Duration(q.JobTimeoutMs) * time.Millisecond, } } diff --git a/bigquery/extract_test.go b/bigquery/extract_test.go index 322895bac311..0f937d0c08c4 100644 --- a/bigquery/extract_test.go +++ b/bigquery/extract_test.go @@ -16,6 +16,7 @@ package bigquery import ( "testing" + "time" "cloud.google.com/go/internal/testutil" "github.com/google/go-cmp/cmp" @@ -83,10 +84,12 @@ func TestExtract(t *testing.T) { config: ExtractConfig{ DisableHeader: true, Labels: map[string]string{"a": "b"}, + JobTimeout: 8 * time.Second, }, want: func() *bq.Job { j := defaultExtractJob() j.Configuration.Labels = map[string]string{"a": "b"} + j.Configuration.JobTimeoutMs = 8000 f := false j.Configuration.Extract.PrintHeader = &f return j diff --git a/bigquery/load.go b/bigquery/load.go index c8487bc5b55b..b99e343b7257 100644 --- a/bigquery/load.go +++ b/bigquery/load.go @@ -17,6 +17,7 @@ package bigquery import ( "context" "io" + "time" "cloud.google.com/go/internal/trace" bq "google.golang.org/api/bigquery/v2" @@ -77,6 +78,16 @@ type LoadConfig struct { // // StringTargetType supports all precision and scale values. DecimalTargetTypes []DecimalTargetType + + // Sets a best-effort deadline on a specific job. If job execution exceeds this + // timeout, BigQuery may attempt to cancel this work automatically. + // + // This deadline cannot be adjusted or removed once the job is created. Consider + // using Job.Cancel in situations where you need more dynamic behavior. + // + // Experimental: this option is experimental and may be modified or removed in future versions, + // regardless of any other documented package stability guarantees. + JobTimeout time.Duration } func (l *LoadConfig) toBQ() (*bq.JobConfiguration, io.Reader) { @@ -95,6 +106,7 @@ func (l *LoadConfig) toBQ() (*bq.JobConfiguration, io.Reader) { ProjectionFields: l.ProjectionFields, HivePartitioningOptions: l.HivePartitioningOptions.toBQ(), }, + JobTimeoutMs: l.JobTimeout.Milliseconds(), } for _, v := range l.DecimalTargetTypes { config.Load.DecimalTargetTypes = append(config.Load.DecimalTargetTypes, string(v)) @@ -118,6 +130,9 @@ func bqToLoadConfig(q *bq.JobConfiguration, c *Client) *LoadConfig { ProjectionFields: q.Load.ProjectionFields, HivePartitioningOptions: bqToHivePartitioningOptions(q.Load.HivePartitioningOptions), } + if q.JobTimeoutMs > 0 { + lc.JobTimeout = time.Duration(q.JobTimeoutMs) * time.Millisecond + } for _, v := range q.Load.DecimalTargetTypes { lc.DecimalTargetTypes = append(lc.DecimalTargetTypes, DecimalTargetType(v)) } diff --git a/bigquery/load_test.go b/bigquery/load_test.go index f6a3a2a2c8d8..7bdec0ee823a 100644 --- a/bigquery/load_test.go +++ b/bigquery/load_test.go @@ -139,12 +139,16 @@ func TestLoad(t *testing.T) { g.IgnoreUnknownValues = true return g }(), + config: LoadConfig{ + JobTimeout: 4 * time.Second, + }, want: func() *bq.Job { j := defaultLoadJob() j.Configuration.Load.MaxBadRecords = 1 j.Configuration.Load.AllowJaggedRows = true j.Configuration.Load.AllowQuotedNewlines = true j.Configuration.Load.IgnoreUnknownValues = true + j.Configuration.JobTimeoutMs = 4000 return j }(), }, diff --git a/bigquery/query.go b/bigquery/query.go index b64ae0621145..6d5f3554a036 100644 --- a/bigquery/query.go +++ b/bigquery/query.go @@ -18,6 +18,7 @@ import ( "context" "errors" "fmt" + "time" "cloud.google.com/go/internal/trace" "cloud.google.com/go/internal/uid" @@ -138,6 +139,16 @@ type QueryConfig struct { // ConnectionProperties are optional key-values settings. ConnectionProperties []*ConnectionProperty + + // Sets a best-effort deadline on a specific job. If job execution exceeds this + // timeout, BigQuery may attempt to cancel this work automatically. + // + // This deadline cannot be adjusted or removed once the job is created. Consider + // using Job.Cancel in situations where you need more dynamic behavior. + // + // Experimental: this option is experimental and may be modified or removed in future versions, + // regardless of any other documented package stability guarantees. + JobTimeout time.Duration } func (qc *QueryConfig) toBQ() (*bq.JobConfiguration, error) { @@ -209,11 +220,15 @@ func (qc *QueryConfig) toBQ() (*bq.JobConfiguration, error) { } qconf.ConnectionProperties = bqcp } - return &bq.JobConfiguration{ + jc := &bq.JobConfiguration{ Labels: qc.Labels, DryRun: qc.DryRun, Query: qconf, - }, nil + } + if qc.JobTimeout > 0 { + jc.JobTimeoutMs = qc.JobTimeout.Milliseconds() + } + return jc, nil } func bqToQueryConfig(q *bq.JobConfiguration, c *Client) (*QueryConfig, error) { @@ -221,6 +236,7 @@ func bqToQueryConfig(q *bq.JobConfiguration, c *Client) (*QueryConfig, error) { qc := &QueryConfig{ Labels: q.Labels, DryRun: q.DryRun, + JobTimeout: time.Duration(q.JobTimeoutMs) * time.Millisecond, Q: qq.Query, CreateDisposition: TableCreateDisposition(qq.CreateDisposition), WriteDisposition: TableWriteDisposition(qq.WriteDisposition), @@ -417,6 +433,7 @@ func (q *Query) probeFastPath() (*bq.QueryRequest, error) { q.QueryConfig.Clustering != nil || q.QueryConfig.DestinationEncryptionConfig != nil || q.QueryConfig.SchemaUpdateOptions != nil || + q.QueryConfig.JobTimeout != 0 || // User has defined the jobID generation behavior q.JobIDConfig.JobID != "" { return nil, fmt.Errorf("QueryConfig incompatible with fastPath") diff --git a/bigquery/query_test.go b/bigquery/query_test.go index 3b292b77b9e5..98b96a88c568 100644 --- a/bigquery/query_test.go +++ b/bigquery/query_test.go @@ -515,6 +515,7 @@ func TestConfiguringQuery(t *testing.T) { } query.DestinationEncryptionConfig = &EncryptionConfig{KMSKeyName: "keyName"} query.SchemaUpdateOptions = []string{"ALLOW_FIELD_ADDITION"} + query.JobTimeout = time.Duration(5) * time.Second // Note: Other configuration fields are tested in other tests above. // A lot of that can be consolidated once Client.Copy is gone. @@ -534,6 +535,7 @@ func TestConfiguringQuery(t *testing.T) { DestinationEncryptionConfiguration: &bq.EncryptionConfiguration{KmsKeyName: "keyName"}, SchemaUpdateOptions: []string{"ALLOW_FIELD_ADDITION"}, }, + JobTimeoutMs: 5000, }, JobReference: &bq.JobReference{ JobId: "ajob",