Skip to content

Commit

Permalink
Merge pull request #339 from skalish/upsert-records-from-dataframe
Browse files Browse the repository at this point in the history
TUC: dataset.upsert_from_dataframe
  • Loading branch information
pcattori committed Apr 3, 2020
2 parents a9bbc89 + f10083d commit 7113e46
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
- functions: `tc.record.upsert`, `tc.record.delete`
- `tc.dataframe` module
- functions: `tc.dataframe.upsert`
- [#377](https://github.com/Datatamer/tamr-client/issues/377) Added `dataset.upsert_from_dataframe()`, which upserts records from a pandas DataFrame.
**BUG FIXES**
- Links from our docs to the `requests` docs were outdated. Links have been updated to point to the new `requests` docs URL.
- [#323](https://github.com/Datatamer/tamr-client/issues/323) Documentation for setting `dtype=str` before calling `client.datasets.create_from_dataframe`
Expand Down
24 changes: 24 additions & 0 deletions tamr_unify_client/dataset/resource.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from copy import deepcopy

import pandas as pd
import simplejson as json

from tamr_unify_client.attribute.collection import AttributeCollection
Expand Down Expand Up @@ -84,6 +85,29 @@ def _update_records(self, updates, **json_args):
.json()
)

def upsert_from_dataframe(
    self, df: pd.DataFrame, *, primary_key_name: str, ignore_nan: bool = True
) -> dict:
    """Upsert one record per row of `df`, using each column of `df` as an attribute.

    The rows are converted to a list of dicts and handed to `upsert_records`.

    Args:
        df: The data to upsert records from.
        primary_key_name: The name of the primary key of the dataset.
            Must be a column of `df`.
        ignore_nan: Whether to convert `NaN` values to `null` before upserting
            records to Tamr. If `False` and `NaN` is in `df`, this function
            will fail. Optional, default is `True`.

    Returns:
        JSON response body from the server.

    Raises:
        KeyError: If `primary_key_name` is not a column in `df`.
    """
    if primary_key_name in df.columns:
        # One dict per row, keyed by column name.
        return self.upsert_records(
            df.to_dict(orient="records"), primary_key_name, ignore_nan=ignore_nan
        )

    raise KeyError(f"{primary_key_name} is not an attribute of the data")

def upsert_records(self, records, primary_key_name, **json_args):
"""Creates or updates the specified records.
Expand Down
24 changes: 24 additions & 0 deletions tests/unit/test_dataset_records.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from functools import partial
from unittest import TestCase

from pandas import DataFrame
from requests.exceptions import HTTPError
import responses
import simplejson
Expand Down Expand Up @@ -99,6 +100,28 @@ def create_callback(request, snoop):
self.assertEqual(response, self._response_json)
self.assertEqual(snoop["payload"], TestDatasetRecords.stringify(updates, False))

@responses.activate
def test_upsert_from_dataframe(self):
    """Upserting from a DataFrame posts the expected record updates.

    Registers a stubbed dataset GET and an `:updateRecords` POST callback that
    captures the request body, then checks both the returned JSON and the
    captured payload.
    """
    captured = {}

    def capture_request(request):
        # Store the request body so the payload can be compared afterwards.
        captured["payload"] = list(request.body)
        return 200, {}, simplejson.dumps(self._response_json)

    responses.add(responses.GET, self._dataset_url, json={})
    dataset = self.tamr.datasets.by_resource_id(self._dataset_id)

    expected_updates = TestDatasetRecords.records_to_updates(self._records_json)
    responses.add_callback(
        responses.POST, f"{self._dataset_url}:updateRecords", capture_request
    )

    result = dataset.upsert_from_dataframe(
        self._dataframe, primary_key_name="attribute1"
    )

    self.assertEqual(result, self._response_json)
    self.assertEqual(
        captured["payload"], TestDatasetRecords.stringify(expected_updates, False)
    )

@responses.activate
def test_delete(self):
def create_callback(request, snoop):
Expand Down Expand Up @@ -173,6 +196,7 @@ def stringify(updates, ignore_nan):
_dataset_url = f"http://localhost:9100/api/versioned/v1/datasets/{_dataset_id}"

_records_json = [{"attribute1": 1}, {"attribute1": 2}]
_dataframe = DataFrame(_records_json, columns=["attribute1"])
_nan_records_json = [{"attribute1": float("nan")}, {"attribute1": float("nan")}]
_response_json = {
"numCommandsProcessed": 2,
Expand Down

0 comments on commit 7113e46

Please sign in to comment.