Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Protocol Buffer Stream Decoder #8972

Merged
merged 7 commits into from Aug 2, 2022

Conversation

KKcorps
Copy link
Contributor

@KKcorps KKcorps commented Jun 24, 2022

Allows users to ingest protocol buffers in realtime streams.

Supported configs

  • descriptorFile - Path of the descriptor file. You can generate this file use protoc -o file.desc --include_imports file.proto command. The path can be a local path in which case it needs to be available on all servers OR it can be a DFS path such as s3, gcs etc. If providing a DFS path, you need to ensure pinot is configured to use that filesystem.

  • protoClassName - If the descriptor file contains multiple proto object, you can mention the name of the class to use for parsing.

Example

    "streamConfigs": {
        "streamType": "kafka",
        "stream.kafka.consumer.type": "lowLevel",
        "stream.kafka.topic.name": "metrics_pb",
        "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.protobuf.ProtoBufMessageDecoder",
        "stream.kafka.decoder.prop.descriptorFile": "file:///tmp/Workspace/protobuf/metrics.desc",
        "stream.kafka.decoder.prop.protoClassName": "Metrics",
        "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
        "stream.kafka.broker.list": "localhost:19092",
        "stream.kafka.consumer.prop.auto.offset.reset": "largest",
        "realtime.segment.flush.threshold.time": "12h",
        "realtime.segment.flush.threshold.size": "100M"
      }

For future PRs

  • Support for schema registry

@codecov-commenter
Copy link

codecov-commenter commented Jun 24, 2022

Codecov Report

Merging #8972 (adaa65e) into master (e8f9d88) will decrease coverage by 7.21%.
The diff coverage is 0.00%.

@@             Coverage Diff              @@
##             master    #8972      +/-   ##
============================================
- Coverage     68.56%   61.35%   -7.22%     
+ Complexity     4640     4523     -117     
============================================
  Files          1741     1828      +87     
  Lines         91475    97090    +5615     
  Branches      13674    14641     +967     
============================================
- Hits          62724    59571    -3153     
- Misses        24363    33078    +8715     
- Partials       4388     4441      +53     
Flag Coverage Δ
integration1 26.31% <0.00%> (-0.76%) ⬇️
unittests1 66.93% <ø> (+0.70%) ⬆️
unittests2 ?

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ
...n/inputformat/protobuf/ProtoBufMessageDecoder.java 0.00% <0.00%> (ø)
...gin/inputformat/protobuf/ProtoBufRecordReader.java 0.00% <0.00%> (-80.00%) ⬇️
...not/plugin/inputformat/protobuf/ProtoBufUtils.java 0.00% <0.00%> (ø)
...pinot/controller/recommender/io/ConfigManager.java 0.00% <0.00%> (-100.00%) ⬇️
.../org/apache/pinot/client/AggregationResultSet.java 0.00% <0.00%> (-100.00%) ⬇️
...troller/recommender/io/metadata/FieldMetadata.java 0.00% <0.00%> (-100.00%) ⬇️
...roller/recommender/rules/impl/BloomFilterRule.java 0.00% <0.00%> (-100.00%) ⬇️
...oller/api/resources/PinotControllerAppConfigs.java 0.00% <0.00%> (-100.00%) ⬇️
...ler/recommender/data/generator/BytesGenerator.java 0.00% <0.00%> (-100.00%) ⬇️
...er/recommender/io/metadata/SchemaWithMetaData.java 0.00% <0.00%> (-100.00%) ⬇️
... and 597 more

Help us with your feedback. Take ten seconds to tell us how you rate us.

@KKcorps KKcorps marked this pull request as ready for review July 5, 2022 13:27
@KKcorps KKcorps changed the title Add protocol buffer Stream Decoder Add Protocol Buffer Stream Decoder Jul 5, 2022
@Jackie-Jiang Jackie-Jiang added feature release-notes Referenced by PRs that need attention when compiling the next release notes labels Jul 11, 2022
@KKcorps
Copy link
Contributor Author

KKcorps commented Jul 25, 2022

@xiangfu0 @navina can you please review this? We need to merge this before 0.11 release

@Override
public void init(Map<String, String> props, Set<String> fieldsToRead, String topicName)
throws Exception {
Preconditions.checkState(props.containsKey(DESCRIPTOR_FILE_PATH),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not familiar with what a descriptor file represents after reading https://github.com/os72/protobuf-dynamic and viewing sample.desc . Is that a binary file that is generated by proto compiler?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. So basically .proto file contains the schema. From this schema, you can either generate java classes or you can generate .desc binary file. The latter approach allows us to parse any proto message easily without relying on java impl. It is a bit slower though.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the slowness should only be a one-time overhead right? we do not decode desc files on every message

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The slowness is not due to the decoding of .desc file.
It is due to the fact we are using DynamicMessage class instead of compiled Proto java class to deserialize the messages.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are on my machine with proto version we are using. Significant improvments from ones in blog post.

Benchmark                                             (_numRecords)  Mode  Cnt   Score   Error  Units
BenchmarkDynamicMessage.compiledClassDeserialization         100000  avgt    5  12.787 ± 0.297  ms/op
BenchmarkDynamicMessage.dynamicClassDeserialization          100000  avgt    5  28.150 ± 0.691  ms/op

@KKcorps KKcorps requested a review from xiangfu0 July 27, 2022 10:50
Copy link
Contributor

@xiangfu0 xiangfu0 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall lgtm.

@Jackie-Jiang Jackie-Jiang merged commit 8806dc3 into apache:master Aug 2, 2022
KKcorps added a commit to KKcorps/incubator-pinot that referenced this pull request Aug 4, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
feature release-notes Referenced by PRs that need attention when compiling the next release notes
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

6 participants