diff --git a/deeplake/core/vectorstore/dataset_handlers/client_side_dataset_handler.py b/deeplake/core/vectorstore/dataset_handlers/client_side_dataset_handler.py
index f587d8b5fc..e248c5b35d 100644
--- a/deeplake/core/vectorstore/dataset_handlers/client_side_dataset_handler.py
+++ b/deeplake/core/vectorstore/dataset_handlers/client_side_dataset_handler.py
@@ -168,6 +168,7 @@ def search(
         return_tensors: List[str],
         return_view: bool,
         deep_memory: bool,
+        return_tql: bool,
     ) -> Union[Dict, Dataset]:
         feature_report_path(
             path=self.bugout_reporting_path,
@@ -210,6 +211,7 @@ def search(
             exec_option=exec_option,
             embedding_tensor=embedding_tensor,
             return_tensors=return_tensors,
+            return_tql=return_tql,
         )
 
         return_tensors = utils.parse_return_tensors(
diff --git a/deeplake/core/vectorstore/deep_memory/deep_memory.py b/deeplake/core/vectorstore/deep_memory/deep_memory.py
index 2656b8f3d3..82bb2f345e 100644
--- a/deeplake/core/vectorstore/deep_memory/deep_memory.py
+++ b/deeplake/core/vectorstore/deep_memory/deep_memory.py
@@ -1,4 +1,5 @@
 import logging
+import pathlib
 import uuid
 from collections import defaultdict
 from pydantic import BaseModel, ValidationError
@@ -8,13 +9,13 @@
 import numpy as np
 
 import deeplake
-from deeplake.enterprise.dataloader import indra_available
 from deeplake.util.exceptions import (
     DeepMemoryWaitingListError,
     IncorrectRelevanceTypeError,
     IncorrectQueriesTypeError,
+    DeepMemoryEvaluationError,
 )
-from deeplake.util.remove_cache import get_base_storage
+from deeplake.util.path import convert_pathlib_to_string_if_needed
 from deeplake.constants import (
     DEFAULT_QUERIES_VECTORSTORE_TENSORS,
     DEFAULT_MEMORY_CACHE_SIZE,
@@ -30,7 +31,15 @@
     feature_report_path,
 )
 from deeplake.util.path import get_path_type
-from deeplake.util.version_control import load_meta
+
+
+def access_control(func):
+    def wrapper(self, *args, **kwargs):
+        if self.client is None:
+            raise DeepMemoryWaitingListError()
+        return func(self, *args, **kwargs)
+
+    return wrapper
 
 
 def use_deep_memory(func):
@@ -46,15 +55,6 @@ def wrapper(self, *args, **kwargs):
     return wrapper
 
 
-def access_control(func):
-    def wrapper(self, *args, **kwargs):
-        if self.client is None:
-            raise DeepMemoryWaitingListError()
-        return func(self, *args, **kwargs)
-
-    return wrapper
-
-
 class Relevance(BaseModel):
     data: List[List[Tuple[str, int]]]
 
@@ -78,7 +78,8 @@ def validate_relevance_and_queries(relevance, queries):
 class DeepMemory:
     def __init__(
         self,
-        dataset_or_path: Union[Dataset, str],
+        dataset: Dataset,
+        path: Union[str, pathlib.Path],
         logger: logging.Logger,
         embedding_function: Optional[Any] = None,
         token: Optional[str] = None,
@@ -87,7 +88,8 @@ def __init__(
         """Base Deep Memory class to train and evaluate models on the DeepMemory managed service.
 
         Args:
-            dataset_or_path (Union[Dataset, str]): deeplake dataset object or path.
+            dataset (Dataset): deeplake dataset object.
+            path (Union[str, pathlib.Path]): Path to the dataset.
             logger (logging.Logger): Logger object.
            embedding_function (Optional[Any], optional): Embedding function class used to convert queries/documents to embeddings. Defaults to None.
            token (Optional[str], optional): API token for the DeepMemory managed service. Defaults to None.
@@ -97,14 +99,8 @@ def __init__(
             ImportError: if indra is not installed
             ValueError: if incorrect type is specified for `dataset_or_path`
         """
-        if isinstance(dataset_or_path, Dataset):
-            self.path = dataset_or_path.path
-        elif isinstance(dataset_or_path, str):
-            self.path = dataset_or_path
-        else:
-            raise ValueError(
-                "dataset_or_path should be a Dataset object or a string path"
-            )
+        self.dataset = dataset
+        self.path = convert_pathlib_to_string_if_needed(path)
 
         feature_report_path(
             path=self.path,
@@ -143,7 +139,8 @@ def train(
             relevance (List[List[Tuple[str, int]]]): List of relevant documents for each query with their respective relevance score.
                 The outer list corresponds to the queries and the inner list corresponds to the doc_id, relevance_score pair for each query.
                 doc_id is the document id in the corpus dataset. It is stored in the `id` tensor of the corpus dataset.
-                relevence_score is the relevance score of the document for the query. The range is between 0 and 1, where 0 stands for not relevant and 1 stands for relevant.
+                relevance_score is the relevance score of the document for the query. The value is either 0 or 1, where 0 stands for not relevant (unknown relevance)
+                and 1 stands for relevant. Currently, only values of 1 contribute to the training, and there is no reason to provide examples with a relevance of 0.
             embedding_function (Optional[Callable[[str], np.ndarray]], optional): Embedding function used to convert queries to embeddings. Defaults to None.
             token (str, optional): API token for the DeepMemory managed service. Defaults to None.
 
@@ -155,7 +152,6 @@ def train(
         """
         from deeplake.core.vectorstore.deeplake_vectorstore import VectorStore
 
-        self.logger.info("Starting DeepMemory training job")
         feature_report_path(
             path=self.path,
             feature_name="dm.train",
@@ -166,52 +162,19 @@ def train(
             },
             token=token or self.token,
         )
-        validate_relevance_and_queries(relevance=relevance, queries=queries)
-
-        # TODO: Support for passing query_embeddings directly without embedding function
-        corpus_path = self.path
-        queries_path = corpus_path + "_queries"
-
-        if embedding_function is None and self.embedding_function is None:
-            raise ValueError(
-                "Embedding function should be specifed either during initialization or during training."
-            )
-        if embedding_function is None and self.embedding_function is not None:
-            embedding_function = self.embedding_function.embed_documents
-
-        runtime = None
-        if get_path_type(corpus_path) == "hub":
-            runtime = {"tensor_db": True}
-
-        queries_vs = VectorStore(
-            path=queries_path,
-            overwrite=True,
-            runtime=runtime,
-            token=token or self.token,
+        trainer = DeepMemoryTrainer(
+            client=self.client,
+            logger=self.logger,
+            path=self.path,
+            token=self.token,
             creds=self.creds,
-            verbose=False,
+            embedding_function=embedding_function or self.embedding_function,
+            queries=queries,
+            relevance=relevance,
         )
-        self.logger.info("Preparing training data for deepmemory:")
-        queries_vs.add(
-            text=[query for query in queries],
-            metadata=[
-                {"relevance": relevance_per_doc} for relevance_per_doc in relevance
-            ],
-            embedding_data=[query for query in queries],
-            embedding_function=embedding_function,
-        )
-
-        # do some rest_api calls to train the model
-        response = self.client.start_taining(
-            corpus_path=corpus_path,
-            queries_path=queries_path,
-        )
-
-        self.logger.info(
-            f"DeepMemory training job started. Job ID: {response['job_id']}"
-        )
+        response = trainer.start_training()
 
         return response["job_id"]
 
     @access_control
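For orientation, a minimal usage sketch of the refactored `train` flow (the path, token, and query/relevance values below are hypothetical placeholders; the keyword arguments match the signature above):

```python
from deeplake import VectorStore

# Hypothetical corpus path and token.
db = VectorStore(path="hub://my_org/my_corpus", token="<token>")

# One inner list per query; each tuple is (doc_id, relevance_score), where
# doc_id comes from the corpus `id` tensor and the score is 0 or 1.
job_id = db.deep_memory.train(
    queries=["What is Deep Memory?"],
    relevance=[[("doc_id_1", 1)]],
)
```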
@@ -261,7 +224,7 @@ def delete(self, job_id: str):
         return self.client.delete_job(job_id=job_id)
 
     @access_control
-    def status(self, job_id: str):
+    def status(self, job_id: str, return_status: bool = False):
         """Get the status of a training job on DeepMemory managed service.
 
         Examples:
@@ -288,29 +251,10 @@ def status(self, job_id: str):
             token=self.token,
         )
 
-        _, storage = get_storage_and_cache_chain(
-            path=self.path,
-            db_engine={"tensor_db": True},
-            read_only=False,
-            creds=self.creds,
-            token=self.token,
-            memory_cache_size=DEFAULT_MEMORY_CACHE_SIZE,
-            local_cache_size=DEFAULT_LOCAL_CACHE_SIZE,
+        reporter = DeepMemoryStatusReporter(
+            self.client, self.path, self.token, self.creds
         )
-
-        loaded_dataset = DeepLakeCloudDataset(storage=storage)
-
-        try:
-            recall, improvement = _get_best_model(
-                loaded_dataset.embedding, job_id, latest_job=True
-            )
-
-            recall = "{:.2f}".format(100 * recall)
-            improvement = "{:.2f}".format(100 * improvement)
-        except:
-            recall = None
-            improvement = None
-        self.client.check_status(job_id=job_id, recall=recall, improvement=improvement)
+        return reporter.status(job_id, return_status)
 
     @access_control
     def list_jobs(self, debug=False):
@@ -323,55 +267,10 @@ def list_jobs(self, debug=False):
             },
             token=self.token,
         )
-        _, storage = get_storage_and_cache_chain(
-            path=self.path,
-            db_engine={"tensor_db": True},
-            read_only=False,
-            creds=self.creds,
-            token=self.token,
-            memory_cache_size=DEFAULT_MEMORY_CACHE_SIZE,
-            local_cache_size=DEFAULT_LOCAL_CACHE_SIZE,
+        job_manager = DeepMemoryListJobsManager(
+            self.client, self.path, self.token, self.creds
         )
-        loaded_dataset = DeepLakeCloudDataset(storage=storage)
-
-        response = self.client.list_jobs(
-            dataset_path=self.path,
-        )
-
-        response_status_schema = JobResponseStatusSchema(response=response)
-
-        jobs = self._get_jobs(response)
-        if jobs is None:
-            reposnse_str = "No Deep Memory training jobs were found for this dataset"
-            print(reposnse_str)
-            if debug:
-                return reposnse_str
-            return None
-
-        recalls = {}
-        deltas = {}
-
-        latest_job = jobs[-1]
-        for job in jobs:
-            try:
-                recall, delta = _get_best_model(
-                    loaded_dataset.embedding,
-                    job,
-                    latest_job=job == latest_job,
-                )
-                recall = "{:.2f}".format(100 * recall)
-                delta = "{:.2f}".format(100 * delta)
-            except:
-                recall = None
-                delta = None
-
-            recalls[f"{job}"] = recall
-            deltas[f"{job}"] = delta
-
-        reposnse_str = response_status_schema.print_jobs(
-            debug=debug, recalls=recalls, improvements=deltas
-        )
-        return reposnse_str
+        return job_manager.list_jobs(debug=debug)
 
     @access_control
     def evaluate(
@@ -457,184 +356,248 @@ def evaluate(
             token=self.token,
         )
 
+        evaluator = DeepMemoryEvaluator(
+            client=self.client,
+            path=self.path,
+            token=self.token,
+            creds=self.creds,
+            embedding_function=embedding_function or self.embedding_function,
+        )
+        return evaluator.evaluate_model(
+            relevance=relevance,
+            queries=queries,
+            top_k=top_k,
+            query_embs=embedding,
+            qvs_params=qvs_params,
+        )
+
+    @access_control
+    def get_model(self):
+        """Get the name of the model currently being used by the DeepMemory managed service."""
         try:
-            from indra import api  # type: ignore
+            return self.dataset.embedding.info["deepmemory"]["model.npy"]["job_id"]
+        except:
+            # no model exists in the dataset yet
+            return None
 
-            INDRA_INSTALLED = True
-        except Exception:
-            INDRA_INSTALLED = False
+    @access_control
+    def set_model(self, model_name: str):
+        """Set model.npy to use `model_name` instead of the default model.
+
+        Args:
+            model_name (str): name of the model to use
+        """
 
-        if not INDRA_INSTALLED:
-            raise ImportError(
-                "indra is not installed. Please install indra to use this functionality with: pip install `deeplake[enterprise]`"
+        if not model_name.endswith(".npy"):
+            model_name += ".npy"
+
+        # verify model_name
+        self._verify_model_name(model_name)
+
+        # set model.npy to use `model_name` instead of the default model
+        self._set_model_npy(model_name)
+
+    def _verify_model_name(self, model_name: str):
+        if model_name not in self.dataset.embedding.info["deepmemory"]:
+            raise ValueError(
+                "Invalid model name. Please choose from the following models: "
+                + ", ".join(self.dataset.embedding.info["deepmemory"].keys())
             )
 
-        validate_relevance_and_queries(relevance=relevance, queries=queries)
+    def _set_model_npy(self, model_name: str):
+        # get the new model.npy
+        new_model_npy = self.dataset.embedding.info["deepmemory"][model_name]
 
-        from indra import api  # type: ignore
+        # get the old deepmemory dictionary and update it:
+        old_deepmemory = self.dataset.embedding.info["deepmemory"]
+        new_deepmemory = old_deepmemory.copy()
+        new_deepmemory.update({"model.npy": new_model_npy})
 
-        indra_dataset = api.dataset(self.path, token=self.token)
-        api.tql.prepare_deepmemory_metrics(indra_dataset)
+        # assign the new deepmemory dictionary to the dataset:
+        self.dataset.embedding.info["deepmemory"] = new_deepmemory
 
-        parsed_qvs_params = parse_queries_params(qvs_params)
+    def _get_dm_client(self):
+        path = self.path
+        path_type = get_path_type(path)
 
-        start = time()
-        query_embs: Union[List[np.ndarray], List[List[float]]]
+        dm_client = DeepMemoryBackendClient(token=self.token)
+        user_profile = dm_client.get_user_profile()
 
-        if embedding is not None:
-            query_embs = embedding
+        if path_type == "hub":
+            # TODO: add support for windows
+            dataset_id = path[6:].split("/")[0]
         else:
-            if self.embedding_function is not None:
-                embedding_function = (
-                    embedding_function or self.embedding_function.embed_documents
-                )
+            # TODO: change user_profile to user_id
+            dataset_id = user_profile["name"]
+
+        deepmemory_is_available = dm_client.deepmemory_is_available(dataset_id)
+        if deepmemory_is_available:
+            return dm_client
+        return None
+
+    def _check_if_model_exists(self):
+        model = self.get_model()
 
-            if embedding_function is None:
-                raise ValueError(
-                    "Embedding function should be specifed either during initialization or during evaluation."
+        if model is None:
+            # check whether training exists at all:
+            jobs = self.list_jobs(debug=True)
+            if jobs == "No Deep Memory training jobs were found for this dataset":
+                raise DeepMemoryEvaluationError(
+                    "No Deep Memory training jobs were found for this dataset. Please train a model before evaluating."
+                )
+            else:
+                raise DeepMemoryEvaluationError(
+                    "DeepMemory training hasn't finished yet, please wait until training is finished before evaluating."
                 )
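A short sketch of the model-selection API added above (the job IDs mirror the fixture values used in the tests further down and are otherwise hypothetical):

```python
db = VectorStore(path="<path_to_corpus>", token="<token>")

# get_model returns the job_id stored under embedding.info["deepmemory"]["model.npy"]
current = db.deep_memory.get_model()

# set_model appends ".npy" if missing, validates the name, then pins it as model.npy
db.deep_memory.set_model("655f86e8ab93e7fc5067a3ac_1")
assert db.deep_memory.get_model() == "655f86e8ab93e7fc5067a3ac_1"
```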
-            query_embs = embedding_function(queries)
-            print(f"Embedding queries took {time() - start:.2f} seconds")
 
-        recalls: Dict[str, Dict] = {"with model": {}, "without model": {}}
-        queries_data = {
-            "text": queries,
-            "metadata": [
-                {"relevance": relevance_per_doc} for relevance_per_doc in relevance
-            ],
-            "embedding": query_embs,
-            "id": [uuid.uuid4().hex for _ in range(len(queries))],
-        }
-        for use_model, metric in [
-            (False, "COSINE_SIMILARITY"),
-            (True, "deepmemory_distance"),
-        ]:
-            eval_type = "with" if use_model else "without"
-            print(f"---- Evaluating {eval_type} Deep Memory ---- ")
-            avg_recalls, queries_dict = recall_at_k(
-                indra_dataset,
-                relevance,
-                top_k=top_k,
-                query_embs=query_embs,
-                metric=metric,
-                use_model=use_model,
+class DeepMemoryEvaluator:
+    def __init__(self, client, path, token, creds, embedding_function=None):
+        self.client = client
+        self.path = path
+        self.token = token
+        self.creds = creds
+        self.embedding_function = embedding_function
+
+    def check_indra_installation(self):
+        try:
+            from indra import api  # type: ignore
+
+            return True
+        except ImportError:
+            raise ImportError(
+                "indra is not installed. Please install indra to use this functionality with: pip install `deeplake[enterprise]`"
             )
-            queries_data.update(queries_dict)
 
+    def prepare_embeddings(self, queries, embedding):
+        if embedding is not None:
+            return embedding
+        if self.embedding_function is None:
+            raise ValueError("Embedding function must be specified.")
+        return self.embedding_function(queries)
 
-            for recall, recall_value in avg_recalls.items():
-                print(f"Recall@{recall}:\t {100*recall_value: .1f}%")
-                recalls[f"{eval_type} model"][f"recall@{recall}"] = recall_value
+    def evaluate_model(self, relevance, queries, query_embs, top_k, qvs_params):
+        self.check_indra_installation()
+        from indra import api  # type: ignore
 
-        log_queries = parsed_qvs_params.get("log_queries")
-        branch = parsed_qvs_params.get("branch")
+        validate_relevance_and_queries(relevance=relevance, queries=queries)
+        query_embs = self.prepare_embeddings(queries, query_embs)
 
-        if log_queries == False:
-            return recalls
+        indra_dataset = api.dataset(self.path, token=self.token)
+        api.tql.prepare_deepmemory_metrics(indra_dataset)
 
-        self.queries_dataset = deeplake.empty(
+        recalls = {"with model": {}, "without model": {}}
+        for use_model in [False, True]:
+            recalls.update(
+                self._evaluate_with_metric(
+                    indra_dataset, relevance, queries, query_embs, top_k, use_model
+                )
+            )
+
+        self._log_queries_if_needed(queries, query_embs, relevance, qvs_params)
+        return recalls
+
+    def _evaluate_with_metric(
+        self, indra_dataset, relevance, queries, query_embs, top_k, use_model
+    ):
+        metric = "deepmemory_distance" if use_model else "COSINE_SIMILARITY"
+        eval_type = "with" if use_model else "without"
+        avg_recalls, _ = self._recall_at_k(
+            indra_dataset, relevance, query_embs, metric, top_k, use_model
+        )
+
+        return {
+            f"{eval_type} model": {f"recall@{k}": v for k, v in avg_recalls.items()}
+        }
+
+    def _log_queries_if_needed(self, queries, query_embs, relevance, qvs_params):
+        qvs_params = qvs_params or {}
+        log_queries = qvs_params.get("log_queries", False)
+        branch = qvs_params.get("branch", "main")
+
+        if not log_queries:
+            return
+
+        eval_queries_dataset = deeplake.empty(
             self.path + "_eval_queries",
             token=self.token,
             creds=self.creds,
             overwrite=True,
         )
 
-        if len(self.queries_dataset) == 0:
-            self.queries_dataset.commit(allow_empty=True)
+        if len(eval_queries_dataset) == 0:
+            eval_queries_dataset.commit(allow_empty=True)
 
-        create = branch not in self.queries_dataset.branches
-        self.queries_dataset.checkout(parsed_qvs_params["branch"], create=create)
+        create_branch = branch not in eval_queries_dataset.branches
+        eval_queries_dataset.checkout(branch, create=create_branch)
 
         for tensor_params in DEFAULT_QUERIES_VECTORSTORE_TENSORS:
-            if tensor_params["name"] not in self.queries_dataset.tensors:
-                self.queries_dataset.create_tensor(**tensor_params)
+            if tensor_params["name"] not in eval_queries_dataset.tensors:
+                eval_queries_dataset.create_tensor(**tensor_params)
 
-        self.queries_dataset.extend(queries_data, progressbar=True)
-        self.queries_dataset.commit()
-        return recalls
-
-    def _get_dm_client(self):
-        path = self.path
-        path_type = get_path_type(path)
-
-        dm_client = DeepMemoryBackendClient(token=self.token)
-        user_profile = dm_client.get_user_profile()
-
-        if path_type == "hub":
-            # TODO: add support for windows
-            dataset_id = path[6:].split("/")[0]
-        else:
-            # TODO: change user_profile to user_id
-            dataset_id = user_profile["name"]
+        queries_data = {
+            "text": queries,
+            "metadata": [{"relevance": r} for r in relevance],
+            "embedding": query_embs,
+            "id": [uuid.uuid4().hex for _ in range(len(queries))],
+        }
 
-        deepmemory_is_available = dm_client.deepmemory_is_available(dataset_id)
-        if deepmemory_is_available:
-            return dm_client
-        return None
+        eval_queries_dataset.extend(queries_data, progressbar=True)
+        eval_queries_dataset.commit()
 
-    def _get_jobs(self, response):
-        jobs = None
-        if response is not None and len(response) > 0:
-            jobs = [job["id"] for job in response]
-        return jobs
+    @staticmethod
+    def _recall_at_k(
+        indra_dataset: Any,
+        relevance: List[List[Tuple[str, int]]],
+        query_embs: Union[List[np.ndarray], List[List[float]]],
+        metric: str,
+        top_k: List[int] = [1, 3, 5, 10, 50, 100],
+        use_model: bool = False,
+    ):
+        recalls = defaultdict(list)
+        top_k_list = []
+        for query_idx, _ in enumerate(query_embs):
+            query_emb = query_embs[query_idx]
+            # Get the indices of the relevant data for this query
+            query_relevance = relevance[query_idx]
+            correct_labels = [rel[0] for rel in query_relevance]
 
-def recall_at_k(
-    indra_dataset: Any,
-    relevance: List[List[Tuple[str, int]]],
-    query_embs: Union[List[np.ndarray], List[List[float]]],
-    metric: str,
-    top_k: List[int] = [1, 3, 5, 10, 50, 100],
-    use_model: bool = False,
-):
-    recalls = defaultdict(list)
-    top_k_list = []
-
-    for query_idx, _ in enumerate(query_embs):
-        query_emb = query_embs[query_idx]
-        # Get the indices of the relevant data for this query
-        query_relevance = relevance[query_idx]
-        correct_labels = [rel[0] for rel in query_relevance]
-
-        # Compute the cosine similarity between the query and all data points
-        view = get_view(
-            metric=metric,
-            query_emb=query_emb,
-            indra_dataset=indra_dataset,
-        )
+            # Compute the cosine similarity between the query and all data points
+            view = get_view(
+                metric=metric,
+                query_emb=query_emb,
+                indra_dataset=indra_dataset,
+            )
 
-        for k in top_k:
-            collect_data = k == 10
-            view_top_k = view[:k]
+            for k in top_k:
+                collect_data = k == 10
+                view_top_k = view[:k]
 
-            top_k_retrieved = [
-                sample.id.numpy() for sample in view_top_k
-            ]  # TODO: optimize this
+                top_k_retrieved = [
+                    sample.id.numpy() for sample in view_top_k
+                ]  # TODO: optimize this
 
-            # Compute the recall: the fraction of relevant items found in the top k
-            num_relevant_in_top_k = len(
-                set(correct_labels).intersection(set(top_k_retrieved))
-            )
-            if len(correct_labels) == 0:
-                continue
-            recall = num_relevant_in_top_k / len(correct_labels)
-
-            if collect_data:
-                top_k_list.append(top_k_retrieved)
-            recalls[k].append(recall)
-
-    # Average the recalls for each query
-    avg_recalls = {
-        f"{recall}": np.mean(np.array(recall_list))
-        for recall, recall_list in recalls.items()
-    }
-    model_type = "deep_memory" if use_model else "vector_search"
-    queries_data = {
-        f"{model_type}_top_10": top_k_list,
-        f"{model_type}_recall": recalls[10],
-    }
-    return avg_recalls, queries_data
+                # Compute the recall: the fraction of relevant items found in the top k
+                num_relevant_in_top_k = len(
+                    set(correct_labels).intersection(set(top_k_retrieved))
+                )
+                if len(correct_labels) == 0:
+                    continue
+                recall = num_relevant_in_top_k / len(correct_labels)
+
+                if collect_data:
+                    top_k_list.append(top_k_retrieved)
+                recalls[k].append(recall)
+
+        # Average the recalls for each query
+        avg_recalls = {
+            f"{recall}": np.mean(np.array(recall_list))
+            for recall, recall_list in recalls.items()
+        }
+        model_type = "deep_memory" if use_model else "vector_search"
+        queries_data = {
+            f"{model_type}_top_10": top_k_list,
+            f"{model_type}_recall": recalls[10],
+        }
+        return avg_recalls, queries_data
 
 
 def get_view(
@@ -694,3 +657,199 @@ def _get_best_model(embedding: Any, job_id: str, latest_job: bool = False):
             best_recall = recall
             best_delta = value["delta"]
     return best_recall, best_delta
+
+
+class DeepMemoryStatusReporter:
+    def __init__(self, client, path, token, creds):
+        self.client = client
+        self.path = path
+        self.token = token
+        self.creds = creds
+
+    def _get_storage(self):
+        _, storage = get_storage_and_cache_chain(
+            path=self.path,
+            db_engine={"tensor_db": True},
+            read_only=False,
+            creds=self.creds,
+            token=self.token,
+            memory_cache_size=DEFAULT_MEMORY_CACHE_SIZE,
+            local_cache_size=DEFAULT_LOCAL_CACHE_SIZE,
+        )
+        return storage
+
+    def _get_model_performance(self, storage, job_id):
+        loaded_dataset = DeepLakeCloudDataset(storage=storage)
+        try:
+            recall, improvement = _get_best_model(
+                loaded_dataset.embedding, job_id, latest_job=True
+            )
+            return "{:.2f}".format(100 * recall), "{:.2f}".format(100 * improvement)
+        except Exception:
+            # best-model info may be missing, e.g. while training is still running
+            return None, None
+
+    def _check_status(self, job_id, recall, improvement):
+        return self.client.check_status(
+            job_id=job_id, recall=recall, improvement=improvement
+        )
+
+    def status(self, job_id, return_status):
+        storage = self._get_storage()
+
+        recall, improvement = self._get_model_performance(storage, job_id)
+        response = self._check_status(job_id, recall, improvement)
+
+        return response["status"] if return_status else response
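A usage sketch for the reporter-backed `status` (hypothetical job ID; per the code above, `return_status=True` returns only the `"status"` field of the backend response, otherwise the full response dict):

```python
# Full backend response for a training job.
response = db.deep_memory.status("651a4d41d05a21a5a6a15f67")

# Just the job state; the exact status strings depend on the backend.
state = db.deep_memory.status("651a4d41d05a21a5a6a15f67", return_status=True)
```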
+
+
+class DeepMemoryTrainer:
+    def __init__(
+        self,
+        client,
+        logger,
+        path,
+        token,
+        creds,
+        queries,
+        relevance,
+        embedding_function=None,
+    ):
+        self.client = client
+        self.logger = logger
+        self.path = path
+        self.token = token
+        self.creds = creds
+        self.embedding_function = embedding_function
+        self.queries = queries
+        self.relevance = relevance
+
+    @staticmethod
+    def _validate_inputs(queries, relevance):
+        validate_relevance_and_queries(relevance=relevance, queries=queries)
+
+    def _prepare_paths(self):
+        corpus_path = self.path
+        queries_path = corpus_path + "_queries"
+        return corpus_path, queries_path
+
+    def _resolve_embedding_function(self):
+        if self.embedding_function is None:
+            raise ValueError("Embedding function must be specified.")
+        # the constructor may receive either a plain callable or an embedding
+        # object that exposes `embed_documents`
+        if hasattr(self.embedding_function, "embed_documents"):
+            return self.embedding_function.embed_documents
+        return self.embedding_function
+
+    @property
+    def runtime(self):
+        return {"tensor_db": True} if get_path_type(self.path) == "hub" else None
+
+    def _configure_vector_store(self, queries_path, token):
+        from deeplake.core.vectorstore.deeplake_vectorstore import VectorStore
+
+        return VectorStore(
+            path=queries_path,
+            overwrite=True,
+            runtime=self.runtime,
+            token=token or self.token,
+            creds=self.creds,
+            verbose=False,
+        )
+
+    def start_training(self):
+        self._validate_inputs(self.queries, self.relevance)
+
+        self.logger.info("Starting DeepMemory training job")
+        corpus_path, queries_path = self._prepare_paths()
+        embedding_function = self._resolve_embedding_function()
+
+        queries_vs = self._configure_vector_store(queries_path, self.token)
+
+        self.logger.info("Preparing training data for DeepMemory:")
+        queries_vs.add(
+            text=self.queries,
+            metadata=[{"relevance": r} for r in self.relevance],
+            embedding_data=self.queries,
+            embedding_function=embedding_function,
+        )
+
+        response = self.client.start_training(
+            corpus_path=corpus_path,
+            queries_path=queries_path,
+        )
+
+        self.logger.info(
+            f"DeepMemory training job started. Job ID: {response['job_id']}"
+        )
+        return response
+
+
+class DeepMemoryListJobsManager:
+    def __init__(self, client, path, token, creds):
+        self.client = client
+        self.path = path
+        self.token = token
+        self.creds = creds
+
+    def _load_dataset(self):
+        _, storage = get_storage_and_cache_chain(
+            path=self.path,
+            db_engine={"tensor_db": True},
+            read_only=False,
+            creds=self.creds,
+            token=self.token,
+            memory_cache_size=DEFAULT_MEMORY_CACHE_SIZE,
+            local_cache_size=DEFAULT_LOCAL_CACHE_SIZE,
+        )
+
+        loaded_dataset = DeepLakeCloudDataset(storage=storage)
+        return loaded_dataset
+
+    def _get_status_schema(self):
+        response = self.client.list_jobs(dataset_path=self.path)
+        return response, JobResponseStatusSchema(response=response)
+
+    def _get_job_performance(self, loaded_dataset, job, latest_job):
+        try:
+            recall, delta = _get_best_model(
+                loaded_dataset.embedding,
+                job,
+                latest_job=latest_job,
+            )
+            return "{:.2f}".format(100 * recall), "{:.2f}".format(100 * delta)
+        except Exception:
+            # best-model info may be missing for unfinished jobs
+            return None, None
+
+    def _process_jobs(self, loaded_dataset, jobs):
+        recalls = {}
+        deltas = {}
+        latest_job = jobs[-1]
+        for job in jobs:
+            recall, delta = self._get_job_performance(
+                loaded_dataset, job, job == latest_job
+            )
+            recalls[f"{job}"] = recall
+            deltas[f"{job}"] = delta
+        return recalls, deltas
+
+    @staticmethod
+    def _get_jobs(response):
+        jobs = None
+        if response is not None and len(response) > 0:
+            jobs = [job["id"] for job in response]
+        return jobs
+
+    def list_jobs(self, debug):
+        loaded_dataset = self._load_dataset()
+
+        response, response_status_schema = self._get_status_schema()
+
+        jobs = self._get_jobs(response)
+
+        if not jobs:
+            response_str = "No Deep Memory training jobs were found for this dataset"
+            print(response_str)
+            return response_str if debug else None
+
+        recalls, deltas = self._process_jobs(loaded_dataset, jobs)
+        return response_status_schema.print_jobs(
+            debug=debug, recalls=recalls, improvements=deltas
+        )
diff --git a/deeplake/core/vectorstore/deep_memory/test_deepmemory.py b/deeplake/core/vectorstore/deep_memory/test_deepmemory.py
index 0a458974fa..9778b0f413 100644
--- a/deeplake/core/vectorstore/deep_memory/test_deepmemory.py
+++ b/deeplake/core/vectorstore/deep_memory/test_deepmemory.py
@@ -3,6 +3,7 @@
 import pytest
 import sys
 from time import sleep
+from unittest.mock import MagicMock
 
 import deeplake
 from deeplake import VectorStore
@@ -660,3 +661,66 @@ def test_not_supported_training_args(corpus_query_relevances_copy, hub_cloud_dev
         queries=queries,
         relevance="relevances",
     )
+
+
+def test_deepmemory_v2_set_model_should_set_model_for_all_subsequent_loads(
+    local_dmv2_dataset,
+    hub_cloud_dev_token,
+):
+    # Setting the model should set it for all subsequent loads
+    db = VectorStore(path=local_dmv2_dataset, token=hub_cloud_dev_token)
+    assert db.deep_memory.get_model() == "655f86e8ab93e7fc5067a3ac_2"
+
+    # ensure that after setting a model, get_model returns the specified model
+    db.deep_memory.set_model("655f86e8ab93e7fc5067a3ac_1")
+
+    assert (
+        db.dataset.embedding.info["deepmemory"]["model.npy"]["job_id"]
+        == "655f86e8ab93e7fc5067a3ac_1"
+    )
+    assert db.deep_memory.get_model() == "655f86e8ab93e7fc5067a3ac_1"
+
+    # ensure that after setting the model, reloading the dataset returns the same model
+    db = VectorStore(path=local_dmv2_dataset, token=hub_cloud_dev_token)
+    assert db.deep_memory.get_model() == "655f86e8ab93e7fc5067a3ac_1"
+
+
+@pytest.mark.slow
+@pytest.mark.skipif(sys.platform == "win32", reason="Does not run on Windows")
+def test_deepmemory_search_should_contain_correct_answer(
+    corpus_query_relevances_copy,
+    testing_relevance_query_deepmemory,
+    hub_cloud_dev_token,
+):
+    corpus, _, _, _ = corpus_query_relevances_copy
+    relevance, query_embedding = testing_relevance_query_deepmemory
+
+    db = VectorStore(
+        path=corpus,
+        token=hub_cloud_dev_token,
+    )
+
+    output = db.search(
+        embedding=query_embedding, deep_memory=True, return_tensors=["id"]
+    )
+    assert len(output["id"]) == 4
+    assert relevance in output["id"]
+
+
+@pytest.mark.slow
+@pytest.mark.skipif(sys.platform == "win32", reason="Does not run on Windows")
+def test_deeplake_search_should_not_contain_correct_answer(
+    corpus_query_relevances_copy,
+    testing_relevance_query_deepmemory,
+    hub_cloud_dev_token,
+):
+    corpus, _, _, _ = corpus_query_relevances_copy
+    relevance, query_embedding = testing_relevance_query_deepmemory
+
+    db = VectorStore(
+        path=corpus,
+        token=hub_cloud_dev_token,
+    )
+    output = db.search(embedding=query_embedding)
+    assert len(output["id"]) == 4
+    assert relevance not in output["id"]
diff --git a/deeplake/core/vectorstore/deeplake_vectorstore.py b/deeplake/core/vectorstore/deeplake_vectorstore.py
index 69e643e7d3..1c511a1f0d 100644
--- a/deeplake/core/vectorstore/deeplake_vectorstore.py
+++ b/deeplake/core/vectorstore/deeplake_vectorstore.py
@@ -131,7 +131,8 @@ def __init__(
         )
 
         self.deep_memory = DeepMemory(
-            dataset_or_path=self.dataset_handler.path,
+            dataset=self.dataset_handler.dataset,
+            path=self.dataset_handler.path,
             token=self.dataset_handler.token,
             logger=logger,
             embedding_function=embedding_function,
@@ -240,6 +241,7 @@ def search(
         return_tensors: Optional[List[str]] = None,
         return_view: bool = False,
         deep_memory: bool = False,
+        return_tql: bool = False,
     ) -> Union[Dict, Dataset]:
         """VectorStore search method that combines embedding search, metadata search, and custom TQL search.
 
@@ -290,6 +292,7 @@ def search(
             return_view (bool): Return a Deep Lake dataset view that satisfies the search parameters, instead of a dictionary with data. Defaults to False.
                 If ``True`` return_tensors is set to "*" because data is lazy-loaded and there is no cost to including all tensors in the view.
             deep_memory (bool): Whether to use the Deep Memory model for improving search results. Defaults to False if deep_memory is not specified in the Vector Store initialization.
                 If True, the distance metric is set to "deepmemory_distance", which represents the metric with which the model was trained. The search is performed using the Deep Memory model.
                 If False, the distance metric is set to "COS" or whatever distance metric user specifies.
+            return_tql (bool): Whether to return the TQL query string used for the search. Defaults to False.
 
         ..
             # noqa: DAR101
@@ -318,6 +321,7 @@ def search(
             return_tensors=return_tensors,
             return_view=return_view,
             deep_memory=deep_memory,
+            return_tql=return_tql,
         )
 
     def delete(
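A sketch of the new `return_tql` flag (the embedding is a hypothetical list/ndarray of floats; per `SearchBasic.run` further below, results are wrapped as a dict with `"data"` and `"tql"` keys, and the pure-Python `exec_option` rejects the flag with `NotImplementedError`):

```python
result = db.search(
    embedding=query_embedding,
    return_tensors=["id"],
    return_tql=True,
)
print(result["tql"])        # the TQL statement that was executed
ids = result["data"]["id"]  # the usual tensor dict lives under "data"
```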
diff --git a/deeplake/core/vectorstore/dev_helpers/__init__.py b/deeplake/core/vectorstore/dev_helpers/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/deeplake/core/vectorstore/dev_helpers/vectorstore_tools.py b/deeplake/core/vectorstore/dev_helpers/vectorstore_tools.py
new file mode 100644
index 0000000000..11a5cd8896
--- /dev/null
+++ b/deeplake/core/vectorstore/dev_helpers/vectorstore_tools.py
@@ -0,0 +1,106 @@
+import deeplake
+from typing import Dict, List, Optional, Tuple
+from deeplake.core.vectorstore.vector_search.utils import create_data
+
+
+def create_and_load_vectorstore():
+    from deeplake import VectorStore
+
+    db = VectorStore(
+        path="local_path",
+        overwrite=True,
+    )
+
+    texts, embeddings, ids, metadata, _ = create_data(
+        number_of_data=100, embedding_dim=1536, metadata_key="abc"
+    )
+    db.add(
+        text=texts,
+        embedding=embeddings,
+        id=ids,
+        metadata=metadata,
+    )
+    return db
+
+
+def train_deepmemory_model(
+    dataset_name: str = "hub://activeloop-test/scifact",
+    corpus: Optional[Dict] = None,
+    relevance: Optional[List[List[Tuple[str, int]]]] = None,
+    queries: Optional[List[str]] = None,
+    token: Optional[str] = None,
+    overwrite: bool = False,
+    environment: str = "staging",
+):
+    from deeplake import VectorStore
+    from langchain.embeddings.openai import OpenAIEmbeddings  # type: ignore
+
+    if environment == "staging":
+        deeplake.client.config.USE_STAGING_ENVIRONMENT = True
+    elif environment == "dev":
+        deeplake.client.config.USE_DEV_ENVIRONMENT = True
+
+    embedding_function = OpenAIEmbeddings()
+    if corpus is None:
+        if (
+            not deeplake.exists(dataset_name, token=token, creds={})
+            or overwrite
+        ):
+            deeplake.deepcopy(
+                "hub://activeloop-test/deepmemory_test_corpus",
+                dataset_name,
+                token=token,
+                overwrite=True,
+                runtime={"tensor_db": True},
+            )
+
+        db = VectorStore(
+            dataset_name,
+            token=token,
+            embedding_function=embedding_function,
+        )
+    else:
+        db = VectorStore(
+            dataset_name,
+            token=token,
+            overwrite=True,
+            embedding_function=embedding_function,
+        )
+        db.add(**corpus)
+
+    query_vs = None
+
+    if relevance is None:
+        query_vs = VectorStore(
+            path="hub://activeloop-test/deepmemory_test_queries",
+            runtime={"tensor_db": True},
+            token=token,
+        )
+        relevance = query_vs.dataset.metadata.data()["value"]
+
+    if queries is None:
+        if not query_vs:
+            query_vs = VectorStore(
+                path="hub://activeloop-test/deepmemory_test_queries",
+                runtime={"tensor_db": True},
+                token=token,
+            )
+        queries = query_vs.dataset.text.data()["value"]
+
+    db.deep_memory.train(
+        relevance=relevance,
+        queries=queries,
+    )
+    return db
+
+
+def set_backend(backend="prod"):
+    if backend == "staging":
+        deeplake.client.config.USE_STAGING_ENVIRONMENT = True
+        deeplake.client.config.USE_DEV_ENVIRONMENT = False
+    elif backend == "dev":
+        deeplake.client.config.USE_DEV_ENVIRONMENT = True
+        deeplake.client.config.USE_STAGING_ENVIRONMENT = False
+    else:
+        deeplake.client.config.USE_DEV_ENVIRONMENT = False
+        deeplake.client.config.USE_STAGING_ENVIRONMENT = False
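These helpers are development conveniences; a hedged sketch of how they compose (`set_backend` flips the staging/dev config flags shown above, and `create_and_load_vectorstore` writes 100 synthetic rows to the hard-coded `"local_path"`):

```python
from deeplake.core.vectorstore.dev_helpers.vectorstore_tools import (
    create_and_load_vectorstore,
    set_backend,
)

set_backend("staging")               # route client calls to the staging backend
db = create_and_load_vectorstore()   # 100 rows of 1536-dim synthetic embeddings
set_backend()                        # anything but "staging"/"dev" resets to prod
```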
diff --git a/deeplake/core/vectorstore/test_deeplake_vectorstore.py b/deeplake/core/vectorstore/test_deeplake_vectorstore.py
index 7194a62906..66b56cb8a6 100644
--- a/deeplake/core/vectorstore/test_deeplake_vectorstore.py
+++ b/deeplake/core/vectorstore/test_deeplake_vectorstore.py
@@ -1299,10 +1299,14 @@ def create_and_populate_vs(
     overwrite=True,
     verbose=False,
     exec_option="compute_engine",
-    index_params={"threshold": 10},
+    index_params={"threshold": -1},
     number_of_data=NUMBER_OF_DATA,
+    runtime=None,
 ):
-    # TODO: cache the vectostore object and reuse it in other tests (maybe with deepcopy)
+    # if a runtime is specified and tensor_db is enabled, set exec_option to None
+    if runtime and runtime.get("tensor_db", False):
+        exec_option = None
+
     vector_store = DeepLakeVectorStore(
         path=path,
         overwrite=overwrite,
@@ -1310,6 +1314,7 @@ def create_and_populate_vs(
         exec_option=exec_option,
         index_params=index_params,
         token=token,
+        runtime=runtime,
     )
 
     utils.create_data(number_of_data=number_of_data, embedding_dim=EMBEDDING_DIM)
@@ -1357,7 +1362,6 @@ def test_update_embedding_row_ids_and_filter_specified_should_throw_exception(
     )
     embedding_fn = get_embedding_function()
 
-    # calling update_embedding with both ids and filter being specified
     with pytest.raises(ValueError):
         vector_store.update_embedding(
             row_ids=vector_store_row_ids,
@@ -1381,6 +1385,7 @@ def test_update_embedding_query_and_filter_specified_should_throw_exception(
     embedding_fn = get_embedding_function()
 
     # calling update_embedding with both query and filter being specified
+
     with pytest.raises(ValueError):
         vector_store.update_embedding(
             filter=vector_store_filters,
@@ -2851,6 +2856,46 @@ def test_vs_commit(local_path):
     assert len(db) == NUMBER_OF_DATA
 
 
+def test_vs_init_when_both_dataset_and_path_is_specified(local_path):
+    with pytest.raises(ValueError):
+        VectorStore(
+            path=local_path,
+            dataset=deeplake.empty(local_path, overwrite=True),
+        )
+
+
+def test_vs_init_when_both_dataset_and_path_are_not_specified():
+    with pytest.raises(ValueError):
+        VectorStore()
+
+
+def test_vs_init_with_empty_token(local_path):
+    with patch("deeplake.client.config.DEEPLAKE_AUTH_TOKEN", ""):
+        db = VectorStore(
+            path=local_path,
+        )
+
+        assert db.dataset_handler.username == "public"
+
+
+@pytest.fixture
+def mock_search_managed(mocker):
+    # Replace SearchManaged with a mock
+    mock_class = mocker.patch(
+        "deeplake.core.vectorstore.vector_search.indra.search_algorithm.SearchManaged"
+    )
+    return mock_class
+
+
+@pytest.fixture
+def mock_search_indra(mocker):
+    # Replace SearchIndra with a mock
+    mock_class = mocker.patch(
+        "deeplake.core.vectorstore.vector_search.indra.search_algorithm.SearchIndra"
+    )
+    return mock_class
+
+
 def test_vs_init_when_both_dataset_and_path_is_specified_should_throw_exception(
     local_path,
 ):
@@ -2880,3 +2925,39 @@ def test_vs_init_with_emptyt_token_should_not_throw_exception(local_path):
     )
 
     assert db.dataset_handler.username == "public"
+
+
+@pytest.mark.slow
+def test_db_search_with_managed_db_should_instantiate_SearchManaged_class(
+    mock_search_managed, hub_cloud_path, hub_cloud_dev_token
+):
+    # interaction test to ensure that the SearchManaged class is executed
+    db = create_and_populate_vs(
+        hub_cloud_path,
+        runtime={"tensor_db": True},
+        token=hub_cloud_dev_token,
+    )
+
+    # Perform the search
+    db.search(embedding=query_embedding)
+
+    # Assert that SearchManaged was instantiated
+    mock_search_managed.assert_called()
+
+
+@pytest.mark.slow
+@requires_libdeeplake
+def test_db_search_should_instantiate_SearchIndra_class(
+    mock_search_indra, hub_cloud_path, hub_cloud_dev_token
+):
+    # interaction test to ensure that the SearchIndra class is executed
+    db = create_and_populate_vs(
+        hub_cloud_path,
+        token=hub_cloud_dev_token,
+    )
+
+    # Perform the search
+    db.search(embedding=query_embedding)
+
+    # Assert that SearchIndra was instantiated
+    mock_search_indra.assert_called()
diff --git a/deeplake/core/vectorstore/vector_search/dataset/dataset.py b/deeplake/core/vectorstore/vector_search/dataset/dataset.py
index 2450718767..ffa3f88b0d 100644
--- a/deeplake/core/vectorstore/vector_search/dataset/dataset.py
+++ b/deeplake/core/vectorstore/vector_search/dataset/dataset.py
@@ -567,6 +567,7 @@ def convert_id_to_row_id(ids, dataset, search_fn, query, exec_option, filter):
             return_view=True,
             k=int(1e9),
             deep_memory=False,
+            return_tql=False,
         )
 
     else:
diff --git a/deeplake/core/vectorstore/vector_search/indra/search_algorithm.py b/deeplake/core/vectorstore/vector_search/indra/search_algorithm.py
index 71a8a1f353..d93e09242e 100644
--- a/deeplake/core/vectorstore/vector_search/indra/search_algorithm.py
+++ b/deeplake/core/vectorstore/vector_search/indra/search_algorithm.py
@@ -1,4 +1,5 @@
 import numpy as np
+from abc import ABC, abstractmethod
 from typing import Union, Dict, List, Optional
 
 from deeplake.core.vectorstore.vector_search.indra import query
@@ -9,6 +10,157 @@
 from deeplake.enterprise.util import raise_indra_installation_error
 
 
+class SearchBasic(ABC):
+    def __init__(
+        self,
+        deeplake_dataset: DeepLakeDataset,
+        org_id: Optional[str] = None,
+        token: Optional[str] = None,
+        runtime: Optional[Dict] = None,
+        deep_memory: bool = False,
+    ):
+        """Base class for all search algorithms.
+
+        Args:
+            deeplake_dataset (DeepLakeDataset): DeepLake dataset object.
+            org_id (Optional[str], optional): Organization ID, is needed only for local datasets. Defaults to None.
+            token (Optional[str], optional): Token used for authentication. Defaults to None.
+            runtime (Optional[Dict], optional): Whether to run the query on the managed DB or indra. Defaults to None.
+            deep_memory (bool): Use DeepMemory for the search. Defaults to False.
+        """
+ """ + self.deeplake_dataset = deeplake_dataset + self.org_id = org_id + self.token = token + self.runtime = runtime + self.deep_memory = deep_memory + + def run( + self, + tql_string: str, + return_view: bool, + return_tql: bool, + distance_metric: str, + k: int, + query_embedding: Union[np.ndarray, List[float]], + embedding_tensor: str, + tql_filter: str, + return_tensors: List[str], + ): + tql_query = self._create_tql_string( + tql_string, + distance_metric, + k, + query_embedding, + embedding_tensor, + tql_filter, + return_tensors, + ) + view = self._get_view( + tql_query, + runtime=self.runtime, + ) + + if return_view: + return view + + return_data = self._collect_return_data(view) + + if return_tql: + return {"data": return_data, "tql": tql_query} + return return_data + + @abstractmethod + def _collect_return_data( + self, + view: DeepLakeDataset, + ): + pass + + @staticmethod + def _create_tql_string( + tql_string: str, + distance_metric: str, + k: int, + query_embedding: np.ndarray, + embedding_tensor: str, + tql_filter: str, + return_tensors: List[str], + ): + """Creates TQL query string for the vector search.""" + if tql_string: + return tql_string + else: + return query.parse_query( + distance_metric, + k, + query_embedding, + embedding_tensor, + tql_filter, + return_tensors, + ) + + @abstractmethod + def _get_view(self, tql_query: str, runtime: Optional[Dict] = None): + pass + + +class SearchIndra(SearchBasic): + def _get_view(self, tql_query, runtime: Optional[Dict] = None): + indra_dataset = self._get_indra_dataset() + indra_view = indra_dataset.query(tql_query) + view = DeepLakeQueryDataset( + deeplake_ds=self.deeplake_dataset, indra_ds=indra_view + ) + view._tql_query = tql_query + return view + + def _get_indra_dataset(self): + try: + from indra import api # type: ignore + + INDRA_INSTALLED = True + except ImportError: + INDRA_INSTALLED = False + pass + + if not INDRA_INSTALLED: + raise raise_indra_installation_error(indra_import_error=None) + + if self.deeplake_dataset.libdeeplake_dataset is not None: + indra_dataset = self.deeplake_dataset.libdeeplake_dataset + else: + if self.org_id is not None: + self.deeplake_dataset.org_id = self.org_id + if self.token is not None: + self.deeplake_dataset.set_token(self.token) + + indra_dataset = dataset_to_libdeeplake(self.deeplake_dataset) + return indra_dataset + + def _collect_return_data( + self, + view: DeepLakeDataset, + ): + return_data = {} + for tensor in view.tensors: + return_data[tensor] = utils.parse_tensor_return(view[tensor]) + return return_data + + +class SearchManaged(SearchBasic): + def _get_view(self, tql_query, runtime: Optional[Dict] = None): + view, data = self.deeplake_dataset.query( + tql_query, runtime=runtime, return_data=True + ) + self.data = data + return view + + def _collect_return_data( + self, + view: DeepLakeDataset, + ): + return self.data + + def search( query_embedding: np.ndarray, distance_metric: str, @@ -23,6 +175,7 @@ def search( deep_memory: bool = False, token: Optional[str] = None, org_id: Optional[str] = None, + return_tql: bool = False, ) -> Union[Dict, DeepLakeDataset]: """Generalized search algorithm that uses indra. It combines vector search and other TQL queries. @@ -40,6 +193,7 @@ def search( deep_memory (bool): Use DeepMemory for the search. Defaults to False. token (Optional[str], optional): Token used for authentication. Defaults to None. org_id (Optional[str], optional): Organization ID, is needed only for local datasets. Defaults to None. 
+        return_tql (bool): Return TQL query used for the search. Defaults to False.
 
     Raises:
         ValueError: If both tql_string and tql_filter are specified.
 
     Returns:
         Union[Dict, DeepLakeDataset]: Dictionary where keys are tensor names and values are the results of the search, or a Deep Lake dataset view.
     """
-    try:
-        from indra import api  # type: ignore
-
-        INDRA_INSTALLED = True
-    except ImportError:
-        INDRA_INSTALLED = False
-        pass
-
-    if tql_string:
-        tql_query = tql_string
+    searcher: SearchBasic
+    if runtime and runtime.get("db_engine", False):
+        searcher = SearchManaged(deeplake_dataset, org_id, token, runtime=runtime)
     else:
-        tql_query = query.parse_query(
-            distance_metric,
-            k,
-            query_embedding,
-            embedding_tensor,
-            tql_filter,
-            return_tensors,
-        )
-
-    if deep_memory:
-        if not INDRA_INSTALLED:
-            raise raise_indra_installation_error(indra_import_error=None)
-
-        if deeplake_dataset.libdeeplake_dataset is not None:
-            indra_dataset = deeplake_dataset.libdeeplake_dataset
-        else:
-            if org_id is not None:
-                deeplake_dataset.org_id = org_id
-            if token is not None:
-                deeplake_dataset.set_token(token)
-
-            indra_dataset = dataset_to_libdeeplake(deeplake_dataset)
-        api.tql.prepare_deepmemory_metrics(indra_dataset)
-
-        indra_view = indra_dataset.query(tql_query)
-
-        view = DeepLakeQueryDataset(deeplake_ds=deeplake_dataset, indra_ds=indra_view)
-        view._tql_query = tql_query
-
-        if return_view:
-            return view
-
-        return_data = {}
-        for tensor in view.tensors:
-            return_data[tensor] = utils.parse_tensor_return(view[tensor])
-    elif runtime and runtime.get("db_engine", False):
-        view, data = deeplake_dataset.query(
-            tql_query, runtime=runtime, return_data=True
-        )
-        if return_view:
-            return view
-
-        return_data = data
-    else:
-        if not INDRA_INSTALLED:
-            raise raise_indra_installation_error(
-                indra_import_error=None
-            )  # pragma: no cover
-
-        view = deeplake_dataset.query(
-            tql_query,
-            runtime=runtime,
-        )
-
-        if return_view:
-            return view
-
-        return_data = {}
-        for tensor in view.tensors:
-            return_data[tensor] = utils.parse_tensor_return(view[tensor])
-
-    return return_data
+        searcher = SearchIndra(deeplake_dataset, org_id, token)
+
+    return searcher.run(
+        tql_string=tql_string,
+        return_view=return_view,
+        return_tql=return_tql,
+        distance_metric=distance_metric,
+        k=k,
+        query_embedding=query_embedding,
+        embedding_tensor=embedding_tensor,
+        tql_filter=tql_filter,
+        return_tensors=return_tensors,
+    )
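The refactor replaces the branchy implementation with a small strategy pattern; a minimal sketch of the dispatch, under the same assumptions as the code above (`ds`, the token, and `query_embedding` are hypothetical):

```python
# runtime={"db_engine": True} routes the query to the managed tensor DB;
# anything else goes through indra/libdeeplake.
runtime = {"db_engine": True}
searcher = (
    SearchManaged(ds, org_id=None, token="<token>", runtime=runtime)
    if runtime and runtime.get("db_engine", False)
    else SearchIndra(ds, org_id=None, token="<token>")
)
out = searcher.run(
    tql_string="",        # empty -> built by _create_tql_string
    return_view=False,
    return_tql=True,      # wraps results as {"data": ..., "tql": ...}
    distance_metric="COSINE_SIMILARITY",
    k=4,
    query_embedding=query_embedding,
    embedding_tensor="embedding",
    tql_filter="",
    return_tensors=["id"],
)
```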
diff --git a/deeplake/core/vectorstore/vector_search/indra/vector_search.py b/deeplake/core/vectorstore/vector_search/indra/vector_search.py
index b0e80cb019..6b94fcb599 100644
--- a/deeplake/core/vectorstore/vector_search/indra/vector_search.py
+++ b/deeplake/core/vectorstore/vector_search/indra/vector_search.py
@@ -21,6 +21,7 @@ def vector_search(
     deep_memory,
     token,
     org_id,
+    return_tql,
 ) -> Union[Dict, DeepLakeDataset]:
     try:
         from indra import api  # type: ignore
@@ -58,4 +59,5 @@ def vector_search(
         deep_memory=deep_memory,
         token=token,
         org_id=org_id,
+        return_tql=return_tql,
     )
diff --git a/deeplake/core/vectorstore/vector_search/python/test_vector_search.py b/deeplake/core/vectorstore/vector_search/python/test_vector_search.py
index da18ec955f..2c8f1ac4ba 100644
--- a/deeplake/core/vectorstore/vector_search/python/test_vector_search.py
+++ b/deeplake/core/vectorstore/vector_search/python/test_vector_search.py
@@ -29,6 +29,7 @@ def test_vector_search():
         deep_memory=False,
         token=None,
         org_id=None,
+        return_tql=False,
     )
 
     assert len(data["score"]) == 10
@@ -49,6 +50,7 @@ def test_vector_search():
         deep_memory=False,
         token=None,
         org_id=None,
+        return_tql=False,
     )
 
     data = vector_search.vector_search(
@@ -66,6 +68,7 @@ def test_vector_search():
         deep_memory=False,
         token=None,
         org_id=None,
+        return_tql=False,
     )
 
     assert len(data) == 10
@@ -87,4 +90,5 @@ def test_vector_search():
         deep_memory=False,
         token=None,
         org_id=None,
+        return_tql=False,
     )
diff --git a/deeplake/core/vectorstore/vector_search/python/vector_search.py b/deeplake/core/vectorstore/vector_search/python/vector_search.py
index e632b5e5a1..cfbf186b48 100644
--- a/deeplake/core/vectorstore/vector_search/python/vector_search.py
+++ b/deeplake/core/vectorstore/vector_search/python/vector_search.py
@@ -21,12 +21,18 @@ def vector_search(
     deep_memory,
     token,
     org_id,
+    return_tql,
 ) -> Union[Dict, DeepLakeDataset]:
     if query is not None:
         raise NotImplementedError(
             f"User-specified TQL queries are not supported for exec_option={exec_option} "
         )
 
+    if return_tql:
+        raise NotImplementedError(
+            f"return_tql is not supported for exec_option={exec_option}"
+        )
+
     view = filter_utils.attribute_based_filtering_python(dataset, filter)
 
     return_data = {}
diff --git a/deeplake/core/vectorstore/vector_search/vector_search.py b/deeplake/core/vectorstore/vector_search/vector_search.py
index 4606fd1257..bf7a3fdc38 100644
--- a/deeplake/core/vectorstore/vector_search/vector_search.py
+++ b/deeplake/core/vectorstore/vector_search/vector_search.py
@@ -31,6 +31,7 @@ def search(
     deep_memory: bool = False,
     token: Optional[str] = None,
     org_id: Optional[str] = None,
+    return_tql: bool = False,
 ) -> Union[Dict, DeepLakeDataset]:
     """Searching function
     Args:
@@ -53,6 +54,7 @@ def search(
         deep_memory (bool): Use DeepMemory for the search. Defaults to False.
         token (Optional[str], optional): Token used for authentication. Defaults to None.
         org_id (Optional[str], optional): Organization ID, is needed only for local datasets. Defaults to None.
+        return_tql (bool): Return TQL query used for the search. Defaults to False.
""" return EXEC_OPTION_TO_SEARCH_TYPE[exec_option]( query=query, @@ -69,4 +71,5 @@ def search( deep_memory=deep_memory, token=token, org_id=org_id, + return_tql=return_tql, ) diff --git a/deeplake/requirements/tests.txt b/deeplake/requirements/tests.txt index d2f66c77a8..373fa63000 100644 --- a/deeplake/requirements/tests.txt +++ b/deeplake/requirements/tests.txt @@ -2,6 +2,7 @@ pytest pytest-cases pytest-benchmark pytest-cov +pytest-mock pytest-timeout pytest-rerunfailures pytest-profiling diff --git a/deeplake/tests/path_fixtures.py b/deeplake/tests/path_fixtures.py index 659ea30cd5..179e4724b3 100644 --- a/deeplake/tests/path_fixtures.py +++ b/deeplake/tests/path_fixtures.py @@ -792,3 +792,33 @@ def precomputed_jobs_list(): with open(os.path.join(parent, "precomputed_jobs_list.txt"), "r") as f: jobs = f.read() return jobs + + +@pytest.fixture +def local_dmv2_dataset(request, hub_cloud_dev_token): + dmv2_path = f"hub://{HUB_CLOUD_DEV_USERNAME}/dmv2" + + local_cache_path = ".deeplake_tests_cache/" + if not os.path.exists(local_cache_path): + os.mkdir(local_cache_path) + + dataset_cache_path = local_cache_path + "dmv2" + if not os.path.exists(dataset_cache_path): + deeplake.deepcopy( + dmv2_path, + dataset_cache_path, + token=hub_cloud_dev_token, + overwrite=True, + ) + + corpus = _get_storage_path(request, LOCAL) + + deeplake.deepcopy( + dataset_cache_path, + corpus, + token=hub_cloud_dev_token, + overwrite=True, + ) + yield corpus + + delete_if_exists(corpus, hub_cloud_dev_token) diff --git a/deeplake/util/exceptions.py b/deeplake/util/exceptions.py index 73a79917ad..cd330b8e5c 100644 --- a/deeplake/util/exceptions.py +++ b/deeplake/util/exceptions.py @@ -1138,3 +1138,8 @@ def __init__(self): "Please make sure, that queries is of type List[str]" ) super().__init__(msg) + + +class DeepMemoryEvaluationError(Exception): + def __init__(self, msg): + super().__init__(msg)