In [None]:
%%capture
!pip install sentence-transformers

In [2]:
# Import
import os
import sys

# Change to the project root directory.
# The location can be overridden through the PROJECT_ROOT environment variable so
# the notebook is not tied to one machine; the original checkout path stays the default.
project_root = os.environ.get("PROJECT_ROOT", '/home/alaa/repos/seez-assignment')
os.chdir(project_root)
current_dir = os.getcwd()
# Put the project root on sys.path so the ``src`` package imports below resolve.
if current_dir not in sys.path:
    sys.path.insert(0, current_dir)

# Smoke test: verify that the project package is importable from the chosen root.
try:
    from src.utils.tools import read_dialogue
    print("Import successful!")
except Exception as exc:
    # Catch ``Exception`` instead of using a bare ``except`` so KeyboardInterrupt and
    # SystemExit still propagate, and show the reason so a failure can be diagnosed.
    print(f"Failed: {exc!r}")

import os

from src.api_key import OPENAI_API_KEY

# Export the key through the environment, which is where OpenAIEmbedding looks first
# (``os.getenv("OPENAI_API_KEY")``). This overwrites any value that was already set.
# NOTE(review): the key is imported from src/api_key.py — confirm that module is kept
# out of version control.
os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY

Import successful!


In [4]:
from abc import ABC, abstractmethod
from typing import List

from src.common.config import ConfigBase

class EmbeddingBase(ABC):
    """
    Common interface for text-embedding backends.

    Concrete subclasses implement ``embed_query`` and ``embed_documents``;
    calling an instance is shorthand for ``embed_documents``.
    """

    def __init__(self, config: ConfigBase):
        # Stored as-is; subclasses read their own settings from it.
        self.config = config

    @abstractmethod
    def embed_query(self, text: str) -> List[float]:
        """
        Turn one query string into its embedding vector.

        Args:
            text (str): The query to embed.

        Returns:
            List[float]: The embedding of ``text``.
        """

    @abstractmethod
    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        """
        Turn several documents into embedding vectors.

        Args:
            texts (List[str]): The documents to embed.

        Returns:
            List[List[float]]: The embedding vectors produced for ``texts``.
        """

    def __call__(self, texts: List[str]) -> List[List[float]]:
        """
        Function-style entry point; forwards to ``embed_documents``.

        Args:
            texts (List[str]): The documents to embed.

        Returns:
            List[List[float]]: Whatever ``embed_documents`` returns for ``texts``.
        """
        vectors = self.embed_documents(texts)
        return vectors

In [9]:
import os
import openai
from typing import List, Union
# from ..base_embedding import EmbeddingBase

from src.common.config import ConfigBase, load_config

