[CARBONDATA-3378]Display original query in Indexserver Job
When a query is fired on the main JDBC server, there is no mapping
for it in the Index Server.
This makes it hard to tell which Index Server job belongs to which
query, especially with concurrent queries.
This PR displays the query in the Index Server along with the ExecutionId.

This closes #3208
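At a high level, the fix reads the Spark local properties that the SQL layer sets for the running query on the JDBC server side ("spark.jobGroup.id" and "spark.job.description"), carries them inside DistributableDataMapFormat, and re-applies them on the Index Server's SparkContext before the pruning jobs run. The sketch below condenses that idea into a hypothetical helper, copyQueryInfo, assuming two live SparkSessions; the actual patch serializes the values through the format rather than copying them between sessions directly.

import org.apache.spark.sql.SparkSession

// Hedged sketch: carry a query's job group id and description from one context to another.
def copyQueryInfo(source: SparkSession, target: SparkSession): Unit = {
  // Local properties set by Spark SQL for the current query; may be null outside a query.
  val groupId = Option(source.sparkContext.getLocalProperty("spark.jobGroup.id")).getOrElse("")
  val desc = Option(source.sparkContext.getLocalProperty("spark.job.description")).getOrElse("")
  // Re-applying them on the target context makes jobs submitted from this thread
  // show the original query in that context's UI and event logs.
  target.sparkContext.setLocalProperty("spark.jobGroup.id", groupId)
  target.sparkContext.setLocalProperty("spark.job.description", desc)
}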
BJangir authored and kunal642 committed May 30, 2019
1 parent 6817e77 commit c8c0211
Showing 4 changed files with 82 additions and 0 deletions.
@@ -2184,4 +2184,9 @@ private CarbonCommonConstants() {

public static final String LOAD_SYNC_TIME = "load_sync_time";

public static final String CARBON_INDEX_SERVER_JOBNAME_LENGTH =
"carbon.index.server.max.jobname.length";

public static final String CARBON_INDEX_SERVER_JOBNAME_LENGTH_DEFAULT =
"50";
}
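The new carbon.index.server.max.jobname.length property caps how many characters of the query text are shipped to the Index Server as the job description, defaulting to 50. A hedged usage sketch follows; it assumes CarbonProperties.addProperty as the programmatic way to set Carbon properties, and the value must parse as an integer on the Index Server side.

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties

// Allow up to 100 characters of the original query text in the Index Server job description.
CarbonProperties.getInstance()
  .addProperty(CarbonCommonConstants.CARBON_INDEX_SERVER_JOBNAME_LENGTH, "100")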
@@ -26,6 +26,7 @@
import java.util.List;

import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datamap.dev.DataMap;
import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper;
import org.apache.carbondata.core.datastore.block.SegmentPropertiesAndSchemaHolder;
@@ -36,6 +37,8 @@
import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;

import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.ObjectSerializationUtil;

import org.apache.commons.lang.StringUtils;
@@ -78,6 +81,11 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl

private ReadCommittedScope readCommittedScope;

private String taskGroupId = "";

private String taskGroupDesc = "";

DistributableDataMapFormat() {

}
@@ -270,6 +278,8 @@ public void write(DataOutput out) throws IOException {
out.writeBoolean(false);
}
out.writeUTF(dataMapToClear);
out.writeUTF(taskGroupId);
out.writeUTF(taskGroupDesc);
}

@Override
@@ -311,6 +321,8 @@ public void readFields(DataInput in) throws IOException {
.convertStringToObject(new String(filterResolverBytes, Charset.defaultCharset()));
}
this.dataMapToClear = in.readUTF();
this.taskGroupId = in.readUTF();
this.taskGroupDesc = in.readUTF();
}

private void initReadCommittedScope() throws IOException {
@@ -335,6 +347,42 @@ public boolean isJobToClearDataMaps() {
return isJobToClearDataMaps;
}

public String getTaskGroupId() {
return taskGroupId;
}

/* setTaskGroupId is used by the Index Server to display the ExecutionId. */
public void setTaskGroupId(String taskGroupId) {
this.taskGroupId = taskGroupId;
}

public String getTaskGroupDesc() {
return taskGroupDesc;
}

/* setTaskGroupDesc is used by the Index Server to display the query.
* If the job name is longer than CARBON_INDEX_SERVER_JOBNAME_LENGTH it is truncated,
* because transferring a very long query string to the Index Server is costly.
*/
public void setTaskGroupDesc(String taskGroupDesc) {
int maxJobLength;
try {
String maxJobLengthString = CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.CARBON_INDEX_SERVER_JOBNAME_LENGTH,
CarbonCommonConstants.CARBON_INDEX_SERVER_JOBNAME_LENGTH_DEFAULT);
maxJobLength = Integer.parseInt(maxJobLengthString);
} catch (Exception e) {
// Fall back to the built-in default if the configured value is not a valid integer.
maxJobLength = Integer.parseInt(CarbonCommonConstants.CARBON_INDEX_SERVER_JOBNAME_LENGTH_DEFAULT);
}
if (taskGroupDesc.length() > maxJobLength) {
this.taskGroupDesc = taskGroupDesc.substring(0, maxJobLength);
} else {
this.taskGroupDesc = taskGroupDesc;
}
}

public FilterResolverIntf getFilterResolverIntf() {
return filterResolverIntf;
}
@@ -21,6 +21,7 @@ import java.util
import scala.collection.JavaConverters._

import org.apache.log4j.Logger
import org.apache.spark.sql.util.SparkSQLUtil
import org.apache.spark.util.SizeEstimator

import org.apache.carbondata.common.logging.LogServiceFactory
@@ -47,6 +48,17 @@ class DistributedDataMapJob extends AbstractDataMapJob {
LOGGER.debug(s"Size of message sent to Index Server: $messageSize")
}
val (resonse, time) = logTime {
val spark = SparkSQLUtil.getSparkSession
val taskGroupId = Option(spark.sparkContext.getLocalProperty("spark.jobGroup.id"))
.getOrElse("")
val taskGroupDesc = Option(spark.sparkContext.getLocalProperty("spark.job.description"))
.getOrElse("")
dataMapFormat.setTaskGroupId(taskGroupId)
dataMapFormat.setTaskGroupDesc(taskGroupDesc)
var filterInf = dataMapFormat.getFilterResolverIntf
val filterProcessor = new FilterExpressionProcessor
filterInf = removeSparkUnknown(filterInf,
@@ -95,6 +107,17 @@ class DistributedDataMapJob extends AbstractDataMapJob {
class EmbeddedDataMapJob extends AbstractDataMapJob {

override def execute(dataMapFormat: DistributableDataMapFormat): util.List[ExtendedBlocklet] = {
val spark = SparkSQLUtil.getSparkSession
val taskGroupId = Option(spark.sparkContext.getLocalProperty("spark.jobGroup.id"))
.getOrElse("")
val taskGroupDesc = Option(spark.sparkContext.getLocalProperty("spark.job.description"))
.getOrElse("")
dataMapFormat.setTaskGroupId(taskGroupId)
dataMapFormat.setTaskGroupDesc(taskGroupDesc)
IndexServer.getSplits(dataMapFormat).toList.asJava
}

@@ -100,6 +100,8 @@ object IndexServer extends ServerInterface {
}

def getSplits(request: DistributableDataMapFormat): Array[ExtendedBlocklet] = doAs {
sparkSession.sparkContext.setLocalProperty("spark.jobGroup.id", request.getTaskGroupId)
sparkSession.sparkContext.setLocalProperty("spark.job.description", request.getTaskGroupDesc)
val splits = new DistributedPruneRDD(sparkSession, request).collect()
DistributedRDDUtils.updateExecutorCacheSize(splits.map(_._1).toSet)
if (request.isJobToClearDataMaps) {
@@ -110,11 +112,15 @@

override def invalidateSegmentCache(databaseName: String, tableName: String,
segmentIds: Array[String]): Unit = doAs {
val jobgroup: String = "Invalidated Segment Cache for " + databaseName + "." + tableName
sparkSession.sparkContext.setLocalProperty("spark.job.description", jobgroup)
new InvalidateSegmentCacheRDD(sparkSession, databaseName, tableName, segmentIds.toList)
.collect()
}

override def showCache(tableName: String = ""): Array[String] = doAs {
val jobgroup: String = "Show Cache for " + tableName
sparkSession.sparkContext.setLocalProperty("spark.job.description", jobgroup)
new DistributedShowCacheRDD(sparkSession, tableName).collect()
}
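Note that setLocalProperty is thread-local: it only tags jobs submitted from the thread that set it, which is why getSplits applies the group id and description right before the DistributedPruneRDD is collected. A small self-contained sketch of that behaviour follows (a hypothetical standalone program, not Index Server code).

import org.apache.spark.sql.SparkSession

object LocalPropertyDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("local-property-demo").master("local[*]").getOrCreate()
    // Jobs submitted from this thread after these calls carry the group id and description,
    // so the Spark UI lists the count() job below under the original query text.
    spark.sparkContext.setLocalProperty("spark.jobGroup.id", "execution-42")
    spark.sparkContext.setLocalProperty("spark.job.description", "select count(*) from demo_table")
    val rows = spark.range(0, 1000).count()
    println(s"counted $rows rows")
    spark.stop()
  }
}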
