Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AL-2358] ignore_errors argument for ds.save_view #2498

Merged
merged 11 commits into from
Jul 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 34 additions & 1 deletion deeplake/api/tests/test_views.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from deeplake.util.exceptions import ReadOnlyModeError, EmptyTensorError
from deeplake.util.exceptions import ReadOnlyModeError, EmptyTensorError, TransformError
from deeplake.client.utils import get_user_name
from deeplake.cli.auth import logout, login
from click.testing import CliRunner
Expand Down Expand Up @@ -139,3 +139,36 @@ def test_view_from_different_commit(local_ds):
view3 = ds.load_view("efg")
assert ds.commit_id == cid2
assert view3.is_optimized


def test_save_view_ignore_errors(local_ds):
    """Verify `ignore_errors` behavior of an optimized ``save_view``.

    Builds a linked-image tensor where 2 of the first 10 samples point at an
    unreachable URL.  Saving that slice with ``optimize=True`` must raise
    ``TransformError``; with ``ignore_errors=True`` the bad samples are
    dropped and the materialized view keeps only the 8 good rows.
    """
    with local_ds as ds:
        ds.create_tensor(
            "images", htype="link[image]", sample_compression="jpg", verify=False
        )
        ds.create_tensor("labels", htype="class_label")

        good_url = "https://picsum.photos/20/30"
        bad_url = "https://abcd/20"

        # 8 resolvable links, then 2 broken ones, then 10 more resolvable.
        ds.images.extend([deeplake.link(good_url) for _ in range(8)])
        ds.images.extend([deeplake.link(bad_url) for _ in range(2)])
        ds.images.extend([deeplake.link(good_url) for _ in range(10)])

        ds.labels.extend([0] * 20)

        ds.commit()

        # The slice contains the 2 broken links -> optimization must fail.
        with pytest.raises(TransformError):
            ds[:10].save_view(id="one", optimize=True, num_workers=2)

        # Same slice, but failing samples are skipped instead of raising.
        ds[:10].save_view(id="two", optimize=True, ignore_errors=True, num_workers=2)
        view = ds.load_view("two")

        assert len(view) == 8

        # Optimized view materializes links into a plain image tensor.
        assert view.images.htype == "image"
        assert view.images.shape == (8, 30, 20, 3)

        np.testing.assert_array_equal(view.labels.numpy(), np.array([[0]] * 8))
