In [30]:
import litellm
import numpy as np
from numpy.typing import NDArray
import  time
from dotenv import load_dotenv, find_dotenv

# from .rag_simulation.corpus_ingestion import BDDChunks
load_dotenv(find_dotenv())


class AugmentedRAG:
    """A class for performing a simple RAG process.

    This class utilizes a retrieval process to fetch relevant information from a
    database (or corpus) and then passes it to a generative model for further processing.

    """
    HF_TOKEN = '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'
    MISTRAL_API_KEY =  'dkMKu81kFgJeP7HmIqjztosQTxyiynW6'


    def __init__(
        self,
        generation_model: str,
        role_prompt: str,
        bdd_chunks: BDDChunks,
        max_tokens: int,
        temperature: int,
        top_n: int = 2,
    ) -> None:
        """
        Initializes the SimpleRAG class with the provided parameters.

        Args:
            generation_model (str): The model used for generating responses.
            role_prompt (str): The role of the model as specified by the prompt.
            bdd_chunks (Any): The database or chunks of information used in the retrieval process.
            max_tokens (int): Maximum number of tokens to generate.
            temperature (int): The temperature setting for the generative model.
            top_n (int, optional): The number of top documents to retrieve. Defaults to 2.
        """
        self.llm = generation_model
        self.bdd = bdd_chunks
        self.top_n = top_n
        self.role_prompt = role_prompt
        self.max_tokens = max_tokens
        self.temperature = temperature
        self.latency = 0.0
        self.input_tokens = 0
        self.output_tokens = 0
        self.dollor_cost = 0.0

    def get_cosim(self, a: NDArray[np.float32], b: NDArray[np.float32]) -> float:
        """
        Calculates the cosine similarity between two vectors.

        Args:
            a (NDArray[np.float32]): The first vector.
            b (NDArray[np.float32]): The second vector.

        Returns:
            float: The cosine similarity between the two vectors.
        """

        return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
    
    def get_top_similarity(
            self,
            embedding_query: NDArray[np.float32],
            embedding_chunks: NDArray[np.float32],
            corpus: list[str],
        ) -> list[str]:
            """
            Retrieves the top N most similar documents from the corpus based on the query's embedding.

            Args:
                embedding_query (NDArray[np.float32]): The embedding of the query.
                embedding_chunks (NDArray[np.float32]): A NumPy array of embeddings for the documents in the corpus.
                corpus (List[str]): A list of documents (strings) corresponding to the embeddings in `embedding_chunks`.
                top_n (int, optional): The number of top similar documents to retrieve. Defaults to 5.

            Returns:
                List[str]: A list of the most similar documents from the corpus, ordered by similarity to the query.
            """
            cos_dist_list = np.array(
                [
                    self.get_cosim(embedding_query, embed_doc)
                    for embed_doc in embedding_chunks
                ]
            )
            indices_of_max_values = np.argsort(cos_dist_list)[-self.top_n :][::-1]
            print(indices_of_max_values)
            return [corpus[i] for i in indices_of_max_values]


    def build_prompt(
        self, context: list[str], history: str, query: str
    ) -> list[dict[str, str]]:
        """
        Builds a prompt string for a conversational agent based on the given context and query.

        Args:
            context (str): The context information, typically extracted from books or other sources.
            query (str): The user's query or question.

        Returns:
            list[dict[str, str]]: The RAG prompt in the OpenAI format
        """
        context_joined = "\n".join(context)
        system_prompt = self.role_prompt
        history_prompt = f"""
        # Historique de conversation:
        {history}
        """
        context_prompt = f"""
        Tu disposes de la section "Contexte" pour t'aider à répondre aux questions.
        # Contexte: 
        {context_joined}
        """
        query_prompt = f"""
        # Question:
        {query}

        # Réponse:
        """
        return [
            {"role": "system", "content": system_prompt},
            {"role": "system", "content": history_prompt},
            {"role": "system", "content": context_prompt},
            {"role": "user", "content": query_prompt},
        ]



    def _generate(self, prompt_dict: list[dict[str, str]]) -> litellm.ModelResponse:


         

        response = litellm.completion(
            model=f"mistral/{self.llm}",
            messages=prompt_dict,
            max_tokens=self.max_tokens,
            temperature=self.temperature,
        )  # type: ignore

        return response



    def call_model(self, prompt_dict: list[dict[str, str]]) -> str:
        """
        Calls the LLM with the given prompt and returns the response.

        Args:
            prompt_dict (List[Dict[str, str]]): A list of dictionaries where each dictionary represents
                                                a message prompt with a string key and string value.

        Returns:
            str: The response generated by the LLM.
        """

        start_time = time.process_time()
        chat_response: str = self._generate(prompt_dict=prompt_dict)
        end_time = time.process_time()
        self.latency = end_time - start_time

        self.input_tokens = chat_response.usage.prompt_tokens
        self.output_tokens = chat_response.usage.completion_tokens
    

        dict_response = {
            "response": chat_response.choices[0].message.content,
            "latency": self.latency,
            "input_tokens": self.input_tokens,
            "output_tokens": self.output_tokens,
            "llm": self.llm,           
        }
        return dict_response
        # return str(chat_response.choices[0].message.content)


    def __call__(self, query: str, history: dict[str, str]) -> str:
        """
        Process a query and return a response based on the provided history and database.

        This method performs the following steps:
        1. Queries the ChromaDB instance to retrieve relevant documents based on the input query.
        2. Constructs a prompt using the retrieved documents, the provided query, and the history.
        3. Sends the prompt to the model for generating a response.

        Args:
            query (str): The user query to be processed.
            history (dict[str, str]): A dictionary containing the conversation history,
                where keys represent user inputs and values represent corresponding responses.

        Returns:
            str: The generated response from the model.
        """
        chunks = self.bdd.chroma_db.query(
            query_texts=[query],
            n_results=self.top_n,
        )
        chunks_list: list[str] = chunks["documents"][0]
        prompt_rag = self.build_prompt(
            context=chunks_list, history=str(history), query=query
        )
        response = self.call_model(prompt_dict=prompt_rag)
        return response


generation_model = "ministral-8b-latest"
role_prompt = "Tu es un assistant virtuel qui aide les utilisateurs à répondre à des questions."
bdd_chunks = BDDChunks(embedding_model="paraphrase-multilingual-MiniLM-L12-v2", path="./")
# bdd_chunks = BDDChunks(embedding_model="paraphrase-xlm-r-multilingual-v1", path="./")
max_tokens = 100
temperature = 0.5

# Initialize the SimpleRAG instance
simple_rag = AugmentedRAG(
    generation_model=generation_model,
    role_prompt=role_prompt,
    bdd_chunks=bdd_chunks,
    max_tokens=max_tokens,
    temperature=temperature,

)

# Define the conversation history
history = {
    "user": "Quelle est la capitale de la France ?",
    "bot": "La capitale de la France est Paris.",
}

# Define the user query
query = " meilleurs  restaurant lyon"
bdd_chunks._create_collection(path="./")

# Generate a response using the SimpleRAG instance
response = simple_rag(query=query, history=history)
print(response)


