From aa3b4b21d38d45e7db007c98c16f93e22b576a64 Mon Sep 17 00:00:00 2001 From: Haohui Mai Date: Mon, 5 Jun 2017 23:23:31 -0700 Subject: [PATCH 1/3] [FLINK-6693] [table] Support DATE_FORMAT function in the Table / SQL API. --- flink-libraries/flink-table/pom.xml | 6 + .../table/runtime/DateTimeFunctions.java | 158 ++++++++++++++++++ .../flink/table/runtime/ThreadLocalCache.java | 71 ++++++++ .../table/codegen/calls/BuiltInMethods.scala | 3 + .../codegen/calls/FunctionGenerator.scala | 9 + .../apache/flink/table/expressions/time.scala | 14 +- .../table/functions/DateTimeSqlFunction.scala | 34 ++++ .../table/validate/FunctionCatalog.scala | 4 +- .../expressions/DateTimeFunctionTest.scala | 49 ++++++ 9 files changed, 346 insertions(+), 2 deletions(-) create mode 100644 flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/DateTimeFunctions.java create mode 100644 flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/ThreadLocalCache.java create mode 100644 flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/DateTimeSqlFunction.scala create mode 100644 flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/DateTimeFunctionTest.scala diff --git a/flink-libraries/flink-table/pom.xml b/flink-libraries/flink-table/pom.xml index 53b8dbbfe2742..bf27c8741ce21 100644 --- a/flink-libraries/flink-table/pom.xml +++ b/flink-libraries/flink-table/pom.xml @@ -110,6 +110,12 @@ under the License. 
compile + + joda-time + joda-time + provided + + diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/DateTimeFunctions.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/DateTimeFunctions.java new file mode 100644 index 0000000000000..ff59d85a218cb --- /dev/null +++ b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/DateTimeFunctions.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime; + +import org.joda.time.format.DateTimeFormatter; +import org.joda.time.format.DateTimeFormatterBuilder; + +/** + * Built-in scalar functions for date time related operations. 
+ */ +public class DateTimeFunctions { + private static final int PIVOT_YEAR = 2020; + + private static final ThreadLocalCache DATETIME_FORMATTER_CACHE = + new ThreadLocalCache(100) { + @Override + protected DateTimeFormatter getNewInstance(String format) { + return createDateTimeFormatter(format); + } + }; + + public static String dateFormat(long ts, String formatString) { + DateTimeFormatter formatter = DATETIME_FORMATTER_CACHE.get(formatString); + return formatter.print(ts); + } + + private static DateTimeFormatter createDateTimeFormatter(String format) { + DateTimeFormatterBuilder builder = new DateTimeFormatterBuilder(); + + boolean escaped = false; + for (int i = 0; i < format.length(); i++) { + char character = format.charAt(i); + + if (escaped) { + switch (character) { + case 'a': // %a Abbreviated weekday name (Sun..Sat) + builder.appendDayOfWeekShortText(); + break; + case 'b': // %b Abbreviated month name (Jan..Dec) + builder.appendMonthOfYearShortText(); + break; + case 'c': // %c Month, numeric (0..12) + builder.appendMonthOfYear(1); + break; + case 'd': // %d Day of the month, numeric (00..31) + builder.appendDayOfMonth(2); + break; + case 'e': // %e Day of the month, numeric (0..31) + builder.appendDayOfMonth(1); + break; + case 'f': // %f Microseconds (000000..999999) + builder.appendFractionOfSecond(6, 9); + break; + case 'H': // %H Hour (00..23) + builder.appendHourOfDay(2); + break; + case 'h': // %h Hour (01..12) + case 'I': // %I Hour (01..12) + builder.appendClockhourOfHalfday(2); + break; + case 'i': // %i Minutes, numeric (00..59) + builder.appendMinuteOfHour(2); + break; + case 'j': // %j Day of year (001..366) + builder.appendDayOfYear(3); + break; + case 'k': // %k Hour (0..23) + builder.appendHourOfDay(1); + break; + case 'l': // %l Hour (1..12) + builder.appendClockhourOfHalfday(1); + break; + case 'M': // %M Month name (January..December) + builder.appendMonthOfYearText(); + break; + case 'm': // %m Month, numeric (00..12) + 
builder.appendMonthOfYear(2); + break; + case 'p': // %p AM or PM + builder.appendHalfdayOfDayText(); + break; + case 'r': // %r Time, 12-hour (hh:mm:ss followed by AM or PM) + builder.appendClockhourOfHalfday(2) + .appendLiteral(':') + .appendMinuteOfHour(2) + .appendLiteral(':') + .appendSecondOfMinute(2) + .appendLiteral(' ') + .appendHalfdayOfDayText(); + break; + case 'S': // %S Seconds (00..59) + case 's': // %s Seconds (00..59) + builder.appendSecondOfMinute(2); + break; + case 'T': // %T Time, 24-hour (hh:mm:ss) + builder.appendHourOfDay(2) + .appendLiteral(':') + .appendMinuteOfHour(2) + .appendLiteral(':') + .appendSecondOfMinute(2); + break; + case 'v': // %v Week (01..53), where Monday is the first day of the week; used with %x + builder.appendWeekOfWeekyear(2); + break; + case 'x': // %x Year for the week, where Monday is the first day of the week, numeric, four digits; used with %v + builder.appendWeekyear(4, 4); + break; + case 'W': // %W Weekday name (Sunday..Saturday) + builder.appendDayOfWeekText(); + break; + case 'Y': // %Y Year, numeric, four digits + builder.appendYear(4, 4); + break; + case 'y': // %y Year, numeric (two digits) + builder.appendTwoDigitYear(PIVOT_YEAR); + break; + case 'w': // %w Day of the week (0=Sunday..6=Saturday) + case 'U': // %U Week (00..53), where Sunday is the first day of the week + case 'u': // %u Week (00..53), where Monday is the first day of the week + case 'V': // %V Week (01..53), where Sunday is the first day of the week; used with %X + case 'X': // %X Year for the week where Sunday is the first day of the week, numeric, four digits; used with %V + case 'D': // %D Day of the month with English suffix (0th, 1st, 2nd, 3rd, …) + throw new UnsupportedOperationException(String.format("%%%s not supported in date format string", character)); + case '%': // %% A literal “%” character + builder.appendLiteral('%'); + break; + default: // % The literal character represented by + builder.appendLiteral(character); + 
break; + } + escaped = false; + } else if (character == '%') { + escaped = true; + } else { + builder.appendLiteral(character); + } + } + + return builder.toFormatter(); + } + +} diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/ThreadLocalCache.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/ThreadLocalCache.java new file mode 100644 index 0000000000000..3c8cf8617b7b5 --- /dev/null +++ b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/ThreadLocalCache.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime; + +import java.util.LinkedHashMap; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * Provides a ThreadLocal cache with a maximum cache size per thread. + * Values must not be null. 
+ * + * @param cache key type + * @param cache value type + */ +abstract class ThreadLocalCache { + private final ThreadLocal> cache = new ThreadLocal<>(); + private final int maxSizePerThread; + + ThreadLocalCache(int maxSizePerThread) { + checkArgument(maxSizePerThread > 0, "max size must be greater than zero"); + this.maxSizePerThread = maxSizePerThread; + } + + V get(K key) { + Map m = cache.get(); + if (m == null) { + m = new BoundedMap<>(maxSizePerThread); + cache.set(m); + } + + V v = m.get(key); + if (v == null) { + v = getNewInstance(key); + m.put(key, v); + } + return v; + } + + protected abstract V getNewInstance(K key); + + private static class BoundedMap extends LinkedHashMap { + private final int maxSize; + + BoundedMap(int maxSize) { + this.maxSize = maxSize; + } + + @Override + protected boolean removeEldestEntry(Map.Entry eldest) { + return size() > maxSize; + } + } +} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala index b7da141072d85..f3ddcd27b9263 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala @@ -23,6 +23,7 @@ import java.math.{BigDecimal => JBigDecimal} import org.apache.calcite.linq4j.tree.Types import org.apache.calcite.runtime.SqlFunctions import org.apache.flink.table.runtime.functions.ScalarFunctions +import org.apache.flink.table.runtime.DateTimeFunctions object BuiltInMethods { val LOG10 = Types.lookupMethod(classOf[Math], "log10", classOf[Double]) @@ -90,4 +91,6 @@ object BuiltInMethods { Types.lookupMethod( classOf[ScalarFunctions], "concat_ws", classOf[String], classOf[Array[String]]) + val DATE_FORMAT = Types.lookupMethod(classOf[DateTimeFunctions], "dateFormat", + classOf[Long], classOf[String]) 
} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala index 4da5514f0d851..4399074611f4b 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala @@ -28,8 +28,10 @@ import org.apache.calcite.util.BuiltInMethod import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo, TypeInformation} import org.apache.flink.api.java.typeutils.GenericTypeInfo +import org.apache.flink.table.functions.sql.DateTimeSqlFunction import org.apache.flink.table.functions.utils.{ScalarSqlFunction, TableSqlFunction} import org.apache.flink.table.functions.sql.ScalarSqlFunctions._ + import scala.collection.mutable /** @@ -482,6 +484,13 @@ object FunctionGenerator { Seq(), new CurrentTimePointCallGen(SqlTimeTypeInfo.TIMESTAMP, local = true)) + addSqlFunctionMethod( + DateTimeSqlFunction.DATE_FORMAT, + Seq(SqlTimeTypeInfo.TIMESTAMP, STRING_TYPE_INFO), + STRING_TYPE_INFO, + BuiltInMethods.DATE_FORMAT + ) + // ---------------------------------------------------------------------------------------------- /** diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala index f09e2adab5ef3..d18ce063dba21 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala @@ -28,9 +28,11 @@ import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation} import org.apache.flink.table.calcite.FlinkRelBuilder import 
org.apache.flink.table.expressions.ExpressionUtils.{divide, getFactor, mod} import org.apache.flink.table.expressions.TimeIntervalUnit.TimeIntervalUnit +import org.apache.flink.table.functions.DateTimeSqlFunction +import org.apache.flink.table.runtime.DateTimeFunctions import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval import org.apache.flink.table.typeutils.{TimeIntervalTypeInfo, TypeCheckUtils} -import org.apache.flink.table.validate.{ValidationResult, ValidationFailure, ValidationSuccess} +import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess} import scala.collection.JavaConversions._ @@ -375,3 +377,13 @@ case class TemporalOverlaps( } } +case class DateFormat(timestamp: Expression, format: Expression) extends Expression { + override private[flink] def children = timestamp :: format :: Nil + + override private[flink] def toRexNode(implicit relBuilder: RelBuilder) = + relBuilder.call(DateTimeSqlFunction.DATE_FORMAT, timestamp.toRexNode, format.toRexNode) + + override def toString: String = s"$timestamp.dateformat($format)" + + override private[flink] def resultType = STRING_TYPE_INFO +} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/DateTimeSqlFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/DateTimeSqlFunction.scala new file mode 100644 index 0000000000000..27ce4e9e2e13d --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/DateTimeSqlFunction.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.functions + +import org.apache.calcite.sql.`type`._ +import org.apache.calcite.sql.{SqlFunction, SqlFunctionCategory, SqlKind} + + +object DateTimeSqlFunction { + val DATE_FORMAT = new SqlFunction( + "DATE_FORMAT", + SqlKind.OTHER_FUNCTION, + ReturnTypes.explicit(SqlTypeName.VARCHAR), + InferTypes.RETURN_TYPE, + OperandTypes.sequence("'(TIMESTAMP, FORMAT)'", OperandTypes.DATETIME, OperandTypes.STRING), + SqlFunctionCategory.TIMEDATE + ) +} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala index df77441e40077..56e986e70fe85 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala @@ -25,7 +25,7 @@ import org.apache.flink.table.api._ import org.apache.flink.table.expressions._ import org.apache.flink.table.functions.sql.ScalarSqlFunctions import org.apache.flink.table.functions.utils.{AggSqlFunction, ScalarSqlFunction, TableSqlFunction} -import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableFunction} +import org.apache.flink.table.functions.{AggregateFunction, DateTimeSqlFunction, ScalarFunction, TableFunction} import _root_.scala.collection.JavaConversions._ import _root_.scala.collection.mutable @@ -237,6 +237,7 @@ object FunctionCatalog { "quarter" -> classOf[Quarter], 
"temporalOverlaps" -> classOf[TemporalOverlaps], "dateTimePlus" -> classOf[Plus], + "dateFormat" -> classOf[DateFormat], // array "array" -> classOf[ArrayConstructor], @@ -374,6 +375,7 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable { SqlStdOperatorTable.CURRENT_TIME, SqlStdOperatorTable.CURRENT_TIMESTAMP, SqlStdOperatorTable.CURRENT_DATE, + DateTimeSqlFunction.DATE_FORMAT, SqlStdOperatorTable.CAST, SqlStdOperatorTable.EXTRACT, SqlStdOperatorTable.QUARTER, diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/DateTimeFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/DateTimeFunctionTest.scala new file mode 100644 index 0000000000000..41643923e0488 --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/DateTimeFunctionTest.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.expressions + +import java.sql.Timestamp + +import org.apache.flink.api.common.typeinfo.{TypeInformation, Types} +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.expressions.utils.ExpressionTestBase +import org.apache.flink.types.Row +import org.junit.Test + +class DateTimeFunctionTest extends ExpressionTestBase { + + @Test + def testDateFormat(): Unit = { + testAllApis( + DateFormat('f0, "%Y"), + "dateFormat(f0, \"%Y\")", + "DATE_FORMAT(f0, '%Y')", + "1990") + } + + override def testData: Any = { + val testData = new Row(1) + testData.setField(0, Timestamp.valueOf("1990-10-14 12:10:10")) + testData + } + + override def typeInfo: TypeInformation[Any] = + new RowTypeInfo(Types.SQL_TIMESTAMP).asInstanceOf[TypeInformation[Any]] +} From 6e389e80b57aeef0527b517cabea732534e11e13 Mon Sep 17 00:00:00 2001 From: Haohui Mai Date: Mon, 5 Jun 2017 23:23:31 -0700 Subject: [PATCH 2/3] Address comments. 
--- flink-libraries/flink-table/pom.xml | 1 - .../table/runtime/DateTimeFunctions.java | 158 ------------------ .../flink/table/runtime/ThreadLocalCache.java | 71 -------- .../flink/table/codegen/CodeGenerator.scala | 27 +++ .../table/codegen/calls/BuiltInMethods.scala | 4 - .../codegen/calls/DateFormatCallGen.scala | 44 +++++ .../codegen/calls/FunctionGenerator.scala | 5 +- .../apache/flink/table/expressions/time.scala | 6 +- .../functions/sql/DateTimeSqlFunction.scala | 34 ++++ .../runtime/functions/DateTimeFunctions.scala | 118 +++++++++++++ .../runtime/functions/ThreadLocalCache.scala | 49 ++++++ .../table/validate/FunctionCatalog.scala | 4 +- .../expressions/DateTimeFunctionTest.scala | 31 +++- 13 files changed, 303 insertions(+), 249 deletions(-) delete mode 100644 flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/DateTimeFunctions.java delete mode 100644 flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/ThreadLocalCache.java create mode 100644 flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/DateFormatCallGen.scala create mode 100644 flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/DateTimeSqlFunction.scala create mode 100644 flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/DateTimeFunctions.scala create mode 100644 flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ThreadLocalCache.scala diff --git a/flink-libraries/flink-table/pom.xml b/flink-libraries/flink-table/pom.xml index bf27c8741ce21..8a7e3ac3f8bc9 100644 --- a/flink-libraries/flink-table/pom.xml +++ b/flink-libraries/flink-table/pom.xml @@ -113,7 +113,6 @@ under the License. 
joda-time joda-time - provided diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/DateTimeFunctions.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/DateTimeFunctions.java deleted file mode 100644 index ff59d85a218cb..0000000000000 --- a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/DateTimeFunctions.java +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.runtime; - -import org.joda.time.format.DateTimeFormatter; -import org.joda.time.format.DateTimeFormatterBuilder; - -/** - * Built-in scalar functions for date time related operations. 
- */ -public class DateTimeFunctions { - private static final int PIVOT_YEAR = 2020; - - private static final ThreadLocalCache DATETIME_FORMATTER_CACHE = - new ThreadLocalCache(100) { - @Override - protected DateTimeFormatter getNewInstance(String format) { - return createDateTimeFormatter(format); - } - }; - - public static String dateFormat(long ts, String formatString) { - DateTimeFormatter formatter = DATETIME_FORMATTER_CACHE.get(formatString); - return formatter.print(ts); - } - - private static DateTimeFormatter createDateTimeFormatter(String format) { - DateTimeFormatterBuilder builder = new DateTimeFormatterBuilder(); - - boolean escaped = false; - for (int i = 0; i < format.length(); i++) { - char character = format.charAt(i); - - if (escaped) { - switch (character) { - case 'a': // %a Abbreviated weekday name (Sun..Sat) - builder.appendDayOfWeekShortText(); - break; - case 'b': // %b Abbreviated month name (Jan..Dec) - builder.appendMonthOfYearShortText(); - break; - case 'c': // %c Month, numeric (0..12) - builder.appendMonthOfYear(1); - break; - case 'd': // %d Day of the month, numeric (00..31) - builder.appendDayOfMonth(2); - break; - case 'e': // %e Day of the month, numeric (0..31) - builder.appendDayOfMonth(1); - break; - case 'f': // %f Microseconds (000000..999999) - builder.appendFractionOfSecond(6, 9); - break; - case 'H': // %H Hour (00..23) - builder.appendHourOfDay(2); - break; - case 'h': // %h Hour (01..12) - case 'I': // %I Hour (01..12) - builder.appendClockhourOfHalfday(2); - break; - case 'i': // %i Minutes, numeric (00..59) - builder.appendMinuteOfHour(2); - break; - case 'j': // %j Day of year (001..366) - builder.appendDayOfYear(3); - break; - case 'k': // %k Hour (0..23) - builder.appendHourOfDay(1); - break; - case 'l': // %l Hour (1..12) - builder.appendClockhourOfHalfday(1); - break; - case 'M': // %M Month name (January..December) - builder.appendMonthOfYearText(); - break; - case 'm': // %m Month, numeric (00..12) - 
builder.appendMonthOfYear(2); - break; - case 'p': // %p AM or PM - builder.appendHalfdayOfDayText(); - break; - case 'r': // %r Time, 12-hour (hh:mm:ss followed by AM or PM) - builder.appendClockhourOfHalfday(2) - .appendLiteral(':') - .appendMinuteOfHour(2) - .appendLiteral(':') - .appendSecondOfMinute(2) - .appendLiteral(' ') - .appendHalfdayOfDayText(); - break; - case 'S': // %S Seconds (00..59) - case 's': // %s Seconds (00..59) - builder.appendSecondOfMinute(2); - break; - case 'T': // %T Time, 24-hour (hh:mm:ss) - builder.appendHourOfDay(2) - .appendLiteral(':') - .appendMinuteOfHour(2) - .appendLiteral(':') - .appendSecondOfMinute(2); - break; - case 'v': // %v Week (01..53), where Monday is the first day of the week; used with %x - builder.appendWeekOfWeekyear(2); - break; - case 'x': // %x Year for the week, where Monday is the first day of the week, numeric, four digits; used with %v - builder.appendWeekyear(4, 4); - break; - case 'W': // %W Weekday name (Sunday..Saturday) - builder.appendDayOfWeekText(); - break; - case 'Y': // %Y Year, numeric, four digits - builder.appendYear(4, 4); - break; - case 'y': // %y Year, numeric (two digits) - builder.appendTwoDigitYear(PIVOT_YEAR); - break; - case 'w': // %w Day of the week (0=Sunday..6=Saturday) - case 'U': // %U Week (00..53), where Sunday is the first day of the week - case 'u': // %u Week (00..53), where Monday is the first day of the week - case 'V': // %V Week (01..53), where Sunday is the first day of the week; used with %X - case 'X': // %X Year for the week where Sunday is the first day of the week, numeric, four digits; used with %V - case 'D': // %D Day of the month with English suffix (0th, 1st, 2nd, 3rd, …) - throw new UnsupportedOperationException(String.format("%%%s not supported in date format string", character)); - case '%': // %% A literal “%” character - builder.appendLiteral('%'); - break; - default: // % The literal character represented by - builder.appendLiteral(character); - 
break; - } - escaped = false; - } else if (character == '%') { - escaped = true; - } else { - builder.appendLiteral(character); - } - } - - return builder.toFormatter(); - } - -} diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/ThreadLocalCache.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/ThreadLocalCache.java deleted file mode 100644 index 3c8cf8617b7b5..0000000000000 --- a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/ThreadLocalCache.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.runtime; - -import java.util.LinkedHashMap; -import java.util.Map; - -import static org.apache.flink.util.Preconditions.checkArgument; - -/** - * Provides a ThreadLocal cache with a maximum cache size per thread. - * Values must not be null. 
- * - * @param cache key type - * @param cache value type - */ -abstract class ThreadLocalCache { - private final ThreadLocal> cache = new ThreadLocal<>(); - private final int maxSizePerThread; - - ThreadLocalCache(int maxSizePerThread) { - checkArgument(maxSizePerThread > 0, "max size must be greater than zero"); - this.maxSizePerThread = maxSizePerThread; - } - - V get(K key) { - Map m = cache.get(); - if (m == null) { - m = new BoundedMap<>(maxSizePerThread); - cache.set(m); - } - - V v = m.get(key); - if (v == null) { - v = getNewInstance(key); - m.put(key, v); - } - return v; - } - - protected abstract V getNewInstance(K key); - - private static class BoundedMap extends LinkedHashMap { - private final int maxSize; - - BoundedMap(int maxSize) { - this.maxSize = maxSize; - } - - @Override - protected boolean removeEldestEntry(Map.Entry eldest) { - return size() > maxSize; - } - } -} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala index 045fbdd1ae156..9caf9bc330b24 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala @@ -2007,6 +2007,33 @@ class CodeGenerator( fieldTerm } + /** + * Adds a reusable [[java.util.Random]] to the member area of the generated [[Function]]. + * + * The seed parameter must be a literal/constant expression. + * + * @return member variable term + */ + def addReusableDateFormatter(format: GeneratedExpression): String = { + val fieldTerm = newName("dateFormatter") + + val field = + s""" + |transient org.joda.time.format.DateTimeFormatter $fieldTerm; + |""".stripMargin + reusableMemberStatements.add(field) + + val fieldInit = + s""" + |${format.code} + |$fieldTerm = org.apache.flink.table.runtime.functions. 
+ |DateTimeFunctions$$.MODULE$$.createDateTimeFormatter(${format.resultTerm}); + |""".stripMargin + + reusableInitStatements.add(fieldInit) + fieldTerm + } + /** * Adds a reusable [[UserDefinedFunction]] to the member area of the generated [[Function]]. * diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala index f3ddcd27b9263..b501c1e97bd40 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala @@ -23,7 +23,6 @@ import java.math.{BigDecimal => JBigDecimal} import org.apache.calcite.linq4j.tree.Types import org.apache.calcite.runtime.SqlFunctions import org.apache.flink.table.runtime.functions.ScalarFunctions -import org.apache.flink.table.runtime.DateTimeFunctions object BuiltInMethods { val LOG10 = Types.lookupMethod(classOf[Math], "log10", classOf[Double]) @@ -90,7 +89,4 @@ object BuiltInMethods { val CONCAT_WS = Types.lookupMethod( classOf[ScalarFunctions], "concat_ws", classOf[String], classOf[Array[String]]) - - val DATE_FORMAT = Types.lookupMethod(classOf[DateTimeFunctions], "dateFormat", - classOf[Long], classOf[String]) } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/DateFormatCallGen.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/DateFormatCallGen.scala new file mode 100644 index 0000000000000..f156b1aa70b07 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/DateFormatCallGen.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.codegen.calls + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ +import org.apache.flink.table.codegen.{CodeGenerator, GeneratedExpression} +import org.apache.flink.table.codegen.calls.CallGenerator.generateCallIfArgsNotNull + +class DateFormatCallGen extends CallGenerator { + override def generate(codeGenerator: CodeGenerator, + operands: Seq[GeneratedExpression]) + : GeneratedExpression = { + + if (operands.last.literal) { + val formatter = codeGenerator.addReusableDateFormatter(operands.last) + generateCallIfArgsNotNull(codeGenerator.nullCheck, STRING_TYPE_INFO, operands) { + terms =>s"$formatter.print(${terms.head})" + } + } else { + generateCallIfArgsNotNull(codeGenerator.nullCheck, STRING_TYPE_INFO, operands) { + terms => s""" + |org.apache.flink.table.runtime.functions. 
+ |DateTimeFunctions$$.MODULE$$.dateFormat(${terms.head}, ${terms.last}); + """.stripMargin + } + } + } +} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala index 4399074611f4b..d071279e3e627 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala @@ -484,11 +484,10 @@ object FunctionGenerator { Seq(), new CurrentTimePointCallGen(SqlTimeTypeInfo.TIMESTAMP, local = true)) - addSqlFunctionMethod( + addSqlFunction( DateTimeSqlFunction.DATE_FORMAT, Seq(SqlTimeTypeInfo.TIMESTAMP, STRING_TYPE_INFO), - STRING_TYPE_INFO, - BuiltInMethods.DATE_FORMAT + new DateFormatCallGen ) // ---------------------------------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala index d18ce063dba21..250ec0ac73bf9 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala @@ -28,8 +28,8 @@ import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation} import org.apache.flink.table.calcite.FlinkRelBuilder import org.apache.flink.table.expressions.ExpressionUtils.{divide, getFactor, mod} import org.apache.flink.table.expressions.TimeIntervalUnit.TimeIntervalUnit -import org.apache.flink.table.functions.DateTimeSqlFunction -import org.apache.flink.table.runtime.DateTimeFunctions +import org.apache.flink.table.functions.sql.DateTimeSqlFunction +import 
org.apache.flink.table.runtime.functions.DateTimeFunctions import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval import org.apache.flink.table.typeutils.{TimeIntervalTypeInfo, TypeCheckUtils} import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess} @@ -383,7 +383,7 @@ case class DateFormat(timestamp: Expression, format: Expression) extends Express override private[flink] def toRexNode(implicit relBuilder: RelBuilder) = relBuilder.call(DateTimeSqlFunction.DATE_FORMAT, timestamp.toRexNode, format.toRexNode) - override def toString: String = s"$timestamp.dateformat($format)" + override def toString: String = s"$timestamp.dateFormat($format)" override private[flink] def resultType = STRING_TYPE_INFO } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/DateTimeSqlFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/DateTimeSqlFunction.scala new file mode 100644 index 0000000000000..22de126b72305 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/DateTimeSqlFunction.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.functions.sql + +import org.apache.calcite.sql.`type`._ +import org.apache.calcite.sql.{SqlFunction, SqlFunctionCategory, SqlKind} + + +object DateTimeSqlFunction { + val DATE_FORMAT = new SqlFunction( + "DATE_FORMAT", + SqlKind.OTHER_FUNCTION, + ReturnTypes.explicit(SqlTypeName.VARCHAR), + InferTypes.RETURN_TYPE, + OperandTypes.sequence("'(TIMESTAMP, FORMAT)'", OperandTypes.DATETIME, OperandTypes.STRING), + SqlFunctionCategory.TIMEDATE + ) +} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/DateTimeFunctions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/DateTimeFunctions.scala new file mode 100644 index 0000000000000..d69a5c946498d --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/DateTimeFunctions.scala @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.functions + +import org.joda.time.format.DateTimeFormatter +import org.joda.time.format.DateTimeFormatterBuilder + +object DateTimeFunctions { + private val PIVOT_YEAR = 2020 + + private val DATETIME_FORMATTER_CACHE = new ThreadLocalCache[String, DateTimeFormatter](64) { + protected override def getNewInstance(format: String): DateTimeFormatter + = createDateTimeFormatter(format) + } + + def dateFormat(ts: Long, formatString: String): String = { + val formatter = DATETIME_FORMATTER_CACHE.get(formatString) + formatter.print(ts) + } + + def createDateTimeFormatter(format: String): DateTimeFormatter = { + val builder = new DateTimeFormatterBuilder + var escaped = false + var i = 0 + while (i < format.length) { + val character = format.charAt(i) + i = i + 1 + if (escaped) { + character match { + // %a Abbreviated weekday name (Sun..Sat) + case 'a' => builder.appendDayOfWeekShortText + // %b Abbreviated month name (Jan..Dec) + case 'b' => builder.appendMonthOfYearShortText + // %c Month, numeric (0..12) + case 'c' => builder.appendMonthOfYear(1) + // %d Day of the month, numeric (00..31) + case 'd' => builder.appendDayOfMonth(2) + // %e Day of the month, numeric (0..31) + case 'e' => builder.appendDayOfMonth(1) + // %f Microseconds (000000..999999) + case 'f' => builder.appendFractionOfSecond(6, 9) + // %H Hour (00..23) + case 'H' => builder.appendHourOfDay(2) + case 'h' | 'I' => // %h Hour (01..12) + builder.appendClockhourOfHalfday(2) + // %i Minutes, numeric (00..59) + case 'i' => builder.appendMinuteOfHour(2) + // %j Day of year (001..366) + case 'j' => builder.appendDayOfYear(3) + // %k Hour (0..23) + case 'k' => builder.appendHourOfDay(1) + // %l Hour (1..12) + case 'l' => builder.appendClockhourOfHalfday(1) + // %M Month name (January..December) + case 'M' => builder.appendMonthOfYearText + // %m Month, numeric (00..12) + case 'm' => builder.appendMonthOfYear(2) + // %p AM or PM + case 'p' => 
builder.appendHalfdayOfDayText + // %r Time, 12-hour (hh:mm:ss followed by AM or PM) + case 'r' => builder.appendClockhourOfHalfday(2).appendLiteral(':'). + appendMinuteOfHour(2).appendLiteral(':').appendSecondOfMinute(2). + appendLiteral(' ').appendHalfdayOfDayText + // %S Seconds (00..59) + case 'S' | 's' => builder.appendSecondOfMinute(2) + // %T Time, 24-hour (hh:mm:ss) + case 'T' => builder.appendHourOfDay(2).appendLiteral(':'). + appendMinuteOfHour(2).appendLiteral(':').appendSecondOfMinute(2) + // %v Week (01..53), where Monday is the first day of the week; used with %x + case 'v' => builder.appendWeekOfWeekyear(2) + // %x Year for the week, where Monday is the first day of the week, numeric, + // four digits; used with %v + case 'x' => builder.appendWeekyear(4, 4) + // %W Weekday name (Sunday..Saturday) + case 'W' => builder.appendDayOfWeekText + // %Y Year, numeric, four digits + case 'Y' => builder.appendYear(4, 4) + // %y Year, numeric (two digits) + case 'y' => builder.appendTwoDigitYear(PIVOT_YEAR) + + // %w Day of the week (0=Sunday..6=Saturday) + // %U Week (00..53), where Sunday is the first day of the week + // %u Week (00..53), where Monday is the first day of the week + // %V Week (01..53), where Sunday is the first day of the week; used with %X + // %X Year for the week where Sunday is the first day of the + // week, numeric, four digits; used with %V + // %D Day of the month with English suffix (0th, 1st, 2nd, 3rd, ...) 
+ case 'w' | 'U' | 'u' | 'V' | 'X' | 'D' => + throw new UnsupportedOperationException( + s"%%$character not supported in date format string") + // %% A literal "%" character + case '%' => builder.appendLiteral('%') + // % The literal character represented by + case _ => builder.appendLiteral(character) + } + escaped = false + } + else if (character == '%') { escaped = true } + else { builder.appendLiteral(character) } + } + builder.toFormatter + } +} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ThreadLocalCache.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ThreadLocalCache.scala new file mode 100644 index 0000000000000..b3a8d7a96a2c8 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ThreadLocalCache.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.functions + +import java.util.{LinkedHashMap => JLinkedHashMap} +import java.util.{Map => JMap} + +/** + * Provides a ThreadLocal cache with a maximum cache size per thread. + * Values must not be null. 
+ */ +abstract class ThreadLocalCache[K, V](val maxSizePerThread: Int) { + private val cache = new ThreadLocal[BoundedMap[K, V]] + + protected def getNewInstance(key: K): V + + def get(key: K): V = { + var m = cache.get + if (m == null) { + m = new BoundedMap(maxSizePerThread) + cache.set(m) + } + var v = m.get(key) + if (v == null) { + v = getNewInstance(key) + m.put(key, v) + } + v + } +} + +private class BoundedMap[K, V](val maxSize: Int) extends JLinkedHashMap[K,V] { + override protected def removeEldestEntry(eldest: JMap.Entry[K, V]): Boolean = size > maxSize +} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala index 56e986e70fe85..35d0e499cbb54 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala @@ -23,9 +23,9 @@ import org.apache.calcite.sql.util.{ChainedSqlOperatorTable, ListSqlOperatorTabl import org.apache.calcite.sql.{SqlFunction, SqlOperator, SqlOperatorTable} import org.apache.flink.table.api._ import org.apache.flink.table.expressions._ -import org.apache.flink.table.functions.sql.ScalarSqlFunctions +import org.apache.flink.table.functions.sql.{DateTimeSqlFunction, ScalarSqlFunctions} import org.apache.flink.table.functions.utils.{AggSqlFunction, ScalarSqlFunction, TableSqlFunction} -import org.apache.flink.table.functions.{AggregateFunction, DateTimeSqlFunction, ScalarFunction, TableFunction} +import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableFunction} import _root_.scala.collection.JavaConversions._ import _root_.scala.collection.mutable diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/DateTimeFunctionTest.scala 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/DateTimeFunctionTest.scala index 41643923e0488..5eb341708c796 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/DateTimeFunctionTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/DateTimeFunctionTest.scala @@ -25,25 +25,42 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.table.api.scala._ import org.apache.flink.table.expressions.utils.ExpressionTestBase import org.apache.flink.types.Row +import org.joda.time.{DateTime, DateTimeZone} import org.junit.Test class DateTimeFunctionTest extends ExpressionTestBase { + private val INSTANT = DateTime.parse("1990-01-02T03:04:05.678Z") + private val LOCAL_ZONE = DateTimeZone.getDefault + private val LOCAL_TIME = INSTANT.toDateTime(LOCAL_ZONE) @Test def testDateFormat(): Unit = { + val expected = LOCAL_TIME.toString("MM/dd/yyyy HH:mm:ss.SSSSSS") testAllApis( - DateFormat('f0, "%Y"), - "dateFormat(f0, \"%Y\")", - "DATE_FORMAT(f0, '%Y')", - "1990") + DateFormat('f0, "%m/%d/%Y %H:%i:%s.%f"), + "dateFormat(f0, \"%m/%d/%Y %H:%i:%s.%f\")", + "DATE_FORMAT(f0, '%m/%d/%Y %H:%i:%s.%f')", + expected) + } + + @Test + def testDateFormatNonConstantFormatter(): Unit = { + val expected = LOCAL_TIME.toString("MM/dd/yyyy") + testAllApis( + DateFormat('f0, 'f1), + "dateFormat(f0, f1)", + "DATE_FORMAT(f0, f1)", + expected) } override def testData: Any = { - val testData = new Row(1) - testData.setField(0, Timestamp.valueOf("1990-10-14 12:10:10")) + val testData = new Row(2) + // SQL expect a timestamp in the local timezone + testData.setField(0, new Timestamp(LOCAL_ZONE.convertLocalToUTC(INSTANT.getMillis, true))) + testData.setField(1, "%m/%d/%Y") testData } override def typeInfo: TypeInformation[Any] = - new RowTypeInfo(Types.SQL_TIMESTAMP).asInstanceOf[TypeInformation[Any]] + new RowTypeInfo(Types.SQL_TIMESTAMP, 
Types.STRING).asInstanceOf[TypeInformation[Any]] } From 90fba02eb3cc1bc83085b0a295a9ed43ec1b0a0d Mon Sep 17 00:00:00 2001 From: Haohui Mai Date: Tue, 11 Jul 2017 23:21:36 -0700 Subject: [PATCH 3/3] Address comments. --- docs/dev/table/sql.md | 180 +++++++++++++++--- .../flink/table/codegen/CodeGenerator.scala | 4 +- .../table/codegen/calls/BuiltInMethods.scala | 1 + .../codegen/calls/DateFormatCallGen.scala | 2 +- .../table/functions/DateTimeSqlFunction.scala | 34 ---- 5 files changed, 155 insertions(+), 66 deletions(-) delete mode 100644 flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/DateTimeSqlFunction.scala diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md index 11b7b0ee244b9..3f35deda85512 100644 --- a/docs/dev/table/sql.md +++ b/docs/dev/table/sql.md @@ -245,8 +245,8 @@ SELECT PRETTY_PRINT(user) FROM Orders

