Skip to content

Commit

Permalink
ENH: Support Table.asof_join
Browse files Browse the repository at this point in the history
closes #1118 with a Pandas implementation
  • Loading branch information
toryhaavik committed Oct 6, 2017
1 parent a832747 commit 6bf5f4d
Show file tree
Hide file tree
Showing 7 changed files with 240 additions and 62 deletions.
1 change: 1 addition & 0 deletions .gitignore
Expand Up @@ -10,6 +10,7 @@
*.log
*.swp
*.pdb
.idea

# Compiled source
*.a
Expand Down
24 changes: 24 additions & 0 deletions ibis/expr/api.py
Expand Up @@ -1974,6 +1974,29 @@ def join(left, right, predicates=(), how='inner'):
return TableExpr(op)


def asof_join(left, right, predicates=(), by=()):
    """
    Perform an asof join between two tables.

    Similar to a left join, except that the match is done on the nearest key
    rather than on equal keys. Optionally, rows are matched on the ``by``
    keys before joining with ``predicates``.

    Parameters
    ----------
    left : TableExpr
    right : TableExpr
    predicates : join expression(s)
        Key(s) for the asof match, e.g. the name of a column present in
        both tables.
    by : string or join expression(s), default ()
        Column(s) to group by before joining. Handed to the join operation
        unchanged, in the same forms as ``predicates``.

    Returns
    -------
    joined : TableExpr
        Note, schema is not materialized yet
    """
    return _ops.AsOfJoin(left, right, predicates, by).to_expr()


