diff --git a/ibis/expr/api.py b/ibis/expr/api.py index 71fbe0475f61d..40fc6ed05a22c 100644 --- a/ibis/expr/api.py +++ b/ibis/expr/api.py @@ -1937,6 +1937,7 @@ def between_time(arg, lower, upper, timezone=None): 'inner': _ops.InnerJoin, 'left': _ops.LeftJoin, 'outer': _ops.OuterJoin, + 'right': _ops.RightJoin, 'left_semi': _ops.LeftSemiJoin, 'semi': _ops.LeftSemiJoin, 'anti': _ops.LeftAntiJoin, @@ -1958,6 +1959,7 @@ def join(left, right, predicates=(), how='inner'): - 'inner': inner join - 'left': left join - 'outer': full outer join + - 'right': right outer join - 'semi' or 'left_semi': left semi join - 'anti': anti join diff --git a/ibis/pandas/execution.py b/ibis/pandas/execution.py index c56d8497c4c7f..5ac93dc710719 100644 --- a/ibis/pandas/execution.py +++ b/ibis/pandas/execution.py @@ -1,13 +1,15 @@ from __future__ import absolute_import -import numbers -import operator import datetime -import functools import decimal -import collections +import functools +import itertools +import numbers +import operator import re +from collections import OrderedDict + import six import numpy as np @@ -17,6 +19,8 @@ import toolz +from multipledispatch import Dispatcher + from ibis import compat import ibis.common as com @@ -342,69 +346,305 @@ def _compute_sorted_frame(sort_keys, df, **kwargs): return result +compute_projection = Dispatcher('compute_projection') + + +@compute_projection.register(ir.ScalarExpr, ops.Selection, pd.DataFrame) +def compute_projection_scalar_expr(expr, parent, data, scope=None, **kwargs): + op = expr.op() + root_tables = op.root_tables() + additional_scope = OrderedDict( + zip(root_tables, (data for _ in range(len(root_tables)))) + ) + new_scope = toolz.merge(scope, additional_scope, factory=OrderedDict) + return execute(expr, new_scope, **kwargs) + + +def get_column_set(table_op, root, available, required=None): + """Return an ``OrderedDict`` mapping possibly suffixed column names to + column names without suffixes. + + Parameters + ---------- + table_op : TableNode + The ``TableNode`` we're selecting from. + root : TableNode + The root table of the expression we're selecting from. + available : set or frozenset + The set of available columns we're selecting from. These originate from + the result of a computed Join, which is a ``DataFrame``. + required : set or frozenset or None, optional + The set of required columns. If this is not ``None`` then we only + include columns that are in both ``root.schema.names`` and + ``required``. + + Returns + ------- + mapping : OrderedDict[str, str] + A map from possibly-suffixed column names to column names without + suffixes. 
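+
+    For example (hypothetical; the real suffixes embed a GUID): when
+    selecting the left root of a join whose sides both define ``key``,
+    with ``available == {'key' + suffix, 'value'}``, the result is
+    ``OrderedDict([('key' + suffix, 'key'), ('value', 'value')])``.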
+ """ + # The mapping is trivial if we're not selecting from a Join operation + if not isinstance(table_op, ops.Join): + return OrderedDict( + (name, name) for name in root.schema.names + if required is None or name in required + ) + + # Figure out which side of the join the root table comes from + if root.equals(table_op.left.op()): + suffix = _LEFT_JOIN_SUFFIX + else: + assert root.equals(table_op.right.op()) + suffix = _RIGHT_JOIN_SUFFIX + + # Map the possibly-suffixed column names to column names without suffixes + return OrderedDict( + (column if column in available else column + suffix, column) + for column in root.schema.names + if (column in available or column + suffix in available) + and ( + required is None or + column in required or + column + suffix in required + ) + ) + + +@compute_projection.register(ir.ColumnExpr, ops.Selection, pd.DataFrame) +def compute_projection_column_expr(expr, parent, data, scope=None, **kwargs): + op = expr.op() + parent_table_op = parent.table.op() + + if isinstance(op, ir.TableColumn): + # slightly faster path for simple column selection + name = op.name + + if name in data: + return data[name] + + if not isinstance(parent_table_op, ops.Join): + raise KeyError(name) + + root_table, = op.root_tables() + mapping = get_column_set( + parent_table_op, root_table, data, required={name} + ) + raw_name, = mapping.keys() + return data.loc[:, raw_name].rename(name) + + new_scope = toolz.merge( + scope, + {t: data for t in expr.op().root_tables()}, + {parent_table_op: data}, + ) + result = execute(expr, new_scope, **kwargs) + return result + + +@compute_projection.register(ir.TableExpr, ops.Selection, pd.DataFrame) +def compute_projection_table_expr(expr, parent, data, **kwargs): + if expr is parent.table: + return data + else: + table_op = parent.table.op() + assert isinstance(table_op, ops.Join) + assert expr.equals(table_op.left) or expr.equals(table_op.right) + + mapping = get_column_set(table_op, expr.op(), frozenset(data.columns)) + df = data.loc[:, list(mapping.keys())] + df.columns = list(mapping.values()) + return df + + +@compute_projection.register(object, ops.Selection, pd.DataFrame) +def compute_projection_default(op, parent, data, **kwargs): + raise TypeError( + "Don't know how to compute projection of {}".format(type(op).__name__) + ) + + +def _compute_predicates(table_op, predicates, data, scope, **kwargs): + for predicate in predicates: + # Map each root table of the predicate to the data so that we compute + # predicates on the result instead of any left or right tables if the + # Selection is on a Join. Project data to only inlude columns from + # the root table. 
+        root_tables = predicate.op().root_tables()
+
+        # handle suffixes
+        additional_scope = {}
+        for t in root_tables:
+            mapping = get_column_set(table_op, t, frozenset(data.columns))
+            new_data = data.loc[:, list(mapping.keys())]
+            additional_scope[t] = new_data.rename(columns=mapping)
+
+        new_scope = toolz.merge(scope, additional_scope)
+        yield execute(predicate, new_scope, **kwargs)
+
+
+def get_table_columns(expression):
+    stack = [expression]
+    seen = set()
+
+    while stack:
+        expr = stack.pop()
+        node = expr.op()
+        node_key = str(node)
+
+        if node_key not in seen:
+            seen.add(node_key)
+
+            if isinstance(expr, ir.ColumnExpr):
+                try:
+                    yield expr.get_name(), node
+                except com.ExpressionError:
+                    pass
+
+            if isinstance(node, ops.PhysicalTable):
+                # Special case PhysicalTable to look at the individual columns
+                stack.extend(expr[name] for name in node.schema.names)
+            else:
+                # Otherwise flatten the args of the current node
+                stack.extend(
+                    arg for arg in node.flat_args() if isinstance(arg, ir.Expr)
+                )
+
+
+physical_tables = Dispatcher('physical_tables')
+
+
+@physical_tables.register(ops.Selection)
+def physical_tables_selection(sel):
+    # If every selection is a column whose only root table is the
+    # selection's ``table`` attribute, return that table; otherwise the
+    # selection itself is the physical table
+    sel_table_op = sel.table.op()
+    if all(
+        isinstance(s, ir.ColumnExpr) and
+        len(s.op().root_tables()) == 1 and
+        s.op().root_tables()[0].equals(sel_table_op)
+        for s in sel.selections
+    ):
+        return [sel_table_op]
+    return [sel]
+
+
+@physical_tables.register(ops.PhysicalTable)
+def physical_tables_physical_table(t):
+    return [t]
+
+
+@physical_tables.register(ops.Join)
+def physical_tables_join(join):
+    return list(itertools.chain(
+        toolz.unique(physical_tables(join.left.op()), key=id),
+        toolz.unique(physical_tables(join.right.op()), key=id)
+    ))
+
+
+@physical_tables.register(ir.Node)
+def physical_tables_node(node):
+    tables = toolz.concat(map(physical_tables, node.root_tables()))
+    return list(toolz.unique(tables, key=id))
+
+
+def prune_overlapping_column_suffixes(selection_expr, data):
+    """Strip join suffixes from columns that do not need to keep them.
+
+    Parameters
+    ----------
+    selection_expr : Expr
+        The Selection expression being computed.
+    data : pandas.DataFrame
+        The materialized input of the selection, possibly the result of a
+        pandas merge with suffixed column names.
+
+    Returns
+    -------
+    df : pandas.DataFrame
+        ``data`` restricted to the columns the selection uses, with
+        suffixes removed wherever a column name is unambiguous. Columns
+        whose names collide across both sides of a join keep their
+        suffixes.
+
+    Notes
+    -----
+    Ibis does not allow duplicate column names, so suffixed names can only
+    come from the result of a computed Join.
+    """
+    root = selection_expr.op().table.op()
+
+    if not isinstance(root, ops.Join):
+        # Ibis doesn't allow duplicate column names, so this is cheap to assert
+        assert len(root.schema.names) == len(data.columns), \
+            'Columns in DataFrame are not unique'
+        return data
+
+    # Get every named column node in the Selection
+    physical_columns = list(get_table_columns(selection_expr))
+
+    # The roots of the join -> suffix
+    suffixes = dict(zip(physical_tables(root), _JOIN_SUFFIXES))
+
+    # Columns from the input DataFrame
+    data_columns = frozenset(data.columns)
+
+    # Either the join has exactly two unique roots, or no column in the
+    # data carries a join suffix
+    assert len(suffixes) == 2 or not {
+        column for column in data_columns
+        if _LEFT_JOIN_SUFFIX in column or _RIGHT_JOIN_SUFFIX in column
+    }
+
+    possible_columns = []
+    possible_used_columns = set()
+
+    # Get the columns that we could possibly use, by determining which of the
+    # physical columns we actually get in our input DataFrame
+    for name, column in physical_columns:
+        physical_table, = physical_tables(column)
+        suffix = suffixes[physical_table]
+        name_with_suffix = name + suffix
+        if name_with_suffix in data_columns:
+            possible_used_columns.add(name_with_suffix)
+        possible_columns.append((name, name_with_suffix, suffix))
+
+    # Build up a dictionary to use for renaming, depending on whether a
+    # column actually overlaps and is used
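+    # Illustration (hypothetical column names): if both sides of the join
+    # use a column named ``value``, both suffixed names are kept; if only
+    # one side's ``value`` is used, its suffix is stripped; a column that
+    # only ever appears unsuffixed (e.g., the join key) is kept as is.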
+    used_columns = OrderedDict()
+    for name, name_with_suffix, suffix in possible_columns:
+        at_least_one_overlap = name_with_suffix in possible_used_columns
+        both_overlap = (
+            at_least_one_overlap and
+            name + _ALTERNATE_SUFFIXES[suffix] in possible_used_columns
+        )
+
+        if both_overlap:
+            # both sides overlap, so we need to keep the suffix
+            key = new_name = name_with_suffix
+
+        elif at_least_one_overlap:
+            # exactly one overlap, so we can remove the suffix
+            key = name_with_suffix
+            new_name = name
+
+        else:
+            # no overlap, we have a unique column name from a join (e.g., the
+            # join key)
+            key = new_name = name
+
+        used_columns[key] = new_name
+
+    df = data.loc[:, list(used_columns.keys())]
+    result = df.rename(columns=used_columns)
+    return result
+
+
 @execute_node.register(ops.Selection, pd.DataFrame)
 def execute_selection_dataframe(op, data, scope=None, **kwargs):
     selections = op.selections
     predicates = op.predicates
     sort_keys = op.sort_keys
-    result = data
+    result = data = prune_overlapping_column_suffixes(op.to_expr(), data)

     if selections:
         data_pieces = []
+
         for selection in selections:
-            table_op = op.table.op()
-            selection_operation = selection.op()
-
-            if op.table is selection:
-                pandas_object = data
-            elif isinstance(selection, ir.ScalarExpr):
-                root_tables = selection_operation.root_tables()
-                additional_scope = collections.OrderedDict(
-                    zip(root_tables, (data for _ in range(len(root_tables))))
-                )
-                new_scope = toolz.merge(
-                    scope,
-                    additional_scope,
-                    factory=collections.OrderedDict,
-                )
-                pandas_object = execute(selection, new_scope, **kwargs)
-            elif isinstance(selection, ir.ColumnExpr):
-                if isinstance(selection_operation, ir.TableColumn):
-                    # slightly faster path for simple column selection
-                    pandas_object = data[selection_operation.name]
-                elif isinstance(table_op, ops.Join):
-                    pandas_object = execute(
-                        selection,
-                        toolz.merge(
-                            scope, {selection_operation.table.op(): data}
-                        ),
-                        **kwargs
-                    )
-                else:
-                    pandas_object = execute(
-                        selection,
-                        toolz.merge(scope, {op.table.op(): data}),
-                        **kwargs
-                    )
-            elif isinstance(selection, ir.TableExpr):
-                # These two statements should never raise unless our
-                # assumptions are wrong because:
-                # 1. If we're selecting ourself, then we've already caught that
-                #    case above
-                # 2. 
We've checked that `s` originates from its parent before - # executing - assert isinstance(table_op, ops.Join) - assert selection.equals(table_op.left) or selection.equals( - table_op.right - ) - pandas_object = data[selection.columns] - else: - raise TypeError( - "Don't know how to compute selection of type {}".format( - type(selection_operation).__name__ - ) - ) + pandas_object = compute_projection( + selection, op, data, scope=scope, **kwargs + ) if isinstance(pandas_object, pd.Series): pandas_object = pandas_object.rename( @@ -414,9 +654,12 @@ def execute_selection_dataframe(op, data, scope=None, **kwargs): result = pd.concat(data_pieces, axis=1) if predicates: - where = functools.reduce( - operator.and_, (execute(p, scope, **kwargs) for p in predicates) + computed_predicates = _compute_predicates( + op.table.op(), predicates, data, scope, **kwargs ) + where = functools.reduce(operator.and_, computed_predicates) + assert len(where) == len(result), \ + 'Selection predicate length does not match underlying table' result = result.loc[where] if sort_keys: @@ -577,24 +820,72 @@ def execute_not_bool(op, data, **kwargs): _JOIN_TYPES = { ops.LeftJoin: 'left', + ops.RightJoin: 'right', ops.InnerJoin: 'inner', ops.OuterJoin: 'outer', } +def _compute_join_column(column_expr, **kwargs): + column_op = column_expr.op() + + if isinstance(column_op, ops.TableColumn): + new_column = column_op.name + else: + new_column = execute(column_expr, **kwargs) + + root_table, = column_op.root_tables() + return new_column, root_table + + +_LEFT_JOIN_SUFFIX = '_ibis_left_{}'.format(util.guid()) +_RIGHT_JOIN_SUFFIX = '_ibis_right_{}'.format(util.guid()) +_JOIN_SUFFIXES = _LEFT_JOIN_SUFFIX, _RIGHT_JOIN_SUFFIX +_ALTERNATE_SUFFIXES = { + _LEFT_JOIN_SUFFIX: _RIGHT_JOIN_SUFFIX, + _RIGHT_JOIN_SUFFIX: _LEFT_JOIN_SUFFIX +} + + @execute_node.register(ops.Join, pd.DataFrame, pd.DataFrame) def execute_materialized_join(op, left, right, **kwargs): + op_type = type(op) + try: - how = _JOIN_TYPES[type(op)] + how = _JOIN_TYPES[op_type] except KeyError: - raise NotImplementedError('{} not supported'.format(type(op).__name__)) + raise NotImplementedError('{} not supported'.format(op_type.__name__)) - overlapping_columns = set(left.columns) & set(right.columns) + left_op = op.left.op() + right_op = op.right.op() - left_on, right_on = _extract_predicate_names(op.predicates) - _validate_columns(overlapping_columns, left_on, right_on) + on = {left_op: [], right_op: []} - return pd.merge(left, right, how=how, left_on=left_on, right_on=right_on) + for predicate in map(operator.methodcaller('op'), op.predicates): + if not isinstance(predicate, ops.Equals): + raise TypeError( + 'Only equality join predicates supported with pandas' + ) + new_left_column, left_pred_root = _compute_join_column( + predicate.left, + **kwargs + ) + on[left_pred_root].append(new_left_column) + + new_right_column, right_pred_root = _compute_join_column( + predicate.right, + **kwargs + ) + on[right_pred_root].append(new_right_column) + + df = pd.merge( + left, right, + how=how, + left_on=on[left_op], + right_on=on[right_op], + suffixes=_JOIN_SUFFIXES, + ) + return df @execute_node.register(ops.AsOfJoin, pd.DataFrame, pd.DataFrame) @@ -1067,7 +1358,7 @@ def execute_frame_window_op(op, data, scope=None, context=None, **kwargs): 'Following with a value other than 0 (current row) with order_by ' 'is not yet implemented in the pandas backend. Use ' 'ibis.trailing_window or ibis.cumulative_window to ' - 'construct windows when using pandas.' 
+ 'construct windows when using the pandas backend.' ) group_by = window._group_by @@ -1076,8 +1367,7 @@ def execute_frame_window_op(op, data, scope=None, context=None, **kwargs): key, context=context, **kwargs - ) - for key, key_op in zip( + ) for key, key_op in zip( group_by, map(operator.methodcaller('op'), group_by) ) ] @@ -1107,10 +1397,8 @@ def execute_frame_window_op(op, data, scope=None, context=None, **kwargs): new_scope = toolz.merge( scope, - collections.OrderedDict( - (t, source) for t in operand.op().root_tables() - ), - factory=collections.OrderedDict, + OrderedDict((t, source) for t in operand.op().root_tables()), + factory=OrderedDict, ) # no order by or group by: default summarization context @@ -1131,4 +1419,12 @@ def execute_frame_window_op(op, data, scope=None, context=None, **kwargs): context = ctx.Transform() result = execute(operand, new_scope, context=context, **kwargs) - return post_process(result, data.index) + series = post_process(result, data.index) + assert len(data) == len(series), \ + 'input data source and computed column do not have the same length' + return series + + +@execute_node.register(ops.SelfReference, pd.DataFrame) +def execute_node_self_reference_dataframe(op, data, **kwargs): + return data diff --git a/ibis/pandas/tests/conftest.py b/ibis/pandas/tests/conftest.py index f5cd47b959157..269f8ac4e6856 100644 --- a/ibis/pandas/tests/conftest.py +++ b/ibis/pandas/tests/conftest.py @@ -80,9 +80,11 @@ def df1(): @pytest.fixture(scope='module') def df2(): - return pd.DataFrame( - {'key': list('ac'), 'other_value': [4.0, 6.0], 'key3': list('fe')} - ) + return pd.DataFrame({ + 'key': list('ac'), + 'other_value': [4.0, 6.0], + 'key3': list('fe') + }) @pytest.fixture(scope='module') @@ -122,14 +124,35 @@ def time_keyed_df2(): @pytest.fixture(scope='module') -def client(df, df1, df2, time_df1, time_df2, time_keyed_df1, time_keyed_df2): +def client( + df, df1, df2, df3, time_df1, time_df2, time_keyed_df1, time_keyed_df2 +): return ibis.pandas.connect( - {'df': df, 'df1': df1, 'df2': df2, 'left': df1, 'right': df2, - 'time_df1': time_df1, 'time_df2': time_df2, - 'time_keyed_df1': time_keyed_df1, 'time_keyed_df2': time_keyed_df2} + dict( + df=df, + df1=df1, + df2=df2, + df3=df3, + left=df1, + right=df2, + time_df1=time_df1, + time_df2=time_df2, + time_keyed_df1=time_keyed_df1, + time_keyed_df2=time_keyed_df2 + ) ) +@pytest.fixture(scope='module') +def df3(): + return pd.DataFrame({ + 'key': list('ac'), + 'other_value': [4.0, 6.0], + 'key2': list('ae'), + 'key3': list('fe') + }) + + @pytest.fixture(scope='module') def t(client): return client.table( @@ -189,3 +212,25 @@ def batting(lahman): @pytest.fixture(scope='module') def awards_players(lahman): return lahman.table('awards_players') + + +@pytest.fixture(scope='module') +def sel_cols(batting): + cols = batting.columns + start, end = cols.index('AB'), cols.index('H') + 1 + return ['playerID', 'yearID', 'teamID', 'G'] + cols[start:end] + + +@pytest.fixture(scope='module') +def players_base(batting, sel_cols): + return batting[sel_cols].sort_by(sel_cols[:3]) + + +@pytest.fixture(scope='module') +def players(players_base): + return players_base.groupby('playerID') + + +@pytest.fixture(scope='module') +def players_df(players_base): + return players_base.execute().reset_index(drop=True) diff --git a/ibis/pandas/tests/test_client.py b/ibis/pandas/tests/test_client.py index ddbc4df9ca22f..6cc9c50c20db7 100644 --- a/ibis/pandas/tests/test_client.py +++ b/ibis/pandas/tests/test_client.py @@ -10,3 +10,12 @@ def 
test_client_table(t): assert isinstance(t.op(), ibis.expr.operations.DatabaseTable) assert isinstance(t.op(), PandasTable) assert 'PandasTable' in repr(t) + + +def test_literal(client): + assert client.execute(ibis.literal(1)) == 1 + + +def test_read_with_undiscoverable_type(client): + with pytest.raises(TypeError): + client.table('df') diff --git a/ibis/pandas/tests/test_join.py b/ibis/pandas/tests/test_join.py new file mode 100644 index 0000000000000..3a73790e60b34 --- /dev/null +++ b/ibis/pandas/tests/test_join.py @@ -0,0 +1,260 @@ +import pandas as pd +import pandas.util.testing as tm + +import pytest + + +pytest.importorskip('multipledispatch') + +pytestmark = pytest.mark.pandas + + +join_type = pytest.mark.parametrize( + 'how', + [ + 'inner', + 'left', + 'right', + 'outer', + pytest.mark.xfail( + 'semi', + raises=NotImplementedError, + reason='Semi join not implemented' + ), + pytest.mark.xfail( + 'anti', + raises=NotImplementedError, + reason='Anti join not implemented' + ), + ] +) + + +@join_type +def test_join(how, left, right, df1, df2): + expr = left.join(right, left.key == right.key, how=how) + result = expr.execute() + expected = pd.merge(df1, df2, how=how, on='key') + tm.assert_frame_equal(result[expected.columns], expected) + + +@join_type +def test_join_project_left_table(how, left, right, df1, df2): + expr = left.join(right, left.key == right.key, how=how)[left, right.key3] + result = expr.execute() + expected = pd.merge(df1, df2, how=how, on='key')[ + list(left.columns) + ['key3'] + ] + tm.assert_frame_equal(result[expected.columns], expected) + + +@join_type +def test_join_with_multiple_predicates(how, left, right, df1, df2): + expr = left.join( + right, [left.key == right.key, left.key2 == right.key3], how=how + ) + result = expr.execute() + expected = pd.merge( + df1, df2, + how=how, + left_on=['key', 'key2'], + right_on=['key', 'key3'], + ) + tm.assert_frame_equal(result[expected.columns], expected) + + +@join_type +def test_join_with_multiple_predicates_written_as_one( + how, left, right, df1, df2 +): + predicate = (left.key == right.key) & (left.key2 == right.key3) + expr = left.join(right, predicate, how=how) + result = expr.execute() + expected = pd.merge( + df1, df2, + how=how, + left_on=['key', 'key2'], + right_on=['key', 'key3'], + ) + tm.assert_frame_equal(result[expected.columns], expected) + + +@join_type +def test_join_with_invalid_predicates(how, left, right): + predicate = (left.key == right.key) & (left.key2 <= right.key3) + expr = left.join(right, predicate, how=how) + with pytest.raises(TypeError): + expr.execute() + + predicate = left.key >= right.key + expr = left.join(right, predicate, how=how) + with pytest.raises(TypeError): + expr.execute() + + +@join_type +@pytest.mark.xfail(reason='Hard to detect this case') +def test_join_with_duplicate_non_key_columns(how, left, right, df1, df2): + left = left.mutate(x=left.value * 2) + right = right.mutate(x=right.other_value * 3) + expr = left.join(right, left.key == right.key, how=how) + + # This is undefined behavior because `x` is duplicated. 
This is difficult + # to detect + with pytest.raises(ValueError): + expr.execute() + + +@join_type +def test_join_with_duplicate_non_key_columns_not_selected( + how, left, right, df1, df2 +): + left = left.mutate(x=left.value * 2) + right = right.mutate(x=right.other_value * 3) + right = right[['key', 'other_value']] + expr = left.join(right, left.key == right.key, how=how) + result = expr.execute() + expected = pd.merge( + df1.assign(x=df1.value * 2), + df2[['key', 'other_value']], + how=how, + on='key', + ) + tm.assert_frame_equal(result[expected.columns], expected) + + +@join_type +def test_join_with_post_expression_selection(how, left, right, df1, df2): + join = left.join(right, left.key == right.key, how=how) + expr = join[left.key, left.value, right.other_value] + result = expr.execute() + expected = pd.merge(df1, df2, on='key', how=how)[[ + 'key', 'value', 'other_value' + ]] + tm.assert_frame_equal(result[expected.columns], expected) + + +@join_type +def test_join_with_post_expression_filter(how, left, df1): + lhs = left[['key', 'key2']] + rhs = left[['key2', 'value']] + + joined = lhs.join(rhs, 'key2', how=how) + projected = joined[lhs, rhs.value] + expr = projected[projected.value == 4] + result = expr.execute() + + df1 = lhs.execute() + df2 = rhs.execute() + expected = pd.merge(df1, df2, on='key2', how=how) + expected = expected.loc[expected.value == 4].reset_index(drop=True) + + tm.assert_frame_equal(result, expected) + + +@join_type +def test_multi_join_with_post_expression_filter(how, left, df1): + lhs = left[['key', 'key2']] + rhs = left[['key2', 'value']] + rhs2 = left[['key2', 'value']].relabel(dict(value='value2')) + + joined = lhs.join(rhs, 'key2', how=how) + projected = joined[lhs, rhs.value] + filtered = projected[projected.value == 4] + + joined2 = filtered.join(rhs2, 'key2') + projected2 = joined2[filtered.key, rhs2.value2] + expr = projected2[projected2.value2 == 3] + + result = expr.execute() + + df1 = lhs.execute() + df2 = rhs.execute() + df3 = rhs2.execute() + expected = pd.merge(df1, df2, on='key2', how=how) + expected = expected.loc[expected.value == 4].reset_index(drop=True) + expected = pd.merge(expected, df3, on='key2')[['key', 'value2']] + expected = expected.loc[expected.value2 == 3].reset_index(drop=True) + + tm.assert_frame_equal(result, expected) + + +@join_type +def test_join_with_non_trivial_key(how, left, right, df1, df2): + # also test that the order of operands in the predicate doesn't matter + join = left.join(right, right.key.length() == left.key.length(), how=how) + expr = join[left.key, left.value, right.other_value] + result = expr.execute() + + expected = pd.merge( + df1.assign(key_len=df1.key.str.len()), + df2.assign(key_len=df2.key.str.len()), + on='key_len', + how=how, + ).drop(['key_len', 'key_y', 'key2', 'key3'], axis=1).rename( + columns={'key_x': 'key'} + ) + tm.assert_frame_equal(result[expected.columns], expected) + + +@join_type +def test_join_with_non_trivial_key_project_table(how, left, right, df1, df2): + # also test that the order of operands in the predicate doesn't matter + join = left.join(right, right.key.length() == left.key.length(), how=how) + expr = join[left, right.other_value] + expr = expr[expr.key.length() == 1] + result = expr.execute() + + expected = pd.merge( + df1.assign(key_len=df1.key.str.len()), + df2.assign(key_len=df2.key.str.len()), + on='key_len', + how=how, + ).drop(['key_len', 'key_y', 'key2', 'key3'], axis=1).rename( + columns={'key_x': 'key'} + ) + expected = expected.loc[expected.key.str.len() == 1] + 
tm.assert_frame_equal(result[expected.columns], expected) + + +@join_type +def test_join_with_project_right_duplicate_column(client, how, left, df1, df3): + # also test that the order of operands in the predicate doesn't matter + right = client.table('df3') + join = left.join(right, ['key'], how=how) + expr = join[left.key, right.key2, right.other_value] + result = expr.execute() + + expected = pd.merge( + df1, df3, on='key', how=how, + ).drop( + ['key2_x', 'key3', 'value'], + axis=1 + ).rename(columns={'key2_y': 'key2'}) + tm.assert_frame_equal(result[expected.columns], expected) + + +def test_join_with_window_function( + players_base, players_df, batting, batting_df +): + players = players_base + + # this should be semi_join + tbl = batting.left_join(players, ['playerID']) + t = tbl[batting.G, batting.playerID, batting.teamID] + expr = t.groupby(t.teamID).mutate( + team_avg=lambda d: d.G.mean(), + demeaned_by_player=lambda d: d.G - d.G.mean() + ) + result = expr.execute() + + expected = pd.merge( + batting_df, players_df[['playerID']], on='playerID', how='left' + )[['G', 'playerID', 'teamID']] + team_avg = expected.groupby('teamID').G.transform('mean') + expected = expected.assign( + team_avg=team_avg, + demeaned_by_player=lambda df: df.G - team_avg + ) + + tm.assert_frame_equal(result[expected.columns], expected) diff --git a/ibis/pandas/tests/test_operations.py b/ibis/pandas/tests/test_operations.py index c441ad339c074..705c05ab18223 100644 --- a/ibis/pandas/tests/test_operations.py +++ b/ibis/pandas/tests/test_operations.py @@ -674,18 +674,16 @@ def test_array_repeat_scalar(client, n, mul): assert result == expected -@pytest.mark.parametrize('catop', [lambda x, y: x + y, lambda x, y: y + x]) -def test_array_concat(t, df, catop): +@pytest.mark.parametrize('op', [lambda x, y: x + y, lambda x, y: y + x]) +def test_array_concat(t, df, op): x = t.array_of_float64.cast('array') y = t.array_of_strings - expr = t[(x + y).name('catted')].catted + expr = op(x, y) result = expr.execute() - expected = pd.DataFrame({ - 'catted': ( - df.array_of_float64.apply(lambda x: list(map(str, x))) + - df.array_of_strings - ) - }).catted + expected = op( + df.array_of_float64.apply(lambda x: list(map(str, x))), + df.array_of_strings + ) tm.assert_series_equal(result, expected) diff --git a/ibis/pandas/tests/test_window.py b/ibis/pandas/tests/test_window.py index 689aae07be0ee..c7e419b2355a7 100644 --- a/ibis/pandas/tests/test_window.py +++ b/ibis/pandas/tests/test_window.py @@ -3,7 +3,6 @@ import pytest import ibis from pandas.util import testing as tm -import ibis.expr.types as ir pytestmark = pytest.mark.pandas @@ -60,29 +59,6 @@ def test_group_by_mutate_analytic(t, df): tm.assert_frame_equal(result[expected.columns], expected) -@pytest.fixture(scope='module') -def sel_cols(batting): - assert isinstance(batting, ir.TableExpr) - cols = batting.columns - start, end = cols.index('AB'), cols.index('H') + 1 - return ['playerID', 'yearID', 'teamID', 'G'] + cols[start:end] - - -@pytest.fixture(scope='module') -def players_base(batting, sel_cols): - return batting[sel_cols].sort_by(sel_cols[:3]) - - -@pytest.fixture(scope='module') -def players(players_base): - return players_base.groupby('playerID') - - -@pytest.fixture(scope='module') -def players_df(players_base): - return players_base.execute().reset_index(drop=True) - - def test_players(players, players_df): lagged = players.mutate(pct=lambda t: t.G - t.G.lag()) result = lagged.execute()
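
Illustrative usage of the new 'right' join added in this patch (a sketch,
not part of the diff; the frames below are hypothetical, while the API
calls mirror the fixtures and tests above):

    import ibis
    import pandas as pd

    df1 = pd.DataFrame({'key': list('abc'), 'value': [1, 2, 3]})
    df2 = pd.DataFrame({'key': list('ac'), 'other_value': [4.0, 6.0]})
    con = ibis.pandas.connect({'left': df1, 'right': df2})
    left, right = con.table('left'), con.table('right')

    # how='right' maps to ops.RightJoin and pd.merge(..., how='right')
    expr = left.join(right, left.key == right.key, how='right')
    result = expr[left.key, right.other_value].execute()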