From 75958461006b435ab5bdadb85ab44524138e5fb0 Mon Sep 17 00:00:00 2001 From: Johannes Nussbaum <39048939+jnussbaum@users.noreply.github.com> Date: Tue, 5 Sep 2023 10:02:29 +0200 Subject: [PATCH] refactor: refactor process-files (DEV-2623) (#506) --- src/dsp_tools/fast_xmlupload/process_files.py | 69 ++++++++----------- test/e2e/test_fast_xmlupload.py | 5 +- 2 files changed, 31 insertions(+), 43 deletions(-) diff --git a/src/dsp_tools/fast_xmlupload/process_files.py b/src/dsp_tools/fast_xmlupload/process_files.py index 01505acfe..09a3e59e7 100644 --- a/src/dsp_tools/fast_xmlupload/process_files.py +++ b/src/dsp_tools/fast_xmlupload/process_files.py @@ -10,7 +10,7 @@ from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime from pathlib import Path, PurePath -from typing import Any, Optional, Union +from typing import Any, Literal, Optional, Union import docker import requests @@ -43,18 +43,17 @@ def _get_export_moving_image_frames_script() -> None: f.write(script_text) -def _determine_success_status_and_exit_code( +def _determine_exit_code( files_to_process: list[Path], processed_files: list[tuple[Path, Optional[Path]]], is_last_batch: bool, -) -> tuple[bool, int]: +) -> Literal[0, 1, 2]: """ Based on the result of the file processing, - this function determines the success status and the exit code. - If some files of the current batch could not be processed, - the success status is false, and the exit code is 1. - If all files of the current batch were processed, the success status is true, - and the exit code is 0 if this is the last batch, + this function determines the exit code. + If some files of the current batch could not be processed, the exit code is 1. + If all files of the current batch were processed, + the exit code is 0 if this is the last batch, and 2 if there are more batches to process. Args: @@ -63,21 +62,19 @@ def _determine_success_status_and_exit_code( is_last_batch: true if this is the last batch of files to process Returns: - tuple (success status, exit_code) + exit code """ processed_paths = [x[1] for x in processed_files if x and x[1]] if len(processed_paths) == len(files_to_process): - success = True print(f"{datetime.now()}: All files ({len(files_to_process)}) of this batch were processed: Okay") logger.info(f"All files ({len(files_to_process)}) of this batch were processed: Okay") if is_last_batch: - exit_code = 0 print(f"{datetime.now()}: All multimedia files referenced in the XML are processed. No more batches.") logger.info("All multimedia files referenced in the XML are processed. No more batches.") + return 0 else: - exit_code = 2 + return 2 else: - success = False ratio = f"{len(processed_paths)}/{len(files_to_process)}" msg = f"Some files of this batch could not be processed: Only {ratio} were processed. The failed ones are:" print(f"{datetime.now()}: ERROR: {msg}") @@ -86,9 +83,7 @@ def _determine_success_status_and_exit_code( if not output_file: print(f" - {input_file}") logger.error(f" - {input_file}") - exit_code = 1 - - return success, exit_code + return 1 def _process_files_in_parallel( @@ -824,15 +819,18 @@ def double_check_unprocessed_files( """ unprocessed_files_txt_exists = sorted(unprocessed_files) != sorted(all_files) if unprocessed_files_txt_exists and not processed_files: + logger.error("There is a file 'unprocessed_files.txt', but no file 'processed_files.txt'") raise UserError("There is a file 'unprocessed_files.txt', but no file 'processed_files.txt'") if processed_files and sorted(unprocessed_files) == sorted(all_files): + logger.error("There is a file 'processed_files.txt', but no file 'unprocessed_files.txt'") raise UserError("There is a file 'processed_files.txt', but no file 'unprocessed_files.txt'") if unprocessed_files_txt_exists: # there is a 'unprocessed_files.txt' file. check it for consistency unprocessed_files_from_processed_files = [x for x in all_files if x not in processed_files] if not sorted(unprocessed_files_from_processed_files) == sorted(unprocessed_files): + logger.error("The files 'unprocessed_files.txt' and 'processed_files.txt' are inconsistent") raise UserError("The files 'unprocessed_files.txt' and 'processed_files.txt' are inconsistent") @@ -929,40 +927,31 @@ def process_files( Returns: True --> exit code 0: all multimedia files in the XML file were processed - False --> exit code 1: an error occurred while processing the current batch Error raised --> exit code 1: an error occurred while processing the current batch + with exit code 1: an error occurred while processing the current batch exit with code 2: Python interpreter exits after each batch """ - # check the input parameters input_dir_path, output_dir_path, xml_file_path = _check_input_params( input_dir=input_dir, out_dir=output_dir, xml_file=xml_file, ) - - # startup the SIPI container + all_files = _get_file_paths_from_xml(xml_file_path) _start_sipi_container_and_mount_volumes( input_dir=input_dir_path, output_dir=output_dir_path, ) - - # get the files referenced in the XML file - all_files = _get_file_paths_from_xml(xml_file_path) - - # find out if there was a previous processing attempt that should be continued files_to_process, is_last_batch = _determine_next_batch( all_files=all_files, batch_size=batch_size, ) - - # get shell script for processing video files if any(path.suffix == ".mp4" for path in files_to_process): _get_export_moving_image_frames_script() - # process the files in parallel start_time = datetime.now() print(f"{start_time}: Start local file processing...") logger.info("Start local file processing...") + processed_files: list[tuple[Path, Optional[Path]]] = [] unprocessed_files = files_to_process while unprocessed_files: @@ -980,30 +969,28 @@ def process_files( processed_files=processed_files, exception=exc, ) + end_time = datetime.now() print(f"{end_time}: Processing files took: {end_time - start_time}") logger.info(f"Processing files took: {end_time - start_time}") - # write results to files _write_processed_and_unprocessed_files_to_txt_files( all_files=all_files, processed_files=processed_files, ) _write_result_to_pkl_file(processed_files) - # check if all files were processed - success, exit_code = _determine_success_status_and_exit_code( + exit_code = _determine_exit_code( files_to_process=files_to_process, processed_files=processed_files, is_last_batch=is_last_batch, ) - - # remove the SIPI container - if exit_code == 0: - _stop_and_remove_sipi_container() - - # exit with correct exit code: 0 and 1 will automatically happen, but 2 must be done manually - if exit_code == 2: - sys.exit(2) - - return success + match exit_code: + case 0: + # if there were problems, don't remove the sipi container. it might contain valuable log data. + _stop_and_remove_sipi_container() + return True + case 1: + sys.exit(1) + case 2: + sys.exit(2) diff --git a/test/e2e/test_fast_xmlupload.py b/test/e2e/test_fast_xmlupload.py index 533c4218a..5916761ab 100644 --- a/test/e2e/test_fast_xmlupload.py +++ b/test/e2e/test_fast_xmlupload.py @@ -126,12 +126,13 @@ def action() -> bool: batch_size=15, ) + # first 2 batches: exit code 2 for i in range(2): with self.assertRaises(SystemExit) as cm: - success = action() - self.assertTrue(success) + action() self.assertEqual(cm.exception.code, 2, msg=f"Failed in iteration {i}") + # third batch: exit code 0 and success == True success = action() self.assertTrue(success)