diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java index 2d43c40314f..4e625267ed2 100644 --- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java +++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java @@ -65,9 +65,18 @@ public class BloomCoarseGrainDataMapFactory extends DataMapFactory indexedColumns = carbonTable.getIndexedColumns(dataMapSchema); this.bloomFilterSize = validateAndGetBloomFilterSize(dataMapSchema); + this.bloomFilterFpp = validateAndGetBloomFilterFpp(dataMapSchema); List optimizedOperations = new ArrayList(); // todo: support more optimize operations optimizedOperations.add(ExpressionType.EQUALS); @@ -118,19 +128,51 @@ private int validateAndGetBloomFilterSize(DataMapSchema dmSchema) return bloomFilterSize; } + /** + * validate bloom DataMap BLOOM_FPP + * 1. BLOOM_FPP property is optional, 0.00001 will be the default value. + * 2. BLOOM_FPP should be (0, 1) + */ + private double validateAndGetBloomFilterFpp(DataMapSchema dmSchema) + throws MalformedDataMapCommandException { + String bloomFilterFppStr = dmSchema.getProperties().get(BLOOM_FPP); + if (StringUtils.isBlank(bloomFilterFppStr)) { + LOGGER.warn( + String.format("Bloom filter FPP is not configured for datamap %s, use default value %f", + dataMapName, DEFAULT_BLOOM_FILTER_FPP)); + return DEFAULT_BLOOM_FILTER_FPP; + } + double bloomFilterFpp; + try { + bloomFilterFpp = Double.parseDouble(bloomFilterFppStr); + } catch (NumberFormatException e) { + throw new MalformedDataMapCommandException( + String.format("Invalid value of bloom filter fpp '%s', it should be an numeric", + bloomFilterFppStr)); + } + if (bloomFilterFpp < 0 || bloomFilterFpp - 1 >= 0) { + throw new MalformedDataMapCommandException( + String.format("Invalid value of bloom filter fpp '%s', it should be in range 0~1", + bloomFilterFppStr)); + } + return bloomFilterFpp; + } + @Override public DataMapWriter createWriter(Segment segment, String shardName) throws IOException { LOGGER.info( String.format("Data of BloomCoarseGranDataMap %s for table %s will be written to %s", this.dataMapName, getCarbonTable().getTableName() , shardName)); return new BloomDataMapWriter(getCarbonTable().getTablePath(), this.dataMapName, - this.dataMapMeta.getIndexedColumns(), segment, shardName, this.bloomFilterSize); + this.dataMapMeta.getIndexedColumns(), segment, shardName, + this.bloomFilterSize, this.bloomFilterFpp); } @Override public DataMapRefresher createRefresher(Segment segment, String shardName) throws IOException { return new BloomDataMapRefresher(getCarbonTable().getTablePath(), this.dataMapName, - this.dataMapMeta.getIndexedColumns(), segment, shardName, this.bloomFilterSize); + this.dataMapMeta.getIndexedColumns(), segment, shardName, + this.bloomFilterSize, this.bloomFilterFpp); } @Override diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapRefresher.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapRefresher.java index cb86c39df43..8e05133df8b 100644 --- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapRefresher.java +++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapRefresher.java @@ -36,8 +36,10 @@ public class BloomDataMapRefresher extends BloomDataMapWriter implements DataMapRefresher { BloomDataMapRefresher(String tablePath, String dataMapName, List indexColumns, - Segment segment, String shardName, int bloomFilterSize) throws IOException { - super(tablePath, dataMapName, indexColumns, segment, shardName, bloomFilterSize); + Segment segment, String shardName, int bloomFilterSize, double bloomFilterFpp) + throws IOException { + super(tablePath, dataMapName, indexColumns, segment, shardName, + bloomFilterSize, bloomFilterFpp); } @Override diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java index 4e07182aeba..a55de1178be 100644 --- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java +++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java @@ -50,6 +50,7 @@ public class BloomDataMapWriter extends DataMapWriter { private static final LogService LOG = LogServiceFactory.getLogService( BloomDataMapWriter.class.getCanonicalName()); private int bloomFilterSize; + private double bloomFilterFpp; protected int currentBlockletId; private List currentDMFiles; private List currentDataOutStreams; @@ -57,9 +58,11 @@ public class BloomDataMapWriter extends DataMapWriter { protected List> indexBloomFilters; BloomDataMapWriter(String tablePath, String dataMapName, List indexColumns, - Segment segment, String shardName, int bloomFilterSize) throws IOException { + Segment segment, String shardName, int bloomFilterSize, double bloomFilterFpp) + throws IOException { super(tablePath, dataMapName, indexColumns, segment, shardName); this.bloomFilterSize = bloomFilterSize; + this.bloomFilterFpp = bloomFilterFpp; currentDMFiles = new ArrayList(indexColumns.size()); currentDataOutStreams = new ArrayList(indexColumns.size()); @@ -86,7 +89,7 @@ protected void resetBloomFilters() { List indexColumns = getIndexColumns(); for (int i = 0; i < indexColumns.size(); i++) { indexBloomFilters.add(BloomFilter.create(Funnels.byteArrayFunnel(), - bloomFilterSize, 0.00001d)); + bloomFilterSize, bloomFilterFpp)); } }