fix(process-files): fix resource leak, try omitting restart of python
jnussbaum committed Nov 21, 2023
1 parent 431a755 commit 7347be7
Showing 5 changed files with 43 additions and 211 deletions.
25 changes: 0 additions & 25 deletions docs/internal/fast-xmlupload.md
@@ -78,31 +78,6 @@ It contains a mapping from the original files to the processed files,
e.g. `multimedia/dog.jpg` → `tmp/0b/22/0b22570d-515f-4c3d-a6af-e42b458e7b2b.jp2`.
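For reference, a minimal sketch (not part of this commit) of how that pickle mapping could be inspected: the mapping is written as a pickled list of `(original path, processed path)` tuples (see `_write_result_to_pkl_file` below), and the file name used here is only a placeholder for whichever `.pkl` file your run produced.

```python
import pickle
from pathlib import Path
from typing import Optional

# placeholder name: use whichever .pkl file your run of `dsp-tools process-files` wrote
with open("processing_result.pkl", "rb") as f:
    mapping: list[tuple[Path, Optional[Path]]] = pickle.load(f)

for original, processed in mapping:
    # the second element is None if the file could not be processed
    print(f"{original} -> {processed}")
```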


### Important Note: Resource Leak

**Due to a resource leak, Python must exit after a certain time.**
**For big datasets, only a batch of files is processed; then Python exits with exit code 2.**
**In this case, you need to restart the command several times until the exit code is 0.**
**Only then are all files processed.**
**Unexpected errors result in exit code 1.**
**If this batch splitting happens, every run produces a new pickle file.**

You can orchestrate this with a shell script, e.g.:

```bash
exit_code=2
while [ $exit_code -eq 2 ]; do
    dsp-tools process-files --input-dir=multimedia --output-dir=tmp data.xml
    exit_code=$?
done

if [ $exit_code -ne 0 ]; then
    echo "Error: exit code $exit_code"
    exit $exit_code
fi
```


## 3. `dsp-tools upload-files`

After all files are processed, the upload step can be started.
4 changes: 0 additions & 4 deletions src/dsp_tools/cli.py
@@ -137,9 +137,6 @@ def _make_parser(
"--output-dir", help="path to the output directory where the processed/transformed files should be written to"
)
parser_process_files.add_argument("--nthreads", type=int, default=None, help="number of threads to use")
parser_process_files.add_argument(
"--batchsize", type=int, default=3000, help="number of files to process before Python exits"
)
parser_process_files.add_argument("xml_file", help="path to XML file containing the data")

# upload-files
@@ -494,7 +491,6 @@ def _call_requested_action(args: argparse.Namespace) -> bool:
output_dir=args.output_dir,
xml_file=args.xml_file,
nthreads=args.nthreads,
batch_size=args.batchsize,
)
elif args.action == "upload-files":
success = upload_files(
167 changes: 42 additions & 125 deletions src/dsp_tools/commands/fast_xmlupload/process_files.py
@@ -111,18 +111,27 @@ def _process_files_in_parallel(
(this list will only have content if a Docker API error led to a restart of the SIPI container)
"""
orig_filepath_2_uuid: list[tuple[Path, Optional[Path]]] = []
total = len(files_to_process)
num_of_processed_files = 0
for subbatch in make_chunks(lst=files_to_process, length=100):
with ThreadPoolExecutor(max_workers=nthreads) as pool:
processing_jobs = [pool.submit(_process_file, f, input_dir, output_dir) for f in subbatch]
for batch in make_chunks(lst=files_to_process, length=1000):
if unprocessed_paths := _launch_thread_pool(nthreads, input_dir, output_dir, batch, orig_filepath_2_uuid):
return orig_filepath_2_uuid, unprocessed_paths
return orig_filepath_2_uuid, []


def _launch_thread_pool(
nthreads: int | None,
input_dir: Path,
output_dir: Path,
files_to_process: list[Path],
orig_filepath_2_uuid: list[tuple[Path, Optional[Path]]],
) -> list[Path]:
total = len(files_to_process)
with ThreadPoolExecutor(max_workers=nthreads) as pool:
processing_jobs = [pool.submit(_process_file, f, input_dir, output_dir) for f in files_to_process]
for processed in as_completed(processing_jobs):
try:
orig_file, internal_file = processed.result()
orig_filepath_2_uuid.append((orig_file, internal_file))
num_of_processed_files += 1
msg = f"Successfully processed file {num_of_processed_files}/{total} of this batch: {orig_file}"
msg = f"Successfully processed file {len(orig_filepath_2_uuid)}/{total} of this batch: {orig_file}"
print(f"{datetime.now()}: {msg}")
logger.info(msg)
except docker.errors.APIError:
@@ -133,9 +142,8 @@ def _process_files_in_parallel(
_restart_sipi_container(input_dir, output_dir)
processed_paths = [x[0] for x in orig_filepath_2_uuid]
unprocessed_paths = [x for x in files_to_process if x not in processed_paths]
return orig_filepath_2_uuid, unprocessed_paths

return orig_filepath_2_uuid, []
return unprocessed_paths
return []
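For context, the refactor above moves the per-batch thread pool into its own helper. A minimal, self-contained sketch of that chunked thread-pool pattern is shown below; `process_one` and `process_in_chunks` are illustrative placeholders (standing in for `_process_file` and the helpers above), and the Docker/SIPI error handling is omitted. Only the chunk size of 1000 and the optional worker count mirror the code in this commit.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Optional

def process_one(path: Path) -> tuple[Path, Optional[Path]]:
    # placeholder for the real per-file work done by _process_file
    return path, path.with_suffix(".jp2")

def process_in_chunks(
    files: list[Path], chunk_size: int = 1000, nthreads: Optional[int] = None
) -> list[tuple[Path, Optional[Path]]]:
    results: list[tuple[Path, Optional[Path]]] = []
    for start in range(0, len(files), chunk_size):
        chunk = files[start : start + chunk_size]
        # a fresh pool per chunk keeps the number of pending futures bounded
        with ThreadPoolExecutor(max_workers=nthreads) as pool:
            jobs = [pool.submit(process_one, f) for f in chunk]
            for job in as_completed(jobs):
                results.append(job.result())
    return results

print(process_in_chunks([Path("a.tif"), Path("b.tif")], chunk_size=1))
```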


def _write_result_to_pkl_file(processed_files: list[tuple[Path, Optional[Path]]]) -> None:
@@ -190,9 +198,9 @@ def _check_input_params(
raise UserError(f"Couldn't create directory {out_dir_path}") from None

if not input_dir_path.is_dir():
raise UserError("input_dir is not a directory")
raise UserError(f"{input_dir} is not a directory")
if not xml_file_path.is_file():
raise UserError("xml_file is not a file")
raise UserError(f"{xml_file} is not a file")

return input_dir_path, out_dir_path, xml_file_path

@@ -731,39 +739,33 @@ def _process_video_file(
def _write_processed_and_unprocessed_files_to_txt_files(
all_files: list[Path],
processed_files: list[tuple[Path, Optional[Path]]],
) -> None:
) -> bool:
"""
Determine the files that were processed until now
(taking into account a possibly existing file 'processed_files.txt')
and write them to 'processed_files.txt'.
Determine the files that were not processed until now,
and write them to 'unprocessed_files.txt'
(possibly overwriting an existing file).
Determine the files that were processed and write them to 'processed_files.txt'.
Determine the files that were not processed and write them to 'unprocessed_files.txt'.
Args:
all_files: list of all paths that should be processed
processed_files: list of tuples (orig path, processed path). 2nd path is None if a file could not be processed.
Returns:
True if all multimedia files in the XML file were processed, False otherwise
"""
success = True
processed_original_paths = [x[0] for x in processed_files]
if Path("processed_files.txt").is_file():
with open("processed_files.txt", "r", encoding="utf-8") as f:
previously_processed_files = [Path(x) for x in f.read().splitlines()]
processed_original_paths += previously_processed_files

with open("processed_files.txt", "w", encoding="utf-8") as f:
with open("processed_files.txt", "x", encoding="utf-8") as f:
f.write("\n".join([str(x) for x in processed_original_paths]))
msg = "Wrote 'processed_files.txt'"

if unprocessed_original_paths := [x for x in all_files if x not in processed_original_paths]:
with open("unprocessed_files.txt", "w", encoding="utf-8") as f:
with open("unprocessed_files.txt", "x", encoding="utf-8") as f:
f.write("\n".join([str(x) for x in unprocessed_original_paths]))
msg += " and 'unprocessed_files.txt'"
elif Path("unprocessed_files.txt").is_file():
Path("unprocessed_files.txt").unlink()
msg += " and removed 'unprocessed_files.txt'"
success = False

print(f"{datetime.now()}: {msg}")
logger.info(msg)
return success
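A side note on the mode change above: `open(..., "x")` (exclusive creation) raises `FileExistsError` if the file already exists, instead of silently overwriting it as `"w"` did — presumably appropriate now that a single run processes everything and no leftover txt files from a previous batch are expected. A minimal illustration with a throwaway file name:

```python
from pathlib import Path

Path("demo.txt").write_text("first run\n", encoding="utf-8")
try:
    # "x" = exclusive creation: fails if the file already exists, unlike "w"
    with open("demo.txt", "x", encoding="utf-8") as f:
        f.write("second run\n")
except FileExistsError:
    print("demo.txt already exists - refusing to overwrite")
```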


def handle_interruption(
@@ -829,76 +831,11 @@ def double_check_unprocessed_files(
raise UserError("The files 'unprocessed_files.txt' and 'processed_files.txt' are inconsistent")


def _determine_next_batch(
all_files: list[Path],
batch_size: int,
) -> tuple[list[Path], bool]:
"""
Looks in the input directory for txt files that contain the already processed files and the still unprocessed files.
If no such files are found, this run of `dsp-tools process-files` is the first one.
In this case, the first batch_size files (or fewer, if there are fewer) are returned.
If such files are found, the already processed files are read from them,
and the next batch_size files are returned.
If all files have been processed, an empty list is returned.
Args:
all_files: list of all paths in the <bitstream> tags of the XML file
batch_size: number of files to process before Python exits
Raises:
UserError: if the files 'unprocessed_files.txt' and 'processed_files.txt' are inconsistent
Returns:
- the next batch of up to batch_size files that should be processed
(or empty list if all files have been processed)
- a boolean indicating if this is the last batch
"""
# read the already processed files
if Path("processed_files.txt").is_file():
with open("processed_files.txt", "r", encoding="utf-8") as f:
processed_files = [Path(x.strip()) for x in f.readlines()]
else:
processed_files = []

# read the still unprocessed files
if Path("unprocessed_files.txt").is_file():
with open("unprocessed_files.txt", "r", encoding="utf-8") as f:
unprocessed_files = [Path(x.strip()) for x in f.readlines()]
else:
unprocessed_files = all_files

# consistency check
double_check_unprocessed_files(
all_files=all_files,
processed_files=processed_files,
unprocessed_files=unprocessed_files,
)

# determine next batch
if len(unprocessed_files) <= batch_size:
next_batch = unprocessed_files
is_last_batch = True
else:
next_batch = unprocessed_files[:batch_size]
is_last_batch = False

# print and log
msg = (
f"Found {len(all_files)} bitstreams in the XML file, {len(unprocessed_files)} of them unprocessed. "
f"Process batch of {len(next_batch)} files..."
)
print(f"{datetime.now()}: {msg}")
logger.info(msg)

return next_batch, is_last_batch


def process_files(
input_dir: str,
output_dir: str,
xml_file: str,
nthreads: Optional[int],
batch_size: int,
) -> bool:
"""
Process the files referenced in the given XML file.
@@ -908,23 +845,14 @@ def process_files(
Additionally, writes a pickle file containing the mapping between the original files and the processed files,
e.g. Path('multimedia/nested/subfolder/test.tif'), Path('tmp/0b/22/0b22570d-515f-4c3d-a6af-e42b458e7b2b.jp2').
Due to a resource leak, the Python interpreter must exit after a while.
For this reason, the processing is done in batches, each batch containing batch_size files.
After each batch, the Python interpreter exits, and the CLI command must be executed again.
It automatically resumes where it left off.
Args:
input_dir: path to the directory where the files should be read from
output_dir: path to the directory where the transformed / created files should be written to
xml_file: path to xml file containing the resources
nthreads: number of threads to use for processing
batch_size: number of files to process before Python exits
Returns:
True --> exit code 0: all multimedia files in the XML file were processed
Error raised --> exit code 1: an error occurred while processing the current batch
exit with code 2: Python interpreter exits after each batch
True if all multimedia files in the XML file were processed, False otherwise
"""
input_dir_path, output_dir_path, xml_file_path = _check_input_params(
input_dir=input_dir,
Expand All @@ -936,19 +864,15 @@ def process_files(
input_dir=input_dir_path,
output_dir=output_dir_path,
)
files_to_process, is_last_batch = _determine_next_batch(
all_files=all_files,
batch_size=batch_size,
)
if any(path.suffix == ".mp4" for path in files_to_process):
if any(path.suffix == ".mp4" for path in all_files):
_get_export_moving_image_frames_script()

start_time = datetime.now()
print(f"{start_time}: Start local file processing...")
logger.info("Start local file processing...")

processed_files: list[tuple[Path, Optional[Path]]] = []
unprocessed_files = files_to_process
unprocessed_files = all_files
while unprocessed_files:
try:
result, unprocessed_files = _process_files_in_parallel(
@@ -969,23 +893,16 @@ def process_files(
print(f"{end_time}: Processing files took: {end_time - start_time}")
logger.info(f"Processing files took: {end_time - start_time}")

_write_processed_and_unprocessed_files_to_txt_files(
success = _write_processed_and_unprocessed_files_to_txt_files(
all_files=all_files,
processed_files=processed_files,
)
_write_result_to_pkl_file(processed_files)

exit_code = _determine_exit_code(
files_to_process=files_to_process,
processed_files=processed_files,
is_last_batch=is_last_batch,
)
match exit_code:
case 0:
# if there were problems, don't remove the sipi container. it might contain valuable log data.
_stop_and_remove_sipi_container()
return True
case 1:
sys.exit(1)
case 2:
sys.exit(2)
if success:
# if there were problems, don't remove the sipi container. it might contain valuable log data.
_stop_and_remove_sipi_container()
return True
else:
print("Something went wrong. The SIPI container is still available to be analyzed. Don't forget to remove it.")
return False
57 changes: 1 addition & 56 deletions test/e2e/commands/test_fast_xmlupload.py
@@ -67,7 +67,7 @@ def tearDown(self) -> None:
for txt_file in self.txt_files:
Path(txt_file).unlink(missing_ok=True)

def test_fast_xmlupload_without_batching(self) -> None:
def test_fast_xmlupload(self) -> None:
"""
Test if it is possible to call the 3 steps of the fast XML upload, one after the other.
No detailed tests are done here; it is only tested whether the 3 steps can be called.
@@ -78,7 +78,6 @@ def test_fast_xmlupload_without_batching(self) -> None:
output_dir=self.output_dir,
xml_file=self.xml_file,
nthreads=None,
batch_size=5000,
)
self.assertTrue(success_process)

@@ -106,60 +105,6 @@ def test_fast_xmlupload_without_batching(self) -> None:
)
self.assertTrue(success_fast_xmlupload)

def test_fast_xmlupload_batching(self) -> None:
"""
Test if the "batch_size" parameter of the process_files() function works.
The test file contains 34 bitstreams, so a batch size of 15 should result in 3 batches.
The first 2 batches should exit with exit code 2 and success=True;
the 3rd batch should exit with exit code 0 and success=True.
Afterwards, the uploading step and the fast xmlupload step should work with the 3 pickle files.
"""

def action() -> bool:
print("test_fast_xmlupload_batching: call process_files() with batch size 15")
return process_files(
input_dir=str(self.input_dir),
output_dir=self.output_dir,
xml_file=self.xml_file,
nthreads=None,
batch_size=15,
)

# first 2 batches: exit code 2
for i in range(2):
with self.assertRaises(SystemExit) as cm:
action()
self.assertEqual(cm.exception.code, 2, msg=f"Failed in iteration {i}")

# third batch: exit code 0 and success == True
success = action()
self.assertTrue(success)

pickle_files = list(Path().glob("*.pkl"))
self.assertEqual(len(pickle_files), 3)

print("test_fast_xmlupload_batching: call upload_files()")
success_upload = upload_files(
dir_with_processed_files=self.output_dir,
nthreads=4,
user=self.user,
password=self.password,
dsp_url=self.dsp_url,
sipi_url=self.sipi_url,
)
self.assertTrue(success_upload)

print("test_fast_xmlupload_batching: call fast_xmlupload()")
success_fast_xmlupload = fast_xmlupload(
xml_file=self.xml_file,
user=self.user,
password=self.password,
dsp_url=self.dsp_url,
sipi_url=self.sipi_url,
)
self.assertTrue(success_fast_xmlupload)


if __name__ == "__main__":
pytest.main([__file__])
