@@ -65,6 +65,7 @@
import org.apache.parquet.HadoopReadOptions;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.Preconditions;
import org.apache.parquet.bytes.ByteBufferAllocator;
import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.bytes.ByteBufferReleaser;
import org.apache.parquet.bytes.BytesInput;
@@ -1361,8 +1362,30 @@ private void readVectored(List<ConsecutivePartList> allParts, ChunkListBuilder b
totalSize += len;
}
LOG.debug("Reading {} bytes of data with vectored IO in {} ranges", totalSize, ranges.size());
// Request a vectored read;
f.readVectored(ranges, options.getAllocator());
// Request a vectored read; track all buffers allocated during the call so that
// internal buffers (e.g. from ChecksumFileSystem) are also released.
List<ByteBuffer> allocatedBuffers = new ArrayList<>();
ByteBufferAllocator allocator = options.getAllocator();
ByteBufferAllocator trackingAllocator = new ByteBufferAllocator() {
@Override
public ByteBuffer allocate(int size) {
ByteBuffer buf = allocator.allocate(size);
allocatedBuffers.add(buf);
return buf;
}

@Override
public void release(ByteBuffer b) {
allocator.release(b);
}

@Override
public boolean isDirect() {
return allocator.isDirect();
}
};
f.readVectored(ranges, trackingAllocator);
builder.addBuffersToRelease(allocatedBuffers);
int k = 0;
for (ConsecutivePartList consecutivePart : allParts) {
ParquetFileRange currRange = ranges.get(k++);
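The anonymous class above is essentially a tracking decorator around the configured allocator. As a standalone illustration only (the class name BufferTrackingAllocator and the allocatedBuffers() accessor are hypothetical and not part of this change), the same idea could be factored out like this:

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.parquet.bytes.ByteBufferAllocator;

// Remembers every buffer handed out by the delegate so the caller can later release
// buffers that were allocated out of its sight (e.g. inside ChecksumFileSystem during
// a vectored read).
class BufferTrackingAllocator implements ByteBufferAllocator {
  private final ByteBufferAllocator delegate;
  private final List<ByteBuffer> allocated = new ArrayList<>();

  BufferTrackingAllocator(ByteBufferAllocator delegate) {
    this.delegate = delegate;
  }

  @Override
  public ByteBuffer allocate(int size) {
    ByteBuffer buffer = delegate.allocate(size);
    allocated.add(buffer);
    return buffer;
  }

  @Override
  public void release(ByteBuffer buffer) {
    delegate.release(buffer);
  }

  @Override
  public boolean isDirect() {
    return delegate.isDirect();
  }

  // Buffers to hand to ChunkListBuilder.addBuffersToRelease (or to release directly).
  List<ByteBuffer> allocatedBuffers() {
    return Collections.unmodifiableList(allocated);
  }
}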
@@ -167,21 +167,22 @@ private String encryptParquetFile(String file, Configuration conf) throws IOExce
}

private void decryptParquetFileAndValid(String file, Configuration conf) throws IOException {
ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), new Path(file))
try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), new Path(file))
.withConf(conf)
.build();
for (int i = 0; i < numRecord; i++) {
Group group = reader.read();
assertEquals(testData.get("Name")[i], group.getBinary("Name", 0).toStringUsingUTF8());
assertEquals(testData.get("Age")[i], group.getLong("Age", 0));

Group subGroup = group.getGroup("WebLinks", 0);
assertArrayEquals(
subGroup.getBinary("LinkedIn", 0).getBytes(), ((String) testData.get("LinkedIn")[i]).getBytes());
assertArrayEquals(
subGroup.getBinary("Twitter", 0).getBytes(), ((String) testData.get("Twitter")[i]).getBytes());
.build()) {
for (int i = 0; i < numRecord; i++) {
Group group = reader.read();
assertEquals(testData.get("Name")[i], group.getBinary("Name", 0).toStringUsingUTF8());
assertEquals(testData.get("Age")[i], group.getLong("Age", 0));

Group subGroup = group.getGroup("WebLinks", 0);
assertArrayEquals(
subGroup.getBinary("LinkedIn", 0).getBytes(),
((String) testData.get("LinkedIn")[i]).getBytes());
assertArrayEquals(
subGroup.getBinary("Twitter", 0).getBytes(), ((String) testData.get("Twitter")[i]).getBytes());
}
}
reader.close();
}

private static String createTempFile(String prefix) {
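The same try-with-resources conversion recurs in the test files below, so a single self-contained sketch of the pattern is shown once here (the file path handling and the println are placeholders, not the test's assertions): the reader moves into the try header and is closed even when a read or an assertion throws, which the removed explicit reader.close() call could not guarantee.

import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;

public class ReadWithTryWithResources {
  public static void main(String[] args) throws Exception {
    // Placeholder: any Parquet file readable with the example Group object model.
    Path file = new Path(args[0]);
    try (ParquetReader<Group> reader =
        ParquetReader.builder(new GroupReadSupport(), file).build()) {
      Group group;
      while ((group = reader.read()) != null) {
        System.out.println(group); // the real tests assert on field values instead
      }
    } // reader.close() runs here on both the normal and the exceptional path
  }
}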
@@ -254,7 +254,7 @@ private void writeValuesToFile(
SimpleGroupFactory message = new SimpleGroupFactory(schema);
GroupWriteSupport.setSchema(schema, configuration);

ParquetWriter<Group> writer = ExampleParquetWriter.builder(file)
try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(file)
.withAllocator(allocator)
.withCompressionCodec(compression)
.withRowGroupSize(rowGroupSize)
@@ -263,36 +263,35 @@
.withDictionaryEncoding(enableDictionary)
.withWriterVersion(version)
.withConf(configuration)
.build();

for (Object o : values) {
switch (type) {
case BOOLEAN:
writer.write(message.newGroup().append("field", (Boolean) o));
break;
case INT32:
writer.write(message.newGroup().append("field", (Integer) o));
break;
case INT64:
writer.write(message.newGroup().append("field", (Long) o));
break;
case FLOAT:
writer.write(message.newGroup().append("field", (Float) o));
break;
case DOUBLE:
writer.write(message.newGroup().append("field", (Double) o));
break;
case INT96:
case BINARY:
case FIXED_LEN_BYTE_ARRAY:
writer.write(message.newGroup().append("field", (Binary) o));
break;
default:
throw new IllegalArgumentException("Unknown type name: " + type);
.build()) {

for (Object o : values) {
switch (type) {
case BOOLEAN:
writer.write(message.newGroup().append("field", (Boolean) o));
break;
case INT32:
writer.write(message.newGroup().append("field", (Integer) o));
break;
case INT64:
writer.write(message.newGroup().append("field", (Long) o));
break;
case FLOAT:
writer.write(message.newGroup().append("field", (Float) o));
break;
case DOUBLE:
writer.write(message.newGroup().append("field", (Double) o));
break;
case INT96:
case BINARY:
case FIXED_LEN_BYTE_ARRAY:
writer.write(message.newGroup().append("field", (Binary) o));
break;
default:
throw new IllegalArgumentException("Unknown type name: " + type);
}
}
}

writer.close();
}

private List<?> generateRandomValues(PrimitiveTypeName type, int count) {
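The writer side of these tests follows the same shape. A compact, hedged sketch (schema, output path, and values are placeholders rather than the test's randomly generated data):

import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class WriteWithTryWithResources {
  public static void main(String[] args) throws Exception {
    // Placeholder schema with a single int32 column.
    MessageType schema =
        MessageTypeParser.parseMessageType("message example { required int32 field; }");
    SimpleGroupFactory groups = new SimpleGroupFactory(schema);
    try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(new Path(args[0]))
        .withType(schema)
        .build()) {
      for (int i = 0; i < 10; i++) {
        writer.write(groups.newGroup().append("field", i));
      }
    } // the writer is flushed and closed here, even if a write fails
  }
}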
@@ -522,6 +521,8 @@ private static List<PageReadStore> readBlocksFromFile(Path file) throws IOExcept

ParquetMetadata metadata =
ParquetFileReader.readFooter(configuration, file, ParquetMetadataConverter.NO_FILTER);
// Not using try-with-resources here because closing the reader releases the codec factory,
// but the returned PageReadStore objects still hold compressed pages that need decompression later.
ParquetFileReader fileReader = new ParquetFileReader(
configuration,
metadata.getFileMetaData(),
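To make the constraint described in that comment concrete, here is a hedged sketch of the safe ordering, using the public ParquetFileReader.open API rather than the test's constructor call: every page has to be consumed while the reader is still open, because closing the reader releases the codec factory that the returned PageReadStore objects rely on for decompression.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;

public class ConsumePagesBeforeClose {
  public static void main(String[] args) throws Exception {
    Path file = new Path(args[0]); // placeholder path to any Parquet file
    try (ParquetFileReader reader =
        ParquetFileReader.open(HadoopInputFile.fromPath(file, new Configuration()))) {
      PageReadStore rowGroup;
      while ((rowGroup = reader.readNextRowGroup()) != null) {
        // Work with the pages inside the try block; a PageReadStore kept past close()
        // could no longer decompress its pages, which is exactly why readBlocksFromFile
        // above does not use try-with-resources.
        System.out.println("row group with " + rowGroup.getRowCount() + " rows");
      }
    }
  }
}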
@@ -212,17 +212,13 @@ public void testOrMissingColumnFilter() throws Exception {
}

public static long countFilteredRecords(Path path, FilterPredicate pred) throws IOException {
ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), path)
.withFilter(FilterCompat.get(pred))
.build();

long count = 0;
try {
try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), path)
.withFilter(FilterCompat.get(pred))
.build()) {
while (reader.read() != null) {
count += 1;
}
} finally {
reader.close();
}
return count;
}
@@ -215,36 +215,39 @@ public void test(Configuration config, ByteBufferAllocator allocator) throws Exc

{
ParquetMetadata footer = ParquetFileReader.readFooter(conf, file, NO_FILTER);
ParquetFileReader reader = new ParquetFileReader(
config, footer.getFileMetaData(), file, footer.getBlocks(), schema.getColumns());
PageReadStore rowGroup = reader.readNextRowGroup();
PageReader pageReader = rowGroup.getPageReader(col);
DataPageV2 page = (DataPageV2) pageReader.readPage();
assertEquals(rowCount, page.getRowCount());
assertEquals(nullCount, page.getNullCount());
assertEquals(valueCount, page.getValueCount());
assertEquals(d, intValue(page.getDefinitionLevels()));
assertEquals(r, intValue(page.getRepetitionLevels()));
assertEquals(dataEncoding, page.getDataEncoding());
assertEquals(v, intValue(page.getData()));
try (ParquetFileReader reader = new ParquetFileReader(
config, footer.getFileMetaData(), file, footer.getBlocks(), schema.getColumns())) {
PageReadStore rowGroup = reader.readNextRowGroup();
PageReader pageReader = rowGroup.getPageReader(col);
DataPageV2 page = (DataPageV2) pageReader.readPage();
assertEquals(rowCount, page.getRowCount());
assertEquals(nullCount, page.getNullCount());
assertEquals(valueCount, page.getValueCount());
assertEquals(d, intValue(page.getDefinitionLevels()));
assertEquals(r, intValue(page.getRepetitionLevels()));
assertEquals(dataEncoding, page.getDataEncoding());
assertEquals(v, intValue(page.getData()));

// Checking column/offset indexes for the one page
ColumnChunkMetaData column = footer.getBlocks().get(0).getColumns().get(0);
ColumnIndex columnIndex = reader.readColumnIndex(column);
assertArrayEquals(
statistics.getMinBytes(), columnIndex.getMinValues().get(0).array());
assertArrayEquals(
statistics.getMaxBytes(), columnIndex.getMaxValues().get(0).array());
assertEquals(
statistics.getNumNulls(), columnIndex.getNullCounts().get(0).longValue());
assertFalse(columnIndex.getNullPages().get(0));
OffsetIndex offsetIndex = reader.readOffsetIndex(column);
assertEquals(1, offsetIndex.getPageCount());
assertEquals(pageSize, offsetIndex.getCompressedPageSize(0));
assertEquals(0, offsetIndex.getFirstRowIndex(0));
assertEquals(pageOffset, offsetIndex.getOffset(0));

reader.close();
// Checking column/offset indexes for the one page
ColumnChunkMetaData column =
footer.getBlocks().get(0).getColumns().get(0);
ColumnIndex columnIndex = reader.readColumnIndex(column);
assertArrayEquals(
statistics.getMinBytes(),
columnIndex.getMinValues().get(0).array());
assertArrayEquals(
statistics.getMaxBytes(),
columnIndex.getMaxValues().get(0).array());
assertEquals(
statistics.getNumNulls(),
columnIndex.getNullCounts().get(0).longValue());
assertFalse(columnIndex.getNullPages().get(0));
OffsetIndex offsetIndex = reader.readOffsetIndex(column);
assertEquals(1, offsetIndex.getPageCount());
assertEquals(pageSize, offsetIndex.getCompressedPageSize(0));
assertEquals(0, offsetIndex.getFirstRowIndex(0));
assertEquals(pageOffset, offsetIndex.getOffset(0));
}
}
}

@@ -78,20 +78,22 @@ public void testNoneCacheStrategy() throws IOException {
String file = createTestFile("DocID");

ParquetReadOptions options = ParquetReadOptions.builder().build();
ParquetFileReader fileReader = new ParquetFileReader(new LocalInputFile(Paths.get(file)), options);
IndexCache indexCache = IndexCache.create(fileReader, new HashSet<>(), IndexCache.CacheStrategy.NONE, false);
Assert.assertTrue(indexCache instanceof NoneIndexCache);
List<BlockMetaData> blocks = fileReader.getFooter().getBlocks();
for (BlockMetaData blockMetaData : blocks) {
indexCache.setBlockMetadata(blockMetaData);
for (ColumnChunkMetaData chunk : blockMetaData.getColumns()) {
validateColumnIndex(fileReader.readColumnIndex(chunk), indexCache.getColumnIndex(chunk));
validateOffsetIndex(fileReader.readOffsetIndex(chunk), indexCache.getOffsetIndex(chunk));

Assert.assertEquals(
"BloomFilter should match",
fileReader.readBloomFilter(chunk),
indexCache.getBloomFilter(chunk));
try (ParquetFileReader fileReader = new ParquetFileReader(new LocalInputFile(Paths.get(file)), options)) {
IndexCache indexCache =
IndexCache.create(fileReader, new HashSet<>(), IndexCache.CacheStrategy.NONE, false);
Assert.assertTrue(indexCache instanceof NoneIndexCache);
List<BlockMetaData> blocks = fileReader.getFooter().getBlocks();
for (BlockMetaData blockMetaData : blocks) {
indexCache.setBlockMetadata(blockMetaData);
for (ColumnChunkMetaData chunk : blockMetaData.getColumns()) {
validateColumnIndex(fileReader.readColumnIndex(chunk), indexCache.getColumnIndex(chunk));
validateOffsetIndex(fileReader.readOffsetIndex(chunk), indexCache.getOffsetIndex(chunk));

Assert.assertEquals(
"BloomFilter should match",
fileReader.readBloomFilter(chunk),
indexCache.getBloomFilter(chunk));
}
}
}
}
@@ -101,21 +103,23 @@ public void testPrefetchCacheStrategy() throws IOException {
String file = createTestFile("DocID", "Name");

ParquetReadOptions options = ParquetReadOptions.builder().build();
ParquetFileReader fileReader = new ParquetFileReader(new LocalInputFile(Paths.get(file)), options);
Set<ColumnPath> columns = new HashSet<>();
columns.add(ColumnPath.fromDotString("DocId"));
columns.add(ColumnPath.fromDotString("Name"));
columns.add(ColumnPath.fromDotString("Gender"));
columns.add(ColumnPath.fromDotString("Links.Backward"));
columns.add(ColumnPath.fromDotString("Links.Forward"));

IndexCache indexCache = IndexCache.create(fileReader, columns, IndexCache.CacheStrategy.PREFETCH_BLOCK, false);
Assert.assertTrue(indexCache instanceof PrefetchIndexCache);
validPrecacheIndexCache(fileReader, indexCache, columns, false);

indexCache = IndexCache.create(fileReader, columns, IndexCache.CacheStrategy.PREFETCH_BLOCK, true);
Assert.assertTrue(indexCache instanceof PrefetchIndexCache);
validPrecacheIndexCache(fileReader, indexCache, columns, true);
try (ParquetFileReader fileReader = new ParquetFileReader(new LocalInputFile(Paths.get(file)), options)) {
Set<ColumnPath> columns = new HashSet<>();
columns.add(ColumnPath.fromDotString("DocId"));
columns.add(ColumnPath.fromDotString("Name"));
columns.add(ColumnPath.fromDotString("Gender"));
columns.add(ColumnPath.fromDotString("Links.Backward"));
columns.add(ColumnPath.fromDotString("Links.Forward"));

IndexCache indexCache =
IndexCache.create(fileReader, columns, IndexCache.CacheStrategy.PREFETCH_BLOCK, false);
Assert.assertTrue(indexCache instanceof PrefetchIndexCache);
validPrecacheIndexCache(fileReader, indexCache, columns, false);

indexCache = IndexCache.create(fileReader, columns, IndexCache.CacheStrategy.PREFETCH_BLOCK, true);
Assert.assertTrue(indexCache instanceof PrefetchIndexCache);
validPrecacheIndexCache(fileReader, indexCache, columns, true);
}
}

private String createTestFile(String... bloomFilterEnabledColumns) throws IOException {