Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions flink-formats/flink-avro/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ under the License.
<configuration>
<testSourceDirectory>${project.basedir}/src/test/resources/avro</testSourceDirectory>
<testOutputDirectory>${project.basedir}/target/generated-test-sources/</testOutputDirectory>
<enableDecimalLogicalType>true</enableDecimalLogicalType>
</configuration>
</execution>
</executions>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;

import org.apache.avro.generic.GenericFixed;
Expand Down Expand Up @@ -128,12 +130,13 @@ private static AvroToRowDataConverter createConverter(
case TIME_WITHOUT_TIME_ZONE:
return AvroToRowDataConverters::convertToTime;
case TIMESTAMP_WITHOUT_TIME_ZONE:
return AvroToRowDataConverters::convertToTimestamp;
return createTimestampConverter(((TimestampType) type).getPrecision());
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
if (legacyTimestampMapping) {
throw new UnsupportedOperationException("Unsupported type: " + type);
} else {
return AvroToRowDataConverters::convertToTimestamp;
return createTimestampConverter(
((LocalZonedTimestampType) type).getPrecision());
}
case CHAR:
case VARCHAR:
Expand Down Expand Up @@ -211,24 +214,37 @@ private static AvroToRowDataConverter createMapConverter(
};
}

private static TimestampData convertToTimestamp(Object object) {
final long millis;
if (object instanceof Long) {
millis = (Long) object;
} else if (object instanceof Instant) {
millis = ((Instant) object).toEpochMilli();
} else if (object instanceof LocalDateTime) {
return TimestampData.fromLocalDateTime((LocalDateTime) object);
} else {
JodaConverter jodaConverter = JodaConverter.getConverter();
if (jodaConverter != null) {
millis = jodaConverter.convertTimestamp(object);
private static AvroToRowDataConverter createTimestampConverter(int precision) {
return object -> {
if (object instanceof Long) {
long val = (Long) object;
if (precision <= 3) {
return TimestampData.fromEpochMillis(val);
} else if (precision <= 6) {
long millis = Math.floorDiv(val, 1000L);
int nanosOfMilli = (int) (Math.floorMod(val, 1000L) * 1000);
return TimestampData.fromEpochMillis(millis, nanosOfMilli);
} else {
long millis = Math.floorDiv(val, 1000_000L);
int nanosOfMilli = (int) Math.floorMod(val, 1000_000L);
return TimestampData.fromEpochMillis(millis, nanosOfMilli);
}
} else if (object instanceof Instant) {
return TimestampData.fromInstant((Instant) object);
} else if (object instanceof LocalDateTime) {
return TimestampData.fromLocalDateTime((LocalDateTime) object);
} else {
throw new IllegalArgumentException(
"Unexpected object type for TIMESTAMP logical type. Received: " + object);
JodaConverter jodaConverter = JodaConverter.getConverter();
if (jodaConverter != null) {
long millis = jodaConverter.convertTimestamp(object);
return TimestampData.fromEpochMillis(millis);
} else {
throw new IllegalArgumentException(
"Unexpected object type for TIMESTAMP logical type. Received: "
+ object);
}
}
}
return TimestampData.fromEpochMillis(millis);
};
}

private static int convertToDate(Object object) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.util.CollectionUtil;

import org.apache.avro.Schema;
Expand Down Expand Up @@ -156,6 +158,7 @@ public Object convert(Schema schema, Object object) {
};
break;
case TIMESTAMP_WITHOUT_TIME_ZONE:
final int timestampPrecision = ((TimestampType) type).getPrecision();
if (legacyTimestampMapping) {
converter =
new RowDataToAvroConverter() {
Expand All @@ -173,15 +176,26 @@ public Object convert(Schema schema, Object object) {

@Override
public Object convert(Schema schema, Object object) {
return ((TimestampData) object)
.toLocalDateTime()
.toInstant(ZoneOffset.UTC)
.toEpochMilli();
java.time.Instant instant =
((TimestampData) object)
.toLocalDateTime()
.toInstant(ZoneOffset.UTC);
if (timestampPrecision <= 3) {
return instant.toEpochMilli();
} else if (timestampPrecision <= 6) {
return instant.getEpochSecond() * 1_000_000L
+ instant.getNano() / 1000L;
} else {
return instant.getEpochSecond() * 1_000_000_000L
+ instant.getNano();
}
}
};
}
break;
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
final int localZonedTimestampPrecision =
((LocalZonedTimestampType) type).getPrecision();
if (legacyTimestampMapping) {
throw new UnsupportedOperationException("Unsupported type: " + type);
} else {
Expand All @@ -191,7 +205,17 @@ public Object convert(Schema schema, Object object) {

@Override
public Object convert(Schema schema, Object object) {
return ((TimestampData) object).toInstant().toEpochMilli();
java.time.Instant instant =
((TimestampData) object).toInstant();
if (localZonedTimestampPrecision <= 3) {
return instant.toEpochMilli();
} else if (localZonedTimestampPrecision <= 6) {
return instant.getEpochSecond() * 1_000_000L
+ instant.getNano() / 1000L;
} else {
return instant.getEpochSecond() * 1_000_000_000L
+ instant.getNano();
}
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,8 @@ private static TypeInformation<?> convertToTypeInfo(
case LONG:
if (legacyTimestampMapping) {
if (schema.getLogicalType() == LogicalTypes.timestampMillis()
|| schema.getLogicalType() == LogicalTypes.timestampMicros()) {
|| schema.getLogicalType() == LogicalTypes.timestampMicros()
|| schema.getLogicalType() == LogicalTypes.timestampNanos()) {
return Types.SQL_TIMESTAMP;
} else if (schema.getLogicalType() == LogicalTypes.timeMicros()
|| schema.getLogicalType() == LogicalTypes.timeMillis()) {
Expand All @@ -203,10 +204,12 @@ private static TypeInformation<?> convertToTypeInfo(
} else {
// Avro logical timestamp types to Flink DataStream timestamp types
if (schema.getLogicalType() == LogicalTypes.timestampMillis()
|| schema.getLogicalType() == LogicalTypes.timestampMicros()) {
|| schema.getLogicalType() == LogicalTypes.timestampMicros()
|| schema.getLogicalType() == LogicalTypes.timestampNanos()) {
return Types.INSTANT;
} else if (schema.getLogicalType() == LogicalTypes.localTimestampMillis()
|| schema.getLogicalType() == LogicalTypes.localTimestampMicros()) {
|| schema.getLogicalType() == LogicalTypes.localTimestampMicros()
|| schema.getLogicalType() == LogicalTypes.localTimestampNanos()) {
return Types.LOCAL_DATE_TIME;
} else if (schema.getLogicalType() == LogicalTypes.timeMicros()
|| schema.getLogicalType() == LogicalTypes.timeMillis()) {
Expand Down Expand Up @@ -350,6 +353,8 @@ private static DataType convertToDataType(Schema schema, boolean legacyMapping)
return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull();
} else if (schema.getLogicalType() == LogicalTypes.timestampMicros()) {
return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(6).notNull();
} else if (schema.getLogicalType() == LogicalTypes.timestampNanos()) {
return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(9).notNull();
} else if (schema.getLogicalType() == LogicalTypes.timeMillis()) {
return DataTypes.TIME(3).notNull();
} else if (schema.getLogicalType() == LogicalTypes.timeMicros()) {
Expand All @@ -358,6 +363,8 @@ private static DataType convertToDataType(Schema schema, boolean legacyMapping)
return DataTypes.TIMESTAMP(3).notNull();
} else if (schema.getLogicalType() == LogicalTypes.localTimestampMicros()) {
return DataTypes.TIMESTAMP(6).notNull();
} else if (schema.getLogicalType() == LogicalTypes.localTimestampNanos()) {
return DataTypes.TIMESTAMP(9).notNull();
}
}

Expand Down Expand Up @@ -479,12 +486,14 @@ public static Schema convertToSchema(
avroLogicalType = LogicalTypes.localTimestampMillis();
} else if (precision <= 6) {
avroLogicalType = LogicalTypes.localTimestampMicros();
} else if (precision <= 9) {
avroLogicalType = LogicalTypes.localTimestampNanos();
} else {
throw new IllegalArgumentException(
"Avro does not support LOCAL TIMESTAMP type "
+ "with precision: "
+ precision
+ ", it only supports precision less than 6.");
+ ", it only supports precision less than 9.");
}
}
Schema timestamp = avroLogicalType.addToSchema(SchemaBuilder.builder().longType());
Expand All @@ -501,12 +510,14 @@ public static Schema convertToSchema(
avroLogicalType = LogicalTypes.timestampMillis();
} else if (precision <= 6) {
avroLogicalType = LogicalTypes.timestampMicros();
} else if (precision <= 9) {
avroLogicalType = LogicalTypes.timestampNanos();
} else {
throw new IllegalArgumentException(
"Avro does not support TIMESTAMP type "
+ "with precision: "
+ precision
+ ", it only supports precision less than 6.");
+ ", it only supports precision less than 9.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It supports 9 as well I think.

}
timestamp = avroLogicalType.addToSchema(SchemaBuilder.builder().longType());
return nullable ? nullableSchema(timestamp) : timestamp;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.AvroOutputFormat.Codec;
import org.apache.flink.formats.avro.generated.Colors;
import org.apache.flink.formats.avro.generated.Fixed2;
import org.apache.flink.formats.avro.generated.User;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
Expand Down Expand Up @@ -185,12 +184,11 @@ public User map(Tuple3<String, Integer, String> value) {
user.setTypeTimeMicros(LocalTime.ofSecondOfDay(0).plus(123456L, ChronoUnit.MICROS));
user.setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z"));
user.setTypeTimestampMicros(Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS));
user.setTypeTimestampNanos(Instant.ofEpochSecond(0).plus(123456789L, ChronoUnit.NANOS));
// 20.00
user.setTypeDecimalBytes(
ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
user.setTypeDecimalBytes(BigDecimal.valueOf(2000, 2));
// 20.00
user.setTypeDecimalFixed(
new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
user.setTypeDecimalFixed(BigDecimal.valueOf(2000, 2));
return user;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.generated.Colors;
import org.apache.flink.formats.avro.generated.Fixed2;
import org.apache.flink.formats.avro.generated.User;
import org.apache.flink.mock.Whitebox;

Expand Down Expand Up @@ -198,13 +197,12 @@ public int getAttemptNumber() {
user.setTypeTimeMicros(LocalTime.ofSecondOfDay(0).plus(123456L, ChronoUnit.MICROS));
user.setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z"));
user.setTypeTimestampMicros(Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS));
user.setTypeTimestampNanos(Instant.ofEpochSecond(0).plus(123456789L, ChronoUnit.NANOS));

// 20.00
user.setTypeDecimalBytes(
ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
user.setTypeDecimalBytes(BigDecimal.valueOf(2000, 2));
// 20.00
user.setTypeDecimalFixed(
new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
user.setTypeDecimalFixed(BigDecimal.valueOf(2000, 2));

outputFormat.writeRecord(user);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.formats.avro.generated.Address;
import org.apache.flink.formats.avro.generated.Colors;
import org.apache.flink.formats.avro.generated.Fixed2;
import org.apache.flink.formats.avro.generated.User;
import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
import org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils;
Expand Down Expand Up @@ -139,12 +138,11 @@ public static void writeTestFile(File testFile) throws IOException {
user1.setTypeTimeMicros(LocalTime.ofSecondOfDay(0).plus(123456L, ChronoUnit.MICROS));
user1.setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z"));
user1.setTypeTimestampMicros(Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS));
user1.setTypeTimestampNanos(Instant.ofEpochSecond(0).plus(123456789L, ChronoUnit.NANOS));
// 20.00
user1.setTypeDecimalBytes(
ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
user1.setTypeDecimalBytes(BigDecimal.valueOf(2000, 2));
// 20.00
user1.setTypeDecimalFixed(
new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
user1.setTypeDecimalFixed(BigDecimal.valueOf(2000, 2));

// Construct via builder
User user2 =
Expand Down Expand Up @@ -179,14 +177,12 @@ public static void writeTestFile(File testFile) throws IOException {
.setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z"))
.setTypeTimestampMicros(
Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS))
.setTypeTimestampNanos(
Instant.ofEpochSecond(0).plus(123456789L, ChronoUnit.NANOS))
// 20.00
.setTypeDecimalBytes(
ByteBuffer.wrap(
BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
.setTypeDecimalBytes(BigDecimal.valueOf(2000, 2))
// 20.00
.setTypeDecimalFixed(
new Fixed2(
BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
.setTypeDecimalFixed(BigDecimal.valueOf(2000, 2))
.build();
DatumWriter<User> userDatumWriter = new SpecificDatumWriter<>(User.class);
DataFileWriter<User> dataFileWriter = new DataFileWriter<>(userDatumWriter);
Expand Down
Loading