In [None]:
import json
import time
from collections import Counter
from datetime import datetime
from operator import itemgetter

import matplotlib.pyplot as plt
import numpy as np
import requests
from bert_score import score as bert_score
from bs4 import BeautifulSoup
from fpdf import FPDF
from langchain.prompts import PromptTemplate
from langchain_community.document_loaders import PyPDFLoader
from langchain_community.embeddings import OllamaEmbeddings
from langchain_community.llms import Ollama
from langchain_community.vectorstores import DocArrayInMemorySearch
from langchain_core.output_parsers import StrOutputParser
from nltk.translate.bleu_score import sentence_bleu
from PyPDF2 import PdfFileMerger,PdfFileReader
from rouge_score import rouge_scorer

In [None]:
class RAG_Pipeline:
    """Retrieval-augmented question answering over a single PDF document.

    On construction the PDF is loaded and split with ``PyPDFLoader``, the
    resulting pages are indexed in a ``DocArrayInMemorySearch`` store, and a
    similarity retriever returning the top 5 matches is created.  ``fetch``
    then answers a question with an Ollama model using one of several prompt
    styles.
    """

    def __init__(self, default_model="llama2", doc="output.pdf"):
        """Load and index the document and prepare the default chain.

        Args:
            default_model: Name of the Ollama model used for generation.
            doc: Path of the PDF file to load and index.
        """
        self.model_name = default_model
        self.doc_path = doc
        self.model = Ollama(model=self.model_name)
        # Constructed without arguments, so the embedding model is whatever
        # OllamaEmbeddings defaults to; it does not follow ``default_model``.
        self.embeddings = OllamaEmbeddings()
        self.parser = StrOutputParser()
        self.loader = PyPDFLoader(self.doc_path)
        self.pages = self.loader.load_and_split()
        self.vectorstore = DocArrayInMemorySearch.from_documents(self.pages, embedding=self.embeddings)
        self.retriever = self.vectorstore.as_retriever(search_type="similarity", search_kwargs={"k": 5})
        self.default_prompt_template = """
        Answer the question based on the context below. If you cannot answer the question, reply "I don't know".

        Context: {context}

        Question: {question}
        """
        self.default_prompt = PromptTemplate.from_template(self.default_prompt_template)

        self.default_chain = self._build_chain(self.default_prompt)

    def _build_chain(self, prompt):
        """Compose the retriever -> prompt -> model -> parser chain for ``prompt``."""
        return (
            {
                "context": itemgetter("question") | self.retriever,
                "question": itemgetter("question"),
            }
            | prompt
            | self.model
            | self.parser
        )

    def fetch(self, question, prompt_type="default"):
        """Answer ``question`` using the requested prompt style.

        Args:
            question: The question text; also used as the retrieval query.
            prompt_type: One of ``"default"``, ``"explicit"``, ``"role"``,
                ``"chain_of_thought"`` or ``"self_consistency"``.  Any other
                value falls back to the default prompt.

        Returns:
            The parsed model output.
        """
        if prompt_type == "self_consistency":
            return self.get_self_consistent_response(question)

        prompt_builders = {
            "explicit": self.get_explicit_prompt,
            "role": self.get_role_prompt,
            "chain_of_thought": self.get_chain_of_thought_prompt,
        }
        builder = prompt_builders.get(prompt_type)
        prompt = builder(question) if builder is not None else self.default_prompt
        return self._build_chain(prompt).invoke({'question': question})

    def get_explicit_prompt(self,question):
        """Return a prompt asking for a detailed answer limited to 250 words.

        ``question`` is accepted for a uniform signature but is not used here;
        the template keeps ``{question}`` as a placeholder.
        """
        template = """
        You are an AI model that provides detailed and specific answers. Answer the question based on the context below. If you cannot answer the question, reply "I don't know". Limit your response to 250 words.

        Context: {context}

        Question: {question}
        """
        return PromptTemplate.from_template(template)

    def get_role_prompt(self,question):
        """Return a prompt that casts the model as a project manager.

        The template embeds ``{question}`` as the name of a project and asks
        who is best suited for it.  ``question`` itself is not used here.
        """
        template = """
        You are a project manager with extensive experience in assigning tasks based on individual strengths. Answer the question based on the context below. If you cannot answer the question, reply "I don't know".

        Context: {context}

        Question: Who is the best suited for the project {question}, say the name and explain why that person is best suited for this project
        """
        return PromptTemplate.from_template(template)

    def get_chain_of_thought_prompt(self,question):
        """Return a prompt that asks the model to reason step by step.

        ``question`` is accepted for a uniform signature but is not used here.
        """
        template = """
        Let's think through this carefully, step by step. Answer the question based on the context below. If you cannot answer the question, reply "I don't know".

        Context: {context}

        Question: {question}
        """
        return PromptTemplate.from_template(template)

    def get_self_consistent_response(self, question, num_samples=5):
        """Sample several answers with the default prompt and return the most frequent.

        Answers are compared as exact strings.  When several answers share the
        highest count, the one that was generated first wins, so the result
        does not depend on hash ordering.

        Args:
            question: The question text.
            num_samples: How many answers to generate before voting.

        Returns:
            The answer string that occurred most often.

        Raises:
            ValueError: If ``num_samples`` is smaller than 1.
        """
        if num_samples < 1:
            raise ValueError("num_samples must be at least 1")
        # The chain does not change between samples, so build it once.
        chain = self._build_chain(self.default_prompt)
        explanations = [chain.invoke({'question': question}) for _ in range(num_samples)]
        # Counter keeps first-seen order, and most_common preserves that order
        # among equal counts.
        return Counter(explanations).most_common(1)[0][0]

    def get_metadata_for_chunk(self, chunk_id):
        """Return the metadata the vector store reports for ``chunk_id``."""
        # NOTE(review): this relies on the vector store exposing a
        # ``get_metadata`` method; confirm that DocArrayInMemorySearch
        # provides one, otherwise this call raises AttributeError.
        metadata=self.vectorstore.get_metadata(chunk_id)
        return metadata


