From 0ed060df2ec5a1a0427df6c160bd51c7014b29da Mon Sep 17 00:00:00 2001 From: buck heroux Date: Wed, 18 Feb 2015 12:50:27 -0500 Subject: [PATCH 01/19] added requirements file to pyspark --- python/pyspark/context.py | 36 ++++++++++++++++++++++++++++++++---- 1 file changed, 32 insertions(+), 4 deletions(-) diff --git a/python/pyspark/context.py b/python/pyspark/context.py index bf1f61c8504ed..09c5633440e1b 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -65,8 +65,9 @@ class SparkContext(object): _python_includes = None # zip and egg files that need to be added to PYTHONPATH def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None, - environment=None, batchSize=0, serializer=PickleSerializer(), conf=None, - gateway=None, jsc=None, profiler_cls=BasicProfiler): + requirementsFile=None, environment=None, batchSize=0, + serializer=PickleSerializer(), conf=None, gateway=None, + jsc=None, profiler_cls=BasicProfiler): """ Create a new SparkContext. At least the master and app name should be set, either through the named parameters here or through C{conf}. @@ -78,6 +79,8 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None, :param pyFiles: Collection of .zip or .py files to send to the cluster and add to PYTHONPATH. These can be paths on the local file system or HDFS, HTTP, HTTPS, or FTP URLs. + :param requirementsFile: Pip requirements file to send dependencies + to the cluster and add to PYTHONPATH. :param environment: A dictionary of environment variables to set on worker nodes. 
:param batchSize: The number of Python objects represented as a single @@ -104,8 +107,8 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None, self._callsite = first_spark_call() or CallSite(None, None, None) SparkContext._ensure_initialized(self, gateway=gateway) try: - self._do_init(master, appName, sparkHome, pyFiles, environment, batchSize, serializer, - conf, jsc, profiler_cls) + self._do_init(master, appName, sparkHome, pyFiles, requiremnetsFile, environment, + batchSize, serializer, conf, jsc, profiler_cls) except: # If an error occurs, clean up in order to allow future SparkContext creation: self.stop() @@ -180,6 +183,10 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize, for path in (pyFiles or []): self.addPyFile(path) + # Deplpoy code dependencies from requirements file in the constructor + if requirementsFIle: + self.addRequirementsFile(requirementsFile) + # Deploy code dependencies set by spark-submit; these will already have been added # with SparkContext.addFile, so we just need to add them to the PYTHONPATH for path in self._conf.get("spark.submit.pyFiles", "").split(","): @@ -710,6 +717,27 @@ def addPyFile(self, path): # for tests in local mode sys.path.insert(1, os.path.join(SparkFiles.getRootDirectory(), filename)) + def addRequirementsFile(self, path): + """ + Add a pip requirements file to distribute dependencies for all tasks + on thie SparkContext int he future. 
+ See https://pip.pypa.io/en/latest/user_guide.html#requirements-files + """ + import importlib + import pip + import tarfile + import uuid + for req in pip.req.parse_requirements(path, session=uuid.uuid1()): + if not req.check_if_exists(): + pip.main(['install', req.req.__str__()]) + mod = importlib.import_module(req.name) #throws ImportError + mod_path = mod.__path__[0] + tar_path = req.name+'.tar.gz' + with tarfile.open(tar_path, "w:gz") as tar: + tar.add(mod_path, arcname=os.path.basename(mod_path)) + self.addPyFile(tar_path) + os.remove(tar_path) + def setCheckpointDir(self, dirName): """ Set the directory under which RDDs are going to be checkpointed. The From 6b8bcde60378b58998f5c14d81d72de81f44d718 Mon Sep 17 00:00:00 2001 From: buck heroux Date: Wed, 18 Feb 2015 18:30:28 -0500 Subject: [PATCH 02/19] tarfile has no contextmanager in python2. --- python/pyspark/context.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 09c5633440e1b..11b12166e5456 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -184,7 +184,7 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize, self.addPyFile(path) # Deplpoy code dependencies from requirements file in the constructor - if requirementsFIle: + if requirementsFile: self.addRequirementsFile(requirementsFile) # Deploy code dependencies set by spark-submit; these will already have been added @@ -733,8 +733,9 @@ def addRequirementsFile(self, path): mod = importlib.import_module(req.name) #throws ImportError mod_path = mod.__path__[0] tar_path = req.name+'.tar.gz' - with tarfile.open(tar_path, "w:gz") as tar: - tar.add(mod_path, arcname=os.path.basename(mod_path)) + tar = tarfile.open(tar_path, "w:gz") + tar.add(mod_path, arcname=os.path.basename(mod_path)) + tar.close() self.addPyFile(tar_path) os.remove(tar_path) From 2773483ea6cc244cb7de02c7dc184391a94d29e6 Mon Sep 17 00:00:00 2001 From: buck 
heroux Date: Wed, 18 Feb 2015 20:52:06 -0500 Subject: [PATCH 03/19] reqs fix --- python/pyspark/context.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 11b12166e5456..c375581a60896 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -107,15 +107,15 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None, self._callsite = first_spark_call() or CallSite(None, None, None) SparkContext._ensure_initialized(self, gateway=gateway) try: - self._do_init(master, appName, sparkHome, pyFiles, requiremnetsFile, environment, + self._do_init(master, appName, sparkHome, pyFiles, requirementsFile, environment, batchSize, serializer, conf, jsc, profiler_cls) except: # If an error occurs, clean up in order to allow future SparkContext creation: self.stop() raise - def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize, serializer, - conf, jsc, profiler_cls): + def _do_init(self, master, appName, sparkHome, pyFiles, requirementsFile, environment, + batchSize, serializer, conf, jsc, profiler_cls): self.environment = environment or {} self._conf = conf or SparkConf(_jvm=self._jvm) self._batchSize = batchSize # -1 represents an unlimited batch size From 0371ad9b13f96dcc534d897789ccd32f907d5ed9 Mon Sep 17 00:00:00 2001 From: buck heroux Date: Wed, 18 Feb 2015 21:06:48 -0500 Subject: [PATCH 04/19] temp tar file --- python/pyspark/context.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/python/pyspark/context.py b/python/pyspark/context.py index c375581a60896..56424a74cc5be 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -720,24 +720,30 @@ def addPyFile(self, path): def addRequirementsFile(self, path): """ Add a pip requirements file to distribute dependencies for all tasks - on thie SparkContext int he future. + on thie SparkContext in the future. 
An ImportError will be thrown if + a module in the file can't be downloaded. See https://pip.pypa.io/en/latest/user_guide.html#requirements-files """ import importlib import pip import tarfile + import tempfile import uuid + tar_dir = tempfile.mkdtemp() for req in pip.req.parse_requirements(path, session=uuid.uuid1()): if not req.check_if_exists(): pip.main(['install', req.req.__str__()]) - mod = importlib.import_module(req.name) #throws ImportError + try: + mod = importlib.import_module(req.name) + finally: + shutil.rmtree(tar_dir) mod_path = mod.__path__[0] - tar_path = req.name+'.tar.gz' + tar_path = os.path.join(tar_dir, req.name+'.tar.gz') tar = tarfile.open(tar_path, "w:gz") tar.add(mod_path, arcname=os.path.basename(mod_path)) tar.close() self.addPyFile(tar_path) - os.remove(tar_path) + shutil.rmtree(tar_dir) def setCheckpointDir(self, dirName): """ From f2a46e5d6e309a5ba29259cc1f77e594d932b0f5 Mon Sep 17 00:00:00 2001 From: buck heroux Date: Wed, 4 Mar 2015 16:51:46 -0800 Subject: [PATCH 05/19] bubbled up try finally --- python/pyspark/context.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 56424a74cc5be..82f77fd4b22ef 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -729,6 +729,21 @@ def addRequirementsFile(self, path): import tarfile import tempfile import uuid + tar_dir = tempfile.mkdtemp() + try: + for req in pip.req.parse_requirements(path, session=uuid.uuid1()): + if not req.check_if_exists(): + pip.main(['install', req.req.__str__()]) + mod = importlib.import_module(req.name) # Can throw ImportError + mod_path = mod.__path__[0] + tar_path = os.path.join(tar_dir, req.name+'.tar.gz') + tar = tarfile.open(tar_path, "w:gz") + tar.add(mod_path, arcname=os.path.basename(mod_path)) + tar.close() + self.addPyFile(tar_path) + finally: + shutil.rmtree(tar_dir) + tar_dir = tempfile.mkdtemp() for req in pip.req.parse_requirements(path, 
session=uuid.uuid1()): if not req.check_if_exists(): From fca4be61c6542b807a0d5370f761ef031fc7eb86 Mon Sep 17 00:00:00 2001 From: buck heroux Date: Wed, 4 Mar 2015 16:53:17 -0800 Subject: [PATCH 06/19] forgot to remove --- python/pyspark/context.py | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 82f77fd4b22ef..b0142e605ef42 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -744,22 +744,6 @@ def addRequirementsFile(self, path): finally: shutil.rmtree(tar_dir) - tar_dir = tempfile.mkdtemp() - for req in pip.req.parse_requirements(path, session=uuid.uuid1()): - if not req.check_if_exists(): - pip.main(['install', req.req.__str__()]) - try: - mod = importlib.import_module(req.name) - finally: - shutil.rmtree(tar_dir) - mod_path = mod.__path__[0] - tar_path = os.path.join(tar_dir, req.name+'.tar.gz') - tar = tarfile.open(tar_path, "w:gz") - tar.add(mod_path, arcname=os.path.basename(mod_path)) - tar.close() - self.addPyFile(tar_path) - shutil.rmtree(tar_dir) - def setCheckpointDir(self, dirName): """ Set the directory under which RDDs are going to be checkpointed. 
The From d28752297ec5b10f6afce49713f4efe20f8d533f Mon Sep 17 00:00:00 2001 From: buck heroux Date: Fri, 3 Apr 2015 16:35:44 -0400 Subject: [PATCH 07/19] added requirementsFile tests and switch to __import__ --- python/pyspark/context.py | 28 ++++++++++++++-------------- python/pyspark/tests.py | 12 +++++++++++- 2 files changed, 25 insertions(+), 15 deletions(-) diff --git a/python/pyspark/context.py b/python/pyspark/context.py index b0142e605ef42..fe9d8ae8183cb 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -18,6 +18,9 @@ import os import shutil import sys +import tarfile +import tempfile +import uuid from threading import Lock from tempfile import NamedTemporaryFile @@ -65,9 +68,9 @@ class SparkContext(object): _python_includes = None # zip and egg files that need to be added to PYTHONPATH def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None, - requirementsFile=None, environment=None, batchSize=0, - serializer=PickleSerializer(), conf=None, gateway=None, - jsc=None, profiler_cls=BasicProfiler): + environment=None, batchSize=0, serializer=PickleSerializer(), + conf=None, gateway=None, jsc=None, profiler_cls=BasicProfiler, + requirementsFile=None): """ Create a new SparkContext. At least the master and app name should be set, either through the named parameters here or through C{conf}. @@ -79,8 +82,6 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None, :param pyFiles: Collection of .zip or .py files to send to the cluster and add to PYTHONPATH. These can be paths on the local file system or HDFS, HTTP, HTTPS, or FTP URLs. - :param requirementsFile: Pip requirements file to send dependencies - to the cluster and add to PYTHONPATH. :param environment: A dictionary of environment variables to set on worker nodes. 
:param batchSize: The number of Python objects represented as a single @@ -94,6 +95,8 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None, :param jsc: The JavaSparkContext instance (optional). :param profiler_cls: A class of custom Profiler used to do profiling (default is pyspark.profiler.BasicProfiler). + :param requirementsFile: Pip requirements file to send dependencies + to the cluster and add to PYTHONPATH. >>> from pyspark.context import SparkContext @@ -107,15 +110,15 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None, self._callsite = first_spark_call() or CallSite(None, None, None) SparkContext._ensure_initialized(self, gateway=gateway) try: - self._do_init(master, appName, sparkHome, pyFiles, requirementsFile, environment, - batchSize, serializer, conf, jsc, profiler_cls) + self._do_init(master, appName, sparkHome, pyFiles, environment, + batchSize, serializer, conf, jsc, profiler_cls, requirementsFile) except: # If an error occurs, clean up in order to allow future SparkContext creation: self.stop() raise - def _do_init(self, master, appName, sparkHome, pyFiles, requirementsFile, environment, - batchSize, serializer, conf, jsc, profiler_cls): + def _do_init(self, master, appName, sparkHome, pyFiles, environment, + batchSize, serializer, conf, jsc, profiler_cls, requirementsFile): self.environment = environment or {} self._conf = conf or SparkConf(_jvm=self._jvm) self._batchSize = batchSize # -1 represents an unlimited batch size @@ -723,18 +726,15 @@ def addRequirementsFile(self, path): on thie SparkContext in the future. An ImportError will be thrown if a module in the file can't be downloaded. 
See https://pip.pypa.io/en/latest/user_guide.html#requirements-files + Raises ImportError if the requirement can't be found """ - import importlib import pip - import tarfile - import tempfile - import uuid tar_dir = tempfile.mkdtemp() try: for req in pip.req.parse_requirements(path, session=uuid.uuid1()): if not req.check_if_exists(): pip.main(['install', req.req.__str__()]) - mod = importlib.import_module(req.name) # Can throw ImportError + mod = __import__(req.name) mod_path = mod.__path__[0] tar_path = os.path.join(tar_dir, req.name+'.tar.gz') tar = tarfile.open(tar_path, "w:gz") diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index b5e28c498040b..69da082ed7fc7 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -43,7 +43,6 @@ else: import unittest - from pyspark.conf import SparkConf from pyspark.context import SparkContext from pyspark.rdd import RDD @@ -1550,6 +1549,17 @@ def test_with_stop(self): sc.stop() self.assertEqual(SparkContext._active_spark_context, None) + def test_requiriements_file(self): + import pip + with tempfile.NamedTemporaryFile() as temp: + temp.write('simplejson\nquadkey>0.0.4\nsix==1.8.0') + pip.main(['install', 'quadkey>0.0.4']) + with SparkContext(requirementsFile=temp.name) as sc: + import quadkey + qks = sc.parallelize([(0,0),(1,1),(2,2)]) \ + .map(lambda pair: quadkey.from_geo(pair, 1).key) + self.assertSequenceEqual(['3','1','1'], qks.collect()) + @unittest.skipIf(not _have_scipy, "SciPy not installed") class SciPyTests(PySparkTestCase): From 565bf7fc0889390178f8f96d0ed9a917b287e164 Mon Sep 17 00:00:00 2001 From: buck heroux Date: Tue, 14 Jul 2015 13:54:12 -0400 Subject: [PATCH 08/19] pep8 styling --- python/pyspark/context.py | 4 ++-- python/pyspark/tests.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/python/pyspark/context.py b/python/pyspark/context.py index f080b2a82e45a..5798393297501 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -113,14 
+113,14 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None, SparkContext._ensure_initialized(self, gateway=gateway) try: self._do_init(master, appName, sparkHome, pyFiles, environment, - batchSize, serializer, conf, jsc, profiler_cls, requirementsFile) + batchSize, serializer, conf, jsc, profiler_cls, requirementsFile) except: # If an error occurs, clean up in order to allow future SparkContext creation: self.stop() raise def _do_init(self, master, appName, sparkHome, pyFiles, environment, - batchSize, serializer, conf, jsc, profiler_cls, requirementsFile): + batchSize, serializer, conf, jsc, profiler_cls, requirementsFile): self.environment = environment or {} self._conf = conf or SparkConf(_jvm=self._jvm) self._batchSize = batchSize # -1 represents an unlimited batch size diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 07766418ae8bd..596b04e43ccaa 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -1664,9 +1664,9 @@ def test_requiriements_file(self): pip.main(['install', 'quadkey>0.0.4']) with SparkContext(requirementsFile=temp.name) as sc: import quadkey - qks = sc.parallelize([(0,0),(1,1),(2,2)]) \ + qks = sc.parallelize([(0, 0), (1, 1), (2, 2)]) \ .map(lambda pair: quadkey.from_geo(pair, 1).key) - self.assertSequenceEqual(['3','1','1'], qks.collect()) + self.assertSequenceEqual(['3', '1', '1'], qks.collect()) def test_progress_api(self): with SparkContext() as sc: From 23771fde4674c73d5fb0bbb478385d9cf55c648b Mon Sep 17 00:00:00 2001 From: buck heroux Date: Mon, 5 Oct 2015 18:07:48 -0400 Subject: [PATCH 09/19] support namespace packages and extract addModule logic --- python/pyspark/context.py | 38 ++++++++++++++++++++++++-------------- 1 file changed, 24 insertions(+), 14 deletions(-) diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 5798393297501..a978e849d87ec 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -706,6 +706,25 @@ def 
clearFiles(self): # TODO: remove added .py or .zip files from the PYTHONPATH? self._jsc.sc().clearFiles() + def addModule(self, module): + """ + Add a module to the spark context, the module must have already been + imported by the driver via __import__ semantics. Supports namespace + packages by simulating the loading __path__ as a set of modules from + the __path__ list in a single package. + """ + tmp_dir = tempfile.mkdtemp() + try: + tar_path = os.path.join(tmp_dir, module.__name__+'.tar.gz') + tar = tarfile.open(tar_path, "w:gz") + for mod in module.__path__[::-1]: + # adds in reverse to simulate namespace loading path + tar.add(mod, arcname=os.path.basename(mod)) + tar.close() + self.addPyFile(tar_path) + finally: + shutil.rmtree(tmp_dir) + def addPyFile(self, path): """ Add a .py or .zip dependency for all tasks to be executed on this @@ -730,20 +749,11 @@ def addRequirementsFile(self, path): Raises ImportError if the requirement can't be found """ import pip - tar_dir = tempfile.mkdtemp() - try: - for req in pip.req.parse_requirements(path, session=uuid.uuid1()): - if not req.check_if_exists(): - pip.main(['install', req.req.__str__()]) - mod = __import__(req.name) - mod_path = mod.__path__[0] - tar_path = os.path.join(tar_dir, req.name+'.tar.gz') - tar = tarfile.open(tar_path, "w:gz") - tar.add(mod_path, arcname=os.path.basename(mod_path)) - tar.close() - self.addPyFile(tar_path) - finally: - shutil.rmtree(tar_dir) + for req in pip.req.parse_requirements(path, session=uuid.uuid1()): + if not req.check_if_exists(): + pip.main(['install', req.req.__str__()]) + mod = __import__(req.name) + self.addModule(tmp_dir) def setCheckpointDir(self, dirName): """ From cd21c5c06d6a611f4e738199f20582aeb6087fd7 Mon Sep 17 00:00:00 2001 From: buck heroux Date: Mon, 5 Oct 2015 18:10:17 -0400 Subject: [PATCH 10/19] tmp_dir to mod --- python/pyspark/context.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/context.py 
b/python/pyspark/context.py index a978e849d87ec..c391ee7791873 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -753,7 +753,7 @@ def addRequirementsFile(self, path): if not req.check_if_exists(): pip.main(['install', req.req.__str__()]) mod = __import__(req.name) - self.addModule(tmp_dir) + self.addModule(mod) def setCheckpointDir(self, dirName): """ From 39f26d93b7030f2d7bfe7782414dc5b016cdd852 Mon Sep 17 00:00:00 2001 From: buck heroux Date: Wed, 21 Oct 2015 16:22:06 -0400 Subject: [PATCH 11/19] remove reqs from context constructor --- python/pyspark/context.py | 53 +++++++++++++++++---------------------- python/pyspark/tests.py | 8 +++--- 2 files changed, 27 insertions(+), 34 deletions(-) diff --git a/python/pyspark/context.py b/python/pyspark/context.py index c391ee7791873..74b9eff558397 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -71,8 +71,7 @@ class SparkContext(object): def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None, environment=None, batchSize=0, serializer=PickleSerializer(), - conf=None, gateway=None, jsc=None, profiler_cls=BasicProfiler, - requirementsFile=None): + conf=None, gateway=None, jsc=None, profiler_cls=BasicProfiler): """ Create a new SparkContext. At least the master and app name should be set, either through the named parameters here or through C{conf}. @@ -97,8 +96,6 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None, :param jsc: The JavaSparkContext instance (optional). :param profiler_cls: A class of custom Profiler used to do profiling (default is pyspark.profiler.BasicProfiler). - :param requirementsFile: Pip requirements file to send dependencies - to the cluster and add to PYTHONPATH. 
>>> from pyspark.context import SparkContext @@ -120,7 +117,7 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None, raise def _do_init(self, master, appName, sparkHome, pyFiles, environment, - batchSize, serializer, conf, jsc, profiler_cls, requirementsFile): + batchSize, serializer, conf, jsc, profiler_cls): self.environment = environment or {} self._conf = conf or SparkConf(_jvm=self._jvm) self._batchSize = batchSize # -1 represents an unlimited batch size @@ -188,10 +185,6 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, for path in (pyFiles or []): self.addPyFile(path) - # Deplpoy code dependencies from requirements file in the constructor - if requirementsFile: - self.addRequirementsFile(requirementsFile) - # Deploy code dependencies set by spark-submit; these will already have been added # with SparkContext.addFile, so we just need to add them to the PYTHONPATH for path in self._conf.get("spark.submit.pyFiles", "").split(","): @@ -706,25 +699,6 @@ def clearFiles(self): # TODO: remove added .py or .zip files from the PYTHONPATH? self._jsc.sc().clearFiles() - def addModule(self, module): - """ - Add a module to the spark context, the module must have already been - imported by the driver via __import__ semantics. Supports namespace - packages by simulating the loading __path__ as a set of modules from - the __path__ list in a single package. 
- """ - tmp_dir = tempfile.mkdtemp() - try: - tar_path = os.path.join(tmp_dir, module.__name__+'.tar.gz') - tar = tarfile.open(tar_path, "w:gz") - for mod in module.__path__[::-1]: - # adds in reverse to simulate namespace loading path - tar.add(mod, arcname=os.path.basename(mod)) - tar.close() - self.addPyFile(tar_path) - finally: - shutil.rmtree(tmp_dir) - def addPyFile(self, path): """ Add a .py or .zip dependency for all tasks to be executed on this @@ -740,6 +714,25 @@ def addPyFile(self, path): # for tests in local mode sys.path.insert(1, os.path.join(SparkFiles.getRootDirectory(), filename)) + def addPyPackage(self, pkg): + """ + Add a package to the spark context, the package must have already been + imported by the driver via __import__ semantics. Supports namespace + packages by simulating the loading __path__ as a set of modules from + the __path__ list in a single package. + """ + tmp_dir = tempfile.mkdtemp() + try: + tar_path = os.path.join(tmp_dir, pkg.__name__+'.tar.gz') + tar = tarfile.open(tar_path, "w:gz") + for mod in pkg.__path__[::-1]: + # adds in reverse to simulate namespace loading path + tar.add(mod, arcname=os.path.basename(mod)) + tar.close() + self.addPyFile(tar_path) + finally: + shutil.rmtree(tmp_dir) + def addRequirementsFile(self, path): """ Add a pip requirements file to distribute dependencies for all tasks @@ -752,8 +745,8 @@ def addRequirementsFile(self, path): for req in pip.req.parse_requirements(path, session=uuid.uuid1()): if not req.check_if_exists(): pip.main(['install', req.req.__str__()]) - mod = __import__(req.name) - self.addModule(mod) + pkg = __import__(req.name) + self.addPyPackage(pkg) def setCheckpointDir(self, dirName): """ diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 596b04e43ccaa..56d62c7f70006 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -1657,12 +1657,12 @@ def test_with_stop(self): sc.stop() self.assertEqual(SparkContext._active_spark_context, None) - def 
test_requiriements_file(self): + def test_requirements_file(self): import pip with tempfile.NamedTemporaryFile() as temp: - temp.write('simplejson\nquadkey>0.0.4\nsix==1.8.0') - pip.main(['install', 'quadkey>0.0.4']) - with SparkContext(requirementsFile=temp.name) as sc: + temp.write('simplejson\nquadkey>=0.0.5\nsix==1.8.0') + with SparkContext() as sc: + sc.addRequirementsFile(temp.name) import quadkey qks = sc.parallelize([(0, 0), (1, 1), (2, 2)]) \ .map(lambda pair: quadkey.from_geo(pair, 1).key) From 88a1d6ceffaddfc5f3aa5afc07d65a1d9ddb54e9 Mon Sep 17 00:00:00 2001 From: buck heroux Date: Wed, 13 Apr 2016 18:57:23 -0400 Subject: [PATCH 12/19] add_py_package test --- python/pyspark/tests.py | 20 ++++++++++++++++++++ python/run-tests.py | 3 ++- 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index ec20177ae527c..817caa61c88e2 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -23,6 +23,7 @@ from array import array from glob import glob import os +import os.path import re import shutil import subprocess @@ -1946,6 +1947,24 @@ def test_with_stop(self): sc.stop() self.assertEqual(SparkContext._active_spark_context, None) + def test_add_py_package(self): + name = "test_tmp" + try: + os.mkdir(name) + with open(os.path.join(name, "__init__.py"), 'w+') as temp: + temp.write("triple = lambda x: 3*x") + pkg = __import__(name) + with SparkContext() as sc: + #trips = sc.parallelize([0, 1, 2, 3]).map(test_tmp.triple) + #sc.addPyPackage(pkg) + trips = sc.parallelize([0, 1, 2, 3]).map(lambda x: pkg.triple(x)) + self.assertSequenceEqual([0, 3, 6, 9], trips.collect()) + finally: + shutil.rmtree(name) + + + """ + This needs internet access def test_requirements_file(self): import pip with tempfile.NamedTemporaryFile() as temp: @@ -1956,6 +1975,7 @@ def test_requirements_file(self): qks = sc.parallelize([(0, 0), (1, 1), (2, 2)]) \ .map(lambda pair: quadkey.from_geo(pair, 1).key) 
self.assertSequenceEqual(['3', '1', '1'], qks.collect()) + """ def test_progress_api(self): with SparkContext() as sc: diff --git a/python/run-tests.py b/python/run-tests.py index 38b3bb84c10be..2a7e04acc7801 100755 --- a/python/run-tests.py +++ b/python/run-tests.py @@ -41,7 +41,6 @@ from sparktestsupport.shellutils import which, subprocess_check_output # noqa from sparktestsupport.modules import all_modules # noqa - python_modules = dict((m.name, m) for m in all_modules if m.python_test_goals if m.name != 'root') @@ -168,6 +167,8 @@ def main(): print("Error: unrecognized module '%s'. Supported modules: %s" % (module_name, ", ".join(python_modules))) sys.exit(-1) + #TODO REMOVE + modules_to_test = [modules_to_test[0]] LOGGER.info("Will test against the following Python executables: %s", python_execs) LOGGER.info("Will test the following Python modules: %s", [x.name for x in modules_to_test]) From 93b9e9fdd464744f90eb3a7eb325f853e2f113d4 Mon Sep 17 00:00:00 2001 From: buck heroux Date: Thu, 14 Apr 2016 10:39:17 -0400 Subject: [PATCH 13/19] uncommented pip_requirements test --- python/pyspark/tests.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 817caa61c88e2..465489a9fa159 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -1963,8 +1963,6 @@ def test_add_py_package(self): shutil.rmtree(name) - """ - This needs internet access def test_requirements_file(self): import pip with tempfile.NamedTemporaryFile() as temp: @@ -1975,7 +1973,6 @@ def test_requirements_file(self): qks = sc.parallelize([(0, 0), (1, 1), (2, 2)]) \ .map(lambda pair: quadkey.from_geo(pair, 1).key) self.assertSequenceEqual(['3', '1', '1'], qks.collect()) - """ def test_progress_api(self): with SparkContext() as sc: From 82476a632c5da724c3ca45ed8b888a946d934104 Mon Sep 17 00:00:00 2001 From: buck heroux Date: Thu, 14 Apr 2016 10:42:05 -0400 Subject: [PATCH 14/19] removed todo --- python/run-tests.py | 2 -- 1 file 
changed, 2 deletions(-) diff --git a/python/run-tests.py b/python/run-tests.py index 2a7e04acc7801..f3443be4d88ac 100755 --- a/python/run-tests.py +++ b/python/run-tests.py @@ -167,8 +167,6 @@ def main(): print("Error: unrecognized module '%s'. Supported modules: %s" % (module_name, ", ".join(python_modules))) sys.exit(-1) - #TODO REMOVE - modules_to_test = [modules_to_test[0]] LOGGER.info("Will test against the following Python executables: %s", python_execs) LOGGER.info("Will test the following Python modules: %s", [x.name for x in modules_to_test]) From ce9966e875a6d00bbe6325dc5fc9519ce5292c04 Mon Sep 17 00:00:00 2001 From: buck heroux Date: Thu, 14 Apr 2016 10:47:36 -0400 Subject: [PATCH 15/19] spacing --- python/run-tests.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/run-tests.py b/python/run-tests.py index f3443be4d88ac..38b3bb84c10be 100755 --- a/python/run-tests.py +++ b/python/run-tests.py @@ -41,6 +41,7 @@ from sparktestsupport.shellutils import which, subprocess_check_output # noqa from sparktestsupport.modules import all_modules # noqa + python_modules = dict((m.name, m) for m in all_modules if m.python_test_goals if m.name != 'root') From 82534d04412344ec4fd1a58f03edaa543664505e Mon Sep 17 00:00:00 2001 From: buck heroux Date: Mon, 9 May 2016 11:33:19 -0400 Subject: [PATCH 16/19] formatting and addPyPackage example --- python/pyspark/context.py | 11 ++++++++++- python/pyspark/tests.py | 1 - 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 8862d1f8d17ea..53ed31954537b 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -822,7 +822,16 @@ def addPyPackage(self, pkg): Add a package to the spark context, the package must have already been imported by the driver via __import__ semantics. Supports namespace packages by simulating the loading __path__ as a set of modules from - the __path__ list in a single package. 
+ the __path__ list in a single package. Example follows: + + import pyspark + import foolib + + sc = pyspark.SparkContext() + sc.addPyPackage(foolib) + # foolib now in workers PYTHONPATH + rdd = sc.parallelize([1, 2, 3]) + doubles = rdd.map(lambda x: foolib.double(x)) """ tmp_dir = tempfile.mkdtemp() try: diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 465489a9fa159..e9d35154e9d56 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -1961,7 +1961,6 @@ def test_add_py_package(self): self.assertSequenceEqual([0, 3, 6, 9], trips.collect()) finally: shutil.rmtree(name) - def test_requirements_file(self): import pip From ea6b89ffd8e1c24a24d9b4a01c25ae3035e4bfc0 Mon Sep 17 00:00:00 2001 From: Robert Kruszewski Date: Wed, 11 May 2016 00:34:22 +0100 Subject: [PATCH 17/19] add py-requirements submit option --- .../org/apache/spark/deploy/SparkSubmit.scala | 14 +++++++-- .../spark/deploy/SparkSubmitArguments.scala | 7 +++++ .../spark/deploy/SparkSubmitSuite.scala | 31 +++++++++++++++++++ .../launcher/SparkSubmitOptionParser.java | 1 + python/pyspark/context.py | 6 ++++ 5 files changed, 57 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 926e1ff7a874d..52d2ef6e700e1 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -356,16 +356,19 @@ object SparkSubmit { args.childArgs = ArrayBuffer(args.primaryResource, args.pyFiles) ++ args.childArgs if (clusterManager != YARN) { // The YARN backend distributes the primary file differently, so don't merge it. - args.files = mergeFileLists(args.files, args.primaryResource) + args.files = mergeFileLists(args.files, args.primaryResource, args.pyRequirements) } } if (clusterManager != YARN) { // The YARN backend handles python files differently, so don't merge the lists. 
- args.files = mergeFileLists(args.files, args.pyFiles) + args.files = mergeFileLists(args.files, args.pyFiles, args.pyRequirements) } if (args.pyFiles != null) { sysProps("spark.submit.pyFiles") = args.pyFiles } + if (args.pyRequirements != null) { + sysProps("spark.submit.pyRequirements") = args.pyRequirements + } } // In YARN mode for an R app, add the SparkR package archive and the R package @@ -542,6 +545,10 @@ object SparkSubmit { if (args.pyFiles != null) { sysProps("spark.submit.pyFiles") = args.pyFiles } + + if (args.pyRequirements != null) { + sysProps("spark.submit.pyRequirements") = args.pyRequirements + } } // assure a keytab is available from any place in a JVM @@ -593,6 +600,9 @@ object SparkSubmit { if (args.pyFiles != null) { sysProps("spark.submit.pyFiles") = args.pyFiles } + if (args.pyRequirements != null) { + sysProps("spark.submit.pyRequirements") = args.pyRequirements + } } else { childArgs += (args.primaryResource, args.mainClass) } diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala index ec6d48485f110..3136a729d5933 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala @@ -64,6 +64,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S var verbose: Boolean = false var isPython: Boolean = false var pyFiles: String = null + var pyRequirements: String = null var isR: Boolean = false var action: SparkSubmitAction = null val sparkProperties: HashMap[String, String] = new HashMap[String, String]() @@ -304,6 +305,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S | numExecutors $numExecutors | files $files | pyFiles $pyFiles + | pyRequiremenst $pyRequirements | archives $archives | mainClass $mainClass | primaryResource $primaryResource @@ -395,6 +397,9 @@ private[deploy] 
class SparkSubmitArguments(args: Seq[String], env: Map[String, S case PY_FILES => pyFiles = Utils.resolveURIs(value) + case PY_REQUIREMENTS => + pyRequirements = Utils.resolveURIs(value) + case ARCHIVES => archives = Utils.resolveURIs(value) @@ -505,6 +510,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S | search for the maven coordinates given with --packages. | --py-files PY_FILES Comma-separated list of .zip, .egg, or .py files to place | on the PYTHONPATH for Python apps. + | --py-requirements REQS Pip requirements file with dependencies that will be fetched + | and placed on PYTHONPATH | --files FILES Comma-separated list of files to be placed in the working | directory of each executor. | diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala index 271897699201b..8fac39a5cb267 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala @@ -570,6 +570,37 @@ class SparkSubmitSuite appArgs.executorMemory should be ("2.3g") } } + + test("py-requirements will be distributed") { + val pyReqs = "requirements.txt" + + val clArgsYarn = Seq( + "--master", "yarn", + "--deploy-mode", "cluster", + "--py-requirements", pyReqs, + "mister.py" + ) + + val appArgsYarn = new SparkSubmitArguments(clArgsYarn) + val sysPropsYarn = SparkSubmit.prepareSubmitEnvironment(appArgsYarn)._3 + appArgsYarn.pyRequirements should be (Utils.resolveURIs(pyReqs)) + sysPropsYarn("spark.yarn.dist.files") should be ( + PythonRunner.formatPaths(Utils.resolveURIs(pyReqs)).mkString(",")) + sysPropsYarn("spark.submit.pyRequirements") should be ( + PythonRunner.formatPaths(Utils.resolveURIs(pyReqs)).mkString(",")) + + val clArgs = Seq( + "--master", "local", + "--py-requirements", pyReqs, + "mister.py" + ) + + val appArgs = new SparkSubmitArguments(clArgs) + val sysProps = 
SparkSubmit.prepareSubmitEnvironment(appArgs)._3 + appArgs.pyRequirements should be (Utils.resolveURIs(pyReqs)) + sysProps("spark.submit.pyRequirements") should be ( + PythonRunner.formatPaths(Utils.resolveURIs(pyReqs)).mkString(",")) + } // scalastyle:on println // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly. diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java index 6767cc5079649..d036ac322809c 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java +++ b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java @@ -55,6 +55,7 @@ class SparkSubmitOptionParser { protected final String PROPERTIES_FILE = "--properties-file"; protected final String PROXY_USER = "--proxy-user"; protected final String PY_FILES = "--py-files"; + protected final String PY_REQUIREMENTS = "--py-requirements"; protected final String REPOSITORIES = "--repositories"; protected final String STATUS = "--status"; protected final String TOTAL_EXECUTOR_CORES = "--total-executor-cores"; diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 53ed31954537b..84b026d956a6b 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -209,6 +209,12 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, self._python_includes.append(filename) sys.path.insert(1, os.path.join(SparkFiles.getRootDirectory(), filename)) + # Apply requirements file set by spark-submit. 
+ for path in self._conf.get("spark.submit.pyRequirements", "").split(","): + if path != "": + (dirname, filename) = os.path.split(path) + self.addRequirementsFile(os.path.join(SparkFiles.getRootDirectory(), filename)) + # Create a temporary directory inside spark.local.dir: local_dir = self._jvm.org.apache.spark.util.Utils.getLocalDir(self._jsc.sc().conf()) self._temp_dir = \ From f4af8421f4586f6243eca8fc7220bb0fecd3dd69 Mon Sep 17 00:00:00 2001 From: buck heroux Date: Fri, 13 May 2016 15:17:54 -0400 Subject: [PATCH 18/19] addRequirementsFile -> addPyRequirements --- python/pyspark/context.py | 31 ++++++++++++++++++++----------- python/pyspark/tests.py | 19 +++++++++---------- 2 files changed, 29 insertions(+), 21 deletions(-) diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 84b026d956a6b..4ec4646411c7b 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -213,7 +213,9 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, for path in self._conf.get("spark.submit.pyRequirements", "").split(","): if path != "": (dirname, filename) = os.path.split(path) - self.addRequirementsFile(os.path.join(SparkFiles.getRootDirectory(), filename)) + reqs_file = os.path.join(SparkFiles.getRootDirectory(), filename) + reqs = open(reqs_file).readlines() + self.addPyRequirements(reqs) # Create a temporary directory inside spark.local.dir: local_dir = self._jvm.org.apache.spark.util.Utils.getLocalDir(self._jsc.sc().conf()) @@ -851,20 +853,27 @@ def addPyPackage(self, pkg): finally: shutil.rmtree(tmp_dir) - def addRequirementsFile(self, path): + def addPyRequirements(self, reqs): """ - Add a pip requirements file to distribute dependencies for all tasks - on thie SparkContext in the future. An ImportError will be thrown if - a module in the file can't be downloaded. + Add a list of pip requirements to distribute to workers. + The reqs list is composed of pip requirements strings. 
See https://pip.pypa.io/en/latest/user_guide.html#requirements-files - Raises ImportError if the requirement can't be found + Raises ImportError if the requirement can't be found. Example follows: + + reqs = ['pkg1', 'pkg2', 'pkg3>=1.0,<=2.0'] + sc.addPyRequirements(reqs) + # or load from requirements file + sc.addPyRequirements(open('requirements.txt').readlines()) """ import pip - for req in pip.req.parse_requirements(path, session=uuid.uuid1()): - if not req.check_if_exists(): - pip.main(['install', req.req.__str__()]) - pkg = __import__(req.name) - self.addPyPackage(pkg) + with tempfile.NamedTemporaryFile() as t: + t.write('\n'.join(reqs)) + t.flush() + for req in pip.req.parse_requirements(t.name, session=uuid.uuid1()): + if not req.check_if_exists(): + pip.main(['install', req.req.__str__()]) + pkg = __import__(req.name) + self.addPyPackage(pkg) def setCheckpointDir(self, dirName): """ diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index e9d35154e9d56..a6ba9f5cf257a 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -1955,23 +1955,22 @@ def test_add_py_package(self): temp.write("triple = lambda x: 3*x") pkg = __import__(name) with SparkContext() as sc: - #trips = sc.parallelize([0, 1, 2, 3]).map(test_tmp.triple) + #trips = sc.parallelize([0, 1, 2, 3]).map(pkg.triple) #sc.addPyPackage(pkg) trips = sc.parallelize([0, 1, 2, 3]).map(lambda x: pkg.triple(x)) self.assertSequenceEqual([0, 3, 6, 9], trips.collect()) finally: shutil.rmtree(name) - def test_requirements_file(self): + def test_add_py_requirements(self): import pip - with tempfile.NamedTemporaryFile() as temp: - temp.write('simplejson\nquadkey>=0.0.5\nsix==1.8.0') - with SparkContext() as sc: - sc.addRequirementsFile(temp.name) - import quadkey - qks = sc.parallelize([(0, 0), (1, 1), (2, 2)]) \ - .map(lambda pair: quadkey.from_geo(pair, 1).key) - self.assertSequenceEqual(['3', '1', '1'], qks.collect()) + reqs = ['requests', 'quadkey>=0.0.5', 'six==1.8.0'] + with
SparkContext() as sc: + sc.addPyRequirements(reqs) + import quadkey + qks = sc.parallelize([(0, 0), (1, 1), (2, 2)]) \ + .map(lambda pair: quadkey.from_geo(pair, 1).key) + self.assertSequenceEqual(['3', '1', '1'], qks.collect()) def test_progress_api(self): with SparkContext() as sc: From 9c37e0649a75d22d5712b08dcd6c3acee412f719 Mon Sep 17 00:00:00 2001 From: buck heroux Date: Fri, 13 May 2016 15:26:21 -0400 Subject: [PATCH 19/19] pep8ing --- python/pyspark/context.py | 2 +- python/pyspark/tests.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 4ec4646411c7b..acd6ca1d5e7a0 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -834,7 +834,7 @@ def addPyPackage(self, pkg): import pyspark import foolib - + sc = pyspark.SparkContext() sc.addPyPackage(foolib) # foolib now in workers PYTHONPATH diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index a6ba9f5cf257a..8c8070ba75112 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -1955,8 +1955,8 @@ def test_add_py_package(self): temp.write("triple = lambda x: 3*x") pkg = __import__(name) with SparkContext() as sc: - #trips = sc.parallelize([0, 1, 2, 3]).map(pkg.triple) - #sc.addPyPackage(pkg) + # trips = sc.parallelize([0, 1, 2, 3]).map(pkg.triple) + # sc.addPyPackage(pkg) trips = sc.parallelize([0, 1, 2, 3]).map(lambda x: pkg.triple(x)) self.assertSequenceEqual([0, 3, 6, 9], trips.collect()) finally: