Sparkly Instant Testing (#49)
* Add instant iterative development mode. `sparkly-testing --help` for more details.
* Use in-memory db for Hive Metastore in `SparklyTest` (faster tests).
* `spark.sql.shuffle.partitions = 4` for `SparklyTest` (faster tests).
* `spark.sql.warehouse.dir = <random tmp dir>` for `SparklyTest` (no side effects)
drudim committed Aug 3, 2017
1 parent b64e06c commit f34fd1d
Showing 10 changed files with 439 additions and 23 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,9 @@
# 2.2.0
* Add instant iterative development mode. `sparkly-testing --help` for more details.
* Use in-memory db for Hive Metastore in `SparklyTest` (faster tests).
* `spark.sql.shuffle.partitions = 4` for `SparklyTest` (faster tests).
* `spark.sql.warehouse.dir = <random tmp dir>` for `SparklyTest` (no side effects)
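Roughly, the "faster tests / no side effects" settings above correspond to Spark options like the following sketch. The exact keys `SparklyTest` uses — in particular the in-memory Derby metastore URL — are assumptions for illustration, not copied from this commit:

```python
import tempfile

# Illustrative options only; SparklyTest's actual configuration may differ.
fast_test_options = {
    # In-memory Derby database for the Hive Metastore instead of an on-disk metastore_db.
    'spark.hadoop.javax.jdo.option.ConnectionURL':
        'jdbc:derby:memory:sparkly-test-metastore;create=true',
    # Fewer shuffle partitions -> small test jobs finish faster.
    'spark.sql.shuffle.partitions': '4',
    # A throwaway warehouse directory -> no side effects between test runs.
    'spark.sql.warehouse.dir': tempfile.mkdtemp(),
}
```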

## 2.1.1
* Fix: remove backtick quoting from catalog utils to ease work with different databases.

7 changes: 4 additions & 3 deletions Dockerfile
@@ -27,7 +27,7 @@ RUN curl -s http://d3kbcqa49mib13.cloudfront.net/spark-2.1.0-bin-hadoop2.7.tgz |
RUN cd /usr/local && ln -s spark-2.1.0-bin-hadoop2.7 spark

ENV SPARK_HOME "/usr/local/spark/"
ENV PYTHONPATH "/usr/local/spark/python/lib/pyspark.zip:/usr/local/spark/python/lib/py4j-0.10.4-src.zip"
ENV PYTHONPATH "/usr/local/spark/python/lib/pyspark.zip:/usr/local/spark/python/lib/py4j-0.10.4-src.zip:/opt/sparkly"
ENV SPARK_TESTING true

# Install Python testing utils
@@ -38,11 +38,12 @@ RUN python3 -m pip install tox==2.4.1
COPY spark.log4j.properties /usr/local/spark/conf/log4j.properties

# Make integration tests faster
RUN /usr/local/spark/bin/spark-shell --packages=\
RUN /usr/local/spark/bin/spark-shell --repositories=http://packages.confluent.io/maven/ --packages=\
datastax:spark-cassandra-connector:2.0.0-M2-s_2.11,\
org.elasticsearch:elasticsearch-spark-20_2.11:5.1.1,\
org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0,\
mysql:mysql-connector-java:5.1.39
mysql:mysql-connector-java:5.1.39,\
io.confluent:kafka-avro-serializer:3.0.1

# Python env
RUN apt-get update && apt-get install -y git
75 changes: 75 additions & 0 deletions bin/sparkly-testing
@@ -0,0 +1,75 @@
#!/usr/bin/env python

#
# Copyright 2017 Tubular Labs, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import argparse
import logging
import sys
import textwrap

from sparkly.instant_testing import InstantTesting


logging.basicConfig(
    stream=sys.stderr,
    level=logging.INFO,
    format='%(levelname)s %(message)s',
)


if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description=textwrap.dedent(
            """\
            Sparkly Instant Testing.

            The tool speeds up iterative development of Spark-based tests.
            It keeps a JVM with an initialised SparkContext running between test sessions.

            Usage:
                sparkly-testing up
                py.test path/to/test_integration_with_pyspark.py  # slow (first run)
                py.test path/to/test_integration_with_pyspark.py  # fast (next runs)
                sparkly-testing down

            To change SparkContext options or to add new jars/packages, call:
                sparkly-testing refresh
            """,
        )
    )

    sub_commands = parser.add_subparsers()

    # Instant testing mode.
    sub_commands.add_parser(
        name='up',
        help='Activate instant testing mode.',
    ).set_defaults(func=lambda _: InstantTesting.activate())

    sub_commands.add_parser(
        name='down',
        help='Deactivate instant testing mode.',
    ).set_defaults(func=lambda _: InstantTesting.deactivate())

    sub_commands.add_parser(
        name='refresh',
        help='Refresh SparkContext options or add new jars/packages.',
    ).set_defaults(func=lambda _: InstantTesting.deactivate() or InstantTesting.activate())

    args = parser.parse_args()
    args.func(args)
29 changes: 29 additions & 0 deletions docs/source/testing.rst
@@ -49,6 +49,35 @@ There are two main test cases available in Sparkly:
...
Instant Iterative Development
-----------------------------

The slowest part of Spark integration testing is context initialisation.
``SparklyGlobalSessionTest`` lets you share the same Spark context between different test cases,
but it still kills the context at the end of the test session.
This is especially annoying if you work in `TDD fashion <https://en.wikipedia.org/wiki/Test-driven_development>`_:
on each run you have to wait 25-30 seconds until a new context is ready.
We added a tool to preserve the Spark context between test runs.

.. code-block::

    # Activate instant testing mode.
    sparkly-testing up

    # The first run is slow (context is created).
    py.test tests/my_integration_test_with_sparkly.py

    # The second run and all after it are fast (context is reused).
    py.test tests/my_integration_test_with_sparkly.py

    # Deactivate instant testing mode (when you are done with testing).
    sparkly-testing down

.. note::
    If you change the ``SparklySession`` definition (new options, jars or packages),
    you have to refresh the context via ``sparkly-testing refresh``.
    However, you don't need to refresh the context if only ``udfs`` have changed.
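
For orientation, here is a minimal sketch of the kind of test this mode speeds up. It assumes the
``SparklyGlobalSessionTest`` base class from ``sparkly.testing`` together with its ``session`` class
attribute and ``self.spark`` helper; adapt the names to your own project.

.. code-block:: python

    from sparkly import SparklySession
    from sparkly.testing import SparklyGlobalSessionTest


    class MySession(SparklySession):
        # Extra session options, jars or packages would be declared here.
        options = {'spark.sql.shuffle.partitions': '4'}


    class TestWordCount(SparklyGlobalSessionTest):
        session = MySession  # reused across test cases (and, with instant testing, across runs)

        def test_count(self):
            df = self.spark.createDataFrame([('a',), ('b',), ('a',)], ['word'])
            self.assertEqual(df.filter(df.word == 'a').count(), 2)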


Fixtures
--------

2 changes: 1 addition & 1 deletion sparkly/__init__.py
@@ -19,4 +19,4 @@
assert SparklySession


__version__ = '2.1.1'
__version__ = '2.2.0'
159 changes: 159 additions & 0 deletions sparkly/instant_testing.py
@@ -0,0 +1,159 @@
#
# Copyright 2017 Tubular Labs, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import json
import logging
import os
import signal
import tempfile

from py4j.java_gateway import java_import
from pyspark import SparkContext
from pyspark.java_gateway import launch_gateway


logger = logging.getLogger(__name__)


class InstantTesting(object):
    """The set of tools to run tests using Spark Context running in the background.

    Implementation:
        We create a lock file that will contain Python gateway port (exposed by JVM).

        On the first run:
            - initialise Spark Context as usual;
            - write Python gateway port to the lock file;
            - fork current process.

        On the second run:
            - connect to the background JVM process using Python gateway port from the lock file;
            - recover Spark Context from JVM.
    """
    LOCK_FILE_PATH = os.path.join(tempfile.gettempdir(), 'sparkly_instant_testing_lock')

    @classmethod
    def activate(cls):
        """Activate instant testing mode."""
        if os.path.exists(cls.LOCK_FILE_PATH):
            logger.error('Instant testing mode is already activated, deactivate it first.')
        else:
            with open(cls.LOCK_FILE_PATH, 'w'):
                logger.info('Instant testing mode has been activated.')

    @classmethod
    def deactivate(cls):
        """Deactivate instant testing mode."""
        if not os.path.exists(cls.LOCK_FILE_PATH):
            logger.error('Instant testing mode is not activated, activate it first.')
        else:
            try:
                with open(cls.LOCK_FILE_PATH) as lock:
                    state = lock.read()
                    if state:
                        session_pid = json.loads(state)['session_pid']
                        try:
                            os.kill(session_pid, signal.SIGTERM)
                        except OSError:
                            logger.exception(
                                'Cannot kill background SparkContext (pid %d)', session_pid,
                            )
                        else:
                            logger.info(
                                'Killed background SparkContext (pid %d)', session_pid,
                            )
            finally:
                try:
                    os.remove(cls.LOCK_FILE_PATH)
                except OSError:
                    logger.exception('Cannot remove lock file: %s', cls.LOCK_FILE_PATH)

                logger.info('Instant testing mode has been deactivated.')

    @classmethod
    def is_activated(cls):
        """Check if instant testing has been activated before.

        Returns:
            bool
        """
        return os.path.exists(cls.LOCK_FILE_PATH)

    @classmethod
    def set_context(cls, spark_context):
        """Set the given spark context for instant testing.

        Args:
            spark_context (pyspark.SparkContext)
        """
        assert cls.is_activated()

        gateway_port = spark_context._gateway.java_gateway_server.getListeningPort()

        # pid of the python process that holds JVM with running Spark Context.
        session_pid = os.getpid()

        with open(cls.LOCK_FILE_PATH, 'w') as lock:
            json.dump({'gateway_port': gateway_port, 'session_pid': session_pid}, lock)
            logger.info(
                'Successfully set spark context for the instant testing [pid=%s, gateway=%s]',
                session_pid, gateway_port
            )

    @classmethod
    def get_context(cls):
        """Get the current global spark context.

        Returns:
            pyspark.SparkContext or None (if it wasn't set before).
        """
        assert cls.is_activated()

        state = None

        with open(cls.LOCK_FILE_PATH) as lock:
            serialised_state = lock.read()
            if serialised_state:
                try:
                    state = json.loads(serialised_state)
                except ValueError:
                    logger.error(
                        'Unable to deserialize lock file. Try to reactivate instant testing. '
                        'The broken content is: %s',
                        serialised_state,
                    )

        if state:
            logger.info(
                'Recovering context for the instant testing [pid=%s, gateway=%s]',
                state['session_pid'], state['gateway_port'],
            )

            os.environ['PYSPARK_GATEWAY_PORT'] = str(state['gateway_port'])
            gateway = launch_gateway()
            java_import(gateway.jvm, 'org.apache.spark.SparkContext')
            jvm_spark_context = gateway.jvm.SparkContext.getOrCreate()
            jvm_java_spark_context = gateway.jvm.JavaSparkContext(jvm_spark_context)

            SparkContext._gateway = gateway
            SparkContext._jvm = gateway.jvm

            return SparkContext(
                appName=jvm_spark_context.appName(),
                master=jvm_spark_context.master(),
                gateway=gateway,
                jsc=jvm_java_spark_context,
            )
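
The lock file that ties these methods together is just a tiny JSON document; a minimal sketch with made-up values (the real ones come from the running gateway and process):

```python
import json

# Illustrative content of the sparkly_instant_testing_lock file after set_context()
# (both values are made up for the example):
example_state = {'gateway_port': 25333, 'session_pid': 12345}
print(json.dumps(example_state))

# deactivate() reads 'session_pid' and sends SIGTERM to that background process;
# get_context() exports 'gateway_port' as PYSPARK_GATEWAY_PORT and reattaches to the running JVM.
```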
26 changes: 21 additions & 5 deletions sparkly/session.py
100644 → 100755
@@ -15,12 +15,14 @@
#

import os
import signal
import sys

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

from sparkly.catalog import SparklyCatalog
from sparkly.instant_testing import InstantTesting
from sparkly.reader import SparklyReader
from sparkly.writer import attach_writer_to_dataframe

@@ -78,11 +80,25 @@ def __init__(self, additional_options=None):
        ]
        os.environ['PYSPARK_SUBMIT_ARGS'] = ' '.join(filter(None, submit_args))

        # Init SparkContext
        spark_conf = SparkConf()
        spark_conf.set('spark.sql.catalogImplementation', 'hive')
        spark_conf.setAll(self._setup_options(additional_options))
        spark_context = SparkContext(conf=spark_conf)
        def _create_spark_context():
            spark_conf = SparkConf()
            spark_conf.set('spark.sql.catalogImplementation', 'hive')
            spark_conf.setAll(self._setup_options(additional_options))
            return SparkContext(conf=spark_conf)

        # If we are in instant testing mode
        if InstantTesting.is_activated():
            spark_context = InstantTesting.get_context()

            # It's the first run, so we have to create the context and daemonise the process.
            if spark_context is None:
                spark_context = _create_spark_context()
                if os.fork() == 0:  # Detached process.
                    signal.pause()
                else:
                    InstantTesting.set_context(spark_context)
        else:
            spark_context = _create_spark_context()

        # Init HiveContext
        super(SparklySession, self).__init__(spark_context)