In [None]:
# Build the pipeline with its default model and document, ask one long
# question with the default prompt, and print the answer after a blank line.
model = RAG_Pipeline()
user_input = "Develop a comprehensive proposal for a Retrieval-Augmented Generation (RAG) project utilizing ChromaDB and Ollama, detailing the integration of advanced retrieval mechanisms with generative models, data pipelines, evaluation metrics, project timeline, and anticipated performance improvements."
answer = model.fetch(user_input, prompt_type="default")
print("\n" + answer)

In [None]:
# Ask the same question once per prompt strategy, save every answer to
# answers.json, and print the answers.
model = RAG_Pipeline()
user_input = "explain what is cnn?"
prompt_types = ["default", "explicit", "role", "chain_of_thought", "self_consistency"]
answers = {}

for prompt_type in prompt_types:
    # The timestamp marks when the request was issued.  It cannot tell how
    # long the call took, so the duration is measured and stored separately.
    timestamp = datetime.now().isoformat()
    started = time.perf_counter()
    answer = model.fetch(user_input, prompt_type=prompt_type)
    elapsed_seconds = time.perf_counter() - started
    answers[prompt_type] = {
        "timestamp": timestamp,
        "answer": answer,
        "elapsed_seconds": elapsed_seconds,
    }

with open("answers.json", "w", encoding="utf-8") as file:
    json.dump(answers, file, indent=4)


for prompt_type, content in answers.items():
    print(f"\n{prompt_type} prompt:\n{content['answer']}\n")
    print("---------------------------------------")

