Skip to content

Commit

Permalink
Merge pull request #348 from skalish/beta-update-dataset
Browse files Browse the repository at this point in the history
TC: Upsert and delete records in dataset + DataFrame support
  • Loading branch information
pcattori committed Apr 3, 2020
2 parents f348833 + 8d4cf0c commit a9bbc89
Show file tree
Hide file tree
Showing 13 changed files with 575 additions and 1 deletion.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@
- `tc.response` module
- functions: `successful`, `ndjson`
- [#35](https://github.com/Datatamer/tamr-client/issues/35) projects.by_name() functionality added. Can now fetch a project by its name.
- BETA: New record upsert, delete, upsert from DataFrame functionality!
- `tc.record` module
- functions: `tc.record.upsert`, `tc.record.delete`
- `tc.dataframe` module
- functions: `tc.dataframe.upsert`
**BUG FIXES**
- Links from our docs to the `requests` docs were outdated. Links have been updated to point to the new `requests` docs URL.
- [#323](https://github.com/Datatamer/tamr-client/issues/323) Documentation for setting `dtype=str` before calling `client.datasets.create_from_dataframe`
Expand Down
2 changes: 1 addition & 1 deletion docs/beta.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@
* [Dataset](beta/datasets)
* [Instance](beta/instance)
* [Response](beta/response)
* [Session](beta/session)
* [Session](beta/session)
2 changes: 2 additions & 0 deletions docs/beta/datasets.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# Datasets

* [Dataset](/beta/datasets/dataset)
* [Record](/beta/datasets/record)
* [Dataframe](/beta/datasets/dataframe)
4 changes: 4 additions & 0 deletions docs/beta/datasets/dataframe.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Dataframe
=========

.. autofunction:: tamr_client.dataframe.upsert
15 changes: 15 additions & 0 deletions docs/beta/datasets/record.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
Record
=========

.. automodule:: tamr_client.record
   :no-members:

.. autofunction:: tamr_client.record.upsert
.. autofunction:: tamr_client.record.delete
.. autofunction:: tamr_client.record._update

Exceptions
----------

.. autoclass:: tamr_client.PrimaryKeyNotFound
   :no-inherited-members:
19 changes: 19 additions & 0 deletions stubs/pandas.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from typing import Any, Dict, Iterator, List, Tuple

# Alias for a JSON object represented as a dict with string keys and arbitrary values.
JsonDict = Dict[str, Any]

# Partial type stub for pandas.DataFrame: only the members declared below are typed.
class DataFrame:
    index: Index
    columns: Index
    # NOTE(review): `data` and `index` default to None but are not annotated as
    # Optional, so this signature relies on implicit-Optional handling in the
    # type checker -- confirm the checker configuration allows it.
    def __init__(self, data: List[JsonDict] = None, index: List[int] = None) -> None: ...
    def drop(self, labels: str, axis: int, inplace: bool): ...
    def insert(self, loc: int, column: str, value: Index): ...
    # Yields (index label, row) pairs; the label is typed as int here.
    def iterrows(self) -> Iterator[Tuple[int, Series]]: ...
    def set_index(self, keys: str) -> DataFrame: ...

# Partial type stub for pandas.Series: only `to_json` is declared.
class Series:
    def to_json(self) -> str: ...

# Partial type stub for pandas.Index: a `name` attribute plus iteration over
# labels, which are typed as strings here.
class Index:
    name: str
    def __iter__(self) -> Iterator[str]: ...
2 changes: 2 additions & 0 deletions stubs/responses.pyi
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from functools import partial
from typing import Any, Dict, Optional, TypeVar

# Alias for a JSON object represented as a dict with string keys and arbitrary values.
JsonDict = Dict[str, Any]
Expand All @@ -18,3 +19,4 @@ def add(
# Type variable referenced by the return annotation of `activate` below.
T = TypeVar("T")

# NOTE(review): the parameter here is *named* `T` and carries no annotation, so
# the return type is not actually tied to the argument's type. Confirm whether
# something like `def activate(func: T) -> T` was intended.
def activate(T) -> T: ...
# Stub for `responses.add_callback`; `callback` is typed as a `functools.partial` here.
def add_callback(method: Optional[str], url: Optional[str], callback: partial[Any]): ...
8 changes: 8 additions & 0 deletions tamr_client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,11 @@
AttributeNotFound,
)
import tamr_client.attributes.attribute as attribute

# records
from tamr_client.datasets.record import PrimaryKeyNotFound
import tamr_client.datasets.record as record

# dataframe
from tamr_client.datasets.dataframe import AmbiguousPrimaryKey
import tamr_client.datasets.dataframe as dataframe
65 changes: 65 additions & 0 deletions tamr_client/datasets/dataframe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
"""
Convenient functionality for interacting with pandas DataFrames.
"""

import json
from typing import Optional

import pandas as pd

import tamr_client as tc
from tamr_client.types import JsonDict


class AmbiguousPrimaryKey(Exception):
    """Raised when a primary key name matches more than one possible target.

    :func:`upsert` raises this when the requested name is both a column of the
    DataFrame and the name of the DataFrame's index.
    """


def upsert(
    session: tc.Session,
    dataset: tc.Dataset,
    df: pd.DataFrame,
    *,
    primary_key_name: Optional[str] = None,
) -> JsonDict:
    """Upsert one record per row of `df`, with one attribute per column of `df`.

    The primary key may be supplied either as a column of `df` or as the index
    of `df` (matched by the index's name), but not both.

    Args:
        session: Session used to send the request
        dataset: Dataset to receive record updates
        df: The DataFrame containing records to be upserted
        primary_key_name: The primary key of the dataset. Must be a column of `df` or the name of
            the index of `df`. Defaults to the first entry of ``dataset.key_attribute_names``.

    Returns:
        JSON response body from the server

    Raises:
        requests.HTTPError: If an HTTP error is encountered
        PrimaryKeyNotFound: If `primary_key_name` is neither a column in `df` nor the name of the
            index of `df`, or if it is not one of the dataset's key attribute names
        AmbiguousPrimaryKey: If `primary_key_name` matches both a column in `df` and the index of `df`
    """
    pk_name = (
        dataset.key_attribute_names[0] if primary_key_name is None else primary_key_name
    )

    # preconditions: the key must be identifiable in exactly one place
    is_column = pk_name in df.columns
    is_index = pk_name == df.index.name
    if is_column and is_index:
        raise AmbiguousPrimaryKey(
            f"Index {pk_name} has the same name as column {pk_name}"
        )
    if not is_column and not is_index:
        raise tc.PrimaryKeyNotFound(
            f"Primary key: {pk_name} is not DataFrame index name: {df.index.name} or in DataFrame column names: {df.columns}"
        )

    # From here on the primary key always lives in the index.
    if is_column:
        df = df.set_index(pk_name)

    # Round-trip each row through `to_json` so that `np.nan` values are handled
    # by pandas' serializer; the generator keeps the rows lazy.
    records = (
        {pk_name: pk, **json.loads(row.to_json())} for pk, row in df.iterrows()
    )
    return tc.datasets.record.upsert(
        session, dataset, records, primary_key_name=pk_name
    )
145 changes: 145 additions & 0 deletions tamr_client/datasets/record.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
"""
See https://docs.tamr.com/reference/record
"The recommended approach for interacting with records is to use the :func:`~tamr_client.record.upsert` and
:func:`~tamr_client.record.delete` functions for all use cases they can handle. For more advanced use cases, the
underlying _update function can be used directly."
"""
import json
from typing import cast, Dict, IO, Iterable, Optional

import tamr_client as tc
from tamr_client.types import JsonDict


class PrimaryKeyNotFound(Exception):
    """Raised when a primary key is referenced by a name that does not exist.

    :func:`upsert` and :func:`delete` raise this when the requested primary key
    name is not one of the dataset's key attribute names.
    """


def _update(
    session: tc.Session, dataset: tc.Dataset, updates: Iterable[Dict]
) -> JsonDict:
    """Post a batch of record creations/updates/deletions to the dataset's ``:updateRecords`` endpoint.

    You probably want to use :func:`~tamr_client.record.upsert`
    or :func:`~tamr_client.record.delete` instead.

    Args:
        session: Session used to send the request
        dataset: Dataset containing records to be updated
        updates: Each update should be formatted as specified in the `Public Docs for Dataset updates <https://docs.tamr.com/reference#modify-a-datasets-records>`_.

    Returns:
        JSON response body from server

    Raises:
        requests.HTTPError: If an HTTP error is encountered
    """
    # Serialize lazily: each command is only JSON-encoded when the request body is consumed.
    # `requests` accepts a generator for the `data` param, but the stubs for `requests` in
    # https://github.com/python/typeshed expect a file-like object, hence the cast.
    body = cast(IO, (json.dumps(command) for command in updates))
    endpoint = f"{dataset.url!s}:updateRecords"
    response = session.post(
        endpoint, headers={"Content-Encoding": "utf-8"}, data=body
    )
    return tc.response.successful(response).json()


def upsert(
    session: tc.Session,
    dataset: tc.Dataset,
    records: Iterable[Dict],
    *,
    primary_key_name: Optional[str] = None,
) -> JsonDict:
    """Create or update the specified records.

    Each record is wrapped in a CREATE command keyed by its primary key value,
    and the resulting batch is sent to the dataset.

    Args:
        session: Session used to send the request
        dataset: Dataset to receive record updates
        records: The records to create or update, as dictionaries
        primary_key_name: The primary key for these records, which must be a key in each record dictionary.
            Defaults to the first entry of ``dataset.key_attribute_names``.

    Returns:
        JSON response body from server

    Raises:
        requests.HTTPError: If an HTTP error is encountered
        PrimaryKeyNotFound: If `primary_key_name` is not one of the dataset's key attribute names
        KeyError: If a record dictionary has no `primary_key_name` key. Commands are built
            lazily, so this only surfaces once the records are consumed while sending the request.
    """
    key_names = dataset.key_attribute_names
    pk_name = key_names[0] if primary_key_name is None else primary_key_name

    if pk_name not in key_names:
        raise PrimaryKeyNotFound(
            f"Primary key: {pk_name} is not in dataset key attribute names: {key_names}"
        )

    # Keep the batch lazy so large inputs are never materialized in memory.
    commands = (_create_command(rec, primary_key_name=pk_name) for rec in records)
    return _update(session, dataset, commands)


def delete(
    session: tc.Session,
    dataset: tc.Dataset,
    records: Iterable[Dict],
    *,
    primary_key_name: Optional[str] = None,
) -> JsonDict:
    """Delete the specified records, based on primary key values.

    Only the primary key value of each record is used; other attribute values
    are not checked against the dataset.

    Args:
        session: Session used to send the request
        dataset: Dataset from which to delete records
        records: The records to delete, as dictionaries
        primary_key_name: The primary key for these records, which must be a key in each record dictionary.
            Defaults to the first entry of ``dataset.key_attribute_names``.

    Returns:
        JSON response body from server

    Raises:
        requests.HTTPError: If an HTTP error is encountered
        PrimaryKeyNotFound: If `primary_key_name` is not one of the dataset's key attribute names
        KeyError: If a record dictionary has no `primary_key_name` key. Commands are built
            lazily, so this only surfaces once the records are consumed while sending the request.
    """
    key_names = dataset.key_attribute_names
    pk_name = key_names[0] if primary_key_name is None else primary_key_name

    if pk_name not in key_names:
        raise PrimaryKeyNotFound(
            f"Primary key: {pk_name} is not in dataset key attribute names: {key_names}"
        )

    # Keep the batch lazy so large inputs are never materialized in memory.
    commands = (_delete_command(rec, primary_key_name=pk_name) for rec in records)
    return _update(session, dataset, commands)


def _create_command(record: Dict, *, primary_key_name: str) -> Dict:
"""Generates the CREATE command formatted as specified in the `Public Docs for Dataset updates
<https://docs.tamr.com/reference#modify-a-datasets-records>`_.
Args:
record: The record to create, as a dictionary
primary_key_name: The primary key for this record, which must be a key in the dictionary
Returns:
The CREATE command in the proper format
"""
return {"action": "CREATE", "recordId": record[primary_key_name], "record": record}


def _delete_command(record: Dict, *, primary_key_name: str) -> Dict:
"""Generates the DELETE command formatted as specified in the `Public Docs for Dataset updates
<https://docs.tamr.com/reference#modify-a-datasets-records>`_.
Args:
record: The record to delete, as a dictionary
primary_key_name: The primary key for this record, which must be a key in the dictionary
Returns:
The DELETE command in the proper format
"""
return {"action": "DELETE", "recordId": record[primary_key_name]}

0 comments on commit a9bbc89

Please sign in to comment.