In [1]:
%pip install langchain -q
%pip install langchain_chroma -q
%pip install langchain_community -q
%pip install langchain_groq -q
%pip install grandalf -q
%pip install numpy -q
%pip install pandas -q
%pip install sentence-transformers -q

Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.


In [2]:
import nest_asyncio
nest_asyncio.apply()

In [3]:
from dotenv import load_dotenv
import warnings
import os
from langchain_groq import ChatGroq
from langchain_community.embeddings.huggingface import HuggingFaceEmbeddings
from langchain_chroma import Chroma
from langchain_community.document_loaders import DirectoryLoader, TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter

In [4]:
warnings.filterwarnings('ignore')
_ = load_dotenv()

os.environ["TOKENIZERS_PARALLELISM"] = "false"

embed_model = HuggingFaceEmbeddings(model_name="BAAI/bge-small-en-v1.5")
rag_llm = ChatGroq(model="llama-3.2-3b-preview") # Used for RAG
qa_llm = ChatGroq(model="llama-3.2-11b-text-preview", temperature=0.1) # Used to create eval dataset
benchmark_llm = ChatGroq(model="llama-3.2-90b-text-preview") # Used to evaluate (Judge)

In [4]:
from langchain_chroma import Chroma
from langchain_community.document_loaders import DirectoryLoader, TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter

# Use popular github repo instead
loader = DirectoryLoader("data/paul_graham/", use_multithreading=True, loader_cls=TextLoader)
text_splitter = RecursiveCharacterTextSplitter(
    separators=[
        "\n\n", 
        "\n", 
        " ",
        "",
    ],
    chunk_size=3000,
    chunk_overlap=200,
    length_function=len,
    is_separator_regex=False,
)
documents = loader.load_and_split(text_splitter=text_splitter) # Load text
vectorstore = Chroma.from_documents(documents, embedding=embed_model, collection_name="groq_rag")
retriever = vectorstore.as_retriever()
print(f"Documents indexed: {len(documents)}")

Documents indexed: 27


In [5]:
await retriever.ainvoke("What did paul graham do growing up?")

