
[SPARK-2251] fix concurrency issues in random sampler #1229

Closed
wants to merge 1 commit from mengxr/fix-sample

Conversation


@mengxr (Contributor) commented Jun 26, 2014

The following code is very likely to throw an exception:

~~~
val rdd = sc.parallelize(0 until 111, 10).sample(false, 0.1)
rdd.zip(rdd).count()
~~~

because the same random number generator is used when computing partitions. `zip` forces the sampled RDD to be computed twice, and the two computations draw different random numbers from the shared generator and therefore keep different elements, so the zipped partitions end up with mismatched lengths.
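To make the failure mode concrete, here is a small self-contained sketch (plain Scala, no Spark; the `Sampler` class is illustrative, not Spark's): two passes over the same data through one shared RNG consume different parts of the random stream and keep different elements, while a fresh, identically seeded generator per pass reproduces the sample exactly.

~~~
import scala.util.Random

object SharedRngDemo {
  // Toy Bernoulli sampler that, like the buggy code, holds one mutable RNG.
  class Sampler(rng: Random, fraction: Double) {
    def sample(items: Seq[Int]): Seq[Int] =
      items.filter(_ => rng.nextDouble() < fraction)
  }

  def main(args: Array[String]): Unit = {
    // Shared RNG: the second pass starts from the advanced generator state,
    // so two "computations" of the same partition keep different elements.
    val shared = new Sampler(new Random(42L), 0.1)
    val first  = shared.sample(0 until 111)
    val second = shared.sample(0 until 111)
    println(s"shared RNG: ${first.size} vs ${second.size} elements kept")

    // Fresh RNG with the same seed per pass: both passes replay the same
    // draws, which is what zip needs to see matching partitions.
    val a = new Sampler(new Random(42L), 0.1).sample(0 until 111)
    val b = new Sampler(new Random(42L), 0.1).sample(0 until 111)
    println(s"fresh RNG per pass: identical = ${a == b}")
  }
}
~~~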

@@ -54,17 +54,17 @@ trait RandomSampler[T, U] extends Pseudorandom with Cloneable with Serializable
*/
@DeveloperApi
class BernoulliSampler[T](lb: Double, ub: Double, complement: Boolean = false)
A reviewer (Contributor) commented on this line:

Could dropping this implicit break source and binary compatibility? I think we'd like to avoid asking people to make code changes to upgrade to a bug-fix release, even if the APIs are marked as developer. Can you just leave the existing argument and ignore it?
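A minimal sketch of that suggestion, assuming the goal is only to keep the old signature in place (illustrative, not the actual Spark source): retain the implicit RNG argument so existing callers still compile, but ignore it and give each sampler its own private generator.

~~~
import scala.util.Random

// Sketch only: the old implicit RNG parameter stays in the signature for
// compatibility but is deliberately unused.
class BernoulliSampler[T](lb: Double, ub: Double, complement: Boolean = false)(
    implicit ignoredRng: Random = new Random) {

  // Each instance owns its generator, so samplers no longer share state.
  private val rng = new Random()

  def setSeed(seed: Long): Unit = rng.setSeed(seed)

  def sample(items: Iterator[T]): Iterator[T] =
    items.filter { _ =>
      val x = rng.nextDouble()
      (x >= lb && x < ub) != complement
    }
}
~~~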

@asfgit asfgit closed this in c23f5db Jun 27, 2014
xiliu82 pushed a commit to xiliu82/spark that referenced this pull request Sep 4, 2014
Author: Xiangrui Meng <meng@databricks.com>

Closes apache#1229 from mengxr/fix-sample and squashes the following commits:

f1ee3d7 [Xiangrui Meng] fix concurrency issues in random sampler
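For context, a hedged sketch of the general shape of the fix (illustrative, not the verbatim c23f5db diff): make the sampler cloneable, and have each partition computation work on a reseeded clone rather than on the shared instance, so recomputing a partition replays exactly the same random draws.

~~~
import scala.util.Random

object SamplerFix {
  // A cloneable toy sampler: clones get fresh, independently seeded RNGs.
  class CloneableSampler(fraction: Double) extends Cloneable {
    private val rng = new Random()
    def setSeed(s: Long): Unit = rng.setSeed(s)
    def sample(items: Iterator[Int]): Iterator[Int] =
      items.filter(_ => rng.nextDouble() < fraction)
    override def clone(): CloneableSampler = new CloneableSampler(fraction)
  }

  // What a partition computation can do: never touch the shared instance.
  def computePartition(shared: CloneableSampler,
                       partitionSeed: Long,
                       items: Iterator[Int]): Iterator[Int] = {
    val local = shared.clone()   // fresh RNG state per computation
    local.setSeed(partitionSeed) // deterministic seed per partition
    local.sample(items)
  }
}
~~~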
@pauloangelo

Hi all,

I'm getting a similar problem using k-means clustering with Spark 1.5.1. The stack trace is below.

Any clue?

Thank you in advance.

Driver stacktrace:

~~~
org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 5.0 failed 4 times, most recent failure: Lost task 2.3 in stage 5.0 (TID 18, combate): org.apache.spark.SparkException: Can only zip RDDs with same number of elements in each partition
    at org.apache.spark.rdd.RDD$$anonfun$zip$1$$anonfun$apply$27$$anon$1.hasNext(RDD.scala:832)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1553)
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1121)
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1121)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1848)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1848)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1822)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1835)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1848)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1919)
    at org.apache.spark.rdd.RDD.count(RDD.scala:1121)
    at org.apache.spark.rdd.RDD.takeSample(RDD.scala:485)
    at org.apache.spark.mllib.clustering.KMeans.initKMeansParallel(KMeans.scala:376)
    at org.apache.spark.mllib.clustering.KMeans.runAlgorithm(KMeans.scala:249)
    at org.apache.spark.mllib.clustering.KMeans.run(KMeans.scala:213)
    at org.hogzilla.dns.HogDNS$.kmeans(HogDNS.scala:194)
    at org.hogzilla.dns.HogDNS$.run(HogDNS.scala:73)
    at Hogzilla$.main(Hogzilla.scala:47)
    at Hogzilla.main(Hogzilla.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:525)
~~~

The line in KMeans.initKMeansParallel (KMeans.scala:376 in the trace above) that triggers the failing sampling is:

~~~
val sample = data.takeSample(true, runs, seed).toSeq
~~~

@pauloangelo

My RDD comes from an HBase table, which is growing. When I suspend row insertion, the problem doesn't happen.

The RDD is cached; should the problem still occur? Is there any way to "freeze" the RDD at some point so it can be used without trouble?

Regards,

PA

@srowen (Member) commented Oct 12, 2015

@pauloangelo it sounds like your RDD is not immutable then, in which case all bets are off. RDDs are generally expected to contain the same data every time they are computed.
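If the source data really is changing between computations, one hedged workaround (the `rawData` name is illustrative) is to materialize a point-in-time snapshot before clustering, so lineage recomputation cannot re-read the growing HBase table:

~~~
import org.apache.spark.storage.StorageLevel

sc.setCheckpointDir("hdfs:///tmp/checkpoints")

// Cache and checkpoint a snapshot, then force one action so the snapshot
// is written before KMeans ever touches the RDD. Afterwards, recomputation
// (e.g. after executor loss) reads the checkpoint, not the live table.
val snapshot = rawData.persist(StorageLevel.MEMORY_AND_DISK)
snapshot.checkpoint()
snapshot.count()
~~~

Caching alone is not airtight here, since evicted partitions would be recomputed from the source; the checkpoint step is what pins the data.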
