Skip to content

Commit

Permalink
Merge pull request #390 from skalish/add-operations
Browse files Browse the repository at this point in the history
TC: Add operations
  • Loading branch information
pcattori committed Jun 24, 2020
2 parents 344f024 + ef8b0e4 commit 80dac5d
Show file tree
Hide file tree
Showing 12 changed files with 323 additions and 17 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
- generic projects via `tc.project`
- Mastering projects via `tc.mastering.project`
- Support for streaming records from a dataset via `tc.record.stream`
- Support for operations via `tc.operations`

**BUG FIXES**
- `from_geo_features` now returns information on the operation.
Expand Down
1 change: 1 addition & 0 deletions docs/beta.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
* [Dataset](beta/dataset)
* [Instance](beta/instance)
* [Mastering](beta/mastering)
* [Operation](beta/operation)
* [Project](beta/project)
* [Response](beta/response)
* [Session](beta/session)
2 changes: 1 addition & 1 deletion docs/beta/dataset/unified.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ Unified
.. autoclass:: tamr_client.dataset.unified.UnifiedDataset

.. autofunction:: tamr_client.dataset.unified.from_project
.. autofunction:: tamr_client.dataset.unified.commit
.. autofunction:: tamr_client.dataset.unified.apply_changes

Exceptions
----------
Expand Down
8 changes: 8 additions & 0 deletions docs/beta/operation.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
Operation
=========

.. autoclass:: tamr_client.Operation

.. autofunction:: tamr_client.operation.poll
.. autofunction:: tamr_client.operation.wait
.. autofunction:: tamr_client.operation.succeeded
4 changes: 4 additions & 0 deletions tamr_client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,7 @@

from tamr_client import mastering
from tamr_client import project

# operations
from tamr_client.operation import Operation
from tamr_client import operation
2 changes: 2 additions & 0 deletions tamr_client/dataset/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ class Dataset:
Args:
url
name
key_attribute_names
description
"""

url: URL
Expand Down
26 changes: 20 additions & 6 deletions tamr_client/dataset/unified.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
from dataclasses import dataclass
from typing import Optional, Tuple

from tamr_client import response
from tamr_client import operation, response
from tamr_client.instance import Instance
from tamr_client.operation import Operation
from tamr_client.project import Project
from tamr_client.session import Session
from tamr_client.types import JsonDict
Expand All @@ -29,7 +30,9 @@ class UnifiedDataset:
Args:
url
name
key_attribute_names
description
"""

url: URL
Expand Down Expand Up @@ -94,15 +97,26 @@ def _from_json(url: URL, data: JsonDict) -> UnifiedDataset:
)


