
merge bloom index
kevinjmh committed Aug 17, 2018
1 parent f66a9fd commit 99b938a
Showing 17 changed files with 580 additions and 153 deletions.
@@ -164,7 +164,7 @@ private void initDataMapFile() throws IOException {
}
}
for (int indexColId = 0; indexColId < indexColumns.size(); indexColId++) {
String dmFile = BloomCoarseGrainDataMap.getBloomIndexFile(dataMapPath,
String dmFile = BloomIndexFileStore.getBloomIndexFile(dataMapPath,
indexColumns.get(indexColId).getColName());
DataOutputStream dataOutStream = null;
try {
@@ -17,7 +17,6 @@

package org.apache.carbondata.datamap.bloom;

import java.io.File;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.text.DateFormat;
@@ -81,11 +80,12 @@
public class BloomCoarseGrainDataMap extends CoarseGrainDataMap {
private static final LogService LOGGER =
LogServiceFactory.getLogService(BloomCoarseGrainDataMap.class.getName());
public static final String BLOOM_INDEX_SUFFIX = ".bloomindex";
private Map<String, CarbonColumn> name2Col;
private Cache<BloomCacheKeyValue.CacheKey, BloomCacheKeyValue.CacheValue> cache;
private String shardName;
private Path indexPath;
private Set<String> filteredShard;
private boolean needShardPrune;
/**
* This is used to convert literal filter value to internal carbon value
*/
@@ -102,6 +102,13 @@ public void init(DataMapModel dataMapModel) throws IOException {
}
}

public void setFilteredShard(Set<String> filteredShard) {
this.filteredShard = filteredShard;
// prune by shard during blocklet pruning only when the bloom index files have been merged
this.needShardPrune = filteredShard != null &&
shardName.equals(BloomIndexFileStore.MERGE_BLOOM_INDEX_SHARD_NAME);
}

/**
* init field converters for index columns
*/
@@ -182,11 +189,16 @@ public List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties segm
BloomCacheKeyValue.CacheValue cacheValue = cache.get(cacheKey);
List<CarbonBloomFilter> bloomIndexList = cacheValue.getBloomFilters();
for (CarbonBloomFilter bloomFilter : bloomIndexList) {
if (needShardPrune && !filteredShard.contains(bloomFilter.getShardName())) {
// skip bloom filters from shards that were already pruned by the main datamap
continue;
}
boolean scanRequired = bloomFilter.membershipTest(new Key(bloomQueryModel.filterValue));
if (scanRequired) {
LOGGER.debug(String.format("BloomCoarseGrainDataMap: Need to scan -> blocklet#%s",
String.valueOf(bloomFilter.getBlockletNo())));
Blocklet blocklet = new Blocklet(shardName, String.valueOf(bloomFilter.getBlockletNo()));
Blocklet blocklet = new Blocklet(bloomFilter.getShardName(),
String.valueOf(bloomFilter.getBlockletNo()));
hitBlocklets.add(blocklet);
} else {
LOGGER.debug(String.format("BloomCoarseGrainDataMap: Skip scan -> blocklet#%s",
@@ -349,14 +361,6 @@ public boolean isScanRequired(FilterResolverIntf filterExp) {
public void clear() {
}

/**
* get bloom index file
* @param shardPath path for the shard
* @param colName index column name
*/
public static String getBloomIndexFile(String shardPath, String colName) {
return shardPath.concat(File.separator).concat(colName).concat(BLOOM_INDEX_SUFFIX);
}
static class BloomQueryModel {
private String columnName;
private byte[] filterValue;
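For readers following the new query path: the two additions above (setFilteredShard and the shard check inside prune) combine into one rule. The following is a minimal standalone sketch of that rule, not code from the patch; the class name and sample shard names are illustrative stand-ins.

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Standalone illustration of the deferred shard pruning added to prune():
// when reading the merged shard, bloom filters whose originating shard was
// already pruned by the main datamap are skipped before the membership test.
public class DeferredShardPruneSketch {

  static boolean skipByShard(boolean needShardPrune, Set<String> filteredShard, String shardOfFilter) {
    return needShardPrune && !filteredShard.contains(shardOfFilter);
  }

  public static void main(String[] args) {
    // Shards kept by the main datamap (segment.getFilteredIndexShardNames() in the patch).
    Set<String> filteredShard = new HashSet<>(Arrays.asList("0", "2"));
    // needShardPrune is true only when the loaded shard is the merged one.
    boolean needShardPrune = true;
    System.out.println(skipByShard(needShardPrune, filteredShard, "0")); // false -> run membershipTest
    System.out.println(skipByShard(needShardPrune, filteredShard, "1")); // true  -> skip this blocklet
  }
}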
@@ -16,6 +16,7 @@
*/
package org.apache.carbondata.datamap.bloom;

import java.io.File;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
@@ -31,16 +32,13 @@
import org.apache.carbondata.core.datamap.DataMapDistributable;
import org.apache.carbondata.core.datamap.DataMapLevel;
import org.apache.carbondata.core.datamap.DataMapMeta;
import org.apache.carbondata.core.datamap.DataMapStoreManager;
import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datamap.TableDataMap;
import org.apache.carbondata.core.datamap.dev.DataMapBuilder;
import org.apache.carbondata.core.datamap.dev.DataMapFactory;
import org.apache.carbondata.core.datamap.dev.DataMapWriter;
import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.features.TableOperation;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
@@ -222,102 +220,93 @@ public DataMapBuilder createBuilder(Segment segment, String shardName,

@Override
public List<CoarseGrainDataMap> getDataMaps(Segment segment) throws IOException {
List<CoarseGrainDataMap> dataMaps = new ArrayList<CoarseGrainDataMap>(1);
List<CoarseGrainDataMap> dataMaps = new ArrayList<>();
try {
Set<String> shardPaths = segmentMap.get(segment.getSegmentNo());
if (shardPaths == null) {
String dataMapStorePath = DataMapWriter.getDefaultDataMapPath(
getCarbonTable().getTablePath(), segment.getSegmentNo(), dataMapName);
CarbonFile[] carbonFiles = FileFactory.getCarbonFile(dataMapStorePath).listFiles();
shardPaths = new HashSet<>();
for (CarbonFile carbonFile : carbonFiles) {
shardPaths.add(carbonFile.getAbsolutePath());
}
shardPaths = getAllShardPaths(getCarbonTable().getTablePath(), segment.getSegmentNo());
segmentMap.put(segment.getSegmentNo(), shardPaths);
}
Set<String> filteredShards = segment.getFilteredIndexShardNames();
for (String shard : shardPaths) {
BloomCoarseGrainDataMap bloomDM = new BloomCoarseGrainDataMap();
bloomDM.init(new BloomDataMapModel(shard, cache));
bloomDM.initIndexColumnConverters(getCarbonTable(), dataMapMeta.getIndexedColumns());
dataMaps.add(bloomDM);
if (shard.endsWith(BloomIndexFileStore.MERGE_BLOOM_INDEX_SHARD_NAME) ||
filteredShards.contains(new File(shard).getName())) {
// Load only the shards that the main datamap kept after its own pruning;
// for the merge shard, shard pruning is deferred until blocklet pruning.
BloomCoarseGrainDataMap bloomDM = new BloomCoarseGrainDataMap();
bloomDM.init(new BloomDataMapModel(shard, cache));
bloomDM.initIndexColumnConverters(getCarbonTable(), dataMapMeta.getIndexedColumns());
bloomDM.setFilteredShard(filteredShards);
dataMaps.add(bloomDM);
}
}
} catch (Exception e) {
throw new IOException("Error occurs while init Bloom DataMap", e);
}
return dataMaps;
}

/**
* Returns all shard directories of bloom index files for a query.
* If the bloom index files have been merged, only the single merge-shard path is returned.
*/
private Set<String> getAllShardPaths(String tablePath, String segmentId) {
String dataMapStorePath = CarbonTablePath.getDataMapStorePath(
tablePath, segmentId, dataMapName);
CarbonFile[] carbonFiles = FileFactory.getCarbonFile(dataMapStorePath).listFiles();
Set<String> shardPaths = new HashSet<>();
boolean mergeShardInprogress = false;
CarbonFile mergeShardFile = null;
for (CarbonFile carbonFile : carbonFiles) {
if (carbonFile.getName().equals(BloomIndexFileStore.MERGE_BLOOM_INDEX_SHARD_NAME)) {
mergeShardFile = carbonFile;
} else if (carbonFile.getName().equals(BloomIndexFileStore.MERGE_INPROGRESS_FILE)) {
mergeShardInprogress = true;
} else if (carbonFile.isDirectory()) {
shardPaths.add(carbonFile.getAbsolutePath());
}
}
if (mergeShardFile != null && !mergeShardInprogress) {
// should only get one shard path if mergeShard is generated successfully
shardPaths.clear();
shardPaths.add(mergeShardFile.getAbsolutePath());
}
return shardPaths;
}
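
The directory-state handling above can be summarized by a small standalone rule. The sketch below is not code from the patch: "mergeShard" and "mergeShard.inprogress" are placeholder names standing in for the BloomIndexFileStore constants, whose real values are defined outside this excerpt.

import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;

// Standalone illustration of getAllShardPaths: per-task shard directories are
// used unless a completed merge shard exists, in which case it supersedes them.
public class ShardPathSketch {

  static Set<String> selectShards(Set<String> dirEntries) {
    Set<String> shards = new LinkedHashSet<>();
    boolean mergeDone = dirEntries.contains("mergeShard")
        && !dirEntries.contains("mergeShard.inprogress");
    for (String entry : dirEntries) {
      if (!entry.equals("mergeShard") && !entry.equals("mergeShard.inprogress")) {
        shards.add(entry); // a plain per-task shard directory
      }
    }
    if (mergeDone) {
      shards.clear();
      shards.add("mergeShard"); // a finished merge shard replaces all task shards
    }
    return shards;
  }

  public static void main(String[] args) {
    System.out.println(selectShards(new HashSet<>(Arrays.asList("0", "1"))));
    // -> both task shards, no merge yet
    System.out.println(selectShards(new HashSet<>(Arrays.asList("0", "1", "mergeShard", "mergeShard.inprogress"))));
    // -> still both task shards, because the merge is in progress
    System.out.println(selectShards(new HashSet<>(Arrays.asList("0", "1", "mergeShard"))));
    // -> only the merge shard
  }
}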

@Override
public List<CoarseGrainDataMap> getDataMaps(DataMapDistributable distributable)
throws IOException {
List<CoarseGrainDataMap> coarseGrainDataMaps = new ArrayList<>();
BloomCoarseGrainDataMap bloomCoarseGrainDataMap = new BloomCoarseGrainDataMap();
List<CoarseGrainDataMap> dataMaps = new ArrayList<>();
String indexPath = ((BloomDataMapDistributable) distributable).getIndexPath();
bloomCoarseGrainDataMap.init(new BloomDataMapModel(indexPath, cache));
bloomCoarseGrainDataMap.initIndexColumnConverters(getCarbonTable(),
dataMapMeta.getIndexedColumns());
coarseGrainDataMaps.add(bloomCoarseGrainDataMap);
return coarseGrainDataMaps;
Set<String> filteredShards = ((BloomDataMapDistributable) distributable).getFilteredShards();
BloomCoarseGrainDataMap bloomDM = new BloomCoarseGrainDataMap();
bloomDM.init(new BloomDataMapModel(indexPath, cache));
bloomDM.initIndexColumnConverters(getCarbonTable(), dataMapMeta.getIndexedColumns());
bloomDM.setFilteredShard(filteredShards);
dataMaps.add(bloomDM);
return dataMaps;
}

/**
* returns all the directories of lucene index files for query
* Note: copied from luceneDataMapFactory, will extract to a common interface
*/
private CarbonFile[] getAllIndexDirs(String tablePath, String segmentId) {
List<CarbonFile> indexDirs = new ArrayList<>();
List<TableDataMap> dataMaps;
try {
// there can be multiple bloom datamaps present on a table, so get all datamaps and form
// the path till the index file directories in all datamaps folders present in each segment
dataMaps = DataMapStoreManager.getInstance().getAllDataMap(getCarbonTable());
} catch (IOException ex) {
LOGGER.error(ex, String.format("failed to get datamaps for tablePath %s, segmentId %s",
tablePath, segmentId));
throw new RuntimeException(ex);
}
if (dataMaps.size() > 0) {
for (TableDataMap dataMap : dataMaps) {
if (dataMap.getDataMapSchema().getDataMapName().equals(this.dataMapName)) {
List<CarbonFile> indexFiles;
String dmPath = CarbonTablePath.getDataMapStorePath(tablePath, segmentId,
dataMap.getDataMapSchema().getDataMapName());
FileFactory.FileType fileType = FileFactory.getFileType(dmPath);
final CarbonFile dirPath = FileFactory.getCarbonFile(dmPath, fileType);
indexFiles = Arrays.asList(dirPath.listFiles(new CarbonFileFilter() {
@Override
public boolean accept(CarbonFile file) {
return file.isDirectory();
}
}));
indexDirs.addAll(indexFiles);
}
}
}
return indexDirs.toArray(new CarbonFile[0]);
}

@Override
public List<DataMapDistributable> toDistributable(Segment segment) {
List<DataMapDistributable> dataMapDistributableList = new ArrayList<>();
CarbonFile[] indexDirs =
getAllIndexDirs(getCarbonTable().getTablePath(), segment.getSegmentNo());
if (segment.getFilteredIndexShardNames().size() == 0) {
for (CarbonFile indexDir : indexDirs) {
DataMapDistributable bloomDataMapDistributable = new BloomDataMapDistributable(
indexDir.getAbsolutePath());
dataMapDistributableList.add(bloomDataMapDistributable);
}
return dataMapDistributableList;
Set<String> shardPaths = segmentMap.get(segment.getSegmentNo());
if (shardPaths == null) {
shardPaths = getAllShardPaths(getCarbonTable().getTablePath(), segment.getSegmentNo());
segmentMap.put(segment.getSegmentNo(), shardPaths);
}
for (CarbonFile indexDir : indexDirs) {
// Filter out the tasks which are filtered through CG datamap.
if (!segment.getFilteredIndexShardNames().contains(indexDir.getName())) {
continue;
Set<String> filteredShards = segment.getFilteredIndexShardNames();
for (String shardPath : shardPaths) {
// Create distributables only for shards that the main datamap kept;
// for the merge shard, shard pruning is deferred until blocklet pruning.
if (shardPath.endsWith(BloomIndexFileStore.MERGE_BLOOM_INDEX_SHARD_NAME) ||
filteredShards.contains(new File(shardPath).getName())) {
DataMapDistributable bloomDataMapDistributable =
new BloomDataMapDistributable(shardPath, filteredShards);
dataMapDistributableList.add(bloomDataMapDistributable);
}
DataMapDistributable bloomDataMapDistributable = new BloomDataMapDistributable(
indexDir.getAbsolutePath());
dataMapDistributableList.add(bloomDataMapDistributable);
}
return dataMapDistributableList;
}
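Both getDataMaps(Segment) and toDistributable(Segment) above apply the same selection rule before the datamaps or distributables are built. A minimal standalone sketch of that rule follows; it is not code from the patch, and "mergeShard" stands in for BloomIndexFileStore.MERGE_BLOOM_INDEX_SHARD_NAME.

import java.io.File;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Standalone illustration: a shard is loaded if it is the merge shard (its
// pruning is deferred to blocklet pruning) or if the main datamap kept it.
public class ShardSelectionSketch {

  static boolean shouldLoad(String shardPath, Set<String> filteredShards) {
    return shardPath.endsWith("mergeShard")
        || filteredShards.contains(new File(shardPath).getName());
  }

  public static void main(String[] args) {
    Set<String> filteredShards = new HashSet<>(Arrays.asList("0"));
    System.out.println(shouldLoad("/store/segment_0/dm/0", filteredShards));          // true, kept by main datamap
    System.out.println(shouldLoad("/store/segment_0/dm/1", filteredShards));          // false, already pruned
    System.out.println(shouldLoad("/store/segment_0/dm/mergeShard", filteredShards)); // true, pruned later per blocklet
  }
}

On the executor side, getDataMaps(DataMapDistributable) hands the same filteredShards set back through setFilteredShard, so the per-blocklet shard check shown earlier can still run against the merged bloom index.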
@@ -16,7 +16,6 @@
*/
package org.apache.carbondata.datamap.bloom;

import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -26,9 +25,7 @@
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.cache.Cache;
import org.apache.carbondata.core.cache.CarbonLRUCache;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.util.CarbonUtil;

import org.apache.hadoop.util.bloom.CarbonBloomFilter;

@@ -58,7 +55,9 @@ public BloomCacheKeyValue.CacheValue get(BloomCacheKeyValue.CacheKey key)
throws IOException {
BloomCacheKeyValue.CacheValue cacheValue = getIfPresent(key);
if (cacheValue == null) {
cacheValue = loadBloomDataMapModel(key);
List<CarbonBloomFilter> bloomFilters =
BloomIndexFileStore.loadBloomFilterFromFile(key.getShardPath(), key.getIndexColumn());
cacheValue = new BloomCacheKeyValue.CacheValue(bloomFilters);
lruCache.put(key.toString(), cacheValue, cacheValue.getMemorySize());
}
return cacheValue;
@@ -91,40 +90,6 @@ public void put(BloomCacheKeyValue.CacheKey key, BloomCacheKeyValue.CacheValue v
// No impl required.
}

/**
* load datamap from bloomindex file
*/
private BloomCacheKeyValue.CacheValue loadBloomDataMapModel(
BloomCacheKeyValue.CacheKey cacheKey) {
DataInputStream dataInStream = null;
List<CarbonBloomFilter> bloomFilters = new ArrayList<>();
try {
String indexFile = getIndexFileFromCacheKey(cacheKey);
dataInStream = FileFactory.getDataInputStream(indexFile, FileFactory.getFileType(indexFile));
while (dataInStream.available() > 0) {
CarbonBloomFilter bloomFilter = new CarbonBloomFilter();
bloomFilter.readFields(dataInStream);
bloomFilters.add(bloomFilter);
}
LOGGER.info(String.format("Read %d bloom indices from %s", bloomFilters.size(), indexFile));

return new BloomCacheKeyValue.CacheValue(bloomFilters);
} catch (IOException e) {
LOGGER.error(e, "Error occurs while reading bloom index");
throw new RuntimeException("Error occurs while reading bloom index", e);
} finally {
CarbonUtil.closeStreams(dataInStream);
}
}

/**
* get bloom index file name from cachekey
*/
private String getIndexFileFromCacheKey(BloomCacheKeyValue.CacheKey cacheKey) {
return BloomCoarseGrainDataMap
.getBloomIndexFile(cacheKey.getShardPath(), cacheKey.getIndexColumn());
}

@Override
public void clearAccessCount(List<BloomCacheKeyValue.CacheKey> keys) {
}
@@ -17,6 +17,8 @@

package org.apache.carbondata.datamap.bloom;

import java.util.Set;

import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.core.datamap.DataMapDistributable;

@@ -27,11 +29,23 @@ class BloomDataMapDistributable extends DataMapDistributable {
*/
private String indexPath;

BloomDataMapDistributable(String indexPath) {
/**
* Set of index shards that have already been filtered by the CG index operation.
* This is used for the merge shard, whose shards cannot be pruned in the `toDistributable` method.
* In all other cases this will be null.
*/
private Set<String> filteredShards;

BloomDataMapDistributable(String indexPath, Set<String> filteredShards) {
this.indexPath = indexPath;
this.filteredShards = filteredShards;
}

public String getIndexPath() {
return indexPath;
}

public Set<String> getFilteredShards() {
return filteredShards;
}
}
