From 97cdb12516a53c6f0c0ba9d09b833f513b5c4664 Mon Sep 17 00:00:00 2001 From: Alexey Romanenko Date: Tue, 10 Jul 2018 12:20:55 +0200 Subject: [PATCH 1/2] [BEAM-4622] Makes required to call Beam SQL expressions validation --- .../impl/interpreter/BeamSqlFnExecutor.java | 73 ++++++++++++------- .../operator/map/BeamSqlMapExpression.java | 10 ++- .../BeamSqlReinterpretExpression.java | 8 ++ .../sdk/extensions/sql/BeamSqlMapTest.java | 23 ++++++ 4 files changed, 86 insertions(+), 28 deletions(-) diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java index de667f0a1e7e..228a0df07aa1 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java @@ -147,6 +147,17 @@ public BeamSqlFnExecutor(RexProgram program) { * represent each {@link SqlOperator} with a corresponding {@link BeamSqlExpression}. */ static BeamSqlExpression buildExpression(RexNode rexNode) { + BeamSqlExpression ret = getBeamSqlExpression(rexNode); + + if (!ret.accept()) { + throw new IllegalStateException( + ret.getClass().getSimpleName() + " does not accept the operands.(" + rexNode + ")"); + } + + return ret; + } + + private static BeamSqlExpression getBeamSqlExpression(RexNode rexNode) { BeamSqlExpression ret; if (rexNode instanceof RexLiteral) { RexLiteral node = (RexLiteral) rexNode; @@ -156,11 +167,11 @@ static BeamSqlExpression buildExpression(RexNode rexNode) { if (SqlTypeName.CHAR_TYPES.contains(type) && node.getValue() instanceof NlsString) { // NlsString is not serializable, we need to convert // it to string explicitly. - return BeamSqlPrimitive.of(type, ((NlsString) value).getValue()); + ret = BeamSqlPrimitive.of(type, ((NlsString) value).getValue()); } else if (isDateNode(type, value)) { // does this actually make sense? // Calcite actually treat Calendar as the java type of Date Literal - return BeamSqlPrimitive.of(type, new DateTime(((Calendar) value).getTimeInMillis())); + ret = BeamSqlPrimitive.of(type, new DateTime(((Calendar) value).getTimeInMillis())); } else { // node.getTypeName().getSqlTypeName() and node.getSqlTypeName() can be different // e.g. sql: "select 1" @@ -207,7 +218,7 @@ static BeamSqlExpression buildExpression(RexNode rexNode) { } } - return BeamSqlPrimitive.of(realType, realValue); + ret = BeamSqlPrimitive.of(realType, realValue); } } else if (rexNode instanceof RexInputRef) { RexInputRef node = (RexInputRef) rexNode; @@ -394,13 +405,15 @@ static BeamSqlExpression buildExpression(RexNode rexNode) { // date functions case "Reinterpret": - return new BeamSqlReinterpretExpression(subExps, node.type.getSqlTypeName()); + ret = new BeamSqlReinterpretExpression(subExps, node.type.getSqlTypeName()); + break; case "CEIL": if (SqlTypeName.NUMERIC_TYPES.contains(node.type.getSqlTypeName())) { - return new BeamSqlCeilExpression(subExps); + ret = new BeamSqlCeilExpression(subExps); } else { - return new BeamSqlOperatorExpression(DateOperators.DATETIME_CEIL, subExps); + ret = new BeamSqlOperatorExpression(DateOperators.DATETIME_CEIL, subExps); } + break; case "FLOOR": if (SqlTypeName.NUMERIC_TYPES.contains(node.type.getSqlTypeName())) { @@ -412,53 +425,67 @@ static BeamSqlExpression buildExpression(RexNode rexNode) { case "EXTRACT_DATE": case "EXTRACT": - return new BeamSqlOperatorExpression(DateOperators.EXTRACT, subExps); + ret = new BeamSqlOperatorExpression(DateOperators.EXTRACT, subExps); + break; case "LOCALTIME": case "CURRENT_TIME": - return new BeamSqlCurrentTimeExpression(subExps); + ret = new BeamSqlCurrentTimeExpression(subExps); + break; case "CURRENT_TIMESTAMP": case "LOCALTIMESTAMP": - return new BeamSqlCurrentTimestampExpression(subExps); + ret = new BeamSqlCurrentTimestampExpression(subExps); + break; case "CURRENT_DATE": - return new BeamSqlCurrentDateExpression(); + ret = new BeamSqlCurrentDateExpression(); + break; case "DATETIME_PLUS": - return new BeamSqlDatetimePlusExpression(subExps); + ret = new BeamSqlDatetimePlusExpression(subExps); + break; // array functions case "ARRAY": - return new BeamSqlArrayExpression(subExps); + ret = new BeamSqlArrayExpression(subExps); + break; // map functions case "MAP": - return new BeamSqlMapExpression(subExps); + ret = new BeamSqlMapExpression(subExps); + break; case "ITEM": switch (subExps.get(0).getOutputType()) { case MAP: - return new BeamSqlMapItemExpression(subExps, node.type.getSqlTypeName()); + ret = new BeamSqlMapItemExpression(subExps, node.type.getSqlTypeName()); + break; case ARRAY: - return new BeamSqlArrayItemExpression(subExps, node.type.getSqlTypeName()); + ret = new BeamSqlArrayItemExpression(subExps, node.type.getSqlTypeName()); + break; default: throw new UnsupportedOperationException( "Operator: " + opName + " is not supported yet"); } + break; // collections functions case "ELEMENT": - return new BeamSqlSingleElementExpression(subExps, node.type.getSqlTypeName()); + ret = new BeamSqlSingleElementExpression(subExps, node.type.getSqlTypeName()); + break; case "CARDINALITY": - return new BeamSqlCardinalityExpression(subExps, node.type.getSqlTypeName()); + ret = new BeamSqlCardinalityExpression(subExps, node.type.getSqlTypeName()); + break; case "DOT": - return new BeamSqlDotExpression(subExps, node.type.getSqlTypeName()); + ret = new BeamSqlDotExpression(subExps, node.type.getSqlTypeName()); + break; // DEFAULT keyword for UDF with optional parameter case "DEFAULT": - return new BeamSqlDefaultExpression(); + ret = new BeamSqlDefaultExpression(); + break; case "CASE": ret = new BeamSqlCaseExpression(subExps); @@ -506,14 +533,6 @@ static BeamSqlExpression buildExpression(RexNode rexNode) { throw new UnsupportedOperationException( String.format("%s is not supported yet", rexNode.getClass().toString())); } - - // TODO: https://issues.apache.org/jira/browse/BEAM-4622 - // Many paths above do not reach this validation - if (!ret.accept()) { - throw new IllegalStateException( - ret.getClass().getSimpleName() + " does not accept the operands.(" + rexNode + ")"); - } - return ret; } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/map/BeamSqlMapExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/map/BeamSqlMapExpression.java index e1ceb3c16404..f76ecacfd8de 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/map/BeamSqlMapExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/map/BeamSqlMapExpression.java @@ -36,7 +36,15 @@ public BeamSqlMapExpression(List operands) { @Override public boolean accept() { - return operands.stream().map(BeamSqlExpression::getOutputType).distinct().count() == 1; + int distinctCount = 2; + if (operands.size() < 2) { + return false; + } + if (operands.get(0).getOutputType().equals(operands.get(1).getOutputType())) { + distinctCount = 1; + } + return operands.stream().map(BeamSqlExpression::getOutputType).distinct().count() + == distinctCount; } @Override diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/BeamSqlReinterpretExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/BeamSqlReinterpretExpression.java index 461a776cb5bb..c2607a5d8ce4 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/BeamSqlReinterpretExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/BeamSqlReinterpretExpression.java @@ -24,6 +24,7 @@ import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.Row; +import org.apache.calcite.sql.type.SqlTypeFamily; import org.apache.calcite.sql.type.SqlTypeName; /** @@ -47,6 +48,13 @@ public BeamSqlReinterpretExpression(List operands, SqlTypeNam @Override public boolean accept() { + // Interval types will be already converted into BIGINT after evaluation. + SqlTypeFamily opTypeFamily = opType(0).getFamily(); + if (opTypeFamily.equals(SqlTypeFamily.INTERVAL_DAY_TIME) + || opTypeFamily.equals(SqlTypeFamily.INTERVAL_YEAR_MONTH)) { + return true; + } + return getOperands().size() == 1 && REINTERPRETER.canConvert(opType(0), SqlTypeName.BIGINT); } diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlMapTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlMapTest.java index a49718ce52ad..7017ffaa066f 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlMapTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlMapTest.java @@ -91,6 +91,29 @@ public void testSelectMapField() { pipeline.run(); } + @Test + public void testSelectMapFieldKeyValueSameType() { + PCollection input = pCollectionOf2Elements(); + + Schema resultType = + Schema.builder() + .addInt32Field("f_int") + .addMapField("f_intStringMap", Schema.FieldType.STRING, Schema.FieldType.STRING) + .build(); + + PCollection result = + input.apply( + "sqlQuery", + SqlTransform.query("SELECT 42, MAP['aa', '1'] as `f_map` FROM PCOLLECTION")); + + PAssert.that(result) + .containsInAnyOrder( + Row.withSchema(resultType).addValues(42, ImmutableMap.of("aa", "1")).build(), + Row.withSchema(resultType).addValues(42, ImmutableMap.of("aa", "1")).build()); + + pipeline.run(); + } + @Test public void testAccessMapElement() { PCollection input = pCollectionOf2Elements(); From 7cf8f6cbfd9724f51c012c9d46b74e0594b0a6e9 Mon Sep 17 00:00:00 2001 From: Alexey Romanenko Date: Wed, 11 Jul 2018 15:43:07 +0200 Subject: [PATCH 2/2] Check number of arguments at first --- .../operator/reinterpret/BeamSqlReinterpretExpression.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/BeamSqlReinterpretExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/BeamSqlReinterpretExpression.java index c2607a5d8ce4..9679c8d13e3c 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/BeamSqlReinterpretExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/BeamSqlReinterpretExpression.java @@ -48,6 +48,10 @@ public BeamSqlReinterpretExpression(List operands, SqlTypeNam @Override public boolean accept() { + if (getOperands().size() != 1) { + return false; + } + // Interval types will be already converted into BIGINT after evaluation. SqlTypeFamily opTypeFamily = opType(0).getFamily(); if (opTypeFamily.equals(SqlTypeFamily.INTERVAL_DAY_TIME) @@ -55,7 +59,7 @@ public boolean accept() { return true; } - return getOperands().size() == 1 && REINTERPRETER.canConvert(opType(0), SqlTypeName.BIGINT); + return REINTERPRETER.canConvert(opType(0), SqlTypeName.BIGINT); } @Override