In [None]:
import os

In [None]:
# Show the working directory the kernel started in (before the change-directory cell below).
os.getcwd()

In [None]:
# Move up one level so project-relative paths such as "config/config.yaml" and
# "schema/schema.yaml" resolve. Guarded on the presence of config/config.yaml so
# that re-running this cell (or Run All on a restarted kernel that is already at
# the root) does not keep climbing the directory tree.
if not os.path.exists(os.path.join("config", "config.yaml")):
    os.chdir("..")

In [None]:
# Confirm the working directory after the change-directory cell above.
os.getcwd()

# constants

In [None]:
from customer_support.utils import load_yaml
from dataclasses import dataclass
from dotenv import load_dotenv
import os


# Pull variables from a .env file into os.environ, then read the YAML config.
load_dotenv()
CONFIG = load_yaml("config/config.yaml")


@dataclass
class DataIngestionConstants:
    """Raw data-ingestion settings.

    Non-secret values come from config/config.yaml. Credentials and connection
    settings come from environment variables and are None when a variable is
    not set. All values are plain class attributes (none are annotated), so the
    dataclass has no fields.
    """

    # temporary aliases for the two config sections; deleted at the end of the body
    _embedding = CONFIG.MODELS.EMBEDDING
    _ingestion = CONFIG.DATA_INGESTION

    # embedding model identifiers
    GOOGLE_EMBEDDING_MODEL_NAME = _embedding.GOOGLE
    HUGGINGFACE_EMBEDDING_MODEL_NAME = _embedding.HUGGINGFACE

    # local data location
    DATA_DIR_NAME = _ingestion.DATA_DIR_NAME
    DATA_FILE_NAME = _ingestion.DATA_FILE_NAME

    # MongoDB source
    MONGODB_DATABASE_NAME = _ingestion.MONGODB_DATABASE_NAME
    MONGODB_COLLECTION_NAME = _ingestion.MONGODB_COLLECTION_NAME

    # AstraDB vector store
    ASTRADB_KEYSPACE_NAME = _ingestion.ASTRADB_KEYSPACE_NAME
    ASTRADB_COLLECTION_NAME = _ingestion.ASTRADB_COLLECTION_NAME
    ASTRADB_K_VALUE = int(_ingestion.ASTRADB_K_VALUE)

    # environment-provided connection settings and credentials
    ASTRADB_ENDPOINT = os.getenv("ASTRADB_ENDPOINT")
    ASTRADB_TOKEN = os.getenv("ASTRADB_TOKEN")
    MONGODB_URI = os.getenv("MONGODB_URI")
    GROQ_API_KEY = os.getenv("GROQ_API_KEY")
    GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
    HF_TOKEN = os.getenv("HF_TOKEN")

    del _embedding, _ingestion

In [None]:
# Show the loaded constants. Credentials (tokens, API keys, and the MongoDB URI,
# which may embed a username/password) are masked so they never end up in the
# saved notebook output; for those only "<set>" / "<NOT SET>" is shown.
for _name, _is_secret in (
    ("GOOGLE_EMBEDDING_MODEL_NAME", False),
    ("HUGGINGFACE_EMBEDDING_MODEL_NAME", False),
    ("DATA_DIR_NAME", False),
    ("DATA_FILE_NAME", False),
    ("MONGODB_DATABASE_NAME", False),
    ("MONGODB_COLLECTION_NAME", False),
    ("ASTRADB_KEYSPACE_NAME", False),
    ("ASTRADB_COLLECTION_NAME", False),
    ("ASTRADB_K_VALUE", False),
    ("ASTRADB_ENDPOINT", False),
    ("ASTRADB_TOKEN", True),
    ("MONGODB_URI", True),
    ("GROQ_API_KEY", True),
    ("GOOGLE_API_KEY", True),
    ("HF_TOKEN", True),
):
    _value = getattr(DataIngestionConstants, _name)
    if _is_secret:
        _value = "<set>" if _value else "<NOT SET>"
    print(f"{_name}:{_value}")

