[CARBONDATA-3455] Job Group ID is not displayed for the IndexServer Jobs
dhatchayani committed Jun 27, 2019
1 parent 188e7e4 commit 2099954
Showing 9 changed files with 38 additions and 21 deletions.
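
This commit forwards the caller's Spark job group id and description to the Index Server so that its distributed pruning and cache-invalidation jobs appear under the originating job group in the Spark UI. As a hedged, minimal sketch of where that metadata comes from (illustration only, not code from this commit): SparkContext.setJobGroup stores the group id and description as thread-local properties under the same keys that the new CacheUtil helpers read back.

    import org.apache.spark.sql.SparkSession

    object JobGroupPropertiesSketch {
      def main(args: Array[String]): Unit = {
        // Local session purely for illustration.
        val spark = SparkSession.builder()
          .master("local[*]")
          .appName("job-group-sketch")
          .getOrCreate()
        val sc = spark.sparkContext

        // setJobGroup records the id/description as thread-local properties.
        sc.setJobGroup("example-group-id", "example description", interruptOnCancel = false)

        // Same property keys that CacheUtil.getTaskGroupId / getTaskGroupDesc read in this commit.
        println(Option(sc.getLocalProperty("spark.jobGroup.id")).getOrElse(""))
        println(Option(sc.getLocalProperty("spark.job.description")).getOrElse(""))

        spark.stop()
      }
    }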
@@ -22,6 +22,7 @@ import java.util.UUID
 import scala.collection.JavaConverters._
 
 import org.apache.log4j.Logger
+import org.apache.spark.sql.execution.command.cache.CacheUtil
 import org.apache.spark.sql.util.SparkSQLUtil
 import org.apache.spark.util.SizeEstimator
 
@@ -59,16 +60,8 @@ class DistributedDataMapJob extends AbstractDataMapJob {
     val (resonse, time) = logTime {
       try {
         val spark = SparkSQLUtil.getSparkSession
-        val taskGroupId = spark.sparkContext.getLocalProperty("spark.jobGroup.id") match {
-          case null => ""
-          case _ => spark.sparkContext.getLocalProperty("spark.jobGroup.id")
-        }
-        val taskGroupDesc = spark.sparkContext.getLocalProperty("spark.job.description") match {
-          case null => ""
-          case _ => spark.sparkContext.getLocalProperty("spark.job.description")
-        }
-        dataMapFormat.setTaskGroupId(taskGroupId)
-        dataMapFormat.setTaskGroupDesc(taskGroupDesc)
+        dataMapFormat.setTaskGroupId(CacheUtil.getTaskGroupId(spark))
+        dataMapFormat.setTaskGroupDesc(CacheUtil.getTaskGroupDesc(spark))
         var filterInf = dataMapFormat.getFilterResolverIntf
         val filterProcessor = new FilterExpressionProcessor
         filterInf = removeSparkUnknown(filterInf,
@@ -55,7 +55,7 @@ trait ServerInterface {
    * Invalidate the cache for the specified segments only. Used in case of compaction/Update/Delete.
    */
   def invalidateSegmentCache(carbonTable: CarbonTable,
-      segmentIds: Array[String]): Unit
+      segmentIds: Array[String], jobGroupId: String = ""): Unit
 }
 
 /**
@@ -114,11 +114,12 @@ object IndexServer extends ServerInterface {
   }
 
   override def invalidateSegmentCache(carbonTable: CarbonTable,
-      segmentIds: Array[String]): Unit = doAs {
+      segmentIds: Array[String], jobGroupId: String = ""): Unit = doAs {
     val databaseName = carbonTable.getDatabaseName
     val tableName = carbonTable.getTableName
     val jobgroup: String = " Invalided Segment Cache for " + databaseName + "." + tableName
     sparkSession.sparkContext.setLocalProperty("spark.job.description", jobgroup)
+    sparkSession.sparkContext.setLocalProperty("spark.jobGroup.id", jobGroupId)
     new InvalidateSegmentCacheRDD(sparkSession, carbonTable, segmentIds.toList)
       .collect()
   }
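
The hunk above is the server-side half of the fix: the group id received over RPC is re-applied as a local property on the Index Server's own SparkContext before InvalidateSegmentCacheRDD is launched. A hedged sketch of that pattern follows, using a hypothetical helper name that is not from this commit; local properties are per-thread, so clearing them afterwards avoids mis-attributing later jobs.

    import org.apache.spark.sql.SparkSession

    // Hypothetical helper, for illustration only: run a block with the caller's
    // job group metadata applied to the current thread, then clear it again.
    object JobGroupForwarding {
      def runWithJobGroup[T](spark: SparkSession, jobGroupId: String, description: String)(body: => T): T = {
        val sc = spark.sparkContext
        sc.setLocalProperty("spark.jobGroup.id", jobGroupId)
        sc.setLocalProperty("spark.job.description", description)
        try {
          body
        } finally {
          // Passing null removes a local property in Spark.
          sc.setLocalProperty("spark.jobGroup.id", null)
          sc.setLocalProperty("spark.job.description", null)
        }
      }
    }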
@@ -40,6 +40,7 @@ import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionCoalescer, N
 import org.apache.spark.sql.{CarbonEnv, DataFrame, Row, SQLContext}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.command.{CompactionModel, ExecutionErrors, UpdateTableModel}
+import org.apache.spark.sql.execution.command.cache.CacheUtil
 import org.apache.spark.sql.hive.DistributionUtil
 import org.apache.spark.sql.optimizer.CarbonFilters
 import org.apache.spark.sql.util.{CarbonException, SparkSQLUtil}
@@ -259,7 +260,9 @@ object CarbonDataRDDFactory {
         carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)) {
         try {
           IndexServer.getClient.invalidateSegmentCache(carbonLoadModel
-            .getCarbonDataLoadSchema.getCarbonTable, compactedSegments.asScala.toArray)
+            .getCarbonDataLoadSchema.getCarbonTable,
+            compactedSegments.asScala.toArray,
+            CacheUtil.getTaskGroupId(sqlContext.sparkSession))
         } catch {
           case ex: Exception =>
             LOGGER.warn(s"Clear cache job has failed for ${carbonLoadModel
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.command.cache
 import scala.collection.JavaConverters._
 
 import org.apache.log4j.Logger
+import org.apache.spark.sql.SparkSession
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.cache.CacheType
@@ -45,7 +46,7 @@ object CacheUtil {
    * @param carbonTable
    * @return List of all index files
    */
-  def getAllIndexFiles(carbonTable: CarbonTable): List[String] = {
+  def getAllIndexFiles(carbonTable: CarbonTable)(sparkSession: SparkSession): List[String] = {
     if (carbonTable.isTransactionalTable) {
       val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
       val validAndInvalidSegmentsInfo = new SegmentStatusManager(absoluteTableIdentifier)
@@ -56,7 +57,8 @@
       val invalidSegmentIds = validAndInvalidSegmentsInfo.getInvalidSegments.asScala
         .map(_.getSegmentNo).toArray
       try {
-        IndexServer.getClient.invalidateSegmentCache(carbonTable, invalidSegmentIds)
+        IndexServer.getClient
+          .invalidateSegmentCache(carbonTable, invalidSegmentIds, getTaskGroupId(sparkSession))
       } catch {
         case e: Exception =>
           LOGGER.warn("Failed to clear cache from executors. ", e)
@@ -84,6 +86,22 @@
     }
   }
 
+  def getTaskGroupId(sparkSession: SparkSession): String = {
+    val taskGroupId = sparkSession.sparkContext.getLocalProperty("spark.jobGroup.id") match {
+      case null => ""
+      case _ => sparkSession.sparkContext.getLocalProperty("spark.jobGroup.id")
+    }
+    taskGroupId
+  }
+
+  def getTaskGroupDesc(sparkSession: SparkSession): String = {
+    val taskGroupDesc = sparkSession.sparkContext.getLocalProperty("spark.job.description") match {
+      case null => ""
+      case _ => sparkSession.sparkContext.getLocalProperty("spark.job.description")
+    }
+    taskGroupDesc
+  }
+
   /**
    * Given a carbonTable file, returns a list of all dictionary entries which can be in cache
    *
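
The two helpers added above null-check the local property with a pattern match. An equivalent, slightly more compact formulation (a sketch only, not code from this commit) lets Option absorb the null check:

    import org.apache.spark.sql.SparkSession

    // Sketch: behaviourally equivalent to the helpers added in this commit.
    object CacheUtilSketch {
      def getTaskGroupId(sparkSession: SparkSession): String =
        Option(sparkSession.sparkContext.getLocalProperty("spark.jobGroup.id")).getOrElse("")

      def getTaskGroupDesc(sparkSession: SparkSession): String =
        Option(sparkSession.sparkContext.getLocalProperty("spark.job.description")).getOrElse("")
    }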
@@ -55,7 +55,7 @@ case class CarbonDropCacheCommand(tableIdentifier: TableIdentifier, internalCall
         carbonTable.getTableName)) {
       DataMapUtil.executeClearDataMapJob(carbonTable, DataMapUtil.DISTRIBUTED_JOB_NAME)
     } else {
-      val allIndexFiles = CacheUtil.getAllIndexFiles(carbonTable)
+      val allIndexFiles = CacheUtil.getAllIndexFiles(carbonTable)(sparkSession)
       // Extract dictionary keys for the table and create cache keys from those
       val dictKeys: List[String] = CacheUtil.getAllDictCacheKeys(carbonTable)
 
@@ -82,7 +82,7 @@ case class CarbonShowCacheCommand(tableIdentifier: Option[TableIdentifier],
       val carbonTable = CarbonEnv.getCarbonTable(tableIdentifier.get)(sparkSession)
       Checker
         .validateTableExists(tableIdentifier.get.database, tableIdentifier.get.table, sparkSession)
-      val numberOfIndexFiles = CacheUtil.getAllIndexFiles(carbonTable).size
+      val numberOfIndexFiles = CacheUtil.getAllIndexFiles(carbonTable)(sparkSession).size
       val driverRawResults = getTableCacheFromDriver(sparkSession, carbonTable, numberOfIndexFiles)
       val indexRawResults = if (CarbonProperties.getInstance().isDistributedPruningEnabled
         (tableIdentifier.get.database.getOrElse(sparkSession.catalog.currentDatabase),
@@ -116,7 +116,7 @@ private[sql] case class CarbonProjectForDeleteCommand(
       HorizontalCompaction.tryHorizontalCompaction(sparkSession, carbonTable,
         isUpdateOperation = false)
 
-      DeleteExecution.clearDistributedSegmentCache(carbonTable, deletedSegments)
+      DeleteExecution.clearDistributedSegmentCache(carbonTable, deletedSegments)(sparkSession)
 
       if (executorErrors.failureCauses != FailureCauses.NONE) {
         throw new Exception(executorErrors.errorMsg)
@@ -158,7 +158,8 @@ private[sql] case class CarbonProjectForUpdateCommand(
           executionErrors,
           segmentsToBeDeleted)
 
-        DeleteExecution.clearDistributedSegmentCache(carbonTable, segmentsToBeDeleted)
+        DeleteExecution
+          .clearDistributedSegmentCache(carbonTable, segmentsToBeDeleted)(sparkSession)
 
       } else {
         throw new ConcurrentOperationException(carbonTable, "compaction", "update")
@@ -29,6 +29,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.command.ExecutionErrors
+import org.apache.spark.sql.execution.command.cache.CacheUtil
 import org.apache.spark.sql.optimizer.CarbonFilters
 import org.apache.spark.sql.util.SparkSQLUtil
 
@@ -317,13 +318,13 @@ object DeleteExecution {
   }
 
   def clearDistributedSegmentCache(carbonTable: CarbonTable,
-      segmentsToBeCleared: Seq[Segment]): Unit = {
+      segmentsToBeCleared: Seq[Segment])(sparkSession: SparkSession): Unit = {
     if (CarbonProperties.getInstance().isDistributedPruningEnabled(carbonTable
       .getDatabaseName, carbonTable.getTableName)) {
       try {
         IndexServer.getClient
           .invalidateSegmentCache(carbonTable, segmentsToBeCleared.map(_.getSegmentNo)
-          .toArray)
+          .toArray, CacheUtil.getTaskGroupId(sparkSession))
       } catch {
         case _: Exception =>
           LOGGER.warn(s"Clearing of invalid segments for ${
