
[BEAM-13806] Add x-lang BigQuery IO integration test to Go SDK. #16818

Merged
merged 11 commits into apache:master on Jun 14, 2022

Conversation

youngoli
Contributor

Also includes piping BigQuery IO flags through the integration test script, and a small file for creating BigQuery tables that expire after a day.

This was tested manually, but it can't be tested on Jenkins at the moment because it only works on Dataflow, and there currently isn't a test suite that runs Go xlang transforms on Dataflow. That should be fixed shortly, and then this can be tested on Jenkins. It shouldn't be submitted until then.
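
For illustration, the table-creation helper could be shaped roughly like the sketch below, using the cloud.google.com/go/bigquery client. The helper name, signature, and naming scheme here are assumptions, not necessarily the exact code added in this PR.

package bigquery

import (
	"context"
	"fmt"
	"time"

	bq "cloud.google.com/go/bigquery"
)

// newExpiringTable creates a table in the given dataset that BigQuery will
// delete automatically 24 hours later, and returns its qualified name.
func newExpiringTable(ctx context.Context, project, dataset, prefix string, schema bq.Schema) (string, error) {
	client, err := bq.NewClient(ctx, project)
	if err != nil {
		return "", err
	}
	defer client.Close()

	name := fmt.Sprintf("%s_temp_%d", prefix, time.Now().UnixNano())
	meta := &bq.TableMetadata{
		Schema:         schema,
		ExpirationTime: time.Now().Add(24 * time.Hour), // expire after a day
	}
	if err := client.Dataset(dataset).Table(name).Create(ctx, meta); err != nil {
		return "", err
	}
	return fmt.Sprintf("%s:%s.%s", project, dataset, name), nil
}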


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Choose reviewer(s) and mention them in a comment (R: @username).
  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make the review process smoother.

ValidatesRunner compliance status (on master branch), examples testing status on various runners, post-commit SDK/transform integration tests status, and pre-commit tests status: build-status badge tables for the Go, Java, Python, and XLang SDKs across the ULR, Dataflow, Flink, Samza, Spark, and Twister2 runners (badge images omitted).

See .test-infra/jenkins/README for the trigger phrase, status, and link of all Jenkins jobs.

GitHub Actions Tests Status (on master branch): build python source distribution and wheels, Python tests, Java tests (badge images omitted).

See CI.md for more information about GitHub Actions CI.

@github-actions github-actions bot added the go label Feb 11, 2022
@youngoli
Contributor Author

R: @lostluck @riteshghorse

Contributor

@lostluck lostluck left a comment

The big comment is about the output PCollections for Writes, but I don't expect that change in this PR. I believe it's a best practice to always have those for write transforms so it's possible to block on them with side inputs. I clearly missed that in reviews last week.

Comment on lines 118 to 120
bigqueryio.Write(s, table, rows,
bigqueryio.CreateDisposition(bigqueryio.CreateNever),
bigqueryio.WriteExpansionAddr(*integration.GCPIoExpansionAddr))
Contributor

Does the bigqueryIO Write not return a pcollection we can have the read block on? As it stands, I think this will have both happen at the same time.

Contributor Author

No, but I split the pipeline into two separate pipelines that will be run in sequence, which should avoid this issue.
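
Roughly, the split looks like the sketch below. It's illustrative, not the PR's exact code, and assumes the usual Beam Go SDK imports (beam, ptest, passert, bigqueryio, integration) plus this test's createFn and TestRow.

// Run the write pipeline to completion before building and running the read
// pipeline, so the read can never race the write.
func testWriteThenRead(t *testing.T, table string) {
	// Write pipeline: runs first and blocks until it finishes.
	wp, ws := beam.NewPipelineWithRoot()
	rows := beam.ParDo(ws, createFn, beam.Impulse(ws))
	bigqueryio.Write(ws, table, rows,
		bigqueryio.CreateDisposition(bigqueryio.CreateNever),
		bigqueryio.WriteExpansionAddr(*integration.GCPIoExpansionAddr))
	ptest.RunAndValidate(t, wp)

	// Read pipeline: only constructed and run after the write has completed.
	rp, rs := beam.NewPipelineWithRoot()
	want := beam.ParDo(rs, createFn, beam.Impulse(rs)) // must be deterministic
	got := bigqueryio.Read(rs, reflect.TypeOf((*TestRow)(nil)).Elem(),
		bigqueryio.FromTable(table),
		bigqueryio.ReadExpansionAddr(*integration.GCPIoExpansionAddr))
	passert.Equals(rs, got, want)
	ptest.RunAndValidate(t, rp)
}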

checkFlags(t)

// Create a table before running the pipeline
table, err := newTempTable(*integration.BigQueryDataset, "go_bqio_it", ddlTestRowSchema)
Contributor

consider printing the table name using t.Log, to make it clearer which resources map to this test.

Contributor Author

Done.
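
(The resulting logging presumably looks something like the following; exact wording is illustrative.)

table, err := newTempTable(*integration.BigQueryDataset, "go_bqio_it", ddlTestRowSchema)
if err != nil {
	t.Fatalf("error creating BigQuery table: %v", err)
}
t.Logf("Created BigQuery table %v", table)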

bigqueryio.ReadExpansionAddr(*integration.GCPIoExpansionAddr))
passert.Equals(s, readRows, rows)

ptest.RunAndValidate(t, p)
Contributor

Assuming the test is successful, could we please also delete the table manually?

I see that we have limited its lifespan, but I'd prefer that we don't accrue a rolling pile of these over the course of the 24-hour window.

Contributor Author

Sure, done.
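
A hedged sketch of what that cleanup could look like, again assuming the cloud.google.com/go/bigquery client and a table name of the form "project:dataset.table" (the helper name is illustrative):

package bigquery

import (
	"context"
	"strings"

	bq "cloud.google.com/go/bigquery"
)

// deleteTempTable drops the temporary table once the test has passed, so
// successful runs don't leave tables lingering for the full 24-hour expiry.
// Assumes a qualified name of the form "project:dataset.table".
func deleteTempTable(ctx context.Context, qualified string) error {
	parts := strings.SplitN(qualified, ":", 2)
	dsTable := strings.SplitN(parts[1], ".", 2)

	client, err := bq.NewClient(ctx, parts[0])
	if err != nil {
		return err
	}
	defer client.Close()
	return client.Dataset(dsTable[0]).Table(dsTable[1]).Delete(ctx)
}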

@aaltay
Member

aaltay commented Feb 25, 2022

@youngoli - What is the next step on this PR?

@aaltay
Member

aaltay commented Mar 10, 2022

Could this be merged?

@youngoli
Contributor Author

Sorry, I haven't been checking this. Don't merge this yet; I need to test it on Jenkins (which requires getting #16819 in first). I'll try to find time to work on these and get them in.

@youngoli
Contributor Author

Run XVR_GoUsingJava_Dataflow PostCommit

@youngoli
Contributor Author

I rebased to fix merge conflicts and account for the changes in our integration testing script, but the functionality of the first commit should be unchanged; only the test framework and expansion service setup should differ.

The second commit contains all the fixups addressing the comments.

@youngoli
Contributor Author

Run XVR_GoUsingJava_Dataflow PostCommit

1 similar comment
@youngoli
Contributor Author

Run XVR_GoUsingJava_Dataflow PostCommit

@aaltay
Member

aaltay commented Apr 28, 2022

What is the next step on this PR?

rows := beam.ParDo(s, createFn, beam.Impulse(s))
inType := reflect.TypeOf((*TestRow)(nil)).Elem()
readRows := bigqueryio.Read(s, inType,
bigqueryio.FromTable(table),
Contributor

Also test reading from a query.

Contributor Author

I did. As an update to what we talked about online, I'm not getting any permissions errors testing locally. (I am getting a different unrelated error, but it's because of a bug in how I wrote the test. Will commit once I fix that.)
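
For reference, the query-read variant might look roughly like the snippet below, reusing the names from the snippet above and assuming the package exposes a FromQuery option alongside FromTable (option names may differ). As discussed further down, the query read actually yields a pointer-field row type, so the real test has to account for that before comparing.

// Read the rows back with a SQL query instead of a direct table read.
query := fmt.Sprintf("SELECT * FROM `%s`", strings.Replace(table, ":", ".", 1))
queryRows := bigqueryio.Read(s, reflect.TypeOf((*TestRow)(nil)).Elem(),
	bigqueryio.FromQuery(query),
	bigqueryio.ReadExpansionAddr(*integration.GCPIoExpansionAddr))
passert.Equals(s, queryRows, rows)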

Also includes piping in flags for BigQuery IO through integration test script, and a small file for creating bigquery tables that expire after a day.
Splits the integration test into two pipelines to run sequentially. Also drops table after a successful test and logs table names.
CreateRows wasn't creating the same elements in both read and write pipelines after splitting the two pipelines. Adjusted it to use a consistent seed in all pipelines.
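
A minimal sketch of that deterministic-seed approach, assuming math/rand; the struct fields mirror the test schema, while the constant seed, word list, and helper name are illustrative:

type RandData struct {
	Flip bool   `beam:"flip"`
	Num  int64  `beam:"num"`
	Word string `beam:"word"`
}

type TestRow struct {
	Counter  int64    `beam:"counter"`
	RandData RandData `beam:"rand_data"`
}

// createRows generates the same rows on every call, so the write pipeline and
// the (now separate) read pipeline agree on the expected data.
func createRows(n int) []TestRow {
	rng := rand.New(rand.NewSource(42)) // fixed seed shared by all pipelines
	words := []string{"the", "quick", "brown", "fox"}
	rows := make([]TestRow, 0, n)
	for i := 0; i < n; i++ {
		rows = append(rows, TestRow{
			Counter: int64(i),
			RandData: RandData{
				Flip: rng.Intn(2) == 1,
				Num:  rng.Int63(),
				Word: words[rng.Intn(len(words))],
			},
		})
	}
	return rows
}
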
@codecov

codecov bot commented Jun 3, 2022

Codecov Report

Merging #16818 (8db67b4) into master (9e5d085) will not change coverage.
The diff coverage is n/a.

@@           Coverage Diff           @@
##           master   #16818   +/-   ##
=======================================
  Coverage   74.10%   74.10%           
=======================================
  Files         698      698           
  Lines       92486    92486           
=======================================
  Hits        68541    68541           
  Misses      22690    22690           
  Partials     1255     1255           
Flag Coverage Δ
go 50.86% <ø> (ø)

Flags with carried forward coverage won't be shown.

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 9e5d085...8db67b4.

@youngoli
Contributor Author

youngoli commented Jun 3, 2022

I added a test that reads from a query, and while testing in my own environment I'm getting weird results. The elements output from a query read have all their fields as pointers, because the query result's schema doesn't exactly match the table's schema. That's pretty normal. But when I made an equivalent struct in Go, I get errors when decoding into that struct, even though all the fields seem to match exactly:

panic: reflect: Call using struct { Counter *int64 "beam:\"counter\""; Rand_data *struct { Flip *bool "beam:\"flip\""; Num *int64 "beam:\"num\""; Word *string "beam:\"word\"" } "beam:\"rand_data\"" } as type bigquery.TestRowPtrs
Full error:
while executing Process for Plan[s08-67]:
2: DataSink[S[ptransform-65@localhost:12371]] Coder:W;coder-80<LP;coder-81<R[bigquery.TestRow]>>!GWC
3: PCollection[pcollection-72] Out:[2]
4: ParDo[bigquery.ReadFromQueryPipeline.func1] Out:[2]
1: DataSource[S[ptransform-64@localhost:12371], 0] Coder:W;coder-76<LP;coder-77<R[struct { Counter *int64 "beam:\"counter\""; Rand_data *struct { Flip *bool "beam:\"flip\""; Num *int64 "beam:\"num\""; Word *string "beam:\"word\"" } "beam:\"rand_data\"" }]>>!GWC Out:4
	caused by:
panic: reflect: Call using struct { Counter *int64 "beam:\"counter\""; Rand_data *struct { Flip *bool "beam:\"flip\""; Num *int64 "beam:\"num\""; Word *string "beam:\"word\"" } "beam:\"rand_data\"" } as type bigquery.TestRowPtrs goroutine 60 [running]:
runtime/debug.Stack()
...

@youngoli
Contributor Author

youngoli commented Jun 3, 2022

Run XVR_GoUsingJava_Dataflow PostCommit

Contributor

@lostluck lostluck left a comment

If we document the type alias workaround for "lost" types, I'm very comfortable getting this in. We should file an issue for whatever is happening with Query. We likely need to simply hard-replace the coder Java thinks it has in the CrossLanguage handler, or figure out why a new one is derived in the first place.
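
For the record, the type alias workaround presumably looks something like the snippet below: because the cross-language coder hands back an anonymous struct type with pointer (nullable) fields, declaring the test's query-result type as an alias (with =) rather than a defined type keeps the two reflect types identical, so decoding doesn't panic. The exact shape in the PR may differ; field names mirror the error above.

// Alias, not a defined type: identical to the anonymous struct the coder produces.
type TestRowPtrs = struct {
	Counter   *int64 `beam:"counter"`
	Rand_data *struct {
		Flip *bool   `beam:"flip"`
		Num  *int64  `beam:"num"`
		Word *string `beam:"word"`
	} `beam:"rand_data"`
}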

sdks/go/test/integration/io/xlang/bigquery/bigquery.go — 4 review comments (outdated, resolved)
@youngoli
Contributor Author

Run XVR_GoUsingJava_Dataflow PostCommit

@lostluck
Contributor

Is this the error @robertwb ran into? Missing ProjectID?

00:39:18 --- FAIL: TestBigQueryIO_BasicWriteRead (418.27s)
00:39:18 panic: 	tried cross-language for beam:transform:org.apache.beam:schemaio_bigquery_read:v1 against localhost:37821 and failed
00:39:18 	expanding external transform
00:39:18 	expanding transform with ExpansionRequest: components:{environments:{key:"go" value:{}}} transform:{unique_name:"External" spec:{urn:"beam:transform:org.apache.beam:schemaio_bigquery_read:v1" payload:"\nX\n\x0e\n\x08location\x1a\x02\x10\x07\n\x0c\n\x06config\x1a\x02\x10\t\n\x12\n\ndataSchema\x1a\x04\x08\x01\x10\t\x12$3fc24beb-ef0b-4fd0-b491-64340b33ca1e\x12k\x03\x01\x04\x00f\x04\x01\rbSELECT * FROM `apache-beam-testing.beam_bigquery_io_test_temp.go_bqio_it_temp_1654844426145960792`"} environment_id:"go"} namespace:"fPIRkaLpTJ"
00:39:18 expansion failed
00:39:18 	caused by:
00:39:18 org.apache.beam.sdk.io.gcp.bigquery.BigQuerySchemaRetrievalException: Exception while trying to retrieve schema of query
00:39:18 	at org.apache.beam.sdk.io.gcp.bigquery.BigQueryQuerySourceDef.getBeamSchema(BigQueryQuerySourceDef.java:179)
00:39:18 	at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$TypedRead.expand(BigQueryIO.java:1222)
00:39:18 	at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$TypedRead.expand(BigQueryIO.java:743)
00:39:18 	at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:548)
00:39:18 	at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:482)
00:39:18 	at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:44)
00:39:18 	at org.apache.beam.sdk.io.gcp.bigquery.BigQuerySchemaIOProvider$BigQuerySchemaIO$1.expand(BigQuerySchemaIOProvider.java:183)
00:39:18 	at org.apache.beam.sdk.io.gcp.bigquery.BigQuerySchemaIOProvider$BigQuerySchemaIO$1.expand(BigQuerySchemaIOProvider.java:165)
00:39:18 	at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:548)
00:39:18 	at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:499)
00:39:18 	at org.apache.beam.sdk.expansion.service.ExpansionService$TransformProvider.apply(ExpansionService.java:396)
00:39:18 	at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:515)
00:39:18 	at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:595)
00:39:18 	at org.apache.beam.model.expansion.v1.ExpansionServiceGrpc$MethodHandlers.invoke(ExpansionServiceGrpc.java:220)
00:39:18 	at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
00:39:18 	at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:340)
00:39:18 	at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:866)
00:39:18 	at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
00:39:18 	at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
00:39:18 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
00:39:18 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
00:39:18 	at java.lang.Thread.run(Thread.java:748)
00:39:18 Caused by: java.lang.NullPointerException: Required parameter projectId must be specified.
00:39:18 	at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:907)
00:39:18 	at com.google.api.client.util.Preconditions.checkNotNull(Preconditions.java:138)
00:39:18 	at com.google.api.services.bigquery.Bigquery$Jobs$Insert.<init>(Bigquery.java:1737)
00:39:18 	at com.google.api.services.bigquery.Bigquery$Jobs.insert(Bigquery.java:1687)
00:39:18 	at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$JobServiceImpl.dryRunQuery(BigQueryServicesImpl.java:473)
00:39:18 	at org.apache.beam.sdk.io.gcp.bigquery.BigQueryQueryHelper.dryRunQueryIfNeeded(BigQueryQueryHelper.java:73)
00:39:18 	at org.apache.beam.sdk.io.gcp.bigquery.BigQueryQuerySourceDef.getBeamSchema(BigQueryQuerySourceDef.java:168)
00:39:18 	... 21 more

@lostluck
Contributor

Some of the "default" tests are also failing (prefix, multi, partition), but they're hard dying after apparently sending a bundle to the Java side (I see successful Go side messages). Definitely unrelated, but separately worth investigating.

@youngoli
Contributor Author

Yes, that looks like the same issue Robert ran into. It seems to be failing because the expansion service is unaware of the GCP project from pipeline options. But this was discussed earlier this week and it should be a relatively easy fix. I think for now though I might split the query and non-query reads into separate tests and sickbay the query read temporarily.
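
A sketch of that split, assuming the Go SDK's integration test filter mechanism (integration.CheckFilters plus a per-runner filter list); test names and structure are illustrative:

// Table read stays enabled.
func TestBigQueryIO_BasicWriteRead(t *testing.T) {
	integration.CheckFilters(t)
	checkFlags(t)
	// ... run write pipeline, then table-read pipeline ...
}

// Query read split into its own test so it can be filtered out ("sickbayed")
// until the expansion service picks up the GCP project from pipeline options.
func TestBigQueryIO_BasicWriteQueryRead(t *testing.T) {
	integration.CheckFilters(t)
	checkFlags(t)
	// ... run write pipeline, then query-read pipeline ...
}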

@lostluck
Contributor

lostluck commented Jun 10, 2022 via email

@lostluck
Contributor

Run XVR_GoUsingJava_Dataflow PostCommit

@lostluck
Contributor

Prior to the merge conflicts (which I'll manually fix in a bit) I kicked off what should be one final run of these (less the filtered-out Query test). Assuming it passes, plus a quick fix to merge the integration filters later, this will be good to merge.
https://ci-beam.apache.org/job/beam_PostCommit_XVR_GoUsingJava_Dataflow_PR/11/

@lostluck
Contributor

The run did pass, modulo the 3 unrelated tests that are currently failing. I'm guessing something changed so that those "built in" transforms are no longer built in, which is why they're failing. They work for Python, however, so there's some mechanism failing in how Go specifies them.

@youngoli youngoli merged commit 080f54a into apache:master Jun 14, 2022
@youngoli
Contributor Author

Nice, merged in. Thanks for doing that last bit of checking, I appreciate it.

bullet03 pushed a commit to akvelon/beam that referenced this pull request Jun 20, 2022
[BEAM-13806] Add x-lang BigQuery IO integration test to Go SDK. (apache#16818)

* [BEAM-13806] Add x-lang BigQuery IO integration test to Go SDK.

Also includes piping in flags for BigQuery IO through integration test script, and a small file for creating bigquery tables that expire after a day.

* [BEAM-13806] Splitting BigQuery IT into read and write pipelines.

Splits the integration test into two pipelines to run sequentially. Also drops table after a successful test and logs table names.

* Fixup: Fix gradle build and undo VR script changes.

* Fixup: Add Query test and fix deterministic random element generation.

CreateRows wasn't creating the same elements in both read and write pipelines after splitting the two pipelines. Adjusted it to use a consistent seed in all pipelines.

* Fixup: Avoiding inline functions

* Workaround to coder issue, plus some debugging code

* Polishing workaround with documentation and removing debug prints.

* Move pipeline code to test file

* Split Query test from non-query test

Co-authored-by: Robert Burke <lostluck@users.noreply.github.com>