FEAT: PySpark backend
This adds a PySpark backend for ibis. It differs from the Spark backend,
which compiles an ibis expr to a SQL string: the PySpark backend instead
compiles the ibis expr to pyspark.DataFrame exprs (a usage sketch follows
the change summary below).
Author: Li Jin <ice.xelloss@gmail.com>
Author: Hyonjee Joo <5000208+hjoo@users.noreply.github.com>

Closes ibis-project#1913 from icexelloss/pyspark-backend-prototype and squashes the following commits:

213e371 [Li Jin] Add pyspark/__init__.py
8f1c35e [Li Jin] Address comments
f173425 [Li Jin] Fix tests
0969b0a [Li Jin] Skip unimplemented tests
1f9409b [Li Jin] Change pyspark imports to optional
26b041c [Li Jin] Add importskip
108ccd8 [Li Jin] Add scope
e00dc00 [Li Jin] Address PR comments
4764a4e [Li Jin] Add pyspark marker to setup.cfg
7cc2a9e [Li Jin] Remove dead code
72b45f8 [Li Jin] Fix rebase errors
9ad663f [Hyonjee Joo] implement pyspark numeric operations to pass all/test_numeric.py (#9)
675a89f [Li Jin] Implement compiler rules to pass all/test_aggregation.py
215c0d9 [Li Jin] Link existing tests with PySpark backend (#7)
88705fe [Li Jin] Implement basic join
c4a2b79 [Hyonjee Joo] add pyspark compile rule for greatest, fix bug with selection (#4)
fa4ad23 [Li Jin] Implement basic aggregation, group_by and window (#3)
54c2f2d [Li Jin] Initial commit of pyspark DataFrame backend (#1)
icexelloss authored and cpcloud committed Aug 22, 2019
1 parent b748593 commit 99a2f2e
Showing 14 changed files with 822 additions and 11 deletions.
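
As a usage sketch of the new backend (illustrative only: the local session,
the view name 't', and the sample data are assumptions, not part of this
commit):

from pyspark.sql import SparkSession

import ibis

# Assumed setup; any existing SparkSession with a registered view works.
session = SparkSession.builder.master('local[1]').getOrCreate()
session.createDataFrame(
    [(1, 'a'), (2, 'b')], ['x', 'y']
).createOrReplaceTempView('t')

client = ibis.pyspark.connect(session)
t = client.table('t')       # build ibis expressions as usual
expr = t.x.sum()

client.compile(t)           # a pyspark.sql.DataFrame, not a SQL string
client.execute(expr)        # 3, computed through the DataFrame API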
3 changes: 3 additions & 0 deletions ibis/__init__.py
@@ -57,6 +57,9 @@
    # pip install ibis-framework[spark]
    import ibis.spark.api as spark  # noqa: F401

with suppress(ImportError):
    import ibis.pyspark.api as pyspark  # noqa: F401


def hdfs_connect(
    host='localhost',
Empty file added ibis/pyspark/__init__.py
18 changes: 18 additions & 0 deletions ibis/pyspark/api.py
@@ -0,0 +1,18 @@
from ibis.pyspark.client import PySparkClient


def connect(session):
    """
    Create a `PySparkClient` for use with Ibis from an existing
    `pyspark.sql.SparkSession`.
    """
    client = PySparkClient(session)

    # Spark internally stores timestamps as UTC values, and timestamp data
    # that is brought in without a specified time zone is converted as local
    # time to UTC with microsecond resolution.
    # https://spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html#timestamp-with-time-zone-semantics
    client._session.conf.set('spark.sql.session.timeZone', 'UTC')

    return client
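
For context, a small sketch of what pinning the session time zone changes
(assumptions: a local session and example data, not part of the commit):

import datetime

from pyspark.sql import Row, SparkSession

session = SparkSession.builder.master('local[1]').getOrCreate()
session.conf.set('spark.sql.session.timeZone', 'UTC')

df = session.createDataFrame([Row(ts=datetime.datetime(2019, 8, 22, 12, 0))])
# With the session time zone pinned to UTC, collected timestamps are
# rendered in UTC instead of the driver's local time zone, so results
# are reproducible across machines.
df.toPandas()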
46 changes: 46 additions & 0 deletions ibis/pyspark/client.py
@@ -0,0 +1,46 @@
from pyspark.sql.column import Column

import ibis.common.exceptions as com
import ibis.expr.types as types
from ibis.pyspark.compiler import PySparkExprTranslator
from ibis.pyspark.operations import PySparkTable
from ibis.spark.client import SparkClient


class PySparkClient(SparkClient):
    """
    An ibis client that uses PySpark SQL DataFrames.
    """

    table_class = PySparkTable

    def __init__(self, session):
        super().__init__(session)
        self.translator = PySparkExprTranslator()

    def compile(self, expr, *args, **kwargs):
        """Compile an ibis expression to a PySpark DataFrame object."""
        return self.translator.translate(expr, scope={})

    def execute(self, expr, params=None, limit='default', **kwargs):
        if isinstance(expr, types.TableExpr):
            return self.compile(expr).toPandas()
        elif isinstance(expr, types.ColumnExpr):
            # expression must be named for the projection
            expr = expr.name('tmp')
            return self.compile(expr.to_projection()).toPandas()['tmp']
        elif isinstance(expr, types.ScalarExpr):
            compiled = self.compile(expr)
            if isinstance(compiled, Column):
                # attach the result column to a one-row DataFrame and
                # select the result
                compiled = self._session.range(0, 1).select(compiled)
            return compiled.toPandas().iloc[0, 0]
        else:
            raise com.IbisError(
                "Cannot execute expression of type: {}".format(type(expr))
            )

    def sql(self, query):
        raise NotImplementedError(
            "The PySpark backend does not support raw SQL queries"
        )