
Parquet-400: Fixed issue reading some files from HDFS and S3 when usi… #306

Closed
wants to merge 5 commits

Conversation

jaltekruse
Contributor

…ng Hadoop 2.x

The problem was not handling the case where a read request returns fewer
bytes than requested. FSDataInputStream lacks an equivalent of readFully
for ByteBuffers; readFully used to solve this problem when reading into
byte arrays. This has been fixed by adding a loop that keeps requesting
the remaining bytes until everything has been read.
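A minimal sketch of the kind of loop described here, assuming a Hadoop 2.x FSDataInputStream whose read(ByteBuffer) may return fewer bytes than requested (the class and method names below are illustrative, not the exact code in this PR):

```java
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.fs.FSDataInputStream;

public class ByteBufferReadLoop {
  /**
   * Reads until the buffer is full, looping because a single
   * read(ByteBuffer) call is allowed to fill only part of the buffer.
   */
  public static void readFully(FSDataInputStream in, ByteBuffer buf) throws IOException {
    while (buf.hasRemaining()) {
      int bytesRead = in.read(buf); // may return fewer bytes than buf.remaining()
      if (bytesRead < 0) {
        throw new EOFException(
            "Reached end of stream with " + buf.remaining() + " bytes left to read");
      }
    }
  }
}
```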

@danielcweeks

@jaltekruse I verified that this fixes the issue. However, I'm also seeing a very significant performance impact on the initial loading of row group data. I haven't done exhaustive testing, but simply using parquet-tools cat, the load time appears to have gone from ~1 second (commit 5a45ae3) to ~45 seconds against a file with a single row group and about 75MB of compressed data.

I think we need to explore this a little more to make sure we aren't regressing.

@danielcweeks

@jaltekruse I just added some logging, and what I suspected seems to be accurate: the while loop is thrashing for that 45-second period while reading the data.

@jaltekruse
Contributor Author

Hmmm, I believe the loop should be a valid way of recreating the old readFully() functionality. It looks like the S3 filesystem implementation does not handle repeated calls like this well?

For now I can detect the type of the filesystem and just use the old method, wrapping the data in a ByteBuffer after it has been read into a byte array.

Does that seem like a reasonable idea? @danielcweeks

@rdblue
Contributor

rdblue commented Jan 12, 2016

This looks fine to me, but is it possible to add some tests? Maybe a FileSystem wrapper that returns an FSDataInputStream variant that under-fills the buffer at random?
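One way such a test wrapper might look, as a rough sketch: an FSDataInputStream that caps each read(ByteBuffer) at a random size so the calling loop has to cope with short reads. The class name is hypothetical, and this only exercises the new API when the wrapped stream itself supports read(ByteBuffer):

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Random;

import org.apache.hadoop.fs.FSDataInputStream;

/** Test helper that deliberately under-fills ByteBuffer reads. */
public class UnderFillingInputStream extends FSDataInputStream {
  private final Random random = new Random();

  public UnderFillingInputStream(FSDataInputStream wrapped) throws IOException {
    super(wrapped);
  }

  @Override
  public int read(ByteBuffer buf) throws IOException {
    if (!buf.hasRemaining()) {
      return 0;
    }
    // shrink the visible window to a random slice of the remaining space
    int cap = 1 + random.nextInt(buf.remaining());
    int oldLimit = buf.limit();
    buf.limit(buf.position() + cap);
    try {
      return super.read(buf); // delegates to the wrapped stream's read(ByteBuffer)
    } finally {
      buf.limit(oldLimit);
    }
  }
}
```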

@danielcweeks

@jaltekruse @rdblue Let me verify against HDFS and the S3A filesystem. If we can reproduce with either of those, it will be easier to diagnose.

@danielcweeks

Taking a closer look at FSInputStream behavior for readFully()->read(): this might be due to unusual seek behavior. If we're repeatedly calling readFully and the buffer is under-filled, then each successive call will result in a seek call, which is expensive for S3. I'll need to verify, but this might be the issue.

@jaltekruse
Contributor Author

@danielcweeks @rdblue Daniel has provided a workaround for the S3 issue, I haven't had time to write a unit test, but he has manually verified that the performance is fixed with S3. I can file a follow-up JIRA for adding a test if you are okay merging this as is.

@rdblue
Contributor

rdblue commented Feb 10, 2016

@jaltekruse, we just had another look at this problem and it isn't actually seeking. We tracked the problem to allocation. The underlying InputStream implementation for S3 doesn't expose read(ByteBuffer), so when we try to call it, this falls back to the old API, allocates a buffer for the read, and copies what is read into the ByteBuffer that was passed in. In the test case, that allocation is 75MB and the loop-based fix posted here keeps allocating a 75MB buffer (or progressively smaller) for each call to read. Those allocations and the GC runs that result are what take 50 seconds.

The immediate fix is to use readFully for the fall-back case, and to use hasArray to avoid needing to allocate and copy if the buffer is backed by an on-heap array.
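A rough sketch of what that immediate fix could look like, assuming the fallback path still uses the byte[]-based readFully (the helper name is made up for illustration):

```java
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.fs.FSDataInputStream;

public class FallbackReads {
  /**
   * Fallback for streams that don't support read(ByteBuffer): use the
   * byte[]-based readFully, writing straight into the buffer's backing
   * array when it has one so no temporary allocation or copy is needed.
   */
  public static void readFullyCompat(FSDataInputStream in, ByteBuffer buf) throws IOException {
    if (buf.hasArray()) {
      // on-heap buffer: read directly into its backing array
      in.readFully(buf.array(), buf.arrayOffset() + buf.position(), buf.remaining());
      buf.position(buf.limit());
    } else {
      // direct buffer: one temporary array, then a single bulk put
      byte[] tmp = new byte[buf.remaining()];
      in.readFully(tmp, 0, tmp.length);
      buf.put(tmp);
    }
  }
}
```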

This has exposed a problem in the getBuf method's contract. The use of CompatibilityUtil.getBuf here replaced a call to readFully. This is backed by a call to read(ByteBuffer) that doesn't guarantee the entire buffer will be read. I think we need to fix this method and document its guarantees. We should also make sure that calls to readFully before the byte buffer patch landed are getting full buffers as expected.

Also, the maxSize argument is ignored for the byte buffer case. If the value passed in isn't equal to the buffer's remaining size, then you get different behavior depending on the file stream and Hadoop version at runtime. I think that argument should be removed and it should always guarantee the same behavior as read(ByteBuffer), which is documented.

Last, this also exposes a problem with the fallback logic. If the read(ByteBuffer) method is missing for any implementation, the compatibility utils never try it again. If a JVM reads a Parquet file from S3, it looks like it will no longer use the new API for HDFS. I think a wrapper class that provides this method using the old API would be a good solution.
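One shape such a wrapper class might take, as a hedged sketch rather than the agreed design (the class name is hypothetical): a stream that exposes read(ByteBuffer) by emulating it with the old byte[] API, so callers keep a single code path and a failure on one filesystem never disables the new API elsewhere.

```java
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.fs.FSDataInputStream;

/** Exposes read(ByteBuffer) on top of a stream that only has byte[] reads. */
public class ByteBufferCompatStream extends FSDataInputStream {

  public ByteBufferCompatStream(FSDataInputStream wrapped) throws IOException {
    super(wrapped);
  }

  @Override
  public int read(ByteBuffer buf) throws IOException {
    // single-call semantics of read(ByteBuffer): read at most buf.remaining()
    // bytes in one pass through the old byte[] API
    int len = buf.remaining();
    if (buf.hasArray()) {
      int bytesRead = read(buf.array(), buf.arrayOffset() + buf.position(), len);
      if (bytesRead > 0) {
        buf.position(buf.position() + bytesRead);
      }
      return bytesRead;
    }
    byte[] tmp = new byte[len];
    int bytesRead = read(tmp, 0, len);
    if (bytesRead > 0) {
      buf.put(tmp, 0, bytesRead);
    }
    return bytesRead;
  }
}
```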

@julienledem
Member

Thanks @rdblue
@jaltekruse does this sound good to you?

@@ -96,6 +96,10 @@

public static String PARQUET_READ_PARALLELISM = "parquet.metadata.read.parallelism";

//URI Schemes to blacklist for bytebuffer read.
public static final String PARQUET_BYTEBUFFER_BLACKLIST = "parquet.bytebuffer.fs.blacklist";
public static final String[] PARQUET_BYTEBUFFER_BLACKLIST_DEFAULT = {"s3", "s3n", "s3a"};
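For context, a rough sketch of how a scheme blacklist like this might be consulted when deciding whether to use the ByteBuffer read path; the helper is hypothetical, and as the review comments below note, the blacklist approach was ultimately dropped once the underlying bug was found:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class ByteBufferBlacklist {
  // hypothetical helper: skip the ByteBuffer read path for blacklisted schemes
  static boolean useByteBufferRead(FileSystem fs, Configuration conf) {
    String[] blacklist = conf.getStrings("parquet.bytebuffer.fs.blacklist", "s3", "s3n", "s3a");
    String scheme = fs.getUri().getScheme();
    for (String blocked : blacklist) {
      if (blocked.equalsIgnoreCase(scheme)) {
        return false; // fall back to the old byte[] read path
      }
    }
    return true;
  }
}
```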

If we're going with the blacklist approach, we should also handle pure HDFS filesystems as well, right?

Contributor

We found the underlying bug, so I don't think the plan is to blacklist filesystems anymore.


Ah right saw your other comment :-)

@piyushnarang

Ping @jaltekruse. We have a few changes in master we've been wanting to test out but we're running into this issue. Have you had a chance to rework based on Ryan / Daniel's comments?

@jaltekruse
Contributor Author

Hey @piyushnarang, sorry this has been outstanding for so long. I have not had a chance to work on this since my last update. If you would be willing to take a look I can answer any questions and review your changes.

Conflicts:
	parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/CompatibilityUtil.java
@piyushnarang

hey @jaltekruse sure no worries. I'll try and take a stab over the next few days and reach out if I have any questions and such. Will include you on the PR as well.

@jaltekruse
Contributor Author

@piyushnarang I looked back at my branch and saw I had a few changes I had not pushed. This doesn't address Ryan's last comment about how one instance of FileSystem that lacks the API prevents all reads from going through the new API for all instances. We discussed this after one of the hangouts; to make it efficient, we probably need to store our knowledge of which filesystems/input streams support the API in a cache. This will ensure we aren't relying on exceptions to test for a valid implementation of the API on every read, and also avoid the current problems with storing this information in a global static.

@piyushnarang

Thanks @jaltekruse, I'll start taking a look.

@piyushnarang

@jaltekruse - can you grant me write perms to your branch? I'll push my updates so that they show up in this PR (can also spin up a new one if needed)

I guess I'm not entirely clear on the approach we'd like to take to ensure that CompatUtils tries again if the first attempt fails (cc @rdblue ):

  1. Refactoring the code in CompatUtils to not be static and creating a CompatUtils per ParquetFileReader will ensure that even if we're on the same JVM, we first try the byteBuffer call for a file and fall back otherwise. The problem, though, is that in a setup where the byteBuffer call is never supported, we end up with an extra reflection call to read(byteBuffer) for each file. We could also provide a config flag for this so that users don't pay the perf hit when they know they don't support it.
  2. Cache approach - I think caching something like FSDataInputStream.class -> isV2Supported might not cut it. Some of the FSDataInputStream classes just delegate to the underlying inputStream, so we need to be able to see if that inner inputStream supports read(byteBuffer). Things get messier if the inner inputStream also delegates. (Not sure if I'm missing something here; a rough sketch of this idea follows below.)
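A minimal sketch of the cache idea under those caveats, keying on the wrapped stream's class (one level of delegation only) so an unsupported stream type stops paying for the exception probe without disabling the new API for other stream types; all names here are illustrative:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.apache.hadoop.fs.FSDataInputStream;

public class ByteBufferSupportCache {
  // remembers, per wrapped-stream class, whether read(ByteBuffer) worked
  private static final ConcurrentMap<Class<?>, Boolean> SUPPORT =
      new ConcurrentHashMap<Class<?>, Boolean>();

  public static int read(FSDataInputStream in, ByteBuffer buf) throws IOException {
    // FSDataInputStream delegates, so key on the underlying stream's class
    Class<?> key = in.getWrappedStream().getClass();
    Boolean supported = SUPPORT.get(key);
    if (supported == null || supported) {
      try {
        int bytesRead = in.read(buf);
        SUPPORT.putIfAbsent(key, Boolean.TRUE);
        return bytesRead;
      } catch (UnsupportedOperationException e) {
        SUPPORT.put(key, Boolean.FALSE);
      }
    }
    // old byte[]-based read for stream classes without ByteBuffer support
    byte[] tmp = new byte[buf.remaining()];
    int bytesRead = in.read(tmp, 0, tmp.length);
    if (bytesRead > 0) {
      buf.put(tmp, 0, bytesRead);
    }
    return bytesRead;
  }
}
```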

@julienledem
Member

@jaltekruse @piyushnarang do we still need this branch?

@piyushnarang

No I don't need it anymore.

@julienledem
Member

@jaltekruse can you close it if it's not needed anymore?
