In [None]:
import re
import unicodedata

def process_documents(documents):
    '''
    Clean the text of every document and merge the results into one string.

    For each document the private-use ("Co") Unicode characters are dropped,
    everything except ASCII letters, digits, whitespace, commas and periods is
    removed, runs of whitespace are collapsed to a single space and the text
    is lower-cased.

    @param documents: List of documents. Each item must expose a
                      ``page_content`` string attribute.
    @return: a long string. The cleaned documents joined by a single space, so
             the last word of one document never fuses with the first word of
             the next. Empty string when there is nothing to join.
    '''
    cleaned_pages = []
    for doc in documents:
        text = doc.page_content
        # Remove unknown (private-use) characters
        text = ''.join(c for c in text if unicodedata.category(c) != 'Co')
        # Remove non-alphanumeric characters first: stripping them can leave
        # adjacent spaces behind ("a - b" -> "a  b") ...
        text = re.sub(r'[^a-zA-Z0-9\s,.]', '', text)
        # ... which the whitespace normalisation then collapses
        text = re.sub(r'\s+', ' ', text).strip()
        # Convert to lowercase
        text = text.lower()
        if text:
            cleaned_pages.append(text)

    # Single join instead of repeated string concatenation
    return ' '.join(cleaned_pages)

: 

In [None]:
from langchain.document_loaders import PyPDFLoader
# Reading pdfs
def read_pdf(file_path):
    '''
    Load a PDF through PyPDFLoader and split it into documents.

    @param file_path: path of the PDF file handed to PyPDFLoader.
    @return: the result of PyPDFLoader.load_and_split() for that file.
    '''
    return PyPDFLoader(file_path).load_and_split()

: 

In [None]:
# Load the PDF and clean its text
documents = read_pdf('release engineering.pdf')
processed_text = process_documents(documents)
# Report sizes and show only a preview: printing the whole cleaned text
# floods the notebook output
print(len(documents))
print(len(processed_text))
print(processed_text[:1000])

: 

### Perform Naive Chunking (RecursiveCharacterTextSplitter)

In [None]:
from langchain.text_splitter import RecursiveCharacterTextSplitter
def recursive_character_text_splitter(text):
    '''
    Split a text into chunks with RecursiveCharacterTextSplitter.

    @param text: a long string
    @return: a list of strings. Each string is a chunk of the text.
    '''
    # chunk_size is 500 by default and 800 for texts longer than 2000 characters
    chunk_size = 800 if len(text) > 2000 else 500

    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,  # chunk size, measured with length_function
        chunk_overlap=20,  # overlap by 20 characters
        length_function=len,
        # Tried in order: blank line, new line, position after ". ", space, "".
        # The lookbehind is a raw string because "\." in a plain literal is an
        # invalid escape sequence.
        separators=["\n\n", "\n", r"(?<=\. )", " ", ""],
        is_separator_regex=True,  # use regex for separators
    )
    return text_splitter.split_text(text)

: 

In [None]:
# Chunk the cleaned text with the recursive character splitter
naive_chunks = recursive_character_text_splitter(processed_text)

# Preview the first 15 chunks, each followed by a blank line
for chunk in naive_chunks[:15]:
    print(chunk, end="\n\n")

: 

### Instantiate Embedding Model

In [None]:
import cohere
def embed(docs, input_type='search_document'):
    '''
    Embed a list of texts with Cohere's embed-english-v3.0 model.

    @param docs: a list of strings to embed.
    @param input_type: value forwarded to Cohere's ``input_type`` argument.
                       Defaults to 'search_document' (texts stored in the
                       index); callers embedding a user question can pass
                       'search_query'.
    @return: the ``embeddings`` attribute of the Cohere response, one
             embedding per input text.
    @raise RuntimeError: when the COHERE_API_KEY environment variable is unset.
    '''
    import os  # local import so this cell does not depend on another cell

    # The key is read from the environment so that it never lands in the
    # notebook. Any key that was previously committed here should be revoked.
    api_key = os.environ.get("COHERE_API_KEY")
    if not api_key:
        raise RuntimeError(
            "Set the COHERE_API_KEY environment variable before calling embed()."
        )

    co = cohere.Client(api_key=api_key)
    response = co.embed(texts=docs, model='embed-english-v3.0', input_type=input_type)
    return response.embeddings

: 

