From 7656ad2a549ddb29dd95bcf7381b7aee16200ff9 Mon Sep 17 00:00:00 2001
From: sounakr
Date: Thu, 8 Jun 2017 20:28:50 +0530
Subject: [PATCH] IUD Performance Changes

---
 .../core/datastore/SegmentTaskIndexStore.java | 27 ++++++++----
 .../SegmentUpdateStatusManager.java           |  1 +
 .../carbondata/hadoop/CarbonInputFormat.java  | 37 ++++++++++++----
 .../presto/impl/CarbonTableReader.java        | 42 ++++++++++++++-----
 .../iud/HorizontalCompactionTestCase.scala    |  2 +-
 .../spark/rdd/CarbonMergerRDD.scala           | 10 +++--
 .../store/CarbonFactDataHandlerModel.java     |  3 +-
 7 files changed, 88 insertions(+), 34 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java
index b1eaa84a36e..734aaaf7129 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java
@@ -187,19 +187,25 @@ private SegmentTaskIndexWrapper loadAndGetTaskIdToSegmentsMap(
     TaskBucketHolder taskBucketHolder = null;
     try {
       while (iteratorOverSegmentBlocksInfos.hasNext()) {
+        // Initialize the UpdateVO to null for each segment.
+        UpdateVO updateVO = null;
         // segment id to table block mapping
         Map.Entry<String, List<TableBlockInfo>> next = iteratorOverSegmentBlocksInfos.next();
         // group task id to table block info mapping for the segment
         Map<TaskBucketHolder, List<TableBlockInfo>> taskIdToTableBlockInfoMap =
             mappedAndGetTaskIdToTableBlockInfo(segmentToTableBlocksInfos);
         segmentId = next.getKey();
-        // get the existing map of task id to table segment map
-        UpdateVO updateVO = updateStatusManager.getInvalidTimestampRange(segmentId);
+        // The updateVO is required only when updates or deletes have been performed on the table.
+        if (updateStatusManager.getUpdateStatusDetails().length != 0) {
+          // get the existing map of task id to table segment map
+          updateVO = updateStatusManager.getInvalidTimestampRange(segmentId);
+        }
         // check if segment is already loaded, if segment is already loaded
         // no need to load the segment block
         String lruCacheKey = tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier();
         segmentTaskIndexWrapper = (SegmentTaskIndexWrapper) lruCache.get(lruCacheKey);
-        if (segmentTaskIndexWrapper == null || tableSegmentUniqueIdentifier.isSegmentUpdated()) {
+        if ((segmentTaskIndexWrapper == null) || ((null != updateVO)
+            && (tableSegmentUniqueIdentifier.isSegmentUpdated()))) {
           // get the segment loader lock object this is to avoid
           // same segment is getting loaded multiple times
           // in case of concurrent query
@@ -210,8 +216,8 @@ private SegmentTaskIndexWrapper loadAndGetTaskIdToSegmentsMap(
           // acquire lock to load the segment
           synchronized (segmentLoderLockObject) {
             segmentTaskIndexWrapper = (SegmentTaskIndexWrapper) lruCache.get(lruCacheKey);
-            if (null == segmentTaskIndexWrapper || tableSegmentUniqueIdentifier
-                .isSegmentUpdated()) {
+            if ((null == segmentTaskIndexWrapper) || ((null != updateVO)
+                && (tableSegmentUniqueIdentifier.isSegmentUpdated()))) {
               // if the segment is updated then get the existing block task id map details
               // so that the same can be updated after loading the btree.
               if (tableSegmentUniqueIdentifier.isSegmentUpdated()
@@ -245,9 +251,14 @@ private SegmentTaskIndexWrapper loadAndGetTaskIdToSegmentsMap(
                     "Can not load the segment. No Enough space available.");
               }
 
-              // set the latest timestamp.
-              segmentTaskIndexWrapper
-                  .setRefreshedTimeStamp(updateVO.getCreatedOrUpdatedTimeStamp());
+              // Refresh the timestamp only for tables that have undergone IUD operations.
+              if (null != updateVO) {
+                // set the latest timestamp.
+                segmentTaskIndexWrapper
+                    .setRefreshedTimeStamp(updateVO.getCreatedOrUpdatedTimeStamp());
+              } else {
+                segmentTaskIndexWrapper.setRefreshedTimeStamp(0L);
+              }
               // tableSegmentMapTemp.put(next.getKey(), taskIdToSegmentIndexMap);
               // removing from segment lock map as once segment is loaded
               // if concurrent query is coming for same segment
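
The hunks above make the UpdateVO lookup lazy: getInvalidTimestampRange(segmentId)
is consulted only when the table has any update status details at all, i.e. only
when IUD (insert/update/delete) operations have actually been performed, and the
refresh timestamp falls back to 0L for untouched tables. A minimal sketch of that
guard pattern, using simplified stand-in types rather than the real CarbonData
classes:

    // Stand-in types for illustration; not the real CarbonData API.
    import java.util.HashMap;
    import java.util.Map;

    class UpdateVOSketch {
      private final long createdOrUpdatedTimeStamp;

      UpdateVOSketch(long timeStamp) {
        this.createdOrUpdatedTimeStamp = timeStamp;
      }

      long getCreatedOrUpdatedTimeStamp() {
        return createdOrUpdatedTimeStamp;
      }
    }

    class UpdateStatusManagerSketch {
      // Stays empty for tables that never saw an UPDATE or DELETE.
      private final Map<String, UpdateVOSketch> detailsBySegment = new HashMap<>();

      boolean hasUpdateDetails() {
        return !detailsBySegment.isEmpty();
      }

      // Stands in for the per-segment getInvalidTimestampRange() call the
      // patch now avoids for non-IUD tables.
      UpdateVOSketch getInvalidTimestampRange(String segmentId) {
        return detailsBySegment.get(segmentId);
      }
    }

    class SegmentRefreshSketch {
      static long refreshedTimeStampFor(String segmentId, UpdateStatusManagerSketch manager) {
        UpdateVOSketch updateVO = null;
        // Pay for the lookup only when the table has IUD history.
        if (manager.hasUpdateDetails()) {
          updateVO = manager.getInvalidTimestampRange(segmentId);
        }
        // Non-IUD tables fall back to 0L, mirroring the else branch in the hunk.
        return (null != updateVO) ? updateVO.getCreatedOrUpdatedTimeStamp() : 0L;
      }
    }
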
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
index 5a93bc5c9f2..3e0a2cb1299 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
@@ -923,6 +923,7 @@ public UpdateVO getInvalidTimestampRange(String segmentId) {
         range.setLatestUpdateTimestamp(
             CarbonUpdateUtil.getTimeStampAsLong(segment.getUpdateDeltaEndTimestamp()));
       }
+      return range;
     }
   }
   return range;
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
index f5fab102b3c..1e696482230 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
@@ -340,23 +340,37 @@ private List<InputSplit> getSplits(JobContext job, FilterResolverIntf filterReso
     List<InputSplit> result = new LinkedList<InputSplit>();
     FilterExpressionProcessor filterExpressionProcessor = new FilterExpressionProcessor();
+    UpdateVO invalidBlockVOForSegmentId = null;
+    Boolean isIUDTable = false;
     AbsoluteTableIdentifier absoluteTableIdentifier =
         getCarbonTable(job.getConfiguration()).getAbsoluteTableIdentifier();
     SegmentUpdateStatusManager updateStatusManager =
         new SegmentUpdateStatusManager(absoluteTableIdentifier);
+
+    isIUDTable = (updateStatusManager.getUpdateStatusDetails().length != 0);
+
     // for each segment fetch blocks matching filter in Driver BTree
     for (String segmentNo : getSegmentsToAccess(job)) {
       List<DataRefNode> dataRefNodes =
           getDataBlocksOfSegment(job, filterExpressionProcessor, absoluteTableIdentifier,
               filterResolver, matchedPartitions, segmentNo, cacheClient, updateStatusManager);
+
+      // Get the UpdateVO only for tables on which IUD operations have been performed.
+      if (isIUDTable) {
+        invalidBlockVOForSegmentId =
+            updateStatusManager.getInvalidTimestampRange(segmentNo);
+      }
       for (DataRefNode dataRefNode : dataRefNodes) {
         BlockBTreeLeafNode leafNode = (BlockBTreeLeafNode) dataRefNode;
         TableBlockInfo tableBlockInfo = leafNode.getTableBlockInfo();
-        if (CarbonUtil.isInvalidTableBlock(tableBlockInfo,
-            updateStatusManager.getInvalidTimestampRange(tableBlockInfo.getSegmentId()),
-            updateStatusManager)) {
-          continue;
+        if (isIUDTable) {
+          // Blocks can be invalidated only by IUD operations, so the search for
+          // invalidated blocks runs only for IUD tables.
+          if (CarbonUtil.isInvalidTableBlock(tableBlockInfo, invalidBlockVOForSegmentId,
+              updateStatusManager)) {
+            continue;
+          }
         }
         String[] deleteDeltaFilePath = null;
         try {
@@ -535,16 +549,21 @@ private Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> getSegmentAbs
       CacheClient cacheClient, SegmentUpdateStatusManager updateStatusManager) throws IOException {
     Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> segmentIndexMap = null;
     SegmentTaskIndexWrapper segmentTaskIndexWrapper = null;
+    UpdateVO updateDetails = null;
     boolean isSegmentUpdated = false;
     Set<SegmentTaskIndexStore.TaskBucketHolder> taskKeys = null;
     TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier =
         new TableSegmentUniqueIdentifier(absoluteTableIdentifier, segmentId);
     segmentTaskIndexWrapper =
         cacheClient.getSegmentAccessClient().getIfPresent(tableSegmentUniqueIdentifier);
-    UpdateVO updateDetails = updateStatusManager.getInvalidTimestampRange(segmentId);
+
+    if (updateStatusManager.getUpdateStatusDetails().length != 0) {
+      updateDetails = updateStatusManager.getInvalidTimestampRange(segmentId);
+    }
+
     if (null != segmentTaskIndexWrapper) {
       segmentIndexMap = segmentTaskIndexWrapper.getTaskIdToTableSegmentMap();
-      if (isSegmentUpdate(segmentTaskIndexWrapper, updateDetails)) {
+      if (null != updateDetails && isSegmentUpdate(segmentTaskIndexWrapper, updateDetails)) {
         taskKeys = segmentIndexMap.keySet();
         isSegmentUpdated = true;
       }
@@ -629,9 +648,9 @@ public BlockMappingVO getBlockRowCount(JobContext job,
 
   private boolean isSegmentUpdate(SegmentTaskIndexWrapper segmentTaskIndexWrapper,
       UpdateVO updateDetails) {
-    if (null != updateDetails.getLatestUpdateTimestamp()
-        && updateDetails.getLatestUpdateTimestamp() > segmentTaskIndexWrapper
-        .getRefreshedTimeStamp()) {
+    Long refreshedTime = segmentTaskIndexWrapper.getRefreshedTimeStamp();
+    Long updateTimeStamp = updateDetails.getLatestUpdateTimestamp();
+    if (null != refreshedTime && null != updateTimeStamp && updateTimeStamp > refreshedTime) {
       return true;
     }
     return false;
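
Two changes above carry the performance gain: getInvalidTimestampRange() now
returns as soon as the matching segment is found, and getSplits() fetches the
UpdateVO once per segment instead of once per block (the old code called
updateStatusManager.getInvalidTimestampRange(tableBlockInfo.getSegmentId())
inside the block loop). A hedged sketch of the hoisted loop, with hypothetical
stand-in types rather than the real TableBlockInfo/UpdateVO/manager classes:

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical stand-ins for illustration only.
    class BlockSketch { }
    class RangeSketch { }

    interface StatusManagerSketch {
      boolean hasUpdateDetails();                        // table has IUD history?
      RangeSketch getInvalidTimestampRange(String segmentNo);
      List<BlockSketch> blocksOf(String segmentNo);
      boolean isInvalidBlock(BlockSketch block, RangeSketch range);
    }

    class SplitPlannerSketch {
      static List<BlockSketch> validBlocks(List<String> segments, StatusManagerSketch manager) {
        // Checked once per table, not once per segment or per block.
        boolean isIUDTable = manager.hasUpdateDetails();
        List<BlockSketch> result = new ArrayList<>();
        for (String segmentNo : segments) {
          RangeSketch invalidRange = null;
          if (isIUDTable) {
            // One lookup per segment, hoisted out of the block loop below.
            invalidRange = manager.getInvalidTimestampRange(segmentNo);
          }
          for (BlockSketch block : manager.blocksOf(segmentNo)) {
            // Non-IUD tables skip the invalid-block search entirely.
            if (isIUDTable && manager.isInvalidBlock(block, invalidRange)) {
              continue;
            }
            result.add(block);
          }
        }
        return result;
      }
    }
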
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
index 04616033092..c328a6485c4 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
@@ -251,6 +251,8 @@ public List<CarbonLocalInputSplit> getInputSplits2(CarbonTableCacheModel tableCa
 
     // need apply filters to segment
     FilterExpressionProcessor filterExpressionProcessor = new FilterExpressionProcessor();
+    UpdateVO invalidBlockVOForSegmentId = null;
+    Boolean IUDTable = false;
 
     AbsoluteTableIdentifier absoluteTableIdentifier =
         tableCacheModel.carbonTable.getAbsoluteTableIdentifier();
@@ -289,9 +291,17 @@ public List<CarbonLocalInputSplit> getInputSplits2(CarbonTableCacheModel tableCa
     FilterResolverIntf filterInterface = CarbonInputFormatUtil
         .resolveFilter(filters, tableCacheModel.carbonTable.getAbsoluteTableIdentifier());
 
+    IUDTable = (updateStatusManager.getUpdateStatusDetails().length != 0);
     List<CarbonLocalInputSplit> result = new ArrayList<>();
     // for each segment fetch blocks matching filter in Driver BTree
     for (String segmentNo : tableCacheModel.segments) {
+
+      if (IUDTable) {
+        // Fetch the invalid range only when IUD has been performed on this table.
+        invalidBlockVOForSegmentId =
+            updateStatusManager.getInvalidTimestampRange(segmentNo);
+      }
+
       try {
         List<DataRefNode> dataRefNodes =
             getDataBlocksOfSegment(filterExpressionProcessor, absoluteTableIdentifier,
@@ -301,10 +311,11 @@ public List<CarbonLocalInputSplit> getInputSplits2(CarbonTableCacheModel tableCa
           BlockBTreeLeafNode leafNode = (BlockBTreeLeafNode) dataRefNode;
           TableBlockInfo tableBlockInfo = leafNode.getTableBlockInfo();
 
-          if (CarbonUtil.isInvalidTableBlock(tableBlockInfo,
-              updateStatusManager.getInvalidTimestampRange(tableBlockInfo.getSegmentId()),
-              updateStatusManager)) {
-            continue;
+          if (IUDTable) {
+            if (CarbonUtil.isInvalidTableBlock(tableBlockInfo, invalidBlockVOForSegmentId,
+                updateStatusManager)) {
+              continue;
+            }
           }
           result.add(new CarbonLocalInputSplit(segmentNo, tableBlockInfo.getFilePath(),
               tableBlockInfo.getBlockOffset(), tableBlockInfo.getBlockLength(),
@@ -361,9 +372,9 @@ private List<DataRefNode> getDataBlocksOfSegment(
 
   private boolean isSegmentUpdate(SegmentTaskIndexWrapper segmentTaskIndexWrapper,
       UpdateVO updateDetails) {
-    if (null != updateDetails.getLatestUpdateTimestamp()
-        && updateDetails.getLatestUpdateTimestamp() > segmentTaskIndexWrapper
-        .getRefreshedTimeStamp()) {
+    Long refreshedTime = segmentTaskIndexWrapper.getRefreshedTimeStamp();
+    Long updateTime = updateDetails.getLatestUpdateTimestamp();
+    if (null != refreshedTime && null != updateTime && updateTime > refreshedTime) {
       return true;
     }
     return false;
@@ -374,16 +385,25 @@ private Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> getSegmentAbs
       CacheClient cacheClient, SegmentUpdateStatusManager updateStatusManager) throws IOException {
     Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> segmentIndexMap = null;
     SegmentTaskIndexWrapper segmentTaskIndexWrapper = null;
+    UpdateVO updateDetails = null;
     boolean isSegmentUpdated = false;
     Set<SegmentTaskIndexStore.TaskBucketHolder> taskKeys = null;
     TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier =
         new TableSegmentUniqueIdentifier(absoluteTableIdentifier, segmentId);
     segmentTaskIndexWrapper =
         cacheClient.getSegmentAccessClient().getIfPresent(tableSegmentUniqueIdentifier);
-    UpdateVO updateDetails = updateStatusManager.getInvalidTimestampRange(segmentId);
+
+    // Invalid blocks can exist only after updates or deletes have been performed
+    // on the table, so fetching the invalid timestamp range is otherwise unnecessary.
+    if (updateStatusManager.getUpdateStatusDetails().length != 0) {
+      updateDetails = updateStatusManager.getInvalidTimestampRange(segmentId);
+    }
+
     if (null != segmentTaskIndexWrapper) {
       segmentIndexMap = segmentTaskIndexWrapper.getTaskIdToTableSegmentMap();
+      // A segment is marked as updated only by IUD operations; for a plain table
+      // there are no invalidated blocks to check.
-      if (isSegmentUpdate(segmentTaskIndexWrapper, updateDetails)) {
+      if ((null != updateDetails) && isSegmentUpdate(segmentTaskIndexWrapper, updateDetails)) {
         taskKeys = segmentIndexMap.keySet();
         isSegmentUpdated = true;
       }
@@ -413,8 +433,8 @@ private Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> getSegmentAbs
       List<TableBlockInfo> tableBlockInfoList = new ArrayList<>();
 
       for (FileSplit inputSplit : carbonSplits) {
-        if (isValidBlockBasedOnUpdateDetails(taskKeys, inputSplit, updateDetails,
-            updateStatusManager, segmentId)) {
+        if ((null == updateDetails) || ((null != updateDetails) && isValidBlockBasedOnUpdateDetails(
+            taskKeys, inputSplit, updateDetails, updateStatusManager, segmentId))) {
 
           BlockletInfos blockletInfos = new BlockletInfos(0, 0, 0);//this level we do not need blocklet info!!!! Is this a trick?
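
Both isSegmentUpdate() variants now unbox through locals and null-check both
sides before comparing; with boxed Long values, either getRefreshedTimeStamp()
or getLatestUpdateTimestamp() may be null, and the old form guarded only the
latter, risking a NullPointerException on unboxing. A standalone sketch of the
comparison (the method name is reused here for illustration only):

    // Null-safe variant of the timestamp comparison: a plain
    // `updateTimeStamp > refreshedTime` on boxed Longs throws
    // NullPointerException when either side is null.
    final class SegmentUpdateCheckSketch {
      static boolean isSegmentUpdate(Long refreshedTime, Long updateTimeStamp) {
        return null != refreshedTime && null != updateTimeStamp
            && updateTimeStamp > refreshedTime;
      }

      public static void main(String[] args) {
        System.out.println(isSegmentUpdate(null, 5L));  // false, no NPE
        System.out.println(isSegmentUpdate(1L, null));  // false, no NPE
        System.out.println(isSegmentUpdate(1L, 5L));    // true
      }
    }
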
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/HorizontalCompactionTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/HorizontalCompactionTestCase.scala
index 9c3b2614ab5..d8310daa06e 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/HorizontalCompactionTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/HorizontalCompactionTestCase.scala
@@ -53,7 +53,7 @@ class HorizontalCompactionTestCase extends QueryTest with BeforeAndAfterAll {
 
 
 
-  test("test IUD Horizontal Compaction Update Alter Clean") {
+  test("test IUD Horizontal Compaction Update Alter Clean.") {
     sql("""drop database if exists iud4 cascade""")
     sql("""create database iud4""")
     sql("""use iud4""")
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 28988704a0b..caa389aa940 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -261,6 +261,7 @@ class CarbonMergerRDD[K, V](
     val jobConf: JobConf = new JobConf(new Configuration)
     val job: Job = new Job(jobConf)
     val format = CarbonInputFormatUtil.createCarbonInputFormat(absoluteTableIdentifier, job)
+    var updateDetails: UpdateVO = null
    // initialise query_id for job
    job.getConfiguration.set("query.id", queryId)
    var defaultParallelism = sparkContext.defaultParallelism
@@ -287,8 +288,11 @@ class CarbonMergerRDD[K, V](
      // map for keeping the relation of a task and its blocks.
      job.getConfiguration.set(CarbonInputFormat.INPUT_SEGMENT_NUMBERS, eachSeg)
 
-      val updateDetails: UpdateVO = updateStatusManager.getInvalidTimestampRange(eachSeg)
+      if (updateStatusManager.getUpdateStatusDetails.length != 0) {
+        updateDetails = updateStatusManager.getInvalidTimestampRange(eachSeg)
+      }
 
+      var updated: Boolean = updateStatusManager.getUpdateStatusDetails.length != 0
      // get splits
      val splits = format.getSplits(job)
 
@@ -303,8 +307,8 @@ class CarbonMergerRDD[K, V](
            entry.getLocations, entry.getLength, entry.getVersion,
            updateStatusManager.getDeleteDeltaFilePath(entry.getPath.toString)
          )
-          !CarbonUtil
-            .isInvalidTableBlock(blockInfo, updateDetails, updateStatusManager)
+          ((!updated) || ((updated) && (!CarbonUtil
+            .isInvalidTableBlock(blockInfo, updateDetails, updateStatusManager))))
        })
    }
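
The merger predicate ((!updated) || ((updated) && (!isInvalidTableBlock(...))))
reduces by short-circuiting to "!updated || !invalid": non-IUD tables keep every
split without ever paying for the invalid-block check. A sketch of the simplified
form, written in Java for consistency with the other examples here (the types and
the stubbed check are stand-ins, not the real CarbonUtil logic):

    // Equivalent, simplified form of the Scala filter predicate above.
    final class MergerFilterSketch {
      // Stub standing in for CarbonUtil.isInvalidTableBlock, which consults
      // update/delete delta metadata in the real code.
      static boolean isInvalidTableBlock(Object blockInfo, Object updateDetails) {
        return false;
      }

      // Keep the split unless the table has IUD history AND the block is invalid.
      static boolean keepSplit(Object blockInfo, boolean updated, Object updateDetails) {
        return !updated || !isInvalidTableBlock(blockInfo, updateDetails);
      }
    }
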
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index d400a6dc9da..763274cc9f7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -297,8 +297,7 @@ public static CarbonFactDataHandlerModel getCarbonFactDataHandlerModel(CarbonLoa
         .checkAndCreateCarbonStoreLocation(loadModel.getStorePath(), loadModel.getDatabaseName(),
             tableName, loadModel.getPartitionId(), loadModel.getSegmentId());
     carbonFactDataHandlerModel.setCarbonDataDirectoryPath(carbonDataDirectoryPath);
-    List<CarbonDimension> dimensionByTableName =
-        loadModel.getCarbonDataLoadSchema().getCarbonTable().getDimensionByTableName(tableName);
+    List<CarbonDimension> dimensionByTableName = carbonTable.getDimensionByTableName(tableName);
     boolean[] isUseInvertedIndexes = new boolean[dimensionByTableName.size()];
     int index = 0;
     for (CarbonDimension dimension : dimensionByTableName) {
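
The last hunk reuses the already-resolved carbonTable local instead of walking
loadModel.getCarbonDataLoadSchema().getCarbonTable() a second time. A tiny
illustration of the same hoist, with hypothetical stand-in types in place of
CarbonLoadModel, CarbonDataLoadSchema and CarbonTable:

    import java.util.Arrays;
    import java.util.List;

    // Hypothetical stand-ins for the real CarbonData model classes.
    class TableSketch {
      List<String> getDimensionByTableName(String tableName) {
        return Arrays.asList("d1", "d2");
      }
    }

    class SchemaSketch {
      private final TableSketch table = new TableSketch();
      TableSketch getCarbonTable() { return table; }
    }

    class LoadModelSketch {
      private final SchemaSketch schema = new SchemaSketch();
      SchemaSketch getCarbonDataLoadSchema() { return schema; }
    }

    final class ReuseResolvedTableSketch {
      public static void main(String[] args) {
        LoadModelSketch loadModel = new LoadModelSketch();
        // Resolve once into a local...
        TableSketch carbonTable = loadModel.getCarbonDataLoadSchema().getCarbonTable();
        // ...and reuse it instead of repeating the accessor chain.
        List<String> dimensions = carbonTable.getDimensionByTableName("t1");
        System.out.println(dimensions);
      }
    }
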