[CARBONDATA-2459][DataMap] Add cache for bloom filter datamap
Loading a bloom filter from its bloomindex file is slow. Caching the loaded filters avoids repeated deserialization and improves query performance.

This closes #2300
xuchuanyin authored and jackylk committed May 12, 2018
1 parent ffddba7 commit d14c403
Showing 5 changed files with 283 additions and 79 deletions.
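Most of this change replaces eager per-file loading in BloomCoarseGrainDataMap with lookups into a new BloomDataMapCache keyed by (index path, column name). The cache class itself is only partially visible in this diff; the following is a minimal sketch, assuming a Guava LoadingCache, of how such a cache could look. The CacheKey fields and the getBloomDMModelByKey name mirror identifiers used below; the stub model class, size bound, eviction policy, and loader body are assumptions:

```java
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

// Hypothetical sketch; BloomDMModelStub stands in for the real BloomDMModel.
public class BloomIndexCacheSketch {
  static class BloomDMModelStub { }

  // Cache key: one bloom index per (shard path, index column) pair.
  static class CacheKey {
    private final String shardPath;
    private final String indexColumn;

    CacheKey(String shardPath, String indexColumn) {
      this.shardPath = shardPath;
      this.indexColumn = indexColumn;
    }

    @Override public boolean equals(Object o) {
      if (!(o instanceof CacheKey)) {
        return false;
      }
      CacheKey that = (CacheKey) o;
      return Objects.equals(shardPath, that.shardPath)
          && Objects.equals(indexColumn, that.indexColumn);
    }

    @Override public int hashCode() {
      return Objects.hash(shardPath, indexColumn);
    }
  }

  private final LoadingCache<CacheKey, List<BloomDMModelStub>> cache =
      CacheBuilder.newBuilder()
          .maximumSize(512)                      // assumed bound, cf. the new size property
          .expireAfterAccess(2, TimeUnit.HOURS)  // assumed eviction policy
          .build(new CacheLoader<CacheKey, List<BloomDMModelStub>>() {
            @Override public List<BloomDMModelStub> load(CacheKey key) {
              // On a miss, deserialize the bloom index from disk once;
              // subsequent lookups for the same key are served from memory.
              return readBloomIndexFromFile(key.shardPath, key.indexColumn);
            }
          });

  public List<BloomDMModelStub> getBloomDMModelByKey(CacheKey key) {
    return cache.getUnchecked(key);
  }

  private List<BloomDMModelStub> readBloomIndexFromFile(String shardPath, String column) {
    throw new UnsupportedOperationException("placeholder: deserialize <column>.bloomindex");
  }
}
```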
@@ -1729,6 +1729,19 @@ public final class CarbonCommonConstants {
// Property to enable parallel datamap loading for a table
public static final String CARBON_LOAD_DATAMAPS_PARALLEL = "carbon.load.datamaps.parallel.";

/**
* Cache size in MB for the bloom filter datamap. The value must be a positive integer;
* it is used at query time.
*/
@CarbonProperty
public static final String CARBON_QUERY_DATAMAP_BLOOM_CACHE_SIZE =
"carbon.query.datamap.bloom.cache.size";

/**
* Default cache size in MB for the bloom filter datamap.
*/
public static final String CARBON_QUERY_DATAMAP_BLOOM_CACHE_SIZE_DEFAULT_VAL = "512";

private CarbonCommonConstants() {
}
}
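Before building the cache, an implementation would resolve the configured size from these constants. A hedged sketch of that lookup (CarbonProperties.getInstance().getProperty is CarbonData's existing accessor; the validation and fallback logic here are assumptions):

```java
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.util.CarbonProperties;

public final class BloomCacheSizeResolver {
  /**
   * Resolve the configured bloom cache size in MB, falling back to the
   * default when the value is missing, non-numeric, or not positive.
   */
  public static int resolveCacheSizeInMB() {
    String configured = CarbonProperties.getInstance().getProperty(
        CarbonCommonConstants.CARBON_QUERY_DATAMAP_BLOOM_CACHE_SIZE,
        CarbonCommonConstants.CARBON_QUERY_DATAMAP_BLOOM_CACHE_SIZE_DEFAULT_VAL);
    int defaultSize = Integer.parseInt(
        CarbonCommonConstants.CARBON_QUERY_DATAMAP_BLOOM_CACHE_SIZE_DEFAULT_VAL);
    try {
      int size = Integer.parseInt(configured);
      return size > 0 ? size : defaultSize;
    } catch (NumberFormatException e) {
      return defaultSize;
    }
  }

  private BloomCacheSizeResolver() { }
}
```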
@@ -17,13 +17,10 @@

package org.apache.carbondata.datamap.bloom;

import java.io.DataInputStream;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

@@ -45,13 +42,8 @@
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
import org.apache.carbondata.core.util.CarbonUtil;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

/**
* BloomCoarseGrainDataMap is constructed at blocklet level. For each indexed column,
@@ -62,15 +54,16 @@
public class BloomCoarseGrainDataMap extends CoarseGrainDataMap {
private static final LogService LOGGER =
LogServiceFactory.getLogService(BloomCoarseGrainDataMap.class.getName());
public static final String BLOOM_INDEX_SUFFIX = ".bloomindex";
private Set<String> indexedColumn;
private List<BloomDMModel> bloomIndexList;
private Multimap<String, List<BloomDMModel>> indexCol2BloomDMList;
private String shardName;
private BloomDataMapCache bloomDataMapCache;
private Path indexPath;

@Override
public void init(DataMapModel dataMapModel) throws IOException {
Path indexPath = FileFactory.getPath(dataMapModel.getFilePath());
this.indexPath = FileFactory.getPath(dataMapModel.getFilePath());
this.shardName = indexPath.getName();
FileSystem fs = FileFactory.getFileSystem(indexPath);
if (!fs.exists(indexPath)) {
@@ -81,52 +74,11 @@ public void init(DataMapModel dataMapModel) throws IOException {
throw new IOException(
String.format("Path %s for Bloom index dataMap must be a directory", indexPath));
}

FileStatus[] indexFileStatus = fs.listStatus(indexPath, new PathFilter() {
@Override public boolean accept(Path path) {
return path.getName().endsWith(BLOOM_INDEX_SUFFIX);
}
});
indexedColumn = new HashSet<String>();
bloomIndexList = new ArrayList<BloomDMModel>();
indexCol2BloomDMList = ArrayListMultimap.create();
for (int i = 0; i < indexFileStatus.length; i++) {
String indexfilename = indexFileStatus[i].getPath().getName();
String indexCol =
indexfilename.substring(0, indexfilename.length() - BLOOM_INDEX_SUFFIX.length());
indexedColumn.add(indexCol);
List<BloomDMModel> models = readBloomIndex(indexFileStatus[i].getPath().toString());
bloomIndexList.addAll(models);
indexCol2BloomDMList.put(indexCol, models);
}
LOGGER.info("find bloom index datamap for column: "
+ StringUtils.join(indexedColumn, ", "));
this.bloomDataMapCache = BloomDataMapCache.getInstance();
}

private List<BloomDMModel> readBloomIndex(String indexFile) throws IOException {
LOGGER.info("read bloom index from file: " + indexFile);
List<BloomDMModel> bloomDMModelList = new ArrayList<BloomDMModel>();
DataInputStream dataInStream = null;
ObjectInputStream objectInStream = null;
try {
dataInStream = FileFactory.getDataInputStream(indexFile, FileFactory.getFileType(indexFile));
objectInStream = new ObjectInputStream(dataInStream);
try {
BloomDMModel model = null;
while ((model = (BloomDMModel) objectInStream.readObject()) != null) {
LOGGER.info("read bloom index: " + model);
bloomDMModelList.add(model);
}
} catch (EOFException e) {
LOGGER.info("read " + bloomDMModelList.size() + " bloom indices from " + indexFile);
}
return bloomDMModelList;
} catch (ClassNotFoundException e) {
LOGGER.error("Error occrus while reading bloom index");
throw new RuntimeException("Error occrus while reading bloom index", e);
} finally {
CarbonUtil.closeStreams(objectInStream, dataInStream);
}
public void setIndexedColumn(Set<String> indexedColumn) {
this.indexedColumn = indexedColumn;
}

@Override
@@ -139,26 +91,22 @@ public List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties segm
}

List<BloomQueryModel> bloomQueryModels = getQueryValue(filterExp.getFilterExpression());

for (BloomQueryModel bloomQueryModel : bloomQueryModels) {
LOGGER.info("prune blocklet for query: " + bloomQueryModel);
for (List<BloomDMModel> bloomDMModels : indexCol2BloomDMList.get(
bloomQueryModel.columnName)) {
for (BloomDMModel bloomDMModel : bloomDMModels) {
boolean scanRequired = bloomDMModel.getBloomFilter().mightContain(
convertValueToBytes(bloomQueryModel.dataType, bloomQueryModel.filterValue));
if (scanRequired) {
LOGGER.info(String.format(
"BloomCoarseGrainDataMap: Need to scan -> blocklet#%s",
String.valueOf(bloomDMModel.getBlockletNo())));
Blocklet blocklet =
new Blocklet(shardName, String.valueOf(bloomDMModel.getBlockletNo()));
hitBlocklets.add(blocklet);
} else {
LOGGER.info(String.format(
"BloomCoarseGrainDataMap: Skip scan -> blocklet#%s",
String.valueOf(bloomDMModel.getBlockletNo())));
}
LOGGER.debug("prune blocklet for query: " + bloomQueryModel);
BloomDataMapCache.CacheKey cacheKey = new BloomDataMapCache.CacheKey(
this.indexPath.toString(), bloomQueryModel.columnName);
List<BloomDMModel> bloomDMModels = this.bloomDataMapCache.getBloomDMModelByKey(cacheKey);
for (BloomDMModel bloomDMModel : bloomDMModels) {
boolean scanRequired = bloomDMModel.getBloomFilter().mightContain(
convertValueToBytes(bloomQueryModel.dataType, bloomQueryModel.filterValue));
if (scanRequired) {
LOGGER.debug(String.format("BloomCoarseGrainDataMap: Need to scan -> blocklet#%s",
String.valueOf(bloomDMModel.getBlockletNo())));
Blocklet blocklet = new Blocklet(shardName, String.valueOf(bloomDMModel.getBlockletNo()));
hitBlocklets.add(blocklet);
} else {
LOGGER.debug(String.format("BloomCoarseGrainDataMap: Skip scan -> blocklet#%s",
String.valueOf(bloomDMModel.getBlockletNo())));
}
}
}
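The pruning logic above relies on the one-sided guarantee of bloom filters: mightContain may return a false positive (a blocklet is scanned unnecessarily) but never a false negative (a matching blocklet is never skipped), so correctness is preserved and only extra I/O is at stake. A small illustration with Guava's BloomFilter, assuming the filter held by BloomDMModel behaves the same way:

```java
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;

public class MightContainDemo {
  public static void main(String[] args) {
    // 1000 expected insertions, 1% target false-positive rate.
    BloomFilter<byte[]> filter =
        BloomFilter.create(Funnels.byteArrayFunnel(), 1000, 0.01);
    filter.put("apple".getBytes());

    // Always true for inserted values: a matching blocklet is never skipped.
    System.out.println(filter.mightContain("apple".getBytes()));  // true
    // Usually false for absent values; a rare false positive only costs
    // an unnecessary blocklet scan, never a wrong query result.
    System.out.println(filter.mightContain("pear".getBytes()));
  }
}
```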
@@ -228,12 +176,20 @@ public void clear() {
bloomIndexList = null;
}

/**
* Get the path of the bloom index file for an index column under a shard.
* @param shardPath path of the shard
* @param colName index column name
*/
public static String getBloomIndexFile(String shardPath, String colName) {
return shardPath.concat(File.separator).concat(colName).concat(BLOOM_INDEX_SUFFIX);
}
static class BloomQueryModel {
private String columnName;
private DataType dataType;
private Object filterValue;

public BloomQueryModel(String columnName, DataType dataType, Object filterValue) {
private BloomQueryModel(String columnName, DataType dataType, Object filterValue) {
this.columnName = columnName;
this.dataType = dataType;
this.filterValue = filterValue;
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;

@@ -189,6 +190,7 @@ public List<CoarseGrainDataMap> getDataMaps(Segment segment) throws IOException
for (CarbonFile carbonFile : carbonFiles) {
BloomCoarseGrainDataMap bloomDM = new BloomCoarseGrainDataMap();
bloomDM.init(new DataMapModel(carbonFile.getAbsolutePath()));
bloomDM.setIndexedColumn(new HashSet<String>(dataMapMeta.getIndexedColumnNames()));
dataMaps.add(bloomDM);
}
} catch (Exception e) {
@@ -204,6 +206,8 @@ public List<CoarseGrainDataMap> getDataMaps(DataMapDistributable distributable)
BloomCoarseGrainDataMap bloomCoarseGrainDataMap = new BloomCoarseGrainDataMap();
String indexPath = ((BloomDataMapDistributable) distributable).getIndexPath();
bloomCoarseGrainDataMap.init(new DataMapModel(indexPath));
bloomCoarseGrainDataMap.setIndexedColumn(
new HashSet<String>(dataMapMeta.getIndexedColumnNames()));
coarseGrainDataMaps.add(bloomCoarseGrainDataMap);
return coarseGrainDataMaps;
}
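Note that since init() no longer discovers index columns by listing *.bloomindex files, the factory now injects them via setIndexedColumn from the datamap metadata, for both segment-based and distributable-based construction.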