In [None]:
def compute_bleu(answer, correct_answer):
    """Return the sentence-level BLEU score of ``answer`` against ``correct_answer``.

    Both strings are tokenised by splitting on whitespace, and
    ``correct_answer`` is used as the only reference.
    """
    reference_tokens = correct_answer.split()
    candidate_tokens = answer.split()
    return sentence_bleu([reference_tokens], candidate_tokens)
def compute_rouge(answer, correct_answer):
    """Score ``answer`` against ``correct_answer`` with ROUGE-1 and ROUGE-L.

    Stemming is enabled.  Returns the scorer's result, which is indexed by
    ``'rouge1'`` and ``'rougeL'``.
    """
    rouge = rouge_scorer.RougeScorer(['rouge1', 'rougeL'], use_stemmer=True)
    # The reference text goes first, the generated answer second.
    return rouge.score(correct_answer, answer)

def compute_bertscore(answer, correct_answer):
    """Return the mean BERTScore F1 of ``answer`` against ``correct_answer`` as a float.

    Each text is passed as a one-element list with ``lang='en'``.
    """
    results = bert_score([answer], [correct_answer], lang='en')
    # Only the third returned value (the F1 tensor) is used.
    f1 = results[2]
    return f1.mean().item()

# Load the saved answers, score each one against a hand-written reference
# answer, draw one bar chart per metric, and print the raw numbers.
with open("answers.json", "r") as file:
    answers = json.load(file)

correct_answer = """A Convolutional Neural Network (CNN) is a type of deep learning model specifically designed for processing structured grid data like images. It consists of multiple layers, including convolutional layers, pooling layers, and fully connected layers. Convolutional layers apply filters to the input data to extract features, pooling layers reduce the dimensionality, and fully connected layers perform the final classification or regression tasks. CNNs are widely used in image recognition, object detection, and other computer vision tasks due to their ability to automatically learn spatial hierarchies of features from input data."""


similarities = {}
for prompt_type, content in answers.items():
    answer = content["answer"]
    bleu_score = compute_bleu(answer, correct_answer)
    rouge_scores = compute_rouge(answer, correct_answer)
    bertscore = compute_bertscore(answer, correct_answer)
    similarities[prompt_type] = {
        "BLEU": bleu_score,
        "ROUGE-1": rouge_scores['rouge1'].fmeasure,
        "ROUGE-L": rouge_scores['rougeL'].fmeasure,
        "BERTScore": bertscore,
    }

# One list of values per metric, all in the same prompt order as ``labels``.
labels = list(similarities)
bleu_scores = [similarities[label]["BLEU"] for label in labels]
rouge1_scores = [similarities[label]["ROUGE-1"] for label in labels]
rougeL_scores = [similarities[label]["ROUGE-L"] for label in labels]
bertscores = [similarities[label]["BERTScore"] for label in labels]

x = range(len(labels))

fig, axs = plt.subplots(2, 2, figsize=(14, 10))
# (axes, values, legend label, title, colour) for each panel of the 2x2 grid.
panels = [
    (axs[0, 0], bleu_scores, 'BLEU', 'BLEU Score', 'blue'),
    (axs[0, 1], rouge1_scores, 'ROUGE-1', 'ROUGE-1 Score', 'orange'),
    (axs[1, 0], rougeL_scores, 'ROUGE-L', 'ROUGE-L Score', 'green'),
    (axs[1, 1], bertscores, 'BERTScore', 'BERTScore', 'red'),
]
for ax, values, legend_label, title, colour in panels:
    ax.bar(x, values, width=0.4, label=legend_label, color=colour, align='center')
    ax.set_xlabel('Prompt Type')
    ax.set_ylabel('Score')
    ax.set_title(title)
    ax.set_xticks(x)
    ax.set_xticklabels(labels)
    ax.set_ylim([0, 1])
plt.tight_layout()
plt.show()

