[CARBONDATA-3459] Fixed id based distribution for showcache command
Problem: Currently, tasks are not being fired based on the mapped executor ID because getPreferredLocations was not overridden.

Solution: Override getPreferredLocations in DistributedShowCacheRDD and InvalidateSegmentCacheRDD so that tasks are fired at the appropriate locations.

This closes #3315
kunal642 authored and ravipesala committed Jul 15, 2019
1 parent b017253 commit a682f98
Showing 3 changed files with 25 additions and 4 deletions.
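For context before the diffs: Spark's scheduler consults an RDD's getPreferredLocations when placing each task, which is what lets these RDDs request the executor that holds the relevant cache. A minimal, self-contained sketch of that mechanism, with illustrative names (PinnedRDD and PinnedPartition are not part of this patch):

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Hypothetical partition that remembers where its task should run.
class PinnedPartition(override val index: Int, val location: String) extends Partition

class PinnedRDD(sc: SparkContext, locations: Seq[String]) extends RDD[String](sc, Nil) {

  override protected def getPartitions: Array[Partition] =
    locations.zipWithIndex.map { case (loc, idx) => new PinnedPartition(idx, loc) }.toArray

  // The scheduler reads this per partition; an "executor_<host>_<id>"
  // string asks for one specific executor rather than just a host.
  override protected def getPreferredLocations(split: Partition): Seq[String] =
    Seq(split.asInstanceOf[PinnedPartition].location)

  override def compute(split: Partition, context: TaskContext): Iterator[String] =
    Iterator(s"ran with preferred location ${split.asInstanceOf[PinnedPartition].location}")
}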
@@ -316,10 +316,10 @@ object DistributedRDDUtils {
       if (existingSegmentMapping == null) {
         val newSegmentMapping = new ConcurrentHashMap[String, String]()
         newSegmentMapping.put(segment.getSegmentNo, s"${newHost}_$newExecutor")
-        tableToExecutorMapping.put(tableUniqueName, newSegmentMapping)
+        tableToExecutorMapping.putIfAbsent(tableUniqueName, newSegmentMapping)
       } else {
-        existingSegmentMapping.put(segment.getSegmentNo, s"${newHost}_$newExecutor")
-        tableToExecutorMapping.put(tableUniqueName, existingSegmentMapping)
+        existingSegmentMapping.putIfAbsent(segment.getSegmentNo, s"${newHost}_$newExecutor")
+        tableToExecutorMapping.putIfAbsent(tableUniqueName, existingSegmentMapping)
       }
       s"executor_${newHost}_$newExecutor"
     }
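The hunk above swaps put for putIfAbsent so the segment-to-executor mapping is first-writer-wins: with plain put, two threads assigning the same segment concurrently could overwrite each other and end up reporting different owners. A small standalone illustration of the atomic semantics (key and values are made up):

import java.util.concurrent.ConcurrentHashMap

val mapping = new ConcurrentHashMap[String, String]()

// putIfAbsent returns null when the key was free (this write won) and
// the existing value when another writer got there first (this write lost).
val first = mapping.putIfAbsent("segment-0", "hostA_1")   // null: "hostA_1" is stored
val second = mapping.putIfAbsent("segment-0", "hostB_2")  // "hostA_1": existing value kept
assert(mapping.get("segment-0") == "hostA_1")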
@@ -37,6 +37,14 @@ class DistributedShowCacheRDD(@transient private val ss: SparkSession, tableName
       }
     }.toArray
 
+  override protected def getPreferredLocations(split: Partition): Seq[String] = {
+    if (split.asInstanceOf[DataMapRDDPartition].getLocations != null) {
+      split.asInstanceOf[DataMapRDDPartition].getLocations.toSeq
+    } else {
+      Seq()
+    }
+  }
+
   override protected def internalGetPartitions: Array[Partition] = {
     executorsList.zipWithIndex.map {
       case (executor, idx) =>
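DataMapRDDPartition is referenced but not defined in this diff; for the new override to work it only needs to expose the locations chosen when the partitions were built. A hypothetical reconstruction of that minimal shape (the real class may carry more state):

import org.apache.spark.Partition

// Assumed shape only: a partition that remembers its preferred locations
// so getPreferredLocations can hand them to the scheduler.
class DataMapRDDPartition(idx: Int, locations: Array[String]) extends Partition {
  override def index: Int = idx
  def getLocations: Array[String] = locations
}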
@@ -30,14 +30,27 @@ import org.apache.carbondata.spark.rdd.CarbonRDD
 class InvalidateSegmentCacheRDD(@transient private val ss: SparkSession, carbonTable: CarbonTable,
     invalidSegmentIds: List[String]) extends CarbonRDD[String](ss, Nil) {
 
-  val executorsList: Array[String] = DistributionUtil.getNodeList(ss.sparkContext)
+  val executorsList: Array[String] = DistributionUtil.getExecutors(ss.sparkContext).flatMap {
+    case (host, executors) =>
+      executors.map {
+        executor => s"executor_${host}_$executor"
+      }
+  }.toArray
 
   override def internalCompute(split: Partition,
       context: TaskContext): Iterator[String] = {
     DataMapStoreManager.getInstance().clearInvalidSegments(carbonTable, invalidSegmentIds.asJava)
     Iterator.empty
   }
 
+  override protected def getPreferredLocations(split: Partition): Seq[String] = {
+    if (split.asInstanceOf[DataMapRDDPartition].getLocations != null) {
+      split.asInstanceOf[DataMapRDDPartition].getLocations.toSeq
+    } else {
+      Seq()
+    }
+  }
+
   override protected def internalGetPartitions: Array[Partition] = {
     if (invalidSegmentIds.isEmpty) {
       Array()
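The strings built for executorsList follow Spark's executor-pinned task-location format, executor_<host>_<executorId>, which the scheduler resolves to one specific executor rather than any executor on a host. A quick sketch of the construction with made-up values, assuming DistributionUtil.getExecutors returns a host-to-executor-IDs map:

// Illustrative stand-in for DistributionUtil.getExecutors(ss.sparkContext).
val executorsPerHost = Map("host1" -> Seq("1", "2"), "host2" -> Seq("3"))

val locations: Array[String] = executorsPerHost.flatMap {
  case (host, executors) => executors.map(executor => s"executor_${host}_$executor")
}.toArray
// locations: Array("executor_host1_1", "executor_host1_2", "executor_host2_3")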