Skip to content

Commit

Permalink
Merge pull request #2339 from activeloopai/fy_tfm_err
Browse files Browse the repository at this point in the history
[AL-2277] Avoid producing corrupt datasets in transforms
  • Loading branch information
FayazRahman committed May 12, 2023
2 parents 5aebba2 + aa3aef1 commit 5b8bbeb
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 22 deletions.
27 changes: 27 additions & 0 deletions deeplake/core/transform/test_transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -1552,6 +1552,33 @@ def upload(stuff, ds):
ds2.delete()


def test_no_corruption(local_ds):
    """Regression test: a failing transform must not corrupt the dataset.

    Feeds a batch containing invalid samples and verifies that, after the
    expected ``TransformError``, every sample successfully processed before
    the failure is intact and the tensors agree on length.
    """

    @deeplake.compute
    def upload(stuff, ds):
        ds.append(stuff)

    with local_ds as ds:
        ds.create_tensor("images", htype="image", sample_compression="png")
        ds.create_tensor("labels", htype="class_label")

        # 20 valid samples followed by one invalid entry, repeated twice:
        # indices 20 and 41 are the strings that make the pipeline fail.
        good_samples = [
            {
                "images": np.random.randint(0, 256, (10, 10, 3), dtype=np.uint8),
                "labels": 1,
            }
            for _ in range(20)
        ]
        samples = (good_samples + ["bad_sample"]) * 2

        # The bad entries must surface as a TransformError rather than
        # being silently written out.
        with pytest.raises(TransformError):
            upload().eval(samples, ds, num_workers=TRANSFORM_TEST_NUM_WORKERS)

        # All 40 valid samples survive; no partial/corrupt rows remain.
        assert ds.images.numpy().shape == (40, 10, 10, 3)
        assert ds.labels.numpy().shape == (40, 1)


def test_ds_append_empty(local_ds):
@deeplake.compute
def upload(stuff, ds):
Expand Down
4 changes: 4 additions & 0 deletions deeplake/core/transform/transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,10 @@ def run(
verbose=progressbar,
)

for res in result["error"]:
if res is not None:
raise res


def compose(functions: List[ComputeFunction]): # noqa: DAR101, DAR102, DAR201, DAR401
"""Takes a list of functions decorated using :func:`deeplake.compute` and creates a pipeline that can be evaluated using .eval
Expand Down
5 changes: 4 additions & 1 deletion deeplake/core/transform/transform_dataset.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from deeplake.util.exceptions import SampleAppendError, TensorDoesNotExistError
from deeplake.util.exceptions import SampleAppendError, SampleAppendingError
from deeplake.core.transform.transform_tensor import TransformTensor
from deeplake.core.linked_tiled_sample import LinkedTiledSample
from deeplake.core.partial_sample import PartialSample
Expand Down Expand Up @@ -60,6 +60,9 @@ def __iter__(self):
yield self[i]

def append(self, sample, skip_ok=False, append_empty=False):
if not isinstance(sample, dict):
raise SampleAppendingError()

if skip_ok:
raise ValueError(
"`skip_ok` is not supported for `ds.append` in transforms. Use `skip_ok` parameter of the `eval` method instead."
Expand Down
53 changes: 32 additions & 21 deletions deeplake/util/transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,26 +333,37 @@ def store_data_slice_with_pbar(pg_callback, transform_input: Tuple) -> Dict:
)

ret = True
if extend_only:
_extend_data_slice(
data_slice, offset, transform_dataset, pipeline.functions[0], pg_callback
)
else:
ret = _transform_and_append_data_slice(
data_slice,
offset,
transform_dataset,
pipeline,
rel_tensors,
skip_ok,
pg_callback,
ignore_errors,
)

# retrieve relevant objects from memory
meta = _retrieve_memory_objects(all_chunk_engines)
meta["all_samples_skipped"] = not ret
return meta
err = None
try:
if extend_only:
_extend_data_slice(
data_slice,
offset,
transform_dataset,
pipeline.functions[0],
pg_callback,
)
else:
ret = _transform_and_append_data_slice(
data_slice,
offset,
transform_dataset,
pipeline,
rel_tensors,
skip_ok,
pg_callback,
ignore_errors,
)
except Exception as e:
print(e)
transform_dataset.flush()
err = e
finally:
# retrieve relevant objects from memory
meta = _retrieve_memory_objects(all_chunk_engines)
meta["all_samples_skipped"] = not ret
meta["error"] = err
return meta


def create_worker_chunk_engines(
Expand Down Expand Up @@ -568,7 +579,7 @@ def get_lengths_generated(all_tensor_metas, tensors):


def check_lengths(all_tensors_generated_length, skip_ok):
if not skip_ok:
if skip_ok:
return

first_length = None
Expand Down

0 comments on commit 5b8bbeb

Please sign in to comment.