In [1]:
import pandas as pd
import numpy as np
import spacy
from spacy.lang.en import English # see https://spacy.io/usage for install instructions
nlp = English()
import re
import fitz # (pymupdf, found this is better than pypdf for our use case, note: licence is AGPL-3.0, keep that in mind if you want to use any code commercially)
from tqdm.auto import tqdm

  from .autonotebook import tqdm as notebook_tqdm


In [None]:
# PDF that the rest of the notebook reads and indexes.
pdf_path = "Maths.pdf"


def text_formatter(text: str) -> str:
    """Flatten extracted text onto one line.

    Each newline is turned into a single space, then leading and trailing
    whitespace is removed.
    """
    return " ".join(text.split("\n")).strip()
def open_and_read_pdf(pdf_path: str) -> list[dict]:
    """Extract the text of every page of a PDF together with simple size statistics.

    Args:
        pdf_path: Path of the PDF to open with PyMuPDF (``fitz``).

    Returns:
        One dict per page, in document order, with the keys ``page_number``
        (0-based position in the document), ``page_char_count``,
        ``page_word_count``, ``page_sentence_count_raw``, ``page_token_count``
        and ``text``.
    """
    pages_and_texts = []
    # Use the document as a context manager so the file is closed even if extraction fails.
    with fitz.open(pdf_path) as doc:
        # enumerate() has no length, so give tqdm the page count explicitly.
        for page_number, page in tqdm(enumerate(doc), total=doc.page_count):
            text = text_formatter(page.get_text())
            pages_and_texts.append({"page_number": page_number,
                                    "page_char_count": len(text),
                                    # split() without an argument ignores runs of spaces and
                                    # returns [] for an empty page, so blanks are not counted as words
                                    "page_word_count": len(text.split()),
                                    "page_sentence_count_raw": len(text.split(". ")),
                                    "page_token_count": len(text) / 4,  # 1 token = ~4 chars, see: https://help.openai.com/en/articles/4936856-what-are-tokens-and-how-to-count-them
                                    "text": text})
    return pages_and_texts

# Run the extraction over the configured PDF; the result is one dict per page
pages_and_texts = open_and_read_pdf(pdf_path=pdf_path)
# Preview the first two page records
pages_and_texts[:2]

In [None]:
# Number of page records produced by the extraction
len(pages_and_texts)

In [None]:
# Tabular view of the per-page records (one row per page)
df = pd.DataFrame(pages_and_texts)
# Show the first rows
df.head()

In [None]:
# Register the sentence splitter only once: add_pipe() raises if a component with the
# same name is already in the pipeline, which is what happens when this cell is re-run.
if "sentencizer" not in nlp.pipe_names:
    nlp.add_pipe("sentencizer")

for item in tqdm(pages_and_texts):
    # Store the sentences as plain strings rather than spaCy objects
    item["sentences"] = [str(sentence) for sentence in nlp(item["text"]).sents]

    # Count the sentences
    item["page_sentence_count_spacy"] = len(item["sentences"])

In [None]:
# Number of sentences grouped into one chunk
num_sentence_chunk_size = 10

# Create a function that splits a list into consecutive slices of a desired size
def split_list(input_list: list,
               slice_size: int) -> list[list[str]]:
    """
    Splits the input_list into consecutive sublists of size slice_size (the last one may be shorter).

    For example, a list of 17 sentences with slice_size=10 is split into two lists
    holding 10 and 7 sentences. An empty input_list yields an empty list.

    Raises:
        ValueError: If slice_size is not a positive integer. A negative step would
            otherwise make range() empty and silently drop every element.
    """
    if slice_size <= 0:
        raise ValueError(f"slice_size must be a positive integer, got {slice_size}")
    return [input_list[i:i + slice_size] for i in range(0, len(input_list), slice_size)]

# Group every page's sentences into fixed-size chunks and record how many chunks resulted
for page_record in tqdm(pages_and_texts):
    chunks = split_list(page_record["sentences"], num_sentence_chunk_size)
    page_record.update(sentence_chunks=chunks, num_chunks=len(chunks))


In [None]:
import re

