Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-3871] [table] Add Kafka TableSource with Avro serialization #3663

Closed
wants to merge 1 commit into from

Conversation

Projects
None yet
3 participants
@twalthr
Copy link
Contributor

commented Apr 3, 2017

This PR adds a KafkaAvroTableSource. It serializes/deserializes (nested) Avro records to (nested) Flink rows. Avro Utf8 strings are converted to regular Java strings.

@fhueske
Copy link
Contributor

left a comment

Hi @twalthr, thanks for the PR.
I think it looks pretty good and made a few comments.

Regarding the object reuse when converting the Row and Records back and forth, I think should only do this if it can be easily done. If it ends up being very complicated, we might want to skip it and rather include the TableSource in the next release. Eventually, we should also support code generation for TableSources.

Let me know what you think,
Fabian

* Converts the extracted AvroTypeInfo into a RowTypeInfo nested structure with deterministic field order.
* Replaces generic Utf8 with basic String type information.
*/
private static TypeInformation<?> convertToRowTypeInformation(TypeInformation<?> extracted, Schema schema) {

This comment has been minimized.

Copy link
@fhueske

fhueske Apr 25, 2017

Contributor

Change this to

private static TypeInformation<Row> convertToRowTypeInformation(TypeInformation<?> extracted, Schema schema)

and factor out the recursive logic to a method

convertToTypeInfomation(TypeInformation<?> extracted, Schema schema)

?

return extracted;
}

private static <T extends SpecificRecordBase> TypeInformation<?>[] createFieldTypes(Class<T> record) {

This comment has been minimized.

Copy link
@fhueske

fhueske Apr 25, 2017

Contributor

record -> avroClass?

This comment has been minimized.

Copy link
@fhueske

fhueske Apr 25, 2017

Contributor

This method can be removed as well, if we refactor KafkaTableSource as described below.

return types;
}

private static String[] createFieldNames(Class<? extends SpecificRecord> record) {

This comment has been minimized.

Copy link
@fhueske

fhueske Apr 25, 2017

Contributor

record -> avroClass?

This comment has been minimized.

Copy link
@fhueske

fhueske Apr 25, 2017

Contributor

We could refactor KafkaTableSource to not require a String[] fieldNames and a TypeInformation[] fieldTypes as constructor parameters but just a TypeInformation<Row> rowType.

The field names are a leftover from the time when TableSource did not publish the field names by the TypeInformation.

When we do that, we can remove this method.

throw new RuntimeException("Record type for row type expected. But is: " + schema);
}
final List<Schema.Field> fields = schema.getFields();
final GenericRecord record = new GenericData.Record(schema);

This comment has been minimized.

Copy link
@fhueske

fhueske Apr 25, 2017

Contributor

Can we reuse the GenericRecord?

This comment has been minimized.

Copy link
@twalthr

twalthr Apr 26, 2017

Author Contributor

I don't think so, because of the recursive execution for nested records.

This comment has been minimized.

Copy link
@fhueske

fhueske Apr 26, 2017

Contributor

Those could be reused as well (and recreated if null at some point).
I think it would be possible, but a bit fiddly.
So let's keep it as it is for now.

// records can be wrapped in a union
if (schema.getType() == Schema.Type.UNION) {
final List<Schema> types = schema.getTypes();
if (types.size() == 2 && types.get(0).getType() == Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) {

This comment has been minimized.

Copy link
@fhueske

fhueske Apr 25, 2017

Contributor

are union types always ordered? Could it happen that type 0 is RECORD and 1 is NULL?

This comment has been minimized.

Copy link
@fhueske

fhueske Apr 25, 2017

Contributor

This limitation exists because the Table API cannot handle UNION types either, right?
Isn't this the same as having a nullable record field?

This comment has been minimized.

Copy link
@fhueske

fhueske Apr 25, 2017

Contributor

See comment on UNION in deserializer

This comment has been minimized.

Copy link
@twalthr

twalthr Apr 26, 2017

Author Contributor

Thanks I added the case for the reverse order. This code is needed for nullable records in order to access the schema.

This comment has been minimized.

Copy link
@fhueske

fhueske Apr 26, 2017

Contributor

Ah, OK. I see. Then let's keep it :-)

/**
* Record to deserialize byte array to.
*/
private GenericRecord record;

This comment has been minimized.

Copy link
@fhueske

fhueske Apr 25, 2017

Contributor

GenericRecord -> SpecificRecord

import java.io.IOException;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;

This comment has been minimized.

Copy link
@fhueske

fhueske Apr 25, 2017

Contributor

Change all GenericRecord to SpecificRecord

final Row row = new Row(fields.size());
for (int i = 0; i < fields.size(); i++) {
final Schema.Field field = fields.get(i);
final GenericRecord record = (GenericRecord) recordObj;

This comment has been minimized.

Copy link
@fhueske

fhueske Apr 25, 2017

Contributor

Move cast out of the loop

throw new RuntimeException("Record type for row type expected. But is: " + schema);
}
final List<Schema.Field> fields = schema.getFields();
final Row row = new Row(fields.size());

This comment has been minimized.

Copy link
@fhueske

fhueske Apr 25, 2017

Contributor

Can we create a Row once and reuse it here?

This comment has been minimized.

Copy link
@twalthr

twalthr Apr 26, 2017

Author Contributor

No, because Row can again be nested.

This comment has been minimized.

Copy link
@fhueske

fhueske Apr 26, 2017

Contributor

Sure, but the reusable row could also hold nested rows

private static Object convertToRow(Schema schema, Object recordObj) {
if (recordObj instanceof GenericRecord) {
// records can be wrapped in a union
if (schema.getType() == Schema.Type.UNION) {

This comment has been minimized.

Copy link
@fhueske

fhueske Apr 25, 2017

Contributor

Not sure if we should support UNION at all.
If the you have a UNION[NULL, RECORD] field in Avro, you'd expect it to be represented also as UNION field in a Table.
We change it here to a nullable Record field. Not sure if that's expected.

Should we just not accept it (its a corner case anyway) and add support once the Table API / SQL support union types?

@twalthr twalthr force-pushed the twalthr:FLINK-3871 branch from 589e45c to f76216e Apr 26, 2017

twalthr added a commit to twalthr/flink that referenced this pull request Apr 26, 2017

@twalthr twalthr force-pushed the twalthr:FLINK-3871 branch from f76216e to 0c48eeb Apr 26, 2017

@asfgit asfgit closed this in bbc5e29 Apr 26, 2017

PangZhi added a commit to PangZhi/flink that referenced this pull request May 1, 2017

fanyon pushed a commit to fanyon/flink that referenced this pull request May 11, 2017

hequn8128 pushed a commit to hequn8128/flink that referenced this pull request Jun 22, 2017

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.