diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedCountRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedCountRDD.scala
index 930586f7efc..c8b80aff56e 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedCountRDD.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedCountRDD.scala
@@ -64,6 +64,11 @@ class DistributedCountRDD(@transient ss: SparkSession, dataMapFormat: Distributa
       .newFixedThreadPool(numOfThreads, new CarbonThreadFactory("IndexPruningPool", true))
     implicit val ec: ExecutionContextExecutor = ExecutionContext
       .fromExecutor(service)
+    if (dataMapFormat.ifAsyncCall()) {
+      // to clear cache of invalid segments during pre-priming in index server
+      DataMapStoreManager.getInstance().clearInvalidSegments(dataMapFormat.getCarbonTable,
+        dataMapFormat.getInvalidSegments)
+    }
     val futures = if (inputSplits.length <= numOfThreads) {
       inputSplits.map {
         split => generateFuture(Seq(split))
@@ -84,11 +89,6 @@ class DistributedCountRDD(@transient ss: SparkSession, dataMapFormat: Distributa
     } else {
       0L
     }
-    if (dataMapFormat.ifAsyncCall()) {
-      // to clear cache of invalid segments during pre-priming in index server
-      DataMapStoreManager.getInstance().clearInvalidSegments(dataMapFormat.getCarbonTable,
-        dataMapFormat.getInvalidSegments)
-    }
     Iterator((executorIP + "_" + cacheSize.toString, results.map(_._2.toLong).sum.toString))
   }
 
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 2b80985300a..57590428bcf 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -455,20 +455,26 @@ object CarbonDataRDDFactory {
         if (resultSize == 0) {
           return null
         }
-        if (CarbonUpdateUtil.updateTableMetadataStatus(
+        if (!CarbonUpdateUtil.updateTableMetadataStatus(
           segmentDetails,
           carbonTable,
           updateModel.get.updatedTimeStamp + "",
           true,
           new util.ArrayList[Segment](0),
           new util.ArrayList[Segment](segmentFiles), "")) {
-        } else {
           LOGGER.error("Data update failed due to failure in table status updation.")
           updateModel.get.executorErrors.errorMsg = errorMessage
           updateModel.get.executorErrors.failureCauses = FailureCauses
             .STATUS_FILE_UPDATION_FAILURE
           return null
         }
+        // code to handle Pre-Priming cache for update command
+        if (!segmentFiles.isEmpty) {
+          val segmentsToPrePrime = segmentFiles.asScala.map(iterator => iterator.getSegmentNo).toSeq
+          DistributedRDDUtils
+            .triggerPrepriming(sqlContext.sparkSession, carbonTable, segmentsToPrePrime,
+              operationContext, hadoopConf, segmentsToPrePrime.toList)
+        }
       }
       return null
     }
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index b2ba7f42232..c00061a4839 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -454,11 +454,12 @@ class CarbonFileMetastore extends CarbonMetaStore {
   def dropTable(absoluteTableIdentifier: AbsoluteTableIdentifier)(sparkSession: SparkSession) {
     val dbName = absoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName
     val tableName = absoluteTableIdentifier.getCarbonTableIdentifier.getTableName
+    // Clear both driver and executor cache.
+    DataMapStoreManager.getInstance().clearDataMaps(absoluteTableIdentifier)
     CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession)
     // discard cached table info in cachedDataSourceTables
     val tableIdentifier = TableIdentifier(tableName, Option(dbName))
     sparkSession.sessionState.catalog.refreshTable(tableIdentifier)
-    DataMapStoreManager.getInstance().clearDataMaps(absoluteTableIdentifier)
     SegmentPropertiesAndSchemaHolder.getInstance().invalidate(absoluteTableIdentifier)
     removeTableFromMetadata(dbName, tableName)
   }