# Split each chunk into its own item
pages_and_chunks = []
for item in tqdm(pages_and_texts):
    for sentence_chunk in item["sentence_chunks"]:
        chunk_dict = {}
        # Join the sentences into a single paragraph-like string. The sentence strings are
        # joined with a space so that neighbouring sentences are never fused together,
        # whatever punctuation or letter case sits at the boundary.
        joined_sentence_chunk = " ".join(sentence_chunk)
        # Collapse every run of spaces (not just one pair) down to a single space
        joined_sentence_chunk = re.sub(r" {2,}", " ", joined_sentence_chunk).strip()
        joined_sentence_chunk = re.sub(r'\.([A-Z])', r'. \1', joined_sentence_chunk) # ".A" -> ". A" for any full-stop/capital letter combo
        chunk_dict["sentence_chunk"] = joined_sentence_chunk

        # Get stats about the chunk
        chunk_dict["chunk_char_count"] = len(joined_sentence_chunk)
        chunk_dict["chunk_word_count"] = len(joined_sentence_chunk.split(" "))
        chunk_dict["chunk_token_count"] = len(joined_sentence_chunk) / 4 # 1 token = ~4 characters

        pages_and_chunks.append(chunk_dict)

# How many chunks do we have?
len(pages_and_chunks)

In [None]:
# Rebind `df` to the per-chunk records (it previously held the per-page records)
df = pd.DataFrame(pages_and_chunks)
# Summary statistics of the numeric chunk columns, rounded to two decimals
df.describe().round(2)

In [None]:
# Token threshold used to choose which chunks to preview
min_token_length = 318

# Print five randomly drawn chunks whose estimated token count reaches the threshold
meets_threshold = df["chunk_token_count"] >= min_token_length
for row_label, chunk_row in df[meets_threshold].sample(5).iterrows():
    print(f'Chunk token count: {chunk_row["chunk_token_count"]} | Text: {chunk_row["sentence_chunk"]}')

In [None]:
import random
# Display one randomly chosen page record (returned as a one-element list)
random.sample(pages_and_texts, k=1)

In [None]:
# Preview up to five random chunks at or above the token threshold.
# The threshold has to be bound to `min_token_length`, the name the filter below reads.
min_token_length = 318
chunks_at_or_above_threshold = df[df["chunk_token_count"] >= min_token_length]
# sample() raises when asked for more rows than exist, so cap the request
preview_size = min(5, len(chunks_at_or_above_threshold))
for row in chunks_at_or_above_threshold.sample(preview_size).iterrows():
    print(f'Chunk token count: {row[1]["chunk_token_count"]} | Text: {row[1]["sentence_chunk"]}')

In [None]:
# Keep only the chunks whose estimated token count is strictly above min_token_length,
# converted to a list of dicts (one per row).
# NOTE(review): this filter uses ">" while the preview cells use ">=" — confirm which bound is intended.
pages_and_chunks_over_max_token_len = df[df["chunk_token_count"] > min_token_length].to_dict(orient="records")
# Show the first two kept records
pages_and_chunks_over_max_token_len[:2]

In [None]:
# Number of chunks that passed the token-count filter
len(pages_and_chunks_over_max_token_len)

In [None]:
# DataFrame view of the filtered chunk records
df_1 = pd.DataFrame(pages_and_chunks_over_max_token_len)
df_1.head()

In [None]:
# Inspect the chunk text stored under index label 4 of the filtered frame
df_1['sentence_chunk'][4]

In [None]:
# Row count of the filtered frame
len(df_1)

In [None]:
# Total number of chunks before filtering, for comparison with len(df_1)
len(pages_and_chunks)

In [2]:
from sentence_transformers import SentenceTransformer
# Load the "all-mpnet-base-v2" sentence-embedding model onto the GPU
embedding_model = SentenceTransformer(model_name_or_path="all-mpnet-base-v2", 
                                      device="cuda") # choose the device to load the model to (note: GPU will often be *much* faster than CPU)
# NOTE(review): the embedding loop below is wrapped in a string literal, so it is not executed;
# the cell merely echoes the string, and no "embedding" key is added to pages_and_chunks here.
"""for item in tqdm(pages_and_chunks):
    item["embedding"] = embedding_model.encode(item["sentence_chunk"])"""

  from scipy.sparse import csr_matrix, issparse


'for item in tqdm(pages_and_chunks):\n    item["embedding"] = embedding_model.encode(item["sentence_chunk"])'

