fix(upload-files, fast-xmlupload): handle multiple pickle files (DEV-2500) #451

Merged 8 commits on Aug 7, 2023
12 changes: 9 additions & 3 deletions docs/internal/fast-xmlupload.md
@@ -86,6 +86,7 @@ e.g. `multimedia/dog.jpg` -> `tmp/0b/22/0b22570d-515f-4c3d-a6af-e42b458e7b2b.jp2
**In this case, you need to restart the command several times, until the exit code is 0.**
**Only then, all files are processed.**
**Unexpected errors result in exit code 1.**
**If this batch splitting happens, every run produces a new pickle file.**
Comment on lines 86 to +89
Collaborator:

Do we know what kind of a resource leak this is? Is it on the python side or on our side? Could it be fixed?

Collaborator Author:

Unfortunately, we have no idea. Christian, Vij, and I spent a lot of time figuring out what could be wrong, but no one knows. It's really annoying.

Collaborator:

too bad


You can orchestrate this with a shell script, e.g.:

@@ -102,18 +103,18 @@ if [ $exit_code -ne 0 ]; then
fi
```
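
For readers who prefer Python over a shell script, a rough equivalent of that retry loop could look like this (a sketch only; the exact `process-files` flags are placeholders, not taken from this page):

```python
# Re-run `dsp-tools process-files` until it exits with 0 (all files processed).
# According to the docs above, exit code 1 means an unexpected error, so we stop instead of retrying.
import subprocess
import sys

cmd = ["dsp-tools", "process-files", "--output-dir=tmp", "data.xml"]  # flags are placeholders
while True:
    exit_code = subprocess.run(cmd).returncode
    if exit_code == 0:
        break  # done: all batches are processed
    if exit_code == 1:
        sys.exit(1)  # unexpected error: do not retry
    # any other exit code: a batch was processed, but more remain, so run again
```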


## 3. `dsp-tools upload-files`

After all files are processed, the upload step can be started.


```bash
dsp-tools upload-files --pkl-file=processing_result_20230414_152810.pkl --processed-dir=tmp
dsp-tools upload-files --processed-dir=tmp
```

The following options are available:

- `-f` | `--pkl-file` (mandatory): path to the pickle file that was written by the processing step
- `-d` | `--processed-dir` (mandatory): path to the directory where the processed files are located
(same as `--output-dir` in the processing step)
- `-n` | `--nthreads` (optional, default 4): number of threads to use for uploading
@@ -122,6 +123,9 @@ The following options are available:
- `-u` | `--user` (optional, default: `root@example.com`): username (e-mail) used for authentication with the DSP-API
- `-p` | `--password` (optional, default: `test`): password used for authentication with the DSP-API

This command will collect all pickle files in the current working directory
that were created by the `process-files` command.
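
For illustration, the collection step boils down to a glob over the timestamped filenames; this mirrors the `get_pkl_files()` helper introduced in this PR (simplified sketch):

```python
# Every run of `process-files` writes a file named processing_result_<timestamp>.pkl;
# the upload step picks up all of them from the current working directory.
import glob
from pathlib import Path

pkl_files = [Path(p) for p in glob.glob("processing_result_*.pkl")]
if not pkl_files:
    raise RuntimeError("No pickle file found. Please run the processing step first.")
```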


## 4. `dsp-tools fast-xmlupload`

@@ -131,7 +135,9 @@ dsp-tools fast-xmlupload --pkl-file=processing_result_20230414_152810.pkl data.x

The following options are available:

- `-f` | `--pkl-file` (mandatory): path to the pickle file that was written by the processing step
- `-s` | `--server` (optional, default: `0.0.0.0:3333`): URL of the DSP server
- `-u` | `--user` (optional, default: `root@example.com`): username (e-mail) used for authentication with the DSP-API
- `-p` | `--password` (optional, default: `test`): password used for authentication with the DSP-API

This command will collect all pickle files in the current working directory
that were created by the `process-files` command.
4 changes: 0 additions & 4 deletions src/dsp_tools/dsp_tools.py
@@ -144,7 +144,6 @@ def _make_parser(
help="For internal use only: upload already processed files",
)
parser_upload_files.set_defaults(action="upload-files")
parser_upload_files.add_argument("-f", "--pkl-file", help="path to pickle file written by 'process-files'")
parser_upload_files.add_argument("-d", "--processed-dir", help="path to the directory with the processed files")
parser_upload_files.add_argument("-n", "--nthreads", type=int, default=4, help="number of threads to use")
parser_upload_files.add_argument("-s", "--server", default=default_dsp_api_url, help=dsp_server_text)
@@ -157,7 +156,6 @@ def _make_parser(
help="For internal use only: create resources with already uploaded files",
)
parser_fast_xmlupload_files.set_defaults(action="fast-xmlupload")
parser_fast_xmlupload_files.add_argument("-f", "--pkl-file", help="path to pickle file written by 'process-files'")
parser_fast_xmlupload_files.add_argument("-s", "--server", default=default_dsp_api_url, help=dsp_server_text)
parser_fast_xmlupload_files.add_argument("-u", "--user", default=root_user_email, help=username_text)
parser_fast_xmlupload_files.add_argument("-p", "--password", default=root_user_pw, help=password_text)
@@ -476,7 +474,6 @@ def _call_requested_action(args: argparse.Namespace) -> bool:
)
elif args.action == "upload-files":
success = upload_files(
pkl_file=args.pkl_file,
dir_with_processed_files=args.processed_dir,
nthreads=args.nthreads,
user=args.user,
@@ -487,7 +484,6 @@ def _call_requested_action(args: argparse.Namespace) -> bool:
elif args.action == "fast-xmlupload":
success = fast_xmlupload(
xml_file=args.xml_file,
pkl_file=args.pkl_file,
user=args.user,
password=args.password,
dsp_url=args.server,
8 changes: 7 additions & 1 deletion src/dsp_tools/fast_xmlupload/process_files.py
@@ -6,6 +6,7 @@
import shutil
import subprocess
import sys
from time import sleep
import uuid
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
@@ -144,7 +145,12 @@ def _write_result_to_pkl_file(processed_files: list[tuple[Path, Optional[Path]]]
Raises:
UserError if the file could not be written
"""
filename = f"processing_result_{datetime.now().strftime('%Y%m%d_%H%M%S')}.pkl"
filename = Path(f"processing_result_{datetime.now().strftime('%Y%m%d_%H%M%S')}.pkl")
while filename.is_file():
logger.warning(f"The file {filename} already exists. Trying again in 1 second...")
sleep(1)
filename = Path(f"processing_result_{datetime.now().strftime('%Y%m%d_%H%M%S')}.pkl")
Collaborator:

Is there a particular reason for not adding more date precision, or not using a unique identifier (like a UUID) in the first place?

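A minimal sketch of that suggestion: append a UUID so the filename is unique on the first try, with no sleep/retry loop (not part of this PR; `uuid` is already imported at the top of this module):

```python
import uuid
from datetime import datetime
from pathlib import Path

# Unique on the first attempt, and still matched by the
# "processing_result_*.pkl" glob that the upload step uses.
filename = Path(f"processing_result_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:8]}.pkl")
```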

try:
with open(filename, "wb") as pkl_file:
pickle.dump(processed_files, pkl_file)
129 changes: 65 additions & 64 deletions src/dsp_tools/fast_xmlupload/upload_files.py
@@ -9,13 +9,74 @@
from regex import regex

from dsp_tools.models.connection import Connection
from dsp_tools.models.exceptions import BaseError
from dsp_tools.models.exceptions import UserError
from dsp_tools.utils.logging import get_logger
from dsp_tools.utils.shared import login

logger = get_logger(__name__)


def _check_processed_dir(dir_with_processed_files: str) -> Path:
"""
Checks the input parameter provided by the user and transforms it into a Path.

Args:
dir_with_processed_files: the directory where the processed files have been written to

Raises:
UserError: if the directory does not exist

Returns:
Path object of the directory
"""
dir_with_processed_files_path = Path(dir_with_processed_files)
if not dir_with_processed_files_path.is_dir():
raise UserError(f"The folder with the processed files is invalid: {dir_with_processed_files}")
return dir_with_processed_files_path


def get_pkl_files() -> list[Path]:
"""
Get all pickle files starting with "processing_result_" in the current working directory.

Raises:
UserError: If no pickle file was found

Returns:
list of pickle files
"""
pkl_file_paths = [Path(x) for x in glob.glob("processing_result_*.pkl")]
if len(pkl_file_paths) == 0:
raise UserError("No pickle file found. Please run the processing step first.")
return pkl_file_paths


def _get_paths_from_pkl_files(pkl_files: list[Path]) -> list[Path]:
"""
Read the pickle file(s) returned by the processing step.

Args:
pkl_files: pickle file(s) returned by the processing step

Returns:
list of uuid file paths
Collaborator:

what is a "uuid file path"?

"""
orig_paths_2_processed_paths: list[tuple[Path, Optional[Path]]] = []
for pkl_file in pkl_files:
with open(pkl_file, "rb") as file:
orig_paths_2_processed_paths.extend(pickle.load(file))

processed_paths: list[Path] = []
for orig_path, processed_path in orig_paths_2_processed_paths:
if processed_path:
processed_paths.append(processed_path)
else:
print(f"{datetime.now()}: WARNING: There is no processed file for {orig_path}")
logger.warning(f"There is no processed file for {orig_path}")

return processed_paths
Collaborator:

This method is overly complicated if you ask me: the first loop only creates an iterable to be iterated in the second loop, so they could be combined into one loop. Also, reading a pickle from a Path is a one-liner: `pickle.loads(path.read_bytes())`

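A rough sketch of the suggested simplification: one pass over all pickle files, each read with the `pickle.loads(path.read_bytes())` one-liner (imports shown for completeness; `logger` is assumed to be the module-level logger created via `get_logger(__name__)`):

```python
import pickle
from pathlib import Path
from typing import Optional


def _get_paths_from_pkl_files(pkl_files: list[Path]) -> list[Path]:
    processed_paths: list[Path] = []
    for pkl_file in pkl_files:
        # read each pickle in one line, no explicit open()/pickle.load() needed
        pairs: list[tuple[Path, Optional[Path]]] = pickle.loads(pkl_file.read_bytes())
        for orig_path, processed_path in pairs:
            if processed_path:
                processed_paths.append(processed_path)
            else:
                logger.warning(f"There is no processed file for {orig_path}")
    return processed_paths
```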


def _get_upload_candidates(
dir_with_processed_files: Path,
internal_filename_of_processed_file: Path,
@@ -189,57 +250,6 @@ def _upload_file(
return internal_filename_of_processed_file, True


def _get_paths_from_pkl_file(pkl_file: Path) -> list[Path]:
"""
Read the pickle file returned by the processing step.

Args:
pkl_file: pickle file returned by the processing step

Returns:
list of uuid file paths
"""
with open(pkl_file, "rb") as file:
orig_paths_2_processed_paths: list[tuple[Path, Optional[Path]]] = pickle.load(file)

processed_paths: list[Path] = []
for orig_processed in orig_paths_2_processed_paths:
if orig_processed[1]:
processed_paths.append(orig_processed[1])
else:
print(f"{datetime.now()}: WARNING: There is no processed file for {orig_processed[0]}")
logger.warning(f"There is no processed file for {orig_processed[0]}")

return processed_paths


def _check_params(
pkl_file: str,
dir_with_processed_files: str,
) -> Optional[tuple[Path, Path]]:
"""
Checks the input parameters provided by the user and transforms them into the expected types.

Args:
pkl_file: the XML file the paths are extracted from
processed_dir: the output directory where the created files should be written to

Returns:
A tuple with the Path objects of the input strings
"""
pkl_file_path = Path(pkl_file)
dir_with_processed_files_path = Path(dir_with_processed_files)

if not pkl_file_path.is_file():
print(f"{pkl_file} is not a file")
return None
if not dir_with_processed_files_path.is_dir():
print(f"{dir_with_processed_files} is not a directory")
return None

return pkl_file_path, dir_with_processed_files_path


def _upload_files_in_parallel(
dir_with_processed_files: Path,
internal_filenames_of_processed_files: list[Path],
@@ -313,7 +323,6 @@ def _check_if_all_files_were_uploaded(


def upload_files(
pkl_file: str,
dir_with_processed_files: str,
nthreads: int,
user: str,
@@ -326,7 +335,6 @@
Before using this method, the files must be processed by the processing step.

Args:
pkl_file: pickle file containing the mapping between the original files and the processed files,
e.g. Path('multimedia/nested/subfolder/test.tif'), Path('tmp/0b/22/0b22570d-515f-4c3d-a6af-e42b458e7b2b.jp2').
dir_with_processed_files: path to the directory where the processed files are located
nthreads: number of threads to use for uploading (optimum depends on the number of CPUs on the server)
@@ -338,18 +346,11 @@
Returns:
success status
"""
# check the input parameters
param_check_result = _check_params(
pkl_file=pkl_file,
dir_with_processed_files=dir_with_processed_files,
)
if param_check_result:
pkl_file_path, dir_with_processed_files_path = param_check_result
else:
raise BaseError("Error reading the input parameters. Please check them.")
dir_with_processed_files_path = _check_processed_dir(dir_with_processed_files)
pkl_file_paths = get_pkl_files()

# read paths from pkl file
internal_filenames_of_processed_files = _get_paths_from_pkl_file(pkl_file=pkl_file_path)
internal_filenames_of_processed_files = _get_paths_from_pkl_files(pkl_files=pkl_file_paths)
print(f"{datetime.now()}: Found {len(internal_filenames_of_processed_files)} files to upload...")
logger.info(f"Found {len(internal_filenames_of_processed_files)} files to upload...")

43 changes: 23 additions & 20 deletions src/dsp_tools/fast_xmlupload/upload_xml.py
@@ -1,36 +1,43 @@
import pickle
from datetime import datetime
from pathlib import Path
from typing import Optional
from typing import Optional, cast

from lxml import etree

from dsp_tools.models.exceptions import BaseError
from dsp_tools.models.exceptions import UserError
from dsp_tools.utils.logging import get_logger
from dsp_tools.utils.xml_upload import xml_upload

from dsp_tools.fast_xmlupload.upload_files import get_pkl_files

logger = get_logger(__name__)


def _get_paths_from_pkl_file(pkl_file: Path) -> dict[str, str]:
def _get_paths_from_pkl_files(pkl_files: list[Path]) -> dict[str, str]:
"""
Read the pickle file returned by the processing step.
Read the pickle file(s) returned by the processing step.

Args:
pkl_file: pickle file returned by the processing step
pkl_files: pickle file(s) returned by the processing step

Raises:
UserError: If for a file, no derivative was found

Returns:
dict of original paths to uuid filenames
"""
with open(pkl_file, "rb") as file:
orig_path_2_processed_path: list[tuple[Path, Optional[Path]]] = pickle.load(file)
orig_path_2_processed_path: list[tuple[Path, Optional[Path]]] = []
for pkl_file in pkl_files:
with open(pkl_file, "rb") as file:
orig_path_2_processed_path.extend(pickle.load(file))
Collaborator:

as above. Also, there seems to be a lot of logic duplicated here

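One way the duplication could be factored out (a hypothetical shared helper, not part of this PR): a single loader merges the pairs from all pickle files, and `upload_files.py` and `upload_xml.py` each post-process the result into a list or a dict.

```python
import pickle
from pathlib import Path
from typing import Optional


def load_processing_results(pkl_files: list[Path]) -> list[tuple[Path, Optional[Path]]]:
    """Merge the (original path, processed path) pairs from all pickle files."""
    pairs: list[tuple[Path, Optional[Path]]] = []
    for pkl_file in pkl_files:
        pairs.extend(pickle.loads(pkl_file.read_bytes()))
    return pairs
```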

orig_path_2_uuid_filename: dict[str, str] = {}
for orig_path, processed_path in orig_path_2_processed_path:
if processed_path:
orig_path_2_uuid_filename[str(orig_path)] = str(processed_path.name)
else:
raise BaseError(
raise UserError(
Comment on lines -33 to +39
Collaborator:

Isn't the idea of the UserError that it's thrown when something is the user's fault? Is that really the case here?

f"There is no processed file for {orig_path}. The fast xmlupload cannot be started, "
"because the resource that uses this file would fail."
)
@@ -43,14 +50,14 @@ def replace_bitstream_paths(
orig_path_2_uuid_filename: dict[str, str],
) -> "etree._ElementTree[etree._Element]":
"""
Replace the original filepaths in the <bitstream> Tags by the uuid filenames of the processed files.
Replace the original filepaths in the <bitstream> tags by the uuid filenames of the processed files.

Args:
xml_tree: The parsed original XML tree
orig_path_2_uuid_filename: Mapping from original filenames to uuid filenames (from the pickle file)

Raises:
BaseError: If for a file, no derivative was found
UserError: If for a file, no derivative was found

Returns:
The XML tree with the replaced filepaths (modified in place)
@@ -60,20 +67,16 @@
if elem.text in orig_path_2_uuid_filename:
elem.text = orig_path_2_uuid_filename[elem.text]
else:
res_id = ""
res = elem.getparent()
if res:
res_id = f"Resource {res.attrib['id']}: "
raise BaseError(
f"{res_id}Cannot find processed derivatives for {elem.text}. The fast xmlupload cannot be started, "
"because the resource that uses this file would fail."
res_id = cast("etree._Element", elem.getparent()).attrib.get("id")
raise UserError(
f"Resource {res_id}: Cannot find processed derivatives for {elem.text}. "
"The fast xmlupload cannot be started, because the resource that uses this file would fail."
)
return xml_tree


def fast_xmlupload(
xml_file: str,
pkl_file: str,
user: str,
password: str,
dsp_url: str,
@@ -89,7 +92,6 @@

Args:
xml_file: path to XML file containing the resources
pkl_file: pickle file containing the mapping between the original files and the processed files,
e.g. Path('multimedia/nested/subfolder/test.tif'), Path('tmp/0b/22/0b22570d-515f-4c3d-a6af-e42b458e7b2b.jp2')
user: the user's e-mail for login into DSP
password: the user's password for login into DSP
@@ -100,7 +102,8 @@
success status
"""
xml_tree_orig = etree.parse(xml_file)
orig_path_2_uuid_filename = _get_paths_from_pkl_file(pkl_file=Path(pkl_file))
pkl_files = get_pkl_files()
orig_path_2_uuid_filename = _get_paths_from_pkl_files(pkl_files)
xml_tree_replaced = replace_bitstream_paths(
xml_tree=xml_tree_orig,
orig_path_2_uuid_filename=orig_path_2_uuid_filename,