From bce66f727c0c20b86e30e82347fc78c8e2ced9dd Mon Sep 17 00:00:00 2001 From: bhimrazy Date: Sun, 13 Oct 2024 16:04:16 +0545 Subject: [PATCH 1/8] pass storage options to s5cmd --- src/litdata/streaming/downloader.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/litdata/streaming/downloader.py b/src/litdata/streaming/downloader.py index 41e4a6a9e..5b9a0b333 100644 --- a/src/litdata/streaming/downloader.py +++ b/src/litdata/streaming/downloader.py @@ -68,10 +68,14 @@ def download_file(self, remote_filepath: str, local_filepath: str) -> None: local_filepath + ".lock", timeout=3 if obj.path.endswith(_INDEX_FILENAME) else 0 ): if self._s5cmd_available: + env = os.environ.copy() + if self._storage_options: + env.update(self._storage_options) proc = subprocess.Popen( f"s5cmd cp {remote_filepath} {local_filepath}", shell=True, stdout=subprocess.PIPE, + env=env, ) proc.wait() else: @@ -79,8 +83,6 @@ def download_file(self, remote_filepath: str, local_filepath: str) -> None: extra_args: Dict[str, Any] = {} - # try: - # with FileLock(local_filepath + ".lock", timeout=1): if not os.path.exists(local_filepath): # Issue: https://github.com/boto/boto3/issues/3113 self._client.client.download_file( From 1783215282cf18a0d63f9f24dc88d99ca9abef69 Mon Sep 17 00:00:00 2001 From: Bhimraj Yadav Date: Mon, 14 Oct 2024 11:21:46 +0545 Subject: [PATCH 2/8] Update src/litdata/streaming/downloader.py Co-authored-by: Deependu Jha --- src/litdata/streaming/downloader.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/litdata/streaming/downloader.py b/src/litdata/streaming/downloader.py index 5b9a0b333..5f071734a 100644 --- a/src/litdata/streaming/downloader.py +++ b/src/litdata/streaming/downloader.py @@ -68,8 +68,9 @@ def download_file(self, remote_filepath: str, local_filepath: str) -> None: local_filepath + ".lock", timeout=3 if obj.path.endswith(_INDEX_FILENAME) else 0 ): if self._s5cmd_available: - env = os.environ.copy() + env = None if 
self._storage_options: + env = os.environ.copy() env.update(self._storage_options) proc = subprocess.Popen( f"s5cmd cp {remote_filepath} {local_filepath}", From 8deb5d0fa746eaa899227954956108d7e5a12cbf Mon Sep 17 00:00:00 2001 From: bhimrazy Date: Mon, 14 Oct 2024 11:44:31 +0545 Subject: [PATCH 3/8] add mock tests to test_s3_downloader_with_s5cmd --- tests/streaming/test_downloader.py | 58 +++++++++++++++++++++++++++++- 1 file changed, 57 insertions(+), 1 deletion(-) diff --git a/tests/streaming/test_downloader.py b/tests/streaming/test_downloader.py index 7c79afe58..7a835a5fd 100644 --- a/tests/streaming/test_downloader.py +++ b/tests/streaming/test_downloader.py @@ -1,6 +1,6 @@ import os from unittest import mock -from unittest.mock import MagicMock +from unittest.mock import MagicMock, patch from litdata.streaming.downloader import ( AzureDownloader, @@ -21,6 +21,62 @@ def test_s3_downloader_fast(tmpdir, monkeypatch): popen_mock.wait.assert_called() +@patch("os.system") +@patch("subprocess.Popen") +def test_s3_downloader_with_s5cmd_no_storage_options(popen_mock, system_mock, tmpdir): + system_mock.return_value = 0 # Simulates s5cmd being available + process_mock = MagicMock() + popen_mock.return_value = process_mock + + # Initialize the S3Downloader without storage options + downloader = S3Downloader("s3://test_bucket", str(tmpdir), []) + + # Action: Call the download_file method + remote_filepath = "s3://test_bucket/sample_file.txt" + local_filepath = os.path.join(tmpdir, "sample_file.txt") + downloader.download_file(remote_filepath, local_filepath) + + # Assertion: Verify subprocess.Popen was called with correct arguments and no env variables + popen_mock.assert_called_once_with( # noqa: S604 + f"s5cmd cp {remote_filepath} {local_filepath}", + shell=True, + stdout=subprocess.PIPE, + env=None, + ) + process_mock.wait.assert_called_once() + + +@patch("os.system") +@patch("subprocess.Popen") +def test_s3_downloader_with_s5cmd_with_storage_options(popen_mock, 
system_mock, tmpdir): + system_mock.return_value = 0 # Simulates s5cmd being available + process_mock = MagicMock() + popen_mock.return_value = process_mock + + storage_options = {"AWS_ACCESS_KEY_ID": "dummy_key", "AWS_SECRET_ACCESS_KEY": "dummy_secret"} + + # Initialize the S3Downloader with storage options + downloader = S3Downloader("s3://test_bucket", str(tmpdir), [], storage_options) + + # Action: Call the download_file method + remote_filepath = "s3://test_bucket/sample_file.txt" + local_filepath = os.path.join(tmpdir, "sample_file.txt") + downloader.download_file(remote_filepath, local_filepath) + + # Create expected environment variables by merging the current env with storage_options + expected_env = os.environ.copy() + expected_env.update(storage_options) + + # Assertion: Verify subprocess.Popen was called with the correct arguments and environment variables + popen_mock.assert_called_once_with( # noqa: S604 + f"s5cmd cp {remote_filepath} {local_filepath}", + shell=True, + stdout=subprocess.PIPE, + env=expected_env, + ) + process_mock.wait.assert_called_once() + + @mock.patch("litdata.streaming.downloader._GOOGLE_STORAGE_AVAILABLE", True) def test_gcp_downloader(tmpdir, monkeypatch, google_mock): # Create mock objects From a1b4effc2eaedc1884efa9e35d7875798cb1809a Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 14 Oct 2024 05:59:47 +0000 Subject: [PATCH 4/8] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- tests/streaming/test_downloader.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/streaming/test_downloader.py b/tests/streaming/test_downloader.py index 7a835a5fd..841708400 100644 --- a/tests/streaming/test_downloader.py +++ b/tests/streaming/test_downloader.py @@ -37,7 +37,7 @@ def test_s3_downloader_with_s5cmd_no_storage_options(popen_mock, system_mock, tm downloader.download_file(remote_filepath, 
local_filepath) # Assertion: Verify subprocess.Popen was called with correct arguments and no env variables - popen_mock.assert_called_once_with( # noqa: S604 + popen_mock.assert_called_once_with( f"s5cmd cp {remote_filepath} {local_filepath}", shell=True, stdout=subprocess.PIPE, @@ -68,7 +68,7 @@ def test_s3_downloader_with_s5cmd_with_storage_options(popen_mock, system_mock, expected_env.update(storage_options) # Assertion: Verify subprocess.Popen was called with the correct arguments and environment variables - popen_mock.assert_called_once_with( # noqa: S604 + popen_mock.assert_called_once_with( f"s5cmd cp {remote_filepath} {local_filepath}", shell=True, stdout=subprocess.PIPE, From 9bf83af63e467fa95b6a322353d75e82df937410 Mon Sep 17 00:00:00 2001 From: bhimrazy Date: Mon, 14 Oct 2024 11:48:41 +0545 Subject: [PATCH 5/8] fix: consistent bucket name across the tests --- tests/streaming/test_downloader.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/streaming/test_downloader.py b/tests/streaming/test_downloader.py index 841708400..f4a8b9f51 100644 --- a/tests/streaming/test_downloader.py +++ b/tests/streaming/test_downloader.py @@ -29,10 +29,10 @@ def test_s3_downloader_with_s5cmd_no_storage_options(popen_mock, system_mock, tm popen_mock.return_value = process_mock # Initialize the S3Downloader without storage options - downloader = S3Downloader("s3://test_bucket", str(tmpdir), []) + downloader = S3Downloader("s3://random_bucket", str(tmpdir), []) # Action: Call the download_file method - remote_filepath = "s3://test_bucket/sample_file.txt" + remote_filepath = "s3://random_bucket/sample_file.txt" local_filepath = os.path.join(tmpdir, "sample_file.txt") downloader.download_file(remote_filepath, local_filepath) @@ -56,10 +56,10 @@ def test_s3_downloader_with_s5cmd_with_storage_options(popen_mock, system_mock, storage_options = {"AWS_ACCESS_KEY_ID": "dummy_key", "AWS_SECRET_ACCESS_KEY": "dummy_secret"} # Initialize the S3Downloader with
storage options - downloader = S3Downloader("s3://test_bucket", str(tmpdir), [], storage_options) + downloader = S3Downloader("s3://random_bucket", str(tmpdir), [], storage_options) # Action: Call the download_file method - remote_filepath = "s3://test_bucket/sample_file.txt" + remote_filepath = "s3://random_bucket/sample_file.txt" local_filepath = os.path.join(tmpdir, "sample_file.txt") downloader.download_file(remote_filepath, local_filepath) From 98110d0671097d0613b69d24184bdd208b9674ae Mon Sep 17 00:00:00 2001 From: bhimrazy Date: Mon, 14 Oct 2024 12:01:07 +0545 Subject: [PATCH 6/8] add type: ignore and noqa: S604 to the Popen assertions --- tests/streaming/test_downloader.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/streaming/test_downloader.py b/tests/streaming/test_downloader.py index f4a8b9f51..c286fd1f9 100644 --- a/tests/streaming/test_downloader.py +++ b/tests/streaming/test_downloader.py @@ -37,7 +37,7 @@ def test_s3_downloader_with_s5cmd_no_storage_options(popen_mock, system_mock, tm downloader.download_file(remote_filepath, local_filepath) # Assertion: Verify subprocess.Popen was called with correct arguments and no env variables - popen_mock.assert_called_once_with( + popen_mock.assert_called_once_with( # type: ignore # noqa: S604 f"s5cmd cp {remote_filepath} {local_filepath}", shell=True, stdout=subprocess.PIPE, @@ -68,7 +68,7 @@ def test_s3_downloader_with_s5cmd_with_storage_options(popen_mock, system_mock, expected_env.update(storage_options) # Assertion: Verify subprocess.Popen was called with the correct arguments and environment variables - popen_mock.assert_called_once_with( + popen_mock.assert_called_once_with( # type: ignore # noqa: S604 f"s5cmd cp {remote_filepath} {local_filepath}", shell=True, stdout=subprocess.PIPE, From 5f576493b333aa662be47231928a5bd8118acced Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 14 Oct 2024 06:16:31 +0000 Subject: [PATCH 7/8] [pre-commit.ci] auto
fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- tests/streaming/test_downloader.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/streaming/test_downloader.py b/tests/streaming/test_downloader.py index c286fd1f9..c92d8b8e9 100644 --- a/tests/streaming/test_downloader.py +++ b/tests/streaming/test_downloader.py @@ -37,7 +37,7 @@ def test_s3_downloader_with_s5cmd_no_storage_options(popen_mock, system_mock, tm downloader.download_file(remote_filepath, local_filepath) # Assertion: Verify subprocess.Popen was called with correct arguments and no env variables - popen_mock.assert_called_once_with( # type: ignore # noqa: S604 + popen_mock.assert_called_once_with( # type: ignore f"s5cmd cp {remote_filepath} {local_filepath}", shell=True, stdout=subprocess.PIPE, @@ -68,7 +68,7 @@ def test_s3_downloader_with_s5cmd_with_storage_options(popen_mock, system_mock, expected_env.update(storage_options) # Assertion: Verify subprocess.Popen was called with the correct arguments and environment variables - popen_mock.assert_called_once_with( # type: ignore # noqa: S604 + popen_mock.assert_called_once_with( # type: ignore f"s5cmd cp {remote_filepath} {local_filepath}", shell=True, stdout=subprocess.PIPE, From 4f4a1cc569ddf8c078fcbadcea7be8fec46fa48f Mon Sep 17 00:00:00 2001 From: bhimrazy Date: Mon, 14 Oct 2024 12:05:33 +0545 Subject: [PATCH 8/8] reverted type:ignore --- tests/streaming/test_downloader.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/streaming/test_downloader.py b/tests/streaming/test_downloader.py index c92d8b8e9..f4a8b9f51 100644 --- a/tests/streaming/test_downloader.py +++ b/tests/streaming/test_downloader.py @@ -37,7 +37,7 @@ def test_s3_downloader_with_s5cmd_no_storage_options(popen_mock, system_mock, tm downloader.download_file(remote_filepath, local_filepath) # Assertion: Verify subprocess.Popen was called with correct arguments and no env variables - 
popen_mock.assert_called_once_with( # type: ignore + popen_mock.assert_called_once_with( f"s5cmd cp {remote_filepath} {local_filepath}", shell=True, stdout=subprocess.PIPE, @@ -68,7 +68,7 @@ def test_s3_downloader_with_s5cmd_with_storage_options(popen_mock, system_mock, expected_env.update(storage_options) # Assertion: Verify subprocess.Popen was called with the correct arguments and environment variables - popen_mock.assert_called_once_with( # type: ignore + popen_mock.assert_called_once_with( f"s5cmd cp {remote_filepath} {local_filepath}", shell=True, stdout=subprocess.PIPE,