- test readNextFilteredRowGroup()
- add test file for empty blocks next to each other
wgtmac committed Jan 10, 2023
1 parent fbcc3e6 commit 01c7301
Showing 4 changed files with 94 additions and 6 deletions.
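
The change under test: readNextFilteredRowGroup() must skip empty row groups exactly as readNextRowGroup() already does. As a rough illustration of the API being exercised (not part of the commit; `inputFile` and the static gt/intColumn imports are assumed, matching the test diff below), a stats-filtered read might look like this:

// Illustrative fragment only; imports and inputFile as in the test diff below.
FilterCompat.Filter filter = FilterCompat.get(gt(intColumn("a"), 1));
ParquetReadOptions options = ParquetReadOptions.builder()
    .withRecordFilter(filter)
    .useStatsFilter(true)
    .build();
try (ParquetFileReader reader = new ParquetFileReader(inputFile, options)) {
  PageReadStore pages;
  while ((pages = reader.readNextFilteredRowGroup()) != null) {
    // Only row groups that may contain rows with a > 1 arrive here; after
    // this commit, empty row groups are skipped with a warning as well.
  }
}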
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -1044,6 +1044,7 @@ public PageReadStore readNextFilteredRowGroup() throws IOException {
}
BlockMetaData block = blocks.get(currentBlock);
if (block.getRowCount() == 0L) {
+LOG.warn("Read empty block at index {}", currentBlock);
// Skip the empty block
advanceToNextBlock();
return readNextFilteredRowGroup();
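
The one-line addition above logs whenever an empty block is skipped. For context, here is a minimal sketch of the surrounding skip pattern, written iteratively for clarity (the committed method recurses instead, and readRowGroupPages is a hypothetical stand-in for the non-empty path):

// Sketch only: names besides readRowGroupPages come from the hunk above.
public PageReadStore readNextFilteredRowGroup() throws IOException {
  while (currentBlock < blocks.size()) {
    BlockMetaData block = blocks.get(currentBlock);
    if (block.getRowCount() == 0L) {
      LOG.warn("Read empty block at index {}", currentBlock);
      // Skip the empty block
      advanceToNextBlock();
      continue; // equivalent to the tail-recursive call in the committed code
    }
    return readRowGroupPages(block); // hypothetical: build the PageReadStore for a non-empty block
  }
  return null; // no row groups left
}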
parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReaderEmptyBlock.java
@@ -20,20 +20,32 @@

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
import org.junit.Assert;
import org.junit.Test;

import java.io.IOException;
import java.net.URISyntaxException;

+import static org.apache.parquet.filter2.predicate.FilterApi.gt;
+import static org.apache.parquet.filter2.predicate.FilterApi.intColumn;

public class TestParquetReaderEmptyBlock {

// The parquet file contains only one empty row group
private static final Path EMPTY_BLOCK_FILE_1 = createPathFromCP("/test-empty-row-group_1.parquet");

// The parquet file contains three row groups, the second one is empty
private static final Path EMPTY_BLOCK_FILE_2 = createPathFromCP("/test-empty-row-group_2.parquet");

+// The parquet file contains four row groups, the second and third are empty
+private static final Path EMPTY_BLOCK_FILE_3 = createPathFromCP("/test-empty-row-group_3.parquet");

private static Path createPathFromCP(String path) {
try {
return new Path(TestParquetReaderEmptyBlock.class.getResource(path).toURI());
@@ -45,37 +57,112 @@ private static Path createPathFromCP(String path) {
@Test
public void testReadOnlyEmptyBlock() throws IOException {
Configuration conf = new Configuration();
-ParquetMetadata readFooter = ParquetFileReader.readFooter(conf, EMPTY_BLOCK_FILE_1);
+ParquetReadOptions options = ParquetReadOptions.builder().build();
+InputFile inputFile = HadoopInputFile.fromPath(EMPTY_BLOCK_FILE_1, conf);

+// The parquet file contains only one empty row group
+ParquetMetadata readFooter = ParquetFileReader.readFooter(inputFile, options, inputFile.newStream());
Assert.assertEquals(1, readFooter.getBlocks().size());

-// The empty block is skipped
-try (ParquetFileReader r = new ParquetFileReader(conf, EMPTY_BLOCK_FILE_1, readFooter)) {
+// The empty block is skipped via readNextRowGroup()
+try (ParquetFileReader r = new ParquetFileReader(inputFile, options)) {
Assert.assertNull(r.readNextRowGroup());
}

+// The empty block is skipped via readNextFilteredRowGroup()
+FilterCompat.Filter filter = FilterCompat.get(gt(intColumn("a"), 1));
+ParquetReadOptions filterOptions = ParquetReadOptions.builder()
+.copy(options)
+.withRecordFilter(filter)
+.useStatsFilter(true)
+.build();
+try (ParquetFileReader r = new ParquetFileReader(inputFile, filterOptions)) {
+Assert.assertNull(r.readNextFilteredRowGroup());
+}
}

@Test
public void testSkipEmptyBlock() throws IOException {
Configuration conf = new Configuration();
-ParquetMetadata readFooter = ParquetFileReader.readFooter(conf, EMPTY_BLOCK_FILE_2);
+ParquetReadOptions options = ParquetReadOptions.builder().build();
+InputFile inputFile = HadoopInputFile.fromPath(EMPTY_BLOCK_FILE_2, conf);

+// The parquet file contains three row groups, the second one is empty
+ParquetMetadata readFooter = ParquetFileReader.readFooter(inputFile, options, inputFile.newStream());
Assert.assertEquals(3, readFooter.getBlocks().size());

-// Second row group is empty and skipped
-try (ParquetFileReader r = new ParquetFileReader(conf, EMPTY_BLOCK_FILE_2, readFooter)) {
+// Second row group is empty and skipped via readNextRowGroup()
+try (ParquetFileReader r = new ParquetFileReader(inputFile, options)) {
PageReadStore pages = null;
pages = r.readNextRowGroup();
Assert.assertNotNull(pages);
Assert.assertEquals(1, pages.getRowCount());

pages = r.readNextRowGroup();
Assert.assertNotNull(pages);
Assert.assertEquals(3, pages.getRowCount());

pages = r.readNextRowGroup();
Assert.assertNull(pages);
}

+// Only the last row group is read via readNextFilteredRowGroup()
+FilterCompat.Filter filter = FilterCompat.get(gt(intColumn("a"), 1));
+ParquetReadOptions filterOptions = ParquetReadOptions.builder()
+.copy(options)
+.withRecordFilter(filter)
+.useStatsFilter(true)
+.build();
+try (ParquetFileReader r = new ParquetFileReader(inputFile, filterOptions)) {
+PageReadStore pages = null;
+pages = r.readNextFilteredRowGroup();
+Assert.assertNotNull(pages);
+Assert.assertEquals(3, pages.getRowCount());

+pages = r.readNextFilteredRowGroup();
+Assert.assertNull(pages);
+}
}

+@Test
+public void testSkipEmptyBlocksNextToEachOther() throws IOException {
+Configuration conf = new Configuration();
+ParquetReadOptions options = ParquetReadOptions.builder().build();
+InputFile inputFile = HadoopInputFile.fromPath(EMPTY_BLOCK_FILE_3, conf);

+// The parquet file contains four row groups, the second and third are empty
+ParquetMetadata readFooter = ParquetFileReader.readFooter(inputFile, options, inputFile.newStream());
+Assert.assertEquals(4, readFooter.getBlocks().size());

+// Second and third row groups are empty and skipped via readNextRowGroup()
+try (ParquetFileReader r = new ParquetFileReader(inputFile, options)) {
+PageReadStore pages = null;
+pages = r.readNextRowGroup();
+Assert.assertNotNull(pages);
+Assert.assertEquals(1, pages.getRowCount());

+pages = r.readNextRowGroup();
+Assert.assertNotNull(pages);
+Assert.assertEquals(4, pages.getRowCount());

+pages = r.readNextRowGroup();
+Assert.assertNull(pages);
+}

+// Only the last row group is read via readNextFilteredRowGroup()
+FilterCompat.Filter filter = FilterCompat.get(gt(intColumn("a"), 1));
+ParquetReadOptions filterOptions = ParquetReadOptions.builder()
+.copy(options)
+.withRecordFilter(filter)
+.useStatsFilter(true)
+.build();
+try (ParquetFileReader r = new ParquetFileReader(inputFile, filterOptions)) {
+PageReadStore pages = null;
+pages = r.readNextFilteredRowGroup();
+Assert.assertNotNull(pages);
+Assert.assertEquals(4, pages.getRowCount());

+pages = r.readNextFilteredRowGroup();
+Assert.assertNull(pages);
+}
+}
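
Taken together, the three tests pin down the contract: neither read path ever surfaces an empty row group. A self-contained consumer illustrating that contract (the class name and args handling are invented for this example):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.InputFile;

public class DumpRowGroupCounts {
  public static void main(String[] args) throws Exception {
    InputFile inputFile = HadoopInputFile.fromPath(new Path(args[0]), new Configuration());
    try (ParquetFileReader reader =
        new ParquetFileReader(inputFile, ParquetReadOptions.builder().build())) {
      PageReadStore pages;
      while ((pages = reader.readNextRowGroup()) != null) {
        // Empty row groups are skipped internally (with a warning),
        // so getRowCount() is always positive here.
        System.out.println("row group with " + pages.getRowCount() + " rows");
      }
    }
  }
}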
Binary file modified parquet-hadoop/src/test/resources/test-empty-row-group_2.parquet
Binary file not shown.
Binary file added parquet-hadoop/src/test/resources/test-empty-row-group_3.parquet
Binary file not shown.
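
Since the new binary resource cannot be inspected inline, its expected shape can be checked from the footer. A hypothetical assertion in the style of the tests above, with inputFile and options built for EMPTY_BLOCK_FILE_3 as in testSkipEmptyBlocksNextToEachOther:

// Hypothetical sanity check: test-empty-row-group_3.parquet should hold
// four row groups, two of which (the middle ones) are empty.
ParquetMetadata footer = ParquetFileReader.readFooter(inputFile, options, inputFile.newStream());
Assert.assertEquals(4, footer.getBlocks().size());
long emptyGroups = footer.getBlocks().stream()
    .filter(b -> b.getRowCount() == 0L)
    .count();
Assert.assertEquals(2, emptyGroups);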
