From f0f34c8f338a405f5f3cfa4694258fe81c641ca8 Mon Sep 17 00:00:00 2001 From: twalthr Date: Wed, 13 Apr 2016 12:18:16 +0200 Subject: [PATCH 1/3] [FLINK-3739] [table] Add a null literal to Table API --- .../flink/api/scala/table/expressionDsl.scala | 2 +- .../api/table/codegen/CodeGenerator.scala | 21 +++--- .../table/expressions/ExpressionParser.scala | 16 ++++- .../api/table/expressions/literals.scala | 12 ++++ .../flink/api/table/expressions/package.scala | 4 +- .../api/table/typeutils/TypeConverter.scala | 4 ++ .../java/table/test/ExpressionsITCase.java | 25 ++++++++ .../scala/sql/test/ExpressionsITCase.scala | 64 +++++++++++++++++++ .../api/scala/sql/test/FilterITCase.scala | 1 - .../scala/table/test/ExpressionsITCase.scala | 25 +++++++- 10 files changed, 156 insertions(+), 18 deletions(-) create mode 100644 flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/ExpressionsITCase.scala diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala index 5aa8a51f93d91..505d872636d55 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala @@ -27,7 +27,7 @@ import scala.language.implicitConversions * operations. * * These operations must be kept in sync with the parser in - * [[org.apache.flink.api.table.parser.ExpressionParser]]. + * [[org.apache.flink.api.table.expressions.ExpressionParser]]. */ trait ImplicitExpressionOperations { def expr: Expression diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala index f213d4cdffea4..c336c8257d16a 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala @@ -521,6 +521,11 @@ class CodeGenerator( override def visitLiteral(literal: RexLiteral): GeneratedExpression = { val resultType = sqlTypeToTypeInfo(literal.getType.getSqlTypeName) val value = literal.getValue3 + // null value with type + if (value == null) { + return generateNullLiteral(resultType) + } + // non-null values literal.getType.getSqlTypeName match { case BOOLEAN => generateNonNullLiteral(resultType, literal.getValue3.toString) @@ -574,8 +579,6 @@ class CodeGenerator( } case VARCHAR | CHAR => generateNonNullLiteral(resultType, "\"" + value.toString + "\"") - case NULL => - generateNullLiteral(resultType) case SYMBOL => val symbolOrdinal = value.asInstanceOf[SqlLiteral.SqlSymbol].ordinal() generateNonNullLiteral(resultType, symbolOrdinal.toString) @@ -742,6 +745,12 @@ class CodeGenerator( } } + override def visitOver(over: RexOver): GeneratedExpression = ??? + + // ---------------------------------------------------------------------------------------------- + // generator helping methods + // ---------------------------------------------------------------------------------------------- + def checkNumericOrString(left: GeneratedExpression, right: GeneratedExpression): Unit = { if (isNumeric(left)) { requireNumeric(right) @@ -750,12 +759,6 @@ class CodeGenerator( } } - override def visitOver(over: RexOver): GeneratedExpression = ??? - - // ---------------------------------------------------------------------------------------------- - // generator helping methods - // ---------------------------------------------------------------------------------------------- - private def generateInputAccess( inputType: TypeInformation[Any], inputTerm: String, @@ -906,7 +909,7 @@ class CodeGenerator( val wrappedCode = if (nullCheck) { s""" - |$resultTypeTerm $resultTerm = null; + |$resultTypeTerm $resultTerm = $defaultValue; |boolean $nullTerm = true; |""".stripMargin } else { diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala index 4c882490a47e1..8a24d3c43b7c8 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala @@ -77,6 +77,18 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers { str => Literal(str.toBoolean) } + lazy val nullLiteral: PackratParser[Expression] = + "Null(BYTE)" ^^ { e => Null(BasicTypeInfo.BYTE_TYPE_INFO) } | + "Null(SHORT)" ^^ { e => Null(BasicTypeInfo.SHORT_TYPE_INFO) } | + "Null(INT)" ^^ { e => Null(BasicTypeInfo.INT_TYPE_INFO) } | + "Null(LONG)" ^^ { e => Null(BasicTypeInfo.LONG_TYPE_INFO) } | + "Null(FLOAT)" ^^ { e => Null(BasicTypeInfo.FLOAT_TYPE_INFO) } | + "Null(DOUBLE)" ^^ { e => Null(BasicTypeInfo.DOUBLE_TYPE_INFO) } | + "Null(BOOL)" ^^ { e => Null(BasicTypeInfo.BOOLEAN_TYPE_INFO) } | + "Null(BOOLEAN)" ^^ { e => Null(BasicTypeInfo.BOOLEAN_TYPE_INFO) } | + "Null(STRING)" ^^ { e => Null(BasicTypeInfo.STRING_TYPE_INFO) } | + "Null(DATE)" ^^ { e => Null(BasicTypeInfo.DATE_TYPE_INFO) } + lazy val literalExpr: PackratParser[Expression] = numberLiteral | stringLiteralFlink | singleQuoteStringLiteral | @@ -188,8 +200,8 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers { lazy val suffix = isNull | isNotNull | - sum | min | max | count | avg | cast | - specialFunctionCalls |functionCall | functionCallWithoutArgs | + sum | min | max | count | avg | cast | nullLiteral | + specialFunctionCalls | functionCall | functionCallWithoutArgs | specialSuffixFunctionCalls | suffixFunctionCall | suffixFunctionCallWithoutArgs | atom diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala index efaa96d533cb6..1fbe5a3709073 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala @@ -23,6 +23,7 @@ import org.apache.calcite.rex.RexNode import org.apache.calcite.tools.RelBuilder import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} import org.apache.flink.api.scala.table.ImplicitExpressionOperations +import org.apache.flink.api.table.typeutils.TypeConverter object Literal { def apply(l: Any): Literal = l match { @@ -49,3 +50,14 @@ case class Literal(value: Any, tpe: TypeInformation[_]) relBuilder.literal(value) } } + +case class Null(tpe: TypeInformation[_]) extends LeafExpression { + def expr = this + def typeInfo = tpe + + override def toString = s"null" + + override def toRexNode(implicit relBuilder: RelBuilder): RexNode = { + relBuilder.getRexBuilder.makeNullLiteral(TypeConverter.typeInfoToSqlType(tpe)) + } +} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/package.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/package.scala index c5c8c943e1549..2e5d0b2f71eae 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/package.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/package.scala @@ -21,8 +21,8 @@ package org.apache.flink.api.table * This package contains the base class of AST nodes and all the expression language AST classes. * Expression trees should not be manually constructed by users. They are implicitly constructed * from the implicit DSL conversions in - * [[org.apache.flink.api.scala.expressions.ImplicitExpressionConversions]] and - * [[org.apache.flink.api.scala.expressions.ImplicitExpressionOperations]]. For the Java API, + * [[org.apache.flink.api.scala.table.ImplicitExpressionConversions]] and + * [[org.apache.flink.api.scala.table.ImplicitExpressionOperations]]. For the Java API, * expression trees should be generated from a string parser that parses expressions and creates * AST nodes. */ diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeConverter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeConverter.scala index dc3abb7ceebd5..02fe21d46a6cd 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeConverter.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeConverter.scala @@ -75,6 +75,10 @@ object TypeConverter { case VARCHAR | CHAR => STRING_TYPE_INFO case DATE => DATE_TYPE_INFO + case NULL => + throw new TableException("Type NULL is not supported. " + + "Null values must have a supported type.") + // symbol for special flags e.g. TRIM's BOTH, LEADING, TRAILING // are represented as integer case SYMBOL => INT_TYPE_INFO diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java index 8c301632a7f1a..996542df37033 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java @@ -100,5 +100,30 @@ public void testComparisons() throws Exception { compareResultAsText(results, expected); } + @Test + public void testNullLiteral() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = getJavaTableEnvironment(); + + DataSource> input = + env.fromElements(new Tuple2<>(1, 0)); + + Table table = + tableEnv.fromDataSet(input, "a, b"); + + Table result = table.select("a, b, Null(INT), Null(STRING) === ''"); + + DataSet ds = tableEnv.toDataSet(result, Row.class); + List results = ds.collect(); + String expected; + if (getConfig().getNullCheck()) { + expected = "1,0,null,null"; + } + else { + expected = "1,0,-1,true"; + } + compareResultAsText(results, expected); + } + } diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/ExpressionsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/ExpressionsITCase.scala new file mode 100644 index 0000000000000..1d72c5d9df96b --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/ExpressionsITCase.scala @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala.sql.test + +import org.apache.flink.api.scala.{ExecutionEnvironment, _} +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.table.Row +import org.apache.flink.api.table.plan.TranslationContext +import org.apache.flink.api.table.test.utils.TableProgramsTestBase +import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode +import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode +import org.apache.flink.test.util.TestBaseUtils +import org.junit.Test +import org.junit.runner.RunWith +import org.junit.runners.Parameterized + +import scala.collection.JavaConverters._ + +@RunWith(classOf[Parameterized]) +class ExpressionsITCase( + mode: TestExecutionMode, + configMode: TableConfigMode) + extends TableProgramsTestBase(mode, configMode) { + + @Test + def testNullLiteral(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = getScalaTableEnvironment + TranslationContext.reset() + + val sqlQuery = "SELECT a, b, CAST(NULL AS INT), CAST(NULL AS VARCHAR) = '' FROM MyTable" + + val ds = env.fromElements((1, 0)) + tEnv.registerDataSet("MyTable", ds, 'a, 'b) + + val result = tEnv.sql(sqlQuery) + + val expected = if (getConfig.getNullCheck) { + "1,0,null,null" + } else { + "1,0,-1,true" + } + val results = result.toDataSet[Row](getConfig).collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + +} diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/FilterITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/FilterITCase.scala index c89e25ad004a8..171e200262c46 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/FilterITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/FilterITCase.scala @@ -18,7 +18,6 @@ package org.apache.flink.api.scala.sql.test -import org.apache.calcite.tools.ValidationException import org.apache.flink.api.scala._ import org.apache.flink.api.scala.table._ import org.apache.flink.api.scala.util.CollectionDataSets diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala index ba0311ae9d3fc..29b3be4e7fe4d 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala @@ -24,10 +24,9 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo import org.apache.flink.api.scala._ import org.apache.flink.api.scala.table._ import org.apache.flink.api.table.Row -import org.apache.flink.api.table.expressions.Literal -import org.apache.flink.api.table.test.utils.TableProgramsTestBase -import TableProgramsTestBase.TableConfigMode +import org.apache.flink.api.table.expressions.{Literal, Null} import org.apache.flink.api.table.test.utils.TableProgramsTestBase +import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode import org.apache.flink.test.util.TestBaseUtils import org.junit._ @@ -91,6 +90,26 @@ class ExpressionsITCase( TestBaseUtils.compareResultAsText(results.asJava, expected) } + @Test + def testNullLiteral(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + + val t = env.fromElements((1, 0)).as('a, 'b) + .select( + 'a, + 'b, + Null(BasicTypeInfo.INT_TYPE_INFO), + Null(BasicTypeInfo.STRING_TYPE_INFO) === "") + + val expected = if (getConfig.getNullCheck) { + "1,0,null,null" + } else { + "1,0,-1,true" + } + val results = t.toDataSet[Row](getConfig).collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + // Date literals not yet supported @Ignore @Test From d7ec9097db5918a0cafd20312eb6bd73cedf6dcf Mon Sep 17 00:00:00 2001 From: twalthr Date: Thu, 14 Apr 2016 15:52:32 +0200 Subject: [PATCH 2/3] Doc updated --- docs/apis/batch/libs/table.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/docs/apis/batch/libs/table.md b/docs/apis/batch/libs/table.md index 527d10dbe9230..b8a0e6d9cea6b 100644 --- a/docs/apis/batch/libs/table.md +++ b/docs/apis/batch/libs/table.md @@ -560,3 +560,10 @@ val result = tableEnv.sql("SELECT * FROM MyTable") {% top %} +Runtime Configuration +---- +The Table API provides a configuration (the so-called `TableConfig`) to modify runtime behavior. It can be accessed either through `TableEnvironment` or passed to the `toTable` method when using Scala implicit conversion. + +### Null Handling +By default, the Table API does not support `null` values at runtime for efficiency purposes. Null handling can be enabled by setting the `nullCheck` property in the `TableConfig` to `true`. + From 79a63229a132e33e27de26184f48443195ba904e Mon Sep 17 00:00:00 2001 From: twalthr Date: Thu, 14 Apr 2016 16:01:16 +0200 Subject: [PATCH 3/3] fix typo --- docs/apis/batch/libs/table.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/apis/batch/libs/table.md b/docs/apis/batch/libs/table.md index b8a0e6d9cea6b..56e2b6b4b4cdd 100644 --- a/docs/apis/batch/libs/table.md +++ b/docs/apis/batch/libs/table.md @@ -562,7 +562,7 @@ val result = tableEnv.sql("SELECT * FROM MyTable") Runtime Configuration ---- -The Table API provides a configuration (the so-called `TableConfig`) to modify runtime behavior. It can be accessed either through `TableEnvironment` or passed to the `toTable` method when using Scala implicit conversion. +The Table API provides a configuration (the so-called `TableConfig`) to modify runtime behavior. It can be accessed either through `TableEnvironment` or passed to the `toDataSet`/`toDataStream` method when using Scala implicit conversion. ### Null Handling By default, the Table API does not support `null` values at runtime for efficiency purposes. Null handling can be enabled by setting the `nullCheck` property in the `TableConfig` to `true`.