Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Query.map and Query.map_async. #218

Merged
merged 1 commit into from
Oct 8, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion MIGRATION_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,11 @@ that are affected are: `memcache_add`, `memcache_cas`, `memcache_decr`,
from GAE to GCP.
- The `max_memcache_items` option is no longer supported.
- The `force_writes` option is no longer supported.
- `Query.map` and `Query.map_async` are no longer supported.
- The `blobstore` module is no longer supported.
- The `pass_batch_into_callback` argument to `Query.map` and `Query.map_async`
is no longer supported.
- The `merge_future` argument to `Query.map` and `Query.map_async` is no longer
supported.

## Privatization

Expand Down
63 changes: 35 additions & 28 deletions google/cloud/ndb/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -1228,6 +1228,8 @@ class QueryOptions(_options.ReadOptions):
"end_cursor",
# Both (!?!)
"projection",
# Map only
"callback",
)

def __init__(self, config=None, client=None, **kwargs):
Expand All @@ -1237,6 +1239,12 @@ def __init__(self, config=None, client=None, **kwargs):
if kwargs.get("prefetch_size"):
raise exceptions.NoLongerImplementedError()

if kwargs.get("pass_batch_into_callback"):
raise exceptions.NoLongerImplementedError()

if kwargs.get("merge_future"):
raise exceptions.NoLongerImplementedError()