Note: GroupBy on a streaming table produces an updating result. See the Streaming Concepts page for details.

{% highlight sql %} -SELECT a, SUM(b) as d -FROM Orders +SELECT a, SUM(b) as d +FROM Orders GROUP BY a {% endhighlight %} @@ -259,8 +259,8 @@ GROUP BY a

Use a group window to compute a single result row per group. See Group Windows section for more details.

{% highlight sql %} -SELECT user, SUM(amount) -FROM Orders +SELECT user, SUM(amount) +FROM Orders GROUP BY TUMBLE(rowtime, INTERVAL '1' DAY), user {% endhighlight %} @@ -274,9 +274,9 @@ GROUP BY TUMBLE(rowtime, INTERVAL '1' DAY), user

Note: All aggregates must be defined over the same window, i.e., same partitioning, sorting, and range. Currently, only windows with PRECEDING (UNBOUNDED and bounded) to CURRENT ROW range are supported. Ranges with FOLLOWING are not supported yet. ORDER BY must be specified on a single time attribute

{% highlight sql %} SELECT COUNT(amount) OVER ( - PARTITION BY user - ORDER BY proctime - ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) + PARTITION BY user + ORDER BY proctime + ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) FROM Orders {% endhighlight %} @@ -299,8 +299,8 @@ SELECT DISTINCT users FROM Orders {% highlight sql %} -SELECT SUM(amount) -FROM Orders +SELECT SUM(amount) +FROM Orders GROUP BY GROUPING SETS ((user), (product)) {% endhighlight %} @@ -312,9 +312,9 @@ GROUP BY GROUPING SETS ((user), (product)) {% highlight sql %} -SELECT SUM(amount) -FROM Orders -GROUP BY users +SELECT SUM(amount) +FROM Orders +GROUP BY users HAVING SUM(amount) > 50 {% endhighlight %} @@ -327,8 +327,8 @@ HAVING SUM(amount) > 50