In [None]:
# Write the chunk records to a CSV file (without the DataFrame index).
# NOTE(review): the encode loop in the model-loading cell is disabled, so the records built in
# this notebook appear to carry no "embedding" column when saved here; also, the loading cell
# further down reads "embeddings_df.csv", not this file — confirm both are intended.
text_chunks_and_embeddings_df = pd.DataFrame(pages_and_chunks)
embeddings_df_save_path = "embeddings_df_2.csv"
text_chunks_and_embeddings_df.to_csv(embeddings_df_save_path, index=False)

In [None]:
# Read the CSV written by the previous cell back into a DataFrame.
# NOTE(review): `embeded` is not referenced by any other cell visible in this notebook.
embeded = pd.read_csv('embeddings_df_2.csv')

In [3]:
import torch
import numpy as np
# Device on which the embedding matrix is placed
device = "cuda"

# Read the saved chunk texts together with their embedding vectors.
# NOTE(review): the save cell earlier writes "embeddings_df_2.csv"; this reads "embeddings_df.csv" — confirm.
text_chunks_and_embedding = pd.read_csv("embeddings_df.csv")

# The CSV round-trip stored every vector as its printed form "[v0 v1 ...]";
# drop the brackets and parse the whitespace-separated numbers back into an array
text_chunks_and_embedding["embedding"] = text_chunks_and_embedding["embedding"].map(
    lambda serialized: np.fromstring(serialized.strip("[]"), sep=" ")
)

# One dict per chunk; later cells index this list with the retrieved row positions
pages_and_chunks = text_chunks_and_embedding.to_dict("records")

# Stack the per-row vectors into a single matrix, cast to float32, and move it to the device
# (NumPy parses to float64, torch defaults to float32)
embeddings = torch.tensor(
    np.array(text_chunks_and_embedding["embedding"].tolist()),
    dtype=torch.float32,
).to(device)
embeddings.shape

torch.Size([11188, 768])

In [None]:
# Assuming you've already imported the model and embeddings are on the GPU
from sentence_transformers import SentenceTransformer
# NOTE(review): this re-creates `embedding_model` with the same arguments as the earlier
# model-loading cell, so the model is loaded a second time when both cells run.
embedding_model = SentenceTransformer(model_name_or_path="all-mpnet-base-v2", device="cuda")
def vector_search(query, embeddings, top_k=5):
    """Return the stored embeddings most cosine-similar to ``query``.

    Args:
        query: Text to embed with the module-level ``embedding_model``.
        embeddings: Tensor of stored embeddings, one row per chunk; each row is
            compared against the query embedding.
        top_k: Maximum number of hits to return.

    Returns:
        A list of ``(row_index, similarity)`` tuples ordered from most to least
        similar. It holds fewer than ``top_k`` entries when ``embeddings`` has
        fewer rows than ``top_k``.
    """
    # Encode and compare on whichever device the stored embeddings live on,
    # instead of assuming "cuda"
    target_device = embeddings.device
    query_embedding = embedding_model.encode([query], device=str(target_device))
    query_embedding_tensor = torch.tensor(query_embedding).to(target_device)
    similarities = torch.nn.functional.cosine_similarity(query_embedding_tensor, embeddings)

    # Never ask for more hits than there are rows (indexing past the end raised IndexError)
    k = max(0, min(top_k, similarities.shape[0]))
    # topk returns the k largest values already sorted in descending order
    top_k_scores, top_k_indices = torch.topk(similarities, k=k)

    # Return the results with their indices and similarity scores as plain Python numbers
    return [(index.item(), score.item()) for index, score in zip(top_k_indices, top_k_scores)]

In [None]:
# Define helper function to print wrapped text 
import textwrap

def print_wrapped(text, wrap_length=80):
    """Print ``text`` re-flowed so that no line is longer than ``wrap_length`` characters."""
    print(textwrap.fill(text, width=wrap_length))

In [4]:
from sentence_transformers import SentenceTransformer, util
def retrieve_fn(query: str,
                embeddings: torch.Tensor = None,
                model: SentenceTransformer = embedding_model,
                n_resources_to_return: int = 5):
    """Embed ``query`` and return the best-scoring rows of ``embeddings``.

    Args:
        query: Text to search for.
        embeddings: Tensor of stored chunk embeddings, one row per chunk. It keeps a
            default only so the signature stays callable as before; passing it
            is required.
        model: Model used to embed the query (defaults to the notebook's
            ``embedding_model``).
        n_resources_to_return: Maximum number of results to return.

    Returns:
        ``(scores, indices)`` as produced by ``torch.topk`` over the dot-product
        scores: the highest scores in descending order and the row positions
        they belong to.

    Raises:
        ValueError: If ``embeddings`` is not supplied.
    """
    # The old default was the `torch.tensor` factory function itself (an annotation was
    # intended), which only failed later inside dot_score with an unhelpful message.
    if embeddings is None:
        raise ValueError("retrieve_fn() requires `embeddings`: pass the tensor of stored chunk embeddings.")

    query_embedding = model.encode(query, convert_to_tensor=True)
    dot_scores = util.dot_score(query_embedding, embeddings)[0]

    # topk raises when k exceeds the number of stored rows, so cap it
    k = min(n_resources_to_return, dot_scores.shape[0])
    scores, indices = torch.topk(input=dot_scores, k=k)
    return scores, indices
