Skip to content

Commit

Permalink
feat(bigquery): add job timeout support (#5707)
Browse files Browse the repository at this point in the history
This feature allows callers to configure a job timeout.  If a BigQuery
job runs longer than this duration, BigQuery may cancel the job.
  • Loading branch information
shollyman committed Mar 1, 2022
1 parent a34503b commit 868363c
Show file tree
Hide file tree
Showing 8 changed files with 72 additions and 2 deletions.
13 changes: 13 additions & 0 deletions bigquery/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package bigquery

import (
"context"
"time"

bq "google.golang.org/api/bigquery/v2"
)
Expand Down Expand Up @@ -58,6 +59,16 @@ type CopyConfig struct {
// One of the supported operation types when executing a Table Copy job. By default this
// copies tables, but can also be set to perform snapshot or restore operations.
OperationType TableCopyOperationType

// JobTimeout sets a best-effort deadline on the job. If job execution exceeds this
// timeout, BigQuery may attempt to cancel this work automatically.
//
// This deadline cannot be adjusted or removed once the job is created. Consider
// using Job.Cancel in situations where you need more dynamic behavior.
//
// Experimental: this option is experimental and may be modified or removed in future versions,
// regardless of any other documented package stability guarantees.
JobTimeout time.Duration
}

func (c *CopyConfig) toBQ() *bq.JobConfiguration {
Expand All @@ -75,6 +86,7 @@ func (c *CopyConfig) toBQ() *bq.JobConfiguration {
SourceTables: ts,
OperationType: string(c.OperationType),
},
JobTimeoutMs: c.JobTimeout.Milliseconds(),
}
}

