From b9588c3436514cc2490c7cfde4ee06f83b64f9b4 Mon Sep 17 00:00:00 2001 From: Balduin Landolt <33053745+BalduinLandolt@users.noreply.github.com> Date: Wed, 18 Oct 2023 09:25:02 +0200 Subject: [PATCH] refactor: modularize xmlupload method (DEV-2836) (#574) --- pyproject.toml | 35 ++-- src/dsp_tools/cli.py | 6 +- src/dsp_tools/models/xmlresource.py | 2 +- .../utils/xmlupload/resource_multimedia.py | 126 +++++-------- .../utils/xmlupload/upload_config.py | 1 - .../utils/xmlupload/write_diagnostic_info.py | 28 --- src/dsp_tools/utils/xmlupload/xmlupload.py | 172 +++++++----------- 7 files changed, 137 insertions(+), 233 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 5c4fa60fe..c20b69d49 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -96,28 +96,25 @@ addopts = ["--import-mode=importlib"] [tool.mypy] -ignore_missing_imports = true # TODO: deactivate this +ignore_missing_imports = true # TODO: deactivate this show_column_numbers = true strict = true exclude = [ - "src/dsp_tools/import_scripts", # TODO: activate this - "src/dsp_tools/models/exceptions.py", # TODO: activate this - "src/dsp_tools/models/group.py", # TODO: activate this - "src/dsp_tools/models/helpers.py ", # TODO: activate this - "src/dsp_tools/models/langstring.py", # TODO: activate this - "src/dsp_tools/models/listnode.py", # TODO: activate this - "src/dsp_tools/models/model.py", # TODO: activate this - "src/dsp_tools/models/ontology.py", # TODO: activate this - "src/dsp_tools/models/permission.py", # TODO: activate this - "src/dsp_tools/models/project.py", # TODO: activate this - "src/dsp_tools/models/projectContext.py", # TODO: activate this - "src/dsp_tools/models/propertyclass.py", # TODO: activate this - "src/dsp_tools/models/propertyelement.py", # TODO: activate this - "src/dsp_tools/models/resource.py", # TODO: activate this - "src/dsp_tools/models/resourceclass.py", # TODO: activate this - "src/dsp_tools/models/user.py", # TODO: activate this - "src/dsp_tools/models/value.py", # TODO: activate this - "src/dsp_tools/models/xmlresource.py", # TODO: activate this + "src/dsp_tools/import_scripts", # TODO: activate this + "src/dsp_tools/models/group.py", # TODO: activate this + "src/dsp_tools/models/helpers.py ", # TODO: activate this + "src/dsp_tools/models/langstring.py", # TODO: activate this + "src/dsp_tools/models/listnode.py", # TODO: activate this + "src/dsp_tools/models/ontology.py", # TODO: activate this + "src/dsp_tools/models/permission.py", # TODO: activate this + "src/dsp_tools/models/project.py", # TODO: activate this + "src/dsp_tools/models/projectContext.py", # TODO: activate this + "src/dsp_tools/models/propertyclass.py", # TODO: activate this + "src/dsp_tools/models/resource.py", # TODO: activate this + "src/dsp_tools/models/resourceclass.py", # TODO: activate this + "src/dsp_tools/models/user.py", # TODO: activate this + "src/dsp_tools/models/value.py", # TODO: activate this + "src/dsp_tools/models/xmlresource.py", # TODO: activate this ] diff --git a/src/dsp_tools/cli.py b/src/dsp_tools/cli.py index 964ec02a0..b9785e360 100644 --- a/src/dsp_tools/cli.py +++ b/src/dsp_tools/cli.py @@ -465,11 +465,7 @@ def _call_requested_action(args: argparse.Namespace) -> bool: password=args.password, imgdir=args.imgdir, sipi=args.sipi_url, - config=UploadConfig( - verbose=args.verbose, - dump=args.dump, - save_metrics=args.metrics, - ), + config=UploadConfig(verbose=args.verbose, dump=args.dump), ) elif args.action == "process-files": success = process_files( diff --git a/src/dsp_tools/models/xmlresource.py b/src/dsp_tools/models/xmlresource.py index 3955c585b..314a50d3d 100644 --- a/src/dsp_tools/models/xmlresource.py +++ b/src/dsp_tools/models/xmlresource.py @@ -174,7 +174,7 @@ def get_propvals( prop_data[prop.name] = vals if len(vals) > 1 else vals[0] return prop_data - def get_bitstream_information_from_sipi( + def get_bitstream_information( self, internal_file_name_bitstream: str, permissions_lookup: dict[str, Permissions] ) -> Optional[dict[str, Union[str, Permissions]]]: """ diff --git a/src/dsp_tools/utils/xmlupload/resource_multimedia.py b/src/dsp_tools/utils/xmlupload/resource_multimedia.py index 84d8e526a..c52aa2ce5 100644 --- a/src/dsp_tools/utils/xmlupload/resource_multimedia.py +++ b/src/dsp_tools/utils/xmlupload/resource_multimedia.py @@ -1,117 +1,89 @@ from __future__ import annotations -from datetime import datetime from pathlib import Path from typing import Any, Optional +from dsp_tools.models.exceptions import BaseError from dsp_tools.models.permission import Permissions from dsp_tools.models.sipi import Sipi +from dsp_tools.models.xmlbitstream import XMLBitstream from dsp_tools.models.xmlresource import XMLResource from dsp_tools.utils.create_logger import get_logger from dsp_tools.utils.shared import try_network_action -from dsp_tools.utils.xmlupload.write_diagnostic_info import MetricRecord logger = get_logger(__name__) -def calculate_multimedia_file_size( - resources: list[XMLResource], - imgdir: str, - preprocessing_done: bool, -) -> tuple[list[float], float | int]: - """ - This function calculates the size of the bitstream files in the specified directory. - - Args: - resources: List of resources to identify the files used - imgdir: directory where the files are - preprocessing_done: True if sipi has preprocessed the files - - Returns: - List with all the file sizes - Total of all the file sizes - """ - # If there are multimedia files: calculate their total size - bitstream_all_sizes_mb = [ - Path(Path(imgdir) / Path(res.bitstream.value)).stat().st_size / 1000000 - if res.bitstream and not preprocessing_done - else 0.0 - for res in resources - ] - if sum(bitstream_all_sizes_mb) > 0: - bitstream_size_total_mb = round(sum(bitstream_all_sizes_mb), 1) - print(f"This xmlupload contains multimedia files with a total size of {bitstream_size_total_mb} MB.") - logger.info(f"This xmlupload contains multimedia files with a total size of {bitstream_size_total_mb} MB.") - else: # make Pylance happy - bitstream_size_total_mb = 0.0 - return bitstream_all_sizes_mb, bitstream_size_total_mb - - -def get_sipi_multimedia_information( +def _upload_bitstream( resource: XMLResource, sipi_server: Sipi, imgdir: str, - filesize: float, permissions_lookup: dict[str, Permissions], - metrics: list[MetricRecord], - preprocessing_done: bool, ) -> dict[str, str | Permissions] | None: """ - This function takes a resource with a corresponding bitstream filepath. - If the pre-processing is not done, it retrieves the file from the directory and uploads it to sipi. - If pre-processing is done it retrieves the bitstream information from sipi. + This function uploads a specified bitstream file to SIPI and then returns the file information from SIPI. Args: resource: resource with that has a bitstream sipi_server: server to upload imgdir: directory of the file - filesize: size of the file permissions_lookup: dictionary that contains the permission name as string and the corresponding Python object - metrics: to store metric information in - preprocessing_done: If True, then no upload is necessary Returns: The information from sipi which is needed to establish a link from the resource """ - if preprocessing_done: - resource_bitstream = resource.get_bitstream_information_from_sipi( - internal_file_name_bitstream=resource.bitstream.value, # type: ignore[union-attr] - permissions_lookup=permissions_lookup, - ) - else: - resource_bitstream = _upload_multimedia_to_sipi( - resource=resource, - sipi_server=sipi_server, - imgdir=imgdir, - filesize=filesize, - permissions_lookup=permissions_lookup, - metrics=metrics, - ) - return resource_bitstream - - -def _upload_multimedia_to_sipi( - resource: XMLResource, - sipi_server: Sipi, - imgdir: str, - filesize: float, - permissions_lookup: dict[str, Permissions], - metrics: list[MetricRecord], -) -> dict[str, str | Permissions] | None: pth = resource.bitstream.value # type: ignore[union-attr] - bitstream_start = datetime.now() - filetype = Path(pth).suffix[1:] img: Optional[dict[Any, Any]] = try_network_action( sipi_server.upload_bitstream, filepath=str(Path(imgdir) / Path(pth)), ) - bitstream_duration = datetime.now() - bitstream_start - bitstream_duration_ms = bitstream_duration.seconds * 1000 + int(bitstream_duration.microseconds / 1000) - mb_per_sec = round((filesize / bitstream_duration_ms) * 1000, 1) - metrics.append(MetricRecord(resource.id, filetype, filesize, "bitstream upload", bitstream_duration_ms, mb_per_sec)) internal_file_name_bitstream = img["uploadedFiles"][0]["internalFilename"] # type: ignore[index] - resource_bitstream = resource.get_bitstream_information_from_sipi( + resource_bitstream = resource.get_bitstream_information( internal_file_name_bitstream=internal_file_name_bitstream, permissions_lookup=permissions_lookup, ) return resource_bitstream + + +def handle_bitstream( + resource: XMLResource, + bitstream: XMLBitstream, + preprocessing_done: bool, + permissions_lookup: dict[str, Permissions], + sipi_server: Sipi, + imgdir: str, +) -> dict[str, Any] | None: + """ + Upload a bitstream file to SIPI + + Args: + resource: resource holding the bitstream + bitstream: the bitstream object + preprocessing_done: whether the preprocessing is done already + permissions_lookup: dictionary that contains the permission name as string and the corresponding Python object + sipi_server: server to upload + imgdir: directory of the file + + Returns: + The information from sipi which is needed to establish a link from the resource + """ + try: + if preprocessing_done: + resource_bitstream = resource.get_bitstream_information(bitstream.value, permissions_lookup) + else: + resource_bitstream = _upload_bitstream( + resource=resource, + sipi_server=sipi_server, + imgdir=imgdir, + permissions_lookup=permissions_lookup, + ) + msg = f"Uploaded file '{bitstream.value}'" + print(msg) + logger.info(msg) + return resource_bitstream + except BaseError as err: + err_msg = err.orig_err_msg_from_api or err.message + msg = f"Unable to upload file '{bitstream.value}' of resource '{resource.label}' ({resource.id})" + print(f"WARNING: {msg}: {err_msg}") + logger.warning(msg, exc_info=True) + return None diff --git a/src/dsp_tools/utils/xmlupload/upload_config.py b/src/dsp_tools/utils/xmlupload/upload_config.py index 6f15b88ad..0c8417c3a 100644 --- a/src/dsp_tools/utils/xmlupload/upload_config.py +++ b/src/dsp_tools/utils/xmlupload/upload_config.py @@ -40,7 +40,6 @@ class UploadConfig: verbose: bool = False dump: bool = False - save_metrics: bool = False preprocessing_done: bool = False server_as_foldername: str = field(default="unknown") save_location: Path = field(default=Path.home() / ".dsp-tools" / "xmluploads") diff --git a/src/dsp_tools/utils/xmlupload/write_diagnostic_info.py b/src/dsp_tools/utils/xmlupload/write_diagnostic_info.py index ee0b5a92c..0a7c0f7e9 100644 --- a/src/dsp_tools/utils/xmlupload/write_diagnostic_info.py +++ b/src/dsp_tools/utils/xmlupload/write_diagnostic_info.py @@ -1,19 +1,12 @@ from __future__ import annotations import json -import os -from collections import namedtuple from pathlib import Path from typing import Any -import pandas as pd from lxml import etree from dsp_tools.utils.create_logger import get_logger -from dsp_tools.utils.xmlupload.upload_config import UploadConfig - -MetricRecord = namedtuple("MetricRecord", ["res_id", "filetype", "filesize_mb", "event", "duration_ms", "mb_per_sec"]) - logger = get_logger(__name__) @@ -33,24 +26,3 @@ def write_id2iri_mapping( json.dump(id2iri_mapping, f, ensure_ascii=False, indent=4) print(f"The mapping of internal IDs to IRIs was written to {id2iri_filename}") logger.info(f"The mapping of internal IDs to IRIs was written to {id2iri_filename}") - - -def write_metrics( - metrics: list[MetricRecord], - input_file: str | Path | etree._ElementTree[Any], - config: UploadConfig, -) -> None: - """Writes the metrics to a file.""" - match input_file: - case str() | Path(): - metrics_filename = ( - f"{config.timestamp_str}_metrics_{config.server_as_foldername}_{Path(input_file).stem}.csv" - ) - case _: - metrics_filename = f"{config.timestamp_str}_metrics_{config.server_as_foldername}.csv" - - # write files and print info - os.makedirs("metrics", exist_ok=True) - df = pd.DataFrame(metrics) - df.to_csv(f"metrics/{metrics_filename}") - print(f"Total time of xmlupload: {sum(int(record.duration_ms) for record in metrics) / 1000:.1f} seconds") diff --git a/src/dsp_tools/utils/xmlupload/xmlupload.py b/src/dsp_tools/utils/xmlupload/xmlupload.py index c575834ee..89684116f 100644 --- a/src/dsp_tools/utils/xmlupload/xmlupload.py +++ b/src/dsp_tools/utils/xmlupload/xmlupload.py @@ -5,7 +5,6 @@ import json import sys -from datetime import datetime from pathlib import Path from typing import Any, Union @@ -26,16 +25,13 @@ check_consistency_with_ontology, validate_and_parse_xml_file, ) -from dsp_tools.utils.xmlupload.resource_multimedia import ( - calculate_multimedia_file_size, - get_sipi_multimedia_information, -) +from dsp_tools.utils.xmlupload.resource_multimedia import handle_bitstream from dsp_tools.utils.xmlupload.stash.stash_models import Stash from dsp_tools.utils.xmlupload.stash_circular_references import remove_circular_references from dsp_tools.utils.xmlupload.upload_config import UploadConfig from dsp_tools.utils.xmlupload.upload_stashed_resptr_props import upload_stashed_resptr_props from dsp_tools.utils.xmlupload.upload_stashed_xml_texts import upload_stashed_xml_texts -from dsp_tools.utils.xmlupload.write_diagnostic_info import MetricRecord, write_id2iri_mapping, write_metrics +from dsp_tools.utils.xmlupload.write_diagnostic_info import write_id2iri_mapping logger = get_logger(__name__) @@ -81,9 +77,6 @@ def xmlupload( onto_name=default_ontology, ) - # start metrics - metrics: list[MetricRecord] = [] - # establish connection to DSP server con = login(server=server, user=user, password=password, dump=config.dump) sipi_server = Sipi(sipi, con.get_token()) @@ -94,24 +87,20 @@ def xmlupload( default_ontology=default_ontology, shortcode=shortcode, verbose=config.verbose, - metrics=metrics, ) - id2iri_mapping, failed_uploads, metrics = _upload( + id2iri_mapping, failed_uploads = _upload( resources=resources, imgdir=imgdir, sipi_server=sipi_server, permissions_lookup=permissions_lookup, resclass_name_2_type=resclass_name_2_type, con=con, - metrics=metrics, stash=stash, config=config, ) write_id2iri_mapping(id2iri_mapping, input_file, config.timestamp_str) - if config.save_metrics and metrics: - write_metrics(metrics, input_file, config) success = not failed_uploads if success: print("All resources have successfully been uploaded.") @@ -128,9 +117,7 @@ def _prepare_upload( default_ontology: str, shortcode: str, verbose: bool, - metrics: list[MetricRecord], ) -> tuple[list[XMLResource], dict[str, Permissions], dict[str, type], Stash | None]: - preparation_start = datetime.now() resources, permissions_lookup, resclass_name_2_type = _get_data_from_xml( con=con, root=root, @@ -140,9 +127,6 @@ def _prepare_upload( ) # temporarily remove circular references resources, stash = remove_circular_references(resources, verbose=verbose) - preparation_duration = datetime.now() - preparation_start - preparation_duration_ms = preparation_duration.seconds * 1000 + int(preparation_duration.microseconds / 1000) - metrics.append(MetricRecord("", "", "", "xml upload preparation", preparation_duration_ms, "")) return resources, permissions_lookup, resclass_name_2_type, stash @@ -153,21 +137,19 @@ def _upload( permissions_lookup: dict[str, Permissions], resclass_name_2_type: dict[str, type], con: Connection, - metrics: list[MetricRecord], stash: Stash | None, config: UploadConfig, -) -> tuple[dict[str, str], list[str], list[MetricRecord]]: +) -> tuple[dict[str, str], list[str]]: # upload all resources, then update the resources with the stashed XML texts and resptrs failed_uploads: list[str] = [] try: - id2iri_mapping, failed_uploads, metrics = _upload_resources( + id2iri_mapping, failed_uploads = _upload_resources( resources=resources, imgdir=imgdir, sipi_server=sipi_server, permissions_lookup=permissions_lookup, resclass_name_2_type=resclass_name_2_type, con=con, - metrics=metrics, preprocessing_done=config.preprocessing_done, ) nonapplied_stash = _upload_stash(stash, id2iri_mapping, con, config.verbose) if stash else None @@ -187,7 +169,7 @@ def _upload( save_location=config.save_location, timestamp_str=config.timestamp_str, ) - return id2iri_mapping, failed_uploads, metrics + return id2iri_mapping, failed_uploads def _get_data_from_xml( @@ -267,7 +249,7 @@ def _get_project_permissions_and_classes_from_server( """ # get the project information and project ontology from the server try: - res_inst_factory = try_network_action( + res_inst_factory: ResourceInstanceFactory = try_network_action( lambda: ResourceInstanceFactory(con=server_connection, projident=shortcode) ) except BaseError: @@ -328,9 +310,8 @@ def _upload_resources( permissions_lookup: dict[str, Permissions], resclass_name_2_type: dict[str, type], con: Connection, - metrics: list[MetricRecord], preprocessing_done: bool, -) -> tuple[dict[str, str], list[str], list[MetricRecord]]: +) -> tuple[dict[str, str], list[str]]: """ Iterates through all resources and tries to upload them to DSP. If a temporary exception occurs, the action is repeated until success, @@ -343,7 +324,6 @@ def _upload_resources( permissions_lookup: maps permission strings to Permission objects resclass_name_2_type: maps resource class names to their types con: connection to DSP - metrics: list with the metric records collected until now (gets filled during the upload) preprocessing_done: if set, all multimedia files referenced in the XML file must already be on the server Returns: @@ -352,93 +332,81 @@ def _upload_resources( id2iri_mapping: dict[str, str] = {} failed_uploads: list[str] = [] - bitstream_all_sizes_mb, bitstream_size_total_mb = calculate_multimedia_file_size( - resources=resources, - imgdir=imgdir, - preprocessing_done=preprocessing_done, - ) - bitstream_size_uploaded_mb = 0.0 - for i, resource in enumerate(resources): - resource_start = datetime.now() - filetype = "" - filesize = round(bitstream_all_sizes_mb[i], 1) - bitstream_duration_ms = None - resource_iri = resource.iri if resource.ark: resource_iri = convert_ark_v0_to_resource_iri(resource.ark) - if resource.bitstream: - try: - resource_bitstream = get_sipi_multimedia_information( - resource=resource, - sipi_server=sipi_server, - imgdir=imgdir, - filesize=filesize, - permissions_lookup=permissions_lookup, - metrics=metrics, - preprocessing_done=preprocessing_done, - ) - bitstream_size_uploaded_mb += filesize - except BaseError as err: - pth = resource.bitstream.value - err_msg = err.orig_err_msg_from_api or err.message - msg = f"Unable to upload file '{pth}' of resource '{resource.label}' ({resource.id})" - print(f"WARNING: {msg}: {err_msg}") - logger.warning(msg, exc_info=True) - msg = f"Uploaded file '{pth}' ({bitstream_size_uploaded_mb:.1f} MB / {bitstream_size_total_mb} MB)" - print(msg) - logger.info(msg) - continue - else: - resource_bitstream = None - - # create the resource in DSP - resclass_type = resclass_name_2_type[resource.restype] - properties = resource.get_propvals(id2iri_mapping, permissions_lookup) - try: - resource_instance: ResourceInstance = resclass_type( - con=con, - label=resource.label, - iri=resource_iri, - permissions=permissions_lookup.get(str(resource.permissions)), - creation_date=resource.creation_date, - bitstream=resource_bitstream, - values=properties, - ) - resource_creation_start = datetime.now() - created_resource: ResourceInstance = try_network_action(resource_instance.create) - resource_creation_duration = datetime.now() - resource_creation_start - resource_creation_duration_ms = resource_creation_duration.seconds * 1000 + int( - resource_creation_duration.microseconds / 1000 - ) - metrics.append( - MetricRecord(resource.id, filetype, filesize, "resource creation", resource_creation_duration_ms, "") + bitstream_information = None + if bitstream := resource.bitstream: + bitstream_information = handle_bitstream( + resource=resource, + bitstream=bitstream, + preprocessing_done=preprocessing_done, + permissions_lookup=permissions_lookup, + sipi_server=sipi_server, + imgdir=imgdir, ) - except BaseError as err: - err_msg = err.orig_err_msg_from_api or err.message - print(f"WARNING: Unable to create resource '{resource.label}' ({resource.id}): {err_msg}") - log_msg = ( - f"Unable to create resource '{resource.label}' ({resource.id})\n" - f"Resource details:\n{vars(resource)}\n" - f"Property details:\n" + "\n".join([str(vars(prop)) for prop in resource.properties]) - ) - logger.warning(log_msg, exc_info=True) + if not bitstream_information: + # Note: previously, if the bitstream could not be uploaded, the resource was skipped without adding to + # the failed_uploads list, which I'm not sure was as intended + failed_uploads.append(resource.id) + continue + + res = _create_resource( + res_type=resclass_name_2_type[resource.restype], + resource=resource, + resource_iri=resource_iri, + bitstream_information=bitstream_information, + con=con, + permissions_lookup=permissions_lookup, + id2iri_mapping=id2iri_mapping, + ) + if not res: failed_uploads.append(resource.id) continue - id2iri_mapping[resource.id] = created_resource.iri + iri, label = res + id2iri_mapping[resource.id] = iri - resource_designation = f"'{created_resource.label}' (ID: '{resource.id}', IRI: '{created_resource.iri}')" + resource_designation = f"'{label}' (ID: '{resource.id}', IRI: '{iri}')" print(f"Created resource {i+1}/{len(resources)}: {resource_designation}") logger.info(f"Created resource {i+1}/{len(resources)}: {resource_designation}") - resource_duration = datetime.now() - resource_start - resource_duration_ms = resource_duration.seconds * 1000 + int(resource_duration.microseconds / 1000) - looping_overhead_ms = resource_duration_ms - resource_creation_duration_ms - (bitstream_duration_ms or 0) - metrics.append(MetricRecord(resource.id, filetype, filesize, "looping overhead", looping_overhead_ms, "")) + return id2iri_mapping, failed_uploads + - return id2iri_mapping, failed_uploads, metrics +def _create_resource( + res_type: type, + resource: XMLResource, + resource_iri: str | None, + bitstream_information: dict[str, Any] | None, + con: Connection, + permissions_lookup: dict[str, Permissions], + id2iri_mapping: dict[str, str], +) -> tuple[str, str] | None: + properties = resource.get_propvals(id2iri_mapping, permissions_lookup) + try: + resource_instance: ResourceInstance = res_type( + con=con, + label=resource.label, + iri=resource_iri, + permissions=permissions_lookup.get(str(resource.permissions)), + creation_date=resource.creation_date, + bitstream=bitstream_information, + values=properties, + ) + created_resource: ResourceInstance = try_network_action(resource_instance.create) + return created_resource.iri, created_resource.label + except BaseError as err: + err_msg = err.orig_err_msg_from_api or err.message + print(f"WARNING: Unable to create resource '{resource.label}' ({resource.id}): {err_msg}") + log_msg = ( + f"Unable to create resource '{resource.label}' ({resource.id})\n" + f"Resource details:\n{vars(resource)}\n" + f"Property details:\n" + "\n".join([str(vars(prop)) for prop in resource.properties]) + ) + logger.warning(log_msg, exc_info=True) + return None def _handle_upload_error(