UDAGGs must be registered in the TableEnvironment. See the UDF documentation for details on how to specify and register UDAGGs.

{% highlight sql %} -SELECT MyAggregate(amount) -FROM Orders +SELECT MyAggregate(amount) +FROM Orders GROUP BY users {% endhighlight %} @@ -358,10 +358,10 @@ GROUP BY users

Currently, only equi-joins are supported, i.e., joins that have at least one conjunctive condition with an equality predicate. Arbitrary cross or theta joins are not supported.

Note: The order of joins is not optimized. Tables are joined in the order in which they are specified in the FROM clause. Make sure to specify tables in an order that does not yield a cross join (Cartesian product) which are not supported and would cause a query to fail.

{% highlight sql %} -SELECT * +SELECT * FROM Orders INNER JOIN Product ON Orders.productId = Product.id -SELECT * +SELECT * FROM Orders LEFT JOIN Product ON Orders.productId = Product.id {% endhighlight %} @@ -374,7 +374,7 @@ FROM Orders LEFT JOIN Product ON Orders.productId = Product.id

Unnesting WITH ORDINALITY is not supported yet.

{% highlight sql %} -SELECT users, tag +SELECT users, tag FROM Orders CROSS JOIN UNNEST(tags) AS t (tag) {% endhighlight %} @@ -387,7 +387,7 @@ FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)