In [None]:
import numpy as np
def upSertEmbeds(processed_text, index, batch_size=100):
    '''
    Embed text chunks and upsert them into a Pinecone index.

    Every chunk becomes one vector whose id is its position in the list (as a
    string) and whose metadata stores the chunk text under the key 'text'.

    @param processed_text: [text1, text2, ...] list of chunk strings
    @param index: Pinecone index (any object with an ``upsert(vectors)`` method)
    @param batch_size: maximum number of vectors sent per upsert call, so that
                       large documents are not pushed in one oversized request
    @raise ValueError: if batch_size is smaller than 1 or the number of
                       embeddings differs from the number of chunks.
    '''
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    if len(processed_text) == 0:
        return  # nothing to embed or upload

    embeds = embed(processed_text)
    if len(embeds) != len(processed_text):
        raise ValueError(
            f"expected {len(processed_text)} embeddings, got {len(embeds)}"
        )

    vectors = [
        {'id': str(i), 'values': values, 'metadata': {'text': text}}
        for i, (values, text) in enumerate(zip(embeds, processed_text))
    ]

    # Upload in slices of at most batch_size vectors
    for start in range(0, len(vectors), batch_size):
        index.upsert(vectors[start:start + batch_size])

: 

In [None]:
# Connect to Pinecone and upload the naive chunks
import os

from pinecone import Pinecone as PineconeClient

# The API key is read from the PINECONE_API_KEY environment variable instead of
# being hardcoded; a key that was committed to the notebook should be revoked.
pc = PineconeClient(api_key=os.environ["PINECONE_API_KEY"])
naive_index = pc.Index(name="test2") # you have to change the index when you run the code again
upSertEmbeds(naive_chunks, naive_index)

: 

### Setup the API Key for LLM

In [None]:
import os

from langchain_cohere import ChatCohere

# Chat model used for answering and for generating the evaluation data.
# The key comes from the COHERE_API_KEY environment variable instead of being
# hardcoded; a key that was committed to the notebook should be revoked.
client = ChatCohere(cohere_api_key=os.environ["COHERE_API_KEY"])

: 

### Perform Semantic Chunking

In [None]:
from langchain.text_splitter import SpacyTextSplitter

def spacy_text_splitter(text):
    '''
    Chunk a text with SpacyTextSplitter using the "en_core_web_sm" pipeline.

    @param text: a long string
    @return: a list of strings. Each string is a chunk of the text.
    '''
    # chunk_size of 800 for texts over 2000 characters, otherwise 500
    size = 800 if len(text) > 2000 else 500

    splitter = SpacyTextSplitter(
        pipeline="en_core_web_sm",
        chunk_size=size,
        chunk_overlap=0,
    )
    return splitter.split_text(text)

: 

In [None]:
# Chunk the cleaned text with the spaCy splitter
advanced_chunks = spacy_text_splitter(processed_text)

# Preview the first 15 chunks, each followed by a blank line
for chunk in advanced_chunks[:15]:
    print(chunk, end="\n\n")

: 

In [None]:
# Upload the spaCy-based chunks to their own index ("test3"), separate from the naive one
advanced_index = pc.Index(name="test3") # you have to change the index when you run the code again
upSertEmbeds(advanced_chunks, advanced_index)

: 

In [None]:
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableParallel, RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser

def get_response(query, model, top_k_val, index):
    '''
    Answer a question with retrieval-augmented generation.

    The query is embedded, the closest chunks are fetched from the index and
    their texts are placed in the prompt as the only context for the model.

    @param query: a string. The question to ask the model.
    @param model: the chat model (a runnable piped after the prompt) that
                  generates the answer.
    @param top_k_val: an int. The number of chunks to retrieve.
    @param index: Pinecone index to search.
    @return: a string. The response from the model.
    '''
    # embed() returns one embedding per input text; the index query needs the
    # single vector, not the enclosing list.
    query_vector = embed([query])[0]

    top_k_chunks = index.query(
        vector=query_vector,
        top_k=top_k_val,
        include_values=False,
        include_metadata=True,
    )

    retrieved_chunks = [match['metadata'].get('text', 'Default text') for match in top_k_chunks['matches']]

    context = ' '.join(retrieved_chunks)

    # RAG prompt
    template =  """
                Answer the question based only on the following context:
                {context}
                Question: {question}
                """

    prompt = ChatPromptTemplate.from_template(template)

    # RAG: the context is fixed by the retrieval above, the question is passed through
    chain = (
        RunnableParallel(
            {"context": lambda x: context, "question": RunnablePassthrough()})
        | prompt
        | model
        | StrOutputParser()
    )

    return chain.invoke(query)

: 

In [None]:
# Try the RAG chain once against the spaCy-chunk index, retrieving 5 chunks
query = "what is release engineering?"
response = get_response(query, client, 5, advanced_index)
print(response)

: 

### Create the Following Datasets

Questions — synthetically generated (groq-mixtral-8x7b-32768)

Contexts — created above(Synthetic data chunks)

Ground Truths — synthetically generated (groq-mixtral-8x7b-32768)

Answers — generated from our Semantic RAG Chain

### Ragas Assessment for naive Chunker

In [None]:
# Accumulators for the evaluation samples (four independent lists)
questions, ground_truths_semantic, contexts, answers = [], [], [], []

