
fix(fast xmlupload): make process-files more robust (DEV-2235) #395

Merged (11 commits, Jun 6, 2023)
103 changes: 86 additions & 17 deletions src/dsp_tools/fast_xmlupload/process_files.py
@@ -5,6 +5,7 @@
import pickle
import shutil
import subprocess
import sys
import uuid
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
@@ -102,7 +103,28 @@ def _process_files_in_parallel(

orig_filepath_2_uuid: list[tuple[Path, Optional[Path]]] = []
for processed in as_completed(processing_jobs):
orig_filepath_2_uuid.append(processed.result())
try:
orig_filepath_2_uuid.append(processed.result())
except docker.errors.APIError:
print(f"{datetime.now()}: ERROR: A Docker exception occurred. Restart SIPI and resume processing...")
logger.error("A Docker exception occurred. Restart SIPI and resume processing...", exc_info=True)
# cancel all running jobs
for job in processing_jobs:
job.cancel()
# restart sipi container
_stop_and_remove_sipi_container()
_start_sipi_container_and_mount_volumes(input_dir, output_dir)
global sipi_container
sipi_container = _get_sipi_container()
Collaborator:

This is a perfect candidate to extract into a method restart_sipi_container(); then you don't even need the comment.

Collaborator Author:

I refactored these methods because they were a bit spurious anyway.
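For illustration, a minimal sketch of what the suggested helper could look like, reusing the functions already present in this module (visible in the diff above). The name _restart_sipi_container and its exact signature are assumptions; the refactor that was actually merged may differ:

```python
# Hypothetical sketch of the suggested extraction; it reuses the module's
# existing helpers (_stop_and_remove_sipi_container,
# _start_sipi_container_and_mount_volumes, _get_sipi_container).
from pathlib import Path


def _restart_sipi_container(input_dir: Path, output_dir: Path) -> None:
    """Stop the SIPI container and start a fresh one with the volumes re-mounted."""
    global sipi_container  # module-level handle used by the processing functions
    _stop_and_remove_sipi_container()
    _start_sipi_container_and_mount_volumes(input_dir, output_dir)
    sipi_container = _get_sipi_container()
```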

# restart processing
processed_paths = [x[0] for x in orig_filepath_2_uuid]
unprocessed_paths = [x for x in paths if x not in processed_paths]
orig_filepath_2_uuid += _process_files_in_parallel(
paths=unprocessed_paths,
input_dir=input_dir,
output_dir=output_dir,
nthreads=nthreads
)
Collaborator:

A recursive call like this can potentially cause errors if the maximum recursion depth is reached. In general, it would probably be more robust to move the repeated call one level out, so the flow is flat rather than recursive.
To be honest, though, I'm not sure how likely that is to actually happen.

Collaborator Author:

To be safe, I moved the repeated call one level out. Can you check that there's no logical flaw in it?

Collaborator:

I didn't look at it thoroughly, but at first glance it seems good.
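For illustration, a rough sketch of the flat (non-recursive) retry described above. The wrapper name _process_with_retries, the max_attempts cap, and the orchestration are assumptions; only _process_files_in_parallel and its parameters come from the diff:

```python
# Rough sketch of retrying "one level out" instead of recursing inside
# _process_files_in_parallel; not the exact code merged in this PR.
from pathlib import Path
from typing import Optional


def _process_with_retries(
    all_paths: list[Path],
    input_dir: Path,
    output_dir: Path,
    nthreads: Optional[int],
    max_attempts: int = 3,
) -> list[tuple[Path, Optional[Path]]]:
    results: list[tuple[Path, Optional[Path]]] = []
    remaining = list(all_paths)
    for _ in range(max_attempts):
        if not remaining:
            break
        # returns one (original path, output path or None) tuple per attempted file
        batch = _process_files_in_parallel(
            paths=remaining,
            input_dir=input_dir,
            output_dir=output_dir,
            nthreads=nthreads,
        )
        results += batch
        attempted = {orig for orig, _ in batch}
        remaining = [p for p in remaining if p not in attempted]
    return results
```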


return orig_filepath_2_uuid

@@ -168,24 +190,30 @@ def _check_params(
def _get_file_paths_from_xml(xml_file: Path) -> list[Path]:
"""
Parse XML file to get all file paths.
If the same file is referenced several times in the XML,
it is only returned once.

Args:
xml_file: path to the XML file

Raises:
BaseError: if a referenced file doesn't exist in the file system

Returns:
list of all paths in the <bitstream> tags
"""
tree: etree._ElementTree = etree.parse(xml_file) # type: ignore
bitstream_paths: list[Path] = []
bitstream_paths: set[Path] = set()
for x in tree.iter():
if x.text and etree.QName(x).localname.endswith("bitstream"):
if Path(x.text).is_file():
bitstream_paths.append(Path(x.text))
path = Path(x.text)
if path.is_file():
bitstream_paths.add(path)
else:
print(f"{datetime.now()}: ERROR: '{x.text}' is referenced in the XML file, but it doesn't exist. Skipping...")
logger.error(f"'{x.text}' is referenced in the XML file, but it doesn't exist. Skipping...")
print(f"{datetime.now()}: ERROR: '{path}' is referenced in the XML file, but it doesn't exist. Skipping...")
logger.error(f"'{path}' is referenced in the XML file, but it doesn't exist. Skipping...")

return bitstream_paths
return list(bitstream_paths)


def _start_sipi_container_and_mount_volumes(
@@ -559,7 +587,7 @@ def _process_file(
file_category = _get_file_category_from_extension(in_file)
if not file_category:
return in_file, None

if file_category == "OTHER":
result = _process_other_file(
in_file=in_file,
@@ -580,8 +608,8 @@
out_dir=out_dir_full
)
else:
print(f"{datetime.now()}: ERROR: Unexpected file category: {file_category}")
logger.error(f"Unexpected file category: {file_category}")
print(f"{datetime.now()}: ERROR: Unexpected file category for {in_file}: {file_category}")
logger.error(f"Unexpected file category for {in_file}: {file_category}")
return in_file, None

return result
@@ -712,6 +740,38 @@ def _process_video_file(
return in_file, converted_file_full_path


def handle_interruption(
all_paths: list[Path],
processed_paths: list[Path],
exception: BaseException,
) -> None:
"""
Handles an interruption of the processing.
Writes the processed and unprocessed files into two files,
and exits the program with exit code 1.

Args:
all_paths: list of all paths that should be processed
processed_paths: list of all paths that were processed successfully
exception: the exception that was raised
"""
unprocessed_paths = [x for x in all_paths if x not in processed_paths]
with open("unprocessed_files.txt", "w", encoding="utf-8") as f:
f.write("\n".join([str(x) for x in unprocessed_paths]))
with open("processed_files.txt", "w", encoding="utf-8") as f:
f.write("\n".join([str(x) for x in processed_paths]))

msg = (
"An error occurred while processing the files. "
"2 files were written, listing the processed and the unprocessed files:\n"
" - 'processed_files.txt'\n - 'unprocessed_files.txt'."
)
print(f"{datetime.now()}: ERROR: {msg}. The exception was: {exception}")
logger.error(msg, exc_info=exception)

sys.exit(1)


def process_files(
input_dir: str,
output_dir: str,
@@ -767,12 +827,20 @@ def process_files(
start_time = datetime.now()
print(f"{start_time}: Start local file processing...")
logger.info("Start local file processing...")
result = _process_files_in_parallel(
paths=all_paths,
input_dir=input_dir_path,
output_dir=output_dir_path,
nthreads=nthreads
)
result = []
try:
result = _process_files_in_parallel(
paths=all_paths,
input_dir=input_dir_path,
output_dir=output_dir_path,
nthreads=nthreads
)
except BaseException as exc: # pylint: disable=broad-exception-caught
handle_interruption(
all_paths=all_paths,
processed_paths=[x[1] for x in result if x and x[1]],
exception=exc
)

# check if all files were processed
end_time = datetime.now()
@@ -790,6 +858,7 @@
logger.error(f"An error occurred while writing the result to the pickle file. The result was: {result}")

# remove the SIPI container
_stop_and_remove_sipi_container()
if success:
_stop_and_remove_sipi_container()

return success
4 changes: 2 additions & 2 deletions src/dsp_tools/utils/logging.py
@@ -30,8 +30,8 @@ def get_logger(name: str) -> logging.Logger:
handler = logging.handlers.RotatingFileHandler(
filename=logfile_directory / "logging.log",
mode="a",
maxBytes=3*1024*1024,
backupCount=1
maxBytes=5*1024*1024,
backupCount=4
)
handler.setFormatter(formatter)
_logger.addHandler(handler)
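For context, the old settings retained at most one 3 MiB backup next to the active file (roughly 6 MiB of logs in total), while the new settings keep up to four 5 MiB backups (roughly 25 MiB). A minimal standalone illustration of the new rotation behaviour, with placeholder directory, logger name, and format:

```python
# Standalone illustration of the new rotation settings; the directory,
# logger name, and format string are placeholders, not dsp-tools values.
import logging
import logging.handlers
from pathlib import Path

logfile_directory = Path("logs")
logfile_directory.mkdir(exist_ok=True)

logger = logging.getLogger("example")
logger.setLevel(logging.DEBUG)

handler = logging.handlers.RotatingFileHandler(
    filename=logfile_directory / "logging.log",
    mode="a",
    maxBytes=5 * 1024 * 1024,  # roll over once the file reaches 5 MiB
    backupCount=4,             # keep logging.log.1 ... logging.log.4
)
handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(message)s"))
logger.addHandler(handler)
```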