Skip to content

Commit

Permalink
Merge 243db9a into 308c5b3
Browse files Browse the repository at this point in the history
  • Loading branch information
ehebert committed May 7, 2017
2 parents 308c5b3 + 243db9a commit 5f2cbd4
Show file tree
Hide file tree
Showing 16 changed files with 351 additions and 301 deletions.
1 change: 1 addition & 0 deletions blaze/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
)
from .expr.arrays import (tensordot, transpose)
from .expr.functions import *
from .expr.literal import data
from .index import create_index
from .interactive import *
from .compute.pmap import set_default_pmap
Expand Down
91 changes: 89 additions & 2 deletions blaze/compute/core.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,32 @@
from __future__ import absolute_import, division, print_function

from collections import defaultdict, Iterator, Mapping
import decimal
from datetime import date, datetime, timedelta
from functools import partial
import itertools
import numbers
import warnings

from datashape.predicates import (
isscalar,
iscollection,
isrecord,
istabular,
_dimensions,
)
from odo import odo
from odo.compatibility import unicode
import numpy as np
import pandas as pd
import toolz
from toolz import first, unique, assoc
from toolz.utils import no_default

from ..compatibility import basestring
from ..expr import Expr, Field, Symbol, symbol, Join, Cast
from ..expr import Data, Expr, Field, Symbol, symbol, Join, Cast
from ..dispatch import dispatch
from ..interactive import coerce_core, into
from ..types import iscoretype


__all__ = ['compute', 'compute_up']
Expand Down Expand Up @@ -357,6 +368,30 @@ def swap_resources_into_scope(expr, scope):
return expr, new_scope


@dispatch((object, type, str, unicode), Data)
def into(a, b, **kwargs):
    """Convert a ``Data`` node into ``a`` by converting what it wraps.

    The object stored on ``b.data`` is passed back to ``into`` together with
    the untouched keyword arguments, so the conversion is dispatched on the
    wrapped object rather than on the ``Data`` node.
    """
    wrapped = b.data
    return into(a, wrapped, **kwargs)


@dispatch((object, type, str, unicode), Expr)
def into(a, b, **kwargs):
    """Compute the expression ``b`` and convert the outcome into ``a``.

    The expression is computed with ``return_type='native'`` using the
    caller's keyword arguments. The expression's ``dshape`` is then added to
    those keyword arguments (replacing any caller-supplied ``dshape``) before
    the computed value is handed to ``into``.
    """
    computed = compute(b, return_type='native', **kwargs)
    # The dshape is attached only for the conversion step; ``compute`` above
    # received the keyword arguments exactly as the caller passed them.
    conversion_kwargs = dict(kwargs, dshape=b.dshape)
    return into(a, computed, **conversion_kwargs)


# Install the result of ``into(Iterator)`` as ``Expr.__iter__`` so expressions
# take part in the iteration protocol.
# NOTE(review): presumably the one-argument ``into(Iterator)`` call returns a
# callable that later receives the expression as its source -- confirm against
# the ``into`` overloads defined outside this view.
Expr.__iter__ = into(Iterator)


@dispatch(Expr)
def compute(expr, **kwargs):
    """Compute ``expr`` against the resources it reports for itself.

    The resources come from ``expr._resources()`` and are forwarded, with the
    keyword arguments, to the two-argument ``compute``.

    Raises
    ------
    ValueError
        If ``expr._resources()`` returns something falsy.
    """
    found = expr._resources()
    if found:
        return compute(expr, found, **kwargs)
    raise ValueError("No data resources found")


