From d14c403f6282ca8b574dae2fa5ab77caa5cf3c18 Mon Sep 17 00:00:00 2001 From: xuchuanyin Date: Fri, 11 May 2018 21:49:43 +0800 Subject: [PATCH] [CARBONDATA-2459][DataMap] Add cache for bloom filter datamap Loading bloom filter from bloomindex file is slow. Adding cache for this procedure will surely improve the query performance This closes #2300 --- .../core/constants/CarbonCommonConstants.java | 13 + .../bloom/BloomCoarseGrainDataMap.java | 108 +++----- .../bloom/BloomCoarseGrainDataMapFactory.java | 4 + .../datamap/bloom/BloomDataMapCache.java | 232 ++++++++++++++++++ .../datamap/bloom/BloomDataMapWriter.java | 5 +- 5 files changed, 283 insertions(+), 79 deletions(-) create mode 100644 datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapCache.java diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java index 56607b9a81d..f3a821b5e74 100644 --- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java @@ -1729,6 +1729,19 @@ public final class CarbonCommonConstants { // Property to enable parallel datamap loading for a table public static final String CARBON_LOAD_DATAMAPS_PARALLEL = "carbon.load.datamaps.parallel."; + /** + * Cache size in MB for bloom filter datamap. It is an integer and should be greater than 0 + * and it will be used during query. + */ + @CarbonProperty + public static final String CARBON_QUERY_DATAMAP_BLOOM_CACHE_SIZE = + "carbon.query.datamap.bloom.cache.size"; + + /** + * default value in size for cache size of bloom filter datamap. 
+ */ + public static final String CARBON_QUERY_DATAMAP_BLOOM_CACHE_SIZE_DEFAULT_VAL = "512"; + private CarbonCommonConstants() { } } diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java index 725d5cd51df..09de25e95c0 100644 --- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java +++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java @@ -17,13 +17,10 @@ package org.apache.carbondata.datamap.bloom; -import java.io.DataInputStream; -import java.io.EOFException; +import java.io.File; import java.io.IOException; -import java.io.ObjectInputStream; import java.io.UnsupportedEncodingException; import java.util.ArrayList; -import java.util.HashSet; import java.util.List; import java.util.Set; @@ -45,13 +42,8 @@ import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; import org.apache.carbondata.core.util.CarbonUtil; -import com.google.common.collect.ArrayListMultimap; -import com.google.common.collect.Multimap; -import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathFilter; /** * BloomDataCoarseGrainMap is constructed in blocklet level. 
For each indexed column, @@ -62,15 +54,16 @@ public class BloomCoarseGrainDataMap extends CoarseGrainDataMap { private static final LogService LOGGER = LogServiceFactory.getLogService(BloomCoarseGrainDataMap.class.getName()); + public static final String BLOOM_INDEX_SUFFIX = ".bloomindex"; private Set indexedColumn; private List bloomIndexList; - private Multimap> indexCol2BloomDMList; - public static final String BLOOM_INDEX_SUFFIX = ".bloomindex"; private String shardName; + private BloomDataMapCache bloomDataMapCache; + private Path indexPath; @Override public void init(DataMapModel dataMapModel) throws IOException { - Path indexPath = FileFactory.getPath(dataMapModel.getFilePath()); + this.indexPath = FileFactory.getPath(dataMapModel.getFilePath()); this.shardName = indexPath.getName(); FileSystem fs = FileFactory.getFileSystem(indexPath); if (!fs.exists(indexPath)) { @@ -81,52 +74,11 @@ public void init(DataMapModel dataMapModel) throws IOException { throw new IOException( String.format("Path %s for Bloom index dataMap must be a directory", indexPath)); } - - FileStatus[] indexFileStatus = fs.listStatus(indexPath, new PathFilter() { - @Override public boolean accept(Path path) { - return path.getName().endsWith(BLOOM_INDEX_SUFFIX); - } - }); - indexedColumn = new HashSet(); - bloomIndexList = new ArrayList(); - indexCol2BloomDMList = ArrayListMultimap.create(); - for (int i = 0; i < indexFileStatus.length; i++) { - String indexfilename = indexFileStatus[i].getPath().getName(); - String indexCol = - indexfilename.substring(0, indexfilename.length() - BLOOM_INDEX_SUFFIX.length()); - indexedColumn.add(indexCol); - List models = readBloomIndex(indexFileStatus[i].getPath().toString()); - bloomIndexList.addAll(models); - indexCol2BloomDMList.put(indexCol, models); - } - LOGGER.info("find bloom index datamap for column: " - + StringUtils.join(indexedColumn, ", ")); + this.bloomDataMapCache = BloomDataMapCache.getInstance(); } - private List readBloomIndex(String 
indexFile) throws IOException { - LOGGER.info("read bloom index from file: " + indexFile); - List bloomDMModelList = new ArrayList(); - DataInputStream dataInStream = null; - ObjectInputStream objectInStream = null; - try { - dataInStream = FileFactory.getDataInputStream(indexFile, FileFactory.getFileType(indexFile)); - objectInStream = new ObjectInputStream(dataInStream); - try { - BloomDMModel model = null; - while ((model = (BloomDMModel) objectInStream.readObject()) != null) { - LOGGER.info("read bloom index: " + model); - bloomDMModelList.add(model); - } - } catch (EOFException e) { - LOGGER.info("read " + bloomDMModelList.size() + " bloom indices from " + indexFile); - } - return bloomDMModelList; - } catch (ClassNotFoundException e) { - LOGGER.error("Error occrus while reading bloom index"); - throw new RuntimeException("Error occrus while reading bloom index", e); - } finally { - CarbonUtil.closeStreams(objectInStream, dataInStream); - } + public void setIndexedColumn(Set indexedColumn) { + this.indexedColumn = indexedColumn; } @Override @@ -139,26 +91,22 @@ public List prune(FilterResolverIntf filterExp, SegmentProperties segm } List bloomQueryModels = getQueryValue(filterExp.getFilterExpression()); - for (BloomQueryModel bloomQueryModel : bloomQueryModels) { - LOGGER.info("prune blocklet for query: " + bloomQueryModel); - for (List bloomDMModels : indexCol2BloomDMList.get( - bloomQueryModel.columnName)) { - for (BloomDMModel bloomDMModel : bloomDMModels) { - boolean scanRequired = bloomDMModel.getBloomFilter().mightContain( - convertValueToBytes(bloomQueryModel.dataType, bloomQueryModel.filterValue)); - if (scanRequired) { - LOGGER.info(String.format( - "BloomCoarseGrainDataMap: Need to scan -> blocklet#%s", - String.valueOf(bloomDMModel.getBlockletNo()))); - Blocklet blocklet = - new Blocklet(shardName, String.valueOf(bloomDMModel.getBlockletNo())); - hitBlocklets.add(blocklet); - } else { - LOGGER.info(String.format( - "BloomCoarseGrainDataMap: Skip 
scan -> blocklet#%s", - String.valueOf(bloomDMModel.getBlockletNo()))); - } + LOGGER.debug("prune blocklet for query: " + bloomQueryModel); + BloomDataMapCache.CacheKey cacheKey = new BloomDataMapCache.CacheKey( + this.indexPath.toString(), bloomQueryModel.columnName); + List bloomDMModels = this.bloomDataMapCache.getBloomDMModelByKey(cacheKey); + for (BloomDMModel bloomDMModel : bloomDMModels) { + boolean scanRequired = bloomDMModel.getBloomFilter().mightContain( + convertValueToBytes(bloomQueryModel.dataType, bloomQueryModel.filterValue)); + if (scanRequired) { + LOGGER.debug(String.format("BloomCoarseGrainDataMap: Need to scan -> blocklet#%s", + String.valueOf(bloomDMModel.getBlockletNo()))); + Blocklet blocklet = new Blocklet(shardName, String.valueOf(bloomDMModel.getBlockletNo())); + hitBlocklets.add(blocklet); + } else { + LOGGER.debug(String.format("BloomCoarseGrainDataMap: Skip scan -> blocklet#%s", + String.valueOf(bloomDMModel.getBlockletNo()))); } } } @@ -228,12 +176,20 @@ public void clear() { bloomIndexList = null; } + /** + * get bloom index file + * @param shardPath path for the shard + * @param colName index column name + */ + public static String getBloomIndexFile(String shardPath, String colName) { + return shardPath.concat(File.separator).concat(colName).concat(BLOOM_INDEX_SUFFIX); + } static class BloomQueryModel { private String columnName; private DataType dataType; private Object filterValue; - public BloomQueryModel(String columnName, DataType dataType, Object filterValue) { + private BloomQueryModel(String columnName, DataType dataType, Object filterValue) { this.columnName = columnName; this.dataType = dataType; this.filterValue = filterValue; diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java index 95c21fad6d2..581c3a62641 100644 --- 
a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java +++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashSet; import java.util.List; import java.util.Objects; @@ -189,6 +190,7 @@ public List getDataMaps(Segment segment) throws IOException for (CarbonFile carbonFile : carbonFiles) { BloomCoarseGrainDataMap bloomDM = new BloomCoarseGrainDataMap(); bloomDM.init(new DataMapModel(carbonFile.getAbsolutePath())); + bloomDM.setIndexedColumn(new HashSet(dataMapMeta.getIndexedColumnNames())); dataMaps.add(bloomDM); } } catch (Exception e) { @@ -204,6 +206,8 @@ public List getDataMaps(DataMapDistributable distributable) BloomCoarseGrainDataMap bloomCoarseGrainDataMap = new BloomCoarseGrainDataMap(); String indexPath = ((BloomDataMapDistributable) distributable).getIndexPath(); bloomCoarseGrainDataMap.init(new DataMapModel(indexPath)); + bloomCoarseGrainDataMap.setIndexedColumn( + new HashSet(dataMapMeta.getIndexedColumnNames())); coarseGrainDataMaps.add(bloomCoarseGrainDataMap); return coarseGrainDataMaps; } diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapCache.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapCache.java new file mode 100644 index 00000000000..fc23f332f9d --- /dev/null +++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapCache.java @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.datamap.bloom; + +import java.io.DataInputStream; +import java.io.EOFException; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.TimeUnit; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.util.CarbonProperties; +import org.apache.carbondata.core.util.CarbonUtil; + +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.CacheStats; +import com.google.common.cache.LoadingCache; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; + +/** + * This class is used to add cache for bloomfilter datamap to accelerate query through it. + * The cache is implemented using guava cache and is a singleton which will be shared by all the + * bloomfilter datamaps. + * As for the cache, the key is a bloomindex file for a shard and the value is the bloomfilters + * for the blocklets in this shard. + * The size of cache can be configurable through CarbonProperties and the cache will be expired if + * no one access it in the past 2 hours. 
+ */ +@InterfaceAudience.Internal +public class BloomDataMapCache implements Serializable { + private static final LogService LOGGER = LogServiceFactory.getLogService( + BloomDataMapCache.class.getName()); + private static final long serialVersionUID = 20160822L; + private static final int DEFAULT_CACHE_EXPIRED_HOURS = 2; + private LoadingCache> bloomDMCache = null; + + private BloomDataMapCache() { + RemovalListener> listener = + new RemovalListener>() { + @Override + public void onRemoval(RemovalNotification> notification) { + LOGGER.info( + String.format("Remove bloom datamap entry %s from cache due to %s", + notification.getKey(), notification.getCause())); + } + }; + CacheLoader> cacheLoader = + new CacheLoader>() { + @Override + public List load(CacheKey key) throws Exception { + LOGGER.info(String.format("Load bloom datamap entry %s to cache", key)); + return loadBloomDataMapModel(key); + } + }; + + int cacheSizeInBytes = validateAndGetCacheSize() + * CarbonCommonConstants.BYTE_TO_KB_CONVERSION_FACTOR + * CarbonCommonConstants.BYTE_TO_KB_CONVERSION_FACTOR; + this.bloomDMCache = CacheBuilder.newBuilder() + .recordStats() + .maximumSize(cacheSizeInBytes) + .expireAfterAccess(DEFAULT_CACHE_EXPIRED_HOURS, TimeUnit.HOURS) + .removalListener(listener) + .build(cacheLoader); + } + + private static class SingletonHolder { + private static final BloomDataMapCache INSTANCE = new BloomDataMapCache(); + } + + /** + * get instance + */ + public static BloomDataMapCache getInstance() { + return SingletonHolder.INSTANCE; + } + + /** + * for resolve from serialized + */ + protected Object readResolve() { + return getInstance(); + } + + private int validateAndGetCacheSize() { + String cacheSizeStr = CarbonProperties.getInstance().getProperty( + CarbonCommonConstants.CARBON_QUERY_DATAMAP_BLOOM_CACHE_SIZE, + CarbonCommonConstants.CARBON_QUERY_DATAMAP_BLOOM_CACHE_SIZE_DEFAULT_VAL); + int cacheSize; + try { + cacheSize = Integer.parseInt(cacheSizeStr); + if (cacheSize <= 0) { + 
throw new NumberFormatException("Value should be greater than 0: " + cacheSize); + } + } catch (NumberFormatException ex) { + LOGGER.error(String.format( + "The value '%s' for '%s' is invalid, it must be an Integer that greater than 0." + + " Use default value '%s' instead.", cacheSizeStr, + CarbonCommonConstants.CARBON_QUERY_DATAMAP_BLOOM_CACHE_SIZE, + CarbonCommonConstants.CARBON_QUERY_DATAMAP_BLOOM_CACHE_SIZE_DEFAULT_VAL)); + cacheSize = Integer.parseInt( + CarbonCommonConstants.CARBON_QUERY_DATAMAP_BLOOM_CACHE_SIZE_DEFAULT_VAL); + } + return cacheSize; + } + + /** + * read every BloomDMModel stored in the bloomindex file addressed by the cache key + */ + private List loadBloomDataMapModel(CacheKey cacheKey) { + DataInputStream dataInStream = null; + ObjectInputStream objectInStream = null; + List bloomDMModels = new ArrayList(); + try { + String indexFile = getIndexFileFromCacheKey(cacheKey); + dataInStream = FileFactory.getDataInputStream(indexFile, FileFactory.getFileType(indexFile)); + objectInStream = new ObjectInputStream(dataInStream); + try { + BloomDMModel model = null; + while ((model = (BloomDMModel) objectInStream.readObject()) != null) { + bloomDMModels.add(model); + } + } catch (EOFException e) { + LOGGER.info(String.format("Read %d bloom indices from %s", + bloomDMModels.size(), indexFile)); + } + // No explicit put: the LoadingCache stores the value returned by its loader for this key. + return bloomDMModels; + } catch (ClassNotFoundException | IOException e) { + LOGGER.error(e, "Error occurs while reading bloom index"); + throw new RuntimeException("Error occurs while reading bloom index", e); + } finally { + // Only release the streams; invalidating here would evict every other cached entry. + CarbonUtil.closeStreams(objectInStream, dataInStream); + } + } + + /** + * get bloom index file name from cachekey + */ + private String getIndexFileFromCacheKey(CacheKey cacheKey) { + return BloomCoarseGrainDataMap.getBloomIndexFile(cacheKey.shardPath, cacheKey.indexColumn); + } + + /** + * get bloom datamap from cache + */ + public List getBloomDMModelByKey(CacheKey cacheKey) { + return 
this.bloomDMCache.getUnchecked(cacheKey); + } + + /** + * format the cache statistics (hit count, hit rate, load count, load penalty, evictions) + */ + private String getCacheStatus() { + StringBuilder sb = new StringBuilder(); + CacheStats stats = this.bloomDMCache.stats(); + sb.append("hitCount: ").append(stats.hitCount()).append(System.lineSeparator()) + .append("hitRate: ").append(stats.hitRate()).append(System.lineSeparator()) + .append("loadCount: ").append(stats.loadCount()).append(System.lineSeparator()) + .append("averageLoadPenalty: ").append(stats.averageLoadPenalty()) + .append(System.lineSeparator()) + .append("evictionCount: ").append(stats.evictionCount()); + return sb.toString(); + } + + /** + * log the current statistics, then invalidate every entry of this cache + */ + private void clear() { + LOGGER.info(String.format("Current meta cache statistic: %s", getCacheStatus())); + LOGGER.info("Trigger invalid all the cache for bloom datamap"); + this.bloomDMCache.invalidateAll(); + } + + public static class CacheKey { + private final String shardPath; + private final String indexColumn; + + CacheKey(String shardPath, String indexColumn) { + this.shardPath = shardPath; + this.indexColumn = indexColumn; + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("CacheKey{"); + sb.append("shardPath='").append(shardPath).append('\''); + sb.append(", indexColumn='").append(indexColumn).append('\''); + sb.append('}'); + return sb.toString(); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof CacheKey)) return false; + CacheKey cacheKey = (CacheKey) o; + return Objects.equals(shardPath, cacheKey.shardPath) + && Objects.equals(indexColumn, cacheKey.indexColumn); + } + + @Override + public int hashCode() { + return Objects.hash(shardPath, indexColumn); + } + } +} diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java index a55de1178be..f6eb331aae2 100644 --- 
a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java +++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java @@ -25,7 +25,6 @@ import org.apache.carbondata.common.annotations.InterfaceAudience; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datamap.Segment; import org.apache.carbondata.core.datamap.dev.DataMapWriter; import org.apache.carbondata.core.datastore.impl.FileFactory; @@ -138,8 +137,8 @@ private void initDataMapFile() throws IOException { } List indexColumns = getIndexColumns(); for (int indexColId = 0; indexColId < indexColumns.size(); indexColId++) { - String dmFile = dataMapPath + CarbonCommonConstants.FILE_SEPARATOR + - indexColumns.get(indexColId).getColName() + BloomCoarseGrainDataMap.BLOOM_INDEX_SUFFIX; + String dmFile = BloomCoarseGrainDataMap.getBloomIndexFile(dataMapPath, + indexColumns.get(indexColId).getColName()); DataOutputStream dataOutStream = null; ObjectOutputStream objectOutStream = null; try {