[Document(metadata={'source': 'data/paul_graham/paul_graham_essay.txt'}, page_content="Asterix comics begin by zooming in on a tiny corner of Roman Gaul that turns out not to be controlled by the Romans. You can do something similar on a map of New York City: if you zoom in on the Upper East Side, there's a tiny corner that's not rich, or at least wasn't in 1993. It's called Yorkville, and that was my new home. Now I was a New York artist — in the strictly technical sense of making paintings and living in New York.\n\nI was nervous about money, because I could sense that Interleaf was on the way down. Freelance Lisp hacking work was very rare, and I didn't want to have to program in another language, which in those days would have meant C++ if I was lucky. So with my unerring nose for financial opportunity, I decided to write another book on Lisp. This would be a popular book, the sort of book that could be used as a textbook. I imagined myself living frugally off the royalties and spe

RAG pipeline

In [6]:
from langchain_core.documents import Document
from langchain.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser
from typing import List, Dict

RAG_SYSTEM_PROMPT = """\
You are an assistant for question-answering tasks. \
Use the following pieces of retrieved context given within delimiters to answer the human's questions.
```
{context}
```
If you don't know the answer, just say that you don't know.\
""" # adapted from https://smith.langchain.com/hub/rlm/rag-prompt-llama3

RAG_HUMAN_PROMPT = "{input}"

RAG_PROMPT = ChatPromptTemplate.from_messages([
    ("system", RAG_SYSTEM_PROMPT),
    ("human", RAG_HUMAN_PROMPT)
])

def format_docs(docs: List[Document]):
    """Format the retrieved documents"""
    return "\n".join(doc.page_content for doc in docs)

rag_chain = (
    {
        "context": retriever | format_docs, # Use retriever to retrieve docs from vectorstore -> format the documents into a string
        "input": RunnablePassthrough() # Propogate the 'input' variable to the next step
    } 
    | RAG_PROMPT # format prompt with 'context' and 'input' variables
    | rag_llm # get response from LLM using the formatteed prompt
    | StrOutputParser() # Parse through LLM response to get only the string response

)

In [7]:
await rag_chain.ainvoke("What did paul graham do growing up?")

'According to the given context, before college, Paul Graham worked on writing and programming. \n\nSpecifically, he started programming in 9th grade (around 13 or 14 years old) on the IBM 1401, a data processing computer, using an early version of Fortran. He also wrote short stories and later, he started writing games and other simple programs on a TRS-80 microcomputer, which he got in about 1980.'

Generate Evaluation Dataset

In [8]:
from typing import TypedDict
# Response object structure
class QAResponse(TypedDict):
    question_1: str
    question_2: str
    question_3: str

QA_HUMAN_PROMPT = """\
You are a Teacher/ Professor. Your task is to setup questions for an upcoming \
quiz/examination. The questions should be diverse in nature across the document. \
Given the context information and not prior knowledge, generate only questions based on the below context. \
Restrict the questions to the context information provided within the delimiters.
```
{text}
```
Output the questions in JSON format with the keys question_1, question_2 and question_3 \
and make sure to escape any special characters to output clean, valid JSON.\
""" # adapted from https://arize.com/blog/evaluate-rag-with-llm-evals-and-benchmarking/

QA_PROMPT = ChatPromptTemplate.from_messages([
    ("human", QA_HUMAN_PROMPT)
])

qa_chain = (
{"text": RunnablePassthrough()}
| QA_PROMPT
| qa_llm.with_structured_output(method='json_mode', schema=QAResponse) # either 'json_mode' or 'function_calling' to get responses always in JSON format
)

In [9]:
texts = [doc.page_content for doc in documents] # Create a list with all the text from the document chunks in the vectorstore
questions: List[Dict] = await qa_chain.abatch(texts)

In [10]:
print(f"From document: \n{texts[0]}\n")
print(f"Questions generated:")
for i, q in enumerate(questions[0].values(), 1): print(f'{i}: {q}')

From document: 
What I Worked On

February 2021

Before college the two main things I worked on, outside of school, were writing and programming. I didn't write essays. I wrote what beginning writers were supposed to write then, and probably still are: short stories. My stories were awful. They had hardly any plot, just characters with strong feelings, which I imagined made them deep.

The first programs I tried writing were on the IBM 1401 that our school district used for what was then called "data processing." This was in 9th grade, so I was 13 or 14. The school district's 1401 happened to be in the basement of our junior high school, and my friend Rich Draves and I got permission to use it. It was like a mini Bond villain's lair down there, with all these alien-looking machines — CPU, disk drives, printer, card reader — sitting up on a raised floor under bright fluorescent lights.

The language we used was an early version of Fortran. You had to type programs on punch cards, then s

Evaluate the RAG Pipeline


In [11]:
from pydantic import BaseModel

# Response object structure
class EvalResponse(BaseModel):
    score: int
    explanation: str

EVAL_HUMAN_PROMPT = """\
You are given a question, an answer and reference text within marked delimiters. \
You must determine whether the given answer correctly answers the question based on the reference text. Here is the data:
```Question
{question}
```
```Reference
{context}
```
```Answer
{answer}
```
Respond with a valid JSON object containing two fields:
{{
    "score": "int: a score between 0-10, 10 being highest, on whether the question is correctly and fully answered by the answer",
    "explanation": "str: Provide an explanation as to why the score was given."
}} 
Make sure to escape any special characters to output clean, valid JSON.\
"""
# adapted from https://docs.arize.com/phoenix/evaluation/how-to-evals/running-pre-tested-evals/q-and-a-on-retrieved-data

EVAL_PROMPT = ChatPromptTemplate.from_messages([
    ("human", EVAL_HUMAN_PROMPT)
])

eval_chain = (
    {
    "context": RunnablePassthrough(), # Propogate all input vars to next step in pipeline
    "question": RunnablePassthrough(), 
    "answer": RunnablePassthrough(),
    }
    | EVAL_PROMPT
    | benchmark_llm.with_structured_output(schema=EvalResponse, method='json_mode') # Parse response according to EvalResponse object
)

In [13]:
q1 = questions[-1]['question_1']
t1 = texts[-1]

print(f"Question: {q1}")
a1 = await rag_chain.ainvoke(q1)
print(f"Answer: {a1}")
eval_input = {
    'context': t1,
    'question': q1,
    'answer': a1
}
response = await eval_chain.ainvoke(eval_input)
print("---------------------")
print(f"Score: {response.score}")
print(f"Explanation: {response.explanation}")
print("---------------------")

Question: What was a major issue with HN (Hacker News) when you both wrote essays and ran a forum?
Answer: A major issue with Hacker News (HN) when you both wrote essays and ran a forum was that you had to respond to highly upvoted misinterpretations of your essays, which created a vicious cycle that encouraged more misinterpretation and conflict.
---------------------
Score: 9
Explanation: The answer accurately summarizes the major issue with HN when both writing essays and running a forum, as described in the reference text. It correctly identifies the problem of having to respond to highly upvoted misinterpretations of essays, which creates a vicious cycle. The only reason the score isn't a perfect 10 is that the answer could be more concise and directly quote the reference text for added clarity.
---------------------


In [14]:
# Lets do it for all now
from typing import TypedDict
from time import time

class EvalResult(TypedDict): # For type hinting
    question: str
    answer: str
    context: str
    score: int # Score between 0 - 10
    explanation: str # Explanation on why the score was given

async def evaluate(questions: List[Dict] = questions, texts: List[str] = texts) -> List[EvalResult]:
    # Prepare inputs
    batch_rag_inputs: List[Dict] = []
    evals: List[Dict] = []
    for q_dict, context in zip(questions, texts): 
        for question in q_dict.values(): 
            batch_rag_inputs.append(question)
            evals.append({'question': question, 'context': context})

    print(f"Running RAG pipeline for {len(batch_rag_inputs)} questions")
    start = time()
    answers = await rag_chain.abatch(batch_rag_inputs, config={'max_concurrency': 2}) # Reduce concurrency to avoid hitting rate limits
    end = time()
    print(f"Time taken: {end - start}")

    # Update eval_input with the answers from the rag_chain
    for eval_input, answer in zip(evals, answers):
        eval_input.update({'answer': answer})
    
    # Run eval_chain to get evaluation
    print(f"Evaluating RAG pipeline...")
    start = time()
    batch_score_explanations = await eval_chain.abatch(evals, config={'max_concurrency': 2}) # Pass in eval which contains List of 'answer', 'context', 'question'
    end = time()
    print(f"Time taken: {end - start}")
    
    # Update eval variable with the score and explanation
    for eval, score_exp_dict in zip(evals, batch_score_explanations):
        eval.update({
            'score': score_exp_dict['score'],
            'explanation': score_exp_dict['explanation']
        })
    
    return evals

In [None]:
evaluations = await evaluate(questions[:1], texts[:1]) # Remove the `:5` to evaluate all the questions on all your data

Display and Save the Evaluation Metrics

In [18]:
import csv
import statistics

score_threshold  = 1 # Display all results below 5

# Calculating basic statistics
scores = [eval['score'] for eval in evaluations]
average_score = sum(scores) / len(scores)
std_dev_score = statistics.stdev(scores)

# Lowest and highest scores
lowest_score = min(scores)
highest_score = max(scores)
lowest_count = scores.count(lowest_score)
highest_count = scores.count(highest_score)

# Display results
print("Average Score:", average_score)
print("Standard Deviation of Score:", std_dev_score)
print("Lowest Score:", lowest_score, "Count:", lowest_count)
print("Highest Score:", highest_score, "Count:", highest_count)

print(f"Evals lower than {score_threshold}")
count = 1
for eval in evaluations:
    if eval['score'] <= score_threshold:
        print("--------------------------")
        print(f"{count}. Score: {eval['score']}")
        print(f"Question: {eval['question']}")
        print(f"Answer: {eval['answer']}")
        print(f"Explanation: {eval['explanation']}")

NameError: name 'evaluations' is not defined

In [None]:
csv_file = 'evaluations.csv'
with open(csv_file, mode='w', newline='') as file:
    writer = csv.writer(file)
    writer.writerow(['Context', 'Score', 'Question', 'Answer', 'Explanation'])
    for eval in evaluations:
        writer.writerow([eval['context'], eval['score'], eval['question'], eval['answer'], eval['explanation']])

print(f"Evaluations saved to {csv_file}")