
Add Parquet Format #241

Merged

Conversation

tony810430
Contributor

@tony810430 tony810430 commented Apr 1, 2019

After reviewing the work in #172 and the other tests in the repo, I think the overall implementation is great and the test coverage is sufficient.

To help get this feature contributed back sooner, I created this PR based on my own branch. That makes it easier to apply any changes requested during review.

This PR also includes the following:

  1. Rebased all commits onto master so the PR can be merged by fast-forward.
  2. Provided a new config to set the Parquet compression type.
  3. Fixed some minor issues, such as coding style and redundant code.

Implementation Brief:

Introduce ParquetFormat and ParquetRecordWriterProvider, which receive records and write them into a Parquet file by converting each SinkRecord to Avro data and writing it through AvroParquetWriter.

The inner class S3ParquetOutputFile in ParquetRecordWriterProvider implements OutputFile, which must be provided in order to build an AvroParquetWriter. Unlike the other formats, which wrap an output stream in a writer or use the stream directly, AvroParquetWriter calls OutputFile's methods to create the output stream itself. The implementation therefore passes the filename and the S3 storage object to S3ParquetOutputFile's constructor and creates the output stream only when it is needed.

For S3OutputStream, its parent type was changed to PositionOutputStream, which requires implementing one additional method (getPos()), so that it can be accepted by ParquetWriter.
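To illustrate the two points above, here is a minimal, self-contained sketch. These are toy stand-ins whose names mirror parquet's OutputFile and PositionOutputStream (the real interfaces live in org.apache.parquet.io and have more methods); the in-memory "file" is hypothetical and only shows the deferred-creation and position-tracking pattern, not the connector's actual code.

```java
import java.util.ArrayList;
import java.util.List;

public class OutputFileSketch {

    // Stand-in for PositionOutputStream: a stream that reports how many
    // bytes it has written, which ParquetWriter needs for file offsets.
    abstract static class PositionOutputStream {
        abstract void write(int b);
        abstract long getPos();
    }

    // Stand-in for OutputFile: the writer asks this object to create the
    // stream, so stream creation is deferred until the writer needs it,
    // just as S3ParquetOutputFile defers creating the S3 stream from the
    // filename and storage object it was constructed with.
    interface OutputFile {
        PositionOutputStream create();
    }

    // Toy in-memory "file" that tracks its write position.
    static class InMemoryOutputFile implements OutputFile {
        final List<Integer> bytes = new ArrayList<>();

        public PositionOutputStream create() {
            return new PositionOutputStream() {
                void write(int b) { bytes.add(b); }
                long getPos() { return bytes.size(); }
            };
        }
    }

    public static void main(String[] args) {
        OutputFile file = new InMemoryOutputFile();
        PositionOutputStream out = file.create(); // created on demand
        out.write(1);
        out.write(2);
        System.out.println(out.getPos()); // prints 2
    }
}
```

The design point is that the writer, not the caller, decides when the stream comes into existence; the connector only hands over the ingredients needed to build it.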

Because we can't control when or how AvroParquetWriter commits a file, and the only way to commit a file manually is to close it, we wrapped S3OutputStream in S3ParquetOutputStream to make sure S3OutputStream#commit() is always called when S3ParquetOutputStream#close() is called. Even though we can't tell whether the close was triggered by a commit or by a plain close, this is safe because the S3 sink connector is idempotent.
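The commit-on-close wrapping can be sketched as follows. These classes are simplified, self-contained stand-ins named after the connector's S3OutputStream and S3ParquetOutputStream, not the real implementations (the real commit() completes an S3 multipart upload).

```java
public class CommitOnCloseSketch {

    // Stand-in for S3OutputStream: commit() finalizes the upload;
    // a bare close() releases resources without committing.
    static class S3OutputStream {
        boolean committed = false;
        void commit() { committed = true; }
        void close() { /* releases resources; does not commit */ }
    }

    // Stand-in for S3ParquetOutputStream: ParquetWriter only ever calls
    // close(), so close() must commit. Committing on every close is
    // acceptable because the S3 sink connector is idempotent.
    static class S3ParquetOutputStream extends S3OutputStream {
        @Override
        void close() {
            commit();      // guarantee the file is committed
            super.close();
        }
    }

    public static void main(String[] args) {
        S3ParquetOutputStream out = new S3ParquetOutputStream();
        out.close();                       // all ParquetWriter ever does
        System.out.println(out.committed); // prints true
    }
}
```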

The last modified file is S3SinkConnectorConfig, which gains a new configuration for the Parquet compression type (gzip, snappy, etc.). Every compression type supported by the Parquet library can be configured. Since I took the values directly from the Parquet library, and they don't match the existing S3 compression type config, I introduced a new config name to distinguish them.

Last but not least, all of the unit tests for the Parquet format implementation use the Avro format tests as a reference, and I think they are sufficient. Because we removed the dependency on Hive, we also lost some transitive Parquet dependencies; I added back as few of them as possible.

@ghost

ghost commented Apr 1, 2019

@confluentinc It looks like @tony810430 just signed our Contributor License Agreement. 👍

Always at your service,

clabot

@tony810430
Contributor Author

@kkonstantine, @eakmanrq
Please help review this PR. Thanks.

By the way, this PR's tests rely on confluentinc/kafka-connect-storage-common#99 being merged first, since they use a new API from parquet-mr 1.10.0.

@tony810430 tony810430 force-pushed the feature/add_parquet_format branch from 6509fa9 to 66af002 Compare April 1, 2019 09:29
@tony810430
Contributor Author

@kkonstantine, @eakmanrq
Would you have some time to review this PR? Thanks.

@eakmanrq

Code itself looks good to me. @tony810430 Have you had a chance to test this out in a production environment? Curious what the performance looks like.

@tony810430
Contributor Author

@eakmanrq Yes, I have already tried this out in our production environment. However, my use case is just a small topic with a single partition and low QPS. I have been running it for over a week and have verified that the uploaded data is all correct. I didn't see any significant performance degradation during that time, but as I said above, with such a small workload it's hard to judge the performance.

Member

@kkonstantine kkonstantine left a comment


Thanks @tony810430 for driving this PR!

Just left a few initial comments. I haven't finished a complete review, but before we continue, it'd be nice to describe at a high level how Parquet file upload is meant to be implemented with this S3 Kafka connector (ideally we'd add this description to the merge commit message). Thanks!

@tony810430 tony810430 force-pushed the feature/add_parquet_format branch 2 times, most recently from d7debaf to 6cc7209 Compare May 8, 2019 08:56
@tony810430 tony810430 force-pushed the feature/add_parquet_format branch from 6cc7209 to 334799a Compare May 8, 2019 08:57
@tony810430
Contributor Author

@kkonstantine, I have updated the description of the implementation in my initial comment. I'm not sure if that's what you meant and wanted; please let me know if you need more information.

@jocelyndrean

I tried this PR today and everything worked perfectly :) Thanks @tony810430 for this amazing job!

@tony810430
Contributor Author

@jocelyndrean Thanks for sharing your experience. If you have some free time, it would help to review this PR so we can get this feature merged and more users can benefit from it. =)


@jocelyndrean jocelyndrean left a comment


LGTM. FYI: it has been running in my staging environment for hours now and this version has stored around 200GB of Parquet files on S3. Nothing to report; works perfectly.

@tony810430
Contributor Author

@kkonstantine I have addressed the comments.

Member

@kkonstantine kkonstantine left a comment


Thanks @tony810430 for the quick turnaround.
Your implementation is what I had in mind.
Added a few more comments. Catching my delayed flight from SFO; I'll return for a final look at the tests.
Almost there!
Thanks!

@kkonstantine
Member

Also, DataWriterParquetTest.testProjectNoVersion seems to fail the build job. Are you able to reproduce locally?

@tony810430
Contributor Author

I'm not sure why DataWriterParquetTest.testProjectNoVersion failed; it works on my machine. I also noticed that it passed in the 12th build, and there is no change between the 12th and 13th builds.

I'll try building Confluent's other dependencies locally at the latest version and check whether I can reproduce the failure.

@tony810430
Contributor Author

I found some discrepancies between DataWriterParquetTest and DataWriterAvroTest. I have made them consistent, but I'm not sure I have understood the original author's intent.

Member

@kkonstantine kkonstantine left a comment


Thanks for fixing the test @tony810430
I think we are almost set.

Let's pin the hadoop version to 3.2.0 in the pom.xml
Also, do you know whether we need hadoop-mapreduce-client-core outside tests? I'd like to include only the absolutely necessary deps.

One last thing that I'd prefer to have is a unified config for compression types. Adding another config doesn't seem absolutely necessary, but unfortunately I don't think we can check the format.class from within the validator (I may be misremembering here). Maybe it's worth considering in another iteration before we release.

@tony810430
Contributor Author

I have been running this feature since the 5.2.x release. I see that it has the hadoop-mapreduce-client-core jar in its lib/ folder, but I didn't try removing it when I upgraded to 5.3.x, so I have no idea whether it is necessary outside of tests.

For unified compression types, the current Validator API lacks access to the other configs' settings the way Recommender has; otherwise we could declare format.class as a dependent and access it in CompressionTypeValidator. For now, I don't have a good way to verify format.class in the parsing phase.
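The API asymmetry being described can be sketched with toy interfaces. The signatures below are simplified stand-ins modeled on Kafka's ConfigDef.Validator and ConfigDef.Recommender (the real Recommender also has a visible() method), and the compression values returned are purely illustrative, not the connector's actual lists.

```java
import java.util.List;
import java.util.Map;

public class ValidatorVsRecommender {

    // A validator sees only the single config entry being validated,
    // so it cannot consult format.class.
    interface Validator {
        void ensureValid(String name, Object value);
    }

    // A recommender also receives the other parsed configs, so it can
    // inspect declared dependents such as format.class.
    interface Recommender {
        List<Object> validValues(String name, Map<String, Object> parsedConfig);
    }

    // Illustrative only: pick compression values based on format.class.
    static final Recommender COMPRESSION_RECOMMENDER = (name, parsed) -> {
        Object format = parsed.get("format.class"); // visible here
        if (String.valueOf(format).contains("Parquet")) {
            return List.of("uncompressed", "snappy", "gzip");
        }
        return List.of("none", "gzip");
    };

    public static void main(String[] args) {
        System.out.println(COMPRESSION_RECOMMENDER.validValues(
                "compression.type",
                Map.of("format.class", "ParquetFormat")));
    }
}
```

This is why a unified compression config is awkward with a Validator alone: validation happens per entry, while recommendation happens with the whole parsed config in hand.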

  * parquet-codec will be added to storage-common for all the storage sink connectors
@kkonstantine
Member

With my last two commits I'm pinning the hadoop dependencies to the latest bugfix version (3.2.1) and removing the config s3.parquet.compression.type. The current PR will be merged without compression support for Parquet, but shortly, again targeting CP 5.4, parquet.codec (similar to avro.codec) will be added to storage-common.

Member

@kkonstantine kkonstantine left a comment


Terrific work @tony810430 !
LGTM

Merging parquet support for S3

@kkonstantine kkonstantine changed the title Add Parquet Format based on PR #172 Add Parquet Format Oct 8, 2019
@kkonstantine kkonstantine merged commit 61bb29d into confluentinc:master Oct 8, 2019
@yauhen-sobaleu

Hi guys, is it possible to test the Parquet format with the S3 connector using the confluentinc/cp-kafka-connect:5.4.0-beta1 docker image? Is it already included?

@tony810430 tony810430 deleted the feature/add_parquet_format branch October 28, 2019 02:36
@bstaudacher

Very excited for this! Thank you for your hard work @tony810430.
What release will this be available in? Can't wait to try it out.

@NathanNam
Contributor

It will be part of CP 5.4.

@dude0001

When I try to use this, I get an error that a Hadoop class is not found. Is this a defect, or am I missing a prerequisite or other configuration in my container?

Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Could not initialize class org.apache.hadoop.conf.Configuration   [org.apache.kafka.connect.runtime.WorkerSinkTask]
java.lang.NoClassDefFoundError: Could not initialize class org.apache.hadoop.conf.Configuration
at org.apache.parquet.hadoop.ParquetWriter$Builder.<init>(ParquetWriter.java:345)
at org.apache.parquet.avro.AvroParquetWriter$Builder.<init>(AvroParquetWriter.java:162)
at org.apache.parquet.avro.AvroParquetWriter$Builder.<init>(AvroParquetWriter.java:153)
at org.apache.parquet.avro.AvroParquetWriter.builder(AvroParquetWriter.java:43)
at io.confluent.connect.s3.format.parquet.ParquetRecordWriterProvider$1.write(ParquetRecordWriterProvider.java:79)
at io.confluent.connect.s3.format.KeyValueHeaderRecordWriterProvider$1.write(KeyValueHeaderRecordWriterProvider.java:105)
at io.confluent.connect.s3.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:532)
at io.confluent.connect.s3.TopicPartitionWriter.checkRotationOrAppend(TopicPartitionWriter.java:302)
at io.confluent.connect.s3.TopicPartitionWriter.executeState(TopicPartitionWriter.java:245)
at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:196)
at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:234)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
