Skip to content

Commit

Permalink
feat(resume-xmlupload): add option to skip the first resource (DEV-3412
Browse files Browse the repository at this point in the history
…) (#878)
  • Loading branch information
Nora-Olivia-Ammann committed Mar 13, 2024
1 parent c543274 commit a53785f
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 16 deletions.
2 changes: 2 additions & 0 deletions docs/cli-commands.md
Expand Up @@ -191,6 +191,8 @@ The following options are available:
- `-s` | `--server` (optional, default: `0.0.0.0:3333`): URL of the DSP server where DSP-TOOLS sends the data to
- `-u` | `--user` (optional, default: `root@example.com`): username (e-mail) used for authentication with the DSP-API
- `-p` | `--password` (optional, default: `test`): password used for authentication with the DSP-API
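- `--skip-first-resource` (optional): if this flag is set, the first resource in the list of pending resources is skipped.
Use it if the last resource was created on the DSP server, but DSP-TOOLS did not receive a confirmation before the upload was interrupted.
Skipping is not yet implemented for stashed links.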

For this command to work,
the pickle file `~/.dsp-tools/xmluploads/[server]/resumable/latest.pkl` must exist.
Expand Down
1 change: 1 addition & 0 deletions src/dsp_tools/cli/call_action.py
Expand Up @@ -180,6 +180,7 @@ def _call_resume_xmlupload(args: argparse.Namespace) -> bool:
user=args.user,
password=args.password,
sipi=args.sipi_url,
skip_first_resource=args.skip_first_resource,
)


Expand Down
7 changes: 7 additions & 0 deletions src/dsp_tools/cli/create_parsers.py
Expand Up @@ -231,6 +231,13 @@ def _add_resume_xmlupload(
)
subparser.add_argument("-u", "--user", default=root_user_email, help=username_text)
subparser.add_argument("-p", "--password", default=root_user_pw, help=password_text)
# Help text for the flag: it explains the situation in which the first pending resource
# must be skipped to avoid creating a duplicate on the server.
skip_text = (
    "In case the last resource was successfully uploaded to the DSP-API, "
    "but DSP-TOOLS did not get a confirmation from the DSP-API, "
    "the resource will remain in the pending upload list.\n"
    "If this flag is set, the first resource in the list is skipped."
)
# action="store_true" turns the option into a value-less boolean switch (False if omitted).
# Without an explicit action, argparse uses "store", which would require a value after the flag
# and hand a string (or None) to resume_xmlupload() instead of a bool.
subparser.add_argument("--skip-first-resource", action="store_true", help=skip_text)


def _add_get(
Expand Down
30 changes: 24 additions & 6 deletions src/dsp_tools/commands/resume_xmlupload/resume_xmlupload.py
@@ -1,4 +1,9 @@
import pickle
import sys
from copy import deepcopy
from dataclasses import replace

from termcolor import colored

from dsp_tools.commands.xmlupload.iri_resolver import IriResolver
from dsp_tools.commands.xmlupload.list_client import ListClient
Expand All @@ -16,12 +21,7 @@
logger = get_logger(__name__)


def resume_xmlupload(
server: str,
user: str,
password: str,
sipi: str,
) -> bool:
def resume_xmlupload(server: str, user: str, password: str, sipi: str, skip_first_resource: bool = False) -> bool:
"""
Resume an interrupted xmlupload.
Expand All @@ -30,12 +30,30 @@ def resume_xmlupload(
user: the user (e-mail) with which the data should be imported
password: the password of the user with which the data should be imported
sipi: the sipi instance to be used
skip_first_resource: if this flag is set, the first resource of the pending resources is removed
Returns:
True if all resources could be uploaded without errors; False if one of the resources could not be
uploaded because there is an error in it
"""
upload_state = _read_upload_state_from_disk(server)
if skip_first_resource:
if len(upload_state.pending_resources) > 0:
new_pending = deepcopy(upload_state.pending_resources)
new_pending.pop(0)
upload_state = replace(upload_state, pending_resources=new_pending)
else:
msg = (
"The list of pending resources is empty.\n"
"It is not yet possible to skip the first item of the stashed properties.\n"
"Do you want to continue with the upload of the stashed properties anyway? [y/n]"
)
resp = None
while resp not in ["y", "n"]:
resp = input(colored(msg, color="red"))
if resp == "n":
sys.exit(1)

previous_successful = len(upload_state.iri_resolver_lookup)
previous_failed = len(upload_state.failed_uploads)
previous_total = previous_successful + previous_failed
Expand Down
17 changes: 17 additions & 0 deletions src/dsp_tools/commands/xmlupload/xmlupload.py
Expand Up @@ -32,6 +32,7 @@
from dsp_tools.commands.xmlupload.upload_config import UploadConfig
from dsp_tools.commands.xmlupload.write_diagnostic_info import write_id2iri_mapping
from dsp_tools.models.exceptions import BaseError
from dsp_tools.models.exceptions import PermanentConnectionError
from dsp_tools.models.exceptions import UserError
from dsp_tools.models.exceptions import XmlUploadInterruptedError
from dsp_tools.models.projectContext import ProjectContext
Expand Down Expand Up @@ -426,6 +427,17 @@ def _upload_resources(
# resource creation succeeded: update the iri_resolver and remove the resource from the list
iri, label = res
_tidy_up_resource_creation(iri, label, iri_resolver, resource, current_res, total_res) # type: ignore[arg-type]
except PermanentConnectionError:
msg = (
f"There was a timeout while trying to create resource '{resource.res_id}'.\n"
f"It is unclear if the resource '{resource.res_id}' was created successfully or not.\n"
f"Please check manually in the DSP-APP or DB.\n"
f"In case of successful creation, call 'resume-xmlupload' with the flag "
f"'--skip-first-resource' to prevent duplication.\n"
f"If not, a normal 'resume-xmlupload' can be started."
)
logger.error(msg)
raise XmlUploadInterruptedError(msg)
except BaseException as err:
if res and res[0]:
# creation succeeded, but during tidy up, a Keyboard Interrupt occurred. tidy up again before escalating
Expand Down Expand Up @@ -462,6 +474,11 @@ def _create_resource(
) -> tuple[str, str] | tuple[None, None]:
try:
return resource_create_client.create_resource(resource, bitstream_information)
except PermanentConnectionError as err:
# The following block catches all exceptions and handles them in a generic way.
# Because the calling function needs to know that this was a PermanentConnectionError, we need to catch and
# raise it here.
raise err
except Exception as err:
msg = f"{datetime.now()}: WARNING: Unable to create resource '{resource.label}' ({resource.res_id})"
if isinstance(err, BaseError):
Expand Down
11 changes: 8 additions & 3 deletions src/dsp_tools/utils/connection_live.py
Expand Up @@ -268,9 +268,8 @@ def _try_network_action(self, params: RequestParameters) -> Response:
try:
self._log_request(params)
response = action()
except (TimeoutError, ReadTimeout):
self._log_and_sleep(reason="Timeout Error", retry_counter=i, exc_info=True)
continue
except (TimeoutError, ReadTimeout) as err:
self._log_and_raise_timeouts(err)
except (ConnectionError, RequestException):
self._renew_session()
self._log_and_sleep(reason="Connection Error raised", retry_counter=i, exc_info=True)
Expand Down Expand Up @@ -311,6 +310,12 @@ def _log_and_sleep(self, reason: str, retry_counter: int, exc_info: bool) -> Non
logger.error(f"{msg} ({retry_counter=:})", exc_info=exc_info)
time.sleep(2**retry_counter)

def _log_and_raise_timeouts(self, error: TimeoutError | ReadTimeout) -> None:
    """
    Report a timeout on the console and in the log, then escalate it.

    Args:
        error: the timeout exception that was caught during the network action

    Raises:
        PermanentConnectionError: always, with the same message that was printed and logged
    """
    error_name = error.__class__.__name__
    message = f"A '{error_name}' occurred during the connection to the DSP server."
    timestamp = datetime.now()
    print(f"{timestamp}: {message}")
    logger.exception(message)
    # "from None" suppresses the implicit exception context on the newly raised error
    raise PermanentConnectionError(message) from None

def _log_response(self, response: Response) -> None:
dumpobj: dict[str, Any] = {
"status_code": response.status_code,
Expand Down
12 changes: 5 additions & 7 deletions test/unittests/utils/test_connection_live.py
Expand Up @@ -10,6 +10,7 @@
from unittest.mock import patch

import pytest
import regex
from requests import ReadTimeout
from requests import RequestException

Expand Down Expand Up @@ -260,18 +261,15 @@ def test_try_network_action() -> None:
def test_try_network_action_timeout_error(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.delenv("DSP_TOOLS_TESTING", raising=False) # in CI, this variable suppresses the retrying mechanism
con = ConnectionLive("http://example.com/")
responses = (TimeoutError(), TimeoutError(), ReadTimeout(), ReadTimeout(), Mock(status_code=200))
responses = (TimeoutError(), ReadTimeout(), Mock(status_code=200))
session_mock = SessionMock(responses)
con.session = session_mock # type: ignore[assignment]
con._log_request = Mock()
con._log_response = Mock()
params = RequestParameters(method="GET", url="http://example.com/", timeout=1)
with patch("dsp_tools.utils.connection_live.time.sleep") as sleep_mock:
response = con._try_network_action(params)
assert [x.args[0] for x in sleep_mock.call_args_list] == [1, 2, 4, 8]
assert [x.args[0] for x in con._log_request.call_args_list] == [params] * len(session_mock.responses)
con._log_response.assert_called_once_with(session_mock.responses[-1])
assert response == session_mock.responses[-1]
expected_msg = regex.escape("A 'TimeoutError' occurred during the connection to the DSP server.")
with pytest.raises(PermanentConnectionError, match=expected_msg):
con._try_network_action(params)


def test_try_network_action_connection_error(monkeypatch: pytest.MonkeyPatch) -> None:
Expand Down

0 comments on commit a53785f

Please sign in to comment.