diff --git a/deeplake/client/test_client.py b/deeplake/client/test_client.py
index 027e521ebe..67f7f0f09d 100644
--- a/deeplake/client/test_client.py
+++ b/deeplake/client/test_client.py
@@ -168,7 +168,7 @@ def test_deepmemory_print_status_and_list_jobs(capsys, precomputed_jobs_list):
         progress=None,
     )
     response_schema = JobResponseStatusSchema(response=pending_response)
-    response_schema.print_status(job_id)
+    response_schema.print_status(job_id, recall=None, importvement=None)
     captured = capsys.readouterr()
     assert captured.out == Status.pending

@@ -204,7 +204,7 @@ def test_deepmemory_print_status_and_list_jobs(capsys, precomputed_jobs_list):
         },
     )
     response_schema = JobResponseStatusSchema(response=failed_response)
-    response_schema.print_status(job_id)
+    response_schema.print_status(job_id, recall=None, importvement=None)
     captured = capsys.readouterr()
     assert captured.out == Status.failed
diff --git a/deeplake/client/utils.py b/deeplake/client/utils.py
index 74a9ed580b..0e99734806 100644
--- a/deeplake/client/utils.py
+++ b/deeplake/client/utils.py
@@ -144,8 +144,8 @@ def validate_status_response(self):
     def print_status(
         self,
         job_id: Union[str, List[str]],
-        recall: Optional[str] = None,
-        importvement: Optional[str] = None,
+        recall: str,
+        importvement: str,
     ):
         if not isinstance(job_id, List):
             job_id = [job_id]
@@ -157,8 +157,8 @@ def print_status(
             if response["status"] == "completed":
                 response["results"] = get_results(
-                    response,
-                    " " * 30,
+                    response=response,
+                    indent=" " * 30,
                     add_vertical_bars=True,
                     recall=recall,
                     improvement=importvement,
@@ -217,8 +217,8 @@ def print_jobs(
             )
             if response_status == "completed":
                 response_results = get_results(
-                    response,
-                    "",
+                    response=response,
+                    indent="",
                     add_vertical_bars=False,
                     width=15,
                     recall=recalls[response_id],
@@ -271,20 +271,15 @@ def print_jobs(
 def get_results(
     response: Dict[str, Any],
+    improvement: str,
+    recall: str,
     indent: str,
     add_vertical_bars: bool,
     width: int = 21,
-    improvement: Optional[str] = None,
-    recall: Optional[str] = None,
 ):
     progress = response["progress"]
     for progress_key, progress_value in progress.items():
         if progress_key == BEST_RECALL:
-            curr_recall, curr_improvement = progress_value.split("%")[:2]
-
-            recall = recall or curr_recall
-            improvement = improvement or curr_improvement
-
             if "(" not in improvement:
                 improvement = f"(+{improvement}%)"
diff --git a/deeplake/constants.py b/deeplake/constants.py
index fbc6ae7995..c7f7abb38b 100644
--- a/deeplake/constants.py
+++ b/deeplake/constants.py
@@ -328,3 +328,4 @@
         "M": 32,
     },
 }
+VECTORSTORE_EXTEND_BATCH_SIZE = 500
diff --git a/deeplake/core/vectorstore/deep_memory.py b/deeplake/core/vectorstore/deep_memory.py
index 4eba32d036..4b68f81f1a 100644
--- a/deeplake/core/vectorstore/deep_memory.py
+++ b/deeplake/core/vectorstore/deep_memory.py
@@ -1,4 +1,5 @@
 import uuid
+from collections import defaultdict
 from typing import Any, Dict, Optional, List, Union, Callable, Tuple
 from time import time

@@ -6,15 +7,24 @@
 import deeplake
 from deeplake.enterprise.dataloader import indra_available
-from deeplake.constants import DEFAULT_QUERIES_VECTORSTORE_TENSORS
+from deeplake.util.remove_cache import get_base_storage
+from deeplake.constants import (
+    DEFAULT_QUERIES_VECTORSTORE_TENSORS,
+    DEFAULT_MEMORY_CACHE_SIZE,
+    DEFAULT_LOCAL_CACHE_SIZE,
+)
+from deeplake.util.storage import get_storage_and_cache_chain
 from deeplake.core.dataset import Dataset
+from deeplake.core.dataset.deeplake_cloud_dataset import DeepLakeCloudDataset
 from deeplake.core.vectorstore.deeplake_vectorstore import VectorStore
 from deeplake.client.client import DeepMemoryBackendClient
 from deeplake.client.utils import JobResponseStatusSchema
 from deeplake.util.bugout_reporter import (
     feature_report_path,
 )
+from deeplake.util.dataset import try_flushing
 from deeplake.util.path import get_path_type
+from deeplake.util.version_control import load_meta


 class DeepMemory:
@@ -114,7 +124,6 @@ def train(
             path=queries_path,
             overwrite=True,
             runtime=runtime,
-            embedding_function=embedding_function,
             token=token or self.token,
             creds=self.creds,
         )
@@ -125,6 +134,7 @@ def train(
                 {"relevance": relevance_per_doc} for relevance_per_doc in relevance
             ],
             embedding_data=[query for query in queries],
+            embedding_function=embedding_function,
         )

         # do some rest_api calls to train the model
@@ -206,9 +216,22 @@ def status(self, job_id: str):
             },
             token=self.token,
         )
+
+        _, storage = get_storage_and_cache_chain(
+            path=self.dataset.path,
+            db_engine={"tensor_db": True},
+            read_only=False,
+            creds=self.creds,
+            token=self.dataset.token,
+            memory_cache_size=DEFAULT_MEMORY_CACHE_SIZE,
+            local_cache_size=DEFAULT_LOCAL_CACHE_SIZE,
+        )
+
+        loaded_dataset = DeepLakeCloudDataset(storage=storage)
+
         try:
             recall, improvement = _get_best_model(
-                self.dataset.embedding, job_id, latest_job=True
+                loaded_dataset.embedding, job_id, latest_job=True
             )

             recall = "{:.2f}".format(100 * recall)
@@ -228,6 +251,17 @@ def list_jobs(self, debug=False):
             },
             token=self.token,
         )
+        _, storage = get_storage_and_cache_chain(
+            path=self.dataset.path,
+            db_engine={"tensor_db": True},
+            read_only=False,
+            creds=self.creds,
+            token=self.dataset.token,
+            memory_cache_size=DEFAULT_MEMORY_CACHE_SIZE,
+            local_cache_size=DEFAULT_LOCAL_CACHE_SIZE,
+        )
+        loaded_dataset = DeepLakeCloudDataset(storage=storage)
+
         response = self.client.list_jobs(
             dataset_path=self.dataset.path,
         )
@@ -243,7 +277,7 @@ def list_jobs(self, debug=False):
         for job in jobs:
             try:
                 recall, delta = _get_best_model(
-                    self.dataset.embedding,
+                    loaded_dataset.embedding,
                     job,
                     latest_job=job == latest_job,
                 )
@@ -352,6 +386,7 @@ def evaluate(
             },
             token=self.token,
         )
+        try_flushing(self.dataset)

         try:
             from indra import api  # type: ignore
@@ -373,9 +408,10 @@ def evaluate(
         start = time()
         query_embs: Union[List[np.ndarray], List[List[float]]]
+
         if embedding is not None:
             query_embs = embedding
-        elif embedding is None:
+        else:
             if self.embedding_function is not None:
                 embedding_function = (
                     embedding_function or self.embedding_function.embed_documents
@@ -404,26 +440,20 @@ def evaluate(
         ]:
             eval_type = "with" if use_model else "without"
             print(f"---- Evaluating {eval_type} model ---- ")
-            callect_data = False
-            for k in top_k:
-                callect_data = k == 10
-
-                recall, queries_dict = recall_at_k(
-                    self.dataset,
-                    indra_dataset,
-                    relevance,
-                    top_k=k,
-                    query_embs=query_embs,
-                    metric=metric,
-                    collect_data=callect_data,
-                    use_model=use_model,
-                )
+            avg_recalls, queries_dict = recall_at_k(
+                indra_dataset,
+                relevance,
+                top_k=top_k,
+                query_embs=query_embs,
+                metric=metric,
+                use_model=use_model,
+            )

-                if callect_data:
-                    queries_data.update(queries_dict)
+            queries_data.update(queries_dict)

-                print(f"Recall@{k}:\t {100*recall: .1f}%")
-                recalls[f"{eval_type} model"][f"recall@{k}"] = recall
+            for recall, recall_value in avg_recalls.items():
+                print(f"Recall@{recall}:\t {100*recall_value: .1f}%")
+                recalls[f"{eval_type} model"][f"recall@{recall}"] = recall_value

         log_queries = parsed_qvs_params.get("log_queries")
         branch = parsed_qvs_params.get("branch")
@@ -454,16 +484,14 @@ def evaluate(


 def recall_at_k(
-    dataset: Dataset,
     indra_dataset: Any,
     relevance: List[List[Tuple[str, int]]],
     query_embs: Union[List[np.ndarray], List[List[float]]],
     metric: str,
-    top_k: int = 10,
-    collect_data: bool = False,
+    top_k: List[int] = [1, 3, 5, 10, 50, 100],
     use_model: bool = False,
 ):
-    recalls = []
+    recalls = defaultdict(list)
     top_k_list = []

     for query_idx, _ in enumerate(query_embs):
@@ -473,46 +501,48 @@ def recall_at_k(
         correct_labels = [rel[0] for rel in query_relevance]

         # Compute the cosine similarity between the query and all data points
-        view_top_k = get_view_top_k(
+        view = get_view(
             metric=metric,
             query_emb=query_emb,
-            top_k=top_k,
             indra_dataset=indra_dataset,
         )

-        top_k_retrieved = [
-            sample.id.numpy() for sample in view_top_k
-        ]  # TODO: optimize this
-
-        # Compute the recall: the fraction of relevant items found in the top k
-        num_relevant_in_top_k = len(
-            set(correct_labels).intersection(set(top_k_retrieved))
-        )
-        if len(correct_labels) == 0:
-            continue
-        recall = num_relevant_in_top_k / len(correct_labels)
-
-        if collect_data:
-            top_k_list.append(top_k_retrieved)
-        recalls.append(recall)
+        for k in top_k:
+            collect_data = k == 10
+            view_top_k = view[:k]

-    # Average the recalls for each query
-    avg_recall = np.mean(np.array(recalls))
-    queries_data = {}
-    if collect_data:
-        model_type = "deep_memory" if use_model else "vector_search"
+            top_k_retrieved = [
+                sample.id.numpy() for sample in view_top_k
+            ]  # TODO: optimize this

-        queries_data = {
-            f"{model_type}_top_10": top_k_list,
-            f"{model_type}_recall": recalls,
-        }
-    return avg_recall, queries_data
+            # Compute the recall: the fraction of relevant items found in the top k
+            num_relevant_in_top_k = len(
+                set(correct_labels).intersection(set(top_k_retrieved))
+            )
+            if len(correct_labels) == 0:
+                continue
+            recall = num_relevant_in_top_k / len(correct_labels)

+            if collect_data:
+                top_k_list.append(top_k_retrieved)
+            recalls[k].append(recall)

-def get_view_top_k(
+    # Average the recalls for each query
+    avg_recalls = {
+        f"{recall}": np.mean(np.array(recall_list))
+        for recall, recall_list in recalls.items()
+    }
+    model_type = "deep_memory" if use_model else "vector_search"
+    queries_data = {
+        f"{model_type}_top_10": top_k_list,
+        f"{model_type}_recall": recalls[10],
+    }
+    return avg_recalls, queries_data
+
+
+def get_view(
     metric: str,
     query_emb: Union[List[float], np.ndarray],
-    top_k: int,
     indra_dataset: Any,
     return_tensors: List[str] = ["text", "metadata", "id"],
     tql_filter: str = "",
@@ -520,7 +550,7 @@ def get_view_top_k(
     tql_filter_str = tql_filter if tql_filter == "" else " where " + tql_filter
     query_emb_str = ",".join([f"{q}" for q in query_emb])
     return_tensors_str = ", ".join(return_tensors)
-    tql = f"SELECT * FROM (SELECT {return_tensors_str}, ROW_NUMBER() as indices, {metric}(embedding, ARRAY[{query_emb_str}]) as score {tql_filter_str} order by {metric}(embedding, ARRAY[{query_emb_str}]) desc limit {top_k})"
+    tql = f"SELECT * FROM (SELECT {return_tensors_str}, ROW_NUMBER() as indices, {metric}(embedding, ARRAY[{query_emb_str}]) as score {tql_filter_str} order by {metric}(embedding, ARRAY[{query_emb_str}]) desc limit 100)"
     indra_view = indra_dataset.query(tql)

     return indra_view
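The recall_at_k rework above fetches one top-100 view per query and slices it at every cutoff, instead of re-running the TQL query for each k. A minimal standalone sketch of that recall computation (toy ids, not the deeplake implementation):

    def recall_at_cutoffs(retrieved_ids, relevant_ids, cutoffs=(1, 3, 5, 10, 50, 100)):
        # Rank once at the deepest cutoff, then slice for every k.
        relevant = set(relevant_ids)
        return {
            k: len(relevant.intersection(retrieved_ids[:k])) / len(relevant)
            for k in cutoffs
            if relevant
        }

    # Toy usage: ids ranked by similarity vs. a two-item ground-truth set.
    print(recall_at_cutoffs(["d3", "d7", "d1", "d9"], {"d1", "d9"}))
    # {1: 0.0, 3: 0.5, 5: 1.0, 10: 1.0, 50: 1.0, 100: 1.0}
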
diff --git a/deeplake/core/vectorstore/deeplake_vectorstore.py b/deeplake/core/vectorstore/deeplake_vectorstore.py
index bc2b743b2d..49fcb2b2d6 100644
--- a/deeplake/core/vectorstore/deeplake_vectorstore.py
+++ b/deeplake/core/vectorstore/deeplake_vectorstore.py
@@ -41,7 +41,7 @@ def __init__(
         self,
         path: Union[str, pathlib.Path],
         tensor_params: List[Dict[str, object]] = DEFAULT_VECTORSTORE_TENSORS,
-        embedding_function: Optional[Callable] = None,
+        embedding_function: Optional[Any] = None,
         read_only: Optional[bool] = None,
         ingestion_batch_size: int = 1000,
         index_params: Optional[Dict[str, Union[int, str]]] = None,
@@ -87,7 +87,7 @@ def __init__(
                 - a local file system path of the form ``./path/to/dataset`` or ``~/path/to/dataset`` or ``path/to/dataset``.
                 - a memory path of the form ``mem://path/to/dataset`` which doesn't save the dataset but keeps it in memory instead. Should be used only for testing as it does not persist.
             tensor_params (List[Dict[str, dict]], optional): List of dictionaries that contains information about tensors that user wants to create. See ``create_tensor`` in Deep Lake API docs for more information. Defaults to ``DEFAULT_VECTORSTORE_TENSORS``.
-            embedding_function (Optional[callable], optional): Function that converts the embeddable data into embeddings. Input to `embedding_function` is a list of data and output is a list of embeddings. Defaults to None.
+            embedding_function (Optional[Any], optional): Function or class that converts the embeddable data into embeddings. Input to `embedding_function` is a list of data and output is a list of embeddings. Defaults to None.
             read_only (bool, optional): Opens dataset in read-only mode if True. Defaults to False.
             num_workers (int): Number of workers to use for parallel ingestion.
             ingestion_batch_size (int): Batch size to use for parallel ingestion.
@@ -160,6 +160,7 @@ def __init__(
         self.index_params = utils.parse_index_params(index_params)
         self.num_workers = num_workers
         self.creds = creds or {}
+        self.embedding_function = utils.create_embedding_function(embedding_function)

         self.dataset = dataset_utils.create_or_load_dataset(
             tensor_params,
@@ -176,7 +177,6 @@ def __init__(
             branch,
             **kwargs,
         )
-        self.embedding_function = embedding_function
         self._exec_option = exec_option
         self.verbose = verbose
         self.tensor_params = tensor_params
@@ -216,8 +216,8 @@ def add(
         rate_limiter: Dict = {
             "enabled": False,
             "bytes_per_minute": MAX_BYTES_PER_MINUTE,
+            "batch_byte_size": TARGET_BYTE_SIZE,
         },
-        batch_byte_size: int = TARGET_BYTE_SIZE,
         **tensors,
     ) -> Optional[List[str]]:
         """Adding elements to deeplake vector store.
@@ -280,8 +280,7 @@ def add(
             embedding_data (Optional[List]): Data to be converted into embeddings using the provided ``embedding_function``. Defaults to None.
             embedding_tensor (Optional[str]): Tensor where results from the embedding function will be stored. If None, the embedding tensor is automatically inferred (when possible). Defaults to None.
             return_ids (bool): Whether to return added ids as an ouput of the method. Defaults to False.
-            rate_limiter (Dict): Rate limiter configuration. Defaults to ``{"enabled": False, "bytes_per_minute": MAX_BYTES_PER_MINUTE}``.
-            batch_byte_size (int): Batch size to use for parallel ingestion. Defaults to ``TARGET_BYTE_SIZE``.
+            rate_limiter (Dict): Rate limiter configuration. Defaults to ``{"enabled": False, "bytes_per_minute": MAX_BYTES_PER_MINUTE, "batch_byte_size": TARGET_BYTE_SIZE}``.
             **tensors: Keyword arguments where the key is the tensor name, and the value is a list of samples that should be uploaded to that tensor.

         Returns:
@@ -301,14 +300,16 @@ def add(
             token=self.token,
             username=self.username,
         )
-
         (
             embedding_function,
             embedding_data,
             embedding_tensor,
             tensors,
         ) = utils.parse_tensors_kwargs(
-            tensors, embedding_function, embedding_data, embedding_tensor
+            tensors,
+            embedding_function,
+            embedding_data,
+            embedding_tensor,
         )

         (
@@ -338,7 +339,6 @@ def add(
             embedding_function=embedding_function,
             embedding_data=embedding_data,
             embedding_tensor=embedding_tensor,
-            batch_byte_size=batch_byte_size,
             rate_limiter=rate_limiter,
         )

@@ -483,7 +483,7 @@ def search(
         return_tensors = utils.parse_return_tensors(
             self.dataset, return_tensors, embedding_tensor, return_view
         )
-
+        embedding_function = utils.create_embedding_function(embedding_function)
         query_emb: Optional[Union[List[float], np.ndarray[Any, Any]]] = None
         if query is None:
             query_emb = dataset_utils.get_embedding(
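For callers of add(), the visible change is that batch_byte_size folds into the rate_limiter dict. A hypothetical call against the new signature (the in-memory path and the zero-vector embedding function are placeholders, not part of the patch):

    from deeplake.core.vectorstore.deeplake_vectorstore import VectorStore

    def embed_documents(texts):
        # Placeholder: a real function would call an embedding model here.
        return [[0.0] * 1536 for _ in texts]

    store = VectorStore(path="mem://rate-limit-demo", embedding_function=embed_documents)
    store.add(
        text=["hello", "world"],
        embedding_data=["hello", "world"],
        embedding_tensor="embedding",
        rate_limiter={
            "enabled": True,
            "bytes_per_minute": 1_000_000,  # throttle embedding calls
            "batch_byte_size": 10_000,  # previously a separate add() argument
        },
    )
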
diff --git a/deeplake/core/vectorstore/embedder.py b/deeplake/core/vectorstore/embedder.py
new file mode 100644
index 0000000000..bda83837b8
--- /dev/null
+++ b/deeplake/core/vectorstore/embedder.py
@@ -0,0 +1,126 @@
+import functools
+import time
+import types
+from typing import Any, Optional, List, Dict, Callable
+
+from deeplake.constants import TARGET_BYTE_SIZE, MAX_BYTES_PER_MINUTE
+
+
+class RateLimitedDataIterator:
+    def __init__(self, data, func, rate_limiter):
+        self.data = chunk_by_bytes(data, rate_limiter["batch_byte_size"])
+        self.data_iter = iter(self.data)
+        self.index = 0
+        self.bytes_per_minute = rate_limiter["bytes_per_minute"]
+        self.target_byte_size = rate_limiter["batch_byte_size"]
+        self.func = func
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        if self.index >= len(self.data):
+            raise StopIteration
+        batch = next(self.data_iter)
+        self.index += 1
+        # Calculate the number of batches you can send each minute
+        batches_per_minute = self.bytes_per_minute / self.target_byte_size
+
+        # Calculate sleep time in seconds between batches
+        sleep_time = 60 / batches_per_minute
+
+        start = time.time()
+        batch = self.func(batch)
+        end = time.time()
+
+        # Subtract the time already spent inside the embedding call itself
+        diff = sleep_time - (end - start)
+        if diff > 0:
+            time.sleep(diff)
+        return batch
+
+    def __len__(self):
+        return len(self.data)
+
+
+class DeepLakeEmbedder:
+    def __init__(
+        self,
+        embedding_function: Any,
+    ):
+        self.embedding_function = embedding_function
+
+    def _get_embedding_func(self, default_func):
+        valid_function_types = (
+            types.MethodType,
+            types.FunctionType,
+            types.LambdaType,
+            functools.partial,
+        )
+        if isinstance(self.embedding_function, valid_function_types):
+            return self.embedding_function
+        return getattr(self.embedding_function, default_func)
+
+    def embed_documents(
+        self,
+        documents: List[str],
+        rate_limiter: Dict = {
+            "enabled": False,
+            "bytes_per_minute": MAX_BYTES_PER_MINUTE,
+            "batch_byte_size": TARGET_BYTE_SIZE,
+        },
+    ):
+        embedding_func = self._get_embedding_func("embed_documents")
+
+        if rate_limiter["enabled"]:
+            return self._apply_rate_limiter(documents, embedding_func, rate_limiter)
+        return embedding_func(documents)
+
+    def embed_query(self, query: str):
+        return self._get_embedding_func("embed_query")(query)
+
+    @staticmethod
+    def _apply_rate_limiter(documents, embedding_function, rate_limiter):
+        data_iterator = RateLimitedDataIterator(
+            documents,
+            embedding_function,
+            rate_limiter,
+        )
+        output = []
+        for data in data_iterator:
+            output.extend(data)
+        return output
+
+
+def chunk_by_bytes(data, target_byte_size=TARGET_BYTE_SIZE):
+    """
+    Splits a list of strings into chunks where each chunk has approximately the given target byte size.
+
+    Args:
+    - data (list of str): List of strings to be chunked.
+    - target_byte_size (int): The target byte size for each chunk.
+
+    Returns:
+    - list of lists containing the chunked strings.
+    """
+    # Calculate byte sizes for all strings
+    sizes = [len(s.encode("utf-8")) for s in data]
+
+    chunks = []
+    current_chunk = []
+    current_chunk_size = 0
+    index = 0
+
+    while index < len(data):
+        if current_chunk_size + sizes[index] > target_byte_size:
+            chunks.append(current_chunk)
+            current_chunk = []
+            current_chunk_size = 0
+        current_chunk.append(data[index])
+        current_chunk_size += sizes[index]
+        index += 1
+
+    # Add the last chunk if it's not empty
+    if current_chunk:
+        chunks.append(current_chunk)
+    return chunks
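DeepLakeEmbedder gives bare functions and LangChain-style objects the same embed_documents/embed_query surface, which is what the tests below rely on. A small usage sketch with toy zero vectors:

    import numpy as np
    from deeplake.core.vectorstore.embedder import DeepLakeEmbedder

    # A bare function is used as-is for both documents and queries.
    fn_embedder = DeepLakeEmbedder(embedding_function=lambda texts: [np.zeros(4) for _ in texts])
    assert len(fn_embedder.embed_documents(["a", "b"])) == 2

    # An object is dispatched to its embed_documents / embed_query methods.
    class ToyEmbeddings:
        def embed_documents(self, texts):
            return [np.zeros(4) for _ in texts]

        def embed_query(self, text):
            return np.zeros(4)

    obj_embedder = DeepLakeEmbedder(embedding_function=ToyEmbeddings())
    assert obj_embedder.embed_query("q").shape == (4,)
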
diff --git a/deeplake/core/vectorstore/test_deeplake_vectorstore.py b/deeplake/core/vectorstore/test_deeplake_vectorstore.py
index 164a8796a2..b4ef5c7383 100644
--- a/deeplake/core/vectorstore/test_deeplake_vectorstore.py
+++ b/deeplake/core/vectorstore/test_deeplake_vectorstore.py
@@ -13,6 +13,7 @@
     VectorStore,
 )
 from deeplake.core.vectorstore.deepmemory_vectorstore import DeepMemoryVectorStore
+from deeplake.core.vectorstore.embedder import DeepLakeEmbedder
 from deeplake.core.vectorstore.vectorstore_factory import vectorstore_factory
 from deeplake.core.vectorstore import utils
 from deeplake.tests.common import requires_libdeeplake
@@ -118,7 +119,7 @@ def test_id_backward_compatibility(local_path):
         metadata=metadata,
     )

-    assert len(vectorstore) == 20
+    assert len(vectorstore) == 2 * num_of_items


 def test_custom_tensors(local_path):
@@ -249,7 +250,7 @@ def test_search_basic(local_path, hub_cloud_dev_token):

     vector_store.add(embedding=embeddings, text=texts, metadata=metadatas)

-    with pytest.raises(ValueError):
+    with pytest.raises(IncorrectEmbeddingShapeError):
         vector_store.add(
             embedding_function=embedding_fn2,
             embedding_data=texts,
@@ -1342,6 +1343,8 @@ def test_parse_add_arguments(local_path):
         overwrite=True,
         embedding_function=embedding_fn,
     )
+    embedding_fn_dp = DeepLakeEmbedder(embedding_function=embedding_fn)
+    embedding_fn2_dp = DeepLakeEmbedder(embedding_function=embedding_fn2)

     with pytest.raises(ValueError):
         # Throw error because embedding_function requires embed_data_from
@@ -1399,7 +1402,7 @@ def test_parse_add_arguments(local_path):
         tensors,
     ) = utils.parse_add_arguments(
         dataset=deeplake_vector_store.dataset,
-        initial_embedding_function=embedding_fn,
+        initial_embedding_function=embedding_fn_dp,
         text=texts,
         id=ids,
         embedding=embeddings,
@@ -1419,7 +1422,7 @@ def test_parse_add_arguments(local_path):
         # initial embedding function specified and embeding_tensor is not specified
         utils.parse_add_arguments(
             dataset=deeplake_vector_store.dataset,
-            initial_embedding_function=embedding_fn,
+            initial_embedding_function=embedding_fn_dp,
             embedding_data=texts,
             text=texts,
             id=ids,
@@ -1431,8 +1434,8 @@ def test_parse_add_arguments(local_path):
         # Throw error because embedding_function and embedding are specified
         utils.parse_add_arguments(
             dataset=deeplake_vector_store.dataset,
-            initial_embedding_function=embedding_fn,
-            embedding_function=embedding_fn,
+            initial_embedding_function=embedding_fn_dp,
+            embedding_function=embedding_fn_dp,
             embedding_data=texts,
             embedding_tensor="embedding",
             text=texts,
@@ -1445,7 +1448,7 @@ def test_parse_add_arguments(local_path):
         # initial_embedding_function is specified and embeding_tensor, embed_data_from and embedding is specified.
         utils.parse_add_arguments(
             dataset=deeplake_vector_store.dataset,
-            initial_embedding_function=embedding_fn,
+            initial_embedding_function=embedding_fn_dp,
             embedding_tensor="embedding",
             embedding_data=texts,
             text=texts,
@@ -1469,8 +1472,8 @@ def test_parse_add_arguments(local_path):
     with pytest.raises(ValueError):
         utils.parse_add_arguments(
             dataset=deeplake_vector_store.dataset,
-            embedding_function=embedding_fn,
-            initial_embedding_function=embedding_fn,
+            embedding_function=embedding_fn_dp,
+            initial_embedding_function=embedding_fn_dp,
             embedding_data=texts,
             embedding_tensor="embedding",
             text=texts,
@@ -1486,14 +1489,13 @@ def test_parse_add_arguments(local_path):
         tensors,
     ) = utils.parse_add_arguments(
         dataset=deeplake_vector_store.dataset,
-        embedding_function=embedding_fn2,
+        embedding_function=embedding_fn2_dp,
         embedding_data=texts,
         embedding_tensor="embedding",
         text=texts,
         id=ids,
         metadata=metadatas,
     )
-    assert embedding_function[0] is embedding_fn2
     assert embedding_tensors == ["embedding"]
     assert tensors == {
         "id": ids,
@@ -1508,13 +1510,12 @@ def test_parse_add_arguments(local_path):
         tensors,
     ) = utils.parse_add_arguments(
         dataset=deeplake_vector_store.dataset,
-        embedding_function=embedding_fn2,
+        embedding_function=embedding_fn2_dp,
         embedding_data="text",
         embedding_tensor="embedding",
         text=texts,
         metadata=metadatas,
     )
-    assert embedding_function[0] is embedding_fn2
     assert embedding_tensors == ["embedding"]
     assert len(tensors) == 2

@@ -1543,7 +1544,7 @@ def test_parse_add_arguments(local_path):
     with pytest.raises(ValueError):
         utils.parse_add_arguments(
             dataset=deeplake_vector_store.dataset,
-            embedding_function=embedding_fn2,
+            embedding_function=embedding_fn2_dp,
             embedding_data="text",
             text=texts,
             metadata=metadatas,
@@ -1566,7 +1567,7 @@ def test_parse_add_arguments(local_path):
     with pytest.raises(ValueError):
         utils.parse_add_arguments(
             dataset=deeplake_vector_store.dataset,
-            embedding_function=embedding_fn2,
+            embedding_function=embedding_fn2_dp,
             embedding_data=texts,
             text=texts,
         )
@@ -1595,12 +1596,11 @@ def test_parse_add_arguments(local_path):
         tensors,
     ) = utils.parse_add_arguments(
         dataset=deeplake_vector_store.dataset,
-        embedding_function=embedding_fn2,
+        embedding_function=embedding_fn2_dp,
         embedding_data=texts,
         text=texts,
     )
-
-    assert embedding_function[0] is embedding_fn2
     assert embedding_tensors == ["embedding_1"]
     assert len(tensors) == 1

@@ -1617,14 +1617,13 @@ def test_parse_add_arguments(local_path):
         tensors,
     ) = utils.parse_add_arguments(
         dataset=deeplake_vector_store.dataset,
-        initial_embedding_function=embedding_fn,
+        initial_embedding_function=embedding_fn_dp,
         text=texts,
         id=ids,
         metadata=metadatas,
         embedding_data=texts,
         embedding_tensor="embedding",
     )
-    assert embedding_function[0] is embedding_fn
     assert embedding_tensor == ["embedding"]
     assert embedding_data == [texts]
     assert tensors == {
@@ -1705,7 +1704,7 @@ def test_parse_add_arguments(local_path):
             id=ids,
             metadata=metadatas,
             embedding_data=texts,
-            embedding_function=embedding_fn3,
+            embedding_function=DeepLakeEmbedder(embedding_function=embedding_fn3),
             embedding_2=embeddings,
         )

@@ -1720,7 +1719,8 @@ def test_parse_tensors_kwargs():
         tensors, None, None, None
     )

-    assert func == [embedding_fn, embedding_fn2]
+    assert isinstance(func[0], DeepLakeEmbedder)
+    assert isinstance(func[1], DeepLakeEmbedder)
     assert data == [texts, texts]
     assert emb_tensor == ["embedding_1", "embedding_2"]
     assert new_tensors == {"custom_text": texts}
@@ -1799,7 +1799,7 @@ def test_multiple_embeddings(local_path):
     )

     # test with initial embedding function
-    vector_store.embedding_function = embedding_fn
+    vector_store.embedding_function = DeepLakeEmbedder(embedding_function=embedding_fn)

     vector_store.add(
         text=texts,
         embedding_data=[texts, texts],
diff --git a/deeplake/core/vectorstore/test_embedder.py b/deeplake/core/vectorstore/test_embedder.py
new file mode 100644
index 0000000000..bd9dc74acb
--- /dev/null
+++ b/deeplake/core/vectorstore/test_embedder.py
@@ -0,0 +1,90 @@
+import json
+import pytest
+import sys
+from time import time
+
+import numpy as np
+
+from deeplake.constants import MAX_BYTES_PER_MINUTE, TARGET_BYTE_SIZE
+from deeplake.core.vectorstore.embedder import DeepLakeEmbedder, chunk_by_bytes
+
+
+EMBEDDING_DIM = 15
+
+
+def test_chunk_by_bytes():
+    data = ["a" * 10000] * 10  # 10 strings of 10000 bytes
+
+    batched_data = chunk_by_bytes(data, target_byte_size=10)
+    serialized_data = json.dumps(batched_data)
+    byte_size = len(serialized_data.encode("utf-8"))
+    list_weight = 100
+    assert (
+        byte_size <= 100000 + list_weight
+    ), "Chunking by bytes did not work as expected!"
+
+
+@pytest.mark.skipif(
+    sys.platform != "linux",
+    reason="Sometimes MacOS fails this test due to speed issues",
+)
+def test_embedder_with_func():
+    def embed_documents(documents):
+        return [np.random.rand(EMBEDDING_DIM) for doc in documents]
+
+    embedder = DeepLakeEmbedder(embedding_function=embed_documents)
+    documents = ["a" * 10000] * 10  # 10 strings of 10000 bytes
+    embeddings = embedder.embed_documents(documents)
+    assert len(embeddings) == 10, "Embedding function did not work as expected!"
+
+    embedder = DeepLakeEmbedder(embedding_function=embed_documents)
+    documents = ["a" * 10000] * 10000  # 10000 strings of 10000 bytes
+    embeddings = embedder.embed_documents(documents)
+    assert len(embeddings) == 10000, "Embedding function did not work as expected!"
+
+    documents = ["a" * 10000] * 10
+    start_time = time()
+    embeddings = embedder.embed_documents(
+        documents,
+        rate_limiter={
+            "enabled": True,
+            "bytes_per_minute": MAX_BYTES_PER_MINUTE,
+            "batch_byte_size": TARGET_BYTE_SIZE,
+        },
+    )
+    end_time = time()
+    elapsed_minutes = end_time - start_time
+    expected_time = 60 * (
+        len(documents) * 10000 / MAX_BYTES_PER_MINUTE
+    )  # each document is 10000 bytes
+    tolerance = 0.1
+
+    assert len(embeddings) == 10, "Embedding function did not work as expected!"
+    assert (
+        abs(elapsed_minutes - expected_time) <= tolerance
+    ), "Rate limiting did not work as expected!"
+
+
+def test_embedder_with_class():
+    class Embedder:
+        def embed_documents(self, documents):
+            return [np.random.rand(EMBEDDING_DIM) for doc in documents]
+
+        def embed_query(self, query):
+            return np.random.rand(EMBEDDING_DIM)
+
+    embedder_obj = Embedder()
+    embedder = DeepLakeEmbedder(embedding_function=embedder_obj)
+    documents = ["a" * 10000] * 10  # 10 strings of 10000 bytes
+    embeddings = embedder.embed_documents(documents)
+    assert len(embeddings) == 10, "Embedding function did not work as expected!"
+
+    embedder = DeepLakeEmbedder(embedding_function=embedder_obj)
+    documents = ["a" * 10000] * 10000  # 10000 strings of 10000 bytes
+    embeddings = embedder.embed_documents(documents)
+    assert len(embeddings) == 10000, "Embedding function did not work as expected!"
+
+    embeddings = embedder.embed_query(documents[0])
+    assert (
+        len(embeddings) == EMBEDDING_DIM
+    ), "Embedding function did not work as expected!"
diff --git a/deeplake/core/vectorstore/vector_search/dataset/__init__.py b/deeplake/core/vectorstore/vector_search/dataset/__init__.py
index b451e6e26e..73cbf86705 100644
--- a/deeplake/core/vectorstore/vector_search/dataset/__init__.py
+++ b/deeplake/core/vectorstore/vector_search/dataset/__init__.py
@@ -11,5 +11,4 @@
     convert_id_to_row_id,
     search_row_ids,
     extend,
-    chunk_by_bytes,
 )
diff --git a/deeplake/core/vectorstore/vector_search/dataset/dataset.py b/deeplake/core/vectorstore/vector_search/dataset/dataset.py
index a0fc044f70..dcbad7dc88 100644
--- a/deeplake/core/vectorstore/vector_search/dataset/dataset.py
+++ b/deeplake/core/vectorstore/vector_search/dataset/dataset.py
@@ -17,6 +17,7 @@
     VECTORSTORE_EXTEND_MAX_SIZE_BY_HTYPE,
     MAX_BYTES_PER_MINUTE,
     TARGET_BYTE_SIZE,
+    VECTORSTORE_EXTEND_BATCH_SIZE,
 )
 from deeplake.util.exceptions import IncorrectEmbeddingShapeError

@@ -268,7 +269,7 @@ def get_embedding(embedding, embedding_data, embedding_function=None):
         if len(embedding_data) > 1:
             raise NotImplementedError("Searching batched queries is not supported yet.")

-        embedding = embedding_function(embedding_data)  # type: ignore
+        embedding = embedding_function.embed_query(embedding_data)  # type: ignore

     if embedding is not None and (
         isinstance(embedding, list) or embedding.dtype != "float32"
@@ -406,129 +407,93 @@ def update_embedding_info(logger, dataset, embedding_function):
     set_embedding_info(dataset[embeddings_tensors[0]], embedding_function)


+def _compute_batched_embeddings(
+    embedding_function,
+    embedding_data,
+    embedding_tensor,
+    start_idx,
+    end_idx,
+    rate_limiter,
+):
+    """
+    Computes embeddings for a given slice of data.
+    """
+    batched_processed_tensors = {}
+
+    for func, data, tensor in zip(embedding_function, embedding_data, embedding_tensor):
+        data_slice = data[start_idx:end_idx]
+        embedded_data = func(data_slice, rate_limiter=rate_limiter)
+
+        try:
+            return_embedded_data = np.vstack(embedded_data).astype(dtype=np.float32)
+        except ValueError:
+            raise IncorrectEmbeddingShapeError()
+
+        if len(return_embedded_data) == 0:
+            raise ValueError("embedding function returned empty list")
+
+        batched_processed_tensors[tensor] = return_embedded_data
+
+    return batched_processed_tensors
+
+
+def _slice_non_embedding_tensors(
+    processed_tensors, embedding_tensor, start_idx, end_idx
+):
+    """
+    Slices tensors that are not embeddings for a given range.
+    """
+    batched_processed_tensors = {}
+
+    for tensor_name, tensor_data in processed_tensors.items():
+        if tensor_name not in embedding_tensor:
+            batched_processed_tensors[tensor_name] = tensor_data[start_idx:end_idx]
+
+    return batched_processed_tensors
+
+
 def extend(
     embedding_function: List[Callable],
     embedding_data: List[Any],
     embedding_tensor: Union[str, List[str]],
     processed_tensors: Dict[str, Union[List[Any], np.ndarray]],
     dataset: deeplake.core.dataset.Dataset,
-    batch_byte_size: int,
     rate_limiter: Dict,
     index_regeneration: bool = False,
+    _extend_batch_size: int = VECTORSTORE_EXTEND_BATCH_SIZE,
 ):
     """
     Function to extend the dataset with new data.
-
-    Args:
-        embedding_function (List[Callable]): List of embedding functions to be used to create embedding data.
-        embedding_data (List[Any]): List of data to be embedded.
-        embedding_tensor (Union[str, List[str]]): Name of the tensor(s) to store the embedding data.
-        processed_tensors (Dict[str, List[Any]]): Dictionary of tensors to be added to the dataset.
-        dataset (deeplake.core.dataset.Dataset): Dataset to be extended.
-        batch_byte_size (int): Batch size to use for parallel ingestion.
-        rate_limiter (Dict): Rate limiter configuration.
-        index_regeneration (bool): Denotes if index will be regenerated or not.
-
-    Raises:
-        IncorrectEmbeddingShapeError: If embeding function shapes is incorrect.
-        ValueError: If embedding function returned empty list
-
     """
+    if embedding_data and not isinstance(embedding_data[0], list):
+        embedding_data = [embedding_data]
+
     if embedding_function:
-        for func, data, tensor in zip(
-            embedding_function, embedding_data, embedding_tensor
+        for idx in tqdm(
+            range(0, len(embedding_data[0]), _extend_batch_size), "creating embeddings"
         ):
-            data_iterator = data_iteratot_factory(
-                data, func, batch_byte_size, rate_limiter
+            batch_start, batch_end = idx, idx + _extend_batch_size
+
+            batched_embeddings = _compute_batched_embeddings(
+                embedding_function,
+                embedding_data,
+                embedding_tensor,
+                batch_start,
+                batch_end,
+                rate_limiter,
             )
-            embedded_data = []
-
-            for data in tqdm(
-                data_iterator, total=len(data_iterator), desc="creating embeddings"
-            ):
-                embedded_data.append(data)
-
-            try:
-                return_embedded_data = np.vstack(embedded_data).astype(dtype=np.float32)
-            except ValueError:
-                raise IncorrectEmbeddingShapeError()
-
-            if len(return_embedded_data) == 0:
-                raise ValueError("embedding function returned empty list")
-            processed_tensors[tensor] = return_embedded_data
-
-    dataset.extend(
-        processed_tensors, progressbar=True, index_regeneration=index_regeneration
-    )
+            batched_tensors = _slice_non_embedding_tensors(
+                processed_tensors, embedding_tensor, batch_start, batch_end
+            )

+            batched_processed_tensors = {**batched_embeddings, **batched_tensors}

-class DataIterator:
-    def __init__(self, data, func, batch_byte_size):
-        self.data = chunk_by_bytes(data, batch_byte_size)
-        self.data_itr = iter(self.data)
-        self.index = 0
-        self.func = func
-
-    def __iter__(self):
-        return self
-
-    def __next__(self):
-        if self.index >= len(self.data):
-            raise StopIteration
-        batch = next(self.data_itr)
-        batch = self.func(batch)
-        self.index += 1
-        return batch
-
-    def __len__(self):
-        return len(self.data)
-
-
-class RateLimitedDataIterator:
-    def __init__(self, data, func, batch_byte_size, rate_limiter):
-        self.data = chunk_by_bytes(data, batch_byte_size)
-        self.data_iter = iter(self.data)
-        self.index = 0
-        self.rate_limiter = rate_limiter
-        self.bytes_per_minute = rate_limiter["bytes_per_minute"]
-        self.target_byte_size = batch_byte_size
-        self.func = func
-
-    def __iter__(self):
-        return self
-
-    def __next__(self):
-        if self.index >= len(self.data):
-            raise StopIteration
-        batch = next(self.data_iter)
-        self.index += 1
-        # Calculate the number of batches you can send each minute
-        batches_per_minute = self.bytes_per_minute / self.target_byte_size
-
-        # Calculate sleep time in seconds between batches
-        sleep_time = 60 / batches_per_minute
-
-        start = time.time()
-        batch = self.func(batch)
-        end = time.time()
-
-        # we need to take into account the time spent on openai call
-        diff = sleep_time - (end - start)
-        if diff > 0:
-            time.sleep(diff)
-        return batch
-
-    def __len__(self):
-        return len(self.data)
-
-
-def data_iteratot_factory(data, func, batch_byte_size, rate_limiter):
-    if rate_limiter["enabled"]:
-        return RateLimitedDataIterator(data, func, batch_byte_size, rate_limiter)
+            dataset.extend(
+                batched_processed_tensors, index_regeneration=index_regeneration
+            )
     else:
-        return DataIterator(data, func, batch_byte_size)
+        dataset.extend(processed_tensors, index_regeneration=index_regeneration)


 def extend_or_ingest_dataset(
@@ -537,7 +502,6 @@ def extend_or_ingest_dataset(
     embedding_function,
     embedding_tensor,
     embedding_data,
-    batch_byte_size,
    rate_limiter,
     index_regeneration=False,
 ):
@@ -548,47 +512,11 @@ def extend_or_ingest_dataset(
         embedding_tensor,
         processed_tensors,
         dataset,
-        batch_byte_size,
         rate_limiter,
         index_regeneration=index_regeneration,
     )


-def chunk_by_bytes(data, target_byte_size=TARGET_BYTE_SIZE):
-    """
-    Splits a list of strings into chunks where each chunk has approximately the given target byte size.
-
-    Args:
-    - strings (list of str): List of strings to be chunked.
-    - target_byte_size (int): The target byte size for each chunk.
-
-    Returns:
-    - list of lists containing the chunked strings.
-    """
-    # Calculate byte sizes for all strings
-    sizes = [len(s.encode("utf-8")) for s in data]
-
-    chunks = []
-    current_chunk = []
-    current_chunk_size = 0
-    index = 0
-
-    while index < len(data):
-        if current_chunk_size + sizes[index] > target_byte_size:
-            chunks.append(current_chunk)
-            current_chunk = []
-            current_chunk_size = 0
-        current_chunk.append(data[index])
-        current_chunk_size += sizes[index]
-        index += 1
-
-    # Add the last chunk if it's not empty
-    if current_chunk:
-        chunks.append(current_chunk)
-
-    return chunks
-
-
 def convert_id_to_row_id(ids, dataset, search_fn, query, exec_option, filter):
     if ids is None:
         delete_view = search_fn(
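The reworked extend() now ingests rows in fixed-size batches (500 by default, via VECTORSTORE_EXTEND_BATCH_SIZE), so each batch is embedded and written before the next one is computed instead of materializing all embeddings up front. The control flow, reduced to a standalone sketch with a stub sink in place of dataset.extend:

    import numpy as np

    EXTEND_BATCH_SIZE = 500  # mirrors VECTORSTORE_EXTEND_BATCH_SIZE

    def extend_sketch(embed_fns, embed_data, embed_tensors, other_tensors, sink):
        # embed_data holds one list of inputs per embedding tensor.
        for start in range(0, len(embed_data[0]), EXTEND_BATCH_SIZE):
            end = start + EXTEND_BATCH_SIZE
            batch = {}
            # 1) Embed this slice for every embedding tensor.
            for fn, data, tensor in zip(embed_fns, embed_data, embed_tensors):
                batch[tensor] = np.vstack(fn(data[start:end])).astype(np.float32)
            # 2) Slice the non-embedding tensors to the same rows.
            for name, values in other_tensors.items():
                if name not in embed_tensors:
                    batch[name] = values[start:end]
            # 3) Write the batch before computing the next one.
            sink(batch)

    batches = []
    extend_sketch(
        embed_fns=[lambda texts: [np.zeros(4) for _ in texts]],
        embed_data=[["a"] * 1200],
        embed_tensors=["embedding"],
        other_tensors={"text": ["a"] * 1200},
        sink=batches.append,
    )
    print(len(batches))  # 3 batches: 500 + 500 + 200 rows
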
embedding_tensor=["embedding"], processed_tensors=processed_tensors, dataset=dataset, - batch_byte_size=TARGET_BYTE_SIZE, - rate_limiter={"enabled": True, "bytes_per_minute": MAX_BYTES_PER_MINUTE}, + rate_limiter={ + "enabled": True, + "bytes_per_minute": MAX_BYTES_PER_MINUTE, + "batch_byte_size": TARGET_BYTE_SIZE, + }, ) end_time = time.time() @@ -423,27 +428,3 @@ def mock_embedding_function(text): assert ( abs(elapsed_minutes - expected_time) <= tolerance ), "Rate limiting did not work as expected!" - - -def test_chunk_by_bytest(): - data = ["a" * 10000] * 10 # 10 chunks of 10000 bytes - - batched_data = dataset_utils.chunk_by_bytes(data) - serialized_data = json.dumps(batched_data) - byte_size = len(serialized_data.encode("utf-8")) - list_wieght = 100 - assert ( - byte_size <= 100000 + list_wieght - ), "Chunking by bytes did not work as expected!" - - -def test_chunk_by_bytes(): - data = ["a" * 10000] * 10 # 10 chunks of 10000 bytes - - batched_data = dataset_utils.chunk_by_bytes(data, target_byte_size=10) - serialized_data = json.dumps(batched_data) - byte_size = len(serialized_data.encode("utf-8")) - list_wieght = 100 - assert ( - byte_size <= 100000 + list_wieght - ), "Chunking by bytes did not work as expected!" diff --git a/deeplake/core/vectorstore/vector_search/utils.py b/deeplake/core/vectorstore/vector_search/utils.py index 163f9e27d2..17c5bfc87d 100644 --- a/deeplake/core/vectorstore/vector_search/utils.py +++ b/deeplake/core/vectorstore/vector_search/utils.py @@ -1,11 +1,15 @@ +import functools +import time +import types from abc import ABC, abstractmethod -from deeplake.constants import MB, DEFAULT_VECTORSTORE_INDEX_PARAMS +from deeplake.constants import MB, DEFAULT_VECTORSTORE_INDEX_PARAMS, TARGET_BYTE_SIZE from deeplake.enterprise.util import raise_indra_installation_error from deeplake.util.exceptions import TensorDoesNotExistError from deeplake.util.warnings import always_warn from deeplake.client.utils import read_token from deeplake.core.dataset import DeepLakeCloudDataset, Dataset +from deeplake.core.vectorstore.embedder import DeepLakeEmbedder from deeplake.client.client import DeepLakeBackendClient from deeplake.util.path import get_path_type @@ -14,7 +18,7 @@ import jwt import random import string -from typing import Optional, List, Dict, Callable +from typing import Optional, List, Dict EXEC_OPTION_TO_RUNTIME: Dict[str, Optional[Dict]] = { "compute_engine": None, @@ -273,7 +277,12 @@ def get_embedding_tensor(embedding_tensor, embedding_source_tensor, dataset): return embedding_tensor -def parse_tensors_kwargs(tensors, embedding_function, embedding_data, embedding_tensor): +def parse_tensors_kwargs( + tensors, + embedding_function, + embedding_data, + embedding_tensor, +): tensors = tensors.copy() # embedding_tensor = (embedding_function, embedding_data) syntax @@ -306,11 +315,18 @@ def parse_tensors_kwargs(tensors, embedding_function, embedding_data, embedding_ "Cannot specify embedding tensors in both `tensors` and `embedding_tensor`." 
diff --git a/deeplake/core/vectorstore/vector_search/utils.py b/deeplake/core/vectorstore/vector_search/utils.py
index 163f9e27d2..17c5bfc87d 100644
--- a/deeplake/core/vectorstore/vector_search/utils.py
+++ b/deeplake/core/vectorstore/vector_search/utils.py
@@ -1,11 +1,15 @@
+import functools
+import time
+import types
 from abc import ABC, abstractmethod

-from deeplake.constants import MB, DEFAULT_VECTORSTORE_INDEX_PARAMS
+from deeplake.constants import MB, DEFAULT_VECTORSTORE_INDEX_PARAMS, TARGET_BYTE_SIZE
 from deeplake.enterprise.util import raise_indra_installation_error
 from deeplake.util.exceptions import TensorDoesNotExistError
 from deeplake.util.warnings import always_warn
 from deeplake.client.utils import read_token
 from deeplake.core.dataset import DeepLakeCloudDataset, Dataset
+from deeplake.core.vectorstore.embedder import DeepLakeEmbedder
 from deeplake.client.client import DeepLakeBackendClient
 from deeplake.util.path import get_path_type

@@ -14,7 +18,7 @@
 import jwt
 import random
 import string
-from typing import Optional, List, Dict, Callable
+from typing import Optional, List, Dict

 EXEC_OPTION_TO_RUNTIME: Dict[str, Optional[Dict]] = {
     "compute_engine": None,
@@ -273,7 +277,12 @@ def get_embedding_tensor(embedding_tensor, embedding_source_tensor, dataset):
     return embedding_tensor


-def parse_tensors_kwargs(tensors, embedding_function, embedding_data, embedding_tensor):
+def parse_tensors_kwargs(
+    tensors,
+    embedding_function,
+    embedding_data,
+    embedding_tensor,
+):
     tensors = tensors.copy()

     # embedding_tensor = (embedding_function, embedding_data) syntax
@@ -306,11 +315,18 @@ def parse_tensors_kwargs(tensors, embedding_function, embedding_data, embedding_
                 "Cannot specify embedding tensors in both `tensors` and `embedding_tensor`."
             )
     else:
+        if isinstance(embedding_function, list):
+            embedding_function = [
+                create_embedding_function(fn_i) for fn_i in embedding_function
+            ]
+        else:
+            embedding_function = create_embedding_function(embedding_function)
         return embedding_function, embedding_data, embedding_tensor, tensors

     # separate embedding functions, data and tensors
     for k, v in filtered.items():
-        funcs.append(v[0])
+        func = create_embedding_function(v[0])
+        funcs.append(func)
         data.append(v[1])
         tensors_.append(k)
     # remove embedding tensors (tuple format) from tensors
@@ -319,44 +335,85 @@ def parse_tensors_kwargs(tensors, embedding_function, embedding_data, embedding_
     return funcs, data, tensors_, tensors


-def parse_update_arguments(
-    dataset,
-    embedding_function=None,
-    initial_embedding_function=None,
-    embedding_source_tensor=None,
-    embedding_tensor=None,
-):
+def _validate_embedding_functions(embedding_function, initial_embedding_function):
     if embedding_function is None and initial_embedding_function is None:
         raise ValueError(
             "`embedding_function` was not specified during initialization of vector store or the update call"
         )

-    embedding_tensor = get_embedding_tensor(
-        embedding_tensor, embedding_source_tensor, dataset
-    )

-    if isinstance(embedding_tensor, list) and len(embedding_tensor) == 1:
-        embedding_tensor = embedding_tensor[0]
+def _get_single_value_from_list(data):
+    if isinstance(data, list) and len(data) == 1:
+        return data[0]
+    return data
+
+
+def _validate_source_and_embedding_tensors(embedding_source_tensor, embedding_tensor):
     if isinstance(embedding_source_tensor, str) and isinstance(embedding_tensor, list):
         raise ValueError(
-            "Multiple `embedding_tensor` were specifed. "
-            "While single `embedding_source_tensor` was given. "
+            "Multiple `embedding_tensor` were specified while a single `embedding_source_tensor` was given."
         )
-    elif (
+
+    if (
         isinstance(embedding_source_tensor, list)
         and len(embedding_source_tensor) > 1
         and isinstance(embedding_tensor, str)
     ):
         raise ValueError(
-            "Multiple `embedding_source_tensor` were specifed. "
-            "While single `embedding_tensor` was given. "
+            "Multiple `embedding_source_tensor` were specified while a single `embedding_tensor` was given."
+        )
+
+
+def _convert_to_embedder_list(embedding_function):
+    if isinstance(embedding_function, list):
+        return [DeepLakeEmbedder(embedding_function=fn) for fn in embedding_function]
+
+    valid_function_types = (
+        types.MethodType,
+        types.FunctionType,
+        types.LambdaType,
+        functools.partial,
+    )
+    if isinstance(embedding_function, valid_function_types):
+        return DeepLakeEmbedder(embedding_function=embedding_function)
+
+    if embedding_function is not None:
+        raise ValueError(
+            "Invalid `embedding_function` type. It should be either a function or a list of functions."
         )
+
+
+def parse_update_arguments(
+    dataset,
+    embedding_function=None,
+    initial_embedding_function=None,
+    embedding_source_tensor=None,
+    embedding_tensor=None,
+):
+    _validate_embedding_functions(embedding_function, initial_embedding_function)
+
+    embedding_tensor = get_embedding_tensor(
+        embedding_tensor, embedding_source_tensor, dataset
+    )
+    embedding_tensor = _get_single_value_from_list(embedding_tensor)
+
+    _validate_source_and_embedding_tensors(embedding_source_tensor, embedding_tensor)
+
+    embedding_function = _convert_to_embedder_list(embedding_function)
     final_embedding_function = embedding_function or initial_embedding_function

-    if isinstance(embedding_tensor, list) and callable(final_embedding_function):
+    if isinstance(embedding_tensor, list) and not isinstance(
+        final_embedding_function, list
+    ):
         final_embedding_function = [final_embedding_function] * len(embedding_tensor)

+    if isinstance(final_embedding_function, list):
+        final_embedding_function = [
+            fn.embed_documents for fn in final_embedding_function
+        ]
+    else:
+        final_embedding_function = final_embedding_function.embed_documents
+
     if isinstance(embedding_tensor, list) and isinstance(embedding_source_tensor, list):
         assert len(embedding_tensor) == len(embedding_source_tensor), (
             "The length of the `embedding_tensor` does not match the length of "
@@ -418,7 +475,12 @@ def parse_add_arguments(
             tensors,
             dataset,
         )
-        return (embedding_function, embedding_data, embedding_tensor, tensors)
+        return (
+            [fn.embed_documents for fn in embedding_function],
+            embedding_data,
+            embedding_tensor,
+            tensors,
+        )

     if initial_embedding_function:
         if not embedding_data:
@@ -435,7 +497,12 @@ def parse_add_arguments(
             tensors,
             dataset,
         )
-        return (initial_embedding_function, embedding_data, embedding_tensor, tensors)
+        return (
+            [fn.embed_documents for fn in initial_embedding_function],
+            embedding_data,
+            embedding_tensor,
+            tensors,
+        )

     if embedding_tensor:
         raise ValueError(
@@ -562,3 +629,11 @@ def is_embedding_tensor(tensor):
 def index_used(exec_option):
     """Check if the index is used for the exec_option"""
     return exec_option in ("tensor_db", "compute_engine")
+
+
+def create_embedding_function(embedding_function):
+    if embedding_function:
+        return DeepLakeEmbedder(
+            embedding_function=embedding_function,
+        )
+    return None
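Taken together, every entry point (the VectorStore constructor, add(), search(), and the update/add argument parsers) now funnels user-supplied callables through create_embedding_function, so downstream code sees a uniform wrapper rather than a raw callable. A short sketch of that normalization:

    from deeplake.core.vectorstore.vector_search import utils

    def my_embed(texts):
        return [[0.0, 0.0] for _ in texts]

    embedder = utils.create_embedding_function(my_embed)
    print(type(embedder).__name__)  # DeepLakeEmbedder
    print(embedder.embed_documents(["x"]))  # [[0.0, 0.0]]
    print(utils.create_embedding_function(None))  # None: absent functions stay absent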