
[BEAM-8564] Add LZO compression and decompression support #10254

Merged
merged 29 commits into apache:master on Feb 25, 2020

Conversation

@amoght (Contributor) commented Dec 2, 2019

LZO is a lossless data compression algorithm focused on compression and decompression speed.

This will enable the Apache Beam SDK to compress and decompress files using the LZO/LZOP compression algorithms.

This includes the following functionalities:

  1. Appropriate input and output streams to enable working with LZO/LZOP files.
  2. Compression using the LZO/LZOP compression algorithm in Apache Beam.
  3. Decompression using the LZO/LZOP decompression algorithm in Apache Beam.
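As an analogy for item 1, the JDK's deflate streams show the wrapping pattern such compression streams follow (a sketch only; the actual PR wires aircompressor's LZO streams into Beam, which are not shown here):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;

public class StreamWrapDemo {
    /** Compress by wrapping the destination, decompress by wrapping the source. */
    public static String roundTrip(String text) throws IOException {
        ByteArrayOutputStream compressed = new ByteArrayOutputStream();
        // Writing through the wrapper compresses transparently.
        try (OutputStream out = new DeflaterOutputStream(compressed)) {
            out.write(text.getBytes(StandardCharsets.UTF_8));
        }
        ByteArrayOutputStream plain = new ByteArrayOutputStream();
        // Reading through the wrapper decompresses transparently.
        try (InputStream in =
                new InflaterInputStream(new ByteArrayInputStream(compressed.toByteArray()))) {
            byte[] buf = new byte[4096];
            int n;
            while ((n = in.read(buf)) != -1) {
                plain.write(buf, 0, n);
            }
        }
        return new String(plain.toByteArray(), StandardCharsets.UTF_8);
    }
}
```

The LZO/LZOP streams in the PR play the same role as DeflaterOutputStream/InflaterInputStream here: callers keep writing and reading plain bytes while the wrapper handles the codec.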

Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Choose reviewer(s) and mention them in a comment (R: @username).
  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make the review process smoother.

Post-Commit Tests Status (on master branch)

[Build-status badge grid omitted: rows Go, Java, Python, XLang against columns SDK, Apex, Dataflow, Flink, Gearpump, Samza, Spark.]

Pre-Commit Tests Status (on master branch)

[Build-status badge grid omitted: rows Non-portable, Portable against columns Java, Python, Go, Website.]

See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.

@lukecwik (Member) left a comment

Why the Go SDK changes?

@@ -90,4 +90,6 @@ dependencies {
shadowTest library.java.avro_tests
shadowTest library.java.zstd_jni
testRuntimeOnly library.java.slf4j_jdk14
compile 'io.airlift:aircompressor:0.16'
Member

This library requires a Java 1.8+ virtual machine containing the sun.misc.Unsafe interface running on a little-endian platform. Is there a different implementation we could use?

It also depends on the library below, which is 21 MB.

Contributor Author

The other implementations available are either not pure Java (they contain JNI, .c, and .h files) or have licensing issues.
This one solves both problems, since it is implemented in pure Java and is under the Apache License.
I am not aware of any other pure-Java implementation under the Apache License.

@lukecwik (Member) Dec 3, 2019

I asked on your original review request email thread if there were any alternative suggestions and hopefully the community may provide some suggestions. LZO implementations that contain C code would be ok if they ran on the three most popular platforms (Linux, Mac, Windows).

@@ -152,6 +156,38 @@ public WritableByteChannel writeCompressed(WritableByteChannel channel) throws I
}
},

/** LZO compression. */
LZO(".lzo", ".lzo") {
Member

Should this use the .lzo_deflate extension for suggested and detected extensions?

Contributor Author

Thanks for pointing this out. I have made the appropriate changes on this.

@@ -152,6 +156,38 @@ public WritableByteChannel writeCompressed(WritableByteChannel channel) throws I
}
},

/** LZO compression. */
Member

It may not be obvious to users that the difference between LZO and LZOP is that one has headers and one is just the LZO algorithm. Please expand on this comment and the one below.
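The distinction could be spelled out along these lines (an illustrative, hypothetical sketch; the enum name, field, and wording are mine, not Beam's actual Compression enum):

```java
/** Hypothetical sketch of the expanded documentation for the two LZO variants. */
enum LzoVariant {
    /**
     * Raw LZO: just the stream of compressed blocks produced by the LZO algorithm,
     * with no file header, so the data cannot identify itself as LZO.
     * Conventionally uses the ".lzo_deflate" extension.
     */
    LZO(".lzo_deflate"),

    /**
     * LZOP: the lzop tool's framing around LZO data, which adds a header with
     * magic bytes, flags, and checksums. Conventionally uses the ".lzo" extension.
     */
    LZOP(".lzo");

    final String extension;

    LzoVariant(String extension) {
        this.extension = extension;
    }
}
```

The point of the comment is that LZOP is a container format around the bare LZO algorithm, which is why the two get different file extensions.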

Contributor Author

Agreed. I have added comments for both modules (LZO and LZOP). This should make things clear.

@lukecwik lukecwik changed the title [Beam-8564] Add LZO compression and decompression support [BEAM-8564] Add LZO compression and decompression support Dec 2, 2019
@lukecwik (Member) commented Dec 2, 2019

R: @lukecwik

@amoght amoght requested a review from lukecwik December 4, 2019 08:08
Comment on lines 90 to 93
if (len == 0) {
return 0;
}
final int ret = lzoIS.read(buf, off, len);


What is the reason for not delegating the call in the case of 0 length? Does lzoIS.read() not handle that case cleanly?

Contributor Author

No, that case is handled. The check exists so that when the requested length is 0, read() returns immediately without delegating; basically, to avoid unnecessary method-call overhead.
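A minimal sketch of the guard being discussed (a hypothetical wrapper for illustration, not the PR's actual class; the delegatedReads counter exists only to make the short-circuit observable):

```java
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

/** Input stream that short-circuits zero-length reads before delegating. */
public class ZeroLenGuardStream extends FilterInputStream {
    int delegatedReads = 0;  // demo-only: counts calls that reached the delegate

    public ZeroLenGuardStream(InputStream in) {
        super(in);
    }

    @Override
    public int read(byte[] buf, int off, int len) throws IOException {
        if (len == 0) {
            return 0;  // short-circuit: skip the delegate call entirely
        }
        delegatedReads++;
        return in.read(buf, off, len);
    }
}
```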

@@ -90,4 +90,6 @@ dependencies {
shadowTest library.java.avro_tests
shadowTest library.java.zstd_jni
testRuntimeOnly library.java.slf4j_jdk14
compile 'io.airlift:aircompressor:0.16'
compile 'com.facebook.presto.hadoop:hadoop-apache2:3.2.0-1'


LZO itself should have no dependency on anything related to Hadoop, Presto, or Facebook.

  1. Why do we need to include this?
  2. If aircompressor really needs this, why does it need it?

Contributor Author

This is included because the LzoCodec class used to create the input and output streams uses some classes from the org.apache.hadoop package, which is part of com.facebook.presto.hadoop.
Since aircompressor is designed to also support optional Hadoop configurations, Hadoop comes into the picture (in our case, the Hadoop config is null).

@@ -161,6 +189,28 @@ public void testGzipSplittable() throws Exception {
assertFalse(source.isSplittable());
}

/** Test splittability of files in LZO mode -- none should be splittable. */
@Test
public void testLzoSplittable() throws Exception {


Please add a similar test for testLzopSplittable().

Contributor Author

Thanks for pointing this out, this has been added.

Comment on lines +321 to +322
* <p>A concatenation of lzo files as one file is a valid lzo file and should decompress to be the
* concatenation of those individual files.


I think we need an enclosing </p>, or we can simply remove the <p> tag. Either way, can you please ensure testReadConcatenatedGzip() and testReadMultiStreamBzip2() follow the same Javadoc format?

@amoght (Contributor Author) Dec 5, 2019

This is happening when we run the spotlessApply task. When the <p> tag is closed (i.e. a </p> is added), the spotlessCheck fails. Not sure of the reason behind that.

Member

The closing </p> tag isn't needed in Javadoc even if your editor is inserting it.

* concatenation of those individual files.
*/
@Test
public void testReadConcatenatedLzo() throws IOException {


Please add a testReadConcatenatedLzop() as well if it is applicable. Not sure if it is due to headers.

Contributor Author

The current behaviour of the LZOP codec is that, when concatenated files are given, it returns the contents of the first file only because of the presence of headers. This causes the test to fail, which is why we have not added this test.


Perhaps it would be a good idea to add a test with an expected failure then?

Contributor Author

We have added this in this update.
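For background on the multi-member semantics under test, the JDK's gzip streams exhibit the behavior the concatenation tests expect; this sketch uses java.util.zip as a stand-in for the LZO codec (concatenating two independently compressed members should decompress to the concatenation of the originals):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class ConcatDemo {
    /** Gzip-compress a byte array into a single gzip member. */
    public static byte[] gzip(byte[] data) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(data);
        }
        return bos.toByteArray();
    }

    /** Concatenate two independently compressed members, then decompress the whole. */
    public static String concatAndDecompress(String a, String b) throws IOException {
        ByteArrayOutputStream concat = new ByteArrayOutputStream();
        concat.write(gzip(a.getBytes(StandardCharsets.UTF_8)));
        concat.write(gzip(b.getBytes(StandardCharsets.UTF_8)));
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // GZIPInputStream reads across member boundaries, yielding "a" + "b".
        try (GZIPInputStream in =
                new GZIPInputStream(new ByteArrayInputStream(concat.toByteArray()))) {
            byte[] buf = new byte[4096];
            int n;
            while ((n = in.read(buf)) != -1) {
                out.write(buf, 0, n);
            }
        }
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }
}
```

This is the behavior the raw-LZO tests assert; LZOP, as discussed above, stops after the first member because of its per-file headers, which is what the expected-failure test captures.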

Member

Can we either add support for multistream or throw an exception if the stream isn't finished?

It would be dangerous for users to have part of their data silently dropped in this scenario. We should also add to the comment that concatenated streams aren't supported.
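The safeguard suggested above could look roughly like this (a hypothetical helper, not code from the PR; the stream arguments and message are illustrative):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

public class TrailingDataCheck {
    /**
     * Drain the decompressed view, then verify the raw stream is also exhausted.
     * If trailing bytes remain, a concatenated stream would otherwise be
     * silently dropped, so fail loudly instead.
     */
    public static byte[] readFully(InputStream decompressed, InputStream raw)
            throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[4096];
        int n;
        while ((n = decompressed.read(buf)) != -1) {
            out.write(buf, 0, n);
        }
        if (raw.read() != -1) {
            throw new IOException(
                "Unconsumed trailing data: concatenated streams are not supported");
        }
        return out.toByteArray();
    }
}
```

Throwing here trades a hard failure for silent data loss, which is the safer default for a data-processing SDK.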

Comment on lines +377 to +378
* <p>A lzo file may contain multiple streams and should decompress as the concatenation of those
* streams.
@gsteelman Dec 4, 2019

I think we need an enclosing </p>, or we can simply remove the <p> tag. Either way, please be consistent with testReadConcatenatedLzo().

@amoght (Contributor Author) Dec 5, 2019

This is happening due to spotlessApply.

assertThat(readerOrig, instanceOf(CompressedReader.class));
CompressedReader<Byte> reader = (CompressedReader<Byte>) readerOrig;
// before starting
assertEquals(0.0, reader.getFractionConsumed(), 1e-6);


I see 1e-6 is used by many of the test cases as a threshold. Should it be factored out into a constant?

Contributor Author

It can be done, but that would require altering all the tests that use this constant value. Will that be fine if we do that?


I think we can add the constant for CompressedSourceTest.java at least.

Contributor Author

We have added this in the update.

@gsteelman left a comment

Thank you for opening this PR; happy to work with you to get these changes in.

@amoght (Contributor Author) commented Dec 10, 2019

While studying the code, we found that the airlift/aircompressor library only requires some classes that are also present in the Apache Hadoop Common package (~3.9 MB). Therefore, we are now thinking of making changes in the airlift/aircompressor package: replacing com.facebook.presto.hadoop with org.apache.hadoop.common and removing the other compression mechanisms present in airlift/aircompressor (like zstd, gzip, etc.), keeping only the required LZO package.
But if we go ahead with this approach, we will have to manually update this library whenever any changes are made to airlift/aircompressor's LZO package.
@lukecwik @gsteelman please provide your thoughts on this.

@gsteelman

Is it possible to instead add the dependencies on the org.apache.hadoop.common package directly in these changes, and not add a dependency on airlift/aircompressor in this change? I would prefer to stick with direct dependencies when possible, rather than relying on transitive dependencies to bring in the classes we need.

Relying on the transitive dependencies brought in by airlift/aircompressor has its own set of issues, including having to update our libraries whenever changes are made to airlift.

@amoght (Contributor Author) commented Dec 13, 2019

@gsteelman we have used the airlift/aircompressor library only to get the compression and decompression mechanism; the input/output stream implementation there introduces the transitive dependency, which can be removed and replaced with the Apache Hadoop Common library. This significantly reduces the size as well.
So, here are the two possible options:

  1. We use only the compression and decompression mechanism from airlift/aircompressor and design the input/output streams for Beam accordingly. These will need to be updated if those classes change on airlift/aircompressor's end, but since we will only be using the compression and decompression mechanism, the updates will be small and quite rare, so this won't be a big issue.
  2. We introduce LZO as an optional package for Beam. This will give users the option to manage their Beam size (if that is a constraint) or to omit LZO if it is not required.

@gsteelman

@amoght I don't have enough context to make the call on that, as I am very new to Beam. I have reached out to some others at Twitter to also review this change, as they will have more context.

@amoght (Contributor Author) commented Dec 16, 2019

Thanks Gary :) appreciate your help!

@amoght (Contributor Author) commented Jan 9, 2020

@lukecwik I've updated the PR based on the discussion that we had. Please let me know your thoughts and suggestions.

amoght and others added 10 commits February 20, 2020 15:46
…n.java

Co-Authored-By: Lukasz Cwik <lcwik@google.com>
…n.java

Co-Authored-By: Lukasz Cwik <lcwik@google.com>
…n.java

Co-Authored-By: Lukasz Cwik <lcwik@google.com>
…n.java

Co-Authored-By: Lukasz Cwik <lcwik@google.com>
…n.java

Co-Authored-By: Lukasz Cwik <lcwik@google.com>
…n.java

Co-Authored-By: Lukasz Cwik <lcwik@google.com>
…n.java

Co-Authored-By: Lukasz Cwik <lcwik@google.com>
…n.java

Co-Authored-By: Lukasz Cwik <lcwik@google.com>
beam merge 20/02/2020 3:54 PM
@amoght (Contributor Author) commented Feb 20, 2020

After committing the comments, you may need to run spotlessApply again.

Done

@lukecwik (Member)

retest this please

@lukecwik (Member)

retest this please

@lukecwik (Member)

Run JavaPortabilityApi PreCommit

@lukecwik (Member)

Run Java_Examples_Dataflow PreCommit

@lukecwik (Member)

Run Java PreCommit

@lukecwik (Member)

Run JavaPortabilityApi PreCommit

@lukecwik (Member)

Run Java_Examples_Dataflow PreCommit

@shubham-srivastav (Contributor) commented Feb 24, 2020

@lukecwik Do we need to add a test dependency for facebook-presto and airlift in /beam/examples/java/build.gradle?

@lukecwik (Member)

WordCount doesn't depend on using LZO so it shouldn't be a dependency and the pipeline should execute successfully without it. The test may be picking up a legitimate case which users would hit as well.

@shubham-srivastav (Contributor)

@lukecwik We observed that replacing the Compression I/O streams with java.io I/O streams in LzoCompression.java can resolve the issue. Should we go ahead and do that?

@lukecwik (Member) commented Feb 24, 2020

That sounds great. I should have caught that earlier.

@lukecwik (Member)

retest this please

@lukecwik (Member) left a comment

will merge when tests are green

provided some minor suggestions

@@ -83,7 +82,7 @@
@RunWith(JUnit4.class)
public class CompressedSourceTest {

private final double DELTA = 1e-6;
private final double delta = 1e-6;
Member

nit: you should have declared this static and kept the capital letters instead of making it a member variable of CompressedSourceTest
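The suggested form would be roughly the following (a sketch; the class name and the approxEquals helper are illustrative, not from the PR, which uses the constant directly in assertEquals calls):

```java
public class DeltaConstantDemo {
    /** Shared tolerance for floating-point comparisons, static and uppercase as suggested. */
    private static final double DELTA = 1e-6;

    /** Compare two doubles within the shared tolerance. */
    public static boolean approxEquals(double expected, double actual) {
        return Math.abs(expected - actual) <= DELTA;
    }
}
```

A static final field is the Java convention for a compile-time constant: no per-instance storage, and the ALL_CAPS name signals that it is a constant rather than mutable state.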

amoght and others added 3 commits February 25, 2020 23:37
…ession.java

Co-Authored-By: Lukasz Cwik <lcwik@google.com>
…ession.java

Co-Authored-By: Lukasz Cwik <lcwik@google.com>
…ession.java

Co-Authored-By: Lukasz Cwik <lcwik@google.com>
@lukecwik lukecwik merged commit db6eff8 into apache:master Feb 25, 2020

6 participants