
[SPARK-927] detect numpy at time of use #2313

Closed

mattf wants to merge 2 commits

Conversation

mattf
Contributor

@mattf mattf commented Sep 7, 2014

it is possible for numpy to be installed on the driver node but not on
worker nodes. in such a case, using the rddsampler's constructor to
detect numpy leads to failures on workers as they cannot import numpy
(see below).

the solution here is to detect numpy right before it is used on the
workers.

example code & error -

yum install -y numpy
pyspark
>>> sc.parallelize(range(10000)).sample(False, .1).collect()
...
14/09/07 10:50:01 WARN TaskSetManager: Lost task 4.0 in stage 0.0 (TID 4, node4): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/test/spark/dist/python/pyspark/worker.py", line 79, in main
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/test/spark/dist/python/pyspark/serializers.py", line 196, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/home/test/spark/dist/python/pyspark/serializers.py", line 127, in dump_stream
    for obj in iterator:
  File "/home/test/spark/dist/python/pyspark/serializers.py", line 185, in _batched
    for item in iterator:
  File "/home/test/spark/dist/python/pyspark/rddsampler.py", line 115, in func
    if self.getUniformSample(split) <= self._fraction:
  File "/home/test/spark/dist/python/pyspark/rddsampler.py", line 57, in getUniformSample
    self.initRandomGenerator(split)
  File "/home/test/spark/dist/python/pyspark/rddsampler.py", line 42, in initRandomGenerator
    import numpy
ImportError: No module named numpy
        org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:124)
        org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:154)
        org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:87)
        org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
        org.apache.spark.scheduler.Task.run(Task.scala:54)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:744)
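
For context, a minimal sketch of the lazy-detection idea; this is illustrative only, not the patch as submitted, and the class and attribute names are assumptions:

```python
import random
import warnings


class LazyNumpySampler(object):
    """Sketch: defer the numpy check from the driver-side constructor
    to the first time a worker actually needs a random generator."""

    def __init__(self, seed=None):
        # Runs on the driver; deliberately does NOT import numpy here.
        self._seed = seed if seed is not None else random.randint(0, 2 ** 32 - 1)
        self._use_numpy = None  # unknown until first use on a worker

    def init_random_generator(self, split):
        # Runs on the worker, right before sampling.
        if self._use_numpy is None:
            try:
                import numpy  # noqa: F401
                self._use_numpy = True
            except ImportError:
                warnings.warn("NumPy does not appear to be installed. "
                              "Falling back to default random generator for sampling.")
                self._use_numpy = False
        if self._use_numpy:
            import numpy
            self._random = numpy.random.RandomState(self._seed ^ split)
        else:
            self._random = random.Random(self._seed ^ split)
```
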
@SparkQA

SparkQA commented Sep 7, 2014

QA tests have started for PR 2313 at commit 7e8a11c.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Sep 7, 2014

QA tests have finished for PR 2313 at commit 7e8a11c.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mattf
Contributor Author

mattf commented Sep 7, 2014

This patch fails unit tests.

the test suite failures appear to be from hive & flume and unrelated to this patch

@davies
Contributor

davies commented Sep 8, 2014

What's the behavior if some slaves have numpy but others do not?

"Falling back to default random generator for sampling.")
self._use_numpy = False
self._tried_numpy = True

Contributor

How about putting the numpy detection and fallback at the module level?

Such as:

import random

try:
    from numpy.random import RandomState
except ImportError:
    # Pure-python stand-in with the same interface (sketch; bodies elided).
    class RandomState(object):
        def __init__(self, seed):
            self._random = random.Random(seed)

        def random_sample(self):
            pass

        def poisson(self, lam):
            pass

        def shuffle(self, values):
            pass
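
Filled in, that fallback might look like the following; a hedged sketch using only the standard library (Knuth's poisson algorithm), mirroring numpy.random.RandomState's call signatures, and not the code that was eventually merged:

```python
import math
import random


class FallbackRandomState(object):
    """Illustrative pure-python stand-in for numpy.random.RandomState."""

    def __init__(self, seed):
        self._random = random.Random(seed)

    def random_sample(self):
        # Uniform draw in [0.0, 1.0), like RandomState.random_sample().
        return self._random.random()

    def poisson(self, lam):
        # Knuth's algorithm; adequate for the small lambdas used in sampling.
        threshold = math.exp(-lam)
        k, p = 0, 1.0
        while True:
            p *= self._random.random()
            if p <= threshold:
                return k
            k += 1

    def shuffle(self, values):
        # In-place shuffle, like RandomState.shuffle().
        self._random.shuffle(values)
```
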

@JoshRosen
Contributor

What's the behavior if some slaves have numpy but others do not?

Yeah, this was my original concern. Do both implementations give equal results, or will this patch lead to non-deterministic outcomes if tasks sometimes run with numpy and sometimes without (e.g. while recomputing failed tasks)?

@mattf
Contributor Author

mattf commented Sep 8, 2014

What's the behavior if some slaves have numpy but others do not?

Yeah, this was my original concern. Do both implementations give equal results, or will this patch lead to non-deterministic outcomes if tasks sometimes run with numpy and sometimes without (e.g. while recomputing failed tasks)?

i had the same concern. i mostly talked myself out of it: first, the results are by definition non-deterministic, it's a prng after all; second, there can already be different implementations of numpy (or better/worse random sources) on different nodes and we wouldn't be able to detect them.

if you guys agree, i'll check on the module-level impl, which may be nicer in that it could get rid of _use_numpy

@mattf
Contributor Author

mattf commented Sep 11, 2014

@davies @JoshRosen new thoughts on the topic of non-determinism?

@mattf
Contributor Author

mattf commented Sep 14, 2014

@erikerlandson i know you've been doing some serious work w/ sampling, what's your take on this?

@erikerlandson
Contributor

@mattf, one useful question would be whether the two implementations generate equivalent output distributions. The basic methodology would be to collect output in both scenarios and run Kolmogorov-Smirnov tests to assess whether the sampling is statistically equivalent.

I did this recently for testing my upcoming proposal for gap sampling:
https://gist.github.com/erikerlandson/05db1f15c8d623448ff6

That doesn't cover the question of exactly reproducible results. I'm not sure if that would be feasible or not. In general, I only consider exactly reproducible results as being relevant for things like unit testing applications, so if that's important my answer would be "make sure your environment is set up to either use numpy or not, consistently"
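
For concreteness, the two-sample check could look like this with scipy (a hedged sketch; the helper name and the 0.05 threshold are my own choices, not from the gist):

```python
from scipy import stats


def sampling_equivalent(sample_a, sample_b, alpha=0.05):
    """Two-sample Kolmogorov-Smirnov test: True when the test cannot
    distinguish the two empirical distributions at significance level alpha."""
    statistic, p_value = stats.ks_2samp(sample_a, sample_b)
    return p_value > alpha
```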

@mattf
Contributor Author

mattf commented Sep 15, 2014

thanks @erikerlandson. @davies @JoshRosen how would you guys like to proceed?

@davies
Contributor

davies commented Sep 15, 2014

@mattf The only benefit we get from numpy is the performance of poisson(), so if it falls back to random silently the only thing we could lose is performance, not reproducibility. So I think we should show a warning (saying the result may not be reproducible) when numpy is present on the driver but a worker falls back to random. Does that make sense?

@mattf
Contributor Author

mattf commented Sep 15, 2014

@davies that makes sense to me. the current message is:

"NumPy does not appear to be installed. Falling back to default random generator for sampling."

what should i change that to?

@davies
Contributor

davies commented Sep 15, 2014

If numpy is not installed on the driver, there should be no warning, I think.

@mattf
Contributor Author

mattf commented Sep 15, 2014

ok, i see. detect numpy on the driver, record that fact, and if it's present on the driver but not on a worker raise the warning; otherwise be silent.

@mattf
Contributor Author

mattf commented Sep 15, 2014

here are the cases. i decided that if the driver doesn't have numpy then the workers shouldn't try to use it. this is consistent w/ the old code and maintains the expectation that the same generator is used on driver and workers. the one case where the expectation is violated (numpy on driver and not on workers) results in a warning and no failure. previously there would be a failure in this case. a sketch of the decision logic follows the list below.

driver w/, worker w/ - numpy used, no message emitted
driver w/, worker w/o - numpy used on driver, not used on workers, warning emitted
driver w/o, worker w/ - numpy not used on driver nor worker, no message emitted
driver w/o, worker w/o - numpy not used on driver nor worker, no message emitted

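As a sketch of that decision logic (illustrative only; the function and the message wording are assumptions, not the patch itself):

```python
def choose_generator(driver_has_numpy, worker_has_numpy):
    """Return (use_numpy, warning) for a worker, given what the driver detected."""
    if not driver_has_numpy:
        # The driver decided up front not to use numpy, so workers never try it.
        return False, None
    if worker_has_numpy:
        return True, None
    # numpy on the driver but not on this worker: fall back, and warn that the
    # results may not be reproducible against a numpy-based run.
    return False, ("NumPy does not appear to be installed on this worker. "
                   "Falling back to default random generator for sampling; "
                   "results may not be reproducible.")
```
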
@SparkQA

SparkQA commented Sep 15, 2014

QA tests have started for PR 2313 at commit 1b7af93.

  • This patch merges cleanly.

@mattf
Contributor Author

mattf commented Sep 15, 2014

@davies i'm still digging, but maybe you know off the top of your head. what output path in the worker can be used to report the warning to the driver?

@davies
Contributor

davies commented Sep 15, 2014

Good question. We do not currently push logging from the workers to the driver, so it's not easy to surface a warning from a worker. Maybe we could leave this (pushing logging to the driver) for the future.

@mattf
Contributor Author

mattf commented Sep 15, 2014

i've not found a path, so i'm happy to leave this PR as is and log a JIRA for enhanced worker -> driver log communication.

deal?

@SparkQA

SparkQA commented Sep 15, 2014

QA tests have finished for PR 2313 at commit 1b7af93.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@davies
Contributor

davies commented Sep 15, 2014

Yes, it works for me.

@mattf
Contributor Author

mattf commented Sep 15, 2014

I filed SPARK-3538 to cover the improvement/enhancement

@mattf
Contributor Author

mattf commented Sep 19, 2014

@JoshRosen it looks like @davies and i are on the same page. how would you like to proceed?

@JoshRosen
Contributor

This is a tricky issue.

Exact reproducibility / determinism crops up in two different senses here: re-running an entire job and re-computing a lost partition. Spark's lineage-based fault-tolerance is built on the idea that partitions can be deterministically recomputed. Tasks that have dependencies on the external environment may violate this determinism property (e.g. by reading the current system time to set a random seed). Workers using different versions of libraries which give different results is one way that the environment can leak into tasks and make them non-deterministic based on where they're run.

There are some scenarios where exact reproducibility might be desirable. Imagine that I train an ML model on some data, make predictions with it, and want to go back and understand the lineage that led to that model being created. To do this, I may want to deterministically re-run the job with additional internal logging. This use-case is tricky in general, though: details of the execution environment might creep in via other means. We might see different results due to rounding errors / numerical instability if we run on environments with different BLAS libraries, etc. (I guess we could say "deterministic within some rounding error / to k bits of precision"). Exact long-term reproducibility of computational results is a hard, unsolved problem in general.

/cc @mengxr @jkbradley: since you work on MLlib, what do you think we should do here? Is there any precedent in MLlib and its use of native libraries?

@mattf
Contributor Author

mattf commented Sep 19, 2014

that's a very good point, especially about how it's an unsolved problem in general, at least on our existing operating systems. iirc, systems like plan9 tried to address complete reproducibility, but i may be misremembering the specifics.

the four stated cases are:
a) driver w/ numpy, worker w/ numpy - numpy used, no message emitted
b) driver w/ numpy, worker w/o numpy - numpy used on driver, not used on workers, warning emitted
c) driver w/o numpy, worker w/ numpy - numpy not used on driver nor worker, no message emitted
d) driver w/o numpy, worker w/o numpy - numpy not used on driver nor worker, no message emitted

case (a) is not a concern because numpy is used consistently throughout
case (b) is not a concern because python's random is used consistently throughout the workers
cases (c) and (d) are not a concern because python's random module is used throughout

however, there's a fifth case:
e) driver w/ numpy, some workers w/ numpy, some workers w/o numpy

there's actually a sixth case, but it's intractable for spark and shouldn't be considered: different implementations of python random or numpy's random across workers. this is something that should be managed outside of spark.

in (e), some workers will use numpy and others will use random. previously, all workers w/o numpy would error out, potentially terminating the computation. now, a warning will be emitted (though it'll be emitted to /dev/null) and execution will complete.

i'd solve this with a systems approach: remove the python random code and require numpy to be present, or remove the numpy code. i'd lean toward using the faster code (numpy). however, that might not be palatable for the project. if it is acceptable, i'm more than happy to scrap this ticket and create another to simplify the RDDSampler.

as i see it, to proceed we evaluate -

if acceptable to require numpy:
   matt.dothat()
else:
   if acceptable to potentially compromise re-computability w/ warning:
      commit this
   else:
      scrap this

(i've left out the case where we decide to simplify by always using the slower python code, because i'd rather not trade off performance to avoid an error message, and i think adding a numpy dep is straightforward)

@mengxr
Contributor

mengxr commented Sep 19, 2014

@JoshRosen PySpark/MLlib requires NumPy to run, and I don't think we claimed that we support different versions of NumPy.

sample() in core is different. Maybe we can test the overhead of using Python's own random. The overhead may not be huge, due to serialization/deserialization.
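
A hedged micro-benchmark of the raw generators outside of Spark (the numbers will vary by machine and do not capture the Spark-level serialization overhead mentioned above):

```python
import timeit

# Time one million single draws from each generator.
py_time = timeit.timeit("r.random()",
                        setup="import random; r = random.Random(42)",
                        number=1000000)
np_time = timeit.timeit("r.random_sample()",
                        setup="from numpy.random import RandomState; r = RandomState(42)",
                        number=1000000)
print("python random: %.2fs  numpy per-call: %.2fs" % (py_time, np_time))
```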

@jkbradley
Member

Philosophically, I agree with @erikerlandson about it being OK for random generators to be, well, random. If problems are caused by the output of a randomized process not being reproducible, then the output probably isn't being used/tested correctly.

Practically, I second @mengxr in saying we should encourage reproducibility by requiring numpy in MLlib. But avoiding it where possible sounds good, assuming the performance hit is not too bad.

@mattf
Contributor Author

mattf commented Sep 19, 2014

for some additional input, @pwendell - do you think requiring numpy for core would be acceptable?

@mattf
Contributor Author

mattf commented Sep 21, 2014

andrewor14 pushed a commit to andrewor14/spark that referenced this pull request Nov 21, 2014
In RDDSampler, it tries to use numpy to gain better performance for poisson(), but the number of calls to random() is only (1 + fraction) * N in the pure python implementation of poisson(), so there is not much performance gain from numpy.

numpy is not a dependency of pyspark, so it may introduce problems, such as numpy being installed on the master but not on the slaves, as reported in SPARK-927.

It also complicates the code a lot, so we should probably remove numpy from RDDSampler.

I also did some benchmarking to verify this:
```
>>> from pyspark.mllib.random import RandomRDDs
>>> rdd = RandomRDDs.uniformRDD(sc, 1 << 20, 1).cache()
>>> rdd.count()  # cache it
>>> rdd.sample(True, 0.9).count()    # measure this line
```
the results:

| withReplacement | random | numpy.random |
| --------------- | ------ | ------------ |
| True            | 1.5 s  | 1.4 s        |
| False           | 0.6 s  | 0.8 s        |

closes apache#2313

Note: this patch includes some commits that are not mirrored to GitHub; it will be OK after it catches up.

Author: Davies Liu <davies@databricks.com>
Author: Xiangrui Meng <meng@databricks.com>

Closes apache#3351 from davies/numpy and squashes the following commits:

5c438d7 [Davies Liu] fix comment
c5b9252 [Davies Liu] Merge pull request #1 from mengxr/SPARK-4477
98eb31b [Xiangrui Meng] make poisson sampling slightly faster
ee17d78 [Davies Liu] remove = for float
13f7b05 [Davies Liu] Merge branch 'master' of http://git-wip-us.apache.org/repos/asf/spark into numpy
f583023 [Davies Liu] fix tests
51649f5 [Davies Liu] remove numpy in RDDSampler
78bf997 [Davies Liu] fix tests, do not use numpy in randomSplit, no performance gain
f5fdf63 [Davies Liu] fix bug with int in weights
4dfa2cd [Davies Liu] refactor
f866bcf [Davies Liu] remove unneeded change
c7a2007 [Davies Liu] switch to python implementation
95a48ac [Davies Liu] Merge branch 'master' of github.com:apache/spark into randomSplit
0d9b256 [Davies Liu] refactor
1715ee3 [Davies Liu] address comments
41fce54 [Davies Liu] randomSplit()

(cherry picked from commit d39f2e9)
Signed-off-by: Xiangrui Meng <meng@databricks.com>
@davies
Contributor

davies commented Nov 25, 2014

This issue should be fixed by #3351; do you mind closing this?

@JoshRosen
Contributor

Do you mind closing this PR? Thanks!

(Commenting so the auto-close script / PR review board can pick this up).

@JoshRosen
Contributor

Can you close this issue? Thanks!

(I think "close this issue" was the magic phrase I needed for the script to pick it up, but let's see).

@mattf mattf closed this Jan 5, 2015
@mattf mattf deleted the SPARK-927 branch January 5, 2015 22:04