# Prompt that asks the model to write a question answerable from a chunk
question_prompt = ChatPromptTemplate.from_template("""\
You are a teacher preparing a test. Please create a question that can be answered by referencing the following context.

Context:
{context}
""")

# Prompt that asks the model to answer that question from the same chunk only
ground_truth_prompt = ChatPromptTemplate.from_template("""\
Use the following context and question to answer this question using *only* the provided context.

Question:
{question}

Context:
{context}
""")

# Both chains: prompt -> chat model -> plain string
question_chain = question_prompt | client | StrOutputParser()
ground_truth_chain = (
    ground_truth_prompt
    | client
    | StrOutputParser()
)

: 

In [None]:
# Build one evaluation sample per naive chunk in positions 10-14
for chunk in naive_chunks[10:15]:
  generated_question = question_chain.invoke({"context" : chunk})
  questions.append(generated_question)
  contexts.append([chunk])
  # Give the prompt the chunk text itself: passing the one-element list would
  # render it as a Python list repr inside the prompt
  ground_truths_semantic.append(ground_truth_chain.invoke({"question" : generated_question, "context" : chunk}))
  answers.append(get_response(generated_question, client, 5, naive_index))

: 

### Format the content generated into HuggingFace Dataset Format

In [None]:
from datasets import Dataset

# One record per sample, using the column names question / answer / contexts / ground_truth
qagc_list = []

for question, answer, context, ground_truth in zip(questions, answers, contexts, ground_truths_semantic):
  record = dict(
      question=question,
      answer=answer,
      contexts=context,
      ground_truth=ground_truth,
  )
  qagc_list.append(record)

naive_eval_dataset = Dataset.from_list(qagc_list)
naive_eval_dataset

: 

### Implement Ragas metrics and evaluate our created dataset.

In [None]:
import os

from langchain_cohere import CohereEmbeddings
from ragas import evaluate
from ragas.metrics import (
    answer_relevancy,
    faithfulness,
    context_recall,
    context_precision,
)

# ragas is given a LangChain embeddings object here rather than a raw
# cohere.Client, which does not provide the embeddings interface.
# The key is read from the environment instead of being hardcoded.
co = CohereEmbeddings(
    model="embed-english-v3.0",
    cohere_api_key=os.environ["COHERE_API_KEY"],
)

naive_result = evaluate(
    naive_eval_dataset,
    metrics=[
        context_precision,
        faithfulness,
        answer_relevancy,
        context_recall,
    ],
    llm=client,
    embeddings=co,
    raise_exceptions=False
)

: 

In [None]:
# Convert the naive-chunker evaluation result to a pandas DataFrame and display it
naive_results_df = naive_result.to_pandas()
naive_results_df

: 

### Ragas Assessment Comparison for advanced Chunker

In [None]:
import tqdm

# Start from empty lists so the naive samples are not mixed into this run
questions = []
ground_truths_semantic = []
contexts = []
answers = []

# Build one evaluation sample per spaCy chunk in positions 10-14
for chunk in tqdm.tqdm(advanced_chunks[10:15]):
  generated_question = question_chain.invoke({"context" : chunk})
  questions.append(generated_question)
  contexts.append([chunk])
  # Give the prompt the chunk text itself rather than the one-element list
  ground_truths_semantic.append(ground_truth_chain.invoke({"question" : generated_question, "context" : chunk}))
  # Ask the question generated for THIS chunk. The previous `question[-1]`
  # read the leftover loop variable of the dataset cell, i.e. the last
  # character of an old question.
  answers.append(get_response(generated_question, client, 5, advanced_index))

: 

In [None]:
# One record per sample, using the column names question / answer / contexts / ground_truth
qagc_list = []

for question, answer, context, ground_truth in zip(questions, answers, contexts, ground_truths_semantic):
  record = dict(
      question=question,
      answer=answer,
      contexts=context,
      ground_truth=ground_truth,
  )
  qagc_list.append(record)

advanced_eval_dataset = Dataset.from_list(qagc_list)
advanced_eval_dataset

: 

In [None]:
import os

from langchain_cohere import CohereEmbeddings

# ragas is given a LangChain embeddings object here rather than a raw
# cohere.Client, which does not provide the embeddings interface.
# The key is read from the environment instead of being hardcoded.
advanced_embeddings = CohereEmbeddings(
    model="embed-english-v3.0",
    cohere_api_key=os.environ["COHERE_API_KEY"],
)

advanced_result = evaluate(
    advanced_eval_dataset,
    metrics=[
        context_precision,
        faithfulness,
        answer_relevancy,
        context_recall,
    ],
    llm=client,
    embeddings=advanced_embeddings,
    raise_exceptions=False
)

: 

In [None]:
# Convert the advanced-chunker evaluation result to a pandas DataFrame and display it
advanced_results_df = advanced_result.to_pandas()
advanced_results_df

: 