def print_top_score(query: str,
                    embeddings: torch.tensor,
                    pages_and_chunks: list[dict] = pages_and_chunks,
                    n_resources_to_return: int = 1) -> str:
    """Echo the query and return the text of its best-matching chunk.

    Args:
        query: Text to search for.
        embeddings: Stored chunk embeddings handed on to ``retrieve_fn``.
        pages_and_chunks: Chunk records; the winning row position is looked up here
            and its ``"sentence_chunk"`` value is returned.
        n_resources_to_return: Number of results requested from ``retrieve_fn``;
            only the first one is used.

    Returns:
        The ``"sentence_chunk"`` text of the top-ranked record. Only the query is
        printed; the chunk itself is returned, not printed.
    """
    # Scores are not needed here, only the ranked row positions
    _scores, ranked_indices = retrieve_fn(query=query,
                                          embeddings=embeddings,
                                          n_resources_to_return=n_resources_to_return)

    # First entry is the best match; convert it to a plain int for list indexing
    best_position = int(ranked_indices[0])
    best_chunk_text = pages_and_chunks[best_position]["sentence_chunk"]

    print(f"Query: '{query}'\n")
    return best_chunk_text

In [5]:
# Example query for the retrieval step.
# NOTE(review): the text says "quadratic" but the exponent is 8; the final cell of the
# notebook asks about 3x^2 — confirm whether 3x^8 is intended.
query_1 = "Solve the quadratic equation: 3x^8 + 5x - 2 = 0."
# Retrieve the best-scoring chunks (retrieve_fn's default result count applies)
scores, indices = retrieve_fn(query=query_1,
                                    embeddings=embeddings)
# Display the scores and the matching row positions
scores, indices

(tensor([0.5181, 0.5044, 0.5020, 0.4877, 0.4822], device='cuda:0'),
 tensor([4781, 4977, 2460, 4988, 9322], device='cuda:0'))

In [6]:
# Echo the query and display the text of the single best-matching chunk
print_top_score(query=query_1,
                             embeddings=embeddings)

Query: 'Solve the quadratic equation: 3x^8 + 5x - 2 = 0.'



'Solving this equation for x,  we obtain x = ±3√. Next method is 2. By Completing the Square: This is a method that can be used to solve any quadratic equation. First note that  X2+bx+(b/2)2=(x+b2)2.                        (1) Example. Solve the equation x2−6x−10=0 by completing the square. Solution. By adding 10 to each side of the equation, we obtain X2−6x=10.                                 (2) Note that half of the coefficient of x is −6/2=−3. Add (−3)2 to each side of (2):'

In [None]:
# Note: the following is Gemma focused, however, there are more and more LLMs of the 2B and 7B size appearing for local use.
import torch


def recommend_gemma_config(memory_gb: float) -> tuple[str, bool, str]:
    """Choose a Gemma checkpoint and quantization setting for a GPU of the given size.

    Args:
        memory_gb: Total GPU memory in GB.

    Returns:
        ``(message, use_quantization_config, model_id)``. Every memory size maps to a
        complete configuration, so the caller never ends up with undefined names.
    """
    if memory_gb < 5.1:
        # Too small for comfort: still fall back to the smallest model in 4-bit so that
        # use_quantization_config / model_id are always defined for the cells that follow.
        return (f"Your available GPU memory is {memory_gb}GB, you may not have enough memory to run a Gemma LLM locally without quantization.",
                True,
                "google/gemma-2b-it")
    if memory_gb < 8.1:
        return (f"GPU memory: {memory_gb} | Recommended model: Gemma 2B in 4-bit precision.",
                True,
                "google/gemma-2b-it")
    if memory_gb < 19.0:
        return (f"GPU memory: {memory_gb} | Recommended model: Gemma 2B in float16 or Gemma 7B in 4-bit precision.",
                False,
                "google/gemma-2b-it")
    # 19 GB and above. This is a plain fall-through so that exactly 19 GB is covered too.
    return (f"GPU memory: {memory_gb} | Recommend model: Gemma 7B in 4-bit or float16 precision.",
            False,
            "google/gemma-7b-it")


# Get total GPU memory of device 0 (reported as 0 when no CUDA device is present)
if torch.cuda.is_available():
    gpu_memory_bytes = torch.cuda.get_device_properties(0).total_memory
else:
    print("[WARN] No CUDA device detected; treating available GPU memory as 0 GB.")
    gpu_memory_bytes = 0
gpu_memory_gb = round(gpu_memory_bytes / (2**30))
print(f"Available GPU memory: {gpu_memory_gb} GB")

recommendation_message, use_quantization_config, model_id = recommend_gemma_config(gpu_memory_gb)
print(recommendation_message)

print(f"use_quantization_config set to: {use_quantization_config}")
print(f"model_id set to: {model_id}")

In [None]:
from transformers import BitsAndBytesConfig
from transformers.utils import is_flash_attn_2_available
from sentence_transformers import SentenceTransformer, util
from transformers import BartForConditionalGeneration, BartTokenizer
from transformers import AutoTokenizer, AutoModelForCausalLM
# 4-bit bitsandbytes settings; only handed to the model when quantization was selected
quantization_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_compute_dtype=torch.float16,
)

# Use Flash Attention 2 only when the package is available and GPU 0 reports a
# compute-capability major version of at least 8; otherwise fall back to "sdpa"
attn_implementation = (
    "flash_attention_2"
    if is_flash_attn_2_available() and torch.cuda.get_device_capability(0)[0] >= 8
    else "sdpa"
)
print(f"[INFO] Using attention implementation: {attn_implementation}")
print(f"[INFO] Using model_id: {model_id}")

# Tokenizer and causal LM come from the same checkpoint id
tokenizer = AutoTokenizer.from_pretrained(pretrained_model_name_or_path=model_id)
Gamma_model = AutoModelForCausalLM.from_pretrained(
    pretrained_model_name_or_path=model_id,
    torch_dtype=torch.float16,
    quantization_config=(quantization_config if use_quantization_config else None),
    low_cpu_mem_usage=False,
    attn_implementation=attn_implementation,
)

# Only the non-quantized model is moved to the GPU by hand
if not use_quantization_config:
    Gamma_model.to("cuda")

In [None]:
# Display the loaded model's module structure
Gamma_model

In [None]:
def prompt_formatter(query: str, context_items: list[dict], task: str) -> str:
    """
    Build the prompt text for one of three tutoring tasks.

    Note: later cells in this notebook define `prompt_formatter` again; whichever
    definition was executed last is the one that `ask` uses.

    Parameters:
        query (str): The student's query, answers, or list of concepts (depends on the task).
        context_items (list[dict]): Retrieved records; each must have a "sentence_chunk" key.
        task (str): One of "questions", "evaluation", or "explanation".

    Returns:
        str: The filled-in prompt template for the requested task.

    Raises:
        ValueError: If `task` is not one of the three supported values.
    """
    # Bullet-join the chunks; every template writes the leading "- " of the first bullet itself
    context = "\n- ".join(entry["sentence_chunk"] for entry in context_items)

    # Reject unknown tasks up front (after the context join, so error order is unchanged)
    if task not in ("questions", "evaluation", "explanation"):
        raise ValueError("Invalid task. Use 'questions', 'evaluation', or 'explanation'.")

    # Template: generate assessment questions
    if task == "questions":
        return f"""Based on the following context items, generate three assessment questions to evaluate the student's understanding of the topic:
1. A basic question to test foundational knowledge.
2. An application-based question to assess problem-solving ability.
3. A conceptual question to challenge deeper understanding.

Context:
- {context}

Examples:
Example 1:
Context: Newton's second law states that Force equals mass times acceleration (F = ma).
Generated Questions:
1. What does Newton's second law state?
2. If a 10 kg object is pushed with a force of 20 N, what is its acceleration?
3. Why is Newton’s second law important in understanding motion?

Now, generate questions for the topic based on the context above:
User Query: {query}
"""

    # Template: grade student answers
    if task == "evaluation":
        return f"""Evaluate the following student answers based on the provided context and expected answers:
1. Mark the answer as Correct, Partially Correct, or Incorrect.
2. Provide a detailed explanation of why the answer is correct or incorrect.

Context:
- {context}

Questions and Student Answers:
{query}

Expected Answers:
Provide detailed feedback for each question and answer:
"""

    # Template: explain misunderstood concepts (the only remaining valid task)
    return f"""The student struggled with the following concepts. Provide a clear and concise explanation for each, including examples where appropriate.

Context:
- {context}

Misunderstood Concepts:
{query}

Examples:
Example 1:
Concept: F = ma means Force = mass times acceleration.
Explanation: Newton's second law explains how force, mass, and acceleration are related. For example, if you apply a force of 20 N to a 5 kg object, the acceleration is 4 m/s² because F = ma.

Provide tailored explanations for the concepts based on the context:
"""

