diff --git a/src/dsp_tools/fast_xmlupload/process_files.py b/src/dsp_tools/fast_xmlupload/process_files.py
index 08462f908..e28ac80e3 100644
--- a/src/dsp_tools/fast_xmlupload/process_files.py
+++ b/src/dsp_tools/fast_xmlupload/process_files.py
@@ -5,6 +5,7 @@
 import pickle
 import shutil
 import subprocess
+import sys
 import uuid
 from concurrent.futures import ThreadPoolExecutor, as_completed
 from datetime import datetime
@@ -78,9 +79,15 @@ def _process_files_in_parallel(
     input_dir: Path,
     output_dir: Path,
     nthreads: Optional[int]
-) -> list[tuple[Path, Optional[Path]]]:
+) -> tuple[
+    list[tuple[Path, Optional[Path]]],
+    list[Path]
+    ]:
     """
     Creates a thread pool and executes the file processing in parallel.
+    If a Docker API error occurs, the SIPI container is restarted,
+    and the files that were not yet processed are returned,
+    so that this function can be called again with them.
 
     Args:
         paths: a list of all paths to the files that should be processed
@@ -89,8 +96,10 @@
         nthreads: number of threads to use for processing
 
     Returns:
-        a list of tuples with the original file path and the path to the processed file.
-        If a file could not be processed, the second path is None.
+        - a list of tuples with the original file path and the path to the processed file
+          (if a file could not be processed, the second path is None)
+        - a list of all paths that could not be processed
+          (this list only has content if a Docker API error led to a restart of the SIPI container)
     """
     with ThreadPoolExecutor(max_workers=nthreads) as pool:
         processing_jobs = [pool.submit(
@@ -102,9 +111,20 @@
 
     orig_filepath_2_uuid: list[tuple[Path, Optional[Path]]] = []
     for processed in as_completed(processing_jobs):
-        orig_filepath_2_uuid.append(processed.result())
-
-    return orig_filepath_2_uuid
+        try:
+            orig_filepath_2_uuid.append(processed.result())
+        except docker.errors.APIError:
+            print(f"{datetime.now()}: ERROR: A Docker exception occurred. Cancel jobs and restart SIPI...")
+            logger.error("A Docker exception occurred. Cancel jobs and restart SIPI...", exc_info=True)
+            for job in processing_jobs:
+                job.cancel()
+            _stop_and_remove_sipi_container()
+            _start_sipi_container_and_mount_volumes(input_dir, output_dir)
+            processed_paths = [x[0] for x in orig_filepath_2_uuid]
+            unprocessed_paths = [x for x in paths if x not in processed_paths]
+            return orig_filepath_2_uuid, unprocessed_paths
+
+    return orig_filepath_2_uuid, []
 
 
 def _write_result_to_pkl_file(result: list[tuple[Path, Optional[Path]]]) -> bool:
@@ -168,24 +188,30 @@ def _check_params(
 def _get_file_paths_from_xml(xml_file: Path) -> list[Path]:
     """
     Parse XML file to get all file paths.
+    If the same file is referenced several times in the XML,
+    it is only returned once.
 
     Args:
         xml_file: path to the XML file
 
     Returns:
         list of all paths in the <bitstream> tags
     """
     tree: etree._ElementTree = etree.parse(xml_file) # type: ignore
-    bitstream_paths: list[Path] = []
+    bitstream_paths: set[Path] = set()
     for x in tree.iter():
         if x.text and etree.QName(x).localname.endswith("bitstream"):
-            if Path(x.text).is_file():
-                bitstream_paths.append(Path(x.text))
+            path = Path(x.text)
+            if path.is_file():
+                bitstream_paths.add(path)
             else:
-                print(f"{datetime.now()}: ERROR: '{x.text}' is referenced in the XML file, but it doesn't exist. Skipping...")
-                logger.error(f"'{x.text}' is referenced in the XML file, but it doesn't exist. Skipping...")
+                print(f"{datetime.now()}: ERROR: '{path}' is referenced in the XML file, but it doesn't exist. Skipping...")
+                logger.error(f"'{path}' is referenced in the XML file, but it doesn't exist. Skipping...")
 
-    return bitstream_paths
+    return list(bitstream_paths)
 
 
 def _start_sipi_container_and_mount_volumes(
@@ -201,6 +227,7 @@
         input_dir: the root directory of the images that should be processed, is mounted into the container
         output_dir: the output directory where the processed files should be written to, is mounted into the container
     """
+    # prepare parameters for container creation
     container_name = "sipi"
     volumes = [
         f"{input_dir.absolute()}:/sipi/processing-input",
@@ -209,47 +236,25 @@
     entrypoint = ["tail", "-f", "/dev/null"]
     docker_client = docker.from_env()
 
-    # Docker container doesn't exist yet
+    # get container. if it doesn't exist: create and run it
     try:
-        container: Container = docker_client.containers.get(container_name) # pyright: ignore
-    except docker.errors.NotFound: # pyright: ignore
+        container: Container = docker_client.containers.get(container_name)
+    except docker.errors.NotFound:
         docker_client.containers.run(image="daschswiss/sipi:3.8.1", name=container_name, volumes=volumes, entrypoint=entrypoint, detach=True)
+        container = docker_client.containers.get(container_name)
         print(f"{datetime.now()}: Created and started Sipi container '{container_name}'.")
         logger.info(f"Created and started Sipi container '{container_name}'.")
-        return
 
-    # Docker container exists
-    if not container:
-        container_running = False
-    elif not container.attrs:
-        container_running = False
-    elif not container.attrs.get("State", {}).get("Running"):
-        container_running = False
-    else:
-        container_running = True
-    if container_running:
-        print(f"{datetime.now()}: Found running Sipi container '{container_name}'.")
-        logger.info(f"Found running Sipi container '{container_name}'.")
-    else:
+    # the container exists. if it is not running, restart it
+    container_running = bool(container.attrs and container.attrs.get("State", {}).get("Running"))
+    if not container_running:
         container.restart()
-        print(f"{datetime.now()}: Restarted existing Sipi container '{container_name}'.")
-        logger.info(f"Restarted existing Sipi container '{container_name}'.")
-
-
-def _get_sipi_container() -> Optional[Container]:
-    """
-    Finds the locally running Sipi container (searches for container name "sipi")
-
-    Returns:
-        the reference to the Sipi container
-    """
-    docker_client = docker.from_env()
-    try:
-        return docker_client.containers.get("sipi") # pyright: ignore
-    except docker.errors.NotFound: # pyright: ignore
-        print(f"{datetime.now()}: ERROR: Couldn't find a running Sipi container.")
-        logger.error("Couldn't find a running Sipi container.", exc_info=True)
-        return None
+
+    # make container globally available
+    global sipi_container
+    sipi_container = docker_client.containers.get(container_name)
+    print(f"{datetime.now()}: Sipi container is running.")
+    logger.info("Sipi container is running.")
 
 
 def _stop_and_remove_sipi_container() -> None:
@@ -559,7 +564,7 @@
     file_category = _get_file_category_from_extension(in_file)
     if not file_category:
         return in_file, None
-    
+
     if file_category == "OTHER":
         result = _process_other_file(
             in_file=in_file,
@@ -580,8 +585,8 @@
             out_dir=out_dir_full
         )
     else:
-        print(f"{datetime.now()}: ERROR: Unexpected file category: {file_category}")
-        logger.error(f"Unexpected file category: {file_category}")
+        print(f"{datetime.now()}: ERROR: Unexpected file category for {in_file}: {file_category}")
+        logger.error(f"Unexpected file category for {in_file}: {file_category}")
         return in_file, None
 
     return result
@@ -712,6 +717,38 @@
     return in_file, converted_file_full_path
 
 
+def handle_interruption(
+    all_paths: list[Path],
+    processed_paths: list[Path],
+    exception: BaseException,
+) -> None:
+    """
+    Handles an interruption of the processing.
+    Writes the processed and unprocessed files into two files,
+    and exits the program with exit code 1.
+
+    Args:
+        all_paths: list of all paths that should be processed
+        processed_paths: list of all paths that were processed successfully
+        exception: the exception that was raised
+    """
+    unprocessed_paths = [x for x in all_paths if x not in processed_paths]
+    with open("unprocessed_files.txt", "w", encoding="utf-8") as f:
+        f.write("\n".join([str(x) for x in unprocessed_paths]))
+    with open("processed_files.txt", "w", encoding="utf-8") as f:
+        f.write("\n".join([str(x) for x in processed_paths]))
+
+    msg = (
+        "An error occurred while processing the files. "
+        "2 files were written, listing the processed and the unprocessed files:\n"
+        " - 'processed_files.txt'\n - 'unprocessed_files.txt'"
+    )
+    print(f"{datetime.now()}: ERROR: {msg}. The exception was: {exception}")
+    logger.error(msg, exc_info=exception)
+
+    sys.exit(1)
+
+
 def process_files(
     input_dir: str,
     output_dir: str,
@@ -751,8 +788,6 @@
         input_dir=input_dir_path,
         output_dir=output_dir_path
     )
-    global sipi_container
-    sipi_container = _get_sipi_container()
 
     # get the paths of the files referenced in the XML file
     all_paths = _get_file_paths_from_xml(xml_file_path)
@@ -767,29 +802,41 @@
     start_time = datetime.now()
     print(f"{start_time}: Start local file processing...")
     logger.info("Start local file processing...")
-    result = _process_files_in_parallel(
-        paths=all_paths,
-        input_dir=input_dir_path,
-        output_dir=output_dir_path,
-        nthreads=nthreads
-    )
+    processed_files: list[tuple[Path, Optional[Path]]] = []
+    unprocessed_files = all_paths
+    while unprocessed_files:
+        try:
+            result, unprocessed_files = _process_files_in_parallel(
+                paths=unprocessed_files,
+                input_dir=input_dir_path,
+                output_dir=output_dir_path,
+                nthreads=nthreads
+            )
+            processed_files.extend(result)
+        except BaseException as exc:  # pylint: disable=broad-exception-caught
+            handle_interruption(
+                all_paths=all_paths,
+                processed_paths=[x[0] for x in processed_files if x and x[1]],
+                exception=exc
+            )
 
     # check if all files were processed
     end_time = datetime.now()
     print(f"{end_time}: Processing files took: {end_time - start_time}")
     logger.info(f"{end_time}: Processing files took: {end_time - start_time}")
     success = _check_if_all_files_were_processed(
-        result=result,
+        result=processed_files,
         all_paths=all_paths
     )
 
     # write pickle file
-    if not _write_result_to_pkl_file(result):
+    if not _write_result_to_pkl_file(processed_files):
         success = False
-        print(f"{datetime.now()}: An error occurred while writing the result to the pickle file. The result was: {result}")
-        logger.error(f"An error occurred while writing the result to the pickle file. The result was: {result}")
+        print(f"{datetime.now()}: An error occurred while writing the result to the pickle file. The result was: {processed_files}")
+        logger.error(f"An error occurred while writing the result to the pickle file. The result was: {processed_files}")
 
     # remove the SIPI container
-    _stop_and_remove_sipi_container()
+    if success:
+        _stop_and_remove_sipi_container()
 
     return success
diff --git a/src/dsp_tools/utils/logging.py b/src/dsp_tools/utils/logging.py
index f873d5347..cabc9025d 100644
--- a/src/dsp_tools/utils/logging.py
+++ b/src/dsp_tools/utils/logging.py
@@ -30,8 +30,8 @@ def get_logger(name: str) -> logging.Logger:
     handler = logging.handlers.RotatingFileHandler(
         filename=logfile_directory / "logging.log",
         mode="a",
-        maxBytes=3*1024*1024,
-        backupCount=1
+        maxBytes=5*1024*1024,
+        backupCount=4
     )
     handler.setFormatter(formatter)
     _logger.addHandler(handler)
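
A minimal sketch of the retry pattern that process_files() uses after this change, for readers who want the control flow in isolation. The names are illustrative: process_one_batch is a hypothetical stand-in for _process_files_in_parallel, and the two image paths are made up. The point is the loop shape: each round is fed only the previous round's leftovers, so no file is processed twice and the loop ends as soon as the leftover list is empty.

from pathlib import Path
from typing import Optional


def process_one_batch(paths: list[Path]) -> tuple[list[tuple[Path, Optional[Path]]], list[Path]]:
    # hypothetical stand-in for _process_files_in_parallel():
    # returns (results, leftovers); here every file "succeeds",
    # so the leftover list is always empty and the loop runs once
    return [(p, p.with_suffix(".processed")) for p in paths], []


processed: list[tuple[Path, Optional[Path]]] = []
unprocessed = [Path("images/a.jpg"), Path("images/b.jpg")]
while unprocessed:
    # feed only the leftovers back in, never the full list
    batch_result, unprocessed = process_one_batch(unprocessed)
    processed.extend(batch_result)
print(processed)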