From 4475b300ed22d0f5eac564caa56b00d51f42355b Mon Sep 17 00:00:00 2001 From: Vits Date: Tue, 9 Jul 2024 22:43:52 +0200 Subject: [PATCH 1/6] Introducing sync setting and management to sync data between remote and local layers --- semantic_router/index/base.py | 16 ++++++ semantic_router/index/pinecone.py | 82 ++++++++++++++++++++++++++++++- semantic_router/layer.py | 1 + 3 files changed, 98 insertions(+), 1 deletion(-) diff --git a/semantic_router/index/base.py b/semantic_router/index/base.py index e6638fb1..6d0969fc 100644 --- a/semantic_router/index/base.py +++ b/semantic_router/index/base.py @@ -18,6 +18,7 @@ class BaseIndex(BaseModel): utterances: Optional[np.ndarray] = None dimensions: Union[int, None] = None type: str = "base" + sync: str = "merge-force-local" def add( self, embeddings: List[List[float]], routes: List[str], utterances: List[Any] @@ -73,6 +74,21 @@ def delete_index(self): This method should be implemented by subclasses. """ raise NotImplementedError("This method should be implemented by subclasses.") + + def _sync_index(self, local_routes: dict): + """ + Synchronize the local index with the remote index based on the specified mode. + Modes: + - "error": Raise an error if local and remote are not synchronized. + - "remote": Take remote as the source of truth and update local to align. + - "local": Take local as the source of truth and update remote to align. + - "merge-force-remote": Merge both local and remote taking only remote routes utterances when a route with same route name is present both locally and remotely. + - "merge-force-local": Merge both local and remote taking only local routes utterances when a route with same route name is present both locally and remotely. + - "merge": Merge both local and remote, merging also local and remote utterances when a route with same route name is present both locally and remotely. + + This method should be implemented by subclasses. + """ + raise NotImplementedError("This method should be implemented by subclasses.") class Config: arbitrary_types_allowed = True diff --git a/semantic_router/index/pinecone.py b/semantic_router/index/pinecone.py index 7d3828f4..48c186ab 100644 --- a/semantic_router/index/pinecone.py +++ b/semantic_router/index/pinecone.py @@ -65,6 +65,7 @@ def __init__( host: str = "", namespace: Optional[str] = "", base_url: Optional[str] = "https://api.pinecone.io", + sync: str = "merge-force-local", ): super().__init__() self.index_name = index_name @@ -77,6 +78,7 @@ def __init__( self.type = "pinecone" self.api_key = api_key or os.getenv("PINECONE_API_KEY") self.base_url = base_url + self.sync = sync if self.api_key is None: raise ValueError("Pinecone API key is required.") @@ -195,6 +197,57 @@ async def _init_async_index(self, force_create: bool = False): logger.warning("Index could not be initialized.") self.host = index_stats["host"] if index_stats else None + def _sync_index(self, local_routes: dict): + remote_routes = self.get_routes() + remote_dict = {route: set() for route, _ in remote_routes} + for route, utterance in remote_routes: + remote_dict[route].add(utterance) + + local_dict = {route: set() for route in local_routes['routes']} + for route, utterance in zip(local_routes['routes'], local_routes['utterances']): + local_dict[route].add(utterance) + + all_routes = set(remote_dict.keys()).union(local_dict.keys()) + + routes_to_add = [] + routes_to_delete = [] + + for route in all_routes: + local_utterances = local_dict.get(route, set()) + remote_utterances = remote_dict.get(route, set()) + + if self.sync == "error": + if local_utterances != remote_utterances: + raise ValueError(f"Synchronization error: Differences found in route '{route}'") + utterances_to_include = set() + elif self.sync == "remote": + utterances_to_include = set() + elif self.sync == "local": + utterances_to_include = local_utterances - remote_utterances + routes_to_delete.extend([(route, utterance) for utterance in remote_utterances if utterance not in local_utterances]) + elif self.sync == "merge-force-remote": + if route in local_dict and route not in remote_dict: + utterances_to_include = local_utterances + else: + utterances_to_include = set() + elif self.sync == "merge-force-local": + if route in local_dict: + utterances_to_include = local_utterances - remote_utterances + routes_to_delete.extend([(route, utterance) for utterance in remote_utterances if utterance not in local_utterances]) + else: + utterances_to_include = set() + elif self.sync == "merge": + utterances_to_include = local_utterances - remote_utterances + else: + raise ValueError("Invalid sync mode specified") + + for utterance in utterances_to_include: + indices = [i for i, x in enumerate(local_routes['utterances']) if x == utterance and local_routes['routes'][i] == route] + routes_to_add.extend([(local_routes['embeddings'][idx], route, utterance) for idx in indices]) + + return routes_to_add, routes_to_delete + + def _batch_upsert(self, batch: List[Dict]): """Helper method for upserting a single batch of records.""" if self.index is not None: @@ -208,15 +261,33 @@ def add( routes: List[str], utterances: List[str], batch_size: int = 100, + sync: bool = False, ): """Add vectors to Pinecone in batches.""" if self.index is None: self.dimensions = self.dimensions or len(embeddings[0]) self.index = self._init_index(force_create=True) + if sync: + local_routes = {"routes": routes, "utterances": utterances, "embeddings": embeddings} + data_to_upsert, data_to_delete = self._sync_index(local_routes=local_routes) + + routes_to_delete = {} + for route, utterance in data_to_delete: + routes_to_delete.setdefault(route, []).append(utterance) + + for route, utterances in routes_to_delete.items(): + remote_routes = self._get_routes_with_ids(route_name=route) + ids_to_delete = [r["id"] for r in remote_routes if (r["route"], r["utterance"]) in zip([route]*len(utterances), utterances)] + if ids_to_delete: + self.index.delete(ids=ids_to_delete) + + else: + data_to_upsert = zip(embeddings, routes, utterances) + vectors_to_upsert = [ PineconeRecord(values=vector, route=route, utterance=utterance).to_dict() - for vector, route, utterance in zip(embeddings, routes, utterances) + for vector, route, utterance in data_to_upsert ] for i in range(0, len(vectors_to_upsert), batch_size): @@ -227,6 +298,15 @@ def _get_route_ids(self, route_name: str): clean_route = clean_route_name(route_name) ids, _ = self._get_all(prefix=f"{clean_route}#") return ids + + def _get_routes_with_ids(self, route_name: str): + clean_route = clean_route_name(route_name) + ids, _ = self._get_all(prefix=f"{clean_route}#") + route_tuples = [] + for id in ids: + res_meta = self.index.fetch(ids=[id], namespace=self.namespace) + route_tuples.extend([{"id": id, "route": x["metadata"]["sr_route"], "utterance": x["metadata"]["sr_utterance"]} for x in res_meta["vectors"].values()]) + return route_tuples def _get_all(self, prefix: Optional[str] = None, include_metadata: bool = False): """ diff --git a/semantic_router/layer.py b/semantic_router/layer.py index c87c7935..3ac1596c 100644 --- a/semantic_router/layer.py +++ b/semantic_router/layer.py @@ -470,6 +470,7 @@ def _add_routes(self, routes: List[Route]): embeddings=embedded_utterances, routes=route_names, utterances=all_utterances, + sync=True, ) def _encode(self, text: str) -> Any: From 5dbff6c15c3a68b7336dd876b63ebc32ff0e9c4c Mon Sep 17 00:00:00 2001 From: Vits Date: Tue, 9 Jul 2024 23:04:52 +0200 Subject: [PATCH 2/6] Linting and formatting --- semantic_router/index/base.py | 10 ++-- semantic_router/index/local.py | 6 ++- semantic_router/index/pinecone.py | 82 ++++++++++++++++++++++++------- 3 files changed, 75 insertions(+), 23 deletions(-) diff --git a/semantic_router/index/base.py b/semantic_router/index/base.py index 6d0969fc..e53ca44f 100644 --- a/semantic_router/index/base.py +++ b/semantic_router/index/base.py @@ -21,7 +21,11 @@ class BaseIndex(BaseModel): sync: str = "merge-force-local" def add( - self, embeddings: List[List[float]], routes: List[str], utterances: List[Any] + self, + embeddings: List[List[float]], + routes: List[str], + utterances: List[Any], + sync: bool = False, ): """ Add embeddings to the index. @@ -74,7 +78,7 @@ def delete_index(self): This method should be implemented by subclasses. """ raise NotImplementedError("This method should be implemented by subclasses.") - + def _sync_index(self, local_routes: dict): """ Synchronize the local index with the remote index based on the specified mode. @@ -85,7 +89,7 @@ def _sync_index(self, local_routes: dict): - "merge-force-remote": Merge both local and remote taking only remote routes utterances when a route with same route name is present both locally and remotely. - "merge-force-local": Merge both local and remote taking only local routes utterances when a route with same route name is present both locally and remotely. - "merge": Merge both local and remote, merging also local and remote utterances when a route with same route name is present both locally and remotely. - + This method should be implemented by subclasses. """ raise NotImplementedError("This method should be implemented by subclasses.") diff --git a/semantic_router/index/local.py b/semantic_router/index/local.py index df9e02c1..b1108873 100644 --- a/semantic_router/index/local.py +++ b/semantic_router/index/local.py @@ -21,7 +21,11 @@ class Config: arbitrary_types_allowed = True def add( - self, embeddings: List[List[float]], routes: List[str], utterances: List[str] + self, + embeddings: List[List[float]], + routes: List[str], + utterances: List[str], + sync: bool = False, ): embeds = np.array(embeddings) # type: ignore routes_arr = np.array(routes) diff --git a/semantic_router/index/pinecone.py b/semantic_router/index/pinecone.py index 48c186ab..dc86004a 100644 --- a/semantic_router/index/pinecone.py +++ b/semantic_router/index/pinecone.py @@ -199,12 +199,12 @@ async def _init_async_index(self, force_create: bool = False): def _sync_index(self, local_routes: dict): remote_routes = self.get_routes() - remote_dict = {route: set() for route, _ in remote_routes} + remote_dict: dict = {route: set() for route, _ in remote_routes} for route, utterance in remote_routes: remote_dict[route].add(utterance) - local_dict = {route: set() for route in local_routes['routes']} - for route, utterance in zip(local_routes['routes'], local_routes['utterances']): + local_dict: dict = {route: set() for route in local_routes["routes"]} + for route, utterance in zip(local_routes["routes"], local_routes["utterances"]): local_dict[route].add(utterance) all_routes = set(remote_dict.keys()).union(local_dict.keys()) @@ -218,13 +218,21 @@ def _sync_index(self, local_routes: dict): if self.sync == "error": if local_utterances != remote_utterances: - raise ValueError(f"Synchronization error: Differences found in route '{route}'") - utterances_to_include = set() + raise ValueError( + f"Synchronization error: Differences found in route '{route}'" + ) + utterances_to_include: set = set() elif self.sync == "remote": utterances_to_include = set() elif self.sync == "local": utterances_to_include = local_utterances - remote_utterances - routes_to_delete.extend([(route, utterance) for utterance in remote_utterances if utterance not in local_utterances]) + routes_to_delete.extend( + [ + (route, utterance) + for utterance in remote_utterances + if utterance not in local_utterances + ] + ) elif self.sync == "merge-force-remote": if route in local_dict and route not in remote_dict: utterances_to_include = local_utterances @@ -233,7 +241,13 @@ def _sync_index(self, local_routes: dict): elif self.sync == "merge-force-local": if route in local_dict: utterances_to_include = local_utterances - remote_utterances - routes_to_delete.extend([(route, utterance) for utterance in remote_utterances if utterance not in local_utterances]) + routes_to_delete.extend( + [ + (route, utterance) + for utterance in remote_utterances + if utterance not in local_utterances + ] + ) else: utterances_to_include = set() elif self.sync == "merge": @@ -242,12 +256,20 @@ def _sync_index(self, local_routes: dict): raise ValueError("Invalid sync mode specified") for utterance in utterances_to_include: - indices = [i for i, x in enumerate(local_routes['utterances']) if x == utterance and local_routes['routes'][i] == route] - routes_to_add.extend([(local_routes['embeddings'][idx], route, utterance) for idx in indices]) + indices = [ + i + for i, x in enumerate(local_routes["utterances"]) + if x == utterance and local_routes["routes"][i] == route + ] + routes_to_add.extend( + [ + (local_routes["embeddings"][idx], route, utterance) + for idx in indices + ] + ) return routes_to_add, routes_to_delete - def _batch_upsert(self, batch: List[Dict]): """Helper method for upserting a single batch of records.""" if self.index is not None: @@ -260,8 +282,8 @@ def add( embeddings: List[List[float]], routes: List[str], utterances: List[str], - batch_size: int = 100, sync: bool = False, + batch_size: int = 100, ): """Add vectors to Pinecone in batches.""" if self.index is None: @@ -269,19 +291,28 @@ def add( self.index = self._init_index(force_create=True) if sync: - local_routes = {"routes": routes, "utterances": utterances, "embeddings": embeddings} + local_routes = { + "routes": routes, + "utterances": utterances, + "embeddings": embeddings, + } data_to_upsert, data_to_delete = self._sync_index(local_routes=local_routes) - routes_to_delete = {} + routes_to_delete: dict = {} for route, utterance in data_to_delete: routes_to_delete.setdefault(route, []).append(utterance) for route, utterances in routes_to_delete.items(): remote_routes = self._get_routes_with_ids(route_name=route) - ids_to_delete = [r["id"] for r in remote_routes if (r["route"], r["utterance"]) in zip([route]*len(utterances), utterances)] - if ids_to_delete: + ids_to_delete = [ + r["id"] + for r in remote_routes + if (r["route"], r["utterance"]) + in zip([route] * len(utterances), utterances) + ] + if ids_to_delete and self.index: self.index.delete(ids=ids_to_delete) - + else: data_to_upsert = zip(embeddings, routes, utterances) @@ -298,14 +329,27 @@ def _get_route_ids(self, route_name: str): clean_route = clean_route_name(route_name) ids, _ = self._get_all(prefix=f"{clean_route}#") return ids - + def _get_routes_with_ids(self, route_name: str): clean_route = clean_route_name(route_name) ids, _ = self._get_all(prefix=f"{clean_route}#") route_tuples = [] for id in ids: - res_meta = self.index.fetch(ids=[id], namespace=self.namespace) - route_tuples.extend([{"id": id, "route": x["metadata"]["sr_route"], "utterance": x["metadata"]["sr_utterance"]} for x in res_meta["vectors"].values()]) + res_meta = ( + self.index.fetch(ids=[id], namespace=self.namespace) + if self.index + else {} + ) + route_tuples.extend( + [ + { + "id": id, + "route": x["metadata"]["sr_route"], + "utterance": x["metadata"]["sr_utterance"], + } + for x in res_meta["vectors"].values() + ] + ) return route_tuples def _get_all(self, prefix: Optional[str] = None, include_metadata: bool = False): From 21c049b6310544e2d52fc13dd17f2064e22a1be1 Mon Sep 17 00:00:00 2001 From: James Briggs Date: Thu, 11 Jul 2024 16:19:05 +0800 Subject: [PATCH 3/6] fix: add sync param --- pyproject.toml | 2 +- semantic_router/index/qdrant.py | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 5b05fbc9..d7fba4d0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "semantic-router" -version = "0.0.50" +version = "0.0.51" description = "Super fast semantic router for AI decision making" authors = [ "James Briggs ", diff --git a/semantic_router/index/qdrant.py b/semantic_router/index/qdrant.py index 165a4c93..f3a6bbf2 100644 --- a/semantic_router/index/qdrant.py +++ b/semantic_router/index/qdrant.py @@ -165,8 +165,11 @@ def add( embeddings: List[List[float]], routes: List[str], utterances: List[str], + sync: bool = False, batch_size: int = DEFAULT_UPLOAD_BATCH_SIZE, ): + if sync: + raise NotImplementedError("Sync add is not implemented for QdrantIndex") self.dimensions = self.dimensions or len(embeddings[0]) self._init_collection() From caefb77cc8a0647eaa5a5778b08d96d494162543 Mon Sep 17 00:00:00 2001 From: James Briggs Date: Thu, 11 Jul 2024 16:34:48 +0800 Subject: [PATCH 4/6] fix: modify sync param for qdrant --- semantic_router/index/qdrant.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/semantic_router/index/qdrant.py b/semantic_router/index/qdrant.py index f3a6bbf2..4bf8d893 100644 --- a/semantic_router/index/qdrant.py +++ b/semantic_router/index/qdrant.py @@ -169,7 +169,7 @@ def add( batch_size: int = DEFAULT_UPLOAD_BATCH_SIZE, ): if sync: - raise NotImplementedError("Sync add is not implemented for QdrantIndex") + logger.warning("Sync add is not implemented for QdrantIndex") self.dimensions = self.dimensions or len(embeddings[0]) self._init_collection() From 4ffe29c649b98e3f369ac6c37e75fe53ff95618c Mon Sep 17 00:00:00 2001 From: Vits Date: Thu, 11 Jul 2024 23:00:30 +0200 Subject: [PATCH 5/6] Fixed pinecone.py _get_all method unordered return for ids and metadata and used the fixed version inside _get_routes_with_ids --- semantic_router/index/pinecone.py | 33 ++++++++++++++----------------- 1 file changed, 15 insertions(+), 18 deletions(-) diff --git a/semantic_router/index/pinecone.py b/semantic_router/index/pinecone.py index dc86004a..27038824 100644 --- a/semantic_router/index/pinecone.py +++ b/semantic_router/index/pinecone.py @@ -332,23 +332,15 @@ def _get_route_ids(self, route_name: str): def _get_routes_with_ids(self, route_name: str): clean_route = clean_route_name(route_name) - ids, _ = self._get_all(prefix=f"{clean_route}#") + ids, metadata = self._get_all(prefix=f"{clean_route}#", include_metadata=True) route_tuples = [] - for id in ids: - res_meta = ( - self.index.fetch(ids=[id], namespace=self.namespace) - if self.index - else {} - ) - route_tuples.extend( - [ - { - "id": id, - "route": x["metadata"]["sr_route"], - "utterance": x["metadata"]["sr_utterance"], - } - for x in res_meta["vectors"].values() - ] + for id, data in zip(ids, metadata): + route_tuples.append( + { + "id": id, + "route": data["sr_route"], + "utterance": data["sr_utterance"], + } ) return route_tuples @@ -391,9 +383,14 @@ def _get_all(self, prefix: Optional[str] = None, include_metadata: bool = False) # if we need metadata, we fetch it if include_metadata: - res_meta = self.index.fetch(ids=vector_ids, namespace=self.namespace) + for id in vector_ids: + res_meta = ( + self.index.fetch(ids=[id], namespace=self.namespace) + if self.index + else {} + ) + metadata.extend([x["metadata"] for x in res_meta["vectors"].values()]) # extract metadata only - metadata.extend([x["metadata"] for x in res_meta["vectors"].values()]) # Check if there's a next page token; if not, break the loop next_page_token = response_data.get("pagination", {}).get("next") From 54df32bb81096a489314e17c9294e05d2c523fc4 Mon Sep 17 00:00:00 2001 From: Vits Date: Thu, 11 Jul 2024 23:59:41 +0200 Subject: [PATCH 6/6] Added _add_and_sync to replace add for index syncing when adding routes at startup --- semantic_router/index/base.py | 15 ++++++++-- semantic_router/index/local.py | 12 +++++++- semantic_router/index/pinecone.py | 47 +++++++++++++++++++++++-------- semantic_router/index/qdrant.py | 14 +++++++-- semantic_router/layer.py | 3 +- 5 files changed, 71 insertions(+), 20 deletions(-) diff --git a/semantic_router/index/base.py b/semantic_router/index/base.py index e53ca44f..9e226da7 100644 --- a/semantic_router/index/base.py +++ b/semantic_router/index/base.py @@ -18,14 +18,13 @@ class BaseIndex(BaseModel): utterances: Optional[np.ndarray] = None dimensions: Union[int, None] = None type: str = "base" - sync: str = "merge-force-local" + sync: Union[str, None] = None def add( self, embeddings: List[List[float]], routes: List[str], utterances: List[Any], - sync: bool = False, ): """ Add embeddings to the index. @@ -33,6 +32,18 @@ def add( """ raise NotImplementedError("This method should be implemented by subclasses.") + def _add_and_sync( + self, + embeddings: List[List[float]], + routes: List[str], + utterances: List[Any], + ): + """ + Add embeddings to the index and manage index syncing if necessary. + This method should be implemented by subclasses. + """ + raise NotImplementedError("This method should be implemented by subclasses.") + def delete(self, route_name: str): """ Deletes route by route name. diff --git a/semantic_router/index/local.py b/semantic_router/index/local.py index b1108873..7e32f3a8 100644 --- a/semantic_router/index/local.py +++ b/semantic_router/index/local.py @@ -4,6 +4,7 @@ from semantic_router.index.base import BaseIndex from semantic_router.linear import similarity_matrix, top_scores +from semantic_router.utils.logger import logger class LocalIndex(BaseIndex): @@ -25,7 +26,6 @@ def add( embeddings: List[List[float]], routes: List[str], utterances: List[str], - sync: bool = False, ): embeds = np.array(embeddings) # type: ignore routes_arr = np.array(routes) @@ -42,6 +42,16 @@ def add( self.routes = np.concatenate([self.routes, routes_arr]) self.utterances = np.concatenate([self.utterances, utterances_arr]) + def _add_and_sync( + self, + embeddings: List[List[float]], + routes: List[str], + utterances: List[str], + ): + if self.sync is not None: + logger.warning("Sync add is not implemented for LocalIndex.") + self.add(embeddings, routes, utterances) + def get_routes(self) -> List[Tuple]: """ Gets a list of route and utterance objects currently stored in the index. diff --git a/semantic_router/index/pinecone.py b/semantic_router/index/pinecone.py index 27038824..a9c93ccd 100644 --- a/semantic_router/index/pinecone.py +++ b/semantic_router/index/pinecone.py @@ -65,7 +65,7 @@ def __init__( host: str = "", namespace: Optional[str] = "", base_url: Optional[str] = "https://api.pinecone.io", - sync: str = "merge-force-local", + sync: str = "local", ): super().__init__() self.index_name = index_name @@ -282,7 +282,6 @@ def add( embeddings: List[List[float]], routes: List[str], utterances: List[str], - sync: bool = False, batch_size: int = 100, ): """Add vectors to Pinecone in batches.""" @@ -290,14 +289,34 @@ def add( self.dimensions = self.dimensions or len(embeddings[0]) self.index = self._init_index(force_create=True) - if sync: - local_routes = { - "routes": routes, - "utterances": utterances, - "embeddings": embeddings, - } - data_to_upsert, data_to_delete = self._sync_index(local_routes=local_routes) + vectors_to_upsert = [ + PineconeRecord(values=vector, route=route, utterance=utterance).to_dict() + for vector, route, utterance in zip(embeddings, routes, utterances) + ] + + for i in range(0, len(vectors_to_upsert), batch_size): + batch = vectors_to_upsert[i : i + batch_size] + self._batch_upsert(batch) + + def _add_and_sync( + self, + embeddings: List[List[float]], + routes: List[str], + utterances: List[str], + batch_size: int = 100, + ): + """Add vectors to Pinecone in batches.""" + if self.index is None: + self.dimensions = self.dimensions or len(embeddings[0]) + self.index = self._init_index(force_create=True) + local_routes = { + "routes": routes, + "utterances": utterances, + "embeddings": embeddings, + } + if self.sync is not None: + data_to_upsert, data_to_delete = self._sync_index(local_routes=local_routes) routes_to_delete: dict = {} for route, utterance in data_to_delete: routes_to_delete.setdefault(route, []).append(utterance) @@ -312,9 +331,11 @@ def add( ] if ids_to_delete and self.index: self.index.delete(ids=ids_to_delete) - else: - data_to_upsert = zip(embeddings, routes, utterances) + data_to_upsert = [ + (vector, route, utterance) + for vector, route, utterance in zip(embeddings, routes, utterances) + ] vectors_to_upsert = [ PineconeRecord(values=vector, route=route, utterance=utterance).to_dict() @@ -389,7 +410,9 @@ def _get_all(self, prefix: Optional[str] = None, include_metadata: bool = False) if self.index else {} ) - metadata.extend([x["metadata"] for x in res_meta["vectors"].values()]) + metadata.extend( + [x["metadata"] for x in res_meta["vectors"].values()] + ) # extract metadata only # Check if there's a next page token; if not, break the loop diff --git a/semantic_router/index/qdrant.py b/semantic_router/index/qdrant.py index 4bf8d893..0fff2314 100644 --- a/semantic_router/index/qdrant.py +++ b/semantic_router/index/qdrant.py @@ -160,16 +160,24 @@ def _init_collection(self) -> None: **self.config, ) - def add( + def _add_and_sync( self, embeddings: List[List[float]], routes: List[str], utterances: List[str], - sync: bool = False, batch_size: int = DEFAULT_UPLOAD_BATCH_SIZE, ): - if sync: + if self.sync is not None: logger.warning("Sync add is not implemented for QdrantIndex") + self.add(embeddings, routes, utterances, batch_size) + + def add( + self, + embeddings: List[List[float]], + routes: List[str], + utterances: List[str], + batch_size: int = DEFAULT_UPLOAD_BATCH_SIZE, + ): self.dimensions = self.dimensions or len(embeddings[0]) self._init_collection() diff --git a/semantic_router/layer.py b/semantic_router/layer.py index 3ac1596c..5c2d7228 100644 --- a/semantic_router/layer.py +++ b/semantic_router/layer.py @@ -466,11 +466,10 @@ def _add_routes(self, routes: List[Route]): # create route array route_names = [route.name for route in routes for _ in route.utterances] # add everything to the index - self.index.add( + self.index._add_and_sync( embeddings=embedded_utterances, routes=route_names, utterances=all_utterances, - sync=True, ) def _encode(self, text: str) -> Any: