From 872ebdedc80a76b9bf064bda52af6074d16e7efc Mon Sep 17 00:00:00 2001
From: WeichenXu
Date: Mon, 23 May 2016 22:32:35 -0700
Subject: [PATCH 1/5] add single-test and remote-debug params to the python
 test runner

---
 python/pyspark/debugutil.py       | 26 ++++++++++++++++++++++++++
 python/pyspark/ml/tests.py        |  2 +
 python/pyspark/mllib/tests.py     |  2 +
 python/pyspark/sql/tests.py       |  2 +
 python/pyspark/streaming/tests.py |  2 +
 python/pyspark/tests.py           |  2 +
 python/run-tests.py               | 61 ++++++++++++++++++++++++++++++-------
 7 files changed, 84 insertions(+), 13 deletions(-)
 create mode 100644 python/pyspark/debugutil.py

diff --git a/python/pyspark/debugutil.py b/python/pyspark/debugutil.py
new file mode 100644
index 0000000000000..97baa1bbfd91c
--- /dev/null
+++ b/python/pyspark/debugutil.py
@@ -0,0 +1,26 @@
+import os
+
+HAS_CHECK_DEBUGGER = False
+def check_debugger():
+    global HAS_CHECK_DEBUGGER
+
+    if HAS_CHECK_DEBUGGER:
+        return
+
+    HAS_CHECK_DEBUGGER = True
+    env = dict(os.environ)
+    debug_server = env.get('PYTHON_REMOTE_DEBUG_SERVER')
+    debug_port = env.get('PYTHON_REMOTE_DEBUG_PORT')
+
+    if debug_server != None and debug_port != None:
+        try:
+            import pydevd
+            print('connecting debug server %s, port %s'
+                  % (debug_server, debug_port))
+            pydevd.settrace(debug_server,
+                            port=int(debug_port),
+                            stdoutToServer=False,
+                            stderrToServer=False)
+        except Exception, e:
+            print(e)
+            raise Exception('init debugger fail.' )
\ No newline at end of file
diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py
index a7c93ac802726..64c02a539815f 100755
--- a/python/pyspark/ml/tests.py
+++ b/python/pyspark/ml/tests.py
@@ -14,6 +14,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+from pyspark.debugutil import check_debugger
+check_debugger()
 
 """
 Unit tests for Spark ML Python APIs.
diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py
index 74cf7bb8eaf9d..28d4c39b47957 100644
--- a/python/pyspark/mllib/tests.py
+++ b/python/pyspark/mllib/tests.py
@@ -14,6 +14,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+from pyspark.debugutil import check_debugger
+check_debugger()
 
 """
 Fuller unit tests for Python MLlib.
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 1790432edd5dc..f93fafabc81bf 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -15,6 +15,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+from pyspark.debugutil import check_debugger
+check_debugger()
 
 """
 Unit tests for pyspark.sql; additional tests are implemented as doctests in
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 360ba1e7167cb..0c79da362623a 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -14,6 +14,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+from pyspark.debugutil import check_debugger
+check_debugger()
 
 import glob
 import os
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 97ea39dde05fa..241573ba2337e 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -14,6 +14,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+from pyspark.debugutil import check_debugger
+check_debugger()
 
 """
 Unit tests for PySpark; additional tests are implemented as doctests in
diff --git a/python/run-tests.py b/python/run-tests.py
index 38b3bb84c10be..72e59eab61158 100755
--- a/python/run-tests.py
+++ b/python/run-tests.py
@@ -32,6 +32,9 @@
 else:
     import queue as Queue
 
+import pydevd
+print("connecting to debug server, host %s, port %s" % ('n131', 5678))
+pydevd.settrace('n131',port=5678,stdoutToServer=False,stderrToServer=False)
 
 # Append `SPARK_HOME/dev` to the Python path so that we can import the sparktestsupport module
 sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), "../dev/"))
@@ -63,7 +66,7 @@ def print_red(text):
     raise Exception("Cannot find assembly build directory, please build Spark first.")
 
 
-def run_individual_python_test(test_name, pyspark_python):
+def run_individual_python_test(test_name, pyspark_python, is_single_test=False, debug_server=None, debug_port=None):
     env = dict(os.environ)
     env.update({
         'SPARK_DIST_CLASSPATH': SPARK_DIST_CLASSPATH,
@@ -72,10 +75,19 @@
         'PYSPARK_PYTHON': which(pyspark_python),
         'PYSPARK_DRIVER_PYTHON': which(pyspark_python)
     })
+    if is_single_test:
+        if debug_server != None:
+            env.update({'PYTHON_REMOTE_DEBUG_SERVER': debug_server})
+        if debug_port != None:
+            env.update({'PYTHON_REMOTE_DEBUG_PORT': '%d' % debug_port})
+
     LOGGER.debug("Starting test(%s): %s", pyspark_python, test_name)
     start_time = time.time()
     try:
-        per_test_output = tempfile.TemporaryFile()
+        if is_single_test:
+            per_test_output = None
+        else:
+            per_test_output = tempfile.TemporaryFile()
         retcode = subprocess.Popen(
             [os.path.join(SPARK_HOME, "bin/pyspark"), test_name],
             stderr=per_test_output, stdout=per_test_output, env=env).wait()
@@ -88,16 +100,17 @@
     # Exit on the first failure.
     if retcode != 0:
         try:
-            with FAILURE_REPORTING_LOCK:
-                with open(LOG_FILE, 'ab') as log_file:
+            if per_test_output != None:
+                with FAILURE_REPORTING_LOCK:
+                    with open(LOG_FILE, 'ab') as log_file:
+                        per_test_output.seek(0)
+                        log_file.writelines(per_test_output)
                 per_test_output.seek(0)
-                    log_file.writelines(per_test_output)
-            per_test_output.seek(0)
-            for line in per_test_output:
-                decoded_line = line.decode()
-                if not re.match('[0-9]+', decoded_line):
-                    print(decoded_line, end='')
-            per_test_output.close()
+                for line in per_test_output:
+                    decoded_line = line.decode()
+                    if not re.match('[0-9]+', decoded_line):
+                        print(decoded_line, end='')
+                per_test_output.close()
         except:
             LOGGER.exception("Got an exception while trying to print failed test output")
         finally:
@@ -106,7 +119,8 @@
             # this code is invoked from a thread other than the main thread.
             os._exit(-1)
     else:
-        per_test_output.close()
+        if per_test_output != None:
+            per_test_output.close()
     LOGGER.info("Finished test(%s): %s (%is)", pyspark_python, test_name, duration)
@@ -140,7 +154,19 @@ def parse_opts():
         "--verbose", action="store_true",
         help="Enable additional debug logging"
     )
-
+    parser.add_option(
+        "--single-test", type="string", default=None,
+        help="specify a python module to run single python test."
+    )
+    parser.add_option(
+        "--debug-server", type="string", default=None,
+        help="debug server host, only used in single test."
+    )
+    parser.add_option(
+        "--debug-port", type="int", default=5678,
+        help="debug server port, only used in single test."
+    )
+
     (opts, args) = parser.parse_args()
     if args:
         parser.error("Unsupported arguments: %s" % ' '.join(args))
@@ -169,6 +195,15 @@ def main():
                          (module_name, ", ".join(python_modules)))
             sys.exit(-1)
     LOGGER.info("Will test against the following Python executables: %s", python_execs)
+
+    if(opts.single_test != None):
+        test_goal = opts.single_test
+        LOGGER.info("Will test the single Python module: %s", test_goal)
+        for python_exec in python_execs:
+            run_individual_python_test(test_goal, python_exec,
+                                       True, opts.debug_server, opts.debug_port)
+        return
+
     LOGGER.info("Will test the following Python modules: %s", [x.name for x in modules_to_test])
 
     task_queue = Queue.PriorityQueue()
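The new `pyspark.debugutil.check_debugger()` above is a one-shot, opt-in attach gate: it runs at most once per process and only dials out when both `PYTHON_REMOTE_DEBUG_SERVER` and `PYTHON_REMOTE_DEBUG_PORT` are present in the environment, which is exactly how the runner passes the `--debug-server`/`--debug-port` options down to the spawned test process. A minimal standalone sketch of the same pattern (the environment variable names come from the patch; the function name and fallback behavior here are illustrative only):

    import os

    _ATTACHED = False  # module-level guard so repeated calls are no-ops


    def attach_if_requested():
        """Attach to a remote pydevd server once, if the environment asks for it."""
        global _ATTACHED
        if _ATTACHED:
            return
        _ATTACHED = True

        host = os.environ.get('PYTHON_REMOTE_DEBUG_SERVER')
        port = os.environ.get('PYTHON_REMOTE_DEBUG_PORT')
        if host is None or port is None:
            return  # debugging not requested; run the tests normally

        import pydevd  # imported lazily so non-debug runs never need it
        pydevd.settrace(host, port=int(port),
                        stdoutToServer=False, stderrToServer=False)

Because the gate is called at import time in every `tests.py`, the attach happens regardless of which entry point eventually loads the module.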
From e694d10f27d2da9607c6f52bb7b2a2e2fd50e0fd Mon Sep 17 00:00:00 2001
From: WeichenXu
Date: Mon, 23 May 2016 22:41:53 -0700
Subject: [PATCH 2/5] remove leftover code used for my own debugging

---
 python/run-tests.py | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/python/run-tests.py b/python/run-tests.py
index 72e59eab61158..a10eb4c96f2bd 100755
--- a/python/run-tests.py
+++ b/python/run-tests.py
@@ -32,9 +32,6 @@
 else:
     import queue as Queue
 
-import pydevd
-print("connecting to debug server, host %s, port %s" % ('n131', 5678))
-pydevd.settrace('n131',port=5678,stdoutToServer=False,stderrToServer=False)
 
 # Append `SPARK_HOME/dev` to the Python path so that we can import the sparktestsupport module
 sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), "../dev/"))

From 60e0e96bd7873b169854b361f90d1ef97378a22e Mon Sep 17 00:00:00 2001
From: WeichenXu
Date: Mon, 23 May 2016 23:07:25 -0700
Subject: [PATCH 3/5] add apache license header.

---
 python/pyspark/debugutil.py | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)

diff --git a/python/pyspark/debugutil.py b/python/pyspark/debugutil.py
index 97baa1bbfd91c..106d244e86ba7 100644
--- a/python/pyspark/debugutil.py
+++ b/python/pyspark/debugutil.py
@@ -1,3 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
 import os
 
 HAS_CHECK_DEBUGGER = False
From db1e37dccdd6570eceed2926ee3c7b391a21612c Mon Sep 17 00:00:00 2001
From: WeichenXu
Date: Mon, 23 May 2016 23:17:36 -0700
Subject: [PATCH 4/5] fix exception syntax problem.

---
 python/pyspark/debugutil.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/pyspark/debugutil.py b/python/pyspark/debugutil.py
index 106d244e86ba7..ba6e6514d6667 100644
--- a/python/pyspark/debugutil.py
+++ b/python/pyspark/debugutil.py
@@ -38,6 +38,6 @@ def check_debugger():
                             port=int(debug_port),
                             stdoutToServer=False,
                             stderrToServer=False)
-        except Exception, e:
+        except Exception as e:
             print(e)
             raise Exception('init debugger fail.' )
\ No newline at end of file
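The one-line fix in patch 4 carries more weight than its size suggests: `except Exception, e:` is Python 2-only syntax, and because the comma form is rejected at compile time, importing `pyspark.debugutil` under Python 3 would raise a `SyntaxError` before any test could run. The `as` form parses on Python 2.6+ and Python 3 alike; a tiny runnable illustration:

    # Python 2-only form -- a SyntaxError as soon as Python 3 compiles the module:
    #     except Exception, e:
    #         print(e)

    # Portable form, as adopted in patch 4:
    try:
        raise ValueError('boom')
    except Exception as e:
        print(e)  # -> boom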
From 52f1da93dc08fb831331436ac72163fb1458e0da Mon Sep 17 00:00:00 2001
From: WeichenXu
Date: Tue, 24 May 2016 00:04:09 -0700
Subject: [PATCH 5/5] improve python style to pass pep8 check

---
 python/pyspark/debugutil.py | 20 +++++++++++---------
 python/run-tests.py         | 23 ++++++++++++-----------
 2 files changed, 23 insertions(+), 20 deletions(-)

diff --git a/python/pyspark/debugutil.py b/python/pyspark/debugutil.py
index ba6e6514d6667..0d102ef5c4dcb 100644
--- a/python/pyspark/debugutil.py
+++ b/python/pyspark/debugutil.py
@@ -18,26 +18,28 @@
 import os
 
 HAS_CHECK_DEBUGGER = False
+
+
 def check_debugger():
     global HAS_CHECK_DEBUGGER
-
+
     if HAS_CHECK_DEBUGGER:
         return
-
+
     HAS_CHECK_DEBUGGER = True
     env = dict(os.environ)
     debug_server = env.get('PYTHON_REMOTE_DEBUG_SERVER')
     debug_port = env.get('PYTHON_REMOTE_DEBUG_PORT')
-
-    if debug_server != None and debug_port != None:
+
+    if debug_server is not None and debug_port is not None:
         try:
             import pydevd
             print('connecting debug server %s, port %s'
-                  % (debug_server, debug_port))
-            pydevd.settrace(debug_server,
-                            port=int(debug_port),
-                            stdoutToServer=False,
+                  % (debug_server, debug_port))
+            pydevd.settrace(debug_server,
+                            port=int(debug_port),
+                            stdoutToServer=False,
                             stderrToServer=False)
         except Exception as e:
             print(e)
-            raise Exception('init debugger fail.' )
\ No newline at end of file
+            raise Exception('init debugger fail.')
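The `!= None` to `is not None` changes in this patch are more than cosmetic: PEP 8 requires identity comparison against the `None` singleton because `!=` dispatches to `__ne__`, which a class can override to give a misleading answer. A small illustration (the class name is made up for the example):

    class Weird(object):
        def __ne__(self, other):
            return False  # claims to be "not unequal" to everything, even None

    w = Weird()
    print(w != None)      # False: the overridden __ne__ hijacks the comparison
    print(w is not None)  # True: identity against None cannot be overridden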
diff --git a/python/run-tests.py b/python/run-tests.py
index a10eb4c96f2bd..02ec5f381be99 100755
--- a/python/run-tests.py
+++ b/python/run-tests.py
@@ -63,7 +63,8 @@ def print_red(text):
     raise Exception("Cannot find assembly build directory, please build Spark first.")
 
 
-def run_individual_python_test(test_name, pyspark_python, is_single_test=False, debug_server=None, debug_port=None):
+def run_individual_python_test(test_name, pyspark_python,
+                               is_single_test=False, debug_server=None, debug_port=None):
     env = dict(os.environ)
     env.update({
         'SPARK_DIST_CLASSPATH': SPARK_DIST_CLASSPATH,
@@ -73,11 +74,11 @@
         'PYSPARK_DRIVER_PYTHON': which(pyspark_python)
     })
     if is_single_test:
-        if debug_server != None:
+        if debug_server is not None:
             env.update({'PYTHON_REMOTE_DEBUG_SERVER': debug_server})
-        if debug_port != None:
+        if debug_port is not None:
             env.update({'PYTHON_REMOTE_DEBUG_PORT': '%d' % debug_port})
-
+
     LOGGER.debug("Starting test(%s): %s", pyspark_python, test_name)
     start_time = time.time()
     try:
@@ -97,7 +98,7 @@
     # Exit on the first failure.
     if retcode != 0:
         try:
-            if per_test_output != None:
+            if per_test_output is not None:
                 with FAILURE_REPORTING_LOCK:
                     with open(LOG_FILE, 'ab') as log_file:
                         per_test_output.seek(0)
@@ -116,8 +117,8 @@
             # this code is invoked from a thread other than the main thread.
             os._exit(-1)
     else:
-        if per_test_output != None:
-            per_test_output.close()
+        if per_test_output is not None:
+            per_test_output.close()
     LOGGER.info("Finished test(%s): %s (%is)", pyspark_python, test_name, duration)
@@ -163,7 +164,7 @@ def parse_opts():
         "--debug-port", type="int", default=5678,
         help="debug server port, only used in single test."
     )
-
+
     (opts, args) = parser.parse_args()
     if args:
         parser.error("Unsupported arguments: %s" % ' '.join(args))
@@ -192,15 +193,15 @@ def main():
                          (module_name, ", ".join(python_modules)))
             sys.exit(-1)
     LOGGER.info("Will test against the following Python executables: %s", python_execs)
-
-    if(opts.single_test != None):
+
+    if(opts.single_test is not None):
         test_goal = opts.single_test
         LOGGER.info("Will test the single Python module: %s", test_goal)
         for python_exec in python_execs:
             run_individual_python_test(test_goal, python_exec,
                                        True, opts.debug_server, opts.debug_port)
         return
-
+
     LOGGER.info("Will test the following Python modules: %s", [x.name for x in modules_to_test])
 
     task_queue = Queue.PriorityQueue()
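Taken together, the series enables a workflow like the following: start a pydevd-compatible debug server (for example an IDE's remote debug server) listening on a port, then launch the runner against a single module so the spawned test process attaches back to it. A hedged sketch of the invocation — the option names come from `parse_opts()` above, while the module name, host, and working directory are example values, not part of the patch:

    import subprocess

    # Example values: substitute your own test module, debug host, and the
    # port your pydevd server is listening on (5678 is the runner's default).
    cmd = [
        './python/run-tests.py',
        '--single-test=pyspark.sql.tests',  # run exactly one test module
        '--debug-server=localhost',         # host running the pydevd server
        '--debug-port=5678',
    ]

    # run-tests.py exports PYTHON_REMOTE_DEBUG_SERVER/PORT to the test
    # subprocess, whose import-time check_debugger() call then attaches.
    subprocess.check_call(cmd)

Because the single-test path also leaves `per_test_output` as `None`, the test's stdout and stderr stay attached to the console instead of being buffered in a temporary file, which is what makes interactive debugging practical.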