Skip to content

Commit

Permalink
[CARBONDATA-2645] Segregate block and blocklet cache
Browse files Browse the repository at this point in the history
Things done as part of this PR

Segregate block and blocklet cache. In this driver will cache the metadata based on CACHE_LEVEL.
If CACHE_LEVEL is set to BLOCK then only carbondata files metadata will be cached in driver.
If CACHE_LEVEL is set to BLOCKLET then metadata for (number of carbondata files * number of blocklets in each carbondata file) will be cached in driver.

This closes #2437
  • Loading branch information
manishgupta88 authored and ravipesala committed Jul 30, 2018
1 parent 86f9773 commit 5111a4b
Show file tree
Hide file tree
Showing 16 changed files with 1,457 additions and 1,143 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -201,10 +201,12 @@ public TableBlockInfo copy() {
info.blockletId = blockletId;
info.locations = locations;
info.version = version;
info.isDataBlockFromOldStore = isDataBlockFromOldStore;
info.blockletInfos = blockletInfos;
info.blockStorageIdMap = blockStorageIdMap;
info.deletedDeltaFilePath = deletedDeltaFilePath;
info.detailInfo = detailInfo.copy();
info.dataMapWriterPath = dataMapWriterPath;
return info;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@
import org.apache.carbondata.core.cache.CarbonLRUCache;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datamap.dev.DataMap;
import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMap;
import org.apache.carbondata.core.indexstore.blockletindex.BlockDataMap;
import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory;
import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapModel;
import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.util.BlockletDataMapUtil;

/**
Expand Down Expand Up @@ -75,7 +77,7 @@ public BlockletDataMapIndexWrapper get(TableBlockIndexUniqueIdentifierWrapper id
String lruCacheKey = identifier.getUniqueTableSegmentIdentifier();
BlockletDataMapIndexWrapper blockletDataMapIndexWrapper =
(BlockletDataMapIndexWrapper) lruCache.get(lruCacheKey);
List<BlockletDataMap> dataMaps = new ArrayList<>();
List<BlockDataMap> dataMaps = new ArrayList<>();
if (blockletDataMapIndexWrapper == null) {
try {
SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
Expand All @@ -88,8 +90,9 @@ public BlockletDataMapIndexWrapper get(TableBlockIndexUniqueIdentifierWrapper id
Map<String, BlockMetaInfo> blockMetaInfoMap = BlockletDataMapUtil
.getBlockMetaInfoMap(identifierWrapper, indexFileStore, filesRead,
carbonDataFileBlockMetaInfoMapping);
BlockletDataMap blockletDataMap =
loadAndGetDataMap(identifier, indexFileStore, blockMetaInfoMap);
BlockDataMap blockletDataMap =
loadAndGetDataMap(identifier, indexFileStore, blockMetaInfoMap,
identifierWrapper.getCarbonTable());
dataMaps.add(blockletDataMap);
blockletDataMapIndexWrapper = new BlockletDataMapIndexWrapper(dataMaps);
} else {
Expand All @@ -102,8 +105,9 @@ public BlockletDataMapIndexWrapper get(TableBlockIndexUniqueIdentifierWrapper id
new TableBlockIndexUniqueIdentifierWrapper(blockIndexUniqueIdentifier,
identifierWrapper.getCarbonTable()), indexFileStore, filesRead,
carbonDataFileBlockMetaInfoMapping);
BlockletDataMap blockletDataMap =
loadAndGetDataMap(blockIndexUniqueIdentifier, indexFileStore, blockMetaInfoMap);
BlockDataMap blockletDataMap =
loadAndGetDataMap(blockIndexUniqueIdentifier, indexFileStore, blockMetaInfoMap,
identifierWrapper.getCarbonTable());
dataMaps.add(blockletDataMap);
}
blockletDataMapIndexWrapper = new BlockletDataMapIndexWrapper(dataMaps);
Expand Down Expand Up @@ -149,7 +153,7 @@ public BlockletDataMapIndexWrapper get(TableBlockIndexUniqueIdentifierWrapper id
}
} catch (Throwable e) {
if (null != blockletDataMapIndexWrapper) {
List<BlockletDataMap> dataMaps = blockletDataMapIndexWrapper.getDataMaps();
List<BlockDataMap> dataMaps = blockletDataMapIndexWrapper.getDataMaps();
for (DataMap dataMap : dataMaps) {
dataMap.clear();
}
Expand Down Expand Up @@ -201,9 +205,9 @@ public void put(TableBlockIndexUniqueIdentifierWrapper tableBlockIndexUniqueIden
if (null == getIfPresent(tableBlockIndexUniqueIdentifierWrapper)) {
synchronized (lock) {
if (null == getIfPresent(tableBlockIndexUniqueIdentifierWrapper)) {
List<BlockletDataMap> dataMaps = wrapper.getDataMaps();
List<BlockDataMap> dataMaps = wrapper.getDataMaps();
try {
for (BlockletDataMap blockletDataMap: dataMaps) {
for (BlockDataMap blockletDataMap: dataMaps) {
blockletDataMap.convertToUnsafeDMStore();
}
lruCache.put(tableBlockIndexUniqueIdentifierWrapper.getTableBlockIndexUniqueIdentifier()
Expand All @@ -230,20 +234,19 @@ public void put(TableBlockIndexUniqueIdentifierWrapper tableBlockIndexUniqueIden
* @return map of task id to segment mapping
* @throws IOException
*/
private BlockletDataMap loadAndGetDataMap(
TableBlockIndexUniqueIdentifier identifier,
SegmentIndexFileStore indexFileStore,
Map<String, BlockMetaInfo> blockMetaInfoMap)
private BlockDataMap loadAndGetDataMap(TableBlockIndexUniqueIdentifier identifier,
SegmentIndexFileStore indexFileStore, Map<String, BlockMetaInfo> blockMetaInfoMap,
CarbonTable carbonTable)
throws IOException, MemoryException {
String uniqueTableSegmentIdentifier =
identifier.getUniqueTableSegmentIdentifier();
Object lock = segmentLockMap.get(uniqueTableSegmentIdentifier);
if (lock == null) {
lock = addAndGetSegmentLock(uniqueTableSegmentIdentifier);
}
BlockletDataMap dataMap;
BlockDataMap dataMap;
synchronized (lock) {
dataMap = new BlockletDataMap();
dataMap = (BlockDataMap) BlockletDataMapFactory.createDataMap(carbonTable);
dataMap.init(new BlockletDataMapModel(
identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
.getIndexFileName(), indexFileStore.getFileData(identifier.getIndexFileName()),
Expand Down Expand Up @@ -278,7 +281,7 @@ private synchronized Object addAndGetSegmentLock(String uniqueIdentifier) {
List<TableBlockIndexUniqueIdentifierWrapper> tableSegmentUniqueIdentifiersWrapper) {
for (TableBlockIndexUniqueIdentifierWrapper
identifierWrapper : tableSegmentUniqueIdentifiersWrapper) {
BlockletDataMap cacheable = (BlockletDataMap) lruCache.get(
BlockDataMap cacheable = (BlockDataMap) lruCache.get(
identifierWrapper.getTableBlockIndexUniqueIdentifier().getUniqueTableSegmentIdentifier());
cacheable.clear();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,23 @@
import java.util.List;

import org.apache.carbondata.core.cache.Cacheable;
import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMap;
import org.apache.carbondata.core.indexstore.blockletindex.BlockDataMap;

/**
* A cacheable wrapper of datamaps
*/
public class BlockletDataMapIndexWrapper implements Cacheable, Serializable {

private List<BlockletDataMap> dataMaps;
private List<BlockDataMap> dataMaps;

// size of the wrapper. basically the total size of the datamaps this wrapper is holding
private long wrapperSize;

public BlockletDataMapIndexWrapper(List<BlockletDataMap> dataMaps) {
public BlockletDataMapIndexWrapper(List<BlockDataMap> dataMaps) {
this.dataMaps = dataMaps;
this.wrapperSize = 0L;
// add the size of each and every datamap in this wrapper
for (BlockletDataMap dataMap : dataMaps) {
for (BlockDataMap dataMap : dataMaps) {
this.wrapperSize += dataMap.getMemorySize();
}
}
Expand All @@ -54,7 +54,7 @@ public BlockletDataMapIndexWrapper(List<BlockletDataMap> dataMaps) {
return wrapperSize;
}

public List<BlockletDataMap> getDataMaps() {
public List<BlockDataMap> getDataMaps() {
return dataMaps;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMap;
import org.apache.carbondata.core.indexstore.blockletindex.BlockDataMap;
import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;

Expand Down Expand Up @@ -68,6 +68,10 @@ public class BlockletDetailInfo implements Serializable, Writable {
private byte[] columnSchemaBinary;

private long blockSize;
/**
* flag to check for store from 1.1 or any prior version
*/
private boolean isLegacyStore;

public int getRowCount() {
return rowCount;
Expand Down Expand Up @@ -172,6 +176,7 @@ public void setBlockSize(long blockSize) {
out.writeInt(blockletInfoBinary.length);
out.write(blockletInfoBinary);
out.writeLong(blockSize);
out.writeBoolean(isLegacyStore);
}

@Override public void readFields(DataInput in) throws IOException {
Expand All @@ -198,6 +203,7 @@ public void setBlockSize(long blockSize) {
in.readFully(blockletInfoBinary);
setBlockletInfoFromBinary();
blockSize = in.readLong();
isLegacyStore = in.readBoolean();
}

/**
Expand All @@ -206,8 +212,8 @@ public void setBlockSize(long blockSize) {
* @throws IOException
*/
public void readColumnSchema(byte[] schemaArray) throws IOException {
BlockletDataMap blockletDataMap = new BlockletDataMap();
columnSchemas = blockletDataMap.readColumnSchema(schemaArray);
BlockDataMap blockDataMap = new BlockDataMap();
columnSchemas = blockDataMap.readColumnSchema(schemaArray);
}

/**
Expand All @@ -222,9 +228,12 @@ public BlockletDetailInfo copy() {
detailInfo.dimLens = dimLens;
detailInfo.schemaUpdatedTimeStamp = schemaUpdatedTimeStamp;
detailInfo.blockletInfo = blockletInfo;
detailInfo.blockletInfoBinary = blockletInfoBinary;
detailInfo.blockFooterOffset = blockFooterOffset;
detailInfo.columnSchemas = columnSchemas;
detailInfo.columnSchemaBinary = columnSchemaBinary;
detailInfo.blockSize = blockSize;
detailInfo.isLegacyStore = isLegacyStore;
return detailInfo;
}

Expand Down Expand Up @@ -263,4 +272,11 @@ public void setBlockletInfoBinary(byte[] blockletInfoBinary) {
this.blockletInfoBinary = blockletInfoBinary;
}

  /**
   * Returns whether this blocklet detail info was read from a legacy store
   * (data written by CarbonData 1.1 or any prior version), as recorded by
   * {@link #setLegacyStore(boolean)} or deserialized in {@code readFields}.
   *
   * @return true if the underlying data block comes from a legacy store
   */
  public boolean isLegacyStore() {
    return isLegacyStore;
  }

  /**
   * Sets the legacy-store flag for this blocklet detail info.
   *
   * @param legacyStore true if the data block was written by a 1.1-or-earlier store
   */
  public void setLegacyStore(boolean legacyStore) {
    isLegacyStore = legacyStore;
  }
}
}
Loading

0 comments on commit 5111a4b

Please sign in to comment.