Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: resume an interrupted xmlupload (DEV-3323) #853

Merged
merged 35 commits into from
Mar 7, 2024
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
b2a69ac
edit
jnussbaum Mar 4, 2024
7632753
Merge branch 'main' into wip/DEV-3323-resume-an-interrupted-xmlupload
jnussbaum Mar 4, 2024
4913943
Merge branch 'main' into wip/DEV-3323-resume-an-interrupted-xmlupload
jnussbaum Mar 5, 2024
9964005
Merge branch 'main' into wip/DEV-3323-resume-an-interrupted-xmlupload
jnussbaum Mar 6, 2024
fb476e6
continue
jnussbaum Mar 6, 2024
135322d
tests: write pickle into tmpdir instead of ~/.dsp-tools
jnussbaum Mar 6, 2024
a726a73
refactor
jnussbaum Mar 6, 2024
a62570b
add permissions_lookup to UploadState
jnussbaum Mar 6, 2024
5fbacaa
refactor
jnussbaum Mar 6, 2024
aa2e37e
add input_file to UploadConfig
jnussbaum Mar 6, 2024
65dcc2c
add resume-xmlupload to CLI
jnussbaum Mar 6, 2024
f4fe209
fix argparse
jnussbaum Mar 6, 2024
fa85a0f
add docstring
jnussbaum Mar 6, 2024
271ae45
keep pending_stash and nonapplied_stash separate; nonapplied_stash mu…
jnussbaum Mar 6, 2024
1bb7b04
Merge branch 'main' into wip/DEV-3323-resume-an-interrupted-xmlupload
jnussbaum Mar 6, 2024
4160f14
rename stash to pending_stash
jnussbaum Mar 6, 2024
ce4f085
fix
jnussbaum Mar 7, 2024
473b287
remove timestamp from DiagnosticsConfig
jnussbaum Mar 7, 2024
f37ecb7
fix e2e
jnussbaum Mar 7, 2024
8adca28
typo
jnussbaum Mar 7, 2024
edb1f93
edit
jnussbaum Mar 7, 2024
b7ec4d3
fix parser
jnussbaum Mar 7, 2024
71647dd
fix counter
jnussbaum Mar 7, 2024
61771e9
inject iri_resolver
jnussbaum Mar 7, 2024
de447d6
add documentation
BalduinLandolt Mar 7, 2024
51c77dd
remove input_file from UploadConfig, because etree.ElementTree is not…
jnussbaum Mar 7, 2024
63297f2
fix e2e test
jnussbaum Mar 7, 2024
95ce80c
factor out _tidy_up_resource_creation
jnussbaum Mar 7, 2024
753a5fe
feat: Add `interrupt-after` to `xmlupload` and `resume-xmlupload` (DE…
BalduinLandolt Mar 7, 2024
23cc8f1
edit
jnussbaum Mar 7, 2024
5cb8bcd
fix "All resources have successfully been uploaded" bug
jnussbaum Mar 7, 2024
5f23d0d
remove pickle file at the very end
jnussbaum Mar 7, 2024
20cc16c
add interrupt_after to ingest-xmlupload
jnussbaum Mar 7, 2024
62d77c3
fail fast if the credentials are wrong
jnussbaum Mar 7, 2024
6995d44
improve docs
jnussbaum Mar 7, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
27 changes: 27 additions & 0 deletions docs/cli-commands.md
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,33 @@ dsp-tools xmlupload -s https://api.dasch.swiss -u 'your@email.com' -p 'password'

The expected XML format is [documented here](./file-formats/xml-data-file.md).

If an XML upload is interrupted before it has finished (e.g. by hitting `Ctrl + C`),
it can be resumed with the `resume-xmlupload` command.
When an upload is interrupted,
the current state of the upload is saved in a pickle file,
which is stored in `~/.dsp-tools/xmluploads/[server]/resumable/latest.pkl`.
If the upload should be resumed later,
this file must remain in place.



## `resume-xmlupload`

This command resumes a previously interrupted XML upload.

```bash
dsp-tools resume-xmlupload [options]
```

The following options are available:

- `-s` | `--server` (optional, default: `0.0.0.0:3333`): URL of the DSP server where DSP-TOOLS sends the data to
- `-u` | `--user` (optional, default: `root@example.com`): username (e-mail) used for authentication with the DSP-API
- `-p` | `--password` (optional, default: `test`): password used for authentication with the DSP-API

For this command to work,
the pickle file `~/.dsp-tools/xmluploads/[server]/resumable/latest.pkl` must exist.
Currently, only one interrupted upload can be resumed at a time.
jnussbaum marked this conversation as resolved.
Show resolved Hide resolved


## `excel2json`
Expand Down
15 changes: 13 additions & 2 deletions src/dsp_tools/cli/call_action.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from dsp_tools.commands.project.create.project_create_lists import create_lists
from dsp_tools.commands.project.create.project_validate import validate_project
from dsp_tools.commands.project.get import get_project
from dsp_tools.commands.resume_xmlupload.resume_xmlupload import resume_xmlupload
from dsp_tools.commands.rosetta import upload_rosetta
from dsp_tools.commands.start_stack import StackConfiguration
from dsp_tools.commands.start_stack import StackHandler
Expand Down Expand Up @@ -46,6 +47,8 @@ def call_requested_action(args: argparse.Namespace) -> bool:
result = _call_create(args)
case "xmlupload":
result = _call_xmlupload(args)
case "resume-xmlupload":
result = _call_resume_xmlupload(args)
case "excel2json":
result = _call_excel2json(args)
case "excel2lists":
Expand Down Expand Up @@ -159,16 +162,24 @@ def _call_xmlupload(args: argparse.Namespace) -> bool:
return validate_xml(args.xmlfile)
else:
return xmlupload(
input_file=args.xmlfile,
server=args.server,
user=args.user,
password=args.password,
imgdir=args.imgdir,
sipi=args.sipi_url,
config=UploadConfig(diagnostics=DiagnosticsConfig(verbose=args.verbose)),
config=UploadConfig(input_file=args.xmlfile, diagnostics=DiagnosticsConfig(verbose=args.verbose)),
)


def _call_resume_xmlupload(args: argparse.Namespace) -> bool:
    """Forward the parsed `resume-xmlupload` arguments to `resume_xmlupload()`.

    Args:
        args: parsed command line arguments; `server`, `user`, `password` and `sipi_url` are read

    Returns:
        whatever `resume_xmlupload()` returns
    """
    # NOTE(review): the `resume-xmlupload` subparser does not declare a SIPI option itself,
    # so `sipi_url` is presumably attached to the namespace elsewhere before dispatch - confirm.
    upload_succeeded = resume_xmlupload(
        server=args.server,
        user=args.user,
        password=args.password,
        sipi=args.sipi_url,
    )
    return upload_succeeded


def _call_get(args: argparse.Namespace) -> bool:
return get_project(
project_identifier=args.project,
Expand Down
17 changes: 17 additions & 0 deletions src/dsp_tools/cli/create_parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ def make_parser(

_add_xmlupload(subparsers, default_dsp_api_url, root_user_email, root_user_pw)

_add_resume_xmlupload(subparsers, default_dsp_api_url, root_user_email, root_user_pw)

_add_ingest_xmlupload(subparsers, default_dsp_api_url, root_user_email, root_user_pw)

_add_excel2json(subparsers)
Expand Down Expand Up @@ -215,6 +217,21 @@ def _add_xmlupload(
subparser.add_argument("xmlfile", help="path to the XML file containing the data")


def _add_resume_xmlupload(
    subparsers: _SubParsersAction[ArgumentParser],
    default_dsp_api_url: str,
    root_user_email: str,
    root_user_pw: str,
) -> None:
    """Register the `resume-xmlupload` subcommand together with its connection options.

    Args:
        subparsers: the subparser collection that receives the new subcommand
        default_dsp_api_url: default value for `-s` / `--server`
        root_user_email: default value for `-u` / `--user`
        root_user_pw: default value for `-p` / `--password`
    """
    resume_parser = subparsers.add_parser(name="resume-xmlupload", help="Resume an interrupted xmlupload")
    resume_parser.set_defaults(action="resume-xmlupload")

    # (short flag, long flag, default value, help text) - registered in this order
    connection_options = (
        ("-s", "--server", default_dsp_api_url, "URL of the DSP server where DSP-TOOLS sends the data to"),
        ("-u", "--user", root_user_email, username_text),
        ("-p", "--password", root_user_pw, password_text),
    )
    for short_flag, long_flag, default_value, help_text in connection_options:
        resume_parser.add_argument(short_flag, long_flag, default=default_value, help=help_text)


def _add_get(
subparsers: _SubParsersAction[ArgumentParser],
default_dsp_api_url: str,
Expand Down
3 changes: 1 addition & 2 deletions src/dsp_tools/commands/ingest_xmlupload/upload_xml.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,10 @@ def ingest_xmlupload(
raise InputError(err_msg)

xmlupload(
input_file=xml_tree_replaced,
server=dsp_url,
user=user,
password=password,
imgdir=".",
sipi=sipi_url,
config=UploadConfig(media_previously_uploaded=True),
config=UploadConfig(input_file=xml_tree_replaced, media_previously_uploaded=True),
)
2 changes: 1 addition & 1 deletion src/dsp_tools/commands/project/create/project_create.py
Original file line number Diff line number Diff line change
Expand Up @@ -1045,7 +1045,7 @@ def create_project(
if overall_success:
msg = (
f"Successfully created project '{project_definition.shortname}' "
f"({project_definition.shortcode}) with all its ontologies."
f"({project_definition.shortcode}) with all its ontologies. "
f"There were no problems during the creation process."
)
print(f"========================================================\n{msg}")
Expand Down
Empty file.
65 changes: 65 additions & 0 deletions src/dsp_tools/commands/resume_xmlupload/resume_xmlupload.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import pickle

from dsp_tools.commands.xmlupload.iri_resolver import IriResolver
from dsp_tools.commands.xmlupload.list_client import ListClient
from dsp_tools.commands.xmlupload.list_client import ListClientLive
from dsp_tools.commands.xmlupload.models.sipi import Sipi
from dsp_tools.commands.xmlupload.models.upload_state import UploadState
from dsp_tools.commands.xmlupload.project_client import ProjectClient
from dsp_tools.commands.xmlupload.project_client import ProjectClientLive
from dsp_tools.commands.xmlupload.upload_config import UploadConfig
from dsp_tools.commands.xmlupload.xmlupload import cleanup_upload
from dsp_tools.commands.xmlupload.xmlupload import upload_resources
from dsp_tools.utils.connection_live import ConnectionLive


def resume_xmlupload(
    server: str,
    user: str,
    password: str,
    sipi: str,
) -> bool:
    """
    Continue an xmlupload that was interrupted, starting from the upload state saved on disk.

    Args:
        server: the DSP server where the data should be imported
        user: the user (e-mail) with which the data should be imported
        password: the password of the user with which the data should be imported
        sipi: the sipi instance to be used

    Returns:
        True if all resources could be uploaded without errors; False if one of the resources could not be
        uploaded because there is an error in it
    """
    saved_state = _read_upload_state_from_disk(server)
    saved_config = saved_state.config

    # Log in first: the token obtained from the API connection is reused for the SIPI connection.
    api_con = ConnectionLive(server)
    api_con.login(user, password)
    sipi_server = Sipi(ConnectionLive(sipi, token=api_con.get_token()))

    project_client: ProjectClient = ProjectClientLive(api_con, saved_config.shortcode)
    project_iri = project_client.get_project_iri()
    list_client: ListClient = ListClientLive(api_con, project_iri)

    # NOTE(review): the image directory of the original run is not part of the saved state,
    # so "." is always used here - confirm this is intended when resuming.
    upload_result = upload_resources(
        resources=saved_state.pending_resources,
        imgdir=".",
        sipi_server=sipi_server,
        permissions_lookup=saved_state.permissions_lookup,
        con=api_con,
        stash=saved_state.pending_stash,
        config=saved_config,
        project_client=project_client,
        list_client=list_client,
        iri_resolver=IriResolver(saved_state.iri_resolver_lookup),
    )
    iri_resolver, failed_uploads, nonapplied_stash = upload_result

    return cleanup_upload(iri_resolver, saved_config, failed_uploads, nonapplied_stash)


def _read_upload_state_from_disk(server: str) -> UploadState:
    """Load the pickled state of the interrupted xmlupload that belongs to the given server.

    Args:
        server: the DSP server of the interrupted upload; used to derive the location of the pickle file

    Raises:
        FileNotFoundError: if no saved upload state exists for this server

    Returns:
        the upload state that was saved when the upload was interrupted
    """
    # "foo" is a placeholder shortcode: only the save location derived from the server is used here.
    save_location = UploadConfig().with_server_info(server, "foo").diagnostics.save_location
    try:
        # The pickle file is expected to have been written by DSP-TOOLS itself;
        # unpickling a file from any other origin would be unsafe.
        with open(save_location, "rb") as f:
            saved_state: UploadState = pickle.load(f)  # noqa: S301 (deserialization of untrusted data)
    except FileNotFoundError as err:
        # Same exception type as before, but with a message that tells the user what is wrong.
        msg = (
            f"There is no interrupted xmlupload to resume for the server '{server}': "
            f"The file '{save_location}' does not exist."
        )
        raise FileNotFoundError(msg) from err
    return saved_state
3 changes: 1 addition & 2 deletions src/dsp_tools/commands/rosetta.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,12 @@ def _upload_xml(rosetta_folder: Path) -> bool:
"""
print("Execute 'dsp-tools xmlupload rosetta.xml'...")
return xmlupload(
input_file=rosetta_folder / "rosetta.xml",
server="http://0.0.0.0:3333",
user="root@example.com",
password="test",
imgdir=str(rosetta_folder),
sipi="http://0.0.0.0:1024",
config=UploadConfig(),
config=UploadConfig(input_file=rosetta_folder / "rosetta.xml"),
)


Expand Down
19 changes: 19 additions & 0 deletions src/dsp_tools/commands/xmlupload/models/upload_state.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from dataclasses import dataclass

from dsp_tools.commands.xmlupload.models.permission import Permissions
from dsp_tools.commands.xmlupload.models.xmlresource import XMLResource
from dsp_tools.commands.xmlupload.stash.stash_models import Stash
from dsp_tools.commands.xmlupload.upload_config import UploadConfig


@dataclass(frozen=True)
class UploadState:
    """
    Save the state of an xmlupload, so that after an interruption, it can be resumed.

    NOTE(review): instances of this class appear to be pickled to disk and unpickled when resuming,
    so changes to the fields may make previously saved states unreadable - confirm before editing.
    """

    # resources that still have to be uploaded
    pending_resources: list[XMLResource]
    # presumably maps the internal IDs of already created resources to their IRIs - verify against IriResolver
    iri_resolver_lookup: dict[str, str]
    # stash that still has to be applied; None is allowed (presumably meaning that nothing is stashed)
    pending_stash: Stash | None
    # configuration of the interrupted upload
    config: UploadConfig
    # presumably maps permission names to Permissions objects - verify against the caller
    permissions_lookup: dict[str, Permissions]
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def upload_stashed_resptr_props(
print(f"{datetime.now()}: Upload the stashed resptrs...")
logger.info("Upload the stashed resptrs...")
not_uploaded: list[LinkValueStashItem] = []
for res_id, stash_items in stashed_resptr_props.res_2_stash_items.items():
for res_id, stash_items in stashed_resptr_props.res_2_stash_items.copy().items():
jnussbaum marked this conversation as resolved.
Show resolved Hide resolved
res_iri = iri_resolver.get(res_id)
if not res_iri:
# resource could not be uploaded to DSP, so the stash cannot be uploaded either
Expand All @@ -51,9 +51,12 @@ def upload_stashed_resptr_props(
target_iri = iri_resolver.get(stash_item.target_id)
if not target_iri:
continue
success = _upload_stash_item(stash_item, res_iri, target_iri, con, context)
if not success:
if _upload_stash_item(stash_item, res_iri, target_iri, con, context):
stashed_resptr_props.res_2_stash_items[res_id].remove(stash_item)
else:
not_uploaded.append(stash_item)
if not stashed_resptr_props.res_2_stash_items[res_id]:
del stashed_resptr_props.res_2_stash_items[res_id]
return LinkValueStash.make(not_uploaded)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,20 +110,19 @@ def upload_stashed_xml_texts(
stashed_xml_texts: all xml texts that have been stashed

Returns:
nonapplied_xml_texts: the xml texts that could not be uploaded
the xml texts that could not be uploaded
"""

print(f"{datetime.now()}: Upload the stashed XML texts...")
logger.info("Upload the stashed XML texts...")
not_uploaded: list[StandoffStashItem] = []
for res_id, stash_items in stashed_xml_texts.res_2_stash_items.items():
for res_id, stash_items in stashed_xml_texts.res_2_stash_items.copy().items():
res_iri = iri_resolver.get(res_id)
if not res_iri:
# resource could not be uploaded to DSP, so the stash cannot be uploaded either
# no action necessary: this resource will remain in nonapplied_xml_texts,
# no action necessary: this resource will remain in the list of not uploaded stash items,
# which will be handled by the caller
continue
# xmlres: XMLResource = stashed_xml_texts.res_2_xmlres[res_id]
try:
resource_in_triplestore = con.get(f"/v2/resources/{quote_plus(res_iri)}")
except BaseError as err:
Expand All @@ -148,8 +147,12 @@ def upload_stashed_xml_texts(
con=con,
context=context,
)
if not success:
if success:
stashed_xml_texts.res_2_stash_items[res_id].remove(stash_item)
else:
not_uploaded.append(stash_item)
if not stashed_xml_texts.res_2_stash_items[res_id]:
stashed_xml_texts.res_2_stash_items.pop(res_id)
return StandoffStash.make(not_uploaded)


Expand Down
10 changes: 5 additions & 5 deletions src/dsp_tools/commands/xmlupload/upload_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
import dataclasses
from dataclasses import dataclass
from dataclasses import field
from datetime import datetime
from pathlib import Path
from typing import Any

import regex
from lxml import etree

from dsp_tools.utils.create_logger import get_logger

Expand Down Expand Up @@ -42,7 +43,6 @@ class DiagnosticsConfig:
verbose: bool = False
server_as_foldername: str = "unknown"
save_location: Path = field(default=Path.home() / ".dsp-tools" / "xmluploads")
timestamp_str: str = field(default=datetime.now().strftime("%Y-%m-%d_%H%M%S"))


@dataclass(frozen=True)
Expand All @@ -52,18 +52,18 @@ class UploadConfig:
media_previously_uploaded: bool = False
server: str = "unknown"
shortcode: str = "unknown"
input_file: str | Path | etree._ElementTree[Any] = "unknown"
diagnostics: DiagnosticsConfig = field(default_factory=DiagnosticsConfig)

def with_server_info(
self,
server: str,
shortcode: str,
onto_name: str,
) -> UploadConfig:
"""Create a new UploadConfig with the given server."""
server_as_foldername = _transform_server_url_to_foldername(server)
save_location = Path.home() / Path(".dsp-tools") / "xmluploads" / server_as_foldername / shortcode / onto_name
save_location.mkdir(parents=True, exist_ok=True)
save_location = Path.home() / Path(".dsp-tools") / "xmluploads" / server_as_foldername / "resumable/latest.pkl"
save_location.parent.mkdir(parents=True, exist_ok=True)
logger.info(f"{save_location=:}")
diagnostics: DiagnosticsConfig = dataclasses.replace(
self.diagnostics,
Expand Down
2 changes: 1 addition & 1 deletion src/dsp_tools/commands/xmlupload/write_diagnostic_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def write_id2iri_mapping(
diagnostics: DiagnosticsConfig,
) -> None:
"""Writes the mapping of internal IDs to IRIs to a file."""
timestamp = diagnostics.timestamp_str
timestamp = datetime.now().strftime("%Y-%m-%d_%H%M%S")
servername = diagnostics.server_as_foldername
match input_file:
case str() | Path():
Expand Down