Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(process-files): diverse improvements #656

Merged
merged 7 commits into from
Dec 6, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
42 changes: 27 additions & 15 deletions src/dsp_tools/commands/fast_xmlupload/process_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,14 @@ def _process_files_in_parallel(
- a list of all paths that could not be processed
(this list will only have content if a Docker API error led to a restart of the SIPI container)
"""
batchsize = 1000
msg = f"Processing {len(files_to_process)} files, in batches of {batchsize} files each..."
print(msg)
orig_filepath_2_uuid: list[tuple[Path, Optional[Path]]] = []
for batch in make_chunks(lst=files_to_process, length=1000):
for batch in make_chunks(lst=files_to_process, length=batchsize):
if unprocessed_paths := _launch_thread_pool(nthreads, input_dir, output_dir, batch, orig_filepath_2_uuid):
return orig_filepath_2_uuid, unprocessed_paths
print(f"Processed {len(orig_filepath_2_uuid)}/{len(files_to_process)} files")
return orig_filepath_2_uuid, []


Expand All @@ -124,15 +128,16 @@ def _launch_thread_pool(
files_to_process: list[Path],
orig_filepath_2_uuid: list[tuple[Path, Optional[Path]]],
) -> list[Path]:
counter = 0
total = len(files_to_process)
with ThreadPoolExecutor(max_workers=nthreads) as pool:
processing_jobs = [pool.submit(_process_file, f, input_dir, output_dir) for f in files_to_process]
for processed in as_completed(processing_jobs):
try:
orig_file, internal_file = processed.result()
orig_filepath_2_uuid.append((orig_file, internal_file))
msg = f"Successfully processed file {len(orig_filepath_2_uuid)}/{total} of this batch: {orig_file}"
print(f"{datetime.now()}: {msg}")
counter += 1
msg = f"Successfully processed file {counter}/{total} of this batch: {orig_file}"
logger.info(msg)
except docker.errors.APIError:
print(f"{datetime.now()}: ERROR: A Docker exception occurred. Cancel jobs and restart SIPI...")
Expand Down Expand Up @@ -222,16 +227,18 @@ def _get_file_paths_from_xml(xml_file: Path) -> list[Path]:
"""
tree: etree._ElementTree[etree._Element] = etree.parse(xml_file)
bitstream_paths: set[Path] = set()
errors = []
for x in tree.iter():
if x.text and etree.QName(x).localname.endswith("bitstream"):
path = Path(x.text)
if path.is_file():
bitstream_paths.add(path)
else:
msg = f"{datetime.now()}: ERROR: '{path}' is referenced in the XML file, but it doesn't exist."
logger.error(msg)
raise UserError(msg)

errors.append(f"'{path}' is referenced in the XML file, but it doesn't exist.")
if errors:
msg = "\n".join(errors)
logger.error(msg)
raise UserError(msg)
return list(bitstream_paths)


Expand All @@ -251,7 +258,7 @@ def _restart_sipi_container(
global sipi_container
docker_client = docker.from_env()
sipi_container = docker_client.containers.run(
image="daschswiss/sipi:3.8.1",
image="daschswiss/sipi:3.8.6",
name="sipi",
volumes=[
f"{input_dir.absolute()}:/sipi/processing-input",
Expand Down Expand Up @@ -341,7 +348,6 @@ def _convert_file_with_sipi(
return False
result = sipi_container.exec_run(f"/sipi/sipi '{in_file_sipi_path}' {out_file_sipi_path}")
if result.exit_code != 0:
print(f"{datetime.now()}: ERROR: Sipi conversion of {in_file_local_path} failed: {result}")
logger.error(f"Sipi conversion of {in_file_local_path} failed: {result}")
return False
return True
Expand Down Expand Up @@ -668,12 +674,18 @@ def _process_image_file(
If there was an error, the internal filename is None.
"""
converted_file_full_path = out_dir / Path(internal_filename).with_suffix(".jp2")
sipi_result = _convert_file_with_sipi(
in_file_local_path=in_file,
input_dir=input_dir,
out_file_local_path=converted_file_full_path,
output_dir=out_dir,
)
sipi_result = False
counter = 0
while not sipi_result:
sipi_result = _convert_file_with_sipi(
in_file_local_path=in_file,
input_dir=input_dir,
out_file_local_path=converted_file_full_path,
output_dir=out_dir,
)
counter += 1
if counter > 4:
break
if not sipi_result:
print(f"{datetime.now()}: ERROR: Couldn't process file of category IMAGE: {in_file}")
logger.error(f"Couldn't process file of category IMAGE: {in_file}")
Expand Down