for prompt_type, scores in similarities.items():
    print(f"{prompt_type} prompt:")
    print(f" BLEU: {scores['BLEU']:.4f}")
    print(f" ROUGE-1: {scores['ROUGE-1']:.4f}")
    print(f" ROUGE-L: {scores['ROUGE-L']:.4f}")
    print(f" BERTScore: {scores['BERTScore']:.4f}")

In [None]:
# Re-score the saved answers, then rank the prompt types by a weighted blend
# of answer quality (w1) and time efficiency (w2).
with open("answers.json", "r", encoding="utf-8") as file:
    answers = json.load(file)

# Time origin for records that carry only a timestamp: the earliest timestamp
# in the file.  A fixed calendar date would make every offset depend on the
# day the notebook happened to be run.
run_start = min(
    (
        datetime.fromisoformat(content["timestamp"])
        for content in answers.values()
        if "timestamp" in content
    ),
    default=None,
)

similarities = {}
for prompt_type, content in answers.items():
    answer = content["answer"]
    bleu_score = compute_bleu(answer, correct_answer)
    rouge_scores = compute_rouge(answer, correct_answer)
    bertscore = compute_bertscore(answer, correct_answer)
    if "elapsed_seconds" in content:
        # A measured per-prompt duration is the most direct time signal.
        time_taken = float(content["elapsed_seconds"])
    else:
        # Fall back to the offset of this record from the start of the run.
        end_time = datetime.fromisoformat(content["timestamp"])
        time_taken = (end_time - run_start).total_seconds()
    similarities[prompt_type] = {
        "BLEU": bleu_score,
        "ROUGE-1": rouge_scores['rouge1'].fmeasure,
        "ROUGE-L": rouge_scores['rougeL'].fmeasure,
        "BERTScore": bertscore,
        "Time": time_taken,
    }

metrics = ["BLEU", "ROUGE-1", "ROUGE-L", "BERTScore"]

# ``default=0`` keeps an empty answers file from raising in max().
max_values = {
    metric: max((similarities[prompt][metric] for prompt in similarities), default=0)
    for metric in metrics
}
max_time = max((similarities[prompt]["Time"] for prompt in similarities), default=0)

# Scale every value by the best value seen for that metric.  A maximum of 0
# (for example BLEU with no overlap anywhere) would divide by zero, so the
# normalised value is 0.0 in that case.
normalized_scores = {}
for prompt in similarities:
    normalized_scores[prompt] = {
        metric: (similarities[prompt][metric] / max_values[metric]) if max_values[metric] else 0.0
        for metric in metrics
    }
    normalized_scores[prompt]["Time"] = (similarities[prompt]["Time"] / max_time) if max_time else 0.0


w1 = 0.7  # weight of answer quality
w2 = 0.3  # weight of time efficiency

final_scores = {}
for prompt in normalized_scores:
    q = np.mean([normalized_scores[prompt][metric] for metric in metrics])
    # Less time is better, so invert the normalised time.
    te = 1 - normalized_scores[prompt]["Time"]
    final_scores[prompt] = w1 * q + w2 * te

ordered_prompts = sorted(final_scores.keys(), key=lambda x: final_scores[x], reverse=True)
print("Final Scores:")
for prompt in ordered_prompts:
    print(f"{prompt} prompt: {final_scores[prompt]:.4f}")

plt.figure(figsize=(10, 5))
plt.bar(ordered_prompts, [final_scores[prompt] for prompt in ordered_prompts], color='skyblue')
plt.xlabel('Prompt Type')
plt.ylabel('Final Score')
plt.title('Final Scores for Prompt Types Considering Quality and Time Efficiency')
plt.ylim([0, 1])
plt.show()

for prompt_type, scores in similarities.items():
    print(f"{prompt_type} prompt:")
    print(f" BLEU: {scores['BLEU']:.4f}")
    print(f" ROUGE-1: {scores['ROUGE-1']:.4f}")
    print(f" ROUGE-L: {scores['ROUGE-L']:.4f}")
    print(f" BERTScore: {scores['BERTScore']:.4f}")
