Skip to content

Commit

Permalink
Merge pull request #9 from xmnlab/master
Browse files Browse the repository at this point in the history
Improving ibis.mapd client and compiler
  • Loading branch information
xmnlab committed Apr 11, 2018
2 parents 7289937 + 7b62e02 commit 14bcedf
Show file tree
Hide file tree
Showing 6 changed files with 533 additions and 249 deletions.
30 changes: 29 additions & 1 deletion ibis/mapd/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ The main classes are:
- `MapDClient`
- `MapDQuery`
- `MapDDataType`
- `MapDCursor`

The `MapDDataType` class is used to translate data types to and from `ibis`.
Its main methods are:
Expand Down Expand Up @@ -106,14 +107,41 @@ expression. Its main methods are:
- get_schema
- version

`_build_ast` method is required.

The `MapDQuery` class should redefine at least the `_fetch` method. If the
`Query` class is used instead, an exception is raised when the
`MapDClient.execute` method is called.

(...) once the data arrives from the database we need to convert that data
to a pandas DataFrame.

The Query class, with its _fetch() method, provides a way for ibis
SQLClient objects to do any additional processing necessary after
the database returns results to the client.
(http://docs.ibis-project.org/design.html#execution)

The `MapDCursor` class was created only so that `ibis.client.Query.execute`
can be used as is, because it uses a `with` statement:

.. code-block:: Python

    with self.client._execute(self.compiled_ddl, results=True) as cur:
        ...

Otherwise, `MapDQuery` would have to override the `execute` method without
using a `with` statement.

compiler
--------

@TODO
The main classes inside the `compiler` module are:

- MapDDialect
- MapDExprTranslator
- MapDQueryBuilder
- MapDSelect
- MapDSelectBuilder
- MapDTableSetFormatter

operations
----------
Expand Down
2 changes: 1 addition & 1 deletion ibis/mapd/api.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from ibis.config import options
from ibis.mapd.compiler import dialect, compiles, rewrites
from ibis.mapd.client import MapDClient
from ibis.mapd.compiler import dialect

import ibis.common as com

Expand Down
95 changes: 58 additions & 37 deletions ibis/mapd/client.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
from ibis.compat import parse_version
from ibis.client import Database, Query, SQLClient
from ibis.mapd import compiler as comp
from ibis.mapd.compiler import MapDDialect, build_ast
from ibis.util import log
from pymapd.cursor import Cursor

try:
from pygdf.dataframe import DataFrame as GPUDataFrame
except ImportError:
GPUDataFrame = None

import regex as re
import pandas as pd
import pymapd
Expand All @@ -15,28 +20,30 @@

fully_qualified_re = re.compile(r"(.*)\.(?:`(.*)`|(.*))")

# using impala.client._HS2_TTypeId_to_dtype as reference
# https://www.mapd.com/docs/latest/mapd-core-guide/fixed-encoding/
_mapd_dtypes = {
'BIGINT': dt.int64,
'BOOLEAN': dt.Boolean,
'CHAR': dt.string,
'DATE': dt.date,
'DECIMAL': dt.float,
'DOUBLE': dt.float,
'DECIMAL': dt.float64,
'DOUBLE': dt.float64,
'INT': dt.int32,
'FLOAT': dt.float,
'INTEGER': dt.int32,
'FLOAT': dt.float32,
'NULL': dt.Null,
'NUMERIC': dt.float,
'REAL': dt.float,
'SMALLINT': dt.int8,
'NUMERIC': dt.float64,
'REAL': dt.float32,
'SMALLINT': dt.int16,
'STR': dt.string,
'TEXT': dt.string,
'TIME': dt.time,
'TIMESTAMP': dt.timestamp,
'VAR': dt.string,
'VARCHAR': dt.string,
}

_ibis_dtypes = {v: k for k, v in _mapd_dtypes.items()}
_ibis_dtypes[dt.String] = 'String'


class MapDDataType(object):
Expand Down Expand Up @@ -76,25 +83,41 @@ def from_ibis(cls, dtype, nullable=None):
return cls(typename, nullable=nullable)


class MapDQuery(Query):
class MapDCursor(object):
    """Cursor to allow the MapD client to reuse machinery in ibis/client.py

    Wraps the object produced by executing a query, converts it to a data
    frame through ``to_df`` and implements the context-manager protocol so
    that it can be used in a ``with`` statement.
    """

    def __init__(self, cursor):
        """Store the object to wrap.

        :param cursor: a pymapd ``Cursor``, a ``pandas.DataFrame``, a pygdf
            ``DataFrame`` (only when pygdf could be imported) or ``None``.
        """
        self.cursor = cursor

    def to_df(self):
        """Return the wrapped result as a data frame.

        :return: an empty ``pandas.DataFrame`` when nothing is wrapped, a
            ``pandas.DataFrame`` built from the fetched rows for a pymapd
            ``Cursor``, or the wrapped data frame itself (pandas or pygdf).
        :raises TypeError: if the wrapped object is of an unsupported type.
        """
        cursor = self.cursor

        # The checks are mutually exclusive, so their order does not affect
        # the result; the cheapest ones come first.
        if cursor is None:
            return pd.DataFrame([])
        if isinstance(cursor, pd.DataFrame):
            return cursor
        if isinstance(cursor, Cursor):
            col_names = [c.name for c in cursor.description]
            return pd.DataFrame(cursor.fetchall(), columns=col_names)
        # GPUDataFrame is None when pygdf is not installed.
        if GPUDataFrame is not None and isinstance(cursor, GPUDataFrame):
            return cursor

        # Previously this case fell through to ``return result`` with
        # ``result`` never assigned, failing with an UnboundLocalError.
        raise TypeError(
            'Unsupported cursor type: %s' % type(cursor).__name__
        )

    def __enter__(self):
        # For compatibility when constructed from Query.execute()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Do nothing on exit; exceptions raised in the block propagate."""


class MapDQuery(Query):
"""
def execute(self):
cursor = self.client._execute(
self.compiled_ddl
)
result = self._fetch(cursor)
return self._wrap_result(result)
"""
def _fetch(self, cursor):
# check if cursor is a pymapd cursor.Cursor
if isinstance(cursor, Cursor):
col_names = [c.name for c in cursor.description]
result = pd.DataFrame(cursor.fetchall(), columns=col_names)
else:
result = cursor
return self.schema().apply_to(result)
return self.schema().apply_to(cursor.to_df())


class MapDClient(SQLClient):
Expand All @@ -103,7 +126,7 @@ class MapDClient(SQLClient):
"""
database_class = Database
sync_query = MapDQuery
dialect = comp.MapDDialect
dialect = MapDDialect

def __init__(
self, uri: str=None, user: str=None, password: str=None,
Expand Down Expand Up @@ -147,7 +170,14 @@ def close(self):
self.con.close()

def _build_ast(self, expr, context):
result = comp.build_ast(expr, context)
"""
Required.
:param expr:
:param context:
:return:
"""
result = build_ast(expr, context)
return result

def _fully_qualified_name(self, name, database):
Expand All @@ -169,25 +199,16 @@ def _get_table_schema(self, table_name):
database, table_name = table_name_
return self.get_schema(table_name, database)

def _execute(self, query, results=True):
def _execute(self, query, results=False):
"""
:param query:
:return:
"""
if self.execution_type == 1:
stmt_exec = self.con.select_ipc_gpu
elif self.execution_type == 2:
stmt_exec = self.con.select_ipc
else:
stmt_exec = self.con.cursor().execute
if not self.execution_type == 3:
raise NotImplemented()

result = stmt_exec(query)

if results:
return result
else:
return
return MapDCursor(self.con.cursor().execute(query))

def database(self, name=None):
"""Connect to a database called `name`.
Expand Down
44 changes: 30 additions & 14 deletions ibis/mapd/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
import ibis.common as com
import ibis.util as util
import ibis.expr.operations as ops
import ibis.sql.compiler as comp
import ibis.sql.compiler as compiles

from .identifiers import quote_identifier
from .operations import _operation_registry, _name_expr
from .operations import (
_operation_registry, _name_expr, Sin
)


def build_ast(expr, context):
Expand All @@ -26,7 +27,7 @@ def to_sql(expr, context=None):
return query.compile()


class MapDSelectBuilder(comp.SelectBuilder):
class MapDSelectBuilder(compiles.SelectBuilder):
"""
"""
Expand All @@ -38,22 +39,22 @@ def _convert_group_by(self, exprs):
return exprs


class MapDQueryBuilder(comp.QueryBuilder):
class MapDQueryBuilder(compiles.QueryBuilder):
"""
"""
select_builder = MapDSelectBuilder


class MapDQueryContext(comp.QueryContext):
class MapDQueryContext(compiles.QueryContext):
"""
"""
def _to_sql(self, expr, ctx):
return to_sql(expr, context=ctx)


class MapDSelect(comp.Select):
class MapDSelect(compiles.Select):
"""
"""
Expand Down Expand Up @@ -103,7 +104,7 @@ def format_limit(self):
return buf.getvalue()


class MapDTableSetFormatter(comp.TableSetFormatter):
class MapDTableSetFormatter(compiles.TableSetFormatter):
"""
"""
Expand Down Expand Up @@ -161,7 +162,7 @@ def _quote_identifier(self, name):
return name


class MapDExprTranslator(comp.ExprTranslator):
class MapDExprTranslator(compiles.ExprTranslator):
"""
"""
Expand All @@ -171,8 +172,19 @@ class MapDExprTranslator(comp.ExprTranslator):
def name(self, translated, name, force=True):
return _name_expr(translated, name)

@classmethod
def compiles(cls, klass, f=None):
def decorator(f):
cls._registry[klass] = f

class MapDDialect(comp.Dialect):
if f is None:
return decorator
else:
decorator(f)
return f


class MapDDialect(compiles.Dialect):
"""
"""
Expand All @@ -184,7 +196,11 @@ class MapDDialect(comp.Dialect):
rewrites = MapDExprTranslator.rewrites


@rewrites(ops.FloorDivide)
def _floor_divide(expr):
left, right = expr.op().args
return left.div(right).floor()
@compiles(Sin)
def compile_sin(translator, expr):
    """Compile a ``Sin`` expression into a ``sin(...)`` call string.

    :param translator: object whose ``translate`` method compiles the
        argument expression.
    :param expr: expression whose operation holds exactly one argument.
    :return: the compiled argument wrapped in ``sin(...)``.
    """
    # Unpacking into a one-element tuple fails unless the operation has
    # exactly one argument.
    (operand,) = expr.op().args
    return 'sin(%s)' % translator.translate(operand)
Loading

0 comments on commit 14bcedf

Please sign in to comment.