class OpenAIEmbedding(EmbeddingBase):
    """
    Embedding backend that calls the OpenAI embeddings endpoint.

    The API key is taken from the ``OPENAI_API_KEY`` environment variable and,
    failing that, from ``config.openai_api_key``. The model name comes from
    ``config.parameters.embedding_model``.
    """

    def __init__(self, config: Union[ConfigBase, None] = None):
        """
        Create the OpenAI client.

        Args:
            config: Embedding settings. When omitted, ``load_config().embeddings``
                is loaded at construction time.

        Raises:
            ValueError: If no API key is found in the environment or the config.
        """
        # Resolve the default here instead of in the signature: a call in the
        # signature runs once, when the class is defined, and loads the config
        # even if no instance is ever created.
        if config is None:
            config = load_config().embeddings
        super().__init__(config)

        # getattr() keeps a config without the attribute on the ValueError path below.
        api_key = os.getenv("OPENAI_API_KEY") or getattr(self.config, "openai_api_key", None)
        if not api_key:
            raise ValueError("OpenAI API key must be provided either "
                             "in the OPENAI_API_KEY environment variable or in the config.")
        # Kept for code that still reads the module-level setting.
        openai.api_key = api_key
        self.model = self.config.parameters.embedding_model
        # Pass the key explicitly so a key that only exists in the config is
        # actually used by this client.
        self.client = openai.OpenAI(api_key=api_key)

    @staticmethod
    def _flatten(text: str) -> str:
        """Replace newlines with spaces before the text is sent to the API."""
        return text.replace("\n", " ")

    def embed_query(self, text: str) -> List[float]:
        """
        Embed a single query text using OpenAI's API.

        Args:
            text (str): The text to embed.

        Returns:
            List[float]: The embedding vector.
        """
        response = self.client.embeddings.create(input=[self._flatten(text)], model=self.model)
        return response.data[0].embedding

    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        """
        Embed a list of documents using OpenAI's API.

        Args:
            texts (List[str]): The list of texts to embed.

        Returns:
            List[List[float]]: One embedding vector per item of ``response.data``;
            an empty list when ``texts`` is empty.
        """
        if not texts:
            # Nothing to embed: skip the request instead of sending an empty input.
            return []
        cleaned = [self._flatten(text) for text in texts]
        response = self.client.embeddings.create(input=cleaned, model=self.model)
        return [item.embedding for item in response.data]

    def __call__(self, texts: Union[str, List[str]]) -> List[List[float]]:
        """
        Make the class callable. This allows the class to be used as a function.

        Args:
            texts (Union[str, List[str]]): The text or list of texts to embed.

        Returns:
            List[List[float]]: A list of embedding vectors; a single string
            yields a one-element list.
        """
        if isinstance(texts, str):
            return [self.embed_query(texts)]
        return self.embed_documents(texts)


[2024-11-19 15:14:44,673: INFO: config: Config loaded successfully: config/config.yaml]


In [10]:
from pydantic import BaseModel, Field
from typing import Dict, Any, List

class Document(BaseModel):
    """A piece of text together with free-form metadata."""
    # NOTE(review): a later cell imports langchain's ``Document`` under the same name,
    # which replaces this class in the notebook namespace from that point on.
    page_content: str
    # Defaults to a fresh empty dict for every instance.
    metadata: Dict[str, Any] = Field(default_factory=dict)

class SearchEngineResponse(BaseModel):
    """Result envelope returned by the search engines' ``search`` methods."""
    # FAISS_Search sets this to False when embeddings could not be loaded or the
    # search raised, and to True otherwise (including when nothing matched).
    status: bool
    # The hits; the element type is deliberately left open.
    response: List
    # Presumably meant to become List[SearchResult]; no SearchResult type is
    # defined in this notebook — TODO confirm.

    class Config:
        # Let fields hold values of types pydantic has no built-in validator for.
        arbitrary_types_allowed = True

In [11]:
import os
from typing import Optional
from abc import ABC, abstractmethod

from src.common.logger import logger
from src.common.config import load_config, ConfigBase
# from src.infrastructure.embeddings.base_embedding import EmbeddingBase
# from src.multiAgent.infrastructure.embeddings.openai import OpenAIEmbedding
from langchain_openai import OpenAIEmbeddings

# from .types import SearchEngineResponse

class SimilaritySearchEngine(ABC):
    """Contract shared by vector-store search engines: load, embed, save, search."""

    def __init__(
        self,
        config: Optional[ConfigBase] = None,
        encoder: Optional[EmbeddingBase] = None,
        **kwargs
    ) -> None:
        """
        Store the configuration, the logger and the encoder.

        Args:
            config: Engine settings; a falsy value selects
                ``load_config().similarity_search``.
            encoder: Embedding backend; ``None`` selects ``OpenAIEmbeddings()``.
            **kwargs: Accepted but not used here.
        """
        self.config = config if config else load_config().similarity_search
        self.logger = logger
        # Only build the default encoder when the caller did not supply one.
        self.encoder = OpenAIEmbeddings() if encoder is None else encoder

        self.logger.info("Initialized SearchEngineBase!")

    @abstractmethod
    def load_embeddings(self) -> bool:
        """Load previously stored embeddings; the bool reports the outcome."""

    @abstractmethod
    def embed_data(self) -> bool:
        """Embed the engine's data; the bool reports the outcome."""

    @abstractmethod
    def save_embeddings(self) -> None:
        """Persist the current embeddings."""

    @abstractmethod
    def search(self, query: str, **kwargs) -> SearchEngineResponse:
        """Find items similar to ``query`` and wrap them in a SearchEngineResponse."""

