Skip to content

Commit

Permalink
PARQUET-41: Add bloom filters to parquet statistics
Browse files Browse the repository at this point in the history
PARQUET-41: Update patch addressing comments

PARQUET-41: Add support for other data types and enable unit tests

Change the bit set from an ArrayList to an array

Add statistics option and enable tests for bloom filter

Fix failed unit tests

Remove the page level bloom filter bit set

Rebase code
  • Loading branch information
Ferdinand Xu committed Mar 28, 2016
1 parent c26fa78 commit 284b1b7
Show file tree
Hide file tree
Showing 47 changed files with 1,982 additions and 286 deletions.
5 changes: 5 additions & 0 deletions parquet-column/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@
<artifactId>fastutil</artifactId>
<version>${fastutil.version}</version>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>2.6</version>
</dependency>

<dependency>
<groupId>com.carrotsearch</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
/*
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
Expand All @@ -30,6 +30,7 @@
import org.apache.parquet.column.impl.ColumnWriteStoreV1;
import org.apache.parquet.column.impl.ColumnWriteStoreV2;
import org.apache.parquet.column.page.PageWriteStore;
import org.apache.parquet.column.statistics.StatisticsOpts;
import org.apache.parquet.column.values.ValuesWriter;
import org.apache.parquet.column.values.boundedint.DevNullValuesWriter;
import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriter;
Expand Down Expand Up @@ -96,11 +97,12 @@ public static WriterVersion fromString(String name) {
private final int maxRowCountForPageSizeCheck;
private final boolean estimateNextSizeCheck;
private final ByteBufferAllocator allocator;
private final StatisticsOpts statisticsOpts;

private final int initialSlabSize;

private ParquetProperties(WriterVersion writerVersion, int pageSize, int dictPageSize, boolean enableDict, int minRowCountForPageSizeCheck,
int maxRowCountForPageSizeCheck, boolean estimateNextSizeCheck, ByteBufferAllocator allocator) {
int maxRowCountForPageSizeCheck, boolean estimateNextSizeCheck, ByteBufferAllocator allocator, StatisticsOpts statisticsOpts) {
this.pageSizeThreshold = pageSize;
this.initialSlabSize = CapacityByteArrayOutputStream
.initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSizeThreshold, 10);
Expand All @@ -110,6 +112,7 @@ private ParquetProperties(WriterVersion writerVersion, int pageSize, int dictPag
this.minRowCountForPageSizeCheck = minRowCountForPageSizeCheck;
this.maxRowCountForPageSizeCheck = maxRowCountForPageSizeCheck;
this.estimateNextSizeCheck = estimateNextSizeCheck;
this.statisticsOpts = statisticsOpts;
this.allocator = allocator;
}

Expand Down Expand Up @@ -280,6 +283,10 @@ public ByteBufferAllocator getAllocator() {
return allocator;
}

/**
 * Returns the statistics options held by these properties.
 *
 * @return the {@link StatisticsOpts} supplied when this instance was constructed
 */
public StatisticsOpts getStatisticsOpts() {
  return this.statisticsOpts;
}

