[SPARK-29042][Core] Sampling-based RDD with unordered input should be INDETERMINATE #25751
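Context for the change: Spark's samplers seed a random generator per partition and consume one random decision per input element. If the parent RDD's output is only UNORDERED (the same elements can arrive in a different order when a task is re-executed after, say, a fetch failure), replaying the same decision sequence against a reordered stream keeps a different subset of elements. The sampled output therefore cannot be marked UNORDERED; it must be INDETERMINATE so the scheduler knows to roll back and recompute all partitions consistently.

A minimal sketch in plain Scala (no Spark; names and values are illustrative only) of why a seeded sampler is order-sensitive:

    import scala.util.Random

    object OrderSensitiveSamplingDemo extends App {
      // Bernoulli-style sampling: the i-th random decision is consumed by
      // whichever element happens to arrive i-th, so the kept subset depends
      // on input order even though the seed fixes the decision sequence.
      def sample(data: Seq[Int], seed: Long, fraction: Double): Seq[Int] = {
        val rng = new Random(seed)
        data.filter(_ => rng.nextDouble() < fraction)
      }

      val firstRun = Seq(1, 2, 3, 4, 5, 6, 7, 8) // one task execution
      val rerun = firstRun.reverse               // same elements, different order

      // The two printed subsets generally differ despite the identical seed.
      println(sample(firstRun, seed = 42L, fraction = 0.5))
      println(sample(rerun, seed = 42L, fraction = 0.5))
    }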

core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala

@@ -261,7 +261,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     } else {
       StratifiedSamplingUtils.getBernoulliSamplingFunction(self, fractions, false, seed)
     }
-    self.mapPartitionsWithIndex(samplingFunc, preservesPartitioning = true)
+    self.mapPartitionsWithIndex(samplingFunc, preservesPartitioning = true, isOrderSensitive = true)
   }
 
   /**
@@ -291,7 +291,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     } else {
       StratifiedSamplingUtils.getBernoulliSamplingFunction(self, fractions, true, seed)
     }
-    self.mapPartitionsWithIndex(samplingFunc, preservesPartitioning = true)
+    self.mapPartitionsWithIndex(samplingFunc, preservesPartitioning = true, isOrderSensitive = true)
   }
 
   /**
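For illustration, a hedged sketch of the kind of call the two hunks above affect: `sampleByKey` over a pair RDD whose per-partition input order is not fixed after a shuffle. The dataset, fractions, and configuration here are made up:

    import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("sampleByKey-demo"))

    // partitionBy introduces a shuffle, so each partition's iteration order
    // is not guaranteed to repeat across task re-executions (UNORDERED).
    val pairs = sc.parallelize(1 to 1000)
      .map(i => (i % 3, i))
      .partitionBy(new HashPartitioner(4))

    // Per-key sampling fractions for the strata 0, 1, and 2.
    val fractions = Map(0 -> 0.1, 1 -> 0.2, 2 -> 0.3)

    // With this patch the result is INDETERMINATE rather than UNORDERED, so
    // the scheduler can recompute it consistently after a fetch failure.
    val sampled = pairs.sampleByKey(withReplacement = false, fractions, seed = 42L)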
core/src/main/scala/org/apache/spark/rdd/PartitionwiseSampledRDD.scala
@@ -67,4 +67,12 @@ private[spark] class PartitionwiseSampledRDD[T: ClassTag, U: ClassTag](
     thisSampler.setSeed(split.seed)
     thisSampler.sample(firstParent[T].iterator(split.prev, context))
   }
+
+  override protected def getOutputDeterministicLevel = {
+    if (prev.outputDeterministicLevel == DeterministicLevel.UNORDERED) {
+      DeterministicLevel.INDETERMINATE
+    } else {
+      super.getOutputDeterministicLevel
+    }
+  }
 }
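To make the override's rule concrete, here is a small self-contained sketch (not Spark's actual class hierarchy; the enum and helper names are invented for this example):

    object DetLevelDemo extends App {
      object DetLevel extends Enumeration {
        // DETERMINATE: same output in the same order on every re-execution.
        // UNORDERED: same output set, but the order may vary.
        // INDETERMINATE: the output itself may differ across re-executions.
        val DETERMINATE, UNORDERED, INDETERMINATE = Value
      }
      import DetLevel._

      // Simplified default rule: an RDD is as non-deterministic as its worst parent.
      def defaultLevel(parents: Seq[DetLevel.Value]): DetLevel.Value =
        if (parents.isEmpty) DETERMINATE else parents.maxBy(_.id)

      // The rule the override encodes: a seeded sampler over merely UNORDERED
      // input can keep entirely different elements on re-execution, so the
      // output escalates all the way to INDETERMINATE.
      def sampledLevel(parent: DetLevel.Value): DetLevel.Value =
        if (parent == UNORDERED) INDETERMINATE else defaultLevel(Seq(parent))

      assert(sampledLevel(DETERMINATE) == DETERMINATE)
      assert(sampledLevel(UNORDERED) == INDETERMINATE)
      assert(sampledLevel(INDETERMINATE) == INDETERMINATE)
    }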
core/src/main/scala/org/apache/spark/rdd/RDD.scala (24 additions, 1 deletion)
@@ -557,7 +557,7 @@ abstract class RDD[T: ClassTag](
       val sampler = new BernoulliCellSampler[T](lb, ub)
       sampler.setSeed(seed + index)
       sampler.sample(partition)
-    }, preservesPartitioning = true)
+    }, isOrderSensitive = true, preservesPartitioning = true)
   }
 
   /**
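Note what stays fixed in this hunk: the seed is derived per partition (`seed + index`), so a re-executed task replays exactly the same sequence of random decisions. Which elements those decisions land on is determined by the order in which the input iterator yields them, which is why the call is now flagged `isOrderSensitive = true`.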
@@ -870,6 +870,29 @@ abstract class RDD[T: ClassTag](
       preservesPartitioning)
   }
 
+  /**
+   * Return a new RDD by applying a function to each partition of this RDD, while tracking the
+   * index of the original partition.
+   *
+   * `preservesPartitioning` indicates whether the input function preserves the partitioner, which
+   * should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
+   *
+   * `isOrderSensitive` indicates whether the function is order-sensitive. If it is order
+   * sensitive, it may return a totally different result when the input order
+   * is changed. Most stateful functions are order-sensitive.
+   */
+  private[spark] def mapPartitionsWithIndex[U: ClassTag](
+      f: (Int, Iterator[T]) => Iterator[U],
+      preservesPartitioning: Boolean,
+      isOrderSensitive: Boolean): RDD[U] = withScope {
+    val cleanedF = sc.clean(f)
+    new MapPartitionsRDD(
+      this,
+      (_: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter),
+      preservesPartitioning,
+      isOrderSensitive = isOrderSensitive)
+  }
+
   /**
    * Zips this RDD with another one, returning key-value pairs with the first element in each RDD,
    * second element in each RDD, etc. Assumes that the two RDDs have the *same number of

Review thread on the new overload:

Contributor: Shall we expose this to users?

Member Author: I tend not to, for now. @cloud-fan WDYT?
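As an illustration of what `isOrderSensitive = true` guards against, consider a partition function that tags each element with its position in the iterator. This is hypothetical code, and since the overload is `private[spark]` it only compiles inside Spark's own source tree:

    import scala.reflect.ClassTag
    import org.apache.spark.rdd.RDD

    // A different arrival order yields different (partition, position) keys
    // for the same elements, so this function is order-sensitive by construction.
    def tagWithPosition[T: ClassTag](rdd: RDD[T]): RDD[((Int, Int), T)] =
      rdd.mapPartitionsWithIndex(
        (pid: Int, iter: Iterator[T]) =>
          iter.zipWithIndex.map { case (elem, pos) => ((pid, pos), elem) },
        preservesPartitioning = false,
        isOrderSensitive = true)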
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -2779,6 +2779,18 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits {
       .contains("Spark cannot rollback the ShuffleMapStage 1"))
   }
 
+  test("SPARK-29042: Sampled RDD with unordered input should be indeterminate") {
+    val shuffleMapRdd1 = new MyRDD(sc, 2, Nil, indeterminate = false)
+
+    val shuffleDep1 = new ShuffleDependency(shuffleMapRdd1, new HashPartitioner(2))
+    val shuffleMapRdd2 = new MyRDD(sc, 2, List(shuffleDep1), tracker = mapOutputTracker)
+
+    assert(shuffleMapRdd2.outputDeterministicLevel == DeterministicLevel.UNORDERED)
+
+    val sampledRdd = shuffleMapRdd2.sample(true, 0.3, 1000L)
+    assert(sampledRdd.outputDeterministicLevel == DeterministicLevel.INDETERMINATE)
+  }
+
   private def assertResultStageFailToRollback(mapRdd: MyRDD): Unit = {
     val shuffleDep = new ShuffleDependency(mapRdd, new HashPartitioner(2))
     val shuffleId = shuffleDep.shuffleId
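In this test, `MyRDD` is the suite's stub RDD: `shuffleMapRdd1` itself is DETERMINATE, but `shuffleMapRdd2` reads shuffle output, so its level is UNORDERED, and the assertion checks that `sample` escalates that to INDETERMINATE rather than passing it through.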
graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala (10 additions, 9 deletions)
@@ -103,15 +103,16 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
       (part, (e.srcId, e.dstId, e.attr))
     }
       .partitionBy(new HashPartitioner(numPartitions))
-      .mapPartitionsWithIndex( { (pid, iter) =>
-        val builder = new EdgePartitionBuilder[ED, VD]()(edTag, vdTag)
-        iter.foreach { message =>
-          val data = message._2
-          builder.add(data._1, data._2, data._3)
-        }
-        val edgePartition = builder.toEdgePartition
-        Iterator((pid, edgePartition))
-      }, preservesPartitioning = true)).cache()
+      .mapPartitionsWithIndex(
+        { (pid: Int, iter: Iterator[(PartitionID, (VertexId, VertexId, ED))]) =>
+          val builder = new EdgePartitionBuilder[ED, VD]()(edTag, vdTag)
+          iter.foreach { message =>
+            val data = message._2
+            builder.add(data._1, data._2, data._3)
+          }
+          val edgePartition = builder.toEdgePartition
+          Iterator((pid, edgePartition))
+        }, preservesPartitioning = true)).cache()
     GraphImpl.fromExistingRDDs(vertices.withEdges(newEdges), newEdges)
   }
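The GraphImpl change is mechanical rather than behavioral: with the new three-parameter overload in scope, Scala presumably can no longer infer the closure's parameter types at this call site, so the patch spells them out explicitly and re-indents the body; the partition-building logic is unchanged.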