In [62]:
import re
import os
import warnings
from typing import Optional

from langchain_openai import OpenAIEmbeddings
from langchain.embeddings.huggingface import HuggingFaceEmbeddings
from langchain.docstore.document import Document
from langchain_community.vectorstores.faiss import FAISS

from src.common.logger import logger
from src.common.config import ConfigBase

from src.utils.tools import read_dialogue, read_user_data, read_jsonl, read_json, get_conversation_by_id

# from src.infrastructure.embeddings.base_embedding import EmbeddingBase
# from src.infrastructure.search_engine.base_engines import SimilaritySearchEngine
# from ..types import SearchEngineResponse

class FAISS_Search(SimilaritySearchEngine):
    """
    FAISS-backed similarity search over the conversation dialogues.

    The index is loaded from ``config.embeddings.vectordb_path`` when that path
    exists; otherwise it is built from the file at ``config.data.conversations``
    and saved to the same path.
    """

    def __init__(
        self,
        config: Optional[ConfigBase] = None,
        encoder: Optional[EmbeddingBase] = None,
        **kwargs
    ) -> None:
        """
        Args:
            config: Engine settings, forwarded to the base class.
            encoder: Embedding backend, forwarded to the base class.
            **kwargs: Forwarded to the base class.
        """
        super().__init__(config, encoder, **kwargs)

        self.logger = logger
        self.data = None
        self.vectorstore = None
        # Flipped to True once an index has been loaded or built.
        self.embeddings_loaded = False

    def load_embeddings(self) -> bool:
        """
        Load the saved index, or build it when none exists.

        Returns:
            bool: True when an index is available afterwards, False when
            building it failed.
        """
        # Check, load from and report the same location that save_embeddings()
        # writes to.
        # NOTE(review): the original loaded from ``config.paths.vectordb`` while
        # checking and saving ``config.embeddings.vectordb_path`` — confirm which
        # key the config file actually defines.
        index_path = self.config.embeddings.vectordb_path
        if os.path.exists(index_path):
            # SECURITY: allow_dangerous_deserialization=True makes load_local
            # unpickle the stored index; only point this at files this project wrote.
            self.vectorstore = FAISS.load_local(
                index_path,
                self.encoder,
                allow_dangerous_deserialization=True)
            self.embeddings_loaded = True
            return True

        self.logger.error(f"Embeddings not found: {index_path}")
        return self.embed_data()

    def _read_data(self):
        """
        Read the conversations file and split it into one Document per dialogue.

        Returns:
            list: Documents whose ``page_content`` is the stripped dialogue text
            and whose metadata holds the ``dialogue_number`` captured by the regex.
        """
        path = self.config.data.conversations
        docs = read_dialogue(path)

        # A dialogue is a number, a blank line, then one or more
        # "User: ... / Agent: ..." turns separated by blank lines.
        pattern = r"(\d+)\n\n((?:User:.*?\n\nAgent:.*?(?:\n\n|$))+)"
        matches = re.findall(pattern, docs, re.DOTALL)

        return [
            Document(
                page_content=dialogue_content.strip(),
                metadata={"dialogue_number": dialogue_number},
            )
            for dialogue_number, dialogue_content in matches
        ]

    def embed_data(self) -> bool:
        """
        Build the FAISS index from the conversations file and save it.

        Returns:
            bool: True on success; False when any step raised (the error is logged).
        """
        try:
            self.logger.info("Reading the data")
            docs = self._read_data()
            self.logger.info("Embedding the data")
            self.vectorstore = FAISS.from_documents(docs, self.encoder)
            self.embeddings_loaded = True
            self.logger.info("Saving the embeddings")
            self.save_embeddings()
            return True

        except Exception as e:
            self.logger.error(f"Failed to embed data: {e}")
            return False

    def save_embeddings(self) -> None:
        """Write the current index to ``config.embeddings.vectordb_path``, if there is one."""
        if self.vectorstore:
            self.vectorstore.save_local(self.config.embeddings.vectordb_path)
            self.logger.info("Embeddings saved locally.")
        else:
            self.logger.error("No vectorstore to save embeddings from.")

    def search(
        self,
        query: str,
        top_k: int = 5,
        filter_neg: bool = True,
    ) -> SearchEngineResponse:
        """
        Return the dialogues most similar to ``query``.

        Args:
            query: Search text; surrounding whitespace is stripped.
            top_k: Number of results requested from the vector store.
            filter_neg: When True, drop hits whose relevance score is negative.

        Returns:
            SearchEngineResponse: ``status`` is False when the embeddings could
            not be loaded or the search raised. Otherwise ``status`` is True and
            ``response`` holds the hits ordered by descending score (possibly empty).
        """
        if not self.embeddings_loaded and not self.load_embeddings():
            return SearchEngineResponse(status=False, response=[])

        query = query.strip()
        self.logger.info(f"Searching for: {query[:100]} ...")

        # Presumably silences the vector store's relevance-score warnings — TODO confirm.
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            try:
                hits = self.vectorstore.similarity_search_with_relevance_scores(query, k=top_k)
                # Each hit is indexed as hit[1] for its score; best match first.
                hits = sorted(hits, key=lambda hit: hit[1], reverse=True)

                if filter_neg:
                    hits = [hit for hit in hits if hit[1] >= 0.0]

                # Covers both "the store returned nothing" and "everything was
                # filtered out"; neither is an error.
                if not hits:
                    self.logger.info("No similar items found in the item list for item `{}`.".format(query))

                return SearchEngineResponse(status=True, response=hits)

            except Exception as e:
                self.logger.error(f"Similarity search error: \n{e}")
                return SearchEngineResponse(status=False, response=[])