UDTFs must be registered in the TableEnvironment. See the UDF documentation for details on how to specify and register UDTFs.

{% highlight sql %} -SELECT users, tag +SELECT users, tag FROM Orders LATERAL VIEW UNNEST_UDTF(tags) t AS tag {% endhighlight %} @@ -416,7 +416,7 @@ FROM Orders LATERAL VIEW UNNEST_UDTF(tags) t AS tag {% highlight sql %} -SELECT * +SELECT * FROM ( (SELECT user FROM Orders WHERE a % 2 = 0) UNION @@ -432,7 +432,7 @@ FROM ( {% highlight sql %} -SELECT * +SELECT * FROM ( (SELECT user FROM Orders WHERE a % 2 = 0) UNION ALL @@ -449,7 +449,7 @@ FROM ( {% highlight sql %} -SELECT * +SELECT * FROM ( (SELECT user FROM Orders WHERE a % 2 = 0) INTERSECT @@ -457,7 +457,7 @@ FROM ( ) {% endhighlight %} {% highlight sql %} -SELECT * +SELECT * FROM ( (SELECT user FROM Orders WHERE a % 2 = 0) EXCEPT @@ -489,11 +489,11 @@ FROM ( Batch Streaming -Note: The result of streaming queries must be primarily sorted on an ascending time attribute. Additional sorting attributes are supported. +Note: The result of streaming queries must be primarily sorted on an ascending time attribute. Additional sorting attributes are supported. {% highlight sql %} -SELECT * -FROM Orders +SELECT * +FROM Orders ORDER BY orderTime {% endhighlight %} @@ -505,8 +505,8 @@ ORDER BY orderTime {% highlight sql %} -SELECT * -FROM Orders +SELECT * +FROM Orders LIMIT 3 {% endhighlight %} @@ -549,7 +549,7 @@ Group windows are defined in the `GROUP BY` clause of a SQL query. Just like que #### Time Attributes -For SQL queries on streaming tables, the `time_attr` argument of the group window function must refer to a valid time attribute that specifies the processing time or event time of rows. See the [documentation of time attributes](streaming.html#time-attributes) to learn how to define time attributes. +For SQL queries on streaming tables, the `time_attr` argument of the group window function must refer to a valid time attribute that specifies the processing time or event time of rows. See the [documentation of time attributes](streaming.html#time-attributes) to learn how to define time attributes. 
For SQL on batch tables, the `time_attr` argument of the group window function must be an attribute of type `TIMESTAMP`. @@ -1906,6 +1906,130 @@ QUARTER(date)

Determines whether two anchored time intervals overlap. Time point and temporal are transformed into a range defined by two time points (start, end). The function evaluates leftEnd >= rightStart && rightEnd >= leftStart. E.g. (TIME '2:55:00', INTERVAL '1' HOUR) OVERLAPS (TIME '3:30:00', INTERVAL '2' HOUR) leads to true; (TIME '9:00:00', TIME '10:00:00') OVERLAPS (TIME '10:15:00', INTERVAL '3' HOUR) leads to false.

+ + + {% highlight text %} +DATE_FORMAT(timestamp, format) +{% endhighlight %} + + +

Formats timestamp as a string using format. The format is compatible with the one used by DATE_FORMAT and STR_TO_DATE in MySQL. The following table describes the supported format specifiers.

+ + + + + +#### MySQL date format specifier + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
SpecifierDescription
{% highlight text %}%a{% endhighlight %}Abbreviated weekday name (Sun .. Sat)
{% highlight text %}%b{% endhighlight %}Abbreviated month name (Jan .. Dec)
{% highlight text %}%c{% endhighlight %}Month, numeric (1 .. 12)
{% highlight text %}%D{% endhighlight %}Day of the month with English suffix (0th, 1st, 2nd, 3rd, ...) (Not supported yet; using this specifier raises an error.)
{% highlight text %}%d{% endhighlight %}Day of the month, numeric (01 .. 31)
{% highlight text %}%e{% endhighlight %}Day of the month, numeric (1 .. 31)
{% highlight text %}%f{% endhighlight %}Fraction of second (6 digits for printing: 000000 .. 999000; 1 - 9 digits for parsing: 0 .. 999999999) (Timestamp is truncated to milliseconds.)
{% highlight text %}%H{% endhighlight %}Hour (00 .. 23)
{% highlight text %}%h{% endhighlight %}Hour (01 .. 12)
{% highlight text %}%I{% endhighlight %}Hour (01 .. 12)
{% highlight text %}%i{% endhighlight %}Minutes, numeric (00 .. 59)
{% highlight text %}%j{% endhighlight %}Day of year (001 .. 366)
{% highlight text %}%k{% endhighlight %}Hour (0 .. 23)
{% highlight text %}%l{% endhighlight %}Hour (1 .. 12)
{% highlight text %}%M{% endhighlight %}Month name (January .. December)
{% highlight text %}%m{% endhighlight %}Month, numeric (01 .. 12)
{% highlight text %}%p{% endhighlight %}AM or PM
{% highlight text %}%r{% endhighlight %}Time, 12-hour (hh:mm:ss followed by AM or PM)
{% highlight text %}%S{% endhighlight %}Seconds (00 .. 59)
{% highlight text %}%s{% endhighlight %}Seconds (00 .. 59)
{% highlight text %}%T{% endhighlight %}Time, 24-hour (hh:mm:ss)
{% highlight text %}%U{% endhighlight %}Week (00 .. 53), where Sunday is the first day of the week (Not supported yet; using this specifier raises an error.)
{% highlight text %}%u{% endhighlight %}Week (00 .. 53), where Monday is the first day of the week (Not supported yet; using this specifier raises an error.)
{% highlight text %}%V{% endhighlight %}Week (01 .. 53), where Sunday is the first day of the week; used with %X (Not supported yet; using this specifier raises an error.)
{% highlight text %}%v{% endhighlight %}Week (01 .. 53), where Monday is the first day of the week; used with %x
{% highlight text %}%W{% endhighlight %}Weekday name (Sunday .. Saturday)
{% highlight text %}%W{% endhighlight %}Weekday name (Sunday .. Saturday)
{% highlight text %}%w{% endhighlight %}Day of the week (0 .. 6), where Sunday is the first day of the week (Not supported yet; using this specifier raises an error.)
{% highlight text %}%X{% endhighlight %}Year for the week where Sunday is the first day of the week, numeric, four digits; used with %V (Not supported yet; using this specifier raises an error.)
{% highlight text %}%x{% endhighlight %}Year for the week, where Monday is the first day of the week, numeric, four digits; used with %v
{% highlight text %}%Y{% endhighlight %}Year, numeric, four digits
{% highlight text %}%y{% endhighlight %}Year, numeric (two digits)
{% highlight text %}%%{% endhighlight %}A literal % character
{% highlight text %}%x{% endhighlight %}x, for any x not listed above
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala index 9caf9bc330b24..e8bcdcfcdca97 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala @@ -2008,9 +2008,7 @@ class CodeGenerator( } /** - * Adds a reusable [[java.util.Random]] to the member area of the generated [[Function]]. - * - * The seed parameter must be a literal/constant expression. + * Adds a reusable DateFormatter to the member area of the generated [[Function]]. * * @return member variable term */ diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala index b501c1e97bd40..b7da141072d85 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala @@ -89,4 +89,5 @@ object BuiltInMethods { val CONCAT_WS = Types.lookupMethod( classOf[ScalarFunctions], "concat_ws", classOf[String], classOf[Array[String]]) + } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/DateFormatCallGen.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/DateFormatCallGen.scala index f156b1aa70b07..ba1e294bc6e84 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/DateFormatCallGen.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/DateFormatCallGen.scala @@ -30,7 +30,7 @@ class DateFormatCallGen extends CallGenerator { if (operands.last.literal) { val formatter 
= codeGenerator.addReusableDateFormatter(operands.last) generateCallIfArgsNotNull(codeGenerator.nullCheck, STRING_TYPE_INFO, operands) { - terms =>s"$formatter.print(${terms.head})" + terms => s"$formatter.print(${terms.head})" } } else { generateCallIfArgsNotNull(codeGenerator.nullCheck, STRING_TYPE_INFO, operands) { diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/DateTimeSqlFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/DateTimeSqlFunction.scala deleted file mode 100644 index 27ce4e9e2e13d..0000000000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/DateTimeSqlFunction.scala +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.table.functions - -import org.apache.calcite.sql.`type`._ -import org.apache.calcite.sql.{SqlFunction, SqlFunctionCategory, SqlKind} - - -object DateTimeSqlFunction { - val DATE_FORMAT = new SqlFunction( - "DATE_FORMAT", - SqlKind.OTHER_FUNCTION, - ReturnTypes.explicit(SqlTypeName.VARCHAR), - InferTypes.RETURN_TYPE, - OperandTypes.sequence("'(TIMESTAMP, FORMAT)'", OperandTypes.DATETIME, OperandTypes.STRING), - SqlFunctionCategory.TIMEDATE - ) -}