Skip to content

Commit 37607b4

Browse files
authored
feat(bigquery): add support for user defined TVF (#4043)
* feat(bigquery): add support for user defined TVF Allows user to define table-valued functions using the BigQuery API.
1 parent 4c12b42 commit 37607b4

File tree

4 files changed

+111
-2
lines changed

4 files changed

+111
-2
lines changed

Diff for: bigquery/integration_test.go

+38
Original file line numberDiff line numberDiff line change
@@ -1291,6 +1291,44 @@ func TestIntegration_RoutineStoredProcedure(t *testing.T) {
12911291
it, [][]Value{{int64(10)}})
12921292
}
12931293

1294+
func TestIntegration_RoutineUserTVF(t *testing.T) {
1295+
if client == nil {
1296+
t.Skip("Integration tests skipped")
1297+
}
1298+
ctx := context.Background()
1299+
1300+
routineID := routineIDs.New()
1301+
routine := dataset.Routine(routineID)
1302+
inMeta := &RoutineMetadata{
1303+
Type: "TABLE_VALUED_FUNCTION",
1304+
Language: "SQL",
1305+
Arguments: []*RoutineArgument{
1306+
{Name: "filter",
1307+
DataType: &StandardSQLDataType{TypeKind: "INT64"},
1308+
}},
1309+
ReturnTableType: &StandardSQLTableType{
1310+
Columns: []*StandardSQLField{
1311+
{Name: "x", Type: &StandardSQLDataType{TypeKind: "INT64"}},
1312+
},
1313+
},
1314+
Body: "SELECT x FROM UNNEST([1,2,3]) x WHERE x = filter",
1315+
}
1316+
if err := routine.Create(ctx, inMeta); err != nil {
1317+
t.Fatalf("routine create: %v", err)
1318+
}
1319+
defer routine.Delete(ctx)
1320+
1321+
meta, err := routine.Metadata(ctx)
1322+
if err != nil {
1323+
t.Fatal(err)
1324+
}
1325+
1326+
// Now, compare the input meta to the output meta
1327+
if diff := testutil.Diff(inMeta, meta, cmpopts.IgnoreFields(RoutineMetadata{}, "CreationTime", "LastModifiedTime", "ETag")); diff != "" {
1328+
t.Errorf("routine metadata differs, got=-, want=+\n%s", diff)
1329+
}
1330+
}
1331+
12941332
func TestIntegration_InsertErrors(t *testing.T) {
12951333
// This test serves to verify streaming behavior in the face of oversized data.
12961334
// BigQuery will reject insertAll payloads that exceed a defined limit (10MB).

Diff for: bigquery/routine.go

+26-2
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,8 @@ const (
144144
// RoutineMetadata represents details of a given BigQuery Routine.
145145
type RoutineMetadata struct {
146146
ETag string
147-
// Type indicates the type of routine, such as SCALAR_FUNCTION or PROCEDURE.
147+
// Type indicates the type of routine, such as SCALAR_FUNCTION, PROCEDURE,
148+
// or TABLE_VALUED_FUNCTION.
148149
Type string
149150
CreationTime time.Time
150151
Description string
@@ -156,6 +157,9 @@ type RoutineMetadata struct {
156157
// The list of arguments for the the routine.
157158
Arguments []*RoutineArgument
158159
ReturnType *StandardSQLDataType
160+
161+
// Set only if the routine type is TABLE_VALUED_FUNCTION.
162+
ReturnTableType *StandardSQLTableType
159163
// For javascript routines, this indicates the paths for imported libraries.
160164
ImportedLibraries []string
161165
// Body contains the routine's body.
@@ -184,7 +188,13 @@ func (rm *RoutineMetadata) toBQ() (*bq.Routine, error) {
184188
return nil, err
185189
}
186190
r.ReturnType = rt
187-
191+
if rm.ReturnTableType != nil {
192+
tt, err := rm.ReturnTableType.toBQ()
193+
if err != nil {
194+
return nil, fmt.Errorf("couldn't convert return table type: %v", err)
195+
}
196+
r.ReturnTableType = tt
197+
}
188198
var args []*bq.Argument
189199
for _, v := range rm.Arguments {
190200
bqa, err := v.toBQ()
@@ -301,6 +311,7 @@ type RoutineMetadataToUpdate struct {
301311
Body optional.String
302312
ImportedLibraries []string
303313
ReturnType *StandardSQLDataType
314+
ReturnTableType *StandardSQLTableType
304315
}
305316

306317
func (rm *RoutineMetadataToUpdate) toBQ() (*bq.Routine, error) {
@@ -370,6 +381,14 @@ func (rm *RoutineMetadataToUpdate) toBQ() (*bq.Routine, error) {
370381
r.ReturnType = dt
371382
forceSend("ReturnType")
372383
}
384+
if rm.ReturnTableType != nil {
385+
tt, err := rm.ReturnTableType.toBQ()
386+
if err != nil {
387+
return nil, err
388+
}
389+
r.ReturnTableType = tt
390+
forceSend("ReturnTableType")
391+
}
373392
return r, nil
374393
}
375394

@@ -395,5 +414,10 @@ func bqToRoutineMetadata(r *bq.Routine) (*RoutineMetadata, error) {
395414
return nil, err
396415
}
397416
meta.ReturnType = ret
417+
tt, err := bqToStandardSQLTableType(r.ReturnTableType)
418+
if err != nil {
419+
return nil, err
420+
}
421+
meta.ReturnTableType = tt
398422
return meta, nil
399423
}

Diff for: bigquery/routine_test.go

+10
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,11 @@ func TestRoutineTypeConversions(t *testing.T) {
8484
RoutineType: "type",
8585
Language: "lang",
8686
ReturnType: &bq.StandardSqlDataType{TypeKind: "INT64"},
87+
ReturnTableType: &bq.StandardSqlTableType{
88+
Columns: []*bq.StandardSqlField{
89+
{Name: "field", Type: &bq.StandardSqlDataType{TypeKind: "FLOAT64"}},
90+
},
91+
},
8792
},
8893
&RoutineMetadata{
8994
CreationTime: aTime,
@@ -95,6 +100,11 @@ func TestRoutineTypeConversions(t *testing.T) {
95100
Type: "type",
96101
Language: "lang",
97102
ReturnType: &StandardSQLDataType{TypeKind: "INT64"},
103+
ReturnTableType: &StandardSQLTableType{
104+
Columns: []*StandardSQLField{
105+
{Name: "field", Type: &StandardSQLDataType{TypeKind: "FLOAT64"}},
106+
},
107+
},
98108
}},
99109
{"body_and_libs", "FromRoutineMetadataToUpdate",
100110
&RoutineMetadataToUpdate{

Diff for: bigquery/standardsql.go

+37
Original file line numberDiff line numberDiff line change
@@ -175,3 +175,40 @@ func standardSQLStructFieldsToBQ(fields []*StandardSQLField) ([]*bq.StandardSqlF
175175
}
176176
return bqFields, nil
177177
}
178+
179+
// StandardSQLTableType models a table-like resource, which has a set of columns.
180+
type StandardSQLTableType struct {
181+
182+
// The columns of the table.
183+
Columns []*StandardSQLField
184+
}
185+
186+
func (sstt *StandardSQLTableType) toBQ() (*bq.StandardSqlTableType, error) {
187+
if sstt == nil {
188+
return nil, nil
189+
}
190+
out := &bq.StandardSqlTableType{}
191+
for k, v := range sstt.Columns {
192+
bq, err := v.toBQ()
193+
if err != nil {
194+
return nil, fmt.Errorf("error converting column %d: %v", k, err)
195+
}
196+
out.Columns = append(out.Columns, bq)
197+
}
198+
return out, nil
199+
}
200+
201+
func bqToStandardSQLTableType(in *bq.StandardSqlTableType) (*StandardSQLTableType, error) {
202+
if in == nil {
203+
return nil, nil
204+
}
205+
out := &StandardSQLTableType{}
206+
for k, v := range in.Columns {
207+
f, err := bqToStandardSQLField(v)
208+
if err != nil {
209+
return nil, fmt.Errorf("error converting column %d: %v", k, err)
210+
}
211+
out.Columns = append(out.Columns, f)
212+
}
213+
return out, nil
214+
}

0 commit comments

Comments
 (0)