if kwargs.pop("produce_cursors", None):
_log.warning(
"Deprecation warning: 'produce_cursors' is deprecated. "
Expand Down Expand Up @@ -1877,12 +1885,11 @@ def iter(

__iter__ = iter

@_query_options
def map(
self,
callback,
*,
pass_batch_into_callback=None,
merge_future=None,
keys_only=None,
limit=None,
projection=None,
Expand All @@ -1898,15 +1905,15 @@ def map(
read_policy=None,
transaction=None,
options=None,
pass_batch_into_callback=None,
merge_future=None,
_options=None,
):
"""Map a callback function or tasklet over the query results.

DEPRECATED: This method is no longer supported.

Args:
callback (Callable): A function or tasklet to be applied to each
result; see below.
merge_future: Optional ``Future`` subclass; see below.
keys_only (bool): Return keys instead of entities.
projection (list[str]): The fields to return as part of the query
results.
Expand Down Expand Up @@ -1934,33 +1941,21 @@ def map(
Implies ``read_policy=ndb.STRONG``.
options (QueryOptions): DEPRECATED: An object containing options
values for some of these arguments.
pass_batch_info_callback: DEPRECATED: No longer implemented.
merge_future: DEPRECATED: No longer implemented.

Callback signature: The callback is normally called with an entity
as argument. However if keys_only=True is given, it is called
with a Key. Also, when pass_batch_into_callback is True, it is
called with three arguments: the current batch, the index within
the batch, and the entity or Key at that index. The callback can
return whatever it wants. If the callback is None, a trivial
callback is assumed that just returns the entity or key passed in
(ignoring produce_cursors).

Optional merge future: The merge_future is an advanced argument
that can be used to override how the callback results are combined
into the overall map() return value. By default a list of
callback return values is produced. By substituting one of a
small number of specialized alternatives you can arrange
otherwise. See tasklets.MultiFuture for the default
implementation and a description of the protocol the merge_future
object must implement the default. Alternatives from the same
module include QueueFuture, SerialQueueFuture and ReducingFuture.
Callback signature: The callback is normally called with an entity as
argument. However if keys_only=True is given, it is called with a Key.
The callback can return whatever it wants.

Returns:
Any: When the query has run to completion and all callbacks have
returned, map() returns a list of the results of all callbacks.
(But see 'optional merge future' above.)
"""
raise exceptions.NoLongerImplementedError()
return self.map_async(None, _options=_options).result()

@tasklets.tasklet
@_query_options
def map_async(
self,
callback,
Expand All @@ -1982,17 +1977,29 @@ def map_async(
read_policy=None,
transaction=None,
options=None,
_options=None,
):
"""Map a callback function or tasklet over the query results.

DEPRECATED: This method is no longer supported.

This is the asynchronous version of :meth:`Query.map`.

Returns:
tasklets.Future: See :meth:`Query.map` for eventual result.
"""
raise exceptions.NoLongerImplementedError()
callback = _options.callback
futures = []
results = _datastore_query.iterate(_options)
while (yield results.has_next_async()):
result = results.next()
mapped = callback(result)
if not isinstance(mapped, tasklets.Future):
future = tasklets.Future()
future.set_result(mapped)
mapped = future
futures.append(mapped)

mapped_results = yield futures
raise tasklets.Return(mapped_results)

@_query_options
def get(
Expand Down
32 changes: 32 additions & 0 deletions tests/system/test_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -1289,3 +1289,35 @@ class SomeKind(ndb.Model):
results = query.fetch()
assert len(results) == 1
assert results[0].foo == 1


@pytest.mark.usefixtures("client_context")
def test_map(dispose_of):
class SomeKind(ndb.Model):
foo = ndb.StringProperty()
ref = ndb.KeyProperty()

class OtherKind(ndb.Model):
foo = ndb.StringProperty()

foos = ("aa", "bb", "cc", "dd", "ee")
others = [OtherKind(foo=foo) for foo in foos]
other_keys = ndb.put_multi(others)
for key in other_keys:
dispose_of(key._key)

things = [SomeKind(foo=foo, ref=key) for foo, key in zip(foos, other_keys)]
keys = ndb.put_multi(things)
for key in keys:
dispose_of(key._key)

eventually(SomeKind.query().fetch, _length_equals(5))
eventually(OtherKind.query().fetch, _length_equals(5))

@ndb.tasklet
def get_other_foo(thing):
other = yield thing.ref.get_async()
return other.foo

query = SomeKind.query().order(SomeKind.foo)
assert query.map(get_other_foo) == foos
53 changes: 49 additions & 4 deletions tests/unit/test_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -1858,17 +1858,62 @@ def test___iter__():

@staticmethod
@pytest.mark.usefixtures("in_context")
def test_map():
@unittest.mock.patch("google.cloud.ndb.query._datastore_query")
def test_map(_datastore_query):
class DummyQueryIterator:
def __init__(self, items):
self.items = list(items)

def has_next_async(self):
return utils.future_result(bool(self.items))

def next(self):
return self.items.pop(0)

_datastore_query.iterate.return_value = DummyQueryIterator(range(5))

def callback(result):
return result + 1

query = query_module.Query()
assert query.map(callback) == (1, 2, 3, 4, 5)

@staticmethod
@pytest.mark.usefixtures("in_context")
@unittest.mock.patch("google.cloud.ndb.query._datastore_query")
def test_map_async(_datastore_query):
class DummyQueryIterator:
def __init__(self, items):
self.items = list(items)

def has_next_async(self):
return utils.future_result(bool(self.items))

def next(self):
return self.items.pop(0)

_datastore_query.iterate.return_value = DummyQueryIterator(range(5))

def callback(result):
return utils.future_result(result + 1)

query = query_module.Query()
future = query.map_async(callback)
assert future.result() == (1, 2, 3, 4, 5)

@staticmethod
@pytest.mark.usefixtures("in_context")
def test_map_pass_batch_into_callback():
query = query_module.Query()
with pytest.raises(NotImplementedError):
query.map(None)
query.map(None, pass_batch_into_callback=True)

@staticmethod
@pytest.mark.usefixtures("in_context")
def test_map_async():
def test_map_merge_future():
query = query_module.Query()
with pytest.raises(NotImplementedError):
query.map_async(None)
query.map(None, merge_future="hi mom!")

@staticmethod
@pytest.mark.usefixtures("in_context")
Expand Down