def cross_join(*tables, **kwargs):
"""
Perform a cross join (cartesian product) amongst a list of tables, with
Expand Down Expand Up @@ -2476,6 +2499,7 @@ def _table_drop(self, fields):
outer_join=_regular_join_method('outer_join', 'outer'),
semi_join=_regular_join_method('semi_join', 'semi'),
anti_join=_regular_join_method('anti_join', 'anti'),
asof_join=asof_join,
sort_by=_table_sort_by,
to_array=_table_to_array,
union=_table_union,
Expand Down
125 changes: 75 additions & 50 deletions ibis/expr/operations.py
Expand Up @@ -1355,73 +1355,82 @@ def output_type(self):
return rules.shape_like(self.args[0], self.args[1].type())


class Join(TableNode):
def _validate_join_tables(left, right):
if not rules.is_table(left):
raise TypeError('Can only join table expressions, got {} for '
'left table'.format(type(left).__name__))

_arg_names = ['left', 'right', 'predicates']
if not rules.is_table(right):
raise TypeError('Can only join table expressions, got {} for '
'right table'.format(type(right).__name__))

def __init__(self, left, right, predicates):
if not rules.is_table(left):
raise TypeError('Can only join table expressions, got {} for '
'left table'.format(type(left).__name__))

if not rules.is_table(right):
raise TypeError('Can only join table expressions, got {} for '
'right table'.format(type(right).__name__))
def _make_distinct_join_predicates(left, right, predicates):
# see GH #667

(self.left,
self.right,
self.predicates) = self._make_distinct(left, right, predicates)
# If left and right table have a common parent expression (e.g. they
# have different filters), must add a self-reference and make the
# appropriate substitution in the join predicates

# Validate join predicates. Each predicate must be valid jointly when
# considering the roots of each input table
from ibis.expr.analysis import CommonSubexpr
validator = CommonSubexpr([self.left, self.right])
validator.validate_all(self.predicates)
if left.equals(right):
right = right.view()

Node.__init__(self, [self.left, self.right, self.predicates])
predicates = _clean_join_predicates(left, right, predicates)
return left, right, predicates

def _make_distinct(self, left, right, predicates):
# see GH #667

# If left and right table have a common parent expression (e.g. they
# have different filters), must add a self-reference and make the
# appropriate substitution in the join predicates
def _clean_join_predicates(left, right, predicates):
import ibis.expr.analysis as L

if left.equals(right):
right = right.view()
result = []

predicates = self._clean_predicates(left, right, predicates)
return left, right, predicates
if not isinstance(predicates, (list, tuple)):
predicates = [predicates]

def _clean_predicates(self, left, right, predicates):
import ibis.expr.analysis as L
for pred in predicates:
if isinstance(pred, tuple):
if len(pred) != 2:
raise com.ExpressionError('Join key tuple must be '
'length 2')
lk, rk = pred
lk = left._ensure_expr(lk)
rk = right._ensure_expr(rk)
pred = lk == rk
elif isinstance(pred, six.string_types):
pred = left[pred] == right[pred]
elif not isinstance(pred, ir.Expr):
raise NotImplementedError

if not isinstance(pred, ir.BooleanColumn):
raise com.ExpressionError('Join predicate must be comparison')

result = []
preds = L.flatten_predicate(pred)
result.extend(preds)

if not isinstance(predicates, (list, tuple)):
predicates = [predicates]
_validate_join_predicates(left, right, result)
return result

for pred in predicates:
if isinstance(pred, tuple):
if len(pred) != 2:
raise com.ExpressionError('Join key tuple must be '
'length 2')
lk, rk = pred
lk = left._ensure_expr(lk)
rk = right._ensure_expr(rk)
pred = lk == rk
elif isinstance(pred, six.string_types):
pred = left[pred] == right[pred]
elif not isinstance(pred, ir.Expr):
raise NotImplementedError

if not isinstance(pred, ir.BooleanColumn):
raise com.ExpressionError('Join predicate must be comparison')
def _validate_join_predicates(left, right, predicates):
# Validate join predicates. Each predicate must be valid jointly when
# considering the roots of each input table
from ibis.expr.analysis import CommonSubexpr
validator = CommonSubexpr([left, right])
validator.validate_all(predicates)

preds = L.flatten_predicate(pred)
result.extend(preds)

return result
class Join(TableNode):

_arg_names = ['left', 'right', 'predicates']

def __init__(self, left, right, predicates):
_validate_join_tables(left, right)
(self.left,
self.right,
self.predicates) = _make_distinct_join_predicates(
left, right, predicates)

Node.__init__(self, [self.left, self.right, self.predicates])

def _get_schema(self):
# For joins retaining both table schemas, merge them together here
Expand Down Expand Up @@ -1530,6 +1539,22 @@ def __init__(self, *args, **kwargs):
InnerJoin.__init__(self, left, right, [])


class AsOfJoin(Join):
    """Join node carrying a second set of predicates for the ``by`` keys.

    Parameters
    ----------
    left, right : table expressions
        Checked with ``_validate_join_tables``.
    predicates : join expression(s)
        Cleaned with ``_make_distinct_join_predicates``, which may also
        replace ``right`` (the returned tables are what get stored).
    by_predicates : join expression(s)
        Cleaned with ``_clean_join_predicates`` and stored as
        ``self.by_predicates``.
    """

    # NOTE(review): the fourth arg is named 'by' here while the attribute is
    # ``by_predicates`` -- confirm whether anything looks args up by name.
    _arg_names = ['left', 'right', 'predicates', 'by']

    def __init__(self, left, right, predicates, by_predicates):
        _validate_join_tables(left, right)
        (self.left,
         self.right,
         self.predicates) = _make_distinct_join_predicates(
            left, right, predicates)
        # Clean the ``by`` keys against the tables stored above rather than
        # the raw arguments, so both predicate sets refer to the same tables.
        self.by_predicates = _clean_join_predicates(
            self.left, self.right, by_predicates)

        Node.__init__(
            self, [self.left, self.right, self.predicates, self.by_predicates])


class Union(TableNode, HasSchema):

def __init__(self, left, right, distinct=False):
Expand Down
18 changes: 18 additions & 0 deletions ibis/expr/tests/test_table.py
Expand Up @@ -729,6 +729,24 @@ def test_join_no_predicate_list(con):
assert_equal(joined, expected)


def test_asof_join():
    # A bare column name as the predicate should yield a predicate whose two
    # sides are both the 'time' column.
    left = ibis.table([('time', 'int32'), ('value', 'double')])
    right = ibis.table([('time', 'int32'), ('value2', 'double')])
    joined = api.asof_join(left, right, 'time')
    pred = joined.op().predicates[0].op()
    assert pred.left.op().name == pred.right.op().name == 'time'


def test_asof_join_with_by():
    # The ``by`` column name should be stored on the operation as a separate
    # predicate whose two sides are both the 'key' column.
    left = ibis.table(
        [('time', 'int32'), ('key', 'int32'), ('value', 'double')])
    right = ibis.table(
        [('time', 'int32'), ('key', 'int32'), ('value2', 'double')])
    joined = api.asof_join(left, right, 'time', by='key')
    by = joined.op().by_predicates[0].op()
    assert by.left.op().name == by.right.op().name == 'key'


def test_equijoin_schema_merge():
table1 = ibis.table([('key1', 'string'), ('value1', 'double')])
table2 = ibis.table([('key2', 'string'), ('stuff', 'int32')])
Expand Down
46 changes: 36 additions & 10 deletions ibis/pandas/execution.py
Expand Up @@ -591,23 +591,51 @@ def execute_materialized_join(op, left, right, **kwargs):

overlapping_columns = set(left.columns) & set(right.columns)

left_on = []
right_on = []
left_on, right_on = _extract_predicate_names(op.predicates)
_validate_columns(overlapping_columns, left_on, right_on)

for predicate in map(operator.methodcaller('op'), op.predicates):
return pd.merge(left, right, how=how, left_on=left_on, right_on=right_on)


@execute_node.register(ops.AsOfJoin, pd.DataFrame, pd.DataFrame)
def execute_asof_join(op, left, right, **kwargs):
    """Execute an ``AsOfJoin`` on two DataFrames with ``pd.merge_asof``.

    Parameters
    ----------
    op : ops.AsOfJoin
        Supplies ``predicates`` (the asof keys) and ``by_predicates``.
    left, right : pd.DataFrame
    **kwargs
        Accepted but not used here.

    Returns
    -------
    The result of ``pd.merge_asof``.

    Raises
    ------
    Whatever ``_extract_predicate_names`` or ``_validate_columns`` raise.
    """
    overlapping_columns = set(left.columns) & set(right.columns)
    left_on, right_on = _extract_predicate_names(op.predicates)
    merge_asof_args = {
        'left': left,
        'right': right,
        'left_on': left_on,
        'right_on': right_on
    }
    left_by, right_by = _extract_predicate_names(op.by_predicates)
    # Both the asof keys and the ``by`` keys are passed to the column check.
    _validate_columns(
        overlapping_columns, left_on, right_on, left_by, right_by)

    # Only pass the ``by`` arguments when ``by`` keys were actually given.
    if left_by and right_by:
        merge_asof_args['left_by'] = left_by
        merge_asof_args['right_by'] = right_by

    return pd.merge_asof(**merge_asof_args)


def _extract_predicate_names(predicates):
lefts = []
rights = []
for predicate in map(operator.methodcaller('op'), predicates):
if not isinstance(predicate, ops.Equals):
raise TypeError(
'Only equality join predicates supported with pandas'
)
left_name = predicate.left._name
right_name = predicate.right._name
left_on.append(left_name)
right_on.append(right_name)
lefts.append(left_name)
rights.append(right_name)
return lefts, rights

# TODO(phillipc): Is this the correct approach? That is, can we safely
# ignore duplicate join keys?
overlapping_columns -= {left_name, right_name}

def _validate_columns(orig_columns, *key_lists):
all_keys = set([item for sublist in key_lists for item in sublist])
overlapping_columns = orig_columns.difference(all_keys)
if overlapping_columns:
raise ValueError(
'left and right DataFrame columns overlap on {} in a join. '
Expand All @@ -617,8 +645,6 @@ def execute_materialized_join(op, left, right, **kwargs):
)
)

return pd.merge(left, right, how=how, left_on=left_on, right_on=right_on)


_BINARY_OPERATIONS = {
ops.Greater: operator.gt,
Expand Down
62 changes: 60 additions & 2 deletions ibis/pandas/tests/conftest.py
Expand Up @@ -86,9 +86,47 @@ def df2():


@pytest.fixture(scope='module')
def client(df, df1, df2):
def time_df1():
    """Four-row frame: ``time`` from ``pd.to_datetime([1, 2, 3, 4])`` and a
    float ``value`` column."""
    return pd.DataFrame(
        {'time': pd.to_datetime([1, 2, 3, 4]), 'value': [1.1, 2.2, 3.3, 4.4]}
    )


@pytest.fixture(scope='module')
def time_df2():
    """Two-row frame: ``time`` from ``pd.to_datetime([2, 4])`` and a float
    ``other_value`` column."""
    return pd.DataFrame(
        {'time': pd.to_datetime([2, 4]), 'other_value': [1.2, 2.0]}
    )


@pytest.fixture(scope='module')
def time_keyed_df1():
    """Eight-row frame with ``time``, an integer ``key`` alternating 1 and 2,
    and a float ``value`` column."""
    return pd.DataFrame(
        {
            'time': pd.to_datetime([1, 1, 2, 2, 3, 3, 4, 4]),
            'key': [1, 2, 1, 2, 1, 2, 1, 2],
            'value': [1.1, 1.2, 2.2, 2.4, 3.3, 3.6, 4.4, 4.8]
        }
    )


@pytest.fixture(scope='module')
def time_keyed_df2():
    """Four-row frame with ``time``, an integer ``key`` alternating 1 and 2,
    and a float ``other_value`` column."""
    return pd.DataFrame(
        {
            'time': pd.to_datetime([2, 2, 4, 4]),
            'key': [1, 2, 1, 2],
            'other_value': [1.2, 1.4, 2.0, 4.0]
        }
    )


@pytest.fixture(scope='module')
def client(df, df1, df2, time_df1, time_df2, time_keyed_df1, time_keyed_df2):
return ibis.pandas.connect(
{'df': df, 'df1': df1, 'df2': df2, 'left': df1, 'right': df2}
{'df': df, 'df1': df1, 'df2': df2, 'left': df1, 'right': df2,
'time_df1': time_df1, 'time_df2': time_df2,
'time_keyed_df1': time_keyed_df1, 'time_keyed_df2': time_keyed_df2}
)


Expand Down Expand Up @@ -123,6 +161,26 @@ def right(client):
return client.table('right')


@pytest.fixture(scope='module')
def time_left(client):
    """Return the table named ``'time_df1'`` from ``client``."""
    return client.table('time_df1')


@pytest.fixture(scope='module')
def time_right(client):
    """Return the table named ``'time_df2'`` from ``client``."""
    return client.table('time_df2')


@pytest.fixture(scope='module')
def time_keyed_left(client):
    """Return the table named ``'time_keyed_df1'`` from ``client``."""
    return client.table('time_keyed_df1')


@pytest.fixture(scope='module')
def time_keyed_right(client):
    """Return the table named ``'time_keyed_df2'`` from ``client``."""
    return client.table('time_keyed_df2')


@pytest.fixture(scope='module')
def batting(lahman):
    """Return the table named ``'batting'`` from ``lahman``."""
    return lahman.table('batting')
Expand Down

0 comments on commit 6bf5f4d

Please sign in to comment.