fix(fast xmlupload): make process-files more robust (DEV-2235) (#395)
jnussbaum committed Jun 6, 2023
1 parent 3d646ad commit 12d527b
Showing 2 changed files with 112 additions and 65 deletions.
173 changes: 110 additions & 63 deletions src/dsp_tools/fast_xmlupload/process_files.py
@@ -5,6 +5,7 @@
import pickle
import shutil
import subprocess
import sys
import uuid
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
@@ -78,9 +79,15 @@ def _process_files_in_parallel(
input_dir: Path,
output_dir: Path,
nthreads: Optional[int]
) -> tuple[
    list[tuple[Path, Optional[Path]]],
    list[Path]
]:
"""
Creates a thread pool and executes the file processing in parallel.
If a Docker API error occurs, the SIPI container is restarted,
and the unprocessed files are returned,
so that this function can be called again with the unprocessed files.
Args:
paths: a list of all paths to the files that should be processed
@@ -89,8 +96,10 @@
nthreads: number of threads to use for processing
Returns:
        - a list of tuples with the original file path and the path to the processed file.
          (if a file could not be processed, the second path is None)
        - a list of all paths that could not be processed
          (this list will only have content if a Docker API error led to a restart of the SIPI container)
"""
with ThreadPoolExecutor(max_workers=nthreads) as pool:
processing_jobs = [pool.submit(
@@ -102,9 +111,20 @@

orig_filepath_2_uuid: list[tuple[Path, Optional[Path]]] = []
for processed in as_completed(processing_jobs):
try:
orig_filepath_2_uuid.append(processed.result())
except docker.errors.APIError:
print(f"{datetime.now()}: ERROR: A Docker exception occurred. Cancel jobs and restart SIPI...")
logger.error("A Docker exception occurred. Cancel jobs and restart SIPI...", exc_info=True)
for job in processing_jobs:
job.cancel()
_stop_and_remove_sipi_container()
_start_sipi_container_and_mount_volumes(input_dir, output_dir)
processed_paths = [x[0] for x in orig_filepath_2_uuid]
unprocessed_paths = [x for x in paths if x not in processed_paths]
return orig_filepath_2_uuid, unprocessed_paths

return orig_filepath_2_uuid, []
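
# The two-element return value is meant to be consumed in a retry loop, along
# these lines (hypothetical sketch; the names are illustrative, not from this file):
#
#     processed: list[tuple[Path, Optional[Path]]] = []
#     remaining = paths
#     while remaining:
#         batch, remaining = _process_files_in_parallel(remaining, input_dir, output_dir, nthreads)
#         processed.extend(batch)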


def _write_result_to_pkl_file(result: list[tuple[Path, Optional[Path]]]) -> bool:
@@ -168,24 +188,30 @@ def _check_params(
def _get_file_paths_from_xml(xml_file: Path) -> list[Path]:
"""
Parse XML file to get all file paths.
If the same file is referenced several times in the XML,
it is only returned once.
Args:
xml_file: path to the XML file
Raises:
BaseError: if a referenced file doesn't exist in the file system
Returns:
list of all paths in the <bitstream> tags
"""
tree: etree._ElementTree = etree.parse(xml_file) # type: ignore
    bitstream_paths: set[Path] = set()
for x in tree.iter():
if x.text and etree.QName(x).localname.endswith("bitstream"):
path = Path(x.text)
if path.is_file():
bitstream_paths.add(path)
else:
print(f"{datetime.now()}: ERROR: '{x.text}' is referenced in the XML file, but it doesn't exist. Skipping...")
logger.error(f"'{x.text}' is referenced in the XML file, but it doesn't exist. Skipping...")
print(f"{datetime.now()}: ERROR: '{path}' is referenced in the XML file, but it doesn't exist. Skipping...")
logger.error(f"'{path}' is referenced in the XML file, but it doesn't exist. Skipping...")

return list(bitstream_paths)
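
# For illustration, a <bitstream> reference in the XML looks roughly like this
# (hypothetical resource; attributes abridged):
#
#     <resource label="cat" restype=":Image" id="img_1">
#         <bitstream>images/cat.jpg</bitstream>
#     </resource>
#
# If images/cat.jpg were referenced by several resources, it would still appear
# only once in the returned list, because the paths are collected in a set.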


def _start_sipi_container_and_mount_volumes(
@@ -201,6 +227,7 @@ def _start_sipi_container_and_mount_volumes(
input_dir: the root directory of the images that should be processed, is mounted into the container
output_dir: the output directory where the processed files should be written to, is mounted into the container
"""
# prepare parameters for container creation
container_name = "sipi"
volumes = [
f"{input_dir.absolute()}:/sipi/processing-input",
@@ -209,47 +236,25 @@
entrypoint = ["tail", "-f", "/dev/null"]
docker_client = docker.from_env()

# get container. if it doesn't exist: create and run it
try:
container: Container = docker_client.containers.get(container_name)
except docker.errors.NotFound:
docker_client.containers.run(image="daschswiss/sipi:3.8.1", name=container_name, volumes=volumes, entrypoint=entrypoint, detach=True)
container = docker_client.containers.get(container_name)
print(f"{datetime.now()}: Created and started Sipi container '{container_name}'.")
logger.info(f"Created and started Sipi container '{container_name}'.")

# the container exists. if it is not running, restart it
container_running = bool(container.attrs and container.attrs.get("State", {}).get("Running"))
if not container_running:
container.restart()
print(f"{datetime.now()}: Restarted existing Sipi container '{container_name}'.")
logger.info(f"Restarted existing Sipi container '{container_name}'.")


def _get_sipi_container() -> Optional[Container]:
"""
Finds the locally running Sipi container (searches for container name "sipi")
Returns:
the reference to the Sipi container
"""
docker_client = docker.from_env()
try:
return docker_client.containers.get("sipi") # pyright: ignore
except docker.errors.NotFound: # pyright: ignore
print(f"{datetime.now()}: ERROR: Couldn't find a running Sipi container.")
logger.error("Couldn't find a running Sipi container.", exc_info=True)
return None

# make container globally available
global sipi_container
sipi_container = docker_client.containers.get(container_name)
print(f"{datetime.now()}: Sipi container is running.")
logger.info("Sipi container is running.")


def _stop_and_remove_sipi_container() -> None:
@@ -559,7 +564,7 @@ def _process_file(
file_category = _get_file_category_from_extension(in_file)
if not file_category:
return in_file, None

if file_category == "OTHER":
result = _process_other_file(
in_file=in_file,
@@ -580,8 +585,8 @@
out_dir=out_dir_full
)
else:
print(f"{datetime.now()}: ERROR: Unexpected file category: {file_category}")
logger.error(f"Unexpected file category: {file_category}")
print(f"{datetime.now()}: ERROR: Unexpected file category for {in_file}: {file_category}")
logger.error(f"Unexpected file category for {in_file}: {file_category}")
return in_file, None

return result
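
# Note that every branch above either produces a (in_file, processed_path) tuple
# or returns (in_file, None); the None convention is what downstream checks use
# to detect files that could not be processed.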
@@ -712,6 +717,38 @@ def _process_video_file(
return in_file, converted_file_full_path


def handle_interruption(
all_paths: list[Path],
processed_paths: list[Path],
exception: BaseException,
) -> None:
"""
Handles an interruption of the processing.
Writes the processed and unprocessed files into two files,
and exits the program with exit code 1.
Args:
all_paths: list of all paths that should be processed
processed_paths: list of all paths that were processed successfully
exception: the exception that was raised
"""
unprocessed_paths = [x for x in all_paths if x not in processed_paths]
with open("unprocessed_files.txt", "w", encoding="utf-8") as f:
f.write("\n".join([str(x) for x in unprocessed_paths]))
with open("processed_files.txt", "w", encoding="utf-8") as f:
f.write("\n".join([str(x) for x in processed_paths]))

msg = (
"An error occurred while processing the files. "
"2 files were written, listing the processed and the unprocessed files:\n"
" - 'processed_files.txt'\n - 'unprocessed_files.txt'."
)
print(f"{datetime.now()}: ERROR: {msg}. The exception was: {exception}")
logger.error(msg, exc_info=exception)

sys.exit(1)
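
# A typical call site passes the original paths of the files that were already
# processed successfully, e.g. (sketch mirroring the call in process_files below):
#
#     handle_interruption(
#         all_paths=all_paths,
#         processed_paths=[orig for orig, out in processed_files if out],
#         exception=exc,
#     )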


def process_files(
input_dir: str,
output_dir: str,
@@ -751,8 +788,6 @@ def process_files(
input_dir=input_dir_path,
output_dir=output_dir_path
)

# get the paths of the files referenced in the XML file
all_paths = _get_file_paths_from_xml(xml_file_path)
@@ -767,29 +802,41 @@
start_time = datetime.now()
print(f"{start_time}: Start local file processing...")
logger.info("Start local file processing...")
processed_files = []
unprocessed_files = all_paths
while unprocessed_files:
try:
result, unprocessed_files = _process_files_in_parallel(
                paths=unprocessed_files,
input_dir=input_dir_path,
output_dir=output_dir_path,
nthreads=nthreads
)
processed_files.extend(result)
except BaseException as exc: # pylint: disable=broad-exception-caught
handle_interruption(
all_paths=all_paths,
                processed_paths=[x[0] for x in processed_files if x and x[1]],
exception=exc
)

    end_time = datetime.now()
    print(f"{end_time}: Processing files took: {end_time - start_time}")
    logger.info(f"Processing files took: {end_time - start_time}")

    # check if all files were processed
success = _check_if_all_files_were_processed(
result=processed_files,
all_paths=all_paths
)

# write pickle file
if not _write_result_to_pkl_file(processed_files):
success = False
print(f"{datetime.now()}: An error occurred while writing the result to the pickle file. The result was: {result}")
logger.error(f"An error occurred while writing the result to the pickle file. The result was: {result}")
print(f"{datetime.now()}: An error occurred while writing the result to the pickle file. The result was: {processed_files}")
logger.error(f"An error occurred while writing the result to the pickle file. The result was: {processed_files}")

# remove the SIPI container
if success:
_stop_and_remove_sipi_container()

return success
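
# Hypothetical end-to-end invocation (parameter names partly inferred, since the
# full signature is collapsed in this diff):
#
#     success = process_files(
#         input_dir="images",
#         output_dir="processed",
#         xml_file="data.xml",
#         nthreads=8,
#     )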
4 changes: 2 additions & 2 deletions src/dsp_tools/utils/logging.py
@@ -30,8 +30,8 @@ def get_logger(name: str) -> logging.Logger:
handler = logging.handlers.RotatingFileHandler(
filename=logfile_directory / "logging.log",
mode="a",
maxBytes=5*1024*1024,
backupCount=4
)
handler.setFormatter(formatter)
_logger.addHandler(handler)
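
# With maxBytes=5*1024*1024 and backupCount=4, the handler keeps at most five
# files (logging.log plus logging.log.1 through logging.log.4), i.e. roughly
# 25 MiB of logs, up from the previous ~6 MiB (3 MiB current file + 1 backup).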
