[CARBONDATA-2454][DataMap] Add fpp property for bloom datamap
add fpp (false positive probability) property to configure the bloom filter
that is used by the bloom datamap.

This closes #2279
xuchuanyin authored and jackylk committed May 9, 2018
1 parent 2c0fa10 commit 6b94971
Showing 3 changed files with 53 additions and 6 deletions.
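For context: the bloom datamap builds one Guava BloomFilter per indexed column, and the value of the new property ends up as the fpp argument of BloomFilter.create (see the last hunk below). When the datamap is created, the property would presumably be supplied next to the existing size property in the DMPROPERTIES clause, for example 'BLOOM_FPP'='0.001'. The following standalone sketch (class name and sample values are illustrative, not part of the patch) shows what that argument controls in Guava:

import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;

import java.nio.charset.StandardCharsets;

public class FppSketch {
  public static void main(String[] args) {
    int expectedInsertions = 32000 * 20;  // same default size the datamap uses
    double fpp = 0.00001d;                // the value that bloom_fpp now makes configurable

    // Guava derives the bit-array size and hash count from these two numbers.
    BloomFilter<byte[]> filter =
        BloomFilter.create(Funnels.byteArrayFunnel(), expectedInsertions, fpp);

    byte[] value = "some-indexed-value".getBytes(StandardCharsets.UTF_8);
    filter.put(value);

    // No false negatives: anything that was put is always reported as present.
    System.out.println(filter.mightContain(value));
    // The observed false-positive rate approaches the configured fpp as the filter fills up.
    System.out.println(filter.expectedFpp());
  }
}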
@@ -65,9 +65,18 @@ public class BloomCoarseGrainDataMapFactory extends DataMapFactory<CoarseGrainDa
* and all the indexed value is distinct.
*/
private static final int DEFAULT_BLOOM_FILTER_SIZE = 32000 * 20;
/**
* property for fpp(false-positive-probability) of bloom filter
*/
private static final String BLOOM_FPP = "bloom_fpp";
/**
* default value for fpp of bloom filter
*/
private static final double DEFAULT_BLOOM_FILTER_FPP = 0.00001d;
private DataMapMeta dataMapMeta;
private String dataMapName;
private int bloomFilterSize;
private double bloomFilterFpp;

public BloomCoarseGrainDataMapFactory(CarbonTable carbonTable, DataMapSchema dataMapSchema)
throws MalformedDataMapCommandException {
@@ -79,6 +88,7 @@ public BloomCoarseGrainDataMapFactory(CarbonTable carbonTable, DataMapSchema dat

List<CarbonColumn> indexedColumns = carbonTable.getIndexedColumns(dataMapSchema);
this.bloomFilterSize = validateAndGetBloomFilterSize(dataMapSchema);
this.bloomFilterFpp = validateAndGetBloomFilterFpp(dataMapSchema);
List<ExpressionType> optimizedOperations = new ArrayList<ExpressionType>();
// todo: support more optimize operations
optimizedOperations.add(ExpressionType.EQUALS);
@@ -118,19 +128,51 @@ private int validateAndGetBloomFilterSize(DataMapSchema dmSchema)
return bloomFilterSize;
}

/**
* validate bloom DataMap BLOOM_FPP
* 1. BLOOM_FPP property is optional, 0.00001 will be the default value.
* 2. BLOOM_FPP should be (0, 1)
*/
private double validateAndGetBloomFilterFpp(DataMapSchema dmSchema)
throws MalformedDataMapCommandException {
String bloomFilterFppStr = dmSchema.getProperties().get(BLOOM_FPP);
if (StringUtils.isBlank(bloomFilterFppStr)) {
LOGGER.warn(
String.format("Bloom filter FPP is not configured for datamap %s, use default value %f",
dataMapName, DEFAULT_BLOOM_FILTER_FPP));
return DEFAULT_BLOOM_FILTER_FPP;
}
double bloomFilterFpp;
try {
bloomFilterFpp = Double.parseDouble(bloomFilterFppStr);
} catch (NumberFormatException e) {
throw new MalformedDataMapCommandException(
String.format("Invalid value of bloom filter fpp '%s', it should be an numeric",
bloomFilterFppStr));
}
if (bloomFilterFpp < 0 || bloomFilterFpp - 1 >= 0) {
throw new MalformedDataMapCommandException(
String.format("Invalid value of bloom filter fpp '%s', it should be in range 0~1",
bloomFilterFppStr));
}
return bloomFilterFpp;
}

@Override
public DataMapWriter createWriter(Segment segment, String shardName) throws IOException {
LOGGER.info(
String.format("Data of BloomCoarseGranDataMap %s for table %s will be written to %s",
this.dataMapName, getCarbonTable().getTableName() , shardName));
return new BloomDataMapWriter(getCarbonTable().getTablePath(), this.dataMapName,
- this.dataMapMeta.getIndexedColumns(), segment, shardName, this.bloomFilterSize);
+ this.dataMapMeta.getIndexedColumns(), segment, shardName,
+ this.bloomFilterSize, this.bloomFilterFpp);
}

@Override
public DataMapRefresher createRefresher(Segment segment, String shardName) throws IOException {
return new BloomDataMapRefresher(getCarbonTable().getTablePath(), this.dataMapName,
- this.dataMapMeta.getIndexedColumns(), segment, shardName, this.bloomFilterSize);
+ this.dataMapMeta.getIndexedColumns(), segment, shardName,
+ this.bloomFilterSize, this.bloomFilterFpp);
}

@Override
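To make the rules in validateAndGetBloomFilterFpp above concrete: a blank or missing property falls back to the 0.00001 default, a non-numeric value is rejected, and the value must stay below 1 and not be negative (the javadoc states the intended range as (0, 1)). A self-contained sketch of the same accept/reject behaviour follows; the class and method names are illustrative, not part of the patch.

import java.util.Collections;
import java.util.Map;

public class BloomFppValidationSketch {
  private static final String BLOOM_FPP = "bloom_fpp";
  private static final double DEFAULT_BLOOM_FILTER_FPP = 0.00001d;

  static double resolveFpp(Map<String, String> dmProperties) {
    String raw = dmProperties.get(BLOOM_FPP);
    if (raw == null || raw.trim().isEmpty()) {
      return DEFAULT_BLOOM_FILTER_FPP;           // the property is optional
    }
    double fpp = Double.parseDouble(raw);        // throws NumberFormatException on bad input
    if (fpp < 0 || fpp >= 1) {                   // same bound check as the patch
      throw new IllegalArgumentException("bloom_fpp should be in range 0~1, got " + raw);
    }
    return fpp;
  }

  public static void main(String[] args) {
    System.out.println(resolveFpp(Collections.emptyMap()));                        // 1.0E-5
    System.out.println(resolveFpp(Collections.singletonMap(BLOOM_FPP, "0.001")));  // 0.001
    // resolveFpp(Collections.singletonMap(BLOOM_FPP, "abc")) or "1.5" would throw.
  }
}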
@@ -36,8 +36,10 @@
public class BloomDataMapRefresher extends BloomDataMapWriter implements DataMapRefresher {

BloomDataMapRefresher(String tablePath, String dataMapName, List<CarbonColumn> indexColumns,
- Segment segment, String shardName, int bloomFilterSize) throws IOException {
- super(tablePath, dataMapName, indexColumns, segment, shardName, bloomFilterSize);
+ Segment segment, String shardName, int bloomFilterSize, double bloomFilterFpp)
+ throws IOException {
+ super(tablePath, dataMapName, indexColumns, segment, shardName,
+ bloomFilterSize, bloomFilterFpp);
}

@Override
@@ -50,16 +50,19 @@ public class BloomDataMapWriter extends DataMapWriter {
private static final LogService LOG = LogServiceFactory.getLogService(
BloomDataMapWriter.class.getCanonicalName());
private int bloomFilterSize;
private double bloomFilterFpp;
protected int currentBlockletId;
private List<String> currentDMFiles;
private List<DataOutputStream> currentDataOutStreams;
private List<ObjectOutputStream> currentObjectOutStreams;
protected List<BloomFilter<byte[]>> indexBloomFilters;

BloomDataMapWriter(String tablePath, String dataMapName, List<CarbonColumn> indexColumns,
- Segment segment, String shardName, int bloomFilterSize) throws IOException {
+ Segment segment, String shardName, int bloomFilterSize, double bloomFilterFpp)
+ throws IOException {
super(tablePath, dataMapName, indexColumns, segment, shardName);
this.bloomFilterSize = bloomFilterSize;
this.bloomFilterFpp = bloomFilterFpp;

currentDMFiles = new ArrayList<String>(indexColumns.size());
currentDataOutStreams = new ArrayList<DataOutputStream>(indexColumns.size());
@@ -86,7 +89,7 @@ protected void resetBloomFilters() {
List<CarbonColumn> indexColumns = getIndexColumns();
for (int i = 0; i < indexColumns.size(); i++) {
indexBloomFilters.add(BloomFilter.create(Funnels.byteArrayFunnel(),
- bloomFilterSize, 0.00001d));
+ bloomFilterSize, bloomFilterFpp));
}
}

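The hunk above is where the new setting actually takes effect: the hard-coded 0.00001 previously passed to BloomFilter.create is replaced by the configured bloomFilterFpp. Tuning fpp is mainly a memory/accuracy trade-off; the standard sizing formula m = -n * ln(p) / (ln 2)^2 (which Guava's sizing is also based on) gives the approximate bit budget per filter. A quick back-of-the-envelope sketch, with illustrative values only:

public class BloomSizingSketch {
  // Optimal number of bits for n expected insertions at false-positive probability p.
  static long optimalBits(long n, double p) {
    return (long) Math.ceil(-n * Math.log(p) / (Math.log(2) * Math.log(2)));
  }

  public static void main(String[] args) {
    long n = 32000L * 20;  // the datamap's default expected number of entries per filter

    // Default fpp 0.00001: roughly 24 bits per entry, about 1.9 MB for 640000 entries.
    System.out.println(optimalBits(n, 0.00001d) / 8 / 1024 + " KB");
    // A looser fpp such as 0.001: roughly 14.4 bits per entry, about 1.1 MB, but more
    // blocklets will pass the filter as false positives and be scanned unnecessarily.
    System.out.println(optimalBits(n, 0.001d) / 8 / 1024 + " KB");
  }
}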
