Skip to content

Commit e846dfd

Browse files
authored
feat(bigquery): add session and connection support (#4754)
Adds: * CreateSession to Query config * ConnectionProperties to specify connection keys/values in Query config * SessionInfo in JobStatistics to record session stats
1 parent 6861b30 commit e846dfd

File tree

4 files changed

+159
-15
lines changed

4 files changed

+159
-15
lines changed

Diff for: bigquery/integration_test.go

+60-15
Original file line numberDiff line numberDiff line change
@@ -479,7 +479,7 @@ func TestIntegration_SnapshotAndRestore(t *testing.T) {
479479
FROM
480480
UNNEST(GENERATE_ARRAY(0,999))
481481
`, qualified)
482-
if _, err := runQueryJob(ctx, sql); err != nil {
482+
if _, _, err := runQuerySQL(ctx, sql); err != nil {
483483
t.Fatalf("couldn't instantiate base table: %v", err)
484484
}
485485

@@ -872,7 +872,7 @@ func TestIntegration_DatasetUpdateAccess(t *testing.T) {
872872
sql := fmt.Sprintf(`
873873
CREATE FUNCTION `+"`%s`"+`(x INT64) AS (x * 3);`,
874874
routine.FullyQualifiedName())
875-
if _, err := runQueryJob(ctx, sql); err != nil {
875+
if _, _, err := runQuerySQL(ctx, sql); err != nil {
876876
t.Fatal(err)
877877
}
878878
defer routine.Delete(ctx)
@@ -1288,7 +1288,7 @@ func TestIntegration_RoutineStoredProcedure(t *testing.T) {
12881288
END`,
12891289
routine.FullyQualifiedName())
12901290

1291-
if _, err := runQueryJob(ctx, sql); err != nil {
1291+
if _, _, err := runQuerySQL(ctx, sql); err != nil {
12921292
t.Fatal(err)
12931293
}
12941294
defer routine.Delete(ctx)
@@ -2014,7 +2014,7 @@ func TestIntegration_DML(t *testing.T) {
20142014
('b', [1], STRUCT<BOOL>(FALSE)),
20152015
('c', [2], STRUCT<BOOL>(TRUE))`,
20162016
table.DatasetID, table.TableID)
2017-
stats, err := runQueryJob(ctx, sql)
2017+
_, stats, err := runQuerySQL(ctx, sql)
20182018
if err != nil {
20192019
t.Fatal(err)
20202020
}
@@ -2036,12 +2036,18 @@ func TestIntegration_DML(t *testing.T) {
20362036
}
20372037
}
20382038

2039+
// runQuerySQL runs arbitrary SQL text.
2040+
func runQuerySQL(ctx context.Context, sql string) (*JobStatistics, *QueryStatistics, error) {
2041+
return runQueryJob(ctx, client.Query(sql))
2042+
}
2043+
20392044
// runQueryJob is useful for running queries where no row data is returned (DDL/DML).
2040-
func runQueryJob(ctx context.Context, sql string) (*QueryStatistics, error) {
2041-
var stats *QueryStatistics
2045+
func runQueryJob(ctx context.Context, q *Query) (*JobStatistics, *QueryStatistics, error) {
2046+
var jobStats *JobStatistics
2047+
var queryStats *QueryStatistics
20422048
var err error
20432049
err = internal.Retry(ctx, gax.Backoff{}, func() (stop bool, err error) {
2044-
job, err := client.Query(sql).Run(ctx)
2050+
job, err := q.Run(ctx)
20452051
if err != nil {
20462052
if e, ok := err.(*googleapi.Error); ok && e.Code < 500 {
20472053
return true, err // fail on 4xx
@@ -2057,13 +2063,14 @@ func runQueryJob(ctx context.Context, sql string) (*QueryStatistics, error) {
20572063
}
20582064
status := job.LastStatus()
20592065
if status.Statistics != nil {
2066+
jobStats = status.Statistics
20602067
if qStats, ok := status.Statistics.Details.(*QueryStatistics); ok {
2061-
stats = qStats
2068+
queryStats = qStats
20622069
}
20632070
}
20642071
return true, nil
20652072
})
2066-
return stats, err
2073+
return jobStats, queryStats, err
20672074
}
20682075

20692076
func TestIntegration_TimeTypes(t *testing.T) {
@@ -2103,7 +2110,7 @@ func TestIntegration_TimeTypes(t *testing.T) {
21032110
"VALUES ('%s', '%s', '%s', '%s')",
21042111
table.DatasetID, table.TableID,
21052112
d, CivilTimeString(tm), CivilDateTimeString(dtm), ts.Format("2006-01-02 15:04:05"))
2106-
if _, err := runQueryJob(ctx, query); err != nil {
2113+
if _, _, err := runQuerySQL(ctx, query); err != nil {
21072114
t.Fatal(err)
21082115
}
21092116
wantRows = append(wantRows, wantRows[0])
@@ -2275,6 +2282,44 @@ func TestIntegration_QueryExternalHivePartitioning(t *testing.T) {
22752282
checkReadAndTotalRows(t, "HiveQuery", it, [][]Value{{int64(50)}})
22762283
}
22772284

2285+
func TestIntegration_QuerySessionSupport(t *testing.T) {
2286+
if client == nil {
2287+
t.Skip("Integration tests skipped")
2288+
}
2289+
ctx := context.Background()
2290+
2291+
q := client.Query("CREATE TEMPORARY TABLE temptable AS SELECT 17 as foo")
2292+
q.CreateSession = true
2293+
jobStats, _, err := runQueryJob(ctx, q)
2294+
if err != nil {
2295+
t.Fatalf("error running CREATE TEMPORARY TABLE: %v", err)
2296+
}
2297+
if jobStats.SessionInfo == nil {
2298+
t.Fatalf("expected session info, was nil")
2299+
}
2300+
sessionID := jobStats.SessionInfo.SessionID
2301+
if len(sessionID) == 0 {
2302+
t.Errorf("expected non-empty sessionID")
2303+
}
2304+
2305+
q2 := client.Query("SELECT * FROM temptable")
2306+
q2.ConnectionProperties = []*ConnectionProperty{
2307+
{Key: "session_id", Value: sessionID},
2308+
}
2309+
jobStats, _, err = runQueryJob(ctx, q2)
2310+
if err != nil {
2311+
t.Errorf("error running SELECT: %v", err)
2312+
}
2313+
if jobStats.SessionInfo == nil {
2314+
t.Fatalf("expected sessionInfo in second query, was nil")
2315+
}
2316+
got := jobStats.SessionInfo.SessionID
2317+
if got != sessionID {
2318+
t.Errorf("second query mismatched session ID, got %s want %s", got, sessionID)
2319+
}
2320+
2321+
}
2322+
22782323
func TestIntegration_QueryParameters(t *testing.T) {
22792324
if client == nil {
22802325
t.Skip("Integration tests skipped")
@@ -2560,7 +2605,7 @@ func TestIntegration_ExtractExternal(t *testing.T) {
25602605
sql := fmt.Sprintf(`INSERT %s.%s (name, num)
25612606
VALUES ('a', 1), ('b', 2), ('c', 3)`,
25622607
table.DatasetID, table.TableID)
2563-
if _, err := runQueryJob(ctx, sql); err != nil {
2608+
if _, _, err := runQuerySQL(ctx, sql); err != nil {
25642609
t.Fatal(err)
25652610
}
25662611
// Extract to a GCS object as CSV.
@@ -2986,7 +3031,7 @@ func TestIntegration_MaterializedViewLifecycle(t *testing.T) {
29863031
FROM
29873032
UNNEST(GENERATE_ARRAY(0,999))
29883033
`, qualified)
2989-
if _, err := runQueryJob(ctx, sql); err != nil {
3034+
if _, _, err := runQuerySQL(ctx, sql); err != nil {
29903035
t.Fatalf("couldn't instantiate base table: %v", err)
29913036
}
29923037

@@ -3114,7 +3159,7 @@ func TestIntegration_ModelLifecycle(t *testing.T) {
31143159
UNION ALL
31153160
SELECT 'b' AS f1, 3.8 AS label
31163161
)`, modelRef)
3117-
if _, err := runQueryJob(ctx, sql); err != nil {
3162+
if _, _, err := runQuerySQL(ctx, sql); err != nil {
31183163
t.Fatal(err)
31193164
}
31203165
defer model.Delete(ctx)
@@ -3297,7 +3342,7 @@ func TestIntegration_RoutineComplexTypes(t *testing.T) {
32973342
(SELECT SUM(IF(elem.name = "foo",elem.val,null)) FROM UNNEST(arr) AS elem)
32983343
)`,
32993344
routine.FullyQualifiedName())
3300-
if _, err := runQueryJob(ctx, sql); err != nil {
3345+
if _, _, err := runQuerySQL(ctx, sql); err != nil {
33013346
t.Fatal(err)
33023347
}
33033348
defer routine.Delete(ctx)
@@ -3357,7 +3402,7 @@ func TestIntegration_RoutineLifecycle(t *testing.T) {
33573402
sql := fmt.Sprintf(`
33583403
CREATE FUNCTION `+"`%s`"+`(x INT64) AS (x * 3);`,
33593404
routine.FullyQualifiedName())
3360-
if _, err := runQueryJob(ctx, sql); err != nil {
3405+
if _, _, err := runQuerySQL(ctx, sql); err != nil {
33613406
t.Fatal(err)
33623407
}
33633408
defer routine.Delete(ctx)

Diff for: bigquery/job.go

+27
Original file line numberDiff line numberDiff line change
@@ -375,6 +375,9 @@ type JobStatistics struct {
375375

376376
// TransactionInfo indicates the transaction ID associated with the job, if any.
377377
TransactionInfo *TransactionInfo
378+
379+
// SessionInfo contains information about the session if this job is part of one.
380+
SessionInfo *SessionInfo
378381
}
379382

380383
// Statistics is one of ExtractStatistics, LoadStatistics or QueryStatistics.
@@ -884,6 +887,7 @@ func (j *Job) setStatistics(s *bq.JobStatistics, c *Client) {
884887
ScriptStatistics: bqToScriptStatistics(s.ScriptStatistics),
885888
ReservationUsage: bqToReservationUsage(s.ReservationUsage),
886889
TransactionInfo: bqToTransactionInfo(s.TransactionInfo),
890+
SessionInfo: bqToSessionInfo(s.SessionInfo),
887891
}
888892
switch {
889893
case s.Extract != nil:
@@ -1002,3 +1006,26 @@ func bqToTransactionInfo(in *bq.TransactionInfo) *TransactionInfo {
10021006
TransactionID: in.TransactionId,
10031007
}
10041008
}
1009+
1010+
// SessionInfo contains information about a session associated with a job.
1011+
type SessionInfo struct {
1012+
SessionID string
1013+
}
1014+
1015+
func (s *SessionInfo) toBQ() *bq.SessionInfo {
1016+
if s == nil {
1017+
return nil
1018+
}
1019+
return &bq.SessionInfo{
1020+
SessionId: s.SessionID,
1021+
}
1022+
}
1023+
1024+
func bqToSessionInfo(in *bq.SessionInfo) *SessionInfo {
1025+
if in == nil {
1026+
return nil
1027+
}
1028+
return &SessionInfo{
1029+
SessionID: in.SessionId,
1030+
}
1031+
}

Diff for: bigquery/query.go

+51
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,12 @@ type QueryConfig struct {
132132
// Allows the schema of the destination table to be updated as a side effect of
133133
// the query job.
134134
SchemaUpdateOptions []string
135+
136+
// CreateSession will trigger creation of a new session when true.
137+
CreateSession bool
138+
139+
// ConnectionProperties are optional key-values settings.
140+
ConnectionProperties []*ConnectionProperty
135141
}
136142

137143
func (qc *QueryConfig) toBQ() (*bq.JobConfiguration, error) {
@@ -147,6 +153,7 @@ func (qc *QueryConfig) toBQ() (*bq.JobConfiguration, error) {
147153
Clustering: qc.Clustering.toBQ(),
148154
DestinationEncryptionConfiguration: qc.DestinationEncryptionConfig.toBQ(),
149155
SchemaUpdateOptions: qc.SchemaUpdateOptions,
156+
CreateSession: qc.CreateSession,
150157
}
151158
if len(qc.TableDefinitions) > 0 {
152159
qconf.TableDefinitions = make(map[string]bq.ExternalDataConfiguration)
@@ -195,6 +202,13 @@ func (qc *QueryConfig) toBQ() (*bq.JobConfiguration, error) {
195202
}
196203
qconf.QueryParameters = append(qconf.QueryParameters, qp)
197204
}
205+
if len(qc.ConnectionProperties) > 0 {
206+
bqcp := make([]*bq.ConnectionProperty, len(qc.ConnectionProperties))
207+
for k, v := range qc.ConnectionProperties {
208+
bqcp[k] = v.toBQ()
209+
}
210+
qconf.ConnectionProperties = bqcp
211+
}
198212
return &bq.JobConfiguration{
199213
Labels: qc.Labels,
200214
DryRun: qc.DryRun,
@@ -219,6 +233,7 @@ func bqToQueryConfig(q *bq.JobConfiguration, c *Client) (*QueryConfig, error) {
219233
Clustering: bqToClustering(qq.Clustering),
220234
DestinationEncryptionConfig: bqToEncryptionConfig(qq.DestinationEncryptionConfiguration),
221235
SchemaUpdateOptions: qq.SchemaUpdateOptions,
236+
CreateSession: qq.CreateSession,
222237
}
223238
qc.UseStandardSQL = !qc.UseLegacySQL
224239

@@ -255,6 +270,13 @@ func bqToQueryConfig(q *bq.JobConfiguration, c *Client) (*QueryConfig, error) {
255270
}
256271
qc.Parameters = append(qc.Parameters, p)
257272
}
273+
if len(qq.ConnectionProperties) > 0 {
274+
props := make([]*ConnectionProperty, len(qq.ConnectionProperties))
275+
for k, v := range qq.ConnectionProperties {
276+
props[k] = bqToConnectionProperty(v)
277+
}
278+
qc.ConnectionProperties = props
279+
}
258280
return qc, nil
259281
}
260282

@@ -402,6 +424,7 @@ func (q *Query) probeFastPath() (*bq.QueryRequest, error) {
402424
pfalse := false
403425
qRequest := &bq.QueryRequest{
404426
Query: q.QueryConfig.Q,
427+
CreateSession: q.CreateSession,
405428
Location: q.Location,
406429
UseLegacySql: &pfalse,
407430
MaximumBytesBilled: q.QueryConfig.MaxBytesBilled,
@@ -427,3 +450,31 @@ func (q *Query) probeFastPath() (*bq.QueryRequest, error) {
427450
}
428451
return qRequest, nil
429452
}
453+
454+
// ConnectionProperty represents a single key and value pair that can be sent alongside a query request.
455+
type ConnectionProperty struct {
456+
// Name of the connection property to set.
457+
Key string
458+
// Value of the connection property.
459+
Value string
460+
}
461+
462+
func (cp *ConnectionProperty) toBQ() *bq.ConnectionProperty {
463+
if cp == nil {
464+
return nil
465+
}
466+
return &bq.ConnectionProperty{
467+
Key: cp.Key,
468+
Value: cp.Value,
469+
}
470+
}
471+
472+
func bqToConnectionProperty(in *bq.ConnectionProperty) *ConnectionProperty {
473+
if in == nil {
474+
return nil
475+
}
476+
return &ConnectionProperty{
477+
Key: in.Key,
478+
Value: in.Value,
479+
}
480+
}

Diff for: bigquery/query_test.go

+21
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,27 @@ func TestQuery(t *testing.T) {
323323
return j
324324
}(),
325325
},
326+
{
327+
dst: c.Dataset("dataset-id").Table("table-id"),
328+
src: &QueryConfig{
329+
Q: "query string",
330+
DefaultProjectID: "def-project-id",
331+
DefaultDatasetID: "def-dataset-id",
332+
ConnectionProperties: []*ConnectionProperty{
333+
{Key: "key-a", Value: "value-a"},
334+
{Key: "key-b", Value: "value-b"},
335+
},
336+
},
337+
want: func() *bq.Job {
338+
j := defaultQueryJob()
339+
j.Configuration.Query.ForceSendFields = nil
340+
j.Configuration.Query.ConnectionProperties = []*bq.ConnectionProperty{
341+
{Key: "key-a", Value: "value-a"},
342+
{Key: "key-b", Value: "value-b"},
343+
}
344+
return j
345+
}(),
346+
},
326347
}
327348
for i, tc := range testCases {
328349
query := c.Query("")

0 commit comments

Comments
 (0)