From c20a64ff798ed69661a988f55dac6873c71f0f44 Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Sat, 12 May 2018 13:54:57 -0400 Subject: [PATCH 1/3] ENH: Fix BigQuery transpilation Author: Phillip Cloud Closes #1442 from cpcloud/fix-bq-udf and squashes the following commits: a76514b [Phillip Cloud] ENH: Fix BigQuery transpilation --- ibis/bigquery/udf/core.py | 11 +++++++++-- ibis/bigquery/udf/tests/test_core.py | 11 +++++++++++ 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/ibis/bigquery/udf/core.py b/ibis/bigquery/udf/core.py index 9715f2d03b62..e4cb48dcd2b2 100644 --- a/ibis/bigquery/udf/core.py +++ b/ibis/bigquery/udf/core.py @@ -520,10 +520,17 @@ def visit_ListComp(self, node): 'Only single loop comprehensions are allowed' ) - names = find_names(node.elt) + names = find_names(generator.target) argslist = [ast.arg(arg=name.id, annotation=None) for name in names] if len(names) <= 1: - signature = ast.arguments(args=argslist) + signature = ast.arguments( + args=argslist, + vararg=None, + kwonlyargs=[], + kw_defaults=[], + kwarg=None, + defaults=[] + ) else: signature = ast.List(elts=argslist, ctx=ast.Load()) diff --git a/ibis/bigquery/udf/tests/test_core.py b/ibis/bigquery/udf/tests/test_core.py index cd11411e83cf..5f80ca47fabf 100644 --- a/ibis/bigquery/udf/tests/test_core.py +++ b/ibis/bigquery/udf/tests/test_core.py @@ -572,3 +572,14 @@ def f(*args): }""" js = compile(f) assert js == expected + + +def test_missing_vararg(): + def my_range(n): + return [1 for x in [n]] + js = compile(my_range) + expected = """\ +function my_range(n) { + return [n].map(((x) => 1)); +}""" + assert js == expected From d61ccb22b59e5fbaca0ec32583b73ef016dd8352 Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Sun, 13 May 2018 15:09:07 -0400 Subject: [PATCH 2/3] BUG: Handle large timestamps Author: Phillip Cloud Closes #1440 from cpcloud/fix-bigquery-large-timestamps and squashes the following commits: 74902a0 [Phillip Cloud] BUG: Handle large timestamps --- ibis/bigquery/tests/test_client.py | 7 + ibis/expr/api.py | 34 ++--- ibis/expr/tests/test_value_exprs.py | 7 + ibis/pandas/client.py | 193 ++++++++++++++++++++-------- ibis/pandas/tests/test_schema.py | 13 ++ ibis/tests/all/test_string.py | 7 +- 6 files changed, 183 insertions(+), 78 deletions(-) diff --git a/ibis/bigquery/tests/test_client.py b/ibis/bigquery/tests/test_client.py index a51ab9050ed3..ae32da93e588 100644 --- a/ibis/bigquery/tests/test_client.py +++ b/ibis/bigquery/tests/test_client.py @@ -509,3 +509,10 @@ def test_multiple_project_queries_execute(client): result = join.execute() assert list(result.columns) == ['title'] assert len(result) == 5 + + +def test_large_timestamp(client): + huge_timestamp = datetime(year=4567, month=1, day=1) + expr = ibis.timestamp('4567-01-01 00:00:00') + result = client.execute(expr) + assert result == huge_timestamp diff --git a/ibis/expr/api.py b/ibis/expr/api.py index 6cef012d7670..fa69256525c2 100644 --- a/ibis/expr/api.py +++ b/ibis/expr/api.py @@ -1,27 +1,17 @@ -# Copyright 2015 Cloudera Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
-# See the License for the specific language governing permissions and -# limitations under the License. - from __future__ import print_function +import collections +import datetime +import functools +import operator +import warnings + import six import toolz -import warnings -import operator -import datetime -import functools -import collections +import dateutil.parser + +import pandas as pd import ibis.util as util import ibis.common as com @@ -174,8 +164,10 @@ def timestamp(value): result : TimestampScalar """ if isinstance(value, six.string_types): - from pandas import Timestamp - value = Timestamp(value) + try: + value = pd.Timestamp(value) + except pd.errors.OutOfBoundsDatetime: + value = dateutil.parser.parse(value) if isinstance(value, six.integer_types): warnings.warn( 'Integer values for timestamp literals are deprecated in 0.11.0 ' diff --git a/ibis/expr/tests/test_value_exprs.py b/ibis/expr/tests/test_value_exprs.py index 57b029a82fa3..ae50be0cdddc 100644 --- a/ibis/expr/tests/test_value_exprs.py +++ b/ibis/expr/tests/test_value_exprs.py @@ -1302,3 +1302,10 @@ def test_interval_negate(base_expr): assert isinstance(expr.op(), ops.Negate) assert expr.equals(expr2) assert expr.equals(expr3) + + +def test_large_timestamp(): + expr = ibis.timestamp('4567-02-03') + expected = datetime(year=4567, month=2, day=3) + result = expr.op().value + assert result == expected diff --git a/ibis/pandas/client.py b/ibis/pandas/client.py index 66a48b588499..cab1518d5209 100644 --- a/ibis/pandas/client.py +++ b/ibis/pandas/client.py @@ -4,6 +4,9 @@ import toolz import numpy as np import pandas as pd +import dateutil.parser + +from multipledispatch import Dispatcher import ibis.client as client import ibis.expr.types as ir @@ -20,53 +23,57 @@ infer_pandas_dtype = pd.lib.infer_dtype -_ibis_dtypes = { - dt.Boolean: 'bool', - dt.Null: 'object', - dt.Array: 'object', - dt.String: 'object', - dt.Binary: 'object', - dt.Category: 'category', - dt.Date: 'datetime64[ns]', - dt.Time: 'datetime64[ns]', - dt.Timestamp: 'datetime64[ns]', - dt.Interval: 'timedelta64[ns]', - dt.Int8: 'int8', - dt.Int16: 'int16', - dt.Int32: 'int32', - dt.Int64: 'int64', - dt.UInt8: 'uint8', - dt.UInt16: 'uint16', - dt.UInt32: 'uint32', - dt.UInt64: 'uint64', - dt.Float32: 'float32', - dt.Float64: 'float64', - dt.Decimal: 'float64', - dt.Struct: 'object', -} +_ibis_dtypes = toolz.valmap( + np.dtype, + { + dt.Boolean: np.bool_, + dt.Null: np.object_, + dt.Array: np.object_, + dt.String: np.object_, + dt.Binary: np.object_, + dt.Date: 'datetime64[ns]', + dt.Time: 'timedelta64[ns]', + dt.Timestamp: 'datetime64[ns]', + dt.Int8: np.int8, + dt.Int16: np.int16, + dt.Int32: np.int32, + dt.Int64: np.int64, + dt.UInt8: np.uint8, + dt.UInt16: np.uint16, + dt.UInt32: np.uint32, + dt.UInt64: np.uint64, + dt.Float32: np.float32, + dt.Float64: np.float64, + dt.Decimal: np.float64, + dt.Struct: np.object_, + } +) -_numpy_dtypes = toolz.keymap(np.dtype, { - 'bool': dt.boolean, - 'int8': dt.int8, - 'int16': dt.int16, - 'int32': dt.int32, - 'int64': dt.int64, - 'uint8': dt.uint8, - 'uint16': dt.uint16, - 'uint32': dt.uint32, - 'uint64': dt.uint64, - 'float16': dt.float16, - 'float32': dt.float32, - 'float64': dt.float64, - 'double': dt.double, - 'unicode': dt.string, - 'str': dt.string, - 'datetime64': dt.timestamp, - 'datetime64[ns]': dt.timestamp, - 'timedelta64': dt.interval, - 'timedelta64[ns]': dt.Interval('ns') -}) +_numpy_dtypes = toolz.keymap( + np.dtype, + { + 'bool': dt.boolean, + 'int8': dt.int8, + 'int16': dt.int16, + 'int32': dt.int32, + 
'int64': dt.int64, + 'uint8': dt.uint8, + 'uint16': dt.uint16, + 'uint32': dt.uint32, + 'uint64': dt.uint64, + 'float16': dt.float16, + 'float32': dt.float32, + 'float64': dt.float64, + 'double': dt.double, + 'unicode': dt.string, + 'str': dt.string, + 'datetime64': dt.timestamp, + 'datetime64[ns]': dt.timestamp, + 'timedelta64': dt.interval, + 'timedelta64[ns]': dt.Interval('ns') + } +) _inferable_pandas_dtypes = { @@ -163,29 +170,107 @@ def ibis_dtype_to_pandas(ibis_dtype): if isinstance(ibis_dtype, dt.Timestamp) and ibis_dtype.timezone: return DatetimeTZDtype('ns', ibis_dtype.timezone) elif isinstance(ibis_dtype, dt.Interval): - return 'timedelta64[{}]'.format(ibis_dtype.unit) + return np.dtype('timedelta64[{}]'.format(ibis_dtype.unit)) + elif isinstance(ibis_dtype, dt.Category): + return CategoricalDtype() elif type(ibis_dtype) in _ibis_dtypes: return _ibis_dtypes[type(ibis_dtype)] else: - return 'object' + return np.dtype(np.object_) def ibis_schema_to_pandas(schema): return list(zip(schema.names, map(ibis_dtype_to_pandas, schema.types))) +convert = Dispatcher( + 'convert', + doc="""\ +Convert `column` to the pandas dtype corresponding to `out_dtype`, where the +dtype of `column` is `in_dtype`. + +Parameters +---------- +in_dtype : Union[np.dtype, pandas_dtype] + The dtype of `column`, used for dispatching +out_dtype : ibis.expr.datatypes.DataType + The requested ibis type of the output +column : pd.Series + The column to convert + +Returns +------- +result : pd.Series + The converted column +""") + + +@convert.register(DatetimeTZDtype, dt.Timestamp, pd.Series) +def convert_datetimetz_to_timestamp(in_dtype, out_dtype, column): + output_timezone = out_dtype.timezone + if output_timezone is not None: + return column.dt.tz_convert(output_timezone) + return column.astype(out_dtype.to_pandas(), errors='ignore') + + +@convert.register(np.dtype, dt.Timestamp, pd.Series) +def convert_datetime64_to_timestamp(in_dtype, out_dtype, column): + if in_dtype.type == np.datetime64: + return column.astype(out_dtype.to_pandas(), errors='ignore') + try: + return pd.to_datetime(column) + except pd.errors.OutOfBoundsDatetime: + return column.map(dateutil.parser.parse) + + +@convert.register(np.dtype, dt.Interval, pd.Series) +def convert_any_to_interval(_, out_dtype, column): + return column.values.astype(out_dtype.to_pandas()) + + +@convert.register(np.dtype, dt.String, pd.Series) +def convert_any_to_string(_, out_dtype, column): + result = column.astype(out_dtype.to_pandas(), errors='ignore') + if PY2: + return column.str.decode('utf-8', errors='ignore') + return result + + +@convert.register(object, dt.DataType, pd.Series) +def convert_any_to_any(_, out_dtype, column): + return column.astype(out_dtype.to_pandas(), errors='ignore') + + def ibis_schema_apply_to(schema, df): - """Applies the Ibis schema on a pandas dataframe""" + """Applies the Ibis schema to a pandas DataFrame + + Parameters + ---------- + schema : ibis.schema.Schema + df : pandas.DataFrame + + Returns + ------- + df : pandas.DataFrame + + Notes + ----- + Mutates `df` + """ for column, dtype in schema.items(): pandas_dtype = dtype.to_pandas() - if isinstance(dtype, dt.Interval): - df[column] = df[column].values.astype(pandas_dtype) - else: - df[column] = df[column].astype(pandas_dtype, errors='ignore') + col = df[column] + col_dtype = col.dtype + + try: + not_equal = pandas_dtype != col_dtype + except TypeError: + # ugh, we can't compare dtypes coming from pandas, assume not equal + not_equal = True - if PY2 and dtype == dt.string: - 
df[column] = df[column].str.decode('utf-8', errors='ignore') + if not_equal or dtype == dt.string: + df[column] = convert(col_dtype, dtype, col) return df diff --git a/ibis/pandas/tests/test_schema.py b/ibis/pandas/tests/test_schema.py index 46c72fb0ba2a..5605a05efb8a 100644 --- a/ibis/pandas/tests/test_schema.py +++ b/ibis/pandas/tests/test_schema.py @@ -2,6 +2,8 @@ import numpy as np import pandas as pd +import pandas.util.testing as tm + import ibis from ibis.expr import datatypes as dt from ibis.expr import schema as sch @@ -94,4 +96,15 @@ def test_infer_exhaustive_dataframe(): assert sch.infer(df) == ibis.schema(expected) +def test_apply_to_schema_with_timezone(): + data = { + 'time': pd.date_range('2018-01-01', '2018-01-02', freq='H') + } + df = pd.DataFrame(data) + expected = df.assign(time=df.time.astype('datetime64[ns, EST]')) + desired_schema = ibis.schema([('time', 'timestamp("EST")')]) + result = desired_schema.apply_to(df.copy()) + tm.assert_frame_equal(expected, result) + + # TODO(kszucs): test_Schema_to_pandas diff --git a/ibis/tests/all/test_string.py b/ibis/tests/all/test_string.py index 2ea2dc872c1e..8fc0e4d41389 100644 --- a/ibis/tests/all/test_string.py +++ b/ibis/tests/all/test_string.py @@ -11,9 +11,10 @@ def test_string_col_is_unicode(backend, alltypes, df): dtype = alltypes.string_col.type() assert dtype == dt.String(nullable=dtype.nullable) - - for s in [alltypes.string_col.execute(), df.string_col]: - assert s.apply(lambda x: isinstance(x, six.text_type)).all() + is_text_type = lambda x: isinstance(x, six.text_type) # noqa: E731 + assert df.string_col.map(is_text_type).all() + result = alltypes.string_col.execute() + assert result.map(is_text_type).all() @pytest.mark.parametrize( From 6f90d2f77c4bccb0d527f99385579a544ec6dd1a Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Tue, 15 May 2018 14:25:57 -0400 Subject: [PATCH 3/3] ENH: Add tolerance parameter to asof_join API Author: Phillip Cloud Closes #1443 from cpcloud/tolerance and squashes the following commits: f7ad15f [Phillip Cloud] ENH: Add tolerance parameter to asof_join API --- ibis/expr/api.py | 18 +++++++++--------- ibis/expr/operations.py | 4 +++- ibis/pandas/execution/join.py | 11 +++++++---- ibis/pandas/execution/tests/test_join.py | 14 ++++++++++++++ 4 files changed, 33 insertions(+), 14 deletions(-) diff --git a/ibis/expr/api.py b/ibis/expr/api.py index fa69256525c2..e4701d488c18 100644 --- a/ibis/expr/api.py +++ b/ibis/expr/api.py @@ -2402,8 +2402,7 @@ def _time_sub(left, right): def join(left, right, predicates=(), how='inner'): - """ - Perform a relational join between two tables. Does not resolve resulting + """Perform a relational join between two tables. Does not resolve resulting table schema. Parameters @@ -2422,7 +2421,7 @@ def join(left, right, predicates=(), how='inner'): Returns ------- joined : TableExpr - Note, schema is not materialized yet + Note that the schema is not materialized yet """ klass = _join_classes[how.lower()] if isinstance(predicates, Expr): @@ -2432,9 +2431,8 @@ def join(left, right, predicates=(), how='inner'): return ir.TableExpr(op) -def asof_join(left, right, predicates=(), by=()): - """ - Perform an asof join between two tables. Similar to a left join +def asof_join(left, right, predicates=(), by=(), tolerance=None): + """Perform an asof join between two tables. Similar to a left join except that the match is done on nearest key rather than equal keys. Optionally, match keys with 'by' before joining with predicates. 
@@ -2445,14 +2443,16 @@ def asof_join(left, right, predicates=(), by=()): right : TableExpr predicates : join expression(s) by : string - column to group by before joining + column to group by before joining + tolerance : interval + Amount of time to look behind when joining Returns ------- joined : TableExpr - Note, schema is not materialized yet + Note that the schema is not materialized yet """ - return ops.AsOfJoin(left, right, predicates, by).to_expr() + return ops.AsOfJoin(left, right, predicates, by, tolerance).to_expr() def cross_join(*tables, **kwargs): diff --git a/ibis/expr/operations.py b/ibis/expr/operations.py index 254d2b229e87..62414c934d5a 100644 --- a/ibis/expr/operations.py +++ b/ibis/expr/operations.py @@ -1558,10 +1558,12 @@ class AsOfJoin(Join): right = Arg(rlz.noop) predicates = Arg(rlz.noop) by = Arg(rlz.noop, default=None) + tolerance = Arg(rlz.interval(), default=None) - def __init__(self, left, right, predicates, by): + def __init__(self, left, right, predicates, by, tolerance): super(AsOfJoin, self).__init__(left, right, predicates) self.by = _clean_join_predicates(self.left, self.right, by) + self.tolerance = tolerance class Union(TableNode, HasSchema): diff --git a/ibis/pandas/execution/join.py b/ibis/pandas/execution/join.py index 06f53e1b8ace..7a1c5c189841 100644 --- a/ibis/pandas/execution/join.py +++ b/ibis/pandas/execution/join.py @@ -62,9 +62,11 @@ def execute_materialized_join(op, left, right, **kwargs): return df -@execute_node.register(ops.AsOfJoin, pd.DataFrame, pd.DataFrame) -def execute_asof_join(op, left, right, **kwargs): - overlapping_columns = set(left.columns) & set(right.columns) +@execute_node.register( + ops.AsOfJoin, pd.DataFrame, pd.DataFrame, (pd.Timedelta, type(None)) +) +def execute_asof_join(op, left, right, tolerance, **kwargs): + overlapping_columns = frozenset(left.columns) & frozenset(right.columns) left_on, right_on = _extract_predicate_names(op.predicates) left_by, right_by = _extract_predicate_names(op.by) _validate_columns( @@ -76,7 +78,8 @@ def execute_asof_join(op, left, right, **kwargs): left_on=left_on, right_on=right_on, left_by=left_by or None, - right_by=right_by or None + right_by=right_by or None, + tolerance=tolerance, ) diff --git a/ibis/pandas/execution/tests/test_join.py b/ibis/pandas/execution/tests/test_join.py index f773a65e5101..376efe26e7b3 100644 --- a/ibis/pandas/execution/tests/test_join.py +++ b/ibis/pandas/execution/tests/test_join.py @@ -3,6 +3,8 @@ import pytest +import ibis + pytest.importorskip('multipledispatch') @@ -290,3 +292,15 @@ def test_keyed_asof_join( expected = pd.merge_asof( time_keyed_df1, time_keyed_df2, on='time', by='key') tm.assert_frame_equal(result[expected.columns], expected) + + +@merge_asof_minversion +def test_keyed_asof_join_with_tolerance( + time_keyed_left, time_keyed_right, time_keyed_df1, time_keyed_df2): + expr = time_keyed_left.asof_join( + time_keyed_right, 'time', by='key', tolerance=2 * ibis.day()) + result = expr.execute() + expected = pd.merge_asof( + time_keyed_df1, time_keyed_df2, + on='time', by='key', tolerance=pd.Timedelta('2D')) + tm.assert_frame_equal(result[expected.columns], expected)
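---

Editor's note: for reference, a minimal usage sketch of the `tolerance` argument introduced in PATCH 3/3, modeled on `test_keyed_asof_join_with_tolerance` above. The table names (`trades`, `quotes`) and columns (`time`, `key`, `price`, `bid`) are hypothetical illustrations; only the `asof_join(..., by=..., tolerance=...)` signature and the `2 * ibis.day()` interval come from the patch itself.

    import ibis
    import pandas as pd

    # Hypothetical input frames; both are sorted by the asof key (`time`),
    # as pandas.merge_asof (which backs the pandas execution path) requires.
    trades_df = pd.DataFrame({
        'time': pd.to_datetime(['2018-01-01 09:30:00', '2018-01-01 09:30:05']),
        'key': [1, 1],
        'price': [100.0, 101.0],
    })
    quotes_df = pd.DataFrame({
        'time': pd.to_datetime(['2018-01-01 09:29:59', '2018-01-01 09:30:04']),
        'key': [1, 1],
        'bid': [99.5, 100.5],
    })

    # Assumes the pandas backend client API of this ibis release.
    con = ibis.pandas.connect({'trades': trades_df, 'quotes': quotes_df})
    trades = con.table('trades')
    quotes = con.table('quotes')

    # Join each trade to the most recent quote with the same `key`, accepting
    # a match only if the quote is at most two days older than the trade.
    expr = trades.asof_join(quotes, 'time', by='key', tolerance=2 * ibis.day())
    result = expr.execute()

Without `tolerance`, the join falls back to the previous behavior of matching the nearest earlier key with no time limit, mirroring `pd.merge_asof`'s default.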