[SPARK-29042][CORE] Sampling-based RDD with unordered input should be INDETERMINATE

### What changes were proposed in this pull request?

We have already found and fixed correctness issues that arise when RDD output is INDETERMINATE. One missing piece is sampling-based RDDs: this kind of RDD is order-sensitive to its input, so a sampling-based RDD with unordered input should be INDETERMINATE.

### Why are the changes needed?

A sampling-based RDD with unordered input behaves just like a MapPartitionsRDD whose isOrderSensitive parameter is true: the RDD output can be different after a rerun.

This is a problem in ML applications.

In ML, sampling is used to prepare training data, and the ML algorithm fits the model on the sampled data. If rerun sampling tasks produce different output during model fitting, the ML results will be unreliable and potentially buggy.

Each sample is drawn randomly, but once it has been drawn, its output should be determinate.
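To illustrate the order sensitivity in plain Scala (an illustrative sketch, not Spark's actual sampler code; `bernoulliSample` is a made-up helper): a seeded Bernoulli-style filter keeps an element based on the random draw made at its position in the iteration, so the same elements in a different order generally yield a different sample even with an identical seed.

```scala
import scala.util.Random

// Illustrative only: keep each element when the next random draw falls below `fraction`.
def bernoulliSample[T](items: Seq[T], fraction: Double, seed: Long): Seq[T] = {
  val rng = new Random(seed)
  items.filter(_ => rng.nextDouble() < fraction)
}

val data      = (1 to 20).toVector
val reordered = new Random(99L).shuffle(data) // stands in for UNORDERED shuffle output

// Same seed, same elements, different order => usually a different sample.
println(bernoulliSample(data, 0.3, seed = 42L))
println(bernoulliSample(reordered, 0.3, seed = 42L).sorted)
```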

### Does this PR introduce any user-facing change?

Previously, a sampling-based RDD could produce different output after a rerun.
After this patch, a sampling-based RDD with unordered input is INDETERMINATE. For an INDETERMINATE map stage, the Spark scheduler currently retries all tasks of the failed stage.
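A minimal sketch of the affected pattern (illustrative values, assuming an existing SparkContext `sc`): the sample is taken over shuffle output, whose per-partition ordering is not guaranteed, so before this patch a rerun of a lost task of `sampled` could return different rows than the original attempt.

```scala
// `repartition` produces shuffle (UNORDERED) output, and the sample over it is
// order-sensitive. With this patch the sampled RDD is INDETERMINATE, so on a
// fetch failure the scheduler recomputes the whole stage instead of mixing
// outputs from old and new task attempts.
val rows    = sc.parallelize(1 to 1000000, numSlices = 100)
val sampled = rows.repartition(200).sample(withReplacement = false, fraction = 0.1, seed = 7L)
sampled.count()
```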

### How was this patch tested?

Added a test in DAGSchedulerSuite.

Closes apache#25751 from viirya/sample-order-sensitive.

Authored-by: Liang-Chi Hsieh <liangchi@uber.com>
Signed-off-by: Liang-Chi Hsieh <liangchi@uber.com>
viirya authored and PavithraRamachandran committed Sep 14, 2019
1 parent 70a53fc commit 0686a77
Showing 5 changed files with 56 additions and 12 deletions.
PairRDDFunctions.scala

```diff
@@ -261,7 +261,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     } else {
       StratifiedSamplingUtils.getBernoulliSamplingFunction(self, fractions, false, seed)
     }
-    self.mapPartitionsWithIndex(samplingFunc, preservesPartitioning = true)
+    self.mapPartitionsWithIndex(samplingFunc, preservesPartitioning = true, isOrderSensitive = true)
   }
 
   /**
@@ -291,7 +291,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     } else {
       StratifiedSamplingUtils.getBernoulliSamplingFunction(self, fractions, true, seed)
     }
-    self.mapPartitionsWithIndex(samplingFunc, preservesPartitioning = true)
+    self.mapPartitionsWithIndex(samplingFunc, preservesPartitioning = true, isOrderSensitive = true)
   }
 
   /**
```
PartitionwiseSampledRDD.scala

```diff
@@ -67,4 +67,12 @@ private[spark] class PartitionwiseSampledRDD[T: ClassTag, U: ClassTag](
     thisSampler.setSeed(split.seed)
     thisSampler.sample(firstParent[T].iterator(split.prev, context))
   }
+
+  override protected def getOutputDeterministicLevel = {
+    if (prev.outputDeterministicLevel == DeterministicLevel.UNORDERED) {
+      DeterministicLevel.INDETERMINATE
+    } else {
+      super.getOutputDeterministicLevel
+    }
+  }
 }
```
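For reference, `DeterministicLevel` (defined alongside `RDD` in `org.apache.spark.rdd`, not part of this patch) distinguishes three levels; the override above escalates an UNORDERED parent to INDETERMINATE because the sampler's output depends on the order in which it consumes its input.

```scala
// Roughly how the levels are defined (explanatory comments added here):
object DeterministicLevel extends Enumeration {
  // DETERMINATE:   same output, in the same order, on every rerun
  // UNORDERED:     same output set, but possibly in a different order (e.g. shuffle output)
  // INDETERMINATE: the output itself may differ after a rerun
  val DETERMINATE, UNORDERED, INDETERMINATE = Value
}
```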
25 changes: 24 additions & 1 deletion core/src/main/scala/org/apache/spark/rdd/RDD.scala

```diff
@@ -555,7 +555,7 @@ abstract class RDD[T: ClassTag](
       val sampler = new BernoulliCellSampler[T](lb, ub)
       sampler.setSeed(seed + index)
       sampler.sample(partition)
-    }, preservesPartitioning = true)
+    }, isOrderSensitive = true, preservesPartitioning = true)
   }
 
   /**
@@ -868,6 +868,29 @@
       preservesPartitioning)
   }
 
+  /**
+   * Return a new RDD by applying a function to each partition of this RDD, while tracking the index
+   * of the original partition.
+   *
+   * `preservesPartitioning` indicates whether the input function preserves the partitioner, which
+   * should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
+   *
+   * `isOrderSensitive` indicates whether the function is order-sensitive. If it is order
+   *                    sensitive, it may return totally different result when the input order
+   *                    is changed. Mostly stateful functions are order-sensitive.
+   */
+  private[spark] def mapPartitionsWithIndex[U: ClassTag](
+      f: (Int, Iterator[T]) => Iterator[U],
+      preservesPartitioning: Boolean,
+      isOrderSensitive: Boolean): RDD[U] = withScope {
+    val cleanedF = sc.clean(f)
+    new MapPartitionsRDD(
+      this,
+      (_: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter),
+      preservesPartitioning,
+      isOrderSensitive = isOrderSensitive)
+  }
+
   /**
    * Zips this RDD with another one, returning key-value pairs with the first element in each RDD,
    * second element in each RDD, etc. Assumes that the two RDDs have the *same number of
```
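As a hypothetical illustration of another order-sensitive function (not part of this patch; `indexWithinPartition` is made up): a per-partition sequence number depends entirely on iteration order, so internal code using the new `private[spark]` overload would likewise pass `isOrderSensitive = true`.

```scala
// Hypothetical helper living under org.apache.spark so the private[spark]
// overload is visible.
package org.apache.spark.example

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

object OrderSensitiveExample {
  // Pairs each record with its position inside its partition. The assigned
  // index depends on iteration order, so the function is order-sensitive and
  // must be marked as such for correct recomputation after failures.
  def indexWithinPartition[T: ClassTag](rdd: RDD[T]): RDD[(T, Int)] =
    rdd.mapPartitionsWithIndex(
      (_: Int, iter: Iterator[T]) => iter.zipWithIndex,
      preservesPartitioning = false,
      isOrderSensitive = true)
}
```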
DAGSchedulerSuite.scala

```diff
@@ -2783,6 +2783,18 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
       .contains("Spark cannot rollback the ShuffleMapStage 1"))
   }
 
+  test("SPARK-29042: Sampled RDD with unordered input should be indeterminate") {
+    val shuffleMapRdd1 = new MyRDD(sc, 2, Nil, indeterminate = false)
+
+    val shuffleDep1 = new ShuffleDependency(shuffleMapRdd1, new HashPartitioner(2))
+    val shuffleMapRdd2 = new MyRDD(sc, 2, List(shuffleDep1), tracker = mapOutputTracker)
+
+    assert(shuffleMapRdd2.outputDeterministicLevel == DeterministicLevel.UNORDERED)
+
+    val sampledRdd = shuffleMapRdd2.sample(true, 0.3, 1000L)
+    assert(sampledRdd.outputDeterministicLevel == DeterministicLevel.INDETERMINATE)
+  }
+
   private def assertResultStageFailToRollback(mapRdd: MyRDD): Unit = {
     val shuffleDep = new ShuffleDependency(mapRdd, new HashPartitioner(2))
     val shuffleId = shuffleDep.shuffleId
```
19 changes: 10 additions & 9 deletions graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala

```diff
@@ -103,15 +103,16 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
         (part, (e.srcId, e.dstId, e.attr))
       }
       .partitionBy(new HashPartitioner(numPartitions))
-      .mapPartitionsWithIndex( { (pid, iter) =>
-        val builder = new EdgePartitionBuilder[ED, VD]()(edTag, vdTag)
-        iter.foreach { message =>
-          val data = message._2
-          builder.add(data._1, data._2, data._3)
-        }
-        val edgePartition = builder.toEdgePartition
-        Iterator((pid, edgePartition))
-      }, preservesPartitioning = true)).cache()
+      .mapPartitionsWithIndex(
+        { (pid: Int, iter: Iterator[(PartitionID, (VertexId, VertexId, ED))]) =>
+          val builder = new EdgePartitionBuilder[ED, VD]()(edTag, vdTag)
+          iter.foreach { message =>
+            val data = message._2
+            builder.add(data._1, data._2, data._3)
+          }
+          val edgePartition = builder.toEdgePartition
+          Iterator((pid, edgePartition))
+        }, preservesPartitioning = true)).cache()
     GraphImpl.fromExistingRDDs(vertices.withEdges(newEdges), newEdges)
   }
```
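A likely reason for the explicit closure parameter types above (an inference, not stated in the patch): once `mapPartitionsWithIndex` gains a second overload, Scala can no longer infer the lambda's parameter types from the expected type at the overloaded call site, so `(pid, iter)` has to be annotated.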
