Skip to content

Commit

Permalink
Fix wuchong's comments
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi committed Jul 8, 2019
1 parent e2abcce commit 5f0e633
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 142 deletions.
Expand Up @@ -20,7 +20,6 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.LocalTimeTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
Expand Down Expand Up @@ -102,9 +101,9 @@ public final class LegacyTypeInfoDataTypeConverter {
addMapping(Types.FLOAT, DataTypes.FLOAT().bridgedTo(Float.class));
addMapping(Types.DOUBLE, DataTypes.DOUBLE().bridgedTo(Double.class));
addMapping(Types.BIG_DEC, createLegacyType(LogicalTypeRoot.DECIMAL, Types.BIG_DEC));
addMapping(LocalTimeTypeInfo.LOCAL_DATE, DataTypes.DATE().bridgedTo(LocalDate.class));
addMapping(LocalTimeTypeInfo.LOCAL_TIME, DataTypes.TIME(0).bridgedTo(LocalTime.class));
addMapping(LocalTimeTypeInfo.LOCAL_DATE_TIME, DataTypes.TIMESTAMP(3).bridgedTo(LocalDateTime.class));
addMapping(Types.LOCAL_DATE, DataTypes.DATE().bridgedTo(LocalDate.class));
addMapping(Types.LOCAL_TIME, DataTypes.TIME(0).bridgedTo(LocalTime.class));
addMapping(Types.LOCAL_DATE_TIME, DataTypes.TIMESTAMP(3).bridgedTo(LocalDateTime.class));
addMapping(Types.SQL_DATE, DataTypes.DATE().bridgedTo(java.sql.Date.class));
addMapping(Types.SQL_TIME, DataTypes.TIME(0).bridgedTo(java.sql.Time.class));
addMapping(Types.SQL_TIMESTAMP, DataTypes.TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class));
Expand Down
Expand Up @@ -462,7 +462,6 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
def addReusableLocalDateTime(): String = {
val fieldTerm = s"localtimestamp"

val timeZone = addReusableTimeZone()
val timestamp = addReusableTimestamp()

// declaration
Expand All @@ -471,7 +470,7 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
// assignment
val field =
s"""
|$fieldTerm = $timestamp + $timeZone.getOffset($timestamp);
|$fieldTerm = $timestamp + java.util.TimeZone.getDefault().getOffset($timestamp);
|""".stripMargin
reusablePerRecordStatements.add(field)
fieldTerm
Expand Down Expand Up @@ -527,7 +526,7 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
* Adds a reusable TimeZone to the member area of the generated class.
*/
def addReusableTimeZone(): String = {
val zoneID = DataFormatConverters.DEFAULT_TIME_ZONE.getID
val zoneID = tableConfig.getTimeZone.getID
val stmt =
s"""private static final java.util.TimeZone $DEFAULT_TIMEZONE_TERM =
| java.util.TimeZone.getTimeZone("$zoneID");""".stripMargin
Expand Down
Expand Up @@ -304,16 +304,15 @@ class StreamExecGroupWindowAggregate(
val builder = WindowOperatorBuilder
.builder()
.withInputFields(inputFields.toArray)
val timeZoneOffset = 0

val newBuilder = window match {
case TumblingGroupWindow(_, timeField, size)
if isProctimeAttribute(timeField) && hasTimeIntervalType(size) =>
builder.tumble(toDuration(size), timeZoneOffset).withProcessingTime()
builder.tumble(toDuration(size)).withProcessingTime()

case TumblingGroupWindow(_, timeField, size)
if isRowtimeAttribute(timeField) && hasTimeIntervalType(size) =>
builder.tumble(toDuration(size), timeZoneOffset).withEventTime(timeIdx)
builder.tumble(toDuration(size)).withEventTime(timeIdx)

case TumblingGroupWindow(_, timeField, size)
if isProctimeAttribute(timeField) && hasRowIntervalType(size) =>
Expand All @@ -328,12 +327,12 @@ class StreamExecGroupWindowAggregate(

case SlidingGroupWindow(_, timeField, size, slide)
if isProctimeAttribute(timeField) && hasTimeIntervalType(size) =>
builder.sliding(toDuration(size), toDuration(slide), timeZoneOffset)
builder.sliding(toDuration(size), toDuration(slide))
.withProcessingTime()

case SlidingGroupWindow(_, timeField, size, slide)
if isRowtimeAttribute(timeField) && hasTimeIntervalType(size) =>
builder.sliding(toDuration(size), toDuration(slide), timeZoneOffset)
builder.sliding(toDuration(size), toDuration(slide))
.withEventTime(timeIdx)

case SlidingGroupWindow(_, timeField, size, slide)
Expand Down
Expand Up @@ -59,15 +59,13 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.TimeZone;
import java.util.function.Function;
import java.util.stream.Stream;

import scala.Product;

import static org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo;
import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
import static org.apache.flink.util.Preconditions.checkArgument;

/**
* Converters between internal data format and java format.
Expand All @@ -82,8 +80,6 @@
*/
public class DataFormatConverters {

public static final TimeZone DEFAULT_TIME_ZONE = TimeZone.getDefault();

private static final Map<DataType, DataFormatConverter> TYPE_TO_CONVERTER;
static {
Map<DataType, DataFormatConverter> t2C = new HashMap<>();
Expand All @@ -109,14 +105,17 @@ public class DataFormatConverters {
t2C.put(DataTypes.TINYINT().bridgedTo(Byte.class), ByteConverter.INSTANCE);
t2C.put(DataTypes.TINYINT().bridgedTo(byte.class), ByteConverter.INSTANCE);

t2C.put(DataTypes.DATE().bridgedTo(Date.class), DateConverter.INSTANCE);
t2C.put(DataTypes.DATE().bridgedTo(LocalDate.class), LocalDateConverter.INSTANCE);
t2C.put(DataTypes.DATE().bridgedTo(Integer.class), IntConverter.INSTANCE);
t2C.put(DataTypes.DATE().bridgedTo(int.class), IntConverter.INSTANCE);

t2C.put(DataTypes.TIME().bridgedTo(Time.class), TimeConverter.INSTANCE);
t2C.put(DataTypes.TIME().bridgedTo(LocalTime.class), LocalTimeConverter.INSTANCE);
t2C.put(DataTypes.TIME().bridgedTo(Integer.class), IntConverter.INSTANCE);
t2C.put(DataTypes.TIME().bridgedTo(int.class), IntConverter.INSTANCE);

t2C.put(DataTypes.TIMESTAMP(3).bridgedTo(Timestamp.class), TimestampConverter.INSTANCE);
t2C.put(DataTypes.TIMESTAMP(3).bridgedTo(LocalDateTime.class), LocalDateTimeConverter.INSTANCE);

t2C.put(DataTypes.INTERVAL(DataTypes.MONTH()).bridgedTo(Integer.class), IntConverter.INSTANCE);
Expand All @@ -137,19 +136,6 @@ public class DataFormatConverters {
*/
@Deprecated
public static DataFormatConverter getConverterForDataType(DataType originDataType) {
return getConverterForDataType(originDataType, new Context(DEFAULT_TIME_ZONE));
}

/**
* Get {@link DataFormatConverter} for {@link DataType}.
*
* @param originDataType DataFormatConverter is oriented to Java format, while LogicalType has
* lost its specific Java format. Only DataType retains all its
* Java format information.
* @param context context for converter.
*/
@SuppressWarnings("unchecked")
public static DataFormatConverter getConverterForDataType(DataType originDataType, Context context) {
DataType dataType = originDataType.nullable();
DataFormatConverter converter = TYPE_TO_CONVERTER.get(dataType);
if (converter != null) {
Expand Down Expand Up @@ -196,28 +182,28 @@ public static DataFormatConverter getConverterForDataType(DataType originDataTyp
}
if (dataType instanceof CollectionDataType) {
return new ObjectArrayConverter(
((CollectionDataType) dataType).getElementDataType().bridgedTo(clazz.getComponentType()), context);
((CollectionDataType) dataType).getElementDataType().bridgedTo(clazz.getComponentType()));
} else {
BasicArrayTypeInfo typeInfo =
(BasicArrayTypeInfo) ((LegacyTypeInformationType) dataType.getLogicalType()).getTypeInformation();
return new ObjectArrayConverter(
fromLegacyInfoToDataType(typeInfo.getComponentInfo())
.bridgedTo(clazz.getComponentType()), context);
.bridgedTo(clazz.getComponentType()));
}
case MAP:
if (clazz == BinaryMap.class) {
return BinaryMapConverter.INSTANCE;
}
KeyValueDataType keyValueDataType = (KeyValueDataType) dataType;
return new MapConverter(keyValueDataType.getKeyDataType(), keyValueDataType.getValueDataType(), context);
return new MapConverter(keyValueDataType.getKeyDataType(), keyValueDataType.getValueDataType());
case MULTISET:
if (clazz == BinaryMap.class) {
return BinaryMapConverter.INSTANCE;
}
CollectionDataType collectionDataType = (CollectionDataType) dataType;
return new MapConverter(
collectionDataType.getElementDataType(),
DataTypes.INT().bridgedTo(Integer.class), context);
DataTypes.INT().bridgedTo(Integer.class));
case ROW:
case STRUCTURED_TYPE:
CompositeType compositeType = (CompositeType) fromDataTypeToLegacyInfo(dataType);
Expand All @@ -227,13 +213,13 @@ public static DataFormatConverter getConverterForDataType(DataType originDataTyp
if (clazz == BaseRow.class) {
return new BaseRowConverter(compositeType.getArity());
} else if (clazz == Row.class) {
return new RowConverter(fieldTypes, context);
return new RowConverter(fieldTypes);
} else if (Tuple.class.isAssignableFrom(clazz)) {
return new TupleConverter((Class<Tuple>) clazz, fieldTypes, context);
return new TupleConverter((Class<Tuple>) clazz, fieldTypes);
} else if (Product.class.isAssignableFrom(clazz)) {
return new CaseClassConverter((TupleTypeInfoBase) compositeType, fieldTypes, context);
return new CaseClassConverter((TupleTypeInfoBase) compositeType, fieldTypes);
} else {
return new PojoConverter((PojoTypeInfo) compositeType, fieldTypes, context);
return new PojoConverter((PojoTypeInfo) compositeType, fieldTypes);
}
case ANY:
TypeInformation typeInfo = logicalType instanceof LegacyTypeInformationType ?
Expand All @@ -255,15 +241,6 @@ public static DataFormatConverter getConverterForDataType(DataType originDataTyp
return BinaryGenericConverter.INSTANCE;
}
return new GenericConverter(typeInfo.createSerializer(new ExecutionConfig()));
case DATE:
checkArgument(dataType.getConversionClass() == Date.class);
return new DateConverter(context.timeZone);
case TIME_WITHOUT_TIME_ZONE:
checkArgument(dataType.getConversionClass() == Time.class);
return new TimeConverter(context.timeZone);
case TIMESTAMP_WITHOUT_TIME_ZONE:
checkArgument(dataType.getConversionClass() == Timestamp.class);
return new TimestampConverter(context.timeZone);
default:
throw new RuntimeException("Not support dataType: " + dataType);
}
Expand Down Expand Up @@ -742,20 +719,18 @@ public static final class DateConverter extends DataFormatConverter<Integer, Dat

private static final long serialVersionUID = 1343457113582411650L;

private final TimeZone zone;
public static final DateConverter INSTANCE = new DateConverter();

public DateConverter(TimeZone zone) {
this.zone = zone;
}
private DateConverter() {}

@Override
Integer toInternalImpl(Date value) {
return SqlDateTimeUtils.dateToInternal(value, zone);
return SqlDateTimeUtils.dateToInternal(value);
}

@Override
Date toExternalImpl(Integer value) {
return SqlDateTimeUtils.internalToDate(value, zone);
return SqlDateTimeUtils.internalToDate(value);
}

@Override
Expand All @@ -771,20 +746,18 @@ public static final class TimeConverter extends DataFormatConverter<Integer, Tim

private static final long serialVersionUID = -8061475784916442483L;

private final TimeZone zone;
public static final TimeConverter INSTANCE = new TimeConverter();

public TimeConverter(TimeZone zone) {
this.zone = zone;
}
private TimeConverter() {}

@Override
Integer toInternalImpl(Time value) {
return SqlDateTimeUtils.timeToInternal(value, zone);
return SqlDateTimeUtils.timeToInternal(value);
}

@Override
Time toExternalImpl(Integer value) {
return SqlDateTimeUtils.internalToTime(value, zone);
return SqlDateTimeUtils.internalToTime(value);
}

@Override
Expand All @@ -800,20 +773,18 @@ public static final class TimestampConverter extends DataFormatConverter<Long, T

private static final long serialVersionUID = -779956524906131757L;

private final TimeZone zone;
public static final TimestampConverter INSTANCE = new TimestampConverter();

public TimestampConverter(TimeZone zone) {
this.zone = zone;
}
private TimestampConverter() {}

@Override
Long toInternalImpl(Timestamp value) {
return SqlDateTimeUtils.timestampToInternal(value, zone);
return SqlDateTimeUtils.timestampToInternal(value);
}

@Override
Timestamp toExternalImpl(Long value) {
return SqlDateTimeUtils.internalToTimestamp(value, zone);
return SqlDateTimeUtils.internalToTimestamp(value);
}

@Override
Expand Down Expand Up @@ -1018,10 +989,10 @@ public static final class ObjectArrayConverter<T> extends DataFormatConverter<Ba
private transient BinaryArray reuseArray;
private transient BinaryArrayWriter reuseWriter;

public ObjectArrayConverter(DataType elementType, Context context) {
public ObjectArrayConverter(DataType elementType) {
this.componentClass = (Class) elementType.getConversionClass();
this.elementType = LogicalTypeDataTypeConverter.fromDataTypeToLogicalType(elementType);
this.elementConverter = DataFormatConverters.getConverterForDataType(elementType, context);
this.elementConverter = DataFormatConverters.getConverterForDataType(elementType);
this.elementSize = BinaryArray.calculateFixLengthPartSize(this.elementType);
this.eleSer = InternalSerializers.create(this.elementType, new ExecutionConfig());
this.isEleIndentity = elementConverter instanceof IdentityConverter;
Expand Down Expand Up @@ -1136,11 +1107,11 @@ public static final class MapConverter extends DataFormatConverter<BaseMap, Map>
private transient BinaryArray reuseVArray;
private transient BinaryArrayWriter reuseVWriter;

public MapConverter(DataType keyTypeInfo, DataType valueTypeInfo, Context context) {
public MapConverter(DataType keyTypeInfo, DataType valueTypeInfo) {
this.keyType = LogicalTypeDataTypeConverter.fromDataTypeToLogicalType(keyTypeInfo);
this.valueType = LogicalTypeDataTypeConverter.fromDataTypeToLogicalType(valueTypeInfo);
this.keyConverter = DataFormatConverters.getConverterForDataType(keyTypeInfo, context);
this.valueConverter = DataFormatConverters.getConverterForDataType(valueTypeInfo, context);
this.keyConverter = DataFormatConverters.getConverterForDataType(keyTypeInfo);
this.valueConverter = DataFormatConverters.getConverterForDataType(valueTypeInfo);
this.keyElementSize = BinaryArray.calculateFixLengthPartSize(keyType);
this.valueElementSize = BinaryArray.calculateFixLengthPartSize(valueType);
this.keyComponentClass = keyTypeInfo.getConversionClass();
Expand Down Expand Up @@ -1221,10 +1192,10 @@ public abstract static class AbstractBaseRowConverter<E> extends DataFormatConve

protected final DataFormatConverter[] converters;

public AbstractBaseRowConverter(DataType[] fieldTypes, Context context) {
public AbstractBaseRowConverter(DataType[] fieldTypes) {
converters = new DataFormatConverter[fieldTypes.length];
for (int i = 0; i < converters.length; i++) {
converters[i] = getConverterForDataType(fieldTypes[i], context);
converters[i] = getConverterForDataType(fieldTypes[i]);
}
}

Expand Down Expand Up @@ -1260,8 +1231,8 @@ public static final class PojoConverter<T> extends AbstractBaseRowConverter<T> {
private final PojoTypeInfo<T> t;
private final PojoField[] fields;

public PojoConverter(PojoTypeInfo<T> t, DataType[] fieldTypes, Context context) {
super(fieldTypes, context);
public PojoConverter(PojoTypeInfo<T> t, DataType[] fieldTypes) {
super(fieldTypes);
this.fields = new PojoField[t.getArity()];
for (int i = 0; i < t.getArity(); i++) {
fields[i] = t.getPojoFieldAt(i);
Expand Down Expand Up @@ -1305,8 +1276,8 @@ public static final class RowConverter extends AbstractBaseRowConverter<Row> {

private static final long serialVersionUID = -56553502075225785L;

public RowConverter(DataType[] fieldTypes, Context context) {
super(fieldTypes, context);
public RowConverter(DataType[] fieldTypes) {
super(fieldTypes);
}

@Override
Expand Down Expand Up @@ -1337,8 +1308,8 @@ public static final class TupleConverter extends AbstractBaseRowConverter<Tuple>

private final Class<Tuple> clazz;

public TupleConverter(Class<Tuple> clazz, DataType[] fieldTypes, Context context) {
super(fieldTypes, context);
public TupleConverter(Class<Tuple> clazz, DataType[] fieldTypes) {
super(fieldTypes);
this.clazz = clazz;
}

Expand Down Expand Up @@ -1376,8 +1347,8 @@ public static final class CaseClassConverter extends AbstractBaseRowConverter<Pr
private final TupleTypeInfoBase t;
private final TupleSerializerBase serializer;

public CaseClassConverter(TupleTypeInfoBase t, DataType[] fieldTypes, Context context) {
super(fieldTypes, context);
public CaseClassConverter(TupleTypeInfoBase t, DataType[] fieldTypes) {
super(fieldTypes);
this.t = t;
this.serializer = (TupleSerializerBase) t.createSerializer(new ExecutionConfig());
}
Expand All @@ -1400,16 +1371,4 @@ Product toExternalImpl(BaseRow value) {
return (Product) serializer.createInstance(fields);
}
}

/**
* Context for {@link DataFormatConverter}, now just contains time zone.
*/
public static class Context implements Serializable{

private TimeZone timeZone;

public Context(TimeZone timeZone) {
this.timeZone = timeZone;
}
}
}

0 comments on commit 5f0e633

Please sign in to comment.