From 72bd097896aca042944d8e20282617e4864d9dd0 Mon Sep 17 00:00:00 2001
From: zero323
Date: Sun, 21 May 2017 23:08:09 +0200
Subject: [PATCH 1/3] Initial commit

---
 python/pyspark/context.py | 32 ++++++++++++++++
 python/pyspark/rdd.py     | 81 +++++++++++++++++++++++++++++++++++++++
 python/pyspark/tests.py   | 25 ++++++++++++
 3 files changed, 138 insertions(+)

diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 3be07325f4162..f4e5a52867865 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -71,6 +71,7 @@ class SparkContext(object):
     _active_spark_context = None
     _lock = RLock()
     _python_includes = None  # zip and egg files that need to be added to PYTHONPATH
+    _thread_pool_executor = None
 
     PACKAGE_EXTENSIONS = ('.zip', '.egg', '.jar')
 
@@ -421,6 +422,10 @@ def stop(self):
         with SparkContext._lock:
             SparkContext._active_spark_context = None
 
+            if SparkContext._thread_pool_executor is not None:
+                SparkContext._thread_pool_executor.shutdown()
+                SparkContext._thread_pool_executor = None
+
     def emptyRDD(self):
         """
         Create an RDD that has no partitions or elements.
@@ -1006,6 +1011,33 @@ def getConf(self):
         conf.setAll(self._conf.getAll())
         return conf
 
+    def _get_executor(self):
+        """ Return existing thread pool executor
+        or create a new one.
+        """
+        # This would fail anyway, but
+        # we don't want an orphan executor
+        if SparkContext._active_spark_context is None:
+            raise ValueError("No active SparkContext")
+        if SparkContext._thread_pool_executor is None:
+            try:
+                import concurrent.futures
+
+                # Make sure that there is only one executor
+                with SparkContext._lock:
+                    cores = int(self.getConf().get("spark.driver.cores") or 2)
+                    SparkContext._thread_pool_executor = (
+                        SparkContext._thread_pool_executor or
+                        concurrent.futures.ThreadPoolExecutor(max_workers=cores)
+                    )
+
+            # Python 2.7 without the futures backport installed
+            except ImportError as e:
+                msg = "{}. Async actions require Python >= 3.2 or the futures package"
+                raise ImportError(msg.format(e))
+
+        return SparkContext._thread_pool_executor
+
 
 def _test():
     import atexit
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 60141792d499b..1b9776264e117 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -2367,6 +2367,87 @@ def toLocalIterator(self):
         port = self.ctx._jvm.PythonRDD.toLocalIteratorAndServe(self._jrdd.rdd())
         return _load_from_socket(port, self._jrdd_deserializer)
 
+    def collectAsync(self):
+        """
+        .. note:: Experimental
+
+        Returns a `concurrent.futures.Future` for retrieving all elements of this RDD.
+
+        >>> rdd = sc.parallelize(range(10)) # doctest: +SKIP
+        >>> f = rdd.collectAsync() # doctest: +SKIP
+        >>> f.result() # doctest: +SKIP
+        [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
+
+        .. versionadded:: 2.3.0
+        """
+        executor = self.ctx._get_executor()
+        return executor.submit(self.collect)
+
+    def countAsync(self):
+        """
+        .. note:: Experimental
+
+        >>> rdd = sc.parallelize(range(10)) # doctest: +SKIP
+        >>> f = rdd.countAsync() # doctest: +SKIP
+        >>> f.result() # doctest: +SKIP
+        10
+        """
+        executor = self.ctx._get_executor()
+        return executor.submit(self.count)
+
+    def foreachAsync(self, f):
+        """
+        .. note:: Experimental
+
+        Asynchronously applies a function f to all elements of this RDD
+        and returns a `concurrent.futures.Future` of this action.
+
+        >>> def g(x): print(x) # doctest: +SKIP
+        >>> rdd = sc.parallelize(range(10)) # doctest: +SKIP
+        >>> f = rdd.foreachAsync(g) # doctest: +SKIP
+        >>> f.result() is None # doctest: +SKIP
+        True
+
+        .. versionadded:: 2.3.0
+        """
+        executor = self.ctx._get_executor()
+        return executor.submit(self.foreach, f)
+
+    def foreachPartitionAsync(self, f):
+        """
+        .. note:: Experimental
+
+        Asynchronously applies a function f to each partition of this RDD
+        and returns a `concurrent.futures.Future` of this action.
+
+        >>> def g(xs): # doctest: +SKIP
+        ...     for x in xs:
+        ...         print(x)
+        >>> rdd = sc.parallelize(range(10)) # doctest: +SKIP
+        >>> f = rdd.foreachPartitionAsync(g) # doctest: +SKIP
+        >>> f.result() is None # doctest: +SKIP
+        .. versionadded:: 2.3.0
+        """
+        executor = self.ctx._get_executor()
+        return executor.submit(self.foreachPartition, f)
+
+    def takeAsync(self, num):
+        """
+        .. note:: Experimental
+
+        Returns a `concurrent.futures.Future` for retrieving
+        the first num elements of the RDD.
+
+        >>> rdd = sc.parallelize(range(10)) # doctest: +SKIP
+        >>> f = rdd.takeAsync(3) # doctest: +SKIP
+        >>> f.result() # doctest: +SKIP
+        [0, 1, 2]
+
+        .. versionadded:: 2.3.0
+        """
+        executor = self.ctx._get_executor()
+        return executor.submit(self.take, num)
+
 
 def _prepare_for_python_RDD(sc, command):
     # the serialized command will be compressed by broadcast
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index bb13de563cdd4..4e6934b8dd384 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -1175,6 +1175,31 @@ def test_pipe_functions(self):
         self.assertRaises(Py4JJavaError, rdd.pipe('grep 4', checkCode=True).collect)
         self.assertEqual([], rdd.pipe('grep 4').collect())
 
+    def test_async_actions(self):
+        data = [x for x in range(10)]
+        rdd = self.sc.parallelize(data)
+        f = rdd.collectAsync()
+        self.assertListEqual(f.result(), data)
+
+        f = rdd.countAsync()
+        self.assertEqual(f.result(), 10)
+
+        f = rdd.takeAsync(5)
+        self.assertEqual(f.result(), data[:5])
+
+        acc1 = self.sc.accumulator(0)
+
+        f = rdd.foreachAsync(lambda _: acc1.add(1))
+        self.assertTrue(
+            f.result() is None and acc1.value == len(data)
+        )
+
+        acc2 = self.sc.accumulator(0)
+        f = rdd.foreachPartitionAsync(lambda xs: [acc2.add(1) for _ in xs])
+        self.assertTrue(
+            f.result() is None and acc2.value == len(data)
+        )
+
 
 class ProfilerTests(PySparkTestCase):
 
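The methods added in this patch return standard `concurrent.futures.Future` objects, so they compose with the stock futures API. A minimal usage sketch (illustrative only; the local[2] master, the app name, and the sample data are placeholders, not part of the patch):

    from pyspark import SparkContext

    sc = SparkContext("local[2]", "async-actions-demo")
    rdd = sc.parallelize(range(100))

    # Submit two jobs without blocking the driver thread.
    count_f = rdd.countAsync()
    take_f = rdd.takeAsync(5)

    # The returned objects are plain concurrent.futures.Future instances.
    count_f.add_done_callback(lambda done: print("count =", done.result()))
    print("first five =", take_f.result())

    sc.stop()  # stop() also shuts down the shared thread pool executor
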
From 06116d1b29c9038df4a3c231bde471c337bf3c53 Mon Sep 17 00:00:00 2001
From: zero323
Date: Mon, 22 May 2017 02:07:54 +0200
Subject: [PATCH 2/3] Fix style

---
 python/pyspark/context.py |  2 +-
 python/pyspark/rdd.py     | 28 ++++++++++++++--------------
 2 files changed, 15 insertions(+), 15 deletions(-)

diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index f4e5a52867865..e3be8d4eedf73 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -1012,7 +1012,7 @@ def getConf(self):
         return conf
 
     def _get_executor(self):
-        """ Return existing thread pool executor 
+        """ Return existing thread pool executor
         or create a new one.
         """
         # This would fail anyway, but
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 1b9776264e117..0a695aad391fb 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -2370,7 +2370,7 @@ def toLocalIterator(self):
     def collectAsync(self):
         """
         .. note:: Experimental
-        
+
         Returns a `concurrent.futures.Future` for retrieving all elements of this RDD.
 
         >>> rdd = sc.parallelize(range(10)) # doctest: +SKIP
@@ -2384,7 +2384,7 @@ def collectAsync(self):
         return executor.submit(self.collect)
 
     def countAsync(self):
-        """ 
+        """
         .. note:: Experimental
 
         >>> rdd = sc.parallelize(range(10)) # doctest: +SKIP
@@ -2398,8 +2398,8 @@ def countAsync(self):
     def foreachAsync(self, f):
         """
         .. note:: Experimental
-        
-        Asynchronously applies a function f to all elements of this RDD 
+
+        Asynchronously applies a function f to all elements of this RDD
         and returns a `concurrent.futures.Future` of this action.
 
         >>> def g(x): print(x) # doctest: +SKIP
@@ -2407,7 +2407,7 @@ def foreachAsync(self, f):
         >>> f = rdd.foreachAsync(g) # doctest: +SKIP
         >>> f.result() is None # doctest: +SKIP
         True
-        
+
         .. versionadded:: 2.3.0
         """
         executor = self.ctx._get_executor()
@@ -2416,13 +2416,13 @@ def foreachAsync(self, f):
     def foreachPartitionAsync(self, f):
         """
         .. note:: Experimental
-        
-        Asynchronously applies a function f to each partition of this RDD 
-        and returns a `concurrent.futures.Future` of this action. 
-        
+
+        Asynchronously applies a function f to each partition of this RDD
+        and returns a `concurrent.futures.Future` of this action.
+
         >>> def g(xs): # doctest: +SKIP
         ...     for x in xs:
-        ...         print(x) 
+        ...         print(x)
         >>> rdd = sc.parallelize(range(10)) # doctest: +SKIP
         >>> f = rdd.foreachPartitionAsync(g) # doctest: +SKIP
         >>> f.result() is None # doctest: +SKIP
@@ -2434,15 +2434,15 @@ def foreachPartitionAsync(self, f):
     def takeAsync(self, num):
         """
         .. note:: Experimental
-        
-        Returns a `concurrent.futures.Future` for retrieving 
+
+        Returns a `concurrent.futures.Future` for retrieving
         the first num elements of the RDD.
-        
+
         >>> rdd = sc.parallelize(range(10)) # doctest: +SKIP
         >>> f = rdd.takeAsync(3) # doctest: +SKIP
         >>> f.result() # doctest: +SKIP
         [0, 1, 2]
-        
+
         .. versionadded:: 2.3.0
         """
         executor = self.ctx._get_executor()
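As background for reviewers, the `_get_executor` helper introduced in PATCH 1/3 and touched by the style fix above is a lazily created, lock-guarded, process-wide `ThreadPoolExecutor` sized from `spark.driver.cores` with a default of 2 workers. A standalone sketch of that pattern, independent of Spark (only the default of 2 workers is taken from the patch; the other names are illustrative):

    import concurrent.futures
    import threading

    _lock = threading.RLock()
    _executor = None


    def get_executor(configured_cores=None):
        """Lazily create one shared ThreadPoolExecutor for the whole process."""
        global _executor
        if _executor is None:
            with _lock:
                # Re-check inside the lock so two racing callers
                # cannot each create their own executor.
                if _executor is None:
                    workers = int(configured_cores or 2)
                    _executor = concurrent.futures.ThreadPoolExecutor(max_workers=workers)
        return _executor


    # Every call hands back the same pool.
    assert get_executor() is get_executor()
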
From ff55e2d9788dc3f212d923e36a6984f6c97f51d4 Mon Sep 17 00:00:00 2001
From: zero323
Date: Mon, 22 May 2017 11:57:04 +0200
Subject: [PATCH 3/3] Skip tests if Python < 3.2

---
 python/pyspark/tests.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 4e6934b8dd384..066b0c36ae184 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -1175,6 +1175,7 @@ def test_pipe_functions(self):
         self.assertRaises(Py4JJavaError, rdd.pipe('grep 4', checkCode=True).collect)
         self.assertEqual([], rdd.pipe('grep 4').collect())
 
+    @unittest.skipIf(sys.version_info < (3, 2), "concurrent.futures")
     def test_async_actions(self):
         data = [x for x in range(10)]
         rdd = self.sc.parallelize(data)
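Because each async action hands back an ordinary `concurrent.futures.Future`, failures and slow jobs can be handled with the standard future API. A hedged sketch (it assumes an `rdd` obtained from an active SparkContext, as in the doctests above):

    import concurrent.futures

    f = rdd.collectAsync()
    try:
        # Wait for at most 60 seconds; the timeout only limits the wait,
        # it does not cancel the running Spark job.
        result = f.result(timeout=60)
    except concurrent.futures.TimeoutError:
        print("job is still running; result not ready yet")
    except Exception as e:
        # Errors raised inside collect() are re-raised by result().
        print("job failed:", e)
    else:
        print("collected", len(result), "elements")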