@dispatch(Expr, Mapping)
def compute(expr, d, return_type=no_default, **kwargs):
"""Compute expression against data sources.
Expand Down Expand Up @@ -470,3 +505,55 @@ def join_dataframe_to_selectable(expr, lhs, rhs, scope=None, **kwargs):
},
**kwargs
)


def coerce_to(typ, x, odo_kwargs=None):
    """Coerce ``x`` to ``typ``, falling back to ``odo`` on ``TypeError``.

    Parameters
    ----------
    typ : callable
        Target type; first tried as a plain constructor, ``typ(x)``.
    x : object
        Value to convert.
    odo_kwargs : dict, optional
        Keyword arguments forwarded to ``odo`` when the fallback is used.

    Returns
    -------
    The converted value.

    Notes
    -----
    Only ``TypeError`` triggers the fallback; any other exception raised by
    ``typ(x)`` propagates to the caller.
    """
    try:
        converted = typ(x)
    except TypeError:
        # The constructor rejected the value's type, so let odo convert it.
        extra = odo_kwargs if odo_kwargs else {}
        return odo(x, typ, **extra)
    else:
        return converted


def coerce_scalar(result, dshape, odo_kwargs=None):
    """Coerce a scalar ``result`` to the Python type named by ``dshape``.

    The textual form of ``dshape`` is searched for a known type name and the
    first match, in the order listed below, selects the target type. The
    conversion itself is delegated to ``coerce_to``. If no name matches,
    ``result`` is returned unchanged.

    Parameters
    ----------
    result : object
        The scalar value to convert.
    dshape : object
        Datashape (or anything whose ``str()`` names the type).
    odo_kwargs : dict, optional
        Forwarded to ``coerce_to``.
    """
    dshape_text = str(dshape)
    # Substring checks are order-sensitive: e.g. 'datetime' must be tried
    # before 'date' because the former contains the latter.
    candidates = (
        ('float', float),
        ('decimal', decimal.Decimal),
        ('int', int),
        ('bool', bool),
        ('datetime', pd.Timestamp),
        ('date', date),
        ('timedelta', timedelta),
    )
    for marker, target in candidates:
        if marker in dshape_text:
            return coerce_to(target, x=result, odo_kwargs=odo_kwargs)
    return result


def coerce_core(result, dshape, odo_kwargs=None):
    """Coerce data to a core data type.

    Dispatch on ``dshape``:

    * ``result`` already a core type -> returned unchanged;
    * scalar dshape -> ``coerce_scalar``;
    * tabular dshape with a record measure -> ``pd.DataFrame``;
    * other collections -> ``pd.Series`` for one dimension, ``np.ndarray``
      for more than one.

    Parameters
    ----------
    result : object
        The computed value to coerce.
    dshape : object
        Datashape describing ``result``.
    odo_kwargs : dict, optional
        Keyword arguments forwarded to the conversion routines.

    Raises
    ------
    ValueError
        If the dshape is not scalar, tabular-of-records or a collection, or
        if a collection reports fewer than one dimension.
    """
    if iscoretype(result):
        return result

    if isscalar(dshape):
        return coerce_scalar(result, dshape, odo_kwargs=odo_kwargs)

    extra = odo_kwargs or {}

    if istabular(dshape) and isrecord(dshape.measure):
        return into(pd.DataFrame, result, **extra)

    if not iscollection(dshape):
        raise ValueError("Expr does not evaluate to a core return type")

    dim = _dimensions(dshape)
    if dim == 1:
        return into(pd.Series, result, **extra)
    if dim > 1:
        return into(np.ndarray, result, **extra)

    msg = "Expr with dshape dimensions < 1 should have been handled earlier: dim={}"
    raise ValueError(msg.format(str(dim)))
26 changes: 26 additions & 0 deletions blaze/compute/literal.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from odo import append, drop

from ..compute.core import compute
from ..dispatch import dispatch
from ..expr.expressions import Expr
from ..expr.literal import Data


@dispatch(Expr, Data)
def compute_down(expr, dta, **kwargs):
    """Compute ``expr`` against the object wrapped by the ``Data`` node.

    ``dta.data`` is forwarded to ``compute`` with the keyword arguments
    unchanged.
    """
    wrapped = dta.data
    return compute(expr, wrapped, **kwargs)


@dispatch(Expr, Data)
def pre_compute(expr, dta, **kwargs):
    """Run ``pre_compute`` on the object wrapped by the ``Data`` node.

    ``dta.data`` is passed back to ``pre_compute`` with the keyword arguments
    unchanged.
    """
    wrapped = dta.data
    return pre_compute(expr, wrapped, **kwargs)


@dispatch(Data, object)
def append(a, b, **kwargs):
    """Append ``b`` to the object wrapped by the ``Data`` node ``a``.

    ``a.data`` takes the place of ``a`` in a second ``append`` call; ``b`` and
    the keyword arguments are passed through unchanged.
    """
    target = a.data
    return append(target, b, **kwargs)


@dispatch(Data)
def drop(d):
    """Drop the object wrapped by the ``Data`` node ``d``.

    Calls ``drop`` on ``d.data`` and returns whatever that call returns.
    """
    wrapped = d.data
    return drop(wrapped)
13 changes: 13 additions & 0 deletions blaze/compute/numpy.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
from datashape import to_numpy, to_numpy_dtype
from numbers import Number

from odo import odo

from ..expr import (
Reduction, Field, Projection, Broadcast, Selection, ndim,
Distinct, Sort, Tail, Head, Label, ReLabel, Expr, Slice, Join,
Expand Down Expand Up @@ -473,3 +475,14 @@ def compute_up(t, data, **kwargs):
rhs = data

return array_coalesce(t, lhs, rhs)


def intonumpy(data, dtype=None, **kwargs):
    """Convert ``data`` to a ``numpy.ndarray``, optionally casting to ``dtype``.

    Parameters
    ----------
    data : object
        Anything ``odo`` can convert to ``np.ndarray``.
    dtype : optional
        Requested dtype. When given and different from the converted array's
        dtype, the array is cast with ``astype``.
    **kwargs
        Accepted for signature compatibility but currently ignored.

    Returns
    -------
    numpy.ndarray
    """
    # TODO: Don't ignore other kwargs like copy
    result = odo(data, np.ndarray)
    # Test against None rather than truthiness: an unstructured ``np.dtype``
    # instance has length 0 and is therefore falsy, which would silently skip
    # an explicitly requested cast.
    if dtype is not None and result.dtype != dtype:
        result = result.astype(dtype)
    return result


# Install ``intonumpy`` as ``Expr.__array__``, the hook NumPy's array-conversion
# functions (e.g. ``np.asarray``) look for on arbitrary objects.
Expr.__array__ = intonumpy
3 changes: 1 addition & 2 deletions blaze/compute/tests/test_comprehensive.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@
from pandas import DataFrame
from odo import into
from datashape.predicates import isscalar, iscollection, isrecord
from blaze.expr import symbol, by
from blaze.interactive import data
from blaze.expr import symbol, by, data
from blaze.compute import compute
from blaze.expr.functions import sin, exp

Expand Down
35 changes: 31 additions & 4 deletions blaze/compute/tests/test_core_compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,19 @@

from datashape import discover, dshape

from blaze.compute.core import (compute_up, compute, bottom_up_until_type_break,
top_then_bottom_then_top_again_etc,
swap_resources_into_scope)
from blaze.compute.core import (
coerce_core,
compute_up,
compute,
into,
bottom_up_until_type_break,
top_then_bottom_then_top_again_etc,
swap_resources_into_scope
)
from blaze.expr import by, symbol, Expr, Symbol
from blaze.dispatch import dispatch
from blaze.compatibility import raises, reduce
from blaze.utils import example
from blaze.interactive import into

import pandas as pd
import numpy as np
Expand Down Expand Up @@ -160,3 +165,25 @@ def test_simple_add(n):
])
def test_compute_return_type(data, expr, ret_type, exp_type):
assert isinstance(compute(expr, data, return_type=ret_type), exp_type)


@pytest.mark.parametrize(
    'data,dshape,exp_type',
    [
        # scalar input
        (1, symbol('x', 'int').dshape, int),
        # one-dimensional collection -> Series
        (
            into(da.core.Array, [1, 2], chunks=(10,)),
            dshape('2 * int'),
            pd.Series,
        ),
        # collection of records (tabular) -> DataFrame
        (
            into(
                da.core.Array,
                [{'a': 1, 'b': 2}, {'a': 3, 'b': 4}],
                chunks=(10,),
            ),
            dshape('2 * {a: int, b: int}'),
            pd.DataFrame,
        ),
        # two-dimensional, non-tabular collection -> ndarray
        (
            into(da.core.Array, [[1, 2], [3, 4]], chunks=(10, 10)),
            dshape('2 * 2 * int'),
            np.ndarray,
        ),
    ],
)
def test_coerce_core(data, dshape, exp_type):
    """``coerce_core`` returns an instance of the expected core type."""
    coerced = coerce_core(data, dshape)
    assert isinstance(coerced, exp_type)
1 change: 1 addition & 0 deletions blaze/expr/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from .core import *
from .expressions import *
from .literal import *
from .strings import *
from .datetime import *
from .math import *
Expand Down
2 changes: 1 addition & 1 deletion blaze/expr/collections.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@
varargsexpr,
)
from .utils import maxshape
from .literal import data
from ..compatibility import zip_longest, _strtypes
from ..interactive import data
from ..utils import listpack


Expand Down

0 comments on commit 5f2cbd4

Please sign in to comment.