
Commit

Merge 063984b into d7d70a8
ajantha-bhat committed Aug 2, 2019
2 parents d7d70a8 + 063984b commit 702aa5d
Showing 8 changed files with 72 additions and 14 deletions.
@@ -1306,6 +1306,16 @@ private CarbonCommonConstants() {

public static final String IS_DRIVER_INSTANCE_DEFAULT = "false";

/**
* Property to set the input metrics update interval (as a record count). After every
* interval the input metrics are updated to Spark; otherwise they are updated only at
* the end of the query.
*/
@CarbonProperty(dynamicConfigurable = true)
public static final String INPUT_METRICS_UPDATE_INTERVAL = "carbon.input.metrics.update.interval";

public static final Long INPUT_METRICS_UPDATE_INTERVAL_DEFAULT = 500000L;


/**
* property for enabling unsafe based query processing
*/
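As a point of reference, a minimal configuration sketch for the new property (assuming the standard CarbonProperties.addProperty API; the value 100000 is only an illustration, not a recommendation):

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties

// Hypothetical tuning example: push input metrics to Spark after every 100000 records
// instead of relying on the 500000-record default declared above.
CarbonProperties.getInstance()
  .addProperty(CarbonCommonConstants.INPUT_METRICS_UPDATE_INTERVAL, "100000")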
@@ -36,7 +36,7 @@ public class TaskMetricsMap {
private static final Logger LOGGER =
LogServiceFactory.getLogService(TaskMetricsMap.class.getName());

public static final InheritableThreadLocal<Long> threadLocal = new InheritableThreadLocal<>();
private static final InheritableThreadLocal<Long> threadLocal = new InheritableThreadLocal<>();
/**
* In this map we are maintaining all spawned child threads callback info for each parent thread
* here key = parent thread id & values = list of spawned child threads callbacks
@@ -50,6 +50,10 @@ public static TaskMetricsMap getInstance() {
return taskMetricsMap;
}

public static InheritableThreadLocal<Long> getThreadLocal() {
return threadLocal;
}

/**
* registers current thread callback using parent thread id
*
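A hedged sketch of why the thread-local is now private and read only through getThreadLocal(): it is an InheritableThreadLocal holding the parent thread id, so a thread spawned after the id is set inherits it and its callback is grouped under the parent's entry (usage assumed for illustration, not taken from the patch):

val threadLocal = TaskMetricsMap.getThreadLocal
if (threadLocal.get() == null) {
  // record the parent thread id once; child threads must not overwrite it
  threadLocal.set(Thread.currentThread().getId)
}
new Thread(new Runnable {
  override def run(): Unit = {
    // the child thread inherits the parent id, so this callback is registered
    // against the parent thread's entry in TaskMetricsMap
    TaskMetricsMap.getInstance().registerThreadCallback()
  }
}).start()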
@@ -28,5 +28,5 @@
*/
public interface InitInputMetrics extends InputMetricsStats {

void initBytesReadCallback(TaskContext context, CarbonMultiBlockSplit carbonMultiBlockSplit);
void initBytesReadCallback(TaskContext context, CarbonMultiBlockSplit carbonMultiBlockSplit, Long inputMetricsInterval);
}
@@ -364,7 +364,12 @@ object DataLoadProcessBuilderOnSpark {
TaskContext.get.addTaskCompletionListener { _ =>
CommonUtil.clearUnsafeMemory(ThreadLocalTaskInfo.getCarbonTaskInfo.getTaskId)
}
TaskMetricsMap.threadLocal.set(Thread.currentThread().getId)
if (TaskMetricsMap.getThreadLocal.get() == null) {
// In case of a multi-level RDD,
// the parent thread id should not be overwritten by the child thread id,
// so don't set it if it is already set.
TaskMetricsMap.getThreadLocal.set(Thread.currentThread().getId)
}
val carbonTaskInfo = new CarbonTaskInfo
carbonTaskInfo.setTaskId(CarbonUtil.generateUUID())
ThreadLocalTaskInfo.setCarbonTaskInfo(carbonTaskInfo)
@@ -51,6 +51,26 @@ abstract class CarbonRDD[T: ClassTag](
info
}

val inputMetricsInterval: Long = {
val metrics = CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.INPUT_METRICS_UPDATE_INTERVAL)
if (metrics == null) {
CarbonCommonConstants.INPUT_METRICS_UPDATE_INTERVAL_DEFAULT
} else {
try {
val configuredValue = metrics.toLong
if (configuredValue < 0) {
CarbonCommonConstants.INPUT_METRICS_UPDATE_INTERVAL_DEFAULT
} else {
configuredValue
}
} catch {
case _: Exception =>
CarbonCommonConstants.INPUT_METRICS_UPDATE_INTERVAL_DEFAULT
}
}
}

@transient val hadoopConf = SparkSQLUtil.sessionState(ss).newHadoopConf()

val config = SparkSQLUtil.broadCastHadoopConf(sparkContext, hadoopConf)
@@ -73,7 +93,12 @@
TaskContext.get.addTaskCompletionListener(_ => ThreadLocalSessionInfo.unsetAll())
carbonSessionInfo.getNonSerializableExtraInfo.put("carbonConf", getConf)
ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
TaskMetricsMap.threadLocal.set(Thread.currentThread().getId)
if (TaskMetricsMap.getThreadLocal.get() == null) {
// In case of a multi-level RDD (e.g. an insert-into scenario, where the DataFrame RDD calls the scan RDD),
// the parent thread id should not be overwritten by the child thread id,
// so don't set it if it is already set.
TaskMetricsMap.getThreadLocal.set(Thread.currentThread().getId)
}
val carbonTaskInfo = new CarbonTaskInfo
carbonTaskInfo.setTaskId(CarbonUtil.generateUUID())
ThreadLocalTaskInfo.setCarbonTaskInfo(carbonTaskInfo)
@@ -422,7 +422,7 @@ class CarbonScanRDD[T: ClassTag](
val format = prepareInputFormatForExecutor(attemptContext.getConfiguration)
val inputSplit = split.asInstanceOf[CarbonSparkPartition].split.value
TaskMetricsMap.getInstance().registerThreadCallback()
inputMetricsStats.initBytesReadCallback(context, inputSplit)
inputMetricsStats.initBytesReadCallback(context, inputSplit, inputMetricsInterval)
val iterator = if (inputSplit.getAllSplits.size() > 0) {
val model = format.createQueryModel(inputSplit, attemptContext, filterExpression)
// one query id per table
@@ -18,10 +18,10 @@ package org.apache.spark

import java.lang.Long

import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.executor.InputMetrics

import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.TaskMetricsMap
import org.apache.carbondata.hadoop.CarbonMultiBlockSplit
import org.apache.carbondata.spark.InitInputMetrics
@@ -35,20 +35,28 @@ class CarbonInputMetrics extends InitInputMetrics{
var inputMetrics: InputMetrics = _
// bytes read before compute() by other map RDDs in the lineage
var existingBytesRead: Long = _
var recordCount: Long = _
var inputMetricsInterval: Long = _
var carbonMultiBlockSplit: CarbonMultiBlockSplit = _

def initBytesReadCallback(context: TaskContext,
carbonMultiBlockSplit: CarbonMultiBlockSplit) {
carbonMultiBlockSplit: CarbonMultiBlockSplit, inputMetricsInterval: Long) {
inputMetrics = context.taskMetrics().inputMetrics
existingBytesRead = inputMetrics.bytesRead
this.carbonMultiBlockSplit = carbonMultiBlockSplit;
recordCount = 0L
this.inputMetricsInterval = inputMetricsInterval
this.carbonMultiBlockSplit = carbonMultiBlockSplit
}

def incrementRecordRead(recordRead: Long) {
val value : scala.Long = recordRead
inputMetrics.incRecordsRead(value)
if (inputMetrics.recordsRead % SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) {
updateBytesRead()
val value: scala.Long = recordRead
recordCount = recordCount + value
if (recordCount > inputMetricsInterval) {
inputMetrics.synchronized {
inputMetrics.incRecordsRead(recordCount)
updateBytesRead()
}
recordCount = 0L
}
}

@@ -59,10 +67,16 @@
}

def updateAndClose() {
if (recordCount > 0L) {
inputMetrics.synchronized {
inputMetrics.incRecordsRead(recordCount)
}
recordCount = 0L
}
// if the file system supports metrics, e.g. HDFS
if (!TaskMetricsMap.getInstance().isCallbackEmpty(Thread.currentThread().getId)) {
updateBytesRead()
// after the update, clear the parent thread entry from the map.
TaskMetricsMap.getInstance().removeEntry(Thread.currentThread().getId)
} else if (carbonMultiBlockSplit.isInstanceOf[CarbonMultiBlockSplit]) {
// If we can't get the bytes read from the FS stats, fall back to the split size,
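To tie the changes together, a hypothetical call pattern for a reader task (context, multiBlockSplit, inputMetricsInterval and iterator are assumed to be in scope; this is a sketch, not code from the patch): incrementRecordRead now buffers counts locally and pushes them to Spark only once the configured interval is crossed, while updateAndClose flushes whatever remains when the task finishes.

val metrics = new CarbonInputMetrics
TaskMetricsMap.getInstance().registerThreadCallback()
metrics.initBytesReadCallback(context, multiBlockSplit, inputMetricsInterval)
while (iterator.hasNext) {
  iterator.next()
  // buffered locally; incRecordsRead/updateBytesRead fire every inputMetricsInterval records
  metrics.incrementRecordRead(1L)
}
// pushes the remaining buffered record count and the final bytesRead
metrics.updateAndClose()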
@@ -338,7 +338,7 @@ class IndexDataMapRebuildRDD[K, V](
val segmentId = inputSplit.getAllSplits.get(0).getSegment.getSegmentNo
val segment = segments.find(p => p.getSegmentNo.equals(segmentId))
if (segment.isDefined) {
inputMetrics.initBytesReadCallback(context, inputSplit)
inputMetrics.initBytesReadCallback(context, inputSplit, inputMetricsInterval)

val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
val attemptContext = new TaskAttemptContextImpl(FileFactory.getConfiguration, attemptId)
