Skip to content

Commit

Permalink
Log warning on partition recompute
Browse files Browse the repository at this point in the history
  • Loading branch information
ankurdave committed Mar 28, 2014
1 parent c289e91 commit f36e576
Showing 1 changed file with 14 additions and 1 deletion.
15 changes: 14 additions & 1 deletion core/src/main/scala/org/apache/spark/rdd/RDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.util.Random
import scala.collection.Map
import scala.collection.JavaConversions.mapAsScalaMap
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashSet
import scala.reflect.{classTag, ClassTag}

import com.clearspring.analytics.stream.cardinality.HyperLogLog
Expand Down Expand Up @@ -221,12 +222,22 @@ abstract class RDD[T: ClassTag](
}
}

/**
 * Partitions that `computeOrReadCheckpoint` has already handed to `compute` on this RDD
 * instance. Used only to detect and warn about a partition being computed more than once.
 */
// NOTE(review): this is a plain mutable HashSet with no synchronization -- confirm that
// computeOrReadCheckpoint is never invoked concurrently on the same RDD instance.
private val previouslyComputed = new HashSet[Partition]

/**
 * Compute an RDD partition or read it from a checkpoint if the RDD is checkpointing.
 *
 * When the RDD is not checkpointed, a warning is logged if the same partition has already
 * gone through this method on this RDD instance, i.e. it is being recomputed.
 *
 * @param split the partition to read or compute
 * @param context the task context passed through to the parent iterator or to `compute`
 * @return an iterator over the elements of the partition
 */
private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
{
  if (isCheckpointed) {
    firstParent[T].iterator(split, context)
  } else {
    // `add` returns false when the partition was already in the set, which means this
    // partition has been computed before; a single call replaces the contains/add pair.
    if (!previouslyComputed.add(split)) {
      logWarning("Recomputing RDD %d, partition %d".format(id, split.index))
    }
    compute(split, context)
  }
}

// Transformations (return a new RDD)
Expand Down Expand Up @@ -1045,6 +1056,8 @@ abstract class RDD[T: ClassTag](

// ClassTag of this RDD's element type T.
private[spark] def elementClassTag: ClassTag[T] = classTag[T]

// Mutable buffer of strings, empty on construction.
// NOTE(review): nothing in the visible code reads or appends to this buffer -- confirm its
// intended use (the name suggests call sites of compute) or remove it if unused.
private[spark] val computeSites = new ArrayBuffer[String]

// Checkpoint data for this RDD; starts as None.
// NOTE(review): presumably populated once checkpointing is requested -- confirm against the setter.
private[spark] var checkpointData: Option[RDDCheckpointData[T]] = None

/** Returns the first parent RDD */
Expand Down

0 comments on commit f36e576

Please sign in to comment.