# entity

In [None]:
from dataclasses import dataclass
from pathlib import Path


@dataclass
class DataIngestion:
    """Typed shape of the data-ingestion configuration.

    Field names mirror the attributes of DataIngestionConfig. In the visible
    code this class is only used as the type annotation of
    DataIngestionComponents' config field; it is not instantiated.
    """
    # embedding model identifiers (Google is tried first, HuggingFace is the fallback)
    GOOGLE_EMBEDDING_MODEL_NAME:str
    HUGGINGFACE_EMBEDDING_MODEL_NAME:str
    # local directory and CSV file the collected data is written to
    DATA_DIR_PATH:Path
    DATA_FILE_PATH:str
    # MongoDB source database / collection
    MONGODB_DATABASE_NAME:str
    MONGODB_COLLECTION_NAME:str
    # AstraDB vector-store target and the top-k used for retrieval
    ASTRADB_KEYSPACE_NAME:str
    ASTRADB_COLLECTION_NAME:str
    ASTRADB_K_VALUE:int
    # connection settings and credentials (read from environment variables upstream)
    ASTRADB_ENDPOINT:str
    ASTRADB_TOKEN:str
    MONGODB_URI:str
    GROQ_API_KEY:str
    GOOGLE_API_KEY:str
    HF_TOKEN:str

# configuration

In [None]:
from dataclasses import dataclass
from pathlib import Path
import os


@dataclass
class DataIngestionConfig:
    """Resolved data-ingestion settings.

    Copies DataIngestionConstants, except that the data directory becomes a
    Path and the data file path is joined from directory + file name. Values
    are plain class attributes (no dataclass fields), so the class object
    itself is what gets passed around as the config.
    """

    _src = DataIngestionConstants  # short alias; deleted at the end of the body

    # embedding models
    GOOGLE_EMBEDDING_MODEL_NAME = _src.GOOGLE_EMBEDDING_MODEL_NAME
    HUGGINGFACE_EMBEDDING_MODEL_NAME = _src.HUGGINGFACE_EMBEDDING_MODEL_NAME

    # local storage: directory as Path, file path as str
    DATA_DIR_PATH = Path(_src.DATA_DIR_NAME)
    DATA_FILE_PATH = os.path.join(DATA_DIR_PATH, _src.DATA_FILE_NAME)

    # MongoDB source
    MONGODB_DATABASE_NAME = _src.MONGODB_DATABASE_NAME
    MONGODB_COLLECTION_NAME = _src.MONGODB_COLLECTION_NAME

    # AstraDB vector store
    ASTRADB_KEYSPACE_NAME = _src.ASTRADB_KEYSPACE_NAME
    ASTRADB_COLLECTION_NAME = _src.ASTRADB_COLLECTION_NAME
    ASTRADB_K_VALUE = _src.ASTRADB_K_VALUE

    # connection settings and credentials
    ASTRADB_ENDPOINT = _src.ASTRADB_ENDPOINT
    ASTRADB_TOKEN = _src.ASTRADB_TOKEN
    MONGODB_URI = _src.MONGODB_URI
    GROQ_API_KEY = _src.GROQ_API_KEY
    GOOGLE_API_KEY = _src.GOOGLE_API_KEY
    HF_TOKEN = _src.HF_TOKEN

    del _src

In [None]:
# Show the resolved config. Credentials (tokens, API keys, and the MongoDB URI,
# which may embed a username/password) are masked so they never end up in the
# saved notebook output; for those only "<set>" / "<NOT SET>" is shown.
for _name, _is_secret in (
    ("GOOGLE_EMBEDDING_MODEL_NAME", False),
    ("HUGGINGFACE_EMBEDDING_MODEL_NAME", False),
    ("DATA_DIR_PATH", False),
    ("DATA_FILE_PATH", False),
    ("MONGODB_DATABASE_NAME", False),
    ("MONGODB_COLLECTION_NAME", False),
    ("ASTRADB_KEYSPACE_NAME", False),
    ("ASTRADB_COLLECTION_NAME", False),
    ("ASTRADB_K_VALUE", False),
    ("ASTRADB_ENDPOINT", False),
    ("ASTRADB_TOKEN", True),
    ("MONGODB_URI", True),
    ("GROQ_API_KEY", True),
    ("GOOGLE_API_KEY", True),
    ("HF_TOKEN", True),
):
    _value = getattr(DataIngestionConfig, _name)
    if _is_secret:
        _value = "<set>" if _value else "<NOT SET>"
    print(f"{_name}:{_value}")

