@@ -181,27 +181,35 @@ public static Schema convertToSchema(LogicalType logicalType) {
 
 	public static Schema convertToSchema(LogicalType logicalType, String rowName) {
 		int precision;
+		boolean isNullable = logicalType.isNullable();
 		switch (logicalType.getTypeRoot()) {
 			case NULL:
 				return SchemaBuilder.builder().nullType();
 			case BOOLEAN:
-				return getNullableBuilder(logicalType).booleanType();
+				Schema booleanType = SchemaBuilder.builder().booleanType();
+				return isNullable ? nullableSchema(booleanType) : booleanType;
 			case TINYINT:
 			case SMALLINT:
 			case INTEGER:
-				return getNullableBuilder(logicalType).intType();
+				Schema intType = SchemaBuilder.builder().intType();
+				return isNullable ? nullableSchema(intType) : intType;
 			case BIGINT:
-				return getNullableBuilder(logicalType).longType();
+				Schema longType = SchemaBuilder.builder().longType();
+				return isNullable ? nullableSchema(longType) : longType;
 			case FLOAT:
-				return getNullableBuilder(logicalType).floatType();
+				Schema floatType = SchemaBuilder.builder().floatType();
+				return isNullable ? nullableSchema(floatType) : floatType;
 			case DOUBLE:
-				return getNullableBuilder(logicalType).doubleType();
+				Schema doubleType = SchemaBuilder.builder().doubleType();
+				return isNullable ? nullableSchema(doubleType) : doubleType;
 			case CHAR:
 			case VARCHAR:
-				return getNullableBuilder(logicalType).stringType();
+				Schema stringType = SchemaBuilder.builder().stringType();
+				return isNullable ? nullableSchema(stringType) : stringType;
 			case BINARY:
 			case VARBINARY:
-				return getNullableBuilder(logicalType).bytesType();
+				Schema bytesType = SchemaBuilder.builder().bytesType();
+				return isNullable ? nullableSchema(bytesType) : bytesType;
 			case TIMESTAMP_WITHOUT_TIME_ZONE:
 				// use long to represent Timestamp
 				final TimestampType timestampType = (TimestampType) logicalType;
@@ -213,10 +221,12 @@ public static Schema convertToSchema(LogicalType logicalType, String rowName) {
 					throw new IllegalArgumentException("Avro does not support TIMESTAMP type " +
 						"with precision: " + precision + ", it only supports precision up to 3.");
 				}
-				return avroLogicalType.addToSchema(SchemaBuilder.builder().longType());
+				Schema timestampSchema = avroLogicalType.addToSchema(SchemaBuilder.builder().longType());
+				return isNullable ? nullableSchema(timestampSchema) : timestampSchema;
 			case DATE:
 				// use int to represent Date
-				return LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType());
+				Schema dateType = LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType());
+				return isNullable ? nullableSchema(dateType) : dateType;
 			case TIME_WITHOUT_TIME_ZONE:
 				precision = ((TimeType) logicalType).getPrecision();
 				if (precision > 3) {
@@ -225,13 +235,17 @@ public static Schema convertToSchema(LogicalType logicalType, String rowName) {
 						", it only supports precision up to 3.");
 				}
 				// use int to represent Time; we only support milliseconds on deserialization
-				return LogicalTypes.timeMillis().addToSchema(SchemaBuilder.builder().intType());
+				Schema timeType = LogicalTypes
+					.timeMillis()
+					.addToSchema(SchemaBuilder.builder().intType());
+				return isNullable ? nullableSchema(timeType) : timeType;
 			case DECIMAL:
-				DecimalType decimalType = (DecimalType) logicalType;
+				DecimalType decimalLogicalType = (DecimalType) logicalType;
 				// store BigDecimal as byte[]
-				return LogicalTypes
-					.decimal(decimalType.getPrecision(), decimalType.getScale())
+				Schema decimalType = LogicalTypes
+					.decimal(decimalLogicalType.getPrecision(), decimalLogicalType.getScale())
 					.addToSchema(SchemaBuilder.builder().bytesType());
+				return isNullable ? nullableSchema(decimalType) : decimalType;
 			case ROW:
 				RowType rowType = (RowType) logicalType;
 				List<String> fieldNames = rowType.getFieldNames();
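The DECIMAL branch above keeps the bytes-based encoding and only adds the union wrapping. A minimal sketch of the schema this now produces for a nullable DECIMAL(10, 2), assuming the Avro LogicalTypes and SchemaBuilder APIs already used in this file (variable names are illustrative):

	Schema decimalSchema = LogicalTypes.decimal(10, 2)
		.addToSchema(SchemaBuilder.builder().bytesType());
	Schema nullableDecimal = Schema.createUnion(SchemaBuilder.builder().nullType(), decimalSchema);
	// nullableDecimal.toString() ->
	//   ["null", {"type":"bytes","logicalType":"decimal","precision":10,"scale":2}]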
@@ -241,27 +255,33 @@ public static Schema convertToSchema(LogicalType logicalType, String rowName) {
 					.record(rowName)
 					.fields();
 				for (int i = 0; i < rowType.getFieldCount(); i++) {
-					String fieldName = rowName + "_" + fieldNames.get(i);
-					builder = builder
+					String fieldName = fieldNames.get(i);
+					LogicalType fieldType = rowType.getTypeAt(i);
+					SchemaBuilder.GenericDefault<Schema> fieldBuilder = builder
 						.name(fieldName)
-						.type(convertToSchema(rowType.getTypeAt(i), fieldName))
-						.noDefault();
+						.type(convertToSchema(fieldType, rowName + "_" + fieldName));
+
+					if (fieldType.isNullable()) {
+						builder = fieldBuilder.withDefault(null);
+					} else {
+						builder = fieldBuilder.noDefault();
+					}
 				}
 				return builder.endRecord();
 			case MULTISET:
 			case MAP:
-				return SchemaBuilder
+				Schema mapType = SchemaBuilder
 					.builder()
-					.nullable()
 					.map()
 					.values(convertToSchema(extractValueTypeToAvroMap(logicalType), rowName));
+				return isNullable ? nullableSchema(mapType) : mapType;
 			case ARRAY:
-				ArrayType arrayType = (ArrayType) logicalType;
-				return SchemaBuilder
+				ArrayType arrayLogicalType = (ArrayType) logicalType;
+				Schema arrayType = SchemaBuilder
 					.builder()
-					.nullable()
 					.array()
-					.items(convertToSchema(arrayType.getElementType(), rowName));
+					.items(convertToSchema(arrayLogicalType.getElementType(), rowName));
+				return isNullable ? nullableSchema(arrayType) : arrayType;
 			case RAW:
 			case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
 			default:
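The ROW branch is the heart of the change: a nullable field now gets an explicit null default instead of noDefault(). A minimal sketch of what one loop iteration builds for a nullable string field, assuming the SchemaBuilder API used above (record and field names are illustrative):

	SchemaBuilder.FieldAssembler<Schema> fields = SchemaBuilder.record("record").fields();
	Schema nullableString = Schema.createUnion(
		SchemaBuilder.builder().nullType(), SchemaBuilder.builder().stringType());
	fields = fields.name("a").type(nullableString).withDefault(null);
	Schema recordSchema = fields.endRecord();
	// Field "a" serializes as: "type" : [ "null", "string" ], "default" : null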
@@ -289,11 +309,8 @@ public static LogicalType extractValueTypeToAvroMap(LogicalType type) {
 		return valueType;
 	}
 
-	private static SchemaBuilder.BaseTypeBuilder<Schema> getNullableBuilder(LogicalType logicalType) {
-		SchemaBuilder.TypeBuilder<Schema> builder = SchemaBuilder.builder();
-		if (logicalType.isNullable()) {
-			return builder.nullable();
-		}
-		return builder;
+	/** Wraps the given schema in a union with null to make it nullable. */
+	private static Schema nullableSchema(Schema schema) {
+		return Schema.createUnion(SchemaBuilder.builder().nullType(), schema);
 	}
 }
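Worth noting why nullableSchema puts null first: Avro validates a field's default value against the first branch of a union, so ["null", "string"] admits a null default, while the ["string", "null"] order produced by SchemaBuilder's nullable() (the old getNullableBuilder path) does not. A minimal sketch using only the Avro classes imported above:

	Schema longType = SchemaBuilder.builder().longType();
	Schema nullableLong = Schema.createUnion(SchemaBuilder.builder().nullType(), longType);
	// nullableLong.toString() -> ["null","long"]; a field of this type may
	// declare "default" : null, which is what the ROW branch relies on.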
@@ -181,9 +181,9 @@ public void testSpecificType() throws Exception {
 		byte[] input = byteArrayOutputStream.toByteArray();
 
 		DataType dataType = ROW(
-			FIELD("type_timestamp_millis", TIMESTAMP(3)),
-			FIELD("type_date", DATE()),
-			FIELD("type_time_millis", TIME(3)));
+			FIELD("type_timestamp_millis", TIMESTAMP(3).notNull()),
+			FIELD("type_date", DATE().notNull()),
+			FIELD("type_time_millis", TIME(3).notNull()));
 		final RowType rowType = (RowType) dataType.getLogicalType();
 		final TypeInformation<RowData> typeInfo = new RowDataTypeInfo(rowType);
 		AvroRowDataSerializationSchema serializationSchema = new AvroRowDataSerializationSchema(rowType);
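This test change follows directly from the converter change: Flink DataTypes are nullable by default, so without notNull() each field would now convert to a union, which no longer matches the plain Avro record the test writes and reads. A small sketch of the difference, assuming the statically imported DataTypes factories used above:

	// Nullable (the default) now converts to a union; NOT NULL keeps the bare type.
	DataType nullableTs = TIMESTAMP(3);           // -> ["null", {"type":"long","logicalType":"timestamp-millis"}]
	DataType requiredTs = TIMESTAMP(3).notNull(); // -> {"type":"long","logicalType":"timestamp-millis"}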
@@ -22,17 +22,27 @@
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.formats.avro.generated.User;
+import org.apache.flink.formats.avro.utils.AvroTestUtils;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.Row;
 
 import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.avro.io.DecoderFactory;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
+import java.io.IOException;
+
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -54,6 +64,44 @@ public void testAvroSchemaConversion() {
 		validateUserSchema(AvroSchemaConverter.convertToTypeInfo(schema));
 	}
 
+	@Test
+	public void testAddingOptionalField() throws IOException {
+		Schema oldSchema = SchemaBuilder.record("record")
+			.fields()
+			.requiredLong("category_id")
+			.optionalString("name")
+			.endRecord();
+
+		Schema newSchema = AvroSchemaConverter.convertToSchema(
+			DataTypes.ROW(
+				DataTypes.FIELD("category_id", DataTypes.BIGINT().notNull()),
+				DataTypes.FIELD("name", DataTypes.STRING().nullable()),
+				DataTypes.FIELD("description", DataTypes.STRING().nullable())
+			).getLogicalType()
+		);
+
+		byte[] serializedRecord = AvroTestUtils.writeRecord(
+			new GenericRecordBuilder(oldSchema)
+				.set("category_id", 1L)
+				.set("name", "test")
+				.build(),
+			oldSchema
+		);
+		GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(
+			oldSchema,
+			newSchema);
+		GenericRecord newRecord = datumReader.read(
+			null,
+			DecoderFactory.get().binaryDecoder(serializedRecord, 0, serializedRecord.length, null));
+		assertThat(
+			newRecord,
+			equalTo(new GenericRecordBuilder(newSchema)
+				.set("category_id", 1L)
+				.set("name", "test")
+				.set("description", null)
+				.build()));
+	}
+
 	@Test
 	public void testInvalidRawTypeAvroSchemaConversion() {
 		RowType rowType = (RowType) TableSchema.builder()
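testAddingOptionalField above exercises standard Avro schema resolution: the writer schema has no "description" field, so the GenericDatumReader fills it from the reader schema's default. That is only legal because the converter now emits ["null", "string"] with a null default for nullable fields, which is exactly what makes adding an optional column a compatible change. A compact sketch of the relevant step, reusing the names from the test:

	// Reader-schema fields absent from the writer schema are populated
	// from their declared defaults (here: null for "description").
	GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(oldSchema, newSchema);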
@@ -97,48 +145,55 @@ public void testRowTypeAvroSchemaConversion() {
 				DataTypes.FIELD("row3", DataTypes.ROW(DataTypes.FIELD("c", DataTypes.STRING())))))
 			.build().toRowDataType().getLogicalType();
 		Schema schema = AvroSchemaConverter.convertToSchema(rowType);
-		assertEquals("{\n" +
-			"  \"type\" : \"record\",\n" +
-			"  \"name\" : \"record\",\n" +
-			"  \"fields\" : [ {\n" +
-			"    \"name\" : \"record_row1\",\n" +
-			"    \"type\" : {\n" +
-			"      \"type\" : \"record\",\n" +
-			"      \"name\" : \"record_row1\",\n" +
-			"      \"fields\" : [ {\n" +
-			"        \"name\" : \"record_row1_a\",\n" +
-			"        \"type\" : [ \"string\", \"null\" ]\n" +
-			"      } ]\n" +
-			"    }\n" +
-			"  }, {\n" +
-			"    \"name\" : \"record_row2\",\n" +
-			"    \"type\" : {\n" +
-			"      \"type\" : \"record\",\n" +
-			"      \"name\" : \"record_row2\",\n" +
-			"      \"fields\" : [ {\n" +
-			"        \"name\" : \"record_row2_b\",\n" +
-			"        \"type\" : [ \"string\", \"null\" ]\n" +
-			"      } ]\n" +
-			"    }\n" +
-			"  }, {\n" +
-			"    \"name\" : \"record_row3\",\n" +
-			"    \"type\" : {\n" +
-			"      \"type\" : \"record\",\n" +
-			"      \"name\" : \"record_row3\",\n" +
-			"      \"fields\" : [ {\n" +
-			"        \"name\" : \"record_row3_row3\",\n" +
-			"        \"type\" : {\n" +
-			"          \"type\" : \"record\",\n" +
-			"          \"name\" : \"record_row3_row3\",\n" +
-			"          \"fields\" : [ {\n" +
-			"            \"name\" : \"record_row3_row3_c\",\n" +
-			"            \"type\" : [ \"string\", \"null\" ]\n" +
-			"          } ]\n" +
-			"        }\n" +
-			"      } ]\n" +
-			"    }\n" +
-			"  } ]\n" +
-			"}", schema.toString(true));
+		assertEquals("{\n"
+			+ "  \"type\" : \"record\",\n"
+			+ "  \"name\" : \"record\",\n"
+			+ "  \"fields\" : [ {\n"
+			+ "    \"name\" : \"row1\",\n"
+			+ "    \"type\" : {\n"
+			+ "      \"type\" : \"record\",\n"
+			+ "      \"name\" : \"record_row1\",\n"
+			+ "      \"fields\" : [ {\n"
+			+ "        \"name\" : \"a\",\n"
+			+ "        \"type\" : [ \"null\", \"string\" ],\n"
+			+ "        \"default\" : null\n"
+			+ "      } ]\n"
+			+ "    },\n"
+			+ "    \"default\" : null\n"
+			+ "  }, {\n"
+			+ "    \"name\" : \"row2\",\n"
+			+ "    \"type\" : {\n"
+			+ "      \"type\" : \"record\",\n"
+			+ "      \"name\" : \"record_row2\",\n"
+			+ "      \"fields\" : [ {\n"
+			+ "        \"name\" : \"b\",\n"
+			+ "        \"type\" : [ \"null\", \"string\" ],\n"
+			+ "        \"default\" : null\n"
+			+ "      } ]\n"
+			+ "    },\n"
+			+ "    \"default\" : null\n"
+			+ "  }, {\n"
+			+ "    \"name\" : \"row3\",\n"
+			+ "    \"type\" : {\n"
+			+ "      \"type\" : \"record\",\n"
+			+ "      \"name\" : \"record_row3\",\n"
+			+ "      \"fields\" : [ {\n"
+			+ "        \"name\" : \"row3\",\n"
+			+ "        \"type\" : {\n"
+			+ "          \"type\" : \"record\",\n"
+			+ "          \"name\" : \"record_row3_row3\",\n"
+			+ "          \"fields\" : [ {\n"
+			+ "            \"name\" : \"c\",\n"
+			+ "            \"type\" : [ \"null\", \"string\" ],\n"
+			+ "            \"default\" : null\n"
+			+ "          } ]\n"
+			+ "        },\n"
+			+ "        \"default\" : null\n"
+			+ "      } ]\n"
+			+ "    },\n"
+			+ "    \"default\" : null\n"
+			+ "  } ]\n"
+			+ "}", schema.toString(true));
 	}
 
 	private void validateUserSchema(TypeInformation<?> actual) {
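The rewritten expectation also pins down the new naming scheme: fields keep their plain Flink names ("row1", "a"), while nested record types carry row-name-prefixed names ("record_row1"), since Avro requires every named type in a schema to have a unique fullname. A hypothetical example built on the converter and DataTypes APIs shown above:

	Schema s = AvroSchemaConverter.convertToSchema(
		DataTypes.ROW(
			DataTypes.FIELD("row1", DataTypes.ROW(DataTypes.FIELD("c", DataTypes.STRING()))),
			DataTypes.FIELD("row2", DataTypes.ROW(DataTypes.FIELD("c", DataTypes.STRING())))
		).getLogicalType());
	// Both nested rows may declare a field "c": they live in distinct record types
	// named "record_row1" and "record_row2", so the fullnames stay unique.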