Sampling-based RDD with unordered input should be INDETERMINATE.
viirya committed Sep 11, 2019
1 parent 6378d4b commit fb94fea
Showing 5 changed files with 83 additions and 12 deletions.
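Background for the change: Spark tracks a DeterministicLevel for every RDD's output. DETERMINATE means a recomputation yields the same records in the same order; UNORDERED means the same records may come back in a different order (typical of shuffle output, because reducers fetch map outputs in a nondeterministic order); INDETERMINATE means the records themselves may differ. Under the default rule, an RDD with a single narrow dependency inherits its parent's level, so a sampled RDD over shuffle output was previously treated as merely UNORDERED. But element-wise samplers consume one seeded random draw per input element, so which records survive depends on the order in which they arrive; the correct level for sampling UNORDERED input is therefore INDETERMINATE, which is what this commit encodes. A minimal standalone sketch of the effect, using plain scala.util.Random rather than Spark's samplers:

import scala.util.Random

object OrderSensitiveSamplingDemo {
  // Bernoulli-style sampling: consume one seeded random draw per element and keep
  // the element when the draw falls below the fraction.
  def sample(records: Seq[Int], fraction: Double, seed: Long): Seq[Int] = {
    val rng = new Random(seed)
    records.filter(_ => rng.nextDouble() < fraction)
  }

  def main(args: Array[String]): Unit = {
    val records = Seq(1, 2, 3, 4, 5, 6, 7, 8)
    // Same records and same seed, but a different arrival order, as can happen when
    // unordered shuffle output is recomputed:
    val firstRun = sample(records, 0.5, seed = 42L)
    val secondRun = sample(records.reverse, 0.5, seed = 42L)
    // The surviving subsets generally differ, so the sampled output is indeterminate,
    // not merely unordered.
    println(s"first run:  $firstRun")
    println(s"second run: $secondRun")
  }
}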
core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -261,7 +261,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
} else {
StratifiedSamplingUtils.getBernoulliSamplingFunction(self, fractions, false, seed)
}
-    self.mapPartitionsWithIndex(samplingFunc, preservesPartitioning = true)
+    self.mapPartitionsWithIndex(samplingFunc, preservesPartitioning = true, isOrderSensitive = true)
}

/**
@@ -291,7 +291,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
} else {
StratifiedSamplingUtils.getBernoulliSamplingFunction(self, fractions, true, seed)
}
-    self.mapPartitionsWithIndex(samplingFunc, preservesPartitioning = true)
+    self.mapPartitionsWithIndex(samplingFunc, preservesPartitioning = true, isOrderSensitive = true)
}

/**
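For reference, the public stratified-sampling API (sampleByKey and sampleByKeyExact) is unchanged by these hunks; only how the resulting RDD is classified for recomputation changes. A usage sketch, assuming a SparkContext named sc is in scope (for example in spark-shell):

val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3), ("b", 4)), numSlices = 2)
val fractions = Map("a" -> 0.5, "b" -> 0.5)
// The per-partition sampling function is seeded but consumes one random draw per element,
// so its output depends on element order; that is why it is now marked isOrderSensitive = true.
val sampled = pairs.sampleByKey(withReplacement = false, fractions, seed = 7L)
sampled.collect().foreach(println)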
core/src/main/scala/org/apache/spark/rdd/PartitionwiseSampledRDD.scala
@@ -67,4 +67,12 @@ private[spark] class PartitionwiseSampledRDD[T: ClassTag, U: ClassTag](
thisSampler.setSeed(split.seed)
thisSampler.sample(firstParent[T].iterator(split.prev, context))
}

+  override protected def getOutputDeterministicLevel = {
+    if (prev.outputDeterministicLevel == DeterministicLevel.UNORDERED) {
+      DeterministicLevel.INDETERMINATE
+    } else {
+      super.getOutputDeterministicLevel
+    }
+  }
}
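PartitionwiseSampledRDD is the RDD that RDD.sample creates. The override above promotes an UNORDERED parent to INDETERMINATE output and otherwise defers to the default rule, which for a one-to-one dependency inherits the parent's level. A simplified model of the resulting behavior (plain Scala, not Spark code):

object SampledLevelModel extends Enumeration {
  val DETERMINATE, UNORDERED, INDETERMINATE = Value

  // Level of a sampled RDD's output, given its parent's level, under the change above.
  def sampledOutputLevel(parent: Value): Value = parent match {
    case UNORDERED     => INDETERMINATE  // order-sensitive sampling of unordered input
    case INDETERMINATE => INDETERMINATE  // indeterminate input stays indeterminate
    case DETERMINATE   => DETERMINATE    // fixed seed and fixed input order give a reproducible sample
  }
}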
25 changes: 24 additions & 1 deletion core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -557,7 +557,7 @@ abstract class RDD[T: ClassTag](
val sampler = new BernoulliCellSampler[T](lb, ub)
sampler.setSeed(seed + index)
sampler.sample(partition)
-    }, preservesPartitioning = true)
+    }, isOrderSensitive = true, preservesPartitioning = true)
}

/**
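randomSampleWithRange, changed above, is the helper that RDD.randomSplit uses to carve out each weight range with a BernoulliCellSampler, so split membership is order-sensitive in the same way. At the user level, again assuming a SparkContext sc:

val data = sc.parallelize(1 to 1000, numSlices = 4)
// The same seed is used for every range, so each element's random draw decides which split
// it lands in; if the input order changes between runs, elements can land in different splits.
val Array(train, test) = data.randomSplit(Array(0.8, 0.2), seed = 11L)
println(s"train = ${train.count()}, test = ${test.count()}")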
@@ -870,6 +870,29 @@
preservesPartitioning)
}

+  /**
+   * Return a new RDD by applying a function to each partition of this RDD, while tracking the index
+   * of the original partition.
+   *
+   * `preservesPartitioning` indicates whether the input function preserves the partitioner, which
+   * should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
+   *
+   * `isOrderSensitive` indicates whether the function is order-sensitive. If it is order
+   * sensitive, it may return totally different result when the input order
+   * is changed. Mostly stateful functions are order-sensitive.
+   */
+  private[spark] def mapPartitionsWithIndex[U: ClassTag](
+      f: (Int, Iterator[T]) => Iterator[U],
+      preservesPartitioning: Boolean,
+      isOrderSensitive: Boolean): RDD[U] = withScope {
+    val cleanedF = sc.clean(f)
+    new MapPartitionsRDD(
+      this,
+      (_: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter),
+      preservesPartitioning,
+      isOrderSensitive = isOrderSensitive)
+  }

/**
* Zips this RDD with another one, returning key-value pairs with the first element in each RDD,
* second element in each RDD, etc. Assumes that the two RDDs have the *same number of
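The new private[spark] overload simply threads isOrderSensitive through to MapPartitionsRDD, which uses the flag to report INDETERMINATE output when its input is UNORDERED; it is not a user-facing API. For intuition about what "order-sensitive" means here, a standalone sketch with illustrative names:

object OrderSensitivityDemo {
  // Order-insensitive: each output element depends only on its input element.
  def double(index: Int, iter: Iterator[Int]): Iterator[Int] =
    iter.map(_ * 2)

  // Order-sensitive: the result pairs each element with its arrival position,
  // so a different input order produces a different set of output records.
  def withPosition(index: Int, iter: Iterator[Int]): Iterator[(Int, Int)] =
    iter.zipWithIndex

  def main(args: Array[String]): Unit = {
    val partition = Seq(10, 20, 30)
    println(double(0, partition.iterator).toList)                // List(20, 40, 60) in any order case
    println(withPosition(0, partition.iterator).toList)          // List((10,0), (20,1), (30,2))
    println(withPosition(0, partition.reverse.iterator).toList)  // List((30,0), (20,1), (10,2))
  }
}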
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -2779,6 +2779,45 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
.contains("Spark cannot rollback the ShuffleMapStage 1"))
}

test("Sampled RDD with unordered input should be indeterminate") {
val shuffleMapRdd1 = new MyRDD(sc, 2, Nil, indeterminate = false)

val shuffleDep1 = new ShuffleDependency(shuffleMapRdd1, new HashPartitioner(2))
val shuffleId1 = shuffleDep1.shuffleId
val shuffleMapRdd2 = new MyRDD(sc, 2, List(shuffleDep1), tracker = mapOutputTracker)

assert(shuffleMapRdd2.outputDeterministicLevel == DeterministicLevel.UNORDERED)

val sampledRdd = shuffleMapRdd2.sample(true, 0.3, 1000L)
assert(sampledRdd.outputDeterministicLevel == DeterministicLevel.INDETERMINATE)

val shuffleDep2 = new ShuffleDependency(sampledRdd, new HashPartitioner(2))
val shuffleId2 = shuffleDep2.shuffleId
val finalRdd = new MyRDD(sc, 2, List(shuffleDep2), tracker = mapOutputTracker)

submit(finalRdd, Array(0, 1))

// Finish the first shuffle map stage.
complete(taskSets(0), Seq(
(Success, makeMapStatus("hostA", 2)),
(Success, makeMapStatus("hostB", 2))))
assert(mapOutputTracker.findMissingPartitions(shuffleId1) === Some(Seq.empty))

// Finish the first shuffle map stage.
complete(taskSets(1), Seq(
(Success, makeMapStatus("hostC", 2)),
(Success, makeMapStatus("hostD", 2))))
assert(mapOutputTracker.findMissingPartitions(shuffleId2) === Some(Seq.empty))

// The first task of the final stage failed with fetch failure
runEvent(makeCompletionEvent(
taskSets(1).tasks(0),
FetchFailed(makeBlockManagerId("hostC"), shuffleId2, 0, 0, "ignored"),
null))
assert(failure != null && failure.getMessage
.contains("Spark cannot rollback the ShuffleMapStage 1"))
}

private def assertResultStageFailToRollback(mapRdd: MyRDD): Unit = {
val shuffleDep = new ShuffleDependency(mapRdd, new HashPartitioner(2))
val shuffleId = shuffleDep.shuffleId
19 changes: 10 additions & 9 deletions graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -103,15 +103,16 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
(part, (e.srcId, e.dstId, e.attr))
}
.partitionBy(new HashPartitioner(numPartitions))
-      .mapPartitionsWithIndex( { (pid, iter) =>
-        val builder = new EdgePartitionBuilder[ED, VD]()(edTag, vdTag)
-        iter.foreach { message =>
-          val data = message._2
-          builder.add(data._1, data._2, data._3)
-        }
-        val edgePartition = builder.toEdgePartition
-        Iterator((pid, edgePartition))
-      }, preservesPartitioning = true)).cache()
+      .mapPartitionsWithIndex(
+        { (pid: Int, iter: Iterator[(PartitionID, (VertexId, VertexId, ED))]) =>
+          val builder = new EdgePartitionBuilder[ED, VD]()(edTag, vdTag)
+          iter.foreach { message =>
+            val data = message._2
+            builder.add(data._1, data._2, data._3)
+          }
+          val edgePartition = builder.toEdgePartition
+          Iterator((pid, edgePartition))
+        }, preservesPartitioning = true)).cache()
GraphImpl.fromExistingRDDs(vertices.withEdges(newEdges), newEdges)
}

