Merge pull request #468 from skalish/json-encoder
Remove dependency on simplejson and use a custom encoder when needed
pcattori committed Oct 27, 2020
2 parents 0beb17d + 11a6869 commit 22651b3
Showing 7 changed files with 168 additions and 75 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -24,6 +24,10 @@
**NEW FEATURES**
- [#383](https://github.com/Datatamer/tamr-client/issues/383) Now able to create an Operation from Job resource id

+**BREAKING CHANGES**
+- [#468](https://github.com/Datatamer/tamr-client/pull/468) `Dataset.upsert_records` and `Dataset._update_records` no longer take general `**json_args` arguments and will only accept `ignore_nan`
+- The `ignore_nan` argument in `Dataset.upsert_records`, `Dataset._update_records`, `Dataset.upsert_from_dataframe`, and `DatasetCollection.create_from_dataframe` is now deprecated and will be removed in a future release

## 0.12.0
**BETA**
Important: Do not use BETA features for production workflows.
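To make the breaking change concrete, here is a minimal call-site sketch (the `dataset` object and records are hypothetical, not part of this diff):

    records = [{"id": "1", "value": 1.5}]

    # Previously, extra keyword arguments were forwarded to the JSON dumps call:
    # dataset.upsert_records(records, "id", indent=4)
    # After this change, any keyword other than ignore_nan raises a TypeError.

    dataset.upsert_records(records, "id")                   # preferred
    dataset.upsert_records(records, "id", ignore_nan=True)  # still works, emits DeprecationWarning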
33 changes: 1 addition & 32 deletions poetry.lock

Some generated files are not rendered by default.

1 change: 0 additions & 1 deletion pyproject.toml
@@ -29,7 +29,6 @@ include = ["tamr_client/py.typed"]
[tool.poetry.dependencies]
python = "^3.6.1"
requests = "^2.22"
-simplejson = "^3.16"
dataclasses = "^0.6.0"

[tool.poetry.dev-dependencies]
67 changes: 67 additions & 0 deletions tamr_unify_client/_ignore_nan_encoder.py
@@ -0,0 +1,67 @@
"""Adaptation of the Python standard library JSONEncoder to encode `NaN` as 'null'
Compare to https://github.com/python/cpython/blob/3.9/Lib/json/encoder.py
The only functional difference is in the definition of `floatstr` where 'NaN', 'Infinity', and '-Infinity' are encoded as 'null'
"""
from json import JSONEncoder
from json.encoder import (
    _make_iterencode,
    py_encode_basestring,
    py_encode_basestring_ascii,
)

try:
    from _json import encode_basestring_ascii as c_encode_basestring_ascii
except ImportError:
    c_encode_basestring_ascii = None
try:
    from _json import encode_basestring as c_encode_basestring
except ImportError:
    c_encode_basestring = None
try:
    from _json import make_encoder as c_make_encoder
except ImportError:
    c_make_encoder = None

INFINITY = float("inf")
encode_basestring = c_encode_basestring or py_encode_basestring
encode_basestring_ascii = c_encode_basestring_ascii or py_encode_basestring_ascii


class IgnoreNanEncoder(JSONEncoder):
    def iterencode(self, o, _one_shot=False):
        """Encode the given object and yield each string
        representation as available.

        For example::

            for chunk in JSONEncoder().iterencode(bigobject):
                mysocket.write(chunk)
        """
        if self.check_circular:
            markers = {}
        else:
            markers = None
        if self.ensure_ascii:
            _encoder = encode_basestring_ascii
        else:
            _encoder = encode_basestring

        def floatstr(
            o, _repr=float.__repr__, _inf=INFINITY, _neginf=-INFINITY,
        ):
            # NaN is the only float not equal to itself, so `o != o`
            # detects NaN; all non-finite values encode as null.
            if o != o or o == _inf or o == _neginf:
                return "null"
            else:
                return _repr(o)

        _iterencode = _make_iterencode(
            markers,
            self.default,
            _encoder,
            self.indent,
            floatstr,
            self.key_separator,
            self.item_separator,
            self.sort_keys,
            self.skipkeys,
            _one_shot,
        )
        return _iterencode(o, 0)
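As a usage sketch (not part of the diff): the encoder plugs into the standard library's json.dumps through the cls argument. Because iterencode is overridden and its floatstr never raises, the allow_nan flag is effectively bypassed and non-finite floats always serialize as null:

    import json

    from tamr_unify_client._ignore_nan_encoder import IgnoreNanEncoder

    # Strict JSON: the stdlib raises ValueError on NaN when allow_nan=False.
    # json.dumps({"x": float("nan")}, allow_nan=False)  # ValueError

    # The custom encoder writes NaN and +/-Infinity as null instead.
    print(json.dumps({"x": float("nan"), "y": float("inf")}, cls=IgnoreNanEncoder))
    # {"x": null, "y": null}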
17 changes: 11 additions & 6 deletions tamr_unify_client/dataset/collection.py
@@ -1,3 +1,5 @@
+import warnings

from requests.exceptions import HTTPError

from tamr_unify_client.base_collection import BaseCollection
@@ -108,7 +110,7 @@ def create(self, creation_spec):
        return Dataset.from_json(self.client, data)

    def create_from_dataframe(
-        self, df, primary_key_name, dataset_name, ignore_nan=True
+        self, df, primary_key_name, dataset_name, ignore_nan=None
    ):
        """Creates a dataset in this collection with the given name, creates an attribute for each column in the `df`
        (with `primary_key_name` as the key attribute), and upserts a record for each row of `df`.
@@ -125,14 +127,18 @@ def create_from_dataframe(
        :type primary_key_name: str
        :param dataset_name: What to name the dataset in Tamr. There cannot already be a dataset with this name.
        :type dataset_name: str
-        :param ignore_nan: Whether to convert `NaN` values to `null` before upserting records to Tamr. If `False` and
-            `NaN` is in `df`, this function will fail. Optional, default is `True`.
+        :param ignore_nan: Legacy parameter that does nothing
        :type ignore_nan: bool
        :returns: The newly created dataset.
        :rtype: :class:`~tamr_unify_client.dataset.resource.Dataset`
        :raises KeyError: If `primary_key_name` is not a column in `df`.
        :raises CreationError: If a step in creating the dataset fails.
        """
+        if ignore_nan is not None:
+            warnings.warn(
+                "'ignore_nan' is deprecated. DataFrame `NaN`s are always ignored in upsert",
+                DeprecationWarning,
+            )
        if primary_key_name not in df.columns:
            raise KeyError(f"{primary_key_name} is not an attribute of the data")

@@ -158,10 +164,9 @@ def create_from_dataframe(
        except HTTPError:
            self._handle_creation_failure(dataset, "An attribute was not created")

-        records = df.to_dict(orient="records")
        try:
-            response = dataset.upsert_records(
-                records, primary_key_name, ignore_nan=ignore_nan
+            response = dataset.upsert_from_dataframe(
+                df, primary_key_name=primary_key_name
            )
        except HTTPError:
            self._handle_creation_failure(dataset, "Records could not be created")
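End to end, the new code path means DataFrame NaNs are handled without any caller-side flags. A minimal sketch, assuming a reachable Tamr instance and hypothetical credentials and names:

    import numpy as np
    import pandas as pd

    from tamr_unify_client import Client
    from tamr_unify_client.auth import UsernamePasswordAuth

    tamr = Client(UsernamePasswordAuth("username", "password"), host="localhost")

    df = pd.DataFrame({"id": ["1", "2"], "value": [1.5, np.nan]})

    # NaN values are serialized as null automatically; no ignore_nan needed.
    dataset = tamr.datasets.create_from_dataframe(df, "id", "my_dataset")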
56 changes: 41 additions & 15 deletions tamr_unify_client/dataset/resource.py
@@ -1,9 +1,10 @@
from copy import deepcopy
+import json
import os
-from typing import TYPE_CHECKING
-
-import simplejson as json
+from typing import Optional, TYPE_CHECKING
+import warnings

+from tamr_unify_client._ignore_nan_encoder import IgnoreNanEncoder
from tamr_unify_client.attribute.collection import AttributeCollection
from tamr_unify_client.base_resource import BaseResource
from tamr_unify_client.dataset.profile import DatasetProfile
@@ -64,20 +65,27 @@ def attributes(self):
        alias = self.api_path + "/attributes"
        return AttributeCollection(self.client, alias)

-    def _update_records(self, updates, **json_args):
+    def _update_records(self, updates, *, ignore_nan=False):
        """Send a batch of record creations/updates/deletions to this dataset.
        You probably want to use :func:`~tamr_unify_client.dataset.resource.Dataset.upsert_records`
        or :func:`~tamr_unify_client.dataset.resource.Dataset.delete_records` instead.
        :param records: Each record should be formatted as specified in the `Public Docs for Dataset updates <https://docs.tamr.com/reference#modify-a-datasets-records>`_.
        :type records: iterable[dict]
-        :param `**json_args`: Arguments to pass to the JSON `dumps` function, as documented `here <https://simplejson.readthedocs.io/en/latest/#simplejson.dumps>`_.
-            Some of these, such as `indent`, may not work with Tamr.
+        :param ignore_nan: Whether to treat `NaN` values as null. Unconverted `NaN`s will raise an error if found. Deprecated.
+        :type ignore_nan: bool
        :returns: JSON response body from server.
        :rtype: :py:class:`dict`
        """
+        if ignore_nan:
+            warnings.warn(
+                "'ignore_nan' is deprecated. Users are expected to provide valid JSON representations instead",
+                DeprecationWarning,
+            )
+        encoder = IgnoreNanEncoder if ignore_nan else None
        stringified_updates = (
-            json.dumps(update, **json_args).encode("utf-8") for update in updates
+            json.dumps(update, cls=encoder, allow_nan=False).encode("utf-8")
+            for update in updates
        )

        return (
@@ -91,14 +99,18 @@ def _update_records(self, updates, **json_args):
        )

    def upsert_from_dataframe(
-        self, df: "pd.DataFrame", *, primary_key_name: str, ignore_nan: bool = True
+        self,
+        df: "pd.DataFrame",
+        *,
+        primary_key_name: str,
+        ignore_nan: Optional[bool] = None,
    ) -> dict:
        """Upserts a record for each row of `df` with attributes for each column in `df`.
        Args:
            df: The data to upsert records from.
            primary_key_name: The name of the primary key of the dataset. Must be a column of `df`.
-            ignore_nan: Whether to convert `NaN` values to `null` before upserting records to Tamr. If `False` and `NaN` is in `df`, this function will fail. Optional, default is `True`.
+            ignore_nan: Legacy parameter that does nothing. Deprecated.
        Returns:
            JSON response body from the server.
@@ -107,29 +119,43 @@ def upsert_from_dataframe(
            KeyError: If `primary_key_name` is not a column in `df`.
        """
+        if ignore_nan is not None:
+            warnings.warn(
+                "'ignore_nan' is deprecated. DataFrame `NaN`s are always ignored in upsert",
+                DeprecationWarning,
+            )
        if primary_key_name not in df.columns:
            raise KeyError(f"{primary_key_name} is not an attribute of the data")

-        records = df.to_dict(orient="records")
-        return self.upsert_records(records, primary_key_name, ignore_nan=ignore_nan)
+        # serialize records via to_json to handle `np.nan` values
+        serialized_records = ((pk, row.to_json()) for pk, row in df.iterrows())
+        records = (
+            {primary_key_name: pk, **json.loads(row)} for pk, row in serialized_records
+        )
+        return self.upsert_records(records, primary_key_name)

-    def upsert_records(self, records, primary_key_name, **json_args):
+    def upsert_records(self, records, primary_key_name, *, ignore_nan=False):
        """Creates or updates the specified records.
        :param records: The records to update, as dictionaries.
        :type records: iterable[dict]
        :param primary_key_name: The name of the primary key for these records, which must be a key in each record dictionary.
        :type primary_key_name: str
-        :param `**json_args`: Arguments to pass to the JSON `dumps` function, as documented `here <https://simplejson.readthedocs.io/en/latest/#simplejson.dumps>`_.
-            Some of these, such as `indent`, may not work with Tamr.
+        :param ignore_nan: Whether to convert `NaN` values to `null` when upserting records. If `False` and `NaN` is found this function will fail. Deprecated.
+        :type ignore_nan: bool
        :return: JSON response body from the server.
        :rtype: dict
        """
+        if ignore_nan:
+            warnings.warn(
+                "'ignore_nan' is deprecated. Users are expected to provide valid JSON representations instead",
+                DeprecationWarning,
+            )
        updates = (
            {"action": "CREATE", "recordId": record[primary_key_name], "record": record}
            for record in records
        )
-        return self._update_records(updates, **json_args)
+        return self._update_records(updates, ignore_nan=ignore_nan)

    def delete_records(self, records, primary_key_name):
        """Deletes the specified records.
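To see why the to_json round trip removes the need for a custom encoder on this path, a small standalone sketch (names are illustrative, not part of the diff):

    import json

    import numpy as np
    import pandas as pd

    df = pd.DataFrame({"id": ["1"], "value": [np.nan]})

    # pandas writes NaN as null in to_json, so loading it back yields
    # records that are already valid strict JSON.
    records = ({"id": pk, **json.loads(row.to_json())} for pk, row in df.iterrows())
    record = next(records)
    # record == {"id": "1", "value": None}; the row's own "id" wins over the index

    # upsert_records then wraps each record in the update format that
    # _update_records serializes with allow_nan=False and streams to the server.
    update = {"action": "CREATE", "recordId": record["id"], "record": record}
    print(json.dumps(update, allow_nan=False))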