In [None]:
def prompt_formatter(query: str, context_items: list[dict], task: str) -> str:
    """
    Build a step-by-step "solve and explain" prompt from a problem and its retrieved context.

    Note: `prompt_formatter` is defined in more than one cell of this notebook;
    whichever definition was executed last is the one in effect.

    Parameters:
        query (str): The problem statement inserted under "### Problem:".
        context_items (list[dict]): Retrieved records; each must have a "sentence_chunk" key.
        task (str): Must be "solve_and_explain".

    Returns:
        str: The filled-in prompt.

    Raises:
        ValueError: If `task` is anything other than "solve_and_explain".
    """
    # Bullet-join the chunks; the template writes the first "- " itself
    context = "\n- ".join(entry["sentence_chunk"] for entry in context_items)

    # This template supports a single task only
    if task != "solve_and_explain":
        raise ValueError("Invalid task. Use 'solve_and_explain' for this template.")

    return f"""You are a helpful assistant who solves problems step-by-step and explains them clearly.

Context:
- {context}

Let's solve this step-by-step:

1. **Understand the Problem:** Analyze the given equation to identify its components.
2. **Solve Step-by-Step:** Show each calculation in detail.
3. **Explain the Solution:** Summarize the results and explain the reasoning.

---

### Problem:
{query}

### Solution:
Step 1: Identify the coefficients in the quadratic equation.
Step 2: Apply the quadratic formula.
Step 3: Perform the calculations.
Step 4: Interpret the results.

---

Provide the solution in the format above. Be clear and conversational in your explanation.
"""


In [None]:
def prompt_formatter(query: str, context_items: list[dict], task: str) -> str:
    """
    Custom formatter to structure the prompt for generating questions, evaluation, explanation, or problem-solving.

    Parameters:
        query (str): The user's query or topic.
        context_items (list[dict]): A list of context items with key "sentence_chunk".
            Duplicate chunks are dropped; the first occurrence of each chunk keeps its
            position, so the order in which the items were supplied is preserved.
        task (str): The task to perform - "questions", "evaluation", "explanation", or "solve_and_explain".

    Returns:
        str: A formatted prompt based on the specified task.

    Raises:
        ValueError: If `task` is not one of the four supported values.
    """
    # Remove duplicate chunks while keeping their original order. A set would also
    # de-duplicate, but its iteration order is arbitrary, which made the prompt differ
    # between runs and discarded the order of the supplied items.
    unique_context = list(dict.fromkeys(item["sentence_chunk"] for item in context_items))
    context = "\n".join([f"- {chunk}" for chunk in unique_context]) if unique_context else "No specific context provided."

    # Task-specific formatting
    if task == "questions":
        formatted_prompt = f"""
Task: Generate assessment questions to test a student's understanding of the topic.

Context (for reference only, do not repeat in your response):
{context}

Instructions:
1. Create three questions:
   - A foundational knowledge question (easy).
   - An application-based question (moderate).
   - A conceptual question to assess deeper understanding (challenging).
2. Ensure the questions are clear, concise, and focused on the topic.

Topic: {query}

Now, generate the questions.
"""
    elif task == "evaluation":
        formatted_prompt = f"""
Task: Evaluate student responses to the given questions.

Context (for reference only, do not repeat in your response):
{context}

Instructions:
1. Review the provided student answers.
2. For each answer, classify as Correct, Partially Correct, or Incorrect.
3. Provide detailed feedback explaining why the answer is correct or incorrect.

Questions and Student Answers:
{query}

Now, evaluate and provide feedback for each response.
"""
    elif task == "explanation":
        formatted_prompt = f"""
Task: Provide clear and concise explanations for the following concepts.

Context (for reference only, do not repeat in your response):
{context}

Instructions:
1. Write a clear explanation for each concept in the query.
2. Include examples to enhance understanding where appropriate.
3. Use simple and precise language.

Concepts:
{query}

Now, provide the explanations.
"""
    elif task == "solve_and_explain":
        formatted_prompt = f"""
Task: Solve the following problem step-by-step and explain the solution.

Context (for reference only, do not repeat in your response):
{context}

Instructions:
1. Solve the problem in clear, logical steps.
2. Explain each step as if teaching a beginner.
3. Provide a detailed solution and examples to enhance understanding.

Problem:
{query}

Now, solve the problem and explain the solution.
"""
    else:
        raise ValueError("Invalid task. Use 'questions', 'evaluation', 'explanation', or 'solve_and_explain'.")

    return formatted_prompt


