From 73becc5f10bed724a8bc53e1d445aab698b59cf3 Mon Sep 17 00:00:00 2001 From: Ivan Ogasawara Date: Thu, 5 Apr 2018 15:23:58 -0400 Subject: [PATCH] Added mapd backend initial files. --- ibis/__init__.py | 4 + ibis/mapd/__init__.py | 0 ibis/mapd/api.py | 47 ++++ ibis/mapd/client.py | 417 +++++++++++++++++++++++++++++++ ibis/mapd/compiler.py | 416 ++++++++++++++++++++++++++++++ ibis/mapd/tests/__init__.py | 0 ibis/mapd/tests/conftest.py | 44 ++++ ibis/mapd/tests/test_client.py | 376 ++++++++++++++++++++++++++++ ibis/mapd/tests/test_compiler.py | 14 ++ 9 files changed, 1318 insertions(+) create mode 100644 ibis/mapd/__init__.py create mode 100644 ibis/mapd/api.py create mode 100644 ibis/mapd/client.py create mode 100644 ibis/mapd/compiler.py create mode 100644 ibis/mapd/tests/__init__.py create mode 100644 ibis/mapd/tests/conftest.py create mode 100644 ibis/mapd/tests/test_client.py create mode 100644 ibis/mapd/tests/test_compiler.py diff --git a/ibis/__init__.py b/ibis/__init__.py index 760d04651f44..36b69f31cbf9 100644 --- a/ibis/__init__.py +++ b/ibis/__init__.py @@ -71,6 +71,10 @@ # pip install ibis-framework[bigquery] import ibis.bigquery.api as bigquery +with suppress(ImportError): + # pip install ibis-framework[bigquery] + import ibis.mapd.api as mapd + restart_ordering() diff --git a/ibis/mapd/__init__.py b/ibis/mapd/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/ibis/mapd/api.py b/ibis/mapd/api.py new file mode 100644 index 000000000000..a6200b4b7cbf --- /dev/null +++ b/ibis/mapd/api.py @@ -0,0 +1,47 @@ +# api.py +import pymapd +import ibis.common as com +from ibis.config import options +from ibis.mapd.client import MapDClient +from ibis.mapd.compiler import dialect + + +def compile(expr, params=None): + """ + Force compilation of expression as though it were an expression depending + on MapD. 
Note you can also call expr.compile() + + Returns + ------- + compiled : string + """ + from ibis.mapd.compiler import to_sql + return to_sql(expr, dialect.make_context(params=params)) + + +def verify(expr, params=None): + """ + Determine if expression can be successfully translated to execute on + MapD + """ + try: + compile(expr, params=params) + return True + except com.TranslationError: + return False + + +def connect(project_id, dataset_id): + """Create a MapDClient for use with Ibis + + Parameters + ---------- + project_id: str + dataset_id: str + + Returns + ------- + MapDClient + """ + + return MapDClient(project_id, dataset_id) diff --git a/ibis/mapd/client.py b/ibis/mapd/client.py new file mode 100644 index 000000000000..ee88d4dd56e1 --- /dev/null +++ b/ibis/mapd/client.py @@ -0,0 +1,417 @@ +import ibis +import regex as re +import time +import collections +import datetime + +import six + +import pandas as pd +import pymapd + +from multipledispatch import Dispatcher + +# import ibis +import ibis.common as com +import ibis.expr.types as ir +import ibis.expr.schema as sch +import ibis.expr.datatypes as dt + +from ibis.compat import parse_version +from ibis.client import Database, Query, SQLClient +from ibis.mapd import compiler as comp + +# from google.api.core.exceptions import BadRequest + + +NATIVE_PARTITION_COL = '_PARTITIONTIME' + + +def _ensure_split(table_id, dataset_id): + split = table_id.split('.') + if len(split) > 1: + assert len(split) == 2 + if dataset_id: + raise ValueError( + "Can't pass a fully qualified table name *AND* a dataset_id" + ) + (dataset_id, table_id) = split + return (table_id, dataset_id) + + +_IBIS_TYPE_TO_DTYPE = { + 'string': 'STRING', + 'int64': 'INT64', + 'double': 'FLOAT64', + 'boolean': 'BOOL', + 'timestamp': 'TIMESTAMP', + 'date': 'DATE', +} + +_DTYPE_TO_IBIS_TYPE = { + 'INT64': dt.int64, + 'FLOAT64': dt.double, + 'BOOL': dt.boolean, + 'STRING': dt.string, + 'DATE': dt.date, + # FIXME: enforce no tz info + 'DATETIME': dt.timestamp, + 'TIME': dt.time, + 'TIMESTAMP': dt.timestamp, + 'BYTES': dt.binary, +} + + +_LEGACY_TO_STANDARD = { + 'INTEGER': 'INT64', + 'FLOAT': 'FLOAT64', + 'BOOLEAN': 'BOOL', +} + + +@dt.dtype.register(pymapd.schema.SchemaField) +def pymapd_field_to_ibis_dtype(field): + typ = field.field_type + if typ == 'RECORD': + fields = field.fields + assert fields + names = [el.name for el in fields] + ibis_types = list(map(dt.dtype, fields)) + ibis_type = dt.Struct(names, ibis_types) + else: + ibis_type = _LEGACY_TO_STANDARD.get(typ, typ) + ibis_type = _DTYPE_TO_IBIS_TYPE.get(ibis_type, ibis_type) + if field.mode == 'REPEATED': + ibis_type = dt.Array(ibis_type) + return ibis_type + + +@sch.infer.register(pymapd.table.Table) +def pymapd_schema(table): + pairs = [(el.name, dt.dtype(el)) for el in table.schema] + try: + if table.list_partitions(): + pairs.append((NATIVE_PARTITION_COL, dt.timestamp)) + except Exception: + pass + return sch.schema(pairs) + + +class MapDCursor(object): + """Cursor to allow the MapD client to reuse machinery in ibis/client.py + """ + + def __init__(self, query): + self.query = query + + def fetchall(self): + return list(self.query.fetch_data()) + + @property + def columns(self): + return [field.name for field in self.query.schema] + + def __enter__(self): + # For compatibility when constructed from Query.execute() + return self + + def __exit__(self, exc_type, exc_value, traceback): + pass + + +class MapD(Query): + + def __init__(self, client, ddl, query_parameters=None): + super(MapD, self).__init__(client, 
ddl) + self.query_parameters = query_parameters or {} + + def _fetch(self, cursor): + df = pd.DataFrame(cursor.fetchall(), columns=cursor.columns) + return self.schema().apply_to(df) + + def execute(self): + # synchronous by default + with self.client._execute( + self.compiled_ddl, + results=True, + query_parameters=self.query_parameters + ) as cur: + result = self._fetch(cur) + + return self._wrap_result(result) + + +class MapDAPIProxy(object): + + def __init__(self, project_id): + self._client = pymapd.Client(project_id) + + @property + def client(self): + return self._client + + @property + def project_id(self): + return self.client.project + + def get_datasets(self): + return list(self.client.list_datasets()) + + def get_dataset(self, dataset_id): + return self.client.dataset(dataset_id) + + def get_table(self, table_id, dataset_id, reload=True): + (table_id, dataset_id) = _ensure_split(table_id, dataset_id) + table = self.client.dataset(dataset_id).table(table_id) + if reload: + table.reload() + return table + + def get_schema(self, table_id, dataset_id): + return self.get_table(table_id, dataset_id).schema + + def run_sync_query(self, stmt): + query = self.client.run_sync_query(stmt) + query.use_legacy_sql = False + query.run() + # run_sync_query is not really synchronous: there's a timeout + while not query.job.done(): + query.job.reload() + time.sleep(0.1) + return query + + +class MapDDatabase(Database): + pass + + +pymapd_param = Dispatcher('pymapd_param') + + +@pymapd_param.register(ir.StructScalar, collections.OrderedDict) +def pymapd_param_struct(param, value): + field_params = [pymapd_param(param[k], v) for k, v in value.items()] + return pymapd.StructQueryParameter(param.get_name(), *field_params) + + +@pymapd_param.register(ir.ArrayValue, list) +def pymapd_param_array(param, value): + param_type = param.type() + assert isinstance(param_type, dt.Array), str(param_type) + + try: + pymapd_type = _IBIS_TYPE_TO_DTYPE[str(param_type.value_type)] + except KeyError: + raise com.UnsupportedBackendType(param_type) + else: + return pymapd.ArrayQueryParameter(param.get_name(), pymapd_type, value) + + +@pymapd_param.register( + ir.TimestampScalar, + six.string_types + (datetime.datetime, datetime.date) +) +def pymapd_param_timestamp(param, value): + assert isinstance(param.type(), dt.Timestamp) + + # TODO(phillipc): Not sure if this is the correct way to do this. 
+ timestamp_value = pd.Timestamp(value, tz='UTC').to_pydatetime() + return pymapd.ScalarQueryParameter( + param.get_name(), 'TIMESTAMP', timestamp_value) + + +@pymapd_param.register(ir.StringScalar, six.string_types) +def pymapd_param_string(param, value): + return pymapd.ScalarQueryParameter(param.get_name(), 'STRING', value) + + +@pymapd_param.register(ir.Int64Scalar, six.integer_types) +def pymapd_param_integer(param, value): + return pymapd.ScalarQueryParameter(param.get_name(), 'INT64', value) + + +@pymapd_param.register(ir.DoubleScalar, float) +def pymapd_param_double(param, value): + return pymapd.ScalarQueryParameter(param.get_name(), 'FLOAT64', value) + + +@pymapd_param.register(ir.BooleanScalar, bool) +def pymapd_param_boolean(param, value): + return pymapd.ScalarQueryParameter(param.get_name(), 'BOOL', value) + + +@pymapd_param.register(ir.DateScalar, six.string_types) +def pymapd_param_date_string(param, value): + return pymapd_param(param, pd.Timestamp(value).to_pydatetime().date()) + + +@pymapd_param.register(ir.DateScalar, datetime.datetime) +def pymapd_param_date_datetime(param, value): + return pymapd_param(param, value.date()) + + +@pymapd_param.register(ir.DateScalar, datetime.date) +def pymapd_param_date(param, value): + return pymapd.ScalarQueryParameter(param.get_name(), 'DATE', value) + + +class MapDClient(SQLClient): + + sync_query = MapD + database_class = MapDDatabase + proxy_class = MapDAPIProxy + dialect = comp.MapDDialect + + def __init__(self, project_id, dataset_id): + self._proxy = type(self).proxy_class(project_id) + self._dataset_id = dataset_id + + @property + def project_id(self): + return self._proxy.project_id + + @property + def dataset_id(self): + return self._dataset_id + + @property + def _table_expr_klass(self): + return ir.TableExpr + + def table(self, *args, **kwargs): + t = super(MapDClient, self).table(*args, **kwargs) + if NATIVE_PARTITION_COL in t.columns: + col = ibis.options.pymapd.partition_col + assert col not in t + return (t + .mutate(**{col: t[NATIVE_PARTITION_COL]}) + .drop([NATIVE_PARTITION_COL])) + return t + + def _build_ast(self, expr, context): + result = comp.build_ast(expr, context) + return result + + def _execute_query(self, ddl, async=False): + klass = self.async_query if async else self.sync_query + inst = klass(self, ddl, query_parameters=ddl.context.params) + return inst.execute() + + def _fully_qualified_name(self, name, database): + dataset_id = database or self.dataset_id + return dataset_id + '.' 
+ name + + def _get_table_schema(self, qualified_name): + return self.get_schema(qualified_name) + + def _execute(self, stmt, results=True, query_parameters=None): + # TODO(phillipc): Allow **kwargs in calls to execute + query = self._proxy.client.run_sync_query(stmt) + query.use_legacy_sql = False + query.query_parameters = [ + pymapd_param(param.to_expr(), value) + for param, value in (query_parameters or {}).items() + ] + query.run() + + # run_sync_query is not really synchronous: there's a timeout + while not query.job.done(): + query.job.reload() + time.sleep(0.1) + + return MapDCursor(query) + + def database(self, name=None): + if name is None: + name = self.dataset_id + return self.database_class(name, self) + + @property + def current_database(self): + return self.database(self.dataset_id) + + def set_database(self, name): + self._dataset_id = name + + def exists_database(self, name): + return self._proxy.get_dataset(name).exists() + + def list_databases(self, like=None): + results = [dataset.name + for dataset in self._proxy.get_datasets()] + if like: + results = [ + dataset_name for dataset_name in results + if re.match(like, dataset_name) + ] + return results + + def exists_table(self, name, database=None): + (table_id, dataset_id) = _ensure_split(name, database) + return self._proxy.get_table(table_id, dataset_id).exists() + + def list_tables(self, like=None, database=None): + dataset = self._proxy.get_dataset(database or self.dataset_id) + result = [table.name for table in dataset.list_tables()] + if like: + result = [ + table_name for table_name in result + if re.match(like, table_name) + ] + return result + + def get_schema(self, name, database=None): + (table_id, dataset_id) = _ensure_split(name, database) + pymapd_table = self._proxy.get_table(table_id, dataset_id) + return sch.infer(pymapd_table) + + @property + def version(self): + return parse_version(pymapd.__version__) + + +_DTYPE_TO_IBIS_TYPE = { + 'INT64': dt.int64, + 'FLOAT64': dt.double, + 'BOOL': dt.boolean, + 'STRING': dt.string, + 'DATE': dt.date, + # FIXME: enforce no tz info + 'DATETIME': dt.timestamp, + 'TIME': dt.time, + 'TIMESTAMP': dt.timestamp, + 'BYTES': dt.binary, +} + + +_LEGACY_TO_STANDARD = { + 'INTEGER': 'INT64', + 'FLOAT': 'FLOAT64', + 'BOOLEAN': 'BOOL', +} + + +def _discover_type(field): + typ = field.field_type + if typ == 'RECORD': + fields = field.fields + assert fields + names = [el.name for el in fields] + ibis_types = [_discover_type(el) for el in fields] + ibis_type = dt.Struct(names, ibis_types) + else: + ibis_type = _LEGACY_TO_STANDARD.get(typ, typ) + ibis_type = _DTYPE_TO_IBIS_TYPE.get(ibis_type, ibis_type) + if field.mode == 'REPEATED': + ibis_type = dt.Array(ibis_type) + return ibis_type + + +def pymapd_table_to_ibis_schema(table): + pairs = [(el.name, _discover_type(el)) for el in table.schema] + try: + if table.list_partitions(): + pairs.append((NATIVE_PARTITION_COL, dt.timestamp)) + except Exception: + pass + return ibis.schema(pairs) diff --git a/ibis/mapd/compiler.py b/ibis/mapd/compiler.py new file mode 100644 index 000000000000..50d914bbbf10 --- /dev/null +++ b/ibis/mapd/compiler.py @@ -0,0 +1,416 @@ +# compiler.py +import ibis +import ibis.common as com + +import numpy as np + +import ibis.expr.datatypes as dt +import ibis.expr.types as ir + +import ibis.sql.compiler as comp +import ibis.expr.operations as ops +from ibis.impala.compiler import ImpalaSelect, unary, fixed_arity +from ibis.impala import compiler as impala_compiler + + +class MapDSelectBuilder(comp.SelectBuilder): 
+ + @property + def _select_class(self): + return MapDSelect + + +class MapDQueryBuilder(comp.QueryBuilder): + + select_builder = MapDSelectBuilder + + @property + def _union_class(self): + # return MapDUnion + raise NotImplementedError() + + +def build_ast(expr, context): + builder = MapDQueryBuilder(expr, context=context) + return builder.get_result() + + +def _get_query(expr, context): + ast = build_ast(expr, context) + (query, rest) = (ast.queries[0], ast.queries[1:]) + assert not rest + return query + + +def to_sql(expr, context): + query = _get_query(expr, context) + compiled = query.compile() + return compiled + + +class MapDContext(comp.QueryContext): + + def _to_sql(self, expr, ctx): + return to_sql(expr, context=ctx) + + +def _extract_field(sql_attr): + def extract_field_formatter(translator, expr): + op = expr.op() + arg = translator.translate(op.args[0]) + return 'EXTRACT({} from {})'.format(sql_attr, arg) + return extract_field_formatter + + +_sql_type_names = { + 'int8': 'int64', + 'int16': 'int64', + 'int32': 'int64', + 'int64': 'int64', + 'float': 'float64', + 'double': 'float64', + 'string': 'string', + 'boolean': 'boolean', + 'timestamp': 'timestamp', + 'date': 'date', +} + + +def _cast(translator, expr): + op = expr.op() + arg, target_type = op.args + arg_formatted = translator.translate(arg) + sql_type = _sql_type_names[target_type.name.lower()] + return 'CAST({} AS {})'.format(arg_formatted, sql_type.upper()) + + +def _struct_field(translator, expr): + arg, field = expr.op().args + arg_formatted = translator.translate(arg) + return '{}.`{}`'.format(arg_formatted, field) + + +def _array_concat(translator, expr): + return 'ARRAY_CONCAT({})'.format( + ', '.join(map(translator.translate, expr.op().args)) + ) + + +def _array_index(translator, expr): + # SAFE_OFFSET returns NULL if out of bounds + return '{}[SAFE_OFFSET({})]'.format( + *map(translator.translate, expr.op().args) + ) + + +def _string_find(translator, expr): + haystack, needle, start, end = expr.op().args + + if start is not None: + raise NotImplementedError('start not implemented for string find') + if end is not None: + raise NotImplementedError('end not implemented for string find') + + return 'STRPOS({}, {}) - 1'.format( + translator.translate(haystack), + translator.translate(needle) + ) + + +def _translate_pattern(translator, pattern): + # add 'r' to string literals to indicate to MapD this is a raw string + return 'r' * isinstance(pattern.op(), ir.Literal) + translator.translate( + pattern + ) + + +def _regex_search(translator, expr): + arg, pattern = expr.op().args + regex = _translate_pattern(translator, pattern) + result = 'REGEXP_CONTAINS({}, {})'.format(translator.translate(arg), regex) + return result + + +def _regex_extract(translator, expr): + arg, pattern, index = expr.op().args + regex = _translate_pattern(translator, pattern) + result = 'REGEXP_EXTRACT_ALL({}, {})[SAFE_OFFSET({})]'.format( + translator.translate(arg), + regex, + translator.translate(index) + ) + return result + + +def _regex_replace(translator, expr): + arg, pattern, replacement = expr.op().args + regex = _translate_pattern(translator, pattern) + result = 'REGEXP_REPLACE({}, {}, {})'.format( + translator.translate(arg), + regex, + translator.translate(replacement), + ) + return result + + +def _string_concat(translator, expr): + return 'CONCAT({})'.format( + ', '.join(map(translator.translate, expr.op().args)) + ) + + +def _string_join(translator, expr): + sep, args = expr.op().args + return 'ARRAY_TO_STRING([{}], {})'.format( 
+ ', '.join(map(translator.translate, args)), + translator.translate(sep) + ) + + +def _string_ascii(translator, expr): + arg, = expr.op().args + return 'TO_CODE_POINTS({})[SAFE_OFFSET(0)]'.format( + translator.translate(arg) + ) + + +def _string_right(translator, expr): + arg, nchars = map(translator.translate, expr.op().args) + return 'SUBSTR({arg}, -LEAST(LENGTH({arg}), {nchars}))'.format( + arg=arg, + nchars=nchars, + ) + + +def _array_literal_format(expr): + return str(list(expr.op().value)) + + +def _log(translator, expr): + op = expr.op() + arg, base = op.args + arg_formatted = translator.translate(arg) + + if base is None: + return 'ln({})'.format(arg_formatted) + + base_formatted = translator.translate(base) + return 'log({}, {})'.format(arg_formatted, base_formatted) + + +def _literal(translator, expr): + + if isinstance(expr, ir.NumericValue): + value = expr.op().value + if not np.isfinite(value): + return 'CAST({!r} AS FLOAT64)'.format(str(value)) + + try: + return impala_compiler._literal(translator, expr) + except NotImplementedError: + if isinstance(expr, ir.ArrayValue): + return _array_literal_format(expr) + raise NotImplementedError(type(expr).__name__) + + +def _arbitrary(translator, expr): + arg, how, where = expr.op().args + + if where is not None: + arg = where.ifelse(arg, ibis.NA) + + if how != 'first': + raise com.UnsupportedOperationError( + '{!r} value not supported for arbitrary in MapD'.format(how) + ) + + return 'ANY_VALUE({})'.format(translator.translate(arg)) + + +_date_units = { + 'Y': 'YEAR', + 'Q': 'QUARTER', + 'W': 'WEEK', + 'M': 'MONTH', +} + + +_timestamp_units = { + 'us': 'MICROSECOND', + 'ms': 'MILLISECOND', + 's': 'SECOND', + 'm': 'MINUTE', + 'h': 'HOUR', +} +_timestamp_units.update(_date_units) + + +def _truncate(kind, units): + def truncator(translator, expr): + op = expr.op() + arg, unit = op.args + + arg = translator.translate(op.args[0]) + try: + unit = units[unit] + except KeyError: + raise com.UnsupportedOperationError( + '{!r} unit is not supported in timestamp truncate'.format(unit) + ) + + return "{}_TRUNC({}, {})".format(kind, arg, unit) + return truncator + + +def _timestamp_op(func, units): + def _formatter(translator, expr): + op = expr.op() + arg, offset = op.args + + if offset.unit not in units: + raise com.UnsupportedOperationError( + 'MapD does not allow binary operation ' + '{} with INTERVAL offset {}'.format(func, offset.unit) + ) + formatted_arg = translator.translate(arg) + formatted_offset = translator.translate(offset) + result = '{}({}, {})'.format(func, formatted_arg, formatted_offset) + return result + + return _formatter + + +_operation_registry = impala_compiler._operation_registry.copy() +_operation_registry.update({ + ops.ExtractYear: _extract_field('year'), + ops.ExtractMonth: _extract_field('month'), + ops.ExtractDay: _extract_field('day'), + ops.ExtractHour: _extract_field('hour'), + ops.ExtractMinute: _extract_field('minute'), + ops.ExtractSecond: _extract_field('second'), + ops.ExtractMillisecond: _extract_field('millisecond'), + + ops.StringReplace: fixed_arity('REPLACE', 3), + ops.StringSplit: fixed_arity('SPLIT', 2), + ops.StringConcat: _string_concat, + ops.StringJoin: _string_join, + ops.StringAscii: _string_ascii, + ops.StringFind: _string_find, + ops.StrRight: _string_right, + ops.Repeat: fixed_arity('REPEAT', 2), + ops.RegexSearch: _regex_search, + ops.RegexExtract: _regex_extract, + ops.RegexReplace: _regex_replace, + + ops.GroupConcat: fixed_arity('STRING_AGG', 2), + + ops.IfNull: fixed_arity('IFNULL', 2), 
+ ops.Cast: _cast, + + ops.StructField: _struct_field, + + ops.ArrayCollect: unary('ARRAY_AGG'), + ops.ArrayConcat: _array_concat, + ops.ArrayIndex: _array_index, + ops.ArrayLength: unary('ARRAY_LENGTH'), + + ops.Log: _log, + ops.Modulus: fixed_arity('MOD', 2), + + ops.Date: unary('DATE'), + + # MapD doesn't have these operations built in. + # ops.ArrayRepeat: _array_repeat, + # ops.ArraySlice: _array_slice, + ir.Literal: _literal, + ops.Arbitrary: _arbitrary, + + ops.TimestampTruncate: _truncate('TIMESTAMP', _timestamp_units), + ops.DateTruncate: _truncate('DATE', _date_units), + + ops.TimestampAdd: _timestamp_op( + 'TIMESTAMP_ADD', {'h', 'm', 's', 'ms', 'us'}), + ops.TimestampSub: _timestamp_op( + 'TIMESTAMP_DIFF', {'h', 'm', 's', 'ms', 'us'}), + + ops.DateAdd: _timestamp_op('DATE_ADD', {'D', 'W', 'M', 'Q', 'Y'}), + ops.DateSub: _timestamp_op('DATE_SUB', {'D', 'W', 'M', 'Q', 'Y'}), +}) + +_invalid_operations = { + ops.Translate, + ops.FindInSet, + ops.Capitalize, + ops.DateDiff, + ops.TimestampDiff +} + +_operation_registry = { + k: v for k, v in _operation_registry.items() + if k not in _invalid_operations +} + + +class MapDExprTranslator(impala_compiler.ImpalaExprTranslator): + _registry = _operation_registry + _rewrites = impala_compiler.ImpalaExprTranslator._rewrites.copy() + + context_class = MapDContext + + def _trans_param(self, expr): + op = expr.op() + if op not in self.context.params: + raise KeyError(op) + return '@{}'.format(expr.get_name()) + + +rewrites = MapDExprTranslator.rewrites + + +@rewrites(ops.Any) +def mapd_rewrite_any(expr): + arg, = expr.op().args + return arg.cast(dt.int64).sum() > 0 + + +@rewrites(ops.NotAny) +def mapd_rewrite_notany(expr): + arg, = expr.op().args + return arg.cast(dt.int64).sum() == 0 + + +@rewrites(ops.All) +def mapd_rewrite_all(expr): + arg, = expr.op().args + return (1 - arg.cast(dt.int64)).sum() == 0 + + +@rewrites(ops.NotAll) +def mapd_rewrite_notall(expr): + arg, = expr.op().args + return (1 - arg.cast(dt.int64)).sum() != 0 + + +class MapDSelect(ImpalaSelect): + + translator = MapDExprTranslator + + +@rewrites(ops.IdenticalTo) +def identical_to(expr): + left, right = expr.op().args + return (left.isnull() & right.isnull()) | (left == right) + + +@rewrites(ops.Log2) +def log2(expr): + arg, = expr.op().args + return arg.log(2) + + +class MapDDialect(impala_compiler.ImpalaDialect): + + translator = MapDExprTranslator + + +dialect = MapDDialect \ No newline at end of file diff --git a/ibis/mapd/tests/__init__.py b/ibis/mapd/tests/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/ibis/mapd/tests/conftest.py b/ibis/mapd/tests/conftest.py new file mode 100644 index 000000000000..800d7994172f --- /dev/null +++ b/ibis/mapd/tests/conftest.py @@ -0,0 +1,44 @@ +import os + +import pytest + +import ibis + + +PROJECT_ID = os.environ.get('MAPD_PROJECT_ID') +DATASET_ID = 'testing' + + +@pytest.fixture(scope='session') +def client(): + ga = pytest.importorskip('google.auth') + + try: + return ibis.mapd.connect(PROJECT_ID, DATASET_ID) + except ga.exceptions.DefaultCredentialsError: + pytest.skip("no credentials found, skipping") + + +@pytest.fixture(scope='session') +def alltypes(client): + return client.table('functional_alltypes') + + +@pytest.fixture(scope='session') +def df(alltypes): + return alltypes.execute() + + +@pytest.fixture(scope='session') +def parted_alltypes(client): + return client.table('functional_alltypes_parted') + + +@pytest.fixture(scope='session') +def parted_df(parted_alltypes): + return 
parted_alltypes.execute() + + +@pytest.fixture(scope='session') +def struct_table(client): + return client.table('struct_table') diff --git a/ibis/mapd/tests/test_client.py b/ibis/mapd/tests/test_client.py new file mode 100644 index 000000000000..18a891da09a0 --- /dev/null +++ b/ibis/mapd/tests/test_client.py @@ -0,0 +1,376 @@ +import collections + +from datetime import date, datetime + +import pytest + +import numpy as np +import pandas as pd +import pandas.util.testing as tm + +import ibis +import ibis.common as com +import ibis.expr.datatypes as dt +import ibis.expr.types as ir + + +pytestmark = pytest.mark.mapd +pytest.importorskip('mapd') + + +def test_table(alltypes): + assert isinstance(alltypes, ir.TableExpr) + + +def test_column_execute(alltypes, df): + col_name = 'float_col' + expr = alltypes[col_name] + result = expr.execute() + expected = df[col_name] + tm.assert_series_equal(result, expected) + + +def test_literal_execute(client): + expected = '1234' + expr = ibis.literal(expected) + result = client.execute(expr) + assert result == expected + + +def test_simple_aggregate_execute(alltypes, df): + col_name = 'float_col' + expr = alltypes[col_name].sum() + result = expr.execute() + expected = df[col_name].sum() + np.testing.assert_allclose(result, expected) + + +def test_list_tables(client): + assert set(client.list_tables(like='functional_alltypes')) == { + 'functional_alltypes', + 'functional_alltypes_parted', + } + + +def test_current_database(client): + assert client.current_database.name == 'testing' + assert client.current_database.name == client.dataset_id + assert client.current_database.tables == client.list_tables() + + +def test_database(client): + database = client.database(client.dataset_id) + assert database.list_tables() == client.list_tables() + + +def test_database_layer(client): + mapd_dataset = client._proxy.get_dataset(client.dataset_id) + actual = client.list_tables() + expected = [el.name for el in mapd_dataset.list_tables()] + assert sorted(actual) == sorted(expected) + + +def test_compile_toplevel(): + t = ibis.table([('foo', 'double')], name='t0') + + # it works! + expr = t.foo.sum() + result = ibis.mapd.compile(expr) + # FIXME: remove quotes because mapd can't use anythig that needs + # quoting? 
+ expected = """\ +SELECT sum(`foo`) AS `sum` +FROM t0""" # noqa + assert str(result) == expected + + +def test_struct_field_access(struct_table): + expr = struct_table.struct_col.string_field + result = expr.execute() + expected = pd.Series([None, 'a'], name='tmp') + tm.assert_series_equal(result, expected) + + +def test_array_index(struct_table): + expr = struct_table.array_of_structs_col[1] + result = expr.execute() + expected = pd.Series( + [ + {'int_field': None, 'string_field': None}, + {'int_field': None, 'string_field': 'hijklmnop'} + ], + name='tmp' + ) + tm.assert_series_equal(result, expected) + + +def test_array_concat(struct_table): + c = struct_table.array_of_structs_col + expr = c + c + result = expr.execute() + expected = pd.Series( + [ + [ + {'int_field': 12345, 'string_field': 'abcdefg'}, + {'int_field': None, 'string_field': None}, + {'int_field': 12345, 'string_field': 'abcdefg'}, + {'int_field': None, 'string_field': None}, + ], + [ + {'int_field': 12345, 'string_field': 'abcdefg'}, + {'int_field': None, 'string_field': 'hijklmnop'}, + {'int_field': 12345, 'string_field': 'abcdefg'}, + {'int_field': None, 'string_field': 'hijklmnop'}, + ], + ], + name='tmp', + ) + tm.assert_series_equal(result, expected) + + +def test_array_length(struct_table): + expr = struct_table.array_of_structs_col.length() + result = expr.execute() + expected = pd.Series([2, 2], name='tmp') + tm.assert_series_equal(result, expected) + + +def test_array_collect(struct_table): + key = struct_table.array_of_structs_col[0].string_field + expr = struct_table.groupby(key=key).aggregate( + foo=lambda t: t.array_of_structs_col[0].int_field.collect() + ) + result = expr.execute() + expected = struct_table.execute() + expected = expected.assign( + key=expected.array_of_structs_col.apply(lambda x: x[0]['string_field']) + ).groupby('key').apply( + lambda df: list( + df.array_of_structs_col.apply(lambda x: x[0]['int_field']) + ) + ).reset_index().rename(columns={0: 'foo'}) + tm.assert_frame_equal(result, expected) + + +def test_count_distinct_with_filter(alltypes): + expr = alltypes.string_col.nunique( + where=alltypes.string_col.cast('int64') > 1 + ) + result = expr.execute() + expected = alltypes.string_col.execute() + expected = expected[expected.astype('int64') > 1].nunique() + assert result == expected + + +@pytest.mark.parametrize('type', ['date', dt.date]) +def test_cast_string_to_date(alltypes, df, type): + import toolz + + string_col = alltypes.date_string_col + month, day, year = toolz.take(3, string_col.split('/')) + + expr = '20' + ibis.literal('-').join([year, month, day]) + expr = expr.cast(type) + result = expr.execute().astype( + 'datetime64[ns]' + ).sort_values().reset_index(drop=True).rename('date_string_col') + expected = pd.to_datetime( + df.date_string_col + ).dt.normalize().sort_values().reset_index(drop=True) + tm.assert_series_equal(result, expected) + + +def test_has_partitions(alltypes, parted_alltypes, client): + col = ibis.options.mapd.partition_col + assert col not in alltypes.columns + assert col in parted_alltypes.columns + + +def test_different_partition_col_name(client): + col = ibis.options.mapd.partition_col = 'FOO_BAR' + alltypes = client.table('functional_alltypes') + parted_alltypes = client.table('functional_alltypes_parted') + assert col not in alltypes.columns + assert col in parted_alltypes.columns + + +def test_subquery_scalar_params(alltypes): + t = alltypes + param = ibis.param('timestamp').name('my_param') + expr = t[['float_col', 'timestamp_col', 'int_col', 
'string_col']][ + lambda t: t.timestamp_col < param + ].groupby('string_col').aggregate( + foo=lambda t: t.float_col.sum() + ).foo.count() + result = expr.compile(params={param: '20140101'}) + expected = """\ +SELECT count(`foo`) AS `count` +FROM ( + SELECT `string_col`, sum(`float_col`) AS `foo` + FROM ( + SELECT `float_col`, `timestamp_col`, `int_col`, `string_col` + FROM testing.functional_alltypes + WHERE `timestamp_col` < @my_param + ) t1 + GROUP BY 1 +) t0""" + assert result == expected + + +_IBIS_TYPE_TO_DTYPE = { + 'string': 'STRING', + 'int64': 'INT64', + 'double': 'FLOAT64', + 'boolean': 'BOOL', + 'timestamp': 'TIMESTAMP', + 'date': 'DATE', +} + + +def test_scalar_param_string(alltypes, df): + param = ibis.param('string') + expr = alltypes[alltypes.string_col == param] + + string_value = '0' + result = expr.execute( + params={param: string_value} + ).sort_values('id').reset_index(drop=True) + expected = df.loc[ + df.string_col == string_value + ].sort_values('id').reset_index(drop=True) + tm.assert_frame_equal(result, expected) + + +def test_scalar_param_int64(alltypes, df): + param = ibis.param('int64') + expr = alltypes[alltypes.string_col.cast('int64') == param] + + int64_value = 0 + result = expr.execute( + params={param: int64_value} + ).sort_values('id').reset_index(drop=True) + expected = df.loc[ + df.string_col.astype('int64') == int64_value + ].sort_values('id').reset_index(drop=True) + tm.assert_frame_equal(result, expected) + + +def test_scalar_param_double(alltypes, df): + param = ibis.param('double') + expr = alltypes[alltypes.string_col.cast('int64').cast('double') == param] + + double_value = 0.0 + result = expr.execute( + params={param: double_value} + ).sort_values('id').reset_index(drop=True) + expected = df.loc[ + df.string_col.astype('int64').astype('float64') == double_value + ].sort_values('id').reset_index(drop=True) + tm.assert_frame_equal(result, expected) + + +def test_scalar_param_boolean(alltypes, df): + param = ibis.param('boolean') + expr = alltypes[(alltypes.string_col.cast('int64') == 0) == param] + + bool_value = True + result = expr.execute( + params={param: bool_value} + ).sort_values('id').reset_index(drop=True) + expected = df.loc[ + df.string_col.astype('int64') == 0 + ].sort_values('id').reset_index(drop=True) + tm.assert_frame_equal(result, expected) + + +@pytest.mark.parametrize( + 'timestamp_value', + ['2009-01-20 01:02:03', date(2009, 1, 20), datetime(2009, 1, 20, 1, 2, 3)] +) +def test_scalar_param_timestamp(alltypes, df, timestamp_value): + param = ibis.param('timestamp') + expr = alltypes[alltypes.timestamp_col <= param][['timestamp_col']] + + result = expr.execute( + params={param: timestamp_value} + ).sort_values('timestamp_col').reset_index(drop=True) + value = pd.Timestamp(timestamp_value, tz='UTC') + expected = df.loc[ + df.timestamp_col <= value, ['timestamp_col'] + ].sort_values('timestamp_col').reset_index(drop=True) + tm.assert_frame_equal(result, expected) + + +@pytest.mark.parametrize( + 'date_value', + ['2009-01-20', date(2009, 1, 20), datetime(2009, 1, 20)] +) +def test_scalar_param_date(alltypes, df, date_value): + param = ibis.param('date') + expr = alltypes[alltypes.timestamp_col.cast('date') <= param] + + result = expr.execute( + params={param: date_value} + ).sort_values('timestamp_col').reset_index(drop=True) + value = pd.Timestamp(date_value) + expected = df.loc[ + df.timestamp_col.dt.normalize() <= value + ].sort_values('timestamp_col').reset_index(drop=True) + tm.assert_frame_equal(result, expected) + + +def 
test_scalar_param_array(alltypes, df): + param = ibis.param('array') + expr = alltypes.sort_by('id').limit(1).double_col.collect() + param + result = expr.execute(params={param: [1]}) + expected = [df.sort_values('id').double_col.iat[0]] + [1.0] + assert result == expected + + +def test_scalar_param_struct(client): + struct_type = dt.Struct.from_tuples([('x', dt.int64), ('y', dt.string)]) + param = ibis.param(struct_type) + value = collections.OrderedDict([('x', 1), ('y', 'foobar')]) + result = client.execute(param, {param: value}) + assert value == result + + +@pytest.mark.xfail( + raises=com.UnsupportedBackendType, + reason='Cannot handle nested structs/arrays in 0.27 API', +) +def test_scalar_param_nested(client): + param = ibis.param('struct>>>') + value = collections.OrderedDict([ + ( + 'x', + [ + collections.OrderedDict([ + ('y', [1.0, 2.0, 3.0]) + ]) + ] + ) + ]) + result = client.execute(param, {param: value}) + assert value == result + + +def test_raw_sql(client): + assert client.raw_sql('SELECT 1').fetchall() == [(1,)] + + +def test_scalar_param_scope(alltypes): + t = alltypes + param = ibis.param('timestamp') + mut = t.mutate(param=param).compile(params={param: '2017-01-01'}) + assert mut == """\ +SELECT *, @param AS `param` +FROM testing.functional_alltypes""" + + +def test_scalar_param_partition_time(parted_alltypes): + t = parted_alltypes + param = ibis.param('timestamp').name('time_param') + expr = t[t.PARTITIONTIME < param] + df = expr.execute(params={param: '2017-01-01'}) + assert df.empty diff --git a/ibis/mapd/tests/test_compiler.py b/ibis/mapd/tests/test_compiler.py new file mode 100644 index 000000000000..c72d4fe2a674 --- /dev/null +++ b/ibis/mapd/tests/test_compiler.py @@ -0,0 +1,14 @@ +import ibis +import ibis.expr.datatypes as dt + + +def test_timestamp_accepts_date_literals(alltypes): + date_string = '2009-03-01' + param = ibis.param(dt.timestamp).name('param_0') + expr = alltypes.mutate(param=param) + params = {param: date_string} + result = expr.compile(params=params) + expected = """\ +SELECT *, @param AS `param` +FROM testing.functional_alltypes""" + assert result == expected
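
A minimal usage sketch of the API surface these files add (ibis.mapd.connect, ibis.mapd.compile, ibis.mapd.verify), assuming a reachable MapD instance and the ibis build from this patch; 'my_project' and 'my_dataset' are placeholder arguments for the (project_id, dataset_id) signature defined in ibis/mapd/api.py, and the table and column names are taken from the test suite.

# Illustrative only; connection arguments below are placeholders.
import ibis

client = ibis.mapd.connect('my_project', 'my_dataset')

# Build an expression against an existing table and inspect the generated SQL
# without executing it.
t = client.table('functional_alltypes')
expr = t.float_col.sum()
print(ibis.mapd.compile(expr))

# verify() reports whether the expression can be translated for MapD.
assert ibis.mapd.verify(expr)

# execute() runs the compiled query and returns a pandas object.
result = expr.execute()

# Scalar parameters compile to @name placeholders, as exercised in
# test_scalar_param_scope.
param = ibis.param('timestamp')
print(t.mutate(param=param).compile(params={param: '2017-01-01'}))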