refactor: save location for log files (DEV-1856) (#328)
jnussbaum committed Mar 14, 2023
1 parent f21a64e commit 30f914d
Showing 3 changed files with 169 additions and 60 deletions.
3 changes: 2 additions & 1 deletion .gitignore
@@ -64,12 +64,13 @@ venv.bak/
# created files
lists.json
out.json
id2iri_*
*id2iri_mapping*
stashed*
**/~$*.*
testdata/tmp/
testdata/test-list.json
src/dsp_tools/docker/sipi.docker-config.lua
metrics/

# for testing in development
tmp/
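
Context for the .gitignore change: the mapping files are now named with the XML file's stem in front (see xml_upload.py below, e.g. an illustrative "data_id2iri_mapping_2023-03-14_120000.json"), so the old prefix-only pattern "id2iri_*" would no longer match them; the broader "*id2iri_mapping*" glob does.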
185 changes: 127 additions & 58 deletions src/dsp_tools/utils/xml_upload.py
@@ -32,6 +32,103 @@
MetricRecord = namedtuple("MetricRecord", ["res_id", "filetype", "filesize_mb", "event", "duration_ms", "mb_per_sec"])


def _transform_server_url_to_foldername(server: str) -> str:
"""
Take the server URL and transform it so that it can be used as foldername.
Args:
server: server, e.g. "https://api.test.dasch.swiss/" or "http://0.0.0.0:3333"
Returns:
simplified version, e.g. "test.dasch.swiss" or "localhost"
"""
server_substitutions = {
r"https?://": "",
r"^api\.": "",
r":\d{2,5}/?$": "",
r"/$": "",
r"0.0.0.0": "localhost"
}
for pattern, repl in server_substitutions.items():
server = re.sub(pattern, repl, server)
return server
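
As a quick illustration of the substitution chain, a minimal sketch (the inputs and expected outputs are taken from the unit tests further down; the function is private but importable, as the test file shows):

    from dsp_tools.utils.xml_upload import _transform_server_url_to_foldername

    # the dict patterns apply in insertion order: strip the scheme, drop a
    # leading "api.", drop the port, drop a trailing slash, and map the
    # literal IP 0.0.0.0 to "localhost"
    assert _transform_server_url_to_foldername("https://api.test.dasch.swiss/") == "test.dasch.swiss"
    assert _transform_server_url_to_foldername("http://0.0.0.0:3333") == "localhost"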


def _determine_save_location_of_logs(
server: str,
proj_shortcode: str,
onto_name: str
) -> tuple[Path, str, str]:
"""
Determine the save location for the logs that will be used if the xmlupload is interrupted.
They are going to be stored in ~/.dsp-tools/xmluploads/server/shortcode/ontoname.
This path is computed and created.
Args:
server: URL of the DSP server where the data is uploaded to
proj_shortcode: 4-digit hexadecimal shortcode of the project
onto_name: name of the ontology that the data belongs to
Returns:
a tuple consisting of the absolute full path to the storage location,
a version of the server URL that can be used as foldername,
and the timestamp string that can be used as component of file names
(so that different log files of the same xmlupload have the same timestamp)
"""
server_as_foldername = _transform_server_url_to_foldername(server)
timestamp_str = datetime.now().strftime("%Y-%m-%d_%H%M%S")
save_location = Path.home() / Path(".dsp-tools") / "xmluploads" / server_as_foldername / proj_shortcode / onto_name
save_location.mkdir(parents=True, exist_ok=True)
return save_location, server_as_foldername, timestamp_str
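
A hedged usage sketch (the shortcode and ontology name are the ones used in the unit tests below; the exact home directory varies per machine):

    save_location, server_folder, timestamp = _determine_save_location_of_logs(
        server="https://api.test.dasch.swiss/",
        proj_shortcode="082E",
        onto_name="rosetta",
    )
    # save_location is now ~/.dsp-tools/xmluploads/test.dasch.swiss/082E/rosetta
    # (created if it didn't exist yet); server_folder is "test.dasch.swiss";
    # timestamp is e.g. "2023-03-14_120000" and is reused in all log file names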


def _write_log_files_and_metrics(
id2iri_mapping: dict[str, str],
metrics: Optional[list[MetricRecord]],
failed_uploads: list[str],
input_file: Union[str, etree._ElementTree[Any]],
timestamp_str: str,
server_as_foldername: str
) -> bool:
"""
Writes the id2iri mapping and the metrics into the current working directory,
and prints the failed uploads (if applicable).
Args:
id2iri_mapping: mapping of ids from the XML file to IRIs in DSP (initially empty, gets filled during the upload)
metrics: list with the metric records collected until now (gets filled during the upload)
failed_uploads: ids of resources that could not be uploaded (initially empty, gets filled during the upload)
input_file: path to the XML file or parsed ElementTree
timestamp_str: timestamp for the name of the log files
server_as_foldername: simplified version of the server URL that can be used as folder name
Returns:
True if there are no failed_uploads, False otherwise
"""
# determine names of log files
if isinstance(input_file, str):
id2iri_filename = f"{Path(input_file).stem}_id2iri_mapping_{timestamp_str}.json"
metrics_filename = f"{timestamp_str}_metrics_{server_as_foldername}_{Path(input_file).stem}.csv"
else:
id2iri_filename = f"{timestamp_str}_id2iri_mapping.json"
metrics_filename = f"{timestamp_str}_metrics_{server_as_foldername}.csv"

# write log files and print info
success = True
with open(id2iri_filename, "x") as f:
json.dump(id2iri_mapping, f, ensure_ascii=False, indent=4)
print(f"The mapping of internal IDs to IRIs was written to {id2iri_filename}")
if failed_uploads:
print(f"\nWARNING: Could not upload the following resources: {failed_uploads}\n")
success = False
if metrics:
os.makedirs("metrics", exist_ok=True)
df = pd.DataFrame(metrics)
df.to_csv(f"metrics/{metrics_filename}")

return success
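
A sketch of a call after the upload loop finishes (all argument values here are invented for illustration):

    success = _write_log_files_and_metrics(
        id2iri_mapping={"book_1": "http://rdfh.ch/082E/abcdefg"},  # invented IRI
        metrics=None,                # pass the collected MetricRecords to also get a CSV
        failed_uploads=[],
        input_file="data.xml",       # hypothetical file name
        timestamp_str="2023-03-14_120000",
        server_as_foldername="test.dasch.swiss",
    )
    # success is True because failed_uploads is empty; note that the mapping file
    # is opened with mode "x", so a second call with the same input file and
    # timestamp would raise FileExistsError instead of overwriting the file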


def _remove_circular_references(resources: list[XMLResource], verbose: bool) -> \
tuple[list[XMLResource],
dict[XMLResource, dict[XMLProperty, dict[str, KnoraStandoffXml]]],
@@ -344,37 +441,32 @@ def xml_upload(
uploaded because there is an error in it
"""

# Validate the input XML file
# parse the XML file
validate_xml_against_schema(input_file=input_file)
tree = _parse_xml_file(input_file=input_file)
root = tree.getroot()
default_ontology = root.attrib['default-ontology']
shortcode = root.attrib['shortcode']

# determine save location that will be used for logs if the xmlupload is interrupted
save_location, server_as_foldername, timestamp_str = _determine_save_location_of_logs(
server=server,
proj_shortcode=shortcode,
onto_name=default_ontology
)

# start metrics
metrics: list[MetricRecord] = []
preparation_start = datetime.now()

# figure out where to save the state of an interrupted xmlupload
save_location = Path.home() / Path(".dsp-tools")
server_as_foldername = server
server_substitutions = {
r"https?://": "",
r"^api\..+": "",
r":\d{4}/?$": "",
r"0.0.0.0": "localhost"
}
for pattern, repl in server_substitutions.items():
server_as_foldername = re.sub(pattern, repl, server_as_foldername)

# Connect to the DaSCH Service Platform API and get the project context
con = Connection(server)
try_network_action(failure_msg="Unable to login to DSP server", action=lambda: con.login(user, password))
proj_context = try_network_action(failure_msg="Unable to retrieve project context from DSP server",
action=lambda: ProjectContext(con=con))
sipi_server = Sipi(sipi, con.get_token())

# parse the XML file
tree = _parse_xml_file(input_file=input_file)
root = tree.getroot()
default_ontology = root.attrib['default-ontology']
shortcode = root.attrib['shortcode']
# make Python object representations of the XML file
resources: list[XMLResource] = []
permissions: dict[str, XmlPermission] = {}
for child in root:
@@ -437,36 +529,21 @@
failed_uploads=failed_uploads,
stashed_xml_texts=nonapplied_xml_texts or stashed_xml_texts,
stashed_resptr_props=nonapplied_resptr_props or stashed_resptr_props,
proj_shortcode=shortcode,
onto_name=default_ontology,
server_as_foldername=server_as_foldername,
save_location=save_location
save_location=save_location,
timestamp_str=timestamp_str
)

# determine names of log files
timestamp_str = datetime.now().strftime("%Y-%m-%d_%H%M%S")
if isinstance(input_file, str):
id2iri_filename = f"{Path(input_file).stem}_id2iri_mapping_{timestamp_str}.json"
metrics_filename = f"{timestamp_str}_metrics_{server_as_foldername}_{Path(input_file).stem}.csv"
else:
id2iri_filename = f"{timestamp_str}_id2iri_mapping.json"
metrics_filename = f"{timestamp_str}_metrics_{server_as_foldername}.csv"

# write log files and print info
success = True
with open(id2iri_filename, "x") as f:
json.dump(id2iri_mapping, f, ensure_ascii=False, indent=4)
print(f"The mapping of internal IDs to IRIs was written to {id2iri_filename}")
if failed_uploads:
print(f"\nWARNING: Could not upload the following resources: {failed_uploads}\n")
success = False
if save_metrics:
os.makedirs("metrics", exist_ok=True)
df = pd.DataFrame(metrics)
df.to_csv(f"metrics/{metrics_filename}")
# write regular log files, metrics, and print some final info
success = _write_log_files_and_metrics(
id2iri_mapping=id2iri_mapping,
failed_uploads=failed_uploads,
metrics=metrics,
input_file=input_file,
timestamp_str=timestamp_str,
server_as_foldername=server_as_foldername
)
if success:
print("All resources have successfully been uploaded.")

return success


@@ -816,10 +893,8 @@ def _handle_upload_error(
failed_uploads: list[str],
stashed_xml_texts: dict[XMLResource, dict[XMLProperty, dict[str, KnoraStandoffXml]]],
stashed_resptr_props: dict[XMLResource, dict[XMLProperty, list[str]]],
proj_shortcode: str,
onto_name: str,
server_as_foldername: str,
save_location: Path
save_location: Path,
timestamp_str: str
) -> None:
"""
In case the xmlupload must be interrupted, e.g. because of an error that could not be handled, or due to keyboard
@@ -833,43 +908,37 @@
failed_uploads: resources that caused an error when uploading to DSP
stashed_xml_texts: all xml texts that have been stashed
stashed_resptr_props: all resptr props that have been stashed
proj_shortcode: shortcode of the project the data belongs to
onto_name: name of the ontology the data references
server_as_foldername: the server which the data is uploaded onto (in a form that can be used as folder name)
save_location: path where to save the logs
timestamp_str: timestamp for the name of the log files
Returns:
None
"""

print(f'\n=========================================='
f'\nxmlupload must be aborted because of an error')
timestamp_str = datetime.now().strftime("%Y-%m-%d_%H%M%S")

save_location_full = save_location / "xmluploads" / server_as_foldername / proj_shortcode / onto_name
save_location_full.mkdir(parents=True, exist_ok=True)

# only stashed properties of resources that already exist in DSP are of interest
stashed_xml_texts = _purge_stashed_xml_texts(stashed_xml_texts, id2iri_mapping)
stashed_resptr_props = _purge_stashed_resptr_props(stashed_resptr_props, id2iri_mapping)

if id2iri_mapping:
id2iri_mapping_file = f"{save_location_full}/{timestamp_str}_id2iri_mapping.json"
id2iri_mapping_file = f"{save_location}/{timestamp_str}_id2iri_mapping.json"
with open(id2iri_mapping_file, "x") as f:
json.dump(id2iri_mapping, f, ensure_ascii=False, indent=4)
print(f"The mapping of internal IDs to IRIs was written to {id2iri_mapping_file}")

if stashed_xml_texts:
stashed_xml_texts_serializable = {r.id: {p.name: xml for p, xml in rdict.items()} for r, rdict in stashed_xml_texts.items()}
xml_filename = f"{save_location_full}/{timestamp_str}_stashed_text_properties.json"
xml_filename = f"{save_location}/{timestamp_str}_stashed_text_properties.json"
with open(xml_filename, "x") as f:
json.dump(stashed_xml_texts_serializable, f, ensure_ascii=False, indent=4, cls=KnoraStandoffXmlEncoder)
print(f"There are stashed text properties that could not be reapplied to the resources they were stripped "
f"from. They were saved to {xml_filename}.")

if stashed_resptr_props:
stashed_resptr_props_serializable = {r.id: {p.name: plist for p, plist in rdict.items()} for r, rdict in stashed_resptr_props.items()}
resptr_filename = f"{save_location_full}/{timestamp_str}_stashed_resptr_properties.json"
resptr_filename = f"{save_location}/{timestamp_str}_stashed_resptr_properties.json"
with open(resptr_filename, "x") as f:
json.dump(stashed_resptr_props_serializable, f, ensure_ascii=False, indent=4)
print(
41 changes: 40 additions & 1 deletion test/unittests/test_xmlupload.py
@@ -9,11 +9,50 @@
from dsp_tools.models.xmlresource import XMLResource
from dsp_tools.utils.xml_upload import (_convert_ark_v0_to_resource_iri,
_parse_xml_file,
_remove_circular_references)
_remove_circular_references,
_transform_server_url_to_foldername,
_determine_save_location_of_logs)


class TestXMLUpload(unittest.TestCase):

def test_transform_server_url_to_foldername(self) -> None:
testcases: list[tuple[str, str]] = [
("https://api.test.dasch.swiss/", "test.dasch.swiss"),
("http://api.082e-test-server.dasch.swiss/", "082e-test-server.dasch.swiss"),
("http://0.0.0.0:12345", "localhost"),
("https://0.0.0.0:80/", "localhost")
]
for input, expected_output in testcases:
actual_output = _transform_server_url_to_foldername(input)
self.assertEqual(actual_output, expected_output)



def test_determine_save_location_of_logs(self) -> None:
shortcode = "082E"
onto_name = "rosetta"
testcases: list[tuple[str, str]] = [
("https://api.test.dasch.swiss/", f"/.dsp-tools/xmluploads/test.dasch.swiss/{shortcode}/{onto_name}"),
("http://api.082e-test-server.dasch.swiss/", f"/.dsp-tools/xmluploads/082e-test-server.dasch.swiss/{shortcode}/{onto_name}")
]
for server, expected_path in testcases:
save_location, _, _ = _determine_save_location_of_logs(
server=server,
proj_shortcode=shortcode,
onto_name=onto_name
)
self.assertTrue(str(save_location).endswith(expected_path))
self.assertTrue(save_location.is_dir())
try:
save_location.rmdir()
save_location.parent.rmdir()
save_location.parent.parent.rmdir()
except OSError:
# there was already stuff in the folder before this test: do nothing
pass


def test_parse_xml_file(self) -> None:
test_data_systematic_tree = etree.parse("testdata/xml-data/test-data-systematic.xml")
output1 = _parse_xml_file("testdata/xml-data/test-data-systematic.xml")
