From 4650f5ba5314bc65ded5134a1f76d457455b75d5 Mon Sep 17 00:00:00 2001 From: Liao Jiayi Date: Sat, 3 Mar 2018 15:00:13 +0800 Subject: [PATCH] add str_to_date sql function --- docs/dev/table/sql.md | 8 +++++ .../codegen/calls/FunctionGenerator.scala | 7 ++++ .../codegen/calls/StrToDateCallGen.scala | 34 +++++++++++++++++++ .../functions/sql/ScalarSqlFunctions.scala | 9 +++++ .../runtime/functions/DateTimeFunctions.scala | 34 +++++++++++++++++++ .../table/validate/FunctionCatalog.scala | 1 + .../expressions/ScalarFunctionsTest.scala | 9 +++++ 7 files changed, 102 insertions(+) create mode 100644 flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/StrToDateCallGen.scala diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md index eb7204a4f1310..1b126732400e7 100644 --- a/docs/dev/table/sql.md +++ b/docs/dev/table/sql.md @@ -2442,6 +2442,14 @@ SHA256(string)

Returns the SHA-256 hash of the string argument as a string of 64 hexadecimal digits; null if string is null.

+ + + {% highlight text %} + STR_TO_DATE(str string, format string) + {% endhighlight %} + + Returns a DATETIME string value if the format string contains both date and time parts, or a DATE string or TIME string value if the string contains only date or time parts + diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala index 2cd73888a7f0a..30b00e16757aa 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala @@ -531,6 +531,13 @@ object FunctionGenerator { Seq(SqlTimeTypeInfo.TIMESTAMP, STRING_TYPE_INFO), new DateFormatCallGen ) + + addSqlFunction( + ScalarSqlFunctions.STR_TO_DATE, + Seq(STRING_TYPE_INFO, STRING_TYPE_INFO), + new StrToDateCallGen + ) + addSqlFunctionMethod( ScalarSqlFunctions.LPAD, Seq(STRING_TYPE_INFO, INT_TYPE_INFO, STRING_TYPE_INFO), diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/StrToDateCallGen.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/StrToDateCallGen.scala new file mode 100644 index 0000000000000..04199e90125af --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/StrToDateCallGen.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.codegen.calls + +import org.apache.flink.table.codegen.{CodeGenerator, GeneratedExpression} +import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ +import org.apache.flink.table.codegen.calls.CallGenerator.generateCallIfArgsNotNull + +class StrToDateCallGen extends CallGenerator { + override def generate(codeGenerator: CodeGenerator, operands: Seq[GeneratedExpression]) = { + generateCallIfArgsNotNull(codeGenerator.nullCheck, STRING_TYPE_INFO, operands) { + terms => s""" + |org.apache.flink.table.runtime.functions. + |DateTimeFunctions$$.MODULE$$.strToDate(${terms.head}, ${terms.last}); + """.stripMargin + } + } +} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala index 891aba97bf474..b8a6dfbaf11e4 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala @@ -120,4 +120,13 @@ object ScalarSqlFunctions { OperandTypes.sequence("'(TIMESTAMP, FORMAT)'", OperandTypes.DATETIME, OperandTypes.STRING), SqlFunctionCategory.TIMEDATE ) + + val STR_TO_DATE = new SqlFunction( + "STR_TO_DATE", + SqlKind.OTHER_FUNCTION, + ReturnTypes.explicit(SqlTypeName.VARCHAR), + InferTypes.RETURN_TYPE, + OperandTypes.family(SqlTypeFamily.STRING, SqlTypeFamily.STRING), + SqlFunctionCategory.TIMEDATE + ) } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/DateTimeFunctions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/DateTimeFunctions.scala index d69a5c946498d..7a9ffa3bbefa6 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/DateTimeFunctions.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/DateTimeFunctions.scala @@ -28,11 +28,30 @@ object DateTimeFunctions { = createDateTimeFormatter(format) } + private val ONLY_TIME_FORMAT = 1 + private val ONLY_DATE_FORMAT = 2 + private val DATETIME_FORMAT = 3 + + private val timeCharacters = Seq('f', 'H', 'h', 'I', 'i', 'l', 'r', 'S', 's', 'p', 'T') + private val dateCharacters = Seq('a', 'b', 'c', 'd', 'e', 'j', 'k', 'M', + 'm', 'v', 'x', 'W', 'Y', 'y') + def dateFormat(ts: Long, formatString: String): String = { val formatter = DATETIME_FORMATTER_CACHE.get(formatString) formatter.print(ts) } + def strToDate(str: String, format: String): String = { + val dtf = createDateTimeFormatter(format) + val result = checkOutputFormat(format) match { + case ONLY_TIME_FORMAT => dtf.parseLocalTime(str).toString("HH:mm:ss").substring(0, 8) + case ONLY_DATE_FORMAT => dtf.parseLocalDate(str).toString("yyyy-MM-dd") + case DATETIME_FORMAT => dtf.parseDateTime(str).toString("yyyy-MM-dd HH:mm:ss") + case _ => "0000-00-00 00:00:00" + } + result + } + def createDateTimeFormatter(format: String): DateTimeFormatter = { val builder = new DateTimeFormatterBuilder var escaped = false @@ -115,4 +134,19 @@ object DateTimeFunctions { } builder.toFormatter } + + private def checkOutputFormat(format: String): Int = { + format.toCharArray.foldLeft[Int](0) { + case (0, char) => { + if (timeCharacters.contains(char)) ONLY_TIME_FORMAT + else if (dateCharacters.contains(char)) ONLY_DATE_FORMAT + else 0 + } + case (ONLY_TIME_FORMAT, char) => if (dateCharacters.contains(char)) DATETIME_FORMAT + else ONLY_TIME_FORMAT + case (ONLY_DATE_FORMAT, char) => if (timeCharacters.contains(char)) DATETIME_FORMAT + else ONLY_DATE_FORMAT + case (DATETIME_FORMAT, _) => DATETIME_FORMAT + } + } } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala index 055bd13eb140d..6c6ced1fc61e4 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala @@ -402,6 +402,7 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable { SqlStdOperatorTable.CURRENT_TIMESTAMP, SqlStdOperatorTable.CURRENT_DATE, ScalarSqlFunctions.DATE_FORMAT, + ScalarSqlFunctions.STR_TO_DATE, SqlStdOperatorTable.CAST, SqlStdOperatorTable.EXTRACT, SqlStdOperatorTable.QUARTER, diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala index d449fba7cba56..b02e2ff089125 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala @@ -21,6 +21,7 @@ package org.apache.flink.table.expressions import org.apache.flink.table.api.Types import org.apache.flink.table.api.scala._ import org.apache.flink.table.expressions.utils.ScalarTypesTestBase +import org.apache.flink.table.runtime.functions.DateTimeFunctions import org.junit.Test class ScalarFunctionsTest extends ScalarTypesTestBase { @@ -59,6 +60,14 @@ class ScalarFunctionsTest extends ScalarTypesTestBase { "0") } + @Test + def testStrToDate(): Unit = { + testSqlApi("STR_TO_DATE('20170203', '%Y%m%d')", "2017-02-03") + testSqlApi("STR_TO_DATE('01,5,2013', '%d,%m,%Y')", "2013-05-01") + testSqlApi("STR_TO_DATE('pm09:30:17', '%p%h:%i:%s')", "21:30:17") + testSqlApi("STR_TO_DATE('20110303 am03:29:44', '%Y%m%d %p%h:%i:%s')", "2011-03-03 03:29:44") + } + @Test def testSubstring(): Unit = { testAllApis(