From cba94639fa6e5c4b2cb26f3152ea80bffaf65cce Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Wed, 24 Sep 2014 16:05:06 -0700
Subject: [PATCH] move show_profiles and dump_profiles to SparkContext

---
 docs/configuration.md     | 13 ++++++++-----
 python/pyspark/context.py | 39 ++++++++++++++++++++++++++++++++++++++-
 python/pyspark/rdd.py     | 35 +----------------------------------
 python/pyspark/tests.py   |  5 ++---
 4 files changed, 49 insertions(+), 43 deletions(-)

diff --git a/docs/configuration.md b/docs/configuration.md
index 8dfabb3a1d048..03251866873dc 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -210,17 +210,20 @@ Apart from these, the following properties are also available, and may be useful
   spark.python.profile
   false
-    Enable profiling in Python worker, the profile result will show up by `rdd.show_profile()`,
-    or it will show up before the driver exit. It also can be dumped into disk by
-    `rdd.dump_profile(path)`.
+    Enable profiling in Python workers; the profile result will show up via `sc.show_profiles()`,
+    or it will be displayed before the driver exits. It can also be dumped to disk by
+    `sc.dump_profiles(path)`. If some of the profile results have already been displayed
+    manually, they will not be displayed again automatically before the driver exits.
 
   spark.python.profile.dump
   (none)
-    The directory which is used to dump the profile result. The results will be dumped
-    as sepereted file for each RDD. They can be loaded by ptats.Stats().
+    The directory used to dump the profile results before the driver exits.
+    The results will be dumped as a separate file for each RDD and can be loaded
+    by pstats.Stats(). If this is specified, the profile results will not be displayed
+    automatically.
 
   spark.python.worker.reuse
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 064a24bff539c..5a66d11eca768 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -20,6 +20,7 @@
 import sys
 from threading import Lock
 from tempfile import NamedTemporaryFile
+import atexit
 
 from pyspark import accumulators
 from pyspark.accumulators import Accumulator
@@ -30,7 +31,6 @@
 from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer, \
     PairDeserializer, CompressedSerializer
 from pyspark.storagelevel import StorageLevel
-from pyspark import rdd
 from pyspark.rdd import RDD
 from pyspark.traceback_utils import CallSite, first_spark_call
 
@@ -193,6 +193,9 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize,
         self._temp_dir = \
             self._jvm.org.apache.spark.util.Utils.createTempDir(local_dir).getAbsolutePath()
 
+        # profiling stats collected for each PythonRDD
+        self._profile_stats = []
+
     def _initialize_context(self, jconf):
         """
         Initialize SparkContext in function to allow subclass specific initialization
@@ -793,6 +796,40 @@ def runJob(self, rdd, partitionFunc, partitions=None, allowLocal=False):
         it = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal)
         return list(mappedRDD._collect_iterator_through_file(it))
 
+    def _add_profile(self, id, profileAcc):
+        # register the atexit hook only once, when the first profile is added
+        if not self._profile_stats:
+            dump_path = self._conf.get("spark.python.profile.dump")
+            if dump_path:
+                atexit.register(self.dump_profiles, dump_path)
+            else:
+                atexit.register(self.show_profiles)
+
+        self._profile_stats.append([id, profileAcc, False])
+
+    def show_profiles(self):
+        """ Print the profile stats to stdout """
+        for i, (id, acc, showed) in enumerate(self._profile_stats):
+            stats = acc.value
+            if not showed and stats:
+                print "=" * 60
+                print "Profile of RDD<id=%d>" % id
+                print "=" * 60
+                stats.sort_stats("tottime", "cumtime").print_stats()
+                # mark it as showed
+                self._profile_stats[i][2] = True
+
+    def dump_profiles(self, path):
+        """ Dump the profile stats into directory `path`
+        """
+        if not os.path.exists(path):
+            os.makedirs(path)
+        for id, acc, _ in self._profile_stats:
+            stats = acc.value
+            if stats:
+                p = os.path.join(path, "rdd_%d.pstats" % id)
+                stats.dump_stats(p)
+        self._profile_stats = []
+
 
 def _test():
     import atexit
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index b5cb67f6a69e1..8bdb3ea5f7961 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -28,7 +28,6 @@
 import warnings
 import heapq
 import bisect
-import atexit
 from random import Random
 from math import sqrt, log, isinf, isnan
 
@@ -2088,41 +2087,9 @@ def _jrdd(self):
 
         if enable_profile:
             self._id = self._jrdd_val.id()
-            if not self._created_profiles:
-                dump_path = self.ctx._conf.get("spark.python.profile.dump")
-                if dump_path:
-                    atexit.register(PipelinedRDD.dump_profile, dump_path)
-                else:
-                    atexit.register(PipelinedRDD.show_profile)
-            self._created_profiles.append((self._id, profileStats))
-
+            self.ctx._add_profile(self._id, profileStats)
         return self._jrdd_val
 
-    @classmethod
-    def show_profile(cls):
-        """ Print the profile stats to stdout """
-        for id, acc in cls._created_profiles:
-            stats = acc.value
-            if stats:
-                print "=" * 60
-                print "Profile of RDD" % id
-                print "=" * 60
-                stats.sort_stats("tottime", "cumtime").print_stats()
-        cls._created_profiles = []
-
-    @classmethod
-    def dump_profile(cls, dump_path):
-        """ Dump the profile stats into directory `dump_path`
-        """
-        if not os.path.exists(dump_path):
-            os.makedirs(dump_path)
-        for id, acc in cls._created_profiles:
-            stats = acc.value
-            if stats:
-                path = os.path.join(dump_path, "rdd_%d.pstats" % id)
-                stats.dump_stats(path)
-        cls._created_profiles = []
-
     def id(self):
         if self._id is None:
             self._id = self._jrdd.id()
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 7aa04644f6310..cd493a5f93b48 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -616,10 +616,9 @@ def heavy_foo(x):
             for i in range(1 << 20):
                 x = 1
         rdd = self.sc.parallelize(range(100)).foreach(heavy_foo)
-        from pyspark.rdd import PipelinedRDD
-        profiles = PipelinedRDD._created_profiles
+        profiles = self.sc._profile_stats
         self.assertEqual(1, len(profiles))
-        id, acc = profiles.pop()
+        id, acc, _ = profiles.pop()
         stats = acc.value
         self.assertTrue(stats is not None)
         width, stat_list = stats.get_print_list([])
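
For reference, a minimal usage sketch of the profiling API moved to SparkContext by this patch. The config key `spark.python.profile` and the `sc.show_profiles()` / `sc.dump_profiles(path)` methods come from the diff above; the master URL, app name, example job, and dump directory are illustrative only.

    from pyspark import SparkConf, SparkContext

    # enable the Python worker profiler (config key from docs/configuration.md above)
    conf = SparkConf().setAppName("profile-demo").set("spark.python.profile", "true")
    sc = SparkContext("local[*]", conf=conf)

    # run a job so the PythonRDD registers its profile accumulator with the context
    sc.parallelize(range(100)).map(lambda x: x * x).count()

    # print the accumulated cProfile stats to stdout, sorted by tottime/cumtime
    sc.show_profiles()

    # or dump one rdd_<id>.pstats file per RDD into a directory,
    # loadable later with pstats.Stats(); the path here is illustrative
    sc.dump_profiles("/tmp/rdd_profiles")

    sc.stop()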