
Improves S3 retry logic #2834

Merged: 3 commits, Apr 29, 2024

Merge pull request #2836 from activeloopai/retry_on_get_object_size

e1a7c78
GitHub Actions / JUnit Test Report failed Apr 27, 2024 in 0s

2998 tests run, 1678 passed, 1319 skipped, 1 failed.

Annotations

Check failure on line 1036 in deeplake/api/tests/test_api.py



test_api.test_dataset_deepcopy[True-2-hub_cloud_path-hub_cloud_dev_token]

deeplake.util.exceptions.InvalidTokenException: Token is invalid. Make sure the full token string is included and try again.
Raw output
self = <deeplake.core.storage.s3.S3Provider object at 0x7faf15a7df10>
prefix = ''

    def clear(self, prefix=""):
        """Deletes ALL data with keys having given prefix on the s3 bucket (under self.root).
    
        Warning:
            Exercise caution!
        """
        self.check_readonly()
        self._check_update_creds()
        path = posixpath.join(self.path, prefix) if prefix else self.path
        if self.resource is not None:
            try:
>               self._clear(path)

deeplake/core/storage/s3.py:418: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <deeplake.core.storage.s3.S3Provider object at 0x7faf15a7df10>
prefix = 'tmpde08_test_api_test_dataset_deepcopy-True-2-hub_cloud_path-hub_cloud_dev_token-_src1/'

    def _clear(self, prefix):
        bucket = self.resource.Bucket(self.bucket)
        for response in bucket.objects.filter(Prefix=prefix).delete():
>           if response["Errors"]:
E           KeyError: 'Errors'

deeplake/core/storage/s3.py:404: KeyError

During handling of the above exception, another exception occurred:

self = <deeplake.client.client.DeepLakeBackendClient object at 0x7faf17c41150>
org_id = 'testingacc2'
ds_name = 'tmpde08_test_api_test_dataset_deepcopy-True-2-hub_cloud_path-hub_cloud_dev_token-_src1'
mode = 'a', db_engine = {'enabled': False}, no_cache = True

    def get_dataset_credentials(
        self,
        org_id: str,
        ds_name: str,
        mode: Optional[str] = None,
        db_engine: Optional[dict] = None,
        no_cache: bool = False,
    ):
        """Retrieves temporary 12 hour credentials for the required dataset from the backend.
    
        Args:
            org_id (str): The name of the user/organization to which the dataset belongs.
            ds_name (str): The name of the dataset being accessed.
            mode (str, optional): The mode in which the user has requested to open the dataset.
                If not provided, the backend will set mode to 'a' if user has write permission, else 'r'.
            db_engine (dict, optional): The database engine args to use for the dataset.
            no_cache (bool): If True, cached creds are ignored and new creds are returned. Default False.
    
        Returns:
            tuple: containing full url to dataset, credentials, mode and expiration time respectively.
    
        Raises:
            UserNotLoggedInException: When user is not authenticated
            InvalidTokenException: If the specified token is invalid
            TokenPermissionError: when there are permission or other errors related to token
            AgreementNotAcceptedError: when user has not accepted the agreement
            NotLoggedInAgreementError: when user is not authenticated and dataset has agreement which needs to be signed
        """
        import json
    
        db_engine = db_engine or {}
        relative_url = GET_DATASET_CREDENTIALS_SUFFIX.format(org_id, ds_name)
        try:
>           response = self.request(
                "GET",
                relative_url,
                endpoint=self.endpoint(),
                params={
                    "mode": mode,
                    "no_cache": no_cache,
                    "db_engine": json.dumps(db_engine),
                },
            ).json()

deeplake/client/client.py:196: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
deeplake/client/client.py:148: in request
    check_response_status(response)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

response = <Response [403]>

    def check_response_status(response: requests.Response):
        """Check response status and throw corresponding exception on failure."""
        code = response.status_code
        if code >= 200 and code < 300:
            return
    
        try:
            message = response.json()["description"]
        except Exception:
            message = " "
    
        if code == 400:
            raise BadRequestException(message)
        elif response.status_code == 401:
            raise AuthenticationException
        elif response.status_code == 403:
>           raise AuthorizationException(message, response=response)
E           deeplake.util.exceptions.AuthorizationException: You don't have permission to write to this dataset (testingacc2/tmpde08_test_api_test_dataset_deepcopy-True-2-hub_cloud_path-hub_cloud_dev_token-_src1). If you have read permissions try accessing it with read_only=True.

deeplake/client/utils.py:60: AuthorizationException

During handling of the above exception, another exception occurred:

self = <deeplake.client.client.DeepLakeBackendClient object at 0x7faf17c41150>
org_id = 'testingacc2'
ds_name = 'tmpde08_test_api_test_dataset_deepcopy-True-2-hub_cloud_path-hub_cloud_dev_token-_src1'
mode = 'a', db_engine = {'enabled': False}, no_cache = True

    def get_dataset_credentials(
        self,
        org_id: str,
        ds_name: str,
        mode: Optional[str] = None,
        db_engine: Optional[dict] = None,
        no_cache: bool = False,
    ):
        """Retrieves temporary 12 hour credentials for the required dataset from the backend.
    
        Args:
            org_id (str): The name of the user/organization to which the dataset belongs.
            ds_name (str): The name of the dataset being accessed.
            mode (str, optional): The mode in which the user has requested to open the dataset.
                If not provided, the backend will set mode to 'a' if user has write permission, else 'r'.
            db_engine (dict, optional): The database engine args to use for the dataset.
            no_cache (bool): If True, cached creds are ignored and new creds are returned. Default False.
    
        Returns:
            tuple: containing full url to dataset, credentials, mode and expiration time respectively.
    
        Raises:
            UserNotLoggedInException: When user is not authenticated
            InvalidTokenException: If the specified token is invalid
            TokenPermissionError: when there are permission or other errors related to token
            AgreementNotAcceptedError: when user has not accepted the agreement
            NotLoggedInAgreementError: when user is not authenticated and dataset has agreement which needs to be signed
        """
        import json
    
        db_engine = db_engine or {}
        relative_url = GET_DATASET_CREDENTIALS_SUFFIX.format(org_id, ds_name)
        try:
            response = self.request(
                "GET",
                relative_url,
                endpoint=self.endpoint(),
                params={
                    "mode": mode,
                    "no_cache": no_cache,
                    "db_engine": json.dumps(db_engine),
                },
            ).json()
        except Exception as e:
            if isinstance(e, AuthorizationException):
                response_data = e.response.json()
                code = response_data.get("code")
                if code == 1:
                    agreements = response_data["agreements"]
                    agreements = [agreement["text"] for agreement in agreements]
                    raise AgreementNotAcceptedError(agreements) from e
                elif code == 2:
                    raise NotLoggedInAgreementError from e
                else:
                    try:
>                       jwt.decode(self.token, options={"verify_signature": False})
E                       AttributeError: 'DeepLakeBackendClient' object has no attribute 'token'

deeplake/client/client.py:218: AttributeError

During handling of the above exception, another exception occurred:

path = 'hub://testingacc2/tmpde08_test_api_test_dataset_deepcopy-True-2-hub_cloud_path-hub_cloud_dev_token-'
hub_token = 'eyJhbGciOiJub25lIiwidHlwIjoiSldUIn0.eyJpZCI6InRlc3RpbmdhY2MyIiwiYXBpX2tleSI6IjU4Y0tLb1p6UE1BbThPU2RpbTRiZ2tBekhWekt1VUE3MFJpNTNyZUpKRTJuaiJ9.'
num_workers = 2, progressbar = True

    @pytest.mark.parametrize(
        "path,hub_token",
        [
            ["local_path", "hub_cloud_dev_token"],
            pytest.param("hub_cloud_path", "hub_cloud_dev_token", marks=pytest.mark.slow),
        ],
        indirect=True,
    )
    @pytest.mark.parametrize("num_workers", [2])
    @pytest.mark.parametrize("progressbar", [True])
    def test_dataset_deepcopy(path, hub_token, num_workers, progressbar):
        src_path = "_".join((path, "src1"))
        dest_path = "_".join((path, "dest1"))
    
        src_ds = deeplake.empty(src_path, overwrite=True, token=hub_token)
    
        with src_ds:
            src_ds.info.update(key=0)
    
            src_ds.create_tensor("a", htype="image", sample_compression="png")
            src_ds.create_tensor("b", htype="class_label")
            src_ds.create_tensor("c")
            src_ds.create_tensor("d", dtype=bool)
    
            src_ds.d.info.update(key=1)
    
            src_ds["a"].append(np.ones((28, 28), dtype="uint8"))
            src_ds["b"].append(0)
    
        dest_ds = deeplake.deepcopy(
            src_path,
            dest_path,
            token=hub_token,
            num_workers=num_workers,
            progressbar=progressbar,
        )
    
        assert list(dest_ds.tensors) == ["a", "b", "c", "d"]
        assert dest_ds.a.meta.htype == "image"
        assert dest_ds.a.meta.sample_compression == "png"
        assert dest_ds.b.meta.htype == "class_label"
        assert dest_ds.c.meta.htype == None
        assert dest_ds.d.dtype == bool
    
        assert dest_ds.info.key == 0
        assert dest_ds.d.info.key == 1
    
        for tensor in dest_ds.meta.tensors:
            assert_array_equal(src_ds[tensor].numpy(), dest_ds[tensor].numpy())
    
>       deeplake.delete(src_path, token=hub_token)

deeplake/api/tests/test_api.py:1036: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
deeplake/util/spinner.py:151: in inner
    return func(*args, **kwargs)
deeplake/api/dataset.py:905: in delete
    ds.delete(large_ok=large_ok)
deeplake/core/dataset/deeplake_cloud_dataset.py:243: in delete
    super().delete(large_ok=large_ok)
deeplake/util/invalid_view_op.py:22: in inner
    return callable(x, *args, **kwargs)
deeplake/core/dataset/dataset.py:2664: in delete
    self.storage.clear()
deeplake/core/storage/lru_cache.py:373: in clear
    self.next_storage.clear(prefix=prefix)
deeplake/core/storage/s3.py:420: in clear
    with S3ResetReloadCredentialsManager(self, S3DeletionError):
deeplake/core/storage/s3.py:75: in __enter__
    self.s3p._check_update_creds(force=True)
deeplake/core/storage/s3.py:543: in _check_update_creds
    url, creds, mode, expiration, repo = client.get_dataset_credentials(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <deeplake.client.client.DeepLakeBackendClient object at 0x7faf17c41150>
org_id = 'testingacc2'
ds_name = 'tmpde08_test_api_test_dataset_deepcopy-True-2-hub_cloud_path-hub_cloud_dev_token-_src1'
mode = 'a', db_engine = {'enabled': False}, no_cache = True

    def get_dataset_credentials(
        self,
        org_id: str,
        ds_name: str,
        mode: Optional[str] = None,
        db_engine: Optional[dict] = None,
        no_cache: bool = False,
    ):
        """Retrieves temporary 12 hour credentials for the required dataset from the backend.
    
        Args:
            org_id (str): The name of the user/organization to which the dataset belongs.
            ds_name (str): The name of the dataset being accessed.
            mode (str, optional): The mode in which the user has requested to open the dataset.
                If not provided, the backend will set mode to 'a' if user has write permission, else 'r'.
            db_engine (dict, optional): The database engine args to use for the dataset.
            no_cache (bool): If True, cached creds are ignored and new creds are returned. Default False.
    
        Returns:
            tuple: containing full url to dataset, credentials, mode and expiration time respectively.
    
        Raises:
            UserNotLoggedInException: When user is not authenticated
            InvalidTokenException: If the specified token is invalid
            TokenPermissionError: when there are permission or other errors related to token
            AgreementNotAcceptedError: when user has not accepted the agreement
            NotLoggedInAgreementError: when user is not authenticated and dataset has agreement which needs to be signed
        """
        import json
    
        db_engine = db_engine or {}
        relative_url = GET_DATASET_CREDENTIALS_SUFFIX.format(org_id, ds_name)
        try:
            response = self.request(
                "GET",
                relative_url,
                endpoint=self.endpoint(),
                params={
                    "mode": mode,
                    "no_cache": no_cache,
                    "db_engine": json.dumps(db_engine),
                },
            ).json()
        except Exception as e:
            if isinstance(e, AuthorizationException):
                response_data = e.response.json()
                code = response_data.get("code")
                if code == 1:
                    agreements = response_data["agreements"]
                    agreements = [agreement["text"] for agreement in agreements]
                    raise AgreementNotAcceptedError(agreements) from e
                elif code == 2:
                    raise NotLoggedInAgreementError from e
                else:
                    try:
                        jwt.decode(self.token, options={"verify_signature": False})
                    except Exception:
>                       raise InvalidTokenException
E                       deeplake.util.exceptions.InvalidTokenException: Token is invalid. Make sure the full token string is included and try again.

deeplake/client/client.py:220: InvalidTokenException
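
The root cause of this chain is the `KeyError: 'Errors'` in `_clear`: boto3's batch-delete response includes an `"Errors"` list only when some deletions actually failed, so indexing it unconditionally raises on a fully successful delete. A minimal defensive sketch of the check (a hypothetical helper for illustration, not the actual patch in #2836):

```python
def deletion_errors(responses):
    """Collect per-object failures from S3 batch-delete responses.

    boto3's delete_objects response carries an "Errors" key only when
    some deletions failed, so read it with .get() instead of indexing.
    """
    errors = []
    for response in responses:
        # .get() returns [] when the key is absent -- no KeyError
        errors.extend(response.get("Errors", []))
    return errors
```

With this shape, a clean delete (only a `"Deleted"` key in each response) yields an empty error list instead of raising.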
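
Given the PR title ("Improves S3 retry logic"), the usual shape of such a change is an exponential-backoff wrapper around the transient operation. The sketch below is generic and illustrative; the names and the set of retryable exceptions are assumptions, not Deep Lake's actual API:

```python
import time

def with_retries(fn, *, attempts=4, base_delay=0.5,
                 retryable=(ConnectionError, TimeoutError)):
    """Call fn(), retrying transient failures with exponential backoff."""
    for attempt in range(attempts):
        try:
            return fn()
        except retryable:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            # back off 0.5s, 1s, 2s, ... before the next attempt
            time.sleep(base_delay * (2 ** attempt))
```

In tests, pass `base_delay=0` so the retries run without sleeping.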
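
The intermediate `AttributeError: 'DeepLakeBackendClient' object has no attribute 'token'` shows the error path calling `jwt.decode(self.token, ...)` on an attribute that was never set, which the surrounding `except` then converts into `InvalidTokenException`. For reference, what that unverified decode does can be reproduced with the stdlib alone; this is an illustrative equivalent, not the project's code:

```python
import base64
import json

def unverified_claims(token):
    """Decode a JWT payload without signature verification (stdlib only).

    Mirrors jwt.decode(token, options={"verify_signature": False}) for the
    purpose of checking that a token string is structurally well-formed.
    """
    try:
        # a JWT is header.payload.signature; the test token's signature is empty
        _, payload, _ = token.split(".")
        # base64url in JWTs omits padding; restore it before decoding
        padded = payload + "=" * (-len(payload) % 4)
        return json.loads(base64.urlsafe_b64decode(padded))
    except (ValueError, AttributeError) as e:
        # covers a None token, too few segments, or bad base64/JSON
        raise ValueError("Token is invalid") from e
```

Using `getattr(self, "token", None)` on the client side would turn the `AttributeError` into the same "invalid token" path without masking the original authorization failure.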