Skip to content

Commit

Permalink
feat: resume an interrupted xmlupload (DEV-3323) (#853)
Browse files Browse the repository at this point in the history
Co-authored-by: Balduin Landolt <33053745+BalduinLandolt@users.noreply.github.com>
  • Loading branch information
jnussbaum and BalduinLandolt committed Mar 7, 2024
1 parent 914acd8 commit 4acf841
Show file tree
Hide file tree
Showing 16 changed files with 398 additions and 139 deletions.
28 changes: 28 additions & 0 deletions docs/cli-commands.md
Expand Up @@ -139,6 +139,7 @@ The following options are available:
- `-i` | `--imgdir` (optional, default: `.`): folder from where the paths in the `<bitstream>` tags are evaluated
- `-V` | `--validate` (optional): validate the XML file without uploading it
- `-v` | `--verbose` (optional): print more information about the progress to the console
- `--interrupt-after=int` (optional): interrupt the upload after `int` resources have been uploaded

Output:

Expand All @@ -164,6 +165,33 @@ dsp-tools xmlupload -s https://api.dasch.swiss -u 'your@email.com' -p 'password'

The expected XML format is [documented here](./file-formats/xml-data-file.md).

If an XML upload is interrupted before it has finished (e.g. by hitting `Ctrl + C`),
it can be resumed with the `resume-xmlupload` command.
When an upload is interrupted,
the current state of the upload is saved in a pickle file,
which is stored in `~/.dsp-tools/xmluploads/[server]/resumable/latest.pkl`.
If the upload should be resumed later,
this file must remain in place.



## `resume-xmlupload`

This command resumes a previously interrupted XML upload.

```bash
dsp-tools resume-xmlupload [options]
```

The following options are available:

- `-s` | `--server` (optional, default: `0.0.0.0:3333`): URL of the DSP server where DSP-TOOLS sends the data to
- `-u` | `--user` (optional, default: `root@example.com`): username (e-mail) used for authentication with the DSP-API
- `-p` | `--password` (optional, default: `test`): password used for authentication with the DSP-API

For this command to work,
the pickle file `~/.dsp-tools/xmluploads/[server]/resumable/latest.pkl` must exist.
Currently, only one interrupted upload can be resumed at a time per server.


## `excel2json`
Expand Down
20 changes: 19 additions & 1 deletion src/dsp_tools/cli/call_action.py
Expand Up @@ -13,6 +13,7 @@
from dsp_tools.commands.project.create.project_create_lists import create_lists
from dsp_tools.commands.project.create.project_validate import validate_project
from dsp_tools.commands.project.get import get_project
from dsp_tools.commands.resume_xmlupload.resume_xmlupload import resume_xmlupload
from dsp_tools.commands.rosetta import upload_rosetta
from dsp_tools.commands.start_stack import StackConfiguration
from dsp_tools.commands.start_stack import StackHandler
Expand Down Expand Up @@ -46,6 +47,8 @@ def call_requested_action(args: argparse.Namespace) -> bool:
result = _call_create(args)
case "xmlupload":
result = _call_xmlupload(args)
case "resume-xmlupload":
result = _call_resume_xmlupload(args)
case "excel2json":
result = _call_excel2json(args)
case "excel2lists":
Expand Down Expand Up @@ -144,12 +147,14 @@ def _call_excel2json(args: argparse.Namespace) -> bool:


def _call_ingest_xmlupload(args: argparse.Namespace) -> bool:
    """Run an ingest-based xmlupload with the parsed CLI arguments. Always returns True."""
    # A non-positive --interrupt-after (CLI default is -1) means "never interrupt".
    stop_after: int | None = None
    if args.interrupt_after > 0:
        stop_after = args.interrupt_after
    ingest_xmlupload(
        xml_file=Path(args.xml_file),
        user=args.user,
        password=args.password,
        dsp_url=args.server,
        sipi_url=args.sipi_url,
        interrupt_after=stop_after,
    )
    return True

def _call_xmlupload(args: argparse.Namespace) -> bool:
    """Validate the XML file, or upload it, depending on the --validate-only flag."""
    if args.validate_only:
        return validate_xml(args.xmlfile)
    # A non-positive --interrupt-after (CLI default is -1) means "never interrupt".
    stop_after: int | None = args.interrupt_after if args.interrupt_after > 0 else None
    upload_config = UploadConfig(
        diagnostics=DiagnosticsConfig(verbose=args.verbose),
        interrupt_after=stop_after,
    )
    return xmlupload(
        input_file=args.xmlfile,
        server=args.server,
        user=args.user,
        password=args.password,
        imgdir=args.imgdir,
        sipi=args.sipi_url,
        config=upload_config,
    )


def _call_resume_xmlupload(args: argparse.Namespace) -> bool:
    """Resume a previously interrupted xmlupload; returns True if everything succeeded."""
    connection_args = {
        "server": args.server,
        "user": args.user,
        "password": args.password,
        "sipi": args.sipi_url,
    }
    return resume_xmlupload(**connection_args)


def _call_get(args: argparse.Namespace) -> bool:
return get_project(
project_identifier=args.project,
Expand Down
19 changes: 19 additions & 0 deletions src/dsp_tools/cli/create_parsers.py
Expand Up @@ -46,6 +46,8 @@ def make_parser(

_add_xmlupload(subparsers, default_dsp_api_url, root_user_email, root_user_pw)

_add_resume_xmlupload(subparsers, default_dsp_api_url, root_user_email, root_user_pw)

_add_ingest_xmlupload(subparsers, default_dsp_api_url, root_user_email, root_user_pw)

_add_excel2json(subparsers)
Expand Down Expand Up @@ -189,6 +191,7 @@ def _add_ingest_xmlupload(
subparser.add_argument("-s", "--server", default=default_dsp_api_url, help=dsp_server_text)
subparser.add_argument("-u", "--user", default=root_user_email, help=username_text)
subparser.add_argument("-p", "--password", default=root_user_pw, help=password_text)
subparser.add_argument("--interrupt-after", type=int, default=-1, help="interrupt after this number of resources")
subparser.add_argument("xml_file", help="path to XML file containing the data")


Expand All @@ -212,9 +215,25 @@ def _add_xmlupload(
"-V", "--validate-only", action="store_true", help="validate the XML file without uploading it"
)
subparser.add_argument("-v", "--verbose", action="store_true", help=verbose_text)
subparser.add_argument("--interrupt-after", type=int, default=-1, help="interrupt after this number of resources")
subparser.add_argument("xmlfile", help="path to the XML file containing the data")


def _add_resume_xmlupload(
    subparsers: _SubParsersAction[ArgumentParser],
    default_dsp_api_url: str,
    root_user_email: str,
    root_user_pw: str,
) -> None:
    """
    Register the 'resume-xmlupload' subcommand on the given subparsers object.

    Args:
        subparsers: the subparsers collection of the main argument parser
        default_dsp_api_url: default value for the --server option
        root_user_email: default value for the --user option
        root_user_pw: default value for the --password option
    """
    subparser = subparsers.add_parser(name="resume-xmlupload", help="Resume an interrupted xmlupload")
    subparser.set_defaults(action="resume-xmlupload")
    # Use the shared help-text constants, like the sibling _add_* functions do,
    # so that all subcommands describe the common options identically.
    subparser.add_argument("-s", "--server", default=default_dsp_api_url, help=dsp_server_text)
    subparser.add_argument("-u", "--user", default=root_user_email, help=username_text)
    subparser.add_argument("-p", "--password", default=root_user_pw, help=password_text)


def _add_get(
subparsers: _SubParsersAction[ArgumentParser],
default_dsp_api_url: str,
Expand Down
4 changes: 3 additions & 1 deletion src/dsp_tools/commands/ingest_xmlupload/upload_xml.py
Expand Up @@ -21,6 +21,7 @@ def ingest_xmlupload(
password: str,
dsp_url: str,
sipi_url: str,
interrupt_after: int | None = None,
) -> None:
"""
This function reads an XML file
Expand All @@ -36,6 +37,7 @@ def ingest_xmlupload(
password: the user's password for login into DSP
dsp_url: URL to the DSP server
sipi_url: URL to the Sipi server
interrupt_after: if set, the upload will be interrupted after this number of resources
Raises:
InputError: if any media was not uploaded or uploaded media was not referenced.
Expand Down Expand Up @@ -63,5 +65,5 @@ def ingest_xmlupload(
password=password,
imgdir=".",
sipi=sipi_url,
config=UploadConfig(media_previously_uploaded=True),
config=UploadConfig(media_previously_uploaded=True, interrupt_after=interrupt_after),
)
2 changes: 1 addition & 1 deletion src/dsp_tools/commands/project/create/project_create.py
Expand Up @@ -1045,7 +1045,7 @@ def create_project(
if overall_success:
msg = (
f"Successfully created project '{project_definition.shortname}' "
f"({project_definition.shortcode}) with all its ontologies."
f"({project_definition.shortcode}) with all its ontologies. "
f"There were no problems during the creation process."
)
print(f"========================================================\n{msg}")
Expand Down
Empty file.
66 changes: 66 additions & 0 deletions src/dsp_tools/commands/resume_xmlupload/resume_xmlupload.py
@@ -0,0 +1,66 @@
import pickle

from dsp_tools.commands.xmlupload.iri_resolver import IriResolver
from dsp_tools.commands.xmlupload.list_client import ListClient
from dsp_tools.commands.xmlupload.list_client import ListClientLive
from dsp_tools.commands.xmlupload.models.sipi import Sipi
from dsp_tools.commands.xmlupload.models.upload_state import UploadState
from dsp_tools.commands.xmlupload.project_client import ProjectClient
from dsp_tools.commands.xmlupload.project_client import ProjectClientLive
from dsp_tools.commands.xmlupload.upload_config import UploadConfig
from dsp_tools.commands.xmlupload.xmlupload import cleanup_upload
from dsp_tools.commands.xmlupload.xmlupload import upload_resources
from dsp_tools.utils.connection_live import ConnectionLive


def resume_xmlupload(
    server: str,
    user: str,
    password: str,
    sipi: str,
) -> bool:
    """
    Resume an interrupted xmlupload from the pickled state saved on disk.

    Args:
        server: the DSP server where the data should be imported
        user: the user (e-mail) with which the data should be imported
        password: the password of the user with which the data should be imported
        sipi: the sipi instance to be used

    Returns:
        True if all resources could be uploaded without errors;
        False if one of the resources could not be uploaded because there is an error in it
    """
    state = _read_upload_state_from_disk(server)

    # establish authenticated connections to DSP-API and Sipi
    connection = ConnectionLive(server)
    connection.login(user, password)
    sipi_server = Sipi(ConnectionLive(sipi, token=connection.get_token()))

    project_client: ProjectClient = ProjectClientLive(connection, state.config.shortcode)
    list_client: ListClient = ListClientLive(connection, project_client.get_project_iri())

    # continue the upload exactly where the interrupted run left off
    iri_resolver, failed_uploads, nonapplied_stash = upload_resources(
        resources=state.pending_resources,
        failed_uploads=state.failed_uploads,
        imgdir=".",
        sipi_server=sipi_server,
        permissions_lookup=state.permissions_lookup,
        con=connection,
        stash=state.pending_stash,
        config=state.config,
        project_client=project_client,
        list_client=list_client,
        iri_resolver=IriResolver(state.iri_resolver_lookup),
    )

    return cleanup_upload(iri_resolver, state.config, failed_uploads, nonapplied_stash)


def _read_upload_state_from_disk(server: str) -> UploadState:
    """Load the pickled UploadState of the interrupted upload for the given server."""
    # The shortcode passed to with_server_info is irrelevant here:
    # the save location depends only on the server.
    location = UploadConfig().with_server_info(server, "foo").diagnostics.save_location
    with open(location, "rb") as file:
        state: UploadState = pickle.load(file)  # noqa: S301 (deserialization of untrusted data)
    return state
20 changes: 20 additions & 0 deletions src/dsp_tools/commands/xmlupload/models/upload_state.py
@@ -0,0 +1,20 @@
from dataclasses import dataclass

from dsp_tools.commands.xmlupload.models.permission import Permissions
from dsp_tools.commands.xmlupload.models.xmlresource import XMLResource
from dsp_tools.commands.xmlupload.stash.stash_models import Stash
from dsp_tools.commands.xmlupload.upload_config import UploadConfig


@dataclass(frozen=True)
class UploadState:
    """
    Save the state of an xmlupload, so that after an interruption, it can be resumed.
    """

    # resources that have not been created on the server yet
    pending_resources: list[XMLResource]
    # IDs of resources whose upload failed
    failed_uploads: list[str]
    # mapping of internal IDs to IRIs of already-created resources (fed into IriResolver on resume)
    pending_stash: Stash | None  # stashed links/texts still to be applied; None if nothing is stashed
    iri_resolver_lookup: dict[str, str]
    # configuration of the interrupted upload (reused unchanged on resume)
    config: UploadConfig
    # mapping of permission identifiers to Permissions objects — presumably keyed by
    # the permission names used in the XML; verify against the xmlupload caller
    permissions_lookup: dict[str, Permissions]
Expand Up @@ -37,7 +37,7 @@ def upload_stashed_resptr_props(
print(f"{datetime.now()}: Upload the stashed resptrs...")
logger.info("Upload the stashed resptrs...")
not_uploaded: list[LinkValueStashItem] = []
for res_id, stash_items in stashed_resptr_props.res_2_stash_items.items():
for res_id, stash_items in stashed_resptr_props.res_2_stash_items.copy().items():
res_iri = iri_resolver.get(res_id)
if not res_iri:
# resource could not be uploaded to DSP, so the stash cannot be uploaded either
Expand All @@ -51,9 +51,12 @@ def upload_stashed_resptr_props(
target_iri = iri_resolver.get(stash_item.target_id)
if not target_iri:
continue
success = _upload_stash_item(stash_item, res_iri, target_iri, con, context)
if not success:
if _upload_stash_item(stash_item, res_iri, target_iri, con, context):
stashed_resptr_props.res_2_stash_items[res_id].remove(stash_item)
else:
not_uploaded.append(stash_item)
if not stashed_resptr_props.res_2_stash_items[res_id]:
del stashed_resptr_props.res_2_stash_items[res_id]
return LinkValueStash.make(not_uploaded)


Expand Down
Expand Up @@ -110,20 +110,19 @@ def upload_stashed_xml_texts(
stashed_xml_texts: all xml texts that have been stashed
Returns:
nonapplied_xml_texts: the xml texts that could not be uploaded
the xml texts that could not be uploaded
"""

print(f"{datetime.now()}: Upload the stashed XML texts...")
logger.info("Upload the stashed XML texts...")
not_uploaded: list[StandoffStashItem] = []
for res_id, stash_items in stashed_xml_texts.res_2_stash_items.items():
for res_id, stash_items in stashed_xml_texts.res_2_stash_items.copy().items():
res_iri = iri_resolver.get(res_id)
if not res_iri:
# resource could not be uploaded to DSP, so the stash cannot be uploaded either
# no action necessary: this resource will remain in nonapplied_xml_texts,
# no action necessary: this resource will remain in the list of not uploaded stash items,
# which will be handled by the caller
continue
# xmlres: XMLResource = stashed_xml_texts.res_2_xmlres[res_id]
try:
resource_in_triplestore = con.get(f"/v2/resources/{quote_plus(res_iri)}")
except BaseError as err:
Expand All @@ -148,8 +147,12 @@ def upload_stashed_xml_texts(
con=con,
context=context,
)
if not success:
if success:
stashed_xml_texts.res_2_stash_items[res_id].remove(stash_item)
else:
not_uploaded.append(stash_item)
if not stashed_xml_texts.res_2_stash_items[res_id]:
stashed_xml_texts.res_2_stash_items.pop(res_id)
return StandoffStash.make(not_uploaded)


Expand Down
8 changes: 3 additions & 5 deletions src/dsp_tools/commands/xmlupload/upload_config.py
Expand Up @@ -3,7 +3,6 @@
import dataclasses
from dataclasses import dataclass
from dataclasses import field
from datetime import datetime
from pathlib import Path

import regex
Expand Down Expand Up @@ -42,7 +41,6 @@ class DiagnosticsConfig:
verbose: bool = False
server_as_foldername: str = "unknown"
save_location: Path = field(default=Path.home() / ".dsp-tools" / "xmluploads")
timestamp_str: str = field(default=datetime.now().strftime("%Y-%m-%d_%H%M%S"))


@dataclass(frozen=True)
Expand All @@ -53,17 +51,17 @@ class UploadConfig:
server: str = "unknown"
shortcode: str = "unknown"
diagnostics: DiagnosticsConfig = field(default_factory=DiagnosticsConfig)
interrupt_after: int | None = None

def with_server_info(
self,
server: str,
shortcode: str,
onto_name: str,
) -> UploadConfig:
"""Create a new UploadConfig with the given server."""
server_as_foldername = _transform_server_url_to_foldername(server)
save_location = Path.home() / Path(".dsp-tools") / "xmluploads" / server_as_foldername / shortcode / onto_name
save_location.mkdir(parents=True, exist_ok=True)
save_location = Path.home() / Path(".dsp-tools") / "xmluploads" / server_as_foldername / "resumable/latest.pkl"
save_location.parent.mkdir(parents=True, exist_ok=True)
logger.info(f"{save_location=:}")
diagnostics: DiagnosticsConfig = dataclasses.replace(
self.diagnostics,
Expand Down
13 changes: 2 additions & 11 deletions src/dsp_tools/commands/xmlupload/write_diagnostic_info.py
Expand Up @@ -2,10 +2,6 @@

import json
from datetime import datetime
from pathlib import Path
from typing import Any

from lxml import etree

from dsp_tools.commands.xmlupload.upload_config import DiagnosticsConfig
from dsp_tools.utils.create_logger import get_logger
Expand All @@ -15,17 +11,12 @@

def write_id2iri_mapping(
id2iri_mapping: dict[str, str],
input_file: str | Path | etree._ElementTree[Any],
diagnostics: DiagnosticsConfig,
) -> None:
"""Writes the mapping of internal IDs to IRIs to a file."""
timestamp = diagnostics.timestamp_str
timestamp = datetime.now().strftime("%Y-%m-%d_%H%M%S")
servername = diagnostics.server_as_foldername
match input_file:
case str() | Path():
id2iri_filename = f"{Path(input_file).stem}_id2iri_mapping_{servername}_{timestamp}.json"
case _:
id2iri_filename = f"{timestamp}_id2iri_mapping_{servername}.json"
id2iri_filename = f"{timestamp}_id2iri_mapping_{servername}.json"
with open(id2iri_filename, "x", encoding="utf-8") as f:
json.dump(id2iri_mapping, f, ensure_ascii=False, indent=4)
print(f"{datetime.now()}: The mapping of internal IDs to IRIs was written to {id2iri_filename}")
Expand Down

0 comments on commit 4acf841

Please sign in to comment.