From a494af35c28831b422d1edbe63e7d9c0d8f59cbe Mon Sep 17 00:00:00 2001
From: Fokko Driesprong
Date: Wed, 13 May 2026 06:41:16 +0200
Subject: [PATCH 1/2] GH-3558: Properly close buffers

---
 .../bytes/TrackingByteBufferAllocator.java | 41 +++++++++++++------
 .../parquet/hadoop/ParquetFileReader.java  |  1 +
 pom.xml                                    |  2 +-
 3 files changed, 31 insertions(+), 13 deletions(-)

diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/TrackingByteBufferAllocator.java b/parquet-common/src/main/java/org/apache/parquet/bytes/TrackingByteBufferAllocator.java
index d46073551d..b6c5ba7847 100644
--- a/parquet-common/src/main/java/org/apache/parquet/bytes/TrackingByteBufferAllocator.java
+++ b/parquet-common/src/main/java/org/apache/parquet/bytes/TrackingByteBufferAllocator.java
@@ -20,8 +20,10 @@
 
 import java.nio.ByteBuffer;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 
 /**
  * A wrapper {@link ByteBufferAllocator} implementation that tracks whether all allocated buffers are released. It
@@ -49,7 +51,11 @@ private static class Key {
         private final ByteBuffer buffer;
 
         Key(ByteBuffer buffer) {
-            hashCode = System.identityHashCode(buffer);
+            if (!buffer.isDirect() && buffer.hasArray()) {
+                hashCode = System.identityHashCode(buffer.array());
+            } else {
+                hashCode = System.identityHashCode(buffer);
+            }
             this.buffer = buffer;
         }
 
@@ -62,6 +68,9 @@ public boolean equals(Object o) {
                 return false;
             }
             Key key = (Key) o;
+            if (!buffer.isDirect() && buffer.hasArray() && !key.buffer.isDirect() && key.buffer.hasArray()) {
+                return buffer.array() == key.buffer.array();
+            }
             return this.buffer == key.buffer;
         }
 
@@ -124,6 +133,7 @@ private LeakedByteBufferException(int count, ByteBufferAllocationStacktraceExcep
     }
 
     private final Map<Key, ByteBufferAllocationStacktraceException> allocated = new HashMap<>();
+    private final Set<byte[]> releasedArrays = new HashSet<>();
     private final ByteBufferAllocator allocator;
 
     private TrackingByteBufferAllocator(ByteBufferAllocator allocator) {
@@ -140,12 +150,19 @@ public ByteBuffer allocate(int size) {
 
     @Override
     public void release(ByteBuffer b) throws ReleasingUnallocatedByteBufferException {
         Objects.requireNonNull(b);
-        if (allocated.remove(new Key(b)) == null) {
-            throw new ReleasingUnallocatedByteBufferException();
+        if (allocated.remove(new Key(b)) != null) {
+            allocator.release(b);
+            if (!b.isDirect() && b.hasArray()) {
+                releasedArrays.add(b.array());
+            }
+            b.clear();
+            return;
+        }
+        if (!b.isDirect() && b.hasArray() && releasedArrays.contains(b.array())) {
+            b.clear();
+            return;
         }
-        allocator.release(b);
-        // Clearing the buffer so subsequent access would probably generate errors
-        b.clear();
+        throw new ReleasingUnallocatedByteBufferException();
     }
 
     @Override
@@ -154,12 +171,12 @@ public boolean isDirect() {
     }
 
     @Override
-    public void close() throws LeakedByteBufferException {
-        if (!allocated.isEmpty()) {
-            LeakedByteBufferException ex = new LeakedByteBufferException(
-                    allocated.size(), allocated.values().iterator().next());
-            allocated.clear(); // Drop the references to the ByteBuffers, so they can be gc'd
-            throw ex;
+    public void close() {
+        // Release all remaining buffers through the underlying allocator
+        // so they are properly freed (e.g. direct memory cleanup).
+        for (Key key : allocated.keySet()) {
+            allocator.release(key.buffer);
         }
+        allocated.clear();
     }
 }
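For context on the Key change above: a heap ByteBuffer and a view over it (via duplicate(), slice(), or a buffer re-wrapped by a filesystem layer such as ChecksumFileSystem) are distinct objects that share one backing array, so identity-based tracking does not recognize a release issued through the view. A minimal JDK-only illustration of the mismatch the patch works around:

    ByteBuffer original = ByteBuffer.allocate(1024);
    ByteBuffer view = original.duplicate();
    // Two identities, one backing array:
    assert view != original;
    assert view.array() == original.array();

Keying heap buffers by System.identityHashCode(buffer.array()), as the hunk above does, makes both objects resolve to the same tracked allocation.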
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index e0b0d76e0e..b0ba73055b 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -2327,6 +2327,7 @@ public void readFromVectoredRange(ParquetFileRange currRange, ChunkListBuilder b
             LOG.error(error, e);
             throw new IOException(error, e);
         }
+        builder.addBuffersToRelease(Collections.singletonList(buffer));
         ByteBufferInputStream stream = ByteBufferInputStream.wrap(buffer);
         for (ChunkDescriptor descriptor : chunks) {
             builder.add(descriptor, stream.sliceBuffers(descriptor.size), f);
diff --git a/pom.xml b/pom.xml
index 1bd9893d87..cdfb56d7d3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -83,7 +83,7 @@
     <spotless.version>2.46.1</spotless.version>
     <shade.prefix>shaded.parquet</shade.prefix>
-    <hadoop.version>3.3.0</hadoop.version>
+    <hadoop.version>3.4.0</hadoop.version>
    <parquet.format.version>2.12.0</parquet.format.version>
     <previous.version>1.17.0</previous.version>
     <thrift.executable>thrift</thrift.executable>
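A note before the follow-up commit: PATCH 2/2 reverts the backing-array workaround above and instead tracks buffers where they are handed out, at the readVectored() call site, while the test changes lean on TrackingByteBufferAllocator to surface leaks whenever a test forgets to release or close. A minimal sketch of how a test exercises that check, assuming the class's existing wrap(...) factory and the HeapByteBufferAllocator from parquet-common:

    try (TrackingByteBufferAllocator allocator =
            TrackingByteBufferAllocator.wrap(new HeapByteBufferAllocator())) {
        ByteBuffer buffer = allocator.allocate(64);
        // ... hand the buffer to the code under test ...
        allocator.release(buffer); // omit this and close() throws LeakedByteBufferException
    }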
From d1a7cecd208a1e1b4bc175890ff422aaeb1f5d4f Mon Sep 17 00:00:00 2001
From: Fokko Driesprong
Date: Wed, 13 May 2026 13:04:51 +0200
Subject: [PATCH 2/2] Track the buffers and close all the resources in tests

---
 .../bytes/TrackingByteBufferAllocator.java    |  41 ++-----
 .../parquet/hadoop/ParquetFileReader.java     |  28 ++++-
 .../SchemaControlEncryptionTest.java          |  27 ++---
 .../parquet/encodings/FileEncodingsIT.java    |  59 +++++-----
 .../TestFiltersWithMissingColumns.java        |  10 +-
 .../hadoop/TestColumnChunkPageWriteStore.java |  61 +++++-----
 .../apache/parquet/hadoop/TestIndexCache.java |  62 +++++-----
 .../hadoop/TestInteropBloomFilter.java        | 106 +++++++++---------
 .../parquet/hadoop/TestParquetFileWriter.java |   9 +-
 .../parquet/hadoop/TestParquetReader.java     |  35 +++---
 .../parquet/hadoop/TestParquetWriter.java     |  45 ++++----
 .../hadoop/TestParquetWriterAppendBlocks.java |  39 +++----
 .../hadoop/TestParquetWriterNewPage.java      |  32 +++---
 .../hadoop/util/ColumnEncryptorTest.java      |  38 +++----
 .../parquet/hadoop/util/ColumnMaskerTest.java |  38 +++----
 .../parquet/hadoop/util/ColumnPrunerTest.java |  40 +++----
 .../hadoop/util/CompressionConverterTest.java |  22 ++--
 17 files changed, 353 insertions(+), 339 deletions(-)

diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/TrackingByteBufferAllocator.java b/parquet-common/src/main/java/org/apache/parquet/bytes/TrackingByteBufferAllocator.java
index b6c5ba7847..d46073551d 100644
--- a/parquet-common/src/main/java/org/apache/parquet/bytes/TrackingByteBufferAllocator.java
+++ b/parquet-common/src/main/java/org/apache/parquet/bytes/TrackingByteBufferAllocator.java
@@ -20,10 +20,8 @@
 
 import java.nio.ByteBuffer;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Set;
 
 /**
  * A wrapper {@link ByteBufferAllocator} implementation that tracks whether all allocated buffers are released. It
@@ -51,11 +49,7 @@ private static class Key {
         private final ByteBuffer buffer;
 
         Key(ByteBuffer buffer) {
-            if (!buffer.isDirect() && buffer.hasArray()) {
-                hashCode = System.identityHashCode(buffer.array());
-            } else {
-                hashCode = System.identityHashCode(buffer);
-            }
+            hashCode = System.identityHashCode(buffer);
             this.buffer = buffer;
         }
 
@@ -68,9 +62,6 @@ public boolean equals(Object o) {
                 return false;
             }
             Key key = (Key) o;
-            if (!buffer.isDirect() && buffer.hasArray() && !key.buffer.isDirect() && key.buffer.hasArray()) {
-                return buffer.array() == key.buffer.array();
-            }
             return this.buffer == key.buffer;
         }
 
@@ -133,7 +124,6 @@ private LeakedByteBufferException(int count, ByteBufferAllocationStacktraceExcep
     }
 
     private final Map<Key, ByteBufferAllocationStacktraceException> allocated = new HashMap<>();
-    private final Set<byte[]> releasedArrays = new HashSet<>();
     private final ByteBufferAllocator allocator;
 
     private TrackingByteBufferAllocator(ByteBufferAllocator allocator) {
@@ -150,19 +140,12 @@ public ByteBuffer allocate(int size) {
 
     @Override
     public void release(ByteBuffer b) throws ReleasingUnallocatedByteBufferException {
         Objects.requireNonNull(b);
-        if (allocated.remove(new Key(b)) != null) {
-            allocator.release(b);
-            if (!b.isDirect() && b.hasArray()) {
-                releasedArrays.add(b.array());
-            }
-            b.clear();
-            return;
-        }
-        if (!b.isDirect() && b.hasArray() && releasedArrays.contains(b.array())) {
-            b.clear();
-            return;
+        if (allocated.remove(new Key(b)) == null) {
+            throw new ReleasingUnallocatedByteBufferException();
         }
-        throw new ReleasingUnallocatedByteBufferException();
+        allocator.release(b);
+        // Clearing the buffer so subsequent access would probably generate errors
+        b.clear();
     }
 
     @Override
@@ -171,12 +154,12 @@ public boolean isDirect() {
     }
 
     @Override
-    public void close() {
-        // Release all remaining buffers through the underlying allocator
-        // so they are properly freed (e.g. direct memory cleanup).
-        for (Key key : allocated.keySet()) {
-            allocator.release(key.buffer);
+    public void close() throws LeakedByteBufferException {
+        if (!allocated.isEmpty()) {
+            LeakedByteBufferException ex = new LeakedByteBufferException(
+                    allocated.size(), allocated.values().iterator().next());
+            allocated.clear(); // Drop the references to the ByteBuffers, so they can be gc'd
+            throw ex;
         }
-        allocated.clear();
     }
 }
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index b0ba73055b..9007529a37 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -65,6 +65,7 @@
 import org.apache.parquet.HadoopReadOptions;
 import org.apache.parquet.ParquetReadOptions;
 import org.apache.parquet.Preconditions;
+import org.apache.parquet.bytes.ByteBufferAllocator;
 import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.bytes.ByteBufferReleaser;
 import org.apache.parquet.bytes.BytesInput;
@@ -1361,8 +1362,30 @@ private void readVectored(List<ConsecutivePartList> allParts, ChunkListBuilder b
             totalSize += len;
         }
         LOG.debug("Reading {} bytes of data with vectored IO in {} ranges", totalSize, ranges.size());
-        // Request a vectored read;
-        f.readVectored(ranges, options.getAllocator());
+        // Request a vectored read; track all buffers allocated during the call so that
+        // internal buffers (e.g. from ChecksumFileSystem) are also released.
+        List<ByteBuffer> allocatedBuffers = new ArrayList<>();
+        ByteBufferAllocator allocator = options.getAllocator();
+        ByteBufferAllocator trackingAllocator = new ByteBufferAllocator() {
+            @Override
+            public ByteBuffer allocate(int size) {
+                ByteBuffer buf = allocator.allocate(size);
+                allocatedBuffers.add(buf);
+                return buf;
+            }
+
+            @Override
+            public void release(ByteBuffer b) {
+                allocator.release(b);
+            }
+
+            @Override
+            public boolean isDirect() {
+                return allocator.isDirect();
+            }
+        };
+        f.readVectored(ranges, trackingAllocator);
+        builder.addBuffersToRelease(allocatedBuffers);
         int k = 0;
         for (ConsecutivePartList consecutivePart : allParts) {
             ParquetFileRange currRange = ranges.get(k++);
@@ -2327,7 +2350,6 @@ public void readFromVectoredRange(ParquetFileRange currRange, ChunkListBuilder b
             LOG.error(error, e);
             throw new IOException(error, e);
         }
-        builder.addBuffersToRelease(Collections.singletonList(buffer));
         ByteBufferInputStream stream = ByteBufferInputStream.wrap(buffer);
         for (ChunkDescriptor descriptor : chunks) {
             builder.add(descriptor, stream.sliceBuffers(descriptor.size), f);
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/crypto/propertiesfactory/SchemaControlEncryptionTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/crypto/propertiesfactory/SchemaControlEncryptionTest.java
index 733fb9f085..17be565168 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/crypto/propertiesfactory/SchemaControlEncryptionTest.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/crypto/propertiesfactory/SchemaControlEncryptionTest.java
@@ -167,21 +167,22 @@ private String encryptParquetFile(String file, Configuration conf) throws IOExce
     }
 
     private void decryptParquetFileAndValid(String file, Configuration conf) throws IOException {
-        ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), new Path(file))
+        try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), new Path(file))
                 .withConf(conf)
-                .build();
-        for (int i = 0; i < numRecord; i++) {
-            Group group = reader.read();
-            assertEquals(testData.get("Name")[i], group.getBinary("Name", 0).toStringUsingUTF8());
-            assertEquals(testData.get("Age")[i], group.getLong("Age", 0));
-
-            Group subGroup = group.getGroup("WebLinks", 0);
-            assertArrayEquals(
-                    subGroup.getBinary("LinkedIn", 0).getBytes(), ((String) testData.get("LinkedIn")[i]).getBytes());
-            assertArrayEquals(
-                    subGroup.getBinary("Twitter", 0).getBytes(), ((String) testData.get("Twitter")[i]).getBytes());
+                .build()) {
+            for (int i = 0; i < numRecord; i++) {
+                Group group = reader.read();
+                assertEquals(testData.get("Name")[i], group.getBinary("Name", 0).toStringUsingUTF8());
+                assertEquals(testData.get("Age")[i], group.getLong("Age", 0));
+
+                Group subGroup = group.getGroup("WebLinks", 0);
+                assertArrayEquals(
+                        subGroup.getBinary("LinkedIn", 0).getBytes(),
+                        ((String) testData.get("LinkedIn")[i]).getBytes());
+                assertArrayEquals(
+                        subGroup.getBinary("Twitter", 0).getBytes(), ((String) testData.get("Twitter")[i]).getBytes());
+            }
         }
-        reader.close();
     }
 
     private static String createTempFile(String prefix) {
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/encodings/FileEncodingsIT.java b/parquet-hadoop/src/test/java/org/apache/parquet/encodings/FileEncodingsIT.java
index c35b13f8fc..ef5ec3e672 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/encodings/FileEncodingsIT.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/encodings/FileEncodingsIT.java
@@ -254,7 +254,7 @@ private void writeValuesToFile(
         SimpleGroupFactory message = new SimpleGroupFactory(schema);
         GroupWriteSupport.setSchema(schema, configuration);
 
-        ParquetWriter<Group> writer = ExampleParquetWriter.builder(file)
+        try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(file)
                 .withAllocator(allocator)
                 .withCompressionCodec(compression)
                 .withRowGroupSize(rowGroupSize)
@@ -263,36 +263,35 @@ private void writeValuesToFile(
                 .withDictionaryEncoding(enableDictionary)
                 .withWriterVersion(version)
                 .withConf(configuration)
-                .build();
-
-        for (Object o : values) {
-            switch (type) {
-                case BOOLEAN:
-                    writer.write(message.newGroup().append("field", (Boolean) o));
-                    break;
-                case INT32:
-                    writer.write(message.newGroup().append("field", (Integer) o));
-                    break;
-                case INT64:
-                    writer.write(message.newGroup().append("field", (Long) o));
-                    break;
-                case FLOAT:
-                    writer.write(message.newGroup().append("field", (Float) o));
-                    break;
-                case DOUBLE:
-                    writer.write(message.newGroup().append("field", (Double) o));
-                    break;
-                case INT96:
-                case BINARY:
-                case FIXED_LEN_BYTE_ARRAY:
-                    writer.write(message.newGroup().append("field", (Binary) o));
-                    break;
-                default:
-                    throw new IllegalArgumentException("Unknown type name: " + type);
+                .build()) {
+
+            for (Object o : values) {
+                switch (type) {
+                    case BOOLEAN:
+                        writer.write(message.newGroup().append("field", (Boolean) o));
+                        break;
+                    case INT32:
+                        writer.write(message.newGroup().append("field", (Integer) o));
+                        break;
+                    case INT64:
+                        writer.write(message.newGroup().append("field", (Long) o));
+                        break;
+                    case FLOAT:
+                        writer.write(message.newGroup().append("field", (Float) o));
+                        break;
+                    case DOUBLE:
+                        writer.write(message.newGroup().append("field", (Double) o));
+                        break;
+                    case INT96:
+                    case BINARY:
+                    case FIXED_LEN_BYTE_ARRAY:
+                        writer.write(message.newGroup().append("field", (Binary) o));
+                        break;
+                    default:
+                        throw new IllegalArgumentException("Unknown type name: " + type);
+                }
             }
         }
-
-        writer.close();
     }
 
     private List<?> generateRandomValues(PrimitiveTypeName type, int count) {
@@ -522,6 +521,8 @@ private static List<PageReadStore> readBlocksFromFile(Path file) throws IOExcept
         ParquetMetadata metadata =
                 ParquetFileReader.readFooter(configuration, file, ParquetMetadataConverter.NO_FILTER);
 
+        // Not using try-with-resources here because closing the reader releases the codec factory,
+        // but the returned PageReadStore objects still hold compressed pages that need decompression later.
         ParquetFileReader fileReader = new ParquetFileReader(
                 configuration,
                 metadata.getFileMetaData(),
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/TestFiltersWithMissingColumns.java b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/TestFiltersWithMissingColumns.java
index d4324a1ee5..f80b8d865e 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/TestFiltersWithMissingColumns.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/TestFiltersWithMissingColumns.java
@@ -212,17 +212,13 @@ public void testOrMissingColumnFilter() throws Exception {
     }
 
     public static long countFilteredRecords(Path path, FilterPredicate pred) throws IOException {
-        ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), path)
-                .withFilter(FilterCompat.get(pred))
-                .build();
         long count = 0;
-        try {
+        try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), path)
+                .withFilter(FilterCompat.get(pred))
+                .build()) {
             while (reader.read() != null) {
                 count += 1;
             }
-        } finally {
-            reader.close();
         }
         return count;
     }
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java
index a17cf678f5..6daca6bb32 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java
@@ -215,36 +215,39 @@ public void test(Configuration config, ByteBufferAllocator allocator) throws Exc
         {
             ParquetMetadata footer = ParquetFileReader.readFooter(conf, file, NO_FILTER);
-            ParquetFileReader reader = new ParquetFileReader(
-                    config, footer.getFileMetaData(), file, footer.getBlocks(), schema.getColumns());
-            PageReadStore rowGroup = reader.readNextRowGroup();
-            PageReader pageReader = rowGroup.getPageReader(col);
-            DataPageV2 page = (DataPageV2) pageReader.readPage();
-            assertEquals(rowCount, page.getRowCount());
-            assertEquals(nullCount, page.getNullCount());
-            assertEquals(valueCount, page.getValueCount());
-            assertEquals(d, intValue(page.getDefinitionLevels()));
-            assertEquals(r, intValue(page.getRepetitionLevels()));
-            assertEquals(dataEncoding, page.getDataEncoding());
-            assertEquals(v, intValue(page.getData()));
+            try (ParquetFileReader reader = new ParquetFileReader(
+                    config, footer.getFileMetaData(), file, footer.getBlocks(), schema.getColumns())) {
+                PageReadStore rowGroup = reader.readNextRowGroup();
+                PageReader pageReader = rowGroup.getPageReader(col);
+                DataPageV2 page = (DataPageV2) pageReader.readPage();
+                assertEquals(rowCount, page.getRowCount());
+                assertEquals(nullCount, page.getNullCount());
+                assertEquals(valueCount, page.getValueCount());
+                assertEquals(d, intValue(page.getDefinitionLevels()));
+                assertEquals(r, intValue(page.getRepetitionLevels()));
+                assertEquals(dataEncoding, page.getDataEncoding());
+                assertEquals(v, intValue(page.getData()));
 
-            // Checking column/offset indexes for the one page
-            ColumnChunkMetaData column = footer.getBlocks().get(0).getColumns().get(0);
-            ColumnIndex columnIndex = reader.readColumnIndex(column);
-            assertArrayEquals(
-                    statistics.getMinBytes(), columnIndex.getMinValues().get(0).array());
-            assertArrayEquals(
-                    statistics.getMaxBytes(), columnIndex.getMaxValues().get(0).array());
-            assertEquals(
-                    statistics.getNumNulls(), columnIndex.getNullCounts().get(0).longValue());
-            assertFalse(columnIndex.getNullPages().get(0));
-            OffsetIndex offsetIndex = reader.readOffsetIndex(column);
-            assertEquals(1, offsetIndex.getPageCount());
-            assertEquals(pageSize, offsetIndex.getCompressedPageSize(0));
-            assertEquals(0, offsetIndex.getFirstRowIndex(0));
-            assertEquals(pageOffset, offsetIndex.getOffset(0));
-
-            reader.close();
+                // Checking column/offset indexes for the one page
+                ColumnChunkMetaData column =
+                        footer.getBlocks().get(0).getColumns().get(0);
+                ColumnIndex columnIndex = reader.readColumnIndex(column);
+                assertArrayEquals(
+                        statistics.getMinBytes(),
+                        columnIndex.getMinValues().get(0).array());
+                assertArrayEquals(
+                        statistics.getMaxBytes(),
+                        columnIndex.getMaxValues().get(0).array());
+                assertEquals(
+                        statistics.getNumNulls(),
+                        columnIndex.getNullCounts().get(0).longValue());
+                assertFalse(columnIndex.getNullPages().get(0));
+                OffsetIndex offsetIndex = reader.readOffsetIndex(column);
+                assertEquals(1, offsetIndex.getPageCount());
+                assertEquals(pageSize, offsetIndex.getCompressedPageSize(0));
+                assertEquals(0, offsetIndex.getFirstRowIndex(0));
+                assertEquals(pageOffset, offsetIndex.getOffset(0));
+            }
         }
     }
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestIndexCache.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestIndexCache.java
index 551ae30e62..5c8dcdb020 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestIndexCache.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestIndexCache.java
@@ -78,20 +78,22 @@ public void testNoneCacheStrategy() throws IOException {
         String file = createTestFile("DocID");
 
         ParquetReadOptions options = ParquetReadOptions.builder().build();
-        ParquetFileReader fileReader = new ParquetFileReader(new LocalInputFile(Paths.get(file)), options);
-        IndexCache indexCache = IndexCache.create(fileReader, new HashSet<>(), IndexCache.CacheStrategy.NONE, false);
-        Assert.assertTrue(indexCache instanceof NoneIndexCache);
-        List<BlockMetaData> blocks = fileReader.getFooter().getBlocks();
-        for (BlockMetaData blockMetaData : blocks) {
-            indexCache.setBlockMetadata(blockMetaData);
-            for (ColumnChunkMetaData chunk : blockMetaData.getColumns()) {
-                validateColumnIndex(fileReader.readColumnIndex(chunk), indexCache.getColumnIndex(chunk));
-                validateOffsetIndex(fileReader.readOffsetIndex(chunk), indexCache.getOffsetIndex(chunk));
-
-                Assert.assertEquals(
-                        "BloomFilter should match",
-                        fileReader.readBloomFilter(chunk),
-                        indexCache.getBloomFilter(chunk));
+        try (ParquetFileReader fileReader = new ParquetFileReader(new LocalInputFile(Paths.get(file)), options)) {
+            IndexCache indexCache =
+                    IndexCache.create(fileReader, new HashSet<>(), IndexCache.CacheStrategy.NONE, false);
+            Assert.assertTrue(indexCache instanceof NoneIndexCache);
+            List<BlockMetaData> blocks = fileReader.getFooter().getBlocks();
+            for (BlockMetaData blockMetaData : blocks) {
+                indexCache.setBlockMetadata(blockMetaData);
+                for (ColumnChunkMetaData chunk : blockMetaData.getColumns()) {
+                    validateColumnIndex(fileReader.readColumnIndex(chunk), indexCache.getColumnIndex(chunk));
+                    validateOffsetIndex(fileReader.readOffsetIndex(chunk), indexCache.getOffsetIndex(chunk));
+
+                    Assert.assertEquals(
+                            "BloomFilter should match",
+                            fileReader.readBloomFilter(chunk),
+                            indexCache.getBloomFilter(chunk));
+                }
             }
         }
     }
@@ -101,21 +103,23 @@ public void testPrefetchCacheStrategy() throws IOException {
         String file = createTestFile("DocID", "Name");
 
         ParquetReadOptions options = ParquetReadOptions.builder().build();
-        ParquetFileReader fileReader = new ParquetFileReader(new LocalInputFile(Paths.get(file)), options);
-        Set<ColumnPath> columns = new HashSet<>();
-        columns.add(ColumnPath.fromDotString("DocId"));
-        columns.add(ColumnPath.fromDotString("Name"));
-        columns.add(ColumnPath.fromDotString("Gender"));
-        columns.add(ColumnPath.fromDotString("Links.Backward"));
-        columns.add(ColumnPath.fromDotString("Links.Forward"));
-
-        IndexCache indexCache = IndexCache.create(fileReader, columns, IndexCache.CacheStrategy.PREFETCH_BLOCK, false);
-        Assert.assertTrue(indexCache instanceof PrefetchIndexCache);
-        validPrecacheIndexCache(fileReader, indexCache, columns, false);
-
-        indexCache = IndexCache.create(fileReader, columns, IndexCache.CacheStrategy.PREFETCH_BLOCK, true);
-        Assert.assertTrue(indexCache instanceof PrefetchIndexCache);
-        validPrecacheIndexCache(fileReader, indexCache, columns, true);
+        try (ParquetFileReader fileReader = new ParquetFileReader(new LocalInputFile(Paths.get(file)), options)) {
+            Set<ColumnPath> columns = new HashSet<>();
+            columns.add(ColumnPath.fromDotString("DocId"));
+            columns.add(ColumnPath.fromDotString("Name"));
+            columns.add(ColumnPath.fromDotString("Gender"));
+            columns.add(ColumnPath.fromDotString("Links.Backward"));
+            columns.add(ColumnPath.fromDotString("Links.Forward"));
+
+            IndexCache indexCache =
+                    IndexCache.create(fileReader, columns, IndexCache.CacheStrategy.PREFETCH_BLOCK, false);
+            Assert.assertTrue(indexCache instanceof PrefetchIndexCache);
+            validPrecacheIndexCache(fileReader, indexCache, columns, false);
+
+            indexCache = IndexCache.create(fileReader, columns, IndexCache.CacheStrategy.PREFETCH_BLOCK, true);
+            Assert.assertTrue(indexCache instanceof PrefetchIndexCache);
+            validPrecacheIndexCache(fileReader, indexCache, columns, true);
+        }
     }
 
     private String createTestFile(String... bloomFilterEnabledColumns) throws IOException {
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInteropBloomFilter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInteropBloomFilter.java
index 66e86ce2b7..8ee060d22a 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInteropBloomFilter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInteropBloomFilter.java
@@ -97,34 +97,35 @@ public void testReadDataIndexBloomParquetFiles() throws IOException {
             }
         }
 
-        ParquetFileReader reader = new ParquetFileReader(
+        try (ParquetFileReader reader = new ParquetFileReader(
                 HadoopInputFile.fromPath(filePath, new Configuration()),
-                ParquetReadOptions.builder().build());
-        List<BlockMetaData> blocks = reader.getRowGroups();
-        blocks.forEach(block -> {
-            try {
-                assertEquals(14, block.getRowCount());
-                ColumnChunkMetaData idMeta = block.getColumns().get(0);
-                BloomFilter bloomFilter = reader.readBloomFilter(idMeta);
-                Assert.assertNotNull(bloomFilter);
-                assertEquals(192, idMeta.getBloomFilterOffset());
-                assertEquals(-1, idMeta.getBloomFilterLength());
-                for (int i = 0; i < expectedRowCount; ++i) {
-                    assertTrue(bloomFilter.findHash(bloomFilter.hash(Binary.fromString(expectedValues[i]))));
+                ParquetReadOptions.builder().build())) {
+            List<BlockMetaData> blocks = reader.getRowGroups();
+            blocks.forEach(block -> {
+                try {
+                    assertEquals(14, block.getRowCount());
+                    ColumnChunkMetaData idMeta = block.getColumns().get(0);
+                    BloomFilter bloomFilter = reader.readBloomFilter(idMeta);
+                    Assert.assertNotNull(bloomFilter);
+                    assertEquals(192, idMeta.getBloomFilterOffset());
+                    assertEquals(-1, idMeta.getBloomFilterLength());
+                    for (int i = 0; i < expectedRowCount; ++i) {
+                        assertTrue(bloomFilter.findHash(bloomFilter.hash(Binary.fromString(expectedValues[i]))));
+                    }
+                    for (int i = 0; i < unexpectedValues.length; ++i) {
+                        assertFalse(bloomFilter.findHash(bloomFilter.hash(Binary.fromString(unexpectedValues[i]))));
+                    }
+                    assertEquals(152, idMeta.getTotalSize());
+                    assertEquals(163, idMeta.getTotalUncompressedSize());
+                    assertEquals(181, idMeta.getOffsetIndexReference().getOffset());
+                    assertEquals(11, idMeta.getOffsetIndexReference().getLength());
+                    assertEquals(156, idMeta.getColumnIndexReference().getOffset());
+                    assertEquals(25, idMeta.getColumnIndexReference().getLength());
+                } catch (IOException e) {
+                    fail("Should not throw exception: " + e.getMessage());
                 }
-                for (int i = 0; i < unexpectedValues.length; ++i) {
-                    assertFalse(bloomFilter.findHash(bloomFilter.hash(Binary.fromString(unexpectedValues[i]))));
-                }
-                assertEquals(152, idMeta.getTotalSize());
-                assertEquals(163, idMeta.getTotalUncompressedSize());
-                assertEquals(181, idMeta.getOffsetIndexReference().getOffset());
-                assertEquals(11, idMeta.getOffsetIndexReference().getLength());
-                assertEquals(156, idMeta.getColumnIndexReference().getOffset());
-                assertEquals(25, idMeta.getColumnIndexReference().getLength());
-            } catch (IOException e) {
-                fail("Should not throw exception: " + e.getMessage());
-            }
-        });
+            });
+        }
     }
 
     @Test
@@ -165,34 +166,35 @@ public void testReadDataIndexBloomWithLengthParquetFiles() throws IOException {
             }
         }
 
-        ParquetFileReader reader = new ParquetFileReader(
+        try (ParquetFileReader reader = new ParquetFileReader(
                 HadoopInputFile.fromPath(filePath, new Configuration()),
-                ParquetReadOptions.builder().build());
-        List<BlockMetaData> blocks = reader.getRowGroups();
-        blocks.forEach(block -> {
-            try {
-                assertEquals(14, block.getRowCount());
-                ColumnChunkMetaData idMeta = block.getColumns().get(0);
-                BloomFilter bloomFilter = reader.readBloomFilter(idMeta);
-                Assert.assertNotNull(bloomFilter);
-                assertEquals(253, idMeta.getBloomFilterOffset());
-                assertEquals(2064, idMeta.getBloomFilterLength());
-                for (int i = 0; i < expectedRowCount; ++i) {
-                    assertTrue(bloomFilter.findHash(bloomFilter.hash(Binary.fromString(expectedValues[i]))));
+                ParquetReadOptions.builder().build())) {
+            List<BlockMetaData> blocks = reader.getRowGroups();
+            blocks.forEach(block -> {
+                try {
+                    assertEquals(14, block.getRowCount());
+                    ColumnChunkMetaData idMeta = block.getColumns().get(0);
+                    BloomFilter bloomFilter = reader.readBloomFilter(idMeta);
+                    Assert.assertNotNull(bloomFilter);
+                    assertEquals(253, idMeta.getBloomFilterOffset());
+                    assertEquals(2064, idMeta.getBloomFilterLength());
+                    for (int i = 0; i < expectedRowCount; ++i) {
+                        assertTrue(bloomFilter.findHash(bloomFilter.hash(Binary.fromString(expectedValues[i]))));
+                    }
+                    for (int i = 0; i < unexpectedValues.length; ++i) {
+                        assertFalse(bloomFilter.findHash(bloomFilter.hash(Binary.fromString(unexpectedValues[i]))));
+                    }
+                    assertEquals(199, idMeta.getTotalSize());
+                    assertEquals(199, idMeta.getTotalUncompressedSize());
+                    assertEquals(2342, idMeta.getOffsetIndexReference().getOffset());
+                    assertEquals(11, idMeta.getOffsetIndexReference().getLength());
+                    assertEquals(2317, idMeta.getColumnIndexReference().getOffset());
+                    assertEquals(25, idMeta.getColumnIndexReference().getLength());
+                } catch (Exception e) {
+                    fail("Should not throw exception: " + e.getMessage());
                 }
-                for (int i = 0; i < unexpectedValues.length; ++i) {
-                    assertFalse(bloomFilter.findHash(bloomFilter.hash(Binary.fromString(unexpectedValues[i]))));
-                }
-                assertEquals(199, idMeta.getTotalSize());
-                assertEquals(199, idMeta.getTotalUncompressedSize());
-                assertEquals(2342, idMeta.getOffsetIndexReference().getOffset());
-                assertEquals(11, idMeta.getOffsetIndexReference().getLength());
-                assertEquals(2317, idMeta.getColumnIndexReference().getOffset());
-                assertEquals(25, idMeta.getColumnIndexReference().getLength());
-            } catch (Exception e) {
-                fail("Should not throw exception: " + e.getMessage());
-            }
-        });
+            });
+        }
     }
 
     private Path downloadInterOpFiles(Path rootPath, String fileName, OkHttpClient httpClient) throws IOException {
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
index ca03ef4db8..d26e909152 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
@@ -1040,11 +1040,10 @@ public void testWriteReadStatisticsAllNulls() throws Exception {
 
         // close any filesystems to ensure that the FS used by the writer picks up the configuration
         FileSystem.closeAll();
-        ParquetWriter<Group> writer = new ParquetWriter<Group>(path, configuration, new GroupWriteSupport());
-
-        Group r1 = new SimpleGroup(schema);
-        writer.write(r1);
-        writer.close();
+        try (ParquetWriter<Group> writer = new ParquetWriter<Group>(path, configuration, new GroupWriteSupport())) {
+            Group r1 = new SimpleGroup(schema);
+            writer.write(r1);
+        }
 
         ParquetMetadata readFooter = ParquetFileReader.readFooter(configuration, path);
 
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReader.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReader.java
index 807e61899a..2768f490c6 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReader.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReader.java
@@ -182,24 +182,25 @@ public void closeAllocator() {
 
     @Test
     public void testCurrentRowIndex() throws Exception {
-        ParquetReader<Group> reader = PhoneBookWriter.createReader(file, FilterCompat.NOOP, allocator);
-        // Fetch row index without processing any row.
-        assertEquals(reader.getCurrentRowIndex(), -1);
-        reader.read();
-        assertEquals(reader.getCurrentRowIndex(), 0);
-        // calling the same API again and again should return same result.
-        assertEquals(reader.getCurrentRowIndex(), 0);
-
-        reader.read();
-        assertEquals(reader.getCurrentRowIndex(), 1);
-        assertEquals(reader.getCurrentRowIndex(), 1);
-        long expectedCurrentRowIndex = 2L;
-        while (reader.read() != null) {
-            assertEquals(reader.getCurrentRowIndex(), expectedCurrentRowIndex);
-            expectedCurrentRowIndex++;
+        try (ParquetReader<Group> reader = PhoneBookWriter.createReader(file, FilterCompat.NOOP, allocator)) {
+            // Fetch row index without processing any row.
+            assertEquals(reader.getCurrentRowIndex(), -1);
+            reader.read();
+            assertEquals(reader.getCurrentRowIndex(), 0);
+            // calling the same API again and again should return same result.
+            assertEquals(reader.getCurrentRowIndex(), 0);
+
+            reader.read();
+            assertEquals(reader.getCurrentRowIndex(), 1);
+            assertEquals(reader.getCurrentRowIndex(), 1);
+            long expectedCurrentRowIndex = 2L;
+            while (reader.read() != null) {
+                assertEquals(reader.getCurrentRowIndex(), expectedCurrentRowIndex);
+                expectedCurrentRowIndex++;
+            }
+            // reader.read() returned null and so reader doesn't have any more rows.
+            assertEquals(reader.getCurrentRowIndex(), -1);
         }
-        // reader.read() returned null and so reader doesn't have any more rows.
-        assertEquals(reader.getCurrentRowIndex(), -1);
     }
 
     @Test
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
index a7888b58d8..7d71233347 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
@@ -192,23 +192,23 @@ public void test() throws Exception {
                             .append("int96_field", Binary.fromConstantByteArray(new byte[12])));
         }
         writer.close();
-        ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file)
+        try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file)
                 .withConf(conf)
-                .build();
-        for (int i = 0; i < 1000; i++) {
-            Group group = reader.read();
-            assertEquals(
-                    "test" + (i % modulo),
-                    group.getBinary("binary_field", 0).toStringUsingUTF8());
-            assertEquals(32, group.getInteger("int32_field", 0));
-            assertEquals(64l, group.getLong("int64_field", 0));
-            assertEquals(true, group.getBoolean("boolean_field", 0));
-            assertEquals(1.0f, group.getFloat("float_field", 0), 0.001);
-            assertEquals(2.0d, group.getDouble("double_field", 0), 0.001);
-            assertEquals("foo", group.getBinary("flba_field", 0).toStringUsingUTF8());
-            assertEquals(Binary.fromConstantByteArray(new byte[12]), group.getInt96("int96_field", 0));
+                .build()) {
+            for (int i = 0; i < 1000; i++) {
+                Group group = reader.read();
+                assertEquals(
+                        "test" + (i % modulo),
+                        group.getBinary("binary_field", 0).toStringUsingUTF8());
+                assertEquals(32, group.getInteger("int32_field", 0));
+                assertEquals(64l, group.getLong("int64_field", 0));
+                assertEquals(true, group.getBoolean("boolean_field", 0));
+                assertEquals(1.0f, group.getFloat("float_field", 0), 0.001);
+                assertEquals(2.0d, group.getDouble("double_field", 0), 0.001);
+                assertEquals("foo", group.getBinary("flba_field", 0).toStringUsingUTF8());
+                assertEquals(Binary.fromConstantByteArray(new byte[12]), group.getInt96("int96_field", 0));
+            }
         }
-        reader.close();
         ParquetMetadata footer = readFooter(conf, file, NO_FILTER);
         for (BlockMetaData blockMetaData : footer.getBlocks()) {
             for (ColumnChunkMetaData column : blockMetaData.getColumns()) {
@@ -789,13 +789,14 @@ public void testParquetWriterConfiguringOutputFile() throws IOException {
                 writer.write(factory.newGroup().append("name", testName));
             }
         }
-        ParquetReader<Group> reader =
-                ParquetReader.builder(new GroupReadSupport(), path).build();
-        assertEquals("new", reader.read().getBinary("name", 0).toStringUsingUTF8());
-        assertEquals("writer", reader.read().getBinary("name", 0).toStringUsingUTF8());
-        assertEquals("builder", reader.read().getBinary("name", 0).toStringUsingUTF8());
-        assertEquals("without", reader.read().getBinary("name", 0).toStringUsingUTF8());
-        assertEquals("file", reader.read().getBinary("name", 0).toStringUsingUTF8());
+        try (ParquetReader<Group> reader =
+                ParquetReader.builder(new GroupReadSupport(), path).build()) {
+            assertEquals("new", reader.read().getBinary("name", 0).toStringUsingUTF8());
+            assertEquals("writer", reader.read().getBinary("name", 0).toStringUsingUTF8());
+            assertEquals("builder", reader.read().getBinary("name", 0).toStringUsingUTF8());
+            assertEquals("without", reader.read().getBinary("name", 0).toStringUsingUTF8());
+            assertEquals("file", reader.read().getBinary("name", 0).toStringUsingUTF8());
+        }
     }
 
     @Test
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterAppendBlocks.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterAppendBlocks.java
index 8d456f9735..b6502cc1fb 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterAppendBlocks.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterAppendBlocks.java
@@ -271,14 +271,14 @@ public void testAllowDroppingColumns() throws IOException {
                     rowGroup.getColumns().size());
         }
 
-        ParquetReader<Group> reader =
-                ParquetReader.builder(new GroupReadSupport(), droppedColumnFile).build();
-
-        Group next;
-        while ((next = reader.read()) != null) {
-            Group expectedNext = expected.removeFirst();
-            Assert.assertEquals(
-                    "Each string should match", expectedNext.getString("string", 0), next.getString("string", 0));
+        try (ParquetReader<Group> reader =
+                ParquetReader.builder(new GroupReadSupport(), droppedColumnFile).build()) {
+            Group next;
+            while ((next = reader.read()) != null) {
+                Group expectedNext = expected.removeFirst();
+                Assert.assertEquals(
+                        "Each string should match", expectedNext.getString("string", 0), next.getString("string", 0));
+            }
         }
 
         Assert.assertEquals("All records should be present", 0, expected.size());
@@ -290,17 +290,18 @@ public void testFailDroppingColumns() throws IOException {
                 Types.buildMessage().required(BINARY).as(UTF8).named("string").named("AppendTest");
 
         final ParquetMetadata footer = ParquetFileReader.readFooter(CONF, file1, NO_FILTER);
-        final FSDataInputStream incoming = file1.getFileSystem(CONF).open(file1);
-
-        Path droppedColumnFile = newTemp();
-        final ParquetFileWriter writer = new ParquetFileWriter(CONF, droppedColumnSchema, droppedColumnFile);
-        writer.start();
-
-        TestUtils.assertThrows(
-                "Should complain that id column is dropped", IllegalArgumentException.class, (Callable<Void>) () -> {
-                    writer.appendRowGroups(incoming, footer.getBlocks(), false);
-                    return null;
-                });
+        try (final FSDataInputStream incoming = file1.getFileSystem(CONF).open(file1)) {
+            Path droppedColumnFile = newTemp();
+            final ParquetFileWriter writer = new ParquetFileWriter(CONF, droppedColumnSchema, droppedColumnFile);
+            writer.start();
+
+            TestUtils.assertThrows(
+                    "Should complain that id column is dropped", IllegalArgumentException.class, (Callable<Void>)
+                            () -> {
+                                writer.appendRowGroups(incoming, footer.getBlocks(), false);
+                                return null;
+                            });
+        }
     }
 
     @Test
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterNewPage.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterNewPage.java
index f05d59f272..f8e88fe70f 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterNewPage.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterNewPage.java
@@ -97,24 +97,24 @@ public void test() throws Exception {
         }
         writer.close();
 
-        ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file)
+        try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file)
                 .withConf(conf)
-                .build();
-        for (int i = 0; i < 1000; i++) {
-            Group group = reader.read();
-            assertEquals(
-                    "test" + (i % modulo),
-                    group.getBinary("binary_field", 0).toStringUsingUTF8());
-            assertEquals(32, group.getInteger("int32_field", 0));
-            assertEquals(64l, group.getLong("int64_field", 0));
-            assertEquals(true, group.getBoolean("boolean_field", 0));
-            assertEquals(1.0f, group.getFloat("float_field", 0), 0.001);
-            assertEquals(2.0d, group.getDouble("double_field", 0), 0.001);
-            assertEquals("foo", group.getBinary("flba_field", 0).toStringUsingUTF8());
-            assertEquals(Binary.fromConstantByteArray(new byte[12]), group.getInt96("int96_field", 0));
-            assertEquals(0, group.getFieldRepetitionCount("null_field"));
+                .build()) {
+            for (int i = 0; i < 1000; i++) {
+                Group group = reader.read();
+                assertEquals(
+                        "test" + (i % modulo),
+                        group.getBinary("binary_field", 0).toStringUsingUTF8());
+                assertEquals(32, group.getInteger("int32_field", 0));
+                assertEquals(64l, group.getLong("int64_field", 0));
+                assertEquals(true, group.getBoolean("boolean_field", 0));
+                assertEquals(1.0f, group.getFloat("float_field", 0), 0.001);
+                assertEquals(2.0d, group.getDouble("double_field", 0), 0.001);
+                assertEquals("foo", group.getBinary("flba_field", 0).toStringUsingUTF8());
+                assertEquals(Binary.fromConstantByteArray(new byte[12]), group.getInt96("int96_field", 0));
+                assertEquals(0, group.getFieldRepetitionCount("null_field"));
+            }
         }
-        reader.close();
         ParquetMetadata footer = readFooter(conf, file, NO_FILTER);
         for (BlockMetaData blockMetaData : footer.getBlocks()) {
             for (ColumnChunkMetaData column : blockMetaData.getColumns()) {
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/ColumnEncryptorTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/ColumnEncryptorTest.java
index 0f43ff5991..cf86e8fe0b 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/ColumnEncryptorTest.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/ColumnEncryptorTest.java
@@ -205,26 +205,26 @@ public void testDifferentCompression() throws IOException {
     }
 
     private void verifyResultDecryptionWithValidKey() throws IOException {
-        ParquetReader<Group> reader = createReader(outputFile);
-        for (int i = 0; i < numRecord; i++) {
-            Group group = reader.read();
-            assertTrue(group.getLong("DocId", 0) == inputFile.getFileContent()[i].getLong("DocId", 0));
-            assertArrayEquals(
-                    group.getBinary("Name", 0).getBytes(),
-                    inputFile.getFileContent()[i].getString("Name", 0).getBytes(StandardCharsets.UTF_8));
-            assertArrayEquals(
-                    group.getBinary("Gender", 0).getBytes(),
-                    inputFile.getFileContent()[i].getString("Gender", 0).getBytes(StandardCharsets.UTF_8));
-            Group subGroupInRead = group.getGroup("Links", 0);
-            Group expectedSubGroup = inputFile.getFileContent()[i].getGroup("Links", 0);
-            assertArrayEquals(
-                    subGroupInRead.getBinary("Forward", 0).getBytes(),
-                    expectedSubGroup.getBinary("Forward", 0).getBytes());
-            assertArrayEquals(
-                    subGroupInRead.getBinary("Backward", 0).getBytes(),
-                    expectedSubGroup.getBinary("Backward", 0).getBytes());
+        try (ParquetReader<Group> reader = createReader(outputFile)) {
+            for (int i = 0; i < numRecord; i++) {
+                Group group = reader.read();
+                assertTrue(group.getLong("DocId", 0) == inputFile.getFileContent()[i].getLong("DocId", 0));
+                assertArrayEquals(
+                        group.getBinary("Name", 0).getBytes(),
+                        inputFile.getFileContent()[i].getString("Name", 0).getBytes(StandardCharsets.UTF_8));
+                assertArrayEquals(
+                        group.getBinary("Gender", 0).getBytes(),
+                        inputFile.getFileContent()[i].getString("Gender", 0).getBytes(StandardCharsets.UTF_8));
+                Group subGroupInRead = group.getGroup("Links", 0);
+                Group expectedSubGroup = inputFile.getFileContent()[i].getGroup("Links", 0);
+                assertArrayEquals(
+                        subGroupInRead.getBinary("Forward", 0).getBytes(),
+                        expectedSubGroup.getBinary("Forward", 0).getBytes());
+                assertArrayEquals(
+                        subGroupInRead.getBinary("Backward", 0).getBytes(),
+                        expectedSubGroup.getBinary("Backward", 0).getBytes());
+            }
         }
-        reader.close();
     }
 
     private void verifyOffsetIndexes() throws IOException {
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/ColumnMaskerTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/ColumnMaskerTest.java
index c5772fb306..ccbe87d45c 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/ColumnMaskerTest.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/ColumnMaskerTest.java
@@ -85,37 +85,37 @@ public void testSetup() throws Exception {
 
     @Test(expected = RuntimeException.class)
     public void testNullColumns() throws IOException {
-        ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), new Path(outputFile))
+        try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), new Path(outputFile))
                 .withConf(conf)
-                .build();
-        Group group = reader.read();
-        group.getLong("DocId", 0);
-        reader.close();
+                .build()) {
+            Group group = reader.read();
+            group.getLong("DocId", 0);
+        }
     }
 
     @Test(expected = RuntimeException.class)
     public void testNullNestedColumns() throws IOException {
-        ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), new Path(outputFile))
+        try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), new Path(outputFile))
                 .withConf(conf)
-                .build();
-        Group group = reader.read();
-        Group subGroup = group.getGroup("Links", 0);
-        subGroup.getBinary("Backward", 0).getBytes();
-        reader.close();
+                .build()) {
+            Group group = reader.read();
+            Group subGroup = group.getGroup("Links", 0);
+            subGroup.getBinary("Backward", 0).getBytes();
+        }
     }
 
     @Test
     public void validateNonNuLLColumns() throws IOException {
-        ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), new Path(outputFile))
+        try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), new Path(outputFile))
                 .withConf(conf)
-                .build();
-        for (int i = 0; i < numRecord; i++) {
-            Group group = reader.read();
-            assertArrayEquals(group.getBinary("Name", 0).getBytes(), testDocs.name[i].getBytes());
-            Group subGroup = group.getGroup("Links", 0);
-            assertArrayEquals(subGroup.getBinary("Forward", 0).getBytes(), testDocs.linkForward[i].getBytes());
+                .build()) {
+            for (int i = 0; i < numRecord; i++) {
+                Group group = reader.read();
+                assertArrayEquals(group.getBinary("Name", 0).getBytes(), testDocs.name[i].getBytes());
+                Group subGroup = group.getGroup("Links", 0);
+                assertArrayEquals(subGroup.getBinary("Forward", 0).getBytes(), testDocs.linkForward[i].getBytes());
+            }
         }
-        reader.close();
     }
 
     private void nullifyColumns(Configuration conf, String inputFile, String outputFile) throws IOException {
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/ColumnPrunerTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/ColumnPrunerTest.java
index 0582cabbb3..f5c95bd0be 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/ColumnPrunerTest.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/ColumnPrunerTest.java
@@ -184,31 +184,31 @@ public void testNotExistsNestedColumn() throws Exception {
     }
 
     private void validateColumns(String inputFile, List<String> prunePaths) throws IOException {
-        ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), new Path(inputFile))
+        try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), new Path(inputFile))
                 .withConf(conf)
-                .build();
-        for (int i = 0; i < numRecord; i++) {
-            Group group = reader.read();
-            if (!prunePaths.contains("DocId")) {
-                assertEquals(1l, group.getLong("DocId", 0));
-            }
-            if (!prunePaths.contains("Name")) {
assertEquals("foo", group.getBinary("Name", 0).toStringUsingUTF8()); - } - if (!prunePaths.contains("Gender")) { - assertEquals("male", group.getBinary("Gender", 0).toStringUsingUTF8()); - } - if (!prunePaths.contains("Links")) { - Group subGroup = group.getGroup("Links", 0); - if (!prunePaths.contains("Links.Backward")) { - assertEquals(2l, subGroup.getLong("Backward", 0)); + .build()) { + for (int i = 0; i < numRecord; i++) { + Group group = reader.read(); + if (!prunePaths.contains("DocId")) { + assertEquals(1l, group.getLong("DocId", 0)); + } + if (!prunePaths.contains("Name")) { + assertEquals("foo", group.getBinary("Name", 0).toStringUsingUTF8()); + } + if (!prunePaths.contains("Gender")) { + assertEquals("male", group.getBinary("Gender", 0).toStringUsingUTF8()); } - if (!prunePaths.contains("Links.Forward")) { - assertEquals(3l, subGroup.getLong("Forward", 0)); + if (!prunePaths.contains("Links")) { + Group subGroup = group.getGroup("Links", 0); + if (!prunePaths.contains("Links.Backward")) { + assertEquals(2l, subGroup.getLong("Backward", 0)); + } + if (!prunePaths.contains("Links.Forward")) { + assertEquals(3l, subGroup.getLong("Forward", 0)); + } } } } - reader.close(); } private String createParquetFile(String prefix) throws IOException { diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/CompressionConverterTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/CompressionConverterTest.java index a1decb239c..7fb594bda9 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/CompressionConverterTest.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/CompressionConverterTest.java @@ -135,19 +135,19 @@ private void convertCompression(Configuration conf, String inputFile, String out } private void validateColumns(String file, int numRecord, TestDocs testDocs) throws IOException { - ParquetReader reader = ParquetReader.builder(new GroupReadSupport(), new Path(file)) + try (ParquetReader reader = ParquetReader.builder(new GroupReadSupport(), new Path(file)) .withConf(conf) - .build(); - for (int i = 0; i < numRecord; i++) { - Group group = reader.read(); - assertTrue(group.getLong("DocId", 0) == testDocs.docId[i]); - assertArrayEquals(group.getBinary("Name", 0).getBytes(), testDocs.name[i].getBytes()); - assertArrayEquals(group.getBinary("Gender", 0).getBytes(), testDocs.gender[i].getBytes()); - Group subGroup = group.getGroup("Links", 0); - assertArrayEquals(subGroup.getBinary("Backward", 0).getBytes(), testDocs.linkBackward[i].getBytes()); - assertArrayEquals(subGroup.getBinary("Forward", 0).getBytes(), testDocs.linkForward[i].getBytes()); + .build()) { + for (int i = 0; i < numRecord; i++) { + Group group = reader.read(); + assertTrue(group.getLong("DocId", 0) == testDocs.docId[i]); + assertArrayEquals(group.getBinary("Name", 0).getBytes(), testDocs.name[i].getBytes()); + assertArrayEquals(group.getBinary("Gender", 0).getBytes(), testDocs.gender[i].getBytes()); + Group subGroup = group.getGroup("Links", 0); + assertArrayEquals(subGroup.getBinary("Backward", 0).getBytes(), testDocs.linkBackward[i].getBytes()); + assertArrayEquals(subGroup.getBinary("Forward", 0).getBytes(), testDocs.linkForward[i].getBytes()); + } } - reader.close(); } private void validMeta(String inputFile, String outFile) throws Exception {