From 3db3a65178a91fce8d92b741cf58d4e02c96212e Mon Sep 17 00:00:00 2001 From: DuBin Date: Thu, 21 Dec 2017 17:02:33 +0800 Subject: [PATCH] [FLINK-8302][table]Add SHIFT_LEFT and SHIFT_RIGHT supported in SQL --- docs/dev/table/sql.md | 44 +++++++++++++ .../table/codegen/calls/BuiltInMethods.scala | 5 ++ .../codegen/calls/FunctionGenerator.scala | 28 ++++++++ .../functions/sql/ScalarSqlFunctions.scala | 18 +++++ .../runtime/functions/ScalarFunctions.scala | 66 +++++++++++++++++++ .../table/validate/FunctionCatalog.scala | 2 + .../expressions/ScalarFunctionsTest.scala | 63 ++++++++++++++++++ 7 files changed, 226 insertions(+) diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md index 04d6e846cef9f..1b89157f7b884 100644 --- a/docs/dev/table/sql.md +++ b/docs/dev/table/sql.md @@ -1575,6 +1575,50 @@ LOG(x numeric), LOG(base numeric, x numeric)

Returns the natural logarithm of a specified number of a specified base. If called with one parameter, this function returns the natural logarithm of x. If called with two parameters, this function returns the logarithm of x to the base b. x must be greater than 0. b must be greater than 1.

+ + + + {% highlight text %} +SHIFT_LEFT(input integer, n integer) +{% endhighlight %} + + +

Returns the integer number after input shift left n bits, equals to input << n

+ + + + + + {% highlight text %} +SHIFT_LEFT(input long, n integer) +{% endhighlight %} + + +

Returns the long number after input shift left n bits, equals to input << n

+ + + + + + {% highlight text %} +SHIFT_RIGHT(input integer, n integer) +{% endhighlight %} + + +

Returns the integer number after input shift right n bits, equals to input >> n

+ + + + + + {% highlight text %} +SHIFT_RIGHT(input long, n integer) +{% endhighlight %} + + +

Returns the long number after input shift right n bits, equals to input >> n

+ + diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala index 671ae070e7164..1b041c68a8330 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala @@ -96,4 +96,9 @@ object BuiltInMethods { Types.lookupMethod( classOf[ScalarFunctions], "concat_ws", classOf[String], classOf[Array[String]]) + val SHIFT_LEFT_INT = Types.lookupMethod(classOf[ScalarFunctions], "shiftLeft", classOf[Int], classOf[Int]) + val SHIFT_LEFT_LONG = Types.lookupMethod(classOf[ScalarFunctions], "shiftLeft", classOf[Long], classOf[Int]) + val SHIFT_RIGHT_INT = Types.lookupMethod(classOf[ScalarFunctions], "shiftRight", classOf[Int], classOf[Int]) + val SHIFT_RIGHT_LONG = Types.lookupMethod(classOf[ScalarFunctions], "shiftRight", classOf[Long], classOf[Int]) + } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala index f05b9ba776dc9..57c54f12e719f 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala @@ -414,6 +414,34 @@ object FunctionGenerator { DOUBLE_TYPE_INFO, BuiltInMethods.LOG_WITH_BASE) + addSqlFunctionMethod( + SHIFT_LEFT, + Seq(INT_TYPE_INFO, INT_TYPE_INFO), + INT_TYPE_INFO, + BuiltInMethods.SHIFT_LEFT_INT + ) + + addSqlFunctionMethod( + SHIFT_LEFT, + Seq(LONG_TYPE_INFO, INT_TYPE_INFO), + LONG_TYPE_INFO, + BuiltInMethods.SHIFT_LEFT_LONG + ) + + addSqlFunctionMethod( + SHIFT_RIGHT, + Seq(INT_TYPE_INFO, INT_TYPE_INFO), + INT_TYPE_INFO, + BuiltInMethods.SHIFT_RIGHT_INT + ) + + addSqlFunctionMethod( + SHIFT_RIGHT, + Seq(LONG_TYPE_INFO, INT_TYPE_INFO), + LONG_TYPE_INFO, + BuiltInMethods.SHIFT_RIGHT_LONG + ) + // ---------------------------------------------------------------------------------------------- // Temporal functions // ---------------------------------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala index 4cac1f7751906..4fea7e605fd61 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala @@ -58,4 +58,22 @@ object ScalarSqlFunctions { OperandTypes.family(SqlTypeFamily.NUMERIC, SqlTypeFamily.NUMERIC)), SqlFunctionCategory.NUMERIC) + val SHIFT_LEFT = new SqlFunction( + "SHIFT_LEFT", + SqlKind.OTHER_FUNCTION, + ReturnTypes.ARG0, + null, + OperandTypes.family(SqlTypeFamily.NUMERIC, SqlTypeFamily.NUMERIC), + SqlFunctionCategory.NUMERIC + ) + + val SHIFT_RIGHT = new SqlFunction( + "SHIFT_RIGHT", + SqlKind.OTHER_FUNCTION, + ReturnTypes.ARG0, + null, + OperandTypes.family(SqlTypeFamily.NUMERIC, SqlTypeFamily.NUMERIC), + SqlFunctionCategory.NUMERIC + ) + } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala index 36edba23676ac..b81d75e70de5a 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala @@ -20,6 +20,7 @@ package org.apache.flink.table.runtime.functions import scala.annotation.varargs import java.math.{BigDecimal => JBigDecimal} import java.lang.StringBuilder +import java.{lang => jl} /** * Built-in scalar runtime functions. @@ -109,4 +110,69 @@ object ScalarFunctions { Math.log(x) / Math.log(base) } } + + /** + * Returns the Int number after the input number left shift n bits + * @param input Int type + * @param n + * @return input << n + */ + def shiftLeft(input: Int, n: Int): Int = { + _shiftLeft(input, n).asInstanceOf[Int] + } + + /** + * Returns the Long number after the input number left shift n bits + * @param input Long type + * @param n + * @return input << n + */ + def shiftLeft(input: Long, n: Int): Long = { + _shiftLeft(input, n).asInstanceOf[Long] + } + + /** + * Returns the Int number after the input number right shift n bits + * @param input Int type + * @param n + * @return input >> n + */ + def shiftRight(input: Int, n: Int): Int = { + _shiftRight(input, n).asInstanceOf[Int] + } + + /** + * Returns the Long number after the input number right shift n bits + * @param input Long type + * @param n + * @return input >> n + */ + def shiftRight(input: Long, n: Int): Long = { + _shiftRight(input, n).asInstanceOf[Long] + } + + /** + * Returns the number after the input number left shift n bits + * Input must be type 'Long' or type 'Int' + */ + private def _shiftLeft(input: Any, n: Int): Any = { + input match { + case l: jl.Long => l << n + case i: jl.Integer => i << n + case _ => throw new IllegalArgumentException(s"type of input in function 'shiftLeft(input, n)' must be Long or Integer") + } + } + + /** + * Returns the number after the input number right shift n bits + * Input must be type 'Long' or type 'Int' + */ + private def _shiftRight(input: Any, n: Int): Any = { + input match { + case l: jl.Long => l >> n + case i: jl.Integer => i >> n + case _ => throw new IllegalArgumentException(s"type of input in function 'shiftRight(input, n)' must be Long or Integer") + } + } + } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala index 281633e7a0147..b74cbfffd6e98 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala @@ -413,6 +413,8 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable { ScalarSqlFunctions.CONCAT_WS, SqlStdOperatorTable.TIMESTAMP_ADD, ScalarSqlFunctions.LOG, + ScalarSqlFunctions.SHIFT_LEFT, + ScalarSqlFunctions.SHIFT_RIGHT, // EXTENSIONS BasicOperatorTable.TUMBLE, diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala index a902e397d27a6..1ab7075d640fb 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala @@ -1216,6 +1216,69 @@ class ScalarFunctionsTest extends ScalarTypesTestBase { ) } + @Test + def testShiftLeft(): Unit = { + testSqlApi( + "SHIFT_LEFT(1,1)", + "2" + ) + + testSqlApi( + "SHIFT_LEFT(21,1)", + "42" + ) + + testSqlApi( + "SHIFT_LEFT(21,1)", + "42" + ) + + testSqlApi( + "SHIFT_LEFT(2147483647,-2147483648)", + "2147483647" + ) + + testSqlApi( + "SHIFT_LEFT(-2147483648,2147483647)", + "0" + ) + + testSqlApi( + "SHIFT_LEFT(9223372036854775807,-2147483648)", + "9223372036854775807" + ) + } + + @Test + def testShiftRight(): Unit = { + testSqlApi( + "SHIFT_RIGHT(1,1)", + "0" + ) + + testSqlApi( + "SHIFT_RIGHT(21,1)", + "10" + ) + + testSqlApi( + "SHIFT_RIGHT(2147483647,-2147483648)", + "2147483647" + ) + + testSqlApi( + "SHIFT_RIGHT(-2147483648,2147483647)", + "-1" + ) + + testSqlApi( + "SHIFT_RIGHT(123456789,-2147483648)", + "123456789" + ) + } + + + // ---------------------------------------------------------------------------------------------- // Temporal functions // ----------------------------------------------------------------------------------------------