From c8c0211c33371bf5cb5b2b912553076361097cc0 Mon Sep 17 00:00:00 2001
From: BJangir
Date: Tue, 7 May 2019 19:06:36 +0530
Subject: [PATCH] [CARBONDATA-3378] Display original query in Indexserver Job

When a query is fired on the main JDBC server, there is no mapping of it in
the Index Server, so it is difficult to find which Index Server job belongs
to which query, especially for concurrent queries. This PR displays the query
in the Index Server as well, along with the execution id.

This closes #3208
---
 .../core/constants/CarbonCommonConstants.java |  5 ++
 .../datamap/DistributableDataMapFormat.java   | 48 +++++++++++++++++++
 .../carbondata/indexserver/DataMapJobs.scala  | 23 +++++++++
 .../carbondata/indexserver/IndexServer.scala  |  6 +++
 4 files changed, 82 insertions(+)

diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 311019ca61a..8b393439666 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -2184,4 +2184,9 @@ private CarbonCommonConstants() {
 
   public static final String LOAD_SYNC_TIME = "load_sync_time";
 
+  public static final String CARBON_INDEX_SERVER_JOBNAME_LENGTH =
+      "carbon.index.server.max.jobname.length";
+
+  public static final String CARBON_INDEX_SERVER_JOBNAME_LENGTH_DEFAULT =
+      "50";
 }
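Reviewer note (not part of the patch): a minimal sketch of how the new limit
could be tuned at runtime. CarbonProperties.getInstance().addProperty is the
existing runtime-configuration API; the value 100 below is only an
illustrative override of the 50-character default.

    import org.apache.carbondata.core.constants.CarbonCommonConstants
    import org.apache.carbondata.core.util.CarbonProperties

    object JobNameLengthExample {
      def main(args: Array[String]): Unit = {
        // Override the 50-character default; query text longer than this is
        // truncated before it is attached to the Index Server job description.
        // "100" is an illustrative value, not a recommendation.
        CarbonProperties.getInstance()
          .addProperty(CarbonCommonConstants.CARBON_INDEX_SERVER_JOBNAME_LENGTH, "100")

        // Read it back the same way setTaskGroupDesc does, with the default as fallback.
        val limit = CarbonProperties.getInstance()
          .getProperty(CarbonCommonConstants.CARBON_INDEX_SERVER_JOBNAME_LENGTH,
            CarbonCommonConstants.CARBON_INDEX_SERVER_JOBNAME_LENGTH_DEFAULT)
        println(s"Index Server job name limit: $limit characters")
      }
    }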
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java b/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java
index 57540e4ab17..0478b400b0b 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java
@@ -26,6 +26,7 @@
 import java.util.List;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.dev.DataMap;
 import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper;
 import org.apache.carbondata.core.datastore.block.SegmentPropertiesAndSchemaHolder;
@@ -36,6 +37,8 @@
 import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
 import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+
+import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.ObjectSerializationUtil;
 
 import org.apache.commons.lang.StringUtils;
@@ -78,6 +81,11 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBlocklet>
+  /**
+   * If the query/job name is longer than CARBON_INDEX_SERVER_JOBNAME_LENGTH
+   * then need to cut as transferring big query to IndexServer will be costly.
+   */
+  public void setTaskGroupDesc(String taskGroupDesc) {
+    int maxJobLenth;
+    try {
+      String maxJobLenthString = CarbonProperties.getInstance()
+          .getProperty(CarbonCommonConstants.CARBON_INDEX_SERVER_JOBNAME_LENGTH,
+              CarbonCommonConstants.CARBON_INDEX_SERVER_JOBNAME_LENGTH_DEFAULT);
+      maxJobLenth = Integer.parseInt(maxJobLenthString);
+    } catch (Exception e) {
+      String maxJobLenthString = CarbonProperties.getInstance()
+          .getProperty(CarbonCommonConstants.CARBON_INDEX_SERVER_JOBNAME_LENGTH_DEFAULT);
+      maxJobLenth = Integer.parseInt(maxJobLenthString);
+    }
+    if (taskGroupDesc.length() > maxJobLenth) {
+      this.taskGroupDesc = taskGroupDesc.substring(0, maxJobLenth);
+    } else {
+      this.taskGroupDesc = taskGroupDesc;
+    }
+  }
+
   public FilterResolverIntf getFilterResolverIntf() {
     return filterResolverIntf;
   }
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala
index 0ee4ebbef77..698dd58687c 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala
@@ -21,6 +21,7 @@ import java.util
 import scala.collection.JavaConverters._
 
 import org.apache.log4j.Logger
+import org.apache.spark.sql.util.SparkSQLUtil
 import org.apache.spark.util.SizeEstimator
 
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -47,6 +48,17 @@ class DistributedDataMapJob extends AbstractDataMapJob {
       LOGGER.debug(s"Size of message sent to Index Server: $messageSize")
     }
     val (resonse, time) = logTime {
+      val spark = SparkSQLUtil.getSparkSession
+      val taskGroupId = spark.sparkContext.getLocalProperty("spark.jobGroup.id") match {
+        case null => ""
+        case _ => spark.sparkContext.getLocalProperty("spark.jobGroup.id")
+      }
+      val taskGroupDesc = spark.sparkContext.getLocalProperty("spark.job.description") match {
+        case null => ""
+        case _ => spark.sparkContext.getLocalProperty("spark.job.description")
+      }
+      dataMapFormat.setTaskGroupId(taskGroupId)
+      dataMapFormat.setTaskGroupDesc(taskGroupDesc)
       var filterInf = dataMapFormat.getFilterResolverIntf
       val filterProcessor = new FilterExpressionProcessor
       filterInf = removeSparkUnknown(filterInf,
@@ -95,6 +107,17 @@ class DistributedDataMapJob extends AbstractDataMapJob {
 class EmbeddedDataMapJob extends AbstractDataMapJob {
 
   override def execute(dataMapFormat: DistributableDataMapFormat): util.List[ExtendedBlocklet] = {
+    val spark = SparkSQLUtil.getSparkSession
+    val taskGroupId = spark.sparkContext.getLocalProperty("spark.jobGroup.id") match {
+      case null => ""
+      case _ => spark.sparkContext.getLocalProperty("spark.jobGroup.id")
+    }
+    val taskGroupDesc = spark.sparkContext.getLocalProperty("spark.job.description") match {
+      case null => ""
+      case _ => spark.sparkContext.getLocalProperty("spark.job.description")
+    }
+    dataMapFormat.setTaskGroupId(taskGroupId)
+    dataMapFormat.setTaskGroupDesc(taskGroupDesc)
     IndexServer.getSplits(dataMapFormat).toList.asJava
   }
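Reviewer note (not part of the patch): the values forwarded above are Spark's
standard "spark.jobGroup.id" and "spark.job.description" local properties,
which the SQL layer sets through SparkContext.setJobGroup before submitting
jobs. A minimal, self-contained sketch of that mechanism; the group id and
query text below are only illustrative.

    import org.apache.spark.sql.SparkSession

    object JobDescriptionExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[2]")
          .appName("job-description-demo")
          .getOrCreate()
        val sc = spark.sparkContext

        // What the driver side does when a query is fired: tag the thread-local
        // job group id and description before any Spark jobs are submitted.
        sc.setJobGroup("execution-id-42", "SELECT count(*) FROM t WHERE c1 = 'x'")

        // What DistributedDataMapJob reads back and forwards to the Index Server.
        val groupId = Option(sc.getLocalProperty("spark.jobGroup.id")).getOrElse("")
        val desc = Option(sc.getLocalProperty("spark.job.description")).getOrElse("")
        println(s"jobGroup.id = $groupId, job.description = $desc")

        // Any job submitted now shows this group and description in the Spark UI,
        // which is what the Index Server replays on its own SparkContext.
        sc.parallelize(1 to 10).count()
        spark.stop()
      }
    }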
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
index f06609557cf..9eee6d72e5f 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
@@ -100,6 +100,8 @@ object IndexServer extends ServerInterface {
   }
 
   def getSplits(request: DistributableDataMapFormat): Array[ExtendedBlocklet] = doAs {
+    sparkSession.sparkContext.setLocalProperty("spark.jobGroup.id", request.getTaskGroupId)
+    sparkSession.sparkContext.setLocalProperty("spark.job.description", request.getTaskGroupDesc)
     val splits = new DistributedPruneRDD(sparkSession, request).collect()
     DistributedRDDUtils.updateExecutorCacheSize(splits.map(_._1).toSet)
     if (request.isJobToClearDataMaps) {
@@ -110,11 +112,15 @@ object IndexServer extends ServerInterface {
 
   override def invalidateSegmentCache(databaseName: String, tableName: String,
       segmentIds: Array[String]): Unit = doAs {
+    val jobgroup: String = " Invalided Segment Cache for " + databaseName + "." + tableName
+    sparkSession.sparkContext.setLocalProperty("spark.job.description", jobgroup)
     new InvalidateSegmentCacheRDD(sparkSession, databaseName, tableName, segmentIds.toList)
       .collect()
   }
 
   override def showCache(tableName: String = ""): Array[String] = doAs {
+    val jobgroup: String = "Show Cache for " + tableName
+    sparkSession.sparkContext.setLocalProperty("spark.job.description", jobgroup)
     new DistributedShowCacheRDD(sparkSession, tableName).collect()
   }