diff --git a/docs/cli-commands.md b/docs/cli-commands.md
index ebd8eb156..df088e46a 100644
--- a/docs/cli-commands.md
+++ b/docs/cli-commands.md
@@ -312,3 +312,26 @@ dsp-tools rosetta
 ```
 
 A DSP stack must be running before executing this command.
+
+
+
+## `process-files`
+
+DaSCH-internal command to process multimedia files locally,
+before uploading them to a DSP server.
+See [here](./internal/fast-xmlupload.md) for more information.
+
+
+
+## `upload-files`
+
+DaSCH-internal command to upload processed multimedia files to a DSP server.
+See [here](./internal/fast-xmlupload.md) for more information.
+
+
+
+## `fast-xmlupload`
+
+DaSCH-internal command to create the resources of an XML file
+after the multimedia files have been processed and uploaded.
+See [here](./internal/fast-xmlupload.md) for more information.
diff --git a/docs/internal/fast-xmlupload.md b/docs/internal/fast-xmlupload.md
index 4175e547c..5e848eb9b 100644
--- a/docs/internal/fast-xmlupload.md
+++ b/docs/internal/fast-xmlupload.md
@@ -62,19 +62,45 @@ The following options are available:
 - `--output-dir` (mandatory): path to the output directory where the processed/transformed files should be written to
 - `--nthreads` (optional, default computed by the concurrent library, dependent on the machine): number of threads to use for processing
+- `--batchsize` (optional, default 5000): number of files to process in one batch
 
 All files referenced in the `<bitstream>` tags of the XML are expected to be in the input directory
 which is provided with the `--input-dir` option.
+
 The processed files (derivative, .orig file, sidecar file, as well as the preview file for movies)
 will be stored in the given `--output-dir` directory.
 If the output directory doesn't exist, it will be created automatically.
+
 Additionally to the output directory,
 a pickle file is written with the name `processing_result_[timestamp].pkl`.
 It contains a mapping from the original files to the processed files,
-e.g. "multimedia/dog.jpg" -> "tmp/0b/22/0b22570d-515f-4c3d-a6af-e42b458e7b2b.jp2".
+e.g. `multimedia/dog.jpg` -> `tmp/0b/22/0b22570d-515f-4c3d-a6af-e42b458e7b2b.jp2`.
+
+
+### Important note
+**Due to a resource leak, the Python process must be restarted after a certain time.**
+**For big datasets, only one batch of files is processed per run; then Python exits with exit code 2.**
+**In this case, you need to rerun the command until it exits with code 0.**
+**Only then have all files been processed.**
+**Unexpected errors result in exit code 1.**
+
+You can orchestrate this with a shell script, e.g.:
+
+```bash
+exit_code=2
+while [ $exit_code -eq 2 ]; do
+    dsp-tools process-files --input-dir=multimedia --output-dir=tmp data.xml
+    exit_code=$?
+done
+
+if [ $exit_code -ne 0 ]; then
+    echo "Error: exit code $exit_code"
+    exit $exit_code
+fi
+```
 
 
 ## 3.
`dsp-tools upload-files` diff --git a/src/dsp_tools/dsp_tools.py b/src/dsp_tools/dsp_tools.py index 3cf27cc8e..ea9f64e3d 100644 --- a/src/dsp_tools/dsp_tools.py +++ b/src/dsp_tools/dsp_tools.py @@ -140,6 +140,9 @@ def make_parser() -> argparse.ArgumentParser: "--output-dir", help="path to the output directory where the processed/transformed files should be written to" ) parser_process_files.add_argument("--nthreads", type=int, default=None, help="number of threads to use") + parser_process_files.add_argument( + "--batchsize", type=int, default=5000, help="number of files to process before Python exits" + ) parser_process_files.add_argument("xml_file", help="path to XML file containing the data") # upload-files @@ -376,13 +379,13 @@ def call_requested_action( save_metrics=args.metrics, preprocessing_done=False, ) - elif args.action == "process-files": success = process_files( input_dir=args.input_dir, output_dir=args.output_dir, xml_file=args.xml_file, nthreads=args.nthreads, + batch_size=args.batchsize, ) elif args.action == "upload-files": success = upload_files( diff --git a/src/dsp_tools/fast_xmlupload/process_files.py b/src/dsp_tools/fast_xmlupload/process_files.py index 758d5be9e..e7867c247 100644 --- a/src/dsp_tools/fast_xmlupload/process_files.py +++ b/src/dsp_tools/fast_xmlupload/process_files.py @@ -17,10 +17,10 @@ from docker.models.containers import Container from lxml import etree -from dsp_tools.models.exceptions import BaseError +from dsp_tools.models.exceptions import UserError from dsp_tools.utils.logging import get_logger -logger = get_logger(__name__) +logger = get_logger(__name__, filesize_mb=100, backupcount=36) sipi_container: Optional[Container] = None export_moving_image_frames_script: Optional[Path] = None @@ -41,42 +41,56 @@ def _get_export_moving_image_frames_script() -> None: f.write(script_text) -def _check_if_all_files_were_processed( - result: list[tuple[Path, Optional[Path]]], - all_paths: list[Path], -) -> bool: +def _determine_success_status_and_exit_code( + files_to_process: list[Path], + processed_files: list[tuple[Path, Optional[Path]]], + is_last_batch: bool, +) -> tuple[bool, int]: """ - Go through the result list and print all files that could not be processed. + Based on the result of the file processing, + this function determines the success status and the exit code. + If some files of the current batch could not be processed, + the success status is false, and the exit code is 1. + If all files of the current batch were processed, the success status is true, + and the exit code is 0 if this is the last batch, + and 2 if there are more batches to process. Args: - result: list of tuples of Paths. If the second Path is None, the file could not be processed. - all_paths: list of all paths that should have been processed + files_to_process: list of all paths that should have been processed (current batch) + processed_files: list of tuples of Paths. If the second Path is None, the file could not be processed. 
+ is_last_batch: true if this is the last batch of files to process Returns: - success status + tuple (success status, exit_code) """ - processed_paths = [x[1] for x in result if x[1]] - if len(processed_paths) == len(all_paths): + processed_paths = [x[1] for x in processed_files if x and x[1]] + if len(processed_paths) == len(files_to_process): success = True - print(f"{datetime.now()}: Number of processed files: {len(result)}: Okay") - logger.info(f"Number of processed files: {len(result)}: Okay") + print(f"{datetime.now()}: All files ({len(files_to_process)}) of this batch were processed: Okay") + logger.info(f"All files ({len(files_to_process)}) of this batch were processed: Okay") + if is_last_batch: + exit_code = 0 + print(f"{datetime.now()}: All multimedia files referenced in the XML are processed. No more batches.") + logger.info("All multimedia files referenced in the XML are processed. No more batches.") + else: + exit_code = 2 else: success = False - ratio = f"{len(processed_paths)}/{len(all_paths)}" - msg = f"Some files could not be processed: Only {ratio} were processed. The failed ones are:" + ratio = f"{len(processed_paths)}/{len(files_to_process)}" + msg = f"Some files of this batch could not be processed: Only {ratio} were processed. The failed ones are:" print(f"{datetime.now()}: ERROR: {msg}") logger.error(msg) + for input_file, output_file in processed_files: + if not output_file: + print(f" - {input_file}") + logger.error(f" - {input_file}") + exit_code = 1 - for input_file, output_file in result: - if not output_file: - print(f" - {input_file}") - logger.error(f" - {input_file}") - - return success + return success, exit_code def _process_files_in_parallel( - paths: list[Path], + files_to_process: list[Path], input_dir: Path, output_dir: Path, nthreads: Optional[int], @@ -88,7 +102,7 @@ def _process_files_in_parallel( so that this function can be called again with the unprocessed files. Args: - paths: a list of all paths to the files that should be processed + files_to_process: a list of all paths to the files that should be processed input_dir: the root directory of the input files output_dir: the directory where the processed files should be written to nthreads: number of threads to use for processing @@ -100,7 +114,7 @@ def _process_files_in_parallel( (this list will only have content if a Docker API error led to a restart of the SIPI container) """ with ThreadPoolExecutor(max_workers=nthreads) as pool: - processing_jobs = [pool.submit(_process_file, input_file, input_dir, output_dir) for input_file in paths] + processing_jobs = [pool.submit(_process_file, f, input_dir, output_dir) for f in files_to_process] orig_filepath_2_uuid: list[tuple[Path, Optional[Path]]] = [] for processed in as_completed(processing_jobs): @@ -114,48 +128,50 @@ def _process_files_in_parallel( _stop_and_remove_sipi_container() _start_sipi_container_and_mount_volumes(input_dir, output_dir) processed_paths = [x[0] for x in orig_filepath_2_uuid] - unprocessed_paths = [x for x in paths if x not in processed_paths] + unprocessed_paths = [x for x in files_to_process if x not in processed_paths] return orig_filepath_2_uuid, unprocessed_paths return orig_filepath_2_uuid, [] -def _write_result_to_pkl_file(result: list[tuple[Path, Optional[Path]]]) -> bool: +def _write_result_to_pkl_file(processed_files: list[tuple[Path, Optional[Path]]]) -> None: """ - Writes the processing result to a pickle file. + Writes the processing result to a pickle file in the working directory. 
Args: - result: the result of the file processing + processed_files: the result of the file processing - Returns: - true if successful, false otherwise + Raises: + UserError if the file could not be written """ - filename = "processing_result_" + datetime.now().strftime("%Y%m%d_%H%M%S") + ".pkl" + filename = f"processing_result_{datetime.now().strftime('%Y%m%d_%H%M%S')}.pkl" try: with open(filename, "wb") as pkl_file: - pickle.dump(result, pkl_file) + pickle.dump(processed_files, pkl_file) print(f"{datetime.now()}: The result was written to: {filename}") - return True except OSError: - err_msg = f"An error occurred while writing the result to the pickle file. Content of file: {result}" - print(f"{datetime.now()}: {err_msg}") + err_msg = f"An error occurred while writing the result to the pickle file. Content of file: {processed_files}" logger.error(err_msg, exc_info=True) - return False + raise UserError(err_msg) from None -def _check_params( +def _check_input_params( input_dir: str, out_dir: str, xml_file: str, -) -> Optional[tuple[Path, Path, Path]]: +) -> tuple[Path, Path, Path]: """ - Checks the input parameters provided by the user and transforms them into the expected types. + Checks the input parameters provided by the user and transforms them into Path objects. + If the output directory doesn't exist, it is created. Args: input_dir: the root directory of the input files out_dir: the output directory where the created files should be written to xml_file: the XML file the paths are extracted from + Raises: + UserError: if one of the parameters is not valid + Returns: A tuple with the Path objects of the input strings """ @@ -163,18 +179,15 @@ def _check_params( out_dir_path = Path(out_dir) xml_file_path = Path(xml_file) - if not _ensure_directory_exists(out_dir_path): - return None + try: + out_dir_path.mkdir(parents=True, exist_ok=True) + except Exception: # pylint: disable=broad-exception-caught + raise UserError(f"Couldn't create directory {out_dir_path}") from None if not input_dir_path.is_dir(): - print("input_dir is not a directory") - return None - if not out_dir_path.is_dir(): - print("out_dir is not a directory") - return None + raise UserError("input_dir is not a directory") if not xml_file_path.is_file(): - print("xml_file is not a file") - return None + raise UserError("xml_file is not a file") return input_dir_path, out_dir_path, xml_file_path @@ -514,26 +527,6 @@ def _extract_preview_from_video(file: Path) -> bool: return True -def _ensure_directory_exists(path: Path) -> bool: - """ - Try to create the directory at the given path. - If the directory already exists, nothing happens. - - Args: - path: path to the directory that should be created - - Returns: - True if the directory exists or was created successfully, False if an error occurred during the creation. 
- """ - try: - path.mkdir(parents=True, exist_ok=True) - return True - except Exception: # pylint: disable=broad-exception-caught - print(f"{datetime.now()}: ERROR: Couldn't create directory {path}") - logger.error(f"Couldn't create directory {path}", exc_info=True) - return False - - def _process_file( in_file: Path, input_dir: Path, @@ -735,43 +728,175 @@ def _process_video_file( return in_file, converted_file_full_path +def _write_processed_and_unprocessed_files_to_txt_files( + all_files: list[Path], + processed_files: list[tuple[Path, Optional[Path]]], +) -> None: + """ + Determine the files that were processed until now + (taking into account a possibly existing file 'processed_files.txt') + and write them to 'processed_files.txt'. + Determine the files that were not processed until now, + and write them to 'unprocessed_files.txt' + (possibly overwriting an existing file). + + Args: + all_files: list of all paths that should be processed + processed_files: list of tuples (orig path, processed path). 2nd path is None if a file could not be processed. + """ + processed_original_paths = [x[0] for x in processed_files] + if Path("processed_files.txt").is_file(): + with open("processed_files.txt", "r", encoding="utf-8") as f: + previously_processed_files = [Path(x) for x in f.read().splitlines()] + processed_original_paths = processed_original_paths + previously_processed_files + + with open("processed_files.txt", "w", encoding="utf-8") as f: + f.write("\n".join([str(x) for x in processed_original_paths])) + msg = "Wrote 'processed_files.txt'" + + unprocessed_original_paths = [x for x in all_files if x not in processed_original_paths] + if unprocessed_original_paths: + with open("unprocessed_files.txt", "w", encoding="utf-8") as f: + f.write("\n".join([str(x) for x in unprocessed_original_paths])) + msg += " and 'unprocessed_files.txt'" + elif Path("unprocessed_files.txt").is_file(): + Path("unprocessed_files.txt").unlink() + msg += " and removed 'unprocessed_files.txt'" + + print(f"{datetime.now()}: {msg}") + logger.info(msg) + + def handle_interruption( - all_paths: list[Path], - processed_paths: list[Path], + all_files: list[Path], + processed_files: list[tuple[Path, Optional[Path]]], exception: BaseException, ) -> None: """ Handles an interruption of the processing. - Writes the processed and unprocessed files into two files, + Writes the pickle file, + and the txt files with the processed and unprocessed files, and exits the program with exit code 1. Args: - all_paths: list of all paths that should be processed - processed_paths: list of all paths that were processed successfully + all_files: list of all paths that should be processed + processed_files: list of tuples (orig path, processed path). 2nd path is None if a file could not be processed. exception: the exception that was raised """ - unprocessed_paths = [x for x in all_paths if x not in processed_paths] - with open("unprocessed_files.txt", "w", encoding="utf-8") as f: - f.write("\n".join([str(x) for x in unprocessed_paths])) - with open("processed_files.txt", "w", encoding="utf-8") as f: - f.write("\n".join([str(x) for x in processed_paths])) + msg = "ERROR while processing the files. Writing pickle file and human-readable txt files..." + print(f"{datetime.now()}: {msg}") + logger.error(msg, exc_info=exception) - msg = ( - "An error occurred while processing the files. " - "2 files were written, listing the processed and the unprocessed files:\n" - " - 'processed_files.txt'\n - 'unprocessed_files.txt'." 
+ _write_processed_and_unprocessed_files_to_txt_files( + all_files=all_files, + processed_files=processed_files, ) - print(f"{datetime.now()}: ERROR: {msg}. The exception was: {exception}") - logger.error(msg, exc_info=exception) + _write_result_to_pkl_file(processed_files) sys.exit(1) +def double_check_unprocessed_files( + all_files: list[Path], + processed_files: list[Path], + unprocessed_files: list[Path], +) -> None: + """ + Checks if the files in 'unprocessed_files.txt' are consistent with the files in 'processed_files.txt'. + + Args: + all_files: list of all paths in the tags of the XML file + processed_files: the paths from 'processed_files.txt' + unprocessed_files: the paths from 'unprocessed_files.txt' (or all_files if there is no such file) + + Raises: + UserError: if there is a file 'unprocessed_files.txt', but no file 'processed_files.txt' + UserError: if the files 'unprocessed_files.txt' and 'processed_files.txt' are inconsistent + """ + unprocessed_files_txt_exists = sorted(unprocessed_files) != sorted(all_files) + if unprocessed_files_txt_exists and not processed_files: + raise UserError("There is a file 'unprocessed_files.txt', but no file 'processed_files.txt'") + + if processed_files and sorted(unprocessed_files) == sorted(all_files): + raise UserError("There is a file 'processed_files.txt', but no file 'unprocessed_files.txt'") + + if unprocessed_files_txt_exists: + # there is a 'unprocessed_files.txt' file. check it for consistency + unprocessed_files_from_processed_files = [x for x in all_files if x not in processed_files] + if not sorted(unprocessed_files_from_processed_files) == sorted(unprocessed_files): + raise UserError("The files 'unprocessed_files.txt' and 'processed_files.txt' are inconsistent") + + +def _determine_next_batch( + all_files: list[Path], + batch_size: int, +) -> tuple[list[Path], bool]: + """ + Looks in the input directory for txt files that contain the already processed files and the still unprocessed files. + If no such files are found, this run of `dsp-tools process-files` is the first one. + In this case, the first batch_size files (or less if there are less) are returned. + If such files are found, the already processed files are read from them, + and the next batch_size files are returned. + If all files have been processed, an empty list is returned. 
+ + Args: + all_files: list of all paths in the tags of the XML file + batch_size: number of files to process before Python exits + + Raises: + UserError: if the files 'unprocessed_files.txt' and 'processed_files.txt' are inconsistent + + Returns: + - the next batch of up to batch_size files that should be processed + (or empty list if all files have been processed) + - a boolean indicating if this is the last batch + """ + # read the already processed files + if Path("processed_files.txt").is_file(): + with open("processed_files.txt", "r", encoding="utf-8") as f: + processed_files = [Path(x.strip()) for x in f.readlines()] + else: + processed_files = [] + + # read the still unprocessed files + if Path("unprocessed_files.txt").is_file(): + with open("unprocessed_files.txt", "r", encoding="utf-8") as f: + unprocessed_files = [Path(x.strip()) for x in f.readlines()] + else: + unprocessed_files = all_files + + # consistency check + double_check_unprocessed_files( + all_files=all_files, + processed_files=processed_files, + unprocessed_files=unprocessed_files, + ) + + # determine next batch + if len(unprocessed_files) <= batch_size: + next_batch = unprocessed_files + is_last_batch = True + else: + next_batch = unprocessed_files[:batch_size] + is_last_batch = False + + # print and log + msg = ( + f"Found {len(all_files)} bitstreams in the XML file, {len(unprocessed_files)} of them unprocessed. " + f"Process batch of {len(next_batch)} files..." + ) + print(f"{datetime.now()}: {msg}") + logger.info(msg) + + return next_batch, is_last_batch + + def process_files( input_dir: str, output_dir: str, xml_file: str, nthreads: Optional[int], + batch_size: int, ) -> bool: """ Process the files referenced in the given XML file. @@ -781,25 +906,30 @@ def process_files( Additionally, writes a pickle file containing the mapping between the original files and the processed files, e.g. Path('multimedia/nested/subfolder/test.tif'), Path('tmp/0b/22/0b22570d-515f-4c3d-a6af-e42b458e7b2b.jp2'). + Due to a resource leak, the Python interpreter must be quitted after a while. + For this reason, the processing is done in batches, each batch containing batch_size files. + After each batch, the Python interpreter exits, and the CLI command must be executed again. + It automatically resumes where it left off. + Args: input_dir: path to the directory where the files should be read from output_dir: path to the directory where the transformed / created files should be written to xml_file: path to xml file containing the resources nthreads: number of threads to use for processing + batch_size: number of files to process before Python exits Returns: - success status + True --> exit code 0: all multimedia files in the XML file were processed + False --> exit code 1: an error occurred while processing the current batch + Error raised --> exit code 1: an error occurred while processing the current batch + exit with code 2: Python interpreter exits after each batch """ # check the input parameters - param_check_result = _check_params( + input_dir_path, output_dir_path, xml_file_path = _check_input_params( input_dir=input_dir, out_dir=output_dir, xml_file=xml_file, ) - if param_check_result: - input_dir_path, output_dir_path, xml_file_path = param_check_result - else: - raise BaseError("Error reading the input parameters. 
Please check them.") # startup the SIPI container _start_sipi_container_and_mount_volumes( @@ -807,25 +937,29 @@ def process_files( output_dir=output_dir_path, ) - # get the paths of the files referenced in the XML file - all_paths = _get_file_paths_from_xml(xml_file_path) - print(f"{datetime.now()}: Found {len(all_paths)} bitstreams in the XML file.") - logger.info(f"Found {len(all_paths)} bitstreams in the XML file.") + # get the files referenced in the XML file + all_files = _get_file_paths_from_xml(xml_file_path) + + # find out if there was a previous processing attempt that should be continued + files_to_process, is_last_batch = _determine_next_batch( + all_files=all_files, + batch_size=batch_size, + ) # get shell script for processing video files - if any(path.suffix == ".mp4" for path in all_paths): + if any(path.suffix == ".mp4" for path in files_to_process): _get_export_moving_image_frames_script() # process the files in parallel start_time = datetime.now() print(f"{start_time}: Start local file processing...") logger.info("Start local file processing...") - processed_files = [] - unprocessed_files = all_paths + processed_files: list[tuple[Path, Optional[Path]]] = [] + unprocessed_files = files_to_process while unprocessed_files: try: result, unprocessed_files = _process_files_in_parallel( - paths=all_paths, + files_to_process=files_to_process, input_dir=input_dir_path, output_dir=output_dir_path, nthreads=nthreads, @@ -833,29 +967,34 @@ def process_files( processed_files.extend(result) except BaseException as exc: # pylint: disable=broad-exception-caught handle_interruption( - all_paths=all_paths, - processed_paths=[x[1] for x in processed_files if x and x[1]], + all_files=all_files, + processed_files=processed_files, exception=exc, ) - - # check if all files were processed end_time = datetime.now() print(f"{end_time}: Processing files took: {end_time - start_time}") - logger.info(f"{end_time}: Processing files took: {end_time - start_time}") - success = _check_if_all_files_were_processed( - result=processed_files, - all_paths=all_paths, + logger.info(f"Processing files took: {end_time - start_time}") + + # write results to files + _write_processed_and_unprocessed_files_to_txt_files( + all_files=all_files, + processed_files=processed_files, ) + _write_result_to_pkl_file(processed_files) - # write pickle file - if not _write_result_to_pkl_file(processed_files): - success = False - err_msg = f"An error occurred while writing the result to the pickle file. 
The result was: {processed_files}" - print(f"{datetime.now()}: {err_msg}") - logger.error(err_msg) + # check if all files were processed + success, exit_code = _determine_success_status_and_exit_code( + files_to_process=files_to_process, + processed_files=processed_files, + is_last_batch=is_last_batch, + ) # remove the SIPI container - if success: + if exit_code == 0: _stop_and_remove_sipi_container() + # exit with correct exit code: 0 and 1 will automatically happen, but 2 must be done manually + if exit_code == 2: + sys.exit(2) + return success diff --git a/src/dsp_tools/utils/logging.py b/src/dsp_tools/utils/logging.py index 087fde519..19c1d06cd 100644 --- a/src/dsp_tools/utils/logging.py +++ b/src/dsp_tools/utils/logging.py @@ -3,7 +3,12 @@ from pathlib import Path -def get_logger(name: str) -> logging.Logger: +def get_logger( + name: str, + level: int = logging.INFO, + filesize_mb: int = 5, + backupcount: int = 4, +) -> logging.Logger: """ Create a logger instance, set its level to INFO, @@ -11,12 +16,15 @@ def get_logger(name: str) -> logging.Logger: Args: name: name of the logger + level: logging level, defaults to logging.INFO + filesize_mb: maximum size per log file in MB, defaults to 5 + backupcount: number of log files to keep, defaults to 4 Returns: the logger instance """ _logger = logging.getLogger(name) - _logger.setLevel(logging.INFO) + _logger.setLevel(level) formatter = logging.Formatter(fmt="{asctime} {filename: <20} {levelname: <8} {message}", style="{") # a RotatingFileHandler fills "filename" until it is "maxBytes" big, # then appends ".1" to it and starts with a new file "filename", @@ -27,8 +35,8 @@ def get_logger(name: str) -> logging.Logger: handler = logging.handlers.RotatingFileHandler( filename=logfile_directory / "logging.log", mode="a", - maxBytes=5 * 1024 * 1024, - backupCount=4, + maxBytes=filesize_mb * 1024 * 1024, + backupCount=backupcount, ) handler.setFormatter(formatter) _logger.addHandler(handler) diff --git a/test/e2e/test_fast_xmlupload.py b/test/e2e/test_fast_xmlupload.py index 119e81cb8..e26f983de 100644 --- a/test/e2e/test_fast_xmlupload.py +++ b/test/e2e/test_fast_xmlupload.py @@ -1,4 +1,3 @@ -import os import shutil import unittest from pathlib import Path @@ -21,39 +20,49 @@ class TestFastXmlUpload(unittest.TestCase): user = "root@example.com" password = "test" - xml_file = "xml-data/test-data-fast-xmlupload.xml" - dir_with_processed_files = "preprocessed_files" - original_cwd = "" - pickle_file = Path() + input_dir = Path("testdata/bitstreams") + output_dir = "testdata/preprocessed_files" + xml_file = "testdata/xml-data/test-data-fast-xmlupload.xml" + json_file = "testdata/json-project/test-project-fast-xmlupload.json" + txt_files = ["processed_files.txt", "unprocessed_files.txt"] - def setUp(self) -> None: + @classmethod + def setUpClass(cls) -> None: """ - Is executed before any test is run. 
+ Is executed before the methods of this class are run """ - self.original_cwd = os.getcwd() - os.chdir("testdata") create_project( - project_file_as_path_or_parsed="json-project/test-project-fast-xmlupload.json", - server=self.dsp_url, - user_mail=self.user, - password=self.password, + project_file_as_path_or_parsed=cls.json_file, + server=cls.dsp_url, + user_mail=cls.user, + password=cls.password, verbose=False, dump=False, ) - shutil.copytree("bitstreams", "bitstreams/nested") - shutil.copytree("bitstreams/nested", "bitstreams/nested/subfolder") + shutil.copytree(cls.input_dir, cls.input_dir / "nested") + shutil.copytree(cls.input_dir / "nested", cls.input_dir / "nested/subfolder") + + @classmethod + def tearDownClass(cls) -> None: + """ + Is executed after the methods of this class have all run through + """ + shutil.rmtree(cls.input_dir / "nested") + shutil.rmtree(cls.output_dir) def tearDown(self) -> None: """ - Is executed after all tests have run through. + Is executed after each test method """ - shutil.rmtree("bitstreams/nested") - shutil.rmtree(self.dir_with_processed_files) - self.pickle_file.unlink() + for pickle_file in list(Path().glob("*.pkl")): + pickle_file.unlink() + id2iri_search_results = list(Path().glob("*id2iri_mapping.json")) if len(id2iri_search_results) == 1: id2iri_search_results[0].unlink() - os.chdir(self.original_cwd) + + for txt_file in self.txt_files: + Path(txt_file).unlink(missing_ok=True) def test_fast_xmlupload(self) -> None: """ @@ -62,19 +71,20 @@ def test_fast_xmlupload(self) -> None: """ print("test_fast_xmlupload: call process_files()") success_process = process_files( - input_dir="bitstreams", - output_dir=self.dir_with_processed_files, + input_dir=str(self.input_dir), + output_dir=self.output_dir, xml_file=self.xml_file, nthreads=None, + batch_size=5000, ) self.assertTrue(success_process) - self.pickle_file = list(Path().glob("*.pkl"))[0] + pickle_file = str(list(Path().glob("*.pkl"))[0]) - print(f"test_fast_xmlupload: call upload_files() with pickle file {self.pickle_file}") + print(f"test_fast_xmlupload: call upload_files() with pickle file {pickle_file}") success_upload = upload_files( - pkl_file=str(self.pickle_file), - dir_with_processed_files=self.dir_with_processed_files, + pkl_file=pickle_file, + dir_with_processed_files=self.output_dir, nthreads=4, user=self.user, password=self.password, @@ -86,7 +96,7 @@ def test_fast_xmlupload(self) -> None: print("test_fast_xmlupload: call fast_xmlupload()") success_fast_xmlupload = fast_xmlupload( xml_file=self.xml_file, - pkl_file=str(self.pickle_file), + pkl_file=pickle_file, user=self.user, password=self.password, dsp_url=self.dsp_url, @@ -94,6 +104,32 @@ def test_fast_xmlupload(self) -> None: ) self.assertTrue(success_fast_xmlupload) + def test_batch_size_of_process_files(self) -> None: + """ + Test if the "batch_size" parameter of process_files() function works. + The test file contains 92 bitstreams, so a batch size of 40 should result in 3 batches. + The first 2 batches should exit with exit code 2 and success=True, + the 3rd batch should exit with exit code 0 and success=True. 
+ """ + + def action() -> bool: + return process_files( + input_dir=str(self.input_dir), + output_dir=self.output_dir, + xml_file=self.xml_file, + nthreads=None, + batch_size=40, + ) + + for i in range(2): + with self.assertRaises(SystemExit) as cm: + success = action() + self.assertTrue(success) + self.assertEqual(cm.exception.code, 2, msg=f"Failed in iteration {i}") + + success = action() + self.assertTrue(success) + if __name__ == "__main__": pytest.main([__file__]) diff --git a/testdata/xml-data/test-data-fast-xmlupload.xml b/testdata/xml-data/test-data-fast-xmlupload.xml index 0743ff412..694177cae 100644 --- a/testdata/xml-data/test-data-fast-xmlupload.xml +++ b/testdata/xml-data/test-data-fast-xmlupload.xml @@ -21,281 +21,281 @@ CR - - bitstreams/test with uppercase extension.JPG + + testdata/bitstreams/test with uppercase extension.JPG - - bitstreams/test with whitespaces.png + + testdata/bitstreams/test with whitespaces.png - - bitstreams/README.txt + + testdata/bitstreams/README.txt - - bitstreams/nested/README.txt + + testdata/bitstreams/nested/README.txt - - bitstreams/nested/subfolder/README.txt + + testdata/bitstreams/nested/subfolder/README.txt - - bitstreams/test.jpg + + testdata/bitstreams/test.jpg - - bitstreams/nested/test.jpg + + testdata/bitstreams/nested/test.jpg - - bitstreams/nested/subfolder/test.jpg + + testdata/bitstreams/nested/subfolder/test.jpg - - bitstreams/test.jpeg + + testdata/bitstreams/test.jpeg - - bitstreams/nested/test.jpeg + + testdata/bitstreams/nested/test.jpeg - - bitstreams/nested/subfolder/test.jpeg + + testdata/bitstreams/nested/subfolder/test.jpeg - - bitstreams/test.tif + + testdata/bitstreams/test.tif - - bitstreams/nested/test.tif + + testdata/bitstreams/nested/test.tif - - bitstreams/nested/subfolder/test.tif + + testdata/bitstreams/nested/subfolder/test.tif - - bitstreams/test.tiff + + testdata/bitstreams/test.tiff - - bitstreams/nested/test.tiff + + testdata/bitstreams/nested/test.tiff - - bitstreams/nested/subfolder/test.tiff + + testdata/bitstreams/nested/subfolder/test.tiff - - bitstreams/test.jp2 + + testdata/bitstreams/test.jp2 - - bitstreams/nested/test.jp2 + + testdata/bitstreams/nested/test.jp2 - - bitstreams/nested/subfolder/test.jp2 + + testdata/bitstreams/nested/subfolder/test.jp2 - - bitstreams/test.png + + testdata/bitstreams/test.png - - bitstreams/nested/test.png + + testdata/bitstreams/nested/test.png - - bitstreams/nested/subfolder/test.png + + testdata/bitstreams/nested/subfolder/test.png - - bitstreams/test.mp4 + + testdata/bitstreams/test.mp4 - - bitstreams/nested/test.mp4 + + testdata/bitstreams/nested/test.mp4 - - bitstreams/nested/subfolder/test.mp4 + + testdata/bitstreams/nested/subfolder/test.mp4 - - bitstreams/test.7z + + testdata/bitstreams/test.7z - - bitstreams/nested/test.7z + + testdata/bitstreams/nested/test.7z - - bitstreams/nested/subfolder/test.7z + + testdata/bitstreams/nested/subfolder/test.7z - - bitstreams/test.gz + + testdata/bitstreams/test.gz - - bitstreams/nested/test.gz + + testdata/bitstreams/nested/test.gz - - bitstreams/nested/subfolder/test.gz + + testdata/bitstreams/nested/subfolder/test.gz - - bitstreams/test.gzip + + testdata/bitstreams/test.gzip - - bitstreams/nested/test.gzip + + testdata/bitstreams/nested/test.gzip - - bitstreams/nested/subfolder/test.gzip + + testdata/bitstreams/nested/subfolder/test.gzip - - bitstreams/test.tar + + testdata/bitstreams/test.tar - - bitstreams/nested/test.tar + + testdata/bitstreams/nested/test.tar - - bitstreams/nested/subfolder/test.tar + + 
testdata/bitstreams/nested/subfolder/test.tar - - bitstreams/test.tar.gz + + testdata/bitstreams/test.tar.gz - - bitstreams/nested/test.tar.gz + + testdata/bitstreams/nested/test.tar.gz - - bitstreams/nested/subfolder/test.tar.gz + + testdata/bitstreams/nested/subfolder/test.tar.gz - - bitstreams/test.tgz + + testdata/bitstreams/test.tgz - - bitstreams/nested/test.tgz + + testdata/bitstreams/nested/test.tgz - - bitstreams/nested/subfolder/test.tgz + + testdata/bitstreams/nested/subfolder/test.tgz - - bitstreams/test.z + + testdata/bitstreams/test.z - - bitstreams/nested/test.z + + testdata/bitstreams/nested/test.z - - bitstreams/nested/subfolder/test.z + + testdata/bitstreams/nested/subfolder/test.z - - bitstreams/test.zip + + testdata/bitstreams/test.zip - - bitstreams/nested/test.zip + + testdata/bitstreams/nested/test.zip - - bitstreams/nested/subfolder/test.zip + + testdata/bitstreams/nested/subfolder/test.zip - - bitstreams/test.csv + + testdata/bitstreams/test.csv - - bitstreams/nested/test.csv + + testdata/bitstreams/nested/test.csv - - bitstreams/nested/subfolder/test.csv + + testdata/bitstreams/nested/subfolder/test.csv - - bitstreams/test.txt + + testdata/bitstreams/test.txt - - bitstreams/nested/test.txt + + testdata/bitstreams/nested/test.txt - - bitstreams/nested/subfolder/test.txt + + testdata/bitstreams/nested/subfolder/test.txt - - bitstreams/test.xml + + testdata/bitstreams/test.xml - - bitstreams/nested/test.xml + + testdata/bitstreams/nested/test.xml - - bitstreams/nested/subfolder/test.xml + + testdata/bitstreams/nested/subfolder/test.xml - - bitstreams/test.xsd + + testdata/bitstreams/test.xsd - - bitstreams/nested/test.xsd + + testdata/bitstreams/nested/test.xsd - - bitstreams/nested/subfolder/test.xsd + + testdata/bitstreams/nested/subfolder/test.xsd - - bitstreams/test.xsl + + testdata/bitstreams/test.xsl - - bitstreams/nested/test.xsl + + testdata/bitstreams/nested/test.xsl - - bitstreams/nested/subfolder/test.xsl + + testdata/bitstreams/nested/subfolder/test.xsl - - bitstreams/test.doc + + testdata/bitstreams/test.doc - - bitstreams/nested/test.doc + + testdata/bitstreams/nested/test.doc - - bitstreams/nested/subfolder/test.doc + + testdata/bitstreams/nested/subfolder/test.doc - - bitstreams/test.docx + + testdata/bitstreams/test.docx - - bitstreams/nested/test.docx + + testdata/bitstreams/nested/test.docx - - bitstreams/nested/subfolder/test.docx + + testdata/bitstreams/nested/subfolder/test.docx - - bitstreams/test.pdf + + testdata/bitstreams/test.pdf - - bitstreams/nested/test.pdf + + testdata/bitstreams/nested/test.pdf - - bitstreams/nested/subfolder/test.pdf + + testdata/bitstreams/nested/subfolder/test.pdf - - bitstreams/test.ppt + + testdata/bitstreams/test.ppt - - bitstreams/nested/test.ppt + + testdata/bitstreams/nested/test.ppt - - bitstreams/nested/subfolder/test.ppt + + testdata/bitstreams/nested/subfolder/test.ppt - - bitstreams/test.pptx + + testdata/bitstreams/test.pptx - - bitstreams/nested/test.pptx + + testdata/bitstreams/nested/test.pptx - - bitstreams/nested/subfolder/test.pptx + + testdata/bitstreams/nested/subfolder/test.pptx - - bitstreams/test.xls + + testdata/bitstreams/test.xls - - bitstreams/nested/test.xls + + testdata/bitstreams/nested/test.xls - - bitstreams/nested/subfolder/test.xls + + testdata/bitstreams/nested/subfolder/test.xls - - bitstreams/test.xlsx + + testdata/bitstreams/test.xlsx - - bitstreams/nested/test.xlsx + + testdata/bitstreams/nested/test.xlsx - - bitstreams/nested/subfolder/test.xlsx + + 
testdata/bitstreams/nested/subfolder/test.xlsx - - bitstreams/test.mp3 + + testdata/bitstreams/test.mp3 - - bitstreams/nested/test.mp3 + + testdata/bitstreams/nested/test.mp3 - - bitstreams/nested/subfolder/test.mp3 + + testdata/bitstreams/nested/subfolder/test.mp3 - - bitstreams/test.wav + + testdata/bitstreams/test.wav - - bitstreams/nested/test.wav + + testdata/bitstreams/nested/test.wav - - bitstreams/nested/subfolder/test.wav + + testdata/bitstreams/nested/subfolder/test.wav
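
For readers who want to trace the new batching contract end to end, the sketch below mirrors the flow of `test/e2e/test_fast_xmlupload.py` from this diff: it reruns `process_files()` while it exits with code 2, then hands the resulting pickle file to `upload_files()` and `fast_xmlupload()`. Only the keyword arguments visible in the e2e test above are taken from the diff; the import paths for `upload_files` and `fast_xmlupload`, the DSP/SIPI URLs, and any arguments beyond those shown in the test are assumptions for illustration, not the confirmed API.

```python
# Sketch only: mirrors test_fast_xmlupload.py. Import paths for upload_files/fast_xmlupload
# and the server URLs are assumed; the keyword arguments follow the e2e test in this diff.
from pathlib import Path

from dsp_tools.fast_xmlupload.process_files import process_files
from dsp_tools.fast_xmlupload.upload_files import upload_files  # assumed module path
from dsp_tools.fast_xmlupload.upload_xml import fast_xmlupload  # assumed module path

xml_file = "testdata/xml-data/test-data-fast-xmlupload.xml"
output_dir = "testdata/preprocessed_files"

# Step 1: process the files in batches of 40.
# Every finished batch except the last one raises SystemExit(2); rerun until it returns.
while True:
    try:
        success = process_files(
            input_dir="testdata/bitstreams",
            output_dir=output_dir,
            xml_file=xml_file,
            nthreads=None,
            batch_size=40,
        )
        break  # returned normally: this was the last batch
    except SystemExit as exc:
        if exc.code != 2:
            raise  # exit code 1 (or anything else) signals a real error
assert success

# Step 2: upload the processed files, using the newest pickle file written by step 1
pickle_file = sorted(Path().glob("processing_result_*.pkl"))[-1]
upload_files(
    pkl_file=str(pickle_file),
    dir_with_processed_files=output_dir,
    nthreads=4,
    user="root@example.com",
    password="test",
    dsp_url="http://0.0.0.0:3333",   # assumed local stack URL
    sipi_url="http://0.0.0.0:1024",  # assumed local SIPI URL
)

# Step 3: create the resources of the XML file, referencing the uploaded files
fast_xmlupload(
    xml_file=xml_file,
    pkl_file=str(pickle_file),
    user="root@example.com",
    password="test",
    dsp_url="http://0.0.0.0:3333",   # assumed local stack URL
    sipi_url="http://0.0.0.0:1024",  # assumed local SIPI URL
)
```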