Skip to content

Commit

Permalink
Added segmentPropertiesWrapper to BlockDataMap
Browse files Browse the repository at this point in the history
  • Loading branch information
kunal642 committed Aug 6, 2019
1 parent ed117f7 commit 8bdd932
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 57 deletions.
Expand Up @@ -98,7 +98,7 @@ public static SegmentPropertiesAndSchemaHolder getInstance() {
* @param columnCardinality
* @param segmentId
*/
public int addSegmentProperties(CarbonTable carbonTable,
public SegmentPropertiesWrapper addSegmentProperties(CarbonTable carbonTable,
List<ColumnSchema> columnsInTable, int[] columnCardinality, String segmentId) {
SegmentPropertiesAndSchemaHolder.SegmentPropertiesWrapper segmentPropertiesWrapper =
new SegmentPropertiesAndSchemaHolder.SegmentPropertiesWrapper(carbonTable,
Expand Down Expand Up @@ -137,7 +137,7 @@ public int addSegmentProperties(CarbonTable carbonTable,
.addMinMaxColumns(carbonTable);
}
}
return segmentIdSetAndIndexWrapper.getSegmentPropertiesIndex();
return getSegmentPropertiesWrapper(segmentIdSetAndIndexWrapper.getSegmentPropertiesIndex());
}

/**
Expand Down Expand Up @@ -222,32 +222,31 @@ public void invalidate(AbsoluteTableIdentifier absoluteTableIdentifier) {
* Method to remove the given segment ID
*
* @param segmentId
* @param segmentPropertiesIndex
* @param clearSegmentWrapperFromMap flag to specify whether to clear segmentPropertiesWrapper
* from Map if all the segments using it have become stale
*/
public void invalidate(String segmentId, int segmentPropertiesIndex,
public void invalidate(String segmentId, SegmentPropertiesWrapper segmentPropertiesWrapper,
boolean clearSegmentWrapperFromMap) {
SegmentPropertiesWrapper segmentPropertiesWrapper =
indexToSegmentPropertiesWrapperMapping.get(segmentPropertiesIndex);
if (null != segmentPropertiesWrapper) {
SegmentIdAndSegmentPropertiesIndexWrapper segmentIdAndSegmentPropertiesIndexWrapper =
segmentPropWrapperToSegmentSetMap.get(segmentPropertiesWrapper);
SegmentIdAndSegmentPropertiesIndexWrapper segmentIdAndSegmentPropertiesIndexWrapper =
segmentPropWrapperToSegmentSetMap.get(segmentPropertiesWrapper);
if (segmentIdAndSegmentPropertiesIndexWrapper != null) {
synchronized (getOrCreateTableLock(segmentPropertiesWrapper.getTableIdentifier())) {
segmentIdAndSegmentPropertiesIndexWrapper.removeSegmentId(segmentId);
// if after removal of given SegmentId, the segmentIdSet becomes empty that means this
// segmentPropertiesWrapper is not getting used at all. In that case this object can be
// removed from all the holders
if (clearSegmentWrapperFromMap && segmentIdAndSegmentPropertiesIndexWrapper.segmentIdSet
.isEmpty()) {
indexToSegmentPropertiesWrapperMapping.remove(segmentPropertiesIndex);
indexToSegmentPropertiesWrapperMapping
.remove(segmentIdAndSegmentPropertiesIndexWrapper.getSegmentPropertiesIndex());
segmentPropWrapperToSegmentSetMap.remove(segmentPropertiesWrapper);
} else if (!clearSegmentWrapperFromMap
&& segmentIdAndSegmentPropertiesIndexWrapper.segmentIdSet.isEmpty()) {
// min max columns can vary when cache is modified. So even though the entry is not required
// to be deleted from the map, clear the column cache so that it can be filled again
segmentPropertiesWrapper.clear();
LOGGER.info("cleared min max for segmentProperties at index: " + segmentPropertiesIndex);
LOGGER.info("cleared min max for segmentProperties at index: "
+ segmentIdAndSegmentPropertiesIndexWrapper.getSegmentPropertiesIndex());
}
}
}
Expand Down
Expand Up @@ -230,7 +230,7 @@ private BlockletDataMapIndexWrapper get(TableBlockIndexUniqueIdentifierWrapper i
// as segmentId will be same for all the dataMaps and segmentProperties cache is
// maintained at segment level, it needs to be called only once for clearing
SegmentPropertiesAndSchemaHolder.getInstance()
.invalidate(segmentId, dataMaps.get(0).getSegmentPropertiesIndex(),
.invalidate(segmentId, dataMaps.get(0).getSegmentPropertiesWrapper(),
tableSegmentUniqueIdentifierWrapper.isAddTableBlockToUnsafeAndLRUCache());
}
}
Expand Down
Expand Up @@ -99,7 +99,8 @@ public class BlockDataMap extends CoarseGrainDataMap
/**
* index of segmentProperties in the segmentProperties holder
*/
protected int segmentPropertiesIndex;
protected transient SegmentPropertiesAndSchemaHolder.SegmentPropertiesWrapper
segmentPropertiesWrapper;
/**
* flag to check for store from 1.1 or any prior version
*/
Expand Down Expand Up @@ -204,10 +205,10 @@ private SegmentProperties initSegmentProperties(BlockletDataMapModel blockletDat
DataFileFooter fileFooter) throws IOException {
List<ColumnSchema> columnInTable = fileFooter.getColumnInTable();
int[] columnCardinality = fileFooter.getSegmentInfo().getColumnCardinality();
segmentPropertiesIndex = SegmentPropertiesAndSchemaHolder.getInstance()
segmentPropertiesWrapper = SegmentPropertiesAndSchemaHolder.getInstance()
.addSegmentProperties(blockletDataMapInfo.getCarbonTable(),
columnInTable, columnCardinality, blockletDataMapInfo.getSegmentId());
return getSegmentProperties();
return segmentPropertiesWrapper.getSegmentProperties();
}

/**
Expand Down Expand Up @@ -485,8 +486,7 @@ protected String getFilePath() {
return getTableTaskInfo(SUMMARY_INDEX_PATH);
}
// create the segment directory path
String tablePath = SegmentPropertiesAndSchemaHolder.getInstance()
.getSegmentPropertiesWrapper(segmentPropertiesIndex).getTableIdentifier().getTablePath();
String tablePath = segmentPropertiesWrapper.getTableIdentifier().getTablePath();
String segmentId = getTableTaskInfo(SUMMARY_SEGMENTID);
return CarbonTablePath.getSegmentPath(tablePath, segmentId);
}
Expand Down Expand Up @@ -620,8 +620,7 @@ protected void createSummaryDMStore(BlockletDataMapModel blockletDataMapModel)
}

protected List<CarbonColumn> getMinMaxCacheColumns() {
return SegmentPropertiesAndSchemaHolder.getInstance()
.getSegmentPropertiesWrapper(segmentPropertiesIndex).getMinMaxCacheColumns();
return segmentPropertiesWrapper.getMinMaxCacheColumns();
}

/**
Expand Down Expand Up @@ -1019,18 +1018,15 @@ public long getMemorySize() {
}

protected SegmentProperties getSegmentProperties() {
return SegmentPropertiesAndSchemaHolder.getInstance()
.getSegmentProperties(segmentPropertiesIndex);
return segmentPropertiesWrapper.getSegmentProperties();
}

public int[] getColumnCardinality() {
return SegmentPropertiesAndSchemaHolder.getInstance()
.getSegmentPropertiesWrapper(segmentPropertiesIndex).getColumnCardinality();
return segmentPropertiesWrapper.getColumnCardinality();
}

public List<ColumnSchema> getColumnSchema() {
return SegmentPropertiesAndSchemaHolder.getInstance()
.getSegmentPropertiesWrapper(segmentPropertiesIndex).getColumnsInTable();
return segmentPropertiesWrapper.getColumnsInTable();
}

protected AbstractMemoryDMStore getMemoryDMStore(boolean addToUnsafe)
Expand All @@ -1045,14 +1041,10 @@ protected AbstractMemoryDMStore getMemoryDMStore(boolean addToUnsafe)
}

protected CarbonRowSchema[] getFileFooterEntrySchema() {
return SegmentPropertiesAndSchemaHolder.getInstance()
.getSegmentPropertiesWrapper(segmentPropertiesIndex).getBlockFileFooterEntrySchema();
return segmentPropertiesWrapper.getBlockFileFooterEntrySchema();
}

protected CarbonRowSchema[] getTaskSummarySchema() {
SegmentPropertiesAndSchemaHolder.SegmentPropertiesWrapper segmentPropertiesWrapper =
SegmentPropertiesAndSchemaHolder.getInstance()
.getSegmentPropertiesWrapper(segmentPropertiesIndex);
try {
return segmentPropertiesWrapper.getTaskSummarySchemaForBlock(true, isFilePathStored);
} catch (MemoryException e) {
Expand Down Expand Up @@ -1080,12 +1072,8 @@ public void convertToUnsafeDMStore() throws MemoryException {
}
}

public void setSegmentPropertiesIndex(int segmentPropertiesIndex) {
this.segmentPropertiesIndex = segmentPropertiesIndex;
}

public int getSegmentPropertiesIndex() {
return segmentPropertiesIndex;
public SegmentPropertiesAndSchemaHolder.SegmentPropertiesWrapper getSegmentPropertiesWrapper() {
return segmentPropertiesWrapper;
}

@Override public int getNumberOfEntries() {
Expand Down
Expand Up @@ -27,7 +27,6 @@
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datamap.dev.DataMapModel;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.datastore.block.SegmentPropertiesAndSchemaHolder;
import org.apache.carbondata.core.datastore.block.TableBlockInfo;
import org.apache.carbondata.core.indexstore.BlockMetaInfo;
import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
Expand Down Expand Up @@ -83,9 +82,6 @@ protected CarbonRowSchema[] getTaskSummarySchema() {
if (isLegacyStore) {
return super.getTaskSummarySchema();
}
SegmentPropertiesAndSchemaHolder.SegmentPropertiesWrapper segmentPropertiesWrapper =
SegmentPropertiesAndSchemaHolder.getInstance()
.getSegmentPropertiesWrapper(segmentPropertiesIndex);
try {
return segmentPropertiesWrapper.getTaskSummarySchemaForBlocklet(false, isFilePathStored);
} catch (MemoryException e) {
Expand All @@ -98,8 +94,7 @@ protected CarbonRowSchema[] getFileFooterEntrySchema() {
if (isLegacyStore) {
return super.getFileFooterEntrySchema();
}
return SegmentPropertiesAndSchemaHolder.getInstance()
.getSegmentPropertiesWrapper(segmentPropertiesIndex).getBlockletFileFooterEntrySchema();
return segmentPropertiesWrapper.getBlockletFileFooterEntrySchema();
}

/**
Expand Down
Expand Up @@ -88,9 +88,8 @@ class TestQueryWithColumnMetCacheAndCacheLevelProperty extends QueryTest with Be

private def validateMinMaxColumnsCacheLength(dataMaps: List[DataMap[_ <: Blocklet]],
expectedLength: Int, storeBlockletCount: Boolean = false): Boolean = {
val index = dataMaps(0).asInstanceOf[BlockDataMap].getSegmentPropertiesIndex
val summarySchema = SegmentPropertiesAndSchemaHolder.getInstance()
.getSegmentPropertiesWrapper(index).getTaskSummarySchemaForBlock(storeBlockletCount, false)
val summarySchema = dataMaps(0).asInstanceOf[BlockDataMap].getSegmentPropertiesWrapper
.getTaskSummarySchemaForBlock(storeBlockletCount, false)
val minSchemas = summarySchema(BlockletDataMapRowIndexes.TASK_MIN_VALUES_INDEX)
.asInstanceOf[CarbonRowSchema.StructCarbonRowSchema]
.getChildSchemas
Expand All @@ -107,13 +106,11 @@ class TestQueryWithColumnMetCacheAndCacheLevelProperty extends QueryTest with Be
assert(dataMaps.nonEmpty)
assert(dataMaps(0).isInstanceOf[BlockDataMap])
assert(validateMinMaxColumnsCacheLength(dataMaps, 3, true))
var segmentPropertyIndex = dataMaps(0).asInstanceOf[BlockDataMap].getSegmentPropertiesIndex

// alter table to add column_meta_cache and cache_level
sql(
"alter table metaCache set tblproperties('column_meta_cache'='c2,c1', 'CACHE_LEVEL'='BLOCKLET')")
var wrapper = SegmentPropertiesAndSchemaHolder.getInstance()
.getSegmentPropertiesWrapper(segmentPropertyIndex)
var wrapper = dataMaps(0).asInstanceOf[BlockDataMap].getSegmentPropertiesWrapper
// after alter operation cache should be cleaned and cache should be evicted
assert(null == wrapper)
checkAnswer(sql("select * from metaCache"), Row("a", "aa", "aaa"))
Expand All @@ -125,20 +122,16 @@ class TestQueryWithColumnMetCacheAndCacheLevelProperty extends QueryTest with Be
assert(validateMinMaxColumnsCacheLength(dataMaps, 2))

// alter table to add same value as previous with order change for column_meta_cache and cache_level
segmentPropertyIndex = dataMaps(0).asInstanceOf[BlockDataMap].getSegmentPropertiesIndex
sql(
"alter table metaCache set tblproperties('column_meta_cache'='c1,c2', 'CACHE_LEVEL'='BLOCKLET')")
wrapper = SegmentPropertiesAndSchemaHolder.getInstance()
.getSegmentPropertiesWrapper(segmentPropertyIndex)
wrapper = dataMaps(0).asInstanceOf[BlockDataMap].getSegmentPropertiesWrapper
// after alter operation cache should not be cleaned as value are unchanged
assert(null != wrapper)

// alter table to cache no column in column_meta_cache
segmentPropertyIndex = dataMaps(0).asInstanceOf[BlockDataMap].getSegmentPropertiesIndex
sql(
"alter table metaCache set tblproperties('column_meta_cache'='')")
wrapper = SegmentPropertiesAndSchemaHolder.getInstance()
.getSegmentPropertiesWrapper(segmentPropertyIndex)
wrapper = dataMaps(0).asInstanceOf[BlockDataMap].getSegmentPropertiesWrapper

// after alter operation cache should be cleaned and cache should be evicted
assert(null == wrapper)
Expand All @@ -151,11 +144,9 @@ class TestQueryWithColumnMetCacheAndCacheLevelProperty extends QueryTest with Be
assert(validateMinMaxColumnsCacheLength(dataMaps, 0))

// alter table to cache no column in column_meta_cache
segmentPropertyIndex = dataMaps(0).asInstanceOf[BlockDataMap].getSegmentPropertiesIndex
sql(
"alter table metaCache unset tblproperties('column_meta_cache', 'cache_level')")
wrapper = SegmentPropertiesAndSchemaHolder.getInstance()
.getSegmentPropertiesWrapper(segmentPropertyIndex)
wrapper = dataMaps(0).asInstanceOf[BlockDataMap].getSegmentPropertiesWrapper
// after alter operation cache should be cleaned and cache should be evicted
assert(null == wrapper)
checkAnswer(sql("select * from metaCache"), Row("a", "aa", "aaa"))
Expand Down

0 comments on commit 8bdd932

Please sign in to comment.