[BEAM-4794] Move SQL and SQL Nexmark to the schema framework #5956
reuvenlax merged 10 commits into apache:master
Conversation
It's cool to see the first (AFAIK) real-pipeline use of the schema PCollection framework!
apilloud left a comment
About half of the change is just reformatting, so I might have missed something there, but LGTM.
```diff
   }
 }))
-.setCoder(schema.getRowCoder());
+.setSchema(schema, SerializableFunctions.identity(), SerializableFunctions.identity());
```
This pattern all over the place is kind of annoying. How about adding a setSchema(schema) function that uses the new of(Schema schema) so we don't have to repeat this pattern all over the place?
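A minimal sketch of the shorthand being proposed, assuming a builder-style API; `Schema`, `SerializableFunction`, and `setSchema` here are simplified stand-ins, not the real Beam classes:

```java
import java.io.Serializable;

// Hypothetical model of apilloud's suggestion: keep the existing three-argument
// setSchema, and add a one-argument overload that defaults both conversions to
// identity, since the PCollection already holds Rows.
public class SetSchemaSketch<T> {
    static class Schema {}
    interface SerializableFunction<A, B> extends Serializable { B apply(A a); }
    static <A> SerializableFunction<A, A> identity() { return a -> a; }

    private Schema schema;
    private SerializableFunction<T, T> toRow;
    private SerializableFunction<T, T> fromRow;

    // existing long form: caller supplies both conversions explicitly
    public SetSchemaSketch<T> setSchema(Schema s,
            SerializableFunction<T, T> to, SerializableFunction<T, T> from) {
        this.schema = s;
        this.toRow = to;
        this.fromRow = from;
        return this;
    }

    // proposed shorthand: identity conversions filled in automatically
    public SetSchemaSketch<T> setSchema(Schema s) {
        return setSchema(s, identity(), identity());
    }

    public Schema getSchema() { return schema; }
}
```

The one-argument overload would remove the repeated `SerializableFunctions.identity(), SerializableFunctions.identity()` boilerplate at every call site in this PR.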
```diff
-public class BeamPCollectionTable extends BaseBeamTable {
-  private transient PCollection<Row> upstream;
+public class BeamPCollectionTable<InputT> extends BaseBeamTable {
+  private transient PCollection<InputT> upstream;
```
Not your bug, but this should be final instead of transient I believe.
```diff
 LONG_CODER.encode(value.reserve, outStream);
-LONG_CODER.encode(value.dateTime, outStream);
+INSTANT_CODER.encode(value.dateTime.toInstant(), outStream);
 LONG_CODER.encode(value.expires, outStream);
```
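To illustrate the change above: both the old and new encodings ultimately write epoch millis, but going through an `Instant` keeps the field's type explicit. This is a standalone sketch, not the actual Beam `LONG_CODER`/`INSTANT_CODER` implementations:

```java
import java.nio.ByteBuffer;
import java.time.Instant;

// Illustrative only: models the coder change in the hunk above, where the
// dateTime field goes from being encoded as a raw long to being converted
// to an Instant first. The class and method names are hypothetical.
public class InstantEncodingSketch {
    // old style: encode the timestamp field directly as a long
    static byte[] encodeMillis(long millis) {
        return ByteBuffer.allocate(8).putLong(millis).array();
    }

    // new style: mirror value.dateTime.toInstant() and encode the Instant
    static byte[] encodeInstant(Instant t) {
        return ByteBuffer.allocate(8).putLong(t.toEpochMilli()).array();
    }

    static Instant decode(byte[] bytes) {
        return Instant.ofEpochMilli(ByteBuffer.wrap(bytes).getLong());
    }
}
```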
Also, a good chunk of tests are failing. It would be good to run the Nexmark postcommits before this goes in.
retest this please
```diff
   }
 }))
-.setCoder(schema.getRowCoder());
+.setSchema(schema, SerializableFunctions.identity(), SerializableFunctions.identity());
```
+1, some shorthand version would be helpful. Maybe default to identity() and have a couple of overloads of setSchema() to allow customization? Or wire it up to schema registry and default to identity() there?
And is setSchema the right place to specify these transforms? Isn't it just conflating .apply(ToRow.withSchema())... .apply(ParDo.of(fromRow()))? My thought is that if the transforms are non-trivial, then they are probably better specified as ParDos.
These functions are not being applied here. All schemas in Beam must be registered with a conversion function to/from Row. These conversions are applied by Beam at the appropriate places (e.g. in the DoFn state machine). They can't really be specified as part of ParDo, because they are needed to implement ParDo behind the scenes!
In this case, it's clearly not needed as the type is already Row. Will add a simpler function for PCollection
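The registration model described above can be sketched as follows. This is a toy illustration of the idea that each schema'd type carries to/from Row conversions which the framework, not user ParDos, applies behind the scenes; `Row`, the registry, and the wiring are simplified stand-ins for the real Beam classes:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical model of a schema registry: every registered type has a pair of
// conversion functions to/from Row, and the framework invokes them at the
// appropriate places (e.g. before handing an element to a DoFn).
public class SchemaRegistrySketch {
    static class Row {
        final Object[] values;
        Row(Object... v) { values = v; }
    }

    static class Entry<T> {
        final Function<T, Row> toRow;
        final Function<Row, T> fromRow;
        Entry(Function<T, Row> to, Function<Row, T> from) { toRow = to; fromRow = from; }
    }

    private final Map<Class<?>, Entry<?>> registry = new HashMap<>();

    // Users register conversions once per type.
    <T> void register(Class<T> cls, Function<T, Row> to, Function<Row, T> from) {
        registry.put(cls, new Entry<>(to, from));
    }

    // The framework, not user code, applies the registered conversions.
    @SuppressWarnings("unchecked")
    <T> Row toRow(Class<T> cls, T value) {
        return ((Entry<T>) registry.get(cls)).toRow.apply(value);
    }

    @SuppressWarnings("unchecked")
    <T> T fromRow(Class<T> cls, Row row) {
        return ((Entry<T>) registry.get(cls)).fromRow.apply(row);
    }
}
```

When the type is already Row, both conversions collapse to identity, which is why a simpler registration function suffices for that case.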
315099c to a514a44
a514a44 to 1c969b4
run Dataflow ValidatesRunner

run Flink ValidatesRunner

run Spark ValidatesRunner

run Java PostCommit
UnboundedEventSourceTest.resumeFromCheckpoint failed in PostCommit, but appears to simply be flaky. That same test ran successfully in PreCommit, and also passed locally.
This PR moves SQL and Nexmark to the new schema framework.
Advantages:
Note: this cannot be merged until schemas are enabled for all major runners (BEAM-4793). A PR is currently out for review to make schemas work for all runners except the Gearpump runner.
R: @apilloud