Merge pull request #58 from Tubular/imp/sparkly_options_env_var
FIX respect PYSPARK_SUBMIT_ARGS
mantzouratos committed Jun 14, 2018
2 parents 166ff5c + 848be64 commit 0256351
Showing 4 changed files with 160 additions and 29 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -1,4 +1,6 @@
## 2.4.0
* Respect PYSPARK_SUBMIT_ARGS if it is already set, by appending SparklySession-related options at the end instead of overwriting it.
* Fix additional_options to always override SparklySession.options when a session is initialized
* Fix ujson dependency on environments where redis-py is already installed
* Access or initialize SparklySession through get_or_create classmethod
* Amend `sparkly.functions.switch_case` to accept a user-defined function for
22 changes: 22 additions & 0 deletions docs/source/session.rst
@@ -123,6 +123,28 @@ Tuning options
spark = MySession({'spark.sql.shuffle.partitions': 10})
Tuning options through shell environment
----------------------------------------

**Why**: You want to customize your Spark session in a way that depends on the
hardware specifications of your worker (or driver) machines, so you'd rather
define those options close to where the actual machine specs are requested or
defined. Or you simply want to try out a new configuration without changing
your code. In both cases, you can use the environment variable
``PYSPARK_SUBMIT_ARGS``. Note that any options defined this way override any
conflicting options from your Python code (see the sketch after the example
below).

**For example**:

- ``spark.executor.cores`` to tune the cores used by each executor;
- ``spark.executor.memory`` to tune the memory available to each executor.

.. code-block:: sh

    PYSPARK_SUBMIT_ARGS='--conf "spark.executor.cores=32" --conf "spark.executor.memory=160g"' \
    ./my_spark_app.py
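As a minimal sketch of the precedence rule (the class name and the memory
values below are illustrative only, not part of sparkly), an option set in
code can be superseded from the shell:

.. code-block:: python

    from sparkly import SparklySession


    class MySession(SparklySession):
        # Defined in code, but superseded at launch time if the same key
        # already appears in PYSPARK_SUBMIT_ARGS.
        options = {'spark.executor.memory': '2g'}


    spark = MySession()

Launching this script as
``PYSPARK_SUBMIT_ARGS='--conf "spark.executor.memory=8g"' ./my_spark_app.py``
gives the executors 8g, not 2g, because options already present in
``PYSPARK_SUBMIT_ARGS`` take precedence over the session's own.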
Using UDFs
----------

40 changes: 26 additions & 14 deletions sparkly/session.py
@@ -18,7 +18,7 @@
import signal
import sys

from pyspark import SparkConf, SparkContext
from pyspark import SparkContext
from pyspark.sql import SparkSession

from sparkly.catalog import SparklyCatalog
@@ -28,7 +28,7 @@


class SparklySession(SparkSession):
"""Wrapper around HiveContext to simplify definition of options, packages, JARs and UDFs.
"""Wrapper around SparkSession to simplify definition of options, packages, JARs and UDFs.
Example::
@@ -55,9 +55,11 @@ class MySession(sparkly.SparklySession):
spark.read_ext.cassandra(...)
Attributes:
options (dict[str,str]): Configuration options that are passed to SparkConf.
options (dict[str,str]): Configuration options that are passed to spark-submit.
See `the list of possible options
<https://spark.apache.org/docs/2.1.0/configuration.html#available-properties>`_.
Note that any options set already through PYSPARK_SUBMIT_ARGS will override
these.
repositories (list[str]): List of additional maven repositories for package lookup.
packages (list[str]): Spark packages that should be installed.
See https://spark-packages.org/
@@ -78,33 +80,32 @@ class MySession(sparkly.SparklySession):

def __init__(self, additional_options=None):
os.environ['PYSPARK_PYTHON'] = sys.executable

submit_args = [
# options that were already defined through PYSPARK_SUBMIT_ARGS
# take precedence over SparklySession's
os.environ.get('PYSPARK_SUBMIT_ARGS', '').replace('pyspark-shell', ''),
self._setup_repositories(),
self._setup_packages(),
self._setup_jars(),
self._setup_options(additional_options),
'pyspark-shell',
]
os.environ['PYSPARK_SUBMIT_ARGS'] = ' '.join(filter(None, submit_args))

def _create_spark_context():
spark_conf = SparkConf()
spark_conf.set('spark.sql.catalogImplementation', 'hive')
spark_conf.setAll(self._setup_options(additional_options))
return SparkContext(conf=spark_conf)

# If we are in instant testing mode
if InstantTesting.is_activated():
spark_context = InstantTesting.get_context()

# It's the first run, so we have to create context and demonise the process.
if spark_context is None:
spark_context = _create_spark_context()
spark_context = SparkContext()
if os.fork() == 0: # Detached process.
signal.pause()
else:
InstantTesting.set_context(spark_context)
else:
spark_context = _create_spark_context()
spark_context = SparkContext()

# Init HiveContext
super(SparklySession, self).__init__(spark_context)
@@ -187,11 +188,22 @@ def _setup_jars(self):
return ''

def _setup_options(self, additional_options):
options = list(self.options.items())
options = {}

options.update(self.options)

if additional_options:
options += list(additional_options.items())
options.update(additional_options)

if 'spark.sql.catalogImplementation' not in options:
options['spark.sql.catalogImplementation'] = 'hive'

return sorted(options)
# Here we massage conf properties with the intent to pass them to
# spark-submit; this is convenient as it is unified with the approach
# we take for repos, packages and jars, and it also handles precedence
# of conf properties already defined by the user in a very
# straightforward way (since we always append to PYSPARK_SUBMIT_ARGS)
return ' '.join('--conf "{}={}"'.format(*o) for o in sorted(options.items()))

def _setup_udfs(self):
for name, defn in self.udfs.items():
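For illustration only (this snippet is not part of the diff, and the option
names and values are made up; repositories, packages and jars are omitted for
brevity), the merging performed by `__init__` and `_setup_options` above boils
down to roughly the following:

import os

# Anything the user already put in PYSPARK_SUBMIT_ARGS stays in front, so it
# wins over the session's options (see the test comment below about Spark
# honoring the first --conf it sees for a given key).
os.environ['PYSPARK_SUBMIT_ARGS'] = '--conf "spark.option.b=value_user"'

class_options = {'spark.option.a': 'value_a', 'spark.option.b': 'value_b'}
additional_options = {'spark.option.a': 'value_override'}

merged = dict(class_options)
merged.update(additional_options)  # additional_options always override class options
merged.setdefault('spark.sql.catalogImplementation', 'hive')  # default catalog

conf_flags = ' '.join(
    '--conf "{}={}"'.format(key, value) for key, value in sorted(merged.items())
)
existing = os.environ.get('PYSPARK_SUBMIT_ARGS', '').replace('pyspark-shell', '')
print(' '.join(filter(None, [existing, conf_flags, 'pyspark-shell'])))
# Printed as one line:
# --conf "spark.option.b=value_user" --conf "spark.option.a=value_override"
# --conf "spark.option.b=value_b" --conf "spark.sql.catalogImplementation=hive" pyspark-shell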
125 changes: 110 additions & 15 deletions tests/unit/test_session.py
@@ -21,19 +21,20 @@
except ImportError:
import mock

from pyspark import SparkConf, SparkContext
from pyspark import SparkContext

from sparkly import SparklySession


class TestSparklySession(unittest.TestCase):

maxDiff = None

def setUp(self):
super(TestSparklySession, self).setUp()
self.spark_conf_mock = mock.Mock(spec=SparkConf)
self.spark_context_mock = mock.Mock(spec=SparkContext)

self.patches = [
mock.patch('sparkly.session.SparkConf', self.spark_conf_mock),
mock.patch('sparkly.session.SparkContext', self.spark_context_mock),
]
[p.start() for p in self.patches]
@@ -67,7 +68,11 @@ class _Session(SparklySession):

self.assertEqual(os_mock.environ, {
'PYSPARK_PYTHON': sys.executable,
'PYSPARK_SUBMIT_ARGS': '--packages package1,package2 pyspark-shell',
'PYSPARK_SUBMIT_ARGS': (
'--packages package1,package2 '
'--conf "spark.sql.catalogImplementation=hive" '
'pyspark-shell'
),
})

@mock.patch('sparkly.session.os')
@@ -85,9 +90,12 @@ class _Session(SparklySession):

self.assertEqual(os_mock.environ, {
'PYSPARK_PYTHON': sys.executable,
'PYSPARK_SUBMIT_ARGS':
'PYSPARK_SUBMIT_ARGS': (
'--repositories http://my.maven.repo,http://another.maven.repo '
'--packages package1,package2 pyspark-shell',
'--packages package1,package2 '
'--conf "spark.sql.catalogImplementation=hive" '
'pyspark-shell'
),
})

@mock.patch('sparkly.session.os')
@@ -101,23 +109,71 @@ class _Session(SparklySession):

self.assertEqual(os_mock.environ, {
'PYSPARK_PYTHON': sys.executable,
'PYSPARK_SUBMIT_ARGS': '--jars file_a.jar,file_b.jar pyspark-shell',
'PYSPARK_SUBMIT_ARGS': (
'--jars file_a.jar,file_b.jar '
'--conf "spark.sql.catalogImplementation=hive" '
'pyspark-shell'
),
})

def test_session_with_options(self):
@mock.patch('sparkly.session.os')
def test_session_with_options(self, os_mock):
os_mock.environ = {}

# test options attached to class definition
class _Session(SparklySession):
options = {
'spark.option.a': 'value_a',
'spark.option.b': 'value_b',
}

_Session(additional_options={'spark.option.c': 'value_c'})
_Session()

self.assertEqual(os_mock.environ, {
'PYSPARK_PYTHON': sys.executable,
'PYSPARK_SUBMIT_ARGS': (
'--conf "spark.option.a=value_a" '
'--conf "spark.option.b=value_b" '
'--conf "spark.sql.catalogImplementation=hive" '
'pyspark-shell'
),
})

# test additional_options override/extend options attached to class definition
os_mock.environ = {}

_Session(additional_options={
'spark.option.b': 'value_0',
'spark.option.c': 'value_c',
})

self.spark_conf_mock.return_value.setAll.assert_called_once_with([
('spark.option.a', 'value_a'),
('spark.option.b', 'value_b'),
('spark.option.c', 'value_c'),
])
self.assertEqual(os_mock.environ, {
'PYSPARK_PYTHON': sys.executable,
'PYSPARK_SUBMIT_ARGS': (
'--conf "spark.option.a=value_a" '
'--conf "spark.option.b=value_0" '
'--conf "spark.option.c=value_c" '
'--conf "spark.sql.catalogImplementation=hive" '
'pyspark-shell'
),
})

# test catalog implementation is respected
os_mock.environ = {}

_Session.options = {
'spark.sql.catalogImplementation': 'my_fancy_catalog',
}

_Session()

self.assertEqual(os_mock.environ, {
'PYSPARK_PYTHON': sys.executable,
'PYSPARK_SUBMIT_ARGS': (
'--conf "spark.sql.catalogImplementation=my_fancy_catalog" '
'pyspark-shell'
),
})

@mock.patch('sparkly.session.os')
def test_session_without_packages_jars_and_options(self, os_mock):
@@ -127,7 +183,46 @@ def test_session_without_packages_jars_and_options(self, os_mock):

self.assertEqual(os_mock.environ, {
'PYSPARK_PYTHON': sys.executable,
'PYSPARK_SUBMIT_ARGS': 'pyspark-shell',
'PYSPARK_SUBMIT_ARGS': '--conf "spark.sql.catalogImplementation=hive" pyspark-shell',
})

@mock.patch('sparkly.session.os')
def test_session_appends_to_pyspark_submit_args(self, os_mock):
os_mock.environ = {
'PYSPARK_SUBMIT_ARGS': '--conf "my.conf.here=5g" --and-other-properties',
}

SparklySession()

self.assertEqual(os_mock.environ, {
'PYSPARK_PYTHON': sys.executable,
'PYSPARK_SUBMIT_ARGS': (
'--conf "my.conf.here=5g" --and-other-properties '
'--conf "spark.sql.catalogImplementation=hive" '
'pyspark-shell'
),
})

# test more complicated session
os_mock.environ = {
'PYSPARK_SUBMIT_ARGS': '--conf "my.conf.here=5g" --and-other-properties',
}

class _Session(SparklySession):
options = {'my.conf.here': '10g'}

_Session()

self.assertEqual(os_mock.environ, {
'PYSPARK_PYTHON': sys.executable,
'PYSPARK_SUBMIT_ARGS': (
'--conf "my.conf.here=5g" --and-other-properties '
# Note that spark honors the first conf it sees when multiple
# are defined
'--conf "my.conf.here=10g" '
'--conf "spark.sql.catalogImplementation=hive" '
'pyspark-shell'
),
})

def test_broken_udf(self):

