From 4b20494ce4e5e287a09fee5df5e0684711258627 Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Wed, 10 Sep 2014 17:51:28 -0700
Subject: [PATCH 01/10] add profile for python

---
 python/pyspark/accumulators.py | 15 ++++++++++++++
 python/pyspark/rdd.py          | 38 +++++++++++++++++++++++++++++++++-
 python/pyspark/worker.py       | 20 +++++++++++++++---
 3 files changed, 69 insertions(+), 4 deletions(-)

diff --git a/python/pyspark/accumulators.py b/python/pyspark/accumulators.py
index ccbca67656c8d..549fcb8e9559a 100644
--- a/python/pyspark/accumulators.py
+++ b/python/pyspark/accumulators.py
@@ -215,6 +215,21 @@ def addInPlace(self, value1, value2):
 COMPLEX_ACCUMULATOR_PARAM = AddingAccumulatorParam(0.0j)
 
 
+class StatsParam(AccumulatorParam):
+    """StatsParam is used to merge pstats.Stats"""
+
+    @staticmethod
+    def zero(value):
+        return None
+
+    @staticmethod
+    def addInPlace(value1, value2):
+        if value1 is None:
+            return value2
+        value1.add(value2)
+        return value1
+
+
 class _UpdateRequestHandler(SocketServer.StreamRequestHandler):
 
     """
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 5667154cb84a8..9dcccdd37a20a 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -31,9 +31,11 @@
 import warnings
 import heapq
 import bisect
+import atexit
 from random import Random
 from math import sqrt, log, isinf, isnan
 
+from pyspark.accumulators import StatsParam
 from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
     BatchedSerializer, CloudPickleSerializer, PairDeserializer, \
     PickleSerializer, pack_long, CompressedSerializer
@@ -2076,6 +2078,7 @@ class PipelinedRDD(RDD):
     >>> rdd.flatMap(lambda x: [x, x]).reduce(add)
     20
     """
+    _created_profiles = []
 
     def __init__(self, prev, func, preservesPartitioning=False):
         if not isinstance(prev, PipelinedRDD) or not prev._is_pipelinable():
@@ -2110,7 +2113,9 @@ def _jrdd(self):
             return self._jrdd_val
         if self._bypass_serializer:
             self._jrdd_deserializer = NoOpSerializer()
-        command = (self.func, self._prev_jrdd_deserializer,
+        enable_profile = self.ctx._conf.get("spark.python.profile", "false") == "true"
+        profileStats = self.ctx.accumulator(None, StatsParam) if enable_profile else None
+        command = (self.func, profileStats, self._prev_jrdd_deserializer,
                    self._jrdd_deserializer)
         ser = CloudPickleSerializer()
         pickled_command = ser.dumps(command)
@@ -2128,8 +2133,39 @@ def _jrdd(self):
                                              self.ctx.pythonExec,
                                              broadcast_vars, self.ctx._javaAccumulator)
         self._jrdd_val = python_rdd.asJavaRDD()
+
+        if enable_profile:
+            self._id = self._jrdd_val.id()
+            if not self._created_profiles:
+                dump_path = self.ctx._conf.get("spark.python.profile.dump")
+                if dump_path:
+                    atexit.register(PipelinedRDD.dump_profile, dump_path)
+                else:
+                    atexit.register(PipelinedRDD.show_profile)
+            self._created_profiles.append((self._id, profileStats))
+
         return self._jrdd_val
 
+    @classmethod
+    def show_profile(cls):
+        for id, acc in cls._created_profiles:
+            stats = acc.value
+            if stats:
+                print "="*60
+                print "Profile of RDD<id=%d>" % id
+                print "="*60
+                stats.sort_stats("tottime", "cumtime").print_stats()
+
+    @classmethod
+    def dump_profile(cls, dump_path):
+        if not os.path.exists(dump_path):
+            os.makedirs(dump_path)
+        for id, acc in cls._created_profiles:
+            stats = acc.value
+            if stats:
+                path = os.path.join(dump_path, "rdd_%d.pstats" % id)
+                stats.dump_stats(path)
+
     def id(self):
         if self._id is None:
             self._id = self._jrdd.id()
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 6805063e06798..6e5c1e17d2647 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -23,6 +23,9 @@
 import time
 import socket
 import traceback
+import cProfile
+import pstats
+
 # CloudPickler needs to be imported so that depicklers are registered using the
 # copy_reg module.
 from pyspark.accumulators import _accumulatorRegistry
@@ -73,10 +76,21 @@ def main(infile, outfile):
             _broadcastRegistry[bid] = Broadcast(bid, value)
 
         command = pickleSer._read_with_length(infile)
-        (func, deserializer, serializer) = command
+        (func, stats, deserializer, serializer) = command
         init_time = time.time()
-        iterator = deserializer.load_stream(infile)
-        serializer.dump_stream(func(split_index, iterator), outfile)
+
+        def process():
+            iterator = deserializer.load_stream(infile)
+            serializer.dump_stream(func(split_index, iterator), outfile)
+
+        if stats:
+            p = cProfile.Profile()
+            p.runcall(process)
+            st = pstats.Stats(p)
+            st.stream = None  # make it picklable
+            stats.add(st.strip_dirs())
+        else:
+            process()
     except Exception:
         try:
             write_int(SpecialLengths.PYTHON_EXCEPTION_THROWN, outfile)

From 0a5b6ebcd38f13fa15721c56a9d96bd9000529f5 Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Wed, 10 Sep 2014 20:25:23 -0700
Subject: [PATCH 02/10] fix Python UDF

---
 python/pyspark/sql.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py
index 53eea6d6cf3ba..3156e5c54a481 100644
--- a/python/pyspark/sql.py
+++ b/python/pyspark/sql.py
@@ -971,7 +971,7 @@ def registerFunction(self, name, f, returnType=StringType()):
         [Row(c0=4)]
         """
         func = lambda _, it: imap(lambda x: f(*x), it)
-        command = (func,
+        command = (func, None,
                    BatchedSerializer(PickleSerializer(), 1024),
                    BatchedSerializer(PickleSerializer(), 1024))
         pickled_command = CloudPickleSerializer().dumps(command)

From 4f8309d7d8df18fb5f4da1d9f150d7606bf650c9 Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Fri, 12 Sep 2014 20:14:34 -0700
Subject: [PATCH 03/10] address comment, add tests

---
 python/pyspark/accumulators.py |  4 ++--
 python/pyspark/rdd.py          |  5 ++---
 python/pyspark/tests.py        | 25 +++++++++++++++++++++++++
 3 files changed, 29 insertions(+), 5 deletions(-)

diff --git a/python/pyspark/accumulators.py b/python/pyspark/accumulators.py
index 549fcb8e9559a..b8cdbbe3cf2b6 100644
--- a/python/pyspark/accumulators.py
+++ b/python/pyspark/accumulators.py
@@ -215,8 +215,8 @@ def addInPlace(self, value1, value2):
 COMPLEX_ACCUMULATOR_PARAM = AddingAccumulatorParam(0.0j)
 
 
-class StatsParam(AccumulatorParam):
-    """StatsParam is used to merge pstats.Stats"""
+class PStatsParam(AccumulatorParam):
+    """PStatsParam is used to merge pstats.Stats"""
 
     @staticmethod
     def zero(value):
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 9dcccdd37a20a..6d08e770db34b 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -15,7 +15,6 @@
 # limitations under the License.
 #
 
-from base64 import standard_b64encode as b64enc
 import copy
 from collections import defaultdict
 from collections import namedtuple
@@ -35,7 +34,7 @@
 from random import Random
 from math import sqrt, log, isinf, isnan
 
-from pyspark.accumulators import StatsParam
+from pyspark.accumulators import PStatsParam
 from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
     BatchedSerializer, CloudPickleSerializer, PairDeserializer, \
     PickleSerializer, pack_long, CompressedSerializer
@@ -2114,7 +2113,7 @@ def _jrdd(self):
         if self._bypass_serializer:
             self._jrdd_deserializer = NoOpSerializer()
         enable_profile = self.ctx._conf.get("spark.python.profile", "false") == "true"
-        profileStats = self.ctx.accumulator(None, StatsParam) if enable_profile else None
+        profileStats = self.ctx.accumulator(None, PStatsParam) if enable_profile else None
         command = (self.func, profileStats, self._prev_jrdd_deserializer,
                    self._jrdd_deserializer)
         ser = CloudPickleSerializer()
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index bb84ebe72cb24..10494be3e7918 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -554,6 +554,31 @@ def test_repartitionAndSortWithinPartitions(self):
         self.assertEquals(partitions[1], [(1, 3), (3, 8), (3, 8)])
 
 
+class TestProfiler(PySparkTestCase):
+
+    def setUp(self):
+        self._old_sys_path = list(sys.path)
+        class_name = self.__class__.__name__
+        conf = SparkConf().set("spark.python.profile", "true")
+        self.sc = SparkContext('local[4]', class_name, batchSize=2, conf=conf)
+
+    def test_profiler(self):
+
+        def heavy_foo(x):
+            for i in range(1 << 20):
+                x = 1
+        rdd = self.sc.parallelize(range(100)).foreach(heavy_foo)
+        from pyspark.rdd import PipelinedRDD
+        profiles = PipelinedRDD._created_profiles
+        self.assertEqual(1, len(profiles))
+        id, acc = profiles.pop()
+        stats = acc.value
+        self.assertTrue(stats is not None)
+        width, stat_list = stats.get_print_list([])
+        func_names = [func_name for fname, n, func_name in stat_list]
+        self.assertTrue("heavy_foo" in func_names)
+
+
 class TestSQL(PySparkTestCase):
 
     def setUp(self):

From dadee1a228b20d24e4a6b0a7d081f1b30f773988 Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Fri, 12 Sep 2014 21:51:33 -0700
Subject: [PATCH 04/10] add docs string and clear profiles after show or dump

---
 python/pyspark/rdd.py | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 6d08e770db34b..5c701fe739f4e 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -2147,16 +2147,20 @@ def _jrdd(self):
 
     @classmethod
     def show_profile(cls):
+        """ Print the profile stats to stdout """
         for id, acc in cls._created_profiles:
             stats = acc.value
             if stats:
-                print "="*60
+                print "=" * 60
                 print "Profile of RDD<id=%d>" % id
-                print "="*60
+                print "=" * 60
                 stats.sort_stats("tottime", "cumtime").print_stats()
+        cls._created_profiles = []
 
     @classmethod
     def dump_profile(cls, dump_path):
+        """ Dump the profile stats into directory `dump_path`
+        """
         if not os.path.exists(dump_path):
             os.makedirs(dump_path)
         for id, acc in cls._created_profiles:
@@ -2164,6 +2168,7 @@ def dump_profile(cls, dump_path):
             stats = acc.value
             if stats:
                 path = os.path.join(dump_path, "rdd_%d.pstats" % id)
                 stats.dump_stats(path)
+        cls._created_profiles = []
 
     def id(self):
         if self._id is None:

From 15d6f18fd97422ff7bebf343383b7eca9ef433bc Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Fri, 12 Sep 2014 22:09:06 -0700
Subject: [PATCH 05/10] add docs for two configs

---
 docs/configuration.md | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)
diff --git a/docs/configuration.md b/docs/configuration.md
index 36178efb97103..67a3108b76914 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -206,6 +206,23 @@ Apart from these, the following properties are also available, and may be useful
     used during aggregation goes above this amount, it will spill the data into disks.
   </td>
 </tr>
+<tr>
+  <td><code>spark.python.profile</code></td>
+  <td>false</td>
+  <td>
+    Enable profiling in Python worker, the profile result will show up by `rdd.show_profile()`,
+    or it will show up before the driver exit. It also can be dumped into disk by
+    `rdd.dump_profile(path)`.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.python.profile.dump</code></td>
+  <td>(none)</td>
+  <td>
+    The directory which is used to dump the profile result. The results will be dumped
+    as separated file for each RDD. They can be loaded by pstats.Stats().
+  </td>
+</tr>
 <tr>
   <td><code>spark.executorEnv.[EnvironmentVariableName]</code></td>
   <td>(none)</td>

From cba94639fa6e5c4b2cb26f3152ea80bffaf65cce Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Wed, 24 Sep 2014 16:05:06 -0700
Subject: [PATCH 06/10] move show_profiles and dump_profiles to SparkContext

---
 docs/configuration.md     | 13 ++++++++-----
 python/pyspark/context.py | 39 ++++++++++++++++++++++++++++++++++++++-
 python/pyspark/rdd.py     | 35 +----------------------------------
 python/pyspark/tests.py   |  5 ++---
 4 files changed, 49 insertions(+), 43 deletions(-)

diff --git a/docs/configuration.md b/docs/configuration.md
index 8dfabb3a1d048..03251866873dc 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -210,17 +210,20 @@ Apart from these, the following properties are also available, and may be useful
   <td><code>spark.python.profile</code></td>
   <td>false</td>
   <td>
-    Enable profiling in Python worker, the profile result will show up by `rdd.show_profile()`,
-    or it will show up before the driver exit. It also can be dumped into disk by
-    `rdd.dump_profile(path)`.
+    Enable profiling in Python worker, the profile result will show up by `sc.show_profiles()`,
+    or it will be showed up before the driver exiting. It also can be dumped into disk by
+    `sc.dump_profiles(path)`. If some of the profile results had been showed up manually,
+    they will not be showed up automatically before driver exiting.
   </td>
 </tr>
 <tr>
   <td><code>spark.python.profile.dump</code></td>
   <td>(none)</td>
   <td>
-    The directory which is used to dump the profile result. The results will be dumped
-    as separated file for each RDD. They can be loaded by pstats.Stats().
+    The directory which is used to dump the profile result before the driver exits.
+    The results will be dumped as separate files for each RDD. They can be loaded
+    by pstats.Stats(). If this is specified, the profile result will not be showed up
+    automatically.
   </td>
 </tr>
 <tr>
   <td><code>spark.python.worker.reuse</code></td>
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 064a24bff539c..5a66d11eca768 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -20,6 +20,7 @@
 import sys
 from threading import Lock
 from tempfile import NamedTemporaryFile
+import atexit
 
 from pyspark import accumulators
 from pyspark.accumulators import Accumulator
@@ -30,7 +31,6 @@
 from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer, \
     PairDeserializer, CompressedSerializer
 from pyspark.storagelevel import StorageLevel
-from pyspark import rdd
 from pyspark.rdd import RDD
 from pyspark.traceback_utils import CallSite, first_spark_call
 
@@ -193,6 +193,9 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize,
         self._temp_dir = \
             self._jvm.org.apache.spark.util.Utils.createTempDir(local_dir).getAbsolutePath()
 
+        # profiling stats collected for each PythonRDD
+        self._profile_stats = []
+
     def _initialize_context(self, jconf):
         """
         Initialize SparkContext in function to allow subclass specific initialization
@@ -793,6 +796,40 @@ def runJob(self, rdd, partitionFunc, partitions=None, allowLocal=False):
         it = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal)
         return list(mappedRDD._collect_iterator_through_file(it))
 
+    def _add_profile(self, id, profileAcc):
+        if not self._profile_stats:
+            dump_path = self._conf.get("spark.python.profile.dump")
+            if dump_path:
+                atexit.register(self.dump_profiles, dump_path)
+            else:
+                atexit.register(self.show_profiles)
+
+        self._profile_stats.append([id, profileAcc, False])
+
+    def show_profiles(self):
+        """ Print the profile stats to stdout """
+        for i, (id, acc, showed) in self._profile_stats:
+            stats = acc.value
+            if not showed and stats:
+                print "=" * 60
+                print "Profile of RDD<id=%d>" % id
+                print "=" * 60
+                stats.sort_stats("tottime", "cumtime").print_stats()
+                # mark it as showed
+                self._profile_stats[i][2] = True
+
+    def dump_profiles(self, path):
+        """ Dump the profile stats into directory `path`
+        """
+        if not os.path.exists(path):
+            os.makedirs(path)
+        for id, acc, _ in self._created_profiles:
+            stats = acc.value
+            if stats:
+                p = os.path.join(path, "rdd_%d.pstats" % id)
+                stats.dump_stats(p)
+        self._profile_stats = []
+
 
 def _test():
     import atexit
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index b5cb67f6a69e1..8bdb3ea5f7961 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -28,7 +28,6 @@
 import warnings
 import heapq
 import bisect
-import atexit
 from random import Random
 from math import sqrt, log, isinf, isnan
 
@@ -2088,41 +2087,9 @@ def _jrdd(self):
 
         if enable_profile:
             self._id = self._jrdd_val.id()
-            if not self._created_profiles:
-                dump_path = self.ctx._conf.get("spark.python.profile.dump")
-                if dump_path:
-                    atexit.register(PipelinedRDD.dump_profile, dump_path)
-                else:
-                    atexit.register(PipelinedRDD.show_profile)
-            self._created_profiles.append((self._id, profileStats))
-
+            self.ctx._add_profile(self._id, profileStats)
         return self._jrdd_val
 
-    @classmethod
-    def show_profile(cls):
-        """ Print the profile stats to stdout """
-        for id, acc in cls._created_profiles:
-            stats = acc.value
-            if stats:
-                print "=" * 60
-                print "Profile of RDD<id=%d>" % id
-                print "=" * 60
-                stats.sort_stats("tottime", "cumtime").print_stats()
-        cls._created_profiles = []
-
-    @classmethod
-    def dump_profile(cls, dump_path):
-        """ Dump the profile stats into directory `dump_path`
-        """
-        if not os.path.exists(dump_path):
-            os.makedirs(dump_path)
-        for id, acc in cls._created_profiles:
-            stats = acc.value
-            if stats:
-                path = os.path.join(dump_path, "rdd_%d.pstats" % id)
-                stats.dump_stats(path)
-        cls._created_profiles = []
-
     def id(self):
         if self._id is None:
             self._id = self._jrdd.id()
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 7aa04644f6310..cd493a5f93b48 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -616,10 +616,9 @@ def heavy_foo(x):
             for i in range(1 << 20):
                 x = 1
         rdd = self.sc.parallelize(range(100)).foreach(heavy_foo)
-        from pyspark.rdd import PipelinedRDD
-        profiles = PipelinedRDD._created_profiles
+        profiles = self.sc._profile_stats
         self.assertEqual(1, len(profiles))
-        id, acc = profiles.pop()
+        id, acc, _ = profiles.pop()
         stats = acc.value
         self.assertTrue(stats is not None)
         width, stat_list = stats.get_print_list([])

From 7a56c2420dd087cbe311d34fa81b5b9d22024b53 Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Wed, 24 Sep 2014 16:12:11 -0700
Subject: [PATCH 07/10] bugfix

---
 python/pyspark/context.py | 2 +-
 python/pyspark/rdd.py     | 1 -
 2 files changed, 1 insertion(+), 2 deletions(-)

diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 5a66d11eca768..a2c76ad218069 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -823,7 +823,7 @@ def dump_profiles(self, path):
         """
         if not os.path.exists(path):
             os.makedirs(path)
-        for id, acc, _ in self._created_profiles:
+        for id, acc, _ in self._profile_stats:
             stats = acc.value
             if stats:
                 p = os.path.join(path, "rdd_%d.pstats" % id)
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 8bdb3ea5f7961..0f4b167193bce 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -2025,7 +2025,6 @@ class PipelinedRDD(RDD):
     >>> rdd.flatMap(lambda x: [x, x]).reduce(add)
     20
     """
-    _created_profiles = []
 
     def __init__(self, prev, func, preservesPartitioning=False):
         if not isinstance(prev, PipelinedRDD) or not prev._is_pipelinable():

From 2b0daf207384b7cbf15a180bb05985fb596e8281 Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Wed, 24 Sep 2014 16:13:25 -0700
Subject: [PATCH 08/10] fix docs

---
 docs/configuration.md | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/docs/configuration.md b/docs/configuration.md
index 03251866873dc..791b6f2aa3261 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -211,9 +211,9 @@ Apart from these, the following properties are also available, and may be useful
   <td>false</td>
   <td>
-    Enable profiling in Python worker, the profile result will show up by `sc.show_profiles()`,
-    or it will be showed up before the driver exiting. It also can be dumped into disk by
-    `sc.dump_profiles(path)`. If some of the profile results had been showed up manually,
-    they will not be showed up automatically before driver exiting.
+    Enable profiling in Python worker, the profile result will show up by `sc.show_profiles()`,
+    or it will be displayed before the driver exits. It can also be dumped into disk by
+    `sc.dump_profiles(path)`. If some of the profile results have been displayed manually,
+    they will not be displayed automatically before the driver exits.
   </td>
 </tr>
@@ -222,7 +222,7 @@ Apart from these, the following properties are also available, and may be useful
   <td>
     The directory which is used to dump the profile result before the driver exits.
     The results will be dumped as separate files for each RDD. They can be loaded
-    by pstats.Stats(). If this is specified, the profile result will not be showed up
+    by pstats.Stats(). If this is specified, the profile result will not be displayed
     automatically.
   </td>
 </tr>
From 7ef2aa05cf07b2648cb73cd05f2ece93a44d9b9a Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Thu, 25 Sep 2014 14:47:49 -0700
Subject: [PATCH 09/10] bugfix, add tests for show_profiles and dump_profiles()

---
 python/pyspark/context.py |  2 +-
 python/pyspark/tests.py   | 10 ++++++++--
 2 files changed, 9 insertions(+), 3 deletions(-)

diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index a2c76ad218069..1c9998c0ef4ff 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -808,7 +808,7 @@ def _add_profile(self, id, profileAcc):
 
     def show_profiles(self):
         """ Print the profile stats to stdout """
-        for i, (id, acc, showed) in self._profile_stats:
+        for i, (id, acc, showed) in enumerate(self._profile_stats):
             stats = acc.value
             if not showed and stats:
                 print "=" * 60
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index cd493a5f93b48..bf3cf94d5d083 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -615,16 +615,22 @@ def test_profiler(self):
         def heavy_foo(x):
             for i in range(1 << 20):
                 x = 1
-        rdd = self.sc.parallelize(range(100)).foreach(heavy_foo)
+        rdd = self.sc.parallelize(range(100))
+        rdd.foreach(heavy_foo)
         profiles = self.sc._profile_stats
         self.assertEqual(1, len(profiles))
-        id, acc, _ = profiles.pop()
+        id, acc, _ = profiles[0]
         stats = acc.value
         self.assertTrue(stats is not None)
         width, stat_list = stats.get_print_list([])
         func_names = [func_name for fname, n, func_name in stat_list]
         self.assertTrue("heavy_foo" in func_names)
 
+        self.sc.show_profiles()
+        d = tempfile.gettempdir()
+        self.sc.dump_profiles(d)
+        self.assertTrue("rdd_%d.pstats" % id in os.listdir(d))
+
 
 class TestSQL(PySparkTestCase):
 
     def setUp(self):

From 858e74caf5063e43fe7621716bc3e2048321ea00 Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Fri, 26 Sep 2014 21:29:40 -0700
Subject: [PATCH 10/10] compatible with python 2.6

---
 python/pyspark/context.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 1c9998c0ef4ff..4dd6b92de4097 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -814,7 +814,7 @@ def show_profiles(self):
                 print "=" * 60
                 print "Profile of RDD<id=%d>" % id
                 print "=" * 60
-                stats.sort_stats("tottime", "cumtime").print_stats()
+                stats.sort_stats("time", "cumulative").print_stats()
                 # mark it as showed
                 self._profile_stats[i][2] = True
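
For reference, a minimal usage sketch of the profiler introduced by this series, assuming
its final state after PATCH 10 (the `spark.python.profile` and `spark.python.profile.dump`
settings, `sc.show_profiles()` and `sc.dump_profiles(path)`). The application name, dump
directory, and workload below are illustrative only and do not come from the patches.

    from pyspark import SparkConf, SparkContext

    # Enable the cProfile-based profiler in the Python workers; stats are merged
    # per PythonRDD on the driver through the PStatsParam accumulator.
    conf = (SparkConf()
            .set("spark.python.profile", "true")
            # Optional: write rdd_<id>.pstats files at exit instead of printing.
            .set("spark.python.profile.dump", "/tmp/pyspark_profile"))
    sc = SparkContext("local[2]", "profile-demo", conf=conf)

    def work(x):
        # Deliberately CPU-heavy so the function shows up in the profile output.
        return sum(i * i for i in range(10000)) + x

    sc.parallelize(range(100)).map(work).count()

    # Print the merged pstats for each profiled RDD (sorted by time/cumulative).
    sc.show_profiles()
    # Or dump one .pstats file per RDD; these can be reloaded with pstats.Stats(path).
    sc.dump_profiles("/tmp/pyspark_profile")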