Skip to content

Commit

Permalink
segregate block and blocklet cache
Browse files Browse the repository at this point in the history
  • Loading branch information
manishgupta88 committed Jul 5, 2018
1 parent dac5d3c commit f9f039a
Show file tree
Hide file tree
Showing 16 changed files with 1,465 additions and 1,143 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -201,10 +201,12 @@ public TableBlockInfo copy() {
info.blockletId = blockletId;
info.locations = locations;
info.version = version;
info.isDataBlockFromOldStore = isDataBlockFromOldStore;
info.blockletInfos = blockletInfos;
info.blockStorageIdMap = blockStorageIdMap;
info.deletedDeltaFilePath = deletedDeltaFilePath;
info.detailInfo = detailInfo.copy();
info.dataMapWriterPath = dataMapWriterPath;
return info;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@
import org.apache.carbondata.core.cache.CarbonLRUCache;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datamap.dev.DataMap;
import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMap;
import org.apache.carbondata.core.indexstore.blockletindex.BlockDataMap;
import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory;
import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapModel;
import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.util.BlockletDataMapUtil;

/**
Expand Down Expand Up @@ -75,7 +77,7 @@ public BlockletDataMapIndexWrapper get(TableBlockIndexUniqueIdentifierWrapper id
String lruCacheKey = identifier.getUniqueTableSegmentIdentifier();
BlockletDataMapIndexWrapper blockletDataMapIndexWrapper =
(BlockletDataMapIndexWrapper) lruCache.get(lruCacheKey);
List<BlockletDataMap> dataMaps = new ArrayList<>();
List<BlockDataMap> dataMaps = new ArrayList<>();
if (blockletDataMapIndexWrapper == null) {
try {
SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
Expand All @@ -88,8 +90,9 @@ public BlockletDataMapIndexWrapper get(TableBlockIndexUniqueIdentifierWrapper id
Map<String, BlockMetaInfo> blockMetaInfoMap = BlockletDataMapUtil
.getBlockMetaInfoMap(identifierWrapper, indexFileStore, filesRead,
carbonDataFileBlockMetaInfoMapping);
BlockletDataMap blockletDataMap =
loadAndGetDataMap(identifier, indexFileStore, blockMetaInfoMap);
BlockDataMap blockletDataMap =
loadAndGetDataMap(identifier, indexFileStore, blockMetaInfoMap,
identifierWrapper.getCarbonTable());
dataMaps.add(blockletDataMap);
blockletDataMapIndexWrapper = new BlockletDataMapIndexWrapper(dataMaps);
} else {
Expand All @@ -102,8 +105,9 @@ public BlockletDataMapIndexWrapper get(TableBlockIndexUniqueIdentifierWrapper id
new TableBlockIndexUniqueIdentifierWrapper(blockIndexUniqueIdentifier,
identifierWrapper.getCarbonTable()), indexFileStore, filesRead,
carbonDataFileBlockMetaInfoMapping);
BlockletDataMap blockletDataMap =
loadAndGetDataMap(blockIndexUniqueIdentifier, indexFileStore, blockMetaInfoMap);
BlockDataMap blockletDataMap =
loadAndGetDataMap(blockIndexUniqueIdentifier, indexFileStore, blockMetaInfoMap,
identifierWrapper.getCarbonTable());
dataMaps.add(blockletDataMap);
}
blockletDataMapIndexWrapper = new BlockletDataMapIndexWrapper(dataMaps);
Expand Down Expand Up @@ -149,7 +153,7 @@ public BlockletDataMapIndexWrapper get(TableBlockIndexUniqueIdentifierWrapper id
}
} catch (Throwable e) {
if (null != blockletDataMapIndexWrapper) {
List<BlockletDataMap> dataMaps = blockletDataMapIndexWrapper.getDataMaps();
List<BlockDataMap> dataMaps = blockletDataMapIndexWrapper.getDataMaps();
for (DataMap dataMap : dataMaps) {
dataMap.clear();
}
Expand Down Expand Up @@ -201,9 +205,9 @@ public void put(TableBlockIndexUniqueIdentifierWrapper tableBlockIndexUniqueIden
if (null == getIfPresent(tableBlockIndexUniqueIdentifierWrapper)) {
synchronized (lock) {
if (null == getIfPresent(tableBlockIndexUniqueIdentifierWrapper)) {
List<BlockletDataMap> dataMaps = wrapper.getDataMaps();
List<BlockDataMap> dataMaps = wrapper.getDataMaps();
try {
for (BlockletDataMap blockletDataMap: dataMaps) {
for (BlockDataMap blockletDataMap: dataMaps) {
blockletDataMap.convertToUnsafeDMStore();
}
lruCache.put(tableBlockIndexUniqueIdentifierWrapper.getTableBlockIndexUniqueIdentifier()
Expand All @@ -230,20 +234,19 @@ public void put(TableBlockIndexUniqueIdentifierWrapper tableBlockIndexUniqueIden
* @return map of taks id to segment mapping
* @throws IOException
*/
private BlockletDataMap loadAndGetDataMap(
TableBlockIndexUniqueIdentifier identifier,
SegmentIndexFileStore indexFileStore,
Map<String, BlockMetaInfo> blockMetaInfoMap)
private BlockDataMap loadAndGetDataMap(TableBlockIndexUniqueIdentifier identifier,
SegmentIndexFileStore indexFileStore, Map<String, BlockMetaInfo> blockMetaInfoMap,
CarbonTable carbonTable)
throws IOException, MemoryException {
String uniqueTableSegmentIdentifier =
identifier.getUniqueTableSegmentIdentifier();
Object lock = segmentLockMap.get(uniqueTableSegmentIdentifier);
if (lock == null) {
lock = addAndGetSegmentLock(uniqueTableSegmentIdentifier);
}
BlockletDataMap dataMap;
BlockDataMap dataMap;
synchronized (lock) {
dataMap = new BlockletDataMap();
dataMap = (BlockDataMap) BlockletDataMapFactory.createDataMap(carbonTable);
dataMap.init(new BlockletDataMapModel(
identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
.getIndexFileName(), indexFileStore.getFileData(identifier.getIndexFileName()),
Expand Down Expand Up @@ -278,7 +281,7 @@ private synchronized Object addAndGetSegmentLock(String uniqueIdentifier) {
List<TableBlockIndexUniqueIdentifierWrapper> tableSegmentUniqueIdentifiersWrapper) {
for (TableBlockIndexUniqueIdentifierWrapper
identifierWrapper : tableSegmentUniqueIdentifiersWrapper) {
BlockletDataMap cacheable = (BlockletDataMap) lruCache.get(
BlockDataMap cacheable = (BlockDataMap) lruCache.get(
identifierWrapper.getTableBlockIndexUniqueIdentifier().getUniqueTableSegmentIdentifier());
cacheable.clear();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,23 @@
import java.util.List;

import org.apache.carbondata.core.cache.Cacheable;
import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMap;
import org.apache.carbondata.core.indexstore.blockletindex.BlockDataMap;

/**
* A cacheable wrapper of datamaps
*/
public class BlockletDataMapIndexWrapper implements Cacheable, Serializable {

private List<BlockletDataMap> dataMaps;
private List<BlockDataMap> dataMaps;

// size of the wrapper. basically the total size of the datamaps this wrapper is holding
private long wrapperSize;

public BlockletDataMapIndexWrapper(List<BlockletDataMap> dataMaps) {
public BlockletDataMapIndexWrapper(List<BlockDataMap> dataMaps) {
this.dataMaps = dataMaps;
this.wrapperSize = 0L;
// add the size of each and every datamap in this wrapper
for (BlockletDataMap dataMap : dataMaps) {
for (BlockDataMap dataMap : dataMaps) {
this.wrapperSize += dataMap.getMemorySize();
}
}
Expand All @@ -54,7 +54,7 @@ public BlockletDataMapIndexWrapper(List<BlockletDataMap> dataMaps) {
return wrapperSize;
}

public List<BlockletDataMap> getDataMaps() {
public List<BlockDataMap> getDataMaps() {
return dataMaps;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMap;
import org.apache.carbondata.core.indexstore.blockletindex.BlockDataMap;
import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;

Expand Down Expand Up @@ -68,6 +68,10 @@ public class BlockletDetailInfo implements Serializable, Writable {
private byte[] columnSchemaBinary;

private long blockSize;
/**
* flag to check for store from 1.1 or any prior version
*/
private boolean isLegacyStore;

public int getRowCount() {
return rowCount;
Expand Down Expand Up @@ -172,6 +176,7 @@ public void setBlockSize(long blockSize) {
out.writeInt(blockletInfoBinary.length);
out.write(blockletInfoBinary);
out.writeLong(blockSize);
out.writeBoolean(isLegacyStore);
}

@Override public void readFields(DataInput in) throws IOException {
Expand All @@ -198,6 +203,7 @@ public void setBlockSize(long blockSize) {
in.readFully(blockletInfoBinary);
setBlockletInfoFromBinary();
blockSize = in.readLong();
isLegacyStore = in.readBoolean();
}

/**
Expand All @@ -206,8 +212,8 @@ public void setBlockSize(long blockSize) {
* @throws IOException
*/
public void readColumnSchema(byte[] schemaArray) throws IOException {
BlockletDataMap blockletDataMap = new BlockletDataMap();
columnSchemas = blockletDataMap.readColumnSchema(schemaArray);
BlockDataMap blockDataMap = new BlockDataMap();
columnSchemas = blockDataMap.readColumnSchema(schemaArray);
}

/**
Expand All @@ -222,9 +228,12 @@ public BlockletDetailInfo copy() {
detailInfo.dimLens = dimLens;
detailInfo.schemaUpdatedTimeStamp = schemaUpdatedTimeStamp;
detailInfo.blockletInfo = blockletInfo;
detailInfo.blockletInfoBinary = blockletInfoBinary;
detailInfo.blockFooterOffset = blockFooterOffset;
detailInfo.columnSchemas = columnSchemas;
detailInfo.columnSchemaBinary = columnSchemaBinary;
detailInfo.blockSize = blockSize;
detailInfo.isLegacyStore = isLegacyStore;
return detailInfo;
}

Expand Down Expand Up @@ -263,4 +272,11 @@ public void setBlockletInfoBinary(byte[] blockletInfoBinary) {
this.blockletInfoBinary = blockletInfoBinary;
}

public boolean isLegacyStore() {
return isLegacyStore;
}

public void setLegacyStore(boolean legacyStore) {
isLegacyStore = legacyStore;
}
}
Loading

0 comments on commit f9f039a

Please sign in to comment.