[FLINK-9444] [formats] Add full SQL support for Avro formats
This PR adds full support for Apache Avro records to the Table API & SQL. It adds (de)serialization schemas to the row type for both specific and generic records. It converts all Avro types to Flink types and vice versa. It supports both physical and logical Avro types. Either an Avro class or an Avro schema string can be used for format initialization.

This closes #6218.
This closes #6082.
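
A minimal usage sketch of the new schemas, assuming the schema-string constructors described above and Flink's `Row.of(...)` helper; the schema and values are invented for illustration and this is not part of the commit:

import org.apache.flink.formats.avro.AvroRowDeserializationSchema;
import org.apache.flink.formats.avro.AvroRowSerializationSchema;
import org.apache.flink.types.Row;

public class AvroRowRoundTripSketch {

	public static void main(String[] args) throws Exception {
		// Avro schema string; a generated specific record class works as well
		final String schema =
			"{"
			+ "  \"type\": \"record\","
			+ "  \"name\": \"test\","
			+ "  \"fields\": ["
			+ "    {\"name\": \"a\", \"type\": \"long\"},"
			+ "    {\"name\": \"b\", \"type\": \"string\"}"
			+ "  ]"
			+ "}";

		final AvroRowSerializationSchema serializer = new AvroRowSerializationSchema(schema);
		final AvroRowDeserializationSchema deserializer = new AvroRowDeserializationSchema(schema);

		// round-trip a row through Avro bytes
		final Row row = Row.of(42L, "hello");
		final byte[] bytes = serializer.serialize(row);
		System.out.println(deserializer.deserialize(bytes)); // expected: 42,hello
	}
}
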
twalthr committed Jul 3, 2018
1 parent 19040a6 commit c34c7e4
Showing 35 changed files with 1,819 additions and 674 deletions.
73 changes: 72 additions & 1 deletion docs/dev/table/sqlClient.md
@@ -237,6 +237,7 @@ The SQL Client does not require to setup a Java project using Maven or SBT. Inst
| :---------------- | :--------------------- |
| CSV | Built-in |
| JSON | [Download](http://central.maven.org/maven2/org/apache/flink/flink-json/{{site.version}}/flink-json-{{site.version}}-sql-jar.jar) |
| Apache Avro | [Download](http://central.maven.org/maven2/org/apache/flink/flink-avro/{{site.version}}/flink-avro-{{site.version}}-sql-jar.jar) |

{% endif %}

@@ -476,7 +477,7 @@ The CSV format is included in Flink and does not require an additional JAR file.

#### JSON Format

The JSON format allows to read JSON data that corresponds to a given format schema. The format schema can be defined either as a Flink [type string](sqlClient.html#type-strings), as a JSON schema, or derived from the desired table schema. A type string enables a more SQL-like definition and mapping to the corresponding SQL data types. The JSON schema allows for more complex and nested structures.
The JSON format allows to read and write JSON data that corresponds to a given format schema. The format schema can be defined either as a Flink [type string](sqlClient.html#type-strings), as a JSON schema, or derived from the desired table schema. A type string enables a more SQL-like definition and mapping to the corresponding SQL data types. The JSON schema allows for more complex and nested structures.

If the format schema is equal to the table schema, the schema can also be automatically derived. This allows for defining schema information only once. The names, types, and field order of the format are determined by the table's schema. Time attributes are ignored. A `from` definition in the table schema is interpreted as a field renaming in the format.

@@ -507,6 +508,23 @@ format:
  derive-schema: true
{% endhighlight %}
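
The same derive-schema setup can also be expressed programmatically. The following is only a sketch and assumes the descriptor-based `connect()` API together with the `Kafka`, `Json`, and `Schema` descriptors; the connector properties, topic, and field names are placeholders:

{% highlight java %}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;

public class JsonDeriveSchemaSketch {

	public static void main(String[] args) {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

		tableEnv
			// placeholder Kafka source; any connector that accepts a format works
			.connect(new Kafka()
				.version("0.11")
				.topic("json-topic")
				.property("bootstrap.servers", "localhost:9092")
				.startFromEarliest())
			// derive the JSON format schema from the table schema below
			.withFormat(new Json().deriveSchema())
			.withSchema(new Schema()
				.field("a", Types.LONG())
				.field("b", Types.STRING()))
			.inAppendMode()
			.registerTableSource("JsonTable");
	}
}
{% endhighlight %}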

The following table shows the mapping of JSON schema types to Flink SQL types:

| JSON schema | Flink SQL |
| :-------------------------------- | :---------------------- |
| `object` | `ROW` |
| `boolean` | `BOOLEAN` |
| `array` | `ARRAY[_]` |
| `number` | `DECIMAL` |
| `integer` | `DECIMAL` |
| `string` | `VARCHAR` |
| `string` with `format: date-time` | `TIMESTAMP` |
| `string` with `format: date` | `DATE` |
| `string` with `format: time` | `TIME` |
| `string` with `encoding: base64` | `ARRAY[TINYINT]` |
| `null`                            | `NULL` (not supported yet) |


Currently, Flink supports only a subset of the [JSON schema specification](http://json-schema.org/) `draft-07`. Union types (as well as `allOf`, `anyOf`, `not`) are not supported yet. `oneOf` and arrays of types are only supported for specifying nullability.

Simple references that link to a common definition in the document are supported as shown in the more complex example below:
@@ -558,6 +576,59 @@ Simple references that link to a common definition in the document are supported

Make sure to download the [JSON SQL JAR](sqlClient.html#dependencies) file and pass it to the SQL Client.

#### Apache Avro Format

The [Apache Avro](https://avro.apache.org/) format allows for reading and writing Avro data that corresponds to a given format schema. The format schema can be defined either as a fully qualified class name of an Avro specific record or as an Avro schema string. If a class name is used, the class must be available on the classpath at runtime.

{% highlight yaml %}
format:
  type: avro

  # required: define the schema either by using an Avro specific record class
  record-class: "org.organization.types.User"

  # or by using an Avro schema
  avro-schema: >
    {
      "type": "record",
      "name": "test",
      "fields" : [
        {"name": "a", "type": "long"},
        {"name": "b", "type": "string"}
      ]
    }
{% endhighlight %}
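
The YAML definition above has a Table API counterpart. The following is only a sketch and assumes that the `Avro` format descriptor exposes `recordClass(...)` and `avroSchema(...)` methods matching the two properties shown above:

{% highlight java %}
import org.apache.flink.table.descriptors.Avro;

public class AvroFormatDescriptorSketch {

	public static void main(String[] args) {
		// define the format with an Avro schema string (generic records);
		// a generated specific record class could be used instead, e.g.
		//   new Avro().recordClass(User.class)
		final Avro avroFormat = new Avro()
			.avroSchema(
				"{"
				+ "  \"type\": \"record\","
				+ "  \"name\": \"test\","
				+ "  \"fields\": ["
				+ "    {\"name\": \"a\", \"type\": \"long\"},"
				+ "    {\"name\": \"b\", \"type\": \"string\"}"
				+ "  ]"
				+ "}");

		// the descriptor is then passed to a connector definition, e.g.
		// tableEnv.connect(new Kafka()...).withFormat(avroFormat)...
		System.out.println(avroFormat);
	}
}
{% endhighlight %}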

Avro types are mapped to the corresponding SQL data types. Union types are only supported for specifying nullability; otherwise, they are converted to an `ANY` type. The following table shows the mapping:

| Avro schema | Flink SQL |
| :------------------------------------------ | :---------------------- |
| `record` | `ROW` |
| `enum` | `VARCHAR` |
| `array` | `ARRAY[_]` |
| `map` | `MAP[VARCHAR, _]` |
| `union` | non-null type or `ANY` |
| `fixed` | `ARRAY[TINYINT]` |
| `string` | `VARCHAR` |
| `bytes` | `ARRAY[TINYINT]` |
| `int` | `INT` |
| `long` | `BIGINT` |
| `float` | `FLOAT` |
| `double` | `DOUBLE` |
| `boolean` | `BOOLEAN` |
| `int` with `logicalType: date` | `DATE` |
| `int` with `logicalType: time-millis` | `TIME` |
| `int` with `logicalType: time-micros` | `INT` |
| `long` with `logicalType: timestamp-millis` | `TIMESTAMP` |
| `long` with `logicalType: timestamp-micros` | `BIGINT` |
| `bytes` with `logicalType: decimal` | `DECIMAL` |
| `fixed` with `logicalType: decimal` | `DECIMAL` |
| `null`                                      | `NULL` (not supported yet) |
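
To inspect how a concrete Avro schema is mapped, the converter used throughout this commit can be called directly. The sketch below assumes a `convertToTypeInfo(String)` overload of `AvroSchemaConverter` next to the class-based one shown in the diff; the example schema and field names are made up:

{% highlight java %}
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.types.Row;

public class AvroTypeMappingSketch {

	public static void main(String[] args) {
		// record with a plain string, a logical date, and a logical decimal field
		final String schema =
			"{"
			+ "  \"type\": \"record\","
			+ "  \"name\": \"Order\","
			+ "  \"fields\": ["
			+ "    {\"name\": \"name\", \"type\": \"string\"},"
			+ "    {\"name\": \"day\", \"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},"
			+ "    {\"name\": \"price\", \"type\": {\"type\": \"bytes\", \"logicalType\": \"decimal\","
			+ "      \"precision\": 10, \"scale\": 2}}"
			+ "  ]"
			+ "}";

		// per the table above this should yield ROW(name: VARCHAR, day: DATE, price: DECIMAL)
		final TypeInformation<Row> rowType = AvroSchemaConverter.convertToTypeInfo(schema);
		System.out.println(rowType);
	}
}
{% endhighlight %}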

Avro uses [Joda-Time](http://www.joda.org/joda-time/) for representing logical date and time types in specific record classes. The Joda-Time dependency is not part of Flink's SQL JAR distribution. Therefore, make sure that Joda-Time is in your classpath together with your specific record class during runtime. Avro formats specified via a schema string do not require Joda-Time to be present.

Make sure to download the [Apache Avro SQL JAR](sqlClient.html#dependencies) file and pass it to the SQL Client.

{% top %}

Limitations & Future
@@ -21,7 +21,7 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.formats.avro.AvroRowDeserializationSchema;
import org.apache.flink.formats.avro.typeutils.AvroRecordClassConverter;
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sources.DefinedFieldMapping;
import org.apache.flink.table.sources.StreamTableSource;
@@ -64,7 +64,7 @@ protected KafkaAvroTableSource(
topic,
properties,
schema,
AvroRecordClassConverter.convert(avroRecordClass));
AvroSchemaConverter.convertToTypeInfo(avroRecordClass));

this.avroRecordClass = avroRecordClass;
}
@@ -18,7 +18,7 @@

package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.formats.avro.typeutils.AvroRecordClassConverter;
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.descriptors.AvroValidator;
import org.apache.flink.table.descriptors.DescriptorProperties;
@@ -65,7 +65,7 @@ protected KafkaTableSource.Builder createBuilderWithFormat(DescriptorProperties
final Class<? extends SpecificRecordBase> avroRecordClass =
params.getClass(AvroValidator.FORMAT_RECORD_CLASS, SpecificRecordBase.class);
builder.forAvroRecordClass(avroRecordClass);
final TableSchema formatSchema = TableSchema.fromTypeInfo(AvroRecordClassConverter.convert(avroRecordClass));
final TableSchema formatSchema = TableSchema.fromTypeInfo(AvroSchemaConverter.convertToTypeInfo(avroRecordClass));

// field mapping
final Map<String, String> mapping = SchemaValidator.deriveFieldMapping(params, Optional.of(formatSchema));
@@ -18,17 +18,14 @@

package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.formats.avro.utils.AvroTestUtils;
import org.apache.flink.formats.avro.generated.DifferentSchemaRecord;
import org.apache.flink.formats.avro.generated.SchemaRecord;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Types;

import org.apache.avro.Schema;
import org.apache.avro.specific.SpecificRecordBase;
import org.junit.Test;

import java.sql.Timestamp;
import java.util.HashMap;
import java.util.Map;

@@ -44,7 +41,7 @@ public abstract class KafkaAvroTableSourceTestBase extends KafkaTableSourceTestB
@Override
protected void configureBuilder(KafkaTableSource.Builder builder) {
super.configureBuilder(builder);
((KafkaAvroTableSource.Builder) builder).forAvroRecordClass(SameFieldsAvroClass.class);
((KafkaAvroTableSource.Builder) builder).forAvroRecordClass(SchemaRecord.class);
}

@Test
@@ -67,8 +64,8 @@ public void testSameFieldsAvroClass() {
// check field types
assertEquals(Types.LONG(), returnType.getTypeAt(0));
assertEquals(Types.STRING(), returnType.getTypeAt(1));
assertEquals(Types.SQL_TIMESTAMP(), returnType.getTypeAt(2));
assertEquals(Types.SQL_TIMESTAMP(), returnType.getTypeAt(3));
assertEquals(Types.LONG(), returnType.getTypeAt(2));
assertEquals(Types.LONG(), returnType.getTypeAt(3));
assertEquals(Types.DOUBLE(), returnType.getTypeAt(4));

// check field mapping
@@ -91,7 +88,7 @@ public void testDifferentFieldsAvroClass() {
mapping.put("field3", "otherField3");

// set Avro class with different fields
b.forAvroRecordClass(DifferentFieldsAvroClass.class);
b.forAvroRecordClass(DifferentSchemaRecord.class);
b.withTableToAvroMapping(mapping);

KafkaAvroTableSource source = (KafkaAvroTableSource) b.build();
@@ -110,9 +107,9 @@ public void testDifferentFieldsAvroClass() {
// check field types
assertEquals(Types.LONG(), returnType.getTypeAt(0));
assertEquals(Types.STRING(), returnType.getTypeAt(1));
assertEquals(Types.SQL_TIMESTAMP(), returnType.getTypeAt(2));
assertEquals(Types.LONG(), returnType.getTypeAt(2));
assertEquals(Types.DOUBLE(), returnType.getTypeAt(3));
assertEquals(Types.BYTE(), returnType.getTypeAt(4));
assertEquals(Types.FLOAT(), returnType.getTypeAt(4));
assertEquals(Types.INT(), returnType.getTypeAt(5));

// check field mapping
@@ -127,68 +124,4 @@ public void testDifferentFieldsAvroClass() {
assertEquals(source.getReturnType(),
source.getDataStream(StreamExecutionEnvironment.getExecutionEnvironment()).getType());
}

/**
* Avro record that matches the table schema.
*/
@SuppressWarnings("unused")
public static class SameFieldsAvroClass extends SpecificRecordBase {

//CHECKSTYLE.OFF: StaticVariableNameCheck - Avro accesses this field by name via reflection.
public static Schema SCHEMA$ = AvroTestUtils.createFlatAvroSchema(FIELD_NAMES, FIELD_TYPES);
//CHECKSTYLE.ON: StaticVariableNameCheck

public Long field1;
public String field2;
public Timestamp time1;
public Timestamp time2;
public Double field3;

@Override
public Schema getSchema() {
return null;
}

@Override
public Object get(int field) {
return null;
}

@Override
public void put(int field, Object value) { }
}

/**
* Avro record that does NOT match the table schema.
*/
@SuppressWarnings("unused")
public static class DifferentFieldsAvroClass extends SpecificRecordBase {

//CHECKSTYLE.OFF: StaticVariableNameCheck - Avro accesses this field by name via reflection.
public static Schema SCHEMA$ = AvroTestUtils.createFlatAvroSchema(
new String[]{"otherField1", "otherField2", "otherTime1", "otherField3", "otherField4", "otherField5"},
new TypeInformation[]{Types.LONG(), Types.STRING(), Types.SQL_TIMESTAMP(), Types.DOUBLE(), Types.BYTE(), Types.INT()});
//CHECKSTYLE.ON: StaticVariableNameCheck

public Long otherField1;
public String otherField2;
public Timestamp otherTime1;
public Double otherField3;
public Byte otherField4;
public Integer otherField5;

@Override
public Schema getSchema() {
return null;
}

@Override
public Object get(int field) {
return null;
}

@Override
public void put(int field, Object value) { }
}

}
@@ -249,7 +249,22 @@ public boolean canEqual(Object obj) {

@Override
public int hashCode() {
return 31 * super.hashCode() + Arrays.hashCode(fieldNames);
return 31 * super.hashCode();
}

/**
* The equals method only checks the field types. Field names do not matter during
* runtime, so we can consider rows with the same field types as equal.
* Use {@link RowTypeInfo#schemaEquals(Object)} for checking schema-equivalence.
*/
@Override
public boolean equals(Object obj) {
if (obj instanceof RowTypeInfo) {
final RowTypeInfo other = (RowTypeInfo) obj;
return other.canEqual(this) && super.equals(other);
} else {
return false;
}
}

@Override
@@ -274,6 +289,13 @@ public TypeInformation<?>[] getFieldTypes() {
return types;
}

/**
* Tests whether another object describes the same, schema-equivalent row information.
*/
public boolean schemaEquals(Object obj) {
return equals(obj) && Arrays.equals(fieldNames, ((RowTypeInfo) obj).fieldNames);
}

private boolean hasDuplicateFieldNames(String[] fieldNames) {
HashSet<String> names = new HashSet<>();
for (String field : fieldNames) {
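
// Editor's note: an illustrative sketch (not part of this commit) of the equals vs.
// schemaEquals distinction introduced above; it uses only the constructor and methods
// shown in this diff.
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;

public class RowTypeInfoEqualitySketch {

	public static void main(String[] args) {
		final RowTypeInfo named = new RowTypeInfo(
			new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO},
			new String[]{"id", "name"});
		final RowTypeInfo renamed = new RowTypeInfo(
			new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO},
			new String[]{"key", "value"});

		// field names are ignored at runtime, so both rows are considered equal ...
		System.out.println(named.equals(renamed));       // true
		// ... but they do not describe the same schema
		System.out.println(named.schemaEquals(renamed));  // false
	}
}
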
@@ -21,14 +21,16 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
import org.apache.flink.api.common.typeutils.TypeInformationTestBase;

import org.junit.Test;

import java.util.ArrayList;
import java.util.List;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

/**
* Test for {@link RowTypeInfo}.
@@ -47,7 +49,10 @@ protected RowTypeInfo[] getTestData() {
return new RowTypeInfo[] {
new RowTypeInfo(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO),
new RowTypeInfo(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.BOOLEAN_TYPE_INFO),
new RowTypeInfo(typeList)
new RowTypeInfo(typeList),
new RowTypeInfo(
new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO},
new String[]{"int", "int2"})
};
}

@@ -123,4 +128,24 @@ public void testNestedRowTypeInfo() {
assertEquals("Short", typeInfo.getTypeAt("f1.f0").toString());
}

@Test
public void testSchemaEquals() {
final RowTypeInfo row1 = new RowTypeInfo(
new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO},
new String[] {"field1", "field2"});
final RowTypeInfo row2 = new RowTypeInfo(
new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO},
new String[] {"field1", "field2"});
assertTrue(row1.schemaEquals(row2));

final RowTypeInfo other1 = new RowTypeInfo(
new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO},
new String[] {"otherField", "field2"});
final RowTypeInfo other2 = new RowTypeInfo(
new TypeInformation[]{BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO},
new String[] {"field1", "field2"});
assertFalse(row1.schemaEquals(other1));
assertFalse(row1.schemaEquals(other2));
}

}
@@ -55,9 +55,9 @@ protected <T> TypeSerializer<T> createSerializer(Class<T> type) {
TypeInformation<T> typeInfo = new GenericTypeInfo<T>(type);
return typeInfo.createSerializer(conf);
}

public static final class LocalDateSerializer extends Serializer<LocalDate> implements java.io.Serializable {

private static final long serialVersionUID = 1L;

@Override
Expand All @@ -66,10 +66,10 @@ public void write(Kryo kryo, Output output, LocalDate object) {
output.writeInt(object.getMonthOfYear());
output.writeInt(object.getDayOfMonth());
}

@Override
public LocalDate read(Kryo kryo, Input input, Class<LocalDate> type) {
return new LocalDate(input.readInt(), input.readInt(), input.readInt());
}
}
}
}