public ColumnWriteStore newColumnWriteStore(MessageType schema,
PageWriteStore pageStore) {
switch (writerVersion) {
Expand Down Expand Up @@ -321,6 +328,7 @@ public static class Builder {
private int maxRowCountForPageSizeCheck = DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK;
private boolean estimateNextSizeCheck = DEFAULT_ESTIMATE_ROW_COUNT_FOR_PAGE_SIZE_CHECK;
private ByteBufferAllocator allocator = new HeapByteBufferAllocator();
private StatisticsOpts statisticsOpts = new StatisticsOpts(null);

private Builder() {
}
Expand All @@ -333,6 +341,7 @@ private Builder(ParquetProperties toCopy) {
this.maxRowCountForPageSizeCheck = toCopy.maxRowCountForPageSizeCheck;
this.estimateNextSizeCheck = toCopy.estimateNextSizeCheck;
this.allocator = toCopy.allocator;
this.statisticsOpts = toCopy.statisticsOpts;
}

/**
Expand Down Expand Up @@ -409,10 +418,15 @@ public Builder withAllocator(ByteBufferAllocator allocator) {
return this;
}

/**
 * Stores the statistics options on this builder.
 *
 * @param statisticsOpts the statistics options to remember
 * @return this builder, to allow call chaining
 */
public Builder withStatisticsOpts(StatisticsOpts statisticsOpts) {
  final Builder self = this;
  self.statisticsOpts = statisticsOpts;
  return self;
}

public ParquetProperties build() {
return new ParquetProperties(writerVersion, pageSize, dictPageSize,
enableDict, minRowCountForPageSizeCheck, maxRowCountForPageSizeCheck,
estimateNextSizeCheck, allocator);
estimateNextSizeCheck, allocator, statisticsOpts);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
/*
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
Expand All @@ -33,6 +33,7 @@
import org.apache.parquet.column.ParquetProperties.WriterVersion;
import org.apache.parquet.column.page.PageWriteStore;
import org.apache.parquet.column.page.PageWriter;
import org.apache.parquet.column.statistics.StatisticsOpts;

public class ColumnWriteStoreV1 implements ColumnWriteStore {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
/*
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
Expand All @@ -28,7 +28,9 @@
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.page.DictionaryPage;
import org.apache.parquet.column.page.PageWriter;
import org.apache.parquet.column.statistics.ColumnStatisticsOpts;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.column.statistics.StatisticsOpts;
import org.apache.parquet.column.values.ValuesWriter;
import org.apache.parquet.io.ParquetEncodingException;
import org.apache.parquet.io.api.Binary;
Expand All @@ -54,6 +56,7 @@ final class ColumnWriterV1 implements ColumnWriter {
private ValuesWriter dataColumn;
private int valueCount;
private int valueCountForNextSizeCheck;
private StatisticsOpts statisticsOpts;

private Statistics statistics;

Expand All @@ -71,14 +74,17 @@ public ColumnWriterV1(ColumnDescriptor path, PageWriter pageWriter,
this.repetitionLevelColumn = props.newRepetitionLevelWriter(path);
this.definitionLevelColumn = props.newDefinitionLevelWriter(path);
this.dataColumn = props.newValuesWriter(path);
this.statisticsOpts = props.getStatisticsOpts();
}

/**
 * Emits a debug line describing one written value.
 *
 * @param value the value being written
 * @param r the number logged after "r:"
 * @param d the number logged after "d:"
 */
private void log(Object value, int r, int d) {
  final String message = path + " " + value + " r:" + r + " d:" + d;
  LOG.debug(message);
}

/**
 * Replaces the current statistics with a fresh instance for this column's
 * type. The per-column statistics options are looked up from
 * {@code statisticsOpts}; when no options object is set, {@code null} is
 * passed to the factory instead.
 */
private void resetStatistics() {
  // statisticsOpts may be null, in which case there are no per-column options.
  ColumnStatisticsOpts columnStatisticsOpts =
      (statisticsOpts == null) ? null : statisticsOpts.getStatistics(path);
  // Assign once: a preliminary assignment would be overwritten immediately.
  this.statistics = Statistics.getStatsBasedOnType(this.path.getType(), columnStatisticsOpts);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
/*
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
Expand All @@ -18,7 +18,6 @@
*/
package org.apache.parquet.column.impl;

import static java.lang.Math.max;
import static org.apache.parquet.bytes.BytesUtils.getWidthFromMaxInt;

import java.io.IOException;
Expand All @@ -33,7 +32,9 @@
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.page.DictionaryPage;
import org.apache.parquet.column.page.PageWriter;
import org.apache.parquet.column.statistics.ColumnStatisticsOpts;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.column.statistics.StatisticsOpts;
import org.apache.parquet.column.values.ValuesWriter;
import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder;
import org.apache.parquet.io.ParquetEncodingException;
Expand All @@ -55,6 +56,7 @@ final class ColumnWriterV2 implements ColumnWriter {
private RunLengthBitPackingHybridEncoder definitionLevelColumn;
private ValuesWriter dataColumn;
private int valueCount;
private StatisticsOpts statisticsOpts;

private Statistics<?> statistics;
private long rowsWrittenSoFar = 0;
Expand All @@ -70,14 +72,17 @@ public ColumnWriterV2(
this.repetitionLevelColumn = props.newRepetitionLevelEncoder(path);
this.definitionLevelColumn = props.newDefinitionLevelEncoder(path);
this.dataColumn = props.newValuesWriter(path);
this.statisticsOpts = props.getStatisticsOpts();
}

/**
 * Emits a debug line describing one written value.
 *
 * @param value the value being written
 * @param r the number logged after "r:"
 * @param d the number logged after "d:"
 */
private void log(Object value, int r, int d) {
  final String message = path + " " + value + " r:" + r + " d:" + d;
  LOG.debug(message);
}

/**
 * Replaces the current statistics with a fresh instance for this column's
 * type. The per-column statistics options are looked up from
 * {@code statisticsOpts}; when no options object is set, {@code null} is
 * passed to the factory instead.
 */
private void resetStatistics() {
  // statisticsOpts may be null, in which case there are no per-column options.
  ColumnStatisticsOpts columnStatisticsOpts =
      (statisticsOpts == null) ? null : statisticsOpts.getStatistics(path);
  // Assign once: a preliminary assignment would be overwritten immediately.
  this.statistics = Statistics.getStatsBasedOnType(this.path.getType(), columnStatisticsOpts);
}

private void definitionLevel(int definitionLevel) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,32 @@
*/
package org.apache.parquet.column.statistics;

import org.apache.parquet.column.statistics.bloomfilter.BloomFilter;
import org.apache.parquet.column.statistics.bloomfilter.BloomFilterOpts;
import org.apache.parquet.column.statistics.bloomfilter.BloomFilterStatistics;
import org.apache.parquet.io.api.Binary;

public class BinaryStatistics extends Statistics<Binary> {
public class BinaryStatistics extends Statistics<Binary> implements BloomFilterStatistics<Binary>{

private Binary max;
private Binary min;
private BloomFilter bloomFilter;
private boolean isBloomFilterEnabled = false;

/**
 * Creates binary statistics, optionally backed by a bloom filter.
 *
 * @param columnStatisticsOpts per-column options; when {@code null}, no
 *     bloom filter options are applied
 */
public BinaryStatistics(ColumnStatisticsOpts columnStatisticsOpts) {
  // The no-arg superclass constructor is invoked implicitly.
  if (columnStatisticsOpts == null) {
    return;
  }
  updateBloomFilterOptions(columnStatisticsOpts.getBloomFilterOpts());
}

/**
 * Creates the bloom filter from the given entry and marks it as enabled.
 * Does nothing when the entry is {@code null}.
 *
 * @param statisticsOpts bloom filter entry supplying the number of bits and
 *     the number of hash functions, or {@code null}
 */
private void updateBloomFilterOptions(BloomFilterOpts.BloomFilterEntry statisticsOpts) {
  if (statisticsOpts == null) {
    return;
  }
  this.bloomFilter = new BloomFilter(
      statisticsOpts.getNumBits(),
      statisticsOpts.getNumHashFunctions());
  this.isBloomFilterEnabled = true;
}

@Override
public void updateStats(Binary value) {
Expand All @@ -32,6 +52,17 @@ public void updateStats(Binary value) {
} else {
updateStats(value, value);
}

if (isBloomFilterEnabled) {
add(value);
}
}

/**
 * Merges the bloom filter of {@code stats} into this one. Nothing happens
 * unless this instance has its bloom filter enabled and {@code stats} is a
 * {@link BloomFilterStatistics}.
 *
 * @param stats the statistics whose bloom filter should be merged in
 */
@Override
void mergeBloomFilters(Statistics stats) {
  if (!isBloomFilterEnabled || !(stats instanceof BloomFilterStatistics)) {
    return;
  }
  BloomFilterStatistics other = (BloomFilterStatistics) stats;
  // NOTE(review): other.getBloomFilter() may be null if the other side has
  // no bloom filter configured - confirm BloomFilter.merge tolerates that.
  this.bloomFilter.merge(other.getBloomFilter());
}

@Override
Expand Down Expand Up @@ -111,4 +142,24 @@ public void setMinMax(Binary min, Binary max) {
this.min = min;
this.markAsNotEmpty();
}

/**
 * Adds a binary value to the bloom filter.
 *
 * <p>NOTE(review): the filter field is dereferenced without a check;
 * presumably callers only invoke this when the bloom filter is enabled -
 * confirm.
 *
 * @param value the value to add
 */
@Override
public void add(Binary value) {
  this.bloomFilter.addBinary(value);
}

/**
 * Returns the bloom filter held by these statistics.
 *
 * @return the bloom filter field as currently set
 */
@Override
public BloomFilter getBloomFilter() {
  return this.bloomFilter;
}

/**
 * Asks the bloom filter whether it may contain the given binary value.
 *
 * <p>NOTE(review): the filter field is dereferenced without a check;
 * presumably callers only invoke this when the bloom filter is enabled -
 * confirm.
 *
 * @param value the value to look up
 * @return the result of the bloom filter's binary test
 */
@Override
public boolean test(Binary value) {
  final boolean result = this.bloomFilter.testBinary(value);
  return result;
}

/**
 * Reports whether a bloom filter is enabled for these statistics.
 *
 * @return the current value of the enabled flag
 */
@Override
public boolean isBloomFilterEnabled() {
  return this.isBloomFilterEnabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ public class BooleanStatistics extends Statistics<Boolean> {
private boolean max;
private boolean min;

@Override
public void updateStats(boolean value) {
if (!this.hasNonNullValue()) {
initializeStats(value, value);
Expand All @@ -34,6 +33,11 @@ public void updateStats(boolean value) {
}
}

/**
 * Bloom filter merge hook; intentionally a no-op for boolean statistics.
 *
 * @param stats the statistics being merged (ignored here)
 */
@Override
void mergeBloomFilters(Statistics stats) {
// Do nothing: this class keeps no bloom filter, so there is nothing to merge.
}

@Override
public void mergeStatisticsMinMax(Statistics stats) {
BooleanStatistics boolStats = (BooleanStatistics)stats;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.column.statistics;

import org.apache.parquet.column.statistics.bloomfilter.BloomFilterOpts;

/**
 * Statistics options for a single column. Currently this only carries the
 * column's bloom filter entry.
 */
public class ColumnStatisticsOpts {
  /** Bloom filter entry for the column, exactly as passed to the constructor. */
  BloomFilterOpts.BloomFilterEntry bloomFilterOpts;

  /**
   * Creates the options holder.
   *
   * @param bloomFilterOpts the bloom filter entry to store; stored as given
   */
  public ColumnStatisticsOpts(BloomFilterOpts.BloomFilterEntry bloomFilterOpts) {
    this.bloomFilterOpts = bloomFilterOpts;
  }

  /**
   * Returns the stored bloom filter entry.
   *
   * @return the bloom filter entry held by this object
   */
  public BloomFilterOpts.BloomFilterEntry getBloomFilterOpts() {
    return this.bloomFilterOpts;
  }
}
Loading

0 comments on commit 284b1b7

Please sign in to comment.