diff --git a/sdks/go/pkg/beam/io/xlang/bigqueryio/bigquery.go b/sdks/go/pkg/beam/io/xlang/bigqueryio/bigquery.go
index 19f2d626a77b2..6bfc8939106c5 100644
--- a/sdks/go/pkg/beam/io/xlang/bigqueryio/bigquery.go
+++ b/sdks/go/pkg/beam/io/xlang/bigqueryio/bigquery.go
@@ -187,8 +187,26 @@ func FromTable(table string) readOption {
 // FromQuery is a Read option that specifies a query to use for reading from BigQuery. Uses the
 // BigQuery Standard SQL dialect.
 //
+// Important: When reading from a query, the schema of any source tables is not used, and the read
+// transform cannot detect which fields are Required; therefore every field in the output type
+// will be a pointer (including fields within inner structs).
+//
 // For more details see in the Java SDK:
 // org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Read.fromQuery(java.lang.String)
+//
+// BUG(https://github.com/apache/beam/issues/21784): Query read outputs currently cannot be named
+// struct types. See link for workaround.
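+//
+// For example, a query read can use an anonymous struct type with pointer
+// fields (the struct, query, and variable names here are illustrative only):
+//
+//	outType := reflect.TypeOf((*struct {
+//		Word  *string `beam:"word"`
+//		Count *int64  `beam:"count"`
+//	})(nil)).Elem()
+//	rows := bigqueryio.Read(s, outType,
+//		bigqueryio.FromQuery("SELECT word, count FROM `project.dataset.table`"),
+//		bigqueryio.ReadExpansionAddr(expansionAddr))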
 func FromQuery(query string) readOption {
 	return func(rc *readConfig) {
 		rc.cfg.Query = &query
diff --git a/sdks/go/test/build.gradle b/sdks/go/test/build.gradle
index 28e8c5482853f..4ea0c57f8587d 100644
--- a/sdks/go/test/build.gradle
+++ b/sdks/go/test/build.gradle
@@ -173,18 +173,22 @@ ext.goIoValidatesRunnerTask = { proj, name, scriptOpts, pipelineOpts ->
     dependsOn ":sdks:java:io:expansion-service:build"
     dependsOn ":sdks:java:extensions:schemaio-expansion-service:build"
     dependsOn ":sdks:java:io:debezium:expansion-service:build"
+    dependsOn ":sdks:java:io:google-cloud-platform:expansion-service:build"
     dependsOn ":sdks:java:testing:kafka-service:buildTestKafkaServiceJar"
     doLast {
       def ioExpJar = project(":sdks:java:io:expansion-service").shadowJar.archivePath
       def schemaIoExpJar = project(":sdks:java:extensions:schemaio-expansion-service").shadowJar.archivePath
       def debeziumIoExpJar = project(":sdks:java:io:debezium:expansion-service").shadowJar.archivePath
+      def gcpIoExpJar = project(":sdks:java:io:google-cloud-platform:expansion-service").shadowJar.archivePath
       def kafkaJar = project(":sdks:java:testing:kafka-service:").buildTestKafkaServiceJar.archivePath
       def pipelineOptions = [
          // Pipeline options piped directly to Go SDK flags.
          "--kafka_jar=${kafkaJar}",
          "--expansion_jar=io:${ioExpJar}",
          "--expansion_jar=schemaio:${schemaIoExpJar}",
          "--expansion_jar=debeziumio:${debeziumIoExpJar}",
+          "--expansion_jar=gcpio:${gcpIoExpJar}",
+          "--bq_dataset=apache-beam-testing.beam_bigquery_io_test_temp",
      ]
       pipelineOptions.addAll(pipelineOpts)
       def options = [
diff --git a/sdks/go/test/integration/flags.go b/sdks/go/test/integration/flags.go
index 72954f2be0660..3a58826a4368b 100644
--- a/sdks/go/test/integration/flags.go
+++ b/sdks/go/test/integration/flags.go
@@ -42,6 +42,13 @@ var (
 		"Sets an auto-shutdown timeout to the Kafka cluster. "+
 			"Requires the timeout command to be present in Path, unless the value is set to \"\".")
 
+	// BigQueryDataset is the name of the dataset to create tables in for
+	// BigQuery integration tests.
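+	// For example: --bq_dataset=apache-beam-testing.beam_bigquery_io_test_temp
+	// (the value set in sdks/go/test/build.gradle).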
+	BigQueryDataset = flag.String("bq_dataset", "",
+		"Name of the dataset to create tables in for BigQuery tests.")
+
 	// ExpansionJars contains elements in the form "label:jar" describing jar
 	// filepaths for expansion services to use in integration tests, and the
 	// corresponding labels. Once provided through this flag, those jars can
diff --git a/sdks/go/test/integration/integration.go b/sdks/go/test/integration/integration.go
index 939bb6df2c508..2af9530330498 100644
--- a/sdks/go/test/integration/integration.go
+++ b/sdks/go/test/integration/integration.go
@@ -64,6 +64,7 @@ var directFilters = []string{
 	// The direct runner does not yet support cross-language.
 	"TestXLang.*",
 	"TestKafkaIO.*",
+	"TestBigQueryIO.*",
 	"TestDebeziumIO_BasicRead",
 	"TestJDBCIO_BasicReadWrite",
 	"TestJDBCIO_PostgresReadWrite",
@@ -93,6 +94,8 @@ var portableFilters = []string{
 	"TestPanes",
 	// TODO(BEAM-12797): Python portable runner times out on Kafka reads.
 	"TestKafkaIO.*",
+	// TODO(BEAM-13215): GCP IOs currently do not work in non-Dataflow portable runners.
+	"TestBigQueryIO.*",
 	// The portable runner does not support self-checkpointing
 	"TestCheckpointing",
 	// The portable runner does not support pipeline drain for SDF.
@@ -107,6 +110,8 @@ var flinkFilters = []string{
 	// TODO(BEAM-12815): Test fails on post commits: "Insufficient number of network buffers".
 	"TestXLang_Multi",
 	"TestDebeziumIO_BasicRead",
+	// TODO(BEAM-13215): GCP IOs currently do not work in non-Dataflow portable runners.
+	"TestBigQueryIO.*",
 	// The number of produced outputs in AfterSynchronizedProcessingTime varies in different runs.
 	"TestTriggerAfterSynchronizedProcessingTime",
 	// The flink runner does not support pipeline drain for SDF.
@@ -126,6 +131,8 @@ var samzaFilters = []string{
 	"TestPanes",
 	// TODO(BEAM-13006): Samza doesn't yet support post job metrics, used by WordCount
 	"TestWordCount.*",
+	// TODO(BEAM-13215): GCP IOs currently do not work in non-Dataflow portable runners.
+	"TestBigQueryIO.*",
 	// The Samza runner does not support self-checkpointing
 	"TestCheckpointing",
 	// The samza runner does not support pipeline drain for SDF.
@@ -146,6 +153,8 @@ var sparkFilters = []string{
 	"TestPanes",
 	// [BEAM-13921]: Spark doesn't support side inputs to executable stages
 	"TestDebeziumIO_BasicRead",
+	// TODO(BEAM-13215): GCP IOs currently do not work in non-Dataflow portable runners.
+	"TestBigQueryIO.*",
 	// The spark runner does not support self-checkpointing
 	"TestCheckpointing",
 	// The spark runner does not support pipeline drain for SDF.
@@ -174,6 +183,8 @@ var dataflowFilters = []string{
 	// Dataflow does not automatically terminate the TestCheckpointing pipeline when
 	// complete.
 	"TestCheckpointing",
+	// TODO(https://github.com/apache/beam/issues/21761): This test needs to provide a GCP project to the expansion service.
+	"TestBigQueryIO_BasicWriteQueryRead",
 	// Dataflow does not drain jobs by itself.
 	"TestDrain",
 }
diff --git a/sdks/go/test/integration/io/xlang/bigquery/bigquery.go b/sdks/go/test/integration/io/xlang/bigquery/bigquery.go
new file mode 100644
index 0000000000000..37ea08256d52e
--- /dev/null
+++ b/sdks/go/test/integration/io/xlang/bigquery/bigquery.go
@@ -0,0 +1,17 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package bigquery contains integration tests for cross-language BigQuery IO transforms.
+package bigquery
diff --git a/sdks/go/test/integration/io/xlang/bigquery/bigquery_test.go b/sdks/go/test/integration/io/xlang/bigquery/bigquery_test.go
new file mode 100644
index 0000000000000..0f7516d1986b5
--- /dev/null
+++ b/sdks/go/test/integration/io/xlang/bigquery/bigquery_test.go
@@ -0,0 +1,277 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package bigquery
+
+import (
+	"flag"
+	"fmt"
+	"log"
+	"math/rand"
+	"reflect"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/xlang/bigqueryio"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+	_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/dataflow"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
+	"github.com/apache/beam/sdks/v2/go/test/integration"
+)
+
+func init() {
+	register.DoFn2x0[[]byte, func(TestRow)](&CreateTestRowsFn{})
+	register.Emitter1[TestRow]()
+	// TODO(https://github.com/apache/beam/issues/21789): Uncomment once this register no longer panics.
+	//register.Function1x1(castFn)
+}
+
+var expansionAddr string // Populated by TestMain with the address of the expansion service labelled "gcpio".
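+
+// For local runs, these tests can be pointed at an expansion service jar and a
+// dataset with flags along these lines (the jar path is illustrative; the
+// dataset value matches sdks/go/test/build.gradle):
+//
+//	go test ./sdks/go/test/integration/io/xlang/bigquery/... \
+//		--expansion_jar=gcpio:/path/to/expansion-service.jar \
+//		--bq_dataset=apache-beam-testing.beam_bigquery_io_test_temp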
+
+func checkFlags(t *testing.T) {
+	if *integration.BigQueryDataset == "" {
+		t.Skip("No BigQuery dataset provided.")
+	}
+}
+
+const (
+	// A text to shuffle to get random words.
+	text = "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Maecenas eget nulla nec " +
+		"velit hendrerit placerat. Donec eu odio ultricies, fermentum arcu at, mollis lectus. " +
+		"Vestibulum porttitor pharetra sem vitae feugiat. Mauris facilisis neque in mauris " +
+		"feugiat rhoncus. Donec eu ipsum at nibh lobortis euismod. Nam at hendrerit felis. " +
+		"Vivamus et orci ex. Nam dui nisl, rutrum ac pretium eget, vehicula in tortor. Class " +
+		"aptent taciti sociosqu ad litora torquent per conubia nostra, per inceptos himenaeos. " +
+		"Phasellus ante lorem, pharetra blandit dapibus et, tempus nec purus. Maecenas in " +
+		"posuere sem, vel pharetra nisl. Pellentesque habitant morbi tristique senectus et netus " +
+		"et malesuada fames ac turpis egestas. Donec nec facilisis ex. Praesent euismod commodo " +
+		"efficitur. Fusce in nisi nunc."
+	// Number of random elements to create for the test. Must be less than the number of words in text.
+	inputSize = 50
+)
+
+// TestRow is a sample row to write and read from that is expected to contain enough deterministic
+// and random data in different data types to provide a reasonable signal that reading and writing
+// works at a basic level.
+type TestRow struct {
+	Counter   int64    `beam:"counter"`   // A deterministic counter, incremented for each row generated.
+	Rand_data RandData `beam:"rand_data"` // An inner struct containing randomized data.
+}
+
+func shuffleText() []string {
+	words := strings.Fields(text)
+	rand.Shuffle(len(words), func(i, j int) { words[i], words[j] = words[j], words[i] })
+	return words
+}
+
+// RandData is a struct of various types of random data.
+type RandData struct {
+	Flip bool   `beam:"flip"` // Flip is a bool with a random chance of either result (a coin flip).
+	Num  int64  `beam:"num"`  // Num is a random int64.
+	Word string `beam:"word"` // Word is a randomly selected word from a sample text.
+}
+
+// ddlTestRowSchema is a string of BigQuery Data Definition Language (DDL) that corresponds to TestRow.
+const ddlTestRowSchema = "counter INT64 NOT NULL, " +
+	"rand_data STRUCT<" +
+	"flip BOOL NOT NULL," +
+	"num INT64 NOT NULL," +
+	"word STRING NOT NULL" +
+	"> NOT NULL"
+
+// CreateTestRowsFn is a DoFn that creates randomized TestRows based on a seed.
+type CreateTestRowsFn struct {
+	seed int64
+}
+
+// ProcessElement creates a number of TestRows, populating the randomized data.
+func (fn *CreateTestRowsFn) ProcessElement(_ []byte, emit func(TestRow)) {
+	rand.Seed(fn.seed)
+	words := shuffleText()
+	for i := 0; i < inputSize; i++ {
+		emit(TestRow{
+			Counter: int64(i),
+			Rand_data: RandData{
+				Flip: rand.Int63n(2) != 0,
+				Num:  rand.Int63(),
+				Word: words[i],
+			},
+		})
+	}
+}
+
+// WritePipeline creates a pipeline that writes elements created by createFn into a BigQuery table.
+func WritePipeline(expansionAddr, table string, createFn interface{}) *beam.Pipeline {
+	p := beam.NewPipeline()
+	s := p.Root()
+
+	// Generate elements and write to table.
+	rows := beam.ParDo(s, createFn, beam.Impulse(s))
+	bigqueryio.Write(s, table, rows,
+		bigqueryio.CreateDisposition(bigqueryio.CreateNever),
+		bigqueryio.WriteExpansionAddr(expansionAddr))
+
+	return p
+}
+
+// ReadPipeline creates a pipeline that reads elements directly from a BigQuery table and asserts
+// that they match elements created by createFn.
+func ReadPipeline(expansionAddr, table string, createFn interface{}) *beam.Pipeline {
+	p := beam.NewPipeline()
+	s := p.Root()
+
+	// Read from table and compare to generated elements.
+	rows := beam.ParDo(s, createFn, beam.Impulse(s))
+	inType := reflect.TypeOf((*TestRow)(nil)).Elem()
+	readRows := bigqueryio.Read(s, inType,
+		bigqueryio.FromTable(table),
+		bigqueryio.ReadExpansionAddr(expansionAddr))
+	passert.Equals(s, readRows, rows)
+
+	return p
+}
+
+// TestRowPtrs is equivalent to TestRow but all fields are pointers, meant to be used when reading
+// via query.
+//
+// TODO(https://github.com/apache/beam/issues/21784): Change back to a named struct once resolved.
+type TestRowPtrs = struct {
+	Counter   *int64        `beam:"counter"`
+	Rand_data *RandDataPtrs `beam:"rand_data"`
+}
+
+// RandDataPtrs is equivalent to RandData but all fields are pointers, meant to be used when reading
+// via query.
+//
+// TODO(https://github.com/apache/beam/issues/21784): Change back to a named struct once resolved.
+type RandDataPtrs = struct {
+	Flip *bool   `beam:"flip"`
+	Num  *int64  `beam:"num"`
+	Word *string `beam:"word"`
+}
+
+// castFn converts the result of the query, which has pointer fields, into the original TestRow
+// type that was written to BigQuery.
+func castFn(elm TestRowPtrs) TestRow {
+	return TestRow{
+		Counter: *elm.Counter,
+		Rand_data: RandData{
+			Flip: *elm.Rand_data.Flip,
+			Num:  *elm.Rand_data.Num,
+			Word: *elm.Rand_data.Word,
+		},
+	}
+}
+
+// ReadFromQueryPipeline creates a pipeline that reads elements from a BigQuery table via a SQL
+// query, and asserts that they match elements created by createFn.
+func ReadFromQueryPipeline(expansionAddr, table string, createFn interface{}) *beam.Pipeline {
+	p := beam.NewPipeline()
+	s := p.Root()
+
+	// Read from table and compare to generated elements.
+	rows := beam.ParDo(s, createFn, beam.Impulse(s))
+	inType := reflect.TypeOf((*TestRowPtrs)(nil)).Elem()
+	query := fmt.Sprintf("SELECT * FROM `%s`", table)
+	readRows := bigqueryio.Read(s, inType,
+		bigqueryio.FromQuery(query),
+		bigqueryio.ReadExpansionAddr(expansionAddr))
+	castRows := beam.ParDo(s, castFn, readRows)
+	passert.Equals(s, castRows, rows)
+
+	return p
+}
+
+// TestBigQueryIO_BasicWriteRead runs a pipeline that generates semi-randomized elements, writes
+// them to a BigQuery table, reads them back from that table, and checks that the results match
+// the original inputs. The write requires a pre-existing table, which the test creates first.
+func TestBigQueryIO_BasicWriteRead(t *testing.T) {
+	integration.CheckFilters(t)
+	checkFlags(t)
+
+	// Create a table before running the pipeline.
+	table, err := newTempTable(*integration.BigQueryDataset, "go_bqio_it", ddlTestRowSchema)
+	if err != nil {
+		t.Fatalf("error creating BigQuery table: %v", err)
+	}
+	t.Logf("Created BigQuery table %v", table)
+
+	createTestRows := &CreateTestRowsFn{seed: time.Now().UnixNano()}
+	write := WritePipeline(expansionAddr, table, createTestRows)
+	ptest.RunAndValidate(t, write)
+	read := ReadPipeline(expansionAddr, table, createTestRows)
+	ptest.RunAndValidate(t, read)
+
+	t.Logf("Deleting BigQuery table %v", table)
+	err = deleteTempTable(table)
+	if err != nil {
+		t.Logf("Error deleting BigQuery table: %v", err)
+	}
+}
+
+// TestBigQueryIO_BasicWriteQueryRead runs a pipeline that generates semi-randomized elements,
+// writes them to a BigQuery table, reads them back, and checks that the results match the
+// original inputs. The write requires a pre-existing table, which the test creates first.
+//
+// This test reads via a BigQuery SQL query, instead of directly from a table.
+func TestBigQueryIO_BasicWriteQueryRead(t *testing.T) {
+	integration.CheckFilters(t)
+	checkFlags(t)
+
+	// Create a table before running the pipeline.
+	table, err := newTempTable(*integration.BigQueryDataset, "go_bqio_it", ddlTestRowSchema)
+	if err != nil {
+		t.Fatalf("error creating BigQuery table: %v", err)
+	}
+	t.Logf("Created BigQuery table %v", table)
+
+	createTestRows := &CreateTestRowsFn{seed: time.Now().UnixNano()}
+	write := WritePipeline(expansionAddr, table, createTestRows)
+	ptest.RunAndValidate(t, write)
+	readQuery := ReadFromQueryPipeline(expansionAddr, table, createTestRows)
+	ptest.RunAndValidate(t, readQuery)
+
+	t.Logf("Deleting BigQuery table %v", table)
+	err = deleteTempTable(table)
+	if err != nil {
+		t.Logf("Error deleting BigQuery table: %v", err)
+	}
+}
+
+func TestMain(m *testing.M) {
+	flag.Parse()
+	beam.Init()
+
+	services := integration.NewExpansionServices()
+	defer func() { services.Shutdown() }()
+	addr, err := services.GetAddr("gcpio")
+	if err != nil {
+		log.Printf("skipping missing expansion service: %v", err)
+	} else {
+		expansionAddr = addr
+	}
+
+	ptest.MainRet(m)
+}
diff --git a/sdks/go/test/integration/io/xlang/bigquery/table.go b/sdks/go/test/integration/io/xlang/bigquery/table.go
new file mode 100644
index 0000000000000..7d5f3dca37755
--- /dev/null
+++ b/sdks/go/test/integration/io/xlang/bigquery/table.go
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package bigquery
+
+import (
+	"fmt"
+	"os/exec"
+	"time"
+)
+
+// newTempTable creates a new BigQuery table using BigQuery's Data Definition Language (DDL) and the
+// "bq query" console command. Reference: https://cloud.google.com/bigquery/docs/reference/standard-sql/data-definition-language
+// The tables created are set to expire after a day.
+//
+// newTempTable takes the name of a BigQuery dataset, the prefix for naming the table, and a DDL
+// schema for the data, and generates that table with a unique suffix and an expiration time of a
+// day later.
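+//
+// For example, newTempTable("my_dataset", "go_bqio_it", "num INT64") would issue
+// a DDL statement like the following (names and timestamp suffix illustrative):
+//
+//	CREATE TABLE `my_dataset.go_bqio_it_temp_1651234567890000000`(num INT64)
+//	OPTIONS(expiration_timestamp=TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL 1 DAY))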
+func newTempTable(dataset, prefix, schema string) (string, error) {
+	name := fmt.Sprintf("%s.%s_temp_%v", dataset, prefix, time.Now().UnixNano())
+	query := fmt.Sprintf("CREATE TABLE `%s`(%s) OPTIONS(expiration_timestamp=TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL 1 DAY))", name, schema)
+	cmd := exec.Command("bq", "query", "--use_legacy_sql=false", query)
+	out, err := cmd.CombinedOutput()
+	if err != nil {
+		return "", fmt.Errorf("creating table through command \"%s\" failed with output:\n%s", cmd.String(), out)
+	}
+	return name, nil
+}
+
+// deleteTempTable deletes a BigQuery table using BigQuery's Data Definition Language (DDL) and the
+// "bq query" console command. Reference: https://cloud.google.com/bigquery/docs/reference/standard-sql/data-definition-language
+func deleteTempTable(table string) error {
+	query := fmt.Sprintf("DROP TABLE IF EXISTS `%s`", table)
+	cmd := exec.Command("bq", "query", "--use_legacy_sql=false", query)
+	out, err := cmd.CombinedOutput()
+	if err != nil {
+		return fmt.Errorf("deleting table through command \"%s\" failed with output:\n%s", cmd.String(), out)
+	}
+	return nil
+}