Video Query API - Load non-linear views in deeplake (#2136)
* Add support for saving query in query.json

* Refactor

* Fixed linting

* WIP

* First Changes

* Avoid saving VDS in case of query.

* WIP

* WIP

* Added query.py

* Fixed dataloader

* Added deeplake_query_view.py

* Added dataloader test

* Removed default_collate

* Fix query.py

* Some fixes

* Fixed dataloader issue

* Added Exception for OSS dataloader

* Authorization fix

* dataloader tensors fix

* First commit

* WIP

* WIP

* WIP

* Abhinav's dataloader batch changes

* Fix

* WIP

* WIP

* WIP

* WIP

* WIP

* tql fix

* Fixed linter. Cleanup.

* Fixed linter and updated tests.

* Fixed tests.

* Added nested query test.

* Fixed black.

* Minor fixes.

* Fixed black.

* Step back.

* Avoid user storage queries.

* Minor fixes.

* Fixed compile error.

* Added metadata apis.

* More tests.

* Clearer APIs.

* Fix.

* Fixed sequence strings dataloader.

* Cleanup.

* Cleanup.

* Fixed linter.

* Error handling for pil decode method without transform/collate.

* Fixed typo

* Update indra ds.

* Added random split test.

* Minor.

* Custom random split for indra.

* Fix black.

* Fix indexing+random_split.

* Switch to deeplake random_split.

* Bring back indra random split.

* Addressed review comments.

* Review fixes.

* Fixed linter.

* Fixed lint.

* Fixed recursion error.

* Fixed indexing+query

* Cleanup.

* Cleanup.

* Fixed shape and numpy functions.

* Revert unneeded change.

* Minor.

---------

Co-authored-by: Sasun Hambardzumyan <xustup@gmail.com>
Co-authored-by: Ivo Stranic <istranic@gmail.com>
3 people committed Apr 19, 2023
1 parent 15bb31b commit d2938f8
Showing 11 changed files with 835 additions and 28 deletions.
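In broad strokes, the workflow this PR enables: a TQL query produces a non-linear view of a dataset, the view is wrapped in the new DeepLakeQueryDataset, and samples are streamed through the optimized dataloader. A minimal sketch, assuming the ds.query TQL entry point and a hypothetical dataset path:

import deeplake

# Run a TQL query; the result is a non-linear view backed by an indra
# dataset rather than a contiguous index range.
ds = deeplake.load("hub://activeloop/fashion-mnist-train")  # hypothetical path
view = ds.query("select * where labels == 0")

# The OSS .pytorch() entry point is rejected for such views; stream through
# the optimized dataloader instead.
loader = view.dataloader().batch(32).pytorch()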
2 changes: 1 addition & 1 deletion deeplake/core/chunk_engine.py
@@ -1717,7 +1717,7 @@ def _get_full_chunk(self, index) -> bool:
             start = self.num_samples + start
 
         if stop < 0:
-            stop = self.num_samples + start
+            stop = self.num_samples + stop
 
         numpy_array_length = (stop - start) // step
         return numpy_array_length > threshold
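A quick worked example of what this one-line fix changes when a negative stop index is normalized (values hypothetical):

num_samples, start, stop, step = 100, -10, -2, 1
if start < 0:
    start = num_samples + start  # 90
if stop < 0:
    stop = num_samples + stop    # 98; the old code added `start` here, yielding 90
numpy_array_length = (stop - start) // step  # 8 with the fix; 0 before it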
299 changes: 299 additions & 0 deletions deeplake/core/dataset/deeplake_query_dataset.py
@@ -0,0 +1,299 @@
import posixpath
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union
from functools import partial
from deeplake.constants import SHOW_ITERATION_WARNING

from time import time
import math

from deeplake.util.iteration_warning import (
    check_if_iteration,
)
from deeplake.api.info import load_info
from deeplake.client.log import logger
from deeplake.client.utils import get_user_name
from deeplake.constants import (
    SAMPLE_INFO_TENSOR_MAX_CHUNK_SIZE,
)
from deeplake.core.index import Index
from deeplake.core.tensor import Tensor
from deeplake.util.bugout_reporter import deeplake_reporter
from deeplake.util.exceptions import (
    InvalidKeyTypeError,
    InvalidOperationError,
    MemoryDatasetCanNotBePickledError,
    TensorDoesNotExistError,
)
from deeplake.util.scheduling import calculate_absolute_lengths
from deeplake.core.dataset import Dataset

import warnings

from deeplake.core.dataset.deeplake_query_tensor import DeepLakeQueryTensor


class DeepLakeQueryDataset(Dataset):
    def __init__(self, deeplake_ds, indra_ds, group_index=None, enabled_tensors=None):
        self.deeplake_ds = deeplake_ds
        self.indra_ds = indra_ds
        self.group_index = group_index or deeplake_ds.group_index
        self.enabled_tensors = enabled_tensors or deeplake_ds.enabled_tensors

    @property
    def meta(self):
        return self.deeplake_ds.meta

    def merge(self, *args, **kwargs):
        raise InvalidOperationError(
            "merge", "merge method cannot be called on a Dataset view."
        )

    def checkout(self, address: str, create: bool = False):
        raise InvalidOperationError(
            "checkout", "checkout method cannot be called on a Dataset view."
        )

    def _get_tensor_from_root(self, fullpath):
        tensors = self.indra_ds.tensors
        for tensor in tensors:
            if tensor.name == fullpath:
                deeplake_tensor = self.deeplake_ds.__getattr__(fullpath)
                indra_tensor = tensor
                return DeepLakeQueryTensor(deeplake_tensor, indra_tensor)

    def pytorch(
        self,
        batch_size: Optional[int] = 1,
        shuffle: bool = False,
        drop_last: bool = False,
        return_index: bool = True,
        transform: Optional[Callable] = None,
        num_workers: int = 0,
        num_threads: Optional[int] = None,
        collate_fn: Optional[Callable] = None,
        distributed=False,
        tensors: Optional[List[str]] = None,
        raw_tensors: Optional[List[str]] = None,
        compressed_tensors: Optional[List[str]] = None,
        prefetch_factor: int = 10,
        upcast: bool = True,
        primary_tensor: Optional[str] = None,
        buffer_size: int = 2048,
        persistent_workers: bool = False,
    ):
        """
        # noqa: DAR101

        Raises:
            Exception: OSS dataloader is not supported on query dataset.
        """
        raise Exception(
            "OSS dataloader is not supported for non-linear views. Use `view.dataloader().pytorch()` instead."
        )

    def __getitem__(
        self,
        item: Union[
            str, int, slice, List[int], Tuple[Union[int, slice, Tuple[int]]], Index
        ],
        is_iteration: bool = False,
    ):
        if isinstance(item, str):
            fullpath = posixpath.join(self.group_index, item)
            enabled_tensors = self.enabled_tensors
            if enabled_tensors is None or fullpath in enabled_tensors:
                tensor = self._get_tensor_from_root(fullpath)
                if tensor is not None:
                    return tensor
            if self.deeplake_ds._has_group_in_root(fullpath):
                ret = DeepLakeQueryDataset(
                    deeplake_ds=self.deeplake_ds,
                    indra_ds=self.indra_ds,
                    group_index=posixpath.join(self.group_index, item),
                )
            elif "/" in item:
                splt = posixpath.split(item)
                ret = self[splt[0]][splt[1]]
            else:
                raise TensorDoesNotExistError(item)
        elif isinstance(item, (int, slice, list, tuple, Index, type(Ellipsis))):
            if (
                isinstance(item, list)
                and len(item)
                and (
                    isinstance(item[0], str)
                    or (
                        isinstance(item[0], (list, tuple))
                        and len(item[0])
                        and isinstance(item[0][0], str)
                    )
                )
            ):
                group_index = self.group_index
                enabled_tensors = [
                    posixpath.join(
                        group_index, (x if isinstance(x, str) else "/".join(x))
                    )
                    for x in item
                ]
                ret = DeepLakeQueryDataset(
                    deeplake_ds=self.deeplake_ds,
                    indra_ds=self.indra_ds,
                    enabled_tensors=enabled_tensors,
                )
            elif isinstance(item, tuple) and len(item) and isinstance(item[0], str):
                ret = self
                for x in item:
                    ret = self[x]
                return ret
            else:
                if not is_iteration and isinstance(item, int):
                    is_iteration = check_if_iteration(self._indexing_history, item)
                    if is_iteration and SHOW_ITERATION_WARNING:
                        warnings.warn(
                            "Indexing by integer in a for loop, like `for i in range(len(ds)): ... ds[i]` can be quite slow. Use `for i, sample in enumerate(ds)` instead."
                        )
                ret = DeepLakeQueryDataset(
                    deeplake_ds=self.deeplake_ds,
                    indra_ds=self.indra_ds[item],
                )
        else:
            raise InvalidKeyTypeError(item)

        if hasattr(self, "_view_entry"):
            ret._view_entry = self._view_entry
        return ret

    def __getattr__(self, key):
        try:
            return self.__getitem__(key)
        except TensorDoesNotExistError as ke:
            try:
                return getattr(self.deeplake_ds, key)
            except AttributeError:
                raise AttributeError(
                    f"'{self.__class__}' object has no attribute '{key}'"
                ) from ke

    def __len__(self):
        return len(self.indra_ds)

    @deeplake_reporter.record_call
    def dataloader(self):
        """Returns a :class:`~deeplake.enterprise.DeepLakeDataLoader` object. To use this, install deeplake with ``pip install deeplake[enterprise]``.

        Returns:
            ~deeplake.enterprise.DeepLakeDataLoader: A :class:`deeplake.enterprise.DeepLakeDataLoader` object.

        Examples:

            Creating a simple dataloader object which returns a batch of numpy arrays

            >>> import deeplake
            >>> ds_train = deeplake.load('hub://activeloop/fashion-mnist-train')
            >>> train_loader = ds_train.dataloader().numpy()
            >>> for i, data in enumerate(train_loader):
            ...     # custom logic on data
            ...     pass

            Creating dataloader with custom transformation and batch size

            >>> import deeplake
            >>> import torch
            >>> from torchvision import datasets, transforms, models
            >>>
            >>> ds_train = deeplake.load('hub://activeloop/fashion-mnist-train')
            >>> tform = transforms.Compose([
            ...     transforms.ToPILImage(),  # Must convert to PIL image for subsequent operations to run
            ...     transforms.RandomRotation(20),  # Image augmentation
            ...     transforms.ToTensor(),  # Must convert to pytorch tensor for subsequent operations to run
            ...     transforms.Normalize([0.5], [0.5]),
            ... ])
            ...
            >>> batch_size = 32
            >>> # create dataloader by chaining with transform function and batch size, returning a batch of pytorch tensors
            >>> train_loader = ds_train.dataloader()\\
            ...     .transform({'images': tform, 'labels': None})\\
            ...     .batch(batch_size)\\
            ...     .shuffle()\\
            ...     .pytorch()
            ...
            >>> # loop over the elements
            >>> for i, data in enumerate(train_loader):
            ...     # custom logic on data
            ...     pass

            Creating dataloader and chaining with query

            >>> ds_train = deeplake.load('hub://activeloop/coco-train')
            >>> train_loader = ds_train.dataloader()\\
            ...     .query("(select * where contains(categories, 'car') limit 1000) union (select * where contains(categories, 'motorcycle') limit 1000)")\\
            ...     .pytorch()
            ...
            >>> # loop over the elements
            >>> for i, data in enumerate(train_loader):
            ...     # custom logic on data
            ...     pass

        **Restrictions**

        The new high performance C++ dataloader is part of our Growth and Enterprise Plan.

        - Users of our Community plan can create dataloaders on Activeloop datasets ("hub://activeloop/..." datasets).
        - To run queries on your own datasets, `upgrade your organization's plan <https://www.activeloop.ai/pricing/>`_.
        """
        from deeplake.enterprise import DeepLakeDataLoader

        dataloader = DeepLakeDataLoader(self, _indra_dataset=self.indra_ds)
        return dataloader

    @property
    def no_view_dataset(self):
        return self

    @property
    def index(self):
        return self.deeplake_ds.index

    def _tensors(
        self, include_hidden: bool = True, include_disabled=True
    ) -> Dict[str, Tensor]:
        """All tensors belonging to this group, including those within sub groups. Always returns the sliced tensors."""
        version_state = self.version_state
        group_index = self.group_index
        all_tensors = self._all_tensors_filtered(include_hidden, include_disabled)
        return {t: self[posixpath.join(group_index, t)] for t in all_tensors}

    def __str__(self):
        path_str = ""
        if self.path:
            path_str = f"path='{self.path}', "

        mode_str = ""
        if self.read_only:
            mode_str = "read_only=True, "

        index_str = f"index={self.deeplake_ds.index}, "
        if self.deeplake_ds.index.is_trivial():
            index_str = ""

        group_index_str = (
            f"group_index='{self.group_index}', " if self.group_index else ""
        )

        return f"Dataset({path_str}{mode_str}{index_str}{group_index_str}tensors={self._all_tensors_filtered(include_hidden=False, include_disabled=False)})"

    def copy(self, *args, **kwargs):
        raise NotImplementedError(
            "Copying or deepcopying views generated by non-linear queries is not supported."
        )

    def __del__(self):
        self.indra_ds = None

    def random_split(self, lengths: Sequence[Union[int, float]]):
        if math.isclose(sum(lengths), 1) and sum(lengths) <= 1:
            lengths = calculate_absolute_lengths(lengths, len(self))
        vs = self.indra_ds.random_split(lengths)
        return [DeepLakeQueryDataset(self.deeplake_ds, v) for v in vs]
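To tie the pieces together, a hypothetical usage sketch for the new class, where view stands for any non-linear query view wrapped as a DeepLakeQueryDataset:

# String indexing returns a DeepLakeQueryTensor; a list of tensor names
# yields a view restricted to those tensors; integer and slice indexing
# defer to the underlying indra dataset.
images = view["images"]
subset = view[["images", "labels"]]
first_ten = view[0:10]

# Fractions summing to 1 are converted to absolute lengths before the
# split is delegated to indra's random_split.
train_view, val_view = view.random_split([0.8, 0.2])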
