Merge pull request #45 from Tubular/features/contribute_internal_features

IMP migrate useful internal features
mantzouratos committed Aug 18, 2017
2 parents caa398c + f8b1121 commit 1dc6d84
Showing 25 changed files with 3,344 additions and 45 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,11 @@
## 2.3.0
* Overwrite existing tables in the metastore
* Add functions module and provide switch_case column generation and multijoin
* Add implicit test target import and extended assertEqual variation
* Support writing to redis:// and rediss:// URLs
* Add LRU cache that persists DataFrames under the hood
* Add ability to check whether a complex type defines specific fields

## 2.2.1
* `spark.sql.shuffle.partitions` in `SparklyTest` should be set to string,
because `int` value breaks integration testing in Spark 2.0.2.
4 changes: 2 additions & 2 deletions Dockerfile
@@ -30,8 +30,8 @@ ENV SPARK_HOME "/usr/local/spark/"
ENV PYTHONPATH "/usr/local/spark/python/lib/pyspark.zip:/usr/local/spark/python/lib/py4j-0.10.4-src.zip:/opt/sparkly"
ENV SPARK_TESTING true

# Install Python testing utils
RUN apt-get update && apt-get install -y python python3-pip
# Install Python development & testing utils
RUN apt-get update && apt-get install -y python python-dev python3-pip
RUN python3 -m pip install tox==2.4.1

# Remove noisy spark logging
11 changes: 11 additions & 0 deletions docker-compose.yml
@@ -27,6 +27,8 @@ services:
condition: service_healthy
mysql.docker:
condition: service_healthy
redis.docker:
condition: service_healthy
volumes:
- .:/opt/sparkly/

@@ -41,6 +43,8 @@ services:
condition: service_healthy
mysql.docker:
condition: service_healthy
redis.docker:
condition: service_healthy

cassandra.docker:
image: cassandra:2.1.13
@@ -82,6 +86,13 @@ services:
healthcheck:
test: ps ax | grep kafka

redis.docker:
image: redis:3.2.4
expose:
- 6379
healthcheck:
test: ps ax | grep redis

zookeeper.docker:
image: confluent/zookeeper
expose:
3 changes: 2 additions & 1 deletion docs/source/catalog.rst
@@ -31,7 +31,7 @@ or set it dynamically via ``SparklySession`` options:
Tables management
-----------------

**Why:** sometimes you need more than just to create a table.
**Why:** you need to check if tables exist, rename them, drop them, or even overwrite existing aliases in your catalog.

.. code-block:: python
@@ -42,6 +42,7 @@ Tables management
assert spark.catalog_ext.has_table('my_table') in {True, False}
spark.catalog_ext.rename_table('my_table', 'my_new_table')
spark.catalog_ext.create_table('my_new_table', path='s3://my/parquet/data', source='parquet', mode='overwrite')
spark.catalog_ext.drop_table('my_new_table')
Table properties management
14 changes: 14 additions & 0 deletions docs/source/functions.rst
@@ -0,0 +1,14 @@
Column and DataFrame Functions
==============================

A counterpart of pyspark.sql.functions providing useful shortcuts:

- a cleaner alternative to chaining together multiple when/otherwise statements.
- an easy way to join multiple dataframes at once and disambiguate fields with the same name.

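For a flavour of the module, a minimal sketch follows. It assumes ``switch_case`` accepts a switch column name plus a value-to-result mapping and ``multijoin`` accepts a list of dataframes with the usual ``on``/``how`` join arguments; the column names and data are hypothetical, and the generated API reference below is authoritative.

.. code-block:: python

    from sparkly import SparklySession
    from sparkly.functions import multijoin, switch_case

    spark = SparklySession()

    sizes = spark.createDataFrame([('a', 'S'), ('b', 'XL')], ['id', 'size'])
    prices = spark.createDataFrame([('a', 1.0), ('b', 2.0)], ['id', 'price'])

    # Map values of the `size` column to numeric codes in one expression
    # instead of chaining several when/otherwise calls (signature assumed).
    sizes = sizes.withColumn(
        'size_code',
        switch_case('size', {'S': 1, 'M': 2, 'L': 3}, default=0),
    )

    # Join both dataframes on `id` in a single call (signature assumed).
    combined = multijoin([sizes, prices], on='id', how='inner')
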

API documentation
-----------------

.. automodule:: sparkly.functions
:members:
8 changes: 5 additions & 3 deletions docs/source/index.rst
@@ -39,6 +39,7 @@ A brief tour on Sparkly features:
spark.catalog_ext.get_table_properties('my_custom_table')
# Easy integration testing with Fixtures and base test classes.
from pyspark.sql import types as T
from sparkly.testing import SparklyTest
@@ -52,9 +53,9 @@ A brief tour on Sparkly features:
def test_job_works_with_mysql(self):
df = self.spark.read_ext.by_url('mysql://<my-testing-host>/<test-db>/<test-table>?user=<test-user>&password=<test-password>')
res_df = my_shiny_script(df)
self.assertDataFrameEqual(
res_df,
{'fieldA': 'DataA', 'fieldB': 'DataB', 'fieldC': 'DataC'},
self.assertRowsEqual(
res_df.collect(),
[T.Row(fieldA='DataA', fieldB='DataB', fieldC='DataC')],
)
.. toctree::
@@ -64,6 +65,7 @@ A brief tour on Sparkly features:
reader_and_writer
catalog
testing
functions
utils
license

33 changes: 33 additions & 0 deletions docs/source/reader_and_writer.rst
@@ -172,6 +172,39 @@ Basically, it's just a high level api on top of the native
'rewriteBatchedStatements': 'true', # improves write throughput dramatically
})
.. _redis:

Redis
-----

Sparkly provides a writer for Redis that is built on top of the official redis python library
`redis-py <https://github.com/andymccurdy/redis-py>`_ .
It is currently capable of exporting your DataFrame as a JSON blob per row or group of rows.

.. note::
- To interact with Redis, ``sparkly`` needs the ``redis`` library. You can get it via:
``pip install sparkly[redis]``

.. code-block:: python
import json
from sparkly import SparklySession
spark = SparklySession()
# Write JSON.gz data indexed by col1.col2 that will expire in a day
df.write_ext.redis(
host='localhost',
port=6379,
key_by=['col1', 'col2'],
exclude_key_columns=True,
expire=24 * 60 * 60,
compression='gzip',
)
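To illustrate what ends up in Redis, here is a hypothetical read-back of one of the rows written above using plain ``redis-py``. The key layout (the values of the key columns joined with a dot, e.g. ``'valueA.valueB'``) and the gzipped-JSON payload are assumptions based on the comment and options in the snippet above, not a documented contract.

.. code-block:: python

    import gzip
    import json

    import redis

    client = redis.StrictRedis(host='localhost', port=6379)

    # Key layout assumed: values of `col1` and `col2` joined with a dot.
    blob = client.get('valueA.valueB')

    # Payload assumed to be a gzip-compressed JSON document of the remaining
    # columns (the key columns were excluded via `exclude_key_columns=True`).
    row = json.loads(gzip.decompress(blob).decode('utf-8'))
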
.. _universal-reader-and-writer:

Universal reader/writer
90 changes: 74 additions & 16 deletions docs/source/testing.rst
@@ -10,6 +10,8 @@ There are two main test cases available in Sparkly:

.. code-block:: python
from pyspark.sql import types as T
from sparkly import SparklySession
from sparkly.test import SparklyTest
@@ -21,22 +23,12 @@ There are two main test cases available in Sparkly:
df = self.spark.read_ext.by_url(...)
# Compare all fields
self.assertDataFrameEqual(
actual_df=df,
expected_data=[
{'col1': 'row1', 'col2': 1},
{'col1': 'row2', 'col2': 2},
],
)
# Compare a subset of fields
self.assertDataFrameEqual(
actual_df=df,
expected_data=[
{'col1': 'row1'},
{'col1': 'row2'},
self.assertRowsEqual(
df.collect(),
[
T.Row(col1='row1', col2=1),
T.Row(col1='row2', col2=2),
],
fields=['col1'],
)
...
@@ -49,6 +41,72 @@ There are two main test cases available in Sparkly:
...
DataFrame Assertions
--------------------

Asserting that the dataframe produced by your transformation is equal to some expected
output can be unnecessarily complicated at times. Common issues include:

- Ignoring the order in which elements appear in an array.
This could be particularly useful when that array is generated as part of a
``groupBy`` aggregation, and you only care about all elements being part of the end
result, rather than the order in which Spark encountered them.
- Comparing floats that could be arbitrarily nested in complicated datatypes
within a given tolerance; exact matching is either fragile or impossible.
- Ignoring whether a field of a complex datatype is nullable.
Spark infers this based on the applied transformations, but it is oftentimes
inaccurate. As a result, assertions on complex data types might fail, even
though in theory they shouldn't have.
- Having rows with different field names compare equal if the values match in
alphabetical order of the names (see unit tests for example).
- Unhelpful diffs in case of mismatches.

Sparkly addresses these issues by providing ``assertRowsEqual``:

.. code-block:: python
from pyspark.sql import types as T
from sparkly import SparklySession
from sparkly.test import SparklyTest
def my_transformation(spark):
return spark.createDataFrame(
data=[
('row1', {'field': 'value_1'}, [1.1, 2.2, 3.3]),
('row2', {'field': 'value_2'}, [4.1, 5.2, 6.3]),
],
schema=T.StructType([
T.StructField('id', T.StringType()),
T.StructField(
'st',
T.StructType([
T.StructField('field', T.StringType()),
]),
),
T.StructField('ar', T.ArrayType(T.FloatType())),
]),
)
class MyTestCase(SparklyTest):
session = SparklySession
def test(self):
df = my_transformation(self.spark)
self.assertRowsEqual(
df.collect(),
[
T.Row(id='row2', st=T.Row(field='value_2'), ar=[6.0, 5.0, 4.0]),
T.Row(id='row1', st=T.Row(field='value_1'), ar=[2.0, 3.0, 1.0]),
],
atol=0.5,
)
Instant Iterative Development
-----------------------------

@@ -88,7 +146,7 @@ There are several storages supported in Sparkly:
- Elastic
- Cassandra (requires ``cassandra-driver``)
- Mysql (requires ``PyMySql``)
_ Kafka (requires ``kafka-python``)
- Kafka (requires ``kafka-python``)

.. code-block:: python
18 changes: 18 additions & 0 deletions requirements.txt
@@ -0,0 +1,18 @@
#
# Copyright 2017 Tubular Labs, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

pylru==1.0.9
six==1.10.0
2 changes: 2 additions & 0 deletions requirements_extras.txt
@@ -17,3 +17,5 @@
cassandra-driver==3.7.1
PyMySQL==0.7.9
kafka-python==1.2.2
redis==2.10.5
ujson==1.35
3 changes: 2 additions & 1 deletion setup.py
@@ -92,6 +92,7 @@
install_requires=requirements,
extras_require={
'kafka': ['kafka-python>=1.2.2,<1.3'],
'test': ['cassandra-driver>=3.7,<3.8', 'PyMySQL>=0.7,<0.8', 'kafka-python>=1.2.2,<1.3'],
'redis': ['redis>=2.10,<3', 'ujson>=1.33,<2'],
'test': ['cassandra-driver>=3.7,<3.8', 'PyMySQL>=0.7,<0.8', 'kafka-python>=1.2.2,<1.3', 'redis>=2.10,<3', 'ujson>=1.33,<2'],
},
)
2 changes: 1 addition & 1 deletion sparkly/__init__.py
@@ -19,4 +19,4 @@
assert SparklySession


__version__ = '2.2.1'
__version__ = '2.3.0'
58 changes: 58 additions & 0 deletions sparkly/catalog.py
@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import uuid


class SparklyCatalog(object):
@@ -26,6 +27,63 @@ def __init__(self, spark):
"""
self._spark = spark

def create_table(self, table_name, path=None, source=None, schema=None, **options):
"""Create table in the metastore.
Extend ``SparkSession.Catalog.createExternalTable`` by accepting
a ``mode='overwrite'`` option which creates the table even if a
table with the same name already exists. All other args are
exactly the same.
Note:
If the table exists, create two unique names, one for the
new and one for the old instance, then try to swap names
and drop the "old" instance. If any step fails, the metastore
might be left in a broken state.
Args:
mode (str): if set to ``'overwrite'``, drop any table of the
same name from the metastore. Given as a kwarg. The default
is to error out if the table already exists.
Returns:
pyspark.sql.DataFrame: DataFrame associated with the created
table.
"""
overwrite_existing_table = (
options.pop('mode', '').lower() == 'overwrite' and
self.has_table(table_name)
)

def _append_unique_suffix(*args):
return '__'.join(args + (uuid.uuid4().hex, ))

if overwrite_existing_table:
new_table_name = _append_unique_suffix(table_name, 'new')
else:
new_table_name = table_name

if hasattr(self._spark.catalog, 'createTable'):
createTable = self._spark.catalog.createTable
else: # before Spark 2.2
createTable = self._spark.catalog.createExternalTable

df = createTable(
new_table_name,
path=path,
source=source,
schema=schema,
**options
)

if overwrite_existing_table:
old_table_name = _append_unique_suffix(table_name, 'old')
self.rename_table(table_name, old_table_name)
self.rename_table(new_table_name, table_name)
self.drop_table(old_table_name)

return df

def drop_table(self, table_name, checkfirst=True):
"""Drop table from the metastore.
