diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 311019ca61a..8b393439666 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -2184,4 +2184,9 @@ private CarbonCommonConstants() {
   public static final String LOAD_SYNC_TIME = "load_sync_time";
 
+  public static final String CARBON_INDEX_SERVER_JOBNAME_LENGTH =
+      "carbon.index.server.max.jobname.length";
+
+  public static final String CARBON_INDEX_SERVER_JOBNAME_LENGTH_DEFAULT =
+      "50";
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java b/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java
index 57540e4ab17..0478b400b0b 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java
@@ -26,6 +26,7 @@ import java.util.List;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.dev.DataMap;
 import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper;
 import org.apache.carbondata.core.datastore.block.SegmentPropertiesAndSchemaHolder;
@@ -36,6 +37,8 @@ import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
 import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+
+import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.ObjectSerializationUtil;
 
 import org.apache.commons.lang.StringUtils;
@@ -78,6 +81,11 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBlocklet>
+  /**
+   * If the length of taskGroupDesc is more than CARBON_INDEX_SERVER_JOBNAME_LENGTH
+   * then it is truncated, as transferring a big query string to the IndexServer is costly.
+   */
+  public void setTaskGroupDesc(String taskGroupDesc) {
+    int maxJobLength;
+    try {
+      String maxJobLengthString = CarbonProperties.getInstance()
+          .getProperty(CarbonCommonConstants.CARBON_INDEX_SERVER_JOBNAME_LENGTH,
+              CarbonCommonConstants.CARBON_INDEX_SERVER_JOBNAME_LENGTH_DEFAULT);
+      maxJobLength = Integer.parseInt(maxJobLengthString);
+    } catch (Exception e) {
+      // fall back to the default length if the configured value is not a valid integer
+      maxJobLength =
+          Integer.parseInt(CarbonCommonConstants.CARBON_INDEX_SERVER_JOBNAME_LENGTH_DEFAULT);
+    }
+    if (taskGroupDesc.length() > maxJobLength) {
+      this.taskGroupDesc = taskGroupDesc.substring(0, maxJobLength);
+    } else {
+      this.taskGroupDesc = taskGroupDesc;
+    }
+  }
+
   public FilterResolverIntf getFilterResolverIntf() {
     return filterResolverIntf;
   }
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala
index 0ee4ebbef77..698dd58687c 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala
@@ -21,6 +21,7 @@ import java.util
 import scala.collection.JavaConverters._
 
 import org.apache.log4j.Logger
+import org.apache.spark.sql.util.SparkSQLUtil
 import org.apache.spark.util.SizeEstimator
 
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -47,6 +48,17 @@ class DistributedDataMapJob extends AbstractDataMapJob {
       LOGGER.debug(s"Size of message sent to Index Server: $messageSize")
     }
     val (resonse, time) = logTime {
+      val spark = SparkSQLUtil.getSparkSession
+      val taskGroupId = spark.sparkContext.getLocalProperty("spark.jobGroup.id") match {
+        case null => ""
+        case _ => spark.sparkContext.getLocalProperty("spark.jobGroup.id")
+      }
+      val taskGroupDesc = spark.sparkContext.getLocalProperty("spark.job.description") match {
+        case null => ""
+        case _ => spark.sparkContext.getLocalProperty("spark.job.description")
+      }
+      dataMapFormat.setTaskGroupId(taskGroupId)
+      dataMapFormat.setTaskGroupDesc(taskGroupDesc)
       var filterInf = dataMapFormat.getFilterResolverIntf
       val filterProcessor = new FilterExpressionProcessor
       filterInf = removeSparkUnknown(filterInf,
@@ -95,6 +107,17 @@ class DistributedDataMapJob extends AbstractDataMapJob {
 
 class EmbeddedDataMapJob extends AbstractDataMapJob {
 
   override def execute(dataMapFormat: DistributableDataMapFormat): util.List[ExtendedBlocklet] = {
+    val spark = SparkSQLUtil.getSparkSession
+    val taskGroupId = spark.sparkContext.getLocalProperty("spark.jobGroup.id") match {
+      case null => ""
+      case _ => spark.sparkContext.getLocalProperty("spark.jobGroup.id")
+    }
+    val taskGroupDesc = spark.sparkContext.getLocalProperty("spark.job.description") match {
+      case null => ""
+      case _ => spark.sparkContext.getLocalProperty("spark.job.description")
+    }
+    dataMapFormat.setTaskGroupId(taskGroupId)
+    dataMapFormat.setTaskGroupDesc(taskGroupDesc)
     IndexServer.getSplits(dataMapFormat).toList.asJava
   }
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
index f06609557cf..9eee6d72e5f 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
@@ -100,6 +100,8 @@ object IndexServer extends ServerInterface {
   }
 
   def getSplits(request: DistributableDataMapFormat): Array[ExtendedBlocklet] = doAs {
+    sparkSession.sparkContext.setLocalProperty("spark.jobGroup.id", request.getTaskGroupId)
+    sparkSession.sparkContext.setLocalProperty("spark.job.description", request.getTaskGroupDesc)
     val splits = new DistributedPruneRDD(sparkSession, request).collect()
     DistributedRDDUtils.updateExecutorCacheSize(splits.map(_._1).toSet)
     if (request.isJobToClearDataMaps) {
@@ -110,11 +112,15 @@ object IndexServer extends ServerInterface {
 
   override def invalidateSegmentCache(databaseName: String, tableName: String,
       segmentIds: Array[String]): Unit = doAs {
+    val jobgroup: String = "Invalidated Segment Cache for " + databaseName + "." + tableName
+    sparkSession.sparkContext.setLocalProperty("spark.job.description", jobgroup)
     new InvalidateSegmentCacheRDD(sparkSession, databaseName, tableName, segmentIds.toList)
       .collect()
   }
 
   override def showCache(tableName: String = ""): Array[String] = doAs {
+    val jobgroup: String = "Show Cache for " + tableName
+    sparkSession.sparkContext.setLocalProperty("spark.job.description", jobgroup)
     new DistributedShowCacheRDD(sparkSession, tableName).collect()
   }
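
For context, a minimal, self-contained Scala sketch (not part of the patch) of how these pieces are exercised from a client session: the caller tags its work with SparkContext.setJobGroup, the DataMap jobs above read the values back via getLocalProperty("spark.jobGroup.id") / getLocalProperty("spark.job.description") and forward them to the index server, and the new carbon.index.server.max.jobname.length property bounds how much of the description is carried over. The object name, master URL, and sample query string below are illustrative assumptions only.

import org.apache.spark.sql.SparkSession

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties

object JobNamePropagationSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")                 // illustrative local session
      .appName("index-server-jobname-sketch")
      .getOrCreate()

    // Raise the limit on the job description forwarded to the index server
    // (defaults to CARBON_INDEX_SERVER_JOBNAME_LENGTH_DEFAULT, i.e. 50 characters).
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.CARBON_INDEX_SERVER_JOBNAME_LENGTH, "100")

    // setJobGroup stores both values as local properties on the SparkContext;
    // DistributedDataMapJob/EmbeddedDataMapJob read them back with getLocalProperty
    // and pass them to DistributableDataMapFormat, whose setTaskGroupDesc truncates
    // the description before the request is sent to the index server.
    spark.sparkContext.setJobGroup("query-group-1", "SELECT count(*) FROM t WHERE c1 = 'x'")

    assert(spark.sparkContext.getLocalProperty("spark.jobGroup.id") == "query-group-1")
    assert(spark.sparkContext.getLocalProperty("spark.job.description") != null)

    spark.stop()
  }
}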