[SW-539] Fix bug when pysparkling is executed in parallel on the same node #393

Merged: 1 commit merged on Sep 26, 2017
2 changes: 2 additions & 0 deletions py/pysparkling/context.py
@@ -139,12 +139,14 @@ def getOrCreate(spark, conf=None, **kwargs):


     def stop_with_jvm(self):
+        Initializer.clean_temp_dir()
Member commented:
Update: this seems to cause stack traces to be printed during Spark shutdown; we cannot expect the cleanup to always run to completion there. The better solution is to simply skip the cleanup (see the sketch after this diff).

Contributor (author) commented:

Good point! I will work on this tomorrow.

         h2o.cluster().shutdown()
         self.stop()

     def stop(self):
         warnings.warn("Stopping H2OContext. (Restarting H2O is not yet fully supported...) ")
+        Initializer.clean_temp_dir()
         self._jhc.stop(False)

     def __del__(self):
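
The reviewer's concern above is easy to see in isolation. Below is a minimal sketch, not part of this PR, of why removing the extracted jar directory at shutdown is fragile; extracted_dir and the atexit hook are hypothetical stand-ins for Initializer.__extracted_jar_dir and the H2OContext shutdown path:

    import atexit
    import shutil
    import tempfile

    # Hypothetical stand-in for Initializer.__extracted_jar_dir.
    extracted_dir = tempfile.mkdtemp()

    def _best_effort_cleanup():
        # During interpreter shutdown the JVM (or another thread) may still
        # hold files under this directory open; ignore_errors=True keeps the
        # removal best-effort instead of raising and printing a stack trace.
        shutil.rmtree(extracted_dir, ignore_errors=True)

    # Even a late atexit hook is not guaranteed to run (hard kills, interpreter
    # teardown), which is why simply skipping the cleanup and letting the OS
    # reap the temp directory is a defensible alternative.
    atexit.register(_best_effort_cleanup)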
35 changes: 26 additions & 9 deletions py/pysparkling/initializer.py
@@ -11,6 +11,7 @@ class Initializer(object):
     # Flag to inform us whether sparkling jar has been already loaded or not.
     # We don't want to load it more than once.
     __sparkling_jar_loaded = False
+    __extracted_jar_dir = None

     @staticmethod
     def load_sparkling_jar(spark_context):
@@ -56,24 +57,40 @@ def __add_sparkling_jar_to_spark(sc):
         field_jars_value = field_jars.get(file_server)
         field_jars_value.put('sparkling_water_assembly.jar', jvm.java.io.File(sw_jar_file))


     @staticmethod
-    def __get_sw_jar():
+    def __extracted_jar_path():
         import sparkling_water
         sw_pkg_file = sparkling_water.__file__
-        # Extract jar file from zip
-        if '.zip' in sw_pkg_file:
-            from pkg_resources import get_cache_path

+        if Initializer.__extracted_jar_dir is None:
             zip_file = sw_pkg_file[:-len('/sparkling_water/__init__.py')]
-            zip_filename = os.path.basename(zip_file)
-            jar_path = get_cache_path(zip_filename)
-            sw_jar = os.path.abspath("{}/sparkling_water/sparkling_water_assembly.jar".format(jar_path))
+            import tempfile
+            Initializer.__extracted_jar_dir = tempfile.mkdtemp()
             import zipfile
             with zipfile.ZipFile(zip_file) as fzip:
-                fzip.extract('sparkling_water/sparkling_water_assembly.jar', path = jar_path)
-            return sw_jar
+                fzip.extract('sparkling_water/sparkling_water_assembly.jar', path = Initializer.__extracted_jar_dir)
+
+        return os.path.abspath("{}/sparkling_water/sparkling_water_assembly.jar".format(Initializer.__extracted_jar_dir))
+
+    @staticmethod
+    def clean_temp_dir():
+        ## Clean temporary directory containing extracted Sparkling Water Jar
+        if Initializer.__extracted_jar_dir is not None:
+            import shutil
+            shutil.rmtree(Initializer.__extracted_jar_dir)
+
+    @staticmethod
+    def __get_sw_jar():
+        import sparkling_water
+        sw_pkg_file = sparkling_water.__file__
+        # Extract jar file from zip
+        if '.zip' in sw_pkg_file:
+            return Initializer.__extracted_jar_path()
         else:
             from pkg_resources import resource_filename
             return os.path.abspath(resource_filename("sparkling_water", 'sparkling_water_assembly.jar'))

     @staticmethod
     def __find_spark_cl(start_cl, cl_name):
         cl = start_cl
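
For context on the bug this PR fixes: the old __get_sw_jar extracted the assembly jar into a cache path derived from the zip file's name, so pysparkling processes running in parallel on the same node extracted into, and could later delete, the same directory, while tempfile.mkdtemp() gives every process its own private directory. A standalone sketch of the difference, assuming a hypothetical archive payload.zip containing lib/assembly.jar (not the real Sparkling Water artifacts):

    import os
    import tempfile
    import zipfile

    ARCHIVE = 'payload.zip'       # hypothetical archive, standing in for the pysparkling .zip
    MEMBER = 'lib/assembly.jar'   # hypothetical member, standing in for the assembly jar

    def extract_shared(cache_root='/tmp/sw_cache'):
        # Old approach: every process derives the same path from the archive
        # name, so concurrent processes race on extraction and cleanup.
        target = os.path.join(cache_root, os.path.basename(ARCHIVE))
        with zipfile.ZipFile(ARCHIVE) as fzip:
            fzip.extract(MEMBER, path=target)
        return os.path.join(target, MEMBER)

    def extract_private():
        # New approach: mkdtemp() returns a directory unique to this process,
        # so parallel runs on one node can no longer interfere with each other.
        target = tempfile.mkdtemp()
        with zipfile.ZipFile(ARCHIVE) as fzip:
            fzip.extract(MEMBER, path=target)
        return os.path.join(target, MEMBER)

Every call to extract_private() yields a distinct directory, so concurrent runs cannot clobber each other's jar; that per-process directory is exactly what the new __extracted_jar_dir field caches.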