From 09ea8e7bdbec342c2ff4bd47fb465085b8a3c8d9 Mon Sep 17 00:00:00 2001 From: sunjincheng121 Date: Sun, 4 Jun 2017 13:34:53 +0800 Subject: [PATCH] [FLINK-6811][table] Add TIMESTAMPADD supported in SQL --- docs/dev/table/sql.md | 12 ++ .../sql/fun/SqlTimestampAddFunction.java | 93 +++++++++++++ .../flink/table/codegen/CodeGenerator.scala | 13 +- .../table/codegen/calls/ScalarOperators.scala | 4 + .../table/validate/FunctionCatalog.scala | 1 + .../expressions/ScalarFunctionsTest.scala | 128 ++++++++++++++++++ 6 files changed, 249 insertions(+), 2 deletions(-) create mode 100644 flink-libraries/flink-table/src/main/java/org/apache/calcite/sql/fun/SqlTimestampAddFunction.java diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md index 11b7b0ee244b9..6f8cf32dd1ddb 100644 --- a/docs/dev/table/sql.md +++ b/docs/dev/table/sql.md @@ -1906,6 +1906,18 @@ QUARTER(date)

Determines whether two anchored time intervals overlap. Time point and temporal are transformed into a range defined by two time points (start, end). The function evaluates leftEnd >= rightStart && rightEnd >= leftStart. E.g. (TIME '2:55:00', INTERVAL '1' HOUR) OVERLAPS (TIME '3:30:00', INTERVAL '2' HOUR) leads to true; (TIME '9:00:00', TIME '10:00:00') OVERLAPS (TIME '10:15:00', INTERVAL '3' HOUR) leads to false.

+ + + + {% highlight text %} +TIMESTAMPADD(unit, interval, timestamp_expr) +{% endhighlight %} + + +

Adds the integer expression interval to the timestamp expression timestamp_expr. The unit for interval is given by the unit argument, which should be one of the following values: SECOND, MINUTE, HOUR, DAY, WEEK, MONTH, QUARTER, or YEAR. E.g. TIMESTAMPADD(WEEK, 1, '2003-01-02') leads to 2003-01-09.

+ + + diff --git a/flink-libraries/flink-table/src/main/java/org/apache/calcite/sql/fun/SqlTimestampAddFunction.java b/flink-libraries/flink-table/src/main/java/org/apache/calcite/sql/fun/SqlTimestampAddFunction.java new file mode 100644 index 0000000000000..be7e5f284fa5c --- /dev/null +++ b/flink-libraries/flink-table/src/main/java/org/apache/calcite/sql/fun/SqlTimestampAddFunction.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.sql.fun; + +import org.apache.calcite.avatica.util.TimeUnit; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.sql.SqlFunction; +import org.apache.calcite.sql.SqlFunctionCategory; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlOperatorBinding; +import org.apache.calcite.sql.type.OperandTypes; +import org.apache.calcite.sql.type.SqlReturnTypeInference; +import org.apache.calcite.sql.type.SqlTypeFamily; +import org.apache.calcite.sql.type.SqlTypeName; + +/** + * + * The TIMESTAMPADD function, which adds an interval to a + * timestamp. + * + *

The SQL syntax is + * + *

+ * TIMESTAMPADD(timestamp interval, quantity, + * timestamp) + *
+ * + *

The interval time unit can one of the following literals:

+ * + *

Returns modified timestamp. + * + * __Note__: Due to the change of [[org.apache.calcite.rex.RexLiteral]] we should keep using this + * class until upgrade to calcite 1.13. + */ +class SqlTimestampAddFunction extends SqlFunction { + + private static final SqlReturnTypeInference RETURN_TYPE_INFERENCE = + new SqlReturnTypeInference() { + public RelDataType inferReturnType(SqlOperatorBinding opBinding) { + final RelDataTypeFactory typeFactory = opBinding.getTypeFactory(); + final TimeUnit unit = (TimeUnit) opBinding.getOperandLiteralValue(0); + switch (unit) { + case HOUR: + case MINUTE: + case SECOND: + case MILLISECOND: + case MICROSECOND: + return typeFactory.createTypeWithNullability( + typeFactory.createSqlType(SqlTypeName.TIMESTAMP), + opBinding.getOperandType(1).isNullable() + || opBinding.getOperandType(2).isNullable()); + default: + return opBinding.getOperandType(2); + } + } + }; + + /** Creates a SqlTimestampAddFunction. */ + SqlTimestampAddFunction() { + super("TIMESTAMPADD", SqlKind.TIMESTAMP_ADD, RETURN_TYPE_INFERENCE, null, + OperandTypes.family(SqlTypeFamily.ANY, SqlTypeFamily.INTEGER, + SqlTypeFamily.TIMESTAMP), + SqlFunctionCategory.TIMEDATE); + } +} + +// End SqlTimestampAddFunction.java diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala index 045fbdd1ae156..5328b799388a1 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala @@ -1313,7 +1313,8 @@ class CodeGenerator( if (decimal.isValidInt) { generateNonNullLiteral(resultType, decimal.intValue().toString) } else { - throw new CodeGenException("Decimal can not be converted to interval of months.") + throw new CodeGenException( + s"Decimal '${decimal}' can not be converted to interval of months.") } case typeName if DAY_INTERVAL_TYPES.contains(typeName) => @@ -1321,7 +1322,8 @@ class CodeGenerator( if (decimal.isValidLong) { generateNonNullLiteral(resultType, decimal.longValue().toString + "L") } else { - throw new CodeGenException("Decimal can not be converted to interval of milliseconds.") + throw new CodeGenException( + s"Decimal '${decimal}' can not be converted to interval of milliseconds.") } case t@_ => @@ -1390,6 +1392,13 @@ class CodeGenerator( requireNumeric(right) generateArithmeticOperator("*", nullCheck, resultType, left, right) + case MULTIPLY if isTimeInterval(resultType) => + val left = operands.head + val right = operands(1) + requireTimeInterval(left) + requireNumeric(right) + generateArithmeticOperator("*", nullCheck, resultType, left, right) + case DIVIDE | DIVIDE_INTEGER if isNumeric(resultType) => val left = operands.head val right = operands(1) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala index af92df4461b47..5b4dcb0e2251c 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala @@ -1101,6 +1101,10 @@ object ScalarOperators { && !isDecimal(operandType) && !isDecimal(resultType)) { (operandTerm) => s"(($resultTypeTerm) $operandTerm)" } + // result type is time interval and operand type is Integer + else if (isTimeInterval(resultType) && isInteger(operandType)){ + (operandTerm) => s"(($resultTypeTerm) $operandTerm)" + } else { throw new CodeGenException(s"Unsupported casting from $operandType to $resultType.") } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala index df77441e40077..f26b394398d50 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala @@ -396,6 +396,7 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable { ScalarSqlFunctions.E, ScalarSqlFunctions.CONCAT, ScalarSqlFunctions.CONCAT_WS, + SqlStdOperatorTable.TIMESTAMP_ADD, // EXTENSIONS SqlStdOperatorTable.TUMBLE, diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala index be28134bc0549..277ba6532e123 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala @@ -26,6 +26,7 @@ import org.apache.flink.table.api.scala._ import org.apache.flink.table.api.{Types, ValidationException} import org.apache.flink.table.expressions.utils.ExpressionTestBase import org.apache.flink.types.Row +import org.apache.flink.table.api.SqlParserException import org.junit.Test class ScalarFunctionsTest extends ExpressionTestBase { @@ -1527,6 +1528,133 @@ class ScalarFunctionsTest extends ExpressionTestBase { "4") } + @Test + def testTimestampAdd(): Unit = { + val data = Seq( + (1, "timestamp '2017-11-29 22:58:58.998'"), + (3, "timestamp '2017-11-29 22:58:58.998'"), + (-1, "timestamp '2017-11-29 22:58:58.998'"), + (-61, "timestamp '2017-11-29 22:58:58.998'"), + (-1000, "timestamp '2017-11-29 22:58:58.998'") + ) + + val YEAR = Seq( + "2018-11-29 22:58:58.998", + "2020-11-29 22:58:58.998", + "2016-11-29 22:58:58.998", + "1956-11-29 22:58:58.998", + "1017-11-29 22:58:58.998") + + val QUARTER = Seq( + "2018-03-01 22:58:58.998", + "2018-08-31 22:58:58.998", + "2017-08-29 22:58:58.998", + "2002-08-29 22:58:58.998", + "1767-11-29 22:58:58.998") + + val MONTH = Seq( + "2017-12-29 22:58:58.998", + "2018-03-01 22:58:58.998", + "2017-10-29 22:58:58.998", + "2012-10-29 22:58:58.998", + "1934-07-29 22:58:58.998") + + val WEEK = Seq( + "2017-12-06 22:58:58.998", + "2017-12-20 22:58:58.998", + "2017-11-22 22:58:58.998", + "2016-09-28 22:58:58.998", + "1998-09-30 22:58:58.998") + + val DAY = Seq( + "2017-11-30 22:58:58.998", + "2017-12-02 22:58:58.998", + "2017-11-28 22:58:58.998", + "2017-09-29 22:58:58.998", + "2015-03-05 22:58:58.998") + + val HOUR = Seq( + "2017-11-29 23:58:58.998", + "2017-11-30 01:58:58.998", + "2017-11-29 21:58:58.998", + "2017-11-27 09:58:58.998", + "2017-10-19 06:58:58.998") + + val MINUTE = Seq( + "2017-11-29 22:59:58.998", + "2017-11-29 23:01:58.998", + "2017-11-29 22:57:58.998", + "2017-11-29 21:57:58.998", + "2017-11-29 06:18:58.998") + + val SECOND = Seq( + "2017-11-29 22:58:59.998", + "2017-11-29 22:59:01.998", + "2017-11-29 22:58:57.998", + "2017-11-29 22:57:57.998", + "2017-11-29 22:42:18.998") + + // Do not supported FRAC_SECOND MICROSECOND SQL_TSI_FRAC_SECOND SQL_TSI_MICROSECOND + val intervalMapResults = Map( + "YEAR" -> YEAR, + "SQL_TSI_YEAR" -> YEAR, + "QUARTER" -> QUARTER, + "SQL_TSI_QUARTER" -> QUARTER, + "MONTH" -> MONTH, + "SQL_TSI_MONTH" -> MONTH, + "WEEK" -> WEEK, + "SQL_TSI_WEEK" -> WEEK, + "DAY" -> DAY, + "SQL_TSI_DAY" -> DAY, + "HOUR" -> HOUR, + "SQL_TSI_HOUR" -> HOUR, + "MINUTE" -> MINUTE, + "SQL_TSI_MINUTE" -> MINUTE, + "SECOND" -> SECOND, + "SQL_TSI_SECOND" -> SECOND + ) + + for ((interval, result) <- intervalMapResults) { + testSqlApi( + s"timestampadd($interval, ${data(0)._1}, ${data(0)._2})", result(0)) + testSqlApi( + s"timestampadd($interval, ${data(1)._1}, ${data(1)._2})", result(1)) + testSqlApi( + s"timestampadd($interval, ${data(2)._1}, ${data(2)._2})", result(2)) + testSqlApi( + s"timestampadd($interval, ${data(3)._1}, ${data(3)._2})", result(3)) + testSqlApi( + s"timestampadd($interval, ${data(4)._1}, ${data(4)._2})", result(4)) + } + + testSqlApi("timestampadd(HOUR, CAST(NULL AS INTEGER),timestamp '2016-02-24 12:42:25')", "null") + + testSqlApi("timestampadd(HOUR, -200, CAST(NULL AS TIMESTAMP))", "null") + + testSqlApi("timestampadd(MONTH, 3, cast(null as timestamp))", "null") + + } + + @Test(expected = classOf[ValidationException]) + def testTimestampAddWithDATE(): Unit ={ + testSqlApi("timestampadd(DAY, 1, date '2016-06-15')", "2016-06-16") + } + + @Test(expected = classOf[SqlParserException]) + def testTimestampAddWithrongTimestampInterval(): Unit ={ + testSqlApi("timestampadd(XXX, 1, timestamp '2016-02-24'))", "2016-06-16") + } + + @Test(expected = classOf[SqlParserException]) + def testTimestampAddWithrongTimestampFormat(): Unit ={ + testSqlApi("timestampadd(YEAR, 1, timestamp '2016-02-24'))", "2016-06-16") + } + + @Test(expected = classOf[ValidationException]) + def testTimestampAddWithWrongQuantity(): Unit ={ + testSqlApi("timestampadd(YEAR, 1.0, timestamp '2016-02-24 12:42:25')", "2016-06-16") + } + // ---------------------------------------------------------------------------------------------- // Other functions // ----------------------------------------------------------------------------------------------