
[SPARK-3478] [PySpark] Profile the Python tasks #2351

Closed
wants to merge 13 commits

Conversation

davies
Contributor

davies commented Sep 11, 2014

This patch adds profiling support for PySpark. It shows the profiling results before the driver exits; here is one example:

============================================================
Profile of RDD<id=3>
============================================================
         5146507 function calls (5146487 primitive calls) in 71.094 seconds

   Ordered by: internal time, cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
  5144576   68.331    0.000   68.331    0.000 statcounter.py:44(merge)
       20    2.735    0.137   71.071    3.554 statcounter.py:33(__init__)
       20    0.017    0.001    0.017    0.001 {cPickle.dumps}
     1024    0.003    0.000    0.003    0.000 t.py:16(<lambda>)
       20    0.001    0.000    0.001    0.000 {reduce}
       21    0.001    0.000    0.001    0.000 {cPickle.loads}
       20    0.001    0.000    0.001    0.000 copy_reg.py:95(_slotnames)
       41    0.001    0.000    0.001    0.000 serializers.py:461(read_int)
       40    0.001    0.000    0.002    0.000 serializers.py:179(_batched)
       62    0.000    0.000    0.000    0.000 {method 'read' of 'file' objects}
       20    0.000    0.000   71.072    3.554 rdd.py:863(<lambda>)
       20    0.000    0.000    0.001    0.000 serializers.py:198(load_stream)
    40/20    0.000    0.000   71.072    3.554 rdd.py:2093(pipeline_func)
       41    0.000    0.000    0.002    0.000 serializers.py:130(load_stream)
       40    0.000    0.000   71.072    1.777 rdd.py:304(func)
       20    0.000    0.000   71.094    3.555 worker.py:82(process)

Also, users can show the profile results manually with sc.show_profiles() or dump them to disk with sc.dump_profiles(path), for example:

>>> sc._conf.set("spark.python.profile", "true")
>>> rdd = sc.parallelize(range(100)).map(str)
>>> rdd.count()
100
>>> sc.show_profiles()
============================================================
Profile of RDD<id=1>
============================================================
         284 function calls (276 primitive calls) in 0.001 seconds

   Ordered by: internal time, cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        4    0.000    0.000    0.000    0.000 serializers.py:198(load_stream)
        4    0.000    0.000    0.000    0.000 {reduce}
     12/4    0.000    0.000    0.001    0.000 rdd.py:2092(pipeline_func)
        4    0.000    0.000    0.000    0.000 {cPickle.loads}
        4    0.000    0.000    0.000    0.000 {cPickle.dumps}
      104    0.000    0.000    0.000    0.000 rdd.py:852(<genexpr>)
        8    0.000    0.000    0.000    0.000 serializers.py:461(read_int)
       12    0.000    0.000    0.000    0.000 rdd.py:303(func)

Profiling is disabled by default; it can be enabled by setting "spark.python.profile=true".

Users can also have the results dumped to disk automatically for later analysis by setting "spark.python.profile.dump=path_to_dump".
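
Putting the pieces above together, here is a minimal end-to-end sketch based on this description (the app name, the /tmp/profile directory, and the rdd_1.pstats file name are illustrative assumptions, not taken from the patch):

```python
# Minimal sketch, assuming the configs described above; paths and names
# marked below are illustrative, not from the patch itself.
from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .set("spark.python.profile", "true")                # enable profiling
        .set("spark.python.profile.dump", "/tmp/profile"))  # auto-dump at exit
sc = SparkContext(appName="profile-demo", conf=conf)        # app name illustrative

sc.parallelize(range(10000)).map(str).count()  # any job collects a profile
sc.show_profiles()                 # print accumulated stats per RDD
sc.dump_profiles("/tmp/profile")   # or dump manually to a directory

# Dumped stats can be inspected offline with the stdlib pstats module;
# the exact file name below is an assumption for illustration.
import pstats
pstats.Stats("/tmp/profile/rdd_1.pstats").sort_stats("tottime").print_stats(10)
```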

@SparkQA

SparkQA commented Sep 11, 2014

QA tests have started for PR 2351 at commit 4b20494.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Sep 11, 2014

QA tests have finished for PR 2351 at commit 4b20494.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class StatsParam(AccumulatorParam):

@SparkQA

SparkQA commented Sep 11, 2014

QA tests have started for PR 2351 at commit 0a5b6eb.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Sep 11, 2014

QA tests have finished for PR 2351 at commit 0a5b6eb.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class StatsParam(AccumulatorParam):

@SparkQA

SparkQA commented Sep 11, 2014

QA tests have started for PR 2351 at commit 0a5b6eb.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Sep 11, 2014

QA tests have finished for PR 2351 at commit 0a5b6eb.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class StatsParam(AccumulatorParam):

@@ -215,6 +215,21 @@ def addInPlace(self, value1, value2):
COMPLEX_ACCUMULATOR_PARAM = AddingAccumulatorParam(0.0j)


class StatsParam(AccumulatorParam):
Contributor

Do you think it would be clearer to name this ProfilingStatsParam or PStatsParam?
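
(As context for readers, a hedged sketch of what such an accumulator param can look like, merging per-task pstats.Stats objects into one aggregate on the driver; this follows the idea in the diff above, not the exact patch:)

```python
# Sketch of an AccumulatorParam that merges cProfile results (pstats.Stats)
# coming back from Python workers; illustrative, not the exact patch.
import pstats
from pyspark.accumulators import AccumulatorParam

class PStatsParam(AccumulatorParam):
    def zero(self, value):
        return None  # nothing collected yet

    def addInPlace(self, value1, value2):
        if value1 is None:
            return value2
        if value2 is not None:
            value1.add(value2)  # pstats.Stats.add merges another profile
        return value1
```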

@SparkQA

SparkQA commented Sep 13, 2014

QA tests have started for PR 2351 at commit 4f8309d.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Sep 13, 2014

QA tests have finished for PR 2351 at commit 4f8309d.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class PStatsParam(AccumulatorParam):

@davies
Contributor Author

davies commented Sep 13, 2014

@JoshRosen I have addressed your comment, and also added docs for the configs and tests.

I realized that the profile results can also be shown interactively, via rdd.show_profile(); I have updated the PR description accordingly.

@SparkQA

SparkQA commented Sep 13, 2014

QA tests have started for PR 2351 at commit 15d6f18.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Sep 13, 2014

QA tests have started for PR 2351 at commit c23865c.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Sep 13, 2014

QA tests have finished for PR 2351 at commit c23865c.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class TaskCompletionListenerException(errorMessages: Seq[String]) extends Exception
    • class PStatsParam(AccumulatorParam):

@SparkQA

SparkQA commented Sep 13, 2014

QA tests have finished for PR 2351 at commit 15d6f18.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class TaskCompletionListenerException(errorMessages: Seq[String]) extends Exception
    • class PStatsParam(AccumulatorParam):

Conflicts:
	docs/configuration.md
@SparkQA

SparkQA commented Sep 14, 2014

QA tests have started for PR 2351 at commit 09d02c3.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Sep 14, 2014

QA tests have finished for PR 2351 at commit 09d02c3.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class PStatsParam(AccumulatorParam):

@SparkQA

SparkQA commented Sep 17, 2014

QA tests have started for PR 2351 at commit 116d52a.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Sep 17, 2014

QA tests have finished for PR 2351 at commit 116d52a.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class PStatsParam(AccumulatorParam):

@SparkQA

SparkQA commented Sep 23, 2014

QA tests have started for PR 2351 at commit fb9565b.

  • This patch merges cleanly.

@JoshRosen
Contributor

(I killed the test here so that I could re-run it with the newer commits).

@SparkQA

SparkQA commented Sep 24, 2014

QA tests have started for PR 2351 at commit 2b0daf2.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Sep 24, 2014

QA tests have started for PR 2351 at commit 2b0daf2.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Sep 25, 2014

QA tests have finished for PR 2351 at commit 2b0daf2.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class PStatsParam(AccumulatorParam):

@SparkQA

SparkQA commented Sep 25, 2014

QA tests have finished for PR 2351 at commit 2b0daf2.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class PStatsParam(AccumulatorParam):

@SparkQA

SparkQA commented Sep 25, 2014

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/20769/

@shaneknapp
Contributor

jenkins, retest this please

@SparkQA

SparkQA commented Sep 25, 2014

QA tests have started for PR 2351 at commit 2b0daf2.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Sep 25, 2014

QA tests have finished for PR 2351 at commit 2b0daf2.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class PStatsParam(AccumulatorParam):

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/20771/

@JoshRosen
Contributor

This looks good to me. Thanks!

@JoshRosen
Contributor

I noticed that we don't have any automated tests for show_profiles(), so I tested it manually and found a problem when running this file through spark-submit:

from pyspark import SparkContext, SparkConf
conf = SparkConf()
conf.set("spark.python.profile", "true")
sc = SparkContext(appName="test", conf=conf)
count = sc.parallelize(range(10000)).count()
sc.show_profiles()

This results in:

Traceback (most recent call last):
  File "/Users/joshrosen/Documents/spark/test.py", line 6, in <module>
    sc.show_profiles()
  File "/Users/joshrosen/Documents/Spark/python/pyspark/context.py", line 811, in show_profiles
    for i, (id, acc, showed) in self._profile_stats:
ValueError: too many values to unpack
Error in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "/Users/joshrosen/anaconda/lib/python2.7/atexit.py", line 24, in _run_exitfuncs
    func(*targs, **kargs)
  File "/Users/joshrosen/Documents/Spark/python/pyspark/context.py", line 811, in show_profiles
    for i, (id, acc, showed) in self._profile_stats:
ValueError: too many values to unpack
Error in sys.exitfunc:
Traceback (most recent call last):
  File "/Users/joshrosen/anaconda/lib/python2.7/atexit.py", line 24, in _run_exitfuncs
    func(*targs, **kargs)
  File "/Users/joshrosen/Documents/Spark/python/pyspark/context.py", line 811, in show_profiles
    for i, (id, acc, showed) in self._profile_stats:
ValueError: too many values to unpack

Can we add a test for this, too?
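
(A hedged illustration of the failure above: per the code line in the traceback, each entry of _profile_stats is a 3-tuple, so the loop needs enumerate() to supply the index. Names and values below mirror the traceback, not the actual fix that was committed:)

```python
# Each entry is (rdd_id, accumulator, showed); values are illustrative.
profile_stats = [(1, object(), False), (3, object(), True)]

# Broken form from the traceback -- unpacks a 3-tuple into i and a pair:
#   for i, (id, acc, showed) in profile_stats:  # ValueError: too many values

# Working form:
for i, (rdd_id, acc, showed) in enumerate(profile_stats):
    if not showed:
        print("showing profile of RDD<id=%d>" % rdd_id)
```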

@davies
Contributor Author

davies commented Sep 25, 2014

@JoshRosen sorry for this mistake, fixed.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/20822/

@SparkQA

SparkQA commented Sep 25, 2014

QA tests have started for PR 2351 at commit 7ef2aa0.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Sep 25, 2014

QA tests have finished for PR 2351 at commit 7ef2aa0.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class PStatsParam(AccumulatorParam):

Enable profiling in the Python worker; the profile results will show up via `sc.show_profiles()`,
or be displayed before the driver exits. They can also be dumped to disk via
`sc.dump_profiles(path)`. If some of the profile results have been displayed manually,
they will not be displayed automatically before the driver exits.
Contributor

Is this still true? It looks like we now use a showed flag to detect whether they've been printed instead of clearing the profiles array.

Contributor Author

I think it's true.

Contributor

It looks like we clear _profile_stats when we perform manual dump_profiles() calls, but not when we call show_profiles(), so it seems like this is half-true (unless I've overlooked something).

Contributor Author

If `showed` is true, it will not be displayed again, but it will still be dumped.

Contributor

Ah, right. If it's been manually dumped, then it won't be dumped again when exiting. If it's been manually dumped or displayed, then it won't be displayed when exiting.

This makes sense; sorry for the confusion.
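
(To summarize the semantics settled in this exchange, a hedged sketch of the bookkeeping; the data structure and the rdd_%d.pstats file name are illustrative assumptions, not the actual patch. Showing marks a profile as shown, dumping clears it, and the exit hook only handles what remains:)

```python
import atexit

# Mutable entries [rdd_id, stats, showed]; structure is illustrative.
profiles = []

def show_profiles():
    for entry in profiles:
        rdd_id, stats, showed = entry
        if not showed:
            print("Profile of RDD<id=%d>" % rdd_id)
            stats.sort_stats("tottime", "cumtime").print_stats()
            entry[2] = True  # shown once; skipped by the exit hook

def dump_profiles(path):
    while profiles:  # dumped entries are cleared, so never dumped twice
        rdd_id, stats, _ = profiles.pop()
        stats.dump_stats("%s/rdd_%d.pstats" % (path, rdd_id))

atexit.register(show_profiles)  # at exit, display only unshown profiles
```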

asfgit closed this in 1aa549b Sep 26, 2014
@davies
Contributor Author

davies commented Sep 26, 2014

Thanks for reviewing this; your comments made it much better.

@JoshRosen
Contributor

Whoops, looks like this failed unit tests and caused a build-break. I'm going to revert it to un-break the build while we investigate.

asfgit pushed a commit that referenced this pull request Oct 1, 2014
This patch adds profiling support for PySpark. It shows the profiling results before the driver exits; here is one example:

```
============================================================
Profile of RDD<id=3>
============================================================
         5146507 function calls (5146487 primitive calls) in 71.094 seconds

   Ordered by: internal time, cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
  5144576   68.331    0.000   68.331    0.000 statcounter.py:44(merge)
       20    2.735    0.137   71.071    3.554 statcounter.py:33(__init__)
       20    0.017    0.001    0.017    0.001 {cPickle.dumps}
     1024    0.003    0.000    0.003    0.000 t.py:16(<lambda>)
       20    0.001    0.000    0.001    0.000 {reduce}
       21    0.001    0.000    0.001    0.000 {cPickle.loads}
       20    0.001    0.000    0.001    0.000 copy_reg.py:95(_slotnames)
       41    0.001    0.000    0.001    0.000 serializers.py:461(read_int)
       40    0.001    0.000    0.002    0.000 serializers.py:179(_batched)
       62    0.000    0.000    0.000    0.000 {method 'read' of 'file' objects}
       20    0.000    0.000   71.072    3.554 rdd.py:863(<lambda>)
       20    0.000    0.000    0.001    0.000 serializers.py:198(load_stream)
    40/20    0.000    0.000   71.072    3.554 rdd.py:2093(pipeline_func)
       41    0.000    0.000    0.002    0.000 serializers.py:130(load_stream)
       40    0.000    0.000   71.072    1.777 rdd.py:304(func)
       20    0.000    0.000   71.094    3.555 worker.py:82(process)
```

Also, users can show the profile results manually with `sc.show_profiles()` or dump them to disk with `sc.dump_profiles(path)`, for example:

```python
>>> sc._conf.set("spark.python.profile", "true")
>>> rdd = sc.parallelize(range(100)).map(str)
>>> rdd.count()
100
>>> sc.show_profiles()
============================================================
Profile of RDD<id=1>
============================================================
         284 function calls (276 primitive calls) in 0.001 seconds

   Ordered by: internal time, cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        4    0.000    0.000    0.000    0.000 serializers.py:198(load_stream)
        4    0.000    0.000    0.000    0.000 {reduce}
     12/4    0.000    0.000    0.001    0.000 rdd.py:2092(pipeline_func)
        4    0.000    0.000    0.000    0.000 {cPickle.loads}
        4    0.000    0.000    0.000    0.000 {cPickle.dumps}
      104    0.000    0.000    0.000    0.000 rdd.py:852(<genexpr>)
        8    0.000    0.000    0.000    0.000 serializers.py:461(read_int)
       12    0.000    0.000    0.000    0.000 rdd.py:303(func)
```
Profiling is disabled by default; it can be enabled by setting "spark.python.profile=true".

Users can also have the results dumped to disk automatically for later analysis by setting "spark.python.profile.dump=path_to_dump".

This is a bugfix of #2351. cc JoshRosen

Author: Davies Liu <davies.liu@gmail.com>

Closes #2556 from davies/profiler and squashes the following commits:

e68df5a [Davies Liu] Merge branch 'master' of github.com:apache/spark into profiler
858e74c [Davies Liu] compatitable with python 2.6
7ef2aa0 [Davies Liu] bugfix, add tests for show_profiles and dump_profiles()
2b0daf2 [Davies Liu] fix docs
7a56c24 [Davies Liu] bugfix
cba9463 [Davies Liu] move show_profiles and dump_profiles to SparkContext
fb9565b [Davies Liu] Merge branch 'master' of github.com:apache/spark into profiler
116d52a [Davies Liu] Merge branch 'master' of github.com:apache/spark into profiler
09d02c3 [Davies Liu] Merge branch 'master' into profiler
c23865c [Davies Liu] Merge branch 'master' into profiler
15d6f18 [Davies Liu] add docs for two configs
dadee1a [Davies Liu] add docs string and clear profiles after show or dump
4f8309d [Davies Liu] address comment, add tests
0a5b6eb [Davies Liu] fix Python UDF
4b20494 [Davies Liu] add profile for python