[BEAM-4044] [SQL] Add tables via TableStore in Schema, execute DDL in Calcite model #5224
Conversation
LGTM
R: @xumingmin @xumingming This is a trivial change, but it does change the public interface around BeamQueryPlanner.
Branch force-pushed from f7c251f to 2b0476e.
R: @kennknowles This gets us 90% of the way to sqlline. |
akedin left a comment
Looks good, mostly nits.
Are there any new tests needed?
```java
final boolean existed;
switch (getKind()) {
  case DROP_TABLE:
  case DROP_MATERIALIZED_VIEW:
```
Do we support views? If we don't have concrete plans to support them, I'd rather remove all related code.
Interesting question for the future. For pure SQL REPL use, we would probably want something to name queries for reuse. Does Calcite manage these for us, and only delegate materialized views?
Deleted. Calcite does a lot of things for us when we get out of the way; views are probably one of them.
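For the record, a minimal sketch of what the drop handler looks like once the view cases are deleted (the schema lookup helper and the `ifExists` flag are assumptions modeled on Calcite's drop-statement pattern, not the PR's exact code):

```java
// Sketch only: DDL drop handling reduced to tables, per the discussion above.
public void execute(CalcitePrepare.Context context) {
  final boolean existed;
  switch (getKind()) {
    case DROP_TABLE:
      // removeTable returns whether the table was present (assumed schema helper).
      existed = getSchema(context).removeTable(name.getSimple());
      if (!existed && !ifExists) {
        throw new IllegalArgumentException("Table " + name + " does not exist");
      }
      break;
    default:
      throw new AssertionError("Unexpected DDL kind: " + getKind());
  }
}
```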
```java
    inputPCollections.getPipeline().apply("left", leftRelNode.toPTransform());
PCollection<Row> rightRows =
    inputPCollections.apply("right", rightRelNode.toPTransform());
    inputPCollections.getPipeline().apply("right", rightRelNode.toPTransform());
```
Not sure if it's the right thing to access the pipeline directly. Who knows what's there? Does it have a source, so that it can produce elements? With PCollections I would at least have an expectation that there should be elements in them.
Agree. Does this actually work? If so, is there a different data path whereby the left and right collections are passed as input? This invocation will not register them as inputs to the transform.
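For illustration, a sketch of how both sides could be registered as real inputs using a `PCollectionTuple` (the tag names and `joinTransform` are placeholders, not the PR's code):

```java
// Register both relations as tagged inputs so the transform sees them as inputs.
PCollectionTuple joinInputs =
    PCollectionTuple.of(new TupleTag<Row>("left"), leftRows)
        .and(new TupleTag<Row>("right"), rightRows);
// Applying to the tuple, rather than pipeline.apply(...), records the input edges.
PCollection<Row> joined = joinInputs.apply("join", joinTransform);
```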
```java
/**
 * A {@code BeamSqlTableProvider} provides a read-only set of {@code BeamSqlTable}s.
 */
public class BeamSqlTableProvider implements TableProvider {
```
make it @AutoValue+Builder?
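A rough sketch of what that suggestion implies (field and method names are assumptions; `TableProvider`'s other methods are elided):

```java
// Sketch: an immutable provider generated by AutoValue with a builder.
@AutoValue
public abstract class BeamSqlTableProvider implements TableProvider {
  abstract ImmutableMap<String, BeamSqlTable> tables();

  static Builder builder() {
    return new AutoValue_BeamSqlTableProvider.Builder();
  }

  @AutoValue.Builder
  abstract static class Builder {
    abstract Builder setTables(ImmutableMap<String, BeamSqlTable> tables);
    abstract BeamSqlTableProvider build();
  }
}
```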
```java
        .type(getTableType())
        .name(table.getKey())
        .columns(Collections.emptyList())
        .build());
```
nit: I would rewrite it this way:
```java
tables
    .entrySet()
    .stream()
    .map(table ->
        Table
            .builder()
            .type(getTableType())
            .name(table.getKey())
            .columns(Collections.emptyList())
            .build())
    .collect(toList());
```
And I would write this exactly how I did. I find for loops far easier to read than java streams.
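The loop form being defended reads roughly like this (sketch; the `result` and `tables` names are assumed):

```java
// Equivalent for-loop form of the stream suggestion above.
List<Table> result = new ArrayList<>();
for (Map.Entry<String, BeamSqlTable> table : tables.entrySet()) {
  result.add(
      Table.builder()
          .type(getTableType())
          .name(table.getKey())
          .columns(Collections.emptyList())
          .build());
}
```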
```java
for (Map.Entry<TupleTag<?>, PValue> input : inputs.expand().entrySet()) {
  tables.put(input.getKey().getId(),
      new BeamPCollectionTable(toRows(input.getValue())));
}
```
nit: I would avoid stateful if/else with loops and generics; it hurts readability. Might consider extracting something like this:
```java
if (input instanceof PCollection) {
  return
      ImmutableMap.of(
          PCOLLECTION_NAME,
          new BeamPCollectionTable(toRows(inputs)));
}
return
    inputs
        .expand()
        .entrySet()
        .stream()
        .collect(
            toMap(
                keyedPCollection -> keyedPCollection.getKey().getId(),
                keyedPCollection -> keyedPCollection.getValue()));
```

and then create BeamSqlTableProvider outside
Incidentally, here (or maybe it is nearby beneath a fold) seems like a good place to (possibly redundantly) explain that a PCollection makes a single magic table while any other kind of input uses expand() to make many tables using the tags as names.
I agree this is hard to read; I disagree that java streams make it more readable. I've restructured it as you've suggested otherwise and added the comment.
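Presumably the restructured method, with the requested comment, ends up along these lines (a sketch; the helper name is assumed):

```java
// A bare PCollection becomes a single "magic" table named PCOLLECTION, while any
// other kind of input uses expand() to make many tables, with tag ids as names.
private static Map<String, BeamSqlTable> toTableMap(PInput inputs) {
  if (inputs instanceof PCollection) {
    return ImmutableMap.of(
        PCOLLECTION_NAME, new BeamPCollectionTable(toRows(inputs)));
  }
  Map<String, BeamSqlTable> tables = new HashMap<>();
  for (Map.Entry<TupleTag<?>, PValue> input : inputs.expand().entrySet()) {
    tables.put(input.getKey().getId(),
        new BeamPCollectionTable(toRows(input.getValue())));
  }
  return tables;
}
```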
```java
@Override
public RelDataType getRowType(RelDataTypeFactory typeFactory) {
  return CalciteUtils.toCalciteRowType(this.beamTable.getSchema(), BeamQueryPlanner.TYPE_FACTORY);
```
Create this in constructor?
Sure.
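That change presumably looks like this (sketch; the constructor shape is assumed):

```java
private final RelDataType rowType;

BeamCalciteTable(BeamSqlTable beamTable) {
  this.beamTable = beamTable;
  // Compute the Calcite row type once rather than on every getRowType() call.
  this.rowType =
      CalciteUtils.toCalciteRowType(beamTable.getSchema(), BeamQueryPlanner.TYPE_FACTORY);
}

@Override
public RelDataType getRowType(RelDataTypeFactory typeFactory) {
  return rowType;
}
```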
```java
  handleDropTable((SqlDropTable) sqlNode);
if (sqlNode instanceof SqlExecutableStatement) {
  ((SqlExecutableStatement) sqlNode).execute(env.getContext());
} else {
```
nit: add a comment explaining what is an executable statement and what is not?
Comment added: DDL nodes are SqlExecutableStatement
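So the dispatch presumably reads along these lines (sketch):

```java
// DDL nodes (CREATE TABLE, DROP TABLE, ...) implement SqlExecutableStatement and
// execute directly against the schema; anything else is planned as a query.
if (sqlNode instanceof SqlExecutableStatement) {
  ((SqlExecutableStatement) sqlNode).execute(env.getContext());
} else {
  // fall through to query planning
}
```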
```java
  if (table.getName().equals(name)) {
    return new BeamCalciteTable(tableProvider.buildBeamSqlTable(table));
  }
}
```
nit: looks like it would be better to convert this to a Map<String, BeamCalciteTable> once in constructor, this way you wouldn't need to implement map.keySet() or map.get()
I'm assuming this comment is on the line with tableProvider.listTables(), not }? If so, the output of that function cannot be cached. I do however agree that it makes sense to change the return value of that API to Map<String, Table>. It simplifies a lot of code all over, so I'll do that.
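With listTables() returning a Map<String, Table>, the linear scan above collapses to a lookup, roughly (sketch):

```java
// Direct lookup instead of scanning every table for a matching name.
Table table = tableProvider.listTables().get(name);
if (table == null) {
  return null;
}
return new BeamCalciteTable(tableProvider.buildBeamSqlTable(table));
```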
```java
private class ContextImpl implements CalcitePrepare.Context {
  @Override
  public JavaTypeFactory getTypeFactory() {
    throw new UnsupportedOperationException();
```
Would it be wrong to return BeamQueryPlanner.TYPE_FACTORY?
Nope, changed.
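i.e., roughly (sketch of the change):

```java
@Override
public JavaTypeFactory getTypeFactory() {
  // Reuse the planner's shared type factory instead of throwing.
  return BeamQueryPlanner.TYPE_FACTORY;
}
```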
```java
public InMemoryMetaStore() {
@Override public String getTableType() {
  return "";
```
I think it should have its own table type still
Ok, I added a type for the store.
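For instance (sketch; the actual type string used in the PR may differ):

```java
@Override
public String getTableType() {
  // Report a concrete store type rather than an empty string.
  return "in-memory";
}
```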
kennknowles left a comment
Nice. And I have said similar on a bunch of lines where the new stuff is more readable. Good cleanup along the way to this addition.
```java
PCollectionTuple inputTuple = toPCollectionTuple(input);

BeamSqlEnv sqlEnv = new BeamSqlEnv();
BeamSqlEnv sqlEnv = new BeamSqlEnv(toTableProvider(input));
```
nice
```java
options.setJobName("BeamPlanCreator");
Pipeline pipeline = Pipeline.create(options);
compilePipeline(sqlString, pipeline, env);
env.getPlanner().compileBeamPipeline(sqlString, pipeline);
```
nice
```java
} else if (sqlNode instanceof SqlDropTable) {
  handleDropTable((SqlDropTable) sqlNode);
if (sqlNode instanceof SqlExecutableStatement) {
  ((SqlExecutableStatement) sqlNode).execute(env.getContext());
```
nice
```java
for (Table table : tables) {
  env.registerTable(table.getName(), metaStore.buildBeamSqlTable(table.getName()));
}
this.env = new BeamSqlEnv(metaStore);
```
nice
```java
@Override
public boolean isMutable() {
  return true;
```
Curious - why? Is it that the underlying TableProvider is mutable? Or does this simply mean that the DDL is allowed to introduce new tables?
This means the DDL is allowed to introduce new tables, but I don't know of anywhere it is actually checked or set to anything but true in Calcite.
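A comment capturing that answer in place might read (sketch):

```java
@Override
public boolean isMutable() {
  // "Mutable" means the DDL may introduce new tables into this schema;
  // Calcite does not appear to check this flag beyond exposing it.
  return true;
}
```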
```java
    PCollectionTuple inputPCollections) {
  PCollection<Row> factStream = inputPCollections.apply(leftRelNode.toPTransform());
    PInput inputPCollections) {
  PCollection<Row> factStream = inputPCollections.getPipeline().apply(leftRelNode.toPTransform());
```
getPipeline().apply() is probably not what you want here, either. It is actually a bad method: it is the same as getPipeline().begin().apply(), so it always starts a new initial pipeline segment.
The current model is wrong, the new model is wrong. I've dropped this change and just added another PCollectionTuple.empty(input.getPipeline()). We can discuss the right way to do it as a followup.
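The workaround described, roughly (sketch based on the comment; `input` is the original PInput):

```java
// Seed an empty tuple on the same pipeline so both sides apply off one segment.
PCollectionTuple inputTuple = PCollectionTuple.empty(input.getPipeline());
PCollection<Row> leftRows = inputTuple.apply("left", leftRelNode.toPTransform());
PCollection<Row> rightRows = inputTuple.apply("right", rightRelNode.toPTransform());
```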
```java
public PCollection<Row> buildBeamPipeline(PInput inputPCollections) {
  PCollection<Row> leftRows =
      inputPCollections.apply(
      inputPCollections.getPipeline().apply(
```
and here, and throughout
run java precommit

Got a Beam 1 Disconnected failure.

run java precommit

run java precommit

run java precommit

run java precommit
This PR moves our TableStore into the Calcite Schema as the only way to provide tables, which removes the need to copy tables between the two. It also moves our DDL execution into the rel node, which allows Calcite to execute the DDL directly.
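Pieced together from the hunks above, usage after this change looks roughly like (sketch; setup of `metaStore`, `options`, and `sqlString` elided):

```java
// The environment is constructed over a table provider / metastore ...
BeamSqlEnv env = new BeamSqlEnv(metaStore);
// ... and the planner compiles the SQL, DDL included, into the pipeline.
Pipeline pipeline = Pipeline.create(options);
env.getPlanner().compileBeamPipeline(sqlString, pipeline);
pipeline.run().waitUntilFinish();
```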