add protobuf inputformat #11018
Conversation
@clintropolis Hi, I created ProtobufInputFormat following your suggestion. I haven't used this interface before, so I'm not very familiar with it. Can you review my code and tell me whether this implementation meets the requirements? If it is correct, I will add more unit tests. By the way, where should I document this feature?
@clintropolis I reviewed the Avro code for inputFormat and learned it only supports batch ingestion jobs. Why don't we support stream ingestion jobs? I don't think it would be very hard to implement, and I'd be glad to do it.
Thanks! I will have a look this weekend. I think https://github.com/apache/druid/blob/master/docs/ingestion/data-formats.md is the appropriate place to document the new format.
👍 The only reason streaming Avro isn't supported yet is basically the same reason it wasn't done for Protobuf: simply that no one has done the conversion. I think it would be great if you would like to take that on, especially since I think Avro and Protobuf (until this PR) are the only "core" extensions that do not yet support `InputFormat`.
Apologies for the delay in having a look, overall lgtm 👍
```diff
@@ -37,7 +37,8 @@
     return Collections.singletonList(
         new SimpleModule("ProtobufInputRowParserModule")
             .registerSubtypes(
-                new NamedType(ProtobufInputRowParser.class, "protobuf")
+                new NamedType(ProtobufInputRowParser.class, "protobuf"),
+                new NamedType(ProtobufInputFormat.class, "protobuf_format")
```
I think this could just be `protobuf`, the same as the parser name, since they are separate interfaces.
```java
return CloseableIterators.withEmptyBaggage(
    Iterators.singletonIterator(JsonFormat.printer().print(protobufBytesDecoder.parse(ByteBuffer.wrap(IOUtils.toByteArray(source.open())))
)));
```
The `InputRowParser` implementation for protobuf has an optimization that skips the conversion to JSON if a `flattenSpec` is not defined (see #9999), since the overhead of converting in order to flatten can slow input processing a fair bit (from the numbers in that PR).

To retain this, it might make sense to make the intermediary format be `ByteBuffer` or `byte[]`, and handle the cases of having a `flattenSpec` or not separately. I think these could probably both be done within this same class: just make `parseInputRows` behave differently for each situation, and it maybe makes sense to use JSON conversion for the `toMap` method (it is used by `InputSourceSampler` for the sampler API).
@clintropolis Thanks for your review. I didn't know about this feature before; it's a good idea to change the code for this optimization. Do you have any other suggestions about my code? If there are no more concerns, I'll add unit tests and documentation.
Everything else looked good to me 👍
@clintropolis I've added documentation for this feature and supplemented the documentation for the last commit. Can you review those documents and help me refine them? And what else do you think I should add for unit tests?
docs are almost there I think, 👍
As far as additional testing goes, I guess this covers most of the `InputFormat` stuff, and a lot of the other stuff, such as the decoding logic itself, is already covered by other tests.

I guess serialization/deserialization tests on `ProtobufInputFormat` would be good to make sure that JSON requests work as expected when submitting tasks/supervisor specs. Beyond that, integration tests would probably be the most useful, but that can be done in a separate PR since there is going to be a bit of setup to do to get that working.
```java
return CloseableIterators.withEmptyBaggage(
    Iterators.singletonIterator(protobufBytesDecoder.parse(ByteBuffer.wrap(IOUtils.toByteArray(source.open()
)))));
```
nit: formatting is sort of funny here
Suggested change:

```java
return CloseableIterators.withEmptyBaggage(
    Iterators.singletonIterator(protobufBytesDecoder.parse(ByteBuffer.wrap(IOUtils.toByteArray(source.open()))))
);
```
docs/ingestion/data-formats.md
Outdated
```diff
@@ -1104,6 +1142,83 @@ Sample spec:
 See the [extension description](../development/extensions-core/protobuf.md) for
 more details and examples.

+#### Protobuf Bytes Decoder
```
If `type` is not included, the avroBytesDecoder defaults to `schema_registry`. |
Suggested change:

> If `type` is not included, the `protoBytesDecoder` defaults to `schema_registry`.
docs/ingestion/data-formats.md
Outdated
| Field | Type | Description | Required |
|-------|------|-------------|----------|
| type | String | This should be set to `protobuf` to read Protobuf file | yes |
Since I guess this is more stream oriented, maybe:
Suggested change:

| type | String | This should be set to `protobuf` to read Protobuf serialized data | yes |
or "to read Protobuf formatted data" or similar.
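For context on what the table being reviewed describes, a `protobuf` inputFormat spec would look something like the following. This is an illustrative sketch assembled from the decoder fields discussed in this thread (`protoBytesDecoder`, `flattenSpec`); treat the exact field names and values (`descriptor`, `protoMessageType`, the `.desc` path) as examples to check against the merged docs rather than authoritative.

```json
"inputFormat": {
  "type": "protobuf",
  "protoBytesDecoder": {
    "type": "file",
    "descriptor": "file:///tmp/metrics.desc",
    "protoMessageType": "Metrics"
  },
  "flattenSpec": {
    "useFieldDiscovery": true,
    "fields": [
      {
        "type": "path",
        "name": "someRecord_subInt",
        "expr": "$.someRecord.subInt"
      }
    ]
  }
}
```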
docs/ingestion/data-formats.md
Outdated
```json
    }
  ]
},
"binaryAsString": false
```
I think this is an Avro/Parquet/ORC parameter
I've changed the documentation and added a serialization test. What are the integration tests you mentioned above? Can you give me an example of implementing one? @clintropolis
This pull request introduces 2 alerts when merging 1751a3f into 6789ed0 - view on LGTM.com.
@clintropolis Hi, I added a unit test for serialization/deserialization when using the schema registry. As for the integration test, should I add it in this PR or in another PR after this one is merged?
I think it would be fine to do the integration test as a follow-up, since it is a bit more involved than the Avro input format due to there not being an existing integration test for protobuf. Adding it would be pretty similar to what is already there for Avro and the other formats though, with the main bit being implementing
lgtm 👍
Because parseSpec is deprecated, I developed ProtobufInputFormat for the new interface, which supports stream ingestion of data encoded with Protobuf.
This PR has: