[DRAFT] PR to show Vectored IO integration, compilation fails now. #999
Conversation
import java.util.List;
import java.util.function.IntFunction;

import org.apache.hadoop.fs.FileRange;
I feel like this might be an issue. We probably don't want to introduce a Hadoop dependency here because it breaks the separation from Hadoop in the IO path.
Yeah, I agree with this. One solution would be to introduce a custom ParquetFileRange class in the Parquet io module, use it in the interface, and convert ParquetFileRange to the Hadoop FileRange in the implementations in H1SeekableInputStream and H2SeekableInputStream.
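For illustration only, a rough sketch of what such an abstraction might look like; the field, method, and converter names here are assumptions, not the final API:

// Hypothetical Hadoop-free range type for the parquet io module.
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;

public class ParquetFileRange {
  private final long offset;  // byte offset of the range within the file
  private final int length;   // number of bytes to read
  // Completed by the stream implementation once the bytes are available.
  private CompletableFuture<ByteBuffer> dataReadFuture;

  public ParquetFileRange(long offset, int length) {
    this.offset = offset;
    this.length = length;
  }

  public long getOffset() { return offset; }
  public int getLength() { return length; }
  public CompletableFuture<ByteBuffer> getDataReadFuture() { return dataReadFuture; }
  public void setDataReadFuture(CompletableFuture<ByteBuffer> f) { this.dataReadFuture = f; }
}

// Inside H1SeekableInputStream / H2SeekableInputStream the conversion to
// Hadoop's FileRange would then be a simple mapping, keeping Hadoop types
// out of the interface:
//   List<FileRange> hadoopRanges = parquetRanges.stream()
//       .map(r -> FileRange.createFileRange(r.getOffset(), r.getLength()))
//       .collect(Collectors.toList());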
@danielcweeks Do you agree with my suggested approach? Thanks for reviewing the code.
Thanks for this PR! Can't wait to try it out.
fileRanges.add(FileRange.createFileRange(currentOffset, lastAllocationSize));
}
LOG.warn("Doing vectored IO for ranges {}", fileRanges);
f.readVectored(fileRanges, ByteBuffer::allocate);
Use the allocator (options.getAllocator)? Keep in mind the allocated buffer might be a direct byte buffer.
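For example, something along these lines, assuming the reader's options object is in scope (ByteBufferAllocator#allocate(int) matches the IntFunction<ByteBuffer> that readVectored expects):

// Sketch: route allocation through the configured allocator instead of
// ByteBuffer::allocate. Note the allocator may return direct byte buffers
// (e.g. DirectByteBufferAllocator), so downstream code must not assume
// heap buffers with accessible backing arrays.
ByteBufferAllocator allocator = options.getAllocator();
f.readVectored(fileRanges, allocator::allocate);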
for (ConsecutivePartList consecutiveChunks : allParts) {
  consecutiveChunks.readAll(f, builder);
  ranges.add(FileRange.createFileRange(consecutiveChunks.offset, (int) consecutiveChunks.length));
I would do this the way you were planning to do it initially (or so it appears): move this into a readAllVectored method.
And make it configurable to choose between vectored I/O and non-vectored I/O (see HadoopReadOptions and ParquetReadOptions).
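As a sketch of such a toggle, with a hypothetical property key and accessor (neither exists in ParquetReadOptions today; the readVectored helper stands in for a vectored read path, one sketch of which appears later in this thread):

// Hypothetical configuration switch; the key name and methods are
// illustrative assumptions, not existing Parquet options.
private static final String VECTORED_IO_ENABLED = "parquet.hadoop.vectored.io.enabled";

if (options.useVectoredIo()) {             // assumed accessor on ParquetReadOptions
  readVectored(allParts, builder);         // single vectored call over all parts
} else {
  for (ConsecutivePartList consecutiveChunks : allParts) {
    consecutiveChunks.readAll(f, builder); // existing sequential path
  }
}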
I have changed both of the places where I thought the integration could be done. I am not really sure which will give better performance results, which is why I left the other portion commented out.
- One option is to make the change in readAllVectored, as you suggested and as I did before.
- The other change (the current one) is at the top layer.
The reason I moved from the 1st to the 2nd is the name ConsecutivePartList. The name suggests it is a consecutive part, essentially meaning just a single range, for which we won't get the real vectored IO benefits like parallel IO and range coalescing.
Right. I was thinking that readAllVectored will take all ConsecutiveParts as input.
Or at least move this block into a new function. You will need to do the same thing in readNextRowGroup as well.
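A rough shape for that helper, assuming it sits in ParquetFileReader next to the existing read loop and that the stream exposes the readVectored call this PR adds (the method and parameter names are guesses):

// Sketch: one FileRange per ConsecutivePartList, issued as a single
// vectored read so the filesystem can coalesce and parallelize.
// Assumes the surrounding ParquetFileReader fields f, options, and LOG.
private void readVectored(List<ConsecutivePartList> allParts, ChunkListBuilder builder)
    throws IOException {
  List<FileRange> ranges = new ArrayList<>(allParts.size());
  for (ConsecutivePartList part : allParts) {
    ranges.add(FileRange.createFileRange(part.offset, (int) part.length));
  }
  LOG.debug("Doing vectored IO for ranges {}", ranges);
  f.readVectored(ranges, options.getAllocator()::allocate);
  // Each FileRange#getData() future completes as its bytes arrive; the
  // chunks for each part can then be populated, mirroring readAll().
}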
Sure, moving this to a new method makes sense. Will do.
ranges.add(FileRange.createFileRange(consecutiveChunks.offset, (int) consecutiveChunks.length));
}
LOG.warn("Doing vectored IO for ranges {}", ranges);
f.readVectored(ranges, ByteBuffer::allocate);
See the comment below about using options.getAllocator.
ranges.add(FileRange.createFileRange(consecutiveChunks.offset, (int) consecutiveChunks.length));
}
LOG.warn("Doing vectored IO for ranges {}", ranges);
f.readVectored(ranges, ByteBuffer::allocate);
Does readVectored allocate a single buffer per range? Or does it split each range into bite-sized pieces? If all the columns are being read, a single range can be the entire row group, potentially more than a GB.
Yes, vectored read allocates a single buffer per range.
If a range can grow to more than a GB, then I guess we will run into memory problems.
Then it may make sense to do what the readAllVectored method is doing: split each ConsecutiveParts into contiguous sub-ranges of 8 MB (configurable). Presumably when you merge ranges you have a limit, so you will get a large consecutive read but not hit the memory allocation issues.
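Roughly, that splitting could look like the following; maxAllocationSize mirrors the existing Parquet setting, while the helper itself is illustrative:

// Sketch: cap each FileRange at maxAllocationSize so a single buffer
// allocation can never reach row-group scale (potentially > 1 GB).
private static List<FileRange> splitRange(long offset, long length, int maxAllocationSize) {
  List<FileRange> ranges = new ArrayList<>();
  long current = offset;
  long remaining = length;
  while (remaining > 0) {
    int size = (int) Math.min(remaining, maxAllocationSize);
    ranges.add(FileRange.createFileRange(current, size));
    current += size;
    remaining -= size;
  }
  return ranges;
}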
Well, I just went through the code of ConsecutivePartList#readAll() again. Yes, it breaks the big range into smaller buffers, but it allocates all of them in one go, so won't the memory issue still persist?
Also, if I make the change in readAll() the way I have already done in the commented-out readAllVectored(), we really won't be reducing the number of seek operations and thus won't get the real benefits of vectored IO. It will just mean that there is a big range to be fetched, we break it into smaller ranges, and fetch them in parallel. (This is similar to PARQUET-2149, which you proposed and for which you have already uploaded the PR :) ).
So if I have something like this -
range1, delta1, range2
I will give you two ranges, range1 and range2, separated by a gap of delta1 bytes. If delta1 is small enough, you will merge range1 and range2 and do a single scan.
If range1 and range2 are very large, so that the resulting range is even larger, do you still do a single scan, or do you split the large range into smaller ranges and trade off some seek cost for increased parallelism?
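For reference, the merge decision being described boils down to something like this (a simplified sketch of the idea, not Hadoop's actual coalescing code; the threshold names are made up):

// Sketch: merge two adjacent ranges when the gap between them is small
// enough to read through, while keeping the merged span bounded.
static boolean shouldMerge(long end1, long start2, long mergedLength,
                           int minSeekGap, int maxMergedSize) {
  long gap = start2 - end1;             // the delta1 in the example above
  return gap >= 0 && gap <= minSeekGap  // cheaper to read the gap than to seek
      && mergedLength <= maxMergedSize; // don't create oversized buffers
}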
Don't you think it is similar to what I have done in the commented-out readVectored() implementation?
Yes, it is. However, the parallel reading will split a large range, as opposed to what the vectored read will do.
I guess we need to run some benchmarks. First, I need to see what type of ranges Parquet generates for real-world/TPC-DS queries. Do you have that by any chance?
No, I don't have the ranges. TPC-DS is a large set of queries. Pick a scale factor and a couple of queries and I'll see what I can get for you.
You couldn't have! :) My PR for async IO is rather different and a lot more complicated.
Oh yeah, just revisited. You are right. :)
2022-10-18 21:51:28,308 [WARN] [TezChild] |hadoop.ParquetFileReader|: Doing vectored IO for ranges [range[1581775,2178915), range[6700883,9475957), range[10023390,10426141), range[12211215,15766053), range[24603672,25148984)]
I got these ranges while running TPC-DS query22 in Hive on the Parquet files in S3 with a scale factor of 1000. We can try the same in Spark. Thanks.
Let me try to get the corresponding ranges for Spark.
That would be great. Thanks.
Also, I reran the same with the changes in readAllVectored, and I can see that it breaks the big ranges into smaller ones if the size is greater than maxAllocationSize (default 8 MB). For example, see the split in range[24928095,34504532). Note: this example is for query26.
So I guess making the change in the layer above makes more sense.
2022-10-20 21:36:02,758 [WARN] [TezChild] |hadoop.ParquetFileReader|: Doing vectored IO for ranges [range[5326394,7673015), range[24928095,34504532), range[36603729,37991198), range[44105694,56402697), range[86874390,88752497)]
2022-10-20 21:36:03,146 [WARN] [TezChild] |hadoop.ParquetFileReader|: Reading through the vectored API.[readAllVectored]
2022-10-20 21:36:03,147 [WARN] [TezChild] |hadoop.ParquetFileReader|: Doing vectored IO for ranges [range[5326394,7673015)]
2022-10-20 21:36:03,225 [WARN] [TezChild] |hadoop.ParquetFileReader|: Reading through the vectored API.[readAllVectored]
2022-10-20 21:36:03,225 [WARN] [TezChild] |hadoop.ParquetFileReader|: Doing vectored IO for ranges [range[24928095,33316703), range[33316703,34504532)]
2022-10-20 21:36:03,352 [WARN] [TezChild] |hadoop.ParquetFileReader|: Reading through the vectored API.[readAllVectored]
2022-10-20 21:36:03,352 [WARN] [TezChild] |hadoop.ParquetFileReader|: Doing vectored IO for ranges [range[36603729,37991198)]
2022-10-20 21:36:03,439 [WARN] [TezChild] |hadoop.ParquetFileReader|: Reading through the vectored API.[readAllVectored]
2022-10-20 21:36:03,439 [WARN] [TezChild] |hadoop.ParquetFileReader|: Doing vectored IO for ranges [range[44105694,52494302), range[52494302,56402697)]
2022-10-20 21:36:03,652 [WARN] [TezChild] |hadoop.ParquetFileReader|: Reading through the vectored API.[readAllVectored]
2022-10-20 21:36:03,652 [WARN] [TezChild] |hadoop.ParquetFileReader|: Doing vectored IO for ranges [range[86874390,88752497)]
Hi @ggershinsky @shangxinli
Hi @mukund-thakur Thanks for reaching out! I will have a look once I have time. It is on my radar. Meanwhile, can you fix the build errors? BTW, this is a great feature and we'd love to have it in the next release.
Closing this. The follow-up is here: #1103
Make sure you have checked all steps below.
- Jira
- Tests
- Commits
- Documentation