[BEAM-6392] Add support for the BigQuery read API to BigQueryIO. #7441
Conversation
cc: @chamikaramj
Force-pushed 058a5c4 → 2ccd882.
cc: @reuvenlax @pedapudi
Force-pushed 83baf95 → 4405cd3.
Thanks. Sorry about the delay in reviewing.
```java
/**
 * This method formats a BigQuery TIME value into a String matching the format used by JSON
 * export. Time records are stored in "microseconds since midnight" format.
 */
private static String formatTime(long timeMicros) {
```
Just to clarify, why don't we always use the most precise version (ISO_LOCAL_TIME_FORMATTER_MICROS) here?
This is necessary in order to match exactly the string format used by BigQuery export, which always uses the least-precise format if a value can be converted without loss of precision. It's likely that the approach you propose would not lead to correctness issues for pipelines; the integration test I've added wouldn't pass, though. :-)
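To make the "least-precise lossless format" behavior concrete, here is a minimal standalone sketch. The method name matches the snippet above, but the formatter patterns and structure are illustrative assumptions, not the actual BigQueryIO implementation:

```java
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;

public class TimeFormatSketch {
  // Illustrative sketch: format a "microseconds since midnight" TIME value the
  // way BigQuery JSON export does, choosing the least precise pattern that
  // still represents the value without loss.
  static String formatTime(long timeMicros) {
    DateTimeFormatter formatter;
    if (timeMicros % 1_000_000 == 0) {
      formatter = DateTimeFormatter.ofPattern("HH:mm:ss");        // second precision
    } else if (timeMicros % 1_000 == 0) {
      formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");    // millisecond precision
    } else {
      formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSSSSS"); // microsecond precision
    }
    return LocalTime.ofNanoOfDay(timeMicros * 1000).format(formatter);
  }

  public static void main(String[] args) {
    System.out.println(formatTime(0L));     // 00:00:00
    System.out.println(formatTime(1_000L)); // 00:00:00.001
    System.out.println(formatTime(1L));     // 00:00:00.000001
  }
}
```

This is why always emitting microsecond precision would change the output strings (e.g. `00:00:00.000000` instead of `00:00:00`) and break the export-equivalence integration test mentioned above.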
```java
case "GEOGRAPHY":
  // Avro will use a CharSequence to represent String objects, but it may not always use
  // java.lang.String; for example, it may prefer org.apache.avro.util.Utf8.
  verify(v instanceof CharSequence, "Expected CharSequence (String), got %s", v.getClass());
  return v.toString();
case "DATE":
```
Do we have any backwards-incompatible data type conversions for people who migrate pipelines from the export-based read transform to the read API-based read transform? If so, we should try to minimize them, and any incompatibilities that are unavoidable should be clearly documented.
Depending on the caller's project context, export may or may not produce Avro records identical to the read API today. Going forward, we will standardize on the format used by the read API.
```java
case "DATE":
  if (avroType == Type.INT) {
    verify(v instanceof Integer, "Expected Integer, got %s", v.getClass());
```
Is this path only hit for the Read API? (If so, please add a comment so that we don't lose the mapping.)
No -- this code path (an Avro logical 'date' type for a BigQuery DATE record) can be reached in both the export and read API cases.
So are this change (and the ones below) backwards-incompatible changes for the export-based read path? That would break our existing users.
For the record (based on offline chat), this change just generalizes the current behavior and does not break backwards compatibility.
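To make the conversions under discussion concrete, here is a hedged, self-contained approximation of the per-field dispatch for the GEOGRAPHY and DATE cases. The method name and structure are simplifications of my own; the real connector covers many more types and uses Guava's verify helpers:

```java
import java.time.LocalDate;

public class AvroFieldSketch {
  // Simplified stand-in for the connector's per-field Avro-to-JSON conversion.
  static Object convertField(String bqType, Object v) {
    switch (bqType) {
      case "STRING":
      case "GEOGRAPHY":
        // Avro may supply org.apache.avro.util.Utf8 rather than java.lang.String,
        // so accept any CharSequence and normalize via toString().
        if (!(v instanceof CharSequence)) {
          throw new IllegalArgumentException("Expected CharSequence, got " + v.getClass());
        }
        return v.toString();
      case "DATE":
        // Avro's 'date' logical type is an int count of days since the Unix epoch;
        // BigQuery's JSON export renders DATE as ISO-8601 "YYYY-MM-DD".
        return LocalDate.ofEpochDay((Integer) v).toString();
      default:
        throw new IllegalArgumentException("Unsupported type: " + bqType);
    }
  }

  public static void main(String[] args) {
    // StringBuilder stands in for Avro's Utf8: any CharSequence is accepted.
    System.out.println(convertField("GEOGRAPHY", new StringBuilder("POINT(30 10)"))); // POINT(30 10)
    System.out.println(convertField("DATE", 17897)); // 2019-01-01
  }
}
```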
```java
case "TIME":
  if (avroType == Type.LONG) {
    verify(v instanceof Long, "Expected Long, got %s", v.getClass());
    verifyNotNull(avroLogicalType, "Expected TimeMicros logical type");
```
Ditto.
No -- this code path (an Avro logical 'time-micros' type for a BigQuery TIME record) can be reached in both the export and read API cases.
```java
 */
EXPORT,

/** Read the contents of a table directly using the BigQuery storage API. */
```
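Read in full, the enum this fragment belongs to might look roughly like the following. This is a hedged reconstruction: the second member's name, DIRECT_READ, is an assumption based on the surrounding discussion, not confirmed by the diff shown here:

```java
public class ReadMethodSketch {
  // Hedged reconstruction of the read-method enum; DIRECT_READ is an assumed name.
  enum Method {
    /** Export the table contents to Avro files and read the exported files. */
    EXPORT,
    /** Read the contents of a table directly using the BigQuery storage API. */
    DIRECT_READ
  }

  public static void main(String[] args) {
    // A pipeline author would select one of these when configuring the read.
    System.out.println(Method.DIRECT_READ); // DIRECT_READ
  }
}
```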
Storage API or read API? Can you also add a link, since this is pretty new?
The official name is the BigQuery Storage API. I'll add a link here when the docs are published. :-)
```java
@Override
public long getEstimatedSizeBytes(PipelineOptions options) {
  // The size of stream source can't be estimated due to server-side liquid sharding.
  return 0L;
```
Probably add a TODO to better support this in the future ?
Done.
Force-pushed 4405cd3 → 31ac3ab.
Please try to upgrade $generated_grpc_beta_version to 0.39.0 and use it for google_cloud_bigquery_storage_proto instead of using a separate version there. Also, it seems there's a conflict now. LGTM for the rest.
Force-pushed d577308 → 85f6508.
@chamikaramj the change to update the GCP connector versions (#7783) has been merged, and I've resolved the conflict here.
Force-pushed e645393 → 1d8c589.
Run Java PreCommit
This change adds new Source objects which support reading tables from BigQuery using the new high-throughput read API. It also modifies the Avro-to-JSON conversion code in the BigQuery connector to support the Avro records generated by both the existing export process and the new read API, and adds an integration test to verify that the TableRows which are constructed using each method are equivalent.
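The equivalence check that integration test performs can be sketched as follows, approximating TableRow as a Map of column names to values. This is a hypothetical simplification of my own; the actual test is built on Beam's testing machinery and may differ:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RowEquivalenceSketch {
  // Sketch of the equivalence check: rows read via export and via the read API
  // should match as unordered multisets, since neither read method guarantees
  // row ordering.
  static boolean rowsEquivalent(List<Map<String, Object>> exportRows,
                                List<Map<String, Object>> readApiRows) {
    Map<Map<String, Object>, Integer> counts = new HashMap<>();
    for (Map<String, Object> r : exportRows) counts.merge(r, 1, Integer::sum);
    for (Map<String, Object> r : readApiRows) counts.merge(r, -1, Integer::sum);
    return counts.values().stream().allMatch(c -> c == 0);
  }

  public static void main(String[] args) {
    List<Map<String, Object>> a = List.of(Map.of("x", "1"), Map.of("x", "2"));
    List<Map<String, Object>> b = List.of(Map.of("x", "2"), Map.of("x", "1"));
    System.out.println(rowsEquivalent(a, b)); // true: same rows, different order
  }
}
```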
Force-pushed 1d8c589 → 0025a63.
LGTM. Will merge after tests pass. Thanks.
@chamikaramj we should be good to go here.
Run Java PostCommit
This change adds support for the new BigQuery high-throughput read API to BigQueryIO. The initial commit supports only reading from existing tables.
Follow this checklist to help us incorporate your contribution quickly and easily:
- Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
- It will help us expedite review of your Pull Request if you tag someone (e.g. @username) to look at it.