Skip to content

Commit

Permalink
[FLINK-12844][table-planner-blink] Use default conversion class Local…
Browse files Browse the repository at this point in the history
…Date/LocalTime/LocalDateTime for DateType/TimeType/TimestampType in blink
  • Loading branch information
JingsongLi committed Jul 5, 2019
1 parent bcc994b commit ca31b33
Show file tree
Hide file tree
Showing 54 changed files with 1,495 additions and 938 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.LocalTimeTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInfo;
Expand Down Expand Up @@ -1713,6 +1714,12 @@ private <OUT,IN1,IN2> TypeInformation<OUT> privateGetForClass(Class<OUT> clazz,
return timeTypeInfo;
}

// check for SQL time types
TypeInformation<OUT> localTimeTypeInfo = LocalTimeTypeInfo.getInfoFor(clazz);
if (localTimeTypeInfo != null) {
return localTimeTypeInfo;
}

// check for subclasses of Value
if (Value.class.isAssignableFrom(clazz)) {
Class<? extends Value> valueClass = clazz.asSubclass(Value.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand All @@ -42,6 +45,7 @@
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.LocalTimeTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
Expand Down Expand Up @@ -2085,4 +2089,39 @@ public Tuple3<Date, Time, Timestamp> map(Tuple3<Date, Time, Timestamp> value) th
Assert.assertEquals(SqlTimeTypeInfo.TIME, TypeExtractor.getForObject(Time.valueOf("12:37:45")));
Assert.assertEquals(SqlTimeTypeInfo.TIMESTAMP, TypeExtractor.getForObject(Timestamp.valueOf("1998-12-12 12:37:45")));
}

@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void testLocalTimeTypes() {
MapFunction<?, ?> function = new MapFunction<Tuple3<LocalDate, LocalTime, LocalDateTime>, Tuple3<LocalDate, LocalTime, LocalDateTime>>() {
@Override
public Tuple3<LocalDate, LocalTime, LocalDateTime> map(Tuple3<LocalDate, LocalTime, LocalDateTime> value) throws Exception {
return null;
}
};

TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(
function,
(TypeInformation) TypeInformation.of(new TypeHint<Tuple3<LocalDate, LocalTime, LocalDateTime>>() {
}));

Assert.assertTrue(ti.isTupleType());
TupleTypeInfo<?> tti = (TupleTypeInfo<?>) ti;
Assert.assertEquals(LocalTimeTypeInfo.LOCAL_DATE, tti.getTypeAt(0));
Assert.assertEquals(LocalTimeTypeInfo.LOCAL_TIME, tti.getTypeAt(1));
Assert.assertEquals(LocalTimeTypeInfo.LOCAL_DATE_TIME, tti.getTypeAt(2));

// use getForClass()
Assert.assertEquals(tti.getTypeAt(0), TypeExtractor.getForClass(LocalDate.class));
Assert.assertEquals(tti.getTypeAt(1), TypeExtractor.getForClass(LocalTime.class));
Assert.assertEquals(tti.getTypeAt(2), TypeExtractor.getForClass(LocalDateTime.class));

// use getForObject()
Assert.assertEquals(LocalTimeTypeInfo.LOCAL_DATE,
TypeExtractor.getForObject(LocalDate.of(1998, 12, 12)));
Assert.assertEquals(LocalTimeTypeInfo.LOCAL_TIME,
TypeExtractor.getForObject(LocalTime.of(12, 37, 45)));
Assert.assertEquals(LocalTimeTypeInfo.LOCAL_DATE_TIME,
TypeExtractor.getForObject(LocalDateTime.of(1998, 12, 12, 12, 37, 45)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,27 @@ public static TypeInformation<java.sql.Timestamp> SQL_TIMESTAMP() {
return org.apache.flink.api.common.typeinfo.Types.SQL_TIMESTAMP;
}

/**
* Returns type information for a Table API LocalDate type.
*/
public static TypeInformation<java.time.LocalDate> LOCAL_DATE() {
return org.apache.flink.api.common.typeinfo.Types.LOCAL_DATE;
}

/**
* Returns type information for a Table API LocalTime type.
*/
public static TypeInformation<java.time.LocalTime> LOCAL_TIME() {
return org.apache.flink.api.common.typeinfo.Types.LOCAL_TIME;
}

/**
* Returns type information for a Table API LocalDateTime type.
*/
public static TypeInformation<java.time.LocalDateTime> LOCAL_DATE_TIME() {
return org.apache.flink.api.common.typeinfo.Types.LOCAL_DATE_TIME;
}

/**
* Returns type information for a Table API interval of months.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.LocalTimeTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
Expand Down Expand Up @@ -48,7 +49,9 @@
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;

import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.IntStream;
Expand Down Expand Up @@ -99,6 +102,9 @@ public final class LegacyTypeInfoDataTypeConverter {
addMapping(Types.FLOAT, DataTypes.FLOAT().bridgedTo(Float.class));
addMapping(Types.DOUBLE, DataTypes.DOUBLE().bridgedTo(Double.class));
addMapping(Types.BIG_DEC, createLegacyType(LogicalTypeRoot.DECIMAL, Types.BIG_DEC));
addMapping(LocalTimeTypeInfo.LOCAL_DATE, DataTypes.DATE().bridgedTo(LocalDate.class));
addMapping(LocalTimeTypeInfo.LOCAL_TIME, DataTypes.TIME(0).bridgedTo(LocalTime.class));
addMapping(LocalTimeTypeInfo.LOCAL_DATE_TIME, DataTypes.TIMESTAMP(3).bridgedTo(LocalDateTime.class));
addMapping(Types.SQL_DATE, DataTypes.DATE().bridgedTo(java.sql.Date.class));
addMapping(Types.SQL_TIME, DataTypes.TIME(0).bridgedTo(java.sql.Time.class));
addMapping(Types.SQL_TIMESTAMP, DataTypes.TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import org.apache.flink.api.java.typeutils.{MapTypeInfo, MultisetTypeInfo, Objec
import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
import org.apache.flink.types.Row

import _root_.java.{lang, math, sql, util}
import _root_.java.{lang, math, sql, time, util}

import _root_.scala.annotation.varargs

Expand Down Expand Up @@ -92,6 +92,22 @@ object Types {
*/
val SQL_TIMESTAMP: TypeInformation[sql.Timestamp] = JTypes.SQL_TIMESTAMP

/**
* Returns type information for a Table API LocalDate type.
*/
val LOCAL_DATE: TypeInformation[time.LocalDate] = JTypes.LOCAL_DATE

/**
* Returns type information for a Table API LocalTime type.
*/
val LOCAL_TIME: TypeInformation[time.LocalTime] = JTypes.LOCAL_TIME

/**
* Returns type information for a Table API LocalDateTime type.
*/
val LOCAL_DATE_TIME: TypeInformation[time.LocalDateTime] = JTypes.LOCAL_DATE_TIME


/**
* Returns type information for a Table API interval of months.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.table.api.TableConfig
import org.apache.flink.table.codegen.CodeGenUtils._
import org.apache.flink.table.codegen.GenerateUtils.generateRecordStatement
import org.apache.flink.table.dataformat.GenericRow
import org.apache.flink.table.dataformat.{DataFormatConverters, GenericRow}
import org.apache.flink.table.functions.{FunctionContext, UserDefinedFunction}
import org.apache.flink.table.runtime.TableStreamOperator
import org.apache.flink.table.runtime.util.collections._
Expand Down Expand Up @@ -433,22 +433,21 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
}

/**
* Adds a reusable local timestamp to the beginning of the SAM of the generated class.
*/
def addReusableLocalTimestamp(): String = {
addReusableTimestamp()
}

/**
* Adds a reusable time to the beginning of the SAM of the generated class.
* Adds a reusable time to the beginning of the SAM of the generated [[Function]].
*/
def addReusableTime(): String = {
val fieldTerm = s"time"

val timestamp = addReusableTimestamp()

// declaration
reusableMemberStatements.add(s"private int $fieldTerm;")

// assignment
// adopted from org.apache.calcite.runtime.SqlFunctions.currentTime()
val field =
s"""
|final int $fieldTerm = (int) ($timestamp % ${DateTimeUtils.MILLIS_PER_DAY});
|$fieldTerm = (int) ($timestamp % ${DateTimeUtils.MILLIS_PER_DAY});
|if (time < 0) {
| time += ${DateTimeUtils.MILLIS_PER_DAY};
|}
Expand All @@ -457,19 +456,44 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
fieldTerm
}

/**
* Adds a reusable local date time to the beginning of the SAM of the generated class.
*/
def addReusableLocalDateTime(): String = {
val fieldTerm = s"localtimestamp"

val timeZone = addReusableTimeZone()
val timestamp = addReusableTimestamp()

// declaration
reusableMemberStatements.add(s"private long $fieldTerm;")

// assignment
val field =
s"""
|$fieldTerm = $timestamp + $timeZone.getOffset($timestamp);
|""".stripMargin
reusablePerRecordStatements.add(field)
fieldTerm
}

/**
* Adds a reusable local time to the beginning of the SAM of the generated class.
*/
def addReusableLocalTime(): String = {
val fieldTerm = s"localtime"
val timeZone = addReusableTimeZone()
val localtimestamp = addReusableLocalTimestamp()

val localtimestamp = addReusableLocalDateTime()

// declaration
reusableMemberStatements.add(s"private int $fieldTerm;")

// assignment
// adopted from org.apache.calcite.runtime.SqlFunctions.localTime()
val field =
s"""
|final int $fieldTerm = (int) ( ($localtimestamp + $timeZone.getOffset($localtimestamp))
| % ${DateTimeUtils.MILLIS_PER_DAY});
|""".stripMargin
s"""
|$fieldTerm = (int) ($localtimestamp % ${DateTimeUtils.MILLIS_PER_DAY});
|""".stripMargin
reusablePerRecordStatements.add(field)
fieldTerm
}
Expand All @@ -479,15 +503,18 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
*/
def addReusableDate(): String = {
val fieldTerm = s"date"

val timestamp = addReusableTimestamp()
val time = addReusableTime()
val timeZone = addReusableTimeZone()

// declaration
reusableMemberStatements.add(s"private int $fieldTerm;")

// assignment
// adopted from org.apache.calcite.runtime.SqlFunctions.currentDate()
val field =
s"""
|final int $fieldTerm = (int) (($timestamp + $timeZone.getOffset($timestamp))
| / ${DateTimeUtils.MILLIS_PER_DAY});
|$fieldTerm = (int) ($timestamp / ${DateTimeUtils.MILLIS_PER_DAY});
|if ($time < 0) {
| $fieldTerm -= 1;
|}
Expand All @@ -500,7 +527,7 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
* Adds a reusable TimeZone to the member area of the generated class.
*/
def addReusableTimeZone(): String = {
val zoneID = tableConfig.getTimeZone.getID
val zoneID = DataFormatConverters.DEFAULT_TIME_ZONE.getID
val stmt =
s"""private static final java.util.TimeZone $DEFAULT_TIMEZONE_TERM =
| java.util.TimeZone.getTimeZone("$zoneID");""".stripMargin
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -369,14 +369,8 @@ object GenerateUtils {
generateNonNullLiteral(literalType, literalValue.toString, literalValue)

case TIMESTAMP_WITHOUT_TIME_ZONE =>
// Hack
// Currently, in RexLiteral/SqlLiteral(Calcite), TimestampString has no time zone.
// TimeString, DateString TimestampString are treated as UTC time/(unix time)
// when they are converted/formatted/validated
// Here, we adjust millis before Calcite solve TimeZone perfectly
val millis = literalValue.asInstanceOf[Long]
val adjustedValue = millis - ctx.tableConfig.getTimeZone.getOffset(millis)
generateNonNullLiteral(literalType, adjustedValue.toString + "L", adjustedValue)
generateNonNullLiteral(literalType, millis + "L", millis)

case INTERVAL_YEAR_MONTH =>
val decimal = BigDecimal(literalValue.asInstanceOf[JBigDecimal])
Expand Down

0 comments on commit ca31b33

Please sign in to comment.