Batches: 100%|██████████| 1/1 [00:00<00:00, 20.68it/s]
[92m12:34:57 - LiteLLM:INFO[0m: utils.py:2802 - 
LiteLLM completion() model= ministral-8b-latest; provider = mistral
INFO:LiteLLM:
LiteLLM completion() model= ministral-8b-latest; provider = mistral
INFO:httpx:HTTP Request: POST https://api.mistral.ai/v1/chat/completions "HTTP/1.1 200 OK"
[92m12:34:58 - LiteLLM:INFO[0m: utils.py:949 - Wrapper: Completed Call, calling success_handler
INFO:LiteLLM:Wrapper: Completed Call, calling success_handler


{'response': "Pour trouver les meilleurs restaurants à Lyon, il est souvent utile de consulter des avis et des recommandations de critiques gastronomiques. Voici quelques suggestions basées sur les avis disponibles :\n\n1. **Brasserie Georges** : Bien que certains avis mentionnent qu'il ne s'agit pas d'un haut lieu de la gastronomie lyonnaise, la Brasserie Georges est appréciée pour son service correct, ses prix raisonnables, et ses plats traditionnels comme le pâté en", 'latency': 0.171875, 'input_tokens': 350, 'output_tokens': 100, 'llm': 'ministral-8b-latest'}


In [31]:
import pandas as pd
import tiktoken
from tqdm import tqdm
import os
import re
import uuid
import chromadb
from chromadb.utils.embedding_functions import SentenceTransformerEmbeddingFunction
from chromadb.config import Settings
import logging

# Initialize tiktoken encoding
enc = tiktoken.get_encoding("o200k_base")

class BDDChunks:
    """
    A class to process restaurant data and reviews from a SQLite database,
    transforming them into consolidated chunks with embeddings stored in ChromaDB.
    """

    def __init__(self, embedding_model: str, path: str):
        """
        Initialize the BDDChunks instance.

        Args:
            embedding_model (str): Name of the embedding model to use.
            path (str): Path to the dataset or collection.
        """
        self.path = path
        self.client = chromadb.PersistentClient(
            path="./ChromaDB11", settings=Settings(anonymized_telemetry=False)
        )
        self.embedding_name = embedding_model
        self.embeddings = SentenceTransformerEmbeddingFunction(
            model_name=embedding_model
        )
        self.chroma_db = None

        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)

    def get_db(self):
        """
        Provide a database session.

        Yields:
            db: Database session instance.
        """
        db = database.SessionLocal()
        try:
            yield db
        finally:
            db.close()

    def get_all_restaurants_names(self) -> list[str]:
        """
        Get all restaurant names from the SQLite database.

        Returns:
            list[str]: List of restaurant names.
        """
        try:
            with next(self.get_db()) as db:
                restaurants = db.query(models.DimRestaurant).all()
                return [r.nom for r in restaurants]
        except Exception as e:
            self.logger.error(f"An error occurred while fetching restaurant names: {e}")
            return []

    def _sanitize_collection_name(self, name: str) -> str:
        """
        Sanitize collection name to meet ChromaDB requirements:
        - 3-63 characters
        - Alphanumeric with hyphens and underscores
        - No consecutive periods
        """
        sanitized = re.sub(r'[^a-zA-Z0-9-_]', '-', name)
        sanitized = re.sub(r'^[^a-zA-Z0-9]+', '', sanitized)
        sanitized = re.sub(r'[^a-zA-Z0-9]+$', '', sanitized)
        sanitized = re.sub(r'\.{2,}', '.', sanitized)
        if len(sanitized) < 3:
            sanitized = sanitized + "000"[:3-len(sanitized)]
        if len(sanitized) > 63:
            sanitized = sanitized[:63]
        return sanitized

    def _create_collection(self, path: str) -> None:
        """
        Create a new ChromaDB collection to store embeddings.

        Args:
            path (str): Name of the collection to create in ChromaDB.
        """
        try:
            collection_name = self._sanitize_collection_name(path)
            self.chroma_db = self.client.get_or_create_collection(
                name=collection_name,
                embedding_function=self.embeddings,
                metadata={"hnsw:space": "cosine"}
            )
        except Exception as e:
            self.logger.error(f"Error creating collection: {e}")
            raise

    def clean_text(self, text: str) -> str:
        """
        Clean text by removing unwanted characters and spaces.

        Args:
            text (str): Input text.

        Returns:
            str: Cleaned text.
        """
        text = re.sub(r"\s+", " ", text)  # Remove multiple spaces
        text = re.sub(r"[^a-zA-Z0-9À-ÿ\s]", "", text)  # Remove special characters
        return text.strip()

    def get_restaurant_reviews_location(self, restaurant_name: str) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
        """
        Get the location and reviews for a specific restaurant.

        Args:
            restaurant_name (str): Name of the restaurant.

        Returns:
            tuple: DataFrames for reviews, location, and restaurant info.
        """
        try:
            with next(self.get_db()) as db:
                restaurant = db.query(models.DimRestaurant).filter(models.DimRestaurant.nom == restaurant_name).first()
                location = db.query(models.DimLocation).filter(models.DimLocation.id_location == restaurant.id_location).first()
                avis = db.query(models.FaitAvis).filter(models.FaitAvis.id_restaurant == restaurant.id_restaurant).all()

            avis_df = pd.DataFrame([schemas.FaitAvis.from_orm(a).dict() for a in avis])
            location_df = pd.DataFrame([schemas.DimLocation.from_orm(location).dict()])
            restaurant_df = pd.DataFrame([schemas.DimRestaurant.from_orm(restaurant).dict()])

            return avis_df, location_df, restaurant_df
        except Exception as e:
            self.logger.error(f"An error occurred while fetching reviews and locations: {e}")
            return pd.DataFrame(), pd.DataFrame(), pd.DataFrame()

    # def transform_restaurant_chunk(self, restaurant_name: str,  chuncksize: 500) -> pd.DataFrame:
    #     """
    #     Transform restaurant data and reviews into a single structured chunk.

    #     Args:
    #         restaurant_name (str): Name of the restaurant.

    #     Returns:
    #         pd.DataFrame: DataFrame containing the chunks.
    #     """
    #     colnames = ['restaurant', 'chunk']
    #     avis_df, location_df, restaurant_df = self.get_restaurant_reviews_location(restaurant_name)

    #     if restaurant_df.empty:
    #         return pd.DataFrame(columns=colnames)

    #     # Combine all information into a single chunk
    #     all_info = f"Restaurant: {restaurant_df['nom'].iloc[0]} | Description: {restaurant_df['nom'].iloc[0]} " \
    #                f"| Localisation: {location_df['adresse'].iloc[0]}, {location_df['adresse'].iloc[0]} | " \
    #                f"Avis: {' | '.join(avis_df['review'].apply(self.clean_text))}"

    #     chunks = [{'restaurant': restaurant_name, 'chunk': all_info}]
    #     return pd.DataFrame(chunks, columns=colnames)


    def transform_restaurant_chunk(self, restaurant_name: str, chunk_size: int = 5000) -> pd.DataFrame:
        """
        Transform restaurant data and reviews into structured chunks.

        Args:
            restaurant_name (str): Name of the restaurant.
            chunk_size (int, optional): Maximum size of each chunk (in characters). Defaults to 500.

        Returns:
            pd.DataFrame: DataFrame containing the chunks with restaurant names.
        """
        colnames = ['restaurant', 'chunk']
        avis_df, location_df, restaurant_df = self.get_restaurant_reviews_location(restaurant_name)

        if restaurant_df.empty:
            return pd.DataFrame(columns=colnames)

        # Combine all relevant data into a single string
        combined_info = []
        
        for column in restaurant_df.columns:
            value = self.clean_text(str(restaurant_df[column].iloc[0]))
            combined_info.append(f"{column}: {value}")
        
        for column in location_df.columns:
            value = self.clean_text(str(location_df[column].iloc[0]))
            combined_info.append(f"{column}: {value}")
        
        for _, review in avis_df.iterrows():
            cleaned_review = self.clean_text(review['review'])
            combined_info.append(f"Review: {cleaned_review}")
        
        all_info = " ".join(combined_info)

        # Split the information into chunks of `chunk_size`
        chunks = []
        while len(all_info) > 0:
            chunk = all_info[:chunk_size]
            # Add the restaurant name at the beginning of each chunk
            chunks.append(f"Restaurant: {restaurant_name} | {chunk}")
            all_info = all_info[chunk_size:]

        # Create a DataFrame for the chunks
        chunk_data = [{'restaurant': restaurant_name, 'chunk': chunk} for chunk in chunks]
        return pd.DataFrame(chunk_data, columns=colnames)


    def create_corpus(self) -> str:
        """
        Create a corpus with a single chunk per restaurant.

        Returns:
            str: Text corpus containing all information.
        """
        corpus = []
        for restaurant in self.get_all_restaurants_names():
            df = self.transform_restaurant_chunk(restaurant)
            if not df.empty:
                corpus.append(" ".join(df['chunk'].values))
        return " ".join(corpus)

    def split_text_into_chunks(self, corpus: str, chunk_size: int = 100) -> list[str]:
        """
        Split text into chunks of specified size.

        Args:
            corpus (str): Text to split.
            chunk_size (int, optional): Size of each chunk.

        Returns:
            list[str]: List of chunks.
        """
        tokenized_corpus = enc.encode(corpus)
        chunks = [
            "".join(enc.decode(tokenized_corpus[i : i + chunk_size]))
            for i in tqdm(range(0, len(tokenized_corpus), chunk_size))
        ]
        return chunks
    
    

    def add_embeddings(self, restaurant_chunks: pd.DataFrame) -> None:
        """
        Add embeddings for each restaurant chunk to the ChromaDB collection.

        Args:
            restaurant_chunks (pd.DataFrame): DataFrame containing restaurant names and their corresponding chunks.
        """
        if self.chroma_db is None:
            raise ValueError("ChromaDB collection is not initialized. Call `_create_collection` first.")

        for _, row in tqdm(restaurant_chunks.iterrows(), total=restaurant_chunks.shape[0]):
            restaurant_name = row['restaurant']
            chunk = row['chunk']
            document_id = str(uuid.uuid4())  # Generate a unique ID for this chunk
            self.chroma_db.add(documents=[chunk], ids=[document_id], metadatas=[{"restaurant": restaurant_name}])


    def __call__(self, *args, **kwargs):
        """
        Entry point to execute class methods.
        """
        self._create_collection(self.path)
        all_restaurant_chunks = []

        for restaurant_name in self.get_all_restaurants_names():
            restaurant_chunk = self.transform_restaurant_chunk(restaurant_name)
            if not restaurant_chunk.empty:
                all_restaurant_chunks.append(restaurant_chunk)

        if all_restaurant_chunks:
            all_chunks_df = pd.concat(all_restaurant_chunks, ignore_index=True)
            self.add_embeddings(all_chunks_df)


# Test the class
if __name__ == "__main__":
    test = BDDChunks(embedding_model="paraphrase-multilingual-MiniLM-L12-v2", path="./")
    test()
    print("Done")


C:\Users\ediad\AppData\Local\Temp\ipykernel_8872\3011249731.py:135: PydanticDeprecatedSince20: The `from_orm` method is deprecated; set `model_config['from_attributes']=True` and use `model_validate` instead. Deprecated in Pydantic V2.0 to be removed in V3.0. See Pydantic V2 Migration Guide at https://errors.pydantic.dev/2.10/migration/
  avis_df = pd.DataFrame([schemas.FaitAvis.from_orm(a).dict() for a in avis])
C:\Users\ediad\AppData\Local\Temp\ipykernel_8872\3011249731.py:135: PydanticDeprecatedSince20: The `dict` method is deprecated; use `model_dump` instead. Deprecated in Pydantic V2.0 to be removed in V3.0. See Pydantic V2 Migration Guide at https://errors.pydantic.dev/2.10/migration/
  avis_df = pd.DataFrame([schemas.FaitAvis.from_orm(a).dict() for a in avis])
C:\Users\ediad\AppData\Local\Temp\ipykernel_8872\3011249731.py:136: PydanticDeprecatedSince20: The `from_orm` method is deprecated; set `model_config['from_attributes']=True` and use `model_validate` instead. Deprecated i

Done





In [None]:
import pandas as pd
import tiktoken
from tqdm import tqdm
import os
import re
import uuid
import chromadb
from chromadb.utils.embedding_functions import SentenceTransformerEmbeddingFunction
from chromadb.config import Settings
import logging

# Initialize tiktoken encoding
enc = tiktoken.get_encoding("o200k_base")

class BDDChunks:
    """
    A class to process reviews from a SQLite database and store them as chunks with embeddings.
    Each restaurant's data is combined into a single chunk.
    """

    def __init__(self, embedding_model: str, path: str):
        """
        Initialize the BDDChunks instance.

        Args:
            embedding_model (str): Name of the embedding model to use.
            path (str): Path to the PDF or dataset to process.
        """
        self.path = path
        self.chunks: list[str] | None = None
        self.client = chromadb.PersistentClient(
            path="./ChromaDB7", settings=Settings(anonymized_telemetry=False)
        )
        self.embedding_name = embedding_model
        self.embeddings = SentenceTransformerEmbeddingFunction(
            model_name=embedding_model
        )
        self.chroma_db = None

        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)

    def get_db(self):
        """
        Provide a database session.

        Yields:
            db: Database session instance.
        """
        db = database.SessionLocal()
        try:
            yield db
        finally:
            db.close()

    def get_all_restaurants_names(self) -> list[str]:
        """
        Get all restaurant names from the SQLite database.

        Returns:
            list[str]: List of restaurant names.
        """
        try:
            with next(self.get_db()) as db:
                restaurants = db.query(models.DimRestaurant).all()
                return [r.nom for r in restaurants]
        except Exception as e:
            self.logger.error(f"An error occurred while fetching restaurant names: {e}")
            return []

    def _sanitize_collection_name(self, name: str) -> str:
        """
        Sanitize collection name to meet ChromaDB requirements:
        - 3-63 characters
        - Alphanumeric with hyphens and underscores
        - No consecutive periods
        """
        sanitized = re.sub(r'[^a-zA-Z0-9-_]', '-', name)
        sanitized = re.sub(r'^[^a-zA-Z0-9]+', '', sanitized)
        sanitized = re.sub(r'[^a-zA-Z0-9]+$', '', sanitized)
        sanitized = re.sub(r'\.{2,}', '.', sanitized)
        if len(sanitized) < 3:
            sanitized = sanitized + "000"[:3-len(sanitized)]
        if len(sanitized) > 63:
            sanitized = sanitized[:63]
        return sanitized

    def _create_collection(self, path: str) -> None:
        """
        Create a new ChromaDB collection to store embeddings.

        Args:
            path (str): Name of the collection to create in ChromaDB.
        """
        try:
            collection_name = self._sanitize_collection_name(path)
            self.chroma_db = self.client.get_or_create_collection(
                name=collection_name,
                embedding_function=self.embeddings,
                metadata={"hnsw:space": "cosine"}
            )
        except Exception as e:
            self.logger.error(f"Error creating collection: {e}")
            raise

    def clean_text(self, text: str) -> str:
        """
        Clean text by removing unwanted characters and spaces.

        Args:
            text (str): Input text.

        Returns:
            str: Cleaned text.
        """
        text = re.sub(r"\s+", " ", text)  # Remove multiple spaces
        text = re.sub(r"[^a-zA-Z0-9À-ÿ\s]", "", text)  # Remove special characters
        return text.strip()

    def get_restaurant_reviews_location(self, restaurant_name: str) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
        """
        Get the location and reviews for a specific restaurant.

        Args:
            restaurant_name (str): Name of the restaurant.

        Returns:
            tuple: DataFrames for reviews, location, and restaurant info.
        """
        try:
            with next(self.get_db()) as db:
                restaurant = db.query(models.DimRestaurant).filter(models.DimRestaurant.nom == restaurant_name).first()
                location = db.query(models.DimLocation).filter(models.DimLocation.id_location == restaurant.id_location).first()
                avis = db.query(models.FaitAvis).filter(models.FaitAvis.id_restaurant == restaurant.id_restaurant).all()

            avis_df = pd.DataFrame([schemas.FaitAvis.from_orm(a).dict() for a in avis])
            location_df = pd.DataFrame([schemas.DimLocation.from_orm(location).dict()])
            restaurant_df = pd.DataFrame([schemas.DimRestaurant.from_orm(restaurant).dict()])

            return avis_df, location_df, restaurant_df
        except Exception as e:
            self.logger.error(f"An error occurred while fetching reviews and locations: {e}")
            return pd.DataFrame(), pd.DataFrame(), pd.DataFrame()

    def transform_restaurant_chunk(self, restaurant_name: str) -> pd.DataFrame:
        """
        Combine all restaurant data and reviews into a single structured chunk.

        Args:
            restaurant_name (str): Name of the restaurant.

        Returns:
            pd.DataFrame: DataFrame containing the chunk.
        """
        colnames = ['restaurant', 'chunk']
        avis_df, location_df, restaurant_df = self.get_restaurant_reviews_location(restaurant_name)

        if restaurant_df.empty:
            return pd.DataFrame(columns=colnames)

        # Combine all relevant data into one chunk
        combined_info = []

        for column in restaurant_df.columns:
            value = self.clean_text(str(restaurant_df[column].iloc[0]))
            combined_info.append(f"{column}: {value}")

        for column in location_df.columns:
            value = self.clean_text(str(location_df[column].iloc[0]))
            combined_info.append(f"{column}: {value}")

        for _, review in avis_df.iterrows():
            cleaned_review = self.clean_text(review['review'][:500])  # Limit review size
            combined_info.append(f"Review: {cleaned_review}")

        chunk = " ".join(combined_info)
        return pd.DataFrame([{'restaurant': restaurant_name, 'chunk': chunk}], columns=colnames)

    def create_corpus(self) -> str:
        """
        Create a corpus from restaurant chunks.

        Returns:
            str: Text corpus.
        """
        corpus = ""
        for restaurant in self.get_all_restaurants_names():
            df = self.transform_restaurant_chunk(restaurant)
            corpus += " ".join(df['chunk'].values) + " "
        return corpus

    def split_text_into_chunks(self, corpus: str, chunk_size: int = 100) -> list[str]:
        """
        Split text into chunks of specified size.

        Args:
            corpus (str): Text to split.
            chunk_size (int, optional): Size of each chunk.

        Returns:
            list[str]: List of chunks.
        """
        tokenized_corpus = enc.encode(corpus)
        chunks = [
            "".join(enc.decode(tokenized_corpus[i : i + chunk_size]))
            for i in tqdm(range(0, len(tokenized_corpus), chunk_size))
        ]
        return chunks

    def add_embeddings(self, list_chunks: list[str], batch_size: int = 100) -> None:
        """
        Add embeddings to the ChromaDB collection.

        Args:
            list_chunks (list[str]): List of chunks.
            batch_size (int, optional): Batch size.
        """
        if self.chroma_db is None:
            raise ValueError("ChromaDB collection is not initialized. Call `_create_collection` first.")

        for i in tqdm(range(0, len(list_chunks), batch_size)):
            batch_documents = list_chunks[i : i + batch_size]
            list_ids = [str(uuid.uuid4()) for _ in batch_documents]
            self.chroma_db.add(documents=batch_documents, ids=list_ids)

    def __call__(self, *args, **kwargs):
        """
        Entry point to execute class methods.
        """
        corpus = self.create_corpus()
        chunks = self.split_text_into_chunks(corpus)
        self._create_collection(self.path)
        self.add_embeddings(chunks)

# # Test the class
# if __name__ == "__main__":
#     test = BDDChunks(embedding_model="paraphrase-multilingual-MiniLM-L12-v2", path="./")
#     test()
#     print("Done")


In [4]:
import pandas as pd
import tiktoken
from tqdm import tqdm
import os
import re
import uuid
import chromadb
from chromadb.utils.embedding_functions import SentenceTransformerEmbeddingFunction
from chromadb.config import Settings
import logging

# Initialize tiktoken encoding
enc = tiktoken.get_encoding("o200k_base")

class BDDChunks:
    """
    A class to process reviews from a SQLite database and store them as chunks with embeddings.
    Each review is considered a single chunk.
    """

    def __init__(self, embedding_model: str, path: str):
        """
        Initialize the BDDChunks instance.

        Args:
            embedding_model (str): Name of the embedding model to use.
            path (str): Path to the PDF or dataset to process.
        """
        self.path = path
        self.chunks: list[str] | None = None
        self.client = chromadb.PersistentClient(
            path="./ChromaDB5", settings=Settings(anonymized_telemetry=False)
        )
        self.embedding_name = embedding_model
        self.embeddings = SentenceTransformerEmbeddingFunction(
            model_name=embedding_model
        )
        self.chroma_db = None

        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)

    def get_db(self):
        """
        Provide a database session.

        Yields:
            db: Database session instance.
        """
        db = database.SessionLocal()
        try:
            yield db
        finally:
            db.close()

    def get_all_restaurants_names(self) -> list[str]:
        """
        Get all restaurant names from the SQLite database.

        Returns:
            list[str]: List of restaurant names.
        """
        try:
            with next(self.get_db()) as db:
                restaurants = db.query(models.DimRestaurant).all()
                return [r.nom for r in restaurants]
        except Exception as e:
            self.logger.error(f"An error occurred while fetching restaurant names: {e}")
            return []

    def _sanitize_collection_name(self, name: str) -> str:
        """
        Sanitize collection name to meet ChromaDB requirements:
        - 3-63 characters
        - Alphanumeric with hyphens and underscores
        - No consecutive periods
        """
        sanitized = re.sub(r'[^a-zA-Z0-9-_]', '-', name)
        sanitized = re.sub(r'^[^a-zA-Z0-9]+', '', sanitized)
        sanitized = re.sub(r'[^a-zA-Z0-9]+$', '', sanitized)
        sanitized = re.sub(r'\.{2,}', '.', sanitized)
        if len(sanitized) < 3:
            sanitized = sanitized + "000"[:3-len(sanitized)]
        if len(sanitized) > 63:
            sanitized = sanitized[:63]
        return sanitized

    def _create_collection(self, path: str) -> None:
        """
        Create a new ChromaDB collection to store embeddings.

        Args:
            path (str): Name of the collection to create in ChromaDB.
        """
        try:
            collection_name = self._sanitize_collection_name(path)
            self.chroma_db = self.client.get_or_create_collection(
                name=collection_name,
                embedding_function=self.embeddings,
                metadata={"hnsw:space": "cosine"}
            )
        except Exception as e:
            self.logger.error(f"Error creating collection: {e}")
            raise

    def clean_text(self, text: str) -> str:
        """
        Clean text by removing unwanted characters and spaces.

        Args:
            text (str): Input text.

        Returns:
            str: Cleaned text.
        """
        text = re.sub(r"\s+", " ", text)  # Remove multiple spaces
        text = re.sub(r"[^a-zA-Z0-9À-ÿ\s]", "", text)  # Remove special characters
        return text.strip()

    def get_restaurant_reviews_location(self, restaurant_name: str) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
        """
        Get the location and reviews for a specific restaurant.

        Args:
            restaurant_name (str): Name of the restaurant.

        Returns:
            tuple: DataFrames for reviews, location, and restaurant info.
        """
        try:
            with next(self.get_db()) as db:
                restaurant = db.query(models.DimRestaurant).filter(models.DimRestaurant.nom == restaurant_name).first()
                location = db.query(models.DimLocation).filter(models.DimLocation.id_location == restaurant.id_location).first()
                avis = db.query(models.FaitAvis).filter(models.FaitAvis.id_restaurant == restaurant.id_restaurant).all()

            avis_df = pd.DataFrame([schemas.FaitAvis.from_orm(a).dict() for a in avis])
            location_df = pd.DataFrame([schemas.DimLocation.from_orm(location).dict()])
            restaurant_df = pd.DataFrame([schemas.DimRestaurant.from_orm(restaurant).dict()])

            return avis_df, location_df, restaurant_df
        except Exception as e:
            self.logger.error(f"An error occurred while fetching reviews and locations: {e}")
            return pd.DataFrame(), pd.DataFrame(), pd.DataFrame()

    def transform_restaurant_chunk(self, restaurant_name: str) -> pd.DataFrame:
        """
        Transform restaurant data and reviews into structured chunks.

        Args:
            restaurant_name (str): Name of the restaurant.

        Returns:
            pd.DataFrame: DataFrame containing the chunks.
        """
        colnames = ['restaurant', 'chunk']
        avis_df, location_df, restaurant_df = self.get_restaurant_reviews_location(restaurant_name)

        if restaurant_df.empty:
            return pd.DataFrame(columns=colnames)

        chunks = []
        for column in restaurant_df.columns:
            if column not in ["nom", "description"]:  # Keep only relevant columns
                continue
            value = self.clean_text(str(restaurant_df[column].iloc[0]))
            chunks.append({'restaurant': restaurant_name, 'chunk': f"{column}: {value}"})

        for _, review in avis_df.iterrows():
            cleaned_review = self.clean_text(review['review'][:500])  # Limit review size
            chunks.append({'restaurant': restaurant_name, 'chunk': f"Review: {cleaned_review}"})

        return pd.DataFrame(chunks, columns=colnames)

    def create_corpus(self) -> str:
        """
        Create a corpus from restaurant chunks.

        Returns:
            str: Text corpus.
        """
        corpus = ""
        for restaurant in self.get_all_restaurants_names():
            df = self.transform_restaurant_chunk(restaurant)
            corpus += " ".join(df['chunk'].values) + " "
        return corpus

    def split_text_into_chunks(self, corpus: str, chunk_size: int = 20000) -> list[str]:
        """
        Split text into chunks of specified size.

        Args:
            corpus (str): Text to split.
            chunk_size (int, optional): Size of each chunk.

        Returns:
            list[str]: List of chunks.
        """
        tokenized_corpus = enc.encode(corpus)
        chunks = [
            "".join(enc.decode(tokenized_corpus[i : i + chunk_size]))
            for i in tqdm(range(0, len(tokenized_corpus), chunk_size))
        ]
        return chunks

    def add_embeddings(self, list_chunks: list[str], batch_size: int = 100) -> None:
        """
        Add embeddings to the ChromaDB collection.

        Args:
            list_chunks (list[str]): List of chunks.
            batch_size (int, optional): Batch size.
        """
        if self.chroma_db is None:
            raise ValueError("ChromaDB collection is not initialized. Call `_create_collection` first.")

        for i in tqdm(range(0, len(list_chunks), batch_size)):
            batch_documents = list_chunks[i : i + batch_size]
            list_ids = [str(uuid.uuid4()) for _ in batch_documents]
            self.chroma_db.add(documents=batch_documents, ids=list_ids)

    def __call__(self, *args, **kwargs):
        """
        Entry point to execute class methods.
        """
        corpus = self.create_corpus()
        chunks = self.split_text_into_chunks(corpus)
        self._create_collection(self.path)
        self.add_embeddings(chunks)

# Test the class
if __name__ == "__main__":
    test = BDDChunks(embedding_model="paraphrase-multilingual-MiniLM-L12-v2", path="./")
    test()
    print("Done")


C:\Users\ediad\AppData\Local\Temp\ipykernel_8872\1822548933.py:136: PydanticDeprecatedSince20: The `from_orm` method is deprecated; set `model_config['from_attributes']=True` and use `model_validate` instead. Deprecated in Pydantic V2.0 to be removed in V3.0. See Pydantic V2 Migration Guide at https://errors.pydantic.dev/2.10/migration/
  avis_df = pd.DataFrame([schemas.FaitAvis.from_orm(a).dict() for a in avis])
C:\Users\ediad\AppData\Local\Temp\ipykernel_8872\1822548933.py:136: PydanticDeprecatedSince20: The `dict` method is deprecated; use `model_dump` instead. Deprecated in Pydantic V2.0 to be removed in V3.0. See Pydantic V2 Migration Guide at https://errors.pydantic.dev/2.10/migration/
  avis_df = pd.DataFrame([schemas.FaitAvis.from_orm(a).dict() for a in avis])
C:\Users\ediad\AppData\Local\Temp\ipykernel_8872\1822548933.py:137: PydanticDeprecatedSince20: The `from_orm` method is deprecated; set `model_config['from_attributes']=True` and use `model_validate` instead. Deprecated i

Done





In [9]:
from model import models, schemas
from utils import database
import pandas as pd
import tiktoken
from tqdm import tqdm
import os
import re
import uuid
import chromadb
from chromadb.utils.embedding_functions import SentenceTransformerEmbeddingFunction
from chromadb.config import Settings
import logging

# Initialize tiktoken encoding
enc = tiktoken.get_encoding("o200k_base")

class BDDChunks:
    """
    A class to process reviews from a SQLite database and store them as chunks with embeddings.
    Each review is considered a single chunk.
    """

    def __init__(self, embedding_model: str, path: str):
        """
        Initialize the BDDChunks instance.

        Args:
            embedding_model (str): Name of the embedding model to use.
            path (str): Path to the PDF or dataset to process.
        """
        self.path = path
        self.chunks: list[str] | None = None
        self.client = chromadb.PersistentClient(
            path="./ChromaDB3", settings=Settings(anonymized_telemetry=False)
        )
        self.embedding_name = embedding_model
        self.embeddings = SentenceTransformerEmbeddingFunction(
            model_name=embedding_model
        )
        self.chroma_db = None
        
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)

    def get_db(self):
        """
        Provide a database session.

        Yields:
            db: Database session instance.
        """
        db = database.SessionLocal()
        try:
            yield db
        finally:
            db.close()

    def get_all_restaurants_names(self) -> list[str]:
        """
        Get all restaurant names from the SQLite database.

        Returns:
            list[str]: List of restaurant names.
        """
        try:
            with next(self.get_db()) as db:
                restaurants = db.query(models.DimRestaurant).all()
                return [r.nom for r in restaurants]
        except Exception as e:
            self.logger.error(f"An error occurred while fetching restaurant names: {e}")
            return []

    def _sanitize_collection_name(self, name: str) -> str:
        """
        Sanitize collection name to meet ChromaDB requirements:
        - 3-63 characters
        - Alphanumeric with hyphens and underscores
        - No consecutive periods
        """
        # Replace invalid characters with hyphens
        sanitized = re.sub(r'[^a-zA-Z0-9-_]', '-', name)
        
        # Ensure name starts and ends with alphanumeric
        sanitized = re.sub(r'^[^a-zA-Z0-9]+', '', sanitized)
        sanitized = re.sub(r'[^a-zA-Z0-9]+$', '', sanitized)
        
        # Remove consecutive periods
        sanitized = re.sub(r'\.{2,}', '.', sanitized)
        
        # Ensure minimum length
        if len(sanitized) < 3:
            sanitized = sanitized + "000"[:3-len(sanitized)]
            
        # Truncate if too long
        if len(sanitized) > 63:
            sanitized = sanitized[:63]
            
        return sanitized

    def _create_collection(self, path: str) -> None:
        """
        Create a new ChromaDB collection to store embeddings.

        Args:
            path (str): Name of the collection to create in ChromaDB.
        """
        try:
            # Create a valid collection name
            collection_name = self._sanitize_collection_name(path)
            self.chroma_db = self.client.get_or_create_collection(
                name=collection_name,
                embedding_function=self.embeddings,
                metadata={"hnsw:space": "cosine"}
            )
        except Exception as e:
            self.logger.error(f"Error creating collection: {e}")
            raise

    def convert_to_arrow_compatible(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Convert a DataFrame to an Arrow-compatible format.

        Args:
            df (pd.DataFrame): Input DataFrame.

        Returns:
            pd.DataFrame: Converted DataFrame.
        """
        for column in df.columns:
            if df[column].dtype == 'object':
                df[column] = df[column].astype(str)
            elif df[column].dtype == 'int64':
                df[column] = df[column].astype('int32')
        return df

    def get_restaurant_reviews_location(self, restaurant_name: str) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
        """
        Get the location and reviews for a specific restaurant.

        Args:
            restaurant_name (str): Name of the restaurant.

        Returns:
            tuple: DataFrames for reviews, location, and restaurant info.
        """
        try:
            with next(self.get_db()) as db:
                restaurant = db.query(models.DimRestaurant).filter(models.DimRestaurant.nom == restaurant_name).first()
                location = db.query(models.DimLocation).filter(models.DimLocation.id_location == restaurant.id_location).first()
                avis = db.query(models.FaitAvis).filter(models.FaitAvis.id_restaurant == restaurant.id_restaurant).all()
            
            avis_df = pd.DataFrame([schemas.FaitAvis.from_orm(a).dict() for a in avis])
            location_df = pd.DataFrame([schemas.DimLocation.from_orm(location).dict()])
            restaurant_df = pd.DataFrame([schemas.DimRestaurant.from_orm(restaurant).dict()])
            
            return avis_df, location_df, restaurant_df
        except Exception as e:
            self.logger.error(f"An error occurred while fetching reviews and locations: {e}")
            return pd.DataFrame(), pd.DataFrame(), pd.DataFrame()

    def transform_restaurant_chunk(self, restaurant_name: str) -> pd.DataFrame:
        """
        Transform restaurant data and reviews into structured chunks.

        Args:
            restaurant_name (str): Name of the restaurant.

        Returns:
            pd.DataFrame: DataFrame containing the chunks.
        """
        colnames = ['restaurant', 'chunk']
        avis_df, location_df, restaurant_df = self.get_restaurant_reviews_location(restaurant_name)
        
        if restaurant_df.empty:
            return pd.DataFrame(columns=colnames)

        chunks = []
        for column in restaurant_df.columns:
            value = restaurant_df[column].iloc[0]
            chunks.append({'restaurant': restaurant_name, 'chunk': f"{column}: {value}"})

        # Include reviews as chunks
        for _, review in avis_df.iterrows():
            chunks.append({'restaurant': restaurant_name, 'chunk': f"Review: {review['review']}"})

        return pd.DataFrame(chunks, columns=colnames)

    def create_corpus(self) -> str:
        """
        Create a corpus from restaurant chunks.

        Returns:
            str: Text corpus.
        """
        corpus = ""
        for restaurant in self.get_all_restaurants_names():
            df = self.transform_restaurant_chunk(restaurant)
            corpus += " ".join(df['chunk'].values) + " "
        return corpus

    def split_text_into_chunks(self, corpus: str, chunk_size: int = 5000) -> list[str]:
        """
        Split text into chunks of specified size.

        Args:
            corpus (str): Text to split.
            chunk_size (int, optional): Size of each chunk.

        Returns:
            list[str]: List of chunks.
        """
        tokenized_corpus = enc.encode(corpus)
        chunks = [
            "".join(enc.decode(tokenized_corpus[i : i + chunk_size]))
            for i in tqdm(range(0, len(tokenized_corpus), chunk_size))
        ]
        return chunks

    def add_embeddings(self, list_chunks: list[str], batch_size: int = 100) -> None:
        """
        Add embeddings to the ChromaDB collection.

        Args:
            list_chunks (list[str]): List of chunks.
            batch_size (int, optional): Batch size.
        """
        if self.chroma_db is None:
            raise ValueError("ChromaDB collection is not initialized. Call `_create_collection` first.")
        
        if len(list_chunks) < batch_size:
            batch_size_for_chromadb = len(list_chunks)
        else:
            batch_size_for_chromadb = batch_size

        for i in tqdm(range(0, len(list_chunks), batch_size_for_chromadb)):
            batch_documents = list_chunks[i : i + batch_size_for_chromadb]
            list_ids = [str(uuid.uuid4()) for _ in batch_documents]
            self.chroma_db.add(documents=batch_documents, ids=list_ids)

    def __call__(self, *args, **kwargs):
        """
        Entry point to execute class methods.
        """
        corpus = self.create_corpus()
        # self._create_collection
        chunks = self.split_text_into_chunks(corpus)
        self._create_collection(self.path)
        self.add_embeddings(chunks)
        # pass

# Test the class
if __name__ == "__main__":
    test = BDDChunks(embedding_model="paraphrase-multilingual-MiniLM-L12-v2", path="./")
    # test = BDDChunks(embedding_model="paraphrase-xlm-r-multilingual-v1", path="./")
    test()
    print("Done")   

C:\Users\ediad\AppData\Local\Temp\ipykernel_8872\3532425305.py:152: PydanticDeprecatedSince20: The `from_orm` method is deprecated; set `model_config['from_attributes']=True` and use `model_validate` instead. Deprecated in Pydantic V2.0 to be removed in V3.0. See Pydantic V2 Migration Guide at https://errors.pydantic.dev/2.10/migration/
  avis_df = pd.DataFrame([schemas.FaitAvis.from_orm(a).dict() for a in avis])
C:\Users\ediad\AppData\Local\Temp\ipykernel_8872\3532425305.py:152: PydanticDeprecatedSince20: The `dict` method is deprecated; use `model_dump` instead. Deprecated in Pydantic V2.0 to be removed in V3.0. See Pydantic V2 Migration Guide at https://errors.pydantic.dev/2.10/migration/
  avis_df = pd.DataFrame([schemas.FaitAvis.from_orm(a).dict() for a in avis])
C:\Users\ediad\AppData\Local\Temp\ipykernel_8872\3532425305.py:153: PydanticDeprecatedSince20: The `from_orm` method is deprecated; set `model_config['from_attributes']=True` and use `model_validate` instead. Deprecated i

Done





In [None]:
from model import models, schemas
from utils import database
import pandas as pd
import tiktoken
from tqdm import tqdm
import os
import re
import uuid
import chromadb
from chromadb.utils.embedding_functions import SentenceTransformerEmbeddingFunction
from chromadb.config import Settings

# Initialiser l'encodage tiktoken
enc = tiktoken.get_encoding("o200k_base")

class BDDChunks:
    """
    Une classe pour traiter des avis issus d'une base SQLite et les stocker sous forme de chunks avec embeddings.
    Chaque avis est considéré comme un chunk unique.
    """

    def __init__(self, embedding_model: str, path: str):
        """
        Initialisation de l'instance BDDChunks.

        Args:
            embedding_model (str): Nom du modèle d'embedding à utiliser.
            path (str): Chemin vers le PDF ou le dataset à traiter.
        """
        self.path = path
        self.chunks: list[str] | None = None
        self.client = chromadb.PersistentClient(
            path="./ChromaDB", settings=Settings(anonymized_telemetry=False)
        )
        self.embedding_name = embedding_model
        self.embeddings = SentenceTransformerEmbeddingFunction(
            model_name=embedding_model
        )
        self.chroma_db = None

    def get_db(self):
        """
        Fournit une session de base de données.

        Yields:
            db: Instance de session de base de données.
        """
        db = database.SessionLocal()
        try:
            yield db
        finally:
            db.close()

    def get_all_restaurants_names(self) -> list[str]:
        """
        Récupère tous les noms de restaurants depuis la base SQLite.

        Returns:
            list[str]: Liste des noms de restaurants.
        """
        try:
            with next(self.get_db()) as db:
                restaurants = db.query(models.DimRestaurant).all()
                return [r.nom for r in restaurants]
        except Exception as e:
            print(f"Une erreur s'est produite lors de la récupération des noms de restaurants : {e}")
            return []

    def _create_collection(self, path: str) -> None:
        """
        Crée une nouvelle collection ChromaDB pour stocker les embeddings.

        Args:
            path (str): Nom de la collection à créer dans ChromaDB.
        """
        # Crée un nom de collection valide
        file_name = "a" + os.path.basename(path)[0:50].strip() + "a"
        file_name = re.sub(r"\s+", "-", file_name)

        # Initialiser la collection ChromaDB
        self.chroma_db = self.client.get_or_create_collection(
            name=file_name,
            embedding_function=self.embeddings,
            metadata={"hnsw:space": "cosine"}
        )

    def convert_to_arrow_compatible(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Convertit un DataFrame au format compatible avec Arrow.

        Args:
            df (pd.DataFrame): DataFrame d'entrée.

        Returns:
            pd.DataFrame: DataFrame converti.
        """
        for column in df.columns:
            if df[column].dtype == 'object':
                df[column] = df[column].astype(str)
            elif df[column].dtype == 'int64':
                df[column] = df[column].astype('int32')
        return df

    def get_restaurant_reviews_location(self, restaurant_name: str) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
        """
        Récupère l'emplacement et les avis pour un restaurant donné.

        Args:
            restaurant_name (str): Nom du restaurant.

        Returns:
            tuple: DataFrames pour les avis, l'emplacement et les infos du restaurant.
        """
        try:
            with next(self.get_db()) as db:
                restaurant = db.query(models.DimRestaurant).filter(models.DimRestaurant.nom == restaurant_name).first()
                location = db.query(models.DimLocation).filter(models.DimLocation.id_location == restaurant.id_location).first()
                avis = db.query(models.FaitAvis).filter(models.FaitAvis.id_restaurant == restaurant.id_restaurant).all()
            
            avis_df = pd.DataFrame([schemas.FaitAvis.from_orm(a).dict() for a in avis])
            location_df = pd.DataFrame([schemas.DimLocation.from_orm(location).dict()])
            restaurant_df = pd.DataFrame([schemas.DimRestaurant.from_orm(restaurant).dict()])
            
            return avis_df, location_df, restaurant_df
        except Exception as e:
            print(f"Une erreur s'est produite lors de la récupération des avis et emplacements : {e}")
            return pd.DataFrame(), pd.DataFrame(), pd.DataFrame()

    def transform_restaurant_chunk(self, restaurant_name: str) -> pd.DataFrame:
        """
        Transforme les données d'un restaurant et ses avis en chunks structurés.

        Args:
            restaurant_name (str): Nom du restaurant.

        Returns:
            pd.DataFrame: DataFrame contenant les chunks.
        """
        colnames = ['restaurant', 'chunk']
        avis_df, location_df, restaurant_df = self.get_restaurant_reviews_location(restaurant_name)
        
        if restaurant_df.empty:
            return pd.DataFrame(columns=colnames)

        chunks = []
        for column in restaurant_df.columns:
            value = restaurant_df[column].iloc[0]
            chunks.append({'restaurant': restaurant_name, 'chunk': f"{column}: {value}"})

        # Inclure les avis comme chunks
        for _, review in avis_df.iterrows():
            chunks.append({'restaurant': restaurant_name, 'chunk': f"Review: {review['review']}"})

        return pd.DataFrame(chunks, columns=colnames)

    def create_corpus(self) -> str:
        """
        Crée un corpus à partir des chunks des restaurants.

        Returns:
            str: Corpus textuel.
        """
        corpus = ""
        for restaurant in self.get_all_restaurants_names():
            df = self.transform_restaurant_chunk(restaurant)
            corpus += " ".join(df['chunk'].values) + " "
        return corpus

    def split_text_into_chunks(self, corpus: str, chunk_size: int = 500) -> list[str]:
        """
        Divise un texte en chunks de taille spécifiée.

        Args:
            corpus (str): Texte à diviser.
            chunk_size (int, optional): Taille de chaque chunk.

        Returns:
            list[str]: Liste des chunks.
        """
        tokenized_corpus = enc.encode(corpus)
        chunks = [
            "".join(enc.decode(tokenized_corpus[i : i + chunk_size]))
            for i in tqdm(range(0, len(tokenized_corpus), chunk_size))
        ]
        return chunks

    def add_embeddings(self, list_chunks: list[str], batch_size: int = 100) -> None:
        """
        Ajoute les embeddings à la collection ChromaDB.

        Args:
            list_chunks (list[str]): Liste des chunks.
            batch_size (int, optional): Taille du batch.
        """
        if self.chroma_db is None:
            raise ValueError("ChromaDB collection is not initialized. Call `_create_collection` first.")
        
        if len(list_chunks) < batch_size:
            batch_size_for_chromadb = len(list_chunks)
        else:
            batch_size_for_chromadb = batch_size

        for i in tqdm(range(0, len(list_chunks), batch_size_for_chromadb)):
            batch_documents = list_chunks[i : i + batch_size_for_chromadb]
            list_ids = [str(uuid.uuid4()) for _ in batch_documents]
            self.chroma_db.add(documents=batch_documents, ids=list_ids)

    def __call__(self, *args, **kwargs):
        """
        Point d'entrée pour exécuter les méthodes de la classe.
        """
        corpus = self.create_corpus()
        chunks = self.split_text_into_chunks(corpus)
        self._create_collection(self.path)
        self.add_embeddings(chunks)


# Tester la classe
if __name__ == "__main__":
    test = BDDChunks(embedding_model="paraphrase-xlm-r-multilingual-v1", path="./")
    test()


C:\Users\ediad\AppData\Local\Temp\ipykernel_8296\2322011855.py:120: PydanticDeprecatedSince20: The `from_orm` method is deprecated; set `model_config['from_attributes']=True` and use `model_validate` instead. Deprecated in Pydantic V2.0 to be removed in V3.0. See Pydantic V2 Migration Guide at https://errors.pydantic.dev/2.10/migration/
  avis_df = pd.DataFrame([schemas.FaitAvis.from_orm(a).dict() for a in avis])
C:\Users\ediad\AppData\Local\Temp\ipykernel_8296\2322011855.py:120: PydanticDeprecatedSince20: The `dict` method is deprecated; use `model_dump` instead. Deprecated in Pydantic V2.0 to be removed in V3.0. See Pydantic V2 Migration Guide at https://errors.pydantic.dev/2.10/migration/
  avis_df = pd.DataFrame([schemas.FaitAvis.from_orm(a).dict() for a in avis])
C:\Users\ediad\AppData\Local\Temp\ipykernel_8296\2322011855.py:121: PydanticDeprecatedSince20: The `from_orm` method is deprecated; set `model_config['from_attributes']=True` and use `model_validate` instead. Deprecated i

AttributeError: 'BDDChunks' object has no attribute '_sanitize_collection_name'

In [117]:
from model import models, schemas
from utils import database
import pandas as pd
import tiktoken
from tqdm import tqdm
import os
import re
import uuid
import chromadb
import tiktoken
from tqdm import tqdm

from chromadb.utils.embedding_functions import SentenceTransformerEmbeddingFunction
from chromadb.config import Settings

In [128]:
from model import models, schemas
from utils import database
import pandas as pd
import tiktoken
from tqdm import tqdm
import os
import re
import uuid
import chromadb
import tiktoken
from tqdm import tqdm

from chromadb.utils.embedding_functions import SentenceTransformerEmbeddingFunction
from chromadb.config import Settings



enc = tiktoken.get_encoding("o200k_base")

class BDDChunks:
    """
    A class to process reviews from a SQLite database and store them as chunks with embeddings.
    Each review is considered a single chunk.
    """

    def __init__(self, embedding_model: str, path: str):
        """
        Initialize a BDDChunks instance.

        Args:
            embedding_model (str): The name of the embedding model to use for generating embeddings.
            path (str): The file path to the PDF or dataset to process.
        """
        self.path = path
        self.chunks: list[str] | None = None
        self.client = chromadb.PersistentClient(
            path="./ChromaDB", settings=Settings(anonymized_telemetry=False)
        )
        self.embedding_name = embedding_model
        self.embeddings = SentenceTransformerEmbeddingFunction(
            model_name=embedding_model
        )
        self.chroma_db = None
        self.db = None

    def get_db(self):
        """
        Provide a database session.

        Yields:
            db: A session instance from the database.
        """
        db = database.SessionLocal()
        try:
            yield db
        finally:
            db.close()

    def get_all_restaurants_names(self) -> list[str]:
        """
        Get all restaurant names from the SQLite database.

        Returns:
            list[str]: A list of restaurant names.
        """
        try:
            with next(self.get_db()) as db:
                restaurants = db.query(models.DimRestaurant).all()
                return [r.nom for r in restaurants]
        except Exception as e:
            print(f"An error occurred while fetching restaurant names: {e}")
            return []


    def _create_collection(self, path: str) -> None:
        """
        Create a new ChromaDB collection for storing embeddings.

        Args:
            path (str): The name of the collection to create in ChromaDB.
        """
        # Tester qu'en changeant de path, on accède pas au reste
        file_name = "a" + os.path.basename(path)[0:50].strip() + "a"
        file_name = re.sub(r"\s+", "-", file_name)
        # Expected collection name that (1) contains 3-63 characters, (2) starts and ends with an alphanumeric character, (3) otherwise contains only alphanumeric characters, underscores or hyphens (-), (4) contains no two consecutive periods (..)
        self.chroma_db = self.client.get_or_create_collection(name=file_name, embedding_function=self.embeddings, metadata={"hnsw:space": "cosine"})  # type: ignore




    def convert_to_arrow_compatible(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Convert a DataFrame to an Arrow-compatible format.
        
        Args:
            df (pd.DataFrame): Input DataFrame.
        
        Returns:
            pd.DataFrame: Converted DataFrame.
        """
        for column in df.columns:
            if df[column].dtype == 'object':
                df[column] = df[column].astype(str)
            elif df[column].dtype == 'int64':
                df[column] = df[column].astype('int32')
        return df

    def get_restaurant_reviews_location(self, restaurant_name: str) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
        """
        Get the location and reviews for a specific restaurant.

        Args:
            restaurant_name (str): The name of the restaurant.

        Returns:
            tuple: DataFrames for reviews, location, and restaurant information.
        """
        try:
            with next(self.get_db()) as db:
                restaurant = db.query(models.DimRestaurant).filter(models.DimRestaurant.nom == restaurant_name).first()
                location = db.query(models.DimLocation).filter(models.DimLocation.id_location == restaurant.id_location).first()
                avis = db.query(models.FaitAvis).filter(models.FaitAvis.id_restaurant == restaurant.id_restaurant).all()
            
            avis_df = pd.DataFrame([schemas.FaitAvis.from_orm(a).dict() for a in avis])
            location_df = pd.DataFrame([schemas.DimLocation.from_orm(location).dict()])
            restaurant_df = pd.DataFrame([schemas.DimRestaurant.from_orm(restaurant).dict()])
            
            return avis_df, location_df, restaurant_df
        except Exception as e:
            print(f"An error occurred while fetching restaurant reviews location: {e}")
            return pd.DataFrame(), pd.DataFrame(), pd.DataFrame()

    def transform_restaurant_chunk(self, restaurant_name: str) -> pd.DataFrame:
        """
        Transform a restaurant's data and reviews into structured chunks.

        Args:
            restaurant_name (str): The name of the restaurant.

        Returns:
            pd.DataFrame: A DataFrame containing the restaurant chunks.
        """
        colnames = ['restaurant', 'chunk']
        avis_df, location_df, restaurant_df = self.get_restaurant_reviews_location(restaurant_name)
        
        if restaurant_df.empty:
            return pd.DataFrame(columns=colnames)

        chunks = []
        for column in restaurant_df.columns:
            value = restaurant_df[column].iloc[0]
            chunks.append({'restaurant': restaurant_name, 'chunk': f"{column}: {value}"})

        # Include review comments as chunks
        for _, review in avis_df.iterrows():
            chunks.append({'restaurant': restaurant_name, 'chunk': f"Review: {review['review']}"})

        return pd.DataFrame(chunks, columns=colnames)

    def create_corpus(self) -> str:
        """
        Create a corpus from the restaurant chunks.

        Returns:
            str: The text corpus.
        """
        corpus = ""
        for restaurant in self.get_all_restaurants_names():
            df = self.transform_restaurant_chunk(restaurant)
            corpus += " ".join(df['chunk'].values) + " "
        return corpus

    # def insert_into_db(self):
    #     """
    #     Insert the chunks into the SQLite database in batches for improved performance.
    #     """
    #     all_restaurants = self.get_all_restaurants_names()
    #     batch_size = 100 # Define the batch size for insertion

    #     for restaurant in tqdm(all_restaurants):
    #         #verifier si le restaurant est deja dans la table rag_avis
    #         try:
    #             with next(self.get_db()) as db:
    #                 restaurant_exists = db.query(models.RagAvis).filter(models.RagAvis.restaurantName == restaurant).first()
    #         except Exception as e:
    #             print(f"An error occurred while checking if the restaurant exists in the database: {e}")
    #             restaurant_exists = None

    #         if restaurant_exists:
    #             print(f"Restaurant {restaurant} already exists in the database.")
    #             continue
    #         df = self.transform_restaurant_chunk(restaurant)
    #         chunks = [
    #             models.RagAvis(restaurantName=row['restaurant'], review=row['chunk'])
    #             for _, row in df.iterrows()
    #         ]

    #         try:
    #             with next(self.get_db()) as db:
    #                 for i in range(0, len(chunks), batch_size):
    #                     db.bulk_save_objects(chunks[i:i + batch_size])
    #                     db.commit()
    #         except Exception as e:
    #             print(f"An error occurred while inserting chunks into the database for {restaurant}: {e}")

    def split_text_into_chunks(self, corpus: str, chunk_size: int = 500) -> list[str]:
            """
            Splits a given text corpus into chunks of a specified size.
 
            Args:
                corpus (str): The input text corpus to be split into chunks.
                chunk_size (int, optional): The size of each chunk. Defaults to 500.

            Returns:
                list[str]: A list of text chunks.
            """
            tokenized_corpus = enc.encode(corpus)
            chunks = [
                "".join(enc.decode(tokenized_corpus[i : i + chunk_size]))
                for i in tqdm(range(0, len(tokenized_corpus), chunk_size))
            ]

            return chunks
    
    def add_embeddings(self, list_chunks: list[str], batch_size: int = 100) -> None:
        if len(list_chunks) < batch_size:
            batch_size_for_chromadb = len(list_chunks)
        else:
            batch_size_for_chromadb = batch_size

        document_ids: list[str] = []

        for i in tqdm(
            range(0, len(list_chunks), batch_size_for_chromadb)
        ):  # On met en place une stratégie d'ajout par batch car ChromaDB ne supporte pas plus de 166 documents d'un coup.
            batch_documents = list_chunks[i : i + batch_size_for_chromadb]
            list_ids = [
                str(id_chunk) for id_chunk in list(range(i, i + len(batch_documents)))
            ]
            list_id_doc = [str(uuid.uuid4()) for x in list_ids]
            self.chroma_db.add(documents=batch_documents, ids=list_id_doc)  # type: ignore
            document_ids.extend(list_ids)

    def __call__(self, *args, **kwargs):
        """
        Entry point to invoke methods of the class.
        """
        corpus = self.create_corpus()
        chunks = self.split_text_into_chunks(corpus)
        # self._create_collection(self.path)
        # self.chroma_db.insert_documents(chunks)
        self.add_embeddings(chunks)



# Test the class
if __name__ == "__main__":
    test = BDDChunks( embedding_model="paraphrase-xlm-r-multilingual-v1", path="./")
    test()
    


C:\Users\ediad\AppData\Local\Temp\ipykernel_8296\2395343120.py:124: PydanticDeprecatedSince20: The `from_orm` method is deprecated; set `model_config['from_attributes']=True` and use `model_validate` instead. Deprecated in Pydantic V2.0 to be removed in V3.0. See Pydantic V2 Migration Guide at https://errors.pydantic.dev/2.10/migration/
  avis_df = pd.DataFrame([schemas.FaitAvis.from_orm(a).dict() for a in avis])
C:\Users\ediad\AppData\Local\Temp\ipykernel_8296\2395343120.py:124: PydanticDeprecatedSince20: The `dict` method is deprecated; use `model_dump` instead. Deprecated in Pydantic V2.0 to be removed in V3.0. See Pydantic V2 Migration Guide at https://errors.pydantic.dev/2.10/migration/
  avis_df = pd.DataFrame([schemas.FaitAvis.from_orm(a).dict() for a in avis])
C:\Users\ediad\AppData\Local\Temp\ipykernel_8296\2395343120.py:125: PydanticDeprecatedSince20: The `from_orm` method is deprecated; set `model_config['from_attributes']=True` and use `model_validate` instead. Deprecated i

AttributeError: 'NoneType' object has no attribute 'add'

In [77]:
from model import models, schemas
from utils import database
import pandas as pd
import tiktoken
from tqdm import tqdm


enc = tiktoken.get_encoding("o200k_base")

class BDDChunksSQLite:
    """
    A class to process reviews from a SQLite database and store them as chunks with embeddings.
    Each review is considered a single chunk.
    """

    def __init__(self):
        """
        Initialize the BDDChunksSQLite instance.
        """
        self.db = next(self.get_db())

    def get_db(self):
        """
        Provide a database session.

        Yields:
            db: A session instance from the database.
        """
        db = database.SessionLocal()
        try:
            yield db
        finally:
            db.close()

    def get_all_restaurants_names(self) -> list[str]:
        """
        Get all restaurant names from the SQLite database.

        Returns:
            list[str]: A list of restaurant names.
        """
        try:
            with self.db as db:
                restaurants = db.query(models.DimRestaurant).all()
                return [r.nom for r in restaurants]
        except Exception as e:
            print(f"An error occurred while fetching restaurant names: {e}")
            return []

    def convert_to_arrow_compatible(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Convert a DataFrame to an Arrow-compatible format.
        
        Args:
            df (pd.DataFrame): Input DataFrame.
        
        Returns:
            pd.DataFrame: Converted DataFrame.
        """
        for column in df.columns:
            if df[column].dtype == 'object':
                df[column] = df[column].astype(str)
            elif df[column].dtype == 'int64':
                df[column] = df[column].astype('int32')
        return df

    def get_restaurant_reviews_location(self, restaurant_name: str) -> pd.DataFrame:
        """
        Get the location and reviews for a specific restaurant.

        Args:
            restaurant_name (str): The name of the restaurant.

        Returns:
            pd.DataFrame: A DataFrame containing restaurant name, location, and reviews.
        """
        try:
            with self.db as db:
                restaurant = db.query(models.DimRestaurant).filter(models.DimRestaurant.nom == restaurant_name).first()
                location = db.query(models.DimLocation).filter(models.DimLocation.id_location == restaurant.id_location).first()
                avis = db.query(models.FaitAvis).filter(models.FaitAvis.id_restaurant == restaurant.id_restaurant).all()
            
            # Convert the data to a DataFrame
            avis_df = pd.DataFrame([schemas.FaitAvis.from_orm(a).dict() for a in avis])
            location_df = pd.DataFrame([schemas.DimLocation.from_orm(location).dict()])
            restaurant_df = pd.DataFrame([schemas.DimRestaurant.from_orm(restaurant).dict()])
            
            return avis_df, location_df, restaurant_df
    
        except Exception as e:
            print(f"An error occurred while fetching restaurant reviews location: {e}")
            return pd.DataFrame()
 
    def transform_restaurant_chunk(self, restaurant_name: str) -> pd.DataFrame:
        """
        Transform a restaurant chunk into a DataFrame.

        Args:
            restaurant_name (str): The name of the restaurant.
            chunck_size (int): The size of each chunk of reviews.

        Returns:
            pd.DataFrame: A DataFrame containing the restaurant chunk.
        """
        colnames = ['restaurant', 'chunk']
        
        avis_df, location_df, restaurant_df = self.get_restaurant_reviews_location(restaurant_name)
        
        # List to hold the chunks before creating the DataFrame
        chunks = []
        #add classement classement
        chunks.append({'restaurant': restaurant_name, 'chunk': 'restaurant: ' + restaurant_name +' classement : '+ str(restaurant_df['classement'][0]) +' '})
        
        # horaires
        chunks.append({'restaurant': restaurant_name, 'chunk': 'restaurant: ' + restaurant_name +' horaires : '+ str(restaurant_df['horaires'][0]) +' '})
        
        # note_globale
        chunks.append({'restaurant': restaurant_name, 'chunk': 'restaurant: ' + restaurant_name +' note_globale : '+ str(restaurant_df['note_globale'][0]) +' '})
        
        # note_cuisine
        chunks.append({'restaurant': restaurant_name, 'chunk': 'restaurant: ' + restaurant_name +' note_cuisine : '+ str(restaurant_df['note_cuisine'][0]) +' '})
        
        # note_service
        chunks.append({'restaurant': restaurant_name, 'chunk': 'restaurant: ' + restaurant_name +' note_service : '+ str(restaurant_df['note_service'][0]) +' '})

        # note_rapportqualiteprix
        chunks.append({'restaurant': restaurant_name, 'chunk': 'restaurant: ' + restaurant_name +' note_rapportqualiteprix : '+ str(restaurant_df['note_rapportqualiteprix'][0]) +' '})

        # note_ambiance
        chunks.append({'restaurant': restaurant_name, 'chunk': 'restaurant: ' + restaurant_name +' note_ambiance : '+ str(restaurant_df['note_ambiance'][0]) +' '})
        
        # infos_pratiques
        chunks.append({'restaurant': restaurant_name, 'chunk': 'restaurant: ' + restaurant_name +' infos_pratiques : '+ str(restaurant_df['infos_pratiques'][0]) +' '})
        
        # repas
        chunks.append({'restaurant': restaurant_name, 'chunk': 'restaurant: ' + restaurant_name +' repas : '+ str(restaurant_df['repas'][0]) +' '})
               
        # fourchette_prix
        chunks.append({'restaurant': restaurant_name, 'chunk': 'restaurant: ' + restaurant_name +' fourchette_prix : '+ str(restaurant_df['fourchette_prix'][0]) +' '})

        # fonctionnalites
        chunks.append({'restaurant': restaurant_name, 'chunk': 'restaurant: ' + restaurant_name +' fonctionnalites : '+ str(restaurant_df['fonctionnalites'][0]) +' '})

        # type_cuisines
        chunks.append({'restaurant': restaurant_name, 'chunk': 'restaurant: ' + restaurant_name +' type_cuisines : '+ str(restaurant_df['type_cuisines'][0]) +' '})

        # nb_avis
        chunks.append({'restaurant': restaurant_name, 'chunk': 'restaurant: ' + restaurant_name +' nb_avis : '+ str(restaurant_df['nb_avis'][0]) +' '})

        # nbExcellent 
        chunks.append({'restaurant': restaurant_name, 'chunk': 'restaurant: ' + restaurant_name +' nbExcellent : '+ str(restaurant_df['nbExcellent'][0]) +' '})
        
        # nbTresbon 
        chunks.append({'restaurant': restaurant_name, 'chunk': 'restaurant: ' + restaurant_name +' nbTresbon : '+ str(restaurant_df['nbTresbon'][0]) +' '})
        
        # nbMoyen 
        chunks.append({'restaurant': restaurant_name, 'chunk': 'restaurant: ' + restaurant_name +' nbMoyen : '+ str(restaurant_df['nbMoyen'][0]) +' '})
        
        # nbMediocre 
        chunks.append({'restaurant': restaurant_name, 'chunk': 'restaurant: ' + restaurant_name +' nbMediocre : '+ str(restaurant_df['nbMediocre'][0]) +' '})
        
        # nbHorrible 
        chunks.append({'restaurant': restaurant_name, 'chunk': 'restaurant: ' + restaurant_name +' nbHorrible : '+ str(restaurant_df['nbHorrible'][0]) +' '})

        # Insert all avis in the dataframe with the restaurant name
        for i in range(0, len(avis_df)):
            chunks.append({'restaurant': restaurant_name, 'chunk': 'restaurant: ' + restaurant_name + 'Commentaire : ' + avis_df['review'][i]})
    
        # Create DataFrame once all chunks are gathered
        df_chunck = pd.DataFrame(chunks, columns=colnames)
        
        return df_chunck
    
    def create_corpus(self ) -> str:
        """
        Create a corpus from the restaurant chunks.

        Returns:
            str: The text corpus.
        """
        corpus = ""
        for restaurant in self.get_all_restaurants_names():
            df = self.transform_restaurant_chunk(restaurant)
            corpus += " ".join(df['chunk'].values)
        return corpus
    

    def insert_into_db(self):
        """
        Insert the chunks into the SQLite database.
        """
        all_restaurants = self.get_all_restaurants_names()
        for restaurant in tqdm(all_restaurants):
            df = self.transform_restaurant_chunk(restaurant)

            #ajouter les  lignes dans la base de données RagAvisBase
            for i in range(0, len(df)):
                # Insert the chunk into the database
                try:
                    with self.db as db:
                        ragAvis = models.RagAvisBase(restaurant=df['restaurant'][i], chunk=df['chunk'][i])
                        db.add(ragAvis)
                        db.commit()
                except Exception as e:
                    print(f"An error occurred while inserting chunk into the database: {e}")

            
        
        



    
    # def get_embedding(self, restaurantName: str) -> list[float]:
    #     restaurant_chunck = self.transform_restaurant_chunk(restaurantName, 5)
    #     # Get the embeddings for each chunk
    #     embeddings = []
    #     for chunk in restaurant_chunck['chunk']:
    #         embeddings.append(self.embedder.encode(chunk))
    #     return embeddings
    
    # def embedder(self, chunk: str) -> list[float]:
    #     """
    #     Embed a chunk using the SentenceTransformer model.

    #     Args:
    #         chunk (str): The input chunk to embed.

    #     Returns:
    #         list[float]: The embedding of the chunk.
    #     """
    #     return self.embedder.encode(chunk)
        



    def __call__(self, *args, **kwds):
        """
        Entry point to invoke methods of the class.
        """
        self.insert_into_db()
        

# Test the class
test = BDDChunksSQLite()
# # result = test.get_restaurant_reviews_location("Aromatic Restaurant")
# result = test.transform_restaurant_chunk("Aromatic Restaurant")
# result.head(5)
# #renregistrement des chunks dans un fichier csv
# result.to_csv('chunks.csv', index=False)

# # corpus = test.split_text_into_chunks(test.create_corpus())
# with open('corpus.txt', 'w') as f:
#     # f.write(corpus)
#     for chunk in corpus:
#         f.write(chunk + '\n')


In [None]:
from model import models, schemas
from utils import database
import os
import uuid
import sqlite3
from tqdm import tqdm
import pandas as pd

class BDDChunksSQLite:
    """
    A class to process reviews from a SQLite database and store them as chunks with embeddings.

    Each review is considered a single chunk.
    """

    def __init__(self):
        """
        Initialize the BDDChunksSQLite instance.
        """
        self.db = next(self.get_db())
        # pass  # Constructor is currently empty but can be extended if needed in the future.

    def get_db(self):
        """
        Provide a database session.

        Yields:
            db: A session instance from the database.
        """
        db = database.SessionLocal()
        try:
            yield db
        finally:
            db.close()

    def get_all_restaurants_names(self) -> list[str]:
        """
        Get all restaurant names from the SQLite database.

        Returns:
            list[str]: A list of restaurant names.
        """
        try:
            with self.db as db: 
                restaurants = db.query(models.DimRestaurant).all()
                restaurant_names = [r.nom for r in restaurants]
            return restaurant_names
        except Exception as e:
            print(f"An error occurred while fetching restaurant names: {e}")
            return []


    def convert_to_arrow_compatible(df: pd.DataFrame) -> pd.DataFrame:
        for column in df.columns:
            if df[column].dtype == 'object':
                df[column] = df[column].astype(str)
            elif df[column].dtype == 'int64':
                df[column] = df[column].astype('int32')
        return df

    def get_restaurant_reviews_location(self, restaurant_name: str) -> list[str]:
        """
        Get the location of reviews for a specific restaurant.

        Args:
            restaurant_name (str): The name of the restaurant.

        Returns:
            list[str]: A list of review locations.
        """
        try:
            with self.db as db:
                # restaurant = db.query(models.DimRestaurant, models.DimLocation, models.).join.filter(models.DimRestaurant.nom == restaurant_name).all()
                join_data = db.query(models.DimRestaurant, models.DimLocation, models.FaitAvis) \
                    .join(models.DimLocation, models.DimRestaurant.id_location == models.DimLocation.id_location) \
                    .join(models.FaitAvis, models.DimRestaurant.id_restaurant == models.FaitAvis.id_restaurant) \
                    .filter(models.DimRestaurant.nom == restaurant_name).all()
                
            # restaurant = pd.DataFrame([schemas.DimRestaurant.from_orm(r).dict() for r in restaurant])

            # join_data = pd.DataFrame([r.__dict__ for r in join_data])
            data = pd.DataFrame(
                [
                    {
                        "restaurant": r[0].nom,
                        "location": r[1].nom,
                        "avis": r[2].avis,
                    }
                    for  restaurant , location  , avis in join_data
                ]
            )
            # restaurant_df = pd.DataFrame([r.__dict__ for r in restaurant])
            # location_df = pd.DataFrame([l.__dict__ for l in location])
            # avis_df = pd.DataFrame([a.__dict__ for a in avis])


            #transformation  of the result to a list of strings
            
            # return convert_to_arrow_compatible(restaurant)
            return join_data
        except Exception as e:
            print(f"An error occurred while fetching restaurant reviews location: {e}")
            return []   
    
    def __call__(self, *args, **kwds):
        self.get_all_restaurants_names()
        pass     
test = BDDChunksSQLite()
test.get_restaurant_reviews_location("Aromatic Restaurant")
# print(test.get_all_restaurants_names())


An error occurred while fetching restaurant reviews location: 'DimLocation' object has no attribute 'nom'


[]

In [10]:
from model import models, schemas
from utils import database
import os
import uuid
import sqlite3
from tqdm import tqdm



class BDDChunksSQLite:
    """
    A class to process reviews from a SQLite database and store them as chunks with embeddings.
    
    Each review is considered a single chunk.
    """

    def __init__(self ):
        """
        Initialize the BDDChunksSQLite instance.

        Args:
            sqlite_db_path (str): Path to the SQLite database file.
            reviews_table (str): Name of the table containing reviews and restaurant information.
            embeddings_table (str): Name of the table where embeddings will be stored.
        """
 
        # self.db = self.get_db()

    # Database dependency
    def get_db(self):
        db = database.SessionLocal()
        try:
            yield db
        finally:
            db.close()
    
    def get_all_restaurants_names(self) -> list[str]:
        """
        Get all restaurant names from the SQLite database.

        Returns:
            list[str]: A list of restaurant names.
        """

        with next(self.get_db()) as db:
            restaurants = db.query(models.Restaurant).all()
        # restaurants_name = [r.nom for r in restaurants]
        return restaurants
    
    # def  get_restaurant_location_reviews(self, restaurant_name: str) -> list[schemas.Review]:
    #     """
    #     Get all reviews for a given restaurant.

    #     Args:
    #         restaurant_name (str): The name of the restaurant.

    #     Returns:
    #         list[schemas.Review]: A list of reviews for the restaurant.
    #     """
    #     with self.get_db() as db:
    #         reviews = db.query(models.Review).filter(models.Review.restaurant_name == restaurant_name).all()
    #     return reviews


    # def fetch_reviews_from_db(self) -> list[tuple[str, str]]:
    #     """
    #     Fetch reviews and their associated restaurant names from the SQLite database.

    #     Returns:
    #         list[tuple[str, str]]: A list of tuples containing restaurant names and reviews.
    #     """
    #     data = []
    #     # conn = sqlite3.connect(self.sqlite_db_path)
    #     # cursor = conn.cursor()
    #     # cursor.execute(f"SELECT restaurant_name, review FROM {self.reviews_table};")
    #     # data = cursor.fetchall()
    #     # conn.close()
    #     with self.get_db() as db:
    #         data = db.query(models.Review).all()
    #         data = [(d.restaurant_name, d.review) for d in data]

    #     return data

    # def generate_fake_embedding(self, text: str) -> list[float]:
    #     """
    #     Generate a fake embedding for a given text. Replace this with your embedding logic.

    #     Args:
    #         text (str): The text for which to generate an embedding.

    #     Returns:
    #         list[float]: A list representing the embedding vector.
    #     """
    #     # Example: Return the length of each word in the text as a fake embedding.
    #     return [len(word) for word in text.split()]

    # def store_chunk_in_db(self, restaurant_name: str, chunk: str, embedding: list[float]) -> None:
    #     """
    #     Store a chunk and its embedding in the SQLite database.

    #     Args:
    #         restaurant_name (str): The name of the associated restaurant.
    #         chunk (str): The text chunk (in this case, the full review).
    #         embedding (list[float]): The embedding vector for the chunk.
    #     """
    #     conn = sqlite3.connect(self.sqlite_db_path)
    #     cursor = conn.cursor()

    #     # Ensure the embeddings table exists
    #     cursor.execute(f"""
    #     CREATE TABLE IF NOT EXISTS {self.embeddings_table} (
    #         id TEXT PRIMARY KEY,
    #         restaurant_name TEXT,
    #         chunk TEXT,
    #         embedding TEXT
    #     );
    #     """)

    #     embedding_str = ",".join(map(str, embedding))
    #     cursor.execute(
    #         f"INSERT INTO {self.embeddings_table} (id, restaurant_name, chunk, embedding) VALUES (?, ?, ?, ?);",
    #         (str(uuid.uuid4()), restaurant_name, chunk, embedding_str),
    #     )
    #     conn.commit()
    #     conn.close()

    # def process_reviews(self) -> None:
    #     """
    #     Process each review as a single chunk, generate embeddings, and store them in the database.

    #     This method:
    #     1. Fetches reviews and restaurant names from the SQLite database.
    #     2. Generates embeddings for each review.
    #     3. Stores the reviews and embeddings in the SQLite database.
    #     """
    #     data = self.fetch_reviews_from_db()

    #     for restaurant_name, review in tqdm(data, desc="Processing Reviews"):
    #         embedding = self.generate_fake_embedding(review)  # Generate embedding for the review (chunk)
    #         self.store_chunk_in_db(restaurant_name, review, embedding)



bdd_chunks = BDDChunksSQLite()
restaurants = bdd_chunks.get_all_restaurants_names()
restaurants
# bdd_chunks.process_reviews()


AttributeError: module 'model.models' has no attribute 'Restaurant'

In [2]:
from model import models, schemas
from utils import database
import os
import uuid
import sqlite3
from tqdm import tqdm



class BDDChunksSQLite:
    """
    A class to process reviews from a SQLite database and store them as chunks with embeddings.
    
    Each review is considered a single chunk.
    """

    def __init__(self ):
        """
        Initialize the BDDChunksSQLite instance.

        Args:
            sqlite_db_path (str): Path to the SQLite database file.
            reviews_table (str): Name of the table containing reviews and restaurant information.
            embeddings_table (str): Name of the table where embeddings will be stored.
        """
 
        self.db = self.get_db()

    # Database dependency
    def get_db():
        db = database.SessionLocal()
        try:
            yield db
        finally:
            db.close()
    

    def fetch_reviews_from_db(self) -> list[tuple[str, str]]:
        """
        Fetch reviews and their associated restaurant names from the SQLite database.

        Returns:
            list[tuple[str, str]]: A list of tuples containing restaurant names and reviews.
        """
        conn = sqlite3.connect(self.sqlite_db_path)
        cursor = conn.cursor()
        query = f"SELECT restaurant_name, review FROM {self.reviews_table};"
        cursor.execute(query)
        data = cursor.fetchall()
        conn.close()
        return data

    def generate_fake_embedding(self, text: str) -> list[float]:
        """
        Generate a fake embedding for a given text. Replace this with your embedding logic.

        Args:
            text (str): The text for which to generate an embedding.

        Returns:
            list[float]: A list representing the embedding vector.
        """
        # Example: Return the length of each word in the text as a fake embedding.
        return [len(word) for word in text.split()]

    def store_chunk_in_db(self, restaurant_name: str, chunk: str, embedding: list[float]) -> None:
        """
        Store a chunk and its embedding in the SQLite database.

        Args:
            restaurant_name (str): The name of the associated restaurant.
            chunk (str): The text chunk (in this case, the full review).
            embedding (list[float]): The embedding vector for the chunk.
        """
        conn = sqlite3.connect(self.sqlite_db_path)
        cursor = conn.cursor()

        # Ensure the embeddings table exists
        cursor.execute(f"""
        CREATE TABLE IF NOT EXISTS {self.embeddings_table} (
            id TEXT PRIMARY KEY,
            restaurant_name TEXT,
            chunk TEXT,
            embedding TEXT
        );
        """)

        embedding_str = ",".join(map(str, embedding))
        cursor.execute(
            f"INSERT INTO {self.embeddings_table} (id, restaurant_name, chunk, embedding) VALUES (?, ?, ?, ?);",
            (str(uuid.uuid4()), restaurant_name, chunk, embedding_str),
        )
        conn.commit()
        conn.close()

    def process_reviews(self) -> None:
        """
        Process each review as a single chunk, generate embeddings, and store them in the database.

        This method:
        1. Fetches reviews and restaurant names from the SQLite database.
        2. Generates embeddings for each review.
        3. Stores the reviews and embeddings in the SQLite database.
        """
        data = self.fetch_reviews_from_db()

        for restaurant_name, review in tqdm(data, desc="Processing Reviews"):
            embedding = self.generate_fake_embedding(review)  # Generate embedding for the review (chunk)
            self.store_chunk_in_db(restaurant_name, review, embedding)


# Example usage
sqlite_db_path = "path/to/your/database.sqlite"  # Path to your SQLite DB
reviews_table = "reviews"  # Table containing reviews
embeddings_table = "embeddings"  # Table to store embeddings

bdd_chunks = BDDChunksSQLite(sqlite_db_path, reviews_table, embeddings_table)
bdd_chunks.process_reviews()


In [1]:
import os
import uuid
import sqlite3
from tqdm import tqdm


class BDDChunksSQLite:
    """
    A class to process reviews from a SQLite database and store them as chunks with embeddings.
    
    Each review is considered a single chunk.
    """

    def __init__(self, sqlite_db_path: str, reviews_table: str, embeddings_table: str):
        """
        Initialize the BDDChunksSQLite instance.

        Args:
            sqlite_db_path (str): Path to the SQLite database file.
            reviews_table (str): Name of the table containing reviews and restaurant information.
            embeddings_table (str): Name of the table where embeddings will be stored.
        """
        self.sqlite_db_path = sqlite_db_path
        self.reviews_table = reviews_table
        self.embeddings_table = embeddings_table

    def fetch_reviews_from_db(self) -> list[tuple[str, str]]:
        """
        Fetch reviews and their associated restaurant names from the SQLite database.

        Returns:
            list[tuple[str, str]]: A list of tuples containing restaurant names and reviews.
        """
        conn = sqlite3.connect(self.sqlite_db_path)
        cursor = conn.cursor()
        query = f"SELECT restaurant_name, review FROM {self.reviews_table};"
        cursor.execute(query)
        data = cursor.fetchall()
        conn.close()
        return data

    def generate_fake_embedding(self, text: str) -> list[float]:
        """
        Generate a fake embedding for a given text. Replace this with your embedding logic.

        Args:
            text (str): The text for which to generate an embedding.

        Returns:
            list[float]: A list representing the embedding vector.
        """
        # Example: Return the length of each word in the text as a fake embedding.
        return [len(word) for word in text.split()]

    def store_chunk_in_db(self, restaurant_name: str, chunk: str, embedding: list[float]) -> None:
        """
        Store a chunk and its embedding in the SQLite database.

        Args:
            restaurant_name (str): The name of the associated restaurant.
            chunk (str): The text chunk (in this case, the full review).
            embedding (list[float]): The embedding vector for the chunk.
        """
        conn = sqlite3.connect(self.sqlite_db_path)
        cursor = conn.cursor()

        # Ensure the embeddings table exists
        cursor.execute(f"""
        CREATE TABLE IF NOT EXISTS {self.embeddings_table} (
            id TEXT PRIMARY KEY,
            restaurant_name TEXT,
            chunk TEXT,
            embedding TEXT
        );
        """)

        embedding_str = ",".join(map(str, embedding))
        cursor.execute(
            f"INSERT INTO {self.embeddings_table} (id, restaurant_name, chunk, embedding) VALUES (?, ?, ?, ?);",
            (str(uuid.uuid4()), restaurant_name, chunk, embedding_str),
        )
        conn.commit()
        conn.close()

    def process_reviews(self) -> None:
        """
        Process each review as a single chunk, generate embeddings, and store them in the database.

        This method:
        1. Fetches reviews and restaurant names from the SQLite database.
        2. Generates embeddings for each review.
        3. Stores the reviews and embeddings in the SQLite database.
        """
        data = self.fetch_reviews_from_db()

        for restaurant_name, review in tqdm(data, desc="Processing Reviews"):
            embedding = self.generate_fake_embedding(review)  # Generate embedding for the review (chunk)
            self.store_chunk_in_db(restaurant_name, review, embedding)


# Example usage
sqlite_db_path = "path/to/your/database.sqlite"  # Path to your SQLite DB
reviews_table = "reviews"  # Table containing reviews
embeddings_table = "embeddings"  # Table to store embeddings

bdd_chunks = BDDChunksSQLite(sqlite_db_path, reviews_table, embeddings_table)
bdd_chunks.process_reviews()


OperationalError: unable to open database file