def commit(session: Session, unified_dataset: UnifiedDataset) -> JsonDict:
"""Commits the Unified Dataset.
def apply_changes(session: Session, unified_dataset: UnifiedDataset) -> Operation:
    """Apply changes to the unified dataset and block until the operation resolves.

    Starts the refresh via `_apply_changes_async` and hands the resulting
    operation to `operation.wait`.

    Args:
        session: The Tamr Session
        unified_dataset: The Unified Dataset whose changes will be applied

    Returns:
        The operation as returned by `operation.wait`.
    """
    return operation.wait(session, _apply_changes_async(session, unified_dataset))


def _apply_changes_async(
    session: Session, unified_dataset: UnifiedDataset
) -> Operation:
    """Start applying changes to the unified dataset without waiting for completion.

    Posts to the unified dataset's ``:refresh`` endpoint and builds an
    Operation from the server's reply.

    Args:
        session: The Tamr Session
        unified_dataset: The Unified Dataset whose changes will be applied

    Returns:
        The operation created from the server's response to the refresh request.
    """
    r = session.post(
        str(unified_dataset.url) + ":refresh",
        headers={"Content-Type": "application/json", "Accept": "application/json"},
    )
    # Validate the HTTP response before turning it into an Operation, so the
    # function returns an Operation (as annotated) rather than raw JSON.
    # NOTE(review): assumes `response.successful` returns the response it was
    # given and raises on HTTP errors -- confirm against tamr_client.response.
    return operation._from_response(
        unified_dataset.url.instance, response.successful(r)
    )
163 changes: 163 additions & 0 deletions tamr_client/operation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
"""
See https://docs.tamr.com/new/reference/the-operation-object
"""
from copy import deepcopy
from dataclasses import dataclass
from time import sleep, time as now
from typing import Dict, Optional

import requests

from tamr_client import response
from tamr_client.instance import Instance
from tamr_client.session import Session
from tamr_client.types import JsonDict
from tamr_client.url import URL


class NotFound(Exception):
    """Error signalling that a referenced operation does not exist on the server."""


@dataclass(frozen=True)
class Operation:
    """A Tamr operation

    Frozen (immutable) record describing an operation as reported by the
    Tamr server.

    See https://docs.tamr.com/new/reference/the-operation-object

    Args:
        url: URL of the operation resource
        type: Operation type string taken from the server's JSON
        status: Status details of the operation, or `None` when the server
            reported none. When present, the ``"state"`` key holds the
            operation state; this module recognizes ``"PENDING"``,
            ``"RUNNING"``, ``"CANCELED"``, ``"SUCCEEDED"`` and ``"FAILED"``.
        description: Human-readable description of the operation, if any
    """

    url: URL
    type: str
    # Optional: absent from the payload means no status was reported.
    status: Optional[Dict[str, str]] = None
    description: Optional[str] = None


def poll(session: Session, operation: Operation) -> Operation:
    """Fetch the current server-side state of an operation.

    The given :class:`~tamr_client.operation.Operation` is left untouched;
    a new :class:`~tamr_client.operation.Operation` built from the server's
    reply is returned instead.

    Args:
        session: Tamr session used to query the server.
        operation: Operation to be polled.

    Returns:
        A new operation fetched from the URL of the given operation.
    """
    operation_url = operation.url
    refreshed = _from_url(session, operation_url)
    return refreshed


def wait(
    session: Session,
    operation: Operation,
    *,
    poll_interval_seconds: int = 3,
    timeout_seconds: Optional[int] = None,
) -> Operation:
    """Continuously polls for this operation's server-side state.

    The operation's state is always inspected before the timeout is
    enforced, so an operation that has already resolved is returned even
    when the time budget is exhausted (or zero).

    Args:
        session: Tamr session used to poll the server.
        operation: Operation to be polled.
        poll_interval_seconds: Time interval (in seconds) between subsequent polls.
        timeout_seconds: Time (in seconds) to wait for operation to resolve.
            `None` waits indefinitely.

    Returns:
        The operation once it has no status or its state is one of
        ``CANCELED``, ``SUCCEEDED`` or ``FAILED``.

    Raises:
        TimeoutError: If operation takes longer than `timeout_seconds` to resolve.
    """
    started = now()
    while True:
        status = operation.status
        if status is None or status["state"] in ("CANCELED", "SUCCEEDED", "FAILED"):
            return operation

        # Any other state (PENDING, RUNNING, or one this client does not know)
        # is treated as unresolved: pause before polling again so the server
        # is never hit in a tight loop.
        pause = poll_interval_seconds
        if timeout_seconds is not None:
            remaining = timeout_seconds - (now() - started)
            if remaining <= 0:
                raise TimeoutError(
                    f"Waiting for operation took longer than {timeout_seconds} seconds."
                )
            # Never sleep past the deadline.
            pause = min(pause, remaining)
        sleep(pause)
        operation = poll(session, operation)


def succeeded(operation: Operation) -> bool:
    """Tell whether the operation finished successfully.

    Args:
        operation: Operation to inspect.

    Returns:
        `True` only when the operation has a status whose ``"state"`` is
        ``"SUCCEEDED"``; `False` when there is no status or any other state.
    """
    status = operation.status
    if status is None:
        return False
    return status["state"] == "SUCCEEDED"


def _from_response(instance: Instance, response: requests.Response) -> Operation:
    """
    Handle idiosyncrasies in constructing Operations from Tamr responses.

    When a Tamr API call would start an operation, but all results that would be
    produced by that operation are already up-to-date, Tamr returns `HTTP 204 No Content`

    To make it easy for client code to handle these API responses without checking
    the response code, this method will either construct an Operation, or a
    dummy `NoOp` operation representing the 204 Success response.

    Args:
        instance: Tamr instance used to build the operation's URL.
        response: HTTP Response from the request that started the operation.

    Returns:
        The operation described by the response body, or a dummy ``NOOP``
        operation in the ``SUCCEEDED`` state for a 204 response.

    Raises:
        requests.HTTPError: If the response carries an HTTP error status.
    """
    if response.status_code == 204:
        # Operation was successful, but the response contains no content.
        # Create a dummy operation to represent this.
        _never = "0000-00-00T00:00:00.000Z"
        _description = (
            "Tamr returned HTTP 204 for this operation, indicating that all\n"
            "results that would be produced by the operation are already up-to-date."
        )
        resource_json = {
            "id": "-1",
            "type": "NOOP",
            "description": _description,
            "status": {
                "state": "SUCCEEDED",
                "startTime": _never,
                "endTime": _never,
                "message": "",
            },
            "created": {"username": "", "time": _never, "version": "-1"},
            "lastModified": {"username": "", "time": _never, "version": "-1"},
            "relativeId": "operations/-1",
        }
    else:
        # Surface HTTP errors as such instead of failing later with an
        # unrelated KeyError / JSON decoding error on an error payload.
        response.raise_for_status()
        resource_json = response.json()
    _id = resource_json["id"]
    _url = URL(instance=instance, path=f"operations/{_id}")
    return _from_json(_url, resource_json)


def _from_url(session: Session, url: URL) -> Operation:
    """Get operation by URL

    Fetches operation from Tamr server

    Args:
        session: Tamr session used to issue the request.
        url: Operation URL

    Returns:
        The operation built from the server's JSON reply.

    Raises:
        NotFound: If no operation could be found at the specified URL.
            Corresponds to a 404 HTTP error.
        requests.HTTPError: If any other HTTP error is encountered.
    """
    target = str(url)
    reply = session.get(target)
    if reply.status_code == 404:
        raise NotFound(target)
    payload = response.successful(reply).json()
    return _from_json(url, payload)


def _from_json(url: URL, data: JsonDict) -> Operation:
    """Make operation from JSON data (deserialize)

    Args:
        url: Operation URL
        data: Operation JSON data from Tamr server

    Returns:
        An Operation holding the ``type``, ``status`` and ``description``
        found in `data`; ``status`` and ``description`` are `None` when absent.

    Raises:
        KeyError: If `data` has no ``"type"`` entry.
    """
    return Operation(
        url,
        type=data["type"],
        # Only the status is a mutable container; copy it so the operation
        # does not share state with the caller's JSON payload.
        status=deepcopy(data.get("status")),
        description=data.get("description"),
    )
File renamed without changes.
22 changes: 22 additions & 0 deletions tests/tamr_client/data/operation_succeeded.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{
"id": "1",
"type": "SPARK",
"description": "operation 1 description",
"status": {
"state": "SUCCEEDED",
"startTime": "",
"endTime": "",
"message": ""
},
"created": {
"username": "admin",
"time": "2020-06-12T18:21:42.288Z",
"version": "operation 1 created version"
},
"lastModified": {
"username": "admin",
"time": "2020-06-12T18:21:42.288Z",
"version": "operation 1 modified version"
},
"relativeId": "operations/1"
}
17 changes: 7 additions & 10 deletions tests/tamr_client/dataset/test_unified.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,16 @@ def test_from_project_dataset_not_found():


@responses.activate
def test_commit():
def test_apply_changes():
s = utils.session()
instance = utils.instance()
project = utils.mastering_project()

operation_json = utils.load_json("operation.json")
dataset_json = utils.load_json("dataset.json")
prj_url = tc.URL(path="projects/1/unifiedDataset")
responses.add(responses.GET, str(prj_url), json=dataset_json)
unified_dataset = tc.dataset.unified.from_project(s, instance, project)
dataset_url = tc.URL(path="projects/1/unifiedDataset")
unified_dataset = tc.dataset.unified._from_json(dataset_url, dataset_json)

operation_json = utils.load_json("operation_pending.json")
operation_url = tc.URL(path="operations/1")
url = tc.URL(path="projects/1/unifiedDataset:refresh")
responses.add(responses.POST, str(url), json=operation_json)

response = tc.dataset.unified.commit(s, unified_dataset)
assert response == operation_json
response = tc.dataset.unified._apply_changes_async(s, unified_dataset)
assert response == tc.operation._from_json(operation_url, operation_json)

0 comments on commit 80dac5d

Please sign in to comment.