Skip to content

Commit

Permalink
Merge 8bbef0b into 6961844
Browse files Browse the repository at this point in the history
  • Loading branch information
llllllllll committed Apr 1, 2016
2 parents 6961844 + 8bbef0b commit 9ffdb0f
Show file tree
Hide file tree
Showing 10 changed files with 76 additions and 47 deletions.
1 change: 1 addition & 0 deletions blaze/__init__.py
Expand Up @@ -19,6 +19,7 @@
import warnings
from .expr import (
Symbol,
broadcast_collect,
by,
cast,
coalesce,
Expand Down
48 changes: 24 additions & 24 deletions blaze/compute/core.py
@@ -1,6 +1,6 @@
from __future__ import absolute_import, division, print_function

from collections import defaultdict, Iterator
from collections import defaultdict, Iterator, Mapping
from datetime import date, datetime, timedelta
import itertools
import numbers
Expand Down Expand Up @@ -65,27 +65,6 @@ def compute_up(seq, scope=None, **kwargs):
return type(seq)(compute(item, scope or {}, **kwargs) for item in seq)


@dispatch(Expr, object)
def compute(expr, o, **kwargs):
""" Compute against single input
Assumes that only one Symbol exists in expression
>>> t = symbol('t', 'var * {name: string, balance: int}')
>>> deadbeats = t[t['balance'] < 0]['name']
>>> data = [['Alice', 100], ['Bob', -50], ['Charlie', -20]]
>>> # list(compute(deadbeats, {t: data}))
>>> list(compute(deadbeats, data))
['Bob', 'Charlie']
"""
ts = set([x for x in expr._subterms() if isinstance(x, Symbol)])
if len(ts) == 1:
return compute(expr, {first(ts): o}, **kwargs)
else:
raise ValueError("Give compute dictionary input, got %s" % str(o))


@dispatch(object)
def compute_down(expr, **kwargs):
""" Compute the expression on the entire inputs
Expand Down Expand Up @@ -378,7 +357,7 @@ def swap_resources_into_scope(expr, scope):
return expr, new_scope


@dispatch(Expr, dict)
@dispatch(Expr, Mapping)
def compute(expr, d, return_type=no_default, **kwargs):
"""Compute expression against data sources.
Expand Down Expand Up @@ -451,7 +430,28 @@ def compute(expr, d, return_type=no_default, **kwargs):
return result


@dispatch(Field, dict)
@compute.register(Expr, object)
def compute_single_object(expr, o, **kwargs):
""" Compute against single input
Assumes that only one Symbol exists in expression
>>> t = symbol('t', 'var * {name: string, balance: int}')
>>> deadbeats = t[t['balance'] < 0]['name']
>>> data = [['Alice', 100], ['Bob', -50], ['Charlie', -20]]
>>> # list(compute(deadbeats, {t: data}))
>>> list(compute(deadbeats, data))
['Bob', 'Charlie']
"""
ts = set([x for x in expr._subterms() if isinstance(x, Symbol)])
if len(ts) == 1:
return compute(expr, {first(ts): o}, **kwargs)
else:
raise ValueError("Give compute dictionary input, got %s" % str(o))


@dispatch(Field, Mapping)
def compute_up(expr, data, **kwargs):
return data[expr._name]

Expand Down
6 changes: 5 additions & 1 deletion blaze/compute/h5py.py
Expand Up @@ -17,14 +17,18 @@
from ..expr import path, shape, Symbol
from ..expr.split import split

from .core import compute
from .core import compute, compute_single_object
from ..dispatch import dispatch
from ..utils import available_memory


__all__ = []


# h5py File and Group are mapping objects.
compute.register(Expr, (h5py.File, h5py.Group))(compute_single_object)


@dispatch(Slice, h5py.Dataset)
def pre_compute(expr, data, scope=None, **kwargs):
""" Don't push slices into memory, they're about to come in anyway """
Expand Down
8 changes: 2 additions & 6 deletions blaze/compute/python.py
Expand Up @@ -11,6 +11,7 @@
"""
from __future__ import absolute_import, division, print_function

from collections import Mapping
import itertools
import numbers
import fnmatch
Expand Down Expand Up @@ -112,7 +113,7 @@ def pre_compute(expr, seq, scope=None, **kwargs):
first = next(iter(seq))
except StopIteration:
return []
if isinstance(first, dict):
if isinstance(first, Mapping):
leaf = expr._leaves()[0]
return pluck(leaf.fields, seq)
else:
Expand Down Expand Up @@ -768,11 +769,6 @@ def compute_up(expr, seq, **kwargs):
raise NotImplementedError("Only 1d slices supported")


@dispatch(Field, dict)
def compute_up(expr, data, **kwargs):
return data[expr._name]


@dispatch(Coerce, (np.float32, np.float64, np.int64, np.int32, base))
def compute_up(expr, ob, **kwargs):
tp = expr.to
Expand Down
29 changes: 29 additions & 0 deletions blaze/compute/tests/test_python_compute.py
@@ -1,17 +1,20 @@
from __future__ import absolute_import, division, print_function
from collections import Mapping

import math
import itertools
import operator
import pytest
from datetime import datetime, date
import datashape
from datashape.py2help import mappingproxy
from collections import Iterator, Iterable

import blaze
from blaze.compute.python import (nunique, mean, rrowfunc, rowfunc,
reduce_by_funcs, optimize)
from blaze import dshape, symbol, discover
from blaze.compatibility import PY2
from blaze.compute.core import compute, compute_up, pre_compute
from blaze.expr import (by, merge, join, distinct, sum, min, max, any, summary,
count, std, head, sample, transform, greatest, least)
Expand Down Expand Up @@ -834,6 +837,32 @@ def test_compute_field_on_dicts():
assert compute(s.x, {s: d}) == [1, 2, 3]


class _MapProxy(Mapping): # pragma: no cover
def __init__(self, mapping):
self._map = mapping

def __getitem__(self, key):
return self._map[key]

def __iter__(self):
return iter(self._map)

def __len__(self):
return len(self._map)


@pytest.mark.parametrize(
'cls',
# mappingproxy refers to PyDictProxy in py2 which is not registered
# as a mapping
(_MapProxy,) + (mappingproxy,) if not PY2 else (),
)
def test_comute_on_mapping(cls):
s = symbol('s', '{x: 3 * int64, y: 3 * float32}')
d = cls({'x': [1, 2, 3], 'y': [4.0, 5.0, 6.0]})
assert compute(s.x, {s: d}, return_type='native') == [1, 2, 3]


def test_truncate():
s = symbol('x', 'real')
assert compute(s.truncate(20), 154) == 140
Expand Down
2 changes: 1 addition & 1 deletion blaze/expr/broadcast.py
Expand Up @@ -12,7 +12,7 @@
from .strings import Like
from .datetime import DateTime

__all__ = ['broadcast', 'Broadcast', 'scalar_symbols']
__all__ = ['broadcast', 'Broadcast', 'scalar_symbols', 'broadcast_collect']


def broadcast(expr, leaves, scalars=None):
Expand Down
9 changes: 4 additions & 5 deletions blaze/expr/core.py
@@ -1,16 +1,15 @@
from __future__ import absolute_import, division, print_function

from collections import Mapping
import datetime
import numbers
import inspect

from pprint import pformat
from functools import reduce, partial

import numpy as np
import toolz
from toolz import unique, concat, first
import pandas as pd

from ..compatibility import _strtypes
from ..dispatch import dispatch
Expand Down Expand Up @@ -365,12 +364,12 @@ def subs(o, d):
return _subs(o, d)


@dispatch((tuple, list), dict)
@dispatch((tuple, list), Mapping)
def _subs(o, d):
return type(o)([subs(arg, d) for arg in o])


@dispatch(Node, dict)
@dispatch(Node, Mapping)
def _subs(o, d):
"""
Expand All @@ -383,7 +382,7 @@ def _subs(o, d):
return type(o)(*newargs)


@dispatch(object, dict)
@dispatch(object, Mapping)
def _subs(o, d):
""" Private dispatched version of ``subs``
Expand Down
13 changes: 6 additions & 7 deletions blaze/expr/expressions.py
@@ -1,5 +1,6 @@
from __future__ import absolute_import, division, print_function

from collections import Mapping
from keyword import iskeyword
import re

Expand Down Expand Up @@ -323,7 +324,7 @@ def __init__(self, name, dshape, token=0):
self._hash = None

def __repr__(self):
fmt = "<`{}` symbol; dshape='{}'>"
fmt = "<`{}` symbol; dshape='{}'>"
return fmt.format(self._name, sanitized_dshape(self.dshape))

def __str__(self):
Expand All @@ -339,7 +340,7 @@ def symbol(name, dshape, token=None):
return Symbol(name, dshape, token=token or 0)


@dispatch(Symbol, dict)
@dispatch(Symbol, Mapping)
def _subs(o, d):
""" Subs symbols using symbol function
Expand Down Expand Up @@ -485,7 +486,6 @@ def sliceit(child, index):
return s



class Slice(Expr):
"""Elements `start` until `stop`. On many backends, a `step` parameter
is also allowed.
Expand Down Expand Up @@ -671,9 +671,8 @@ def __str__(self):

@copydoc(ReLabel)
def relabel(child, labels=None, **kwargs):
labels = labels or dict()
labels = toolz.merge(labels, kwargs)
labels = dict((k, v) for k, v in labels.items() if k != v)
labels = {k: v
for k, v in toolz.merge(labels or {}, kwargs).items() if k != v}
label_keys = set(labels)
fields = child.fields
if not label_keys.issubset(fields):
Expand All @@ -682,7 +681,7 @@ def relabel(child, labels=None, **kwargs):
', '.join(map(repr, non_existent_fields)))
if not labels:
return child
if isinstance(labels, dict): # Turn dict into tuples
if isinstance(labels, Mapping): # Turn dict into tuples
labels = tuple(sorted(labels.items()))
if isscalar(child.dshape.measure):
if child._name == labels[0][0]:
Expand Down
5 changes: 2 additions & 3 deletions blaze/interactive.py
@@ -1,7 +1,6 @@
from __future__ import absolute_import, division, print_function

import sys
from collections import Iterator
from collections import Iterator, Mapping
import decimal
import datetime
from functools import reduce, partial
Expand Down Expand Up @@ -182,7 +181,7 @@ def data(data_source, dshape=None, name=None, fields=None, schema=None, **kwargs
return _Data(data_source, ds, name)


@dispatch(_Data, dict)
@dispatch(_Data, Mapping)
def _subs(o, d):
return o

Expand Down
2 changes: 2 additions & 0 deletions docs/source/whatsnew/0.10.0.txt
Expand Up @@ -52,6 +52,8 @@ Improved Backends

* Adds :class:`~blaze.expr.math.greatest` and :class:`~blaze.expr.math.least`
support to the sql backend (:issue:`1428`).
* Generalize ``Field`` to support :class:`collections.Mapping` object
(:issue:`1467`).

Experimental Features
~~~~~~~~~~~~~~~~~~~~~
Expand Down

0 comments on commit 9ffdb0f

Please sign in to comment.