
Commit 83bebee
Merge 057dd6c into 9ee0f35
kevinjmh committed Aug 20, 2018
2 parents 9ee0f35 + 057dd6c
Showing 16 changed files with 527 additions and 153 deletions.
@@ -164,7 +164,7 @@ private void initDataMapFile() throws IOException {
}
}
for (int indexColId = 0; indexColId < indexColumns.size(); indexColId++) {
String dmFile = BloomCoarseGrainDataMap.getBloomIndexFile(dataMapPath,
String dmFile = BloomIndexFileStore.getBloomIndexFile(dataMapPath,
indexColumns.get(indexColId).getColName());
DataOutputStream dataOutStream = null;
try {
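Note: BloomIndexFileStore itself is not among the files shown in this commit. Judging from the getBloomIndexFile method that this commit removes from BloomCoarseGrainDataMap (see below), the relocated helper presumably still builds the per-column bloom index file path the same way. A minimal sketch under that assumption (the class name BloomIndexPathSketch is made up for illustration):

import java.io.File;

// Sketch only: mirrors the getBloomIndexFile body removed from BloomCoarseGrainDataMap
// in this commit; the real BloomIndexFileStore implementation is not shown here.
public final class BloomIndexPathSketch {
  public static final String BLOOM_INDEX_SUFFIX = ".bloomindex";

  public static String getBloomIndexFile(String shardPath, String colName) {
    // e.g. <shardPath>/<colName>.bloomindex
    return shardPath.concat(File.separator).concat(colName).concat(BLOOM_INDEX_SUFFIX);
  }
}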
@@ -17,7 +17,6 @@

package org.apache.carbondata.datamap.bloom;

import java.io.File;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.text.DateFormat;
@@ -81,11 +80,12 @@
public class BloomCoarseGrainDataMap extends CoarseGrainDataMap {
private static final LogService LOGGER =
LogServiceFactory.getLogService(BloomCoarseGrainDataMap.class.getName());
public static final String BLOOM_INDEX_SUFFIX = ".bloomindex";
private Map<String, CarbonColumn> name2Col;
private Cache<BloomCacheKeyValue.CacheKey, BloomCacheKeyValue.CacheValue> cache;
private String shardName;
private Path indexPath;
private Set<String> filteredShard;
private boolean needShardPrune;
/**
* This is used to convert literal filter value to internal carbon value
*/
@@ -102,6 +102,13 @@ public void init(DataMapModel dataMapModel) throws IOException {
}
}

public void setFilteredShard(Set<String> filteredShard) {
this.filteredShard = filteredShard;
// do shard-level pruning during blocklet pruning only if the bloom index files have been merged
this.needShardPrune = filteredShard != null &&
shardName.equals(BloomIndexFileStore.MERGE_BLOOM_INDEX_SHARD_NAME);
}

/**
* init field converters for index columns
*/
@@ -182,11 +189,16 @@ public List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties segm
BloomCacheKeyValue.CacheValue cacheValue = cache.get(cacheKey);
List<CarbonBloomFilter> bloomIndexList = cacheValue.getBloomFilters();
for (CarbonBloomFilter bloomFilter : bloomIndexList) {
if (needShardPrune && !filteredShard.contains(bloomFilter.getShardName())) {
// skip shards that were already pruned by the main datamap
continue;
}
boolean scanRequired = bloomFilter.membershipTest(new Key(bloomQueryModel.filterValue));
if (scanRequired) {
LOGGER.debug(String.format("BloomCoarseGrainDataMap: Need to scan -> blocklet#%s",
String.valueOf(bloomFilter.getBlockletNo())));
Blocklet blocklet = new Blocklet(shardName, String.valueOf(bloomFilter.getBlockletNo()));
Blocklet blocklet = new Blocklet(bloomFilter.getShardName(),
String.valueOf(bloomFilter.getBlockletNo()));
hitBlocklets.add(blocklet);
} else {
LOGGER.debug(String.format("BloomCoarseGrainDataMap: Skip scan -> blocklet#%s",
@@ -349,14 +361,6 @@ public boolean isScanRequired(FilterResolverIntf filterExp) {
public void clear() {
}

/**
* get bloom index file
* @param shardPath path for the shard
* @param colName index column name
*/
public static String getBloomIndexFile(String shardPath, String colName) {
return shardPath.concat(File.separator).concat(colName).concat(BLOOM_INDEX_SUFFIX);
}
static class BloomQueryModel {
private String columnName;
private byte[] filterValue;
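Note: the filteredShard / needShardPrune pair added above only takes effect for the merged shard, because only the merged bloom index holds filters from every original shard; each CarbonBloomFilter then reports its originating shard via getShardName(), which is why the Blocklet is now built from bloomFilter.getShardName() instead of the datamap's own shardName. A standalone sketch of the guard, with a placeholder literal standing in for BloomIndexFileStore.MERGE_BLOOM_INDEX_SHARD_NAME and hypothetical shard names:

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Illustration only: shows when the per-bloom-filter shard check added in prune() is active.
public class ShardPruneSketch {
  // Assumed placeholder for BloomIndexFileStore.MERGE_BLOOM_INDEX_SHARD_NAME
  // (the actual literal is not shown in this diff).
  private static final String MERGE_SHARD_NAME = "mergeShard";

  static boolean needShardPrune(Set<String> filteredShard, String shardName) {
    // Per-filter shard pruning only makes sense on the merged shard, where bloom
    // filters from every original shard live in a single index file.
    return filteredShard != null && shardName.equals(MERGE_SHARD_NAME);
  }

  static boolean skipBloomFilter(Set<String> filteredShard, String currentShardName,
      String filterShardName) {
    // Skip a bloom filter whose originating shard was already pruned by the main datamap.
    return needShardPrune(filteredShard, currentShardName)
        && !filteredShard.contains(filterShardName);
  }

  public static void main(String[] args) {
    // Hypothetical shard names.
    Set<String> filtered = new HashSet<>(Arrays.asList("0_0", "1_0"));
    System.out.println(skipBloomFilter(filtered, MERGE_SHARD_NAME, "2_0")); // true: pruned shard
    System.out.println(skipBloomFilter(filtered, "0_0", "0_0"));            // false: plain shard
  }
}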
@@ -16,6 +16,7 @@
*/
package org.apache.carbondata.datamap.bloom;

import java.io.File;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
@@ -31,16 +32,13 @@
import org.apache.carbondata.core.datamap.DataMapDistributable;
import org.apache.carbondata.core.datamap.DataMapLevel;
import org.apache.carbondata.core.datamap.DataMapMeta;
import org.apache.carbondata.core.datamap.DataMapStoreManager;
import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datamap.TableDataMap;
import org.apache.carbondata.core.datamap.dev.DataMapBuilder;
import org.apache.carbondata.core.datamap.dev.DataMapFactory;
import org.apache.carbondata.core.datamap.dev.DataMapWriter;
import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.features.TableOperation;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
@@ -222,102 +220,93 @@ public DataMapBuilder createBuilder(Segment segment, String shardName,

@Override
public List<CoarseGrainDataMap> getDataMaps(Segment segment) throws IOException {
List<CoarseGrainDataMap> dataMaps = new ArrayList<CoarseGrainDataMap>(1);
List<CoarseGrainDataMap> dataMaps = new ArrayList<>();
try {
Set<String> shardPaths = segmentMap.get(segment.getSegmentNo());
if (shardPaths == null) {
String dataMapStorePath = DataMapWriter.getDefaultDataMapPath(
getCarbonTable().getTablePath(), segment.getSegmentNo(), dataMapName);
CarbonFile[] carbonFiles = FileFactory.getCarbonFile(dataMapStorePath).listFiles();
shardPaths = new HashSet<>();
for (CarbonFile carbonFile : carbonFiles) {
shardPaths.add(carbonFile.getAbsolutePath());
}
shardPaths = getAllShardPaths(getCarbonTable().getTablePath(), segment.getSegmentNo());
segmentMap.put(segment.getSegmentNo(), shardPaths);
}
Set<String> filteredShards = segment.getFilteredIndexShardNames();
for (String shard : shardPaths) {
BloomCoarseGrainDataMap bloomDM = new BloomCoarseGrainDataMap();
bloomDM.init(new BloomDataMapModel(shard, cache));
bloomDM.initIndexColumnConverters(getCarbonTable(), dataMapMeta.getIndexedColumns());
dataMaps.add(bloomDM);
if (shard.endsWith(BloomIndexFileStore.MERGE_BLOOM_INDEX_SHARD_NAME) ||
filteredShards.contains(new File(shard).getName())) {
// Filter out shards that were already pruned by the main datamap.
// For the merge shard, shard pruning is deferred until blocklet pruning.
BloomCoarseGrainDataMap bloomDM = new BloomCoarseGrainDataMap();
bloomDM.init(new BloomDataMapModel(shard, cache));
bloomDM.initIndexColumnConverters(getCarbonTable(), dataMapMeta.getIndexedColumns());
bloomDM.setFilteredShard(filteredShards);
dataMaps.add(bloomDM);
}
}
} catch (Exception e) {
throw new IOException("Error occurs while init Bloom DataMap", e);
}
return dataMaps;
}

/**
* returns all shard directories of bloom index files for a query.
* If the bloom index files have been merged, only the merge shard path is returned.
*/
private Set<String> getAllShardPaths(String tablePath, String segmentId) {
String dataMapStorePath = CarbonTablePath.getDataMapStorePath(
tablePath, segmentId, dataMapName);
CarbonFile[] carbonFiles = FileFactory.getCarbonFile(dataMapStorePath).listFiles();
Set<String> shardPaths = new HashSet<>();
boolean mergeShardInprogress = false;
CarbonFile mergeShardFile = null;
for (CarbonFile carbonFile : carbonFiles) {
if (carbonFile.getName().equals(BloomIndexFileStore.MERGE_BLOOM_INDEX_SHARD_NAME)) {
mergeShardFile = carbonFile;
} else if (carbonFile.getName().equals(BloomIndexFileStore.MERGE_INPROGRESS_FILE)) {
mergeShardInprogress = true;
} else if (carbonFile.isDirectory()) {
shardPaths.add(carbonFile.getAbsolutePath());
}
}
if (mergeShardFile != null && !mergeShardInprogress) {
// should only get one shard path if mergeShard is generated successfully
shardPaths.clear();
shardPaths.add(mergeShardFile.getAbsolutePath());
}
return shardPaths;
}

@Override
public List<CoarseGrainDataMap> getDataMaps(DataMapDistributable distributable)
throws IOException {
List<CoarseGrainDataMap> coarseGrainDataMaps = new ArrayList<>();
BloomCoarseGrainDataMap bloomCoarseGrainDataMap = new BloomCoarseGrainDataMap();
List<CoarseGrainDataMap> dataMaps = new ArrayList<>();
String indexPath = ((BloomDataMapDistributable) distributable).getIndexPath();
bloomCoarseGrainDataMap.init(new BloomDataMapModel(indexPath, cache));
bloomCoarseGrainDataMap.initIndexColumnConverters(getCarbonTable(),
dataMapMeta.getIndexedColumns());
coarseGrainDataMaps.add(bloomCoarseGrainDataMap);
return coarseGrainDataMaps;
Set<String> filteredShards = ((BloomDataMapDistributable) distributable).getFilteredShards();
BloomCoarseGrainDataMap bloomDM = new BloomCoarseGrainDataMap();
bloomDM.init(new BloomDataMapModel(indexPath, cache));
bloomDM.initIndexColumnConverters(getCarbonTable(), dataMapMeta.getIndexedColumns());
bloomDM.setFilteredShard(filteredShards);
dataMaps.add(bloomDM);
return dataMaps;
}

/**
* returns all the directories of lucene index files for query
* Note: copied from luceneDataMapFactory, will extract to a common interface
*/
private CarbonFile[] getAllIndexDirs(String tablePath, String segmentId) {
List<CarbonFile> indexDirs = new ArrayList<>();
List<TableDataMap> dataMaps;
try {
// there can be multiple bloom datamaps present on a table, so get all datamaps and form
// the path till the index file directories in all datamaps folders present in each segment
dataMaps = DataMapStoreManager.getInstance().getAllDataMap(getCarbonTable());
} catch (IOException ex) {
LOGGER.error(ex, String.format("failed to get datamaps for tablePath %s, segmentId %s",
tablePath, segmentId));
throw new RuntimeException(ex);
}
if (dataMaps.size() > 0) {
for (TableDataMap dataMap : dataMaps) {
if (dataMap.getDataMapSchema().getDataMapName().equals(this.dataMapName)) {
List<CarbonFile> indexFiles;
String dmPath = CarbonTablePath.getDataMapStorePath(tablePath, segmentId,
dataMap.getDataMapSchema().getDataMapName());
FileFactory.FileType fileType = FileFactory.getFileType(dmPath);
final CarbonFile dirPath = FileFactory.getCarbonFile(dmPath, fileType);
indexFiles = Arrays.asList(dirPath.listFiles(new CarbonFileFilter() {
@Override
public boolean accept(CarbonFile file) {
return file.isDirectory();
}
}));
indexDirs.addAll(indexFiles);
}
}
}
return indexDirs.toArray(new CarbonFile[0]);
}

@Override
public List<DataMapDistributable> toDistributable(Segment segment) {
List<DataMapDistributable> dataMapDistributableList = new ArrayList<>();
CarbonFile[] indexDirs =
getAllIndexDirs(getCarbonTable().getTablePath(), segment.getSegmentNo());
if (segment.getFilteredIndexShardNames().size() == 0) {
for (CarbonFile indexDir : indexDirs) {
DataMapDistributable bloomDataMapDistributable = new BloomDataMapDistributable(
indexDir.getAbsolutePath());
dataMapDistributableList.add(bloomDataMapDistributable);
}
return dataMapDistributableList;
Set<String> shardPaths = segmentMap.get(segment.getSegmentNo());
if (shardPaths == null) {
shardPaths = getAllShardPaths(getCarbonTable().getTablePath(), segment.getSegmentNo());
segmentMap.put(segment.getSegmentNo(), shardPaths);
}
for (CarbonFile indexDir : indexDirs) {
// Filter out the tasks which are filtered through CG datamap.
if (!segment.getFilteredIndexShardNames().contains(indexDir.getName())) {
continue;
Set<String> filteredShards = segment.getFilteredIndexShardNames();
for (String shardPath : shardPaths) {
// Filter out shards that were already pruned by the main datamap.
// For the merge shard, shard pruning is deferred until blocklet pruning.
if (shardPath.endsWith(BloomIndexFileStore.MERGE_BLOOM_INDEX_SHARD_NAME) ||
filteredShards.contains(new File(shardPath).getName())) {
DataMapDistributable bloomDataMapDistributable =
new BloomDataMapDistributable(shardPath, filteredShards);
dataMapDistributableList.add(bloomDataMapDistributable);
}
DataMapDistributable bloomDataMapDistributable = new BloomDataMapDistributable(
indexDir.getAbsolutePath());
dataMapDistributableList.add(bloomDataMapDistributable);
}
return dataMapDistributableList;
}
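Note: getAllShardPaths above picks between the merged shard and the individual shard directories by listing the datamap store path. A simplified, self-contained sketch of that selection rule, operating on plain strings instead of CarbonFile objects; the names "mergeShard", "mergeShard.inprogress", "0_0" and "1_0" are made up for this illustration (they stand in for BloomIndexFileStore.MERGE_BLOOM_INDEX_SHARD_NAME, MERGE_INPROGRESS_FILE and real task shard directories), and the original isDirectory() check is omitted:

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Standalone illustration of the selection rule in getAllShardPaths().
public class ShardPathSelectionSketch {
  static Set<String> selectShards(List<String> entries) {
    Set<String> shards = new HashSet<>();
    boolean mergeInProgress = false;
    String mergeShard = null;
    for (String name : entries) {
      if (name.equals("mergeShard")) {
        mergeShard = name;
      } else if (name.equals("mergeShard.inprogress")) {
        mergeInProgress = true;
      } else {
        shards.add(name);
      }
    }
    if (mergeShard != null && !mergeInProgress) {
      // Merge finished successfully: query only the merged shard.
      shards.clear();
      shards.add(mergeShard);
    }
    return shards;
  }

  public static void main(String[] args) {
    // Merge finished: only the merged shard is kept.
    System.out.println(selectShards(Arrays.asList("0_0", "1_0", "mergeShard")));
    // Merge still in progress: fall back to the individual shards.
    System.out.println(selectShards(Arrays.asList("0_0", "1_0", "mergeShard", "mergeShard.inprogress")));
  }
}

Running it keeps only the merged shard once the merge has completed, and falls back to the individual shard directories while a merge is still in progress.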
@@ -16,7 +16,6 @@
*/
package org.apache.carbondata.datamap.bloom;

import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -26,9 +25,7 @@
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.cache.Cache;
import org.apache.carbondata.core.cache.CarbonLRUCache;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.util.CarbonUtil;

import org.apache.hadoop.util.bloom.CarbonBloomFilter;

@@ -58,7 +55,9 @@ public BloomCacheKeyValue.CacheValue get(BloomCacheKeyValue.CacheKey key)
throws IOException {
BloomCacheKeyValue.CacheValue cacheValue = getIfPresent(key);
if (cacheValue == null) {
cacheValue = loadBloomDataMapModel(key);
List<CarbonBloomFilter> bloomFilters =
BloomIndexFileStore.loadBloomFilterFromFile(key.getShardPath(), key.getIndexColumn());
cacheValue = new BloomCacheKeyValue.CacheValue(bloomFilters);
lruCache.put(key.toString(), cacheValue, cacheValue.getMemorySize());
}
return cacheValue;
@@ -91,40 +90,6 @@ public void put(BloomCacheKeyValue.CacheKey key, BloomCacheKeyValue.CacheValue v
// No impl required.
}

/**
* load datamap from bloomindex file
*/
private BloomCacheKeyValue.CacheValue loadBloomDataMapModel(
BloomCacheKeyValue.CacheKey cacheKey) {
DataInputStream dataInStream = null;
List<CarbonBloomFilter> bloomFilters = new ArrayList<>();
try {
String indexFile = getIndexFileFromCacheKey(cacheKey);
dataInStream = FileFactory.getDataInputStream(indexFile, FileFactory.getFileType(indexFile));
while (dataInStream.available() > 0) {
CarbonBloomFilter bloomFilter = new CarbonBloomFilter();
bloomFilter.readFields(dataInStream);
bloomFilters.add(bloomFilter);
}
LOGGER.info(String.format("Read %d bloom indices from %s", bloomFilters.size(), indexFile));

return new BloomCacheKeyValue.CacheValue(bloomFilters);
} catch (IOException e) {
LOGGER.error(e, "Error occurs while reading bloom index");
throw new RuntimeException("Error occurs while reading bloom index", e);
} finally {
CarbonUtil.closeStreams(dataInStream);
}
}

/**
* get bloom index file name from cachekey
*/
private String getIndexFileFromCacheKey(BloomCacheKeyValue.CacheKey cacheKey) {
return BloomCoarseGrainDataMap
.getBloomIndexFile(cacheKey.getShardPath(), cacheKey.getIndexColumn());
}

@Override
public void clearAccessCount(List<BloomCacheKeyValue.CacheKey> keys) {
}
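Note: get() above now delegates loading to BloomIndexFileStore.loadBloomFilterFromFile, which is not part of the hunks shown in this commit. Judging from the loadBloomDataMapModel method removed above, it presumably keeps the same read loop. A sketch under that assumption (the class name and the path concatenation are illustrative only):

import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.util.CarbonUtil;

import org.apache.hadoop.util.bloom.CarbonBloomFilter;

// Sketch of what BloomIndexFileStore.loadBloomFilterFromFile presumably does,
// reconstructed from the removed loadBloomDataMapModel above; the real
// implementation lives in BloomIndexFileStore and is not shown in this commit.
public class BloomIndexReadSketch {
  public static List<CarbonBloomFilter> loadBloomFilterFromFile(String shardPath, String colName)
      throws IOException {
    // Path construction assumed to match the former getBloomIndexFile helper.
    String indexFile = shardPath + "/" + colName + ".bloomindex";
    List<CarbonBloomFilter> bloomFilters = new ArrayList<>();
    DataInputStream in = null;
    try {
      in = FileFactory.getDataInputStream(indexFile, FileFactory.getFileType(indexFile));
      // The bloom index file is a concatenation of serialized CarbonBloomFilter entries,
      // one per blocklet, so read until the stream is exhausted.
      while (in.available() > 0) {
        CarbonBloomFilter bloomFilter = new CarbonBloomFilter();
        bloomFilter.readFields(in);
        bloomFilters.add(bloomFilter);
      }
      return bloomFilters;
    } finally {
      CarbonUtil.closeStreams(in);
    }
  }
}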
@@ -17,6 +17,8 @@

package org.apache.carbondata.datamap.bloom;

import java.util.Set;

import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.core.datamap.DataMapDistributable;

@@ -27,11 +29,23 @@ class BloomDataMapDistributable extends DataMapDistributable {
*/
private String indexPath;

BloomDataMapDistributable(String indexPath) {
/**
* List of index shards that have already been filtered through the CG index operation.
* This is needed for the merge shard, which cannot be pruned per shard in the
* `toDistributable` function; in other cases it will be set to null.
*/
private Set<String> filteredShards;

BloomDataMapDistributable(String indexPath, Set<String> filteredShards) {
this.indexPath = indexPath;
this.filteredShards = filteredShards;
}

public String getIndexPath() {
return indexPath;
}

public Set<String> getFilteredShards() {
return filteredShards;
}
}
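Note: the new constructor argument lets the driver-side toDistributable() hand the already-filtered shard names to the executor-side getDataMaps(DataMapDistributable), which forwards them through setFilteredShard (see the factory hunks above). A minimal self-contained sketch of that round trip, using a local stand-in class instead of the real BloomDataMapDistributable and a made-up index path:

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Minimal stand-in for BloomDataMapDistributable, used only to show how the
// filtered shard names travel from toDistributable() to getDataMaps(); the
// path and shard names below are hypothetical.
public class DistributableRoundTripSketch {
  static class ShardDistributable {
    private final String indexPath;
    private final Set<String> filteredShards;

    ShardDistributable(String indexPath, Set<String> filteredShards) {
      this.indexPath = indexPath;
      this.filteredShards = filteredShards;
    }

    String getIndexPath() { return indexPath; }
    Set<String> getFilteredShards() { return filteredShards; }
  }

  public static void main(String[] args) {
    // Driver side: one distributable per surviving shard path, carrying the filtered shard names.
    Set<String> filteredShards = new HashSet<>(Arrays.asList("0_0", "1_0"));
    ShardDistributable d =
        new ShardDistributable("/store/db/tbl/dm_bloom/0/mergeShard", filteredShards);

    // Executor side: the datamap would be initialized from the index path and told which
    // shards survived main-datamap pruning (cf. BloomCoarseGrainDataMap.setFilteredShard).
    System.out.println(d.getIndexPath() + " -> " + d.getFilteredShards());
  }
}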
