From 7ed45550210808ddde460d04274447f4c31e0a3b Mon Sep 17 00:00:00 2001
From: Bryan Cutler
Date: Fri, 24 Jun 2016 14:17:34 -0700
Subject: [PATCH 1/4] Changing Status API example to use SparkSession

---
 examples/src/main/python/status_api_demo.py | 19 ++++++++++---------
 1 file changed, 10 insertions(+), 9 deletions(-)

diff --git a/examples/src/main/python/status_api_demo.py b/examples/src/main/python/status_api_demo.py
index 49b7902185aaa..9b4e0c76ff28b 100644
--- a/examples/src/main/python/status_api_demo.py
+++ b/examples/src/main/python/status_api_demo.py
@@ -21,7 +21,7 @@
 import threading
 import Queue
 
-from pyspark import SparkConf, SparkContext
+from pyspark.sql import SparkSession
 
 
 def delayed(seconds):
@@ -39,17 +39,20 @@ def call_in_background(f, *args):
     return result
 
 
-def main():
-    conf = SparkConf().set("spark.ui.showConsoleProgress", "false")
-    sc = SparkContext(appName="PythonStatusAPIDemo", conf=conf)
+if __name__ == "__main__":
+    spark = SparkSession \
+        .builder \
+        .appName("PythonStatusAPIDemo") \
+        .config("spark.ui.showConsoleProgress", "false") \
+        .getOrCreate()
 
     def run():
-        rdd = sc.parallelize(range(10), 10).map(delayed(2))
+        rdd = spark.sparkContext.parallelize(range(10), 10).map(delayed(2))
         reduced = rdd.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
         return reduced.map(delayed(2)).collect()
 
     result = call_in_background(run)
-    status = sc.statusTracker()
+    status = spark.sparkContext.statusTracker()
     while result.empty():
         ids = status.getJobIdsForGroup()
         for id in ids:
@@ -63,7 +66,5 @@ def run():
         time.sleep(1)
 
     print("Job results are:", result.get())
-    sc.stop()
 
-if __name__ == "__main__":
-    main()
+    spark.stop()

From 80b9cfcfa238c30eca36a4d7c60916e4d5f998c4 Mon Sep 17 00:00:00 2001
From: Bryan Cutler
Date: Fri, 24 Jun 2016 14:18:45 -0700
Subject: [PATCH 2/4] Cleanup of Status API to use 'pass' for empty classes
 and return 'None' for get methods

---
 python/pyspark/status.py | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/python/pyspark/status.py b/python/pyspark/status.py
index a6fa7dd3144d4..fdcd276f02303 100644
--- a/python/pyspark/status.py
+++ b/python/pyspark/status.py
@@ -24,6 +24,7 @@ class SparkJobInfo(namedtuple("SparkJobInfo", "jobId stageIds status")):
     """
     Exposes information about Spark Jobs.
     """
+    pass
 
 
 class SparkStageInfo(namedtuple("SparkStageInfo",
@@ -32,6 +33,7 @@ class SparkStageInfo(namedtuple("SparkStageInfo",
     """
     Exposes information about Spark Stages.
     """
+    pass
 
 
 class StatusTracker(object):
@@ -83,6 +85,8 @@ def getJobInfo(self, jobId):
         job = self._jtracker.getJobInfo(jobId)
         if job is not None:
             return SparkJobInfo(jobId, job.stageIds(), str(job.status()))
+        else:
+            return None
 
     def getStageInfo(self, stageId):
         """
@@ -94,3 +98,5 @@ def getStageInfo(self, stageId):
             # TODO: fetch them in batch for better performance
             attrs = [getattr(stage, f)() for f in SparkStageInfo._fields[1:]]
             return SparkStageInfo(stageId, *attrs)
+        else:
+            return None

From 5941fa0d602e6803878e4b4a39c5b53168b198db Mon Sep 17 00:00:00 2001
From: Bryan Cutler
Date: Fri, 22 Jul 2016 10:29:40 -0700
Subject: [PATCH 3/4] Revert "Cleanup of Status API to use 'pass' for empty
 classes and return 'None' for get methods"

This reverts commit 80b9cfcfa238c30eca36a4d7c60916e4d5f998c4.
---
 python/pyspark/status.py | 6 ------
 1 file changed, 6 deletions(-)

diff --git a/python/pyspark/status.py b/python/pyspark/status.py
index fdcd276f02303..a6fa7dd3144d4 100644
--- a/python/pyspark/status.py
+++ b/python/pyspark/status.py
@@ -24,7 +24,6 @@ class SparkJobInfo(namedtuple("SparkJobInfo", "jobId stageIds status")):
     """
     Exposes information about Spark Jobs.
     """
-    pass
 
 
 class SparkStageInfo(namedtuple("SparkStageInfo",
@@ -33,7 +32,6 @@ class SparkStageInfo(namedtuple("SparkStageInfo",
     """
     Exposes information about Spark Stages.
     """
-    pass
 
 
 class StatusTracker(object):
@@ -85,8 +83,6 @@ def getJobInfo(self, jobId):
         job = self._jtracker.getJobInfo(jobId)
         if job is not None:
             return SparkJobInfo(jobId, job.stageIds(), str(job.status()))
-        else:
-            return None
 
     def getStageInfo(self, stageId):
         """
@@ -98,5 +94,3 @@ def getStageInfo(self, stageId):
             # TODO: fetch them in batch for better performance
             attrs = [getattr(stage, f)() for f in SparkStageInfo._fields[1:]]
             return SparkStageInfo(stageId, *attrs)
-        else:
-            return None

From 3594ee16fece7fefc577c5d60240ea0a588a916a Mon Sep 17 00:00:00 2001
From: Bryan Cutler
Date: Fri, 22 Jul 2016 10:43:07 -0700
Subject: [PATCH 4/4] main() should probably be kept in local scope so not to
 shadow var 'result'

---
 examples/src/main/python/status_api_demo.py | 14 ++++++++------
 1 file changed, 8 insertions(+), 6 deletions(-)

diff --git a/examples/src/main/python/status_api_demo.py b/examples/src/main/python/status_api_demo.py
index 9b4e0c76ff28b..03c11fc09b670 100644
--- a/examples/src/main/python/status_api_demo.py
+++ b/examples/src/main/python/status_api_demo.py
@@ -39,11 +39,11 @@ def call_in_background(f, *args):
     return result
 
 
-if __name__ == "__main__":
-    spark = SparkSession \
-        .builder \
-        .appName("PythonStatusAPIDemo") \
-        .config("spark.ui.showConsoleProgress", "false") \
+def main():
+    spark = SparkSession\
+        .builder\
+        .appName("PythonStatusAPIDemo")\
+        .config("spark.ui.showConsoleProgress", "false")\
         .getOrCreate()
 
     def run():
@@ -66,5 +66,7 @@ def run():
         time.sleep(1)
 
     print("Job results are:", result.get())
-
     spark.stop()
+
+if __name__ == "__main__":
+    main()
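
Note on the rationale behind PATCH 4/4: with the driver code at module scope under if __name__ == "__main__" (as introduced in PATCH 1/4), the assignment result = call_in_background(run) creates a module-level global named 'result', while call_in_background also returns a local variable of the same name. Python keeps the two bindings separate, so this is a readability concern rather than a bug, which is why the final patch moves the driver code back into main(). A minimal, hypothetical sketch of the overlapping names follows; it is illustrative only and not part of the patches, and the names merely mirror those in the example.

    def call_in_background(f):
        result = f()          # local binding named 'result', like the queue in the example
        return result


    if __name__ == "__main__":
        # At module scope this 'result' is a global that shares its name with
        # the local above; wrapping this block in main(), as PATCH 4/4 does,
        # keeps both names local.
        result = call_in_background(lambda: 42)
        print(result)         # prints 42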