Merge pull request #44 from Tubular/feature/custom-repositories
IMP allow custom maven repositories
skliarpawlo committed Jul 5, 2017
2 parents c892c7e + ab7b107 commit 14c38c0
Showing 6 changed files with 73 additions and 8 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,6 @@
## 2.1.0
* Add ability to specify custom maven repositories.

## 2.0.4
* Make it possible to override default value of spark.sql.catalogImplementation

25 changes: 25 additions & 0 deletions docs/source/session.rst
@@ -66,6 +66,30 @@ declared as part of the job.
df = spark.read_ext.by_url('cassandra://<cassandra-host>/<db>/<table>?consistency=QUORUM')
Custom Maven repositories
-------------------------

**Why**: If you have a private Maven repository, this is how to point Spark to it when it performs a package lookup.
Dependencies are resolved in the following order:

- Local cache
- Custom Maven repositories (if specified)
- Maven Central

**For example**: Let's assume your Maven repository is available at http://my.repo.net/maven
and a Spark package with the identifier ``my.corp:spark-handy-util:0.0.1`` is published there.
You can install it into a Spark session like this:

.. code-block:: python

    from sparkly import SparklySession

    class MySession(SparklySession):
        repositories = ['http://my.repo.net/maven']
        packages = ['my.corp:spark-handy-util:0.0.1']

    spark = MySession()

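Under the hood, sparkly forwards these settings to PySpark via ``PYSPARK_SUBMIT_ARGS``,
using spark-submit's ``--repositories`` and ``--packages`` flags. As a rough sketch
(the value below is illustrative, continuing the example above, rather than literal output):

.. code-block:: python

    import os

    # After MySession() has started, the submit arguments contain the custom
    # repository followed by the package.
    print(os.environ['PYSPARK_SUBMIT_ARGS'])
    # --repositories http://my.repo.net/maven --packages my.corp:spark-handy-util:0.0.1 pyspark-shell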

Tuning options
--------------

@@ -131,6 +155,7 @@ We think it's too many actions for such simple functionality.
spark.sql('SELECT collect_max(amount) FROM my_data GROUP BY ...')
spark.sql('SELECT my_udf(amount) FROM my_data')
API documentation
-----------------

2 changes: 1 addition & 1 deletion sparkly/__init__.py
@@ -19,4 +19,4 @@
assert SparklySession


__version__ = '2.0.4'
__version__ = '2.1.0'
20 changes: 16 additions & 4 deletions sparkly/session.py
@@ -36,6 +36,7 @@ class SparklySession(SparkSession):
class MySession(sparkly.SparklySession):
    options = {'spark.sql.shuffle.partitions': '2000'}
    repositories = ['http://packages.confluent.io/maven/']
    packages = ['com.databricks:spark-csv_2.10:1.4.0']
    jars = ['../path/to/brickhouse-0.7.1.jar']
    udfs = {
@@ -51,6 +52,7 @@ class MySession(sparkly.SparklySession):
options (dict[str,str]): Configuration options that are passed to SparkConf.
See `the list of possible options
<https://spark.apache.org/docs/2.1.0/configuration.html#available-properties>`_.
repositories (list[str]): List of additional maven repositories for package lookup.
packages (list[str]): Spark packages that should be installed.
See https://spark-packages.org/
jars (list[str]): Full paths to jar files that we want to include to the session.
@@ -64,13 +66,17 @@ class MySession(sparkly.SparklySession):
    packages = []
    jars = []
    udfs = {}
    repositories = []

    def __init__(self, additional_options=None):
        os.environ['PYSPARK_PYTHON'] = sys.executable
        os.environ['PYSPARK_SUBMIT_ARGS'] = '{packages} {jars} pyspark-shell'.format(
            packages=self._setup_packages(),
            jars=self._setup_jars(),
        )
        submit_args = [
            self._setup_repositories(),
            self._setup_packages(),
            self._setup_jars(),
            'pyspark-shell',
        ]
        os.environ['PYSPARK_SUBMIT_ARGS'] = ' '.join(filter(None, submit_args))

        # Init SparkContext
        spark_conf = SparkConf()
@@ -117,6 +123,12 @@ def has_jar(self, jar_name):
"""
return any(jar for jar in self.jars if jar_name in jar)

def _setup_repositories(self):
if self.repositories:
return '--repositories {}'.format(','.join(self.repositories))
else:
return ''

def _setup_packages(self):
if self.packages:
return '--packages {}'.format(','.join(self.packages))
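For reference, a small standalone sketch of the argument assembly above (the option strings are illustrative; only the `filter(None, ...)` join mirrors the new `__init__`) shows why empty groups are dropped before `PYSPARK_SUBMIT_ARGS` is built:

# Illustrative values; in SparklySession they come from _setup_repositories(),
# _setup_packages() and _setup_jars().
repositories_arg = ''                           # no custom repositories configured
packages_arg = '--packages package1,package2'
jars_arg = ''                                   # no extra jars

submit_args = [repositories_arg, packages_arg, jars_arg, 'pyspark-shell']

# Without the filter, the join would yield ' --packages package1,package2  pyspark-shell'
# (leading and doubled spaces); filtering out empty strings keeps the value clean.
print(' '.join(filter(None, submit_args)))
# --packages package1,package2 pyspark-shell

This matches the whitespace cleanup visible in the updated unit-test expectations below.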
5 changes: 5 additions & 0 deletions tests/integration/base.py
@@ -26,6 +26,11 @@ class SparklyTestSession(SparklySession):
        'org.elasticsearch:elasticsearch-spark-20_2.11:5.1.1',
        'org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0',
        'mysql:mysql-connector-java:5.1.39',
        'io.confluent:kafka-avro-serializer:3.0.1',
    ]

    repositories = [
        'http://packages.confluent.io/maven/',
    ]

    jars = [
26 changes: 23 additions & 3 deletions tests/unit/test_session.py
@@ -67,7 +67,27 @@ class _Session(SparklySession):

        self.assertEqual(os_mock.environ, {
            'PYSPARK_PYTHON': sys.executable,
            'PYSPARK_SUBMIT_ARGS': '--packages package1,package2 pyspark-shell',
            'PYSPARK_SUBMIT_ARGS': '--packages package1,package2 pyspark-shell',
        })

    @mock.patch('sparkly.session.os')
    def test_session_with_repositories(self, os_mock):
        os_mock.environ = {}

        class _Session(SparklySession):
            packages = ['package1', 'package2']
            repositories = [
                'http://my.maven.repo',
                'http://another.maven.repo',
            ]

        _Session()

        self.assertEqual(os_mock.environ, {
            'PYSPARK_PYTHON': sys.executable,
            'PYSPARK_SUBMIT_ARGS':
                '--repositories http://my.maven.repo,http://another.maven.repo '
                '--packages package1,package2 pyspark-shell',
        })

    @mock.patch('sparkly.session.os')
@@ -81,7 +101,7 @@ class _Session(SparklySession):

        self.assertEqual(os_mock.environ, {
            'PYSPARK_PYTHON': sys.executable,
            'PYSPARK_SUBMIT_ARGS': ' --jars file_a.jar,file_b.jar pyspark-shell',
            'PYSPARK_SUBMIT_ARGS': '--jars file_a.jar,file_b.jar pyspark-shell',
        })

    def test_session_with_options(self):
@@ -107,7 +127,7 @@ def test_session_without_packages_jars_and_options(self, os_mock):

        self.assertEqual(os_mock.environ, {
            'PYSPARK_PYTHON': sys.executable,
            'PYSPARK_SUBMIT_ARGS': ' pyspark-shell',
            'PYSPARK_SUBMIT_ARGS': 'pyspark-shell',
        })

    def test_broken_udf(self):
