Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 16 additions & 16 deletions data_pipeline_api/fdp_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@
from datetime import datetime
from typing import Any, Optional

# from typing import BinaryIO, Union
from urllib.parse import urlsplit

import requests
import yaml

FILE_PREFIX = "file://"
SERVER_RESPONSE_STR = "Server responded with: "

def get_first_entry(entries: list) -> dict:
"""
Expand Down Expand Up @@ -74,7 +75,7 @@ def get_entry(
response = requests.get(url, headers=headers)
if response.status_code != 200:
raise ValueError(
"Server responded with: "
SERVER_RESPONSE_STR
+ str(response.status_code)
+ " Query = "
+ url
Expand Down Expand Up @@ -107,7 +108,7 @@ def get_entity(
response = requests.get(url, headers=headers)
if response.status_code != 200:
raise ValueError(
"Server responded with: "
SERVER_RESPONSE_STR
+ str(response.status_code)
+ " Query = "
+ url
Expand Down Expand Up @@ -162,7 +163,7 @@ def post_entry(
return existing_entry[0]

if response.status_code != 201:
raise ValueError("Server responded with: " + str(response.status_code))
raise ValueError(SERVER_RESPONSE_STR + str(response.status_code))

return response.json()

Expand All @@ -187,7 +188,7 @@ def patch_entry(

response = requests.patch(url, data_json, headers=headers)
if response.status_code != 200:
raise ValueError("Server responded with: " + str(response.status_code))
raise ValueError(SERVER_RESPONSE_STR + str(response.status_code))

return response.json()

Expand Down Expand Up @@ -224,7 +225,7 @@ def post_storage_root(
| dict: response from the local registry
"""
if "local" in data and data["local"]:
data["root"] = "file://" + data["root"]
data["root"] = FILE_PREFIX + data["root"]
if data["root"][-1] != os.sep:
data["root"] = data["root"] + os.sep
elif data["root"][-1] != "/":
Expand All @@ -237,7 +238,7 @@ def post_file_type(
"""
Internal wrapper function to check if a file_type already exists and return it.
"""
if not "extension" in data and data["extension"]:
if "extension" not in data and data["extension"]:
raise ValueError("error file_type name not specified")
file_type_exists = get_entry(url= url,
endpoint= "file_type",
Expand All @@ -258,8 +259,8 @@ def remove_local_from_root(root: str) -> str:
Returns:
| str: the root without file://
"""
if "file://" in root:
root = root.replace("file://", "")
if FILE_PREFIX in root:
root = root.replace(FILE_PREFIX, "")

return root

Expand Down Expand Up @@ -290,7 +291,6 @@ def get_file_hash(
"""
with open(path, "rb") as data:
_data = data.read()
# data = data.encode('utf-8')
hashed = hashlib.sha1(_data)

return hashed.hexdigest()
Expand Down Expand Up @@ -407,7 +407,7 @@ def register_issues(token: str, handle: dict) -> dict: # sourcery no-metrics
severity = None
for i in issues:
if issues[i]["group"] == group:
type = issues[i]["type"]
issue_type = issues[i]["type"]
issue = issues[i]["issue"]
severity = issues[i]["severity"]
index = issues[i]["index"]
Expand All @@ -418,12 +418,12 @@ def register_issues(token: str, handle: dict) -> dict: # sourcery no-metrics

component_url = None
object_id = None
if type == "config":
if issue_type == "config":
object_id = handle["model_config"]
elif type == "github_repo":
elif issue_type == "github_repo":
object_id = handle["code_repo"]

elif type == "submission_script":
elif issue_type == "submission_script":
object_id = handle["submission_script"]
if object_id:
component_url = get_entry(
Expand Down Expand Up @@ -470,7 +470,7 @@ def register_issues(token: str, handle: dict) -> dict: # sourcery no-metrics
api_version=api_version,
)[0]["url"]

object = get_entry(
object_entry = get_entry(
url=api_url,
endpoint="data_product",
query={
Expand All @@ -480,7 +480,7 @@ def register_issues(token: str, handle: dict) -> dict: # sourcery no-metrics
},
api_version=api_version,
)[0]["object"]
object_id = extract_id(object)
object_id = extract_id(object_entry)
if component:
component_url = get_entry(
url=api_url,
Expand Down
6 changes: 4 additions & 2 deletions data_pipeline_api/link.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ def link_read(handle: dict, data_product: str) -> str:
api_version = handle["yaml"]["run_metadata"]["api_version"]

if "namespace" in use:
namespace = use["namespace"]
if use["namespace"]:
namespace = use["namespace"]

# Get namespace url and extract id
namespace_url = fdp_utils.get_entry(
Expand All @@ -136,7 +137,8 @@ def link_read(handle: dict, data_product: str) -> str:
namespace_id = fdp_utils.extract_id(namespace_url)

if "data_product" in use:
data_product = use["data_product"]
if use["data_product"]:
data_product = use["data_product"]

version = use["version"] if "version" in use else "0.0.1"

Expand Down
20 changes: 9 additions & 11 deletions data_pipeline_api/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from data_pipeline_api import fdp_utils

WRITING_STR = "Writing {} to local registry"

def initialise(token: str, config: str, script: str) -> dict:
"""Reads in token, config file and script, creates necessary registry items
Expand Down Expand Up @@ -149,7 +150,7 @@ def initialise(token: str, config: str, script: str) -> dict:

config_object_url = config_object["url"]

logging.info("Writing {} to local registry".format(filename))
logging.info(WRITING_STR.format(filename))

# Check if script exists in storage_location
script_storageroot_url = config_storageroot_url
Expand Down Expand Up @@ -185,7 +186,6 @@ def initialise(token: str, config: str, script: str) -> dict:
script_location_url = script_location_response["url"]
script_file_type = os.path.basename(script).split(".")[-1]

# TODO: Change to Batch?
# Create Script File Type
script_filetype_response = fdp_utils.post_file_type(
token=token,
Expand All @@ -212,7 +212,7 @@ def initialise(token: str, config: str, script: str) -> dict:

script_object_url = script_object["url"]

logging.info("Writing {} to local registry".format(script))
logging.info(WRITING_STR.format(script))

# Create new remote storage root
repo_storageroot_url = fdp_utils.post_storage_root(
Expand Down Expand Up @@ -271,7 +271,7 @@ def initialise(token: str, config: str, script: str) -> dict:

coderepo_object_url = coderepo_object_response["url"]

logging.info("Writing {} to local registry".format(repo_name))
logging.info(WRITING_STR.format(repo_name))

# Register new code run

Expand Down Expand Up @@ -343,7 +343,6 @@ def finalise(token: str, handle: dict) -> None:
| component_url: component url
| data_product_url: data product url
"""
# token = fdp_utils.read_token(token)
registry_url = handle["yaml"]["run_metadata"]["local_data_registry_url"]
datastore = handle["yaml"]["run_metadata"]["write_data_store"]
api_version = handle["yaml"]["run_metadata"]["api_version"]
Expand Down Expand Up @@ -399,13 +398,13 @@ def finalise(token: str, handle: dict) -> None:
api_version=api_version,
)["url"]

hash = fdp_utils.get_file_hash(handle["output"][output]["path"])
file_hash = fdp_utils.get_file_hash(handle["output"][output]["path"])

storage_exists = fdp_utils.get_entry(
url=registry_url,
endpoint="storage_location",
query={
"hash": hash,
"hash": file_hash,
"public": handle["output"][output]["public"],
"storage_root": datastore_root_id,
},
Expand Down Expand Up @@ -433,7 +432,6 @@ def finalise(token: str, handle: dict) -> None:
directory
)
)
pass
directory = os.path.split(directory)[0]
i += 1
if i > 4:
Expand Down Expand Up @@ -461,7 +459,7 @@ def finalise(token: str, handle: dict) -> None:
handle["output"][output]["path"]
)
extension = tmp_filename.split(sep=".")[-1]
new_filename = ".".join([hash, extension])
new_filename = ".".join([file_hash, extension])
data_product = handle["output"][output]["data_product"]
namespace = handle["output"][output]["use_namespace"]
new_path = os.path.join(
Expand All @@ -478,7 +476,7 @@ def finalise(token: str, handle: dict) -> None:
endpoint="storage_location",
data={
"path": new_storage_location,
"hash": hash,
"hash": file_hash,
"public": handle["output"][output]["public"],
"storage_root": datastore_root_url,
},
Expand Down Expand Up @@ -575,7 +573,7 @@ def finalise(token: str, handle: dict) -> None:
handle["output"][output]["data_product_url"] = data_product_url

logging.info(
"Writing {} to local registry".format(
WRITING_STR.format(
handle["output"][output]["use_data_product"]
)
)
Expand Down
22 changes: 9 additions & 13 deletions tests/test_fdp_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import data_pipeline_api.fdp_utils as fdp_utils

TEST_ROOT = "https://test.com"

@pytest.fixture
def test_dir() -> str:
Expand Down Expand Up @@ -37,8 +38,7 @@ def test_is_file_exists(test_dir: str) -> None:
"file_path",
[
"file_not_found",
"",
# None
""
],
)
def test_is_file_not_exists(file_path: str) -> None:
Expand All @@ -57,9 +57,7 @@ def test_is_yaml(file_path: str, request: FixtureRequest) -> None:
"file_path",
[
"file_not_found",
"",
# None,
# os.path.join(test_dir, 'test.csv')
""
],
)
def test_is_yaml_not(file_path: str) -> None:
Expand All @@ -78,9 +76,7 @@ def test_is_valid_yaml(file_path: str, request: FixtureRequest) -> None:
"file_path",
[
"file_not_found",
"",
# None,
# os.path.join(test_dir, 'test.csv')
""
],
)
def test_is_valid_yaml_not(file_path: str) -> None:
Expand Down Expand Up @@ -201,7 +197,7 @@ def test_post_entry(token: str, url: str) -> None:
storage_root = fdp_utils.post_entry(
token=token,
url=url,
data={"root": "https://test.com"},
data={"root": TEST_ROOT},
endpoint="storage_root",
)
assert type(storage_root) == dict
Expand All @@ -212,7 +208,7 @@ def test_post_entry_409(token: str, url: str) -> None:
storage_root = fdp_utils.post_entry(
token=token,
url=url,
data={"root": "https://test.com"},
data={"root": TEST_ROOT},
endpoint="storage_root",
)
assert type(storage_root) == dict
Expand Down Expand Up @@ -241,7 +237,7 @@ def test_post_entry_500(token: str, url: str) -> None:
fdp_utils.post_entry(
token=token,
url=url,
data={"root": "https://test.com"},
data={"root": TEST_ROOT},
endpoint="non_existant",
)

Expand Down Expand Up @@ -299,7 +295,7 @@ def test_wrong_api_version(token: str, url: str) -> None:
fdp_utils.post_entry(
token=token,
url=url,
data={"root": "https://test.com"},
data={"root": TEST_ROOT},
endpoint="storage_root",
api_version="2.2.2",
)
Expand All @@ -311,7 +307,7 @@ def test_wrong_api_version_get(token: str, url: str) -> None:
fdp_utils.get_entry(
token=token,
url=url,
query={"root": "https://test.com"},
query={"root": TEST_ROOT},
endpoint="storage_root",
api_version="3.0.0",
)
Expand Down