
[BEAM-314] Add zip compression support in TextIO #400

Closed
wants to merge 4 commits

Conversation

jbonofre
Member

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

  • Make sure the PR title is formatted like:
    [BEAM-<Jira issue #>] Description of pull request
  • Make sure tests pass via mvn clean verify. (Even better, enable
    Travis-CI on your fork and ensure the whole test matrix passes).
  • Replace <Jira issue #> in the title with the actual Jira issue
    number, if there is one.
  • If this contribution is large, please file an Apache
    Individual Contributor License Agreement.

Add zip compression support in TextIO.

@davorbonaci
Member

R: @dhalperi

@@ -404,6 +406,37 @@ public void testCompressedRead() throws Exception {

@Test
@Category(NeedsRunner.class)
public void testZipCompressedRead() throws Exception {
Contributor

please add a test with an empty (but valid) zip file.

Member Author

Good idea, let me do it.

@jbonofre
Member Author

jbonofre commented Jun 6, 2016

Rebased and updated. I have to figure out the expectedException issue in the test.

if (zip.getNextEntry() != null) {
return Channels.newChannel(zip);
}
throw new IllegalArgumentException("ZIP file doesn't contain any entry");
Contributor

What will the behavior be today on a multi-entry zip? Will it silently produce bad data? Fail in some way?

Please comment, and then also add a test.

@jbonofre
Member Author

jbonofre commented Jun 8, 2016

Rebased and updated.

p.run();

// test with auto-detect ZIP based on extension.
p = TestPipeline.create();
Contributor

please make this a separate unit test -- that way they can pass or fail independently :)

@dhalperi
Contributor

dhalperi commented Jun 8, 2016

Hi JB,

This is looking pretty good!

But I have some questions about the tests. Specifically, since we mostly test empty files it seems tough to validate that the decompressor does exactly what we expect.

I've added some suggestions for improvements.

Thanks,
Dan

@jbonofre
Member Author

jbonofre commented Jun 9, 2016

Updated. I added explanations on each test.
However, I have two points:

  1. Using ZipInputStream, there's no way to get the number of entries. So, unfortunately, I don't see a good way to raise an exception when the zip stream contains multiple entries.
  2. In the testZipCompressedReadWithEmptyFile test, I got the right IllegalArgumentException but wrapped in a RuntimeException. That's why I'm doing expectedException.expect(RuntimeException.class) instead of expectedException.expect(IllegalArgumentException.class). Any idea why?
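As an aside on point 1 (an illustrative sketch, not code from this PR): the entry count is only available through java.util.zip.ZipFile, which reads the zip central directory but requires a seekable file. ZipInputStream, which works on arbitrary streams, only discovers entries one at a time via getNextEntry(), so the total is never known up front:

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;
import java.util.zip.ZipOutputStream;

public class ZipEntryCount {

  /** Returns the number of entries by consulting the zip central directory. */
  static int countEntries(File zipFile) throws IOException {
    try (ZipFile zf = new ZipFile(zipFile)) {
      return zf.size();
    }
  }

  public static void main(String[] args) throws IOException {
    // Build a two-entry zip file purely for illustration.
    File tmp = File.createTempFile("multi", ".zip");
    try (ZipOutputStream zos = new ZipOutputStream(new FileOutputStream(tmp))) {
      zos.putNextEntry(new ZipEntry("a.txt"));
      zos.write("first\n".getBytes());
      zos.closeEntry();
      zos.putNextEntry(new ZipEntry("b.txt"));
      zos.write("second\n".getBytes());
      zos.closeEntry();
    }
    System.out.println(countEntries(tmp)); // prints 2
    tmp.delete();
  }
}
```

This works only because ZipFile can seek to the central directory at the end of the file; a TextIO source reading from a channel does not have that option.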

PCollection<String> output = p.apply(read);

PAssert.that(output).empty();
p.run();
Contributor

I'd really like to find a way that this case either concats all the files or throws an exception. The current behavior is effectively silent data loss, the worst possible case!

One way to handle this could be a utility InputStream class that wraps ZipInputStream and under the hood concats all the different entry streams. This is probably the best case.

A second possibility is that once the input stream hits EOF, we check for a next entry and only then throw an exception. But this is less desirable, as we don't fail until pretty late.

Can you look into it?
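A rough sketch of what such a wrapping InputStream might look like (hypothetical names; not the code that ultimately landed in the PR): it concatenates all entries by advancing to the next entry whenever the current one hits EOF, which also covers several empty entries in a row.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.ZipInputStream;

/**
 * Hypothetical sketch: wraps a ZipInputStream and transparently concatenates
 * all entries, advancing to the next entry whenever the current one hits EOF.
 */
public class ConcatenatingZipInputStream extends InputStream {
  private final ZipInputStream zip;

  public ConcatenatingZipInputStream(ZipInputStream zip) throws IOException {
    this.zip = zip;
    zip.getNextEntry(); // position on the first entry (null if the zip is empty)
  }

  @Override
  public int read() throws IOException {
    int b;
    // Loop past entry boundaries (and empty entries) until data or true EOF.
    while ((b = zip.read()) == -1) {
      if (zip.getNextEntry() == null) {
        return -1; // no more entries: real end of stream
      }
    }
    return b;
  }

  @Override
  public int read(byte[] buf, int off, int len) throws IOException {
    int n;
    while ((n = zip.read(buf, off, len)) == -1) {
      if (zip.getNextEntry() == null) {
        return -1;
      }
    }
    return n;
  }

  @Override
  public void close() throws IOException {
    zip.close();
  }
}
```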

Member Author

I like your idea of wrapping the stream. Let me figure it out.

@jbonofre
Member Author

I updated the PR to use another approach: I'm using the ZipInputStream directly, handling the entries internally. It allows users to read a multi-entry zip file as if it were a single-entry one. I also updated the tests to show the "new" behavior.


/**
* Read a ZIP compressed file with multiple entries. Only the first entry is actually read.
* We expect an empty PCollection.
Contributor

update the javadoc to be correct.

@dhalperi
Contributor

Thanks JB -- looking great.

  • Please ensure that the multi-entry test is actually passing by fixing the way the writers are used.
  • Fix the javadoc with new semantics.

Other than that, I'd love to see less code duplication in the tests. This is a strong recommendation, but I won't withhold an LGTM on that basis ;).

@jbonofre
Member Author

My bad about the javadoc. I will fix that. I'm also fixing the multi-entry test. Then I will refactor a bit to use a shared method for zip file creation ;)

@jbonofre
Member Author

PR rebased and updated based on Dan's comments.

String tmpFileName = tmpFile.getPath();

ZipOutputStream out = new ZipOutputStream(new FileOutputStream(tmpFile));
PrintStream writer = new PrintStream(out);
Contributor

Please use

PrintStream writer = new PrintStream(out, true /* auto-flush on write */);

this way we can be sure that the PrintStream itself does not buffer any data. See the javadoc.
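A tiny illustration (a hypothetical sketch, not code from this PR) of what the autoFlush flag buys you: on every println, byte-array write, or '\n', the PrintStream forwards flush() to the underlying stream, so nothing lingers in its intermediate buffers when the zip entry is closed.

```java
import java.io.OutputStream;
import java.io.PrintStream;

public class AutoFlushDemo {
  /** Counts how often flush() is propagated to the underlying stream. */
  static class FlushCountingStream extends OutputStream {
    int flushes = 0;
    @Override public void write(int b) {}
    @Override public void flush() { flushes++; }
  }

  public static void main(String[] args) {
    FlushCountingStream sink = new FlushCountingStream();
    // autoFlush = true: flush() is forwarded on every println, byte-array
    // write, or '\n' written through the stream.
    PrintStream writer = new PrintStream(sink, true);
    writer.println("entry contents");
    System.out.println(sink.flushes > 0); // prints true
  }
}
```

Without the second constructor argument, println does not trigger a flush of the underlying stream; the data only reaches it when the PrintStream is flushed or closed explicitly.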

@dhalperi
Contributor

Looking great! Only trivial changes left.

@jbonofre
Member Author

Rebased and updated according to Dan's comments.

@jbonofre
Member Author

There are test failures in the DirectRunner due to the latest changes and rebase. I will fix that.

@dhalperi
Contributor

Hi JB!

One small request:

  • Please don't rebase every time. It's much easier to review if you simply add new incremental CLs to the existing pull request, as this way I can see what changed ;) (This is mentioned briefly in the contribution guide, but is not obvious and may not be standard practice everywhere).

I checked out your PR and tried to get the tests to pass -- but I was never able to make both the single-file and multi-file tests pass. I think that right now the ZipInputStream does not automatically concat all file contents; getting that behavior needs more effort. It might need a new class that wraps the ZipInputStream and manually calls getNextEntry whenever the current entry reaches EOF.

@jbonofre
Member Author

Hi Dan,

Thanks for the update and ok for the rebase (sorry about that). I'm checking and fixing the issue.

@jbonofre
Member Author

Working on a ZipInputStream wrapper (which calls getNextEntry() under the hood). Update soon.

@jbonofre
Member Author

@dhalperi It should be OK now.

currentEntry = getNextEntry();
}

public int read(byte[] b, int off, int len) throws IOException {
Contributor

There are a lot of variants of read in an InputStream. Is it obvious that this is the only variant you need to override? (May well be, I just don't know.)

Member Author

Yes, it's the only read used in the IOChannel.

Contributor

I do think you should override all applicable functions from InputStream -- implementation of IOChannel might change and/or this stream might be used in a different way.

Member

This is the only read that has to be implemented; all the others are just wrappers for this read method.

int read() is the only one you may want to handle specially, since it's very inefficient if it's not implemented directly. It's really inefficient for people to use anyway, but having an efficient implementation is useful.
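A minimal sketch of that pattern (illustrative, not the PR's class): override read(byte[], int, int) as the workhorse, which also covers read(byte[]) since InputStream delegates it to the three-argument variant, and give int read() a direct implementation so single-byte reads don't go through a temporary buffer.

```java
import java.io.IOException;
import java.io.InputStream;

/**
 * Illustrative wrapper. InputStream.read(byte[]) delegates to
 * read(byte[], int, int), so overriding the three-argument variant covers it;
 * int read() is abstract in InputStream and must be implemented anyway, and a
 * direct forward keeps it efficient.
 */
public class ForwardingInputStream extends InputStream {
  private final InputStream delegate;

  public ForwardingInputStream(InputStream delegate) {
    this.delegate = delegate;
  }

  @Override
  public int read(byte[] buf, int off, int len) throws IOException {
    // Workhorse method: bulk reads go straight to the wrapped stream.
    return delegate.read(buf, off, len);
  }

  @Override
  public int read() throws IOException {
    // Efficient single-byte read, forwarded directly.
    return delegate.read();
  }

  @Override
  public void close() throws IOException {
    delegate.close();
  }
}
```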

@dhalperi
Contributor

JB, this is awesome. One small fix to handle the multiple-EOF-in-a-row case and let's merge it. A big accomplishment!

@jbonofre
Member Author

Great feedback, guys! Much appreciated. Fixing the multi-entry case. Thanks!

@jbonofre
Member Author

Fixed the read() methods; the wrapper now extends InputStream and wraps the ZipInputStream instead of extending it.

@dhalperi
Contributor

dhalperi commented Jun 23, 2016

LGTM. Jenkins flaked on network errors, but Travis is fine. Merging.

@asfgit asfgit closed this in f2d2ce5 Jun 23, 2016
@jbonofre jbonofre deleted the BEAM-314 branch June 23, 2016 07:54
dhalperi pushed a commit to dhalperi/beam that referenced this pull request Sep 27, 2016
…pache#400)

* Add Additional Exists check to FileIOChannelFactory#create

This ensures that if the folder did not exist when first checked, but
did by the time mkdirs was executed (and thus mkdirs returned false) the
create will not fail.

* Dynamically choose number of shards in the InProcessPipelineRunner

Add a Write Override Factory that limits the number of shards if
unspecified. This ensures that we will not write an output file per-key
due to bundling.

Do so by obtaining a count of the elements and obtaining the number of
shards based on the number of outputs.
tvalentyn pushed a commit to tvalentyn/beam that referenced this pull request May 15, 2018
Abacn pushed a commit to Abacn/beam that referenced this pull request Jan 31, 2023
pl04351820 pushed a commit to pl04351820/beam that referenced this pull request Dec 20, 2023
…dev pin (apache#400)

Expand pins on library dependencies in preparation for these dependencies taking a new major version. See googleapis/google-cloud-python#10566.