refactor: modularize xmlupload (DEV-2813) (#568)
BalduinLandolt committed Oct 14, 2023
1 parent b38c2d3 commit 8026fcb
Showing 11 changed files with 267 additions and 217 deletions.
10 changes: 6 additions & 4 deletions src/dsp_tools/cli.py
@@ -27,6 +27,7 @@
 from dsp_tools.utils.rosetta import upload_rosetta
 from dsp_tools.utils.shared import validate_xml_against_schema
 from dsp_tools.utils.stack_handling import StackConfiguration, StackHandler
+from dsp_tools.utils.xmlupload.upload_config import UploadConfig
 from dsp_tools.utils.xmlupload.xmlupload import xmlupload
 
 logger = get_logger(__name__)
@@ -464,10 +465,11 @@ def _call_requested_action(args: argparse.Namespace) -> bool:
             password=args.password,
             imgdir=args.imgdir,
             sipi=args.sipi_url,
-            verbose=args.verbose,
-            dump=args.dump,
-            save_metrics=args.metrics,
-            preprocessing_done=False,
+            config=UploadConfig(
+                verbose=args.verbose,
+                dump=args.dump,
+                save_metrics=args.metrics,
+            ),
         )
     elif args.action == "process-files":
         success = process_files(
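The call-site change in cli.py folds the three CLI flags into a single immutable config object, and preprocessing_done keeps its False default instead of being passed explicitly. A minimal sketch of the equivalence (not part of the commit; the argparse namespace and its values are illustrative):

import argparse

from dsp_tools.utils.xmlupload.upload_config import UploadConfig

# stand-in for the parsed CLI arguments (illustrative values)
args = argparse.Namespace(verbose=True, dump=False, metrics=False)

# before: xmlupload(..., verbose=args.verbose, dump=args.dump,
#         save_metrics=args.metrics, preprocessing_done=False)
# after: the three flags travel inside one frozen dataclass instance,
# and preprocessing_done simply keeps its False default:
config = UploadConfig(verbose=args.verbose, dump=args.dump, save_metrics=args.metrics)
assert config.preprocessing_done is False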
5 changes: 2 additions & 3 deletions src/dsp_tools/fast_xmlupload/upload_xml.py
@@ -8,6 +8,7 @@
 from dsp_tools.fast_xmlupload.upload_files import get_pkl_files
 from dsp_tools.models.exceptions import UserError
 from dsp_tools.utils.create_logger import get_logger
+from dsp_tools.utils.xmlupload.upload_config import UploadConfig
 from dsp_tools.utils.xmlupload.xmlupload import xmlupload
 
 logger = get_logger(__name__)
@@ -117,9 +118,7 @@ def fast_xmlupload(
         password=password,
         imgdir=".",
         sipi=sipi_url,
-        verbose=False,
-        save_metrics=False,
-        preprocessing_done=True,
+        config=UploadConfig(preprocessing_done=True),
     )
 
     end_time = datetime.now()
5 changes: 2 additions & 3 deletions src/dsp_tools/utils/rosetta.py
@@ -4,6 +4,7 @@
 
 from dsp_tools.models.exceptions import UserError
 from dsp_tools.utils.project_create import create_project
+from dsp_tools.utils.xmlupload.upload_config import UploadConfig
 from dsp_tools.utils.xmlupload.xmlupload import xmlupload
 
 
@@ -96,9 +97,7 @@ def _upload_xml(rosetta_folder: Path) -> bool:
         password="test",
         imgdir=str(rosetta_folder),
         sipi="http://0.0.0.0:1024",
-        verbose=False,
-        save_metrics=False,
-        preprocessing_done=False,
+        config=UploadConfig(),
     )
     return success
 
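Both call sites above lean on the dataclass defaults: the bare UploadConfig() in rosetta.py is equivalent to the previously explicit verbose=False, save_metrics=False, preprocessing_done=False, while fast_xmlupload overrides only the one flag it needs. A quick illustrative check (not part of the commit):

from dsp_tools.utils.xmlupload.upload_config import UploadConfig

default = UploadConfig()
assert (default.verbose, default.dump, default.save_metrics, default.preprocessing_done) == (
    False,
    False,
    False,
    False,
)

# fast_xmlupload only needs to deviate from the defaults in one place:
assert UploadConfig(preprocessing_done=True).verbose is False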
64 changes: 64 additions & 0 deletions src/dsp_tools/utils/xmlupload/upload_config.py
@@ -0,0 +1,64 @@
+from __future__ import annotations
+
+import dataclasses
+from dataclasses import dataclass, field
+from datetime import datetime
+from pathlib import Path
+
+import regex
+
+from dsp_tools.utils.create_logger import get_logger
+
+logger = get_logger(__name__)
+
+
+def _transform_server_url_to_foldername(server: str) -> str:
+    """
+    Take the server URL and transform it so that it can be used as foldername.
+
+    Args:
+        server: server, e.g. "https://api.test.dasch.swiss/" or "http://0.0.0.0:3333"
+
+    Returns:
+        simplified version, e.g. "test.dasch.swiss" or "localhost"
+    """
+    server_substitutions = {
+        r"https?://": "",
+        r"^api\.": "",
+        r":\d{2,5}/?$": "",
+        r"/$": "",
+        r"0.0.0.0": "localhost",
+    }
+    for pattern, repl in server_substitutions.items():
+        server = regex.sub(pattern, repl, server)
+    return server
+
+
+@dataclass(frozen=True)
+class UploadConfig:
+    """Configuration for the upload process."""
+
+    verbose: bool = False
+    dump: bool = False
+    save_metrics: bool = False
+    preprocessing_done: bool = False
+    server_as_foldername: str = field(default="unknown")
+    save_location: Path = field(default=Path.home() / ".dsp-tools" / "xmluploads")
+    timestamp_str: str = field(default=datetime.now().strftime("%Y-%m-%d_%H%M%S"))
+
+    def with_specific_save_location(
+        self,
+        server: str,
+        shortcode: str,
+        onto_name: str,
+    ) -> UploadConfig:
+        """Create a copy of this UploadConfig with a save location specific to server, shortcode, and ontology."""
+        server_as_foldername = _transform_server_url_to_foldername(server)
+        save_location = Path.home() / Path(".dsp-tools") / "xmluploads" / server_as_foldername / shortcode / onto_name
+        save_location.mkdir(parents=True, exist_ok=True)
+        logger.info(f"save_location='{save_location}'")
+        return dataclasses.replace(
+            self,
+            save_location=save_location,
+            server_as_foldername=server_as_foldername,
+        )
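A short sketch of how the new module behaves, derived from the code above (the shortcode and ontology name are illustrative; note that with_specific_save_location() creates the target directory under ~/.dsp-tools/xmluploads/ as a side effect, and that _transform_server_url_to_foldername is a private helper, imported here only for demonstration):

from dsp_tools.utils.xmlupload.upload_config import UploadConfig, _transform_server_url_to_foldername

# URL simplification, as promised by the docstring:
assert _transform_server_url_to_foldername("https://api.test.dasch.swiss/") == "test.dasch.swiss"
assert _transform_server_url_to_foldername("http://0.0.0.0:3333") == "localhost"

config = UploadConfig(verbose=True)
# the dataclass is frozen, so plain attribute assignment raises
# dataclasses.FrozenInstanceError:
# config.verbose = False

# with_specific_save_location() does not mutate; it returns a fresh copy
# via dataclasses.replace() with save_location and server_as_foldername set:
localhost_config = config.with_specific_save_location(
    server="http://0.0.0.0:3333",
    shortcode="082E",      # illustrative 4-digit hexadecimal project shortcode
    onto_name="rosetta",   # illustrative ontology name
)
assert localhost_config.server_as_foldername == "localhost"
assert config.server_as_foldername == "unknown"  # original instance untouched
assert localhost_config.verbose  # other fields are carried over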
125 changes: 31 additions & 94 deletions src/dsp_tools/utils/xmlupload/write_diagnostic_info.py
@@ -3,117 +3,54 @@
 import json
 import os
 from collections import namedtuple
-from datetime import datetime
 from pathlib import Path
-from typing import Any, Optional, Union
+from typing import Any
 
 import pandas as pd
-import regex
 from lxml import etree
 
 from dsp_tools.utils.create_logger import get_logger
+from dsp_tools.utils.xmlupload.upload_config import UploadConfig
 
 MetricRecord = namedtuple("MetricRecord", ["res_id", "filetype", "filesize_mb", "event", "duration_ms", "mb_per_sec"])
 
 
 logger = get_logger(__name__)
 
 
-def _transform_server_url_to_foldername(server: str) -> str:
-    """
-    Take the server URL and transform it so that it can be used as foldername.
-
-    Args:
-        server: server, e.g. "https://api.test.dasch.swiss/" or "http://0.0.0.0:3333"
-
-    Returns:
-        simplified version, e.g. "test.dasch.swiss" or "localhost"
-    """
-    server_substitutions = {
-        r"https?://": "",
-        r"^api\.": "",
-        r":\d{2,5}/?$": "",
-        r"/$": "",
-        r"0.0.0.0": "localhost",
-    }
-    for pattern, repl in server_substitutions.items():
-        server = regex.sub(pattern, repl, server)
-    return server
-
-
-def determine_save_location_of_diagnostic_info(
-    server: str,
-    proj_shortcode: str,
-    onto_name: str,
-) -> tuple[Path, str, str]:
-    """
-    Determine the save location for diagnostic info that will be used if the xmlupload is interrupted.
-    They are going to be stored in ~/.dsp-tools/xmluploads/server/shortcode/ontoname.
-    This path is computed and created.
-
-    Args:
-        server: URL of the DSP server where the data is uploaded to
-        proj_shortcode: 4-digit hexadecimal shortcode of the project
-        onto_name: name of the ontology that the data belongs to
-
-    Returns:
-        a tuple consisting of the absolute full path to the storage location,
-        a version of the server URL that can be used as foldername,
-        and the timestamp string that can be used as component of file names
-        (so that different diagnostic files of the same xmlupload have the same timestamp)
-    """
-    server_as_foldername = _transform_server_url_to_foldername(server)
-    timestamp_str = datetime.now().strftime("%Y-%m-%d_%H%M%S")
-    save_location = Path.home() / Path(".dsp-tools") / "xmluploads" / server_as_foldername / proj_shortcode / onto_name
-    save_location.mkdir(parents=True, exist_ok=True)
-    return save_location, server_as_foldername, timestamp_str
-
-
-def write_id2iri_mapping_and_metrics(
+def write_id2iri_mapping(
     id2iri_mapping: dict[str, str],
-    metrics: Optional[list[MetricRecord]],
-    failed_uploads: list[str],
-    input_file: Union[str, Path, etree._ElementTree[Any]],
+    input_file: str | Path | etree._ElementTree[Any],
     timestamp_str: str,
-    server_as_foldername: str,
-) -> bool:
-    """
-    Writes the id2iri mapping and the metrics into the current working directory,
-    and prints the failed uploads (if applicable).
-
-    Args:
-        id2iri_mapping: mapping of ids from the XML file to IRIs in DSP (initially empty, gets filled during the upload)
-        metrics: list with the metric records collected until now (gets filled during the upload)
-        failed_uploads: ids of resources that could not be uploaded (initially empty, gets filled during the upload)
-        input_file: path to the XML file or parsed ElementTree
-        timestamp_str: timestamp for the name of the log files
-        server_as_foldername: simplified version of the server URL that can be used as folder name
-
-    Returns:
-        True if there are no failed_uploads, False otherwise
-    """
-    # determine names of files
-    if isinstance(input_file, (str, Path)):
-        id2iri_filename = f"{Path(input_file).stem}_id2iri_mapping_{timestamp_str}.json"
-        metrics_filename = f"{timestamp_str}_metrics_{server_as_foldername}_{Path(input_file).stem}.csv"
-    else:
-        id2iri_filename = f"{timestamp_str}_id2iri_mapping.json"
-        metrics_filename = f"{timestamp_str}_metrics_{server_as_foldername}.csv"
-
-    # write files and print info
-    success = True
+) -> None:
+    """Writes the mapping of internal IDs to IRIs to a file."""
+    match input_file:
+        case str() | Path():
+            id2iri_filename = f"{Path(input_file).stem}_id2iri_mapping_{timestamp_str}.json"
+        case _:
+            id2iri_filename = f"{timestamp_str}_id2iri_mapping.json"
     with open(id2iri_filename, "x", encoding="utf-8") as f:
         json.dump(id2iri_mapping, f, ensure_ascii=False, indent=4)
     print(f"The mapping of internal IDs to IRIs was written to {id2iri_filename}")
     logger.info(f"The mapping of internal IDs to IRIs was written to {id2iri_filename}")
-    if failed_uploads:
-        print(f"\nWARNING: Could not upload the following resources: {failed_uploads}\n")
-        logger.warning(f"Could not upload the following resources: {failed_uploads}")
-        success = False
-    if metrics:
-        os.makedirs("metrics", exist_ok=True)
-        df = pd.DataFrame(metrics)
-        df.to_csv(f"metrics/{metrics_filename}")
-        print(f"Total time of xmlupload: {sum(int(record.duration_ms) for record in metrics) / 1000:.1f} seconds")
-
-    return success
+
+
+def write_metrics(
+    metrics: list[MetricRecord],
+    input_file: str | Path | etree._ElementTree[Any],
+    config: UploadConfig,
+) -> None:
+    """Writes the metrics to a file."""
+    match input_file:
+        case str() | Path():
+            metrics_filename = (
+                f"{config.timestamp_str}_metrics_{config.server_as_foldername}_{Path(input_file).stem}.csv"
+            )
+        case _:
+            metrics_filename = f"{config.timestamp_str}_metrics_{config.server_as_foldername}.csv"
+
+    # write files and print info
+    os.makedirs("metrics", exist_ok=True)
+    df = pd.DataFrame(metrics)
+    df.to_csv(f"metrics/{metrics_filename}")
+    print(f"Total time of xmlupload: {sum(int(record.duration_ms) for record in metrics) / 1000:.1f} seconds")
