From 8bdd932d695de6500d3751f7f4ab90b0e54fa295 Mon Sep 17 00:00:00 2001 From: kunal642 Date: Mon, 29 Jul 2019 14:31:31 +0530 Subject: [PATCH] Added segmentPropertiesWrapper to BlockDataMap --- .../SegmentPropertiesAndSchemaHolder.java | 21 ++++++----- .../indexstore/BlockletDataMapIndexStore.java | 2 +- .../blockletindex/BlockDataMap.java | 36 +++++++------------ .../blockletindex/BlockletDataMap.java | 7 +--- ...hColumnMetCacheAndCacheLevelProperty.scala | 21 ++++------- 5 files changed, 30 insertions(+), 57 deletions(-) diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentPropertiesAndSchemaHolder.java b/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentPropertiesAndSchemaHolder.java index f2f2d8cfc06..056a0e71a24 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentPropertiesAndSchemaHolder.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentPropertiesAndSchemaHolder.java @@ -98,7 +98,7 @@ public static SegmentPropertiesAndSchemaHolder getInstance() { * @param columnCardinality * @param segmentId */ - public int addSegmentProperties(CarbonTable carbonTable, + public SegmentPropertiesWrapper addSegmentProperties(CarbonTable carbonTable, List columnsInTable, int[] columnCardinality, String segmentId) { SegmentPropertiesAndSchemaHolder.SegmentPropertiesWrapper segmentPropertiesWrapper = new SegmentPropertiesAndSchemaHolder.SegmentPropertiesWrapper(carbonTable, @@ -137,7 +137,7 @@ public int addSegmentProperties(CarbonTable carbonTable, .addMinMaxColumns(carbonTable); } } - return segmentIdSetAndIndexWrapper.getSegmentPropertiesIndex(); + return getSegmentPropertiesWrapper(segmentIdSetAndIndexWrapper.getSegmentPropertiesIndex()); } /** @@ -222,17 +222,14 @@ public void invalidate(AbsoluteTableIdentifier absoluteTableIdentifier) { * Method to remove the given segment ID * * @param segmentId - * @param segmentPropertiesIndex * @param clearSegmentWrapperFromMap flag to specify whether to clear segmentPropertiesWrapper * from Map if all the segment's using it have become stale */ - public void invalidate(String segmentId, int segmentPropertiesIndex, + public void invalidate(String segmentId, SegmentPropertiesWrapper segmentPropertiesWrapper, boolean clearSegmentWrapperFromMap) { - SegmentPropertiesWrapper segmentPropertiesWrapper = - indexToSegmentPropertiesWrapperMapping.get(segmentPropertiesIndex); - if (null != segmentPropertiesWrapper) { - SegmentIdAndSegmentPropertiesIndexWrapper segmentIdAndSegmentPropertiesIndexWrapper = - segmentPropWrapperToSegmentSetMap.get(segmentPropertiesWrapper); + SegmentIdAndSegmentPropertiesIndexWrapper segmentIdAndSegmentPropertiesIndexWrapper = + segmentPropWrapperToSegmentSetMap.get(segmentPropertiesWrapper); + if (segmentIdAndSegmentPropertiesIndexWrapper != null) { synchronized (getOrCreateTableLock(segmentPropertiesWrapper.getTableIdentifier())) { segmentIdAndSegmentPropertiesIndexWrapper.removeSegmentId(segmentId); // if after removal of given SegmentId, the segmentIdSet becomes empty that means this @@ -240,14 +237,16 @@ public void invalidate(String segmentId, int segmentPropertiesIndex, // removed from all the holders if (clearSegmentWrapperFromMap && segmentIdAndSegmentPropertiesIndexWrapper.segmentIdSet .isEmpty()) { - indexToSegmentPropertiesWrapperMapping.remove(segmentPropertiesIndex); + indexToSegmentPropertiesWrapperMapping + .remove(segmentIdAndSegmentPropertiesIndexWrapper.getSegmentPropertiesIndex()); segmentPropWrapperToSegmentSetMap.remove(segmentPropertiesWrapper); } else if (!clearSegmentWrapperFromMap && segmentIdAndSegmentPropertiesIndexWrapper.segmentIdSet.isEmpty()) { // min max columns can very when cache is modified. So even though entry is not required // to be deleted from map clear the column cache so that it can filled again segmentPropertiesWrapper.clear(); - LOGGER.info("cleared min max for segmentProperties at index: " + segmentPropertiesIndex); + LOGGER.info("cleared min max for segmentProperties at index: " + + segmentIdAndSegmentPropertiesIndexWrapper.getSegmentPropertiesIndex()); } } } diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java index ce1e8ac1290..32ee9cbdaaa 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java @@ -230,7 +230,7 @@ private BlockletDataMapIndexWrapper get(TableBlockIndexUniqueIdentifierWrapper i // as segmentId will be same for all the dataMaps and segmentProperties cache is // maintained at segment level so it need to be called only once for clearing SegmentPropertiesAndSchemaHolder.getInstance() - .invalidate(segmentId, dataMaps.get(0).getSegmentPropertiesIndex(), + .invalidate(segmentId, dataMaps.get(0).getSegmentPropertiesWrapper(), tableSegmentUniqueIdentifierWrapper.isAddTableBlockToUnsafeAndLRUCache()); } } diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java index 24ad43a0562..f1687617a2f 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java @@ -99,7 +99,8 @@ public class BlockDataMap extends CoarseGrainDataMap /** * index of segmentProperties in the segmentProperties holder */ - protected int segmentPropertiesIndex; + protected transient SegmentPropertiesAndSchemaHolder.SegmentPropertiesWrapper + segmentPropertiesWrapper; /** * flag to check for store from 1.1 or any prior version */ @@ -204,10 +205,10 @@ private SegmentProperties initSegmentProperties(BlockletDataMapModel blockletDat DataFileFooter fileFooter) throws IOException { List columnInTable = fileFooter.getColumnInTable(); int[] columnCardinality = fileFooter.getSegmentInfo().getColumnCardinality(); - segmentPropertiesIndex = SegmentPropertiesAndSchemaHolder.getInstance() + segmentPropertiesWrapper = SegmentPropertiesAndSchemaHolder.getInstance() .addSegmentProperties(blockletDataMapInfo.getCarbonTable(), columnInTable, columnCardinality, blockletDataMapInfo.getSegmentId()); - return getSegmentProperties(); + return segmentPropertiesWrapper.getSegmentProperties(); } /** @@ -485,8 +486,7 @@ protected String getFilePath() { return getTableTaskInfo(SUMMARY_INDEX_PATH); } // create the segment directory path - String tablePath = SegmentPropertiesAndSchemaHolder.getInstance() - .getSegmentPropertiesWrapper(segmentPropertiesIndex).getTableIdentifier().getTablePath(); + String tablePath = segmentPropertiesWrapper.getTableIdentifier().getTablePath(); String segmentId = getTableTaskInfo(SUMMARY_SEGMENTID); return CarbonTablePath.getSegmentPath(tablePath, segmentId); } @@ -620,8 +620,7 @@ protected void createSummaryDMStore(BlockletDataMapModel blockletDataMapModel) } protected List getMinMaxCacheColumns() { - return SegmentPropertiesAndSchemaHolder.getInstance() - .getSegmentPropertiesWrapper(segmentPropertiesIndex).getMinMaxCacheColumns(); + return segmentPropertiesWrapper.getMinMaxCacheColumns(); } /** @@ -1019,18 +1018,15 @@ public long getMemorySize() { } protected SegmentProperties getSegmentProperties() { - return SegmentPropertiesAndSchemaHolder.getInstance() - .getSegmentProperties(segmentPropertiesIndex); + return segmentPropertiesWrapper.getSegmentProperties(); } public int[] getColumnCardinality() { - return SegmentPropertiesAndSchemaHolder.getInstance() - .getSegmentPropertiesWrapper(segmentPropertiesIndex).getColumnCardinality(); + return segmentPropertiesWrapper.getColumnCardinality(); } public List getColumnSchema() { - return SegmentPropertiesAndSchemaHolder.getInstance() - .getSegmentPropertiesWrapper(segmentPropertiesIndex).getColumnsInTable(); + return segmentPropertiesWrapper.getColumnsInTable(); } protected AbstractMemoryDMStore getMemoryDMStore(boolean addToUnsafe) @@ -1045,14 +1041,10 @@ protected AbstractMemoryDMStore getMemoryDMStore(boolean addToUnsafe) } protected CarbonRowSchema[] getFileFooterEntrySchema() { - return SegmentPropertiesAndSchemaHolder.getInstance() - .getSegmentPropertiesWrapper(segmentPropertiesIndex).getBlockFileFooterEntrySchema(); + return segmentPropertiesWrapper.getBlockFileFooterEntrySchema(); } protected CarbonRowSchema[] getTaskSummarySchema() { - SegmentPropertiesAndSchemaHolder.SegmentPropertiesWrapper segmentPropertiesWrapper = - SegmentPropertiesAndSchemaHolder.getInstance() - .getSegmentPropertiesWrapper(segmentPropertiesIndex); try { return segmentPropertiesWrapper.getTaskSummarySchemaForBlock(true, isFilePathStored); } catch (MemoryException e) { @@ -1080,12 +1072,8 @@ public void convertToUnsafeDMStore() throws MemoryException { } } - public void setSegmentPropertiesIndex(int segmentPropertiesIndex) { - this.segmentPropertiesIndex = segmentPropertiesIndex; - } - - public int getSegmentPropertiesIndex() { - return segmentPropertiesIndex; + public SegmentPropertiesAndSchemaHolder.SegmentPropertiesWrapper getSegmentPropertiesWrapper() { + return segmentPropertiesWrapper; } @Override public int getNumberOfEntries() { diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java index 23d39ce22f4..11f24f592b5 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java @@ -27,7 +27,6 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datamap.dev.DataMapModel; import org.apache.carbondata.core.datastore.block.SegmentProperties; -import org.apache.carbondata.core.datastore.block.SegmentPropertiesAndSchemaHolder; import org.apache.carbondata.core.datastore.block.TableBlockInfo; import org.apache.carbondata.core.indexstore.BlockMetaInfo; import org.apache.carbondata.core.indexstore.ExtendedBlocklet; @@ -83,9 +82,6 @@ protected CarbonRowSchema[] getTaskSummarySchema() { if (isLegacyStore) { return super.getTaskSummarySchema(); } - SegmentPropertiesAndSchemaHolder.SegmentPropertiesWrapper segmentPropertiesWrapper = - SegmentPropertiesAndSchemaHolder.getInstance() - .getSegmentPropertiesWrapper(segmentPropertiesIndex); try { return segmentPropertiesWrapper.getTaskSummarySchemaForBlocklet(false, isFilePathStored); } catch (MemoryException e) { @@ -98,8 +94,7 @@ protected CarbonRowSchema[] getFileFooterEntrySchema() { if (isLegacyStore) { return super.getFileFooterEntrySchema(); } - return SegmentPropertiesAndSchemaHolder.getInstance() - .getSegmentPropertiesWrapper(segmentPropertiesIndex).getBlockletFileFooterEntrySchema(); + return segmentPropertiesWrapper.getBlockletFileFooterEntrySchema(); } /** diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala index ac0ca8b6a8d..ffaa09a73c8 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala @@ -88,9 +88,8 @@ class TestQueryWithColumnMetCacheAndCacheLevelProperty extends QueryTest with Be private def validateMinMaxColumnsCacheLength(dataMaps: List[DataMap[_ <: Blocklet]], expectedLength: Int, storeBlockletCount: Boolean = false): Boolean = { - val index = dataMaps(0).asInstanceOf[BlockDataMap].getSegmentPropertiesIndex - val summarySchema = SegmentPropertiesAndSchemaHolder.getInstance() - .getSegmentPropertiesWrapper(index).getTaskSummarySchemaForBlock(storeBlockletCount, false) + val summarySchema = dataMaps(0).asInstanceOf[BlockDataMap].getSegmentPropertiesWrapper + .getTaskSummarySchemaForBlock(storeBlockletCount, false) val minSchemas = summarySchema(BlockletDataMapRowIndexes.TASK_MIN_VALUES_INDEX) .asInstanceOf[CarbonRowSchema.StructCarbonRowSchema] .getChildSchemas @@ -107,13 +106,11 @@ class TestQueryWithColumnMetCacheAndCacheLevelProperty extends QueryTest with Be assert(dataMaps.nonEmpty) assert(dataMaps(0).isInstanceOf[BlockDataMap]) assert(validateMinMaxColumnsCacheLength(dataMaps, 3, true)) - var segmentPropertyIndex = dataMaps(0).asInstanceOf[BlockDataMap].getSegmentPropertiesIndex // alter table to add column_meta_cache and cache_level sql( "alter table metaCache set tblproperties('column_meta_cache'='c2,c1', 'CACHE_LEVEL'='BLOCKLET')") - var wrapper = SegmentPropertiesAndSchemaHolder.getInstance() - .getSegmentPropertiesWrapper(segmentPropertyIndex) + var wrapper = dataMaps(0).asInstanceOf[BlockDataMap].getSegmentPropertiesWrapper // after alter operation cache should be cleaned and cache should be evicted assert(null == wrapper) checkAnswer(sql("select * from metaCache"), Row("a", "aa", "aaa")) @@ -125,20 +122,16 @@ class TestQueryWithColumnMetCacheAndCacheLevelProperty extends QueryTest with Be assert(validateMinMaxColumnsCacheLength(dataMaps, 2)) // alter table to add same value as previous with order change for column_meta_cache and cache_level - segmentPropertyIndex = dataMaps(0).asInstanceOf[BlockDataMap].getSegmentPropertiesIndex sql( "alter table metaCache set tblproperties('column_meta_cache'='c1,c2', 'CACHE_LEVEL'='BLOCKLET')") - wrapper = SegmentPropertiesAndSchemaHolder.getInstance() - .getSegmentPropertiesWrapper(segmentPropertyIndex) + wrapper = dataMaps(0).asInstanceOf[BlockDataMap].getSegmentPropertiesWrapper // after alter operation cache should not be cleaned as value are unchanged assert(null != wrapper) // alter table to cache no column in column_meta_cache - segmentPropertyIndex = dataMaps(0).asInstanceOf[BlockDataMap].getSegmentPropertiesIndex sql( "alter table metaCache set tblproperties('column_meta_cache'='')") - wrapper = SegmentPropertiesAndSchemaHolder.getInstance() - .getSegmentPropertiesWrapper(segmentPropertyIndex) + wrapper = dataMaps(0).asInstanceOf[BlockDataMap].getSegmentPropertiesWrapper // after alter operation cache should be cleaned and cache should be evicted assert(null == wrapper) @@ -151,11 +144,9 @@ class TestQueryWithColumnMetCacheAndCacheLevelProperty extends QueryTest with Be assert(validateMinMaxColumnsCacheLength(dataMaps, 0)) // alter table to cache no column in column_meta_cache - segmentPropertyIndex = dataMaps(0).asInstanceOf[BlockDataMap].getSegmentPropertiesIndex sql( "alter table metaCache unset tblproperties('column_meta_cache', 'cache_level')") - wrapper = SegmentPropertiesAndSchemaHolder.getInstance() - .getSegmentPropertiesWrapper(segmentPropertyIndex) + wrapper = dataMaps(0).asInstanceOf[BlockDataMap].getSegmentPropertiesWrapper // after alter operation cache should be cleaned and cache should be evicted assert(null == wrapper) checkAnswer(sql("select * from metaCache"), Row("a", "aa", "aaa"))