77 changes: 53 additions & 24 deletions deeplake/core/dataset/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -3227,6 +3227,7 @@ def _write_vds(
tensors: Optional[List[str]] = None,
num_workers: Optional[int] = 0,
scheduler: str = "threaded",
ignore_errors: bool = False,
unlink=True,
):
"""Writes the indices of this view to a vds."""
Expand All @@ -3241,6 +3242,7 @@ def _write_vds(
scheduler=scheduler,
unlink=unlink,
create_vds_index_tensor=True,
ignore_errors=ignore_errors,
)
else:
vds.create_tensor(
Expand Down Expand Up @@ -3270,13 +3272,14 @@ def _save_view_in_subdir(
tensors: Optional[List[str]],
num_workers: int,
scheduler: str,
ignore_errors: bool,
):
"""Saves this view under ".queries" sub directory of same storage."""
info = self._get_view_info(id, message, copy)
hash = info["id"]
path = f".queries/{hash}"
vds = self._sub_ds(path, empty=True, verbose=False)
self._write_vds(vds, info, copy, tensors, num_workers, scheduler)
self._write_vds(vds, info, copy, tensors, num_workers, scheduler, ignore_errors)
self._append_to_queries_json(info)
return vds

Expand All @@ -3289,6 +3292,7 @@ def _save_view_in_path(
tensors: Optional[List[str]],
num_workers: int,
scheduler: str,
ignore_errors: bool,
**ds_args,
):
"""Saves this view at a given dataset path"""
Expand All @@ -3299,7 +3303,7 @@ def _save_view_in_path(
except Exception as e:
raise DatasetViewSavingError from e
info = self._get_view_info(id, message, copy)
self._write_vds(vds, info, copy, tensors, num_workers, scheduler)
self._write_vds(vds, info, copy, tensors, num_workers, scheduler, ignore_errors)
return vds

def save_view(
Expand All @@ -3312,6 +3316,7 @@ def save_view(
num_workers: int = 0,
scheduler: str = "threaded",
verbose: bool = True,
ignore_errors: bool = False,
**ds_args,
) -> str:
"""Saves a dataset view as a virtual dataset (VDS)
Expand Down Expand Up @@ -3346,6 +3351,7 @@ def save_view(
num_workers (int): Number of workers to be used for optimization process. Applicable only if ``optimize=True``. Defaults to 0.
scheduler (str): The scheduler to be used for optimization. Supported values include: 'serial', 'threaded', 'processed' and 'ray'. Only applicable if ``optimize=True``. Defaults to 'threaded'.
verbose (bool): If ``True``, logs will be printed. Defaults to ``True``.
ignore_errors (bool): Skip samples that cause errors while saving views. Only applicable if ``optimize=True``. Defaults to ``False``.
ds_args (dict): Additional args for creating VDS when path is specified. (See documentation for :func:`deeplake.dataset()`)

Returns:
Expand Down Expand Up @@ -3385,6 +3391,7 @@ def save_view(
scheduler,
verbose,
False,
ignore_errors,
**ds_args,
)

Expand All @@ -3399,6 +3406,7 @@ def _save_view(
scheduler: str = "threaded",
verbose: bool = True,
_ret_ds: bool = False,
ignore_errors: bool = False,
**ds_args,
) -> Union[str, Any]:
"""Saves a dataset view as a virtual dataset (VDS)
Expand All @@ -3416,6 +3424,7 @@ def _save_view(
verbose (bool): If ``True``, logs will be printed. Defaults to ``True``.
_ret_ds (bool): If ``True``, the VDS is retured as such without converting it to a view. If ``False``, the VDS path is returned.
Default False.
ignore_errors (bool): Skip samples that cause errors while saving views. Only applicable if ``optimize=True``. Defaults to ``False``.
ds_args (dict): Additional args for creating VDS when path is specified. (See documentation for `deeplake.dataset()`)

Returns:
Expand Down Expand Up @@ -3457,6 +3466,7 @@ def _save_view(
tensors,
num_workers,
scheduler,
ignore_errors,
)
except ReadOnlyModeError as e:
raise ReadOnlyModeError(
Expand All @@ -3469,7 +3479,13 @@ def _save_view(
)
else:
vds = self._save_view_in_subdir(
id, message, optimize, tensors, num_workers, scheduler
id,
message,
optimize,
tensors,
num_workers,
scheduler,
ignore_errors,
)
else:
vds = self._save_view_in_path(
Expand All @@ -3480,6 +3496,7 @@ def _save_view(
tensors,
num_workers,
scheduler,
ignore_errors,
**ds_args,
)
if verbose and self.verbose:
Expand Down Expand Up @@ -3811,6 +3828,7 @@ def _copy(
public: bool = False,
unlink: bool = False,
create_vds_index_tensor: bool = False,
ignore_errors: bool = False,
verbose: bool = True,
):
if isinstance(dest, str):
Expand Down Expand Up @@ -3845,30 +3863,37 @@ def _copy(
dest_ds.link_creds.__setstate__(self.link_creds.__getstate__())
save_link_creds(dest_ds.link_creds, dest_ds.storage)

def _copy_tensor(sample_in, sample_out):
def _is_unlink_tensor(tensor):
if (
unlink
and tensor.is_link
and (tensor.base_htype != "video" or deeplake.constants._UNLINK_VIDEOS)
):
return True

# If we have to unlink any tensor, we will use sample-by-sample append implementation (_copy_tensor_append)
# Otherwise, we will use extend-by-whole-slice implementation (_copy_tensor_extend)
extend_only = not any(
_is_unlink_tensor(self[tensor_name]) for tensor_name in dest_ds.tensors
)

def _copy_tensor_extend(sample_in, sample_out):
for tensor_name in dest_ds.tensors:
sample_out[tensor_name].extend(sample_in[tensor_name])

def _copy_tensor_append(sample_in, sample_out):
for tensor_name in dest_ds.tensors:
src = sample_in[tensor_name]
if (
unlink
and src.is_link
and (src.base_htype != "video" or deeplake.constants._UNLINK_VIDEOS)
):
if _is_unlink_tensor(src):
if len(sample_in.index) > 1:
sample_out[tensor_name].extend(src)
sample_out[tensor_name].append(src)
else:
if sample_in.index.subscriptable_at(0):
sample_idxs = sample_in.index.values[0].indices(
src.num_samples
)
else:
sample_idxs = [sample_in.index.values[0].value]
for i in sample_idxs:
sample_out[tensor_name].append(
src.chunk_engine.get_deeplake_read_sample(i)
)
sample_out.check_flush()
idx = sample_in.index.values[0].value
sample_out[tensor_name].append(
src.chunk_engine.get_deeplake_read_sample(idx)
)
else:
sample_out[tensor_name].extend(src)
sample_out[tensor_name].append(src)

if not self.index.subscriptable_at(0):
old_first_index = self.index.values[0]
Expand All @@ -3880,16 +3905,20 @@ def _copy_tensor(sample_in, sample_out):
else:
reset_index = False
try:
deeplake.compute(_copy_tensor, name="copy transform")().eval(
deeplake.compute(
_copy_tensor_extend if extend_only else _copy_tensor_append,
name="copy transform",
)().eval(
self,
dest_ds,
num_workers=num_workers,
scheduler=scheduler,
progressbar=progressbar,
skip_ok=True,
check_lengths=False,
ignore_errors=ignore_errors,
disable_label_sync=True,
extend_only=True,
extend_only=extend_only,
)

dest_ds.flush()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ def search(
"""
try:
from indra import api # type: ignore

INDRA_INSTALLED = True
except ImportError:
INDRA_INSTALLED = False
Expand Down
Loading