From 9b4ff28e6d6d2e2f4e601d501a5f8bfe56440473 Mon Sep 17 00:00:00 2001
From: Vikram Ahuja
Date: Tue, 17 Dec 2019 12:11:07 +0530
Subject: [PATCH] added segments for prepriming in case of update success

---
 .../carbondata/spark/rdd/CarbonDataRDDFactory.scala  | 10 ++++++++--
 .../apache/spark/sql/hive/CarbonFileMetastore.scala  |  3 ++-
 2 files changed, 10 insertions(+), 3 deletions(-)

diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 71c5a89cfd2..f4a428f4801 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -459,20 +459,26 @@ object CarbonDataRDDFactory {
         if (resultSize == 0) {
           return null
         }
-        if (CarbonUpdateUtil.updateTableMetadataStatus(
+        if (!CarbonUpdateUtil.updateTableMetadataStatus(
           segmentDetails,
           carbonTable,
           updateModel.get.updatedTimeStamp + "",
           true,
           new util.ArrayList[Segment](0),
           new util.ArrayList[Segment](segmentFiles), "")) {
-        } else {
           LOGGER.error("Data update failed due to failure in table status updation.")
           updateModel.get.executorErrors.errorMsg = errorMessage
           updateModel.get.executorErrors.failureCauses = FailureCauses
             .STATUS_FILE_UPDATION_FAILURE
           return null
         }
+        // code to handle Pre-Priming cache for update command
+        if (!segmentFiles.isEmpty) {
+          val segmentsToPrePrime = segmentFiles.asScala.map(iterator => iterator.getSegmentNo).toSeq
+          DistributedRDDUtils
+            .triggerPrepriming(sqlContext.sparkSession, carbonTable, segmentsToPrePrime,
+              operationContext, hadoopConf, segmentsToPrePrime.toList)
+        }
       }
       return null
     }
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index 6c9c6cd9c4f..377d8a294e4 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -477,11 +477,12 @@ class CarbonFileMetastore extends CarbonMetaStore {
       // clear driver B-tree and dictionary cache
       ManageDictionaryAndBTree.clearBTreeAndDictionaryLRUCache(carbonTable)
     }
+    // Clear both driver and executor cache.
+    DataMapStoreManager.getInstance().clearDataMaps(absoluteTableIdentifier)
     CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession)
     // discard cached table info in cachedDataSourceTables
     val tableIdentifier = TableIdentifier(tableName, Option(dbName))
     sparkSession.sessionState.catalog.refreshTable(tableIdentifier)
-    DataMapStoreManager.getInstance().clearDataMaps(absoluteTableIdentifier)
     SegmentPropertiesAndSchemaHolder.getInstance().invalidate(absoluteTableIdentifier)
     removeTableFromMetadata(dbName, tableName)
   }
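
Note on the CarbonDataRDDFactory hunk (not part of the commit): inverting the
condition removes the previously empty success branch, so the error handling now
runs only when updateTableMetadataStatus returns false, and pre-priming is
triggered only after the table status update succeeds. A minimal sketch of that
pre-priming call, using the names already in scope in the hunk above (the
parameter-role comments are my reading of this patch, not documented semantics):

    // Fragment, not standalone code: segmentFiles, carbonTable, sqlContext,
    // operationContext and hadoopConf are the values in scope in the hunk.
    import scala.collection.JavaConverters._

    val segmentsToPrePrime: Seq[String] =
      segmentFiles.asScala.map(_.getSegmentNo).toSeq  // numbers of the updated segments

    if (segmentsToPrePrime.nonEmpty) {
      DistributedRDDUtils.triggerPrepriming(
        sqlContext.sparkSession,    // session used to dispatch the executor-side load
        carbonTable,                // table whose index cache is warmed
        segmentsToPrePrime,         // segments invalidated by the update
        operationContext,
        hadoopConf,
        segmentsToPrePrime.toList)  // segments to load back into the cache
    }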
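
Note on the CarbonFileMetastore hunk: clearDataMaps is moved so it runs before
invalidateAndDropTable rather than after; per the new comment it clears both the
driver and executor cache, and running it first means the caches are cleared
while the table is still resolvable. The resulting drop sequence, restated with
comments (ordering taken from the patch; the rationale comments are my
inference):

    // Order of operations in dropTable after this patch.
    DataMapStoreManager.getInstance().clearDataMaps(absoluteTableIdentifier)       // driver + executor index cache
    CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession) // drop from Hive metastore
    sparkSession.sessionState.catalog.refreshTable(
      TableIdentifier(tableName, Option(dbName)))                                  // evict Spark's cached relation
    SegmentPropertiesAndSchemaHolder.getInstance().invalidate(absoluteTableIdentifier)
    removeTableFromMetadata(dbName, tableName)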