diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java index e0b0d76e0e..9007529a37 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java @@ -65,6 +65,7 @@ import org.apache.parquet.HadoopReadOptions; import org.apache.parquet.ParquetReadOptions; import org.apache.parquet.Preconditions; +import org.apache.parquet.bytes.ByteBufferAllocator; import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.bytes.ByteBufferReleaser; import org.apache.parquet.bytes.BytesInput; @@ -1361,8 +1362,30 @@ private void readVectored(List allParts, ChunkListBuilder b totalSize += len; } LOG.debug("Reading {} bytes of data with vectored IO in {} ranges", totalSize, ranges.size()); - // Request a vectored read; - f.readVectored(ranges, options.getAllocator()); + // Request a vectored read; track all buffers allocated during the call so that + // internal buffers (e.g. from ChecksumFileSystem) are also released. + List allocatedBuffers = new ArrayList<>(); + ByteBufferAllocator allocator = options.getAllocator(); + ByteBufferAllocator trackingAllocator = new ByteBufferAllocator() { + @Override + public ByteBuffer allocate(int size) { + ByteBuffer buf = allocator.allocate(size); + allocatedBuffers.add(buf); + return buf; + } + + @Override + public void release(ByteBuffer b) { + allocator.release(b); + } + + @Override + public boolean isDirect() { + return allocator.isDirect(); + } + }; + f.readVectored(ranges, trackingAllocator); + builder.addBuffersToRelease(allocatedBuffers); int k = 0; for (ConsecutivePartList consecutivePart : allParts) { ParquetFileRange currRange = ranges.get(k++); diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/crypto/propertiesfactory/SchemaControlEncryptionTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/crypto/propertiesfactory/SchemaControlEncryptionTest.java index 733fb9f085..17be565168 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/crypto/propertiesfactory/SchemaControlEncryptionTest.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/crypto/propertiesfactory/SchemaControlEncryptionTest.java @@ -167,21 +167,22 @@ private String encryptParquetFile(String file, Configuration conf) throws IOExce } private void decryptParquetFileAndValid(String file, Configuration conf) throws IOException { - ParquetReader reader = ParquetReader.builder(new GroupReadSupport(), new Path(file)) + try (ParquetReader reader = ParquetReader.builder(new GroupReadSupport(), new Path(file)) .withConf(conf) - .build(); - for (int i = 0; i < numRecord; i++) { - Group group = reader.read(); - assertEquals(testData.get("Name")[i], group.getBinary("Name", 0).toStringUsingUTF8()); - assertEquals(testData.get("Age")[i], group.getLong("Age", 0)); - - Group subGroup = group.getGroup("WebLinks", 0); - assertArrayEquals( - subGroup.getBinary("LinkedIn", 0).getBytes(), ((String) testData.get("LinkedIn")[i]).getBytes()); - assertArrayEquals( - subGroup.getBinary("Twitter", 0).getBytes(), ((String) testData.get("Twitter")[i]).getBytes()); + .build()) { + for (int i = 0; i < numRecord; i++) { + Group group = reader.read(); + assertEquals(testData.get("Name")[i], group.getBinary("Name", 0).toStringUsingUTF8()); + assertEquals(testData.get("Age")[i], group.getLong("Age", 0)); + + Group subGroup = group.getGroup("WebLinks", 0); + assertArrayEquals( + subGroup.getBinary("LinkedIn", 0).getBytes(), + ((String) testData.get("LinkedIn")[i]).getBytes()); + assertArrayEquals( + subGroup.getBinary("Twitter", 0).getBytes(), ((String) testData.get("Twitter")[i]).getBytes()); + } } - reader.close(); } private static String createTempFile(String prefix) { diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/encodings/FileEncodingsIT.java b/parquet-hadoop/src/test/java/org/apache/parquet/encodings/FileEncodingsIT.java index c35b13f8fc..ef5ec3e672 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/encodings/FileEncodingsIT.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/encodings/FileEncodingsIT.java @@ -254,7 +254,7 @@ private void writeValuesToFile( SimpleGroupFactory message = new SimpleGroupFactory(schema); GroupWriteSupport.setSchema(schema, configuration); - ParquetWriter writer = ExampleParquetWriter.builder(file) + try (ParquetWriter writer = ExampleParquetWriter.builder(file) .withAllocator(allocator) .withCompressionCodec(compression) .withRowGroupSize(rowGroupSize) @@ -263,36 +263,35 @@ private void writeValuesToFile( .withDictionaryEncoding(enableDictionary) .withWriterVersion(version) .withConf(configuration) - .build(); - - for (Object o : values) { - switch (type) { - case BOOLEAN: - writer.write(message.newGroup().append("field", (Boolean) o)); - break; - case INT32: - writer.write(message.newGroup().append("field", (Integer) o)); - break; - case INT64: - writer.write(message.newGroup().append("field", (Long) o)); - break; - case FLOAT: - writer.write(message.newGroup().append("field", (Float) o)); - break; - case DOUBLE: - writer.write(message.newGroup().append("field", (Double) o)); - break; - case INT96: - case BINARY: - case FIXED_LEN_BYTE_ARRAY: - writer.write(message.newGroup().append("field", (Binary) o)); - break; - default: - throw new IllegalArgumentException("Unknown type name: " + type); + .build()) { + + for (Object o : values) { + switch (type) { + case BOOLEAN: + writer.write(message.newGroup().append("field", (Boolean) o)); + break; + case INT32: + writer.write(message.newGroup().append("field", (Integer) o)); + break; + case INT64: + writer.write(message.newGroup().append("field", (Long) o)); + break; + case FLOAT: + writer.write(message.newGroup().append("field", (Float) o)); + break; + case DOUBLE: + writer.write(message.newGroup().append("field", (Double) o)); + break; + case INT96: + case BINARY: + case FIXED_LEN_BYTE_ARRAY: + writer.write(message.newGroup().append("field", (Binary) o)); + break; + default: + throw new IllegalArgumentException("Unknown type name: " + type); + } } } - - writer.close(); } private List generateRandomValues(PrimitiveTypeName type, int count) { @@ -522,6 +521,8 @@ private static List readBlocksFromFile(Path file) throws IOExcept ParquetMetadata metadata = ParquetFileReader.readFooter(configuration, file, ParquetMetadataConverter.NO_FILTER); + // Not using try-with-resources here because closing the reader releases the codec factory, + // but the returned PageReadStore objects still hold compressed pages that need decompression later. ParquetFileReader fileReader = new ParquetFileReader( configuration, metadata.getFileMetaData(), diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/TestFiltersWithMissingColumns.java b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/TestFiltersWithMissingColumns.java index d4324a1ee5..f80b8d865e 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/TestFiltersWithMissingColumns.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/TestFiltersWithMissingColumns.java @@ -212,17 +212,13 @@ public void testOrMissingColumnFilter() throws Exception { } public static long countFilteredRecords(Path path, FilterPredicate pred) throws IOException { - ParquetReader reader = ParquetReader.builder(new GroupReadSupport(), path) - .withFilter(FilterCompat.get(pred)) - .build(); - long count = 0; - try { + try (ParquetReader reader = ParquetReader.builder(new GroupReadSupport(), path) + .withFilter(FilterCompat.get(pred)) + .build()) { while (reader.read() != null) { count += 1; } - } finally { - reader.close(); } return count; } diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java index a17cf678f5..6daca6bb32 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java @@ -215,36 +215,39 @@ public void test(Configuration config, ByteBufferAllocator allocator) throws Exc { ParquetMetadata footer = ParquetFileReader.readFooter(conf, file, NO_FILTER); - ParquetFileReader reader = new ParquetFileReader( - config, footer.getFileMetaData(), file, footer.getBlocks(), schema.getColumns()); - PageReadStore rowGroup = reader.readNextRowGroup(); - PageReader pageReader = rowGroup.getPageReader(col); - DataPageV2 page = (DataPageV2) pageReader.readPage(); - assertEquals(rowCount, page.getRowCount()); - assertEquals(nullCount, page.getNullCount()); - assertEquals(valueCount, page.getValueCount()); - assertEquals(d, intValue(page.getDefinitionLevels())); - assertEquals(r, intValue(page.getRepetitionLevels())); - assertEquals(dataEncoding, page.getDataEncoding()); - assertEquals(v, intValue(page.getData())); + try (ParquetFileReader reader = new ParquetFileReader( + config, footer.getFileMetaData(), file, footer.getBlocks(), schema.getColumns())) { + PageReadStore rowGroup = reader.readNextRowGroup(); + PageReader pageReader = rowGroup.getPageReader(col); + DataPageV2 page = (DataPageV2) pageReader.readPage(); + assertEquals(rowCount, page.getRowCount()); + assertEquals(nullCount, page.getNullCount()); + assertEquals(valueCount, page.getValueCount()); + assertEquals(d, intValue(page.getDefinitionLevels())); + assertEquals(r, intValue(page.getRepetitionLevels())); + assertEquals(dataEncoding, page.getDataEncoding()); + assertEquals(v, intValue(page.getData())); - // Checking column/offset indexes for the one page - ColumnChunkMetaData column = footer.getBlocks().get(0).getColumns().get(0); - ColumnIndex columnIndex = reader.readColumnIndex(column); - assertArrayEquals( - statistics.getMinBytes(), columnIndex.getMinValues().get(0).array()); - assertArrayEquals( - statistics.getMaxBytes(), columnIndex.getMaxValues().get(0).array()); - assertEquals( - statistics.getNumNulls(), columnIndex.getNullCounts().get(0).longValue()); - assertFalse(columnIndex.getNullPages().get(0)); - OffsetIndex offsetIndex = reader.readOffsetIndex(column); - assertEquals(1, offsetIndex.getPageCount()); - assertEquals(pageSize, offsetIndex.getCompressedPageSize(0)); - assertEquals(0, offsetIndex.getFirstRowIndex(0)); - assertEquals(pageOffset, offsetIndex.getOffset(0)); - - reader.close(); + // Checking column/offset indexes for the one page + ColumnChunkMetaData column = + footer.getBlocks().get(0).getColumns().get(0); + ColumnIndex columnIndex = reader.readColumnIndex(column); + assertArrayEquals( + statistics.getMinBytes(), + columnIndex.getMinValues().get(0).array()); + assertArrayEquals( + statistics.getMaxBytes(), + columnIndex.getMaxValues().get(0).array()); + assertEquals( + statistics.getNumNulls(), + columnIndex.getNullCounts().get(0).longValue()); + assertFalse(columnIndex.getNullPages().get(0)); + OffsetIndex offsetIndex = reader.readOffsetIndex(column); + assertEquals(1, offsetIndex.getPageCount()); + assertEquals(pageSize, offsetIndex.getCompressedPageSize(0)); + assertEquals(0, offsetIndex.getFirstRowIndex(0)); + assertEquals(pageOffset, offsetIndex.getOffset(0)); + } } } diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestIndexCache.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestIndexCache.java index 551ae30e62..5c8dcdb020 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestIndexCache.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestIndexCache.java @@ -78,20 +78,22 @@ public void testNoneCacheStrategy() throws IOException { String file = createTestFile("DocID"); ParquetReadOptions options = ParquetReadOptions.builder().build(); - ParquetFileReader fileReader = new ParquetFileReader(new LocalInputFile(Paths.get(file)), options); - IndexCache indexCache = IndexCache.create(fileReader, new HashSet<>(), IndexCache.CacheStrategy.NONE, false); - Assert.assertTrue(indexCache instanceof NoneIndexCache); - List blocks = fileReader.getFooter().getBlocks(); - for (BlockMetaData blockMetaData : blocks) { - indexCache.setBlockMetadata(blockMetaData); - for (ColumnChunkMetaData chunk : blockMetaData.getColumns()) { - validateColumnIndex(fileReader.readColumnIndex(chunk), indexCache.getColumnIndex(chunk)); - validateOffsetIndex(fileReader.readOffsetIndex(chunk), indexCache.getOffsetIndex(chunk)); - - Assert.assertEquals( - "BloomFilter should match", - fileReader.readBloomFilter(chunk), - indexCache.getBloomFilter(chunk)); + try (ParquetFileReader fileReader = new ParquetFileReader(new LocalInputFile(Paths.get(file)), options)) { + IndexCache indexCache = + IndexCache.create(fileReader, new HashSet<>(), IndexCache.CacheStrategy.NONE, false); + Assert.assertTrue(indexCache instanceof NoneIndexCache); + List blocks = fileReader.getFooter().getBlocks(); + for (BlockMetaData blockMetaData : blocks) { + indexCache.setBlockMetadata(blockMetaData); + for (ColumnChunkMetaData chunk : blockMetaData.getColumns()) { + validateColumnIndex(fileReader.readColumnIndex(chunk), indexCache.getColumnIndex(chunk)); + validateOffsetIndex(fileReader.readOffsetIndex(chunk), indexCache.getOffsetIndex(chunk)); + + Assert.assertEquals( + "BloomFilter should match", + fileReader.readBloomFilter(chunk), + indexCache.getBloomFilter(chunk)); + } } } } @@ -101,21 +103,23 @@ public void testPrefetchCacheStrategy() throws IOException { String file = createTestFile("DocID", "Name"); ParquetReadOptions options = ParquetReadOptions.builder().build(); - ParquetFileReader fileReader = new ParquetFileReader(new LocalInputFile(Paths.get(file)), options); - Set columns = new HashSet<>(); - columns.add(ColumnPath.fromDotString("DocId")); - columns.add(ColumnPath.fromDotString("Name")); - columns.add(ColumnPath.fromDotString("Gender")); - columns.add(ColumnPath.fromDotString("Links.Backward")); - columns.add(ColumnPath.fromDotString("Links.Forward")); - - IndexCache indexCache = IndexCache.create(fileReader, columns, IndexCache.CacheStrategy.PREFETCH_BLOCK, false); - Assert.assertTrue(indexCache instanceof PrefetchIndexCache); - validPrecacheIndexCache(fileReader, indexCache, columns, false); - - indexCache = IndexCache.create(fileReader, columns, IndexCache.CacheStrategy.PREFETCH_BLOCK, true); - Assert.assertTrue(indexCache instanceof PrefetchIndexCache); - validPrecacheIndexCache(fileReader, indexCache, columns, true); + try (ParquetFileReader fileReader = new ParquetFileReader(new LocalInputFile(Paths.get(file)), options)) { + Set columns = new HashSet<>(); + columns.add(ColumnPath.fromDotString("DocId")); + columns.add(ColumnPath.fromDotString("Name")); + columns.add(ColumnPath.fromDotString("Gender")); + columns.add(ColumnPath.fromDotString("Links.Backward")); + columns.add(ColumnPath.fromDotString("Links.Forward")); + + IndexCache indexCache = + IndexCache.create(fileReader, columns, IndexCache.CacheStrategy.PREFETCH_BLOCK, false); + Assert.assertTrue(indexCache instanceof PrefetchIndexCache); + validPrecacheIndexCache(fileReader, indexCache, columns, false); + + indexCache = IndexCache.create(fileReader, columns, IndexCache.CacheStrategy.PREFETCH_BLOCK, true); + Assert.assertTrue(indexCache instanceof PrefetchIndexCache); + validPrecacheIndexCache(fileReader, indexCache, columns, true); + } } private String createTestFile(String... bloomFilterEnabledColumns) throws IOException { diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInteropBloomFilter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInteropBloomFilter.java index 66e86ce2b7..8ee060d22a 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInteropBloomFilter.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInteropBloomFilter.java @@ -97,34 +97,35 @@ public void testReadDataIndexBloomParquetFiles() throws IOException { } } - ParquetFileReader reader = new ParquetFileReader( + try (ParquetFileReader reader = new ParquetFileReader( HadoopInputFile.fromPath(filePath, new Configuration()), - ParquetReadOptions.builder().build()); - List blocks = reader.getRowGroups(); - blocks.forEach(block -> { - try { - assertEquals(14, block.getRowCount()); - ColumnChunkMetaData idMeta = block.getColumns().get(0); - BloomFilter bloomFilter = reader.readBloomFilter(idMeta); - Assert.assertNotNull(bloomFilter); - assertEquals(192, idMeta.getBloomFilterOffset()); - assertEquals(-1, idMeta.getBloomFilterLength()); - for (int i = 0; i < expectedRowCount; ++i) { - assertTrue(bloomFilter.findHash(bloomFilter.hash(Binary.fromString(expectedValues[i])))); + ParquetReadOptions.builder().build())) { + List blocks = reader.getRowGroups(); + blocks.forEach(block -> { + try { + assertEquals(14, block.getRowCount()); + ColumnChunkMetaData idMeta = block.getColumns().get(0); + BloomFilter bloomFilter = reader.readBloomFilter(idMeta); + Assert.assertNotNull(bloomFilter); + assertEquals(192, idMeta.getBloomFilterOffset()); + assertEquals(-1, idMeta.getBloomFilterLength()); + for (int i = 0; i < expectedRowCount; ++i) { + assertTrue(bloomFilter.findHash(bloomFilter.hash(Binary.fromString(expectedValues[i])))); + } + for (int i = 0; i < unexpectedValues.length; ++i) { + assertFalse(bloomFilter.findHash(bloomFilter.hash(Binary.fromString(unexpectedValues[i])))); + } + assertEquals(152, idMeta.getTotalSize()); + assertEquals(163, idMeta.getTotalUncompressedSize()); + assertEquals(181, idMeta.getOffsetIndexReference().getOffset()); + assertEquals(11, idMeta.getOffsetIndexReference().getLength()); + assertEquals(156, idMeta.getColumnIndexReference().getOffset()); + assertEquals(25, idMeta.getColumnIndexReference().getLength()); + } catch (IOException e) { + fail("Should not throw exception: " + e.getMessage()); } - for (int i = 0; i < unexpectedValues.length; ++i) { - assertFalse(bloomFilter.findHash(bloomFilter.hash(Binary.fromString(unexpectedValues[i])))); - } - assertEquals(152, idMeta.getTotalSize()); - assertEquals(163, idMeta.getTotalUncompressedSize()); - assertEquals(181, idMeta.getOffsetIndexReference().getOffset()); - assertEquals(11, idMeta.getOffsetIndexReference().getLength()); - assertEquals(156, idMeta.getColumnIndexReference().getOffset()); - assertEquals(25, idMeta.getColumnIndexReference().getLength()); - } catch (IOException e) { - fail("Should not throw exception: " + e.getMessage()); - } - }); + }); + } } @Test @@ -165,34 +166,35 @@ public void testReadDataIndexBloomWithLengthParquetFiles() throws IOException { } } - ParquetFileReader reader = new ParquetFileReader( + try (ParquetFileReader reader = new ParquetFileReader( HadoopInputFile.fromPath(filePath, new Configuration()), - ParquetReadOptions.builder().build()); - List blocks = reader.getRowGroups(); - blocks.forEach(block -> { - try { - assertEquals(14, block.getRowCount()); - ColumnChunkMetaData idMeta = block.getColumns().get(0); - BloomFilter bloomFilter = reader.readBloomFilter(idMeta); - Assert.assertNotNull(bloomFilter); - assertEquals(253, idMeta.getBloomFilterOffset()); - assertEquals(2064, idMeta.getBloomFilterLength()); - for (int i = 0; i < expectedRowCount; ++i) { - assertTrue(bloomFilter.findHash(bloomFilter.hash(Binary.fromString(expectedValues[i])))); + ParquetReadOptions.builder().build())) { + List blocks = reader.getRowGroups(); + blocks.forEach(block -> { + try { + assertEquals(14, block.getRowCount()); + ColumnChunkMetaData idMeta = block.getColumns().get(0); + BloomFilter bloomFilter = reader.readBloomFilter(idMeta); + Assert.assertNotNull(bloomFilter); + assertEquals(253, idMeta.getBloomFilterOffset()); + assertEquals(2064, idMeta.getBloomFilterLength()); + for (int i = 0; i < expectedRowCount; ++i) { + assertTrue(bloomFilter.findHash(bloomFilter.hash(Binary.fromString(expectedValues[i])))); + } + for (int i = 0; i < unexpectedValues.length; ++i) { + assertFalse(bloomFilter.findHash(bloomFilter.hash(Binary.fromString(unexpectedValues[i])))); + } + assertEquals(199, idMeta.getTotalSize()); + assertEquals(199, idMeta.getTotalUncompressedSize()); + assertEquals(2342, idMeta.getOffsetIndexReference().getOffset()); + assertEquals(11, idMeta.getOffsetIndexReference().getLength()); + assertEquals(2317, idMeta.getColumnIndexReference().getOffset()); + assertEquals(25, idMeta.getColumnIndexReference().getLength()); + } catch (Exception e) { + fail("Should not throw exception: " + e.getMessage()); } - for (int i = 0; i < unexpectedValues.length; ++i) { - assertFalse(bloomFilter.findHash(bloomFilter.hash(Binary.fromString(unexpectedValues[i])))); - } - assertEquals(199, idMeta.getTotalSize()); - assertEquals(199, idMeta.getTotalUncompressedSize()); - assertEquals(2342, idMeta.getOffsetIndexReference().getOffset()); - assertEquals(11, idMeta.getOffsetIndexReference().getLength()); - assertEquals(2317, idMeta.getColumnIndexReference().getOffset()); - assertEquals(25, idMeta.getColumnIndexReference().getLength()); - } catch (Exception e) { - fail("Should not throw exception: " + e.getMessage()); - } - }); + }); + } } private Path downloadInterOpFiles(Path rootPath, String fileName, OkHttpClient httpClient) throws IOException { diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java index ca03ef4db8..d26e909152 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java @@ -1040,11 +1040,10 @@ public void testWriteReadStatisticsAllNulls() throws Exception { // close any filesystems to ensure that the FS used by the writer picks up the configuration FileSystem.closeAll(); - ParquetWriter writer = new ParquetWriter(path, configuration, new GroupWriteSupport()); - - Group r1 = new SimpleGroup(schema); - writer.write(r1); - writer.close(); + try (ParquetWriter writer = new ParquetWriter(path, configuration, new GroupWriteSupport())) { + Group r1 = new SimpleGroup(schema); + writer.write(r1); + } ParquetMetadata readFooter = ParquetFileReader.readFooter(configuration, path); diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReader.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReader.java index 807e61899a..2768f490c6 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReader.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReader.java @@ -182,24 +182,25 @@ public void closeAllocator() { @Test public void testCurrentRowIndex() throws Exception { - ParquetReader reader = PhoneBookWriter.createReader(file, FilterCompat.NOOP, allocator); - // Fetch row index without processing any row. - assertEquals(reader.getCurrentRowIndex(), -1); - reader.read(); - assertEquals(reader.getCurrentRowIndex(), 0); - // calling the same API again and again should return same result. - assertEquals(reader.getCurrentRowIndex(), 0); - - reader.read(); - assertEquals(reader.getCurrentRowIndex(), 1); - assertEquals(reader.getCurrentRowIndex(), 1); - long expectedCurrentRowIndex = 2L; - while (reader.read() != null) { - assertEquals(reader.getCurrentRowIndex(), expectedCurrentRowIndex); - expectedCurrentRowIndex++; + try (ParquetReader reader = PhoneBookWriter.createReader(file, FilterCompat.NOOP, allocator)) { + // Fetch row index without processing any row. + assertEquals(reader.getCurrentRowIndex(), -1); + reader.read(); + assertEquals(reader.getCurrentRowIndex(), 0); + // calling the same API again and again should return same result. + assertEquals(reader.getCurrentRowIndex(), 0); + + reader.read(); + assertEquals(reader.getCurrentRowIndex(), 1); + assertEquals(reader.getCurrentRowIndex(), 1); + long expectedCurrentRowIndex = 2L; + while (reader.read() != null) { + assertEquals(reader.getCurrentRowIndex(), expectedCurrentRowIndex); + expectedCurrentRowIndex++; + } + // reader.read() returned null and so reader doesn't have any more rows. + assertEquals(reader.getCurrentRowIndex(), -1); } - // reader.read() returned null and so reader doesn't have any more rows. - assertEquals(reader.getCurrentRowIndex(), -1); } @Test diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java index a7888b58d8..7d71233347 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java @@ -192,23 +192,23 @@ public void test() throws Exception { .append("int96_field", Binary.fromConstantByteArray(new byte[12]))); } writer.close(); - ParquetReader reader = ParquetReader.builder(new GroupReadSupport(), file) + try (ParquetReader reader = ParquetReader.builder(new GroupReadSupport(), file) .withConf(conf) - .build(); - for (int i = 0; i < 1000; i++) { - Group group = reader.read(); - assertEquals( - "test" + (i % modulo), - group.getBinary("binary_field", 0).toStringUsingUTF8()); - assertEquals(32, group.getInteger("int32_field", 0)); - assertEquals(64l, group.getLong("int64_field", 0)); - assertEquals(true, group.getBoolean("boolean_field", 0)); - assertEquals(1.0f, group.getFloat("float_field", 0), 0.001); - assertEquals(2.0d, group.getDouble("double_field", 0), 0.001); - assertEquals("foo", group.getBinary("flba_field", 0).toStringUsingUTF8()); - assertEquals(Binary.fromConstantByteArray(new byte[12]), group.getInt96("int96_field", 0)); + .build()) { + for (int i = 0; i < 1000; i++) { + Group group = reader.read(); + assertEquals( + "test" + (i % modulo), + group.getBinary("binary_field", 0).toStringUsingUTF8()); + assertEquals(32, group.getInteger("int32_field", 0)); + assertEquals(64l, group.getLong("int64_field", 0)); + assertEquals(true, group.getBoolean("boolean_field", 0)); + assertEquals(1.0f, group.getFloat("float_field", 0), 0.001); + assertEquals(2.0d, group.getDouble("double_field", 0), 0.001); + assertEquals("foo", group.getBinary("flba_field", 0).toStringUsingUTF8()); + assertEquals(Binary.fromConstantByteArray(new byte[12]), group.getInt96("int96_field", 0)); + } } - reader.close(); ParquetMetadata footer = readFooter(conf, file, NO_FILTER); for (BlockMetaData blockMetaData : footer.getBlocks()) { for (ColumnChunkMetaData column : blockMetaData.getColumns()) { @@ -789,13 +789,14 @@ public void testParquetWriterConfiguringOutputFile() throws IOException { writer.write(factory.newGroup().append("name", testName)); } } - ParquetReader reader = - ParquetReader.builder(new GroupReadSupport(), path).build(); - assertEquals("new", reader.read().getBinary("name", 0).toStringUsingUTF8()); - assertEquals("writer", reader.read().getBinary("name", 0).toStringUsingUTF8()); - assertEquals("builder", reader.read().getBinary("name", 0).toStringUsingUTF8()); - assertEquals("without", reader.read().getBinary("name", 0).toStringUsingUTF8()); - assertEquals("file", reader.read().getBinary("name", 0).toStringUsingUTF8()); + try (ParquetReader reader = + ParquetReader.builder(new GroupReadSupport(), path).build()) { + assertEquals("new", reader.read().getBinary("name", 0).toStringUsingUTF8()); + assertEquals("writer", reader.read().getBinary("name", 0).toStringUsingUTF8()); + assertEquals("builder", reader.read().getBinary("name", 0).toStringUsingUTF8()); + assertEquals("without", reader.read().getBinary("name", 0).toStringUsingUTF8()); + assertEquals("file", reader.read().getBinary("name", 0).toStringUsingUTF8()); + } } @Test diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterAppendBlocks.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterAppendBlocks.java index 8d456f9735..b6502cc1fb 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterAppendBlocks.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterAppendBlocks.java @@ -271,14 +271,14 @@ public void testAllowDroppingColumns() throws IOException { rowGroup.getColumns().size()); } - ParquetReader reader = - ParquetReader.builder(new GroupReadSupport(), droppedColumnFile).build(); - - Group next; - while ((next = reader.read()) != null) { - Group expectedNext = expected.removeFirst(); - Assert.assertEquals( - "Each string should match", expectedNext.getString("string", 0), next.getString("string", 0)); + try (ParquetReader reader = + ParquetReader.builder(new GroupReadSupport(), droppedColumnFile).build()) { + Group next; + while ((next = reader.read()) != null) { + Group expectedNext = expected.removeFirst(); + Assert.assertEquals( + "Each string should match", expectedNext.getString("string", 0), next.getString("string", 0)); + } } Assert.assertEquals("All records should be present", 0, expected.size()); @@ -290,17 +290,18 @@ public void testFailDroppingColumns() throws IOException { Types.buildMessage().required(BINARY).as(UTF8).named("string").named("AppendTest"); final ParquetMetadata footer = ParquetFileReader.readFooter(CONF, file1, NO_FILTER); - final FSDataInputStream incoming = file1.getFileSystem(CONF).open(file1); - - Path droppedColumnFile = newTemp(); - final ParquetFileWriter writer = new ParquetFileWriter(CONF, droppedColumnSchema, droppedColumnFile); - writer.start(); - - TestUtils.assertThrows( - "Should complain that id column is dropped", IllegalArgumentException.class, (Callable) () -> { - writer.appendRowGroups(incoming, footer.getBlocks(), false); - return null; - }); + try (final FSDataInputStream incoming = file1.getFileSystem(CONF).open(file1)) { + Path droppedColumnFile = newTemp(); + final ParquetFileWriter writer = new ParquetFileWriter(CONF, droppedColumnSchema, droppedColumnFile); + writer.start(); + + TestUtils.assertThrows( + "Should complain that id column is dropped", IllegalArgumentException.class, (Callable) + () -> { + writer.appendRowGroups(incoming, footer.getBlocks(), false); + return null; + }); + } } @Test diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterNewPage.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterNewPage.java index f05d59f272..f8e88fe70f 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterNewPage.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterNewPage.java @@ -97,24 +97,24 @@ public void test() throws Exception { } writer.close(); - ParquetReader reader = ParquetReader.builder(new GroupReadSupport(), file) + try (ParquetReader reader = ParquetReader.builder(new GroupReadSupport(), file) .withConf(conf) - .build(); - for (int i = 0; i < 1000; i++) { - Group group = reader.read(); - assertEquals( - "test" + (i % modulo), - group.getBinary("binary_field", 0).toStringUsingUTF8()); - assertEquals(32, group.getInteger("int32_field", 0)); - assertEquals(64l, group.getLong("int64_field", 0)); - assertEquals(true, group.getBoolean("boolean_field", 0)); - assertEquals(1.0f, group.getFloat("float_field", 0), 0.001); - assertEquals(2.0d, group.getDouble("double_field", 0), 0.001); - assertEquals("foo", group.getBinary("flba_field", 0).toStringUsingUTF8()); - assertEquals(Binary.fromConstantByteArray(new byte[12]), group.getInt96("int96_field", 0)); - assertEquals(0, group.getFieldRepetitionCount("null_field")); + .build()) { + for (int i = 0; i < 1000; i++) { + Group group = reader.read(); + assertEquals( + "test" + (i % modulo), + group.getBinary("binary_field", 0).toStringUsingUTF8()); + assertEquals(32, group.getInteger("int32_field", 0)); + assertEquals(64l, group.getLong("int64_field", 0)); + assertEquals(true, group.getBoolean("boolean_field", 0)); + assertEquals(1.0f, group.getFloat("float_field", 0), 0.001); + assertEquals(2.0d, group.getDouble("double_field", 0), 0.001); + assertEquals("foo", group.getBinary("flba_field", 0).toStringUsingUTF8()); + assertEquals(Binary.fromConstantByteArray(new byte[12]), group.getInt96("int96_field", 0)); + assertEquals(0, group.getFieldRepetitionCount("null_field")); + } } - reader.close(); ParquetMetadata footer = readFooter(conf, file, NO_FILTER); for (BlockMetaData blockMetaData : footer.getBlocks()) { for (ColumnChunkMetaData column : blockMetaData.getColumns()) { diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/ColumnEncryptorTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/ColumnEncryptorTest.java index 0f43ff5991..cf86e8fe0b 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/ColumnEncryptorTest.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/ColumnEncryptorTest.java @@ -205,26 +205,26 @@ public void testDifferentCompression() throws IOException { } private void verifyResultDecryptionWithValidKey() throws IOException { - ParquetReader reader = createReader(outputFile); - for (int i = 0; i < numRecord; i++) { - Group group = reader.read(); - assertTrue(group.getLong("DocId", 0) == inputFile.getFileContent()[i].getLong("DocId", 0)); - assertArrayEquals( - group.getBinary("Name", 0).getBytes(), - inputFile.getFileContent()[i].getString("Name", 0).getBytes(StandardCharsets.UTF_8)); - assertArrayEquals( - group.getBinary("Gender", 0).getBytes(), - inputFile.getFileContent()[i].getString("Gender", 0).getBytes(StandardCharsets.UTF_8)); - Group subGroupInRead = group.getGroup("Links", 0); - Group expectedSubGroup = inputFile.getFileContent()[i].getGroup("Links", 0); - assertArrayEquals( - subGroupInRead.getBinary("Forward", 0).getBytes(), - expectedSubGroup.getBinary("Forward", 0).getBytes()); - assertArrayEquals( - subGroupInRead.getBinary("Backward", 0).getBytes(), - expectedSubGroup.getBinary("Backward", 0).getBytes()); + try (ParquetReader reader = createReader(outputFile)) { + for (int i = 0; i < numRecord; i++) { + Group group = reader.read(); + assertTrue(group.getLong("DocId", 0) == inputFile.getFileContent()[i].getLong("DocId", 0)); + assertArrayEquals( + group.getBinary("Name", 0).getBytes(), + inputFile.getFileContent()[i].getString("Name", 0).getBytes(StandardCharsets.UTF_8)); + assertArrayEquals( + group.getBinary("Gender", 0).getBytes(), + inputFile.getFileContent()[i].getString("Gender", 0).getBytes(StandardCharsets.UTF_8)); + Group subGroupInRead = group.getGroup("Links", 0); + Group expectedSubGroup = inputFile.getFileContent()[i].getGroup("Links", 0); + assertArrayEquals( + subGroupInRead.getBinary("Forward", 0).getBytes(), + expectedSubGroup.getBinary("Forward", 0).getBytes()); + assertArrayEquals( + subGroupInRead.getBinary("Backward", 0).getBytes(), + expectedSubGroup.getBinary("Backward", 0).getBytes()); + } } - reader.close(); } private void verifyOffsetIndexes() throws IOException { diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/ColumnMaskerTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/ColumnMaskerTest.java index c5772fb306..ccbe87d45c 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/ColumnMaskerTest.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/ColumnMaskerTest.java @@ -85,37 +85,37 @@ public void testSetup() throws Exception { @Test(expected = RuntimeException.class) public void testNullColumns() throws IOException { - ParquetReader reader = ParquetReader.builder(new GroupReadSupport(), new Path(outputFile)) + try (ParquetReader reader = ParquetReader.builder(new GroupReadSupport(), new Path(outputFile)) .withConf(conf) - .build(); - Group group = reader.read(); - group.getLong("DocId", 0); - reader.close(); + .build()) { + Group group = reader.read(); + group.getLong("DocId", 0); + } } @Test(expected = RuntimeException.class) public void testNullNestedColumns() throws IOException { - ParquetReader reader = ParquetReader.builder(new GroupReadSupport(), new Path(outputFile)) + try (ParquetReader reader = ParquetReader.builder(new GroupReadSupport(), new Path(outputFile)) .withConf(conf) - .build(); - Group group = reader.read(); - Group subGroup = group.getGroup("Links", 0); - subGroup.getBinary("Backward", 0).getBytes(); - reader.close(); + .build()) { + Group group = reader.read(); + Group subGroup = group.getGroup("Links", 0); + subGroup.getBinary("Backward", 0).getBytes(); + } } @Test public void validateNonNuLLColumns() throws IOException { - ParquetReader reader = ParquetReader.builder(new GroupReadSupport(), new Path(outputFile)) + try (ParquetReader reader = ParquetReader.builder(new GroupReadSupport(), new Path(outputFile)) .withConf(conf) - .build(); - for (int i = 0; i < numRecord; i++) { - Group group = reader.read(); - assertArrayEquals(group.getBinary("Name", 0).getBytes(), testDocs.name[i].getBytes()); - Group subGroup = group.getGroup("Links", 0); - assertArrayEquals(subGroup.getBinary("Forward", 0).getBytes(), testDocs.linkForward[i].getBytes()); + .build()) { + for (int i = 0; i < numRecord; i++) { + Group group = reader.read(); + assertArrayEquals(group.getBinary("Name", 0).getBytes(), testDocs.name[i].getBytes()); + Group subGroup = group.getGroup("Links", 0); + assertArrayEquals(subGroup.getBinary("Forward", 0).getBytes(), testDocs.linkForward[i].getBytes()); + } } - reader.close(); } private void nullifyColumns(Configuration conf, String inputFile, String outputFile) throws IOException { diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/ColumnPrunerTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/ColumnPrunerTest.java index 0582cabbb3..f5c95bd0be 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/ColumnPrunerTest.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/ColumnPrunerTest.java @@ -184,31 +184,31 @@ public void testNotExistsNestedColumn() throws Exception { } private void validateColumns(String inputFile, List prunePaths) throws IOException { - ParquetReader reader = ParquetReader.builder(new GroupReadSupport(), new Path(inputFile)) + try (ParquetReader reader = ParquetReader.builder(new GroupReadSupport(), new Path(inputFile)) .withConf(conf) - .build(); - for (int i = 0; i < numRecord; i++) { - Group group = reader.read(); - if (!prunePaths.contains("DocId")) { - assertEquals(1l, group.getLong("DocId", 0)); - } - if (!prunePaths.contains("Name")) { - assertEquals("foo", group.getBinary("Name", 0).toStringUsingUTF8()); - } - if (!prunePaths.contains("Gender")) { - assertEquals("male", group.getBinary("Gender", 0).toStringUsingUTF8()); - } - if (!prunePaths.contains("Links")) { - Group subGroup = group.getGroup("Links", 0); - if (!prunePaths.contains("Links.Backward")) { - assertEquals(2l, subGroup.getLong("Backward", 0)); + .build()) { + for (int i = 0; i < numRecord; i++) { + Group group = reader.read(); + if (!prunePaths.contains("DocId")) { + assertEquals(1l, group.getLong("DocId", 0)); + } + if (!prunePaths.contains("Name")) { + assertEquals("foo", group.getBinary("Name", 0).toStringUsingUTF8()); + } + if (!prunePaths.contains("Gender")) { + assertEquals("male", group.getBinary("Gender", 0).toStringUsingUTF8()); } - if (!prunePaths.contains("Links.Forward")) { - assertEquals(3l, subGroup.getLong("Forward", 0)); + if (!prunePaths.contains("Links")) { + Group subGroup = group.getGroup("Links", 0); + if (!prunePaths.contains("Links.Backward")) { + assertEquals(2l, subGroup.getLong("Backward", 0)); + } + if (!prunePaths.contains("Links.Forward")) { + assertEquals(3l, subGroup.getLong("Forward", 0)); + } } } } - reader.close(); } private String createParquetFile(String prefix) throws IOException { diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/CompressionConverterTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/CompressionConverterTest.java index a1decb239c..7fb594bda9 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/CompressionConverterTest.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/CompressionConverterTest.java @@ -135,19 +135,19 @@ private void convertCompression(Configuration conf, String inputFile, String out } private void validateColumns(String file, int numRecord, TestDocs testDocs) throws IOException { - ParquetReader reader = ParquetReader.builder(new GroupReadSupport(), new Path(file)) + try (ParquetReader reader = ParquetReader.builder(new GroupReadSupport(), new Path(file)) .withConf(conf) - .build(); - for (int i = 0; i < numRecord; i++) { - Group group = reader.read(); - assertTrue(group.getLong("DocId", 0) == testDocs.docId[i]); - assertArrayEquals(group.getBinary("Name", 0).getBytes(), testDocs.name[i].getBytes()); - assertArrayEquals(group.getBinary("Gender", 0).getBytes(), testDocs.gender[i].getBytes()); - Group subGroup = group.getGroup("Links", 0); - assertArrayEquals(subGroup.getBinary("Backward", 0).getBytes(), testDocs.linkBackward[i].getBytes()); - assertArrayEquals(subGroup.getBinary("Forward", 0).getBytes(), testDocs.linkForward[i].getBytes()); + .build()) { + for (int i = 0; i < numRecord; i++) { + Group group = reader.read(); + assertTrue(group.getLong("DocId", 0) == testDocs.docId[i]); + assertArrayEquals(group.getBinary("Name", 0).getBytes(), testDocs.name[i].getBytes()); + assertArrayEquals(group.getBinary("Gender", 0).getBytes(), testDocs.gender[i].getBytes()); + Group subGroup = group.getGroup("Links", 0); + assertArrayEquals(subGroup.getBinary("Backward", 0).getBytes(), testDocs.linkBackward[i].getBytes()); + assertArrayEquals(subGroup.getBinary("Forward", 0).getBytes(), testDocs.linkForward[i].getBytes()); + } } - reader.close(); } private void validMeta(String inputFile, String outFile) throws Exception { diff --git a/pom.xml b/pom.xml index 1bd9893d87..cdfb56d7d3 100644 --- a/pom.xml +++ b/pom.xml @@ -83,7 +83,7 @@ 2.46.1 shaded.parquet - 3.3.0 + 3.4.0 2.12.0 1.17.0 thrift