From 9bda3ecbe89d03a064c8cf7c604294878fd103a2 Mon Sep 17 00:00:00 2001 From: Yandu Oppacher Date: Thu, 13 Nov 2014 16:02:33 -0500 Subject: [PATCH 01/11] Refactor profiler code --- docs/configuration.md | 33 ++++++++------- python/pyspark/context.py | 38 +++++++---------- python/pyspark/pyprofiler.py | 81 ++++++++++++++++++++++++++++++++++++ python/pyspark/rdd.py | 16 ++++--- python/pyspark/tests.py | 45 +++++++++++++++----- python/pyspark/worker.py | 13 ++---- 6 files changed, 162 insertions(+), 64 deletions(-) create mode 100644 python/pyspark/pyprofiler.py diff --git a/docs/configuration.md b/docs/configuration.md index f0b396e21f198..8ec3ce6f7fe1f 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -23,8 +23,8 @@ application. These properties can be set directly on a (e.g. master URL and application name), as well as arbitrary key-value pairs through the `set()` method. For example, we could initialize an application with two threads as follows: -Note that we run with local[2], meaning two threads - which represents "minimal" parallelism, -which can help detect bugs that only exist when we run in a distributed context. +Note that we run with local[2], meaning two threads - which represents "minimal" parallelism, +which can help detect bugs that only exist when we run in a distributed context. {% highlight scala %} val conf = new SparkConf() @@ -35,7 +35,7 @@ val sc = new SparkContext(conf) {% endhighlight %} Note that we can have more than 1 thread in local mode, and in cases like spark streaming, we may actually -require one to prevent any sort of starvation issues. +require one to prevent any sort of starvation issues. ## Dynamically Loading Spark Properties In some cases, you may want to avoid hard-coding certain configurations in a `SparkConf`. For @@ -48,8 +48,8 @@ val sc = new SparkContext(new SparkConf()) Then, you can supply configuration values at runtime: {% highlight bash %} -./bin/spark-submit --name "My app" --master local[4] --conf spark.shuffle.spill=false - --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" myApp.jar +./bin/spark-submit --name "My app" --master local[4] --conf spark.shuffle.spill=false + --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" myApp.jar {% endhighlight %} The Spark shell and [`spark-submit`](cluster-overview.html#launching-applications-with-spark-submit) @@ -123,7 +123,7 @@ of the most common options to set are: Limit of total size of serialized results of all partitions for each Spark action (e.g. collect). Should be at least 1M, or 0 for unlimited. Jobs will be aborted if the total size - is above this limit. + is above this limit. Having a high limit may cause out-of-memory errors in driver (depends on spark.driver.memory and memory overhead of objects in JVM). Setting a proper limit can protect the driver from out-of-memory errors. @@ -243,13 +243,16 @@ Apart from these, the following properties are also available, and may be useful or it will be displayed before the driver exiting. It also can be dumped into disk by `sc.dump_profiles(path)`. If some of the profile results had been displayed maually, they will not be displayed automatically before driver exiting. + + By default the `pyspark.pyprofiler.BasicProfiler` will be used, but this can be overridden by + passing a profiler class in as a parameter to the `SparkContext` constructor. spark.python.profile.dump (none) - The directory which is used to dump the profile result before driver exiting. + The directory which is used to dump the profile result before driver exiting. The results will be dumped as separated file for each RDD. They can be loaded by ptats.Stats(). If this is specified, the profile result will not be displayed automatically. @@ -268,8 +271,8 @@ Apart from these, the following properties are also available, and may be useful spark.executorEnv.[EnvironmentVariableName] (none) - Add the environment variable specified by EnvironmentVariableName to the Executor - process. The user can specify multiple of these and to set multiple environment variables. + Add the environment variable specified by EnvironmentVariableName to the Executor + process. The user can specify multiple of these and to set multiple environment variables. @@ -474,9 +477,9 @@ Apart from these, the following properties are also available, and may be useful The codec used to compress internal data such as RDD partitions, broadcast variables and shuffle outputs. By default, Spark provides three codecs: lz4, lzf, - and snappy. You can also use fully qualified class names to specify the codec, - e.g. - org.apache.spark.io.LZ4CompressionCodec, + and snappy. You can also use fully qualified class names to specify the codec, + e.g. + org.apache.spark.io.LZ4CompressionCodec, org.apache.spark.io.LZFCompressionCodec, and org.apache.spark.io.SnappyCompressionCodec. @@ -944,7 +947,7 @@ Apart from these, the following properties are also available, and may be useful (resources are executors in yarn mode, CPU cores in standalone mode) to wait for before scheduling begins. Specified as a double between 0 and 1. Regardless of whether the minimum ratio of resources has been reached, - the maximum amount of time it will wait before scheduling begins is controlled by config + the maximum amount of time it will wait before scheduling begins is controlled by config spark.scheduler.maxRegisteredResourcesWaitingTime. @@ -953,7 +956,7 @@ Apart from these, the following properties are also available, and may be useful 30000 Maximum amount of time to wait for resources to register before scheduling begins - (in milliseconds). + (in milliseconds). @@ -1022,7 +1025,7 @@ Apart from these, the following properties are also available, and may be useful false Whether Spark acls should are enabled. If enabled, this checks to see if the user has - access permissions to view or modify the job. Note this requires the user to be known, + access permissions to view or modify the job. Note this requires the user to be known, so if the user comes across as null no checks are done. Filters can be used with the UI to authenticate and set the user. diff --git a/python/pyspark/context.py b/python/pyspark/context.py index faa5952258aef..2f1b122f6dac9 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -33,6 +33,7 @@ from pyspark.storagelevel import StorageLevel from pyspark.rdd import RDD from pyspark.traceback_utils import CallSite, first_spark_call +from pyspark.pyprofiler import BasicProfiler from py4j.java_collections import ListConverter @@ -66,7 +67,7 @@ class SparkContext(object): def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None, environment=None, batchSize=0, serializer=PickleSerializer(), conf=None, - gateway=None, jsc=None): + gateway=None, jsc=None, profiler=None): """ Create a new SparkContext. At least the master and app name should be set, either through the named parameters here or through C{conf}. @@ -102,14 +103,14 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None, SparkContext._ensure_initialized(self, gateway=gateway) try: self._do_init(master, appName, sparkHome, pyFiles, environment, batchSize, serializer, - conf, jsc) + conf, jsc, profiler) except: # If an error occurs, clean up in order to allow future SparkContext creation: self.stop() raise def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize, serializer, - conf, jsc): + conf, jsc, profiler): self.environment = environment or {} self._conf = conf or SparkConf(_jvm=self._jvm) self._batchSize = batchSize # -1 represents an unlimited batch size @@ -191,7 +192,13 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize, self._temp_dir = \ self._jvm.org.apache.spark.util.Utils.createTempDir(local_dir).getAbsolutePath() + # profiling stats collected for each PythonRDD + if self._conf.get("spark.python.profile", "false") == "true": + self.profiler = profiler if profiler else BasicProfiler + else: + self.profiler = None + self._profile_stats = [] def _initialize_context(self, jconf): @@ -808,7 +815,7 @@ def runJob(self, rdd, partitionFunc, partitions=None, allowLocal=False): it = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal) return list(mappedRDD._collect_iterator_through_file(it)) - def _add_profile(self, id, profileAcc): + def _add_profiler(self, id, profiler): if not self._profile_stats: dump_path = self._conf.get("spark.python.profile.dump") if dump_path: @@ -816,30 +823,13 @@ def _add_profile(self, id, profileAcc): else: atexit.register(self.show_profiles) - self._profile_stats.append([id, profileAcc, False]) + self._profile_stats.append([id, profiler, False]) def show_profiles(self): - """ Print the profile stats to stdout """ - for i, (id, acc, showed) in enumerate(self._profile_stats): - stats = acc.value - if not showed and stats: - print "=" * 60 - print "Profile of RDD" % id - print "=" * 60 - stats.sort_stats("time", "cumulative").print_stats() - # mark it as showed - self._profile_stats[i][2] = True + self.profiler.show_profiles(self._profile_stats) def dump_profiles(self, path): - """ Dump the profile stats into directory `path` - """ - if not os.path.exists(path): - os.makedirs(path) - for id, acc, _ in self._profile_stats: - stats = acc.value - if stats: - p = os.path.join(path, "rdd_%d.pstats" % id) - stats.dump_stats(p) + self.profiler.dump_profiles(path, self._profile_stats) self._profile_stats = [] diff --git a/python/pyspark/pyprofiler.py b/python/pyspark/pyprofiler.py new file mode 100644 index 0000000000000..b3c16a4a434fd --- /dev/null +++ b/python/pyspark/pyprofiler.py @@ -0,0 +1,81 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +PySpark supports custom profilers, this is to allow for different profilers to +be used as well as outputting to different formats than what is provided in the +BasicProfiler. + +The profiler class is chosen when creating L{SparkContext}: +NOTE: This has no effect if `spark.python.profile` is not set. +>>> from pyspark.context import SparkContext +>>> from pyspark.serializers import MarshalSerializer +>>> sc = SparkContext('local', 'test', profiler=MyCustomProfiler) +>>> sc.parallelize(list(range(1000))).map(lambda x: 2 * x).take(10) +[0, 2, 4, 6, 8, 10, 12, 14, 16, 18] +>>> sc.show_profiles() +>>> sc.stop() + +""" + + +import cProfile +import pstats +import os +from pyspark.accumulators import PStatsParam + + +class BasicProfiler(object): + + @staticmethod + def profile(to_profile): + pr = cProfile.Profile() + pr.runcall(to_profile) + st = pstats.Stats(pr) + st.stream = None # make it picklable + st.strip_dirs() + return st + + @staticmethod + def show_profiles(profilers): + """ Print the profile stats to stdout """ + for i, (id, profiler, showed) in enumerate(profilers): + stats = profiler._accumulator.value + if not showed and stats: + print "=" * 60 + print "Profile of RDD" % id + print "=" * 60 + stats.sort_stats("time", "cumulative").print_stats() + # mark it as showed + profilers[i][2] = True + + @staticmethod + def dump_profiles(path, profilers): + """ Dump the profilers stats into directory `path` """ + if not os.path.exists(path): + os.makedirs(path) + for id, profiler, _ in profilers: + stats = profiler._accumulator.value + if stats: + p = os.path.join(path, "rdd_%d.pstats" % id) + stats.dump_stats(p) + + def new_profile_accumulator(self, ctx): + self._accumulator = ctx.accumulator(None, PStatsParam) + + def add(self, accum_value): + self._accumulator.add(accum_value) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 08d047402625f..9f1881d676747 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -31,7 +31,6 @@ from random import Random from math import sqrt, log, isinf, isnan -from pyspark.accumulators import PStatsParam from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \ BatchedSerializer, CloudPickleSerializer, PairDeserializer, \ PickleSerializer, pack_long, AutoBatchedSerializer @@ -2072,9 +2071,14 @@ def _jrdd(self): return self._jrdd_val if self._bypass_serializer: self._jrdd_deserializer = NoOpSerializer() - enable_profile = self.ctx._conf.get("spark.python.profile", "false") == "true" - profileStats = self.ctx.accumulator(None, PStatsParam) if enable_profile else None - command = (self.func, profileStats, self._prev_jrdd_deserializer, + + if self.ctx.profiler: + profiler = self.ctx.profiler() + profiler.new_profile_accumulator(self.ctx) + else: + profiler = None + + command = (self.func, profiler, self._prev_jrdd_deserializer, self._jrdd_deserializer) # the serialized command will be compressed by broadcast ser = CloudPickleSerializer() @@ -2097,9 +2101,9 @@ def _jrdd(self): broadcast_vars, self.ctx._javaAccumulator) self._jrdd_val = python_rdd.asJavaRDD() - if enable_profile: + if profiler: self._id = self._jrdd_val.id() - self.ctx._add_profile(self._id, profileStats) + self.ctx._add_profiler(self._id, profiler) return self._jrdd_val def id(self): diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 491e445a216bf..5045e4da7c246 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -52,6 +52,7 @@ from pyspark.sql import SQLContext, IntegerType, Row, ArrayType, StructType, StructField, \ UserDefinedType, DoubleType from pyspark import shuffle +from pyspark.pyprofiler import BasicProfiler _have_scipy = False _have_numpy = False @@ -670,22 +671,17 @@ def test_sample(self): class ProfilerTests(PySparkTestCase): - def setUp(self): + def test_profiler(self): self._old_sys_path = list(sys.path) class_name = self.__class__.__name__ conf = SparkConf().set("spark.python.profile", "true") self.sc = SparkContext('local[4]', class_name, conf=conf) - def test_profiler(self): + self.do_computation() - def heavy_foo(x): - for i in range(1 << 20): - x = 1 - rdd = self.sc.parallelize(range(100)) - rdd.foreach(heavy_foo) - profiles = self.sc._profile_stats - self.assertEqual(1, len(profiles)) - id, acc, _ = profiles[0] + profilers = self.sc._profile_stats + self.assertEqual(1, len(profilers)) + id, acc, _ = profilers[0] stats = acc.value self.assertTrue(stats is not None) width, stat_list = stats.get_print_list([]) @@ -697,6 +693,35 @@ def heavy_foo(x): self.sc.dump_profiles(d) self.assertTrue("rdd_%d.pstats" % id in os.listdir(d)) + def test_custom_profiler(self): + class TestCustomProfiler(BasicProfiler): + def show_profiles(self, profilers): + return "Custom formatting" + + + self._old_sys_path = list(sys.path) + class_name = self.__class__.__name__ + conf = SparkConf().set("spark.python.profile", "true") + self.sc = SparkContext('local[4]', class_name, conf=conf, profiler=TestCustomProfiler) + + self.do_computation() + + profilers = self.sc._profile_stats + self.assertEqual(1, len(profilers)) + id, profiler, _ = profilers[0] + self.assertTrue(isinstance(profiler, TestCustomProfiler)) + + self.assertEqual("Custom formatting", self.sc.show_profiles()) + + + def do_computation(self): + def heavy_foo(x): + for i in range(1 << 20): + x = 1 + + rdd = self.sc.parallelize(range(100)) + rdd.foreach(heavy_foo) + class ExamplePointUDT(UserDefinedType): """ diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index 2bdccb5e93f09..563ed1c63774b 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -23,8 +23,6 @@ import time import socket import traceback -import cProfile -import pstats from pyspark.accumulators import _accumulatorRegistry from pyspark.broadcast import Broadcast, _broadcastRegistry @@ -92,19 +90,16 @@ def main(infile, outfile): command = pickleSer._read_with_length(infile) if isinstance(command, Broadcast): command = pickleSer.loads(command.value) - (func, stats, deserializer, serializer) = command + (func, profiler, deserializer, serializer) = command init_time = time.time() def process(): iterator = deserializer.load_stream(infile) serializer.dump_stream(func(split_index, iterator), outfile) - if stats: - p = cProfile.Profile() - p.runcall(process) - st = pstats.Stats(p) - st.stream = None # make it picklable - stats.add(st.strip_dirs()) + if profiler: + st = profiler.profile(process) + profiler.add(st) else: process() except Exception: From 8739affbe9d64d252fb1a32d32239b6281f75bab Mon Sep 17 00:00:00 2001 From: Yandu Oppacher Date: Thu, 13 Nov 2014 22:22:26 -0500 Subject: [PATCH 02/11] Code review fixes --- docs/configuration.md | 32 ++++++------- python/pyspark/context.py | 2 +- python/pyspark/{pyprofiler.py => profiler.py} | 45 +++++++++++++++---- python/pyspark/tests.py | 25 +++++------ python/run-tests | 1 + 5 files changed, 65 insertions(+), 40 deletions(-) rename python/pyspark/{pyprofiler.py => profiler.py} (61%) diff --git a/docs/configuration.md b/docs/configuration.md index 8ec3ce6f7fe1f..1046fcdee4792 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -23,8 +23,8 @@ application. These properties can be set directly on a (e.g. master URL and application name), as well as arbitrary key-value pairs through the `set()` method. For example, we could initialize an application with two threads as follows: -Note that we run with local[2], meaning two threads - which represents "minimal" parallelism, -which can help detect bugs that only exist when we run in a distributed context. +Note that we run with local[2], meaning two threads - which represents "minimal" parallelism, +which can help detect bugs that only exist when we run in a distributed context. {% highlight scala %} val conf = new SparkConf() @@ -35,7 +35,7 @@ val sc = new SparkContext(conf) {% endhighlight %} Note that we can have more than 1 thread in local mode, and in cases like spark streaming, we may actually -require one to prevent any sort of starvation issues. +require one to prevent any sort of starvation issues. ## Dynamically Loading Spark Properties In some cases, you may want to avoid hard-coding certain configurations in a `SparkConf`. For @@ -48,8 +48,8 @@ val sc = new SparkContext(new SparkConf()) Then, you can supply configuration values at runtime: {% highlight bash %} -./bin/spark-submit --name "My app" --master local[4] --conf spark.shuffle.spill=false - --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" myApp.jar +./bin/spark-submit --name "My app" --master local[4] --conf spark.shuffle.spill=false + --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" myApp.jar {% endhighlight %} The Spark shell and [`spark-submit`](cluster-overview.html#launching-applications-with-spark-submit) @@ -123,7 +123,7 @@ of the most common options to set are: Limit of total size of serialized results of all partitions for each Spark action (e.g. collect). Should be at least 1M, or 0 for unlimited. Jobs will be aborted if the total size - is above this limit. + is above this limit. Having a high limit may cause out-of-memory errors in driver (depends on spark.driver.memory and memory overhead of objects in JVM). Setting a proper limit can protect the driver from out-of-memory errors. @@ -244,7 +244,7 @@ Apart from these, the following properties are also available, and may be useful `sc.dump_profiles(path)`. If some of the profile results had been displayed maually, they will not be displayed automatically before driver exiting. - By default the `pyspark.pyprofiler.BasicProfiler` will be used, but this can be overridden by + By default the `pyspark.profiler.BasicProfiler` will be used, but this can be overridden by passing a profiler class in as a parameter to the `SparkContext` constructor. @@ -252,7 +252,7 @@ Apart from these, the following properties are also available, and may be useful spark.python.profile.dump (none) - The directory which is used to dump the profile result before driver exiting. + The directory which is used to dump the profile result before driver exiting. The results will be dumped as separated file for each RDD. They can be loaded by ptats.Stats(). If this is specified, the profile result will not be displayed automatically. @@ -271,8 +271,8 @@ Apart from these, the following properties are also available, and may be useful spark.executorEnv.[EnvironmentVariableName] (none) - Add the environment variable specified by EnvironmentVariableName to the Executor - process. The user can specify multiple of these and to set multiple environment variables. + Add the environment variable specified by EnvironmentVariableName to the Executor + process. The user can specify multiple of these and to set multiple environment variables. @@ -477,9 +477,9 @@ Apart from these, the following properties are also available, and may be useful The codec used to compress internal data such as RDD partitions, broadcast variables and shuffle outputs. By default, Spark provides three codecs: lz4, lzf, - and snappy. You can also use fully qualified class names to specify the codec, - e.g. - org.apache.spark.io.LZ4CompressionCodec, + and snappy. You can also use fully qualified class names to specify the codec, + e.g. + org.apache.spark.io.LZ4CompressionCodec, org.apache.spark.io.LZFCompressionCodec, and org.apache.spark.io.SnappyCompressionCodec. @@ -947,7 +947,7 @@ Apart from these, the following properties are also available, and may be useful (resources are executors in yarn mode, CPU cores in standalone mode) to wait for before scheduling begins. Specified as a double between 0 and 1. Regardless of whether the minimum ratio of resources has been reached, - the maximum amount of time it will wait before scheduling begins is controlled by config + the maximum amount of time it will wait before scheduling begins is controlled by config spark.scheduler.maxRegisteredResourcesWaitingTime. @@ -956,7 +956,7 @@ Apart from these, the following properties are also available, and may be useful 30000 Maximum amount of time to wait for resources to register before scheduling begins - (in milliseconds). + (in milliseconds). @@ -1025,7 +1025,7 @@ Apart from these, the following properties are also available, and may be useful false Whether Spark acls should are enabled. If enabled, this checks to see if the user has - access permissions to view or modify the job. Note this requires the user to be known, + access permissions to view or modify the job. Note this requires the user to be known, so if the user comes across as null no checks are done. Filters can be used with the UI to authenticate and set the user. diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 2f1b122f6dac9..441731e03ae0f 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -33,7 +33,7 @@ from pyspark.storagelevel import StorageLevel from pyspark.rdd import RDD from pyspark.traceback_utils import CallSite, first_spark_call -from pyspark.pyprofiler import BasicProfiler +from pyspark.profiler import BasicProfiler from py4j.java_collections import ListConverter diff --git a/python/pyspark/pyprofiler.py b/python/pyspark/profiler.py similarity index 61% rename from python/pyspark/pyprofiler.py rename to python/pyspark/profiler.py index b3c16a4a434fd..16c44a8cefc50 100644 --- a/python/pyspark/pyprofiler.py +++ b/python/pyspark/profiler.py @@ -16,20 +16,21 @@ # """ -PySpark supports custom profilers, this is to allow for different profilers to -be used as well as outputting to different formats than what is provided in the -BasicProfiler. - -The profiler class is chosen when creating L{SparkContext}: -NOTE: This has no effect if `spark.python.profile` is not set. >>> from pyspark.context import SparkContext ->>> from pyspark.serializers import MarshalSerializer ->>> sc = SparkContext('local', 'test', profiler=MyCustomProfiler) +>>> from pyspark.conf import SparkConf +>>> from pyspark.profiler import BasicProfiler +>>> class MyCustomProfiler(BasicProfiler): +... @staticmethod +... def show_profiles(profilers): +... print "My custom profiles" +... +>>> conf = SparkConf().set("spark.python.profile", "true") +>>> sc = SparkContext('local', 'test', conf=conf, profiler=MyCustomProfiler) >>> sc.parallelize(list(range(1000))).map(lambda x: 2 * x).take(10) [0, 2, 4, 6, 8, 10, 12, 14, 16, 18] >>> sc.show_profiles() +My custom profiles >>> sc.stop() - """ @@ -40,9 +41,30 @@ class BasicProfiler(object): + """ + + :: DeveloperApi :: + + PySpark supports custom profilers, this is to allow for different profilers to + be used as well as outputting to different formats than what is provided in the + BasicProfiler. + + A custom profiler has to define the following static methods: + profile - will produce a system profile of some sort. + show_profiles - shows all collected profiles in a readable format + dump_profiles - dumps the provided profiles to a path + + and the following instance methods: + new_profile_accumulator - produces a new accumulator that can be used to combine the + profiles of partitions on a per stage basis + add - adds a profile to the existing accumulated profile + + The profiler class is chosen when creating L{SparkContext}: + """ @staticmethod def profile(to_profile): + """ Runs and profiles the method to_profile passed in. A profile object is returned. """ pr = cProfile.Profile() pr.runcall(to_profile) st = pstats.Stats(pr) @@ -75,7 +97,12 @@ def dump_profiles(path, profilers): stats.dump_stats(p) def new_profile_accumulator(self, ctx): + """ + Creates a new accumulator for combining the profiles of different + partitions of a stage + """ self._accumulator = ctx.accumulator(None, PStatsParam) def add(self, accum_value): + """ Adds a new profile to the existing accumulated value """ self._accumulator.add(accum_value) diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 5045e4da7c246..d0f6b24bd95f7 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -52,7 +52,7 @@ from pyspark.sql import SQLContext, IntegerType, Row, ArrayType, StructType, StructField, \ UserDefinedType, DoubleType from pyspark import shuffle -from pyspark.pyprofiler import BasicProfiler +from pyspark.profiler import BasicProfiler _have_scipy = False _have_numpy = False @@ -671,12 +671,14 @@ def test_sample(self): class ProfilerTests(PySparkTestCase): - def test_profiler(self): + def setUp(self): self._old_sys_path = list(sys.path) class_name = self.__class__.__name__ conf = SparkConf().set("spark.python.profile", "true") self.sc = SparkContext('local[4]', class_name, conf=conf) + + def test_profiler(self): self.do_computation() profilers = self.sc._profile_stats @@ -698,11 +700,7 @@ class TestCustomProfiler(BasicProfiler): def show_profiles(self, profilers): return "Custom formatting" - - self._old_sys_path = list(sys.path) - class_name = self.__class__.__name__ - conf = SparkConf().set("spark.python.profile", "true") - self.sc = SparkContext('local[4]', class_name, conf=conf, profiler=TestCustomProfiler) + self.sc.profiler = TestCustomProfiler self.do_computation() @@ -713,14 +711,13 @@ def show_profiles(self, profilers): self.assertEqual("Custom formatting", self.sc.show_profiles()) + def do_computation(self): + def heavy_foo(x): + for i in range(1 << 20): + x = 1 - def do_computation(self): - def heavy_foo(x): - for i in range(1 << 20): - x = 1 - - rdd = self.sc.parallelize(range(100)) - rdd.foreach(heavy_foo) + rdd = self.sc.parallelize(range(100)) + rdd.foreach(heavy_foo) class ExamplePointUDT(UserDefinedType): diff --git a/python/run-tests b/python/run-tests index e66854b44dfa6..d3ca5e341a7e2 100755 --- a/python/run-tests +++ b/python/run-tests @@ -57,6 +57,7 @@ function run_core_tests() { PYSPARK_DOC_TEST=1 run_test "pyspark/broadcast.py" PYSPARK_DOC_TEST=1 run_test "pyspark/accumulators.py" PYSPARK_DOC_TEST=1 run_test "pyspark/serializers.py" + PYSPARK_DOC_TEST=1 run_test "pyspark/profiler.py" run_test "pyspark/shuffle.py" run_test "pyspark/tests.py" } From 9ace076444e03598cab0b4bb955e7bd25b17a192 Mon Sep 17 00:00:00 2001 From: Yandu Oppacher Date: Fri, 14 Nov 2014 11:15:36 -0500 Subject: [PATCH 03/11] Refactor of profiler, and moved tests around --- python/pyspark/__init__.py | 2 + python/pyspark/context.py | 11 +++++- python/pyspark/profiler.py | 79 +++++++++++++++++--------------------- python/pyspark/rdd.py | 3 +- 4 files changed, 48 insertions(+), 47 deletions(-) diff --git a/python/pyspark/__init__.py b/python/pyspark/__init__.py index 9556e4718e585..6149e90aa9f3e 100644 --- a/python/pyspark/__init__.py +++ b/python/pyspark/__init__.py @@ -45,6 +45,7 @@ from pyspark.accumulators import Accumulator, AccumulatorParam from pyspark.broadcast import Broadcast from pyspark.serializers import MarshalSerializer, PickleSerializer +from pyspark.profiler import BasicProfiler # for back compatibility from pyspark.sql import SQLContext, HiveContext, SchemaRDD, Row @@ -52,4 +53,5 @@ __all__ = [ "SparkConf", "SparkContext", "SparkFiles", "RDD", "StorageLevel", "Broadcast", "Accumulator", "AccumulatorParam", "MarshalSerializer", "PickleSerializer", + "BasicProfiler", ] diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 441731e03ae0f..6ea116e2effe6 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -826,10 +826,17 @@ def _add_profiler(self, id, profiler): self._profile_stats.append([id, profiler, False]) def show_profiles(self): - self.profiler.show_profiles(self._profile_stats) + """ Print the profile stats to stdout """ + for i, (id, profiler, showed) in enumerate(self._profile_stats): + if not showed and profiler: + profiler.show(id) + # mark it as showed + self._profile_stats[i][2] = True + def dump_profiles(self, path): - self.profiler.dump_profiles(path, self._profile_stats) + for id, profiler, _ in self._profile_stats: + profiler.dump(id, path) self._profile_stats = [] diff --git a/python/pyspark/profiler.py b/python/pyspark/profiler.py index 16c44a8cefc50..d4a88fe85a225 100644 --- a/python/pyspark/profiler.py +++ b/python/pyspark/profiler.py @@ -15,25 +15,6 @@ # limitations under the License. # -""" ->>> from pyspark.context import SparkContext ->>> from pyspark.conf import SparkConf ->>> from pyspark.profiler import BasicProfiler ->>> class MyCustomProfiler(BasicProfiler): -... @staticmethod -... def show_profiles(profilers): -... print "My custom profiles" -... ->>> conf = SparkConf().set("spark.python.profile", "true") ->>> sc = SparkContext('local', 'test', conf=conf, profiler=MyCustomProfiler) ->>> sc.parallelize(list(range(1000))).map(lambda x: 2 * x).take(10) -[0, 2, 4, 6, 8, 10, 12, 14, 16, 18] ->>> sc.show_profiles() -My custom profiles ->>> sc.stop() -""" - - import cProfile import pstats import os @@ -59,11 +40,29 @@ class BasicProfiler(object): profiles of partitions on a per stage basis add - adds a profile to the existing accumulated profile - The profiler class is chosen when creating L{SparkContext}: + The profiler class is chosen when creating a SparkContext + + >>> from pyspark.context import SparkContext + >>> from pyspark.conf import SparkConf + >>> from pyspark.profiler import BasicProfiler + >>> class MyCustomProfiler(BasicProfiler): + ... def show(self, id): + ... print "My custom profiles for RDD:%s" % id + ... + >>> conf = SparkConf().set("spark.python.profile", "true") + >>> sc = SparkContext('local', 'test', conf=conf, profiler=MyCustomProfiler) + >>> sc.parallelize(list(range(1000))).map(lambda x: 2 * x).take(10) + [0, 2, 4, 6, 8, 10, 12, 14, 16, 18] + >>> sc.show_profiles() + My custom profiles for RDD:1 + My custom profiles for RDD:2 + >>> sc.stop() """ - @staticmethod - def profile(to_profile): + def __init__(self, ctx): + self._new_profile_accumulator(ctx) + + def profile(self, to_profile): """ Runs and profiles the method to_profile passed in. A profile object is returned. """ pr = cProfile.Profile() pr.runcall(to_profile) @@ -72,31 +71,25 @@ def profile(to_profile): st.strip_dirs() return st - @staticmethod - def show_profiles(profilers): - """ Print the profile stats to stdout """ - for i, (id, profiler, showed) in enumerate(profilers): - stats = profiler._accumulator.value - if not showed and stats: - print "=" * 60 - print "Profile of RDD" % id - print "=" * 60 - stats.sort_stats("time", "cumulative").print_stats() - # mark it as showed - profilers[i][2] = True + def show(self, id): + """ Print the profile stats to stdout, id is the RDD id """ + stats = self._accumulator.value + if stats: + print "=" * 60 + print "Profile of RDD" % id + print "=" * 60 + stats.sort_stats("time", "cumulative").print_stats() - @staticmethod - def dump_profiles(path, profilers): - """ Dump the profilers stats into directory `path` """ + def dump(self, id, path): + """ Dump the profile into path, id is the RDD id """ if not os.path.exists(path): os.makedirs(path) - for id, profiler, _ in profilers: - stats = profiler._accumulator.value - if stats: - p = os.path.join(path, "rdd_%d.pstats" % id) - stats.dump_stats(p) + stats = self._accumulator.value + if stats: + p = os.path.join(path, "rdd_%d.pstats" % id) + stats.dump_stats(p) - def new_profile_accumulator(self, ctx): + def _new_profile_accumulator(self, ctx): """ Creates a new accumulator for combining the profiles of different partitions of a stage diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 9f1881d676747..c6e546795cc6f 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -2073,8 +2073,7 @@ def _jrdd(self): self._jrdd_deserializer = NoOpSerializer() if self.ctx.profiler: - profiler = self.ctx.profiler() - profiler.new_profile_accumulator(self.ctx) + profiler = self.ctx.profiler(self.ctx) else: profiler = None From 9eefc365a815c3a0dbaee34d5cf4bbad61570d97 Mon Sep 17 00:00:00 2001 From: Yandu Oppacher Date: Fri, 14 Nov 2014 11:18:25 -0500 Subject: [PATCH 04/11] Fix doc --- python/pyspark/profiler.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/python/pyspark/profiler.py b/python/pyspark/profiler.py index d4a88fe85a225..2f43406202600 100644 --- a/python/pyspark/profiler.py +++ b/python/pyspark/profiler.py @@ -30,14 +30,10 @@ class BasicProfiler(object): be used as well as outputting to different formats than what is provided in the BasicProfiler. - A custom profiler has to define the following static methods: + A custom profiler has to define or inherit the following methods: profile - will produce a system profile of some sort. - show_profiles - shows all collected profiles in a readable format - dump_profiles - dumps the provided profiles to a path - - and the following instance methods: - new_profile_accumulator - produces a new accumulator that can be used to combine the - profiles of partitions on a per stage basis + show - shows collected profiles for this profiler in a readable format + dump - dumps the profiles to a path add - adds a profile to the existing accumulated profile The profiler class is chosen when creating a SparkContext From 76a6c37cc68f6ef036f3a7fe868112db5dcc2514 Mon Sep 17 00:00:00 2001 From: Yandu Oppacher Date: Fri, 14 Nov 2014 13:48:40 -0500 Subject: [PATCH 05/11] Added a profile collector to accumulate the profilers per stage --- python/pyspark/context.py | 26 ++++++++----------------- python/pyspark/profiler.py | 39 ++++++++++++++++++++++++++++++++++++++ python/pyspark/rdd.py | 6 +++--- python/pyspark/tests.py | 6 +++--- 4 files changed, 53 insertions(+), 24 deletions(-) diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 6ea116e2effe6..27b110124e127 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -33,7 +33,7 @@ from pyspark.storagelevel import StorageLevel from pyspark.rdd import RDD from pyspark.traceback_utils import CallSite, first_spark_call -from pyspark.profiler import BasicProfiler +from pyspark.profiler import ProfilerCollector from py4j.java_collections import ListConverter @@ -192,14 +192,12 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize, self._temp_dir = \ self._jvm.org.apache.spark.util.Utils.createTempDir(local_dir).getAbsolutePath() - # profiling stats collected for each PythonRDD if self._conf.get("spark.python.profile", "false") == "true": - self.profiler = profiler if profiler else BasicProfiler + self.profiler_collector = ProfilerCollector(profiler) + self.profiler_collector.profiles_dump_path = self._conf.get("spark.python.profile.dump", None) else: - self.profiler = None - - self._profile_stats = [] + self.profiler_collector = None def _initialize_context(self, jconf): """ @@ -826,18 +824,10 @@ def _add_profiler(self, id, profiler): self._profile_stats.append([id, profiler, False]) def show_profiles(self): - """ Print the profile stats to stdout """ - for i, (id, profiler, showed) in enumerate(self._profile_stats): - if not showed and profiler: - profiler.show(id) - # mark it as showed - self._profile_stats[i][2] = True - - - def dump_profiles(self, path): - for id, profiler, _ in self._profile_stats: - profiler.dump(id, path) - self._profile_stats = [] + self.profiler_collector.show_profiles() + + def dump_profiles(self): + self.profiler_collector.dump_profiles() def _test(): diff --git a/python/pyspark/profiler.py b/python/pyspark/profiler.py index 2f43406202600..c78a693026442 100644 --- a/python/pyspark/profiler.py +++ b/python/pyspark/profiler.py @@ -18,9 +18,48 @@ import cProfile import pstats import os +import atexit from pyspark.accumulators import PStatsParam +class ProfilerCollector(object): + """ + This class keeps track of different profilers on a per + stage basis. Also this is used to create new profilers for + the different stages. + """ + + def __init__(self, profiler): + self.profilers = [] + self.profile_dump_path = None + self.profiler = profiler if profiler else BasicProfiler + + def add_profiler(self, id, profiler): + if not self.profilers: + if self.profile_dump_path: + atexit.register(self.dump_profiles) + else: + atexit.register(self.show_profiles) + + self.profilers.append([id, profiler, False]) + + def dump_profiles(self): + for id, profiler, _ in self.profilers: + profiler.dump(id, self.profile_dump_path) + self.profilers = [] + + def show_profiles(self): + """ Print the profile stats to stdout """ + for i, (id, profiler, showed) in enumerate(self.profilers): + if not showed and profiler: + profiler.show(id) + # mark it as showed + self.profilers[i][2] = True + + def new_profiler(self, ctx): + return self.profiler(ctx) + + class BasicProfiler(object): """ diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index c6e546795cc6f..c69c6ff124432 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -2072,8 +2072,8 @@ def _jrdd(self): if self._bypass_serializer: self._jrdd_deserializer = NoOpSerializer() - if self.ctx.profiler: - profiler = self.ctx.profiler(self.ctx) + if self.ctx.profiler_collector: + profiler = self.ctx.profiler_collector.new_profiler(self.ctx) else: profiler = None @@ -2102,7 +2102,7 @@ def _jrdd(self): if profiler: self._id = self._jrdd_val.id() - self.ctx._add_profiler(self._id, profiler) + self.ctx.profiler_collector.add_profiler(self._id, profiler) return self._jrdd_val def id(self): diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index d0f6b24bd95f7..2daf254fe9064 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -681,7 +681,7 @@ def setUp(self): def test_profiler(self): self.do_computation() - profilers = self.sc._profile_stats + profilers = self.sc.profiler_collector.profilers self.assertEqual(1, len(profilers)) id, acc, _ = profilers[0] stats = acc.value @@ -700,11 +700,11 @@ class TestCustomProfiler(BasicProfiler): def show_profiles(self, profilers): return "Custom formatting" - self.sc.profiler = TestCustomProfiler + self.sc.profiler_collector.profiler = TestCustomProfiler self.do_computation() - profilers = self.sc._profile_stats + profilers = self.sc.profiler_collector.profilers self.assertEqual(1, len(profilers)) id, profiler, _ = profilers[0] self.assertTrue(isinstance(profiler, TestCustomProfiler)) From 0864b5d7edc420eff882dfb36f657cb8c286f358 Mon Sep 17 00:00:00 2001 From: Yandu Oppacher Date: Fri, 14 Nov 2014 15:34:11 -0500 Subject: [PATCH 06/11] Remove unused method --- python/pyspark/context.py | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 27b110124e127..063b0740b88ad 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -20,7 +20,6 @@ import sys from threading import Lock from tempfile import NamedTemporaryFile -import atexit from pyspark import accumulators from pyspark.accumulators import Accumulator @@ -813,16 +812,6 @@ def runJob(self, rdd, partitionFunc, partitions=None, allowLocal=False): it = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal) return list(mappedRDD._collect_iterator_through_file(it)) - def _add_profiler(self, id, profiler): - if not self._profile_stats: - dump_path = self._conf.get("spark.python.profile.dump") - if dump_path: - atexit.register(self.dump_profiles, dump_path) - else: - atexit.register(self.show_profiles) - - self._profile_stats.append([id, profiler, False]) - def show_profiles(self): self.profiler_collector.show_profiles() From 6a5d4df5870a97684a87f5717540dbbaf80ccff5 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Mon, 5 Jan 2015 16:09:57 -0800 Subject: [PATCH 07/11] refactor and fix tests --- python/pyspark/__init__.py | 4 +- python/pyspark/accumulators.py | 15 ------ python/pyspark/context.py | 5 +- python/pyspark/profiler.py | 93 +++++++++++++++++++++++----------- python/pyspark/tests.py | 13 ++--- python/pyspark/worker.py | 3 +- 6 files changed, 76 insertions(+), 57 deletions(-) diff --git a/python/pyspark/__init__.py b/python/pyspark/__init__.py index 6149e90aa9f3e..d3efcdf221d82 100644 --- a/python/pyspark/__init__.py +++ b/python/pyspark/__init__.py @@ -45,7 +45,7 @@ from pyspark.accumulators import Accumulator, AccumulatorParam from pyspark.broadcast import Broadcast from pyspark.serializers import MarshalSerializer, PickleSerializer -from pyspark.profiler import BasicProfiler +from pyspark.profiler import Profiler, BasicProfiler # for back compatibility from pyspark.sql import SQLContext, HiveContext, SchemaRDD, Row @@ -53,5 +53,5 @@ __all__ = [ "SparkConf", "SparkContext", "SparkFiles", "RDD", "StorageLevel", "Broadcast", "Accumulator", "AccumulatorParam", "MarshalSerializer", "PickleSerializer", - "BasicProfiler", + "Profiler", "BasicProfiler", ] diff --git a/python/pyspark/accumulators.py b/python/pyspark/accumulators.py index b8cdbbe3cf2b6..ccbca67656c8d 100644 --- a/python/pyspark/accumulators.py +++ b/python/pyspark/accumulators.py @@ -215,21 +215,6 @@ def addInPlace(self, value1, value2): COMPLEX_ACCUMULATOR_PARAM = AddingAccumulatorParam(0.0j) -class PStatsParam(AccumulatorParam): - """PStatsParam is used to merge pstats.Stats""" - - @staticmethod - def zero(value): - return None - - @staticmethod - def addInPlace(value1, value2): - if value1 is None: - return value2 - value1.add(value2) - return value1 - - class _UpdateRequestHandler(SocketServer.StreamRequestHandler): """ diff --git a/python/pyspark/context.py b/python/pyspark/context.py index e9f3a793f1b3a..f300c7c71a403 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -824,10 +824,11 @@ def runJob(self, rdd, partitionFunc, partitions=None, allowLocal=False): return list(mappedRDD._collect_iterator_through_file(it)) def show_profiles(self): + self.profiler_collector.show_profiles() - def dump_profiles(self): - self.profiler_collector.dump_profiles() + def dump_profiles(self, path): + self.profiler_collector.dump_profiles(path) def _test(): diff --git a/python/pyspark/profiler.py b/python/pyspark/profiler.py index c78a693026442..28484fd2d9406 100644 --- a/python/pyspark/profiler.py +++ b/python/pyspark/profiler.py @@ -19,7 +19,8 @@ import pstats import os import atexit -from pyspark.accumulators import PStatsParam + +from pyspark.accumulators import AccumulatorParam class ProfilerCollector(object): @@ -37,15 +38,15 @@ def __init__(self, profiler): def add_profiler(self, id, profiler): if not self.profilers: if self.profile_dump_path: - atexit.register(self.dump_profiles) + atexit.register(self.dump_profiles, self.profile_dump_path) else: atexit.register(self.show_profiles) self.profilers.append([id, profiler, False]) - def dump_profiles(self): + def dump_profiles(self, path): for id, profiler, _ in self.profilers: - profiler.dump(id, self.profile_dump_path) + profiler.dump(id, path) self.profilers = [] def show_profiles(self): @@ -60,10 +61,9 @@ def new_profiler(self, ctx): return self.profiler(ctx) -class BasicProfiler(object): +class Profiler(object): """ - - :: DeveloperApi :: + .. note:: DeveloperApi PySpark supports custom profilers, this is to allow for different profilers to be used as well as outputting to different formats than what is provided in the @@ -71,15 +71,14 @@ class BasicProfiler(object): A custom profiler has to define or inherit the following methods: profile - will produce a system profile of some sort. - show - shows collected profiles for this profiler in a readable format + stats - return the collected stats. dump - dumps the profiles to a path add - adds a profile to the existing accumulated profile The profiler class is chosen when creating a SparkContext - >>> from pyspark.context import SparkContext - >>> from pyspark.conf import SparkConf - >>> from pyspark.profiler import BasicProfiler + >>> from pyspark import SparkConf, SparkContext + >>> from pyspark import BasicProfiler >>> class MyCustomProfiler(BasicProfiler): ... def show(self, id): ... print "My custom profiles for RDD:%s" % id @@ -95,20 +94,19 @@ class BasicProfiler(object): """ def __init__(self, ctx): - self._new_profile_accumulator(ctx) + pass - def profile(self, to_profile): - """ Runs and profiles the method to_profile passed in. A profile object is returned. """ - pr = cProfile.Profile() - pr.runcall(to_profile) - st = pstats.Stats(pr) - st.stream = None # make it picklable - st.strip_dirs() - return st + def profile(self, func): + """ Do profiling on the function `func`""" + raise NotImplemented + + def stats(self): + """ Return the collected profiling stats (pstats.Stats)""" + raise NotImplemented def show(self, id): """ Print the profile stats to stdout, id is the RDD id """ - stats = self._accumulator.value + stats = self.stats() if stats: print "=" * 60 print "Profile of RDD" % id @@ -119,18 +117,53 @@ def dump(self, id, path): """ Dump the profile into path, id is the RDD id """ if not os.path.exists(path): os.makedirs(path) - stats = self._accumulator.value + stats = self.stats() if stats: p = os.path.join(path, "rdd_%d.pstats" % id) stats.dump_stats(p) - def _new_profile_accumulator(self, ctx): - """ - Creates a new accumulator for combining the profiles of different - partitions of a stage - """ + +class PStatsParam(AccumulatorParam): + """PStatsParam is used to merge pstats.Stats""" + + @staticmethod + def zero(value): + return None + + @staticmethod + def addInPlace(value1, value2): + if value1 is None: + return value2 + value1.add(value2) + return value1 + + +class BasicProfiler(Profiler): + """ + BasicProfiler is the default profiler, which is implemented based on + cProfile and Accumulator + """ + def __init__(self, ctx): + Profiler.__init__(self, ctx) + # Creates a new accumulator for combining the profiles of different + # partitions of a stage self._accumulator = ctx.accumulator(None, PStatsParam) - def add(self, accum_value): - """ Adds a new profile to the existing accumulated value """ - self._accumulator.add(accum_value) + def profile(self, func): + """ Runs and profiles the method to_profile passed in. A profile object is returned. """ + pr = cProfile.Profile() + pr.runcall(func) + st = pstats.Stats(pr) + st.stream = None # make it picklable + st.strip_dirs() + + # Adds a new profile to the existing accumulated value + self._accumulator.add(st) + + def stats(self): + return self._accumulator.value + + +if __name__ == "__main__": + import doctest + doctest.testmod() diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index de8db5eee2fa2..e34ffe27a9bed 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -729,8 +729,8 @@ def test_profiler(self): profilers = self.sc.profiler_collector.profilers self.assertEqual(1, len(profilers)) - id, acc, _ = profilers[0] - stats = acc.value + id, profiler, _ = profilers[0] + stats = profiler.stats() self.assertTrue(stats is not None) width, stat_list = stats.get_print_list([]) func_names = [func_name for fname, n, func_name in stat_list] @@ -743,8 +743,8 @@ def test_profiler(self): def test_custom_profiler(self): class TestCustomProfiler(BasicProfiler): - def show_profiles(self, profilers): - return "Custom formatting" + def show(self, id): + self.result = "Custom formatting" self.sc.profiler_collector.profiler = TestCustomProfiler @@ -752,10 +752,11 @@ def show_profiles(self, profilers): profilers = self.sc.profiler_collector.profilers self.assertEqual(1, len(profilers)) - id, profiler, _ = profilers[0] + _, profiler, _ = profilers[0] self.assertTrue(isinstance(profiler, TestCustomProfiler)) - self.assertEqual("Custom formatting", self.sc.show_profiles()) + self.sc.show_profiles() + self.assertEqual("Custom formatting", profiler.result) def do_computation(self): def heavy_foo(x): diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index 096b3246237aa..8a93c320ec5d3 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -96,8 +96,7 @@ def process(): serializer.dump_stream(func(split_index, iterator), outfile) if profiler: - st = profiler.profile(process) - profiler.add(st) + profiler.profile(process) else: process() except Exception: From 349e341a76960963ecb1ecb9c6c620543f4970b2 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Mon, 5 Jan 2015 16:15:30 -0800 Subject: [PATCH 08/11] more refactor --- python/pyspark/context.py | 4 +++- python/pyspark/profiler.py | 13 ++++++++----- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/python/pyspark/context.py b/python/pyspark/context.py index f300c7c71a403..0f31c6b6cc9ca 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -824,10 +824,12 @@ def runJob(self, rdd, partitionFunc, partitions=None, allowLocal=False): return list(mappedRDD._collect_iterator_through_file(it)) def show_profiles(self): - + """ Print the profile stats to stdout """ self.profiler_collector.show_profiles() def dump_profiles(self, path): + """ Dump the profile stats into directory `path` + """ self.profiler_collector.dump_profiles(path) diff --git a/python/pyspark/profiler.py b/python/pyspark/profiler.py index 28484fd2d9406..5cf1e6f8200cc 100644 --- a/python/pyspark/profiler.py +++ b/python/pyspark/profiler.py @@ -30,12 +30,17 @@ class ProfilerCollector(object): the different stages. """ - def __init__(self, profiler): + def __init__(self, profiler_cls): self.profilers = [] self.profile_dump_path = None - self.profiler = profiler if profiler else BasicProfiler + self.profiler_cls = profiler_cls if profiler_cls else BasicProfiler + + def new_profiler(self, ctx): + """ Create a new profiler using class `profiler_cls` """ + return self.profiler_cls(ctx) def add_profiler(self, id, profiler): + """ Add a profiler for RDD `id` """ if not self.profilers: if self.profile_dump_path: atexit.register(self.dump_profiles, self.profile_dump_path) @@ -45,6 +50,7 @@ def add_profiler(self, id, profiler): self.profilers.append([id, profiler, False]) def dump_profiles(self, path): + """ Dump the profile stats into directory `path` """ for id, profiler, _ in self.profilers: profiler.dump(id, path) self.profilers = [] @@ -57,9 +63,6 @@ def show_profiles(self): # mark it as showed self.profilers[i][2] = True - def new_profiler(self, ctx): - return self.profiler(ctx) - class Profiler(object): """ From 2700e474614ab16a0e69470c054ba01fe23e5498 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Mon, 5 Jan 2015 16:19:42 -0800 Subject: [PATCH 09/11] use BasicProfiler as default --- python/pyspark/context.py | 13 ++++++------- python/pyspark/profiler.py | 6 +++--- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 0f31c6b6cc9ca..a67342c5c1947 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -32,7 +32,7 @@ from pyspark.storagelevel import StorageLevel from pyspark.rdd import RDD from pyspark.traceback_utils import CallSite, first_spark_call -from pyspark.profiler import ProfilerCollector +from pyspark.profiler import ProfilerCollector, BasicProfiler from py4j.java_collections import ListConverter @@ -66,7 +66,7 @@ class SparkContext(object): def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None, environment=None, batchSize=0, serializer=PickleSerializer(), conf=None, - gateway=None, jsc=None, profiler=None): + gateway=None, jsc=None, profiler_cls=BasicProfiler): """ Create a new SparkContext. At least the master and app name should be set, either through the named parameters here or through C{conf}. @@ -102,14 +102,14 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None, SparkContext._ensure_initialized(self, gateway=gateway) try: self._do_init(master, appName, sparkHome, pyFiles, environment, batchSize, serializer, - conf, jsc, profiler) + conf, jsc, profiler_cls) except: # If an error occurs, clean up in order to allow future SparkContext creation: self.stop() raise def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize, serializer, - conf, jsc, profiler): + conf, jsc, profiler_cls): self.environment = environment or {} self._conf = conf or SparkConf(_jvm=self._jvm) self._batchSize = batchSize # -1 represents an unlimited batch size @@ -193,9 +193,8 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize, # profiling stats collected for each PythonRDD if self._conf.get("spark.python.profile", "false") == "true": - self.profiler_collector = ProfilerCollector(profiler) - self.profiler_collector.profiles_dump_path = self._conf.get("spark.python.profile.dump", - None) + dump_path = self._conf.get("spark.python.profile.dump", None) + self.profiler_collector = ProfilerCollector(profiler_cls, dump_path) else: self.profiler_collector = None diff --git a/python/pyspark/profiler.py b/python/pyspark/profiler.py index 5cf1e6f8200cc..97de37c1bf6bf 100644 --- a/python/pyspark/profiler.py +++ b/python/pyspark/profiler.py @@ -30,10 +30,10 @@ class ProfilerCollector(object): the different stages. """ - def __init__(self, profiler_cls): + def __init__(self, profiler_cls, dump_path=None): + self.profiler_cls = profiler_cls + self.profile_dump_path = dump_path self.profilers = [] - self.profile_dump_path = None - self.profiler_cls = profiler_cls if profiler_cls else BasicProfiler def new_profiler(self, ctx): """ Create a new profiler using class `profiler_cls` """ From 4b79ce82e7b7d6114ad35c8450c15cf6bd336a90 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Mon, 5 Jan 2015 16:23:55 -0800 Subject: [PATCH 10/11] add docstring for profiler_cls --- python/pyspark/context.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/python/pyspark/context.py b/python/pyspark/context.py index a67342c5c1947..145fd68ad4ae5 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -88,6 +88,9 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None, :param conf: A L{SparkConf} object setting Spark properties. :param gateway: Use an existing gateway and JVM, otherwise a new JVM will be instantiated. + :param jsc: The JavaSparkContext instance (optional). + :param profiler_cls: A class of custom Profiler used to do profiling + (default is pyspark.profiler.BasicProfiler). >>> from pyspark.context import SparkContext From b4a9306d422c25d63ffdcaade7341580c2996147 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Mon, 5 Jan 2015 17:27:57 -0800 Subject: [PATCH 11/11] fix tests --- python/pyspark/profiler.py | 2 +- python/pyspark/tests.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/python/pyspark/profiler.py b/python/pyspark/profiler.py index 97de37c1bf6bf..4408996db0790 100644 --- a/python/pyspark/profiler.py +++ b/python/pyspark/profiler.py @@ -87,7 +87,7 @@ class Profiler(object): ... print "My custom profiles for RDD:%s" % id ... >>> conf = SparkConf().set("spark.python.profile", "true") - >>> sc = SparkContext('local', 'test', conf=conf, profiler=MyCustomProfiler) + >>> sc = SparkContext('local', 'test', conf=conf, profiler_cls=MyCustomProfiler) >>> sc.parallelize(list(range(1000))).map(lambda x: 2 * x).take(10) [0, 2, 4, 6, 8, 10, 12, 14, 16, 18] >>> sc.show_profiles() diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index e34ffe27a9bed..f23b21ff6e502 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -746,7 +746,7 @@ class TestCustomProfiler(BasicProfiler): def show(self, id): self.result = "Custom formatting" - self.sc.profiler_collector.profiler = TestCustomProfiler + self.sc.profiler_collector.profiler_cls = TestCustomProfiler self.do_computation()