Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,17 @@ public BeamSqlFnExecutor(RexProgram program) {
* represent each {@link SqlOperator} with a corresponding {@link BeamSqlExpression}.
*/
static BeamSqlExpression buildExpression(RexNode rexNode) {
BeamSqlExpression ret = getBeamSqlExpression(rexNode);

if (!ret.accept()) {
throw new IllegalStateException(
ret.getClass().getSimpleName() + " does not accept the operands.(" + rexNode + ")");
}

return ret;
}

private static BeamSqlExpression getBeamSqlExpression(RexNode rexNode) {
BeamSqlExpression ret;
if (rexNode instanceof RexLiteral) {
RexLiteral node = (RexLiteral) rexNode;
Expand All @@ -156,11 +167,11 @@ static BeamSqlExpression buildExpression(RexNode rexNode) {
if (SqlTypeName.CHAR_TYPES.contains(type) && node.getValue() instanceof NlsString) {
// NlsString is not serializable, we need to convert
// it to string explicitly.
return BeamSqlPrimitive.of(type, ((NlsString) value).getValue());
ret = BeamSqlPrimitive.of(type, ((NlsString) value).getValue());
} else if (isDateNode(type, value)) {
// does this actually make sense?
// Calcite actually treat Calendar as the java type of Date Literal
return BeamSqlPrimitive.of(type, new DateTime(((Calendar) value).getTimeInMillis()));
ret = BeamSqlPrimitive.of(type, new DateTime(((Calendar) value).getTimeInMillis()));
} else {
// node.getTypeName().getSqlTypeName() and node.getSqlTypeName() can be different
// e.g. sql: "select 1"
Expand Down Expand Up @@ -207,7 +218,7 @@ static BeamSqlExpression buildExpression(RexNode rexNode) {
}
}

return BeamSqlPrimitive.of(realType, realValue);
ret = BeamSqlPrimitive.of(realType, realValue);
}
} else if (rexNode instanceof RexInputRef) {
RexInputRef node = (RexInputRef) rexNode;
Expand Down Expand Up @@ -394,13 +405,15 @@ static BeamSqlExpression buildExpression(RexNode rexNode) {

// date functions
case "Reinterpret":
return new BeamSqlReinterpretExpression(subExps, node.type.getSqlTypeName());
ret = new BeamSqlReinterpretExpression(subExps, node.type.getSqlTypeName());
break;
case "CEIL":
if (SqlTypeName.NUMERIC_TYPES.contains(node.type.getSqlTypeName())) {
return new BeamSqlCeilExpression(subExps);
ret = new BeamSqlCeilExpression(subExps);
} else {
return new BeamSqlOperatorExpression(DateOperators.DATETIME_CEIL, subExps);
ret = new BeamSqlOperatorExpression(DateOperators.DATETIME_CEIL, subExps);
}
break;

case "FLOOR":
if (SqlTypeName.NUMERIC_TYPES.contains(node.type.getSqlTypeName())) {
Expand All @@ -412,53 +425,67 @@ static BeamSqlExpression buildExpression(RexNode rexNode) {

case "EXTRACT_DATE":
case "EXTRACT":
return new BeamSqlOperatorExpression(DateOperators.EXTRACT, subExps);
ret = new BeamSqlOperatorExpression(DateOperators.EXTRACT, subExps);
break;

case "LOCALTIME":
case "CURRENT_TIME":
return new BeamSqlCurrentTimeExpression(subExps);
ret = new BeamSqlCurrentTimeExpression(subExps);
break;

case "CURRENT_TIMESTAMP":
case "LOCALTIMESTAMP":
return new BeamSqlCurrentTimestampExpression(subExps);
ret = new BeamSqlCurrentTimestampExpression(subExps);
break;

case "CURRENT_DATE":
return new BeamSqlCurrentDateExpression();
ret = new BeamSqlCurrentDateExpression();
break;

case "DATETIME_PLUS":
return new BeamSqlDatetimePlusExpression(subExps);
ret = new BeamSqlDatetimePlusExpression(subExps);
break;

// array functions
case "ARRAY":
return new BeamSqlArrayExpression(subExps);
ret = new BeamSqlArrayExpression(subExps);
break;
// map functions
case "MAP":
return new BeamSqlMapExpression(subExps);
ret = new BeamSqlMapExpression(subExps);
break;

case "ITEM":
switch (subExps.get(0).getOutputType()) {
case MAP:
return new BeamSqlMapItemExpression(subExps, node.type.getSqlTypeName());
ret = new BeamSqlMapItemExpression(subExps, node.type.getSqlTypeName());
break;
case ARRAY:
return new BeamSqlArrayItemExpression(subExps, node.type.getSqlTypeName());
ret = new BeamSqlArrayItemExpression(subExps, node.type.getSqlTypeName());
break;
default:
throw new UnsupportedOperationException(
"Operator: " + opName + " is not supported yet");
}
break;

// collections functions
case "ELEMENT":
return new BeamSqlSingleElementExpression(subExps, node.type.getSqlTypeName());
ret = new BeamSqlSingleElementExpression(subExps, node.type.getSqlTypeName());
break;

case "CARDINALITY":
return new BeamSqlCardinalityExpression(subExps, node.type.getSqlTypeName());
ret = new BeamSqlCardinalityExpression(subExps, node.type.getSqlTypeName());
break;

case "DOT":
return new BeamSqlDotExpression(subExps, node.type.getSqlTypeName());
ret = new BeamSqlDotExpression(subExps, node.type.getSqlTypeName());
break;

// DEFAULT keyword for UDF with optional parameter
case "DEFAULT":
return new BeamSqlDefaultExpression();
ret = new BeamSqlDefaultExpression();
break;

case "CASE":
ret = new BeamSqlCaseExpression(subExps);
Expand Down Expand Up @@ -506,14 +533,6 @@ static BeamSqlExpression buildExpression(RexNode rexNode) {
throw new UnsupportedOperationException(
String.format("%s is not supported yet", rexNode.getClass().toString()));
}

// TODO: https://issues.apache.org/jira/browse/BEAM-4622
// Many paths above do not reach this validation
if (!ret.accept()) {
throw new IllegalStateException(
ret.getClass().getSimpleName() + " does not accept the operands.(" + rexNode + ")");
}

return ret;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,15 @@ public BeamSqlMapExpression(List<BeamSqlExpression> operands) {

@Override
public boolean accept() {
return operands.stream().map(BeamSqlExpression::getOutputType).distinct().count() == 1;
int distinctCount = 2;
if (operands.size() < 2) {
return false;
}
if (operands.get(0).getOutputType().equals(operands.get(1).getOutputType())) {
distinctCount = 1;
}
return operands.stream().map(BeamSqlExpression::getOutputType).distinct().count()
== distinctCount;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.Row;
import org.apache.calcite.sql.type.SqlTypeFamily;
import org.apache.calcite.sql.type.SqlTypeName;

/**
Expand All @@ -47,7 +48,18 @@ public BeamSqlReinterpretExpression(List<BeamSqlExpression> operands, SqlTypeNam

@Override
public boolean accept() {
return getOperands().size() == 1 && REINTERPRETER.canConvert(opType(0), SqlTypeName.BIGINT);
if (getOperands().size() != 1) {
return false;
}

// Interval types will be already converted into BIGINT after evaluation.
SqlTypeFamily opTypeFamily = opType(0).getFamily();
if (opTypeFamily.equals(SqlTypeFamily.INTERVAL_DAY_TIME)
|| opTypeFamily.equals(SqlTypeFamily.INTERVAL_YEAR_MONTH)) {
return true;
}

return REINTERPRETER.canConvert(opType(0), SqlTypeName.BIGINT);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,29 @@ public void testSelectMapField() {
pipeline.run();
}

@Test
public void testSelectMapFieldKeyValueSameType() {
PCollection<Row> input = pCollectionOf2Elements();

Schema resultType =
Schema.builder()
.addInt32Field("f_int")
.addMapField("f_intStringMap", Schema.FieldType.STRING, Schema.FieldType.STRING)
.build();

PCollection<Row> result =
input.apply(
"sqlQuery",
SqlTransform.query("SELECT 42, MAP['aa', '1'] as `f_map` FROM PCOLLECTION"));

PAssert.that(result)
.containsInAnyOrder(
Row.withSchema(resultType).addValues(42, ImmutableMap.of("aa", "1")).build(),
Row.withSchema(resultType).addValues(42, ImmutableMap.of("aa", "1")).build());

pipeline.run();
}

@Test
public void testAccessMapElement() {
PCollection<Row> input = pCollectionOf2Elements();
Expand Down