Skip to content

Commit

Permalink
[SW-539] Fix bug when pysparkling is executed in parallel on the same…
Browse files Browse the repository at this point in the history
… node (#393)

(cherry picked from commit c93c0ee)
  • Loading branch information
jakubhava committed Oct 10, 2017
1 parent 62f8653 commit d248459
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 9 deletions.
2 changes: 2 additions & 0 deletions py/pysparkling/context.py
Expand Up @@ -139,12 +139,14 @@ def getOrCreate(spark, conf=None, **kwargs):


def stop_with_jvm(self):
Initializer.clean_temp_dir()
h2o.cluster().shutdown()
self.stop()


def stop(self):
warnings.warn("Stopping H2OContext. (Restarting H2O is not yet fully supported...) ")
Initializer.clean_temp_dir()
self._jhc.stop(False)

def __del__(self):
Expand Down
35 changes: 26 additions & 9 deletions py/pysparkling/initializer.py
Expand Up @@ -11,6 +11,7 @@ class Initializer(object):
# Flag to inform us whether sparkling jar has been already loaded or not.
# We don't want to load it more than once.
__sparkling_jar_loaded = False
__extracted_jar_dir = None

@staticmethod
def load_sparkling_jar(spark_context):
Expand Down Expand Up @@ -56,24 +57,40 @@ def __add_sparkling_jar_to_spark(sc):
field_jars_value = field_jars.get(file_server)
field_jars_value.put('sparkling_water_assembly.jar', jvm.java.io.File(sw_jar_file))


@staticmethod
def __get_sw_jar():
def __extracted_jar_path():
import sparkling_water
sw_pkg_file = sparkling_water.__file__
# Extract jar file from zip
if '.zip' in sw_pkg_file:
from pkg_resources import get_cache_path

if Initializer.__extracted_jar_dir is None:
zip_file = sw_pkg_file[:-len('/sparkling_water/__init__.py')]
zip_filename = os.path.basename(zip_file)
jar_path = get_cache_path(zip_filename)
sw_jar = os.path.abspath("{}/sparkling_water/sparkling_water_assembly.jar".format(jar_path))
import tempfile
Initializer.__extracted_jar_dir = tempfile.mkdtemp()
import zipfile
with zipfile.ZipFile(zip_file) as fzip:
fzip.extract('sparkling_water/sparkling_water_assembly.jar', path = jar_path)
return sw_jar
fzip.extract('sparkling_water/sparkling_water_assembly.jar', path = Initializer.__extracted_jar_dir)

return os.path.abspath("{}/sparkling_water/sparkling_water_assembly.jar".format(Initializer.__extracted_jar_dir))

@staticmethod
def clean_temp_dir():
## Clean temporary directory containing extracted Sparkling Water Jar
if Initializer.__extracted_jar_dir is not None:
import shutil
shutil.rmtree(Initializer.__extracted_jar_dir)

@staticmethod
def __get_sw_jar():
import sparkling_water
sw_pkg_file = sparkling_water.__file__
# Extract jar file from zip
if '.zip' in sw_pkg_file:
return Initializer.__extracted_jar_path()
else:
from pkg_resources import resource_filename
return os.path.abspath(resource_filename("sparkling_water", 'sparkling_water_assembly.jar'))

@staticmethod
def __find_spark_cl(start_cl, cl_name):
cl = start_cl
Expand Down

0 comments on commit d248459

Please sign in to comment.