[BEAM-13806] Add x-lang BigQuery IO integration test to Go SDK. (#16818)
* [BEAM-13806] Add x-lang BigQuery IO integration test to Go SDK.

Also pipes flags for BigQuery IO through the integration test script, and adds a small file for creating BigQuery tables that expire after a day.

* [BEAM-13806] Splitting BigQuery IT into read and write pipelines.

Splits the integration test into two pipelines that run sequentially. Also drops the table after a successful test and logs table names.

* Fixup: Fix gradle build and undo VR script changes.

* Fixup: Add Query test and fix deterministic random element generation.

After the split, CreateRows wasn't creating the same elements in both the read and write pipelines. Adjusted it to use a consistent seed in all pipelines.

* Fixup: Avoiding inline functions

* Workaround for coder issue, plus some debugging code

* Polishing workaround with documentation and removing debug prints.

* Move pipeline code to test file

* Split Query test from non-query test

Co-authored-by: Robert Burke <lostluck@users.noreply.github.com>
youngoli and lostluck committed Jun 14, 2022
1 parent b426d34 commit 080f54a
Showing 7 changed files with 366 additions and 0 deletions.
7 changes: 7 additions & 0 deletions sdks/go/pkg/beam/io/xlang/bigqueryio/bigquery.go
@@ -187,8 +187,15 @@ func FromTable(table string) readOption
// FromQuery is a Read option that specifies a query to use for reading from BigQuery. Uses the
// BigQuery Standard SQL dialect.
//
// Important: When reading from a query, the schema of any source tables is not used, and the read
// transform cannot detect which fields are Required; therefore every field in the output type
// will be a pointer (including fields within inner structs).
//
// For more details see in the Java SDK:
// org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Read.fromQuery(java.lang.String)
//
// BUG(https://github.com/apache/beam/issues/21784): Query read outputs currently cannot be named
// struct types. See link for workaround.
func FromQuery(query string) readOption {
return func(rc *readConfig) {
rc.cfg.Query = &query
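Given that constraint, a caller reading with FromQuery declares an output type whose fields are all pointers, using an alias for an unnamed struct type until the bug above is resolved. A minimal sketch of such a read (the table name, expansion address, and helper name are hypothetical; the pattern mirrors the integration test later in this diff):

// queryRow must alias an unnamed struct type, and every field must be a
// pointer: query reads cannot detect Required fields (see FromQuery), and
// named struct types currently trip issue 21784.
type queryRow = struct {
	Counter *int64 `beam:"counter"`
}

func readViaQuery(s beam.Scope, expansionAddr string) beam.PCollection {
	outType := reflect.TypeOf((*queryRow)(nil)).Elem()
	return bigqueryio.Read(s, outType,
		bigqueryio.FromQuery("SELECT counter FROM `project.dataset.table`"),
		bigqueryio.ReadExpansionAddr(expansionAddr))
}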
4 changes: 4 additions & 0 deletions sdks/go/test/build.gradle
@@ -173,18 +173,22 @@ ext.goIoValidatesRunnerTask = { proj, name, scriptOpts, pipelineOpts ->
dependsOn ":sdks:java:io:expansion-service:build"
dependsOn ":sdks:java:extensions:schemaio-expansion-service:build"
dependsOn ":sdks:java:io:debezium:expansion-service:build"
dependsOn ":sdks:java:io:google-cloud-platform:expansion-service:build"
dependsOn ":sdks:java:testing:kafka-service:buildTestKafkaServiceJar"

doLast {
def ioExpJar = project(":sdks:java:io:expansion-service").shadowJar.archivePath
def schemaIoExpJar = project(":sdks:java:extensions:schemaio-expansion-service").shadowJar.archivePath
def debeziumIoExpJar = project(":sdks:java:io:debezium:expansion-service").shadowJar.archivePath
def gcpIoExpJar = project(":sdks:java:io:google-cloud-platform:expansion-service").shadowJar.archivePath
def kafkaJar = project(":sdks:java:testing:kafka-service:").buildTestKafkaServiceJar.archivePath
def pipelineOptions = [ // Pipeline options piped directly to Go SDK flags.
"--kafka_jar=${kafkaJar}",
"--expansion_jar=io:${ioExpJar}",
"--expansion_jar=schemaio:${schemaIoExpJar}",
"--expansion_jar=debeziumio:${debeziumIoExpJar}",
"--expansion_jar=gcpio:${gcpIoExpJar}",
"--bq_dataset=apache-beam-testing.beam_bigquery_io_test_temp",
]
pipelineOptions.addAll(pipelineOpts)
def options = [
5 changes: 5 additions & 0 deletions sdks/go/test/integration/flags.go
@@ -42,6 +42,11 @@ var (
"Sets an auto-shutdown timeout to the Kafka cluster. "+
"Requires the timeout command to be present in Path, unless the value is set to \"\".")

// BigQueryDataset is the name of the dataset to create tables in for
// BigQuery integration tests.
BigQueryDataset = flag.String("bq_dataset", "",
"Name of the dataset to create tables in for BigQuery tests.")

// ExpansionJars contains elements in the form "label:jar" describing jar
// filepaths for expansion services to use in integration tests, and the
// corresponding labels. Once provided through this flag, those jars can
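Tests consume these flags through the integration package's expansion-service helpers. A minimal sketch of the pattern, mirroring the TestMain of the new BigQuery test below, where the "gcpio" label matches the --expansion_jar=gcpio:${gcpIoExpJar} entry from the Gradle config above:

// Resolve the jar registered under the "gcpio" label into the address of a
// running expansion service, and shut it down when the tests finish.
services := integration.NewExpansionServices()
defer func() { services.Shutdown() }()
addr, err := services.GetAddr("gcpio")
if err != nil {
	log.Printf("skipping missing expansion service: %v", err)
}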
11 changes: 11 additions & 0 deletions sdks/go/test/integration/integration.go
@@ -64,6 +64,7 @@ var directFilters = []string{
// The direct runner does not yet support cross-language.
"TestXLang.*",
"TestKafkaIO.*",
"TestBigQueryIO.*",
"TestDebeziumIO_BasicRead",
"TestJDBCIO_BasicReadWrite",
"TestJDBCIO_PostgresReadWrite",
@@ -93,6 +94,8 @@ var portableFilters = []string{
"TestPanes",
// TODO(BEAM-12797): Python portable runner times out on Kafka reads.
"TestKafkaIO.*",
// TODO(BEAM-13215): GCP IOs currently do not work in non-Dataflow portable runners.
"TestBigQueryIO.*",
// The portable runner does not support self-checkpointing
"TestCheckpointing",
// The portable runner does not support pipeline drain for SDF.
@@ -107,6 +110,8 @@ var flinkFilters = []string{
// TODO(BEAM-12815): Test fails on post commits: "Insufficient number of network buffers".
"TestXLang_Multi",
"TestDebeziumIO_BasicRead",
// TODO(BEAM-13215): GCP IOs currently do not work in non-Dataflow portable runners.
"TestBigQueryIO.*",
// The number of produced outputs in AfterSynchronizedProcessingTime varies in different runs.
"TestTriggerAfterSynchronizedProcessingTime",
// The flink runner does not support pipeline drain for SDF.
@@ -126,6 +131,8 @@ var samzaFilters = []string{
"TestPanes",
// TODO(BEAM-13006): Samza doesn't yet support post job metrics, used by WordCount
"TestWordCount.*",
// TODO(BEAM-13215): GCP IOs currently do not work in non-Dataflow portable runners.
"TestBigQueryIO.*",
// The Samza runner does not support self-checkpointing
"TestCheckpointing",
// The samza runner does not support pipeline drain for SDF.
@@ -146,6 +153,8 @@ var sparkFilters = []string{
"TestPanes",
// [BEAM-13921]: Spark doesn't support side inputs to executable stages
"TestDebeziumIO_BasicRead",
// TODO(BEAM-13215): GCP IOs currently do not work in non-Dataflow portable runners.
"TestBigQueryIO.*",
// The spark runner does not support self-checkpointing
"TestCheckpointing",
// The spark runner does not support pipeline drain for SDF.
@@ -174,6 +183,8 @@ var dataflowFilters = []string{
// Dataflow does not automatically terminate the TestCheckpointing pipeline when
// complete.
"TestCheckpointing",
// TODO(21761): This test needs to provide a GCP project to the expansion service.
"TestBigQueryIO_BasicWriteQueryRead",
// Dataflow does not drain jobs by itself.
"TestDrain",
}
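Each test opts into this per-runner filtering by calling integration.CheckFilters at the top of its test function, as the new BigQuery tests below do. A minimal sketch (test name hypothetical):

func TestBigQueryIO_Example(t *testing.T) { // hypothetical test
	integration.CheckFilters(t) // skips the test if a filter for the current runner matches
	// ... construct pipelines and run them with ptest.RunAndValidate ...
}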
17 changes: 17 additions & 0 deletions sdks/go/test/integration/io/xlang/bigquery/bigquery.go
@@ -0,0 +1,17 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package bigquery contains integration tests for cross-language BigQuery IO transforms.
package bigquery
270 changes: 270 additions & 0 deletions sdks/go/test/integration/io/xlang/bigquery/bigquery_test.go
@@ -0,0 +1,270 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package bigquery

import (
"flag"
"fmt"
"log"
"math/rand"
"reflect"
"strings"
"testing"
"time"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/xlang/bigqueryio"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/dataflow"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
"github.com/apache/beam/sdks/v2/go/test/integration"
)

func init() {
register.DoFn2x0[[]byte, func(TestRow)](&CreateTestRowsFn{})
register.Emitter1[TestRow]()
// TODO(https://github.com/apache/beam/issues/21789): Uncomment once this register no longer panics.
//register.Function1x1(castFn)
}

var expansionAddr string // Populated with the address of the expansion service labeled "gcpio".

func checkFlags(t *testing.T) {
if *integration.BigQueryDataset == "" {
t.Skip("No BigQuery dataset provided.")
}
}

const (
// A text to shuffle to get random words.
text = "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Maecenas eget nulla nec " +
"velit hendrerit placerat. Donec eu odio ultricies, fermentum arcu at, mollis lectus. " +
"Vestibulum porttitor pharetra sem vitae feugiat. Mauris facilisis neque in mauris " +
"feugiat rhoncus. Donec eu ipsum at nibh lobortis euismod. Nam at hendrerit felis. " +
"Vivamus et orci ex. Nam dui nisl, rutrum ac pretium eget, vehicula in tortor. Class " +
"aptent taciti sociosqu ad litora torquent per conubia nostra, per inceptos himenaeos. " +
"Phasellus ante lorem, pharetra blandit dapibus et, tempus nec purus. Maecenas in " +
"posuere sem, vel pharetra nisl. Pellentesque habitant morbi tristique senectus et netus " +
"et malesuada fames ac turpis egestas. Donec nec facilisis ex. Praesent euismod commodo " +
"efficitur. Fusce in nisi nunc."
// Number of random elements to create for the test. Must be less than the number of words in text.
inputSize = 50
)

// TestRow is a sample row to write and read back, expected to contain enough deterministic and
// random data in different data types to provide a reasonable signal that reading and writing
// work at a basic level.
type TestRow struct {
Counter int64 `beam:"counter"` // A deterministic counter, increments for each row generated.
Rand_data RandData `beam:"rand_data"` // An inner struct containing randomized data.
}

func shuffleText() []string {
words := strings.Fields(text)
rand.Shuffle(len(words), func(i, j int) { words[i], words[j] = words[j], words[i] })
return words
}

// RandData is a struct of various types of random data.
type RandData struct {
Flip bool `beam:"flip"` // Flip is a bool with a random chance of either result (a coin flip).
Num int64 `beam:"num"` // Num is a random int64.
Word string `beam:"word"` // Word is a randomly selected word from a sample text.
}

// ddlTestRowSchema is a BigQuery data definition language (DDL) string that corresponds to TestRow.
const ddlTestRowSchema = "counter INT64 NOT NULL, " +
"rand_data STRUCT<" +
"flip BOOL NOT NULL," +
"num INT64 NOT NULL," +
"word STRING NOT NULL" +
"> NOT NULL"

// CreateTestRowsFn is a DoFn that creates randomized TestRows based on a seed.
type CreateTestRowsFn struct {
seed int64
}

// ProcessElement creates a number of TestRows, populating the randomized data.
func (fn *CreateTestRowsFn) ProcessElement(_ []byte, emit func(TestRow)) {
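// Use the fixed seed so that the separate write and read pipelines generate identical elements.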
rand.Seed(fn.seed)
words := shuffleText()
for i := 0; i < inputSize; i++ {
emit(TestRow{
Counter: int64(i),
Rand_data: RandData{
Flip: rand.Int63n(2) != 0,
Num: rand.Int63(),
Word: words[i],
},
})
}
}

// WritePipeline creates a pipeline that writes elements created by createFn into a BigQuery table.
func WritePipeline(expansionAddr, table string, createFn interface{}) *beam.Pipeline {
p := beam.NewPipeline()
s := p.Root()

// Generate elements and write to table.
rows := beam.ParDo(s, createFn, beam.Impulse(s))
bigqueryio.Write(s, table, rows,
bigqueryio.CreateDisposition(bigqueryio.CreateNever),
bigqueryio.WriteExpansionAddr(expansionAddr))

return p
}

// ReadPipeline creates a pipeline that reads elements directly from a BigQuery table and asserts
// that they match elements created by createFn.
func ReadPipeline(expansionAddr, table string, createFn interface{}) *beam.Pipeline {
p := beam.NewPipeline()
s := p.Root()

// Read from table and compare to generated elements.
rows := beam.ParDo(s, createFn, beam.Impulse(s))
inType := reflect.TypeOf((*TestRow)(nil)).Elem()
readRows := bigqueryio.Read(s, inType,
bigqueryio.FromTable(table),
bigqueryio.ReadExpansionAddr(expansionAddr))
passert.Equals(s, readRows, rows)

return p
}

// TestRowPtrs is equivalent to TestRow but all fields are pointers, meant to be used when reading
// via query.
//
// TODO(https://github.com/apache/beam/issues/21784): Change back to a named struct once resolved.
type TestRowPtrs = struct {
Counter *int64 `beam:"counter"`
Rand_data *RandDataPtrs `beam:"rand_data"`
}

// RandDataPtrs is equivalent to RandData but all fields are pointers, meant to be used when reading
// via query.
//
// TODO(https://github.com/apache/beam/issues/21784): Change back to a named struct once resolved.
type RandDataPtrs = struct {
Flip *bool `beam:"flip"`
Num *int64 `beam:"num"`
Word *string `beam:"word"`
}

// castFn converts the result of the query, which has pointer fields, into the original TestRow
// type that was written to BigQuery.
func castFn(elm TestRowPtrs) TestRow {
return TestRow{
Counter: *elm.Counter,
Rand_data: RandData{
Flip: *elm.Rand_data.Flip,
Num: *elm.Rand_data.Num,
Word: *elm.Rand_data.Word,
},
}
}

// ReadFromQueryPipeline creates a pipeline that reads elements from a BigQuery table via a SQL
// query, and asserts that they match elements created by createFn.
func ReadFromQueryPipeline(expansionAddr, table string, createFn interface{}) *beam.Pipeline {
p := beam.NewPipeline()
s := p.Root()

// Read from table and compare to generated elements.
rows := beam.ParDo(s, createFn, beam.Impulse(s))
inType := reflect.TypeOf((*TestRowPtrs)(nil)).Elem()
query := fmt.Sprintf("SELECT * FROM `%s`", table)
readRows := bigqueryio.Read(s, inType,
bigqueryio.FromQuery(query),
bigqueryio.ReadExpansionAddr(expansionAddr))
castRows := beam.ParDo(s, castFn, readRows)
passert.Equals(s, castRows, rows)

return p
}

// TestBigQueryIO_BasicWriteRead runs a write pipeline that generates semi-randomized elements and
// writes them to a BigQuery table, then a read pipeline that reads from that table and checks
// that the result matches the original inputs. Because the write uses the CreateNever
// disposition, the table must exist beforehand; the test creates it first.
func TestBigQueryIO_BasicWriteRead(t *testing.T) {
integration.CheckFilters(t)
checkFlags(t)

// Create a table before running the pipeline.
table, err := newTempTable(*integration.BigQueryDataset, "go_bqio_it", ddlTestRowSchema)
if err != nil {
t.Fatalf("error creating BigQuery table: %v", err)
}
t.Logf("Created BigQuery table %v", table)

createTestRows := &CreateTestRowsFn{seed: time.Now().UnixNano()}
write := WritePipeline(expansionAddr, table, createTestRows)
ptest.RunAndValidate(t, write)
read := ReadPipeline(expansionAddr, table, createTestRows)
ptest.RunAndValidate(t, read)

t.Logf("Deleting BigQuery table %v", table)
err = deleteTempTable(table)
if err != nil {
t.Logf("Error deleting BigQuery table: %v", err)
}
}

// TestBigQueryIO_BasicWriteQueryRead runs a write pipeline that generates semi-randomized
// elements and writes them to a BigQuery table, then a read pipeline that reads from that table
// and checks that the result matches the original inputs. Because the write uses the CreateNever
// disposition, the table must exist beforehand; the test creates it first.
//
// This test reads via a BigQuery SQL query, instead of directly from a table.
func TestBigQueryIO_BasicWriteQueryRead(t *testing.T) {
integration.CheckFilters(t)
checkFlags(t)

// Create a table before running the pipeline.
table, err := newTempTable(*integration.BigQueryDataset, "go_bqio_it", ddlTestRowSchema)
if err != nil {
t.Fatalf("error creating BigQuery table: %v", err)
}
t.Logf("Created BigQuery table %v", table)

createTestRows := &CreateTestRowsFn{seed: time.Now().UnixNano()}
write := WritePipeline(expansionAddr, table, createTestRows)
ptest.RunAndValidate(t, write)
readQuery := ReadFromQueryPipeline(expansionAddr, table, createTestRows)
ptest.RunAndValidate(t, readQuery)

t.Logf("Deleting BigQuery table %v", table)
err = deleteTempTable(table)
if err != nil {
t.Logf("Error deleting BigQuery table: %v", err)
}
}

func TestMain(m *testing.M) {
flag.Parse()
beam.Init()

services := integration.NewExpansionServices()
defer func() { services.Shutdown() }()
addr, err := services.GetAddr("gcpio")
if err != nil {
log.Printf("skipping missing expansion service: %v", err)
} else {
expansionAddr = addr
}

ptest.MainRet(m)
}
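The newTempTable and deleteTempTable helpers referenced above live in a utility file of this package that is not shown in this diff. A minimal sketch of what they might look like, assuming the cloud.google.com/go/bigquery client and the one-day table expiration described in the commit message (all names and details hypothetical):

package bigquery

import (
	"context"
	"fmt"
	"math/rand"
	"strings"

	bq "cloud.google.com/go/bigquery"
)

// newTempTable creates a table with a randomized suffix in the given
// "project.dataset", using the provided DDL column list, and sets it to
// expire after one day. It returns the fully qualified table name.
// Hypothetical sketch: the actual helper in this commit may differ.
func newTempTable(dataset, prefix, ddl string) (string, error) {
	ctx := context.Background()
	project := strings.SplitN(dataset, ".", 2)[0]
	client, err := bq.NewClient(ctx, project)
	if err != nil {
		return "", err
	}
	defer client.Close()
	table := fmt.Sprintf("%s.%s_%d", dataset, prefix, rand.Int31())
	stmt := fmt.Sprintf("CREATE TABLE `%s` (%s) OPTIONS(expiration_timestamp="+
		"TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL 1 DAY))", table, ddl)
	job, err := client.Query(stmt).Run(ctx)
	if err != nil {
		return "", err
	}
	status, err := job.Wait(ctx)
	if err != nil {
		return "", err
	}
	return table, status.Err()
}

// deleteTempTable drops a table named "project.dataset.table".
// Hypothetical sketch: the actual helper in this commit may differ.
func deleteTempTable(table string) error {
	ctx := context.Background()
	parts := strings.SplitN(table, ".", 3)
	client, err := bq.NewClient(ctx, parts[0])
	if err != nil {
		return err
	}
	defer client.Close()
	return client.DatasetInProject(parts[0], parts[1]).Table(parts[2]).Delete(ctx)
}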
