[BEAM-3773][SQL] Add EnumerableConverter for JDBC support#5173

Merged
kennknowles merged 1 commit into apache:master from apilloud:enumerable
May 3, 2018

Conversation

@apilloud
Member

@apilloud apilloud commented Apr 18, 2018

This adds a converter from BeamRelNode to EnumerableRel. The Calcite JDBC engine can execute any plan whose root node is of the EnumerableRel type.


Follow this checklist to help us incorporate your contribution quickly and easily:

  • Make sure there is a JIRA issue filed for the change (usually before you start working on it). Trivial changes like typos do not require a JIRA issue. Your pull request should address just this issue, without pulling in other changes.
  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue.
  • Write a pull request description that is detailed enough to understand:
    • What the pull request does
    • Why it does it
    • How it does it
    • Why this approach
  • Each commit in the pull request should have a meaningful subject line and body.
  • Run mvn clean verify to make sure basic checks pass. A more thorough check will be performed on your pull request automatically.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

@apilloud
Member Author

R: @kennknowles
cc: @akedin

Member

@kennknowles kennknowles left a comment

Can this be tested in isolation at all? Or can it be tested via other test suites?

}

public static Enumerable<Object> toEnumerable(BeamRelNode node) {
PipelineOptions options = PipelineOptionsFactory.create();
Member

Just checking - since Collector only works on the DirectRunner it seems fine to hardcode it here. But are the options specified elsewhere when using SQL so here it would just be validation that the configuration is supported?

Member Author

Pipelines that aren't supported should get a NullPointerException when the Collector tries to process an element. I considered adding a check but decided it could wait because this is the only public path in. Is the user able to change the runner without the ability to set the PipelineOptions?

Member

I'd rather get the options plumbed deliberately here. Assuming decent testing, it won't get left forever, but it does leave the code in a sort of weird stage. Does it add a ton of scope to plumb them?

Member Author

Plumbing the PipelineOptions into BeamEnumerableConverter is not easy, as they would have to take a trip through the Calcite JDBC interface on the way here (possibly on the JDBC connection string). I can add them as an argument to toEnumerable, but that would just move this line up to implement.
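One hypothetical shape for the connection-string route is sketched below; the URL format, key names, and `ConnectionStringOptions` class are all invented for illustration (a real version would presumably hand the parsed pairs to `PipelineOptionsFactory`):

```java
import java.util.*;

// Hypothetical sketch of passing pipeline options on the JDBC connection
// string, e.g. "jdbc:beam:runner=DirectRunner;tempLocation=/tmp". The URL
// shape and key names are invented; values containing ':' or ';' are not
// handled here.
final class ConnectionStringOptions {
  static Map<String, String> parse(String url) {
    Map<String, String> options = new LinkedHashMap<>();
    // Everything after the last ':' is treated as key=value pairs.
    String params = url.substring(url.lastIndexOf(':') + 1);
    for (String pair : params.split(";")) {
      if (pair.isEmpty()) {
        continue;
      }
      int eq = pair.indexOf('=');
      options.put(pair.substring(0, eq), pair.substring(eq + 1));
    }
    return options;
  }
}
```

For example, `parse("jdbc:beam:runner=DirectRunner;tempLocation=/tmp")` yields a map with `runner` and `tempLocation` entries that could then be turned into real PipelineOptions.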

}

private static class Collector extends DoFn<Row, Void> {
// This will only work on the direct runner.
Member

It seems like there are two routes that are interesting here: (1) expose the ability to observe PCollection contents in the DirectRunner. This used to be the case when it only supported bounded collections. It is a different situation now, as the contents are never materialized. But that could change.

(2) The other thing that would potentially make it cross-runner, and what Scio does, is to write it to a sink. It could be TextIO writing to the globally-required tempLocation. We should definitely learn from Scio either way.

Member Author

I agree on writing to a sink. I was considering a version that sinks to BigQuery and reads back from there. There are many options with different tradeoffs; we should consider them as follow-ups.

Member

Yea, I think we probably want to be able to set options to control that if/when we do implement.

Member

Maybe just (w/ JIRA) add checkArgument(options.getRunner().getCanonicalName().equals("org.apache.beam.runners.direct.DirectRunner"))?
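A minimal sketch of that guard, with the class and method names invented here (a real check would compare `options.getRunner().getCanonicalName()` exactly as the comment suggests):

```java
// Invented stand-in for the suggested guard: fail fast when the configured
// runner is anything other than the DirectRunner, since the in-memory
// Collector cannot work across processes.
final class RunnerGuard {
  static final String DIRECT_RUNNER = "org.apache.beam.runners.direct.DirectRunner";

  static void checkDirectRunner(String runnerClassName) {
    if (!DIRECT_RUNNER.equals(runnerClassName)) {
      throw new IllegalArgumentException(
          "Collector requires the DirectRunner, got: " + runnerClassName);
    }
  }
}
```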

@apilloud
Member Author

And this one is green.


Collector.globalValues.put(id, values);
run(options, node, new Collector());
Collector.globalValues.remove(id);
Contributor

Is it necessary to keep a static state? Cannot this be an instance field of the Collector?

Member Author

The Collector is serialized by Beam and a copy is passed to the worker. As a result, each worker ends up with a copy of the Collector, so the original queue won't contain any values added by the worker. The static state circumvents Beam's requirement that there be no global state. This only works in the direct runner because all workers are in the same process.
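The point about serialization can be demonstrated without Beam. Below, a plain Java serialization round-trip stands in for shipping the DoFn to a worker; the names (`CollectorSketch`, `globalValues`) only mirror the PR's pattern and are not the actual code:

```java
import java.io.*;
import java.util.*;
import java.util.concurrent.*;

// Self-contained illustration: the deserialized copy's instance fields are
// copies, but a static map is shared by every instance in the same JVM.
// That sharing is exactly what makes the trick DirectRunner-only.
class CollectorSketch implements Serializable {
  // Shared across all deserialized copies within one process.
  static final Map<Long, Queue<String>> globalValues = new ConcurrentHashMap<>();

  private final long id;

  CollectorSketch(long id) {
    this.id = id;
  }

  void process(String row) {
    // Lands in the queue registered by the original, not in a private copy.
    globalValues.get(id).add(row);
  }

  // Serialize and deserialize, mimicking Beam handing the fn to a worker.
  static CollectorSketch roundTrip(CollectorSketch fn) throws Exception {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(fn);
    }
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return (CollectorSketch) in.readObject();
    }
  }
}
```

Rows processed by the round-tripped copy show up in the original's static map, which is what the put/run/remove sequence in the quoted code relies on.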

Member

It seems like a pattern of concern that this only works on the direct runner but maybe is positioned in the codebase in a way that makes us think it should work more broadly. It could be solved just by really clear signaling and validation, perhaps, and maybe TODO JIRAs? I'm looking for options to make everyone happy and unblock your other work.

Member Author

I'm explicitly violating the contract here, but being able to collect results in memory seems like a useful testing pattern for the direct runner. Is this something that might be able to live in a util folder inside the direct runner?

/**
* BeamRelNode to replace a {@code Enumerable} node.
*/
public class BeamEnumerableConverter extends ConverterImpl implements EnumerableRel {
Contributor

Can you document the behavior and how this class is used? And maybe an integration test of some kind?

Member Author

An integration test would be good; I'll get one added. I started down the unit test path last week and ended up deleting all the code that was unit testable when I found a library that implemented it.

.apply(node.toPTransform())
.apply(ParDo.of(doFn));
PipelineResult result = pipeline.run();
result.waitUntilFinish();
Contributor

Does this work in the general case? E.g. what happens when the input is unbounded? My understanding is that Enumerables can be unbounded as well, and JDBC supports paginated unbounded results too; is this going to work?

Member Author

This does not work in the general case; it only works on pipelines that eventually finish. I wanted to get something in for my DDL work; we can make this better in a follow-up.
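One way such a follow-up might fail fast is a boundedness guard. The enum below only mirrors Beam's `PCollection.IsBounded`; the guard itself is invented here for illustration and is not part of this PR:

```java
// Hypothetical guard for the bounded-only limitation: refuse unbounded
// inputs up front instead of hanging forever in waitUntilFinish(). The
// enum mirrors Beam's PCollection.IsBounded; the check is invented.
final class BoundednessGuard {
  enum IsBounded { BOUNDED, UNBOUNDED }

  static void requireBounded(IsBounded boundedness) {
    if (boundedness != IsBounded.BOUNDED) {
      throw new UnsupportedOperationException(
          "toEnumerable only supports pipelines that finish; got " + boundedness);
    }
  }
}
```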

Member

@kennknowles kennknowles left a comment

Let's unblock w/ JIRAs as we all agree which piece to come back to.

private static Enumerable<Object> count(PipelineOptions options, BeamRelNode node) {
PipelineResult result = run(options, node, new RowCounter());
MetricQueryResults metrics = result.metrics().queryMetrics(MetricsFilter.builder()
.addNameFilter(MetricNameFilter.named(BeamEnumerableConverter.class, "rows"))
Member

We probably want to spend some time on the best way to get reliable counts that are meaningful in the way that JDBC expects. I think the SQL <-> IO adapter may have to own it, for those Beam connectors that do things like write "mutations" that are not rows at all, etc.

@apilloud
Member Author

Fixed the issues from the comments and added tests at the toEnumerable layer. There are still some missing pieces for a functional calcite JDBC test, so that will have to come once those features are added.

@kennknowles kennknowles merged commit bf94e36 into apache:master May 3, 2018