In [119]:
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
import transformers
import torch

from langchain.embeddings.huggingface import HuggingFaceEmbeddings
from llama_index.embeddings.langchain import LangchainEmbedding
from llama_index.core import SimpleDirectoryReader, VectorStoreIndex, Settings, StorageContext, load_index_from_storage
from llama_index.vector_stores.faiss import FaissVectorStore
from llama_index.llms.huggingface import HuggingFaceLLM
from llama_index.core import Settings
import warnings
import faiss

import os
import gc
warnings.filterwarnings('ignore')

In [127]:
class RagLlamaFAISS:
    """Retrieval-augmented generation over local documents with a Llama chat model.

    ``prepare()`` loads the documents, the LLM and the embedding model, then
    either reloads an index previously persisted under ``persist_dir`` or builds
    a new FAISS-backed ``VectorStoreIndex`` and persists it there. ``call()``
    answers a query through the resulting query engine.
    """

    def __init__(self,
                 context_window=4096,
                 max_new_tokens=64,
                 generate_kwargs={"temperature": 0.0, "do_sample": False},
                 tokenizer_name="meta-llama/Llama-2-7b-chat-hf",
                 model_name="meta-llama/Llama-2-7b-chat-hf",
                 device_map="auto",
                 model_kwargs={"torch_dtype": torch.float16, "load_in_8bit": True},
                 similarity_top_k=3,
                 persist_dir="storage",
                 faiss_index_file="faiss_index.idx",
                 system_prompt="""You are a human being that is trying to converse with an Alzheimer's patient. Use the memories in the data and respond naturally."""):
        """Record configuration only; nothing is loaded until ``prepare()``.

        Args:
            context_window: Forwarded to ``HuggingFaceLLM``.
            max_new_tokens: Forwarded to ``HuggingFaceLLM``.
            generate_kwargs: Generation options forwarded to ``HuggingFaceLLM``.
            tokenizer_name: Tokenizer identifier forwarded to ``HuggingFaceLLM``.
            model_name: Model identifier forwarded to ``HuggingFaceLLM``.
            device_map: Device placement forwarded to ``HuggingFaceLLM``.
            model_kwargs: Model loading options forwarded to ``HuggingFaceLLM``.
            similarity_top_k: Number of retrieved chunks used by the query engine.
            persist_dir: Directory the index is persisted to / reloaded from.
            faiss_index_file: Stored for interface compatibility; not read by
                this class (the vector store is persisted inside ``persist_dir``
                under the name LlamaIndex chooses).
            system_prompt: System prompt forwarded to ``HuggingFaceLLM``.
        """
        self.context_window = context_window
        self.max_new_tokens = max_new_tokens
        # The dict defaults are created once at definition time; copy them so
        # that mutating one instance's settings cannot leak into another.
        self.generate_kwargs = dict(generate_kwargs) if generate_kwargs is not None else {}
        self.system_prompt = system_prompt
        self.tokenizer_name = tokenizer_name
        self.model_name = model_name
        self.device_map = device_map
        self.model_kwargs = dict(model_kwargs) if model_kwargs is not None else {}
        self.similarity_top_k = similarity_top_k
        self.persist_dir = persist_dir
        self.faiss_index_file = faiss_index_file

        # Populated by prepare().
        self.query_engine = None
        self.documents = None
        self.index = None
        self.llm = None
        self.embed_model = None
        self.vector_store = None

    def load_model(self):
        """Instantiate the ``HuggingFaceLLM`` from the stored configuration."""
        self.llm = HuggingFaceLLM(
            context_window=self.context_window,
            max_new_tokens=self.max_new_tokens,
            generate_kwargs=self.generate_kwargs,
            system_prompt=self.system_prompt,
            tokenizer_name=self.tokenizer_name,
            model_name=self.model_name,
            device_map=self.device_map,
            model_kwargs=self.model_kwargs,
        )

    def load_data(self, data_path="./data"):
        """Read every document under ``data_path`` into ``self.documents``.

        Raises:
            ValueError: If the reader returns no documents.
        """
        self.documents = SimpleDirectoryReader(data_path).load_data()
        if not self.documents:
            raise ValueError("No documents found at specified path.")
        print(f"Loaded {len(self.documents)} documents.")

    def _load_persisted_index(self):
        """Return the index persisted in ``persist_dir``, or None if there is none.

        An absent or empty directory is the normal first-run case and returns
        None silently. A directory whose contents cannot be loaded (for example
        files written by a different vector store) is reported and also returns
        None so the caller can rebuild.
        """
        if not os.path.isdir(self.persist_dir) or not os.listdir(self.persist_dir):
            return None
        try:
            vector_store = FaissVectorStore.from_persist_dir(self.persist_dir)
            storage_context = StorageContext.from_defaults(
                vector_store=vector_store,
                persist_dir=self.persist_dir,
            )
            index = load_index_from_storage(storage_context)
        except Exception as exc:
            # Best effort: fall back to a rebuild, but say why the reload failed.
            print(f"Could not load persisted index from {self.persist_dir!r} ({exc!r}); rebuilding.")
            return None
        self.vector_store = vector_store
        print("Loaded existing FAISS index from disk...")
        return index

    def _build_and_persist_index(self):
        """Embed ``self.documents`` into a new FAISS index and persist it."""
        print("Building new FAISS index from documents...")
        # FAISS needs the vector dimensionality up front; embed one probe
        # string instead of hard-coding a size tied to a specific model.
        embed_dim = len(self.embed_model.get_text_embedding("dimension probe"))
        self.vector_store = FaissVectorStore(faiss_index=faiss.IndexFlatL2(embed_dim))
        storage_context = StorageContext.from_defaults(vector_store=self.vector_store)
        index = VectorStoreIndex.from_documents(
            self.documents,
            storage_context=storage_context,
        )
        # Persist explicitly to the configured directory, not the library default.
        storage_context.persist(persist_dir=self.persist_dir)
        return index

    def prepare(self, embedding_model="sentence-transformers/all-mpnet-base-v2", data_path="./data"):
        """Load data, LLM and embeddings, then load or build the index.

        Args:
            embedding_model: Model name handed to ``HuggingFaceEmbeddings``.
            data_path: Directory handed to ``load_data``.
        """
        # Validate the (cheap) data path before loading the (expensive) model.
        self.load_data(data_path)
        self.load_model()

        hf_embed_model = HuggingFaceEmbeddings(model_name=embedding_model)
        self.embed_model = LangchainEmbedding(hf_embed_model)

        Settings.llm = self.llm
        Settings.embed_model = self.embed_model
        Settings.chunk_size = 1024

        os.makedirs(self.persist_dir, exist_ok=True)

        self.index = self._load_persisted_index()
        if self.index is None:
            self.index = self._build_and_persist_index()

        self.query_engine = self.index.as_query_engine(similarity_top_k=self.similarity_top_k)

    def call(self, query: str) -> str:
        """Answer ``query`` with the prepared query engine.

        Raises:
            RuntimeError: If ``prepare()`` has not been called yet.
        """
        if self.query_engine is None:
            raise RuntimeError("Model not prepared. Call `prepare()` first.")
        print(f"Querying: {query}")
        response = self.query_engine.query(query)
        return str(response)


