From ca319aa0f8689b4f139e680da9c9d52f9e4fb895 Mon Sep 17 00:00:00 2001 From: yhl48 Date: Fri, 8 Nov 2024 18:12:23 +0000 Subject: [PATCH 1/2] added multithreading function to merge_datasets --- src/litdata/processing/functions.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/litdata/processing/functions.py b/src/litdata/processing/functions.py index dd62909a0..e1762e28f 100644 --- a/src/litdata/processing/functions.py +++ b/src/litdata/processing/functions.py @@ -521,12 +521,13 @@ class CopyInfo: new_filename: str -def merge_datasets(input_dirs: List[str], output_dir: str) -> None: +def merge_datasets(input_dirs: List[str], output_dir: str, max_workers: Optional[int] = os.cpu_count()) -> None: """Enables to merge multiple existing optimized datasets into a single optimized dataset. Args: input_dirs: A list of directories pointing to the existing optimized datasets. output_dir: The directory where the merged dataset would be stored. + max_workers: Number of workers for multithreading """ if len(input_dirs) == 0: @@ -537,6 +538,7 @@ def merge_datasets(input_dirs: List[str], output_dir: str) -> None: resolved_input_dirs = [_resolve_dir(input_dir) for input_dir in input_dirs] resolved_output_dir = _resolve_dir(output_dir) + max_workers = max_workers or 1 if any(input_dir == resolved_output_dir for input_dir in resolved_input_dirs): raise ValueError("The provided output_dir was found within the input_dirs. This isn't supported.") @@ -580,8 +582,11 @@ def merge_datasets(input_dirs: List[str], output_dir: str) -> None: _tqdm = _get_tqdm_iterator_if_available() - for copy_info in _tqdm(copy_infos): - _apply_copy(copy_info, resolved_output_dir) + with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: + futures: List[concurrent.futures.Future] = [] = [] + for copy_info in _tqdm(copy_infos): + future = executor.submit(_apply_copy, copy_info, resolved_output_dir) + futures.append(future) _save_index(index_json, resolved_output_dir) From 7555aaf609d49abb6004f8b3bfa6f77583bf55ea Mon Sep 17 00:00:00 2001 From: thomas chaton Date: Fri, 8 Nov 2024 18:33:13 +0000 Subject: [PATCH 2/2] Update functions.py --- src/litdata/processing/functions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/litdata/processing/functions.py b/src/litdata/processing/functions.py index e1762e28f..f1cd2fe29 100644 --- a/src/litdata/processing/functions.py +++ b/src/litdata/processing/functions.py @@ -583,7 +583,7 @@ def merge_datasets(input_dirs: List[str], output_dir: str, max_workers: Optional _tqdm = _get_tqdm_iterator_if_available() with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: - futures: List[concurrent.futures.Future] = [] = [] + futures: List[concurrent.futures.Future] = [] for copy_info in _tqdm(copy_infos): future = executor.submit(_apply_copy, copy_info, resolved_output_dir) futures.append(future)