In [None]:
def ask(query,
        temperature=0.7,
        max_new_tokens=512,
        format_answer_text=True,
        return_answer_only=True,
        task="questions"):  # task is passed through to prompt_formatter
    """
    Answers a query by retrieving context items, formatting a prompt, and sampling
    a response from ``Gamma_model``.

    Args:
        query: The user's question or topic.
        temperature: Sampling temperature passed to ``generate``.
        max_new_tokens: Upper bound on the number of generated tokens.
        format_answer_text: When True, return only the newly generated text with
            surrounding whitespace stripped; when False, return the full decoded
            sequence (prompt included).
        return_answer_only: When True, return just the text; otherwise return
            ``(text, context_items)``.
        task: Task name understood by ``prompt_formatter``.

    Returns:
        The answer text, or ``(answer_text, context_items)`` where each context item
        is a copy of a retrieved record with an added ``"score"`` entry.
    """
    # Retrieve context items
    scores, indices = retrieve_fn(query=query, embeddings=embeddings)

    # Build per-call copies of the retrieved records before attaching scores, so the
    # shared pages_and_chunks entries are not modified by every call.
    context_items = []
    for score, index in zip(scores, indices):
        context_item = dict(pages_and_chunks[int(index)])
        context_item["score"] = score.cpu()  # Return score to CPU
        context_items.append(context_item)

    # Format the prompt for the requested task
    prompt = prompt_formatter(query=query, context_items=context_items, task=task)

    # Tokenize the prompt and move the tensors to the GPU
    input_ids = tokenizer(prompt, return_tensors="pt").to("cuda")

    # Generate output
    outputs = Gamma_model.generate(
        **input_ids,
        temperature=temperature,
        do_sample=True,
        max_new_tokens=max_new_tokens
    )

    if format_answer_text:
        # The generated sequence starts with the prompt tokens; drop them by position.
        # Removing the prompt with str.replace() silently does nothing whenever the
        # decoded text is not character-identical to the original prompt.
        prompt_token_count = input_ids["input_ids"].shape[-1]
        output_text = tokenizer.decode(outputs[0][prompt_token_count:],
                                       skip_special_tokens=True).strip()
    else:
        # Decode the whole sequence (prompt + answer)
        output_text = tokenizer.decode(outputs[0], skip_special_tokens=True)

    # Return only the answer or both the answer and context
    if return_answer_only:
        return output_text
    return output_text, context_items


In [None]:
# Problem to solve with retrieval-augmented generation
query = "Solve the quadratic equation: 3x^2 + 5x - 2 = 0."
print(f"Query: {query}")

# Ask for both the generated answer and the context records it was grounded on.
# `answer` holds the generated text; this cell only prints the retrieved context.
answer, context_items = ask(query=query,
                            temperature=0.7,
                            max_new_tokens=512,
                            return_answer_only=False,
                            task="solve_and_explain")

print("\nContext Items:")
if not isinstance(context_items, list):
    print(f"Unexpected context_items format: {context_items}")
else:
    # Number the records from 1 and report anything that is not a dict
    for idx, item in enumerate(context_items, start=1):
        if not isinstance(item, dict):
            print(f"Unexpected context format: {item}")
            continue
        chunk = item.get("sentence_chunk", "Unknown Chunk")
        score = item.get("score", 0)
        print(f"{idx}. {chunk}\n   (Score: {score:.2f})\n")
