Merge pull request #442 from skalish/mastering-ops
TC: Fix mastering functions and conform to synchronous operations
pcattori committed Aug 21, 2020
2 parents 9e6708e + 61bbb54 commit 710b343
Showing 16 changed files with 531 additions and 60 deletions.
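
Taken together, this change makes the mastering functions synchronous by default: each public call now submits its job and waits for the resulting operation, a private `_async` twin returns the pending operation immediately, and `unified.from_project` no longer takes an explicit `instance` argument. A minimal usage sketch of the resulting API, assuming `session` and `project` have already been obtained elsewhere and that the package is imported as `tc`, as in the tests below:

```python
import tamr_client as tc

# `session` and `project` are assumed to have been created beforehand (not shown).

# Synchronous style: each call submits the refresh job and blocks until it finishes.
op = tc.mastering.update_unified_dataset(session, project)
op = tc.mastering.generate_pairs(session, project)

# Power-user style: submit the job, do other work, then wait explicitly.
pending = tc.mastering._estimate_pairs_async(session, project)
finished = tc.operation.wait(session, pending)
```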
4 changes: 1 addition & 3 deletions tamr_client/categorization/project.py
@@ -76,9 +76,7 @@ def manual_labels(
_dataset.NotFound: If no dataset could be found at the specified URL
Ambiguous: If multiple targets match dataset name
"""
unified_dataset = unified.from_project(
session=session, instance=instance, project=project
)
unified_dataset = unified.from_project(session=session, project=project)
labels_dataset_name = unified_dataset.name + "_manual_categorizations"
datasets_url = URL(instance=instance, path="datasets")
r = session.get(
8 changes: 2 additions & 6 deletions tamr_client/dataset/unified.py
@@ -5,7 +5,6 @@

from tamr_client import operation, response
from tamr_client._types import (
Instance,
JsonDict,
Operation,
Project,
@@ -24,23 +23,20 @@ class NotFound(TamrClientException):
pass


def from_project(
session: Session, instance: Instance, project: Project
) -> UnifiedDataset:
def from_project(session: Session, project: Project) -> UnifiedDataset:
"""Get unified dataset of a project
Fetches the unified dataset of a given project from Tamr server
Args:
instance: Tamr instance containing this dataset
project: Tamr project of this Unified Dataset
Raises:
unified.NotFound: If no unified dataset could be found at the specified URL.
Corresponds to a 404 HTTP error.
requests.HTTPError: If any other HTTP error is encountered.
"""
url = URL(instance=instance, path=f"{project.url.path}/unifiedDataset")
url = URL(instance=project.url.instance, path=f"{project.url.path}/unifiedDataset")
return _from_url(session, url)


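With the `instance` parameter dropped, `from_project` now derives the instance from `project.url.instance`. A small sketch of the new call, assuming `session` and `project` already exist:

```python
import tamr_client as tc

# Previously: tc.dataset.unified.from_project(session, instance, project)
# The instance is now read from project.url.instance, so only two arguments remain.
unified_dataset = tc.dataset.unified.from_project(session, project)
print(unified_dataset.name, unified_dataset.key_attribute_names)
```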
8 changes: 8 additions & 0 deletions tamr_client/mastering/__init__.py
@@ -4,6 +4,14 @@
"""
from tamr_client.mastering import project
from tamr_client.mastering._mastering import (
_apply_feedback_async,
_estimate_pairs_async,
_generate_pairs_async,
_publish_clusters_async,
_update_cluster_results_async,
_update_high_impact_pairs_async,
_update_pair_results_async,
_update_unified_dataset_async,
apply_feedback,
estimate_pairs,
generate_pairs,
102 changes: 79 additions & 23 deletions tamr_client/mastering/_mastering.py
@@ -3,87 +3,143 @@
See https://docs.tamr.com/docs/overall-workflow-mastering
The terminology used here is consistent with Tamr UI terminology
Asynchronous versions of each function can be found with the suffix `_async` and may be of
interest to power users
"""
from tamr_client import operation
from tamr_client._types import MasteringProject, Operation, Session
from tamr_client.dataset import unified


def update_unified_dataset(session: Session, project: MasteringProject) -> Operation:
"""Applies changes to the unified dataset and waits for the operation to complete
"""Apply changes to the unified dataset and wait for the operation to complete
Args:
project: Tamr Mastering project
"""
unified_dataset = unified.from_project(session, project.url.instance, project)
return unified.apply_changes(session, unified_dataset)
op = _update_unified_dataset_async(session, project)
return operation.wait(session, op)


def estimate_pairs(session: Session, project: MasteringProject) -> Operation:
"""Updates the estimated pair counts
"""Update the estimated pair counts and wait for the operation to complete
Args:
project: Tamr Mastering project
"""
r = session.post(str(project.url) + "estimatedPairCounts:refresh")
return operation._from_response(project.url.instance, r)
op = _estimate_pairs_async(session, project)
return operation.wait(session, op)


def generate_pairs(session: Session, project: MasteringProject) -> Operation:
"""Generates pairs according to the binning model
"""Generate pairs according to the binning model and wait for the operation
to complete
Args:
project: Tamr Mastering project
"""
r = session.post(str(project.url) + "recordPairs:refresh")
return operation._from_response(project.url.instance, r)
op = _generate_pairs_async(session, project)
return operation.wait(session, op)


def apply_feedback(session: Session, project: MasteringProject) -> Operation:
"""Trains the pair-matching model according to verified labels
"""Train the pair-matching model according to verified labels and wait for the
operation to complete
Args:
project: Tamr Mastering project
"""
r = session.post(str(project.url) + "recordPairsWithPredictions/model:refresh")
return operation._from_response(project.url.instance, r)
op = _apply_feedback_async(session, project)
return operation.wait(session, op)


def update_pair_results(session: Session, project: MasteringProject) -> Operation:
"""Updates record pair predictions according to the latest pair-matching model
"""Update record pair predictions according to the latest pair-matching model and
wait for the operation to complete
Args:
project: Tamr Mastering project
"""
r = session.post(str(project.url) + "recordPairsWithPredictions:refresh")
return operation._from_response(project.url.instance, r)
op = _update_pair_results_async(session, project)
return operation.wait(session, op)


def update_high_impact_pairs(session: Session, project: MasteringProject) -> Operation:
"""Produces new high-impact pairs according to the latest pair-matching model
"""Produce new high-impact pairs according to the latest pair-matching model and
wait for the operation to complete
Args:
project: Tamr Mastering project
"""
r = session.post(str(project.url) + "highImpactPairs:refresh")
return operation._from_response(project.url.instance, r)
op = _update_high_impact_pairs_async(session, project)
return operation.wait(session, op)


def update_cluster_results(session: Session, project: MasteringProject) -> Operation:
"""Generates clusters based on the latest pair-matching model
"""Generate clusters based on the latest pair-matching model and wait for the
operation to complete
Args:
project: Tamr Mastering project
"""
r = session.post(str(project.url) + "recordClusters:refresh")
return operation._from_response(project.url.instance, r)
op = _update_cluster_results_async(session, project)
return operation.wait(session, op)


def publish_clusters(session: Session, project: MasteringProject) -> Operation:
"""Publishes current record clusters
"""Publish current record clusters and wait for the operation to complete
Args:
project: Tamr Mastering project
"""
r = session.post(str(project.url) + "publishedClustersWithData:refresh")
op = _publish_clusters_async(session, project)
return operation.wait(session, op)


def _update_unified_dataset_async(
session: Session, project: MasteringProject
) -> Operation:
unified_dataset = unified.from_project(session, project)
return unified._apply_changes_async(session, unified_dataset)


def _estimate_pairs_async(session: Session, project: MasteringProject) -> Operation:
r = session.post(str(project.url) + "/estimatedPairCounts:refresh")
return operation._from_response(project.url.instance, r)


def _generate_pairs_async(session: Session, project: MasteringProject) -> Operation:
r = session.post(str(project.url) + "/recordPairs:refresh")
return operation._from_response(project.url.instance, r)


def _apply_feedback_async(session: Session, project: MasteringProject) -> Operation:
r = session.post(str(project.url) + "/recordPairsWithPredictions/model:refresh")
return operation._from_response(project.url.instance, r)


def _update_pair_results_async(
session: Session, project: MasteringProject
) -> Operation:
r = session.post(str(project.url) + "/recordPairsWithPredictions:refresh")
return operation._from_response(project.url.instance, r)


def _update_high_impact_pairs_async(
session: Session, project: MasteringProject
) -> Operation:
r = session.post(str(project.url) + "/highImpactPairs:refresh")
return operation._from_response(project.url.instance, r)


def _update_cluster_results_async(
session: Session, project: MasteringProject
) -> Operation:
r = session.post(str(project.url) + "/recordClusters:refresh")
return operation._from_response(project.url.instance, r)


def _publish_clusters_async(session: Session, project: MasteringProject) -> Operation:
r = session.post(str(project.url) + "/publishedClustersWithData:refresh")
return operation._from_response(project.url.instance, r)
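
Every public function in this module now follows the same two-step pattern: call its `_async` counterpart to submit the refresh, then block on `operation.wait`. The helper below is hypothetical (not part of this commit) and only illustrates how the synchronous calls chain into the overall mastering workflow referenced in the module docstring; it assumes the functions are re-exported via `tamr_client.mastering`, as the `__init__.py` hunk above shows:

```python
import tamr_client as tc

def run_mastering_workflow(session, project):
    """Hypothetical end-to-end mastering run using the synchronous API."""
    tc.mastering.update_unified_dataset(session, project)   # refresh the unified dataset
    tc.mastering.generate_pairs(session, project)           # bin records into candidate pairs
    tc.mastering.apply_feedback(session, project)           # train on verified labels
    tc.mastering.update_pair_results(session, project)      # re-score pairs with the new model
    tc.mastering.update_cluster_results(session, project)   # form record clusters
    return tc.mastering.publish_clusters(session, project)  # publish the clusters
```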
46 changes: 18 additions & 28 deletions tests/tamr_client/dataset/test_unified.py
@@ -1,50 +1,40 @@
import pytest
import responses

import tamr_client as tc
from tests.tamr_client import fake, utils
from tests.tamr_client import fake


@responses.activate
@fake.json
def test_from_project():
s = fake.session()
instance = fake.instance()
project = fake.mastering_project()

dataset_json = utils.load_json("dataset.json")
url = tc.URL(path="projects/1/unifiedDataset")
responses.add(responses.GET, str(url), json=dataset_json)

unified_dataset = tc.dataset.unified.from_project(s, instance, project)
unified_dataset = tc.dataset.unified.from_project(s, project)
assert unified_dataset.name == "dataset 1 name"
assert unified_dataset.description == "dataset 1 description"
assert unified_dataset.key_attribute_names == ("tamr_id",)


@responses.activate
@fake.json
def test_from_project_dataset_not_found():
s = fake.session()
instance = fake.instance()
project = fake.mastering_project()

url = tc.URL(path="projects/1/unifiedDataset")
responses.add(responses.GET, str(url), status=404)

with pytest.raises(tc.dataset.unified.NotFound):
tc.dataset.unified.from_project(s, instance, project)
tc.dataset.unified.from_project(s, project)


@responses.activate
def test_apply_changes():
@fake.json
def test_apply_changes_async():
s = fake.session()
dataset_json = utils.load_json("dataset.json")
dataset_url = tc.URL(path="projects/1/unifiedDataset")
unified_dataset = tc.dataset.unified._from_json(dataset_url, dataset_json)

operation_json = utils.load_json("operation_pending.json")
operation_url = tc.URL(path="operations/1")
url = tc.URL(path="projects/1/unifiedDataset:refresh")
responses.add(responses.POST, str(url), json=operation_json)

response = tc.dataset.unified._apply_changes_async(s, unified_dataset)
assert response == tc.operation._from_json(operation_url, operation_json)
unified_dataset = fake.unified_dataset()

op = tc.dataset.unified._apply_changes_async(s, unified_dataset)
assert op.type == "SPARK"
assert op.description == "operation 1 description"
assert op.status == {
"state": "PENDING",
"startTime": "",
"endTime": "",
"message": "Job has not yet been submitted to Spark",
}
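
The same `@fake.json` pattern should extend to tests for the new mastering helpers. The sketch below is hypothetical and assumes a matching fixture exists that maps the `estimatedPairCounts:refresh` POST to a pending-operation response, analogous to the `unifiedDataset:refresh` fixture shown next:

```python
import tamr_client as tc
from tests.tamr_client import fake

@fake.json
def test_estimate_pairs_async():
    # Hypothetical test; assumes a fake JSON fixture registers
    # POST projects/1/estimatedPairCounts:refresh -> pending SPARK operation.
    s = fake.session()
    project = fake.mastering_project()

    op = tc.mastering._estimate_pairs_async(s, project)
    assert op.status["state"] == "PENDING"
```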
@@ -0,0 +1,33 @@
[
{
"request": {
"method": "POST",
"path": "projects/1/unifiedDataset:refresh"
},
"response": {
"status": 200,
"json": {
"id": "1",
"type": "SPARK",
"description": "operation 1 description",
"status": {
"state": "PENDING",
"startTime": "",
"endTime": "",
"message": "Job has not yet been submitted to Spark"
},
"created": {
"username": "admin",
"time": "2020-06-12T18:21:42.288Z",
"version": "operation 1 created version"
},
"lastModified": {
"username": "admin",
"time": "2020-06-12T18:21:42.288Z",
"version": "operation 1 modified version"
},
"relativeId": "operations/1"
}
}
}
]
@@ -0,0 +1,34 @@
[
{
"request": {
"method": "GET",
"path": "projects/1/unifiedDataset"
},
"response": {
"status": 200,
"json": {
"id": "unify://unified-data/v1/datasets/1",
"externalId": "number 1",
"name": "dataset 1 name",
"description": "dataset 1 description",
"version": "dataset 1 version",
"keyAttributeNames": [
"tamr_id"
],
"tags": [],
"created": {
"username": "admin",
"time": "2018-09-10T16:06:20.636Z",
"version": "dataset 1 created version"
},
"lastModified": {
"username": "admin",
"time": "2018-09-10T16:06:20.851Z",
"version": "dataset 1 modified version"
},
"relativeId": "datasets/1",
"upstreamDatasetIds": []
}
}
}
]
@@ -0,0 +1,11 @@
[
{
"request": {
"method": "GET",
"path": "projects/1/unifiedDataset"
},
"response": {
"status": 404
}
}
]