# components

In [None]:
from langchain_google_genai import GoogleGenerativeAIEmbeddings
from customer_support.utils import create_dirs, load_yaml
from langchain_huggingface import HuggingFaceEmbeddings
from customer_support.exception import CustomException
from langchain_astradb import AstraDBVectorStore
from langchain_core.documents import Document
from customer_support.logger import logging
from dataclasses import dataclass
from pymongo import MongoClient
from box import ConfigBox
import pandas as pd
import os, sys


@dataclass
class DataIngestionComponents:
    """Pulls data from MongoDB, validates it against a schema, and loads it into
    an AstraDB vector store.

    The single dataclass field holds the ingestion config. Because of the
    leading double underscore its name is mangled
    (``_DataIngestionComponents__data_ingestion_config``), so pass it
    positionally.

    Instance attributes set by the methods:
        data_frame: set by ``collect_data``.
        embeddings: set by ``load_embedding_model``.
        vector_store: set by ``insert_data``.
    """
    __data_ingestion_config:DataIngestion

    @staticmethod
    def collection_to_dataframe(collection)->pd.DataFrame:
        """Converts a MongoDB collection into a pandas DataFrame.

        Every document returned by ``collection.find()`` becomes a row, and the
        MongoDB ``_id`` column is dropped.

        Args:
            collection (pymongo collection): collection to convert.

        Returns:
            pandas.DataFrame: one row per document, without ``_id``. An empty
            collection yields an empty DataFrame.

        Raises:
            CustomException: wraps any error raised while reading or converting.
        """
        try:
            logging.info("In collection_to_dataframe")

            # materialize every document of the collection into a DataFrame
            df = pd.DataFrame(collection.find())
            logging.info("data successfully converted (mongodb collection ===> pandas DataFrame)")

            # drop MongoDB's internal _id column; errors="ignore" keeps an empty
            # collection (whose DataFrame has no columns at all) from raising KeyError
            df = df.drop(columns="_id", errors="ignore")

            logging.info("Out collection_to_dataframe")
            return df
        except Exception as e:
            logging.exception(e)
            raise CustomException(e, sys)

    @staticmethod
    def validate_data(data:pd.DataFrame, old_schema:ConfigBox)->bool:
        """Validates the data by regenerating its schema and comparing it.

        The regenerated schema has two keys: ``columns`` (column name -> dtype
        string) and ``numerical_columns`` (every column whose dtype is not
        object).

        Args:
            data (pd.DataFrame): dataframe to validate.
            old_schema (ConfigBox): expected schema to compare against.

        Returns:
            bool: True if the regenerated schema equals ``old_schema``, else False.

        Raises:
            CustomException: wraps any error raised during validation.
        """
        try:
            logging.info("In validate_data")

            columns_with_dtype = dict()
            numerical_columns = list()
            for col in data.columns:
                columns_with_dtype[col] = str(data[col].dtype)
                # anything that is not a Python-object column counts as numerical
                if data[col].dtype!="O":
                    numerical_columns.append(col)

            schema = {
                "columns": columns_with_dtype,
                "numerical_columns": numerical_columns,
            }
            logging.info("regenerated schema")

            new_schema=ConfigBox(schema)
            status = bool(old_schema==new_schema)
            logging.info(f"validation status:{status}")

            logging.info("Out validate_data")
            return status
        except Exception as e:
            logging.exception(e)
            raise CustomException(e, sys)

    def collect_data(self)->None:
        """Collects data from MongoDB and saves it locally as CSV.

        Side effects:
            - creates DATA_DIR_PATH,
            - sets ``os.environ["HUGGINGFACE_API_TOKEN"]`` when HF_TOKEN is set,
            - stores the collected frame on ``self.data_frame``,
            - writes it to DATA_FILE_PATH.

        Raises:
            CustomException: wraps any error (connection, read, or write).
        """
        try:
            logging.info("In collect_data")
            config = self.__data_ingestion_config

            # create required dirs
            create_dirs(config.DATA_DIR_PATH)

            # os.environ only accepts str values; skip when the token is missing
            # instead of failing with a TypeError on None
            if config.HF_TOKEN:
                os.environ["HUGGINGFACE_API_TOKEN"] = config.HF_TOKEN

            database_name = config.MONGODB_DATABASE_NAME
            collection_name = config.MONGODB_COLLECTION_NAME

            client = MongoClient(config.MONGODB_URI)
            try:
                client.admin.command('ping')
                logging.info("You are successfully connected to MongoDB!")

                collection = client[database_name][collection_name]
                # reads the whole collection into memory before the client is closed below
                self.data_frame = self.collection_to_dataframe(collection)
            finally:
                # release the client's connections; they are not needed after the read
                client.close()
            logging.info(f"collected data from mongodb, DATABASE: {database_name} and COLLECTION: {collection_name}")

            # saving data into local file path
            file_path = config.DATA_FILE_PATH
            self.data_frame.to_csv(file_path, index=False, header=True)
            logging.info(f"Data saved at {file_path}")

            logging.info("Out collect_data")
        except Exception as e:
            logging.exception(e)
            raise CustomException(e, sys)

    def load_embedding_model(self)->None:
        """Loads the embedding model into ``self.embeddings``.

        Tries the Google Generative AI embedding model first. If constructing it
        raises, falls back to the HuggingFace model on CPU with normalized
        embeddings.

        Raises:
            CustomException: if the HuggingFace fallback also fails.
        """
        try:
            logging.info("In load_embeddings")
            model_name = self.__data_ingestion_config.GOOGLE_EMBEDDING_MODEL_NAME
            try:
                self.embeddings = GoogleGenerativeAIEmbeddings(model=model_name)
            except Exception as google_error:
                # catch Exception (not a bare except) so KeyboardInterrupt /
                # SystemExit still propagate, and record why the fallback is used
                logging.info(f"could not load embedding model {{{model_name}}} ({google_error}), falling back to huggingface")
                model_name = self.__data_ingestion_config.HUGGINGFACE_EMBEDDING_MODEL_NAME
                self.embeddings = HuggingFaceEmbeddings(
                    model_name = model_name, 
                    model_kwargs = {"device": "cpu"}, 
                    encode_kwargs = {"normalize_embeddings": True}
                )
            logging.info(f"loaded embedding model {{{model_name}}}")
            logging.info("Out load_embeddings")
        except Exception as e:
            logging.exception(e)
            raise CustomException(e, sys)

    def format_data_for_insertion(self)->list[Document]:
        """Converts ``self.data_frame`` into Documents for the vector store.

        The ``review`` column becomes ``page_content``; every other column is
        copied into ``metadata``.

        Returns:
            list[Document]: one Document per row, in row order.

        Raises:
            Re-raises the original error unchanged (e.g. ValueError when there is
            no ``review`` column); ``insert_data`` wraps it.
        """
        try:
            logging.info("In format_data_for_insertion")

            columns = list(self.data_frame.columns)
            columns.remove("review")
            docs = [
                Document(
                    page_content=row["review"],
                    metadata={col: row[col] for col in columns},
                )
                for _, row in self.data_frame.iterrows()
            ]
            logging.info(f"total number of records formated {{{len(docs)}}}")

            logging.info("Out format_data_for_insertion")
            return docs
        except Exception as e:
            logging.exception(e)
            raise

    def get_vector_store(self)->AstraDBVectorStore:
        """Builds an AstraDBVectorStore bound to the configured collection.

        If ``load_embedding_model`` has not run yet (for example because ``main``
        skipped ingestion after a failed validation), the model is loaded here
        first. Without this, accessing ``self.embeddings`` would raise
        AttributeError.

        Returns:
            AstraDBVectorStore: store using ``self.embeddings``.
        """
        try:
            logging.info("In get_vector_store")

            if getattr(self, "embeddings", None) is None:
                self.load_embedding_model()

            config = self.__data_ingestion_config
            vstore = AstraDBVectorStore(
                collection_name=config.ASTRADB_COLLECTION_NAME,
                embedding=self.embeddings,
                api_endpoint=config.ASTRADB_ENDPOINT,
                token=config.ASTRADB_TOKEN,
                namespace=config.ASTRADB_KEYSPACE_NAME,
            )
            logging.info("Out get_vector_store")
            return vstore
        except Exception as e:
            logging.exception(e)
            raise

    def insert_data(self)->int:
        """Inserts ``self.data_frame`` into the vector store.

        Side effect: stores the created store on ``self.vector_store``.

        Returns:
            int: number of ids returned by ``add_documents``.

        Raises:
            CustomException: wraps any error raised while building or inserting.
        """
        try:
            logging.info("In insert_data")

            # create vector store instance
            self.vector_store = self.get_vector_store()

            # format the data into list of document object
            docs = self.format_data_for_insertion()

            # insert documents into vector store
            total_records_inserted = len(self.vector_store.add_documents(documents = docs))
            logging.info(f"total number of records inserted in vectorstore {{{total_records_inserted}}}")

            logging.info("Out insert_data")
            return total_records_inserted
        except Exception as e:
            logging.exception(e)
            raise CustomException(e, sys)

    def main(self):
        """Runs collection, then validation, then (only if valid) embedding + insertion.

        The expected schema is read from ``schema/schema.yaml`` (relative path).
        """
        self.collect_data()
        schema=load_yaml("schema/schema.yaml")
        status=self.validate_data(self.data_frame, schema)
        if status:
            self.load_embedding_model()
            self.insert_data()
        else:
            logging.info(f"collected data does not match the provided schema {{status:'{status}'}}, skipping the upcoming data ingestion.")

