22452 tests run, 11814 passed, 10632 skipped, 6 failed.
Annotations
Check failure on line 467 in deeplake/core/query/test/test_query.py
github-actions / JUnit Test Report
test_query.test_link_materialize[1]
deeplake.util.exceptions.SampleAppendError: Failed to append a sample to the tensor 'abc'. See more details in the traceback.
Raw output
self = Sample(is_lazy=True, path=https://picsum.photos/20/20)
def _read_from_path(self) -> bytes: # type: ignore
if self._buffer is None:
path_type = get_path_type(self.path)
try:
if path_type == "local":
self._buffer = self._read_from_local()
elif path_type == "gcs":
self._buffer = self._read_from_gcs()
elif path_type == "s3":
self._buffer = self._read_from_s3()
elif path_type == "azure":
self._buffer = self._read_from_azure()
elif path_type == "gdrive":
self._buffer = self._read_from_gdrive()
elif path_type == "http":
> self._buffer = self._read_from_http(timeout=self._timeout)
deeplake/core/sample.py:466:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = Sample(is_lazy=True, path=https://picsum.photos/20/20), timeout = None
def _read_from_http(self, timeout=None) -> bytes:
assert self.path is not None
if "Authorization" in self._creds:
headers = {"Authorization": self._creds["Authorization"]}
else:
headers = {}
result = requests.get(self.path, headers=headers, timeout=timeout)
if result.status_code != 200:
> raise UnableToReadFromUrlError(self.path, result.status_code)
E deeplake.util.exceptions.UnableToReadFromUrlError: Unable to read from url https://picsum.photos/20/20. Status code: 503
deeplake/core/sample.py:532: UnableToReadFromUrlError
The above exception was the direct cause of the following exception:
sample_path = 'https://picsum.photos/20/20', sample_creds_key = None
link_creds = <deeplake.core.link_creds.LinkCreds object at 0x127801300>
verify = True
def read_linked_sample(
sample_path: str, sample_creds_key: Optional[str], link_creds, verify: bool
):
provider_type = get_path_type(sample_path)
try:
if provider_type == "local":
return deeplake.read(sample_path, verify=verify)
elif provider_type == "http":
> return _read_http_linked_sample(
link_creds, sample_creds_key, sample_path, verify
)
deeplake/core/linked_sample.py:29:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
deeplake/core/linked_sample.py:50: in wrapper
raise e
deeplake/core/linked_sample.py:43: in wrapper
return f(linked_creds, sample_creds_key, *args, **kwargs)
deeplake/core/linked_sample.py:72: in _read_http_linked_sample
return deeplake.read(sample_path, verify=verify, creds=creds)
deeplake/api/read.py:63: in read
return Sample(
deeplake/core/sample.py:106: in __init__
compressed_bytes = self._read_from_path()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = Sample(is_lazy=True, path=https://picsum.photos/20/20)
def _read_from_path(self) -> bytes: # type: ignore
if self._buffer is None:
path_type = get_path_type(self.path)
try:
if path_type == "local":
self._buffer = self._read_from_local()
elif path_type == "gcs":
self._buffer = self._read_from_gcs()
elif path_type == "s3":
self._buffer = self._read_from_s3()
elif path_type == "azure":
self._buffer = self._read_from_azure()
elif path_type == "gdrive":
self._buffer = self._read_from_gdrive()
elif path_type == "http":
self._buffer = self._read_from_http(timeout=self._timeout)
except Exception as e:
> raise SampleReadError(self.path) from e # type: ignore
E deeplake.util.exceptions.SampleReadError: Unable to read sample from https://picsum.photos/20/20
deeplake/core/sample.py:468: SampleReadError
The above exception was the direct cause of the following exception:
self = <deeplake.core.linked_chunk_engine.LinkedChunkEngine object at 0x127800280>
samples = [<deeplake.core.linked_sample.LinkedSample object at 0x1278012d0>, <deeplake.core.linked_sample.LinkedSample object at...nked_sample.LinkedSample object at 0x153c10be0>, <deeplake.core.linked_sample.LinkedSample object at 0x153c11000>, ...]
verify = True, ignore_errors = False
def check_each_sample(self, samples, verify=True, ignore_errors=False):
link_creds = self.link_creds
verified_samples = []
skipped = []
for i, sample in enumerate(samples):
try:
if isinstance(sample, deeplake.core.tensor.Tensor) and sample.is_link:
sample = sample._linked_sample()
samples[i] = sample
elif (
not isinstance(sample, (LinkedSample, LinkedTiledSample))
and sample is not None
):
raise TypeError(
f"Expected LinkedSample or LinkedTiledSample, got {type(sample)} instead. Use deeplake.link() to link samples or deeplake.link_tiled() to link multiple images as tiles."
)
path, creds_key = get_path_creds_key(sample)
# verifies existence of creds_key
if verify:
link_creds.get_encoding(creds_key, path)
if sample is None or sample.path == "":
verified_samples.append(sample)
elif isinstance(sample, LinkedTiledSample):
verify_samples = self.verify and verify
sample.set_check_tile_shape(self.link_creds, verify_samples)
sample.set_sample_shape()
verified_samples.append(sample)
else:
try:
_verify = verify and self.verify
verified_samples.append(
> read_linked_sample(
sample.path,
sample.creds_key,
self.link_creds,
verify=_verify,
)
)
deeplake/core/linked_chunk_engine.py:280:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
sample_path = 'https://picsum.photos/20/20', sample_creds_key = None
link_creds = <deeplake.core.link_creds.LinkCreds object at 0x127801300>
verify = True
def read_linked_sample(
sample_path: str, sample_creds_key: Optional[str], link_creds, verify: bool
):
provider_type = get_path_type(sample_path)
try:
if provider_type == "local":
return deeplake.read(sample_path, verify=verify)
elif provider_type == "http":
return _read_http_linked_sample(
link_creds, sample_creds_key, sample_path, verify
)
else:
return _read_cloud_linked_sample(
link_creds, sample_creds_key, sample_path, provider_type, verify
)
except Exception as e:
> raise GetDataFromLinkError(sample_path) from e
E deeplake.util.exceptions.GetDataFromLinkError: Unable to get data from link https://picsum.photos/20/20.
deeplake/core/linked_sample.py:37: GetDataFromLinkError
The above exception was the direct cause of the following exception:
self = <deeplake.core.linked_chunk_engine.LinkedChunkEngine object at 0x127800280>
samples = [<deeplake.core.linked_sample.LinkedSample object at 0x1278012d0>, <deeplake.core.linked_sample.LinkedSample object at...nked_sample.LinkedSample object at 0x153c10be0>, <deeplake.core.linked_sample.LinkedSample object at 0x153c11000>, ...]
progressbar = False
link_callback = <bound method Tensor._extend_links of Tensor(key='abc')>
pg_callback = None, ignore_errors = False, verified_samples = None
def extend(
self,
samples,
progressbar: bool = False,
link_callback: Optional[Callable] = None,
pg_callback=None,
ignore_errors: bool = False,
verified_samples=None,
):
try:
assert not (progressbar and pg_callback)
self.check_link_ready()
if not self.write_initialization_done:
self._write_initialization()
self.write_initialization_done = True
initial_autoflush = self.cache.autoflush
self.cache.autoflush = False
num_samples = self.tensor_length
if self.is_sequence:
self._extend_sequence(
samples,
progressbar,
link_callback,
ignore_errors,
verified_samples,
)
else:
> verified_samples = self._extend(
samples,
progressbar,
pg_callback=pg_callback,
ignore_errors=ignore_errors,
verified_samples=verified_samples,
)
deeplake/core/chunk_engine.py:1235:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
deeplake/core/chunk_engine.py:1092: in _extend
verified_samples = verified_samples or self.check_each_sample(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <deeplake.core.linked_chunk_engine.LinkedChunkEngine object at 0x127800280>
samples = [<deeplake.core.linked_sample.LinkedSample object at 0x1278012d0>, <deeplake.core.linked_sample.LinkedSample object at...nked_sample.LinkedSample object at 0x153c10be0>, <deeplake.core.linked_sample.LinkedSample object at 0x153c11000>, ...]
verify = True, ignore_errors = False
def check_each_sample(self, samples, verify=True, ignore_errors=False):
link_creds = self.link_creds
verified_samples = []
skipped = []
for i, sample in enumerate(samples):
try:
if isinstance(sample, deeplake.core.tensor.Tensor) and sample.is_link:
sample = sample._linked_sample()
samples[i] = sample
elif (
not isinstance(sample, (LinkedSample, LinkedTiledSample))
and sample is not None
):
raise TypeError(
f"Expected LinkedSample or LinkedTiledSample, got {type(sample)} instead. Use deeplake.link() to link samples or deeplake.link_tiled() to link multiple images as tiles."
)
path, creds_key = get_path_creds_key(sample)
# verifies existence of creds_key
if verify:
link_creds.get_encoding(creds_key, path)
if sample is None or sample.path == "":
verified_samples.append(sample)
elif isinstance(sample, LinkedTiledSample):
verify_samples = self.verify and verify
sample.set_check_tile_shape(self.link_creds, verify_samples)
sample.set_sample_shape()
verified_samples.append(sample)
else:
try:
_verify = verify and self.verify
verified_samples.append(
read_linked_sample(
sample.path,
sample.creds_key,
self.link_creds,
verify=_verify,
)
)
except Exception as e:
> raise BadLinkError(sample.path, sample.creds_key) from e
E deeplake.util.exceptions.BadLinkError: Verification of link failed. Make sure that the link you are trying to append is correct.
E
E Failed link: https://picsum.photos/20/20
E creds_key used: None
E
E No credentials have been provided to access the link. If the link is not publicly accessible, add access credentials to your dataset and use the appropriate creds_key.
deeplake/core/linked_chunk_engine.py:288: BadLinkError
The above exception was the direct cause of the following exception:
local_ds = Dataset(path='./hub_pytest/test_query/test_link_materialize-1-', tensors=['abc'])
num_workers = 1
@pytest.mark.slow
@pytest.mark.parametrize("num_workers", [1, 2])
def test_link_materialize(local_ds, num_workers):
with local_ds as ds:
ds.create_tensor("abc", htype="link[image]", sample_compression="jpg")
> ds.abc.extend(
[
(
deeplake.link("https://picsum.photos/20/20")
if i % 2
else deeplake.link("https://picsum.photos/10/10")
)
for i in range(20)
]
)
deeplake/core/query/test/test_query.py:467:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
deeplake/util/invalid_view_op.py:22: in inner
return callable(x, *args, **kwargs)
deeplake/core/tensor.py:362: in extend
self._extend(samples, progressbar=progressbar, ignore_errors=ignore_errors)
deeplake/core/tensor.py:313: in _extend
self.chunk_engine.extend(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <deeplake.core.linked_chunk_engine.LinkedChunkEngine object at 0x127800280>
samples = [<deeplake.core.linked_sample.LinkedSample object at 0x1278012d0>, <deeplake.core.linked_sample.LinkedSample object at...nked_sample.LinkedSample object at 0x153c10be0>, <deeplake.core.linked_sample.LinkedSample object at 0x153c11000>, ...]
progressbar = False
link_callback = <bound method Tensor._extend_links of Tensor(key='abc')>
pg_callback = None, ignore_errors = False, verified_samples = None
def extend(
self,
samples,
progressbar: bool = False,
link_callback: Optional[Callable] = None,
pg_callback=None,
ignore_errors: bool = False,
verified_samples=None,
):
try:
assert not (progressbar and pg_callback)
self.check_link_ready()
if not self.write_initialization_done:
self._write_initialization()
self.write_initialization_done = True
initial_autoflush = self.cache.autoflush
self.cache.autoflush = False
num_samples = self.tensor_length
if self.is_sequence:
self._extend_sequence(
samples,
progressbar,
link_callback,
ignore_errors,
verified_samples,
)
else:
verified_samples = self._extend(
samples,
progressbar,
pg_callback=pg_callback,
ignore_errors=ignore_errors,
verified_samples=verified_samples,
)
if link_callback:
verified_samples = self._prepare_samples_for_link_callback(
verified_samples
)
self._extend_link_callback(
link_callback,
verified_samples,
None,
progressbar,
ignore_errors,
)
self.cache.autoflush = initial_autoflush
self.cache.maybe_flush()
except Exception as e:
self.pop(list(range(num_samples, self.tensor_length)))
> raise SampleAppendError(self.name) from e
E deeplake.util.exceptions.SampleAppendError: Failed to append a sample to the tensor 'abc'. See more details in the traceback.
deeplake/core/chunk_engine.py:1258: SampleAppendError
Check failure on line 200 in deeplake/api/tests/test_json.py
github-actions / JUnit Test Report
test_json.test_json_transform[lz4-gdrive_ds]
googleapiclient.errors.HttpError: <HttpError 500 when requesting https://www.googleapis.com/upload/drive/v3/files/1NZaH1AYmANgKV3bd98QVSpU1oGFZvww2?alt=json&uploadType=media returned "Internal Error". Details: "[{'message': 'Internal Error', 'domain': 'global', 'reason': 'internalError'}]">
Raw output
ds = Dataset(path='gdrive://hubtest/tmp7c7a', tensors=['json'])
compression = 'lz4', scheduler = 'threaded'
@enabled_non_gcs_datasets
@pytest.mark.parametrize("compression", ["lz4", None])
@pytest.mark.slow
def test_json_transform(ds, compression, scheduler="threaded"):
> ds.create_tensor("json", htype="json", sample_compression=compression)
deeplake/api/tests/test_json.py:200:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
deeplake/util/invalid_view_op.py:22: in inner
return callable(x, *args, **kwargs)
deeplake/core/dataset/dataset.py:739: in create_tensor
return self._create_tensor(
deeplake/util/invalid_view_op.py:22: in inner
return callable(x, *args, **kwargs)
deeplake/core/dataset/dataset.py:905: in _create_tensor
self._create_sample_id_tensor(name)
deeplake/core/dataset/dataset.py:958: in _create_sample_id_tensor
self._create_tensor(
deeplake/util/invalid_view_op.py:22: in inner
return callable(x, *args, **kwargs)
deeplake/core/dataset/dataset.py:891: in _create_tensor
self.storage.maybe_flush()
deeplake/core/storage/provider.py:181: in maybe_flush
self.flush()
deeplake/core/storage/lru_cache.py:112: in flush
self._forward(key)
deeplake/core/storage/lru_cache.py:394: in _forward
self._forward_value(path, self.cache_storage[path])
deeplake/core/storage/lru_cache.py:407: in _forward_value
self.next_storage[path] = value.tobytes()
deeplake/core/storage/google_drive.py:330: in __setitem__
self._write_to_file(id, content)
deeplake/core/storage/google_drive.py:258: in _write_to_file
file = self.drive.files().update(media_body=content, fileId=id).execute()
/opt/hostedtoolcache/Python/3.10.14/x64/lib/python3.10/site-packages/googleapiclient/_helpers.py:131: in positional_wrapper
return wrapped(*args, **kwargs)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <googleapiclient.http.HttpRequest object at 0x7f9df2931ab0>
http = <google_auth_httplib2.AuthorizedHttp object at 0x7f9df1f121d0>
num_retries = 0
@util.positional(1)
def execute(self, http=None, num_retries=0):
"""Execute the request.
Args:
http: httplib2.Http, an http object to be used in place of the
one the HttpRequest request object was constructed with.
num_retries: Integer, number of times to retry with randomized
exponential backoff. If all retries fail, the raised HttpError
represents the last request. If zero (default), we attempt the
request only once.
Returns:
A deserialized object model of the response body as determined
by the postproc.
Raises:
googleapiclient.errors.HttpError if the response was not a 2xx.
httplib2.HttpLib2Error if a transport error has occurred.
"""
if http is None:
http = self.http
if self.resumable:
body = None
while body is None:
_, body = self.next_chunk(http=http, num_retries=num_retries)
return body
# Non-resumable case.
if "content-length" not in self.headers:
self.headers["content-length"] = str(self.body_size)
# If the request URI is too long then turn it into a POST request.
# Assume that a GET request never contains a request body.
if len(self.uri) > MAX_URI_LENGTH and self.method == "GET":
self.method = "POST"
self.headers["x-http-method-override"] = "GET"
self.headers["content-type"] = "application/x-www-form-urlencoded"
parsed = urllib.parse.urlparse(self.uri)
self.uri = urllib.parse.urlunparse(
(parsed.scheme, parsed.netloc, parsed.path, parsed.params, None, None)
)
self.body = parsed.query
self.headers["content-length"] = str(len(self.body))
# Handle retries for server-side errors.
resp, content = _retry_request(
http,
num_retries,
"request",
self._sleep,
self._rand,
str(self.uri),
method=str(self.method),
body=self.body,
headers=self.headers,
)
for callback in self.response_callbacks:
callback(resp)
if resp.status >= 300:
> raise HttpError(resp, content, uri=self.uri)
E googleapiclient.errors.HttpError: <HttpError 500 when requesting https://www.googleapis.com/upload/drive/v3/files/1NZaH1AYmANgKV3bd98QVSpU1oGFZvww2?alt=json&uploadType=media returned "Internal Error". Details: "[{'message': 'Internal Error', 'domain': 'global', 'reason': 'internalError'}]">
/opt/hostedtoolcache/Python/3.10.14/x64/lib/python3.10/site-packages/googleapiclient/http.py:937: HttpError
Check failure on line 467 in deeplake/core/query/test/test_query.py
github-actions / JUnit Test Report
test_query.test_link_materialize[1]
deeplake.util.exceptions.SampleAppendError: Failed to append a sample to the tensor 'abc'. See more details in the traceback.
Raw output
self = <urllib3.connectionpool.HTTPSConnectionPool object at 0x0000025B0A112470>
method = 'GET', url = '/20/20', body = None
headers = {'User-Agent': 'python-requests/2.31.0', 'Accept-Encoding': 'gzip, deflate', 'Accept': '*/*', 'Connection': 'keep-alive'}
retries = Retry(total=0, connect=None, read=False, redirect=None, status=None)
redirect = False, assert_same_host = False
timeout = Timeout(connect=None, read=None, total=None), pool_timeout = None
release_conn = False, chunked = False, body_pos = None, preload_content = False
decode_content = False, response_kw = {}
parsed_url = Url(scheme=None, auth=None, host=None, port=None, path='/20/20', query=None, fragment=None)
destination_scheme = None, conn = None, release_this_conn = True
http_tunnel_required = False, err = None, clean_exit = False
def urlopen( # type: ignore[override]
self,
method: str,
url: str,
body: _TYPE_BODY | None = None,
headers: typing.Mapping[str, str] | None = None,
retries: Retry | bool | int | None = None,
redirect: bool = True,
assert_same_host: bool = True,
timeout: _TYPE_TIMEOUT = _DEFAULT_TIMEOUT,
pool_timeout: int | None = None,
release_conn: bool | None = None,
chunked: bool = False,
body_pos: _TYPE_BODY_POSITION | None = None,
preload_content: bool = True,
decode_content: bool = True,
**response_kw: typing.Any,
) -> BaseHTTPResponse:
"""
Get a connection from the pool and perform an HTTP request. This is the
lowest level call for making a request, so you'll need to specify all
the raw details.
.. note::
More commonly, it's appropriate to use a convenience method
such as :meth:`request`.
.. note::
`release_conn` will only behave as expected if
`preload_content=False` because we want to make
`preload_content=False` the default behaviour someday soon without
breaking backwards compatibility.
:param method:
HTTP request method (such as GET, POST, PUT, etc.)
:param url:
The URL to perform the request on.
:param body:
Data to send in the request body, either :class:`str`, :class:`bytes`,
an iterable of :class:`str`/:class:`bytes`, or a file-like object.
:param headers:
Dictionary of custom headers to send, such as User-Agent,
If-None-Match, etc. If None, pool headers are used. If provided,
these headers completely replace any pool-specific headers.
:param retries:
Configure the number of retries to allow before raising a
:class:`~urllib3.exceptions.MaxRetryError` exception.
If ``None`` (default) will retry 3 times, see ``Retry.DEFAULT``. Pass a
:class:`~urllib3.util.retry.Retry` object for fine-grained control
over different types of retries.
Pass an integer number to retry connection errors that many times,
but no other types of errors. Pass zero to never retry.
If ``False``, then retries are disabled and any exception is raised
immediately. Also, instead of raising a MaxRetryError on redirects,
the redirect response will be returned.
:type retries: :class:`~urllib3.util.retry.Retry`, False, or an int.
:param redirect:
If True, automatically handle redirects (status codes 301, 302,
303, 307, 308). Each redirect counts as a retry. Disabling retries
will disable redirect, too.
:param assert_same_host:
If ``True``, will make sure that the host of the pool requests is
consistent else will raise HostChangedError. When ``False``, you can
use the pool on an HTTP proxy and request foreign hosts.
:param timeout:
If specified, overrides the default timeout for this one
request. It may be a float (in seconds) or an instance of
:class:`urllib3.util.Timeout`.
:param pool_timeout:
If set and the pool is set to block=True, then this method will
block for ``pool_timeout`` seconds and raise EmptyPoolError if no
connection is available within the time period.
:param bool preload_content:
If True, the response's body will be preloaded into memory.
:param bool decode_content:
If True, will attempt to decode the body based on the
'content-encoding' header.
:param release_conn:
If False, then the urlopen call will not release the connection
back into the pool once a response is received (but will release if
you read the entire contents of the response such as when
`preload_content=True`). This is useful if you're not preloading
the response's content immediately. You will need to call
``r.release_conn()`` on the response ``r`` to return the connection
back into the pool. If None, it takes the value of ``preload_content``
which defaults to ``True``.
:param bool chunked:
If True, urllib3 will send the body using chunked transfer
encoding. Otherwise, urllib3 will send the body using the standard
content-length form. Defaults to False.
:param int body_pos:
Position to seek to in file-like body in the event of a retry or
redirect. Typically this won't need to be set because urllib3 will
auto-populate the value when needed.
"""
parsed_url = parse_url(url)
destination_scheme = parsed_url.scheme
if headers is None:
headers = self.headers
if not isinstance(retries, Retry):
retries = Retry.from_int(retries, redirect=redirect, default=self.retries)
if release_conn is None:
release_conn = preload_content
# Check host
if assert_same_host and not self.is_same_host(url):
raise HostChangedError(self, url, retries)
# Ensure that the URL we're connecting to is properly encoded
if url.startswith("/"):
url = to_str(_encode_target(url))
else:
url = to_str(parsed_url.url)
conn = None
# Track whether `conn` needs to be released before
# returning/raising/recursing. Update this variable if necessary, and
# leave `release_conn` constant throughout the function. That way, if
# the function recurses, the original value of `release_conn` will be
# passed down into the recursive call, and its value will be respected.
#
# See issue #651 [1] for details.
#
# [1] <https://github.com/urllib3/urllib3/issues/651>
release_this_conn = release_conn
http_tunnel_required = connection_requires_http_tunnel(
self.proxy, self.proxy_config, destination_scheme
)
# Merge the proxy headers. Only done when not using HTTP CONNECT. We
# have to copy the headers dict so we can safely change it without those
# changes being reflected in anyone else's copy.
if not http_tunnel_required:
headers = headers.copy() # type: ignore[attr-defined]
headers.update(self.proxy_headers) # type: ignore[union-attr]
# Must keep the exception bound to a separate variable or else Python 3
# complains about UnboundLocalError.
err = None
# Keep track of whether we cleanly exited the except block. This
# ensures we do proper cleanup in finally.
clean_exit = False
# Rewind body position, if needed. Record current position
# for future rewinds in the event of a redirect/retry.
body_pos = set_file_position(body, body_pos)
try:
# Request a connection from the queue.
timeout_obj = self._get_timeout(timeout)
conn = self._get_conn(timeout=pool_timeout)
conn.timeout = timeout_obj.connect_timeout # type: ignore[assignment]
# Is this a closed/new connection that requires CONNECT tunnelling?
if self.proxy is not None and http_tunnel_required and conn.is_closed:
try:
self._prepare_proxy(conn)
except (BaseSSLError, OSError, SocketTimeout) as e:
self._raise_timeout(
err=e, url=self.proxy.url, timeout_value=conn.timeout
)
raise
# If we're going to release the connection in ``finally:``, then
# the response doesn't need to know about the connection. Otherwise
# it will also try to release it and we'll have a double-release
# mess.
response_conn = conn if not release_conn else None
# Make the request on the HTTPConnection object
> response = self._make_request(
conn,
method,
url,
timeout=timeout_obj,
body=body,
headers=headers,
chunked=chunked,
retries=retries,
response_conn=response_conn,
preload_content=preload_content,
decode_content=decode_content,
**response_kw,
)
C:\hostedtoolcache\windows\Python\3.10.11\x64\lib\site-packages\urllib3\connectionpool.py:793:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
C:\hostedtoolcache\windows\Python\3.10.11\x64\lib\site-packages\urllib3\connectionpool.py:537: in _make_request
response = conn.getresponse()
C:\hostedtoolcache\windows\Python\3.10.11\x64\lib\site-packages\urllib3\connection.py:466: in getresponse
httplib_response = super().getresponse()
C:\hostedtoolcache\windows\Python\3.10.11\x64\lib\http\client.py:1375: in getresponse
response.begin()
C:\hostedtoolcache\windows\Python\3.10.11\x64\lib\http\client.py:318: in begin
version, status, reason = self._read_status()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <http.client.HTTPResponse object at 0x0000025B0A1121A0>
def _read_status(self):
line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
if len(line) > _MAXLINE:
raise LineTooLong("status line")
if self.debuglevel > 0:
print("reply:", repr(line))
if not line:
# Presumably, the server closed the connection before
# sending a valid response.
> raise RemoteDisconnected("Remote end closed connection without"
" response")
E http.client.RemoteDisconnected: Remote end closed connection without response
C:\hostedtoolcache\windows\Python\3.10.11\x64\lib\http\client.py:287: RemoteDisconnected
During handling of the above exception, another exception occurred:
self = <requests.adapters.HTTPAdapter object at 0x0000025B0A1135B0>
request = <PreparedRequest [GET]>, stream = False
timeout = Timeout(connect=None, read=None, total=None), verify = True
cert = None, proxies = OrderedDict()
def send(
self, request, stream=False, timeout=None, verify=True, cert=None, proxies=None
):
"""Sends PreparedRequest object. Returns Response object.
:param request: The :class:`PreparedRequest <PreparedRequest>` being sent.
:param stream: (optional) Whether to stream the request content.
:param timeout: (optional) How long to wait for the server to send
data before giving up, as a float, or a :ref:`(connect timeout,
read timeout) <timeouts>` tuple.
:type timeout: float or tuple or urllib3 Timeout object
:param verify: (optional) Either a boolean, in which case it controls whether
we verify the server's TLS certificate, or a string, in which case it
must be a path to a CA bundle to use
:param cert: (optional) Any user-provided SSL certificate to be trusted.
:param proxies: (optional) The proxies dictionary to apply to the request.
:rtype: requests.Response
"""
try:
conn = self.get_connection(request.url, proxies)
except LocationValueError as e:
raise InvalidURL(e, request=request)
self.cert_verify(conn, request.url, verify, cert)
url = self.request_url(request, proxies)
self.add_headers(
request,
stream=stream,
timeout=timeout,
verify=verify,
cert=cert,
proxies=proxies,
)
chunked = not (request.body is None or "Content-Length" in request.headers)
if isinstance(timeout, tuple):
try:
connect, read = timeout
timeout = TimeoutSauce(connect=connect, read=read)
except ValueError:
raise ValueError(
f"Invalid timeout {timeout}. Pass a (connect, read) timeout tuple, "
f"or a single float to set both timeouts to the same value."
)
elif isinstance(timeout, TimeoutSauce):
pass
else:
timeout = TimeoutSauce(connect=timeout, read=timeout)
try:
> resp = conn.urlopen(
method=request.method,
url=url,
body=request.body,
headers=request.headers,
redirect=False,
assert_same_host=False,
preload_content=False,
decode_content=False,
retries=self.max_retries,
timeout=timeout,
chunked=chunked,
)
C:\hostedtoolcache\windows\Python\3.10.11\x64\lib\site-packages\requests\adapters.py:486:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
C:\hostedtoolcache\windows\Python\3.10.11\x64\lib\site-packages\urllib3\connectionpool.py:847: in urlopen
retries = retries.increment(
C:\hostedtoolcache\windows\Python\3.10.11\x64\lib\site-packages\urllib3\util\retry.py:470: in increment
raise reraise(type(error), error, _stacktrace)
C:\hostedtoolcache\windows\Python\3.10.11\x64\lib\site-packages\urllib3\util\util.py:38: in reraise
raise value.with_traceback(tb)
C:\hostedtoolcache\windows\Python\3.10.11\x64\lib\site-packages\urllib3\connectionpool.py:793: in urlopen
response = self._make_request(
C:\hostedtoolcache\windows\Python\3.10.11\x64\lib\site-packages\urllib3\connectionpool.py:537: in _make_request
response = conn.getresponse()
C:\hostedtoolcache\windows\Python\3.10.11\x64\lib\site-packages\urllib3\connection.py:466: in getresponse
httplib_response = super().getresponse()
C:\hostedtoolcache\windows\Python\3.10.11\x64\lib\http\client.py:1375: in getresponse
response.begin()
C:\hostedtoolcache\windows\Python\3.10.11\x64\lib\http\client.py:318: in begin
version, status, reason = self._read_status()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <http.client.HTTPResponse object at 0x0000025B0A1121A0>
def _read_status(self):
line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
if len(line) > _MAXLINE:
raise LineTooLong("status line")
if self.debuglevel > 0:
print("reply:", repr(line))
if not line:
# Presumably, the server closed the connection before
# sending a valid response.
> raise RemoteDisconnected("Remote end closed connection without"
" response")
E urllib3.exceptions.ProtocolError: ('Connection aborted.', RemoteDisconnected('Remote end closed connection without response'))
C:\hostedtoolcache\windows\Python\3.10.11\x64\lib\http\client.py:287: ProtocolError
During handling of the above exception, another exception occurred:
self = Sample(is_lazy=True, path=https://picsum.photos/20/20)
def _read_from_path(self) -> bytes: # type: ignore
if self._buffer is None:
path_type = get_path_type(self.path)
try:
if path_type == "local":
self._buffer = self._read_from_local()
elif path_type == "gcs":
self._buffer = self._read_from_gcs()
elif path_type == "s3":
self._buffer = self._read_from_s3()
elif path_type == "azure":
self._buffer = self._read_from_azure()
elif path_type == "gdrive":
self._buffer = self._read_from_gdrive()
elif path_type == "http":
> self._buffer = self._read_from_http(timeout=self._timeout)
deeplake\core\sample.py:466:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
deeplake\core\sample.py:530: in _read_from_http
result = requests.get(self.path, headers=headers, timeout=timeout)
C:\hostedtoolcache\windows\Python\3.10.11\x64\lib\site-packages\requests\api.py:73: in get
return request("get", url, params=params, **kwargs)
C:\hostedtoolcache\windows\Python\3.10.11\x64\lib\site-packages\requests\api.py:59: in request
return session.request(method=method, url=url, **kwargs)
C:\hostedtoolcache\windows\Python\3.10.11\x64\lib\site-packages\requests\sessions.py:589: in request
resp = self.send(prep, **send_kwargs)
C:\hostedtoolcache\windows\Python\3.10.11\x64\lib\site-packages\requests\sessions.py:703: in send
r = adapter.send(request, **kwargs)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <requests.adapters.HTTPAdapter object at 0x0000025B0A1135B0>
request = <PreparedRequest [GET]>, stream = False
timeout = Timeout(connect=None, read=None, total=None), verify = True
cert = None, proxies = OrderedDict()
def send(
self, request, stream=False, timeout=None, verify=True, cert=None, proxies=None
):
"""Sends PreparedRequest object. Returns Response object.
:param request: The :class:`PreparedRequest <PreparedRequest>` being sent.
:param stream: (optional) Whether to stream the request content.
:param timeout: (optional) How long to wait for the server to send
data before giving up, as a float, or a :ref:`(connect timeout,
read timeout) <timeouts>` tuple.
:type timeout: float or tuple or urllib3 Timeout object
:param verify: (optional) Either a boolean, in which case it controls whether
we verify the server's TLS certificate, or a string, in which case it
must be a path to a CA bundle to use
:param cert: (optional) Any user-provided SSL certificate to be trusted.
:param proxies: (optional) The proxies dictionary to apply to the request.
:rtype: requests.Response
"""
try:
conn = self.get_connection(request.url, proxies)
except LocationValueError as e:
raise InvalidURL(e, request=request)
self.cert_verify(conn, request.url, verify, cert)
url = self.request_url(request, proxies)
self.add_headers(
request,
stream=stream,
timeout=timeout,
verify=verify,
cert=cert,
proxies=proxies,
)
chunked = not (request.body is None or "Content-Length" in request.headers)
if isinstance(timeout, tuple):
try:
connect, read = timeout
timeout = TimeoutSauce(connect=connect, read=read)
except ValueError:
raise ValueError(
f"Invalid timeout {timeout}. Pass a (connect, read) timeout tuple, "
f"or a single float to set both timeouts to the same value."
)
elif isinstance(timeout, TimeoutSauce):
pass
else:
timeout = TimeoutSauce(connect=timeout, read=timeout)
try:
resp = conn.urlopen(
method=request.method,
url=url,
body=request.body,
headers=request.headers,
redirect=False,
assert_same_host=False,
preload_content=False,
decode_content=False,
retries=self.max_retries,
timeout=timeout,
chunked=chunked,
)
except (ProtocolError, OSError) as err:
> raise ConnectionError(err, request=request)
E requests.exceptions.ConnectionError: ('Connection aborted.', RemoteDisconnected('Remote end closed connection without response'))
C:\hostedtoolcache\windows\Python\3.10.11\x64\lib\site-packages\requests\adapters.py:501: ConnectionError
The above exception was the direct cause of the following exception:
sample_path = 'https://picsum.photos/20/20', sample_creds_key = None
link_creds = <deeplake.core.link_creds.LinkCreds object at 0x0000025B736948B0>
verify = True
def read_linked_sample(
sample_path: str, sample_creds_key: Optional[str], link_creds, verify: bool
):
provider_type = get_path_type(sample_path)
try:
if provider_type == "local":
return deeplake.read(sample_path, verify=verify)
elif provider_type == "http":
> return _read_http_linked_sample(
link_creds, sample_creds_key, sample_path, verify
)
deeplake\core\linked_sample.py:29:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
deeplake\core\linked_sample.py:50: in wrapper
raise e
deeplake\core\linked_sample.py:43: in wrapper
return f(linked_creds, sample_creds_key, *args, **kwargs)
deeplake\core\linked_sample.py:72: in _read_http_linked_sample
return deeplake.read(sample_path, verify=verify, creds=creds)
deeplake\api\read.py:63: in read
return Sample(
deeplake\core\sample.py:106: in __init__
compressed_bytes = self._read_from_path()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = Sample(is_lazy=True, path=https://picsum.photos/20/20)
def _read_from_path(self) -> bytes: # type: ignore
if self._buffer is None:
path_type = get_path_type(self.path)
try:
if path_type == "local":
self._buffer = self._read_from_local()
elif path_type == "gcs":
self._buffer = self._read_from_gcs()
elif path_type == "s3":
self._buffer = self._read_from_s3()
elif path_type == "azure":
self._buffer = self._read_from_azure()
elif path_type == "gdrive":
self._buffer = self._read_from_gdrive()
elif path_type == "http":
self._buffer = self._read_from_http(timeout=self._timeout)
except Exception as e:
> raise SampleReadError(self.path) from e # type: ignore
E deeplake.util.exceptions.SampleReadError: Unable to read sample from https://picsum.photos/20/20
deeplake\core\sample.py:468: SampleReadError
The above exception was the direct cause of the following exception:
self = <deeplake.core.linked_chunk_engine.LinkedChunkEngine object at 0x0000025B0A111D50>
samples = [<deeplake.core.linked_sample.LinkedSample object at 0x0000025B73694CD0>, <deeplake.core.linked_sample.LinkedSample ob...nkedSample object at 0x0000025B0A1122F0>, <deeplake.core.linked_sample.LinkedSample object at 0x0000025B0A110340>, ...]
verify = True, ignore_errors = False
def check_each_sample(self, samples, verify=True, ignore_errors=False):
link_creds = self.link_creds
verified_samples = []
skipped = []
for i, sample in enumerate(samples):
try:
if isinstance(sample, deeplake.core.tensor.Tensor) and sample.is_link:
sample = sample._linked_sample()
samples[i] = sample
elif (
not isinstance(sample, (LinkedSample, LinkedTiledSample))
and sample is not None
):
raise TypeError(
f"Expected LinkedSample or LinkedTiledSample, got {type(sample)} instead. Use deeplake.link() to link samples or deeplake.link_tiled() to link multiple images as tiles."
)
path, creds_key = get_path_creds_key(sample)
# verifies existence of creds_key
if verify:
link_creds.get_encoding(creds_key, path)
if sample is None or sample.path == "":
verified_samples.append(sample)
elif isinstance(sample, LinkedTiledSample):
verify_samples = self.verify and verify
sample.set_check_tile_shape(self.link_creds, verify_samples)
sample.set_sample_shape()
verified_samples.append(sample)
else:
try:
_verify = verify and self.verify
verified_samples.append(
> read_linked_sample(
sample.path,
sample.creds_key,
self.link_creds,
verify=_verify,
)
)
deeplake\core\linked_chunk_engine.py:280:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
sample_path = 'https://picsum.photos/20/20', sample_creds_key = None
link_creds = <deeplake.core.link_creds.LinkCreds object at 0x0000025B736948B0>
verify = True
def read_linked_sample(
sample_path: str, sample_creds_key: Optional[str], link_creds, verify: bool
):
provider_type = get_path_type(sample_path)
try:
if provider_type == "local":
return deeplake.read(sample_path, verify=verify)
elif provider_type == "http":
return _read_http_linked_sample(
link_creds, sample_creds_key, sample_path, verify
)
else:
return _read_cloud_linked_sample(
link_creds, sample_creds_key, sample_path, provider_type, verify
)
except Exception as e:
> raise GetDataFromLinkError(sample_path) from e
E deeplake.util.exceptions.GetDataFromLinkError: Unable to get data from link https://picsum.photos/20/20.
deeplake\core\linked_sample.py:37: GetDataFromLinkError
The above exception was the direct cause of the following exception:
self = <deeplake.core.linked_chunk_engine.LinkedChunkEngine object at 0x0000025B0A111D50>
samples = [<deeplake.core.linked_sample.LinkedSample object at 0x0000025B73694CD0>, <deeplake.core.linked_sample.LinkedSample ob...nkedSample object at 0x0000025B0A1122F0>, <deeplake.core.linked_sample.LinkedSample object at 0x0000025B0A110340>, ...]
progressbar = False
link_callback = <bound method Tensor._extend_links of Tensor(key='abc')>
pg_callback = None, ignore_errors = False, verified_samples = None
def extend(
self,
samples,
progressbar: bool = False,
link_callback: Optional[Callable] = None,
pg_callback=None,
ignore_errors: bool = False,
verified_samples=None,
):
try:
assert not (progressbar and pg_callback)
self.check_link_ready()
if not self.write_initialization_done:
self._write_initialization()
self.write_initialization_done = True
initial_autoflush = self.cache.autoflush
self.cache.autoflush = False
num_samples = self.tensor_length
if self.is_sequence:
self._extend_sequence(
samples,
progressbar,
link_callback,
ignore_errors,
verified_samples,
)
else:
> verified_samples = self._extend(
samples,
progressbar,
pg_callback=pg_callback,
ignore_errors=ignore_errors,
verified_samples=verified_samples,
)
deeplake\core\chunk_engine.py:1235:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
deeplake\core\chunk_engine.py:1092: in _extend
verified_samples = verified_samples or self.check_each_sample(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <deeplake.core.linked_chunk_engine.LinkedChunkEngine object at 0x0000025B0A111D50>
samples = [<deeplake.core.linked_sample.LinkedSample object at 0x0000025B73694CD0>, <deeplake.core.linked_sample.LinkedSample ob...nkedSample object at 0x0000025B0A1122F0>, <deeplake.core.linked_sample.LinkedSample object at 0x0000025B0A110340>, ...]
verify = True, ignore_errors = False
def check_each_sample(self, samples, verify=True, ignore_errors=False):
link_creds = self.link_creds
verified_samples = []
skipped = []
for i, sample in enumerate(samples):
try:
if isinstance(sample, deeplake.core.tensor.Tensor) and sample.is_link:
sample = sample._linked_sample()
samples[i] = sample
elif (
not isinstance(sample, (LinkedSample, LinkedTiledSample))
and sample is not None
):
raise TypeError(
f"Expected LinkedSample or LinkedTiledSample, got {type(sample)} instead. Use deeplake.link() to link samples or deeplake.link_tiled() to link multiple images as tiles."
)
path, creds_key = get_path_creds_key(sample)
# verifies existence of creds_key
if verify:
link_creds.get_encoding(creds_key, path)
if sample is None or sample.path == "":
verified_samples.append(sample)
elif isinstance(sample, LinkedTiledSample):
verify_samples = self.verify and verify
sample.set_check_tile_shape(self.link_creds, verify_samples)
sample.set_sample_shape()
verified_samples.append(sample)
else:
try:
_verify = verify and self.verify
verified_samples.append(
read_linked_sample(
sample.path,
sample.creds_key,
self.link_creds,
verify=_verify,
)
)
except Exception as e:
> raise BadLinkError(sample.path, sample.creds_key) from e
E deeplake.util.exceptions.BadLinkError: Verification of link failed. Make sure that the link you are trying to append is correct.
E
E Failed link: https://picsum.photos/20/20
E creds_key used: None
E
E No credentials have been provided to access the link. If the link is not publicly accessible, add access credentials to your dataset and use the appropriate creds_key.
deeplake\core\linked_chunk_engine.py:288: BadLinkError
The above exception was the direct cause of the following exception:
local_ds = Dataset(path='./hub_pytest/test_query/test_link_materialize-1-', tensors=['abc'])
num_workers = 1
@pytest.mark.slow
@pytest.mark.parametrize("num_workers", [1, 2])
def test_link_materialize(local_ds, num_workers):
with local_ds as ds:
ds.create_tensor("abc", htype="link[image]", sample_compression="jpg")
> ds.abc.extend(
[
(
deeplake.link("https://picsum.photos/20/20")
if i % 2
else deeplake.link("https://picsum.photos/10/10")
)
for i in range(20)
]
)
deeplake\core\query\test\test_query.py:467:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
deeplake\util\invalid_view_op.py:22: in inner
return callable(x, *args, **kwargs)
deeplake\core\tensor.py:362: in extend
self._extend(samples, progressbar=progressbar, ignore_errors=ignore_errors)
deeplake\core\tensor.py:313: in _extend
self.chunk_engine.extend(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <deeplake.core.linked_chunk_engine.LinkedChunkEngine object at 0x0000025B0A111D50>
samples = [<deeplake.core.linked_sample.LinkedSample object at 0x0000025B73694CD0>, <deeplake.core.linked_sample.LinkedSample ob...nkedSample object at 0x0000025B0A1122F0>, <deeplake.core.linked_sample.LinkedSample object at 0x0000025B0A110340>, ...]
progressbar = False
link_callback = <bound method Tensor._extend_links of Tensor(key='abc')>
pg_callback = None, ignore_errors = False, verified_samples = None
def extend(
self,
samples,
progressbar: bool = False,
link_callback: Optional[Callable] = None,
pg_callback=None,
ignore_errors: bool = False,
verified_samples=None,
):
try:
assert not (progressbar and pg_callback)
self.check_link_ready()
if not self.write_initialization_done:
self._write_initialization()
self.write_initialization_done = True
initial_autoflush = self.cache.autoflush
self.cache.autoflush = False
num_samples = self.tensor_length
if self.is_sequence:
self._extend_sequence(
samples,
progressbar,
link_callback,
ignore_errors,
verified_samples,
)
else:
verified_samples = self._extend(
samples,
progressbar,
pg_callback=pg_callback,
ignore_errors=ignore_errors,
verified_samples=verified_samples,
)
if link_callback:
verified_samples = self._prepare_samples_for_link_callback(
verified_samples
)
self._extend_link_callback(
link_callback,
verified_samples,
None,
progressbar,
ignore_errors,
)
self.cache.autoflush = initial_autoflush
self.cache.maybe_flush()
except Exception as e:
self.pop(list(range(num_samples, self.tensor_length)))
> raise SampleAppendError(self.name) from e
E deeplake.util.exceptions.SampleAppendError: Failed to append a sample to the tensor 'abc'. See more details in the traceback.
deeplake\core\chunk_engine.py:1258: SampleAppendError
Check failure on line 104 in deeplake/api/tests/test_access_method.py
github-actions / JUnit Test Report
test_access_method.test_access_method_with_creds
deeplake.util.exceptions.DatasetHandlerError: A Deep Lake dataset does not exist at the given path (hub://testingacc2/tmp534c_test_access_method_test_access_method_with_creds__local-managed-entry__). Check the path provided or in case you want to create a new dataset, use deeplake.empty().
Raw output
hub_cloud_ds_generator = <function hub_cloud_ds_generator.<locals>.generate_hub_cloud_ds at 0x7ff71edb8900>
hub_cloud_dev_managed_creds_key = 'aws_creds'
@pytest.mark.slow
def test_access_method_with_creds(
hub_cloud_ds_generator, hub_cloud_dev_managed_creds_key
):
with hub_cloud_ds_generator() as ds:
ds.create_tensor("abc")
ds.create_tensor("images", htype="link[image]", sample_compression="jpg")
ds.add_creds_key(hub_cloud_dev_managed_creds_key, managed=True)
ds.abc.extend([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
ds.images.extend(
[
deeplake.link(
"https://picsum.photos/20/30",
creds_key=hub_cloud_dev_managed_creds_key,
)
for _ in range(10)
]
)
> ds = hub_cloud_ds_generator(access_method="download:2")
deeplake/api/tests/test_access_method.py:104:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
deeplake/tests/dataset_fixtures.py:153: in generate_hub_cloud_ds
return deeplake.dataset(hub_cloud_path, token=hub_cloud_dev_token, **kwargs)
deeplake/util/spinner.py:151: in inner
return func(*args, **kwargs)
deeplake/api/dataset.py:335: in init
raise e
deeplake/api/dataset.py:312: in init
return dataset._load(
deeplake/api/dataset.py:801: in _load
ret = get_local_dataset(**dataset_kwargs)
deeplake/util/access_method.py:196: in get_local_dataset
ds = deeplake.load(
deeplake/util/spinner.py:153: in inner
return func(*args, **kwargs)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
path = 'hub://testingacc2/tmp534c_test_access_method_test_access_method_with_creds__local-managed-entry__'
read_only = None, memory_cache_size = 2000, local_cache_size = 0, creds = {}
token = 'eyJhbGciOiJub25lIiwidHlwIjoiSldUIn0.eyJpZCI6InRlc3RpbmdhY2MyIiwiYXBpX2tleSI6IjU4Y0tLb1p6UE1BbThPU2RpbTRiZ2tBekhWekt1VUE3MFJpNTNyZUpKRTJuaiJ9.'
org_id = None, verbose = False, access_method = 'stream', unlink = False
reset = False, indra = False, check_integrity = None, lock_timeout = 0
lock_enabled = True, index_params = None
@staticmethod
@spinner
def load(
path: Union[str, pathlib.Path],
read_only: Optional[bool] = None,
memory_cache_size: int = DEFAULT_MEMORY_CACHE_SIZE,
local_cache_size: int = DEFAULT_LOCAL_CACHE_SIZE,
creds: Optional[Union[dict, str]] = None,
token: Optional[str] = None,
org_id: Optional[str] = None,
verbose: bool = True,
access_method: str = "stream",
unlink: bool = False,
reset: bool = False,
indra: bool = USE_INDRA,
check_integrity: Optional[bool] = None,
lock_timeout: Optional[int] = 0,
lock_enabled: Optional[bool] = True,
index_params: Optional[Dict[str, Union[int, str]]] = None,
) -> Dataset:
"""Loads an existing Deep Lake dataset
Examples:
>>> ds = deeplake.load("hub://username/dataset")
>>> ds = deeplake.load("s3://mybucket/my_dataset")
>>> ds = deeplake.load("./datasets/my_dataset", overwrite=True)
Loading to a specific version:
>>> ds = deeplake.load("hub://username/dataset@new_branch")
>>> ds = deeplake.load("hub://username/dataset@3e49cded62b6b335c74ff07e97f8451a37aca7b2")
>>> my_commit_id = "3e49cded62b6b335c74ff07e97f8451a37aca7b2"
>>> ds = deeplake.load(f"hub://username/dataset@{my_commit_id}")
Args:
path (str, pathlib.Path): - The full path to the dataset. Can be:
- a Deep Lake cloud path of the form ``hub://username/datasetname``. To write to Deep Lake cloud datasets, ensure that you are authenticated to Deep Lake (pass in a token using the 'token' parameter).
- an s3 path of the form ``s3://bucketname/path/to/dataset``. Credentials are required in either the environment or passed to the creds argument.
- a local file system path of the form ``./path/to/dataset`` or ``~/path/to/dataset`` or ``path/to/dataset``.
- a memory path of the form ``mem://path/to/dataset`` which doesn't save the dataset but keeps it in memory instead. Should be used only for testing as it does not persist.
- Loading to a specific version:
- You can also specify a ``commit_id`` or ``branch`` to load the dataset to that version directly by using the ``@`` symbol.
- The path will then be of the form ``hub://username/dataset@{branch}`` or ``hub://username/dataset@{commit_id}``.
- See examples above.
read_only (bool, optional): Opens dataset in read only mode if this is passed as ``True``. Defaults to ``False``.
Datasets stored on Deep Lake cloud that your account does not have write access to will automatically open in read mode.
memory_cache_size (int): The size of the memory cache to be used in MB.
local_cache_size (int): The size of the local filesystem cache to be used in MB.
creds (dict, str, optional): The string ``ENV`` or a dictionary containing credentials used to access the dataset at the path.
- If 'aws_access_key_id', 'aws_secret_access_key', 'aws_session_token' are present, these take precedence over credentials present in the environment or in credentials file. Currently only works with s3 paths.
- It supports 'aws_access_key_id', 'aws_secret_access_key', 'aws_session_token', 'endpoint_url', 'aws_region', 'profile_name' as keys.
- If 'ENV' is passed, credentials are fetched from the environment variables. This is also the case when creds is not passed for cloud datasets. For datasets connected to hub cloud, specifying 'ENV' will override the credentials fetched from Activeloop and use local ones.
token (str, optional): Activeloop token, used for fetching credentials to the dataset at path if it is a Deep Lake dataset. This is optional, tokens are normally autogenerated.
org_id (str, Optional): Organization id to be used for enabling high-performance features. Only applicable for local datasets.
verbose (bool): If ``True``, logs will be printed. Defaults to ``True``.
access_method (str): The access method to use for the dataset. Can be:
- 'stream'
- Streams the data from the dataset i.e. only fetches data when required. This is the default value.
- 'download'
- Downloads the data to the local filesystem to the path specified in environment variable ``DEEPLAKE_DOWNLOAD_PATH``.
This will overwrite ``DEEPLAKE_DOWNLOAD_PATH``.
- Raises an exception if ``DEEPLAKE_DOWNLOAD_PATH`` environment variable is not set or if the dataset does not exist.
- The 'download' access method can be modified to specify num_workers and/or scheduler.
For example: 'download:2:processed' will use 2 workers and use processed scheduler, while 'download:3' will use 3 workers and
default scheduler (threaded), and 'download:processed' will use a single worker and use processed scheduler.
- 'local'
- Downloads the dataset if it doesn't already exist, otherwise loads from local storage.
- Raises an exception if ``DEEPLAKE_DOWNLOAD_PATH`` environment variable is not set.
- The 'local' access method can be modified to specify num_workers and/or scheduler to be used in case dataset needs to be downloaded.
If dataset needs to be downloaded, 'local:2:processed' will use 2 workers and use processed scheduler, while 'local:3' will use 3 workers
and default scheduler (threaded), and 'local:processed' will use a single worker and use processed scheduler.
unlink (bool): Downloads linked samples if set to ``True``. Only applicable if ``access_method`` is ``download`` or ``local``. Defaults to ``False``.
reset (bool): If the specified dataset cannot be loaded due to a corrupted HEAD state of the branch being loaded,
setting ``reset=True`` will reset HEAD changes and load the previous version.
check_integrity (bool, Optional): Performs an integrity check by default (None) if the dataset has 20 or fewer tensors.
Set to ``True`` to force integrity check, ``False`` to skip integrity check.
indra (bool): Flag indicating whether indra api should be used to create the dataset. Defaults to false
..
# noqa: DAR101
Returns:
Dataset: Dataset loaded using the arguments provided.
Raises:
DatasetHandlerError: If a Dataset does not exist at the given path.
AgreementError: When agreement is rejected
UserNotLoggedInException: When user is not authenticated
InvalidTokenException: If the specified token is invalid
TokenPermissionError: When there are permission or other errors related to token
CheckoutError: If version address specified in the path cannot be found
DatasetCorruptError: If loading the dataset failed due to corruption and ``reset`` is not ``True``
ReadOnlyModeError: If reset is attempted in read-only mode
LockedException: When attempting to open a dataset for writing when it is locked by another machine
ValueError: If ``org_id`` is specified for a non-local dataset
Exception: Re-raises caught exception if reset cannot fix the issue
ValueError: If the org id is provided but the dataset is not local
Warning:
Setting ``access_method`` to download will overwrite the local copy of the dataset if it was previously downloaded.
Note:
Any changes made to the dataset in download / local mode will only be made to the local copy and will not be reflected in the original dataset.
"""
_check_indra_and_read_only_flags(indra, read_only)
access_method, num_workers, scheduler = parse_access_method(access_method)
check_access_method(access_method, overwrite=False, unlink=unlink)
path, address = process_dataset_path(path)
if creds is None:
creds = {}
if org_id is not None and get_path_type(path) != "local":
raise ValueError("org_id parameter can only be used with local datasets")
try:
storage, cache_chain = get_storage_and_cache_chain(
path=path,
read_only=read_only,
creds=creds,
token=token,
memory_cache_size=memory_cache_size,
local_cache_size=local_cache_size,
indra=indra,
)
feature_report_path(
path,
"load",
{
"lock_enabled": lock_enabled,
"lock_timeout": lock_timeout,
"index_params": index_params,
},
token=token,
)
except Exception as e:
if isinstance(e, UserNotLoggedInException):
raise UserNotLoggedInException from None
raise
if not dataset_exists(cache_chain):
> raise DatasetHandlerError(
f"A Deep Lake dataset does not exist at the given path ({path}). Check the path provided or in case you want to create a new dataset, use deeplake.empty()."
)
E deeplake.util.exceptions.DatasetHandlerError: A Deep Lake dataset does not exist at the given path (hub://testingacc2/tmp534c_test_access_method_test_access_method_with_creds__local-managed-entry__). Check the path provided or in case you want to create a new dataset, use deeplake.empty().
deeplake/api/dataset.py:663: DatasetHandlerError
Check failure on line 110 in deeplake/api/tests/test_access_method.py
github-actions / JUnit Test Report
test_access_method.test_access_method_with_creds
deeplake.util.exceptions.AuthenticationException: Authentication failed. Please try logging in again.
Raw output
hub_cloud_ds_generator = <function hub_cloud_ds_generator.<locals>.generate_hub_cloud_ds at 0x7f8fc201d940>
hub_cloud_dev_managed_creds_key = 'aws_creds'
@pytest.mark.slow
def test_access_method_with_creds(
hub_cloud_ds_generator, hub_cloud_dev_managed_creds_key
):
with hub_cloud_ds_generator() as ds:
ds.create_tensor("abc")
ds.create_tensor("images", htype="link[image]", sample_compression="jpg")
ds.add_creds_key(hub_cloud_dev_managed_creds_key, managed=True)
ds.abc.extend([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
ds.images.extend(
[
deeplake.link(
"https://picsum.photos/20/30",
creds_key=hub_cloud_dev_managed_creds_key,
)
for _ in range(10)
]
)
ds = hub_cloud_ds_generator(access_method="download:2")
assert ds.images.htype == "link[image]"
assert ds.images.shape == (10, 30, 20, 3)
np.testing.assert_array_equal(ds.abc.numpy(), np.arange(1, 11).reshape(-1, 1))
> ds.delete()
deeplake/api/tests/test_access_method.py:110:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
deeplake/core/dataset/deeplake_cloud_dataset.py:246: in delete
self.client.delete_dataset_entry(self.org_id, self.ds_name)
deeplake/client/client.py:306: in delete_dataset_entry
self.request(
deeplake/client/client.py:148: in request
check_response_status(response)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
response = <Response [401]>
def check_response_status(response: requests.Response):
"""Check response status and throw corresponding exception on failure."""
code = response.status_code
if code >= 200 and code < 300:
return
try:
message = response.json()["description"]
except Exception:
message = " "
if code == 400:
raise BadRequestException(message)
elif response.status_code == 401:
> raise AuthenticationException
E deeplake.util.exceptions.AuthenticationException: Authentication failed. Please try logging in again.
deeplake/client/utils.py:58: AuthenticationException
Check failure on line 480 in deeplake/core/query/test/test_query.py
github-actions / JUnit Test Report
test_query.test_link_materialize[1]
deeplake.util.exceptions.TransformError: Transform failed at index 0 of the input data. See traceback for more details. If you wish to skip the samples that cause errors, please specify `ignore_errors=True`.
Raw output
self = Sample(is_lazy=True, path=https://picsum.photos/10/10)
def _read_from_path(self) -> bytes: # type: ignore
if self._buffer is None:
path_type = get_path_type(self.path)
try:
if path_type == "local":
self._buffer = self._read_from_local()
elif path_type == "gcs":
self._buffer = self._read_from_gcs()
elif path_type == "s3":
self._buffer = self._read_from_s3()
elif path_type == "azure":
self._buffer = self._read_from_azure()
elif path_type == "gdrive":
self._buffer = self._read_from_gdrive()
elif path_type == "http":
> self._buffer = self._read_from_http(timeout=self._timeout)
deeplake/core/sample.py:466:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = Sample(is_lazy=True, path=https://picsum.photos/10/10), timeout = None
def _read_from_http(self, timeout=None) -> bytes:
assert self.path is not None
if "Authorization" in self._creds:
headers = {"Authorization": self._creds["Authorization"]}
else:
headers = {}
result = requests.get(self.path, headers=headers, timeout=timeout)
if result.status_code != 200:
> raise UnableToReadFromUrlError(self.path, result.status_code)
E deeplake.util.exceptions.UnableToReadFromUrlError: Unable to read from url https://picsum.photos/10/10. Status code: 520
deeplake/core/sample.py:532: UnableToReadFromUrlError
The above exception was the direct cause of the following exception:
self = <deeplake.core.transform.transform_tensor.TransformTensor object at 0x7f8f8e8dedc0>
item = Sample(is_lazy=True, path=https://picsum.photos/10/10)
def append(self, item):
"""Adds an item to the tensor."""
if self.is_group:
raise TensorDoesNotExistError(self.name)
try:
# optimization applicable only if extending
self.non_numpy_only()
> self._verify_item(item)
deeplake/core/transform/transform_tensor.py:122:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
deeplake/core/transform/transform_tensor.py:112: in _verify_item
shape = getattr(item, "shape", None) # verify sample
deeplake/core/sample.py:169: in shape
self._read_meta()
deeplake/core/sample.py:204: in _read_meta
f = self._read_from_path()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = Sample(is_lazy=True, path=https://picsum.photos/10/10)
def _read_from_path(self) -> bytes: # type: ignore
if self._buffer is None:
path_type = get_path_type(self.path)
try:
if path_type == "local":
self._buffer = self._read_from_local()
elif path_type == "gcs":
self._buffer = self._read_from_gcs()
elif path_type == "s3":
self._buffer = self._read_from_s3()
elif path_type == "azure":
self._buffer = self._read_from_azure()
elif path_type == "gdrive":
self._buffer = self._read_from_gdrive()
elif path_type == "http":
self._buffer = self._read_from_http(timeout=self._timeout)
except Exception as e:
> raise SampleReadError(self.path) from e # type: ignore
E deeplake.util.exceptions.SampleReadError: Unable to read sample from https://picsum.photos/10/10
deeplake/core/sample.py:468: SampleReadError
The above exception was the direct cause of the following exception:
data_slice = Dataset(path='./hub_pytest/test_query/test_link_materialize-1-', index=Index([slice(0, 20, 2)]), tensors=['abc'])
offset = 0
transform_dataset = <deeplake.core.transform.transform_dataset.TransformDataset object at 0x7f8f8e8f3310>
pipeline = <deeplake.core.transform.transform.Pipeline object at 0x7f8f8e8dde20>
tensors = ['abc'], skip_ok = True
pg_callback = <function ComputeProvider.map_with_progress_bar.<locals>.sub_func.<locals>.pg_callback at 0x7f8f8e9331f0>
ignore_errors = False
def _transform_and_append_data_slice(
data_slice,
offset,
transform_dataset,
pipeline,
tensors,
skip_ok,
pg_callback,
ignore_errors,
):
"""Appends a data slice. Returns ``True`` if any samples were appended and ``False`` otherwise."""
try:
import pandas as pd # type: ignore
except ImportError:
pd = None
n = len(data_slice)
skipped_samples = 0
skipped_samples_in_current_batch = 0
pipeline_checked = False
last_pg_update_time = time.time()
progress = 0
for i, sample in enumerate(
(data_slice[i : i + 1] for i in range(n))
if pd and isinstance(data_slice, pd.DataFrame)
else data_slice
):
try:
transform_dataset.set_start_input_idx(i)
try:
> out = transform_sample(sample, pipeline, tensors)
deeplake/util/transform.py:227:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
deeplake/util/transform.py:76: in transform_sample
fn(out, result, *args, **kwargs)
deeplake/core/dataset/dataset.py:4248: in _copy_tensor_append
sample_out[tensor_name].append(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <deeplake.core.transform.transform_tensor.TransformTensor object at 0x7f8f8e8dedc0>
item = Sample(is_lazy=True, path=https://picsum.photos/10/10)
def append(self, item):
"""Adds an item to the tensor."""
if self.is_group:
raise TensorDoesNotExistError(self.name)
try:
# optimization applicable only if extending
self.non_numpy_only()
self._verify_item(item)
self.items.append(item)
self._item_added(item)
except Exception as e:
self.items.clear()
> raise SampleAppendError(self.name, item) from e
E deeplake.util.exceptions.SampleAppendError: Failed to append the sample at path 'https://picsum.photos/10/10' to the tensor 'abc'. See more details in the traceback.
deeplake/core/transform/transform_tensor.py:127: SampleAppendError
The above exception was the direct cause of the following exception:
local_ds = Dataset(path='./hub_pytest/test_query/test_link_materialize-1-', tensors=['abc'])
num_workers = 1
@pytest.mark.slow
@pytest.mark.parametrize("num_workers", [1, 2])
def test_link_materialize(local_ds, num_workers):
with local_ds as ds:
ds.create_tensor("abc", htype="link[image]", sample_compression="jpg")
ds.abc.extend(
[
(
deeplake.link("https://picsum.photos/20/20")
if i % 2
else deeplake.link("https://picsum.photos/10/10")
)
for i in range(20)
]
)
ds.commit()
view = ds[::2]
> view.save_view(id="view_1", optimize=True, num_workers=num_workers)
deeplake/core/query/test/test_query.py:480:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
deeplake/core/dataset/dataset.py:3737: in save_view
return self._save_view(
deeplake/core/dataset/dataset.py:3835: in _save_view
vds = self._save_view_in_subdir(
deeplake/core/dataset/dataset.py:3635: in _save_view_in_subdir
self._write_vds(vds, info, copy, tensors, num_workers, scheduler, ignore_errors)
deeplake/core/dataset/dataset.py:3566: in _write_vds
self._copy(
deeplake/core/dataset/dataset.py:4264: in _copy
deeplake.compute(
deeplake/core/transform/transform.py:125: in eval
pipeline.eval(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <deeplake.core.transform.transform.Pipeline object at 0x7f8f8e8dde20>
data_in = Dataset(path='./hub_pytest/test_query/test_link_materialize-1-', index=Index([slice(None, None, 2)]), tensors=['abc'])
ds_out = Dataset(path='./hub_pytest/test_query/test_link_materialize-1-/.queries/view_1', tensors=['abc'])
num_workers = 1, scheduler = 'threaded', progressbar = True, skip_ok = True
check_lengths = False, pad_data_in = False, read_only_ok = False
cache_size = 16, checkpoint_interval = 0, ignore_errors = False, verbose = True
kwargs = {'disable_label_sync': True, 'extend_only': False}, overwrite = False
original_data_in = Dataset(path='./hub_pytest/test_query/test_link_materialize-1-', index=Index([slice(None, None, 2)]), tensors=['abc'])
initial_padding_state = None
target_ds = Dataset(path='./hub_pytest/test_query/test_link_materialize-1-/.queries/view_1', tensors=['abc'])
compute_provider = <deeplake.core.compute.thread.ThreadProvider object at 0x7f8f8e8ec160>
compute_id = '31463e2313f54982bdbfce54626b91c2', initial_autoflush = False
def eval(
self,
data_in,
ds_out: Optional[deeplake.Dataset] = None,
num_workers: int = 0,
scheduler: str = "threaded",
progressbar: bool = True,
skip_ok: bool = False,
check_lengths: bool = True,
pad_data_in: bool = False,
read_only_ok: bool = False,
cache_size: int = DEFAULT_TRANSFORM_SAMPLE_CACHE_SIZE,
checkpoint_interval: int = 0,
ignore_errors: bool = False,
verbose: bool = True,
**kwargs,
):
"""
Evaluates the Pipeline of ComputeFunctions on ``data_in`` to produce an output dataset ``ds_out``. The purpose of compute functions is to process the input data in parallel,
which is useful when rapidly ingesting data to a Deep Lake dataset. Pipelines can also be executed in-place, where it modifies the input dataset (see ``ds_out`` parameters below) instead of writing to a new dataset.
Args:
data_in: Input passed to the transform to generate output dataset. Should support ``__getitem__`` and ``__len__`` operations. Can be a Deep Lake dataset.
ds_out (Dataset, optional): The dataset object to which the transform will get written. If this is not provided, the ComputeFunction will operate in-place, which means that data will be written to tensors in ``data_in`` .
All tensors modified in the ComputeFunction should already be defined in ``ds_out``. It's initial state should be either:
- Empty i.e. all tensors have no samples. In this case all samples are added to the dataset.
- All tensors are populated and have same length. In this case new samples are appended to the dataset.
num_workers (int): The number of workers to use for performing the transform. Defaults to 0. When set to 0, it will always use serial processing, irrespective of the scheduler.
scheduler (str): The scheduler to be used to compute the transformation. Supported values include: ``serial``, ``threaded``, and ``processed``.
Defaults to 'threaded'.
progressbar (bool): Displays a progress bar if ``True`` (default).
skip_ok (bool): If ``True``, skips the check for output tensors generated. This allows the user to skip certain tensors in the function definition.
This is especially useful for inplace transformations in which certain tensors are not modified. Defaults to ``False``.
check_lengths (bool): If ``True``, checks whether ``ds_out`` has tensors of same lengths initially.
pad_data_in (bool): If ``True``, pads tensors of ``data_in`` to match the length of the largest tensor in ``data_in``.
Defaults to ``False``.
read_only_ok (bool): If ``True`` and output dataset is same as input dataset, the read-only check is skipped.
Defaults to False.
cache_size (int): Cache size to be used by transform per worker.
checkpoint_interval (int): If > 0, the ComputeFunction will be checkpointed with a commit every ``checkpoint_interval`` input samples to avoid restarting full transform due to intermitten failures. If the transform is interrupted, the intermediate data is deleted and the dataset is reset to the last commit.
If <= 0, no checkpointing is done. Checkpoint interval should be a multiple of num_workers if ``num_workers`` > 0. Defaults to 0.
ignore_errors (bool): If ``True``, input samples that causes transform to fail will be skipped and the errors will be ignored **if possible**.
verbose (bool): If ``True``, prints additional information about the transform.
**kwargs: Additional arguments.
Raises:
InvalidInputDataError: If ``data_in`` passed to transform is invalid. It should support ``__getitem__`` and ``__len__`` operations. Using scheduler other than ``threaded`` with deeplake dataset having base storage as memory as ``data_in`` will also raise this.
InvalidOutputDatasetError: If all the tensors of ``ds_out`` passed to transform don't have the same length. Using scheduler other than "threaded" with deeplake dataset having base storage as memory as ``ds_out`` will also raise this.
TensorMismatchError: If one or more of the outputs generated during transform contain different tensors than the ones present in 'ds_out' provided to transform.
UnsupportedSchedulerError: If the scheduler passed is not recognized. Supported values include: ``serial``, ``threaded``, and ``processed``.
TransformError: All other exceptions raised if there are problems while running the pipeline.
ValueError: If ``num_workers`` > 0 and ``checkpoint_interval`` is not a multiple of ``num_workers`` or if ``checkpoint_interval`` > 0 and ds_out is None.
AllSamplesSkippedError: If all samples are skipped during execution of the Pipeline.
ModuleNotInstalledException: If the module ``ray`` is not installed and the scheduler is set to ``ray``.
# noqa: DAR401
Example:
# Suppose we have a series of operations that we want to perform in parallel on images using reusable pipelines.
# We use the pipeline to ingest the transfomed data from one dataset to another dataset.
# First, we define the ComputeFunctions that will be used in the pipeline
@deeplake.compute
def flip_vertical(sample_in, sample_out):
sample_out.append({'labels': sample_in.labels.numpy(),
'images': np.flip(sample_in.images.numpy(), axis = 0)})
@deeplake.compute
def resize(sample_in, sample_out, new_size):
sample_out.append({"labels": sample_in.labels.numpy(),
"images": np.array(Image.fromarray(sample_in.images.numpy()).resize(new_size))})
# Append the label and image to the output sample
sample_out.labels.append(sample_in.labels.numpy())
sample_out.images.append(np.array(Image.fromarray(sample_in.images.numpy()).resize(new_size)))
# We can define the pipeline using:
pipeline = deeplake.compose([flip_vertical(), resize(new_size = (64,64))])
# Finally, we can evaluate the pipeline using:
pipeline.eval(ds_in, ds_out, num_workers = 4)
Note:
``pad_data_in`` is only applicable if ``data_in`` is a Deep Lake dataset.
"""
num_workers, scheduler = sanitize_workers_scheduler(num_workers, scheduler)
overwrite = ds_out is None
deeplake_reporter.feature_report(
feature_name="eval",
parameters={"Num_Workers": str(num_workers), "Scheduler": scheduler},
)
check_transform_data_in(data_in, scheduler)
data_in, original_data_in, initial_padding_state = prepare_data_in(
data_in, pad_data_in, overwrite
)
target_ds = data_in if overwrite else ds_out
check_transform_ds_out(
target_ds, scheduler, check_lengths, read_only_ok and overwrite
)
# if overwrite then we've already flushed and autocheckecked out data_in which is target_ds now
if not overwrite:
target_ds.flush()
auto_checkout(target_ds)
compute_provider = get_compute_provider(scheduler, num_workers)
compute_id = str(uuid4().hex)
target_ds._send_compute_progress(compute_id=compute_id, start=True, progress=0)
initial_autoflush = target_ds.storage.autoflush
target_ds.storage.autoflush = False
if not check_lengths or read_only_ok:
skip_ok = True
checkpointing_enabled = checkpoint_interval > 0
total_samples = len_data_in(data_in)
if checkpointing_enabled:
check_checkpoint_interval(
data_in,
checkpoint_interval,
num_workers,
overwrite,
verbose,
)
datas_in = [
data_in[i : i + checkpoint_interval]
for i in range(0, len_data_in(data_in), checkpoint_interval)
]
else:
datas_in = [data_in]
samples_processed = 0
desc = get_pbar_description(self.functions)
if progressbar:
pbar = get_progress_bar(len_data_in(data_in), desc)
pqueue = compute_provider.create_queue()
else:
pbar, pqueue = None, None
try:
desc = desc.split()[1]
completed = False
progress = 0.0
for data_in in datas_in:
if checkpointing_enabled:
target_ds._commit(
f"Auto-commit during deeplake.compute of {desc} after {progress}% progress",
None,
False,
is_checkpoint=True,
total_samples_processed=samples_processed,
)
progress = round(
(samples_processed + len_data_in(data_in)) / total_samples * 100, 2
)
end = progress == 100
progress_args = {
"compute_id": compute_id,
"progress": progress,
"end": end,
}
try:
self.run(
data_in,
target_ds,
compute_provider,
num_workers,
scheduler,
progressbar,
overwrite,
skip_ok,
read_only_ok and overwrite,
cache_size,
pbar,
pqueue,
ignore_errors,
**kwargs,
)
target_ds._send_compute_progress(**progress_args, status="success")
samples_processed += len_data_in(data_in)
completed = end
except Exception as e:
if checkpointing_enabled:
print(
"Transform failed. Resetting back to last committed checkpoint."
)
target_ds.reset(force=True)
target_ds._send_compute_progress(**progress_args, status="failed")
index, sample, suggest = None, None, False
if isinstance(e, TransformError):
index, sample, suggest = e.index, e.sample, e.suggest
if checkpointing_enabled and isinstance(index, int):
index = samples_processed + index
e = e.__cause__ # type: ignore
if isinstance(e, AllSamplesSkippedError):
raise e
> raise TransformError(
index=index,
sample=sample,
samples_processed=samples_processed,
suggest=suggest,
) from e
E deeplake.util.exceptions.TransformError: Transform failed at index 0 of the input data. See traceback for more details. If you wish to skip the samples that cause errors, please specify `ignore_errors=True`.
deeplake/core/transform/transform.py:355: TransformError