Skip to content

Commit

Permalink
fix(upload-files, fast-xmlupload): handle multiple pickle files (DEV-…
Browse files Browse the repository at this point in the history
  • Loading branch information
jnussbaum committed Aug 7, 2023
1 parent 5e668f0 commit 98f0b97
Show file tree
Hide file tree
Showing 7 changed files with 149 additions and 288 deletions.
12 changes: 9 additions & 3 deletions docs/internal/fast-xmlupload.md
Expand Up @@ -86,6 +86,7 @@ e.g. `multimedia/dog.jpg` -> `tmp/0b/22/0b22570d-515f-4c3d-a6af-e42b458e7b2b.jp2
**In this case, you need to restart the command several times, until the exit code is 0.**
**Only then, all files are processed.**
**Unexpected errors result in exit code 1.**
**If this batch splitting happens, every run produces a new pickle file.**

You can orchestrate this with a shell script, e.g.:

Expand All @@ -102,18 +103,18 @@ if [ $exit_code -ne 0 ]; then
fi
```


## 3. `dsp-tools upload-files`

After all files are processed, the upload step can be started.


```bash
dsp-tools upload-files --pkl-file=processing_result_20230414_152810.pkl --processed-dir=tmp
dsp-tools upload-files --processed-dir=tmp
```

The following options are available:

- `-f` | `--pkl-file` (mandatory): path to the pickle file that was written by the processing step
- `-d` | `--processed-dir` (mandatory): path to the directory where the processed files are located
(same as `--output-dir` in the processing step)
- `-n` | `--nthreads` (optional, default 4): number of threads to use for uploading
Expand All @@ -122,6 +123,9 @@ The following options are available:
- `-u` | `--user` (optional, default: `root@example.com`): username (e-mail) used for authentication with the DSP-API
- `-p` | `--password` (optional, default: `test`): password used for authentication with the DSP-API

This command will collect all pickle files in the current working directory
that were created by the `process-files` command.


## 4. `dsp-tools fast-xmlupload`

Expand All @@ -131,7 +135,9 @@ dsp-tools fast-xmlupload --pkl-file=processing_result_20230414_152810.pkl data.x

The following options are available:

- `-f` | `--pkl-file` (mandatory): path to the pickle file that was written by the processing step
- `-s` | `--server` (optional, default: `0.0.0.0:3333`): URL of the DSP server
- `-u` | `--user` (optional, default: `root@example.com`): username (e-mail) used for authentication with the DSP-API
- `-p` | `--password` (optional, default: `test`): password used for authentication with the DSP-API

This command will collect all pickle files in the current working directory
that were created by the `process-files` command.
4 changes: 0 additions & 4 deletions src/dsp_tools/dsp_tools.py
Expand Up @@ -144,7 +144,6 @@ def _make_parser(
help="For internal use only: upload already processed files",
)
parser_upload_files.set_defaults(action="upload-files")
parser_upload_files.add_argument("-f", "--pkl-file", help="path to pickle file written by 'process-files'")
parser_upload_files.add_argument("-d", "--processed-dir", help="path to the directory with the processed files")
parser_upload_files.add_argument("-n", "--nthreads", type=int, default=4, help="number of threads to use")
parser_upload_files.add_argument("-s", "--server", default=default_dsp_api_url, help=dsp_server_text)
Expand All @@ -157,7 +156,6 @@ def _make_parser(
help="For internal use only: create resources with already uploaded files",
)
parser_fast_xmlupload_files.set_defaults(action="fast-xmlupload")
parser_fast_xmlupload_files.add_argument("-f", "--pkl-file", help="path to pickle file written by 'process-files'")
parser_fast_xmlupload_files.add_argument("-s", "--server", default=default_dsp_api_url, help=dsp_server_text)
parser_fast_xmlupload_files.add_argument("-u", "--user", default=root_user_email, help=username_text)
parser_fast_xmlupload_files.add_argument("-p", "--password", default=root_user_pw, help=password_text)
Expand Down Expand Up @@ -476,7 +474,6 @@ def _call_requested_action(args: argparse.Namespace) -> bool:
)
elif args.action == "upload-files":
success = upload_files(
pkl_file=args.pkl_file,
dir_with_processed_files=args.processed_dir,
nthreads=args.nthreads,
user=args.user,
Expand All @@ -487,7 +484,6 @@ def _call_requested_action(args: argparse.Namespace) -> bool:
elif args.action == "fast-xmlupload":
success = fast_xmlupload(
xml_file=args.xml_file,
pkl_file=args.pkl_file,
user=args.user,
password=args.password,
dsp_url=args.server,
Expand Down
3 changes: 2 additions & 1 deletion src/dsp_tools/fast_xmlupload/process_files.py
Expand Up @@ -144,7 +144,8 @@ def _write_result_to_pkl_file(processed_files: list[tuple[Path, Optional[Path]]]
Raises:
UserError if the file could not be written
"""
filename = f"processing_result_{datetime.now().strftime('%Y%m%d_%H%M%S')}.pkl"
filename = Path(f"processing_result_{datetime.now().strftime('%Y-%m-%d_%H.%M.%S.%f')}.pkl")

try:
with open(filename, "wb") as pkl_file:
pickle.dump(processed_files, pkl_file)
Expand Down
128 changes: 64 additions & 64 deletions src/dsp_tools/fast_xmlupload/upload_files.py
Expand Up @@ -9,13 +9,73 @@
from regex import regex

from dsp_tools.models.connection import Connection
from dsp_tools.models.exceptions import BaseError
from dsp_tools.models.exceptions import UserError
from dsp_tools.utils.logging import get_logger
from dsp_tools.utils.shared import login

logger = get_logger(__name__)


def _check_processed_dir(dir_with_processed_files: str) -> Path:
"""
Checks the input parameter provided by the user and transforms it into a Path.
Args:
processed_dir: the directory where the processed files have been written to
Raises:
UserError: if the directory does not exist
Returns:
Path object of the directory
"""
dir_with_processed_files_path = Path(dir_with_processed_files)
if not dir_with_processed_files_path.is_dir():
raise UserError(f"The folder with the processed files is invalid: {dir_with_processed_files}")
return dir_with_processed_files_path


def get_pkl_files() -> list[Path]:
    """
    Collect all pickle files in the current working directory
    whose names start with "processing_result_".

    Raises:
        UserError: If no pickle file was found

    Returns:
        list of pickle files
    """
    found = [Path(match) for match in glob.glob("processing_result_*.pkl")]
    if not found:
        raise UserError("No pickle file found. Please run the processing step first.")
    return found


def _get_paths_from_pkl_files(pkl_files: list[Path]) -> list[Path]:
"""
Read the pickle file(s) returned by the processing step.
Args:
pkl_files: pickle file(s) returned by the processing step
Returns:
list of file paths of the processed files (uuid filenames)
"""
orig_paths_2_processed_paths: list[tuple[Path, Optional[Path]]] = []
for pkl_file in pkl_files:
orig_paths_2_processed_paths.extend(pickle.loads(pkl_file.read_bytes()))

processed_paths: list[Path] = []
for orig_path, processed_path in orig_paths_2_processed_paths:
if processed_path:
processed_paths.append(processed_path)
else:
print(f"{datetime.now()}: WARNING: There is no processed file for {orig_path}")
logger.warning(f"There is no processed file for {orig_path}")

return processed_paths


def _get_upload_candidates(
dir_with_processed_files: Path,
internal_filename_of_processed_file: Path,
Expand Down Expand Up @@ -189,57 +249,6 @@ def _upload_file(
return internal_filename_of_processed_file, True


def _get_paths_from_pkl_file(pkl_file: Path) -> list[Path]:
"""
Read the pickle file returned by the processing step.
Args:
pkl_file: pickle file returned by the processing step
Returns:
list of uuid file paths
"""
with open(pkl_file, "rb") as file:
orig_paths_2_processed_paths: list[tuple[Path, Optional[Path]]] = pickle.load(file)

processed_paths: list[Path] = []
for orig_processed in orig_paths_2_processed_paths:
if orig_processed[1]:
processed_paths.append(orig_processed[1])
else:
print(f"{datetime.now()}: WARNING: There is no processed file for {orig_processed[0]}")
logger.warning(f"There is no processed file for {orig_processed[0]}")

return processed_paths


def _check_params(
pkl_file: str,
dir_with_processed_files: str,
) -> Optional[tuple[Path, Path]]:
"""
Checks the input parameters provided by the user and transforms them into the expected types.
Args:
pkl_file: the XML file the paths are extracted from
processed_dir: the output directory where the created files should be written to
Returns:
A tuple with the Path objects of the input strings
"""
pkl_file_path = Path(pkl_file)
dir_with_processed_files_path = Path(dir_with_processed_files)

if not pkl_file_path.is_file():
print(f"{pkl_file} is not a file")
return None
if not dir_with_processed_files_path.is_dir():
print(f"{dir_with_processed_files} is not a directory")
return None

return pkl_file_path, dir_with_processed_files_path


def _upload_files_in_parallel(
dir_with_processed_files: Path,
internal_filenames_of_processed_files: list[Path],
Expand Down Expand Up @@ -313,7 +322,6 @@ def _check_if_all_files_were_uploaded(


def upload_files(
pkl_file: str,
dir_with_processed_files: str,
nthreads: int,
user: str,
Expand All @@ -326,7 +334,6 @@ def upload_files(
Before using this method, the files must be processed by the processing step.
Args:
pkl_file: pickle file containing the mapping between the original files and the processed files,
e.g. Path('multimedia/nested/subfolder/test.tif'), Path('tmp/0b/22/0b22570d-515f-4c3d-a6af-e42b458e7b2b.jp2').
dir_with_processed_files: path to the directory where the processed files are located
nthreads: number of threads to use for uploading (optimum depends on the number of CPUs on the server)
Expand All @@ -338,18 +345,11 @@ def upload_files(
Returns:
success status
"""
# check the input parameters
param_check_result = _check_params(
pkl_file=pkl_file,
dir_with_processed_files=dir_with_processed_files,
)
if param_check_result:
pkl_file_path, dir_with_processed_files_path = param_check_result
else:
raise BaseError("Error reading the input parameters. Please check them.")
dir_with_processed_files_path = _check_processed_dir(dir_with_processed_files)
pkl_file_paths = get_pkl_files()

# read paths from pkl file
internal_filenames_of_processed_files = _get_paths_from_pkl_file(pkl_file=pkl_file_path)
internal_filenames_of_processed_files = _get_paths_from_pkl_files(pkl_files=pkl_file_paths)
print(f"{datetime.now()}: Found {len(internal_filenames_of_processed_files)} files to upload...")
logger.info(f"Found {len(internal_filenames_of_processed_files)} files to upload...")

Expand Down
42 changes: 22 additions & 20 deletions src/dsp_tools/fast_xmlupload/upload_xml.py
@@ -1,36 +1,42 @@
import pickle
from datetime import datetime
from pathlib import Path
from typing import Optional
from typing import Optional, cast

from lxml import etree

from dsp_tools.models.exceptions import BaseError
from dsp_tools.models.exceptions import UserError
from dsp_tools.utils.logging import get_logger
from dsp_tools.utils.xml_upload import xml_upload

from dsp_tools.fast_xmlupload.upload_files import get_pkl_files

logger = get_logger(__name__)


def _get_paths_from_pkl_file(pkl_file: Path) -> dict[str, str]:
def _get_paths_from_pkl_files(pkl_files: list[Path]) -> dict[str, str]:
"""
Read the pickle file returned by the processing step.
Read the pickle file(s) returned by the processing step.
Args:
pkl_file: pickle file returned by the processing step
pkl_files: pickle file(s) returned by the processing step
Raises:
UserError: If for a file, no derivative was found
Returns:
dict of original paths to uuid filenames
"""
with open(pkl_file, "rb") as file:
orig_path_2_processed_path: list[tuple[Path, Optional[Path]]] = pickle.load(file)
orig_path_2_processed_path: list[tuple[Path, Optional[Path]]] = []
for pkl_file in pkl_files:
orig_path_2_processed_path.extend(pickle.loads(pkl_file.read_bytes()))

orig_path_2_uuid_filename: dict[str, str] = {}
for orig_path, processed_path in orig_path_2_processed_path:
if processed_path:
orig_path_2_uuid_filename[str(orig_path)] = str(processed_path.name)
else:
raise BaseError(
raise UserError(
f"There is no processed file for {orig_path}. The fast xmlupload cannot be started, "
"because the resource that uses this file would fail."
)
Expand All @@ -43,14 +49,14 @@ def replace_bitstream_paths(
orig_path_2_uuid_filename: dict[str, str],
) -> "etree._ElementTree[etree._Element]":
"""
Replace the original filepaths in the <bitstream> Tags by the uuid filenames of the processed files.
Replace the original filepaths in the <bitstream> tags with the uuid filenames of the processed files.
Args:
xml_tree: The parsed original XML tree
orig_path_2_uuid_filename: Mapping from original filenames to uuid filenames (from the pickle file)
Raises:
BaseError: If for a file, no derivative was found
UserError: If for a file, no derivative was found
Returns:
The XML tree with the replaced filepaths (modified in place)
Expand All @@ -60,20 +66,16 @@ def replace_bitstream_paths(
if elem.text in orig_path_2_uuid_filename:
elem.text = orig_path_2_uuid_filename[elem.text]
else:
res_id = ""
res = elem.getparent()
if res:
res_id = f"Resource {res.attrib['id']}: "
raise BaseError(
f"{res_id}Cannot find processed derivatives for {elem.text}. The fast xmlupload cannot be started, "
"because the resource that uses this file would fail."
res_id = cast("etree._Element", elem.getparent()).attrib.get("id")
raise UserError(
f"Resource {res_id}: Cannot find processed derivatives for {elem.text}. "
"The fast xmlupload cannot be started, because the resource that uses this file would fail."
)
return xml_tree


def fast_xmlupload(
xml_file: str,
pkl_file: str,
user: str,
password: str,
dsp_url: str,
Expand All @@ -89,7 +91,6 @@ def fast_xmlupload(
Args:
xml_file: path to XML file containing the resources
pkl_file: pickle file containing the mapping between the original files and the processed files,
e.g. Path('multimedia/nested/subfolder/test.tif'), Path('tmp/0b/22/0b22570d-515f-4c3d-a6af-e42b458e7b2b.jp2')
user: the user's e-mail for login into DSP
password: the user's password for login into DSP
Expand All @@ -100,7 +101,8 @@ def fast_xmlupload(
success status
"""
xml_tree_orig = etree.parse(xml_file)
orig_path_2_uuid_filename = _get_paths_from_pkl_file(pkl_file=Path(pkl_file))
pkl_files = get_pkl_files()
orig_path_2_uuid_filename = _get_paths_from_pkl_files(pkl_files)
xml_tree_replaced = replace_bitstream_paths(
xml_tree=xml_tree_orig,
orig_path_2_uuid_filename=orig_path_2_uuid_filename,
Expand Down

0 comments on commit 98f0b97

Please sign in to comment.