Skip to content

Commit

Permalink
chore(process-files): diverse improvements (#656)
Browse files (browse the repository at this point in the history)
  • Loading branch information
jnussbaum committed Dec 6, 2023
1 parent b3f75f4 commit 692ad40
Showing 1 changed file with 27 additions and 15 deletions.
42 changes: 27 additions & 15 deletions src/dsp_tools/commands/fast_xmlupload/process_files.py
Expand Up @@ -110,10 +110,14 @@ def _process_files_in_parallel(
- a list of all paths that could not be processed
(this list will only have content if a Docker API error led to a restart of the SIPI container)
"""
batchsize = 1000
msg = f"Processing {len(files_to_process)} files, in batches of {batchsize} files each..."
print(msg)
orig_filepath_2_uuid: list[tuple[Path, Optional[Path]]] = []
for batch in make_chunks(lst=files_to_process, length=1000):
for batch in make_chunks(lst=files_to_process, length=batchsize):
if unprocessed_paths := _launch_thread_pool(nthreads, input_dir, output_dir, batch, orig_filepath_2_uuid):
return orig_filepath_2_uuid, unprocessed_paths
print(f"Processed {len(orig_filepath_2_uuid)}/{len(files_to_process)} files")
return orig_filepath_2_uuid, []


Expand All @@ -124,15 +128,16 @@ def _launch_thread_pool(
files_to_process: list[Path],
orig_filepath_2_uuid: list[tuple[Path, Optional[Path]]],
) -> list[Path]:
counter = 0
total = len(files_to_process)
with ThreadPoolExecutor(max_workers=nthreads) as pool:
processing_jobs = [pool.submit(_process_file, f, input_dir, output_dir) for f in files_to_process]
for processed in as_completed(processing_jobs):
try:
orig_file, internal_file = processed.result()
orig_filepath_2_uuid.append((orig_file, internal_file))
msg = f"Successfully processed file {len(orig_filepath_2_uuid)}/{total} of this batch: {orig_file}"
print(f"{datetime.now()}: {msg}")
counter += 1
msg = f"Successfully processed file {counter}/{total} of this batch: {orig_file}"
logger.info(msg)
except docker.errors.APIError:
print(f"{datetime.now()}: ERROR: A Docker exception occurred. Cancel jobs and restart SIPI...")
Expand Down Expand Up @@ -222,16 +227,18 @@ def _get_file_paths_from_xml(xml_file: Path) -> list[Path]:
"""
tree: etree._ElementTree[etree._Element] = etree.parse(xml_file)
bitstream_paths: set[Path] = set()
errors = []
for x in tree.iter():
if x.text and etree.QName(x).localname.endswith("bitstream"):
path = Path(x.text)
if path.is_file():
bitstream_paths.add(path)
else:
msg = f"{datetime.now()}: ERROR: '{path}' is referenced in the XML file, but it doesn't exist."
logger.error(msg)
raise UserError(msg)

errors.append(f"'{path}' is referenced in the XML file, but it doesn't exist.")
if errors:
msg = "\n".join(errors)
logger.error(msg)
raise UserError(msg)
return list(bitstream_paths)


Expand All @@ -251,7 +258,7 @@ def _restart_sipi_container(
global sipi_container
docker_client = docker.from_env()
sipi_container = docker_client.containers.run(
image="daschswiss/sipi:3.8.1",
image="daschswiss/sipi:3.8.6",
name="sipi",
volumes=[
f"{input_dir.absolute()}:/sipi/processing-input",
Expand Down Expand Up @@ -341,7 +348,6 @@ def _convert_file_with_sipi(
return False
result = sipi_container.exec_run(f"/sipi/sipi '{in_file_sipi_path}' {out_file_sipi_path}")
if result.exit_code != 0:
print(f"{datetime.now()}: ERROR: Sipi conversion of {in_file_local_path} failed: {result}")
logger.error(f"Sipi conversion of {in_file_local_path} failed: {result}")
return False
return True
Expand Down Expand Up @@ -668,12 +674,18 @@ def _process_image_file(
If there was an error, the internal filename is None.
"""
converted_file_full_path = out_dir / Path(internal_filename).with_suffix(".jp2")
sipi_result = _convert_file_with_sipi(
in_file_local_path=in_file,
input_dir=input_dir,
out_file_local_path=converted_file_full_path,
output_dir=out_dir,
)
sipi_result = False
counter = 0
while not sipi_result:
sipi_result = _convert_file_with_sipi(
in_file_local_path=in_file,
input_dir=input_dir,
out_file_local_path=converted_file_full_path,
output_dir=out_dir,
)
counter += 1
if counter > 4:
break
if not sipi_result:
print(f"{datetime.now()}: ERROR: Couldn't process file of category IMAGE: {in_file}")
logger.error(f"Couldn't process file of category IMAGE: {in_file}")
Expand Down

0 comments on commit 692ad40

Please sign in to comment.