[BEAM-14035] Convert BigQuery SchemaIO to SchemaTransform #17135
Conversation
@@ -42,13 +42,13 @@
 @Experimental(Kind.SCHEMAS)
 public abstract class TypedSchemaTransformProvider<ConfigT> implements SchemaTransformProvider {

-  abstract Class<ConfigT> configurationClass();
+  public abstract Class<ConfigT> configurationClass();
Why do these need to be public?
I was a little confused about the structure of this change, so I left some comments about that. We don't need to support new features / all features in the transform; we should just be able to mimic the behavior of SchemaIO. Ideally this would be purely a structural change, e.g. I wouldn't expect a different way of reading/generating transforms in this change. Let me know if I misunderstood or missed something.
 */
@Internal
@Experimental
@AutoService(SchemaIOProvider.class)
We probably still need AutoService, just for the SchemaTransformProvider instead of SchemaIOProvider.
-public String identifier() {
-  return "bigquery";
+public BigQuerySchemaIOProvider() {
+  super();
The base class doesn't have a constructor. Is it normal to call super in this case?
 */
 @Override
-public BigQuerySchemaIO from(String location, Row configuration, @Nullable Schema dataSchema) {
-  return new BigQuerySchemaIO(location, configuration);
+public SchemaTransform from(BigQuerySchemaIOConfiguration configuration) {
SchemaTransform, but maybe BigQueryConfig is good enough?
-public boolean requiresDataSchema() {
-  return false;
+public String identifier() {
+  return BigQuerySchemaIOConfiguration.IDENTIFIER;
Conceptually this doesn't really belong in the configuration class. I'd either use a string literal or a constant defined in this file.
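The suggestion above can be sketched with a simplified, self-contained mock. The class name and identifier string here are illustrative stand-ins, not Beam's actual API:

```java
// Sketch of the reviewer's suggestion: keep the identifier as a constant
// in the provider file itself, rather than reaching into the configuration
// class (BigQuerySchemaIOConfiguration.IDENTIFIER). The class and the
// identifier string below are hypothetical, for illustration only.
class BigQuerySchemaTransformProviderSketch {
    // Constant defined here, in the provider.
    static final String IDENTIFIER = "bigquery";

    public String identifier() {
        return IDENTIFIER;
    }
}
```

This keeps the provider self-describing: a reader of the provider file sees its registration identifier without chasing a reference into the configuration class.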
 */
@Internal
@Experimental
@AutoService(SchemaIOProvider.class)
We need one of these for read and write, right? Shouldn't that be in the name?
Typically I've seen the SchemaProvider + the SchemaTransform/IO in one file. So we'd have two sets of those. I don't know if that fits with the classes here or not though.
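The file layout described above, provider and transform together with one pair per direction, might look like the following simplified sketch. The interfaces are mocked stand-ins, not Beam's real `SchemaTransformProvider`/`SchemaTransform` types:

```java
// Mocked stand-ins for Beam interfaces, for illustration only.
interface MockTransform {
    String expand(String input);
}

interface MockProvider {
    MockTransform from(String configuration);
}

// One provider + transform pair per direction, kept in one file;
// a BigQueryWriteSchemaTransformProvider would be a second such pair.
class BigQueryReadSchemaTransformProvider implements MockProvider {
    @Override
    public MockTransform from(String configuration) {
        return new BigQueryReadSchemaTransform(configuration);
    }

    // The transform lives alongside the provider that builds it.
    static class BigQueryReadSchemaTransform implements MockTransform {
        private final String configuration;

        BigQueryReadSchemaTransform(String configuration) {
            this.configuration = configuration;
        }

        @Override
        public String expand(String input) {
            // Placeholder for the real pipeline expansion.
            return "read[" + configuration + "](" + input + ")";
        }
    }
}
```

The design point is cohesion: the provider is the only public entry point, and the transform it constructs stays an implementation detail of the same file.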
-public PCollection.IsBounded isBounded() {
-  return PCollection.IsBounded.BOUNDED;
+public List<String> inputCollectionNames() {
+  // TODO: determine valid input collection names for JobType
You can also just use "input" or "output" if there's only one input and one output. I think this is more useful when there are several inputs, e.g. a join.
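That suggestion can be shown with a small self-contained sketch (the class is hypothetical, not Beam's real provider API):

```java
import java.util.Collections;
import java.util.List;

// Sketch of the reviewer's suggestion: with a single input and a single
// output, the collection names can simply be "input" and "output".
// Illustrative class only, not Beam's actual provider interface.
class SingleIONamesSketch {
    public List<String> inputCollectionNames() {
        return Collections.singletonList("input");
    }

    public List<String> outputCollectionNames() {
        return Collections.singletonList("output");
    }
}
```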
 */
@DefaultSchema(AutoValueSchema.class)
@AutoValue
public abstract class BigQuerySchemaIOConfiguration {
Where did this list of configs come from? For the first version we can probably just stick with the set of configurations the SchemaIO provided.
return Schema.builder()
    .addNullableField("table", FieldType.STRING)
    .addNullableField("query", FieldType.STRING)
    .addNullableField("queryLocation", FieldType.STRING)
    .addNullableField("createDisposition", FieldType.STRING)
    .build();
The SchemaTransform doesn't have to expose everything the base class does, though eventually it may. When it comes to converting it to the new form we can just mimic what was in the original form.
import org.apache.beam.sdk.values.PCollectionRowTuple;

@AutoValue
public abstract class BigQuerySchemaTransform implements SchemaTransform {
Need one for write and one for read
@Override
public PCollectionRowTuple expand(PCollectionRowTuple input) {
  if (input.getAll().isEmpty()) {
    BigQueryRowReader.Builder builder = BigQueryRowReader.builderOf(getConfiguration());
Why not combine BigQueryRowReader and the SchemaTransform?
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.Row;

class TableRowToBeamRowFn extends DoFn<TableRow, Row> {
Why do we need to add this? Just moving it to a shared location?
This PR, currently work-in-progress, closes BEAM-14035 by adding:
- BigQuerySchemaIOConfiguration: configuration that models Query, Extract, or Load BigQuery jobs
- BigQuerySchemaTransform: implementation of SchemaTransform
- BigQuerySchemaIOProvider: an extension of TypedSchemaTransformProvider<BigQuerySchemaIOConfiguration>
- BigQueryRowReader: implementation of PTransform<PBegin, PCollectionRowTuple>

Remaining work is:
- BigQueryRowReader: handle an Extract BigQuery job
- BigQueryRowWriter: implementation of PTransform<PCollectionRowTuple, PDone>
- BigQuerySchemaIOConfiguration: add Load Job configuration properties

I would like to request @laraschmidt to review this PR or delegate where relevant.
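The structure this PR builds on, a provider parameterized by its configuration type that produces a transform from a typed configuration object, can be mocked with a rough self-contained sketch. Names and shapes below are modeled loosely on TypedSchemaTransformProvider<ConfigT>; none of this is Beam's real API:

```java
// Hypothetical mock of the TypedSchemaTransformProvider<ConfigT> pattern:
// the provider declares its configuration class and builds a transform
// from a strongly typed configuration instance.
abstract class TypedProviderSketch<ConfigT> {
    abstract Class<ConfigT> configurationClass();
    abstract Runnable from(ConfigT configuration);
}

// Stand-in for BigQuerySchemaIOConfiguration.
class BigQueryConfigSketch {
    final String table;
    BigQueryConfigSketch(String table) { this.table = table; }
}

class BigQueryProviderSketch extends TypedProviderSketch<BigQueryConfigSketch> {
    @Override
    Class<BigQueryConfigSketch> configurationClass() {
        return BigQueryConfigSketch.class;
    }

    @Override
    Runnable from(BigQueryConfigSketch configuration) {
        // Placeholder: the real implementation would return a SchemaTransform
        // that expands into a BigQuery read or write pipeline.
        return () -> {};
    }
}
```

The advantage over the Row-based SchemaIO `from(String, Row, Schema)` entry point is that the configuration is a typed object, so invalid configurations fail at compile time rather than at pipeline construction.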