# pipeline

In [None]:
from customer_support.logger import logging
from dataclasses import dataclass


@dataclass 
class DataIngestionPipeline:
    """End-to-end data ingestion: MongoDB -> schema validation -> AstraDB."""

    def run(self)->dict:
        """Runs the full data ingestion pipeline and builds a retriever.

        Returns:
            dict: ``{"retriever": ...}``, a retriever over the AstraDB vector store.
        """
        logging.info("In TrainingPipeline")

        # data ingestion
        data_ingestion=DataIngestionComponents(DataIngestionConfig)
        data_ingestion.main()

        # main() only loads the embedding model when schema validation passes,
        # but the retriever needs it either way; load it here if it is missing
        if getattr(data_ingestion, "embeddings", None) is None:
            data_ingestion.load_embedding_model()
        retriever = data_ingestion.get_vector_store().as_retriever()

        logging.info("Out TrainingPipeline")
        return {
            "retriever":retriever
        }

# main

In [None]:
if __name__ == "__main__":
    # Run ingestion end to end and take the retriever it returns.
    training_pipeline = DataIngestionPipeline()
    training_outputs = training_pipeline.run()
    retriever = training_outputs["retriever"]

    # Smoke-test retrieval with a sample question. The configured k is passed as
    # an invoke kwarg. NOTE(review): confirm the installed langchain-core
    # forwards invoke kwargs to the underlying similarity search.
    k = DataIngestionConfig.ASTRADB_K_VALUE
    query = "Can you tell me the low budget headphone?"
    results = retriever.invoke(query, k=k)
    for res in results:
        print(res)