diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/AbstractBloomDataMapWriter.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/AbstractBloomDataMapWriter.java index c5508fe4715..38234605db5 100644 --- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/AbstractBloomDataMapWriter.java +++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/AbstractBloomDataMapWriter.java @@ -164,7 +164,7 @@ private void initDataMapFile() throws IOException { } } for (int indexColId = 0; indexColId < indexColumns.size(); indexColId++) { - String dmFile = BloomCoarseGrainDataMap.getBloomIndexFile(dataMapPath, + String dmFile = BloomIndexFileStore.getBloomIndexFile(dataMapPath, indexColumns.get(indexColId).getColName()); DataOutputStream dataOutStream = null; try { diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java index 71b1c55c164..27911cabbd3 100644 --- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java +++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java @@ -17,7 +17,6 @@ package org.apache.carbondata.datamap.bloom; -import java.io.File; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.text.DateFormat; @@ -81,11 +80,12 @@ public class BloomCoarseGrainDataMap extends CoarseGrainDataMap { private static final LogService LOGGER = LogServiceFactory.getLogService(BloomCoarseGrainDataMap.class.getName()); - public static final String BLOOM_INDEX_SUFFIX = ".bloomindex"; private Map name2Col; private Cache cache; private String shardName; private Path indexPath; + private Set filteredShard; + private boolean needShardPrune; /** * This is used to convert literal filter value to internal carbon value */ @@ -102,6 +102,13 @@ public void init(DataMapModel dataMapModel) throws IOException { } } + public void setFilteredShard(Set filteredShard) { + this.filteredShard = filteredShard; + // do shard prune when pruning only if bloom index files are merged + this.needShardPrune = filteredShard != null && + shardName.equals(BloomIndexFileStore.MERGE_BLOOM_INDEX_SHARD_NAME); + } + /** * init field converters for index columns */ @@ -182,11 +189,16 @@ public List prune(FilterResolverIntf filterExp, SegmentProperties segm BloomCacheKeyValue.CacheValue cacheValue = cache.get(cacheKey); List bloomIndexList = cacheValue.getBloomFilters(); for (CarbonBloomFilter bloomFilter : bloomIndexList) { + if (needShardPrune && !filteredShard.contains(bloomFilter.getShardName())) { + // skip shard which has been pruned in Main datamap + continue; + } boolean scanRequired = bloomFilter.membershipTest(new Key(bloomQueryModel.filterValue)); if (scanRequired) { LOGGER.debug(String.format("BloomCoarseGrainDataMap: Need to scan -> blocklet#%s", String.valueOf(bloomFilter.getBlockletNo()))); - Blocklet blocklet = new Blocklet(shardName, String.valueOf(bloomFilter.getBlockletNo())); + Blocklet blocklet = new Blocklet(bloomFilter.getShardName(), + String.valueOf(bloomFilter.getBlockletNo())); hitBlocklets.add(blocklet); } else { LOGGER.debug(String.format("BloomCoarseGrainDataMap: Skip scan -> blocklet#%s", @@ -349,14 +361,6 @@ public boolean isScanRequired(FilterResolverIntf filterExp) { public void clear() { } - /** - * get bloom index file - * @param shardPath path for the shard - * @param colName index 
column name - */ - public static String getBloomIndexFile(String shardPath, String colName) { - return shardPath.concat(File.separator).concat(colName).concat(BLOOM_INDEX_SUFFIX); - } static class BloomQueryModel { private String columnName; private byte[] filterValue; diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java index 0d240c448d9..853f49beb97 100644 --- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java +++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java @@ -16,6 +16,7 @@ */ package org.apache.carbondata.datamap.bloom; +import java.io.File; import java.io.IOException; import java.util.*; import java.util.concurrent.ConcurrentHashMap; @@ -31,16 +32,13 @@ import org.apache.carbondata.core.datamap.DataMapDistributable; import org.apache.carbondata.core.datamap.DataMapLevel; import org.apache.carbondata.core.datamap.DataMapMeta; -import org.apache.carbondata.core.datamap.DataMapStoreManager; import org.apache.carbondata.core.datamap.Segment; -import org.apache.carbondata.core.datamap.TableDataMap; import org.apache.carbondata.core.datamap.dev.DataMapBuilder; import org.apache.carbondata.core.datamap.dev.DataMapFactory; import org.apache.carbondata.core.datamap.dev.DataMapWriter; import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap; import org.apache.carbondata.core.datastore.block.SegmentProperties; import org.apache.carbondata.core.datastore.filesystem.CarbonFile; -import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.features.TableOperation; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; @@ -222,24 +220,25 @@ public DataMapBuilder createBuilder(Segment segment, String shardName, @Override public List getDataMaps(Segment segment) throws IOException { - List dataMaps = new ArrayList(1); + List dataMaps = new ArrayList<>(); try { Set shardPaths = segmentMap.get(segment.getSegmentNo()); if (shardPaths == null) { - String dataMapStorePath = DataMapWriter.getDefaultDataMapPath( - getCarbonTable().getTablePath(), segment.getSegmentNo(), dataMapName); - CarbonFile[] carbonFiles = FileFactory.getCarbonFile(dataMapStorePath).listFiles(); - shardPaths = new HashSet<>(); - for (CarbonFile carbonFile : carbonFiles) { - shardPaths.add(carbonFile.getAbsolutePath()); - } + shardPaths = getAllShardPaths(getCarbonTable().getTablePath(), segment.getSegmentNo()); segmentMap.put(segment.getSegmentNo(), shardPaths); } + Set filteredShards = segment.getFilteredIndexShardNames(); for (String shard : shardPaths) { - BloomCoarseGrainDataMap bloomDM = new BloomCoarseGrainDataMap(); - bloomDM.init(new BloomDataMapModel(shard, cache)); - bloomDM.initIndexColumnConverters(getCarbonTable(), dataMapMeta.getIndexedColumns()); - dataMaps.add(bloomDM); + if (shard.endsWith(BloomIndexFileStore.MERGE_BLOOM_INDEX_SHARD_NAME) || + filteredShards.contains(new File(shard).getName())) { + // Filter out the tasks which are filtered through Main datamap. 
+ // for merge shard, shard pruning delay to be done before pruning blocklet + BloomCoarseGrainDataMap bloomDM = new BloomCoarseGrainDataMap(); + bloomDM.init(new BloomDataMapModel(shard, cache)); + bloomDM.initIndexColumnConverters(getCarbonTable(), dataMapMeta.getIndexedColumns()); + bloomDM.setFilteredShard(filteredShards); + dataMaps.add(bloomDM); + } } } catch (Exception e) { throw new IOException("Error occurs while init Bloom DataMap", e); @@ -247,77 +246,67 @@ public List getDataMaps(Segment segment) throws IOException return dataMaps; } + /** + * returns all shard directories of bloom index files for query + * if bloom index files are merged we should get only one shard path + */ + private Set getAllShardPaths(String tablePath, String segmentId) { + String dataMapStorePath = CarbonTablePath.getDataMapStorePath( + tablePath, segmentId, dataMapName); + CarbonFile[] carbonFiles = FileFactory.getCarbonFile(dataMapStorePath).listFiles(); + Set shardPaths = new HashSet<>(); + boolean mergeShardInprogress = false; + CarbonFile mergeShardFile = null; + for (CarbonFile carbonFile : carbonFiles) { + if (carbonFile.getName().equals(BloomIndexFileStore.MERGE_BLOOM_INDEX_SHARD_NAME)) { + mergeShardFile = carbonFile; + } else if (carbonFile.getName().equals(BloomIndexFileStore.MERGE_INPROGRESS_FILE)) { + mergeShardInprogress = true; + } else if (carbonFile.isDirectory()) { + shardPaths.add(carbonFile.getAbsolutePath()); + } + } + if (mergeShardFile != null && !mergeShardInprogress) { + // should only get one shard path if mergeShard is generated successfully + shardPaths.clear(); + shardPaths.add(mergeShardFile.getAbsolutePath()); + } + return shardPaths; + } + @Override public List getDataMaps(DataMapDistributable distributable) throws IOException { - List coarseGrainDataMaps = new ArrayList<>(); - BloomCoarseGrainDataMap bloomCoarseGrainDataMap = new BloomCoarseGrainDataMap(); + List dataMaps = new ArrayList<>(); String indexPath = ((BloomDataMapDistributable) distributable).getIndexPath(); - bloomCoarseGrainDataMap.init(new BloomDataMapModel(indexPath, cache)); - bloomCoarseGrainDataMap.initIndexColumnConverters(getCarbonTable(), - dataMapMeta.getIndexedColumns()); - coarseGrainDataMaps.add(bloomCoarseGrainDataMap); - return coarseGrainDataMaps; + Set filteredShards = ((BloomDataMapDistributable) distributable).getFilteredShards(); + BloomCoarseGrainDataMap bloomDM = new BloomCoarseGrainDataMap(); + bloomDM.init(new BloomDataMapModel(indexPath, cache)); + bloomDM.initIndexColumnConverters(getCarbonTable(), dataMapMeta.getIndexedColumns()); + bloomDM.setFilteredShard(filteredShards); + dataMaps.add(bloomDM); + return dataMaps; } - /** - * returns all the directories of lucene index files for query - * Note: copied from luceneDataMapFactory, will extract to a common interface - */ - private CarbonFile[] getAllIndexDirs(String tablePath, String segmentId) { - List indexDirs = new ArrayList<>(); - List dataMaps; - try { - // there can be multiple bloom datamaps present on a table, so get all datamaps and form - // the path till the index file directories in all datamaps folders present in each segment - dataMaps = DataMapStoreManager.getInstance().getAllDataMap(getCarbonTable()); - } catch (IOException ex) { - LOGGER.error(ex, String.format("failed to get datamaps for tablePath %s, segmentId %s", - tablePath, segmentId)); - throw new RuntimeException(ex); - } - if (dataMaps.size() > 0) { - for (TableDataMap dataMap : dataMaps) { - if 
(dataMap.getDataMapSchema().getDataMapName().equals(this.dataMapName)) { - List indexFiles; - String dmPath = CarbonTablePath.getDataMapStorePath(tablePath, segmentId, - dataMap.getDataMapSchema().getDataMapName()); - FileFactory.FileType fileType = FileFactory.getFileType(dmPath); - final CarbonFile dirPath = FileFactory.getCarbonFile(dmPath, fileType); - indexFiles = Arrays.asList(dirPath.listFiles(new CarbonFileFilter() { - @Override - public boolean accept(CarbonFile file) { - return file.isDirectory(); - } - })); - indexDirs.addAll(indexFiles); - } - } - } - return indexDirs.toArray(new CarbonFile[0]); - } @Override public List toDistributable(Segment segment) { List dataMapDistributableList = new ArrayList<>(); - CarbonFile[] indexDirs = - getAllIndexDirs(getCarbonTable().getTablePath(), segment.getSegmentNo()); - if (segment.getFilteredIndexShardNames().size() == 0) { - for (CarbonFile indexDir : indexDirs) { - DataMapDistributable bloomDataMapDistributable = new BloomDataMapDistributable( - indexDir.getAbsolutePath()); - dataMapDistributableList.add(bloomDataMapDistributable); - } - return dataMapDistributableList; + Set shardPaths = segmentMap.get(segment.getSegmentNo()); + if (shardPaths == null) { + shardPaths = getAllShardPaths(getCarbonTable().getTablePath(), segment.getSegmentNo()); + segmentMap.put(segment.getSegmentNo(), shardPaths); } - for (CarbonFile indexDir : indexDirs) { - // Filter out the tasks which are filtered through CG datamap. - if (!segment.getFilteredIndexShardNames().contains(indexDir.getName())) { - continue; + Set filteredShards = segment.getFilteredIndexShardNames(); + for (String shardPath : shardPaths) { + // Filter out the tasks which are filtered through Main datamap. + // for merge shard, shard pruning delay to be done before pruning blocklet + if (shardPath.endsWith(BloomIndexFileStore.MERGE_BLOOM_INDEX_SHARD_NAME) || + filteredShards.contains(new File(shardPath).getName())) { + DataMapDistributable bloomDataMapDistributable = + new BloomDataMapDistributable(shardPath, filteredShards); + dataMapDistributableList.add(bloomDataMapDistributable); } - DataMapDistributable bloomDataMapDistributable = new BloomDataMapDistributable( - indexDir.getAbsolutePath()); - dataMapDistributableList.add(bloomDataMapDistributable); } return dataMapDistributableList; } diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapCache.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapCache.java index c1e625156dd..4063c2e79ee 100644 --- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapCache.java +++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapCache.java @@ -16,7 +16,6 @@ */ package org.apache.carbondata.datamap.bloom; -import java.io.DataInputStream; import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -26,9 +25,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.cache.Cache; import org.apache.carbondata.core.cache.CarbonLRUCache; -import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.memory.MemoryException; -import org.apache.carbondata.core.util.CarbonUtil; import org.apache.hadoop.util.bloom.CarbonBloomFilter; @@ -58,7 +55,9 @@ public BloomCacheKeyValue.CacheValue get(BloomCacheKeyValue.CacheKey key) throws IOException { BloomCacheKeyValue.CacheValue cacheValue = getIfPresent(key); if (cacheValue == null) { - cacheValue = 
loadBloomDataMapModel(key); + List bloomFilters = + BloomIndexFileStore.loadBloomFilterFromFile(key.getShardPath(), key.getIndexColumn()); + cacheValue = new BloomCacheKeyValue.CacheValue(bloomFilters); lruCache.put(key.toString(), cacheValue, cacheValue.getMemorySize()); } return cacheValue; @@ -91,40 +90,6 @@ public void put(BloomCacheKeyValue.CacheKey key, BloomCacheKeyValue.CacheValue v // No impl required. } - /** - * load datamap from bloomindex file - */ - private BloomCacheKeyValue.CacheValue loadBloomDataMapModel( - BloomCacheKeyValue.CacheKey cacheKey) { - DataInputStream dataInStream = null; - List bloomFilters = new ArrayList<>(); - try { - String indexFile = getIndexFileFromCacheKey(cacheKey); - dataInStream = FileFactory.getDataInputStream(indexFile, FileFactory.getFileType(indexFile)); - while (dataInStream.available() > 0) { - CarbonBloomFilter bloomFilter = new CarbonBloomFilter(); - bloomFilter.readFields(dataInStream); - bloomFilters.add(bloomFilter); - } - LOGGER.info(String.format("Read %d bloom indices from %s", bloomFilters.size(), indexFile)); - - return new BloomCacheKeyValue.CacheValue(bloomFilters); - } catch (IOException e) { - LOGGER.error(e, "Error occurs while reading bloom index"); - throw new RuntimeException("Error occurs while reading bloom index", e); - } finally { - CarbonUtil.closeStreams(dataInStream); - } - } - - /** - * get bloom index file name from cachekey - */ - private String getIndexFileFromCacheKey(BloomCacheKeyValue.CacheKey cacheKey) { - return BloomCoarseGrainDataMap - .getBloomIndexFile(cacheKey.getShardPath(), cacheKey.getIndexColumn()); - } - @Override public void clearAccessCount(List keys) { } diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapDistributable.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapDistributable.java index 86d69328dbb..b0a7ace1a6d 100644 --- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapDistributable.java +++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapDistributable.java @@ -17,6 +17,8 @@ package org.apache.carbondata.datamap.bloom; +import java.util.Set; + import org.apache.carbondata.common.annotations.InterfaceAudience; import org.apache.carbondata.core.datamap.DataMapDistributable; @@ -27,11 +29,23 @@ class BloomDataMapDistributable extends DataMapDistributable { */ private String indexPath; - BloomDataMapDistributable(String indexPath) { + /** + * List of index shards which are already got filtered through CG index operation. + * This is used for merge shard which cannot prune shard in `toDistributable` function. + * Other case will be set to Null + */ + private Set filteredShards; + + BloomDataMapDistributable(String indexPath, Set filteredShards) { this.indexPath = indexPath; + this.filteredShards = filteredShards; } public String getIndexPath() { return indexPath; } + + public Set getFilteredShards() { + return filteredShards; + } } diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomIndexFileStore.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomIndexFileStore.java new file mode 100644 index 00000000000..2abdc3f9aa9 --- /dev/null +++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomIndexFileStore.java @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.datamap.bloom; + +import java.io.*; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.List; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.datastore.filesystem.CarbonFile; +import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.util.CarbonUtil; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.util.bloom.CarbonBloomFilter; + +/** + * This class works for merging and loading bloom index + */ +@InterfaceAudience.Internal +public class BloomIndexFileStore { + + private static final LogService LOGGER = + LogServiceFactory.getLogService(BloomIndexFileStore.class.getName()); + + // suffix of original generated file + public static final String BLOOM_INDEX_SUFFIX = ".bloomindex"; + // suffix of merged bloom index file + public static final String MERGE_BLOOM_INDEX_SUFFIX = ".bloomindexmerge"; + // directory to store merged bloom index files + public static final String MERGE_BLOOM_INDEX_SHARD_NAME = "mergeShard"; + /** + * flag file for merging + * if flag file exists, query won't use mergeShard + * if flag file not exists and mergeShard generated, query will use mergeShard + */ + public static final String MERGE_INPROGRESS_FILE = "mergeShard.inprogress"; + + + public static void mergeBloomIndexFile(String dmSegmentPathString, List indexCols) { + // get all shard paths of old store + CarbonFile segmentPath = FileFactory.getCarbonFile(dmSegmentPathString, + FileFactory.getFileType(dmSegmentPathString)); + CarbonFile[] shardPaths = segmentPath.listFiles(new CarbonFileFilter() { + @Override + public boolean accept(CarbonFile file) { + return file.isDirectory() && !file.getName().equals(MERGE_BLOOM_INDEX_SHARD_NAME); + } + }); + + String mergeShardPath = dmSegmentPathString + File.separator + MERGE_BLOOM_INDEX_SHARD_NAME; + String mergeInprogressFile = dmSegmentPathString + File.separator + MERGE_INPROGRESS_FILE; + try { + // delete mergeShard folder if exists + if (FileFactory.isFileExist(mergeShardPath)) { + FileFactory.deleteFile(mergeShardPath, FileFactory.getFileType(mergeShardPath)); + } + // create flag file before creating mergeShard folder + if (!FileFactory.isFileExist(mergeInprogressFile)) { + FileFactory.createNewFile( + mergeInprogressFile, FileFactory.getFileType(mergeInprogressFile)); + } + // create mergeShard output folder + if (!FileFactory.mkdirs(mergeShardPath, FileFactory.getFileType(mergeShardPath))) { + throw new RuntimeException("Failed to create directory " + mergeShardPath); + } + } catch (IOException e) { + LOGGER.error(e, 
"Error occurs while create directory " + mergeShardPath); + throw new RuntimeException("Error occurs while create directory " + mergeShardPath); + } + + // for each index column, merge the bloomindex files from all shards into one + for (String indexCol: indexCols) { + String mergeIndexFile = getMergeBloomIndexFile(mergeShardPath, indexCol); + DataInputStream dataInputStream = null; + DataOutputStream dataOutputStream = null; + try { + FileFactory.createNewFile(mergeIndexFile, FileFactory.getFileType(mergeIndexFile)); + dataOutputStream = FileFactory.getDataOutputStream( + mergeIndexFile, FileFactory.getFileType(mergeIndexFile)); + for (CarbonFile shardPath : shardPaths) { + String bloomIndexFile = getBloomIndexFile(shardPath.getCanonicalPath(), indexCol); + dataInputStream = FileFactory.getDataInputStream( + bloomIndexFile, FileFactory.getFileType(bloomIndexFile)); + byte[] fileData = new byte[(int) FileFactory.getCarbonFile(bloomIndexFile).getSize()]; + dataInputStream.readFully(fileData); + byte[] shardName = shardPath.getName().getBytes(Charset.forName("UTF-8")); + dataOutputStream.writeInt(shardName.length); + dataOutputStream.write(shardName); + dataOutputStream.writeInt(fileData.length); + dataOutputStream.write(fileData); + CarbonUtil.closeStream(dataInputStream); + } + } catch (IOException e) { + LOGGER.error(e, "Error occurs while merge bloom index file of column: " + indexCol); + // delete merge shard of bloom index for this segment when failed + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(mergeShardPath)); + throw new RuntimeException( + "Error occurs while merge bloom index file of column: " + indexCol); + } finally { + CarbonUtil.closeStreams(dataInputStream, dataOutputStream); + } + } + // delete flag file and mergeShard can be used + try { + FileFactory.deleteFile(mergeInprogressFile, FileFactory.getFileType(mergeInprogressFile)); + } catch (IOException e) { + LOGGER.error(e, "Error occurs while deleting file " + mergeInprogressFile); + throw new RuntimeException("Error occurs while deleting file " + mergeInprogressFile); + } + // remove old store + for (CarbonFile shardpath: shardPaths) { + FileFactory.deleteAllCarbonFilesOfDir(shardpath); + } + } + + /** + * load bloom filter from bloom index file + */ + public static List loadBloomFilterFromFile( + String shardPath, String colName) { + if (shardPath.endsWith(MERGE_BLOOM_INDEX_SHARD_NAME)) { + return loadMergeBloomIndex(shardPath, colName); + } else { + return loadBloomIndex(shardPath, colName); + } + } + + /** + * load bloom filter of {@code colName} from {@code shardPath} + */ + public static List loadBloomIndex( + String shardPath, String colName) { + DataInputStream dataInStream = null; + List bloomFilters = new ArrayList<>(); + try { + String indexFile = getBloomIndexFile(shardPath, colName); + dataInStream = FileFactory.getDataInputStream(indexFile, FileFactory.getFileType(indexFile)); + while (dataInStream.available() > 0) { + CarbonBloomFilter bloomFilter = new CarbonBloomFilter(); + bloomFilter.readFields(dataInStream); + bloomFilter.setShardName(new Path(shardPath).getName()); + bloomFilters.add(bloomFilter); + } + LOGGER.info(String.format("Read %d bloom indices from %s", bloomFilters.size(), indexFile)); + + return bloomFilters; + } catch (IOException e) { + LOGGER.error(e, "Error occurs while reading bloom index"); + throw new RuntimeException("Error occurs while reading bloom index", e); + } finally { + CarbonUtil.closeStreams(dataInStream); + } + } + + + /** + * load bloom filter of 
{@code colName} from {@code mergeShardPath} + */ + public static List loadMergeBloomIndex( + String mergeShardPath, String colName) { + String mergeIndexFile = getMergeBloomIndexFile(mergeShardPath, colName); + DataInputStream mergeIndexInStream = null; + List bloomFilters = new ArrayList<>(); + try { + mergeIndexInStream = FileFactory.getDataInputStream( + mergeIndexFile, FileFactory.getFileType(mergeIndexFile)); + while (mergeIndexInStream.available() > 0) { + // read shard name + int shardNameByteLength = mergeIndexInStream.readInt(); + byte[] shardNameBytes = new byte[shardNameByteLength]; + mergeIndexInStream.readFully(shardNameBytes); + String shardName = new String(shardNameBytes, Charset.forName("UTF-8")); + // read bloom index file data + int indexFileByteLength = mergeIndexInStream.readInt(); + byte[] indexFileBytes = new byte[indexFileByteLength]; + mergeIndexInStream.readFully(indexFileBytes); + // warp byte array as input stream to get bloom filters + ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(indexFileBytes); + DataInputStream indexDataInStream = new DataInputStream(byteArrayInputStream); + while (indexDataInStream.available() > 0) { + CarbonBloomFilter bloomFilter = new CarbonBloomFilter(); + bloomFilter.readFields(indexDataInStream); + bloomFilter.setShardName(shardName); + bloomFilters.add(bloomFilter); + } + } + LOGGER.info( + String.format("Read %d bloom indices from %s", bloomFilters.size(), mergeIndexFile)); + return bloomFilters; + } catch (IOException e) { + LOGGER.error(e, "Error occurs while reading merge bloom index"); + throw new RuntimeException("Error occurs while reading merge bloom index", e); + } finally { + CarbonUtil.closeStreams(mergeIndexInStream); + } + } + + /** + * get bloom index file + */ + public static String getBloomIndexFile(String shardPath, String colName) { + return shardPath.concat(File.separator).concat(colName).concat(BLOOM_INDEX_SUFFIX); + } + + /** + * get merge bloom index file + */ + public static String getMergeBloomIndexFile(String mergeShardPath, String colName) { + return mergeShardPath.concat(File.separator).concat(colName).concat(MERGE_BLOOM_INDEX_SUFFIX); + } +} diff --git a/datamap/bloom/src/main/java/org/apache/hadoop/util/bloom/CarbonBloomFilter.java b/datamap/bloom/src/main/java/org/apache/hadoop/util/bloom/CarbonBloomFilter.java index c6a62cc248f..4b111dfd232 100644 --- a/datamap/bloom/src/main/java/org/apache/hadoop/util/bloom/CarbonBloomFilter.java +++ b/datamap/bloom/src/main/java/org/apache/hadoop/util/bloom/CarbonBloomFilter.java @@ -36,6 +36,9 @@ public class CarbonBloomFilter extends BloomFilter { private int blockletNo; + // used for building blocklet when query + private String shardName; + public CarbonBloomFilter() { } @@ -166,4 +169,13 @@ public void setBlockletNo(int blockletNo) { public int getBlockletNo() { return blockletNo; } + + public String getShardName() { + return shardName; + } + + public void setShardName(String shardName) { + this.shardName = shardName; + } + } diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/DataMapEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/DataMapEvents.scala index e6016336aa8..503729a2015 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/DataMapEvents.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/DataMapEvents.scala @@ -60,7 +60,7 @@ case class BuildDataMapPreExecutionEvent(sparkSession: SparkSession, * example: 
bloom datamap, Lucene datamap */ case class BuildDataMapPostExecutionEvent(sparkSession: SparkSession, - identifier: AbsoluteTableIdentifier) + identifier: AbsoluteTableIdentifier, segmentIdList: Seq[String], isFromRebuild: Boolean) extends Event with TableEventInfo /** diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/CarbonMergeBloomIndexFilesRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/CarbonMergeBloomIndexFilesRDD.scala new file mode 100644 index 00000000000..78f2afc35f6 --- /dev/null +++ b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/CarbonMergeBloomIndexFilesRDD.scala @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.datamap + +import scala.collection.JavaConverters._ + +import org.apache.spark.Partition +import org.apache.spark.rdd.CarbonMergeFilePartition +import org.apache.spark.SparkContext +import org.apache.spark.TaskContext + +import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.datamap.bloom.BloomIndexFileStore +import org.apache.carbondata.spark.rdd.CarbonRDD + + +/** + * RDD to merge all bloomindex files of specified segment for bloom datamap. 
+ * + * @param sc + * @param carbonTable + * @param segmentIds segments to be merged + * @param bloomDatamapNames list of bloom datamap + * @param bloomIndexColumns list of index columns correspond to datamap + */ +class CarbonMergeBloomIndexFilesRDD( + sc: SparkContext, + carbonTable: CarbonTable, + segmentIds: Seq[String], + bloomDatamapNames: Seq[String], + bloomIndexColumns: Seq[Seq[String]]) + extends CarbonRDD[String](sc, Nil, sc.hadoopConfiguration) { + + override def getPartitions: Array[Partition] = { + segmentIds.zipWithIndex.map {s => + CarbonMergeFilePartition(id, s._2, s._1) + }.toArray + } + + override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[String] = { + val tablePath = carbonTable.getTablePath + val split = theSplit.asInstanceOf[CarbonMergeFilePartition] + logInfo("Merging bloom index files of " + + s"segment ${split.segmentId} for ${carbonTable.getTableName}") + + bloomDatamapNames.zipWithIndex.map( dm => { + val dmSegmentPath = CarbonTablePath.getDataMapStorePath( + tablePath, split.segmentId, dm._1) + BloomIndexFileStore.mergeBloomIndexFile(dmSegmentPath, bloomIndexColumns(dm._2).asJava) + }) + + val iter = new Iterator[String] { + var havePair = false + var finished = false + + override def hasNext: Boolean = { + if (!finished && !havePair) { + finished = true + havePair = !finished + } + !finished + } + + override def next(): String = { + if (!hasNext) { + throw new java.util.NoSuchElementException("End of stream") + } + havePair = false + "" + } + } + iter + } + +} + diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala index f92ed6c510c..a6ff40e1563 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala @@ -133,7 +133,7 @@ object IndexDataMapRebuildRDD { } val buildDataMapPostExecutionEvent = new BuildDataMapPostExecutionEvent(sparkSession, - tableIdentifier) + tableIdentifier, validSegments.asScala.map(_.getSegmentNo), true) OperationListenerBus.getInstance().fireEvent(buildDataMapPostExecutionEvent, operationContext) } } diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala index fcc649eae8e..710b6bae007 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala @@ -285,9 +285,9 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel, OperationListenerBus.getInstance() .fireEvent(compactionLoadStatusPostEvent, operationContext) if (null != tableDataMaps) { - val buildDataMapPostExecutionEvent: BuildDataMapPostExecutionEvent = - new BuildDataMapPostExecutionEvent(sqlContext.sparkSession, - carbonTable.getAbsoluteTableIdentifier) + val buildDataMapPostExecutionEvent = new BuildDataMapPostExecutionEvent( + sqlContext.sparkSession, carbonTable.getAbsoluteTableIdentifier, + Seq(carbonLoadModel.getSegmentId), true) OperationListenerBus.getInstance() .fireEvent(buildDataMapPostExecutionEvent, dataMapOperationContext) } diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala 
b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala index 7f268883c1d..997fefa3d1c 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala @@ -17,19 +17,15 @@ package org.apache.spark.sql -import java.io.File import java.util.concurrent.ConcurrentHashMap -import scala.util.Try - import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.NoSuchTableException import org.apache.spark.sql.catalyst.catalog.SessionCatalog -import org.apache.spark.sql.events.MergeIndexEventListener +import org.apache.spark.sql.events.{MergeBloomIndexEventListener, MergeIndexEventListener} import org.apache.spark.sql.execution.command.preaaggregate._ import org.apache.spark.sql.execution.command.timeseries.TimeSeriesFunction import org.apache.spark.sql.hive._ -import org.apache.spark.util.CarbonReflectionUtils import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants @@ -185,6 +181,7 @@ object CarbonEnv { .addListener(classOf[LoadTablePostExecutionEvent], new MergeIndexEventListener) .addListener(classOf[AlterTableCompactionPostEvent], new MergeIndexEventListener) .addListener(classOf[AlterTableMergeIndexEvent], new MergeIndexEventListener) + .addListener(classOf[BuildDataMapPostExecutionEvent], new MergeBloomIndexEventListener) } /** diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeBloomIndexEventListener.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeBloomIndexEventListener.scala new file mode 100644 index 00000000000..7d65340b36f --- /dev/null +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeBloomIndexEventListener.scala @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.events + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SparkSession + +import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} +import org.apache.carbondata.core.datamap.DataMapStoreManager +import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider +import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.datamap.CarbonMergeBloomIndexFilesRDD +import org.apache.carbondata.events._ + +class MergeBloomIndexEventListener extends OperationEventListener with Logging { + val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + + override def onEvent(event: Event, operationContext: OperationContext): Unit = { + event match { + case datamapPostEvent: BuildDataMapPostExecutionEvent => + LOGGER.audit("Load post status event-listener called for merge bloom index") + val carbonTableIdentifier = datamapPostEvent.identifier + val carbonTable = DataMapStoreManager.getInstance().getCarbonTable(carbonTableIdentifier) + val tableDataMaps = DataMapStoreManager.getInstance().getAllDataMap(carbonTable) + val sparkSession = SparkSession.getActiveSession.get + + // filter out bloom datamap + var bloomDatamaps = tableDataMaps.asScala.filter( + _.getDataMapSchema.getProviderName.equalsIgnoreCase( + DataMapClassProvider.BLOOMFILTER.getShortName)) + + // for load process, filter lazy datamap + if (!datamapPostEvent.isFromRebuild) { + bloomDatamaps = bloomDatamaps.filter(!_.getDataMapSchema.isLazy) + } + + val segmentIds = datamapPostEvent.segmentIdList + if (bloomDatamaps.size > 0 && segmentIds.size > 0) { + // we extract bloom datamap name and index columns here + // because TableDataMap is not serializable + val bloomDMnames = ListBuffer.empty[String] + val bloomIndexColumns = ListBuffer.empty[Seq[String]] + bloomDatamaps.foreach( dm => { + bloomDMnames += dm.getDataMapSchema.getDataMapName + bloomIndexColumns += dm.getDataMapSchema.getIndexColumns.map(_.trim.toLowerCase) + }) + new CarbonMergeBloomIndexFilesRDD(sparkSession.sparkContext, carbonTable, + segmentIds, bloomDMnames, bloomIndexColumns).collect() + } + } + } + + + private def clearBloomCache(carbonTable: CarbonTable, segmentIds: Seq[String]): Unit = { + DataMapStoreManager.getInstance.clearDataMaps(carbonTable.getTableUniqueName) + } + +} diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala index 5bff9aa3004..9716fe971bc 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala @@ -46,9 +46,7 @@ class MergeIndexEventListener extends OperationEventListener with Logging { event match { case preStatusUpdateEvent: LoadTablePostExecutionEvent => LOGGER.audit("Load post status event-listener called for merge index") - val loadTablePreStatusUpdateEvent = event.asInstanceOf[LoadTablePostExecutionEvent] - val carbonTableIdentifier = loadTablePreStatusUpdateEvent.getCarbonTableIdentifier - val loadModel = loadTablePreStatusUpdateEvent.getCarbonLoadModel + val loadModel = preStatusUpdateEvent.getCarbonLoadModel val carbonTable = loadModel.getCarbonDataLoadSchema.getCarbonTable val compactedSegments = 
loadModel.getMergedSegmentIds val sparkSession = SparkSession.getActiveSession.get @@ -74,7 +72,6 @@ class MergeIndexEventListener extends OperationEventListener with Logging { } case alterTableCompactionPostEvent: AlterTableCompactionPostEvent => LOGGER.audit("Merge index for compaction called") - val alterTableCompactionPostEvent = event.asInstanceOf[AlterTableCompactionPostEvent] val carbonTable = alterTableCompactionPostEvent.carbonTable val mergedLoads = alterTableCompactionPostEvent.compactedLoads val sparkContext = alterTableCompactionPostEvent.sparkSession.sparkContext @@ -82,11 +79,9 @@ class MergeIndexEventListener extends OperationEventListener with Logging { mergeIndexFilesForCompactedSegments(sparkContext, carbonTable, mergedLoads) } case alterTableMergeIndexEvent: AlterTableMergeIndexEvent => - val exceptionEvent = event.asInstanceOf[AlterTableMergeIndexEvent] - val alterTableModel = exceptionEvent.alterTableModel - val carbonMainTable = exceptionEvent.carbonTable - val compactionType = alterTableModel.compactionType - val sparkSession = exceptionEvent.sparkSession + val alterTableModel = alterTableMergeIndexEvent.alterTableModel + val carbonMainTable = alterTableMergeIndexEvent.carbonTable + val sparkSession = alterTableMergeIndexEvent.sparkSession if (!carbonMainTable.isStreamingSink) { LOGGER.audit(s"Compaction request received for table " + s"${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName }") @@ -132,8 +127,8 @@ class MergeIndexEventListener extends OperationEventListener with Logging { readFileFooterFromCarbonDataFile = true) // clear Block dataMap Cache clearBlockDataMapCache(carbonMainTable, validSegmentIds) - val requestMessage = "Compaction request completed for table " - s"${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName }" + val requestMessage = "Compaction request completed for table " + + s"${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName }" LOGGER.audit(requestMessage) LOGGER.info(requestMessage) } else { @@ -181,7 +176,7 @@ class MergeIndexEventListener extends OperationEventListener with Logging { // So, it is enough to do merge index only for 0.2 as it is the only valid segment in this list val validMergedSegIds = validSegments .filter { seg => mergedSegmentIds.contains(seg.getSegmentNo) }.map(_.getSegmentNo) - if (null != validMergedSegIds && !validMergedSegIds.isEmpty) { + if (null != validMergedSegIds && validMergedSegIds.nonEmpty) { CommonUtil.mergeIndexFiles(sparkContext, validMergedSegIds, segmentFileNameMap, diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala index d50aea1b2d0..66534df339c 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala @@ -244,7 +244,7 @@ case class CarbonLoadDataCommand( // Add pre event listener for index datamap val tableDataMaps = DataMapStoreManager.getInstance().getAllDataMap(table) val dataMapOperationContext = new OperationContext() - if (null != tableDataMaps) { + if (tableDataMaps.size() > 0) { val dataMapNames: mutable.Buffer[String] = tableDataMaps.asScala.map(dataMap => dataMap.getDataMapSchema.getDataMapName) val buildDataMapPreExecutionEvent: BuildDataMapPreExecutionEvent = @@ -321,9 
+321,9 @@ case class CarbonLoadDataCommand( table.getCarbonTableIdentifier, carbonLoadModel) OperationListenerBus.getInstance.fireEvent(loadTablePostExecutionEvent, operationContext) - if (null != tableDataMaps) { - val buildDataMapPostExecutionEvent: BuildDataMapPostExecutionEvent = - BuildDataMapPostExecutionEvent(sparkSession, table.getAbsoluteTableIdentifier) + if (tableDataMaps.size() > 0) { + val buildDataMapPostExecutionEvent = BuildDataMapPostExecutionEvent(sparkSession, + table.getAbsoluteTableIdentifier, Seq(carbonLoadModel.getSegmentId), false) OperationListenerBus.getInstance() .fireEvent(buildDataMapPostExecutionEvent, dataMapOperationContext) } diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFunctionSuite.scala b/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFunctionSuite.scala index fd1345cc6b0..254475f3ccc 100644 --- a/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFunctionSuite.scala +++ b/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFunctionSuite.scala @@ -780,7 +780,7 @@ class BloomCoarseGrainDataMapFunctionSuite extends QueryTest with BeforeAndAfte import scala.collection.JavaConverters._ (0 to 1).foreach { segId => val datamapPath = CarbonTablePath.getDataMapStorePath(carbonTable.getTablePath, segId.toString, dataMapName) - assert(FileUtils.listFiles(FileUtils.getFile(datamapPath), Array("bloomindex"), true).asScala.nonEmpty) + assert(FileUtils.listFiles(FileUtils.getFile(datamapPath), Array("bloomindexmerge"), true).asScala.nonEmpty) } // delete and clean the first segment, the corresponding datamap files should be cleaned too sql(s"DELETE FROM TABLE $bloomDMSampleTable WHERE SEGMENT.ID IN (0)") @@ -788,7 +788,7 @@ class BloomCoarseGrainDataMapFunctionSuite extends QueryTest with BeforeAndAfte var datamapPath = CarbonTablePath.getDataMapStorePath(carbonTable.getTablePath, "0", dataMapName) assert(!FileUtils.getFile(datamapPath).exists(), "index file of this segment has been deleted, should not exist") datamapPath = CarbonTablePath.getDataMapStorePath(carbonTable.getTablePath, "1", dataMapName) - assert(FileUtils.listFiles(FileUtils.getFile(datamapPath), Array("bloomindex"), true).asScala.nonEmpty) + assert(FileUtils.listFiles(FileUtils.getFile(datamapPath), Array("bloomindexmerge"), true).asScala.nonEmpty) } // two blocklets in one block are hit by bloom datamap while block cache level hit this block
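
Reviewer note on the merged bloom index layout: per this patch, `BloomIndexFileStore.mergeBloomIndexFile` writes, for each index column, one `<column>.bloomindexmerge` file under the `mergeShard` directory. That file is a concatenation of one record per original shard: a 4-byte shard-name length, the UTF-8 shard name, a 4-byte payload length, and the raw bytes of that shard's original `.bloomindex` file. The `mergeShard.inprogress` flag file is created before the merged shard is written and deleted only after every column merges successfully, so a query that still sees the flag falls back to the per-task shard directories. The sketch below is a minimal, standalone reader for that record layout, intended only to illustrate the format; it uses a plain local `DataInputStream` instead of CarbonData's `FileFactory`, and the class and method names are illustrative, not part of this patch.

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Illustrative reader for the merged bloom index layout introduced in this PR:
// repeated records of [shardNameLength][shardNameBytes][indexFileLength][indexFileBytes].
public class MergeBloomIndexLayoutSketch {

  public static void dump(String mergeIndexFilePath) throws IOException {
    try (DataInputStream in = new DataInputStream(new FileInputStream(mergeIndexFilePath))) {
      while (in.available() > 0) {
        // read the name of the shard that produced this chunk of bloom filters
        int shardNameLen = in.readInt();
        byte[] shardNameBytes = new byte[shardNameLen];
        in.readFully(shardNameBytes);
        String shardName = new String(shardNameBytes, StandardCharsets.UTF_8);

        // read the original .bloomindex file content for that shard
        int payloadLen = in.readInt();
        byte[] payload = new byte[payloadLen];
        in.readFully(payload);

        // the payload is itself a sequence of serialized CarbonBloomFilter objects;
        // here we only report its size rather than deserializing it
        try (DataInputStream chunk = new DataInputStream(new ByteArrayInputStream(payload))) {
          System.out.printf("shard=%s, bloom index bytes=%d%n", shardName, chunk.available());
        }
      }
    }
  }
}

This mirrors the read path in `BloomIndexFileStore.loadMergeBloomIndex`, which additionally calls `CarbonBloomFilter.readFields` on each payload and tags every filter with its shard name so that blocklet pruning can still report the correct shard after the per-task shard directories have been deleted.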