In [121]:
# class Rag_Llama:
#     def __init__(self,
#                 context_window=4096,
#                 max_new_tokens=256,
#                 generate_kwargs={"temperature": 0.0, "do_sample": False},
#                 system_prompt="""""",
#                 tokenizer_name="meta-llama/Llama-2-7b-chat-hf",
#                 model_name="meta-llama/Llama-2-7b-chat-hf",
#                 device_map="cuda:0",
#                 model_kwargs={"torch_dtype": torch.float16 , "load_in_8bit":True}):
        
#         self.context_window= context_window
#         self.max_new_tokens= max_new_tokens
#         self.generate_kwargs= generate_kwargs
#         self.system_prompt=system_prompt
#         # query_wrapper_prompt=query_wrapper_prompt,
#         self.tokenizer_name= tokenizer_name
#         self.model_name= model_name
#         self.device_map= device_map
#         # uncomment this if using CUDA to reduce memory usage
#         self.model_kwargs= model_kwargs
        
#         self.query_engine = None
#         self.documents = None
#         self.index = None
#         self.llm = None  # to be loaded later
#         self.embed_model = None
#         self.system_prompt = """
#                 You are a human being that is trying to converse with an 
#                 Alzheimer's patient. 
#                 Use the memories in the data and respond naturally.
#                 """

#     def load_model(self):
#         self.llm = HuggingFaceLLM(
#                 context_window= self.context_window,
#                 max_new_tokens= self.max_new_tokens,
#                 generate_kwargs= self.generate_kwargs,
#                 system_prompt= self.system_prompt,
#                 # query_wrapper_prompt=query_wrapper_prompt,
#                 tokenizer_name= self.tokenizer_name,
#                 model_name= self.model_name,
#                 device_map= self.device_map,
#                 # uncomment this if using CUDA to reduce memory usage
#                 model_kwargs= self.model_kwargs,
#                 # llm_int8_enable_fp32_cpu_offload=True
#             )

