Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deepmemory ux improvements #2710

Merged
merged 18 commits into from
Dec 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ def search(
return_tensors: List[str],
return_view: bool,
deep_memory: bool,
return_tql: bool,
) -> Union[Dict, Dataset]:
feature_report_path(
path=self.bugout_reporting_path,
Expand Down Expand Up @@ -243,7 +244,7 @@ def search(
embedding_tensor=embedding_tensor,
return_tensors=return_tensors,
return_view=return_view,
deep_memory=deep_memory,
return_tql=return_tql,
token=self.token,
org_id=self.org_id,
)
Expand Down
92 changes: 63 additions & 29 deletions deeplake/core/vectorstore/deep_memory/deep_memory.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import pathlib
import uuid
from collections import defaultdict
from pydantic import BaseModel, ValidationError
Expand All @@ -8,13 +9,13 @@
import numpy as np

import deeplake
from deeplake.enterprise.dataloader import indra_available
from deeplake.util.exceptions import (
DeepMemoryWaitingListError,
DeepMemoryWaitingListError,
IncorrectRelevanceTypeError,
IncorrectQueriesTypeError,
)
from deeplake.util.remove_cache import get_base_storage
from deeplake.util.path import convert_pathlib_to_string_if_needed
from deeplake.constants import (
DEFAULT_QUERIES_VECTORSTORE_TENSORS,
DEFAULT_MEMORY_CACHE_SIZE,
Expand All @@ -30,7 +31,15 @@
feature_report_path,
)
from deeplake.util.path import get_path_type
from deeplake.util.version_control import load_meta


def access_control(func):
    """Decorator gating DeepMemory operations behind managed-service access.

    Raises:
        DeepMemoryWaitingListError: if ``self.client`` is None, i.e. the caller
            has not been admitted to the DeepMemory managed service.
    """
    from functools import wraps  # local import keeps the decorator self-contained

    @wraps(func)  # preserve the wrapped method's name/docstring for introspection
    def wrapper(self, *args, **kwargs):
        if self.client is None:
            raise DeepMemoryWaitingListError()
        return func(self, *args, **kwargs)

    return wrapper


def use_deep_memory(func):
Expand All @@ -46,15 +55,6 @@
return wrapper


def access_control(func):
    """Deny DeepMemory operations to users without managed-service access."""

    def guarded(self, *args, **kwargs):
        # client is only set for users admitted to the service; None means
        # the caller is still on the waiting list
        if self.client is None:
            raise DeepMemoryWaitingListError()
        return func(self, *args, **kwargs)

    return guarded


class Relevance(BaseModel):
data: List[List[Tuple[str, int]]]

Expand All @@ -78,7 +78,8 @@
class DeepMemory:
def __init__(
self,
dataset_or_path: Union[Dataset, str],
dataset: Dataset,
path: Union[str, pathlib.Path],
logger: logging.Logger,
embedding_function: Optional[Any] = None,
token: Optional[str] = None,
Expand All @@ -87,24 +88,18 @@
"""Based Deep Memory class to train and evaluate models on DeepMemory managed service.

Args:
dataset_or_path (Union[Dataset, str]): deeplake dataset object or path.
dataset (Dataset): deeplake dataset object or path.
path (Union[str, pathlib.Path]): Path to the dataset.
logger (logging.Logger): Logger object.
embedding_function (Optional[Any], optional): Embedding funtion class used to convert queries/documents to embeddings. Defaults to None.
token (Optional[str], optional): API token for the DeepMemory managed service. Defaults to None.
creds (Optional[Dict[str, Any]], optional): Credentials to access the dataset. Defaults to None.

Raises:
ImportError: if indra is not installed
ValueError: if incorrect type is specified for `dataset_or_path`
"""
if isinstance(dataset_or_path, Dataset):
self.path = dataset_or_path.path
elif isinstance(dataset_or_path, str):
self.path = dataset_or_path
else:
raise ValueError(
"dataset_or_path should be a Dataset object or a string path"
)
self.dataset = dataset
self.path = convert_pathlib_to_string_if_needed(path)

feature_report_path(
path=self.path,
Expand Down Expand Up @@ -143,7 +138,8 @@
relevance (List[List[Tuple[str, int]]]): List of relevant documents for each query with their respective relevance score.
The outer list corresponds to the queries and the inner list corresponds to the doc_id, relevence_score pair for each query.
doc_id is the document id in the corpus dataset. It is stored in the `id` tensor of the corpus dataset.
relevence_score is the relevance score of the document for the query. The range is between 0 and 1, where 0 stands for not relevant and 1 stands for relevant.
relevence_score is the relevance score of the document for the query. The value is either 0 and 1, where 0 stands for not relevant (unknown relevance)
and 1 stands for relevant. Currently, only values of 1 contribute to the training, and there is no reason to provide examples with relevance of 0.
embedding_function (Optional[Callable[[str], np.ndarray]], optional): Embedding funtion used to convert queries to embeddings. Defaults to None.
token (str, optional): API token for the DeepMemory managed service. Defaults to None.

Expand Down Expand Up @@ -178,7 +174,7 @@
)

if embedding_function is None and self.embedding_function is not None:
embedding_function = self.embedding_function.embed_documents
embedding_function = self.embedding_function

runtime = None
if get_path_type(corpus_path) == "hub":
Expand Down Expand Up @@ -484,10 +480,8 @@
if embedding is not None:
query_embs = embedding
else:
if self.embedding_function is not None:
embedding_function = (
embedding_function or self.embedding_function.embed_documents
)
if self.embedding_function is not None and embedding_function is None:
embedding_function = self.embedding_function

if embedding_function is None:
raise ValueError(
Expand Down Expand Up @@ -554,6 +548,46 @@
self.queries_dataset.commit()
return recalls

@access_control
def get_model(self):
    """Return the job id of the model currently active in the DeepMemory managed service."""
    deepmemory_meta = self.dataset.embedding.info["deepmemory"]
    return deepmemory_meta["model.npy"]["job_id"]

@access_control
def set_model(self, model_name: str):
    """Switch DeepMemory to use ``model_name`` instead of the default model.

    Args:
        model_name (str): name of the model to use; a ``.npy`` suffix is
            appended automatically when missing.
    """
    # normalize: stored model keys carry an "npy" suffix
    if "npy" not in model_name:
        model_name = model_name + ".npy"

    # validate the name against the known models, then activate it
    self._verify_model_name(model_name)
    self._set_model_npy(model_name)

def _verify_model_name(self, model_name: str):
if model_name not in self.dataset.embedding.info["deepmemory"]:
raise ValueError(

Check warning on line 574 in deeplake/core/vectorstore/deep_memory/deep_memory.py

View check run for this annotation

Codecov / codecov/patch

deeplake/core/vectorstore/deep_memory/deep_memory.py#L574

Added line #L574 was not covered by tests
"Invalid model name. Please choose from the following models: "
+ ", ".join(self.dataset.embedding.info["deepmemory"].keys())
)

def _set_model_npy(self, model_name: str):
# get new model.npy
new_model_npy = self.dataset.embedding.info["deepmemory"][model_name]

# get old deepmemory dictionary and update it:
old_deepmemory = self.dataset.embedding.info["deepmemory"]
new_deepmemory = old_deepmemory.copy()
new_deepmemory.update({"model.npy": new_model_npy})

# assign new deepmemory dictionary to the dataset:
self.dataset.embedding.info["deepmemory"] = new_deepmemory

def _get_dm_client(self):
path = self.path
path_type = get_path_type(path)
Expand Down
130 changes: 126 additions & 4 deletions deeplake/core/vectorstore/deep_memory/test_deepmemory.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import pytest
import sys
from time import sleep
from unittest.mock import MagicMock

import deeplake
from deeplake import VectorStore
Expand Down Expand Up @@ -40,9 +41,9 @@
assert db.deep_memory is not None


def embedding_fn(texts, embedding_dim=1536):
    """Return one random float32 vector of size ``embedding_dim`` per input text.

    Stand-in embedding function for tests; values are uniform in [-10, 10).
    The diff's post-change signature (parameterized dimension) is used.
    """
    return [
        np.random.uniform(low=-10, high=10, size=(embedding_dim,)).astype(np.float32)
        for _ in range(len(texts))
    ]

Expand Down Expand Up @@ -432,7 +433,7 @@
path=corpus,
runtime={"tensor_db": True},
token=hub_cloud_dev_token,
embedding_function=DummyEmbedder,
embedding_function=embedding_fn,
)
recall = db.deep_memory.evaluate(
queries=queries,
Expand Down Expand Up @@ -544,7 +545,7 @@
token=hub_cloud_dev_token,
)

output = db.search(

Check failure on line 548 in deeplake/core/vectorstore/deep_memory/test_deepmemory.py

View workflow job for this annotation

GitHub Actions / JUnit Test Report

test_deepmemory.test_deepmemory_search

deeplake.util.exceptions.BadGatewayException: Invalid response from Activeloop server.
Raw output
corpus_query_relevances_copy = ('hub://testingacc2/tmp33c4_test_deepmemory_test_deepmemory_search', ['0-dimensional biomaterials lack inductive prope...265107', 1]], [['32587939', 1]], ...], 'hub://testingacc2/tmp33c4_test_deepmemory_test_deepmemory_search_eval_queries')
testing_relevance_query_deepmemory = ('31715818', [-0.015188165009021759, 0.02033962868154049, -0.012286307290196419, 0.009264647960662842, -0.00939110480248928, 0.00015578352031297982, ...])
hub_cloud_dev_token = 'eyJhbGciOiJIUzUxMiIsImlhdCI6MTcwMTg4ODY5NSwiZXhwIjoxNzA1NDg4Njk1fQ.eyJpZCI6InRlc3RpbmdhY2MyIn0.z_uTv-691JHOSmcmTD67Lob0EApaRQ7QvmBqSX1PLbWSiUwNk7Va4-wu2Yjyb2s6UVZUwGGlV7cvL2LY9i0_5g'

    @pytest.mark.slow
    @pytest.mark.skipif(sys.platform == "win32", reason="Does not run on Windows")
    def test_deepmemory_search(
        corpus_query_relevances_copy,
        testing_relevance_query_deepmemory,
        hub_cloud_dev_token,
    ):
        corpus, _, _, _ = corpus_query_relevances_copy
        relevance, query_embedding = testing_relevance_query_deepmemory
    
        db = VectorStore(
            path=corpus,
            runtime={"tensor_db": True},
            token=hub_cloud_dev_token,
        )
    
>       output = db.search(
            embedding=query_embedding, deep_memory=True, return_tensors=["id"]
        )

deeplake/core/vectorstore/deep_memory/test_deepmemory.py:548: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
deeplake/core/vectorstore/deeplake_vectorstore.py:313: in search
    return self.dataset_handler.search(
deeplake/core/vectorstore/deep_memory/deep_memory.py:53: in wrapper
    return func(self, *args, **kwargs)
deeplake/core/vectorstore/dataset_handlers/client_side_dataset_handler.py:235: in search
    return vector_search.search(
deeplake/core/vectorstore/vector_search/vector_search.py:57: in search
    return EXEC_OPTION_TO_SEARCH_TYPE[exec_option](
deeplake/core/vectorstore/vector_search/indra/vector_search.py:47: in vector_search
    return vectorstore.indra_search_algorithm(
deeplake/core/vectorstore/vector_search/indra/search_algorithm.py:209: in search
    return searcher.run(
deeplake/core/vectorstore/vector_search/indra/search_algorithm.py:57: in run
    view = self._get_view(
deeplake/core/vectorstore/vector_search/indra/search_algorithm.py:151: in _get_view
    view, data = self.deeplake_dataset.query(
deeplake/core/dataset/dataset.py:2338: in query
    response = client.remote_query(org_id, ds_name, query_string)
deeplake/client/client.py:507: in remote_query
    response = self.request(
deeplake/client/client.py:163: in request
    check_response_status(response)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

response = <Response [502]>

    def check_response_status(response: requests.Response):
        """Check response status and throw corresponding exception on failure."""
        code = response.status_code
        if code >= 200 and code < 300:
            return
    
        try:
            message = response.json()["description"]
        except Exception:
            message = " "
    
        if code == 400:
            raise BadRequestException(message)
        elif response.status_code == 401:
            raise AuthenticationException
        elif response.status_code == 403:
            raise AuthorizationException(message, response=response)
        elif response.status_code == 404:
            if message != " ":
                raise ResourceNotFoundException(message)
            raise ResourceNotFoundException
        elif response.status_code == 422:
            raise UnprocessableEntityException(message)
        elif response.status_code == 423:
            raise LockedException
        elif response.status_code == 429:
            raise OverLimitException
        elif response.status_code == 502:
>           raise BadGatewayException
E           deeplake.util.exceptions.BadGatewayException: Invalid response from Activeloop server.

deeplake/client/utils.py:101: BadGatewayException
embedding=query_embedding, deep_memory=True, return_tensors=["id"]
)

Expand Down Expand Up @@ -584,7 +585,10 @@
@requires_libdeeplake
def test_unsupported_deepmemory_users(local_ds):
dm = DeepMemory(
dataset_or_path=local_ds, logger=logger, embedding_function=DummyEmbedder
path=local_ds,
dataset=None,
logger=logger,
embedding_function=DummyEmbedder,
)
with pytest.raises(DeepMemoryWaitingListError):
dm.train(
Expand Down Expand Up @@ -660,3 +664,121 @@
queries=queries,
relevance="relevances",
)


def test_deepmemory_v2_set_model_should_set_model_for_all_subsequent_loads(
    local_dmv2_dataset,
    hub_cloud_dev_token,
):
    """set_model must take effect immediately and persist across reloads."""
    store = VectorStore(path=local_dmv2_dataset, token=hub_cloud_dev_token)
    assert store.deep_memory.get_model() == "655f86e8ab93e7fc5067a3ac_2"

    # switching models is reflected both via get_model and the raw dataset info
    store.deep_memory.set_model("655f86e8ab93e7fc5067a3ac_1")
    expected_model = "655f86e8ab93e7fc5067a3ac_1"

    stored_job_id = store.dataset.embedding.info["deepmemory"]["model.npy"]["job_id"]
    assert stored_job_id == expected_model
    assert store.deep_memory.get_model() == expected_model

    # ... and survives re-opening the vector store
    reloaded = VectorStore(path=local_dmv2_dataset, token=hub_cloud_dev_token)
    assert reloaded.deep_memory.get_model() == expected_model


@pytest.mark.slow
@pytest.mark.skipif(sys.platform == "win32", reason="Does not run on Windows")
def test_deepmemory_search_should_contain_correct_answer(
    corpus_query_relevances_copy,
    testing_relevance_query_deepmemory,
    hub_cloud_dev_token,
):
    """Deep-memory search should surface the known-relevant document.

    NOTE: the scraped source had a CI failure log interleaved inside this
    function; the code below is reconstructed from the visible code lines.
    """
    corpus, _, _, _ = corpus_query_relevances_copy
    relevance, query_embedding = testing_relevance_query_deepmemory

    db = VectorStore(
        path=corpus,
        token=hub_cloud_dev_token,
    )

    output = db.search(
        embedding=query_embedding, deep_memory=True, return_tensors=["id"]
    )
    assert len(output["id"]) == 4
    assert relevance in output["id"]


@pytest.mark.slow
@pytest.mark.skipif(sys.platform == "win32", reason="Does not run on Windows")
def test_deeplake_search_should_not_contain_correct_answer(
    corpus_query_relevances_copy,
    testing_relevance_query_deepmemory,
    hub_cloud_dev_token,
):
    """Plain (non deep-memory) search should miss the known-relevant document."""
    corpus, _, _, _ = corpus_query_relevances_copy
    relevance, query_embedding = testing_relevance_query_deepmemory

    store = VectorStore(
        path=corpus,
        token=hub_cloud_dev_token,
    )

    result = store.search(embedding=query_embedding)
    assert len(result["id"]) == 4
    assert relevance not in result["id"]


@pytest.mark.slow
@pytest.mark.flaky(reruns=3)
@pytest.mark.skipif(sys.platform == "win32", reason="Does not run on Windows")
def test_deepmemory_train_with_embedding_function_specified_in_constructor_should_not_throw_any_exception(
    deepmemory_small_dataset_copy,
    hub_cloud_dev_token,
):
    """Training must succeed when the embedding function comes from the constructor."""
    corpus, queries, relevances, _ = deepmemory_small_dataset_copy

    store = VectorStore(
        path=corpus,
        runtime={"tensor_db": True},
        token=hub_cloud_dev_token,
        embedding_function=embedding_fn,
    )

    # should not raise; the returned job id itself is not inspected here
    store.deep_memory.train(
        queries=queries,
        relevance=relevances,
    )


@pytest.mark.slow
@pytest.mark.flaky(reruns=3)
@pytest.mark.skipif(sys.platform == "win32", reason="Does not run on Windows")
def test_deepmemory_evaluate_with_embedding_function_specified_in_constructor_should_not_throw_any_exception(
    corpus_query_pair_path,
    hub_cloud_dev_token,
):
    """Evaluation must succeed when the embedding function comes from the constructor."""
    corpus, queries = corpus_query_pair_path

    corpus_store = VectorStore(
        path=corpus,
        runtime={"tensor_db": True},
        token=hub_cloud_dev_token,
        embedding_function=embedding_fn,
    )

    queries_store = VectorStore(
        path=queries,
        runtime={"tensor_db": True},
        token=hub_cloud_dev_token,
        embedding_function=embedding_fn,
    )

    # pull the first ten query strings and their relevance labels
    eval_queries = queries_store.dataset[:10].text.data()["value"]
    metadata = queries_store.dataset[:10].metadata.data()["value"]
    eval_relevance = [entry["relevance"] for entry in metadata]

    # should not raise; recall values themselves are not asserted here
    corpus_store.deep_memory.evaluate(
        queries=eval_queries,
        relevance=eval_relevance,
    )
8 changes: 7 additions & 1 deletion deeplake/core/vectorstore/deeplake_vectorstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from deeplake.core.dataset import Dataset
from deeplake.core.vectorstore.dataset_handlers import get_dataset_handler
from deeplake.core.vectorstore.deep_memory import DeepMemory
from deeplake.core.vectorstore.dataset_handlers import get_dataset_handler
from deeplake.core.vectorstore.deep_memory import DeepMemory
from deeplake.constants import (
DEFAULT_VECTORSTORE_TENSORS,
MAX_BYTES_PER_MINUTE,
Expand Down Expand Up @@ -131,7 +133,8 @@ def __init__(
)

self.deep_memory = DeepMemory(
dataset_or_path=self.dataset_handler.path,
dataset=self.dataset_handler.dataset,
path=self.dataset_handler.path,
token=self.dataset_handler.token,
logger=logger,
embedding_function=embedding_function,
Expand Down Expand Up @@ -240,6 +243,7 @@ def search(
return_tensors: Optional[List[str]] = None,
return_view: bool = False,
deep_memory: bool = False,
return_tql: bool = False,
) -> Union[Dict, Dataset]:
"""VectorStore search method that combines embedding search, metadata search, and custom TQL search.

Expand Down Expand Up @@ -290,6 +294,7 @@ def search(
return_view (bool): Return a Deep Lake dataset view that satisfied the search parameters, instead of a dictionary with data. Defaults to False. If ``True`` return_tensors is set to "*" beucase data is lazy-loaded and there is no cost to including all tensors in the view.
deep_memory (bool): Whether to use the Deep Memory model for improving search results. Defaults to False if deep_memory is not specified in the Vector Store initialization.
If True, the distance metric is set to "deepmemory_distance", which represents the metric with which the model was trained. The search is performed using the Deep Memory model. If False, the distance metric is set to "COS" or whatever distance metric user specifies.
return_tql (bool): Whether to return the TQL query string used for the search. Defaults to False.

..
# noqa: DAR101
Expand Down Expand Up @@ -317,6 +322,7 @@ def search(
embedding_tensor=embedding_tensor,
return_tensors=return_tensors,
return_view=return_view,
return_tql=return_tql,
deep_memory=deep_memory,
)

Expand Down