Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add get_unit_status method #11

Merged
merged 18 commits into from Jan 7, 2020
Merged
11 changes: 11 additions & 0 deletions amclient/amclient.py
Expand Up @@ -459,6 +459,17 @@ def get_ingest_status(self):
headers=self._am_auth_headers(),
)

def get_unit_status(self, uuid):
sevein marked this conversation as resolved.
Show resolved Hide resolved
"""GET the status of a unit."""
transfer = utils._call_url("/api/ingest/status/{}".format(uuid))
sevein marked this conversation as resolved.
Show resolved Hide resolved
if (
transfer.get("status") == "COMPLETE"
and transfer.get("sip_uuid")
and transfer.get("sip_uuid") != "BACKLOG"
):
return utils._call_url("/api/transfer/status/{}".format(uuid))
sevein marked this conversation as resolved.
Show resolved Hide resolved
return transfer

def get_processing_config(self, assume_json=False):
"""GET a processing configuration file from an Archivematica instance.

Expand Down
70 changes: 54 additions & 16 deletions amclient/utils.py
Expand Up @@ -22,6 +22,38 @@
METHOD_DELETE = "DELETE"


def _call_url(
url, params=None, method=METHOD_GET, headers=None, data=None, assume_json=True
):
"""Helper to GET a URL.

:param str url: URL to call
:param dict params: Params to pass as HTTP query string
:param str method: HTTP method (e.g., 'GET')
:param dict headers: HTTP headers
:param dict data: Data to pass to request body
:param bool assume_json: set to False if the response body should not be
decoded as JSON
:returns: Dict of the returned JSON or raises an exception
"""
try:
response = requests.request(
method, url=url, params=params, headers=headers, data=data
)
LOGGER.debug("Response: %s", response)
LOGGER.debug("type(response.text): %s ", type(response.text))
LOGGER.debug("Response content-type: %s", response.headers["content-type"])
response.raise_for_status()
except Exception:
raise
sevein marked this conversation as resolved.
Show resolved Hide resolved
if assume_json:
try:
return response.json()
except ValueError: # JSON could not be decoded
raise
return response.text


def _call_url_json(url, params=None, method=METHOD_GET, headers=None, assume_json=True):
"""Helper to GET a URL where the expected response is 200 with JSON.

Expand All @@ -38,35 +70,41 @@ def _call_url_json(url, params=None, method=METHOD_GET, headers=None, assume_jso
LOGGER.debug("URL: %s; params: %s; method: %s", url, params, method)
try:
if method == METHOD_GET or method == METHOD_DELETE:
response = requests.request(method, url=url, params=params, headers=headers)
data = _call_url(
url,
method=method,
params=params,
headers=headers,
assume_json=assume_json,
)
else:
response = requests.request(method, url=url, data=params, headers=headers)
LOGGER.debug("Response: %s", response)
LOGGER.debug("type(response.text): %s ", type(response.text))
LOGGER.debug("Response content-type: %s", response.headers["content-type"])
data = _call_url(
url,
method=method,
data=params,
headers=headers,
assume_json=assume_json,
)
except (
urllib3.exceptions.NewConnectionError,
requests.exceptions.ConnectionError,
) as err:
LOGGER.error("Connection error %s", err)
return errors.ERR_SERVER_CONN
if not response.ok:
except requests.exceptions.RequestException as err:
LOGGER.debug("Response: %s", err.response.text)
LOGGER.warning(
"%s Request to %s returned %s %s",
method,
url,
response.status_code,
response.reason,
err.response.status_code,
err.response.reason,
)
LOGGER.debug("Response: %s", response.text)
return errors.ERR_INVALID_RESPONSE
if assume_json:
try:
return response.json()
except ValueError: # JSON could not be decoded
LOGGER.warning("Could not parse JSON from response: %s", response.text)
return errors.ERR_PARSE_JSON
return response.text
except ValueError as err:
LOGGER.warning("Could not parse JSON from response: %s", str(err))
return errors.ERR_PARSE_JSON
return data


try:
Expand Down
29 changes: 29 additions & 0 deletions tests/test_amclient.py
Expand Up @@ -10,6 +10,11 @@
import unittest
import uuid

try:
from unittest import mock
except ImportError:
import mock
sevein marked this conversation as resolved.
Show resolved Hide resolved

import vcr

amclient = os.path.join(
Expand Down Expand Up @@ -922,6 +927,30 @@ def test_get_jobs(self):
assert len(response) == 1
assert response[0]["name"] == "Verify metadata directory checksums"

@mock.patch("utils.requests.request")
def test_get_status(self, mock_request):
mock_request.return_value.json.return_value = {"status": "PROCESSING"}
response = amclient.AMClient(
am_api_key=AM_API_KEY, am_user_name=AM_USER_NAME, am_url=AM_URL
).get_unit_status("ca480d94-892c-4d99-bbb1-290698406571")
assert response.get("status") == "PROCESSING"

mock_request.return_value.json.return_value = {
"status": "COMPLETE",
"sip_uuid": "1234",
}
response = amclient.AMClient(
am_api_key=AM_API_KEY, am_user_name=AM_USER_NAME, am_url=AM_URL
).get_unit_status("ca480d94-892c-4d99-bbb1-290698406571")
assert response.get("status") == "COMPLETE"

mock_request.return_value.ok = False
mock_request.return_value.json.return_value = {}
response = amclient.AMClient(
am_api_key=AM_API_KEY, am_user_name=AM_USER_NAME, am_url=AM_URL
).get_unit_status("ca480d94-892c-4d99-bbb1-290698406571")
assert len(response) == 0

sevein marked this conversation as resolved.
Show resolved Hide resolved

if __name__ == "__main__":
unittest.main()