Skip to content

Commit 99d5728

Browse files
authored
feat(bigquery): add more dml statistics to query statistics (#4405)
This adds the new DMLStats submessage, which indicates the number of rows inserted/updated/deleted by a DML statement. It also makes a change to test utility functions to pass back query statistics.
1 parent 9dc78e0 commit 99d5728

File tree

2 files changed

+58
-12
lines changed

2 files changed

+58
-12
lines changed

Diff for: bigquery/integration_test.go

+32-12
Original file line numberDiff line numberDiff line change
@@ -479,7 +479,7 @@ func TestIntegration_SnapshotAndRestore(t *testing.T) {
479479
FROM
480480
UNNEST(GENERATE_ARRAY(0,999))
481481
`, qualified)
482-
if err := runQueryJob(ctx, sql); err != nil {
482+
if _, err := runQueryJob(ctx, sql); err != nil {
483483
t.Fatalf("couldn't instantiate base table: %v", err)
484484
}
485485

@@ -872,7 +872,7 @@ func TestIntegration_DatasetUpdateAccess(t *testing.T) {
872872
sql := fmt.Sprintf(`
873873
CREATE FUNCTION `+"`%s`"+`(x INT64) AS (x * 3);`,
874874
routine.FullyQualifiedName())
875-
if err := runQueryJob(ctx, sql); err != nil {
875+
if _, err := runQueryJob(ctx, sql); err != nil {
876876
t.Fatal(err)
877877
}
878878
defer routine.Delete(ctx)
@@ -1288,7 +1288,7 @@ func TestIntegration_RoutineStoredProcedure(t *testing.T) {
12881288
END`,
12891289
routine.FullyQualifiedName())
12901290

1291-
if err := runQueryJob(ctx, sql); err != nil {
1291+
if _, err := runQueryJob(ctx, sql); err != nil {
12921292
t.Fatal(err)
12931293
}
12941294
defer routine.Delete(ctx)
@@ -2013,7 +2013,8 @@ func TestIntegration_DML(t *testing.T) {
20132013
('b', [1], STRUCT<BOOL>(FALSE)),
20142014
('c', [2], STRUCT<BOOL>(TRUE))`,
20152015
table.DatasetID, table.TableID)
2016-
if err := runQueryJob(ctx, sql); err != nil {
2016+
stats, err := runQueryJob(ctx, sql)
2017+
if err != nil {
20172018
t.Fatal(err)
20182019
}
20192020
wantRows := [][]Value{
@@ -2022,11 +2023,23 @@ func TestIntegration_DML(t *testing.T) {
20222023
{"c", []Value{int64(2)}, []Value{true}},
20232024
}
20242025
checkRead(t, "DML", table.Read(ctx), wantRows)
2026+
if stats == nil {
2027+
t.Fatalf("no query stats")
2028+
}
2029+
if stats.DMLStats == nil {
2030+
t.Fatalf("no dml stats")
2031+
}
2032+
wantRowCount := int64(len(wantRows))
2033+
if stats.DMLStats.InsertedRowCount != wantRowCount {
2034+
t.Fatalf("dml stats mismatch. got %d inserted rows, want %d", stats.DMLStats.InsertedRowCount, wantRowCount)
2035+
}
20252036
}
20262037

20272038
// runQueryJob is useful for running queries where no row data is returned (DDL/DML).
2028-
func runQueryJob(ctx context.Context, sql string) error {
2029-
return internal.Retry(ctx, gax.Backoff{}, func() (stop bool, err error) {
2039+
func runQueryJob(ctx context.Context, sql string) (*QueryStatistics, error) {
2040+
var stats *QueryStatistics
2041+
var err error
2042+
err = internal.Retry(ctx, gax.Backoff{}, func() (stop bool, err error) {
20302043
job, err := client.Query(sql).Run(ctx)
20312044
if err != nil {
20322045
if e, ok := err.(*googleapi.Error); ok && e.Code < 500 {
@@ -2041,8 +2054,15 @@ func runQueryJob(ctx context.Context, sql string) error {
20412054
}
20422055
return false, err
20432056
}
2057+
status := job.LastStatus()
2058+
if status.Statistics != nil {
2059+
if qStats, ok := status.Statistics.Details.(*QueryStatistics); ok {
2060+
stats = qStats
2061+
}
2062+
}
20442063
return true, nil
20452064
})
2065+
return stats, err
20462066
}
20472067

20482068
func TestIntegration_TimeTypes(t *testing.T) {
@@ -2082,7 +2102,7 @@ func TestIntegration_TimeTypes(t *testing.T) {
20822102
"VALUES ('%s', '%s', '%s', '%s')",
20832103
table.DatasetID, table.TableID,
20842104
d, CivilTimeString(tm), CivilDateTimeString(dtm), ts.Format("2006-01-02 15:04:05"))
2085-
if err := runQueryJob(ctx, query); err != nil {
2105+
if _, err := runQueryJob(ctx, query); err != nil {
20862106
t.Fatal(err)
20872107
}
20882108
wantRows = append(wantRows, wantRows[0])
@@ -2506,7 +2526,7 @@ func TestIntegration_ExtractExternal(t *testing.T) {
25062526
sql := fmt.Sprintf(`INSERT %s.%s (name, num)
25072527
VALUES ('a', 1), ('b', 2), ('c', 3)`,
25082528
table.DatasetID, table.TableID)
2509-
if err := runQueryJob(ctx, sql); err != nil {
2529+
if _, err := runQueryJob(ctx, sql); err != nil {
25102530
t.Fatal(err)
25112531
}
25122532
// Extract to a GCS object as CSV.
@@ -2932,7 +2952,7 @@ func TestIntegration_MaterializedViewLifecycle(t *testing.T) {
29322952
FROM
29332953
UNNEST(GENERATE_ARRAY(0,999))
29342954
`, qualified)
2935-
if err := runQueryJob(ctx, sql); err != nil {
2955+
if _, err := runQueryJob(ctx, sql); err != nil {
29362956
t.Fatalf("couldn't instantiate base table: %v", err)
29372957
}
29382958

@@ -3060,7 +3080,7 @@ func TestIntegration_ModelLifecycle(t *testing.T) {
30603080
UNION ALL
30613081
SELECT 'b' AS f1, 3.8 AS label
30623082
)`, modelRef)
3063-
if err := runQueryJob(ctx, sql); err != nil {
3083+
if _, err := runQueryJob(ctx, sql); err != nil {
30643084
t.Fatal(err)
30653085
}
30663086
defer model.Delete(ctx)
@@ -3243,7 +3263,7 @@ func TestIntegration_RoutineComplexTypes(t *testing.T) {
32433263
(SELECT SUM(IF(elem.name = "foo",elem.val,null)) FROM UNNEST(arr) AS elem)
32443264
)`,
32453265
routine.FullyQualifiedName())
3246-
if err := runQueryJob(ctx, sql); err != nil {
3266+
if _, err := runQueryJob(ctx, sql); err != nil {
32473267
t.Fatal(err)
32483268
}
32493269
defer routine.Delete(ctx)
@@ -3303,7 +3323,7 @@ func TestIntegration_RoutineLifecycle(t *testing.T) {
33033323
sql := fmt.Sprintf(`
33043324
CREATE FUNCTION `+"`%s`"+`(x INT64) AS (x * 3);`,
33053325
routine.FullyQualifiedName())
3306-
if err := runQueryJob(ctx, sql); err != nil {
3326+
if _, err := runQueryJob(ctx, sql); err != nil {
33073327
t.Fatal(err)
33083328
}
33093329
defer routine.Delete(ctx)

Diff for: bigquery/job.go

+26
Original file line numberDiff line numberDiff line change
@@ -436,6 +436,10 @@ type QueryStatistics struct {
436436
// statements INSERT, UPDATE or DELETE.
437437
NumDMLAffectedRows int64
438438

439+
// DMLStats provides statistics about the row mutations performed by
440+
// DML statements.
441+
DMLStats *DMLStatistics
442+
439443
// Describes a timeline of job execution.
440444
Timeline []*QueryTimelineSample
441445

@@ -665,6 +669,27 @@ func bqToScriptStackFrame(bsf *bq.ScriptStackFrame) *ScriptStackFrame {
665669
}
666670
}
667671

672+
// DMLStatistics contains counts of row mutations triggered by a DML query statement.
673+
type DMLStatistics struct {
674+
// Rows added by the statement.
675+
InsertedRowCount int64
676+
// Rows removed by the statement.
677+
DeletedRowCount int64
678+
// Rows changed by the statement.
679+
UpdatedRowCount int64
680+
}
681+
682+
func bqToDMLStatistics(q *bq.DmlStatistics) *DMLStatistics {
683+
if q == nil {
684+
return nil
685+
}
686+
return &DMLStatistics{
687+
InsertedRowCount: q.InsertedRowCount,
688+
DeletedRowCount: q.DeletedRowCount,
689+
UpdatedRowCount: q.UpdatedRowCount,
690+
}
691+
}
692+
668693
func (*ExtractStatistics) implementsStatistics() {}
669694
func (*LoadStatistics) implementsStatistics() {}
670695
func (*QueryStatistics) implementsStatistics() {}
@@ -888,6 +913,7 @@ func (j *Job) setStatistics(s *bq.JobStatistics, c *Client) {
888913
TotalBytesProcessed: s.Query.TotalBytesProcessed,
889914
TotalBytesProcessedAccuracy: s.Query.TotalBytesProcessedAccuracy,
890915
NumDMLAffectedRows: s.Query.NumDmlAffectedRows,
916+
DMLStats: bqToDMLStatistics(s.Query.DmlStats),
891917
QueryPlan: queryPlanFromProto(s.Query.QueryPlan),
892918
Schema: bqToSchema(s.Query.Schema),
893919
SlotMillis: s.Query.TotalSlotMs,

0 commit comments

Comments
 (0)