[AL-2290] DeepLake VectorStore updates (#2375)
* First commit

* Changes

* Fixed tests

* Refactored vector search and added support for TQL metadata filtering

* fixed case when result is empty in python search

* deleted vector search because it is no longer used

* Fixed bad import

* Fixed issue and tests

* Fixed more tests

* more fixes

* Updated docstrings

* Fixed .tensors and .summary() for virtual tensor datasets.

* Fix tests.

* Fixes

* Temp disable shuffle after random split.

* Fixed .data() method.

* Cleaned up parsing

* fixes

* Refactor

* Fixed tensors.

* Enabled test for shuffle+random_split.

* Better codecov

* Final fixes.

* Fixed black.

* fixes

* fixes

* Fixed reporting position in query API

* Raw data in remote query

* Error message fixes

* Fix types and lint

* Docstring fix

* refactors

* test fixes

* Fixes and updates

* prelim commit

* more fixes

* more fixes

* fixes

* Fixes

* More fixes

* Ran black .

* improved tests

* Small linting fix

* Several fixes and tests improvements

* minor fixes

* test fixes

* Ran black .

* fix issue with ds.connect

* review comments

* update

* Bump version post release (#2381)

* bump version

* bump version

* bump version

* bump version

* bump version

* Support for adding custom tensors

* Add fix

* Added search exception

* fixed tests

* Fixes, better formatting, better docstrings

* Improved search_basic test

* Fixed tests and other issues

* Improving typing

* Fixed typing

* Fixed bad variable name in test

* Fixed low level search tests

* Fixed tests and added return_view

* prelim commit

* Fixed tests and errors

* Improved deletion implementation and tests

* WIP

* Added new ingestion logic

* Fixed tests and removed unnecessary params from some functions

* improvements

* Linting, typing, and docstring fixes

* fixed test delete

* New api support

* tests fix

* data ingestion fix

* Testing related changes

* Added support for default embedding tensor

* Cleaned up docstrings and parameters

* Tweaked tests

* ingestion logic tweaks

* mypy fixes + examples

* darglint fixes

* test_parse_add_arguments fixes

* Added few more parse_add_arguments tests

* test fix

* Minor refactors and tests fixes

* Added reporting to the vector store module

* tests

* Added support for images

* Many small fixes to reporting, testing, and style

* Minor fix

* fix

* improved docstrings

* Fixed bug with ingestion

* new tests

* Apply suggestions from code review

Co-authored-by: Fayaz Rahman <fayazrahman4u@gmail.com>
Co-authored-by: Fariz Rahman <farizrahman4u@gmail.com>

* Review changes

* upd

* Refactoring based on review

* docs fix

* small fix

* docs

* Improving tests

* Update deeplake/core/vectorstore/deeplake_vectorstore.py

Co-authored-by: Fayaz Rahman <fayazrahman4u@gmail.com>

* Update deeplake/core/vectorstore/deeplake_vectorstore.py

Co-authored-by: Fayaz Rahman <fayazrahman4u@gmail.com>

* increased codecov

* tests + linting

* final fixes

* refactor

* black fix

* transforms fix

* fix

* ssh

* ssh

* ssh

* remove ssh

* increased threshold

---------

Co-authored-by: Ivo Stranic <istranic@gmail.com>
Co-authored-by: Sasun Hambardzumyan <xustup@gmail.com>
Co-authored-by: progerdav <developer.gyulnazaryan@gmail.com>
Co-authored-by: Levon Ghukasyan <levon_ghukasyan@live.com>
Co-authored-by: FayazRahman <fayazrahman4u@gmail.com>
Co-authored-by: Fariz Rahman <farizrahman4u@gmail.com>
7 people committed Jun 6, 2023
1 parent 2ff28ba commit 4671200
Showing 41 changed files with 2,691 additions and 739 deletions.
53 changes: 36 additions & 17 deletions deeplake/api/dataset.py
@@ -28,6 +28,7 @@
process_dataset_path,
get_path_type,
)
from deeplake.util.tensor_db import parse_runtime_parameters
from deeplake.hooks import (
dataset_created,
dataset_loaded,
@@ -199,7 +200,7 @@ def init(
if creds is None:
creds = {}

db_engine = (runtime or {}).get("db_engine", {})
db_engine = parse_runtime_parameters(path, runtime)["tensor_db"]

try:
storage, cache_chain = get_storage_and_cache_chain(
@@ -349,12 +350,12 @@ def empty(
"""Creates an empty dataset
Args:
path (str, pathlib.Path): - The full path to the dataset. Can be:
- a Deep Lake cloud path of the form ``hub://username/datasetname``. To write to Deep Lake cloud datasets, ensure that you are logged in to Deep Lake (use 'activeloop login' from command line)
path (str, pathlib.Path): - The full path to the dataset. It can be:
- a Deep Lake cloud path of the form ``hub://org_id/dataset_name``. Requires registration with Deep Lake.
- an s3 path of the form ``s3://bucketname/path/to/dataset``. Credentials are required in either the environment or passed to the creds argument.
- a local file system path of the form ``./path/to/dataset`` or ``~/path/to/dataset`` or ``path/to/dataset``.
- a memory path of the form ``mem://path/to/dataset`` which doesn't save the dataset but keeps it in memory instead. Should be used only for testing as it does not persist.
runtime (dict): Parameters for Activeloop DB Engine. Only applicable for hub:// paths.
runtime (dict): Parameters for creating a dataset in the Deep Lake Tensor Database. Only applicable for paths of the form ``hub://org_id/dataset_name`` and runtime must be ``{"tensor_db": True}``.
overwrite (bool): If set to ``True`` this overwrites the dataset if it already exists. Defaults to ``False``.
public (bool): Defines if the dataset will have public access. Applicable only if Deep Lake cloud storage is used and a new Dataset is being created. Defaults to ``False``.
memory_cache_size (int): The size of the memory cache to be used in MB.
@@ -385,7 +386,7 @@ def empty(
if org_id is not None and get_path_type(path) != "local":
raise ValueError("org_id parameter can only be used with local datasets")

db_engine = (runtime or {}).get("db_engine", False)
db_engine = parse_runtime_parameters(path, runtime)["tensor_db"]

if address:
raise ValueError(
@@ -823,6 +824,7 @@ def like(
token: Optional[str] = None,
org_id: Optional[str] = None,
public: bool = False,
verbose: bool = True,
) -> Dataset:
"""Creates a new dataset by copying the ``source`` dataset's structure to a new location. No samples are copied,
only the meta/info for the dataset and its tensors.
@@ -840,6 +842,8 @@
token (str, optional): Activeloop token, used for fetching credentials to the dataset at path if it is a Deep Lake dataset. This is optional, tokens are normally autogenerated.
org_id (str, Optional): Organization id to be used for enabling enterprise features. Only applicable for local datasets.
public (bool): Defines if the dataset will have public access. Applicable only if Deep Lake cloud storage is used and a new Dataset is being created. Defaults to False.
verbose (bool): If True, logs will be printed. Defaults to ``True``.
Returns:
Dataset: New dataset object.
@@ -862,7 +866,16 @@
token=token,
)
return dataset._like(
dest, src, runtime, tensors, overwrite, creds, token, org_id, public
dest,
src,
runtime,
tensors,
overwrite,
creds,
token,
org_id,
public,
verbose,
)

@staticmethod
@@ -876,6 +889,7 @@ def _like( # (No reporting)
token: Optional[str] = None,
org_id: Optional[str] = None,
public: bool = False,
verbose: bool = True,
unlink: Union[List[str], bool] = False,
) -> Dataset:
"""Copies the `source` dataset's structure to a new location. No samples are copied, only the meta/info for the dataset and it's tensors.
@@ -895,11 +909,24 @@ def _like( # (No reporting)
token (str, optional): Activeloop token, used for fetching credentials to the dataset at path if it is a Deep Lake dataset. This is optional, tokens are normally autogenerated.
org_id (str, Optional): Organization id to be used for enabling enterprise features. Only applicable for local datasets.
public (bool): Defines if the dataset will have public access. Applicable only if Deep Lake cloud storage is used and a new Dataset is being created. Defaults to ``False``.
verbose (bool): If True, logs will be printed. Defaults to ``True``.
unlink (Union[List[str], bool]): List of tensors to be unlinked. If ``True`` passed all tensors will be unlinked. Defaults to ``False``, no tensors are unlinked.
Returns:
Dataset: New dataset object.
"""

src = convert_pathlib_to_string_if_needed(src)
if isinstance(src, str):
source_ds = dataset.load(src, verbose=verbose)
else:
source_ds = src

if tensors:
tensors = source_ds._resolve_tensor_list(tensors) # type: ignore
else:
tensors = source_ds.tensors # type: ignore

dest = convert_pathlib_to_string_if_needed(dest)
if isinstance(dest, Dataset):
destination_ds = dest
@@ -914,20 +941,12 @@ def _like( # (No reporting)
token=token,
org_id=org_id,
public=public,
verbose=verbose,
)

feature_report_path(
dest_path, "like", {"Overwrite": overwrite, "Public": public}, token=token
)
src = convert_pathlib_to_string_if_needed(src)
if isinstance(src, str):
source_ds = dataset.load(src)
else:
source_ds = src

if tensors:
tensors = source_ds._resolve_tensor_list(tensors) # type: ignore
else:
tensors = source_ds.tensors # type: ignore

if unlink is True:
unlink = tensors # type: ignore
@@ -1110,7 +1129,7 @@ def deepcopy(
)
src_storage = get_base_storage(src_ds.storage)

db_engine = (runtime or {}).get("db_engine", False)
db_engine = parse_runtime_parameters(dest, runtime)["tensor_db"]
dest_storage, cache_chain = get_storage_and_cache_chain(
dest,
db_engine=db_engine,
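The recurring `(runtime or {}).get("db_engine", ...)` lookups above are replaced by a single call to the new `parse_runtime_parameters` helper from `deeplake/util/tensor_db.py`. Below is a minimal sketch of the contract implied by its call sites in this diff; the real helper may validate more, and the error message shown here is illustrative.

```python
from typing import Dict, Optional


def parse_runtime_parameters(path: str, runtime: Optional[Dict] = None) -> Dict:
    """Normalize ``runtime`` into a dict with a canonical ``tensor_db`` flag."""
    runtime = runtime or {}
    tensor_db = bool(runtime.get("tensor_db", False))
    # The Managed Tensor Database only applies to Deep Lake cloud (hub://) paths.
    if tensor_db and not str(path).startswith("hub://"):
        raise ValueError(
            "runtime = {'tensor_db': True} is only supported for hub:// paths."
        )
    return {"tensor_db": tensor_db}
```

Callers such as `dataset.init`, `dataset.empty`, and `dataset.deepcopy` now read `parse_runtime_parameters(path, runtime)["tensor_db"]`, so a Tensor Database dataset is created with, for example, `deeplake.empty("hub://org_id/dataset_name", runtime={"tensor_db": True})`.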
15 changes: 4 additions & 11 deletions deeplake/client/client.py
@@ -1,6 +1,6 @@
import deeplake
import requests
from typing import Any, Optional, List, Tuple
from typing import Any, Optional, Dict
from deeplake.util.exceptions import (
AgreementNotAcceptedError,
AuthorizationException,
@@ -492,7 +492,7 @@ def connect_dataset_entry(

def remote_query(
self, org_id: str, ds_name: str, query_string: str
) -> Tuple[Any, Any]:
) -> Dict[str, Any]:
"""Queries a remote dataset.
Args:
@@ -501,7 +501,7 @@ def remote_query(
query_string (str): The query string.
Returns:
Tuple[Any, Any]: The indices and scores matching the query.
Dict[str, Any]: The JSON response containing matching indices and data from virtual tensors.
"""
response = self.request(
"POST",
@@ -510,11 +510,4 @@
endpoint=self.endpoint(),
).json()

indices = response["indices"]
if len(indices) == 0:
return [], []

scores = response.get("score")

indices = [int(i) for i in indices.split(",")]
return indices, scores
return response
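`remote_query` now returns the raw JSON response instead of pre-parsed `(indices, scores)`, leaving interpretation to the caller. A hedged sketch of how a caller might consume it, based only on the keys this PR reads elsewhere (`"indices"`, `"data"`); the exact payload shape is defined by the backend, and the token, org/dataset names, and query string are placeholders:

```python
import deeplake
from deeplake.client.client import DeepLakeBackendClient

client = DeepLakeBackendClient(token="<activeloop-token>")   # placeholder token
response = client.remote_query("my_org", "my_dataset", "select * limit 10")

indices = response["indices"]   # row indices matching the query
data = response.get("data")     # raw data from virtual tensors, if the query produced any

ds = deeplake.load("hub://my_org/my_dataset", token="<activeloop-token>")
view = ds[indices]              # materialize the matching rows as a client-side view
```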
34 changes: 33 additions & 1 deletion deeplake/constants.py
@@ -193,4 +193,36 @@
DEFAULT_VECTORSTORE_DEEPLAKE_PATH = "./deeplake_vector_store"
MAX_VECTORSTORE_INGESTION_RETRY_ATTEMPTS = 5
MAX_CHECKPOINTING_INTERVAL = 100000
MAX_DATASET_LENGTH_FOR_CACHING = 100000
VECTORSTORE_EXTEND_MAX_SIZE = 20000
DEFAULT_VECTORSTORE_TENSORS = [
{
"name": "text",
"htype": "text",
"create_id_tensor": False,
"create_sample_info_tensor": False,
"create_shape_tensor": False,
},
{
"name": "metadata",
"htype": "json",
"create_id_tensor": False,
"create_sample_info_tensor": False,
"create_shape_tensor": False,
},
{
"name": "embedding",
"htype": "embedding",
"dtype": np.float32,
"create_id_tensor": False,
"create_sample_info_tensor": False,
"create_shape_tensor": True,
"max_chunk_size": 64 * MB,
},
{
"name": "id",
"htype": "text",
"create_id_tensor": False,
"create_sample_info_tensor": False,
"create_shape_tensor": False,
},
]
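`DEFAULT_VECTORSTORE_TENSORS` spells out the default schema a new vector store starts with: `text`, `metadata`, `embedding`, and `id` tensors, each with its own htype and chunking options. As a hedged illustration only (the `VectorStore` class presumably does something equivalent internally), these specs map directly onto `create_tensor` keyword arguments:

```python
import deeplake
from deeplake.constants import DEFAULT_VECTORSTORE_TENSORS

ds = deeplake.empty("./deeplake_vector_store", overwrite=True)
for spec in DEFAULT_VECTORSTORE_TENSORS:
    kwargs = dict(spec)               # copy so the shared constant is not mutated
    name = kwargs.pop("name")
    ds.create_tensor(name, **kwargs)  # htype, dtype, chunking, and id/shape-tensor flags

print(list(ds.tensors))  # ['text', 'metadata', 'embedding', 'id']
```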
45 changes: 27 additions & 18 deletions deeplake/core/dataset/dataset.py
@@ -29,6 +29,7 @@
suppress_iteration_warning,
check_if_iteration,
)
from deeplake.util.tensor_db import parse_runtime_parameters
from deeplake.api.info import load_info
from deeplake.client.log import logger
from deeplake.client.utils import get_user_name
@@ -2094,7 +2095,7 @@ def query(
self,
query_string: str,
runtime: Optional[Dict] = None,
return_indices_and_scores: bool = False,
return_data: bool = False,
):
"""Returns a sliced :class:`~deeplake.core.dataset.Dataset` with given query results. To use this, install deeplake with ``pip install deeplake[enterprise]``.
@@ -2104,11 +2105,11 @@
Args:
query_string (str): An SQL string adjusted with new functionalities to run on the given :class:`~deeplake.core.dataset.Dataset` object
runtime (Optional[Dict]): whether to run query on a remote engine
return_indices_and_scores (bool): by default False. Whether to return indices and scores.
runtime (Optional[Dict]): Runtime parameters for query execution. Supported keys: {"tensor_db": True or False}.
return_data (bool): Defaults to ``False``. Whether to return raw data along with the view.
Raises:
ValueError: if return_indices_and_scores is True and runtime is not {"db_engine": true}
ValueError: if ``return_data`` is True and runtime is not {"tensor_db": true}
Returns:
Dataset: A :class:`~deeplake.core.dataset.Dataset` object.
@@ -2127,28 +2128,36 @@
>>> query_ds_train = ds_train.query("(select * where contains(categories, 'car') limit 1000) union (select * where contains(categories, 'motorcycle') limit 1000)")
"""
if runtime is not None and runtime.get("db_engine", False):

deeplake_reporter.feature_report(
feature_name="query",
parameters={
"query_string": query_string[0:100],
"runtime": runtime,
},
)

runtime = parse_runtime_parameters(self.path, runtime)
if runtime["tensor_db"]:
client = DeepLakeBackendClient(token=self._token)
org_id, ds_name = self.path[6:].split("/")
indices, scores = client.remote_query(org_id, ds_name, query_string)
if return_indices_and_scores:
return indices, scores
return self[indices]
response = client.remote_query(org_id, ds_name, query_string)
indices = response["indices"]
view = self[indices]

if return_data:
data = response["data"]
return view, data

if return_indices_and_scores:
return view

if return_data:
raise ValueError(
"return_indices_and_scores is not supported. Please add `runtime = {'db_engine': True}` if you want to return indices and scores"
"`return_data` is only applicable when running queries using the Managed Tensor Database. Please specify `runtime = {'tensor_db': True}`"
)

from deeplake.enterprise import query

deeplake_reporter.feature_report(
feature_name="query",
parameters={
"query_string": query_string,
},
)

return query(self, query_string)

def sample_by(
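`Dataset.query` now reports usage, takes `runtime={"tensor_db": True}` in place of the old `db_engine` key, and returns the raw response data alongside the view when `return_data=True`. A hedged usage sketch; the dataset path and query strings are placeholders, and the managed path requires a dataset created in the Tensor Database:

```python
import deeplake

ds = deeplake.load("hub://org_id/dataset_name", read_only=True)

# Runs server-side in the Managed Tensor Database; returns the view plus the
# raw response data from virtual tensors.
view, data = ds.query(
    "select * where contains(text, 'deep lake') limit 10",
    runtime={"tensor_db": True},
    return_data=True,
)

# Without runtime, the query runs locally through the enterprise query engine,
# and return_data must remain False.
local_view = ds.query("select * where contains(text, 'deep lake') limit 10")
```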
18 changes: 12 additions & 6 deletions deeplake/core/transform/test_transform.py
@@ -1376,7 +1376,7 @@ class BadSample:


@all_schedulers
@pytest.mark.parametrize("method", ["ds", "multiple"])
@pytest.mark.parametrize("method", ["ds", "multiple", "checkpointed"])
@pytest.mark.parametrize("error_at", ["transform", "chunk_engine"])
def test_ds_append_errors(
local_path, compressed_image_paths, scheduler, method, error_at
@@ -1388,7 +1388,7 @@ def upload(item, ds):
if isinstance(item["images"], str)
else item["images"]
)
if method == "ds":
if method == "ds" or method == "checkpointed":
ds.append(
{
"labels": np.zeros(10, dtype=np.uint32),
@@ -1417,14 +1417,18 @@ def upload(item, ds):
# errors out in transform dataset / tensor
bad_sample = {"images": "bad_path", "boxes": [1, 2, 3]}
err_msg = re.escape(
f"Transform failed at index 17 of the input data on the item: {bad_sample}. See traceback for more details."
f"Transform failed at index 17 of the input data on the item: {bad_sample}."
)
else:
# errors out in chunk engine
bad_sample = {"images": BadSample(), "boxes": [1, 2, 3]}
err_msg = re.escape(
f"Transform failed at index 17 of the input data. See traceback for more details."
err_msg = re.escape(f"Transform failed at index 17 of the input data.")

if method == "checkpointed":
err_msg += re.escape(
" Last checkpoint: 10 samples processed. You can slice the input to resume from this point."
)
err_msg += re.escape(" See traceback for more details.")

samples.insert(17, bad_sample)

@@ -1434,6 +1438,7 @@ def upload(item, ds):
ds,
num_workers=TRANSFORM_TEST_NUM_WORKERS,
scheduler=scheduler,
checkpoint_interval=10 if method == "checkpointed" else 0,
)

ds = create_test_ds(local_path)
@@ -1444,9 +1449,10 @@
num_workers=TRANSFORM_TEST_NUM_WORKERS,
scheduler=scheduler,
ignore_errors=True,
checkpoint_interval=10 if method == "checkpointed" else 0,
)

if method == "ds":
if method == "ds" or method == "checkpointed":
assert ds["images"][::2].numpy().shape == (10, *deeplake.read(images[0]).shape)
assert ds["images"][1::2].numpy().shape == (10, *deeplake.read(images[1]).shape)

10 changes: 9 additions & 1 deletion deeplake/core/transform/transform.py
@@ -140,6 +140,7 @@ def eval(
cache_size: int = DEFAULT_TRANSFORM_SAMPLE_CACHE_SIZE,
checkpoint_interval: int = 0,
ignore_errors: bool = False,
verbose: bool = True,
**kwargs,
):
"""Evaluates the pipeline on ``data_in`` to produce an output dataset ``ds_out``.
@@ -165,6 +166,7 @@
checkpoint_interval (int): If > 0, the transform will be checkpointed with a commit every ``checkpoint_interval`` input samples to avoid restarting the full transform due to intermittent failures. If the transform is interrupted, the intermediate data is deleted and the dataset is reset to the last commit.
If <= 0, no checkpointing is done. Checkpoint interval should be a multiple of num_workers if num_workers > 0. Defaults to 0.
ignore_errors (bool): If ``True``, input samples that causes transform to fail will be skipped and the errors will be ignored **if possible**.
verbose (bool): If ``True``, prints additional information about the transform.
**kwargs: Additional arguments.
Raises:
@@ -234,7 +236,11 @@ def my_fn(sample_in: Any, samples_out, my_arg0, my_arg1=0):
total_samples = len(data_in)
if checkpointing_enabled:
check_checkpoint_interval(
data_in, checkpoint_interval, num_workers, overwrite
data_in,
checkpoint_interval,
num_workers,
overwrite,
verbose,
)
datas_in = [
data_in[i : i + checkpoint_interval]
@@ -300,6 +306,8 @@ def my_fn(sample_in: Any, samples_out, my_arg0, my_arg1=0):
index, sample = None, None
if isinstance(e, TransformError):
index, sample = e.index, e.sample
if checkpointing_enabled and isinstance(index, int):
index = samples_processed + index
e = e.__cause__ # type: ignore
if isinstance(e, AllSamplesSkippedError):
raise e
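With checkpointing enabled, the reported failure index is now offset by the samples already committed, so the error points at a global position in the input rather than a position within the current checkpoint batch. A hedged sketch of resuming a checkpointed transform from that message; `upload`, `samples`, and the dataset path are placeholders for a `@deeplake.compute` function, its input list, and an existing dataset with an `images` tensor:

```python
import deeplake
from deeplake.util.exceptions import TransformError


@deeplake.compute
def upload(item, samples_out):
    # Append one image per input path to the output dataset.
    samples_out.images.append(deeplake.read(item))


ds = deeplake.load("./my_dataset")                    # placeholder dataset
samples = [f"images/img_{i}.png" for i in range(100)]  # placeholder input paths

try:
    upload().eval(samples, ds, num_workers=4, checkpoint_interval=10)
except TransformError as e:
    # e.index is now global; the message also reports the last checkpoint, e.g.
    # "Last checkpoint: 10 samples processed. You can slice the input to resume..."
    print(e)
    # Retry only the unprocessed tail, e.g.: upload().eval(samples[10:], ds, ...)
```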