ENH: Add map type execution support in the pandas backend #1498

Closed
32 changes: 13 additions & 19 deletions ci/datamgr.py
@@ -229,10 +229,8 @@ def sqlite(database, schema, tables, data_directory, **params):
@click.option('-t', '--tables', multiple=True, default=TEST_TABLES)
@click.option('-d', '--data-directory', default=DATA_DIR)
def mapd(schema, tables, data_directory, **params):
if sys.version_info[0] < 3:
click.echo(
'[MAPD|EE] MapD backend is unavailable for Python 2.'
)
if sys.version_info.major < 3:
logger.info('MapD backend is unavailable for Python 2.')
return

import pymapd
@@ -241,7 +239,7 @@ def mapd(schema, tables, data_directory, **params):
reserved_words = ['table', 'year', 'month']

# connection
click.echo('Initializing MapD...')
logger.info('Initializing MapD...')
if params['database'] != 'mapd':
conn = pymapd.connect(
host=params['host'],
@@ -250,10 +248,11 @@ def mapd(schema, tables, data_directory, **params):
port=params['port'],
dbname='mapd'
)
stmt = 'CREATE DATABASE {}'.format(params['database'])
try:
conn.execute('CREATE DATABASE {}'.format(params['database']))
except Exception as e:
click.echo('[MAPD|WW]{}'.format(e))
conn.execute(stmt)
except Exception:
logger.exception('MapD DDL statement %r failed', stmt)
conn.close()

conn = pymapd.connect(
@@ -263,22 +262,19 @@ def mapd(schema, tables, data_directory, **params):
)

# create tables
for stmt in schema.read().split(';'):
stmt = stmt.strip()
if len(stmt):
try:
conn.execute(stmt)
except Exception as e:
click.echo('[MAPD|WW] {}'.format(str(e)))
click.echo('[MAPD|II] Creating tables ... OK')
for stmt in filter(None, map(str.strip, schema.read().split(';'))):
try:
conn.execute(stmt)
except Exception:
logger.exception('MapD DDL statement \n%r\n failed', stmt)

# import data
click.echo('[MAPD|II] Loading data ...')
for table, df in read_tables(tables, data_directory):
if table == 'batting':
# float nan problem
cols = df.select_dtypes([float]).columns
df[cols] = df[cols].fillna(0).astype(int)

# string None driver problem
cols = df.select_dtypes([object]).columns
df[cols] = df[cols].fillna('')
@@ -300,8 +296,6 @@ def mapd(schema, tables, data_directory, **params):

conn.close()

click.echo('[MAPD|II] Done!')


@cli.command()
@click.option('-h', '--host', default='localhost')
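
The logger calls introduced above assume a module-level logger that is not visible in this hunk; a minimal sketch of the assumed setup:

import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
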
2 changes: 1 addition & 1 deletion ibis/expr/api.py
@@ -2136,7 +2136,7 @@ def _array_slice(array, index):
# ---------------------------------------------------------------------
# Map API

def get(expr, key, default):
def get(expr, key, default=None):
"""
Return the mapped value for this key, or the default
if the key does not exist
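
A brief usage sketch of the relaxed signature (the literal values here are illustrative):

import ibis

m = ibis.literal({'a': 1, 'b': 2})
m.get('a', 0)  # explicit default, as before
m.get('a')     # the default argument may now be omitted
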
37 changes: 21 additions & 16 deletions ibis/expr/operations.py
@@ -2211,20 +2211,20 @@ class BetweenTime(Between):

class Contains(ValueOp, BooleanValueOp):
value = Arg(rlz.any)
options = Arg(rlz.one_of([rlz.set_,
options = Arg(rlz.one_of([rlz.list_of(rlz.any),
rlz.set_,
rlz.column(rlz.any),
rlz.list_of(rlz.any)]))
rlz.array_of(rlz.any)]))

def __init__(self, value, options):
if isinstance(options, ir.Expr):
# it can be a single expression, like a column
pass
elif util.any_of(options, ir.Expr):
# or a list of expressions
options = ir.sequence(options)
else:
# or a set of scalar values
options = frozenset(options)
# it can be a single expression, like a column
if not isinstance(options, ir.Expr):
if util.any_of(options, ir.Expr):
# or a list of expressions
options = ir.sequence(options)
else:
# or a set of scalar values
options = frozenset(options)
super(Contains, self).__init__(value, options)

def output_type(self):
@@ -2675,8 +2675,7 @@ class MapValueForKey(ValueOp):
key = Arg(rlz.one_of([rlz.string, rlz.integer]))

def output_type(self):
value_dtype = self.arg.type().value_type
return rlz.shape_like(self.arg, value_dtype)
return rlz.shape_like(tuple(self.args), self.arg.type().value_type)


class MapValueOrDefaultForKey(ValueOp):
@@ -2693,17 +2692,23 @@ def output_type(self):
raise ValueError("default type: {} must be the same "
"as the map value_type {}".format(
default_type, value_type))
return rlz.shape_like(self.arg, map_type.value_type)
return rlz.shape_like(tuple(self.args), map_type.value_type)


class MapKeys(ValueOp):
arg = Arg(rlz.mapping)
output_type = rlz.typeof('arg')

def output_type(self):
arg = self.arg
return rlz.shape_like(arg, dt.Array(arg.type().key_type))


class MapValues(ValueOp):
arg = Arg(rlz.mapping)
output_type = rlz.typeof('arg')

def output_type(self):
arg = self.arg
return rlz.shape_like(arg, dt.Array(arg.type().value_type))


class MapConcat(ValueOp):
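
Taken together, these output_type changes let map lookups broadcast over column-shaped keys and give keys()/values() array types; a short illustration, mirroring the tests added below:

import ibis
import ibis.expr.datatypes as dt

t = ibis.table([('a', 'string')], name='t')
m = ibis.literal({'a': 1, 'b': 2})

m[t.a]             # column-shaped key, so the result is an IntegerColumn
m.keys().type()    # dt.Array(dt.string)
m.values().type()  # dt.Array(dt.int8)
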
10 changes: 8 additions & 2 deletions ibis/expr/rules.py
@@ -1,6 +1,10 @@
import collections
import enum

from itertools import starmap, product

import six

from ibis.compat import suppress
import ibis.util as util
import ibis.common as com
@@ -147,8 +151,10 @@ def member_of(obj, arg):

@validator
def list_of(inner, arg, min_length=0):
if not isinstance(arg, (tuple, list, ir.ListExpr)):
arg = [arg]
if isinstance(arg, six.string_types) or not isinstance(
arg, (collections.Sequence, ir.ListExpr)
):
raise com.IbisTypeError('Argument must be a sequence')

if len(arg) < min_length:
raise com.IbisTypeError(
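
With the stricter rule, scalars and strings are rejected up front instead of being silently wrapped in a one-element list; a small sketch of the new behavior:

import ibis.expr.rules as rlz
from ibis.common import IbisTypeError

validator = rlz.list_of(rlz.integer)
validator([3, 2])  # accepted: each element is validated as an integer

try:
    validator(3)   # previously wrapped as [3]; now raises
except IbisTypeError:
    pass
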
28 changes: 20 additions & 8 deletions ibis/expr/tests/test_rules.py
@@ -159,11 +159,18 @@ def test_invalid_member_of(obj, value, expected):


@pytest.mark.parametrize(('validator', 'values', 'expected'), [
(rlz.list_of(identity), 3, ibis.sequence([3])),
(rlz.list_of(identity), (3, 2), ibis.sequence([3, 2])),
(rlz.list_of(rlz.integer), (3, 2), ibis.sequence([3, 2])),
(rlz.list_of(rlz.integer), (3, None), ibis.sequence([3, ibis.NA])),
(rlz.list_of(rlz.string), 'asd', ibis.sequence(['asd'])),
(rlz.list_of(rlz.string), ('a',), ibis.sequence(['a'])),
(rlz.list_of(rlz.string), ['a', 'b'], ibis.sequence(['a', 'b'])),
pytest.param(
rlz.list_of(rlz.list_of(rlz.string)),
[[], ['a']],
ibis.sequence([[], ['a']]),
marks=pytest.mark.xfail(
raises=ValueError, reason='Not yet implemented'),
),
(rlz.list_of(rlz.boolean, min_length=2), [True, False],
ibis.sequence([True, False]))
])
@@ -172,12 +179,17 @@ def test_valid_list_of(validator, values, expected):
assert result.equals(expected)


@pytest.mark.parametrize(('validator', 'values', 'expected'), [
(rlz.list_of(rlz.double, min_length=2), [1], IbisTypeError),
(rlz.list_of(rlz.integer), 1.1, IbisTypeError),
])
def test_invalid_list_of(validator, values, expected):
with pytest.raises(expected):
@pytest.mark.parametrize(
('validator', 'values'),
[
(rlz.list_of(rlz.double, min_length=2), [1]),
(rlz.list_of(rlz.integer), 1.1),
(rlz.list_of(rlz.string), 'asd'),
(rlz.list_of(identity), 3),
]
)
def test_invalid_list_of(validator, values):
with pytest.raises(IbisTypeError):
validator(values)


38 changes: 38 additions & 0 deletions ibis/expr/tests/test_value_exprs.py
@@ -1309,3 +1309,41 @@ def test_large_timestamp():
expected = datetime(year=4567, month=2, day=3)
result = expr.op().value
assert result == expected


def test_map_get_broadcast():
t = ibis.table([('a', 'string')], name='t')
lookup_table = ibis.literal({'a': 1, 'b': 2})
expr = lookup_table.get(t.a)
assert isinstance(expr, ir.IntegerColumn)


def test_map_getitem_broadcast():
t = ibis.table([('a', 'string')], name='t')
lookup_table = ibis.literal({'a': 1, 'b': 2})
expr = lookup_table[t.a]
assert isinstance(expr, ir.IntegerColumn)


def test_map_keys_output_type():
mapping = ibis.literal({'a': 1, 'b': 2})
assert mapping.keys().type() == dt.Array(dt.string)


def test_map_values_output_type():
mapping = ibis.literal({'a': 1, 'b': 2})
assert mapping.values().type() == dt.Array(dt.int8)


def test_scalar_isin_map_keys():
mapping = ibis.literal({'a': 1, 'b': 2})
key = ibis.literal('a')
expr = key.isin(mapping.keys())
assert isinstance(expr, ir.BooleanScalar)


def test_column_isin_map_keys():
t = ibis.table([('a', 'string')], name='t')
mapping = ibis.literal({'a': 1, 'b': 2})
expr = t.a.isin(mapping.keys())
assert isinstance(expr, ir.BooleanColumn)
1 change: 1 addition & 0 deletions ibis/pandas/execution/__init__.py
@@ -2,6 +2,7 @@
from ibis.pandas.execution.strings import * # noqa: F401,F403
from ibis.pandas.execution.temporal import * # noqa: F401,F403
from ibis.pandas.execution.arrays import * # noqa: F401,F403
from ibis.pandas.execution.maps import * # noqa: F401,F403
from ibis.pandas.execution.selection import * # noqa: F401,F403
from ibis.pandas.execution.join import * # noqa: F401,F403
from ibis.pandas.execution.window import * # noqa: F401,F403
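
The new ibis/pandas/execution/maps.py module itself does not appear in this diff; the following is only a hypothetical sketch of the kind of execute_node registrations it would contain (the dispatch signatures, function names, and the import path of execute_node are assumptions):

import pandas as pd

import ibis.expr.operations as ops
from ibis.pandas.dispatch import execute_node  # import path assumed


@execute_node.register(ops.MapValueForKey, dict, str)
def execute_map_value_for_key_dict(op, data, key, **kwargs):
    # scalar map and scalar key: a plain dictionary lookup
    return data[key]


@execute_node.register(ops.MapValueForKey, dict, pd.Series)
def execute_map_value_for_key_dict_series(op, data, key, **kwargs):
    # scalar map and a column of keys: broadcast the lookup over the Series
    return key.map(data)


@execute_node.register(ops.MapKeys, dict)
def execute_map_keys_dict(op, data, **kwargs):
    # keys of a scalar map literal
    return list(data.keys())
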
51 changes: 51 additions & 0 deletions ibis/pandas/execution/generic.py
@@ -16,8 +16,10 @@

import ibis
import ibis.common as com
import ibis.expr.types as ir
import ibis.expr.datatypes as dt
import ibis.expr.operations as ops

from ibis.compat import functools, map, DatetimeTZDtype, zip

from ibis.pandas.core import (
@@ -888,3 +890,52 @@ def execute_node_expr_list(op, sequence, **kwargs):
schema = ibis.schema(list(zip(columns, (e.type() for e in op.exprs))))
data = {col: [execute(el, **kwargs)] for col, el in zip(columns, sequence)}
return schema.apply_to(pd.DataFrame(data, columns=columns))


def wrap_case_result(raw, expr):
"""Wrap a CASE statement result in a Series and handle returning scalars.

Parameters
----------
raw : ndarray[T]
The raw results of executing the ``CASE`` expression
expr : ValueExpr
        The expression from which `raw` was computed

Returns
-------
Union[scalar, Series]
"""
raw_1d = np.atleast_1d(raw)
if np.any(pd.isnull(raw_1d)):
result = pd.Series(raw_1d)
else:
result = pd.Series(
raw_1d, dtype=constants.IBIS_TYPE_TO_PANDAS_TYPE[expr.type()])
if result.size == 1 and isinstance(expr, ir.ScalarExpr):
return result.item()
return result


@execute_node.register(ops.SearchedCase, list, list, object)
def execute_searched_case(op, whens, thens, otherwise, **kwargs):
if otherwise is None:
otherwise = np.nan
raw = np.select(whens, thens, otherwise)
return wrap_case_result(raw, op.to_expr())


@execute_node.register(ops.SimpleCase, object, list, list, object)
def execute_simple_case_scalar(op, value, whens, thens, otherwise, **kwargs):
if otherwise is None:
otherwise = np.nan
raw = np.select(np.asarray(whens) == value, thens, otherwise)
return wrap_case_result(raw, op.to_expr())


@execute_node.register(ops.SimpleCase, pd.Series, list, list, object)
def execute_simple_case_series(op, value, whens, thens, otherwise, **kwargs):
if otherwise is None:
otherwise = np.nan
raw = np.select([value == when for when in whens], thens, otherwise)
return wrap_case_result(raw, op.to_expr())
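
A usage sketch of the case execution path added above against the pandas backend (the table and column names are illustrative):

import pandas as pd
import ibis

df = pd.DataFrame({'a': ['x', 'y', 'z']})
con = ibis.pandas.connect({'t': df})
t = con.table('t')

expr = t.a.case().when('x', 1).when('y', 2).else_(0).end()
expr.execute()  # a pandas Series equal to [1, 2, 0]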