Commit

move show_profiles and dump_profiles to SparkContext
davies committed Sep 24, 2014
1 parent fb9565b commit cba9463
Showing 4 changed files with 49 additions and 43 deletions.
13 changes: 8 additions & 5 deletions docs/configuration.md
@@ -210,17 +210,20 @@ Apart from these, the following properties are also available, and may be useful
<td><code>spark.python.profile</code></td>
<td>false</td>
<td>
Enable profiling in Python worker, the profile result will show up by `rdd.show_profile()`,
or it will show up before the driver exit. It also can be dumped into disk by
`rdd.dump_profile(path)`.
Enable profiling in the Python worker. The profile results will be shown by `sc.show_profiles()`,
or shown automatically before the driver exits. They can also be dumped to disk with
`sc.dump_profiles(path)`. If some of the profile results have already been shown manually,
they will not be shown again automatically before the driver exits.
</td>
</tr>
<tr>
<td><code>spark.python.profile.dump</code></td>
<td>(none)</td>
<td>
The directory which is used to dump the profile result. The results will be dumped
as sepereted file for each RDD. They can be loaded by ptats.Stats().
The directory used to dump the profile results before the driver exits.
The results are dumped as a separate file for each RDD, and can be loaded
with pstats.Stats(). If this directory is specified, the profile results will not be shown
automatically.
</tr>
<tr>
<td><code>spark.python.worker.reuse</code></td>
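For illustration, here is a minimal sketch of a PySpark session that exercises both settings; the local master, app name, and dump directory are assumptions for the example, not part of this commit:

```python
from pyspark import SparkConf, SparkContext

# Enable Python worker profiling; each PythonRDD accumulates cProfile stats.
conf = (SparkConf()
        .set("spark.python.profile", "true")
        .set("spark.python.profile.dump", "/tmp/spark_profiles"))  # optional dump directory
sc = SparkContext(master="local[2]", appName="profile-demo", conf=conf)

sc.parallelize(range(100)).map(lambda x: x * x).count()  # run a job so stats are collected

sc.show_profiles()                       # print per-RDD profile stats now
sc.dump_profiles("/tmp/spark_profiles")  # or write one rdd_<id>.pstats file per RDD
```

When `spark.python.profile.dump` is set, the same files are also written automatically at exit, since the context registers `dump_profiles` with `atexit` (see the context.py change below).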
39 changes: 38 additions & 1 deletion python/pyspark/context.py
@@ -20,6 +20,7 @@
import sys
from threading import Lock
from tempfile import NamedTemporaryFile
import atexit

from pyspark import accumulators
from pyspark.accumulators import Accumulator
@@ -30,7 +31,6 @@
from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer, \
PairDeserializer, CompressedSerializer
from pyspark.storagelevel import StorageLevel
from pyspark import rdd
from pyspark.rdd import RDD
from pyspark.traceback_utils import CallSite, first_spark_call

@@ -193,6 +193,9 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize,
self._temp_dir = \
self._jvm.org.apache.spark.util.Utils.createTempDir(local_dir).getAbsolutePath()

# profiling stats collected for each PythonRDD
self._profile_stats = []

def _initialize_context(self, jconf):
"""
Initialize SparkContext in function to allow subclass specific initialization
@@ -793,6 +796,40 @@ def runJob(self, rdd, partitionFunc, partitions=None, allowLocal=False):
it = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal)
return list(mappedRDD._collect_iterator_through_file(it))

def _add_profile(self, id, profileAcc):
if not self._profile_stats:
dump_path = self._conf.get("spark.python.profile.dump")
if dump_path:
atexit.register(self.dump_profiles, dump_path)
else:
atexit.register(self.show_profiles)

self._profile_stats.append([id, profileAcc, False])

def show_profiles(self):
""" Print the profile stats to stdout """
for i, (id, acc, showed) in enumerate(self._profile_stats):
stats = acc.value
if not showed and stats:
print "=" * 60
print "Profile of RDD<id=%d>" % id
print "=" * 60
stats.sort_stats("tottime", "cumtime").print_stats()
# mark it as shown
self._profile_stats[i][2] = True

def dump_profiles(self, path):
""" Dump the profile stats into directory `path`
"""
if not os.path.exists(path):
os.makedirs(path)
for id, acc, _ in self._profile_stats:
stats = acc.value
if stats:
p = os.path.join(path, "rdd_%d.pstats" % id)
stats.dump_stats(p)
self._profile_stats = []


def _test():
import atexit
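As a quick check on the dump format, the per-RDD `.pstats` files written by `dump_profiles` can be read back with the standard-library `pstats` module; the path and RDD id below are hypothetical:

```python
import pstats

# Load one of the files written by sc.dump_profiles("/tmp/spark_profiles")
stats = pstats.Stats("/tmp/spark_profiles/rdd_3.pstats")
stats.sort_stats("tottime", "cumtime").print_stats(20)  # same ordering show_profiles uses
```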
35 changes: 1 addition & 34 deletions python/pyspark/rdd.py
@@ -28,7 +28,6 @@
import warnings
import heapq
import bisect
import atexit
from random import Random
from math import sqrt, log, isinf, isnan

@@ -2088,41 +2087,9 @@ def _jrdd(self):

if enable_profile:
self._id = self._jrdd_val.id()
if not self._created_profiles:
dump_path = self.ctx._conf.get("spark.python.profile.dump")
if dump_path:
atexit.register(PipelinedRDD.dump_profile, dump_path)
else:
atexit.register(PipelinedRDD.show_profile)
self._created_profiles.append((self._id, profileStats))

self.ctx._add_profile(self._id, profileStats)
return self._jrdd_val

@classmethod
def show_profile(cls):
""" Print the profile stats to stdout """
for id, acc in cls._created_profiles:
stats = acc.value
if stats:
print "=" * 60
print "Profile of RDD<id=%d>" % id
print "=" * 60
stats.sort_stats("tottime", "cumtime").print_stats()
cls._created_profiles = []

@classmethod
def dump_profile(cls, dump_path):
""" Dump the profile stats into directory `dump_path`
"""
if not os.path.exists(dump_path):
os.makedirs(dump_path)
for id, acc in cls._created_profiles:
stats = acc.value
if stats:
path = os.path.join(dump_path, "rdd_%d.pstats" % id)
stats.dump_stats(path)
cls._created_profiles = []

def id(self):
if self._id is None:
self._id = self._jrdd.id()
5 changes: 2 additions & 3 deletions python/pyspark/tests.py
@@ -616,10 +616,9 @@ def heavy_foo(x):
for i in range(1 << 20):
x = 1
rdd = self.sc.parallelize(range(100)).foreach(heavy_foo)
from pyspark.rdd import PipelinedRDD
profiles = PipelinedRDD._created_profiles
profiles = self.sc._profile_stats
self.assertEqual(1, len(profiles))
id, acc = profiles.pop()
id, acc, _ = profiles.pop()
stats = acc.value
self.assertTrue(stats is not None)
width, stat_list = stats.get_print_list([])
