Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 69 additions & 15 deletions parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@
package org.apache.iceberg.parquet;

import java.io.IOException;
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.function.Function;
import org.apache.iceberg.Schema;
import org.apache.iceberg.exceptions.RuntimeIOException;
Expand All @@ -29,6 +35,9 @@
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.hadoop.ParquetFileReader;
Expand Down Expand Up @@ -101,18 +110,34 @@ private static class FileIterator<T> implements CloseableIterator<T> {
private final boolean reuseContainers;
private final long[] rowGroupsStartRowPos;

private int nextRowGroup = 0;
private long nextRowGroupStart = 0;
private long valuesRead = 0;
private T last = null;

private int totalRowGroups;
private static final ExecutorService prefetchService =
MoreExecutors.getExitingExecutorService(
(ThreadPoolExecutor)
Executors.newFixedThreadPool(
4,
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("iceberg-parquet-row-group-prefetchNext-pool-%d")
.build()));

private int prefetchedRowGroup = 0;
private Future<PageReadStore> prefetchRowGroupFuture;

/**
 * Builds the iterator from a prepared {@code ReadConf} and immediately starts reading.
 *
 * <p>The assignments cache state precomputed by the conf: the underlying Parquet file
 * reader, the per-row-group skip mask produced by filtering, the value-reader model,
 * and the starting row position of each row group. {@code totalRowGroups} is derived
 * from the skip mask, which has one entry per row group in the file.
 *
 * <p>Ordering matters here: {@code prefetchNextRowGroup()} must be called before
 * {@code advance()}, because advance() blocks on the future that the prefetch creates.
 */
FileIterator(ReadConf<T> conf) {
  this.reader = conf.reader();
  this.shouldSkip = conf.shouldSkip();
  this.model = conf.model();
  this.totalValues = conf.totalValues();
  // NOTE(review): presumably controls whether next() may reuse the same container
  // object across calls — confirm against the model implementation.
  this.reuseContainers = conf.reuseContainers();
  this.rowGroupsStartRowPos = conf.startRowPositions();
  this.totalRowGroups = shouldSkip.length;
  // Kick off the asynchronous read of the first row group, then position on it.
  prefetchNextRowGroup();
  advance();
}

@Override
Expand All @@ -122,6 +147,10 @@ public boolean hasNext() {

@Override
public T next() {
if (!hasNext()) {
throw new NoSuchElementException("No more row groups to read");
}

if (valuesRead >= nextRowGroupStart) {
advance();
}
Expand All @@ -137,23 +166,48 @@ public T next() {
}

private void advance() {
while (shouldSkip[nextRowGroup]) {
nextRowGroup += 1;
reader.skipNextRowGroup();
}

PageReadStore pages;
try {
pages = reader.readNextRowGroup();
} catch (IOException e) {
throw new RuntimeIOException(e);
Preconditions.checkNotNull(prefetchRowGroupFuture, "future should not be null");
PageReadStore pages = prefetchRowGroupFuture.get();

if (prefetchedRowGroup >= totalRowGroups) {
return;
}
Preconditions.checkState(
pages != null,
"advance() should have been only when there was at least one row group to read");
long rowPosition = rowGroupsStartRowPos[prefetchedRowGroup];
nextRowGroupStart += pages.getRowCount();
prefetchedRowGroup += 1;

model.setPageSource(pages, rowPosition);
prefetchNextRowGroup(); // eagerly fetch the next row group
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
}

long rowPosition = rowGroupsStartRowPos[nextRowGroup];
nextRowGroupStart += pages.getRowCount();
nextRowGroup += 1;

model.setPageSource(pages, rowPosition);
/**
 * Submits an asynchronous task that positions the reader at the next row group to be
 * read and loads its pages, storing the result in {@code prefetchRowGroupFuture}.
 *
 * <p>The task first advances past every row group flagged in {@code shouldSkip},
 * bumping {@code prefetchedRowGroup} in lockstep with the reader's own position, then
 * reads the next unskipped row group. It yields {@code null} when all remaining row
 * groups were skipped, so the consumer can detect end-of-file.
 *
 * <p>NOTE(review): {@code prefetchedRowGroup} is written on the pool thread and read
 * on the consumer thread; visibility appears to rely on the happens-before edges of
 * {@code ExecutorService.submit()} / {@code Future.get()}, so the consumer must call
 * get() on the future before touching the counter — confirm this invariant holds for
 * all callers.
 */
private void prefetchNextRowGroup() {
  prefetchRowGroupFuture =
      prefetchService.submit(
          () -> {
            // Walk past filtered-out row groups, keeping counter and reader in sync.
            while (prefetchedRowGroup < totalRowGroups && shouldSkip[prefetchedRowGroup]) {
              prefetchedRowGroup += 1;
              reader.skipNextRowGroup();
            }

            if (prefetchedRowGroup >= totalRowGroups) {
              // Every remaining row group was skipped; signal exhaustion.
              return null;
            }

            try {
              return reader.readNextRowGroup();
            } catch (IOException e) {
              throw new RuntimeIOException(e);
            }
          });
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.function.Function;
import org.apache.iceberg.Schema;
import org.apache.iceberg.exceptions.RuntimeIOException;
Expand All @@ -32,6 +37,9 @@
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.hadoop.ParquetFileReader;
Expand Down Expand Up @@ -109,11 +117,23 @@ private static class FileIterator<T> implements CloseableIterator<T> {
private final int batchSize;
private final List<Map<ColumnPath, ColumnChunkMetaData>> columnChunkMetadata;
private final boolean reuseContainers;
private int nextRowGroup = 0;
private long nextRowGroupStart = 0;
private long valuesRead = 0;
private T last = null;
private final long[] rowGroupsStartRowPos;
private final int totalRowGroups;
private static final ExecutorService prefetchService =
MoreExecutors.getExitingExecutorService(
(ThreadPoolExecutor)
Executors.newFixedThreadPool(
4,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious as to why you chose the number of threads in the pool to be '4'. In this current implementation you are prefetching only one row group at a time, correct? Shouldn't one thread in the thread pool suffice?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The thread pool should be shared by all the readers in the process, but the hard-coded value may need to be improved.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ACK.

new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("iceberg-parquet-row-group-prefetchNext-pool-%d")
.build()));

private int prefetchedRowGroup = 0;
private Future<PageReadStore> prefetchRowGroupFuture;

FileIterator(ReadConf conf) {
this.reader = conf.reader();
Expand All @@ -125,6 +145,9 @@ private static class FileIterator<T> implements CloseableIterator<T> {
this.model.setBatchSize(this.batchSize);
this.columnChunkMetadata = conf.columnChunkMetadataForRowGroups();
this.rowGroupsStartRowPos = conf.startRowPositions();
this.totalRowGroups = shouldSkip.length;
prefetchNextRowGroup();
advance();
}

@Override
Expand Down Expand Up @@ -154,21 +177,47 @@ public T next() {
}

private void advance() {
while (shouldSkip[nextRowGroup]) {
nextRowGroup += 1;
reader.skipNextRowGroup();
}
PageReadStore pages;
try {
pages = reader.readNextRowGroup();
} catch (IOException e) {
throw new RuntimeIOException(e);
Preconditions.checkNotNull(prefetchRowGroupFuture, "future should not be null");
PageReadStore pages = prefetchRowGroupFuture.get();

if (prefetchedRowGroup >= totalRowGroups) {
return;
}
Preconditions.checkState(
pages != null,
"advance() should have been only when there was at least one row group to read");
long rowPosition = rowGroupsStartRowPos[prefetchedRowGroup];
model.setRowGroupInfo(pages, columnChunkMetadata.get(prefetchedRowGroup), rowPosition);
nextRowGroupStart += pages.getRowCount();
prefetchedRowGroup += 1;
prefetchNextRowGroup(); // eagerly fetch the next row group
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why stop with just the next row group? Can we instead maintain a RowGroup queue of RowGroup futures, where we keep populating the queue with the next RowGroup continuously and asynchronously till we exhaust all available RowGroups in the file?
We can dequeue from the RowGroup queue as needed. This way, we may reduce the likelihood of the prefetchRowGroupFuture.get() call ever having to wait for the future computation to finish. What do you think?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree with you, we can certainly do that, at the moment just kept it simple and closer to the existing poc code. Let me add this in later revisions if folks are onboard.

Though we might not want to load all the row groups in one shot, as it might cause some memory pressure; maybe having a config option for controlling this would be helpful.

P.S. Will also add pre-fetch of parquet data pages as well shortly.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm in favor of having the "eagerness" of the prefetch be configurable so long as we ensure a sane default. We can measure what an appropriate value should be for this but it should balance between memory consumption and performance.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was testing this with concurrent reads; it looks like reader.readRowGroup(int blockIndex) is not thread-safe. It requires the SeekableInputStream that the Parquet reader is holding to be seeked to the given offset first, and since this is a class variable it was causing correctness issues.
One possible solution is to have a pool of ParquetFileReaders: get one reader from the pool, ask it to read the desired row group, and then close the pool once all the row groups have been read. Thinking of prototyping this change.

} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
}

long rowPosition = rowGroupsStartRowPos[nextRowGroup];
model.setRowGroupInfo(pages, columnChunkMetadata.get(nextRowGroup), rowPosition);
nextRowGroupStart += pages.getRowCount();
nextRowGroup += 1;
/**
 * Submits an asynchronous task that positions the reader at the next row group to be
 * read and loads its pages, storing the result in {@code prefetchRowGroupFuture}.
 *
 * <p>The task first advances past every row group flagged in {@code shouldSkip},
 * bumping {@code prefetchedRowGroup} in lockstep with the reader's own position, then
 * reads the next unskipped row group. It yields {@code null} when all remaining row
 * groups were skipped, so the consumer can detect end-of-file.
 *
 * <p>NOTE(review): {@code prefetchedRowGroup} is written on the pool thread and read
 * on the consumer thread; visibility appears to rely on the happens-before edges of
 * {@code ExecutorService.submit()} / {@code Future.get()}, so the consumer must call
 * get() on the future before touching the counter — confirm this invariant holds for
 * all callers.
 */
private void prefetchNextRowGroup() {
  prefetchRowGroupFuture =
      prefetchService.submit(
          () -> {
            // Walk past filtered-out row groups, keeping counter and reader in sync.
            while (prefetchedRowGroup < totalRowGroups && shouldSkip[prefetchedRowGroup]) {
              prefetchedRowGroup += 1;
              reader.skipNextRowGroup();
            }

            if (prefetchedRowGroup >= totalRowGroups) {
              // Every remaining row group was skipped; signal exhaustion.
              return null;
            }

            try {
              return reader.readNextRowGroup();
            } catch (IOException e) {
              throw new RuntimeIOException(e);
            }
          });
}

@Override
Expand Down