From 16fefae3e20231dbe2484e908e084688b195c893 Mon Sep 17 00:00:00 2001
From: Nora-Olivia-Ammann <103038637+Nora-Olivia-Ammann@users.noreply.github.com>
Date: Thu, 5 Oct 2023 10:32:38 +0200
Subject: [PATCH] refactor(xmlupload): file xmlupload.py (DEV-2775) (#543)

---
Review notes (this section is ignored by `git am`):

* The line breaks and indentation of this patch had been lost. They are
  restored here from the syntax of the code; the old/new line counts of
  every hunk were used to place blank context lines. The indentation of
  context lines is inferred and should be checked against the target tree.
* _upload_resources: a failed bitstream upload is recorded in
  `failed_uploads` again. The refactoring removed that line from the
  `except BaseError` branch and logged "Uploaded file ..." there instead.
  The success message is now emitted on the success path only, and only
  when a file was actually uploaded (not when `preprocessing_done`).
* get_sipi_multimedia_information: fixed the docstring typo
  "resource with that has a bitstream".
* The hunk header of the amended hunk, the offsets of the hunks after it and
  the diffstat are adjusted accordingly. The blob ids on the "index" lines
  still refer to the commit as originally pushed.

 src/dsp_tools/models/xmlresource.py           |   2 +-
 .../utils/xmlupload/resource_multimedia.py    | 117 +++++++
 src/dsp_tools/utils/xmlupload/xmlupload.py    | 303 +++++++++++++-----
 3 files changed, 334 insertions(+), 88 deletions(-)
 create mode 100644 src/dsp_tools/utils/xmlupload/resource_multimedia.py

diff --git a/src/dsp_tools/models/xmlresource.py b/src/dsp_tools/models/xmlresource.py
index 0e4b4a343..82d7d3d41 100644
--- a/src/dsp_tools/models/xmlresource.py
+++ b/src/dsp_tools/models/xmlresource.py
@@ -165,7 +165,7 @@ def get_propvals(
             prop_data[prop.name] = vals if len(vals) > 1 else vals[0]
         return prop_data
 
-    def get_bitstream(
+    def get_bitstream_information_from_sipi(
         self, internal_file_name_bitstream: str, permissions_lookup: dict[str, Permissions]
     ) -> Optional[dict[str, Union[str, Permissions]]]:
         """
diff --git a/src/dsp_tools/utils/xmlupload/resource_multimedia.py b/src/dsp_tools/utils/xmlupload/resource_multimedia.py
new file mode 100644
index 000000000..84d8e526a
--- /dev/null
+++ b/src/dsp_tools/utils/xmlupload/resource_multimedia.py
@@ -0,0 +1,117 @@
+from __future__ import annotations
+
+from datetime import datetime
+from pathlib import Path
+from typing import Any, Optional
+
+from dsp_tools.models.permission import Permissions
+from dsp_tools.models.sipi import Sipi
+from dsp_tools.models.xmlresource import XMLResource
+from dsp_tools.utils.create_logger import get_logger
+from dsp_tools.utils.shared import try_network_action
+from dsp_tools.utils.xmlupload.write_diagnostic_info import MetricRecord
+
+logger = get_logger(__name__)
+
+
+def calculate_multimedia_file_size(
+    resources: list[XMLResource],
+    imgdir: str,
+    preprocessing_done: bool,
+) -> tuple[list[float], float | int]:
+    """
+    This function calculates the size of the bitstream files in the specified directory.
+
+    Args:
+        resources: List of resources to identify the files used
+        imgdir: directory where the files are
+        preprocessing_done: True if sipi has preprocessed the files
+
+    Returns:
+        List with all the file sizes
+        Total of all the file sizes
+    """
+    # If there are multimedia files: calculate their total size
+    bitstream_all_sizes_mb = [
+        Path(Path(imgdir) / Path(res.bitstream.value)).stat().st_size / 1000000
+        if res.bitstream and not preprocessing_done
+        else 0.0
+        for res in resources
+    ]
+    if sum(bitstream_all_sizes_mb) > 0:
+        bitstream_size_total_mb = round(sum(bitstream_all_sizes_mb), 1)
+        print(f"This xmlupload contains multimedia files with a total size of {bitstream_size_total_mb} MB.")
+        logger.info(f"This xmlupload contains multimedia files with a total size of {bitstream_size_total_mb} MB.")
+    else:  # make Pylance happy
+        bitstream_size_total_mb = 0.0
+    return bitstream_all_sizes_mb, bitstream_size_total_mb
+
+
+def get_sipi_multimedia_information(
+    resource: XMLResource,
+    sipi_server: Sipi,
+    imgdir: str,
+    filesize: float,
+    permissions_lookup: dict[str, Permissions],
+    metrics: list[MetricRecord],
+    preprocessing_done: bool,
+) -> dict[str, str | Permissions] | None:
+    """
+    This function takes a resource with a corresponding bitstream filepath.
+    If the pre-processing is not done, it retrieves the file from the directory and uploads it to sipi.
+    If pre-processing is done it retrieves the bitstream information from sipi.
+
+    Args:
+        resource: resource that has a bitstream
+        sipi_server: server to upload
+        imgdir: directory of the file
+        filesize: size of the file
+        permissions_lookup: dictionary that contains the permission name as string and the corresponding Python object
+        metrics: to store metric information in
+        preprocessing_done: If True, then no upload is necessary
+
+    Returns:
+        The information from sipi which is needed to establish a link from the resource
+    """
+    if preprocessing_done:
+        resource_bitstream = resource.get_bitstream_information_from_sipi(
+            internal_file_name_bitstream=resource.bitstream.value,  # type: ignore[union-attr]
+            permissions_lookup=permissions_lookup,
+        )
+    else:
+        resource_bitstream = _upload_multimedia_to_sipi(
+            resource=resource,
+            sipi_server=sipi_server,
+            imgdir=imgdir,
+            filesize=filesize,
+            permissions_lookup=permissions_lookup,
+            metrics=metrics,
+        )
+    return resource_bitstream
+
+
+def _upload_multimedia_to_sipi(
+    resource: XMLResource,
+    sipi_server: Sipi,
+    imgdir: str,
+    filesize: float,
+    permissions_lookup: dict[str, Permissions],
+    metrics: list[MetricRecord],
+) -> dict[str, str | Permissions] | None:
+    pth = resource.bitstream.value  # type: ignore[union-attr]
+    bitstream_start = datetime.now()
+    filetype = Path(pth).suffix[1:]
+    img: Optional[dict[Any, Any]] = try_network_action(
+        sipi_server.upload_bitstream,
+        filepath=str(Path(imgdir) / Path(pth)),
+    )
+    bitstream_duration = datetime.now() - bitstream_start
+    bitstream_duration_ms = bitstream_duration.seconds * 1000 + int(bitstream_duration.microseconds / 1000)
+    mb_per_sec = round((filesize / bitstream_duration_ms) * 1000, 1)
+    metrics.append(MetricRecord(resource.id, filetype, filesize, "bitstream upload", bitstream_duration_ms, mb_per_sec))
+    internal_file_name_bitstream = img["uploadedFiles"][0]["internalFilename"]  # type: ignore[index]
+    resource_bitstream = resource.get_bitstream_information_from_sipi(
+        internal_file_name_bitstream=internal_file_name_bitstream,
+        permissions_lookup=permissions_lookup,
+    )
+    return resource_bitstream
diff --git a/src/dsp_tools/utils/xmlupload/xmlupload.py b/src/dsp_tools/utils/xmlupload/xmlupload.py
index 6e424fdc2..70279e883 100644
--- a/src/dsp_tools/utils/xmlupload/xmlupload.py
+++ b/src/dsp_tools/utils/xmlupload/xmlupload.py
@@ -7,7 +7,7 @@
 import sys
 from datetime import datetime
 from pathlib import Path
-from typing import Any, Optional, Union
+from typing import Any, Union
 
 from lxml import etree
 
@@ -28,6 +28,10 @@
     check_consistency_with_ontology,
     validate_and_parse_xml_file,
 )
+from dsp_tools.utils.xmlupload.resource_multimedia import (
+    calculate_multimedia_file_size,
+    get_sipi_multimedia_information,
+)
 from dsp_tools.utils.xmlupload.stash_circular_references import remove_circular_references
 from dsp_tools.utils.xmlupload.upload_stashed_resptr_props import (
     purge_stashed_resptr_props,
@@ -100,35 +104,21 @@ def xmlupload(
     con = login(server=server, user=user, password=password, dump=dump)
     sipi_server = Sipi(sipi, con.get_token())
 
-    # get the project context
-    try:
-        proj_context = try_network_action(lambda: ProjectContext(con=con))
-    except BaseError:
-        logger.error("Unable to retrieve project context from DSP server", exc_info=True)
-        raise UserError("Unable to retrieve project context from DSP server") from None
+    proj_context = _get_project_context_from_server(connection=con)
 
     # make Python object representations of the XML file
-    resources: list[XMLResource] = []
-    permissions: dict[str, XmlPermission] = {}
-    for child in root:
-        if child.tag == "permissions":
-            permission = XmlPermission(child, proj_context)
-            permissions[permission.id] = permission
-        elif child.tag == "resource":
-            resources.append(XMLResource(child, default_ontology))
+    permissions, resources = _extract_resources_and_permissions_from_xml(
+        root=root,
+        default_ontology=default_ontology,
+        proj_context=proj_context,
+    )
 
-    # get the project information and project ontology from the server
-    try:
-        res_inst_factory = try_network_action(lambda: ResourceInstanceFactory(con, shortcode))
-    except BaseError:
-        logger.error(f"A project with shortcode {shortcode} could not be found on the DSP server", exc_info=True)
-        raise UserError(f"A project with shortcode {shortcode} could not be found on the DSP server") from None
-    permissions_lookup: dict[str, Permissions] = {s: perm.get_permission_instance() for s, perm in permissions.items()}
-    resclass_name_2_type: dict[str, type] = {
-        s: res_inst_factory.get_resclass_type(s) for s in res_inst_factory.get_resclass_names()
-    }
+    permissions_lookup, resclass_name_2_type = _get_project_permissions_and_classes_from_server(
+        server_connection=con,
+        permissions=permissions,
+        shortcode=shortcode,
+    )
 
-    # check if the data in the XML is consistent with the ontology
     check_consistency_with_ontology(
         resources=resources,
         resclass_name_2_type=resclass_name_2_type,
@@ -138,7 +128,7 @@ def xmlupload(
     )
 
     # temporarily remove circular references
-    resources, stashed_xml_texts, stashed_resptr_props = remove_circular_references(resources, verbose)
+    resources, stashed_xml_texts, stashed_resptr_props = remove_circular_references(resources, verbose=verbose)
 
     preparation_duration = datetime.now() - preparation_start
     preparation_duration_ms = preparation_duration.seconds * 1000 + int(preparation_duration.microseconds / 1000)
@@ -209,6 +199,105 @@ def xmlupload(
     return success
 
 
+def _get_project_permissions_and_classes_from_server(
+    server_connection: Connection,
+    permissions: dict[str, XmlPermission],
+    shortcode: str,
+) -> tuple[dict[str, Permissions], dict[str, type]]:
+    """
+    This function tries to connect to the server and retrieve the project information.
+    If the project is not on the server, it raises a UserError.
+    From the information from the server, it creates a dictionary with the permission information,
+    and a dictionary with the information about the classes.
+
+    Args:
+        server_connection: connection to the server
+        permissions: the permissions extracted from the XML
+        shortcode: the shortcode specified in the XML
+
+    Returns:
+        A dictionary with the name of the permission with the Python object
+        And a dictionary with the class name as string and the Python type of the class
+
+    Raises:
+        UserError: If the project is not uploaded on the server
+    """
+    # get the project information and project ontology from the server
+    try:
+        res_inst_factory = try_network_action(
+            lambda: ResourceInstanceFactory(con=server_connection, projident=shortcode)
+        )
+    except BaseError:
+        logger.error(
+            f"A project with shortcode {shortcode} could not be found on the DSP server",
+            exc_info=True,
+        )
+        raise UserError(f"A project with shortcode {shortcode} could not be found on the DSP server") from None
+    permissions_lookup = {
+        permission_name: perm.get_permission_instance() for permission_name, perm in permissions.items()
+    }
+    resclass_name_2_type = {
+        resource_class_name: res_inst_factory.get_resclass_type(prefixedresclass=resource_class_name)
+        for resource_class_name in res_inst_factory.get_resclass_names()
+    }
+    return permissions_lookup, resclass_name_2_type
+
+
+def _get_project_context_from_server(connection: Connection) -> ProjectContext:
+    """
+    This function retrieves the project context previously uploaded on the server (json file)
+
+    Args:
+        connection: connection to the server
+
+    Returns:
+        Project context
+
+    Raises:
+        UserError: If the project was not previously uploaded on the server
+    """
+    try:
+        proj_context: ProjectContext = try_network_action(lambda: ProjectContext(con=connection))
+    except BaseError:
+        logger.error(
+            "Unable to retrieve project context from DSP server",
+            exc_info=True,
+        )
+        raise UserError("Unable to retrieve project context from DSP server") from None
+    return proj_context
+
+
+def _extract_resources_and_permissions_from_xml(
+    root: etree._Element,
+    proj_context: ProjectContext,
+    default_ontology: str,
+) -> tuple[dict[str, XmlPermission], list[XMLResource]]:
+    """
+    This function takes the root of the tree, the project context on the server and the name of the default ontology.
+    From the root it separates the resource permissions.
+    It returns a collection of corresponding Python objects.
+
+    Args:
+        root: root of the parsed XML file
+        proj_context: Project context retrieved from server
+        default_ontology: name of the default ontology as specified in the XML file
+
+    Returns:
+        A dictionary with the permission name and the permission object
+        A list with the XML resource Python objects
+    """
+    # make Python object representations of the XML file
+    resources: list[XMLResource] = []
+    permissions: dict[str, XmlPermission] = {}
+    for child in root:
+        if child.tag == "permissions":
+            permission = XmlPermission(child, proj_context)
+            permissions[permission.id] = permission
+        elif child.tag == "resource":
+            resources.append(XMLResource(child, default_ontology))
+    return permissions, resources
+
+
 def _upload_resources(
     resources: list[XMLResource],
     imgdir: str,
@@ -242,63 +331,49 @@ def _upload_resources(
         id2iri_mapping, failed_uploads, metrics
     """
 
-    # If there are multimedia files: calculate their total size
-    bitstream_all_sizes_mb = [
-        Path(Path(imgdir) / Path(res.bitstream.value)).stat().st_size / 1000000
-        if res.bitstream and not preprocessing_done
-        else 0.0
-        for res in resources
-    ]
-    if sum(bitstream_all_sizes_mb) > 0:
-        bitstream_size_total_mb = round(sum(bitstream_all_sizes_mb), 1)
-        bitstream_size_uploaded_mb = 0.0
-        print(f"This xmlupload contains multimedia files with a total size of {bitstream_size_total_mb} MB.")
-        logger.info(f"This xmlupload contains multimedia files with a total size of {bitstream_size_total_mb} MB.")
-    else:  # make Pylance happy
-        bitstream_size_total_mb = 0.0
-        bitstream_size_uploaded_mb = 0.0
+    bitstream_all_sizes_mb, bitstream_size_total_mb = calculate_multimedia_file_size(
+        resources=resources,
+        imgdir=imgdir,
+        preprocessing_done=preprocessing_done,
+    )
+    bitstream_size_uploaded_mb = 0.0
 
     for i, resource in enumerate(resources):
         resource_start = datetime.now()
         filetype = ""
         filesize = round(bitstream_all_sizes_mb[i], 1)
         bitstream_duration_ms = None
+
         resource_iri = resource.iri
         if resource.ark:
             resource_iri = convert_ark_v0_to_resource_iri(resource.ark)
 
-        # in case of a multimedia resource: upload the multimedia file
-        resource_bitstream = None
-        if preprocessing_done and resource.bitstream:
-            resource_bitstream = resource.get_bitstream(resource.bitstream.value, permissions_lookup)
-        elif resource.bitstream:
+        if resource.bitstream:
             pth = resource.bitstream.value
             try:
-                bitstream_start = datetime.now()
-                filetype = Path(pth).suffix[1:]
-                img: Optional[dict[Any, Any]] = try_network_action(
-                    sipi_server.upload_bitstream,
-                    filepath=str(Path(imgdir) / Path(pth)),
-                )
-                bitstream_duration = datetime.now() - bitstream_start
-                bitstream_duration_ms = bitstream_duration.seconds * 1000 + int(bitstream_duration.microseconds / 1000)
-                mb_per_sec = round((filesize / bitstream_duration_ms) * 1000, 1)
-                metrics.append(
-                    MetricRecord(resource.id, filetype, filesize, "bitstream upload", bitstream_duration_ms, mb_per_sec)
+                resource_bitstream = get_sipi_multimedia_information(
+                    resource=resource,
+                    sipi_server=sipi_server,
+                    imgdir=imgdir,
+                    filesize=filesize,
+                    permissions_lookup=permissions_lookup,
+                    metrics=metrics,
+                    preprocessing_done=preprocessing_done,
                 )
+                bitstream_size_uploaded_mb += filesize
+                if not preprocessing_done:
+                    msg = f"Uploaded file '{pth}' ({bitstream_size_uploaded_mb:.1f} MB / {bitstream_size_total_mb} MB)"
+                    print(msg)
+                    logger.info(msg)
             except BaseError as err:
                 err_msg = err.orig_err_msg_from_api or err.message
                 msg = f"Unable to upload file '{pth}' of resource '{resource.label}' ({resource.id})"
                 print(f"WARNING: {msg}: {err_msg}")
                 logger.warning(msg, exc_info=True)
                 failed_uploads.append(resource.id)
                 continue
-            bitstream_size_uploaded_mb += bitstream_all_sizes_mb[i]
-            msg = f"Uploaded file '{pth}' ({bitstream_size_uploaded_mb:.1f} MB / {bitstream_size_total_mb} MB)"
-            print(msg)
-            logger.info(msg)
-            internal_file_name_bitstream = img["uploadedFiles"][0]["internalFilename"]  # type: ignore[index]
-            resource_bitstream = resource.get_bitstream(internal_file_name_bitstream, permissions_lookup)
+        else:
+            resource_bitstream = None
 
         # create the resource in DSP
         resclass_type = resclass_name_2_type[resource.restype]
@@ -334,9 +409,11 @@ def _upload_resources(
             failed_uploads.append(resource.id)
             continue
         id2iri_mapping[resource.id] = created_resource.iri
+
         resource_designation = f"'{created_resource.label}' (ID: '{resource.id}', IRI: '{created_resource.iri}')"
         print(f"Created resource {i+1}/{len(resources)}: {resource_designation}")
         logger.info(f"Created resource {i+1}/{len(resources)}: {resource_designation}")
+
         resource_duration = datetime.now() - resource_start
         resource_duration_ms = resource_duration.seconds * 1000 + int(resource_duration.microseconds / 1000)
         looping_overhead_ms = resource_duration_ms - resource_creation_duration_ms - (bitstream_duration_ms or 0)
@@ -398,18 +475,11 @@ def _handle_upload_error(
         logger.info(f"The mapping of internal IDs to IRIs was written to {id2iri_mapping_file}")
 
     if stashed_xml_texts:
-        stashed_xml_texts_serializable = {
-            r.id: {p.name: xml for p, xml in rdict.items()} for r, rdict in stashed_xml_texts.items()
-        }
-        xml_filename = f"{save_location}/{timestamp_str}_stashed_text_properties.json"
-        with open(xml_filename, "x", encoding="utf-8") as f:
-            json.dump(
-                obj=stashed_xml_texts_serializable,
-                fp=f,
-                ensure_ascii=False,
-                indent=4,
-                cls=KnoraStandoffXmlEncoder,
-            )
+        xml_filename = save_json_stashed_text_properties(
+            stashed_xml_texts=stashed_xml_texts,
+            save_location=save_location,
+            timestamp_str=timestamp_str,
+        )
         msg = (
             f"There are stashed text properties that could not be reapplied to the resources they were stripped from. "
             f"They were saved to {xml_filename}.\n"
@@ -418,17 +488,11 @@ def _handle_upload_error(
         logger.info(msg)
 
     if stashed_resptr_props:
-        stashed_resptr_props_serializable = {
-            r.id: {p.name: plist for p, plist in rdict.items()} for r, rdict in stashed_resptr_props.items()
-        }
-        resptr_filename = f"{save_location}/{timestamp_str}_stashed_resptr_properties.json"
-        with open(resptr_filename, "x", encoding="utf-8") as f:
-            json.dump(
-                obj=stashed_resptr_props_serializable,
-                fp=f,
-                ensure_ascii=False,
-                indent=4,
-            )
+        resptr_filename = save_json_stashed_resptr_properties(
+            stashed_resptr_props=stashed_resptr_props,
+            save_location=save_location,
+            timestamp_str=timestamp_str,
+        )
         msg = (
             f"There are stashed resptr properties that could not be reapplied "
             f"to the resources they were stripped from. They were saved to {resptr_filename}\n"
@@ -443,3 +507,68 @@ def _handle_upload_error(
         logger.info(msg)
 
     sys.exit(1)
+
+
+def save_json_stashed_resptr_properties(
+    stashed_resptr_props: dict[XMLResource, dict[XMLProperty, list[str]]],
+    save_location: Path,
+    timestamp_str: str,
+) -> str:
+    """
+    This function saves the stashed resptr properties in a JSON file.
+    It returns the name of the file.
+
+    Args:
+        stashed_resptr_props: Dictionary with the stash
+        save_location: filepath to the save location
+        timestamp_str: timestamp from the beginning of the upload
+
+    Returns:
+        name of the JSON file
+    """
+    stashed_resptr_props_serializable = {
+        resource.id: {_property.name: property_list for _property, property_list in res_dict.items()}
+        for resource, res_dict in stashed_resptr_props.items()
+    }
+    resptr_filename = f"{save_location}/{timestamp_str}_stashed_resptr_properties.json"
+    with open(resptr_filename, "x", encoding="utf-8") as f:
+        json.dump(
+            obj=stashed_resptr_props_serializable,
+            fp=f,
+            ensure_ascii=False,
+            indent=4,
+        )
+    return resptr_filename
+
+
+def save_json_stashed_text_properties(
+    stashed_xml_texts: dict[XMLResource, dict[XMLProperty, dict[str, KnoraStandoffXml]]],
+    save_location: Path,
+    timestamp_str: str,
+) -> str:
+    """
+    This function saves the stashed XML properties in a JSON file.
+    It returns the name of the file.
+
+    Args:
+        stashed_xml_texts: Dictionary with the stash
+        save_location: filepath to the save location
+        timestamp_str: timestamp from the beginning of the upload
+
+    Returns:
+        name of the JSON file
+    """
+    stashed_xml_texts_serializable = {
+        resource.id: {_property.name: xml for _property, xml in res_dict.items()}
+        for resource, res_dict in stashed_xml_texts.items()
+    }
+    xml_filename = f"{save_location}/{timestamp_str}_stashed_text_properties.json"
+    with open(xml_filename, "x", encoding="utf-8") as file:
+        json.dump(
+            obj=stashed_xml_texts_serializable,
+            fp=file,
+            ensure_ascii=False,
+            indent=4,
+            cls=KnoraStandoffXmlEncoder,
+        )
+    return xml_filename