In [None]:
# Encoder built from the "all-MiniLM-L6-v2" model.
# NOTE(review): ``embeddings`` is not passed to FAISS_Search in the cells shown here,
# so the engine falls back to its default OpenAIEmbeddings encoder.
embeddings = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")

In [None]:
# Parse the conversations file into one Document per dialogue.
# NOTE(review): ``ss`` is only created in a cell further down; on a fresh kernel
# that cell has to run before this one.
data = ss._read_data()

In [60]:
from src.common.config import load_config

configs = load_config()

# Pass the object that was just loaded: the name ``config`` used previously is not
# defined by any cell in this notebook and only resolved through leftover kernel state.
# NOTE(review): the base class defaults to ``load_config().similarity_search`` —
# confirm whether the root config or that section is the intended argument.
ss = FAISS_Search(configs)

[2024-11-19 20:23:36,617: INFO: config: Config loaded successfully: config/config.yaml]
[2024-11-19 20:23:36,946: INFO: 2264863557: Initialized SearchEngineBase!]


In [61]:
# Build the FAISS index from the conversations file and save it to disk.
# Returns True on success and False on failure (the exception is logged, not raised).
ss.embed_data()

[2024-11-19 20:24:00,605: INFO: _client: HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"]
[2024-11-19 20:24:14,509: INFO: _client: HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"]
[2024-11-19 20:24:29,902: INFO: _client: HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"]
[2024-11-19 20:24:42,084: INFO: _client: HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"]
[2024-11-19 20:24:57,184: INFO: _client: HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"]
[2024-11-19 20:25:07,245: INFO: _client: HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 429 Too Many Requests"]
[2024-11-19 20:25:07,248: INFO: _base_client: Retrying request to /embeddings in 16.201000 seconds]



KeyboardInterrupt