#     def load_data(self, data_path = "./data"):
#         # try:
#         self.documents=SimpleDirectoryReader("./data").load_data()
#         if self.documents:
#             print("Documents Loaded")
#         else:
#             print("No Documents found, please check the path or the document format \n The document format must be in pdf")
#         # except:
#         #     print("Error in loading document, Simple Directory Error")

#     def prepare(self, embedding_model="sentence-transformers/all-mpnet-base-v2", data_path="./data"):
#         """Load model, data, embeddings, and prepare index."""
#         self.load_model()
#         self.load_data(data_path)

#         self.embed_model = LangchainEmbedding(
#             HuggingFaceEmbeddings(model_name=embedding_model)
#         )

#         Settings.llm = self.llm
#         Settings.embed_model = self.embed_model
#         Settings.chunk_size = 1024

#         self.index = VectorStoreIndex.from_documents(self.documents)
#         self.query_engine = self.index.as_query_engine()
        
#     def call(self, query: str):
#         """Perform inference using the initialized query engine."""
#         if self.query_engine is None:
#             raise RuntimeError("Model not prepared. Call `prepare()` first.")
#         return self.query_engine.query(query)

In [122]:
# load_documents("./data/")

In [123]:
def get_response(prompt, query_engine):
    """Send ``prompt`` to ``query_engine.query`` and return its result unchanged."""
    return query_engine.query(prompt)

In [124]:
class Plain_Llama:
    """Baseline text generation with an 8-bit Llama checkpoint (no retrieval)."""

    def __init__(self,
                model = "meta-llama/Llama-2-7b-chat-hf"):
        """Store the checkpoint name; weights are loaded lazily on first use."""
        self.model = model # meta-llama/Llama-2-7b-hf

        # Populated by load_plain_model_pipeline().
        self.model_8bit = None
        self.tokenizer = None
        self.llama_pipeline = None

    def load_plain_model_pipeline(self, device_map="cuda:0"):
        """Load the 8-bit model and tokenizer and wrap them in a generation pipeline.

        Args:
            device_map: Device placement used when loading the model weights.
        """
        self.model_8bit = AutoModelForCausalLM.from_pretrained(self.model, device_map = device_map, load_in_8bit=True)
        self.tokenizer = AutoTokenizer.from_pretrained(self.model, use_auth_token=True)

        # The model object is already quantized and placed by from_pretrained,
        # so no dtype or device arguments are given here. (Previously the
        # pipeline was handed torch.uint8 and a hard-coded "cuda:0" that
        # ignored this method's ``device_map`` argument.)
        self.llama_pipeline = pipeline(
            "text-generation",  # LLM task
            model=self.model_8bit,
            tokenizer=self.tokenizer,
        )

    def get_plain_llama_response(self, prompt: str) -> dict:
        """Generate one sampled continuation of ``prompt``.

        Parameters:
            prompt (str): The user's input/question for the model.

        Returns:
            The first element of the pipeline's output list (also kept, together
            with the rest of the list, in ``self.sequences``).

        Raises:
            RuntimeError: If the pipeline has not been loaded yet.
        """
        if getattr(self, "llama_pipeline", None) is None:
            raise RuntimeError("Pipeline not loaded. Call `load_plain_model_pipeline()` first.")

        self.sequences = self.llama_pipeline(
            prompt,
            do_sample=True,
            top_k=10,
            num_return_sequences=1,
            eos_token_id=self.tokenizer.eos_token_id,
            max_length=256,
        )

        return self.sequences[0]

    def unload_plain_model(self):
        """Drop the model, tokenizer and pipeline and release cached GPU memory."""
        self.llama_pipeline = None
        self.model_8bit = None
        self.tokenizer = None
        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

    def call_plain_model(self, prompt = ""):
        """Generate a response for ``prompt``, loading the model on first use only.

        The loaded pipeline is reused by later calls; use
        ``unload_plain_model()`` to free it explicitly.
        """
        # Loading a multi-billion-parameter model per call is slow and piles up
        # GPU memory, so only load when nothing is resident yet.
        if getattr(self, "llama_pipeline", None) is None:
            self.load_plain_model_pipeline()

        self.prompt = prompt
        self.plain_response = self.get_plain_llama_response(self.prompt)

        return self.plain_response