Merge pull request #388 from skalish/stream-records
TC: Stream records from a dataset
pcattori committed Jun 11, 2020
2 parents c79dd7e + 263e361 commit 3bc45ee
Showing 6 changed files with 33 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -6,6 +6,7 @@
- [#367](https://github.com/Datatamer/tamr-client/issues/367) Support for projects:
  - generic projects via `tc.project`
  - Mastering projects via `tc.mastering.project`
- Support for streaming records from a dataset via `tc.record.stream`

**BUG FIXES**
- `from_geo_features` now returns information on the operation.
1 change: 1 addition & 0 deletions docs/beta/dataset/record.rst
@@ -7,6 +7,7 @@ Record
.. autofunction:: tamr_client.record.upsert
.. autofunction:: tamr_client.record.delete
.. autofunction:: tamr_client.record._update
.. autofunction:: tamr_client.record.stream

Exceptions
----------
19 changes: 16 additions & 3 deletions tamr_client/dataset/record.py
@@ -1,11 +1,11 @@
"""
See https://docs.tamr.com/reference/record
"The recommended approach for interacting with records is to use the :func:`~tamr_client.record.upsert` and
"The recommended approach for modifying records is to use the :func:`~tamr_client.record.upsert` and
:func:`~tamr_client.record.delete` functions for all use cases they can handle. For more advanced use cases, the
underlying _update function can be used directly."
underlying :func:`~tamr_client.record._update` function can be used directly."
"""
import json
from typing import cast, Dict, IO, Iterable, Optional
from typing import cast, Dict, IO, Iterable, Iterator, Optional

from tamr_client import response
from tamr_client.dataset.dataset import Dataset
@@ -143,3 +143,16 @@ def _delete_command(record: Dict, *, primary_key_name: str) -> Dict:
        The DELETE command in the proper format
    """
    return {"action": "DELETE", "recordId": record[primary_key_name]}


def stream(session: Session, dataset: Dataset) -> Iterator[JsonDict]:
    """Stream the records in this dataset as Python dictionaries.

    Args:
        dataset: Dataset from which to stream records

    Returns:
        Python generator yielding records
    """
    with session.get(str(dataset.url) + "/records", stream=True) as r:
        return response.ndjson(r)
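
For reference, a minimal usage sketch of the new function. Only the `tc.record.stream(session, dataset)` call is taken from the diff above; how `session` and `dataset` are obtained, and the `count_records` helper itself, are illustrative assumptions.

import tamr_client as tc

def count_records(session, dataset):
    # tc.record.stream returns a generator, so records are fetched lazily
    # and the full dataset never has to fit in memory.
    total = 0
    for _record in tc.record.stream(session, dataset):
        total += 1
    return total

As the module docstring above notes, `upsert` and `delete` remain the recommended entry points for modifying records; `stream` only reads them.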
File renamed without changes.
File renamed without changes.
@@ -1,4 +1,5 @@
from functools import partial
import json
from typing import Dict

import pytest
@@ -154,6 +155,20 @@ def test_delete_infer_primary_key():
    assert snoop["payload"] == utils.stringify(deletes)


@responses.activate
def test_stream():
    s = utils.session()
    dataset = utils.dataset()

    url = tc.URL(path="datasets/1/records")
    responses.add(
        responses.GET, str(url), body="\n".join(json.dumps(x) for x in _records_json)
    )

    records = tc.record.stream(s, dataset)
    assert list(records) == _records_json


_records_json = [{"primary_key": 1}, {"primary_key": 2}]

_response_json = {
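
The test above mocks GET /records with a newline-delimited JSON (NDJSON) body, which is the format `response.ndjson` consumes. That helper is not part of this diff; the sketch below is only an assumption of what such NDJSON parsing typically looks like over a streamed response.

import json

def ndjson(response):
    # Yield one parsed record per non-empty line of the streamed body.
    for line in response.iter_lines():
        if line:
            yield json.loads(line)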
