From f944fafd93daeff1633903ad5a13e4866deb027d Mon Sep 17 00:00:00 2001 From: "david.buniatyan@gmail.com" Date: Mon, 19 Jul 2021 18:11:35 -0700 Subject: [PATCH 1/8] added retry to get to fix multiprocessing pytorch bug --- hub/core/storage/s3.py | 3 +++ requirements/common.txt | 3 +-- requirements/requirements.txt | 1 + 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/hub/core/storage/s3.py b/hub/core/storage/s3.py index fbeae6c6f4..44d3226610 100644 --- a/hub/core/storage/s3.py +++ b/hub/core/storage/s3.py @@ -9,6 +9,8 @@ from hub.util.exceptions import S3DeletionError, S3GetError, S3ListError, S3SetError import hub +from retrying import retry + class S3Provider(StorageProvider): """Provider class for using S3 storage.""" @@ -83,6 +85,7 @@ def __setitem__(self, path, content): except Exception as err: raise S3SetError(err) + @retry(stop_max_attempt_number=3) def __getitem__(self, path): """Gets the object present at the path. diff --git a/requirements/common.txt b/requirements/common.txt index 95905f9289..8a4b400122 100644 --- a/requirements/common.txt +++ b/requirements/common.txt @@ -1,6 +1,4 @@ numpy -numcodecs>=0.7.3 -msgpack>=1.0.2 pillow==8.2.0 boto3 boto3-stubs[essential] @@ -9,3 +7,4 @@ humbug>=0.2.6 types-requests types-click tqdm +retrying \ No newline at end of file diff --git a/requirements/requirements.txt b/requirements/requirements.txt index 0c39d0a344..5d72c6cd0a 100644 --- a/requirements/requirements.txt +++ b/requirements/requirements.txt @@ -8,3 +8,4 @@ Pillow~=8.2.0 lz4~=3.1.3 zstd~=1.4.5 requests~=2.25.1 +retrying~=1.3.3 \ No newline at end of file From 7a44305b8591e3f13e258c487b99f3052d4252c1 Mon Sep 17 00:00:00 2001 From: "david.buniatyan@gmail.com" Date: Tue, 10 Aug 2021 08:27:32 -0700 Subject: [PATCH 2/8] num_workers fix --- hub/core/transform/test_transform.py | 20 ++++++++++++++++++++ hub/core/transform/transform.py | 1 + 2 files changed, 21 insertions(+) diff --git a/hub/core/transform/test_transform.py b/hub/core/transform/test_transform.py index cc5a5dd225..605d312976 100644 --- a/hub/core/transform/test_transform.py +++ b/hub/core/transform/test_transform.py @@ -116,6 +116,26 @@ def test_chain_transform_list_small(ds): ) +@enabled_datasets +@pytest.mark.xfail(raises=hub.util.exceptions.InvalidOutputDatasetError, strict=False) +def test_chain_transform_list_small_zero(ds): + ls = [i for i in range(100)] + ds_out = ds + ds_out.create_tensor("image") + ds_out.create_tensor("label") + pipeline = hub.compose([fn1(mul=5, copy=2), fn2(mul=3, copy=3)]) + pipeline.eval(ls, ds_out, num_workers=0) + assert len(ds_out) == 600 + for i in range(100): + for index in range(6 * i, 6 * i + 6): + np.testing.assert_array_equal( + ds_out[index].image.numpy(), 15 * i * np.ones((337, 200)) + ) + np.testing.assert_array_equal( + ds_out[index].label.numpy(), 15 * i * np.ones((1,)) + ) + + @enabled_datasets @pytest.mark.xfail(raises=NotImplementedError, strict=True) def test_chain_transform_list_big(ds): diff --git a/hub/core/transform/transform.py b/hub/core/transform/transform.py index 74ed7c960a..26d7d6f665 100644 --- a/hub/core/transform/transform.py +++ b/hub/core/transform/transform.py @@ -115,6 +115,7 @@ def run( """Runs the pipeline on the input data to produce output samples and stores in the dataset. This receives arguments processed and sanitized by the Pipeline.eval method. """ + num_workers = max(num_workers, 1) size = math.ceil(len(data_in) / num_workers) slices = [data_in[i * size : (i + 1) * size] for i in range(num_workers)] From d05c3702693780bd47182b8327c07d0cec62b31d Mon Sep 17 00:00:00 2001 From: AbhinavTuli Date: Wed, 11 Aug 2021 00:02:50 +0530 Subject: [PATCH 3/8] fix to allow memory ds in serial/0 worker transforms --- hub/core/transform/test_transform.py | 1 - hub/util/transform.py | 12 +++++++++--- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/hub/core/transform/test_transform.py b/hub/core/transform/test_transform.py index 605d312976..119a569fe3 100644 --- a/hub/core/transform/test_transform.py +++ b/hub/core/transform/test_transform.py @@ -117,7 +117,6 @@ def test_chain_transform_list_small(ds): @enabled_datasets -@pytest.mark.xfail(raises=hub.util.exceptions.InvalidOutputDatasetError, strict=False) def test_chain_transform_list_small_zero(ds): ls = [i for i in range(100)] ds_out = ds diff --git a/hub/util/transform.py b/hub/util/transform.py index ab9f03ec1a..4fa26d903f 100644 --- a/hub/util/transform.py +++ b/hub/util/transform.py @@ -172,8 +172,11 @@ def check_transform_data_in(data_in, scheduler: str) -> None: f"The data_in to transform is invalid. It should support __len__ operation." ) if isinstance(data_in, hub.core.dataset.Dataset): - base_storage = get_base_storage(data_in.storage) - if isinstance(base_storage, MemoryProvider) and scheduler != "threaded": + input_base_storage = get_base_storage(data_in.storage) + if isinstance(input_base_storage, MemoryProvider) and scheduler not in [ + "serial", + "threaded", + ]: raise InvalidOutputDatasetError( f"Transforms with data_in as a Dataset having base storage as MemoryProvider are only supported in threaded and serial mode. Current mode is {scheduler}." ) @@ -191,7 +194,10 @@ def check_transform_ds_out(ds_out: hub.core.dataset.Dataset, scheduler: str) -> ) output_base_storage = get_base_storage(ds_out.storage) - if isinstance(output_base_storage, MemoryProvider) and scheduler != "threaded": + if isinstance(output_base_storage, MemoryProvider) and scheduler not in [ + "serial", + "threaded", + ]: raise InvalidOutputDatasetError( f"Transforms with ds_out having base storage as MemoryProvider are only supported in threaded and serial mode. Current mode is {scheduler}." ) From cfc832ece30d2258af700c4aabcae1b1d8056587 Mon Sep 17 00:00:00 2001 From: "david.buniatyan@gmail.com" Date: Wed, 11 Aug 2021 00:54:45 -0700 Subject: [PATCH 4/8] parametrized tests --- hub/__init__.py | 2 +- hub/core/storage/s3.py | 3 --- hub/core/transform/test_transform.py | 25 ++++--------------------- hub/requirements/common.txt | 3 +-- hub/requirements/requirements.txt | 3 +-- hub/tests/common.py | 4 +++- 6 files changed, 10 insertions(+), 30 deletions(-) diff --git a/hub/__init__.py b/hub/__init__.py index 112f27218c..d834f4f18e 100644 --- a/hub/__init__.py +++ b/hub/__init__.py @@ -35,7 +35,7 @@ "ingest_kaggle", ] -__version__ = "2.0.5" +__version__ = "2.0.6" __encoded_version__ = np.array(__version__) hub_reporter.tags.append(f"version:{__version__}") diff --git a/hub/core/storage/s3.py b/hub/core/storage/s3.py index 5d25bce507..1ade64602c 100644 --- a/hub/core/storage/s3.py +++ b/hub/core/storage/s3.py @@ -9,8 +9,6 @@ from hub.util.exceptions import S3DeletionError, S3GetError, S3ListError, S3SetError import hub -from retrying import retry - class S3Provider(StorageProvider): """Provider class for using S3 storage.""" @@ -85,7 +83,6 @@ def __setitem__(self, path, content): except Exception as err: raise S3SetError(err) - @retry(stop_max_attempt_number=3) def __getitem__(self, path): """Gets the object present at the path. diff --git a/hub/core/transform/test_transform.py b/hub/core/transform/test_transform.py index 119a569fe3..d0a3d0b582 100644 --- a/hub/core/transform/test_transform.py +++ b/hub/core/transform/test_transform.py @@ -4,6 +4,7 @@ from click.testing import CliRunner from hub.core.storage.memory import MemoryProvider from hub.util.remove_cache import remove_memory_cache +from hub.tests.common import parametrize_num_workers from hub.tests.dataset_fixtures import enabled_datasets from hub.util.exceptions import InvalidOutputDatasetError @@ -98,32 +99,14 @@ def test_single_transform_hub_dataset_htypes(ds): @enabled_datasets -def test_chain_transform_list_small(ds): +@parametrize_num_workers +def test_chain_transform_list_small(ds, num_workers): ls = [i for i in range(100)] ds_out = ds ds_out.create_tensor("image") ds_out.create_tensor("label") pipeline = hub.compose([fn1(mul=5, copy=2), fn2(mul=3, copy=3)]) - pipeline.eval(ls, ds_out, num_workers=5) - assert len(ds_out) == 600 - for i in range(100): - for index in range(6 * i, 6 * i + 6): - np.testing.assert_array_equal( - ds_out[index].image.numpy(), 15 * i * np.ones((337, 200)) - ) - np.testing.assert_array_equal( - ds_out[index].label.numpy(), 15 * i * np.ones((1,)) - ) - - -@enabled_datasets -def test_chain_transform_list_small_zero(ds): - ls = [i for i in range(100)] - ds_out = ds - ds_out.create_tensor("image") - ds_out.create_tensor("label") - pipeline = hub.compose([fn1(mul=5, copy=2), fn2(mul=3, copy=3)]) - pipeline.eval(ls, ds_out, num_workers=0) + pipeline.eval(ls, ds_out, num_workers=num_workers) assert len(ds_out) == 600 for i in range(100): for index in range(6 * i, 6 * i + 6): diff --git a/hub/requirements/common.txt b/hub/requirements/common.txt index 8a4b400122..e8f7dd16fb 100644 --- a/hub/requirements/common.txt +++ b/hub/requirements/common.txt @@ -6,5 +6,4 @@ pathos humbug>=0.2.6 types-requests types-click -tqdm -retrying \ No newline at end of file +tqdm \ No newline at end of file diff --git a/hub/requirements/requirements.txt b/hub/requirements/requirements.txt index 5d72c6cd0a..3acb88a2c9 100644 --- a/hub/requirements/requirements.txt +++ b/hub/requirements/requirements.txt @@ -7,5 +7,4 @@ numcodecs~=0.7.3 Pillow~=8.2.0 lz4~=3.1.3 zstd~=1.4.5 -requests~=2.25.1 -retrying~=1.3.3 \ No newline at end of file +requests~=2.25.1 \ No newline at end of file diff --git a/hub/tests/common.py b/hub/tests/common.py index 67fe130261..8712d6bda2 100644 --- a/hub/tests/common.py +++ b/hub/tests/common.py @@ -20,8 +20,10 @@ NUM_BATCHES_PARAM = "num_batches" DTYPE_PARAM = "dtype" CHUNK_SIZE_PARAM = "chunk_size" +NUM_WORKERS_PARAM = "num_workers" NUM_BATCHES = (1, 5) +NUM_WORKERS = (0, 1, 2, 4, 8) CHUNK_SIZES = ( 1 * KB, @@ -39,7 +41,7 @@ parametrize_chunk_sizes = pytest.mark.parametrize(CHUNK_SIZE_PARAM, CHUNK_SIZES) parametrize_dtypes = pytest.mark.parametrize(DTYPE_PARAM, DTYPES) parametrize_num_batches = pytest.mark.parametrize(NUM_BATCHES_PARAM, NUM_BATCHES) - +parametrize_num_workers = pytest.mark.parametrize(NUM_WORKERS_PARAM, NUM_WORKERS) def current_test_name() -> str: full_name = os.environ.get("PYTEST_CURRENT_TEST").split(" ")[0] # type: ignore From 31ea7d1a791a79125fef8463a3eb07b499c0337b Mon Sep 17 00:00:00 2001 From: "david.buniatyan@gmail.com" Date: Wed, 11 Aug 2021 00:56:54 -0700 Subject: [PATCH 5/8] formatting fixed --- hub/tests/common.py | 1 + 1 file changed, 1 insertion(+) diff --git a/hub/tests/common.py b/hub/tests/common.py index 8712d6bda2..393d3a0dd9 100644 --- a/hub/tests/common.py +++ b/hub/tests/common.py @@ -43,6 +43,7 @@ parametrize_num_batches = pytest.mark.parametrize(NUM_BATCHES_PARAM, NUM_BATCHES) parametrize_num_workers = pytest.mark.parametrize(NUM_WORKERS_PARAM, NUM_WORKERS) + def current_test_name() -> str: full_name = os.environ.get("PYTEST_CURRENT_TEST").split(" ")[0] # type: ignore test_file = full_name.split("::")[0].split("/")[-1].split(".py")[0] From 583806cdbfac0a8d480ce2d672c7602c050328b4 Mon Sep 17 00:00:00 2001 From: "david.buniatyan@gmail.com" Date: Wed, 11 Aug 2021 02:52:04 -0700 Subject: [PATCH 6/8] minimize threads --- hub/tests/common.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hub/tests/common.py b/hub/tests/common.py index 393d3a0dd9..d24032e857 100644 --- a/hub/tests/common.py +++ b/hub/tests/common.py @@ -23,7 +23,7 @@ NUM_WORKERS_PARAM = "num_workers" NUM_BATCHES = (1, 5) -NUM_WORKERS = (0, 1, 2, 4, 8) +NUM_WORKERS = (0, 1, 2, 4) CHUNK_SIZES = ( 1 * KB, From fde17f8945737a46b99c8e5d21a855b08ddd1db5 Mon Sep 17 00:00:00 2001 From: "david.buniatyan@gmail.com" Date: Wed, 11 Aug 2021 03:57:16 -0700 Subject: [PATCH 7/8] moved num work testing to simpler test --- hub/core/transform/test_transform.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/hub/core/transform/test_transform.py b/hub/core/transform/test_transform.py index d0a3d0b582..757f7e043d 100644 --- a/hub/core/transform/test_transform.py +++ b/hub/core/transform/test_transform.py @@ -72,7 +72,8 @@ def test_single_transform_hub_dataset(ds): @enabled_datasets -def test_single_transform_hub_dataset_htypes(ds): +@parametrize_num_workers +def test_single_transform_hub_dataset_htypes(ds, num_workers): with CliRunner().isolated_filesystem(): with hub.dataset("./test/transform_hub_in_htypes") as data_in: data_in.create_tensor("image", htype="image", sample_compression="png") @@ -84,7 +85,7 @@ def test_single_transform_hub_dataset_htypes(ds): ds_out = ds ds_out.create_tensor("image") ds_out.create_tensor("label") - fn2(copy=1, mul=2).eval(data_in, ds_out, num_workers=5) + fn2(copy=1, mul=2).eval(data_in, ds_out, num_workers=num_workers) assert len(ds_out) == 99 for index in range(1, 100): np.testing.assert_array_equal( @@ -99,14 +100,13 @@ def test_single_transform_hub_dataset_htypes(ds): @enabled_datasets -@parametrize_num_workers def test_chain_transform_list_small(ds, num_workers): ls = [i for i in range(100)] ds_out = ds ds_out.create_tensor("image") ds_out.create_tensor("label") pipeline = hub.compose([fn1(mul=5, copy=2), fn2(mul=3, copy=3)]) - pipeline.eval(ls, ds_out, num_workers=num_workers) + pipeline.eval(ls, ds_out, num_workers=3) assert len(ds_out) == 600 for i in range(100): for index in range(6 * i, 6 * i + 6): From 6a8d1eb1a208e5ab661aea0fc25383a42ee15ea4 Mon Sep 17 00:00:00 2001 From: "david.buniatyan@gmail.com" Date: Wed, 11 Aug 2021 04:00:00 -0700 Subject: [PATCH 8/8] remove argument --- hub/core/transform/test_transform.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hub/core/transform/test_transform.py b/hub/core/transform/test_transform.py index 757f7e043d..a1833e4ced 100644 --- a/hub/core/transform/test_transform.py +++ b/hub/core/transform/test_transform.py @@ -100,7 +100,7 @@ def test_single_transform_hub_dataset_htypes(ds, num_workers): @enabled_datasets -def test_chain_transform_list_small(ds, num_workers): +def test_chain_transform_list_small(ds): ls = [i for i in range(100)] ds_out = ds ds_out.create_tensor("image")