Skip to content

Commit

Permalink
refactor: refactor process-files (DEV-2623) (#506)
Browse files Browse the repository at this point in the history
  • Loading branch information
jnussbaum committed Sep 5, 2023
1 parent b5ea3e9 commit 7595846
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 43 deletions.
69 changes: 28 additions & 41 deletions src/dsp_tools/fast_xmlupload/process_files.py
Expand Up @@ -10,7 +10,7 @@
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from pathlib import Path, PurePath
from typing import Any, Optional, Union
from typing import Any, Literal, Optional, Union

import docker
import requests
Expand Down Expand Up @@ -43,18 +43,17 @@ def _get_export_moving_image_frames_script() -> None:
f.write(script_text)


def _determine_success_status_and_exit_code(
def _determine_exit_code(
files_to_process: list[Path],
processed_files: list[tuple[Path, Optional[Path]]],
is_last_batch: bool,
) -> tuple[bool, int]:
) -> Literal[0, 1, 2]:
"""
Based on the result of the file processing,
this function determines the success status and the exit code.
If some files of the current batch could not be processed,
the success status is false, and the exit code is 1.
If all files of the current batch were processed, the success status is true,
and the exit code is 0 if this is the last batch,
this function determines the exit code.
If some files of the current batch could not be processed, the exit code is 1.
If all files of the current batch were processed,
the exit code is 0 if this is the last batch,
and 2 if there are more batches to process.
Args:
Expand All @@ -63,21 +62,19 @@ def _determine_success_status_and_exit_code(
is_last_batch: true if this is the last batch of files to process
Returns:
tuple (success status, exit_code)
exit code
"""
processed_paths = [x[1] for x in processed_files if x and x[1]]
if len(processed_paths) == len(files_to_process):
success = True
print(f"{datetime.now()}: All files ({len(files_to_process)}) of this batch were processed: Okay")
logger.info(f"All files ({len(files_to_process)}) of this batch were processed: Okay")
if is_last_batch:
exit_code = 0
print(f"{datetime.now()}: All multimedia files referenced in the XML are processed. No more batches.")
logger.info("All multimedia files referenced in the XML are processed. No more batches.")
return 0
else:
exit_code = 2
return 2
else:
success = False
ratio = f"{len(processed_paths)}/{len(files_to_process)}"
msg = f"Some files of this batch could not be processed: Only {ratio} were processed. The failed ones are:"
print(f"{datetime.now()}: ERROR: {msg}")
Expand All @@ -86,9 +83,7 @@ def _determine_success_status_and_exit_code(
if not output_file:
print(f" - {input_file}")
logger.error(f" - {input_file}")
exit_code = 1

return success, exit_code
return 1


def _process_files_in_parallel(
Expand Down Expand Up @@ -824,15 +819,18 @@ def double_check_unprocessed_files(
"""
unprocessed_files_txt_exists = sorted(unprocessed_files) != sorted(all_files)
if unprocessed_files_txt_exists and not processed_files:
logger.error("There is a file 'unprocessed_files.txt', but no file 'processed_files.txt'")
raise UserError("There is a file 'unprocessed_files.txt', but no file 'processed_files.txt'")

if processed_files and sorted(unprocessed_files) == sorted(all_files):
logger.error("There is a file 'processed_files.txt', but no file 'unprocessed_files.txt'")
raise UserError("There is a file 'processed_files.txt', but no file 'unprocessed_files.txt'")

if unprocessed_files_txt_exists:
# there is a 'unprocessed_files.txt' file. check it for consistency
unprocessed_files_from_processed_files = [x for x in all_files if x not in processed_files]
if not sorted(unprocessed_files_from_processed_files) == sorted(unprocessed_files):
logger.error("The files 'unprocessed_files.txt' and 'processed_files.txt' are inconsistent")
raise UserError("The files 'unprocessed_files.txt' and 'processed_files.txt' are inconsistent")


Expand Down Expand Up @@ -929,40 +927,31 @@ def process_files(
Returns:
True --> exit code 0: all multimedia files in the XML file were processed
False --> exit code 1: an error occurred while processing the current batch
Error raised --> exit code 1: an error occurred while processing the current batch
with exit code 1: an error occurred while processing the current batch
exit with code 2: Python interpreter exits after each batch
"""
# check the input parameters
input_dir_path, output_dir_path, xml_file_path = _check_input_params(
input_dir=input_dir,
out_dir=output_dir,
xml_file=xml_file,
)

# startup the SIPI container
all_files = _get_file_paths_from_xml(xml_file_path)
_start_sipi_container_and_mount_volumes(
input_dir=input_dir_path,
output_dir=output_dir_path,
)

# get the files referenced in the XML file
all_files = _get_file_paths_from_xml(xml_file_path)

# find out if there was a previous processing attempt that should be continued
files_to_process, is_last_batch = _determine_next_batch(
all_files=all_files,
batch_size=batch_size,
)

# get shell script for processing video files
if any(path.suffix == ".mp4" for path in files_to_process):
_get_export_moving_image_frames_script()

# process the files in parallel
start_time = datetime.now()
print(f"{start_time}: Start local file processing...")
logger.info("Start local file processing...")

processed_files: list[tuple[Path, Optional[Path]]] = []
unprocessed_files = files_to_process
while unprocessed_files:
Expand All @@ -980,30 +969,28 @@ def process_files(
processed_files=processed_files,
exception=exc,
)

end_time = datetime.now()
print(f"{end_time}: Processing files took: {end_time - start_time}")
logger.info(f"Processing files took: {end_time - start_time}")

# write results to files
_write_processed_and_unprocessed_files_to_txt_files(
all_files=all_files,
processed_files=processed_files,
)
_write_result_to_pkl_file(processed_files)

# check if all files were processed
success, exit_code = _determine_success_status_and_exit_code(
exit_code = _determine_exit_code(
files_to_process=files_to_process,
processed_files=processed_files,
is_last_batch=is_last_batch,
)

# remove the SIPI container
if exit_code == 0:
_stop_and_remove_sipi_container()

# exit with correct exit code: 0 and 1 will automatically happen, but 2 must be done manually
if exit_code == 2:
sys.exit(2)

return success
match exit_code:
case 0:
# if there were problems, don't remove the sipi container. it might contain valuable log data.
_stop_and_remove_sipi_container()
return True
case 1:
sys.exit(1)
case 2:
sys.exit(2)
5 changes: 3 additions & 2 deletions test/e2e/test_fast_xmlupload.py
Expand Up @@ -126,12 +126,13 @@ def action() -> bool:
batch_size=15,
)

# first 2 batches: exit code 2
for i in range(2):
with self.assertRaises(SystemExit) as cm:
success = action()
self.assertTrue(success)
action()
self.assertEqual(cm.exception.code, 2, msg=f"Failed in iteration {i}")

# third batch: exit code 0 and success == True
success = action()
self.assertTrue(success)

Expand Down

0 comments on commit 7595846

Please sign in to comment.