diff --git a/flink-core/src/main/java/org/apache/flink/util/TimeConvertUtils.java b/flink-core/src/main/java/org/apache/flink/util/TimeConvertUtils.java deleted file mode 100644 index b9cab7935f72d0..00000000000000 --- a/flink-core/src/main/java/org/apache/flink/util/TimeConvertUtils.java +++ /dev/null @@ -1,388 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.util; - -import java.nio.charset.Charset; -import java.sql.Timestamp; -import java.util.Calendar; -import java.util.Date; -import java.util.TimeZone; - -/** - * A {@link TimeConvertUtils} is used to convert Time Type. - */ -public class TimeConvertUtils { - - public static final Charset UTF_8 = Charset.forName("UTF-8"); - - private static final TimeZone UTC_TZ = TimeZone.getTimeZone("UTC"); - - /** The julian date of the epoch, 1970-01-01. */ - public static final int EPOCH_JULIAN = 2440588; - - /** - * The number of milliseconds in a second. - */ - public static final long MILLIS_PER_SECOND = 1000L; - - /** - * The number of milliseconds in a minute. - */ - public static final long MILLIS_PER_MINUTE = 60000L; - - /** - * The number of milliseconds in an hour. - */ - public static final long MILLIS_PER_HOUR = 3600000L; // = 60 * 60 * 1000 - - /** - * The number of milliseconds in a day. - * - *
This is the modulo 'mask' used when converting - * TIMESTAMP values to DATE and TIME values. - */ - public static final long MILLIS_PER_DAY = 86400000; // = 24 * 60 * 60 * 1000 - - public static int toInt(Date v) { - return toInt(v, UTC_TZ); - } - - public static int toInt(Date v, TimeZone timeZone) { - return (int) (toLong(v, timeZone) / MILLIS_PER_DAY); - } - - public static int toInt(java.sql.Time v) { - return (int) (toLong(v) % MILLIS_PER_DAY); - } - - public static long toLong(Date v) { - return toLong(v, UTC_TZ); - } - - // mainly intended for java.sql.Timestamp but works for other dates also - public static long toLong(Date v, TimeZone timeZone) { - final long time = v.getTime(); - return time + timeZone.getOffset(time); - } - - public static long toLong(Timestamp v) { - return toLong(v, UTC_TZ); - } - - public static long toLong(Object o) { - return o instanceof Long ? (Long) o - : o instanceof Number ? toLong(o) - : o instanceof String ? toLong(o) - : (Long) cannotConvert(o, long.class); - } - - private static Object cannotConvert(Object o, Class toType) { - throw new RuntimeException("Cannot convert " + o + " to " + toType); - } - - public static Timestamp internalToTimestamp(long v) { - return new Timestamp(v - UTC_TZ.getOffset(v)); - } - - public static Timestamp internalToTimestamp(Long v) { - return v == null ? null : internalToTimestamp(v.longValue()); - } - - public static java.sql.Date internalToDate(int v) { - final long t = v * MILLIS_PER_DAY; - return new java.sql.Date(t - UTC_TZ.getOffset(t)); - } - - public static java.sql.Date internalToDate(Integer v) { - return v == null ? null : internalToDate(v.intValue()); - } - - public static java.sql.Time internalToTime(int v) { - return new java.sql.Time(v - UTC_TZ.getOffset(v)); - } - - public static java.sql.Time internalToTime(Integer v) { - return v == null ? null : internalToTime(v.intValue()); - } - - public static int dateStringToUnixDate(String s) { - int hyphen1 = s.indexOf('-'); - int y; - int m; - int d; - if (hyphen1 < 0) { - y = Integer.parseInt(s.trim()); - m = 1; - d = 1; - } else { - y = Integer.parseInt(s.substring(0, hyphen1).trim()); - final int hyphen2 = s.indexOf('-', hyphen1 + 1); - if (hyphen2 < 0) { - m = Integer.parseInt(s.substring(hyphen1 + 1).trim()); - d = 1; - } else { - m = Integer.parseInt(s.substring(hyphen1 + 1, hyphen2).trim()); - d = Integer.parseInt(s.substring(hyphen2 + 1).trim()); - } - } - return ymdToUnixDate(y, m, d); - } - - public static int timeStringToUnixDate(String v) { - return timeStringToUnixDate(v, 0); - } - - public static int timeStringToUnixDate(String v, int start) { - final int colon1 = v.indexOf(':', start); - int hour; - int minute; - int second; - int milli; - if (colon1 < 0) { - hour = Integer.parseInt(v.trim()); - minute = 1; - second = 1; - milli = 0; - } else { - hour = Integer.parseInt(v.substring(start, colon1).trim()); - final int colon2 = v.indexOf(':', colon1 + 1); - if (colon2 < 0) { - minute = Integer.parseInt(v.substring(colon1 + 1).trim()); - second = 1; - milli = 0; - } else { - minute = Integer.parseInt(v.substring(colon1 + 1, colon2).trim()); - int dot = v.indexOf('.', colon2); - if (dot < 0) { - second = Integer.parseInt(v.substring(colon2 + 1).trim()); - milli = 0; - } else { - second = Integer.parseInt(v.substring(colon2 + 1, dot).trim()); - milli = parseFraction(v.substring(dot + 1).trim(), 100); - } - } - } - return hour * (int) MILLIS_PER_HOUR - + minute * (int) MILLIS_PER_MINUTE - + second * (int) MILLIS_PER_SECOND - + milli; - } - - /** Parses a fraction, multiplying the first character by {@code multiplier}, - * the second character by {@code multiplier / 10}, - * the third character by {@code multiplier / 100}, and so forth. - * - *
For example, {@code parseFraction("1234", 100)} yields {@code 123}. */ - private static int parseFraction(String v, int multiplier) { - int r = 0; - for (int i = 0; i < v.length(); i++) { - char c = v.charAt(i); - int x = c < '0' || c > '9' ? 0 : (c - '0'); - r += multiplier * x; - if (multiplier < 10) { - // We're at the last digit. Check for rounding. - if (i + 1 < v.length() - && v.charAt(i + 1) >= '5') { - ++r; - } - break; - } - multiplier /= 10; - } - return r; - } - - public static long timestampStringToUnixDate(String s) { - final long d; - final long t; - s = s.trim(); - int space = s.indexOf(' '); - if (space >= 0) { - d = dateStringToUnixDate(s.substring(0, space)); - t = timeStringToUnixDate(s, space + 1); - } else { - int hyphen = s.indexOf('-'); - if (hyphen >= 0) { - d = dateStringToUnixDate(s); - t = 0; - } else { - // Numeric To Timestamp : Precision is second - d = 0; - t = Integer.parseInt(s) * 1000; - } - } - return d * MILLIS_PER_DAY + t; - } - - public static String unixTimestampToString(long timestamp) { - return unixTimestampToString(timestamp, 0); - } - - public static String unixTimestampToString(long timestamp, int precision) { - final StringBuilder buf = new StringBuilder(17); - int date = (int) (timestamp / MILLIS_PER_DAY); - int time = (int) (timestamp % MILLIS_PER_DAY); - if (time < 0) { - --date; - time += MILLIS_PER_DAY; - } - unixDateToString(buf, date); - buf.append(' '); - unixTimeToString(buf, time, precision); - return buf.toString(); - } - - /** Helper for CAST({timestamp} AS VARCHAR(n)). */ - public static String unixTimeToString(int time) { - return unixTimeToString(time, 0); - } - - public static String unixTimeToString(int time, int precision) { - final StringBuilder buf = new StringBuilder(8); - unixTimeToString(buf, time, precision); - return buf.toString(); - } - - private static void unixTimeToString(StringBuilder buf, int time, int precision) { - while (time < 0) { - time += MILLIS_PER_DAY; - } - int h = time / 3600000; - int time2 = time % 3600000; - int m = time2 / 60000; - int time3 = time2 % 60000; - int s = time3 / 1000; - int ms = time3 % 1000; - int2(buf, h); - buf.append(':'); - int2(buf, m); - buf.append(':'); - int2(buf, s); - if (precision > 0) { - buf.append('.'); - while (precision > 0) { - buf.append((char) ('0' + (ms / 100))); - ms = ms % 100; - ms = ms * 10; - - // keep consistent with Timestamp.toString() - if (ms == 0) { - break; - } - - --precision; - } - } - } - - private static void int2(StringBuilder buf, int i) { - buf.append((char) ('0' + (i / 10) % 10)); - buf.append((char) ('0' + i % 10)); - } - - private static void int4(StringBuilder buf, int i) { - buf.append((char) ('0' + (i / 1000) % 10)); - buf.append((char) ('0' + (i / 100) % 10)); - buf.append((char) ('0' + (i / 10) % 10)); - buf.append((char) ('0' + i % 10)); - } - - public static String unixDateToString(int date) { - final StringBuilder buf = new StringBuilder(10); - unixDateToString(buf, date); - return buf.toString(); - } - - private static void unixDateToString(StringBuilder buf, int date) { - julianToString(buf, date + EPOCH_JULIAN); - } - - private static void julianToString(StringBuilder buf, int julian) { - // Algorithm the book "Astronomical Algorithms" by Jean Meeus, 1998 - int b, c; - if (julian > 2299160) { - int a = julian + 32044; - b = (4 * a + 3) / 146097; - c = a - b * 146097 / 4; - } else { - b = 0; - c = julian + 32082; - } - int d = (4 * c + 3) / 1461; - int e = c - (1461 * d) / 4; - int m = (5 * e + 2) / 153; - int day = e - (153 * m + 2) / 5 + 1; - int month = m + 3 - 12 * (m / 10); - int year = b * 100 + d - 4800 + (m / 10); - - int4(buf, year); - buf.append('-'); - int2(buf, month); - buf.append('-'); - int2(buf, day); - } - - public static int ymdToUnixDate(int year, int month, int day) { - final int julian = ymdToJulian(year, month, day); - return julian - EPOCH_JULIAN; - } - - public static int ymdToJulian(int year, int month, int day) { - int a = (14 - month) / 12; - int y = year + 4800 - a; - int m = month + 12 * a - 3; - int j = day + (153 * m + 2) / 5 - + 365 * y - + y / 4 - - y / 100 - + y / 400 - - 32045; - if (j < 2299161) { - j = day + (153 * m + 2) / 5 + 365 * y + y / 4 - 32083; - } - return j; - } - - public static String unixDateTimeToString(Object o) { - if (o instanceof java.sql.Date) { - return unixDateToString(toInt((java.sql.Date) o)); - } else if (o instanceof java.sql.Time) { - return unixTimeToString(toInt((java.sql.Time) o)); - } else if (o instanceof Timestamp) { - return unixTimestampToString(toLong((Timestamp) o), 3); - } else { - return o.toString(); - } - } - - public static String unixDateTimeToString(Object o, TimeZone tz) { - int offset = tz.getOffset(Calendar.ZONE_OFFSET); - - if (o instanceof java.sql.Date) { - return unixDateToString(toInt((java.sql.Date) o) + offset); - } else if (o instanceof java.sql.Time) { - return unixTimeToString((toInt((java.sql.Time) o) + offset) % (int) MILLIS_PER_DAY); - } else if (o instanceof java.sql.Timestamp) { - return unixTimestampToString(toLong((java.sql.Timestamp) o) + offset, 3); - } else { - return o.toString(); - } - } -} - diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/util/StateUtil.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/util/StateUtil.java deleted file mode 100644 index 8b0536133cf5c2..00000000000000 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/util/StateUtil.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.util; - -import org.apache.flink.runtime.state.StateBackend; -import org.apache.flink.runtime.state.filesystem.FsStateBackend; -import org.apache.flink.runtime.state.memory.MemoryStateBackend; - -/** - * An Utility about state. - */ -public class StateUtil { - - private StateUtil() { - // Deprecate default constructor - } - - public static final String STATE_BACKEND_ON_HEAP = "statebackend.onheap"; - - /** - * Message to indicate the state is cleared because of ttl restriction. The messgae could be - * used to output to log. - */ - public static final String STATE_CLEARED_WARN_MSG = "The state is cleared because of state ttl. " + - "This will result in incorrect result. " + - "You can increase the state ttl to avoid this."; - - /** - * Returns true when the statebackend is on heap. - */ - public static boolean isHeapState(StateBackend stateBackend) { - return stateBackend == null || - stateBackend instanceof MemoryStateBackend || - stateBackend instanceof FsStateBackend; - } -} diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala index 05869766b79285..8991aef374634b 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala @@ -117,17 +117,16 @@ class BatchTableEnvironment( * @tparam T The expected type of the [[DataStream]] which represents the [[Table]]. */ override private[table] def writeToSink[T]( - table: Table, - sink: TableSink[T], - sinkName: String): Unit = { + table: Table, + sink: TableSink[T], + sinkName: String): Unit = { mergeParameters() val sinkNode = LogicalSink.create(table.asInstanceOf[TableImpl].getRelNode, sink, sinkName) val optimizedPlan = optimize(sinkNode) val optimizedNodes = translateNodeDag(Seq(optimizedPlan)) - translateNodeDag(Seq(optimizedPlan)) require(optimizedNodes.size() == 1) - translate(optimizedNodes.head) + translateToPlan(optimizedNodes.head) } /** @@ -137,7 +136,7 @@ class BatchTableEnvironment( * @param node The plan to translate. * @return The [[StreamTransformation]] that corresponds to the given node. */ - private def translate(node: ExecNode[_, _]): StreamTransformation[_] = { + private def translateToPlan(node: ExecNode[_, _]): StreamTransformation[_] = { node match { case node: BatchExecNode[_] => node.translateToPlan(this) case _ => diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala index 1a0b67f89a73a2..c999ab94cad904 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala @@ -20,6 +20,8 @@ package org.apache.flink.table.api import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.configuration.Configuration +import org.apache.flink.runtime.state.filesystem.FsStateBackend +import org.apache.flink.runtime.state.memory.MemoryStateBackend import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment @@ -36,7 +38,6 @@ import org.apache.flink.table.plan.util.FlinkRelOptUtil import org.apache.flink.table.sinks.{DataStreamTableSink, TableSink} import org.apache.flink.table.sources.{StreamTableSource, TableSource} import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo -import org.apache.flink.table.util.StateUtil import org.apache.calcite.plan.ConventionTraitDef import org.apache.calcite.rel.RelCollationTraitDef @@ -119,9 +120,13 @@ abstract class StreamTableEnvironment( kv => parameters.setString(kv._1, kv._2) } } - parameters.setBoolean( - StateUtil.STATE_BACKEND_ON_HEAP, - StateUtil.isHeapState(execEnv.getStateBackend)) + val isHeapState = Option(execEnv.getStateBackend) match { + case Some(backend) if backend.isInstanceOf[MemoryStateBackend] || + backend.isInstanceOf[FsStateBackend]=> true + case None => true + case _ => false + } + parameters.setBoolean(TableConfigOptions.SQL_EXEC_STATE_BACKEND_ON_HEAP, isHeapState) execEnv.getConfig.setGlobalJobParameters(parameters) isConfigMerged = true } @@ -176,9 +181,8 @@ abstract class StreamTableEnvironment( val optimizedPlan = optimize(sink) val optimizedNodes = translateNodeDag(Seq(optimizedPlan)) - translateNodeDag(Seq(optimizedPlan)) require(optimizedNodes.size() == 1) - translate(optimizedNodes.head) + translateToPlan(optimizedNodes.head) } /** @@ -187,7 +191,7 @@ abstract class StreamTableEnvironment( * @param node The plan to translate. * @return The [[StreamTransformation]] of type [[BaseRow]]. */ - private def translate(node: ExecNode[_, _]): StreamTransformation[_] = { + private def translateToPlan(node: ExecNode[_, _]): StreamTransformation[_] = { node match { case node: StreamExecNode[_] => node.translateToPlan(this) case _ => diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala index 07a84ead010b0b..82fe8e49608921 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala @@ -163,9 +163,10 @@ abstract class TableEnvironment(val config: TableConfig) { * @tparam T The data type that the [[TableSink]] expects. */ private[table] def writeToSink[T]( - table: Table, - sink: TableSink[T], - sinkName: String = null): Unit + table: Table, + sink: TableSink[T], + sinkName: String = null): Unit + /** * Generates the optimized [[RelNode]] dag from the original relational nodes. * diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala index 19e2422c311a9f..81fea7d71bb296 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala @@ -18,11 +18,9 @@ package org.apache.flink.table.calcite -import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.table.`type`._ import org.apache.flink.table.api.{TableException, TableSchema} import org.apache.flink.table.plan.schema.{GenericRelDataType, _} -import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo -import org.apache.flink.table.`type`._ import org.apache.calcite.avatica.util.TimeUnit import org.apache.calcite.jdbc.JavaTypeFactoryImpl @@ -363,11 +361,6 @@ object FlinkTypeFactory { case _ => false } - def isRowtimeIndicatorType(typeInfo: TypeInformation[_]): Boolean = typeInfo match { - case ti: TimeIndicatorTypeInfo if ti.isEventTime => true - case _ => false - } - def toInternalType(relDataType: RelDataType): InternalType = relDataType.getSqlTypeName match { case BOOLEAN => InternalTypes.BOOLEAN case TINYINT => InternalTypes.BYTE diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGeneratorContext.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGeneratorContext.scala index f588e06c026c3e..f611aec28647fa 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGeneratorContext.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGeneratorContext.scala @@ -765,9 +765,4 @@ object CodeGeneratorContext { def apply(config: TableConfig): CodeGeneratorContext = { new CodeGeneratorContext(config) } - - val DEFAULT_OUT_RECORD_TERM = "out" - - val DEFAULT_OUT_RECORD_WRITER_TERM = "outWriter" - } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/InputFormatCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/InputFormatCodeGenerator.scala index d1758a315bbab9..7a41f13a38614c 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/InputFormatCodeGenerator.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/InputFormatCodeGenerator.scala @@ -42,13 +42,13 @@ object InputFormatCodeGenerator { * @return instance of GeneratedFunction */ def generateValuesInputFormat[T]( - ctx: CodeGeneratorContext, - name: String, - records: Seq[String], - returnType: InternalType, - outRecordTerm: String = CodeGeneratorContext.DEFAULT_OUT_RECORD_TERM, - outRecordWriterTerm: String = CodeGeneratorContext.DEFAULT_OUT_RECORD_WRITER_TERM) - : GeneratedInput[GenericInputFormat[T]] = { + ctx: CodeGeneratorContext, + name: String, + records: Seq[String], + returnType: InternalType, + outRecordTerm: String = CodeGenUtils.DEFAULT_OUT_RECORD_TERM, + outRecordWriterTerm: String = CodeGenUtils.DEFAULT_OUT_RECORD_WRITER_TERM) + : GeneratedInput[GenericInputFormat[T]] = { val funcName = newName(name) ctx.addReusableOutputRecord(returnType, classOf[GenericRow], outRecordTerm, @@ -61,7 +61,7 @@ object InputFormatCodeGenerator { ${ctx.reuseMemberCode()} - public $funcName() throws Exception { + public $funcName(Object[] references) throws Exception { ${ctx.reuseInitCode()} } @@ -74,12 +74,12 @@ object InputFormatCodeGenerator { public Object nextRecord(Object reuse) { switch (nextIdx) { ${records.zipWithIndex.map { case (r, i) => - s""" - |case $i: - | $r - |break; + s""" + |case $i: + | $r + |break; """.stripMargin - }.mkString("\n")} + }.mkString("\n")} } nextIdx++; return $outRecordTerm; @@ -87,7 +87,7 @@ object InputFormatCodeGenerator { } """.stripMargin - new GeneratedInput(funcName, funcCode) + new GeneratedInput(funcName, funcCode, ctx.references.toArray) } } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/ValuesCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/ValuesCodeGenerator.scala index 79842a077c83d6..2da8d8a7e02e0c 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/ValuesCodeGenerator.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/ValuesCodeGenerator.scala @@ -21,8 +21,8 @@ package org.apache.flink.table.codegen import org.apache.flink.table.api.TableEnvironment import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.dataformat.{BaseRow, GenericRow} -import org.apache.flink.table.generated.ValuesInputFormat -import org.apache.flink.table.`type`.TypeConverters +import org.apache.flink.table.runtime.ValuesInputFormat +import org.apache.flink.table.typeutils.BaseRowTypeInfo import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rex.RexLiteral @@ -56,9 +56,8 @@ object ValuesCodeGenerator { generatedRecords.map(_.code), outputType) - new ValuesInputFormat( - generatedFunction, - TypeConverters.toBaseRowTypeInfo(outputType)) + val baseRowTypeInfo = new BaseRowTypeInfo(outputType.getFieldTypes, outputType.getFieldNames) + new ValuesInputFormat(generatedFunction, baseRowTypeInfo) } } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSink.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSink.scala index 9604a94c26c306..4b5ed084859243 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSink.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSink.scala @@ -53,17 +53,8 @@ class BatchExecSink[T]( new BatchExecSink(cluster, traitSet, inputs.get(0), sink, sinkName) } - /** - * Returns [[DamBehavior]] of this node. - */ override def getDamBehavior: DamBehavior = DamBehavior.PIPELINED - /** - * Returns an array of this node's inputs. If there are no inputs, - * returns an empty list, not null. - * - * @return Array of this node's inputs - */ override def getInputNodes: util.List[ExecNode[BatchTableEnvironment, _]] = { List(getInput.asInstanceOf[ExecNode[BatchTableEnvironment, _]]) } @@ -74,31 +65,27 @@ class BatchExecSink[T]( * @param tableEnv The [[BatchTableEnvironment]] of the translated Table. */ override protected def translateToPlanInternal( - tableEnv: BatchTableEnvironment): StreamTransformation[Any] = { + tableEnv: BatchTableEnvironment): StreamTransformation[Any] = { val convertTransformation = sink match { case _: BatchTableSink[T] => - translate(withChangeFlag = false, tableEnv) + val transformation = translateToStreamTransformation(withChangeFlag = false, tableEnv) + val stream = new DataStream(tableEnv.streamEnv, transformation) + emitBoundedStreamSink(stream, tableEnv).getTransformation case streamTableSink: DataStreamTableSink[T] => - translate(withChangeFlag = streamTableSink.withChangeFlag, tableEnv) + // In case of table to bounded stream through BatchTableEnvironment#toBoundedStream, we insert a + // DataStreamTableSink then wrap it as a LogicalSink, there is no real batch table sink, so + // we do not need to invoke TableSink#emitBoundedStream and set resource, just a translation to + // StreamTransformation is ok. + translateToStreamTransformation(withChangeFlag = streamTableSink.withChangeFlag, tableEnv) case _ => throw new TableException("Only Support BatchTableSink or BatchCompatibleStreamTableSink") } - // In case of table to bounded stream through BatchTableEnvironment#toBoundedStream, we insert a - // DataStreamTableSink then wrap it as a LogicalSink, there is no real batch table sink, so - // we do not need to invoke TableSink#emitBoundedStream and set resource, just a translation to - // StreamTransformation is ok. - val resultTransformation = if (sink.isInstanceOf[DataStreamTableSink[T]]) { - convertTransformation - } else { - val stream = new DataStream(tableEnv.streamEnv, convertTransformation) - emitBoundedStreamSink(stream, tableEnv).getTransformation - } - resultTransformation.asInstanceOf[StreamTransformation[Any]] + convertTransformation.asInstanceOf[StreamTransformation[Any]] } private def emitBoundedStreamSink( - boundedStream: DataStream[T], - tableEnv: BatchTableEnvironment): DataStreamSink[_] = { + boundedStream: DataStream[T], + tableEnv: BatchTableEnvironment): DataStreamSink[_] = { val config = tableEnv.getConfig sink match { case sinkBatch: BatchTableSink[T] => @@ -115,9 +102,9 @@ class BatchExecSink[T]( * * @return The [[StreamTransformation]] that corresponds to the translated [[Table]]. */ - private def translate( - withChangeFlag: Boolean, - tableEnv: BatchTableEnvironment): StreamTransformation[T] = { + private def translateToStreamTransformation( + withChangeFlag: Boolean, + tableEnv: BatchTableEnvironment): StreamTransformation[T] = { val resultType = sink.getOutputType TableEnvironment.validateType(resultType) val inputNode = getInputNodes.get(0) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecSink.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecSink.scala index 58061124480fa5..fcb005da629d07 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecSink.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecSink.scala @@ -55,7 +55,7 @@ class StreamExecSink[T]( override def producesUpdates: Boolean = false override def needsUpdatesAsRetraction(input: RelNode): Boolean = - sink.isInstanceOf[BaseRetractStreamTableSink[_]] + sink.isInstanceOf[RetractStreamTableSink[_]] override def consumesRetractions: Boolean = false @@ -67,12 +67,6 @@ class StreamExecSink[T]( new StreamExecSink(cluster, traitSet, inputs.get(0), sink, sinkName) } - /** - * Returns an array of this node's inputs. If there are no inputs, - * returns an empty list, not null. - * - * @return Array of this node's inputs - */ override def getInputNodes: util.List[ExecNode[StreamTableEnvironment, _]] = { List(getInput.asInstanceOf[ExecNode[StreamTableEnvironment, _]]) } @@ -83,19 +77,19 @@ class StreamExecSink[T]( * @param tableEnv The [[StreamTransformation]] of the translated Table. */ override protected def translateToPlanInternal( - tableEnv: StreamTableEnvironment): StreamTransformation[Any] = { + tableEnv: StreamTableEnvironment): StreamTransformation[Any] = { val convertTransformation = sink match { - case _: BaseRetractStreamTableSink[T] => + case _: RetractStreamTableSink[T] => // translate the Table into a DataStream and provide the type that the TableSink expects. - translate(withChangeFlag = true, tableEnv) + translateToStreamTransformation(withChangeFlag = true, tableEnv) - case upsertSink: BaseUpsertStreamTableSink[T] => + case upsertSink: UpsertStreamTableSink[T] => // check for append only table val isAppendOnlyTable = UpdatingPlanChecker.isAppendOnly(this) upsertSink.setIsAppendOnly(isAppendOnlyTable) // translate the Table into a DataStream and provide the type that the TableSink expects. - translate(withChangeFlag = true, tableEnv) + translateToStreamTransformation(withChangeFlag = true, tableEnv) case _: AppendStreamTableSink[T] => // verify table is an insert-only (append-only) table @@ -111,20 +105,21 @@ class StreamExecSink[T]( } // translate the Table into a DataStream and provide the type that the TableSink expects. - translate(withChangeFlag = false, tableEnv) + translateToStreamTransformation(withChangeFlag = false, tableEnv) case s: DataStreamTableSink[_] => - translate(s.withChangeFlag, tableEnv) + // In case of table to stream through BatchTableEnvironment#translateToDataStream, + // we insert a DataStreamTableSink then wrap it as a LogicalSink, there is no real batch + // table sink, so we do not need to invoke TableSink#emitBoundedStream and set resource, + // just a translation to StreamTransformation is ok. + return translateToStreamTransformation(s.withChangeFlag, tableEnv) + .asInstanceOf[StreamTransformation[Any]] case _ => throw new TableException("Stream Tables can only be emitted by AppendStreamTableSink, " + "RetractStreamTableSink, or UpsertStreamTableSink.") } - val resultTransformation = if (sink.isInstanceOf[DataStreamTableSink[T]]) { - convertTransformation - } else { - val stream = new DataStream(tableEnv.execEnv, convertTransformation) - emitDataStream(tableEnv.getConfig.getConf, stream).getTransformation - } + val stream = new DataStream(tableEnv.execEnv, convertTransformation) + val resultTransformation = emitDataStream(tableEnv.getConfig.getConf, stream).getTransformation resultTransformation.asInstanceOf[StreamTransformation[Any]] } @@ -134,9 +129,9 @@ class StreamExecSink[T]( * @param withChangeFlag Set to true to emit records with change flags. * @return The [[StreamTransformation]] that corresponds to the translated [[Table]]. */ - private def translate( - withChangeFlag: Boolean, - tableEnv: StreamTableEnvironment): StreamTransformation[T] = { + private def translateToStreamTransformation( + withChangeFlag: Boolean, + tableEnv: StreamTableEnvironment): StreamTransformation[T] = { val inputNode = getInput val resultType = sink.getOutputType // if no change flags are requested, verify table is an insert-only (append-only) table. @@ -186,15 +181,15 @@ class StreamExecSink[T]( * @param dataStream The [[DataStream]] to emit. */ private def emitDataStream( - tableConf: Configuration, - dataStream: DataStream[T]) : DataStreamSink[_] = { + tableConf: Configuration, + dataStream: DataStream[T]) : DataStreamSink[_] = { sink match { - case retractSink: BaseRetractStreamTableSink[T] => + case retractSink: RetractStreamTableSink[T] => // Give the DataStream to the TableSink to emit it. retractSink.emitDataStream(dataStream) - case upsertSink: BaseUpsertStreamTableSink[T] => + case upsertSink: UpsertStreamTableSink[T] => // Give the DataStream to the TableSink to emit it. upsertSink.emitDataStream(dataStream) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/Optimizer.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/Optimizer.scala index e54c9d2c6d7e0d..97ff92b5dff67c 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/Optimizer.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/Optimizer.scala @@ -31,7 +31,7 @@ trait Optimizer { *
NOTES: *
1. The reused node in result DAG will be converted to the same RelNode. *
2. If a root node requires retract changes on Stream, the node should be
- * a [[org.apache.flink.table.sinks.BaseRetractStreamTableSink]] or
+ * a [[org.apache.flink.table.sinks.RetractStreamTableSink]] or
* a regular node with [[org.apache.flink.table.plan.trait.UpdateAsRetractionTrait]]
* which `updateAsRetraction` is true.
*
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/BaseRetractStreamTableSink.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/RetractStreamTableSink.scala
similarity index 95%
rename from flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/BaseRetractStreamTableSink.scala
rename to flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/RetractStreamTableSink.scala
index 82b95901c72725..b7f0ae64698fdd 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/BaseRetractStreamTableSink.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/RetractStreamTableSink.scala
@@ -27,7 +27,7 @@ import org.apache.flink.table.api.Table
*
* @tparam T Type of records that this [[TableSink]] expects and supports.
*/
-trait BaseRetractStreamTableSink[T] extends StreamTableSink[T] {
+trait RetractStreamTableSink[T] extends StreamTableSink[T] {
/** Emits the DataStream. */
def emitDataStream(dataStream: DataStream[T]): DataStreamSink[_]
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/BaseUpsertStreamTableSink.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/UpsertStreamTableSink.scala
similarity index 93%
rename from flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/BaseUpsertStreamTableSink.scala
rename to flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/UpsertStreamTableSink.scala
index c0d6385069f68a..49ffbd05d0c3f1 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/BaseUpsertStreamTableSink.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/UpsertStreamTableSink.scala
@@ -30,7 +30,7 @@ import java.lang.{Boolean => JBool}
* If the [[Table]] does not have a unique key and is not append-only, a
* [[org.apache.flink.table.api.TableException]] will be thrown.
*
- * The unique key of the table is configured by the [[BaseUpsertStreamTableSink#setKeyFields()]]
+ * The unique key of the table is configured by the [[UpsertStreamTableSink#setKeyFields()]]
* method.
*
* If the table is append-only, all messages will have a true flag and must be interpreted
@@ -38,7 +38,7 @@ import java.lang.{Boolean => JBool}
*
* @tparam T Type of records that this [[TableSink]] expects and supports.
*/
-trait BaseUpsertStreamTableSink[T] extends StreamTableSink[T] {
+trait UpsertStreamTableSink[T] extends StreamTableSink[T] {
/**
* Configures the unique key fields of the [[Table]] to write.
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/TestSinkUtil.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/BaseRowUtil.scala
similarity index 62%
rename from flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/TestSinkUtil.scala
rename to flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/BaseRowUtil.scala
index ed97ae15f2e54b..ab1f9a0d91a811 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/TestSinkUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/BaseRowUtil.scala
@@ -19,20 +19,20 @@
package org.apache.flink.table.runtime.utils
import org.apache.flink.api.common.ExecutionConfig
-import org.apache.flink.table.dataformat.{BaseRow, GenericRow}
-import org.apache.flink.table.dataformat.util.BaseRowUtil
+import org.apache.flink.table.dataformat.{BaseRow, GenericRow, TypeGetterSetters}
import org.apache.flink.table.typeutils.BaseRowTypeInfo
-import org.apache.flink.util.{StringUtils, TimeConvertUtils}
+import org.apache.flink.util.StringUtils
import java.sql.{Date, Time, Timestamp}
import java.util.TimeZone
-object TestSinkUtil {
+object BaseRowUtil {
def fieldToString(field: Any, tz: TimeZone): String = {
field match {
case _: Date | _: Time | _: Timestamp =>
- TimeConvertUtils.unixDateTimeToString(field, tz)
+ // Support after FLINK-11898 is merged
+ throw new UnsupportedOperationException
case _ => StringUtils.arrayAwareToString(field)
}
}
@@ -52,15 +52,32 @@ object TestSinkUtil {
}
def baseRowToString(
- value: BaseRow,
- rowTypeInfo: BaseRowTypeInfo,
- tz: TimeZone,
- withHeader: Boolean = true): String = {
+ value: BaseRow,
+ rowTypeInfo: BaseRowTypeInfo,
+ tz: TimeZone,
+ withHeader: Boolean = true): String = {
val config = new ExecutionConfig
- val fieldTypes = rowTypeInfo.getFieldTypes
- val fieldSerializers = fieldTypes.map(_.createSerializer(config))
- val genericRow = BaseRowUtil.toGenericRow(value, fieldTypes, fieldSerializers)
+ val genericRow = RowUtil.toGenericRow(value, rowTypeInfo)
genericRowToString(genericRow, tz, withHeader)
}
+ def toGenericRow(baseRow: BaseRow, baseRowTypeInfo: BaseRowTypeInfo): GenericRow = {
+ baseRow match {
+ case genericRow: GenericRow => genericRow
+ case _ =>
+ val fieldNum = baseRow.getArity
+ val row: GenericRow = new GenericRow(fieldNum)
+ row.setHeader(baseRow.getHeader)
+ val types = baseRowTypeInfo.getInternalTypes
+ for (i <- 0 until fieldNum) {
+ if (baseRow.isNullAt(i)) {
+ row.setField(i, null)
+ } else {
+ row.setField(i, TypeGetterSetters.get(baseRow, i, types(i)))
+ }
+ }
+ row
+ }
+ }
+
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/BatchTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/BatchTestBase.scala
index 12a6016a978083..7d1ba9822c55cc 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/BatchTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/BatchTestBase.scala
@@ -68,7 +68,7 @@ class BatchTestBase {
val accResult: JArrayList[Array[Byte]] = res.getAccumulatorResult(id)
val datas: Seq[BaseRow] = SerializedListAccumulator.deserializeList(accResult, typeSerializer)
val tz = TimeZone.getTimeZone("UTC")
- datas.map(TestSinkUtil.baseRowToString(_, outType, tz, false))
+ datas.map(BaseRowUtil.baseRowToString(_, outType, tz, false))
}
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamTestSink.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamTestSink.scala
index f3b9601c1c2f7a..811ba557d848c1 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamTestSink.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamTestSink.scala
@@ -125,7 +125,7 @@ final class TestingAppendBaseRowSink(
}
def invoke(value: BaseRow): Unit = localResults +=
- TestSinkUtil.baseRowToString(value, rowTypeInfo, tz)
+ BaseRowUtil.baseRowToString(value, rowTypeInfo, tz)
def getAppendResults: List[String] = getResults
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/TableConfigOptions.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/TableConfigOptions.java
index 887f61b45b3459..5c5909fa13e44d 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/TableConfigOptions.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/TableConfigOptions.java
@@ -84,4 +84,13 @@ public class TableConfigOptions {
.defaultValue(Long.MIN_VALUE)
.withDescription("MiniBatch allow latency(ms). Value > 0 means MiniBatch enabled.");
+ // ------------------------------------------------------------------------
+ // STATE BACKEND Options
+ // ------------------------------------------------------------------------
+
+ public static final ConfigOption