Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ class ColumnValueCollector {
private SizeStatistics.Builder sizeStatisticsBuilder;
private GeospatialStatistics.Builder geospatialStatisticsBuilder;

// track the required `num_nulls` field in DataPageHeaderV2
// https://github.com/apache/parquet-format/blob/master/src/main/thrift/parquet.thrift
private int nullCount;

ColumnValueCollector(ColumnDescriptor path, BloomFilterWriter bloomFilterWriter, ParquetProperties props) {
this.path = path;
this.statisticsEnabled = props.getStatisticsEnabled(path);
Expand All @@ -54,6 +58,7 @@ class ColumnValueCollector {
}

void resetPageStatistics() {
this.nullCount = 0;
this.statistics = statisticsEnabled
? Statistics.createStats(path.getPrimitiveType())
: Statistics.noopStats(path.getPrimitiveType());
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to change Statistics.noopStats to make it possible to ignore min/max stats but still collects null count?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's possible, but putting it outside Statistics is a design choice. By reading code I think the Statistics object is a 1:1 mapping to parquet header's statistics object, which is optional and may be disabled by user.

Putting counter of num_nulls outside statistics object makes the code clearer, because the code maps clearly to parquet header structure.

Expand All @@ -68,6 +73,7 @@ void resetPageStatistics() {
}

void writeNull(int repetitionLevel, int definitionLevel) {
++nullCount;
statistics.incrementNumNulls();
sizeStatisticsBuilder.add(repetitionLevel, definitionLevel);
}
Expand Down Expand Up @@ -198,6 +204,14 @@ void finalizeColumnChunk() {
}
}

/**
* Returns the number of null values written in the current page.
* This counter is to supply the required field `num_nulls` in DataPageHeaderV2.
*/
int getNullCount() {
return nullCount;
}

Statistics<?> getStatistics() {
return statistics;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,7 @@ void writePage() {
writePage(
pageRowCount,
valueCount,
collector.getNullCount(),
collector.getStatistics(),
collector.getSizeStatistics(),
collector.getGeospatialStatistics(),
Expand All @@ -403,6 +404,7 @@ void writePage() {
abstract void writePage(
int rowCount,
int valueCount,
int nullCount,
Statistics<?> statistics,
SizeStatistics sizeStatistics,
GeospatialStatistics geospatialStatistics,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ ValuesWriter createDLWriter(ParquetProperties props, ColumnDescriptor path) {
void writePage(
int rowCount,
int valueCount,
int nullCount,
Statistics<?> statistics,
SizeStatistics sizeStatistics,
GeospatialStatistics geospatialStatistics,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ ValuesWriter createDLWriter(ParquetProperties props, ColumnDescriptor path) {
void writePage(
int rowCount,
int valueCount,
int nullCount,
Statistics<?> statistics,
SizeStatistics sizeStatistics,
GeospatialStatistics geospatialStatistics,
Expand All @@ -100,7 +101,7 @@ void writePage(
Encoding encoding = values.getEncoding();
pageWriter.writePageV2(
rowCount,
Math.toIntExact(statistics.getNumNulls()),
nullCount,
valueCount,
repetitionLevels.getBytes(),
definitionLevels.getBytes(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,14 @@
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.bytes.HeapByteBufferAllocator;
import org.apache.parquet.bytes.TrackingByteBufferAllocator;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.ParquetProperties.WriterVersion;
import org.apache.parquet.column.page.DataPage;
import org.apache.parquet.column.page.DataPageV2;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.column.page.PageReader;
import org.apache.parquet.column.values.bloomfilter.BloomFilter;
import org.apache.parquet.crypto.AesCipher;
import org.apache.parquet.crypto.ColumnEncryptionProperties;
Expand Down Expand Up @@ -858,4 +863,68 @@ public void testNoFlushAfterException() throws Exception {
FileSystem fs = file.getFileSystem(conf);
assertTrue(!fs.exists(file) || fs.getFileStatus(file).getLen() == 0);
}

@Test
public void testV2PageNullCountWithStatisticsDisabled() throws Exception {
// Regression test: when using PARQUET_2_0 with statistics disabled on a nullable column,
// DataPageHeaderV2.num_nulls must still contain the correct null count (not -1).
MessageType schema = Types.buildMessage()
.required(INT32)
.named("id")
.optional(BINARY)
.as(stringType())
.named("value")
.named("test_schema");

File file = temp.newFile();
temp.delete();
Path path = new Path(file.getAbsolutePath());

int totalRecords = 10;
int expectedNulls = 4; // records where i % 3 == 0: i=0,3,6,9

// Write with PARQUET_2_0 and statistics disabled on the nullable "value" column
try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(path)
.withType(schema)
.withWriterVersion(PARQUET_2_0)
.withStatisticsEnabled("value", false)
.withPageSize(1024 * 1024) // large page to keep all records in one page
.build()) {
SimpleGroupFactory factory = new SimpleGroupFactory(schema);
for (int i = 0; i < totalRecords; i++) {
Group group = factory.newGroup().append("id", i);
if (i % 3 != 0) {
group.append("value", "hello-" + i);
}
writer.write(group);
}
}

// Read back the page-level metadata and verify num_nulls
try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(path, new Configuration()))) {
MessageType fileSchema = reader.getFooter().getFileMetaData().getSchema();

// Find the "value" column descriptor
ColumnDescriptor valueColumn = fileSchema.getColumns().stream()
.filter(c -> c.getPath()[0].equals("value"))
.findFirst()
.orElseThrow(() -> new AssertionError("Column 'value' not found"));

PageReadStore rowGroup = reader.readNextRowGroup();
PageReader pageReader = rowGroup.getPageReader(valueColumn);
DataPage page = pageReader.readPage();

// Verify it's a V2 page (because we used PARQUET_2_0)
assertTrue(
"PARQUET_2_0 writer should produce DataPageV2 pages, got: "
+ page.getClass().getSimpleName(),
page instanceof DataPageV2);

DataPageV2 pageV2 = (DataPageV2) page;
assertEquals(
"DataPageV2.num_nulls should be the actual null count even when statistics are disabled",
expectedNulls,
pageV2.getNullCount());
}
}
}