Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ public interface BeamSqlTable {
/** Creates an {@code IO.write()} instance that writes to the target. */
POutput buildIOWriter(PCollection<Row> input);

/** Whether this table is bounded (known to be finite) or unbounded (may or may not be finite). */
PCollection.IsBounded isBounded();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we probably want to ultimately get rid of BeamSqlTable and just use raw PCollections

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure of the best way to split up the metadata about a table and its implementation. I believe there might be a different way to organize BeamSqlTable and TableProvider. Good to keep in mind.


/** Get the schema info of the table. */
Schema getSchema();
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ public BeamIOSourceRel(
this.pipelineOptions = pipelineOptions;
}

/** Reports the boundedness of this source by delegating to the underlying {@code sqlTable}. */
@Override
public PCollection.IsBounded isBounded() {
return sqlTable.isBounded();
}

@Override
public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() {
return new Transform();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,43 @@ public List<RelNode> getPCollectionInputs() {
public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() {
  // Strategy selection, in priority order:
  //   1. side-input lookup join, when isSideInputLookupJoin() holds;
  //   2. side-input join, when isSideInputJoin() holds (one input bounded, the other unbounded);
  //   3. standard join otherwise.
  if (isSideInputLookupJoin()) {
    return new SideInputLookupJoin();
  }
  if (!isSideInputJoin()) {
    return new StandardJoin();
  }

  // One of the sides is bounded and the other is unbounded, so do a sideInput join.
  // When doing a sideInput join, the windowFn does not need to match.
  // FULL OUTER JOIN is rejected outright for this combination.
  if (joinType == JoinRelType.FULL) {
    throw new UnsupportedOperationException(
        "FULL OUTER JOIN is not supported when join "
            + "a bounded table with an unbounded table.");
  }

  BeamRelNode leftRelNode = BeamSqlRelUtils.getBeamRelInput(left);
  BeamRelNode rightRelNode = BeamSqlRelUtils.getBeamRelInput(right);

  // A LEFT join whose left input is bounded, or a RIGHT join whose right input is bounded,
  // is rejected as well.
  if ((joinType == JoinRelType.LEFT && leftRelNode.isBounded() == PCollection.IsBounded.BOUNDED)
      || (joinType == JoinRelType.RIGHT
          && rightRelNode.isBounded() == PCollection.IsBounded.BOUNDED)) {
    throw new UnsupportedOperationException(
        "LEFT side of an OUTER JOIN must be Unbounded table.");
  }

  return new SideInputJoin();
}

/**
 * Tells whether the two join inputs differ in boundedness, i.e. one reports {@code BOUNDED} and
 * the other reports {@code UNBOUNDED}.
 *
 * @return {@code true} for bounded-vs-unbounded or unbounded-vs-bounded inputs, {@code false}
 *     otherwise
 */
private boolean isSideInputJoin() {
  final BeamRelNode lhs = BeamSqlRelUtils.getBeamRelInput(left);
  final BeamRelNode rhs = BeamSqlRelUtils.getBeamRelInput(right);

  // Bounded left input paired with an unbounded right input.
  if (lhs.isBounded() == PCollection.IsBounded.BOUNDED && rhs.isBounded() == UNBOUNDED) {
    return true;
  }

  // Otherwise only the mirrored combination qualifies.
  return lhs.isBounded() == UNBOUNDED && rhs.isBounded() == PCollection.IsBounded.BOUNDED;
}

/**
 * Tells whether this join has both a seekable input index and a non-seekable input index.
 *
 * @return {@code true} only when both indexes are present
 */
private boolean isSideInputLookupJoin() {
  if (!seekableInputIndex().isPresent()) {
    return false;
  }
  return nonSeekableInputIndex().isPresent();
}
Expand Down Expand Up @@ -200,10 +232,11 @@ public PCollection<Row> expand(PCollectionList<Row> pinput) {
}
}

private class Transform extends PTransform<PCollectionList<Row>, PCollection<Row>> {
private class ExtractJoinKeys
extends PTransform<PCollectionList<Row>, PCollectionList<KV<Row, Row>>> {

@Override
public PCollection<Row> expand(PCollectionList<Row> pinput) {
public PCollectionList<KV<Row, Row>> expand(PCollectionList<Row> pinput) {
BeamRelNode leftRelNode = BeamSqlRelUtils.getBeamRelInput(left);

Schema leftSchema = CalciteUtils.toSchema(left.getRowType());
Expand All @@ -213,9 +246,6 @@ public PCollection<Row> expand(PCollectionList<Row> pinput) {
PCollection<Row> leftRows = pinput.get(0);
PCollection<Row> rightRows = pinput.get(1);

WindowFn leftWinFn = leftRows.getWindowingStrategy().getWindowFn();
WindowFn rightWinFn = rightRows.getWindowingStrategy().getWindowFn();

// extract the join fields
List<Pair<Integer, Integer>> pairs =
extractJoinColumns(leftRelNode.getRowType().getFieldCount());
Expand Down Expand Up @@ -247,52 +277,56 @@ public PCollection<Row> expand(PCollectionList<Row> pinput) {
false, pairs, extractKeySchemaRight)))
.setCoder(KvCoder.of(extractKeyRowCoder, rightRows.getCoder()));

// a regular join
if ((leftRows.isBounded() == PCollection.IsBounded.BOUNDED
&& rightRows.isBounded() == PCollection.IsBounded.BOUNDED)
|| (leftRows.isBounded() == UNBOUNDED && rightRows.isBounded() == UNBOUNDED)) {
verifySupportedTrigger(leftRows);
verifySupportedTrigger(rightRows);

try {
leftWinFn.verifyCompatibility(rightWinFn);
} catch (IncompatibleWindowException e) {
throw new IllegalArgumentException(
"WindowFns must match for a bounded-vs-bounded/unbounded-vs-unbounded join.", e);
}
return PCollectionList.of(extractedLeftRows).and(extractedRightRows);
}
}

return standardJoin(extractedLeftRows, extractedRightRows, leftSchema, rightSchema);
} else if ((leftRows.isBounded() == PCollection.IsBounded.BOUNDED
&& rightRows.isBounded() == UNBOUNDED)
|| (leftRows.isBounded() == UNBOUNDED
&& rightRows.isBounded() == PCollection.IsBounded.BOUNDED)) {
// if one of the sides is Bounded & the other is Unbounded
// then do a sideInput join
// when doing a sideInput join, the windowFn does not need to match
// Only support INNER JOIN & LEFT OUTER JOIN where left side of the join must be
// the unbounded
if (joinType == JoinRelType.FULL) {
throw new UnsupportedOperationException(
"FULL OUTER JOIN is not supported when join "
+ "a bounded table with an unbounded table.");
}
private class SideInputJoin extends PTransform<PCollectionList<Row>, PCollection<Row>> {

if ((joinType == JoinRelType.LEFT && leftRows.isBounded() == PCollection.IsBounded.BOUNDED)
|| (joinType == JoinRelType.RIGHT
&& rightRows.isBounded() == PCollection.IsBounded.BOUNDED)) {
throw new UnsupportedOperationException(
"LEFT side of an OUTER JOIN must be Unbounded table.");
}
@Override
public PCollection<Row> expand(PCollectionList<Row> pinput) {
Schema leftSchema = CalciteUtils.toSchema(left.getRowType());
Schema rightSchema = CalciteUtils.toSchema(right.getRowType());

return sideInputJoin(extractedLeftRows, extractedRightRows, leftSchema, rightSchema);
} else {
throw new UnsupportedOperationException(
"The inputs to the JOIN have un-joinnable windowFns: " + leftWinFn + ", " + rightWinFn);
PCollectionList<KV<Row, Row>> keyedInputs = pinput.apply(new ExtractJoinKeys());

PCollection<KV<Row, Row>> extractedLeftRows = keyedInputs.get(0);
PCollection<KV<Row, Row>> extractedRightRows = keyedInputs.get(1);

return sideInputJoin(extractedLeftRows, extractedRightRows, leftSchema, rightSchema);
}
}

/**
 * Join transform that keys both inputs with {@code ExtractJoinKeys}, checks that their windowFns
 * are compatible and that their triggers are supported, and then delegates to
 * {@code standardJoin}.
 */
private class StandardJoin extends PTransform<PCollectionList<Row>, PCollection<Row>> {

  @Override
  public PCollection<Row> expand(PCollectionList<Row> pinput) {
    Schema lhsSchema = CalciteUtils.toSchema(left.getRowType());
    Schema rhsSchema = CalciteUtils.toSchema(right.getRowType());

    // Index 0 holds the keyed left rows, index 1 the keyed right rows.
    PCollectionList<KV<Row, Row>> keyed = pinput.apply(new ExtractJoinKeys());
    PCollection<KV<Row, Row>> keyedLeft = keyed.get(0);
    PCollection<KV<Row, Row>> keyedRight = keyed.get(1);

    requireCompatibleWindowFns(keyedLeft, keyedRight);

    verifySupportedTrigger(keyedLeft);
    verifySupportedTrigger(keyedRight);

    return standardJoin(keyedLeft, keyedRight, lhsSchema, rhsSchema);
  }

  /**
   * Fails with {@link IllegalArgumentException} when the windowFn of {@code keyedLeft} is not
   * compatible with the windowFn of {@code keyedRight}.
   */
  private void requireCompatibleWindowFns(
      PCollection<KV<Row, Row>> keyedLeft, PCollection<KV<Row, Row>> keyedRight) {
    WindowFn lhsWindowFn = keyedLeft.getWindowingStrategy().getWindowFn();
    WindowFn rhsWindowFn = keyedRight.getWindowingStrategy().getWindowFn();

    try {
      lhsWindowFn.verifyCompatibility(rhsWindowFn);
    } catch (IncompatibleWindowException e) {
      throw new IllegalArgumentException(
          "WindowFns must match for a bounded-vs-bounded/unbounded-vs-unbounded join.", e);
    }
  }
}

private void verifySupportedTrigger(PCollection<Row> pCollection) {
private <T> void verifySupportedTrigger(PCollection<T> pCollection) {
WindowingStrategy windowingStrategy = pCollection.getWindowingStrategy();

if (UNBOUNDED.equals(pCollection.isBounded()) && !triggersOncePerWindow(windowingStrategy)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,23 @@
/** A {@link RelNode} that can also give a {@link PTransform} that implements the expression. */
public interface BeamRelNode extends RelNode {

/**
 * Reports whether the rows produced by this relational expression form a bounded collection
 * (known to be finite) or an unbounded one (may or may not be finite).
 *
 * <p>The default walks {@link #getPCollectionInputs()} and stops at the first input that does
 * not report {@code BOUNDED}.
 *
 * @return {@code BOUNDED} if every PCollection input is bounded (including when there are no
 *     inputs), {@code UNBOUNDED} otherwise
 */
default PCollection.IsBounded isBounded() {
  for (RelNode input : getPCollectionInputs()) {
    PCollection.IsBounded inputBoundedness = BeamSqlRelUtils.getBeamRelInput(input).isBounded();
    if (inputBoundedness != PCollection.IsBounded.BOUNDED) {
      return PCollection.IsBounded.UNBOUNDED;
    }
  }
  return PCollection.IsBounded.BOUNDED;
}

/**
 * Returns the inputs of this node that are to be treated as PCollection inputs.
 *
 * @return by default, every input reported by {@link #getInputs()}
 */
default List<RelNode> getPCollectionInputs() {
  return getInputs();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ public BeamPCollectionTable(PCollection<InputT> upstream) {
this.upstream = upstream;
}

/** Reports the boundedness of the wrapped {@code upstream} PCollection. */
@Override
public PCollection.IsBounded isBounded() {
return upstream.isBounded();
}

@Override
public PCollection<Row> buildIOReader(PBegin begin) {
assert begin.getPipeline() == upstream.getPipeline();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ public BeamBigQueryTable(Schema beamSchema, String tableSpec) {
this.tableSpec = tableSpec;
}

/** Always reports {@code BOUNDED} for this table. */
@Override
public PCollection.IsBounded isBounded() {
return PCollection.IsBounded.BOUNDED;
}

@Override
public PCollection<Row> buildIOReader(PBegin begin) {
// TODO: make this more generic.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ public BeamKafkaTable updateConsumerProperties(Map<String, Object> configUpdates
return this;
}

/** Always reports {@code UNBOUNDED} for this table. */
@Override
public PCollection.IsBounded isBounded() {
return PCollection.IsBounded.UNBOUNDED;
}

public abstract PTransform<PCollection<KV<byte[], byte[]>>, PCollection<Row>>
getPTransformForInput();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@ static Builder builder() {
return new AutoValue_PubsubIOJsonTable.Builder();
}

/** Always reports {@code UNBOUNDED} for this table. */
@Override
public PCollection.IsBounded isBounded() {
return PCollection.IsBounded.UNBOUNDED;
}

/**
* Table schema, describes Pubsub message schema.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ public TestBoundedTable(Schema beamSchema) {
super(beamSchema);
}

/** Always reports {@code BOUNDED} for this test table. */
@Override
public PCollection.IsBounded isBounded() {
return PCollection.IsBounded.BOUNDED;
}

/**
* Convenient way to build a mocked bounded table.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@ public List<Row> getRows() {
private static class InMemoryTable implements BeamSqlTable {
private TableWithRows tableWithRows;

/** Always reports {@code BOUNDED} for this in-memory table. */
@Override
public PCollection.IsBounded isBounded() {
return PCollection.IsBounded.BOUNDED;
}

public InMemoryTable(TableWithRows tableWithRows) {
this.tableWithRows = tableWithRows;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ public TestUnboundedTable timestampColumnIndex(int idx) {
return this;
}

/** Always reports {@code UNBOUNDED} for this test table. */
@Override
public PCollection.IsBounded isBounded() {
return PCollection.IsBounded.UNBOUNDED;
}

/**
* Add rows to the builder.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ public String getFilePattern() {
return filePattern;
}

/** Always reports {@code BOUNDED} for this table. */
@Override
public PCollection.IsBounded isBounded() {
return PCollection.IsBounded.BOUNDED;
}

@Override
public PCollection<Row> buildIOReader(PBegin begin) {
return begin
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ public FakeTable() {
super(null);
}

/**
 * Stub implementation that returns {@code null}.
 *
 * <p>NOTE(review): a {@code null} result equals neither {@code BOUNDED} nor {@code UNBOUNDED}
 * under {@code ==} comparison; presumably this fake is never asked for its boundedness —
 * confirm against the tests that use it.
 */
@Override
public PCollection.IsBounded isBounded() {
return null;
}

@Override
public PCollection<Row> buildIOReader(PBegin begin) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ public SiteLookupTable(Schema schema) {
super(schema);
}

/**
 * Not supported by this table.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public PCollection.IsBounded isBounded() {
throw new UnsupportedOperationException();
}

@Override
public PCollection<Row> buildIOReader(PBegin begin) {
throw new UnsupportedOperationException();
Expand Down