Expand All @@ -86,6 +98,7 @@ func bqToCopyConfig(q *bq.JobConfiguration, c *Client) *CopyConfig {
Dst: bqToTable(q.Copy.DestinationTable, c),
DestinationEncryptionConfig: bqToEncryptionConfig(q.Copy.DestinationEncryptionConfiguration),
OperationType: TableCopyOperationType(q.Copy.OperationType),
JobTimeout: time.Duration(q.JobTimeoutMs) * time.Millisecond,
}
for _, t := range q.Copy.SourceTables {
cc.Srcs = append(cc.Srcs, bqToTable(t, c))
Expand Down
3 changes: 3 additions & 0 deletions bigquery/copy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package bigquery

import (
"testing"
"time"

"cloud.google.com/go/internal/testutil"
"github.com/google/go-cmp/cmp/cmpopts"
Expand Down Expand Up @@ -152,10 +153,12 @@ func TestCopy(t *testing.T) {
},
config: CopyConfig{
OperationType: SnapshotOperation,
JobTimeout: 6 * time.Second,
},
want: func() *bq.Job {
j := defaultCopyJob()
j.Configuration.Copy.OperationType = "SNAPSHOT"
j.Configuration.JobTimeoutMs = 6000
return j
}(),
},
Expand Down
13 changes: 13 additions & 0 deletions bigquery/extract.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package bigquery

import (
"context"
"time"

"cloud.google.com/go/internal/trace"
bq "google.golang.org/api/bigquery/v2"
Expand Down Expand Up @@ -46,6 +47,16 @@ type ExtractConfig struct {
// an integer column annotated with the appropriate timestamp-micros/millis annotation
// in the resulting Avro files.
UseAvroLogicalTypes bool

// JobTimeout sets a best-effort deadline on the job. If job execution exceeds this
// timeout, BigQuery may attempt to cancel this work automatically.
//
// This deadline cannot be adjusted or removed once the job is created. Consider
// using Job.Cancel in situations where you need more dynamic behavior.
//
// Experimental: this option is experimental and may be modified or removed in future versions,
// regardless of any other documented package stability guarantees.
JobTimeout time.Duration
}

func (e *ExtractConfig) toBQ() *bq.JobConfiguration {
Expand All @@ -65,6 +76,7 @@ func (e *ExtractConfig) toBQ() *bq.JobConfiguration {
PrintHeader: printHeader,
UseAvroLogicalTypes: e.UseAvroLogicalTypes,
},
JobTimeoutMs: e.JobTimeout.Milliseconds(),
}
if e.Src != nil {
cfg.Extract.SourceTable = e.Src.toBQ()
Expand Down Expand Up @@ -93,6 +105,7 @@ func bqToExtractConfig(q *bq.JobConfiguration, c *Client) *ExtractConfig {
Src: bqToTable(qe.SourceTable, c),
SrcModel: bqToModel(qe.SourceModel, c),
UseAvroLogicalTypes: qe.UseAvroLogicalTypes,
JobTimeout: time.Duration(q.JobTimeoutMs) * time.Millisecond,
}
}

Expand Down
3 changes: 3 additions & 0 deletions bigquery/extract_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package bigquery

import (
"testing"
"time"

"cloud.google.com/go/internal/testutil"
"github.com/google/go-cmp/cmp"
Expand Down Expand Up @@ -83,10 +84,12 @@ func TestExtract(t *testing.T) {
config: ExtractConfig{
DisableHeader: true,
Labels: map[string]string{"a": "b"},
JobTimeout: 8 * time.Second,
},
want: func() *bq.Job {
j := defaultExtractJob()
j.Configuration.Labels = map[string]string{"a": "b"}
j.Configuration.JobTimeoutMs = 8000
f := false
j.Configuration.Extract.PrintHeader = &f
return j
Expand Down
15 changes: 15 additions & 0 deletions bigquery/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package bigquery
import (
"context"
"io"
"time"

"cloud.google.com/go/internal/trace"
bq "google.golang.org/api/bigquery/v2"
Expand Down Expand Up @@ -77,6 +78,16 @@ type LoadConfig struct {
//
// StringTargetType supports all precision and scale values.
DecimalTargetTypes []DecimalTargetType

// JobTimeout sets a best-effort deadline on the job. If job execution exceeds this
// timeout, BigQuery may attempt to cancel this work automatically.
//
// This deadline cannot be adjusted or removed once the job is created. Consider
// using Job.Cancel in situations where you need more dynamic behavior.
//
// Experimental: this option is experimental and may be modified or removed in future versions,
// regardless of any other documented package stability guarantees.
JobTimeout time.Duration
}

func (l *LoadConfig) toBQ() (*bq.JobConfiguration, io.Reader) {
Expand All @@ -95,6 +106,7 @@ func (l *LoadConfig) toBQ() (*bq.JobConfiguration, io.Reader) {
ProjectionFields: l.ProjectionFields,
HivePartitioningOptions: l.HivePartitioningOptions.toBQ(),
},
JobTimeoutMs: l.JobTimeout.Milliseconds(),
}
for _, v := range l.DecimalTargetTypes {
config.Load.DecimalTargetTypes = append(config.Load.DecimalTargetTypes, string(v))
Expand All @@ -118,6 +130,9 @@ func bqToLoadConfig(q *bq.JobConfiguration, c *Client) *LoadConfig {
ProjectionFields: q.Load.ProjectionFields,
HivePartitioningOptions: bqToHivePartitioningOptions(q.Load.HivePartitioningOptions),
}
if q.JobTimeoutMs > 0 {
lc.JobTimeout = time.Duration(q.JobTimeoutMs) * time.Millisecond
}
for _, v := range q.Load.DecimalTargetTypes {
lc.DecimalTargetTypes = append(lc.DecimalTargetTypes, DecimalTargetType(v))
}
Expand Down
4 changes: 4 additions & 0 deletions bigquery/load_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,12 +139,16 @@ func TestLoad(t *testing.T) {
g.IgnoreUnknownValues = true
return g
}(),
config: LoadConfig{
JobTimeout: 4 * time.Second,
},
want: func() *bq.Job {
j := defaultLoadJob()
j.Configuration.Load.MaxBadRecords = 1
j.Configuration.Load.AllowJaggedRows = true
j.Configuration.Load.AllowQuotedNewlines = true
j.Configuration.Load.IgnoreUnknownValues = true
j.Configuration.JobTimeoutMs = 4000
return j
}(),
},
Expand Down
21 changes: 19 additions & 2 deletions bigquery/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"errors"
"fmt"
"time"

"cloud.google.com/go/internal/trace"
"cloud.google.com/go/internal/uid"
Expand Down Expand Up @@ -138,6 +139,16 @@ type QueryConfig struct {

// ConnectionProperties are optional key-value settings.
ConnectionProperties []*ConnectionProperty

// JobTimeout sets a best-effort deadline on the job. If job execution exceeds this
// timeout, BigQuery may attempt to cancel this work automatically.
//
// This deadline cannot be adjusted or removed once the job is created. Consider
// using Job.Cancel in situations where you need more dynamic behavior.
//
// Experimental: this option is experimental and may be modified or removed in future versions,
// regardless of any other documented package stability guarantees.
JobTimeout time.Duration
}

func (qc *QueryConfig) toBQ() (*bq.JobConfiguration, error) {
Expand Down Expand Up @@ -209,18 +220,23 @@ func (qc *QueryConfig) toBQ() (*bq.JobConfiguration, error) {
}
qconf.ConnectionProperties = bqcp
}
return &bq.JobConfiguration{
jc := &bq.JobConfiguration{
Labels: qc.Labels,
DryRun: qc.DryRun,
Query: qconf,
}, nil
}
if qc.JobTimeout > 0 {
jc.JobTimeoutMs = qc.JobTimeout.Milliseconds()
}
return jc, nil
}

func bqToQueryConfig(q *bq.JobConfiguration, c *Client) (*QueryConfig, error) {
qq := q.Query
qc := &QueryConfig{
Labels: q.Labels,
DryRun: q.DryRun,
JobTimeout: time.Duration(q.JobTimeoutMs) * time.Millisecond,
Q: qq.Query,
CreateDisposition: TableCreateDisposition(qq.CreateDisposition),
WriteDisposition: TableWriteDisposition(qq.WriteDisposition),
Expand Down Expand Up @@ -417,6 +433,7 @@ func (q *Query) probeFastPath() (*bq.QueryRequest, error) {
q.QueryConfig.Clustering != nil ||
q.QueryConfig.DestinationEncryptionConfig != nil ||
q.QueryConfig.SchemaUpdateOptions != nil ||
q.QueryConfig.JobTimeout != 0 ||
// User has defined the jobID generation behavior
q.JobIDConfig.JobID != "" {
return nil, fmt.Errorf("QueryConfig incompatible with fastPath")
Expand Down
2 changes: 2 additions & 0 deletions bigquery/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,7 @@ func TestConfiguringQuery(t *testing.T) {
}
query.DestinationEncryptionConfig = &EncryptionConfig{KMSKeyName: "keyName"}
query.SchemaUpdateOptions = []string{"ALLOW_FIELD_ADDITION"}
query.JobTimeout = time.Duration(5) * time.Second

// Note: Other configuration fields are tested in other tests above.
// A lot of that can be consolidated once Client.Copy is gone.
Expand All @@ -534,6 +535,7 @@ func TestConfiguringQuery(t *testing.T) {
DestinationEncryptionConfiguration: &bq.EncryptionConfiguration{KmsKeyName: "keyName"},
SchemaUpdateOptions: []string{"ALLOW_FIELD_ADDITION"},
},
JobTimeoutMs: 5000,
},
JobReference: &bq.JobReference{
JobId: "ajob",
Expand Down

0 comments on commit 868363c

Please sign in to comment.