add avro stream input format #11040

Merged: 9 commits merged into apache:master on Apr 13, 2021

Conversation

@bananaaggle (Contributor)

Because parseSpec is deprecated, I developed AvroStreamInputFormat for the new InputFormat interface, which supports stream ingestion of Avro-encoded data.
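
For illustration, a minimal `avro_stream` inputFormat spec might look like the following (a sketch only; the inline schema and its record/field names are made-up placeholders, not part of this PR):

```json
{
  "type": "avro_stream",
  "flattenSpec": {
    "useFieldDiscovery": true,
    "fields": []
  },
  "avroBytesDecoder": {
    "type": "schema_inline",
    "schema": {
      "namespace": "org.example",
      "name": "SomeData",
      "type": "record",
      "fields": [
        {"name": "timestamp", "type": "long"},
        {"name": "someDim", "type": "string"}
      ]
    }
  },
  "binaryAsString": false
}
```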

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@bananaaggle (Contributor, Author)

@clintropolis Hi, I implemented AvroStreamInputFormat as I mentioned last weekend. Can you review it and help me refine it?

@clintropolis (Member) left a comment

code LGTM 👍

I think this PR needs docs added to https://github.com/apache/druid/blob/master/docs/ingestion/data-formats.md before it is ready to go.

I think it should also be relatively easy to add an integration test for this, since we already have an integration test for the Parser implementation of Avro + Schema Registry. All that needs to be done is to create a new input_format directory in this location https://github.com/apache/druid/tree/master/integration-tests/src/test/resources/stream/data/avro_schema_registry with a new input_format.json template (using the InputFormat instead of the Parser). See JSON for an example: https://github.com/apache/druid/tree/master/integration-tests/src/test/resources/stream/data/json. If this template is added, then I think it should be automatically picked up and run as part of the Kafka data format integration tests.

Comment on lines 86 to 95
final AvroStreamInputFormat that = (AvroStreamInputFormat) o;
return Objects.equals(getFlattenSpec(), that.getFlattenSpec()) &&
Objects.equals(avroBytesDecoder, that.avroBytesDecoder);
}

@Override
public int hashCode()
{
return Objects.hash(getFlattenSpec(), avroBytesDecoder);
}
@clintropolis (Member):

equality/hashcode should probably consider binaryAsString for their computations
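
A sketch of the fix (assuming the snippet above lives in AvroStreamInputFormat with a `binaryAsString` field; not the final code):

```java
@Override
public boolean equals(Object o)
{
  if (this == o) {
    return true;
  }
  if (o == null || getClass() != o.getClass()) {
    return false;
  }
  final AvroStreamInputFormat that = (AvroStreamInputFormat) o;
  return Objects.equals(getFlattenSpec(), that.getFlattenSpec()) &&
         Objects.equals(avroBytesDecoder, that.avroBytesDecoder) &&
         // include binaryAsString so two formats differing only in this flag are not equal
         Objects.equals(binaryAsString, that.binaryAsString);
}

@Override
public int hashCode()
{
  // keep hashCode consistent with equals by hashing the same fields
  return Objects.hash(getFlattenSpec(), avroBytesDecoder, binaryAsString);
}
```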

Comment on lines 67 to 69
return CloseableIterators.withEmptyBaggage(
Iterators.singletonIterator(avroBytesDecoder.parse(ByteBuffer.wrap(IOUtils.toByteArray(source.open())
))));
@clintropolis (Member):

nit: strange formatting (occasionally style bot doesn't pick stuff up)

Suggested change, from:

return CloseableIterators.withEmptyBaggage(
Iterators.singletonIterator(avroBytesDecoder.parse(ByteBuffer.wrap(IOUtils.toByteArray(source.open())
))));

to:

return CloseableIterators.withEmptyBaggage(
    Iterators.singletonIterator(avroBytesDecoder.parse(ByteBuffer.wrap(IOUtils.toByteArray(source.open()))))
);


bananaaggle commented Mar 30, 2021

> code LGTM 👍
>
> I think this PR needs docs added to https://github.com/apache/druid/blob/master/docs/ingestion/data-formats.md before it is ready to go.
>
> I think it should also be relatively easy to add an integration test for this, since we already have an integration test for the Parser implementation of Avro + Schema Registry. All that needs to be done is to create a new input_format directory in this location https://github.com/apache/druid/tree/master/integration-tests/src/test/resources/stream/data/avro_schema_registry with a new input_format.json template (using the InputFormat instead of the Parser). See JSON for an example: https://github.com/apache/druid/tree/master/integration-tests/src/test/resources/stream/data/json. If this template is added, then I think it should be automatically picked up and run as part of the Kafka data format integration tests.

Sorry, I don't understand. Do you mean an integration test for the Avro stream is being developed, and when it is finished I can add a new JSON template for this test? Or do I need to create this integration test myself?

@bananaaggle (Contributor, Author)

@clintropolis Documentation done. Do I need to create an integration test? Are there any examples of it? I'm interested in it.


clintropolis commented Mar 31, 2021

> Sorry, I don't understand. Do you mean an integration test for the Avro stream is being developed, and when it is finished I can add a new JSON template for this test? Or do I need to create this integration test myself?

Ah sorry, let me try to explain a bit more. In the case of avro inline schema and avro schema registry, you should be able to just add the JSON files with the InputFormat template and get the tests for free. The integration test ITKafkaIndexingServiceDataFormatTest is used to test the same data with Kafka streaming using a variety of different data formats supported by Kafka ingestion. It works by iterating over the JSON templates in https://github.com/apache/druid/tree/master/integration-tests/src/test/resources/stream/data to test each data format present in that directory with the same set of data. For each of these data formats, there is a corresponding EventSerializer implementation, which writes the test data to the Kafka stream in that format; the parser or inputFormat templates are then used to construct a supervisor that spawns indexing tasks to read the data and run the actual tests to verify that everything is working correctly.

AvroEventSerializer and AvroSchemaRegistryEventSerializer are already present because there are integration tests using the Avro stream parsers (avro inline and avro schema registry); they are just missing the input format templates, because the input format didn't exist until this PR (JSON, CSV, and TSV do have input format templates, which might be useful as a reference). You should just be able to adapt those parser templates into the equivalent input format templates.
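
For example, the avro inline parser template's parseSpec would collapse into an inputFormat object roughly like this (a hedged sketch; the real templates presumably keep whatever test-specific placeholders the parser templates use, and the inline Avro schema used by AvroEventSerializer is elided here):

```json
{
  "type": "avro_stream",
  "avroBytesDecoder": {
    "type": "schema_inline",
    "schema": {}
  }
}
```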

Comment on lines 238 to 246
"type" : "schema_repo",
"subjectAndIdConverter" : {
"type" : "avro_1124",
"topic" : "${YOUR_TOPIC}"
},
"schemaRepository" : {
"type" : "avro_1124_rest_client",
"url" : "${YOUR_SCHEMA_REPO_END_POINT}",
}
@clintropolis (Member) commented Mar 31, 2021:

I suggest we switch to using 'inline' or 'schema-registry' as the example instead of 'schema_repo', which isn't used as frequently in practice as far as I know.
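
For reference, a schema-registry example would look roughly like this (a sketch; the placeholder mirrors the style of the existing docs snippet above):

```json
{
  "type" : "schema_registry",
  "url" : "${YOUR_SCHEMA_REGISTRY_END_POINT}"
}
```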

| `flattenSpec` | JSON Object | Define a [`flattenSpec`](#flattenspec) to extract nested values from an Avro record. Note that only 'path' expressions are supported ('jq' is unavailable). | no (default will auto-discover 'root' level properties) |
| `avroBytesDecoder` | JSON Object | Specifies how to decode bytes to an Avro record. | yes |
| `binaryAsString` | Boolean | Specifies whether an Avro bytes column that is not logically marked as a string or enum type should be treated as a UTF-8 encoded string. | no (default = false) |

@clintropolis (Member):

I think we should move https://github.com/apache/druid/blob/master/docs/ingestion/data-formats.md#avro-bytes-decoder (which currently lives with the 'parsers' documentation) up to this 'input formats' section, and have the parsers section link to the bytes decoder docs here.


clintropolis commented Apr 9, 2021

I haven't quite determined what is going on yet, but it seems like there is some sort of serialization error that is causing the newly added schema-registry input format integration test to fail:

Caused by: com.fasterxml.jackson.databind.exc.ValueInstantiationException: Cannot construct instance of `org.apache.druid.data.input.avro.SchemaRegistryBasedAvroBytesDecoder`, problem: Expected at least one URL to be passed in constructor
 at [Source: (byte[])"{"type":"index_kafka","id":"index_kafka_kafka_data_format_indexing_service_test_c940ce28-01a9-4070-9dc0-27df2903249e %?????? ?? ??!?_a0ffca1f01390e3_ejndffbp","resource":{"availabilityGroup":"index_kafka_kafka_data_format_indexing_service_test_c940ce28-01a9-4070-9dc0-27df2903249e %?????? ?? ??!?_a0ffca1f01390e3","requiredCapacity":1},"dataSchema":{"dataSource":"kafka_data_format_indexing_service_test_c940ce28-01a9-4070-9dc0-27df2903249e %?????? ?? ??!?","[truncated 4940 bytes]; line: 1, column: 5072] (through reference chain: org.apache.druid.indexing.kafka.KafkaIndexTask["ioConfig"]->org.apache.druid.indexing.kafka.KafkaIndexTaskIOConfig["inputFormat"]->org.apache.druid.data.input.avro.AvroStreamInputFormat["avroBytesDecoder"])
	at com.fasterxml.jackson.databind.exc.ValueInstantiationException.from(ValueInstantiationException.java:47)
	at com.fasterxml.jackson.databind.DeserializationContext.instantiationException(DeserializationContext.java:1735)
	at com.fasterxml.jackson.databind.deser.std.StdValueInstantiator.wrapAsJsonMappingException(StdValueInstantiator.java:491)
	at com.fasterxml.jackson.databind.deser.std.StdValueInstantiator.rewrapCtorProblem(StdValueInstantiator.java:514)
	at com.fasterxml.jackson.databind.deser.std.StdValueInstantiator.createFromObjectWith(StdValueInstantiator.java:285)

https://travis-ci.com/github/apache/druid/jobs/496046398#L9197

The inline schema test is passing 👍


clintropolis commented Apr 9, 2021

It seems like https://github.com/apache/druid/blob/master/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java#L50 is missing getter methods annotated with @JsonProperty, which I suspect is related to the test failure. I'm not really sure how the parser-based integration test is passing, since it doesn't seem like serializing the schema-registry bytes decoder should work... (looking into this).

Could you add serialization round-trip tests for more AvroBytesDecoder implementations to your unit tests with the input format, so we can get coverage on this?
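
A round-trip test can be as simple as deserializing a spec, serializing it back, and checking that a second deserialization yields an equal object; any field missing a @JsonProperty getter is dropped on the write and fails the equality check. A sketch, assuming `mapper` is an ObjectMapper with the avro-extensions Jackson modules (and InputFormat subtypes) registered; the test name and registry URL are placeholders:

```java
// assumes: org.junit.Test, org.junit.Assert, java.io.IOException,
// org.apache.druid.data.input.InputFormat, and a suitably configured ObjectMapper 'mapper'
@Test
public void testSchemaRegistrySerdeRoundTrip() throws IOException
{
  final String json = "{\n"
      + "  \"type\": \"avro_stream\",\n"
      + "  \"avroBytesDecoder\": {\"type\": \"schema_registry\", \"url\": \"http://localhost:8081\"},\n"
      + "  \"binaryAsString\": true\n"
      + "}";
  final AvroStreamInputFormat fromJson = (AvroStreamInputFormat) mapper.readValue(json, InputFormat.class);
  // Serialize and deserialize again: if a property lacks a @JsonProperty getter,
  // it is lost here and the assertion below fails.
  final AvroStreamInputFormat roundTrip =
      (AvroStreamInputFormat) mapper.readValue(mapper.writeValueAsString(fromJson), InputFormat.class);
  Assert.assertEquals(fromJson, roundTrip);
}
```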

@bananaaggle (Contributor, Author)

> I haven't quite determined what is going on yet, but it seems like there is some sort of serialization error that is causing the newly added schema-registry input format integration test to fail:
>
> Caused by: com.fasterxml.jackson.databind.exc.ValueInstantiationException: Cannot construct instance of `org.apache.druid.data.input.avro.SchemaRegistryBasedAvroBytesDecoder`, problem: Expected at least one URL to be passed in constructor
> […]
>
> The inline schema test is passing 👍

I tested it last week, so I know something is wrong with the schema registry decoder. I will fix it this weekend. Thanks for your advice; I will change the code to handle this exception.

@bananaaggle (Contributor, Author)

@clintropolis Hi, I fixed this bug and the integration tests now pass. I also added a unit test for this.

@clintropolis (Member) left a comment

thanks for fixing this up 👍

public AvroStreamInputFormat(
    @JsonProperty("flattenSpec") @Nullable JSONPathSpec flattenSpec,
    @JsonProperty("avroBytesDecoder") AvroBytesDecoder avroBytesDecoder,
    @JsonProperty("binaryAsString") @Nullable Boolean binaryAsString
@clintropolis (Member):

Missing a binaryAsString getter annotated with @JsonProperty, I think.
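
i.e. something like this (a sketch, assuming the field is a `Boolean` named `binaryAsString`):

```java
// Without @JsonProperty here, Jackson drops binaryAsString on serialization.
@JsonProperty
public Boolean getBinaryAsString()
{
  return binaryAsString;
}
```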

@bananaaggle (Contributor, Author):

Fixed.

@clintropolis (Member) left a comment

lgtm, thanks @bananaaggle 👍

@clintropolis clintropolis merged commit d0a94a8 into apache:master Apr 13, 2021
@clintropolis clintropolis mentioned this pull request Apr 13, 2021
@clintropolis clintropolis added this to the 0.22.0 milestone Aug 12, 2021