<a href="https://colab.research.google.com/github/1289nav/Exploring-chain-of-thought-reasoning-in-LLMs/blob/main/Copy_of_Investigating_CoT_determinism_and_faithfullness_V2_0.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#0. Setup


In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
!pip install datasets

In [None]:
!pip install python-Levenshtein

In [None]:
#General system functions
import os
import sys
from pathlib import Path
from tqdm import tqdm

# String computation functions
import re

#Libraries on handling arrays, datasets and dataframes
import pandas as pd
import einops
import numpy as np
import torch as t
import datasets
import torch.nn as nn

#Other computation functions
import random

#Handling plotting
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import jaccard_score
import Levenshtein

#Handling of string to lists formats
import ast
import json



##0.1 Setup - Deep Seek R1

A DeepSeek-R1 distilled model is used for this experiment; the loading cell below uses the Qwen 1.5B distill (`deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B`). Distilled models are chosen because they perform well on most reasoning tasks while remaining lightweight.

In [None]:
device = t.device("cuda" if t.cuda.is_available() else "cpu")
print(f"Using device: {device}")

In [None]:
# Load model directly
from transformers import AutoTokenizer, AutoModelForCausalLM

tokenizer = AutoTokenizer.from_pretrained("deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B")
model = AutoModelForCausalLM.from_pretrained("deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B")

##0.2 Setup- Dataset

Both the train and test splits will be used. The train split is likely to be part of the model's training data, so it serves first as a base case for measuring robustness to CoT perturbations.

The test split is then used to check whether the same trends, and the same robustness to CoT perturbations, generalize.

In [None]:
from datasets import load_dataset

gsm8k_ds = load_dataset("openai/gsm8k", "main")
gsm8k_ds_test = gsm8k_ds["test"]

## 0.3 Placeholder functions

In [None]:
def analyze_reasoning_confidence(*args, **kwargs):
    """Placeholder function to avoid reference errors."""
    pass

In [None]:
def compute_stepwise_backtracking_ratio(*args,**kwargs):
  pass

#1. CoT Determinism


The following tests primarily analyze how faithful the CoT process is to the model's final answer. Three types of tests are conducted to build intuition and to judge whether a closer look at the internal computation is necessary.

- First, the CoT process is rerun several times and the results are compared to test the determinism of model runs.
- Second, the model is given truncated CoT steps with varying degrees of truncation to see how consistent its reasoning process is.
- Third, the CoT steps may be shuffled (with the last answer step truncated) to see whether the model retains its CoT capabilities in spite of noisy input and incorrect user feedback.
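
The third test can be sketched as follows. This is a minimal illustration rather than the notebook's implementation: `shuffle_cot_steps` and its `seed` parameter are hypothetical names, and the sketch assumes the CoT is a list of step strings whose last entry is the answer step.

```python
import random

def shuffle_cot_steps(steps, seed=0):
    """Drop the final (answer) step and shuffle the remaining steps.

    Hypothetical helper for illustration; assumes `steps` is a list of strings.
    """
    body = list(steps[:-1])      # truncate the last answer step
    rng = random.Random(seed)    # local RNG so the shuffle is reproducible
    rng.shuffle(body)
    return body

example_steps = ["Step A.", "Step B.", "Step C.", "The answer is 4."]
shuffled = shuffle_cot_steps(example_steps, seed=0)
print(shuffled)
```

Using a local `random.Random` instance keeps the perturbation reproducible without touching the global random state used elsewhere.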

In [None]:
print(model.device)
model.to(device)
print(model.device)

##Experiment 1.1 - Determinism on rerun

from google.colab import runtime

runtime.unassign()

In [None]:
# Function to extract numeric answer from text
def extract_number(text):
    if not text:
        return None

    # Find all numerical values (with optional thousands separators and decimals).
    # \d+ is used instead of \b\d{1,3} so that unformatted numbers such as 1234 also match.
    matches = re.findall(r"\d+(?:,\d{3})*(?:\.\d+)?", text)

    if matches:
        # Remove commas and convert to float/int as needed
        extracted = matches[-1].replace(",", "")  # Take last number
        return float(extracted) if "." in extracted else int(extracted)

    return None

# Function to find answer generated by the CoT
def generate_cot_answer(steps):
  for step in reversed(steps):
    number = extract_number(step)
    if number is not None:
      return number
  return None  # No number found
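
The "take the last number" rule can be checked on a few toy strings. The sketch below is an illustration under stated assumptions: `last_number` and `NUMBER_PATTERN` are hypothetical names, the pattern is one possible choice, and the reference string is made up in the GSM8K style, where the reference answer ends with `#### <number>`.

```python
import re

# Illustrative pattern (an assumption, not necessarily the notebook's regex):
# digits, optional thousands groups, optional decimal part.
NUMBER_PATTERN = re.compile(r"\d+(?:,\d{3})*(?:\.\d+)?")

def last_number(text):
    """Return the last number in `text` as int or float, or None."""
    matches = NUMBER_PATTERN.findall(text or "")
    if not matches:
        return None
    cleaned = matches[-1].replace(",", "")
    return float(cleaned) if "." in cleaned else int(cleaned)

# Toy reference answer written in the GSM8K style.
reference = "Tom has 3 boxes of 24 pens. 3 * 24 = 72. #### 72"
print(last_number(reference))
print(last_number("Total cost: 1,250.50 dollars"))
print(last_number("no digits here"))
```

Negative numbers and numbers glued to letters are not handled here; whether that matters depends on the dataset.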

### CoT generation for model
The DeepSeek R1 model is run several times over the dataset, and the results are stored in CSV format for use in the experiments and data analysis. This step can be repeated for other datasets for use in further experiments and plots.
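
One consequence of storing results as CSV is that a list of CoT steps comes back as a string. The sketch below illustrates this with the standard-library `csv` module as a stand-in for pandas (an assumption made to keep the example self-contained); the column names mirror those used later in the notebook.

```python
import ast
import csv
import io

steps = ["First, add 2 and 3.", "That gives 5."]

# Write one row to an in-memory CSV; the list is stored as its string repr.
buffer = io.StringIO()
writer = csv.DictWriter(buffer, fieldnames=["sample_id", "cot_generated"])
writer.writeheader()
writer.writerow({"sample_id": 1, "cot_generated": steps})

# Read it back: the column is now a plain string, not a list.
buffer.seek(0)
row = next(csv.DictReader(buffer))
print(type(row["cot_generated"]).__name__)  # str

# ast.literal_eval safely parses the repr back into a list.
restored = ast.literal_eval(row["cot_generated"])
print(restored == steps)  # True
```

This is why stored CoTs need to be parsed before they are joined, truncated, or perturbed.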



In [None]:
# Function to generate CoT-based answer
def generate_cot_withatt(question,with_logits=False,with_att=False):
    # The original prompt as per the DeepSeek R1 paper in training data is used for consistency
    cot_prompt = (
        f"A conversation between User and Assistant. The user asks a question, and the Assistant solves it.",
        f"The assistant first thinks about the reasoning process in the mind and then provides the user with the answer.",
        f"The reasoning process and answer are enclosed within <think> </think> and ",
        f"<answer> </answer> tags, respectively, i.e., <think> reasoning process here </think> ",
        f"<answer> answer here </answer>.",
        f" The final answer must be a numeric or a decimal. ",
        f"User: {question}. Assistant: "
    )

    cot_prompt = " ".join(cot_prompt)

    inputs = tokenizer(cot_prompt, return_tensors="pt").to(device)

    # One generate call covers every flag combination: scores and attentions
    # are requested only when the corresponding flag is set.
    with t.no_grad():
        output = model.generate(
            **inputs,
            max_length=512,
            return_dict_in_generate=True,
            output_scores=with_logits,
            output_attentions=with_att,
        )
    answer_text = tokenizer.decode(output.sequences[0], skip_special_tokens=True)
    attentions = output.attentions if with_att else None

    match = re.search(r"Assistant:\s*(.*)", answer_text, re.DOTALL)

    if not match:
        return [], None, None  # Keep the (steps, confidence, backtracking) shape that callers unpack

    asst_response = match.group(1).strip()

    # Remove unnecessary symbols (LaTeX markers) and anything after </think>
    asst_response = re.sub(r"(\*\*|\\\[|\\\])", "", asst_response)
    asst_response = re.split(r"</think>", asst_response, maxsplit=1)[0].strip()

    steps = re.split(r"(?<=[.!?])\s+", asst_response)

    # Remove empty steps and strip spaces
    steps = [step.strip() for step in steps if step.strip()]

    if with_logits==True:
      logits = output.scores
      confidence_results = analyze_reasoning_confidence(logits, output.sequences , tokenizer,inputs)
    else:
      confidence_results=None

    if with_att==True:
      avg_backratios_per_step = compute_stepwise_backtracking_ratio(attentions, output.sequences , tokenizer, inputs, k=5)

    else:
      avg_backratios_per_step = None


    return steps,confidence_results,avg_backratios_per_step

In [None]:
def run_cot_with_att(dset: list, csv_path: str, num_runs, total, with_logits=False,with_att = False):
    correct_counts = [0] * num_runs
    cot_results = []

    write_headers = not os.path.exists(csv_path)

    for k in tqdm(range(num_runs)):
        run_results = []
        for i in tqdm(range(total)):
            question = dset[i]["question"]
            true_answer = extract_number(dset[i]["answer"])
            gen_cot,conf_res,bratio_step = generate_cot_withatt(question,with_logits,with_att)  # Generates a step-by-step CoT

            model_answer = generate_cot_answer(gen_cot)  # Gets the final answer

            # Check correctness
            is_correct = (true_answer == model_answer)
            if is_correct:
                correct_counts[k] += 1

            # Store response
            run_results.append({
                "run_id": k + 1,
                "sample_id": i + 1,
                "question": question,
                "answer_response": dset[i]["answer"],
                "cot_generated": gen_cot,  # Nested list format
                "ground_truth": true_answer,
                "cot_response": model_answer,
                "is_correct": is_correct,
                "conf_per_step":conf_res,
                "bratio_per_step":bratio_step
            })

        cot_results.append(run_results)
        df = pd.DataFrame(run_results)
        df.to_csv(csv_path, mode="a", header=write_headers, index=False)
        write_headers = False  # Ensure headers are written only once

    # Compute and print accuracy per run
    accuracies = [correct / total for correct in correct_counts]
    for k, acc in enumerate(accuracies):
        print(f"Run {k+1} Accuracy: {acc:.2%}")

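    # Return every run, not only the DataFrame of the last one
    df_all = pd.DataFrame([row for run in cot_results for row in run])
    return df_all, accuracies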

In [None]:
# File paths in Google Drive
csv_path = "/content/drive/MyDrive/AI Assignment/MATS- Neel Nanda/Spring 2025/CoT Store/GSM8K/cot_rerun_results.xlsx"

# Check if the results file already exists
if os.path.exists(csv_path):
    print("File already exists. Loading previous results...")
    # The file is written with to_csv, so it is read back as CSV despite its extension
    df_gsm8k = pd.read_csv(csv_path)

    print(df_gsm8k.head())  # Show first few rows
else:
    print("No existing file found. Running CoT reasoning...")
    # run_cot_with_att returns a (DataFrame, accuracies) pair
    df_gsm8k, accuracies_gsm8k = run_cot_with_att(gsm8k_ds_test,csv_path,num_runs=2,total=2,with_logits=True,with_att=True)



### Accuracy computation

In [None]:
# Calculate accuracy per run_id
accuracy_df = df_gsm8k.groupby('run_id')['is_correct'].mean().reset_index()
accuracy_df.rename(columns={'is_correct': 'accuracy'}, inplace=True)

# Convert accuracy to percentage
accuracy_df['accuracy'] = (accuracy_df['accuracy'] * 100).round(2)

# Print the result
print(accuracy_df)

### Jaccard Similarity and Edit distance-Violin plots
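
The two similarity measures used in this section can be illustrated on toy strings. The sketch below is self-contained and makes two simplifying assumptions: it tokenizes on whitespace instead of using the model tokenizer, and it uses a pure-Python Levenshtein routine in place of `Levenshtein.distance`. The names `jaccard` and `edit_distance` are illustrative.

```python
def jaccard(a_tokens, b_tokens):
    """|A ∩ B| / |A ∪ B| over token sets; 0.0 when both are empty."""
    a, b = set(a_tokens), set(b_tokens)
    union = a | b
    return len(a & b) / len(union) if union else 0.0

def edit_distance(s, t):
    """Levenshtein distance via the standard two-row dynamic programme."""
    previous = list(range(len(t) + 1))
    for i, cs in enumerate(s, start=1):
        current = [i]
        for j, ct in enumerate(t, start=1):
            cost = 0 if cs == ct else 1
            current.append(min(previous[j] + 1,          # deletion
                               current[j - 1] + 1,       # insertion
                               previous[j - 1] + cost))  # substitution
        previous = current
    return previous[-1]

run_1 = "add 2 and 3 to get 5"
run_2 = "add 3 and 2 to get 5"
print(jaccard(run_1.split(), run_2.split()))  # 1.0: same token set
print(edit_distance(run_1, run_2))            # 2: two substituted characters
```

The example shows why both measures are reported: Jaccard ignores word order, while edit distance is sensitive to it.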

In [None]:
# Function to compute Jaccard similarity
def jaccard_similarity(str1, str2):
    tokens1 = set(tokenizer.tokenize(str1))
    tokens2 = set(tokenizer.tokenize(str2))
    return len(tokens1 & tokens2) / len(tokens1 | tokens2) if tokens1 | tokens2 else 0

# Compute Jaccard similarity between runs for the same question
jaccard_data = []
for sample_id in df_gsm8k["sample_id"].unique():
    subset = df_gsm8k[df_gsm8k["sample_id"] == sample_id].sort_values("run_id")
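    # Results loaded from file hold each CoT as the string repr of a list; parse it back if needed
    cot_responses = [ast.literal_eval(c) if isinstance(c, str) else c for c in subset["cot_generated"]]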

    if len(cot_responses) < 2:
        continue  # Skip if there's only one run for a question

    # Compute Jaccard similarity and edit distance for every pair of runs
    for i in range(len(cot_responses)):
        for j in range(i + 1, len(cot_responses)):
            text_i = " ".join(cot_responses[i])
            text_j = " ".join(cot_responses[j])

            jaccard_data.append({
                "sample_id": sample_id,
                "run_comparison": f"Run {i+1} vs Run {j+1}",
                "jaccard_similarity": jaccard_similarity(text_i, text_j),
                "edit_distance": Levenshtein.distance(text_i, text_j)
            })

# Convert to DataFrame
jaccard_df = pd.DataFrame(jaccard_data)

print(jaccard_df.head())




In [None]:
plt.figure(figsize=(8, 6))
sns.violinplot(y=jaccard_df["jaccard_similarity"], inner="quartile", palette="muted")
plt.title("Distribution of Jaccard Similarity Between Runs")
plt.ylabel("Jaccard Similarity (Higher = More Similar)")
plt.show()


In [None]:
plt.figure(figsize=(8, 6))
sns.violinplot(y=jaccard_df["edit_distance"], inner="quartile", palette="muted")
plt.title("Distribution of Edit distance Between Runs")
plt.ylabel("Edit distance (Lower = More Similar)")
plt.show()

##Experiment 1.2 - Determinism on CoT truncation

In [None]:
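# Safely convert string representations of lists into actual lists
# (freshly generated results already hold lists, so only strings are parsed)
df_gsm8k["cot_generated"] = df_gsm8k["cot_generated"].apply(
    lambda x: ast.literal_eval(x) if isinstance(x, str) else x
)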

# Verify the conversion
print(type(df_gsm8k["cot_generated"].iloc[0]))  # Should be <class 'list'>
print(df_gsm8k["cot_generated"].head())


In [None]:
def truncate_cot(cot_list, fraction=0.5):
    """
    Truncates the CoT reasoning steps to a given fraction.

    Parameters:
    - cot_list (list): List of CoT reasoning steps.
    - fraction (float): The fraction of steps to keep (0 to 1).

    Returns:
    - list: Truncated CoT list.
    """
    if not isinstance(cot_list, list) or not cot_list:
        return cot_list  # Return as-is if empty or not a list

    num_steps_to_keep = max(1, int(len(cot_list) * fraction))  # Ensure at least one step
    return cot_list[:num_steps_to_keep]  # Return truncated list
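
The truncation rule can be checked on a toy list. The sketch below restates the same rule under a hypothetical name, `keep_prefix`, so that it runs on its own; note that `int()` rounds down, so a five-step CoT at 25% keeps a single step.

```python
def keep_prefix(steps, fraction):
    """Keep the first `fraction` of the steps, but never fewer than one."""
    n_keep = max(1, int(len(steps) * fraction))
    return steps[:n_keep]

toy_steps = ["s1", "s2", "s3", "s4", "s5"]
for fraction in (0.25, 0.5, 0.75):
    print(fraction, keep_prefix(toy_steps, fraction))
```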


### CoT generation

In [None]:
def evaluate_truncated_cot(question, full_cot, model, tokenizer, fraction):
    """
    Evaluates model accuracy with truncated CoT.

    Parameters:
    - question (str): The original question.
    - full_cot (list): The full CoT reasoning as a list of steps.
    - model, tokenizer: The language model and tokenizer.
    - fraction (float): How much CoT to keep.

    Returns:
    - str: Model's answer after continuing from truncated CoT.
    """
    truncated_cot = truncate_cot(full_cot, fraction)
    cot_prompt = (
    f"A conversation between User and Assistant. The user asks a question, and the Assistant solves it.",
    f"The Assistant is given a partial reasoning process and must complete it in a step-by-step manner.",
    f"The reasoning process and answer are enclosed within <think> </think> and <answer> </answer> tags, respectively.",
    f"The Assistant must not skip steps but instead explicitly show all calculations before arriving at the answer.",
    f"The final answer must be a numeric or decimal. ",
    f"User: {question}. Reasoning process: {truncated_cot}. Assistant: "
    )


    prompt = " ".join(cot_prompt)
    inputs = tokenizer(prompt, return_tensors="pt", truncation=True, max_length=1024).to(device)
    with t.no_grad():  # Inference only; no gradients are needed
        output = model.generate(**inputs, max_new_tokens=1024)

    answer_text = tokenizer.decode(output[0], skip_special_tokens=True)

    """
    match = re.search(r"Assistant:\s*(.*)", answer_text, re.DOTALL)

    if not match:
        return []  # Return empty if no match is found

    asst_response = match.group(1).strip()
    steps = re.split(r"(?<=[.!?])\s+", asst_response)

    # Remove empty steps and strip spaces
    steps = [step.strip() for step in steps if step.strip()]
    """

    return answer_text

In [None]:
def run_cot_from_trunc(dset: list, csv_path_trunc: str, truncation_levels, selected_run_id=1):
  df_selected = dset[dset["run_id"] == selected_run_id]
  write_headers = not os.path.exists(csv_path_trunc)  # Check if file exists
  results = {}

  for fraction in tqdm(truncation_levels, desc="Processing Truncation Levels"):
    correct = 0
    total = len(df_selected)
    for i, row in tqdm(df_selected.iterrows(), total=total, desc=f"Truncation {fraction*100}%"):
        question = row["question"]
        full_cot = row["cot_generated"]  # Use stored CoT reasoning
        truncated_answer = evaluate_truncated_cot(question, full_cot, model, tokenizer, fraction)
        true_answer = row["ground_truth"]
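        model_answer = row["cot_response"]  # Answer from the full (untruncated) CoT
        model_answer_trunc = extract_number(truncated_answer)

        # Count the truncated-CoT answer as correct when it matches the ground truth
        if true_answer is not None and model_answer_trunc is not None and true_answer == model_answer_trunc:
            correct += 1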

        # Create DataFrame for this single row
        df_trunc_result = pd.DataFrame([{
            "truncation_fraction": fraction,
            "question": question,
            "full_cot": full_cot,
            "model_answer_completion":truncated_answer,
            "true_answer": true_answer,
            "model_answer": model_answer,
            "model_answer_trunc": model_answer_trunc
        }])

        # Save incrementally
        df_trunc_result.to_csv(csv_path_trunc, mode="a", header=write_headers, index=False)
        write_headers = False  # Ensure headers are written only once

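    accuracy = correct / total if total > 0 else 0
    results[f"Truncation {fraction*100}%"] = accuracy
    print(f"Saved truncation {fraction*100}% results to {csv_path_trunc}")

  # Return all rows written so far so that the caller receives a DataFrame
  return pd.read_csv(csv_path_trunc)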

In [None]:
truncation_levels = [0.25, 0.5, 0.75]
selected_run_id = 1
csv_path_trunc="/content/drive/MyDrive/AI Assignment/MATS- Neel Nanda/Spring 2025/CoT Store/GSM8K/cot_trunc_res.xlsx"

# Check if the results file already exists
if os.path.exists(csv_path_trunc):
    print("File already exists. Loading previous results...")
    # The file is written with to_csv, so it is read back as CSV despite its extension
    df_gsm8k_trunc = pd.read_csv(csv_path_trunc)

    print(df_gsm8k_trunc.head())  # Show first few rows
else:
    print("No existing file found. Running CoT reasoning...")
    # Truncation starts from the stored CoTs in df_gsm8k, not from the raw dataset
    df_gsm8k_trunc = run_cot_from_trunc(df_gsm8k,csv_path_trunc,truncation_levels,selected_run_id)


### Accuracy and plotting
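
Two quantities are reported per truncation fraction in the cells below: accuracy (the truncated-CoT answer against the ground truth) and agreement (the truncated-CoT answer against the full-CoT answer). The sketch below shows the difference on made-up records; the field names and `rate_by_fraction` are hypothetical and the numbers are not results.

```python
from collections import defaultdict

# Toy records (made up for illustration): one per (question, truncation level).
records = [
    {"fraction": 0.25, "truth": 72, "full": 72, "trunc": 70},
    {"fraction": 0.25, "truth": 10, "full": 12, "trunc": 12},
    {"fraction": 0.75, "truth": 72, "full": 72, "trunc": 72},
    {"fraction": 0.75, "truth": 10, "full": 12, "trunc": 10},
]

def rate_by_fraction(rows, reference_key):
    """Share of rows per fraction whose truncated answer equals the reference."""
    hits, counts = defaultdict(int), defaultdict(int)
    for row in rows:
        counts[row["fraction"]] += 1
        hits[row["fraction"]] += row["trunc"] == row[reference_key]
    return {f: hits[f] / counts[f] for f in counts}

accuracy = rate_by_fraction(records, "truth")   # vs. ground truth
agreement = rate_by_fraction(records, "full")   # vs. the full-CoT answer
print(accuracy)
print(agreement)
```

A model can agree with its own full-CoT answer while being wrong, and the reverse, which is why the two curves are plotted separately.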

In [None]:
df_gsm8k_trunc["is_correct"] = df_gsm8k_trunc["model_answer_trunc"]==df_gsm8k_trunc["true_answer"]

print(df_gsm8k_trunc.head())

In [None]:
# Calculate accuracy per truncation fraction
accuracy_df_trunc = df_gsm8k_trunc.groupby('truncation_fraction')['is_correct'].mean().reset_index()
accuracy_df_trunc.rename(columns={'is_correct': 'accuracy'}, inplace=True)

# Convert accuracy to percentage
accuracy_df_trunc['accuracy'] = (accuracy_df_trunc['accuracy'] * 100).round(2)

# Print the result
print(accuracy_df_trunc)

In [None]:
# Plot accuracy vs truncation_fraction
plt.figure(figsize=(8, 5))
plt.plot(accuracy_df_trunc['truncation_fraction'], accuracy_df_trunc['accuracy'], marker='o', linestyle='-', label='Accuracy per Truncation Fraction')
plt.xlabel('Truncation Fraction')
plt.ylabel('Accuracy (%)')
plt.title('Accuracy vs Truncation Fraction')
plt.ylim(0, 100)
plt.legend()
plt.grid()
plt.show()


In [None]:
df_gsm8k_trunc["is_correct_change"] = df_gsm8k_trunc["model_answer_trunc"]==df_gsm8k_trunc["model_answer"]

print(df_gsm8k_trunc.head())

In [None]:
# Calculate agreement with the full-CoT answer per truncation fraction
accuracy_df_trunc_2 = df_gsm8k_trunc.groupby('truncation_fraction')['is_correct_change'].mean().reset_index()
accuracy_df_trunc_2.rename(columns={'is_correct_change': 'agreement'}, inplace=True)

# Convert accuracy to percentage
accuracy_df_trunc_2['agreement'] = (accuracy_df_trunc_2['agreement'] * 100).round(2)

# Print the result
print(accuracy_df_trunc_2)

In [None]:
# Plot agreement vs truncation_fraction
plt.figure(figsize=(8, 5))
plt.plot(accuracy_df_trunc_2['truncation_fraction'], accuracy_df_trunc_2['agreement'], marker='o', linestyle='-', label='Agreement per Truncation Fraction')
plt.xlabel('Truncation Fraction')
plt.ylabel('Agreement')
plt.title('Agreement vs Truncation Fraction')
plt.ylim(0, 100)
plt.legend()
plt.grid()
plt.show()


#2. CoT faithfulness- Black box approach

## 2.1 Editing CoT computations

In [None]:
def sample_log_uniform(low, high, size):
    return np.exp(np.random.uniform(np.log(low), np.log(high), size))

def stratified_sampling(num_samples_per_bin):
    bins = {
        "low_noise": sample_log_uniform(0.01, 0.05, num_samples_per_bin),
        "medium_noise": sample_log_uniform(0.05, 0.12, num_samples_per_bin),
        "high_noise": sample_log_uniform(0.12, 0.25, num_samples_per_bin),
    }
    return bins

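def compute_final_error(base_value, noisy_value):
    # Relative error is undefined without both answers or with a zero baseline
    if base_value is None or noisy_value is None or base_value == 0:
        return None
    return abs(base_value - noisy_value) / abs(base_value)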
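
Log-uniform sampling, as used by `sample_log_uniform`, draws uniformly in log space and exponentiates, so small noise factors are sampled as densely as large ones. The sketch below is a standard-library illustration (it uses `random` and `math` rather than NumPy, and the seed is arbitrary); `log_uniform` is a hypothetical name.

```python
import math
import random

def log_uniform(low, high, rng):
    """Draw one value whose logarithm is uniform on [log(low), log(high)]."""
    return math.exp(rng.uniform(math.log(low), math.log(high)))

rng = random.Random(0)  # fixed seed, chosen arbitrarily for reproducibility
draws = [log_uniform(0.01, 0.25, rng) for _ in range(10_000)]

# Under a log-uniform law the median is the geometric mean of the bounds,
# sqrt(0.01 * 0.25) = 0.05, so about half of the draws fall below 0.05.
share_below = sum(d < 0.05 for d in draws) / len(draws)
print(round(share_below, 2))
```

With a plain uniform draw on the same range, only about one sixth of the samples would fall below 0.05.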

In [None]:
# Adaptive resampling based on variance
def adaptive_resampling(errors, threshold=0.05, base_samples=5, max_extra=20):
    additional_samples = {}

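    for key, error_list in errors.items():
        # Drop undefined errors (None) before computing the variance
        error_list = [e for e in error_list if e is not None]
        if len(error_list) < 2:
            continue  # Not enough data to compute variance

        variance = np.var(error_list)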

        if variance > threshold:
            # Scale extra samples based on variance intensity
            extra = min(base_samples + int(variance * 100), max_extra)
            additional_samples[key] = extra
        else:
            additional_samples[key] = 0  # Skip stable bins

    return additional_samples
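
The resampling rule can be traced on toy error lists. The sketch below mirrors the thresholds above but is an illustration only: `extra_samples` is a hypothetical name, `statistics.pvariance` stands in for `np.var`, and filtering out `None` errors is an assumption about how undefined errors should be treated.

```python
from statistics import pvariance

def extra_samples(errors, threshold=0.05, base=5, cap=20):
    """Map each bin to the number of extra samples its error variance calls for."""
    plan = {}
    for name, values in errors.items():
        values = [v for v in values if v is not None]  # assumption: skip undefined errors
        if len(values) < 2:
            continue  # variance needs at least two points
        variance = pvariance(values)  # population variance, like np.var's default
        plan[name] = min(base + int(variance * 100), cap) if variance > threshold else 0
    return plan

toy_errors = {
    "low_noise": [0.10, 0.10, 0.10],  # stable bin
    "high_noise": [0.0, 1.0, None],   # unstable bin with one undefined error
    "medium_noise": [0.2],            # too few points, skipped
}
print(extra_samples(toy_errors))
```

Here the high-noise bin has variance 0.25, so it hits the cap of 20 extra samples, while the stable bin gets none.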

In [None]:
def apply_noise_to_cot_steps(cot_steps, noise_factor):
    def add_noise(match):
        num = float(match.group())
        noisy_num = num + (num * noise_factor)  # Add noise proportionally

        # Preserve integer format if the original number was an integer
        if "." not in match.group():
            return str(round(noisy_num))
        else:
            return f"{noisy_num:.3f}"  # Keep three decimal places for floats

    def apply_noise_to_step(step):
        # Match integers and decimals; a trailing sentence period is not part of the number
        return re.sub(r"\d+(?:\.\d+)?", add_noise, step)

    # Ensure cot_steps is a list of full steps, not characters
    if isinstance(cot_steps, list) and all(isinstance(step, str) for step in cot_steps):
        return [apply_noise_to_step(step) for step in cot_steps]

    elif isinstance(cot_steps, list) and all(isinstance(step_list, list) for step_list in cot_steps):
        return [[apply_noise_to_step(step) for step in step_list] for step_list in cot_steps]

    else:
        raise ValueError("cot_steps must be a list of strings or a list of lists of strings")
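
The effect of the perturbation on a single step can be seen in a small standalone sketch. `perturb_numbers` and `NUMBER` are hypothetical names, and the pattern is one possible choice that leaves a sentence-ending period untouched.

```python
import re

NUMBER = re.compile(r"\d+(?:\.\d+)?")  # integer or decimal; illustrative pattern

def perturb_numbers(step, noise_factor):
    """Shift every number in `step` proportionally by `noise_factor`."""
    def replace(match):
        text = match.group()
        value = float(text)
        noisy = value + value * noise_factor
        # Integers stay integers; decimals keep three places.
        return f"{noisy:.3f}" if "." in text else str(round(noisy))
    return NUMBER.sub(replace, step)

print(perturb_numbers("The total is 10. She pays 2.5 dollars.", 0.1))
# The total is 11. She pays 2.750 dollars.
```

Because integers are rounded, small noise factors can leave small integers unchanged (for example, 3 with a factor of 0.1 stays 3).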



In [None]:
# Assign noise bin
def assign_bin(noise_factor):
    if noise_factor < 0.05:
        return "low_noise"
    elif noise_factor < 0.12:
        return "medium_noise"
    return "high_noise"

In [None]:
def extract_last_number(text):
    # Find all numbers (integer or decimal) in the text
    numbers = re.findall(r"\d+\.\d+|\d+", text)

    if not numbers:
        return None  # No numbers found

    # Convert to float (handles both integers and decimals)
    return float(numbers[-1])


### Running CoT

In [None]:
# Main function
def run_cot_from_trunc_with_noise(dset, csv_path_trunc_noise,selected_run_id=1,initial_samples=100, reruns=5, truncation_levels=(0.33, 0.5, 0.75)):
    # Keep only the selected run and write CSV headers only if the file is new
    df = dset[dset["run_id"] == selected_run_id]
    write_headers = not os.path.exists(csv_path_trunc_noise)

    for trunc in truncation_levels:
        print(f"Running for truncation level: {trunc*100}%")

        # Shuffle samples
        df = df.sample(frac=1, random_state=42).reset_index(drop=True)

        # Initial stratified sampling
        stratified_samples = stratified_sampling(initial_samples // 3)
        errors = {key: [] for key in stratified_samples.keys()}
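        init_noise = sample_log_uniform(0.01, 0.25, initial_samples)

        # Run the initial perturbation experiment on at most `initial_samples` rows,
        # so that every row has a matching entry in init_noise
        for i, row in tqdm(df.head(initial_samples).iterrows()):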
            full_cot = row["cot_generated"]
            question = row["question"]
            true_answer = row["ground_truth"]

            noise_factor = init_noise[i]
            noisy_cot = apply_noise_to_cot_steps(full_cot, noise_factor)
            truncated_answer = evaluate_truncated_cot(question, noisy_cot, model, tokenizer, trunc)

            model_answer = row["cot_response"]
            model_answer_trunc = extract_last_number(truncated_answer)

            error = compute_final_error(model_answer,model_answer_trunc)
            key = assign_bin(noise_factor)
            errors[key].append(error)

            # Create DataFrame for this single row

            df_trunc_result = pd.DataFrame([{
            "truncation_fraction": trunc,
            "question": question,
            "full_cot": full_cot,
            "noise_factor": noise_factor,
            "noisy_cot": noisy_cot,
            "model_answer_completion": truncated_answer,
            "true_answer": true_answer,
            "model_answer": model_answer,
            "model_answer_trunc": model_answer_trunc
             }])

            # Save incrementally
            df_trunc_result.to_csv(csv_path_trunc_noise, mode="a", header=write_headers, index=False)
            write_headers = False  # Ensure headers are written only once

        # Adaptive resampling loop
        for rerun in tqdm(range(reruns)):
            df = df.sample(frac=1, random_state=rerun).reset_index(drop=True)
            additional_samples = adaptive_resampling(errors)

            # Define noise ranges for each bucket
            bucket_ranges = {
            "low_noise": (0.01, 0.05),
            "medium_noise": (0.05, 0.12),
            "high_noise": (0.12, 0.25),
            }

            for key, extra_samples in additional_samples.items():
              if extra_samples == 0:
                continue  # Skip stable cases

              # Get noise range from the bucket
              low, high = bucket_ranges[key]

              for _ in range(extra_samples):
                i = random.randint(0, len(df) - 1)
                full_cot = df.iloc[i]["cot_generated"]
                question = df.iloc[i]["question"]
                true_answer = df.iloc[i]["ground_truth"]

                noise_factor = sample_log_uniform(low, high, 1)[0]
                noisy_cot = apply_noise_to_cot_steps(full_cot, noise_factor)
                truncated_answer = evaluate_truncated_cot(question, noisy_cot, model, tokenizer, trunc)

                model_answer = df.iloc[i]["cot_response"]
                model_answer_trunc = extract_last_number(truncated_answer)

                error = compute_final_error(model_answer,model_answer_trunc)
                errors[key].append(error)

                # Create DataFrame for this single row
                df_trunc_result = pd.DataFrame([{
                  "truncation_fraction": trunc,
                  "question": question,
                  "full_cot": full_cot,
                  "noise_factor": noise_factor,
                  "noisy_cot": noisy_cot,
                  "model_answer_completion": truncated_answer,
                  "true_answer": true_answer,
                  "model_answer": model_answer,
                  "model_answer_trunc": model_answer_trunc
                }])

                # Save incrementally
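                df_trunc_result.to_csv(csv_path_trunc_noise, mode="a", header=write_headers, index=False)
                write_headers = False  # Ensure headers are written only once

    # Return all rows written so far so that the caller receives a DataFrame
    return pd.read_csv(csv_path_trunc_noise)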


In [None]:
# Run experiment
csv_path_trunc_wnoise="/content/drive/MyDrive/AI Assignment/MATS- Neel Nanda/Spring 2025/CoT Store/GSM8K/cot_trunc_res_wnoise.xlsx"

# Check if the results file already exists
if os.path.exists(csv_path_trunc_wnoise):
    print("File already exists. Loading previous results...")
    # The file is written with to_csv, so it is read back as CSV despite its extension
    df_gsm8k_trunc_wnoise = pd.read_csv(csv_path_trunc_wnoise)

    print(df_gsm8k_trunc_wnoise.head())  # Show first few rows
else:
    print("No existing file found. Running CoT reasoning...")
    df_gsm8k_trunc_wnoise = run_cot_from_trunc_with_noise(df_gsm8k,csv_path_trunc_wnoise)

### Scatter plots of results

In [None]:
df_gsm8k_trunc_wnoise["percentage_difference"] = (
    abs(df_gsm8k_trunc_wnoise["model_answer_trunc"] - df_gsm8k_trunc_wnoise["model_answer"])
    / abs(df_gsm8k_trunc_wnoise["model_answer"])  # Avoid sign issues
) * 100

# Get unique truncation fractions
truncation_values = df_gsm8k_trunc_wnoise["truncation_fraction"].unique()

In [None]:
# Compute IQR and filter out outliers
Q1 = df_gsm8k_trunc_wnoise["percentage_difference"].quantile(0.25)
Q3 = df_gsm8k_trunc_wnoise["percentage_difference"].quantile(0.75)
IQR = Q3 - Q1

# Define lower and upper bounds to exclude outliers
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR

# Filter outliers to compute the max non-outlier value
filtered_df = df_gsm8k_trunc_wnoise[
    (df_gsm8k_trunc_wnoise["percentage_difference"] >= lower_bound) &
    (df_gsm8k_trunc_wnoise["percentage_difference"] <= upper_bound)
]

# Compute max percentage difference within non-outliers
m = filtered_df["percentage_difference"].max()

# Get unique truncation fractions
truncation_values = df_gsm8k_trunc_wnoise["truncation_fraction"].unique()

# Set up the plot
fig, axes = plt.subplots(1, len(truncation_values), figsize=(15, 5), sharey=True)

# Plot each scatter plot
for i, trunc in enumerate(truncation_values):
    subset = df_gsm8k_trunc_wnoise[df_gsm8k_trunc_wnoise["truncation_fraction"] == trunc]
    ax = axes[i]
    sns.regplot(x="noise_factor", y="percentage_difference", data=subset, ax=ax, scatter=True, ci=None, line_kws={"color": "red"})
    ax.set_title(f"Truncation Fraction: {trunc}")
    ax.set_xlabel("Noise Factor")
    ax.set_ylabel("Percentage Difference (%)")

    # Set y-axis limits between -100 and m+100, ensuring valid values
    ax.set_ylim(-100, min(m + 100, subset["percentage_difference"].max() + 10))  # Avoid over-stretching

plt.tight_layout()
plt.show()


#3. CoT faithfulness- White box approach

## 3.1 Stepwise confidence and trends for reasoning paths

### Computing stepwise confidence

In [None]:
def analyze_reasoning_confidence(logits, gen_tok, tokenizer,inputs):
    """
    Analyze model confidence at each reasoning step in CoT reasoning.

    Parameters:
    - logits: Tuple of per-step score tensors from generate(output_scores=True).
    - gen_tok: Full output token ids (prompt followed by generated tokens).
    - tokenizer: Tokenizer used to decode tokens.
    - inputs: Tokenized prompt, used to find where generation starts.

    Returns:
    - List of reasoning steps with their average log probabilities.
    """
    # Stack the per-step scores: (batch_size, generated_length, vocab_size)
    logits = t.stack(logits, dim=1)
    log_probs = t.nn.functional.log_softmax(logits, dim=-1)

    gen_start = inputs["input_ids"].shape[1]
    gen_tokens = gen_tok[:, gen_start:]  # Generated tokens only
    gen_text_tokens = [tokenizer.decode([tok]) for tok in gen_tokens[0]]  # Decode per token

    # Log-probability of the token that was actually generated at each position
    # (taking the max is only equivalent under greedy decoding)
    gen_probs = log_probs.gather(-1, gen_tokens.unsqueeze(-1)).squeeze(-1)
    # print("Gen_probs shape: ",gen_probs.shape)
    # Identify reasoning steps (split by punctuation markers)
    step_markers = [".", "?", "!"]
    step_indices = [0]

    for i, token in enumerate(gen_text_tokens):
      if token.endswith(tuple(step_markers)):  # Check if the decoded token ends with punctuation
        step_indices.append(i + 1)

    # Compute confidence per step
    #print(step_indices)
    # print("Step indices: ",step_indices)
    step_confidence = []

    for i in range(len(step_indices) - 1):
        start, end = step_indices[i], step_indices[i + 1]
        if start>=end:
          continue

        # print("Step num ",i+1,": ",tokenizer.decode(gen_tokens[0,start:end]))
        avg_log_prob = gen_probs[0,start:end].mean().item()  # Average confidence

        step_confidence.append({
            "step_num": i+1,
            "avg_log_prob": avg_log_prob  # More negative → more uncertain
        })

    return step_confidence
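
The per-step averaging can be followed on toy data without a model. The sketch below is a simplified illustration: `stepwise_confidence` is a hypothetical name, the tokens and probabilities are made up, and the log-probabilities are assumed to be those of the tokens that were actually generated.

```python
import math

def stepwise_confidence(tokens, token_log_probs, markers=(".", "?", "!")):
    """Average the chosen-token log-probabilities within each punctuation-delimited step."""
    steps, start = [], 0
    for i, token in enumerate(tokens):
        if token.endswith(markers):
            segment = token_log_probs[start:i + 1]
            steps.append({"step_num": len(steps) + 1,
                          "avg_log_prob": sum(segment) / len(segment)})
            start = i + 1
    return steps

# Toy data (made up): decoded tokens and the log-probability of each chosen token.
tokens = ["Two", " plus", " two", ".", " So", " four", "."]
log_probs = [math.log(0.9), math.log(0.8), math.log(0.9), math.log(0.99),
             math.log(0.5), math.log(0.6), math.log(0.95)]

confidence = stepwise_confidence(tokens, log_probs)
for step in confidence:
    print(step["step_num"], round(step["avg_log_prob"], 3))
```

Averages closer to zero indicate steps where the model assigned high probability to its own tokens; more negative values indicate uncertainty.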

In [None]:
# File paths in Google Drive
csv_path_att = "/content/drive/MyDrive/AI Assignment/MATS- Neel Nanda/Spring 2025/CoT Store/GSM8K/cot_rerun_results.xlsx"

# Check if CSV file exists
if os.path.exists(csv_path_att):
    print("File already exists. Loading previous results...")

    # The file is written with to_csv, so it is read back as CSV despite its extension
    df_gsm8k_att = pd.read_csv(csv_path_att)
    print(df_gsm8k_att.head())  # Show first few rows

else:
    print("No existing file found. Running CoT reasoning...")
    df_gsm8k_att,acc = run_cot_with_att(gsm8k_ds_test,csv_path_att,num_runs=1,total=10,with_logits=True,with_att=False)



In [None]:
# Copy the filtered rows so that later column assignments do not act on a view
df_gsm8k_att = df_gsm8k_att[df_gsm8k_att["run_id"] < 5].copy()

In [None]:
print(df_gsm8k_att.head())

In [None]:
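# Parse stored string representations; freshly generated results are already lists
df_gsm8k_att["conf_per_step"] = df_gsm8k_att["conf_per_step"].apply(
    lambda x: ast.literal_eval(x) if isinstance(x, str) else x
)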

### Plotting stepwise confidence for all CoTs

In [None]:
def plot_multiple_confidence_trends(df):
    plt.figure(figsize=(10, 6))  # Set figure size

    all_step_nums = []
    all_avg_log_probs = []

    for _, row in df.iterrows():
        conf_per_step = row["conf_per_step"]  # Extract step confidence data

        # Extract step numbers and log probabilities
        step_nums = [entry["step_num"] for entry in conf_per_step]
        avg_log_probs = [entry["avg_log_prob"] for entry in conf_per_step]

        # Convert to NumPy array to handle NaN values
        step_nums = np.array(step_nums)
        avg_log_probs = np.array(avg_log_probs)

        # Mask NaN values to prevent breaks in the plot
        valid_mask = ~np.isnan(avg_log_probs)
        plt.plot(step_nums[valid_mask], avg_log_probs[valid_mask],
                 marker="o", linestyle="-", color="dimgray", alpha=0.5)  # Darker gray

        # Store data for trend line
        all_step_nums.extend(step_nums[valid_mask])
        all_avg_log_probs.extend(avg_log_probs[valid_mask])

    # Compute trend line (average confidence per step)
    if all_step_nums:
        unique_steps = sorted(set(all_step_nums))  # Get unique step numbers
        avg_trend = [np.mean([all_avg_log_probs[i] for i in range(len(all_step_nums)) if all_step_nums[i] == step])
                     for step in unique_steps]

        # Plot overall trend line in orange
        plt.plot(unique_steps, avg_trend, color="orange", linestyle="-", linewidth=2, label="Average Trend")

    plt.xlabel("Reasoning Step Number")
    plt.ylabel("Average Log Probability")
    plt.title("Confidence Trends Across Different CoTs")
    plt.axhline(y=0, color="black", linestyle="--", label="Baseline (log_prob = 0)")

    plt.legend()
    plt.grid(True)
    plt.show()

# Call function with dataframe
plot_multiple_confidence_trends(df_gsm8k_att)
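
The trend line in the plot is the mean log-probability at each step number, pooled over all CoTs. As a sketch of that aggregation in one pass, the values can be grouped by step number first. The helper `toy_average_trend` and the sample rows are hypothetical; each row is assumed to have the same list-of-dicts layout as `conf_per_step`.

```python
import math
from collections import defaultdict

def toy_average_trend(rows):
    """Mean avg_log_prob per step number across several CoTs, ignoring NaNs."""
    buckets = defaultdict(list)
    for conf_per_step in rows:
        for entry in conf_per_step:
            value = entry["avg_log_prob"]
            if not math.isnan(value):
                buckets[entry["step_num"]].append(value)
    steps = sorted(buckets)
    return steps, [sum(buckets[s]) / len(buckets[s]) for s in steps]

# Two hypothetical CoTs; the second has a NaN at step 2
rows = [
    [{"step_num": 1, "avg_log_prob": -0.2}, {"step_num": 2, "avg_log_prob": -0.4}],
    [{"step_num": 1, "avg_log_prob": -0.4}, {"step_num": 2, "avg_log_prob": float("nan")}],
]
trend_steps, trend_values = toy_average_trend(rows)
print(trend_steps, trend_values)
```

Grouping first avoids rescanning the full list of points once per step, which matters as the number of CoTs grows.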



### Classification of confidence trends- K Means

In [None]:
!pip install tslearn

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from tslearn.clustering import TimeSeriesKMeans
from tslearn.preprocessing import TimeSeriesScalerMeanVariance


In [None]:
import numpy as np
import pandas as pd
from tslearn.preprocessing import TimeSeriesScalerMeanVariance
from tslearn.clustering import TimeSeriesKMeans

# Extract valid sequences while handling NaNs
valid_indices = []
valid_sequences = []

for idx, seq in enumerate(df_gsm8k_att["conf_per_step"]):
    log_probs = np.array([step["avg_log_prob"] for step in seq if not np.isnan(step["avg_log_prob"])])
    if len(log_probs) > 0:  # Keep only non-empty sequences
        valid_sequences.append(log_probs)
        valid_indices.append(idx)

# Check if we have valid sequences
if not valid_sequences:
    print("No valid sequences found. Skipping clustering.")
else:
    # Normalize sequence lengths by padding to max length
    max_len = max(len(seq) for seq in valid_sequences)
    sequences_padded = np.array([
        np.pad(seq, (0, max_len - len(seq)), 'constant', constant_values=np.nan)
        for seq in valid_sequences
    ])

    # Normalize using mean variance scaling
    scaler = TimeSeriesScalerMeanVariance()
    X_dtw = scaler.fit_transform(sequences_padded)

    # Apply DTW K-Means Clustering
    n_clusters = 6  # Adjust as needed
    dtw_km = TimeSeriesKMeans(n_clusters=n_clusters, metric="dtw", verbose=True, random_state=42)
    cluster_labels = dtw_km.fit_predict(X_dtw)

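    # Store results in the original DataFrame
    df_gsm8k_att["dtw_cluster"] = np.nan  # Initialize with NaN
    # valid_indices are positional (from enumerate), so map them to index labels before using .loc
    df_gsm8k_att.loc[df_gsm8k_att.index[valid_indices], "dtw_cluster"] = cluster_labels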

    cluster_representative_cots = {}

    for cluster in sorted(df_gsm8k_att["dtw_cluster"].dropna().unique()):  # Get unique clusters
      # Get all indices of rows belonging to this cluster
      cluster_indices = df_gsm8k_att[df_gsm8k_att["dtw_cluster"] == cluster].index.tolist()

      # Pick up to three representative CoTs (if available)
      representative_indices = cluster_indices[:3]  # Take the first three
      cluster_representative_cots[cluster] = df_gsm8k_att.loc[representative_indices, "cot_generated"].tolist()

    # Print or save the representative CoTs
    for cluster, cots in cluster_representative_cots.items():
      print(f"Cluster {cluster}:")
      for i, cot in enumerate(cots, 1):
        print(f"  Example {i}: {cot}\n")

# Display dataframe with assigned clusters
print(df_gsm8k_att[["conf_per_step", "dtw_cluster", "cot_generated"]])


In [None]:
import matplotlib.pyplot as plt
import numpy as np

def plot_cluster_confidence_trends(df, cluster_representative_cots):
    unique_clusters = sorted(df["dtw_cluster"].dropna().unique())  # Get unique cluster IDs

    for cluster in unique_clusters:
        plt.figure(figsize=(8, 6))  # Separate figure for each cluster

        # Subset data for the cluster
        cluster_df = df[df["dtw_cluster"] == cluster]

        all_step_nums = []
        all_avg_log_probs = []

        for _, row in cluster_df.iterrows():
            conf_per_step = row["conf_per_step"]  # Extract confidence steps

            # Extract step numbers and log probabilities
            step_nums = [entry["step_num"] for entry in conf_per_step]
            avg_log_probs = [entry["avg_log_prob"] for entry in conf_per_step]

            # Convert to NumPy array to handle NaNs
            step_nums = np.array(step_nums)
            avg_log_probs = np.array(avg_log_probs)

            # Mask NaN values
            valid_mask = ~np.isnan(avg_log_probs)
            plt.plot(step_nums[valid_mask], avg_log_probs[valid_mask],
                     marker="o", linestyle="-", color="dimgray", alpha=0.5)  # Dark grey lines

            # Store for trend line
            all_step_nums.extend(step_nums[valid_mask])
            all_avg_log_probs.extend(avg_log_probs[valid_mask])

        # Compute trend line (average log probability per step in cluster)
        if all_step_nums:
            unique_steps = sorted(set(all_step_nums))  # Get unique step numbers
            avg_trend = [np.mean([all_avg_log_probs[i] for i in range(len(all_step_nums)) if all_step_nums[i] == step])
                         for step in unique_steps]

            # Plot overall trend line in orange
            plt.plot(unique_steps, avg_trend, color="orange", linestyle="-", linewidth=2, label="Avg Trend")

        plt.xlabel("Reasoning Step Number")
        plt.ylabel("Average Log Probability")
        plt.title(f"Confidence Trends for DTW Cluster {cluster}")
        plt.axhline(y=0, color="black", linestyle="--", label="Baseline (log_prob = 0)")
        plt.legend()
        plt.grid(True)

        # Show plot
        plt.show()

        # Display the three representative CoTs for this cluster
        print(f"\n=== Representative CoTs for Cluster {cluster} ===\n")
        representative_cots = cluster_representative_cots.get(cluster, [])

        if representative_cots:
            for i, cot in enumerate(representative_cots, 1):
                print(f"Example {i}:\n{cot}\n")
        else:
            print("No representative CoTs found.")

        print("\n" + "=" * 80 + "\n")

# Call function with clustering results
plot_cluster_confidence_trends(df_gsm8k_att, cluster_representative_cots)





### Classification of confidence trends - with HDBSCAN

In [None]:
!pip install tslearn hdbscan

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import hdbscan
from tslearn.preprocessing import TimeSeriesScalerMeanVariance
from tslearn.metrics import cdist_dtw  # DTW distance computation

# Sample Data (Replace with df_gsm8k_att["conf_per_step"])
df = pd.DataFrame({
    "conf_per_step": [
        [{'step_num': 0, 'avg_log_prob': -0.22}, {'step_num': 1, 'avg_log_prob': -0.16}, {'step_num': 2, 'avg_log_prob': -0.1}],
        [{'step_num': 0, 'avg_log_prob': -0.5}, {'step_num': 1, 'avg_log_prob': -0.3}, {'step_num': 2, 'avg_log_prob': -0.1}],
        [{'step_num': 0, 'avg_log_prob': -0.2}, {'step_num': 1, 'avg_log_prob': -0.25}, {'step_num': 2, 'avg_log_prob': -0.5}],
        [{'step_num': 0, 'avg_log_prob': -0.1}, {'step_num': 1, 'avg_log_prob': -0.05}, {'step_num': 2, 'avg_log_prob': -0.02}],
        [{'step_num': 0, 'avg_log_prob': -0.3}, {'step_num': 1, 'avg_log_prob': -0.28}, {'step_num': 2, 'avg_log_prob': -0.1}],
        [{'step_num': 0, 'avg_log_prob': -0.56}, {'step_num': 1, 'avg_log_prob': -0.1}, {'step_num': 2, 'avg_log_prob': -0.45},{'step_num': 3, 'avg_log_prob': -0.05}],
        [{'step_num': 0, 'avg_log_prob': -0.75}, {'step_num': 1, 'avg_log_prob': -0.5}, {'step_num': 2, 'avg_log_prob': -0.25},{'step_num': 3, 'avg_log_prob': -0.05}]
    ]
})

# Extract log probability sequences & handle NaNs
sequences = [
    np.array([step["avg_log_prob"] for step in seq if not np.isnan(step["avg_log_prob"])])
    for seq in df["conf_per_step"]
]

# Normalize sequence lengths by padding to max length
max_len = max(len(seq) for seq in sequences)
sequences_padded = np.array([
    np.pad(seq, (0, max_len - len(seq)), 'constant', constant_values=np.nan)
    for seq in sequences
])

# Normalize sequences using mean variance scaling
scaler = TimeSeriesScalerMeanVariance()
X_scaled = scaler.fit_transform(sequences_padded)

# Compute DTW distance matrix
dtw_distance_matrix = cdist_dtw(X_scaled)

# Apply HDBSCAN clustering (metric="precomputed" since we use a DTW distance matrix)
hdb = hdbscan.HDBSCAN(min_cluster_size=2, metric="precomputed")
df["hdbscan_cluster"] = hdb.fit_predict(dtw_distance_matrix)

# Print cluster assignments
print(df[["conf_per_step", "hdbscan_cluster"]])


In [None]:
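# HDBSCAN labels noise points as -1, so exclude that label when counting clusters
num_clusters = df.loc[df["hdbscan_cluster"] != -1, "hdbscan_cluster"].nunique()
print(f"Number of clusters found: {num_clusters}")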
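
Both clustering approaches above compare confidence sequences with dynamic time warping (DTW), which lets sequences of different lengths be aligned before their distance is measured. A minimal pure-Python sketch of the classic dynamic-programming recurrence follows. The function `toy_dtw` is hypothetical and is not the tslearn implementation; it assumes a squared-difference local cost with a square root taken over the accumulated cost.

```python
import math

def toy_dtw(a, b):
    """DTW distance between two 1-D sequences (toy illustration)."""
    n, m = len(a), len(b)
    inf = float("inf")
    # cost[i][j] = minimal accumulated cost of aligning a[:i] with b[:j]
    cost = [[inf] * (m + 1) for _ in range(n + 1)]
    cost[0][0] = 0.0
    for i in range(1, n + 1):
        for j in range(1, m + 1):
            local = (a[i - 1] - b[j - 1]) ** 2
            cost[i][j] = local + min(
                cost[i - 1][j],      # advance in a only
                cost[i][j - 1],      # advance in b only
                cost[i - 1][j - 1],  # advance in both
            )
    return math.sqrt(cost[n][m])

# Same shape, shifted in time: DTW warps one onto the other
print(toy_dtw([0, 0, 1], [0, 1, 1]))
# Different lengths are handled without padding
print(toy_dtw([0, 1], [0, 1, 1, 1]))
```

Because the alignment may stretch or compress either sequence, two CoTs whose confidence dips at different step numbers can still end up close to each other.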

## 3.2 Analysing attention patterns

### Computing backtracking ratios

In [None]:
def compute_stepwise_backtracking_ratio(attentions, gen_tok, tokenizer, inputs, k=2):
    """
    Compute the average backtracking ratio for each reasoning step.

    Parameters:
    - attentions: Per-token attention outputs, indexed as attentions[token][layer][batch, head, query, key]
    - gen_tok: Tensor of token IDs for prompt plus generation (batch_size, seq_len)
    - tokenizer: Tokenizer to decode tokens
    - inputs: Tokenized prompt; inputs["input_ids"] gives the prompt length
    - k: Currently unused; the "far past" is everything before step i // 4

    Returns:
    - avg_ratios_per_step: Dict mapping step index to the average backtracking ratio,
      or None if fewer than 3 steps were generated.
    """

    # num_layers, num_heads, seq_len, _ = attentions[0].shape
    num_layers = len(attentions[0])
    num_heads = attentions[0][0].shape[1]
    # seq_len = len(attentions)
    avg_ratios_per_step = {}

    # Decode generated tokens to identify reasoning steps
    gen_start = inputs["input_ids"].shape[1]
    gen_tokens = gen_tok[:, gen_start:]  # Generated tokens only
    gen_text_tokens = [tokenizer.decode([tok]) for tok in gen_tokens[0]]
    step_markers = [".", "?", "!"]
    step_indices = [0]

    # Identify step boundaries
    for i, token in enumerate(gen_text_tokens):
        if token.endswith(tuple(step_markers)):
            step_indices.append(i + 1)

    # print(len(attentions))
    count =0

    # Iterate over steps
    if len(step_indices)-1 < 3:
      avg_ratios_per_step= None
      overall_avg_ratio = None

    else:
      for i in range(len(step_indices) - 1):
        count +=1
        # print("Count :",count)
        start, end = step_indices[i], step_indices[i + 1]
        if start >= end:
          continue

        step_attention_ratios = []  # Store ratios for all tokens in the step
        # print(gen_text_tokens[start:end])

        for to in range(start, end):
            token_ratios = []  # Store backtracking ratio for all layers/heads at token t
            temp = attentions[to]


            for layer in range(num_layers):
                attention = temp[layer]
                for head in range(num_heads):
                    attn_matrix = attention[0,head]  # Shape (seq_len, seq_len)

                    # Sum of attention to all past tokens
                    past_attention = attn_matrix[0, :to].sum()

                    if i>1:
                      # Sum of attention to tokens before step i // 4 (roughly the first quarter of the steps so far)
                      far_past_start = step_indices[max(0, i // 4)]
                      far_past_attention = attn_matrix[0,:far_past_start].sum()

                      # Compute ratio (avoid division by zero)
                      ratio = (far_past_attention / past_attention).item() if past_attention > 0 else 0
                      token_ratios.append(ratio)

            # Average ratio across all layers and heads for this token
            step_attention_ratios.append(t.tensor(token_ratios, dtype=t.float32).mean().item())

        # Average across all tokens in the step
        avg_ratios_per_step[i]=t.tensor(step_attention_ratios).mean().item()

    return avg_ratios_per_step
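
The backtracking ratio used above is the share of a token's attention over past positions that falls on the "far past". A toy sketch for a single attention row follows. The helper `toy_backtracking_ratio` and the attention weights are hypothetical, and the far-past boundary is passed in directly rather than derived from step indices.

```python
def toy_backtracking_ratio(attn_row, current_pos, far_past_end):
    """Fraction of attention on positions [0, far_past_end) among positions [0, current_pos)."""
    past = sum(attn_row[:current_pos])
    far_past = sum(attn_row[:far_past_end])
    # Avoid division by zero when there is no attention mass on the past
    return far_past / past if past > 0 else 0.0

# Hypothetical attention weights of one head for the token at position 4
row = [0.4, 0.1, 0.1, 0.2, 0.2]
ratio = toy_backtracking_ratio(row, current_pos=4, far_past_end=1)
print(ratio)
```

A ratio near 1 means the token mostly looks back to the earliest positions; a ratio near 0 means it attends mainly to recent context.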

In [None]:
# File paths in Google Drive
csv_path_atts = "/content/drive/MyDrive/AI Assignment/MATS- Neel Nanda/Spring 2025/CoT Store/GSM8K/cot_rerun_results.xlsx"

# Check if the results file exists (it is an Excel workbook, despite the variable name)
if os.path.exists(csv_path_atts):
    print("File already exists. Loading previous results...")

    df_gsm8k_atts = pd.read_excel(csv_path_atts)
    print(df_gsm8k_atts.head())  # Show first few rows

else:
    print("No existing file found. Running CoT reasoning...")
    df_gsm8k_atts,acc = run_cot_with_att(gsm8k_ds_test,csv_path_atts,num_runs=1,total=10,with_logits=False,with_att=True)

In [None]:
df_gsm8k_atts = df_gsm8k_atts[df_gsm8k_atts["run_id"] < 5]

In [None]:
df_gsm8k_atts["bratio_per_step"].dropna()

In [None]:
df_gsm8k_atts["bratio_per_step"]

In [None]:
def parse_bratio_per_step(value):
    if not isinstance(value, str) or value.lower() == "nan":
        return []  # Ignore NaNs

    # Extract key-value pairs (numbers before and after ':'); "nan" values do not match and are skipped
    matches = re.findall(r"(\d+):\s*([-+]?(?:\d+\.?\d*|\.\d+)(?:[eE][-+]?\d+)?)", value)  # Supports floats & scientific notation

    parsed_values = [(int(k), float(v)) for k, v in matches]

    return parsed_values  # Returns list of tuples [(x1, y1), (x2, y2), ...]

# Apply parsing function
df_gsm8k_atts["bratio_per_step_parsed"] = df_gsm8k_atts["bratio_per_step"].apply(parse_bratio_per_step)

# Print first 10 parsed values
print(df_gsm8k_atts["bratio_per_step_parsed"].head(10).tolist())


### Plotting backtracking ratios for all CoTs

In [None]:
def plot_backtracking_ratio(df):
    plt.figure(figsize=(10, 6))

    all_step_nums = []
    all_ratios = []

    for _, row in df.iterrows():
        parsed_values = row["bratio_per_step_parsed"]

        if parsed_values:  # Skip empty rows
            x_values, y_values = zip(*parsed_values)  # Unzip into X and Y

            # Convert to NumPy array to handle NaNs
            x_values = np.array(x_values)
            y_values = np.array(y_values)

            # Mask NaN values
            valid_mask = ~np.isnan(y_values)
            plt.plot(x_values[valid_mask], y_values[valid_mask],
                     marker='o', linestyle='-', color="dimgray", alpha=0.5)  # Dark grey lines

            # Store values for average trend calculation
            all_step_nums.extend(x_values[valid_mask])
            all_ratios.extend(y_values[valid_mask])

    # Compute trend line (average backtracking ratio per step)
    if all_step_nums:
        unique_steps = sorted(set(all_step_nums))  # Get unique step numbers
        avg_trend = [np.mean([all_ratios[i] for i in range(len(all_step_nums)) if all_step_nums[i] == step])
                     for step in unique_steps]

        # Plot overall trend line in orange
        plt.plot(unique_steps, avg_trend, color="orange", linestyle="-", linewidth=2, label="Avg Trend")

    # Labels and title
    plt.xlabel("Step Index")
    plt.ylabel("Backtracking Ratio")
    plt.title("Backtracking Ratio per Step")
    plt.axhline(y=0, color="black", linestyle="--", label="Baseline (Ratio = 0)")
    plt.legend()
    plt.grid(True)

    plt.show()

# Call the function
plot_backtracking_ratio(df_gsm8k_atts)



### Classification of backtracking ratios - K-Means

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from tslearn.preprocessing import TimeSeriesScalerMeanVariance
from tslearn.clustering import TimeSeriesKMeans

def perform_kmeans_clustering(df, n_clusters=3):
    # Extract sequences of backtracking ratios, keeping track of the rows they came from
    valid_rows = df[df["bratio_per_step_parsed"].apply(bool)]
    sequences = [
        np.array([step[1] for step in parsed])
        for parsed in valid_rows["bratio_per_step_parsed"]
    ]

    # Normalize sequence lengths by padding to max length
    max_len = max(len(seq) for seq in sequences)
    sequences_padded = np.array([
        np.pad(seq, (0, max_len - len(seq)), 'constant', constant_values=np.nan)
        for seq in sequences
    ])

    # Normalize sequences
    scaler = TimeSeriesScalerMeanVariance()
    X_scaled = scaler.fit_transform(sequences_padded)

    # Apply K-Means clustering (using DTW as the metric)
    kmeans = TimeSeriesKMeans(n_clusters=n_clusters, metric="dtw", random_state=42)
    cluster_labels = kmeans.fit_predict(X_scaled)

    # Assign cluster labels only to the rows that were actually clustered
    df["kmeans_cluster"] = np.nan  # Initialize with NaN
    df.loc[valid_rows.index, "kmeans_cluster"] = cluster_labels

    return df, cluster_labels

def get_representative_cots(df, n_clusters=3):
    cluster_representative_cots = {}

    for cluster in range(n_clusters):
        cluster_df = df[df["kmeans_cluster"] == cluster]
        if not cluster_df.empty:
            # Select 3 representative CoTs from the cluster
            cluster_representative_cots[cluster] = cluster_df["cot_generated"].head(3).tolist()

    return cluster_representative_cots

def plot_cluster_trends(df, cluster_representative_cots, n_clusters=3):
    for cluster in range(n_clusters):
        plt.figure(figsize=(8, 6))

        # Subset data for the cluster
        cluster_df = df[df["kmeans_cluster"] == cluster]

        all_step_nums = []
        all_ratios = []

        for _, row in cluster_df.iterrows():
            parsed_values = row["bratio_per_step_parsed"]

            if parsed_values:
                x_values, y_values = zip(*parsed_values)

                # Convert to NumPy array to handle NaNs
                x_values = np.array(x_values)
                y_values = np.array(y_values)

                # Mask NaN values
                valid_mask = ~np.isnan(y_values)
                plt.plot(x_values[valid_mask], y_values[valid_mask],
                         marker='o', linestyle='-', color="dimgray", alpha=0.5)

                # Store for trend line
                all_step_nums.extend(x_values[valid_mask])
                all_ratios.extend(y_values[valid_mask])

        # Compute and plot the average trend line
        if all_step_nums:
            unique_steps = sorted(set(all_step_nums))
            avg_trend = [np.mean([all_ratios[i] for i in range(len(all_step_nums)) if all_step_nums[i] == step])
                         for step in unique_steps]
            plt.plot(unique_steps, avg_trend, color="orange", linestyle="-", linewidth=2, label="Avg Trend")

        plt.xlabel("Step Index")
        plt.ylabel("Backtracking Ratio")
        plt.title(f"Backtracking Ratio Trends for Cluster {cluster}")
        plt.axhline(y=0, color="black", linestyle="--", label="Baseline (Ratio = 0)")
        plt.legend()
        plt.grid(True)

        # Show plot
        plt.show()

        # Print 3 representative CoTs for this cluster
        print(f"\n=== Typical CoTs for Cluster {cluster} ===\n")
        for i, cot in enumerate(cluster_representative_cots.get(cluster, [])):
            print(f"Example {i+1}:\n{cot}\n")
        print("=" * 50 + "\n")

# Perform clustering and save cluster labels
df_gsm8k_atts, cluster_labels = perform_kmeans_clustering(df_gsm8k_atts, n_clusters=3)

# Get 3 representative CoTs per cluster
cluster_representative_cots = get_representative_cots(df_gsm8k_atts, n_clusters=3)

# Plot trends for each cluster with representative CoTs
plot_cluster_trends(df_gsm8k_atts, cluster_representative_cots, n_clusters=3)


#4. Reasoning steering

## Generating a synthetic math dataset

In [None]:
import random
import math
import pandas as pd

def generate_math_dataset(num_examples=100):
    dataset = []
    operations = ["+", "-", "*", "/", "**", "sqrt", "lcm", "gcd", "!"]

    for _ in range(num_examples):
        op = random.choice(operations)

        # Generate numbers based on operation
        if op in ["+", "-", "*", "/"]:
            a, b = random.randint(10, 99), random.randint(10, 99)
            if op == "/":
                b = random.randint(1, 10)

        elif op == "**":
            a = random.randint(2, 9)
            b = random.randint(2, 4)

        elif op == "sqrt":
            a = random.randint(4, 100)
            a = a * a  # Ensure perfect squares
            b = None

        elif op in ["lcm", "gcd"]:
            a, b = random.randint(10, 50), random.randint(10, 50)

        elif op == "!":
            a = random.randint(3, 7)
            b = None

        # Compute correct result
        if op == "+":
            correct_answer = a + b
        elif op == "-":
            correct_answer = a - b
        elif op == "*":
            correct_answer = a * b
        elif op == "/":
            correct_answer = round(a / b, 2)
        elif op == "**":
            correct_answer = a ** b
        elif op == "sqrt":
            correct_answer = int(math.sqrt(a))
        elif op == "lcm":
            correct_answer = math.lcm(a, b)
        elif op == "gcd":
            correct_answer = math.gcd(a, b)
        elif op == "!":
            correct_answer = math.factorial(a)

        # 50% chance of wrong answer
        is_wrong = random.choice([True, False])

        if is_wrong:
            # Generate a confident wrong answer
            if isinstance(correct_answer, int):
                wrong_answer = correct_answer + random.choice([-3, -2, -1, 1, 2, 3])
            else:
                wrong_answer = round(correct_answer + random.choice([-0.5, -0.3, 0.3, 0.5]), 2)
        else:
            wrong_answer = correct_answer  # No mistake

        # Format the question text
        if op in ["+", "-", "*", "/"]:
            input_text = f"What is {a} {op} {b}?"
        elif op == "**":
            input_text = f"What is {a} raised to the power of {b}?"
        elif op == "sqrt":
            input_text = f"What is the square root of {a}?"
        elif op == "lcm":
            input_text = f"What is the least common multiple of {a} and {b}?"
        elif op == "gcd":
            input_text = f"What is the greatest common divisor of {a} and {b}?"
        elif op == "!":
            input_text = f"What is {a} factorial?"

        # Generate first try response (forward calculation)
        if op in ["+", "-", "*", "/"]:
            negative_output = f"So, {a}{op}{b} = {wrong_answer}."
        elif op == "**":
            negative_output = f"So, {a}^({b}) = {wrong_answer}."
        elif op == "sqrt":
            negative_output = f"So, sqrt({a}) = {wrong_answer}."
        elif op == "lcm":
            negative_output = f"So, LCM({a}, {b}) = {wrong_answer}."
        elif op == "gcd":
            negative_output = f"So, GCD({a}, {b}) = {wrong_answer}."
        elif op == "!":
            negative_output = f"So, {a}! = {wrong_answer}."

        # Generate rechecking step (inverse calculation); only wrong first answers get one
        if is_wrong:
            if op == "+":
                recheck_output = f"{negative_output} Let me check again... {b} + {a} = {correct_answer}."
            elif op == "-":
                recheck_output = f"{negative_output} Let me check again... {wrong_answer} + {b} = {wrong_answer + b} which is not equal to {a}. Hence the first computation was wrong. Computing again {a} - {b} = {correct_answer}."
            elif op == "*":
                recheck_output = f"{negative_output} Let me check again... {b} * {a} = {correct_answer}."
            elif op == "/":
                recheck_output = f"{negative_output} Let me check again... {wrong_answer} * {b} = {round(b * wrong_answer, 2)} which is not equal to {a}. Hence the first computation was wrong. Computing again {a} / {b} = {correct_answer}."
            elif op == "**":
                recheck_output = f"{negative_output} Let me check again... The {b}th root of {wrong_answer} is {wrong_answer ** (1/b)} which is not equal to {a}. Hence the first computation was wrong. Computing again {a} ** {b} = {correct_answer}."
            elif op == "sqrt":
                # Note: ** is exponentiation; ^ would be bitwise XOR
                recheck_output = f"{negative_output} Let me check again... {wrong_answer}^2 = {wrong_answer ** 2} which is not equal to {a}. Hence the first computation was wrong. Computing again sqrt({a}) = {correct_answer}."
            elif op == "lcm":
                recheck_output = f"{negative_output} Let me check again... The smallest multiple of both {a} and {b} is {correct_answer}."
            elif op == "gcd":
                recheck_output = f"{negative_output} Let me check again... The largest number that divides both {a} and {b} is {correct_answer}."
            elif op == "!":
                recheck_output = f"{negative_output} Let me check again... {a-1}! * {a} = {correct_answer}."
        else:
            recheck_output = negative_output  # No need to recheck if already correct

        # Store in dataset
        dataset.append((input_text, recheck_output, negative_output))

    return dataset

def save_math_dataset(csv_path, num_examples=100):
    dataset = generate_math_dataset(num_examples)
    df = pd.DataFrame(dataset, columns=["Question", "Positive Response", "Negative Response"])
    print(df.head())
    df.to_csv(csv_path, index=False)
    print(f"Dataset saved at: {csv_path}")

# Example usage
csv_path = "/content/drive/MyDrive/AI Assignment/MATS- Neel Nanda/Spring 2025/CoT Store/Math_Dset.csv"
save_math_dataset(csv_path, num_examples=100)


## Compute steering vector

In [None]:
def get_per_token_activations(text, model, tokenizer):
    """
    Extracts per-token activations for all layers.

    Returns:
    - Dictionary where keys are layer indices and values are (seq_len, hidden_dim) numpy arrays.
    """
    inputs = tokenizer(text, return_tensors="pt").to(model.device)

    with t.no_grad():
        outputs = model(**inputs, output_hidden_states=True)

    # Extract hidden states for all layers
    all_layer_hidden_states = outputs.hidden_states  # Tuple of (num_layers + 1, batch_size, seq_len, hidden_dim)

    # Convert to dictionary with layers as keys
    activations = {
        layer_idx: layer_states.squeeze(0).cpu().numpy()  # Shape: (seq_len, hidden_dim)
        for layer_idx, layer_states in enumerate(all_layer_hidden_states)
    }

    return activations  # Dict[layer] -> (seq_len, hidden_dim)

In [None]:
import numpy as np

def compute_steering_vector(dataset, model, tokenizer):
    """Computes the steering vector using activations from all layers and identifies the most affected layer."""

    num_layers = model.config.num_hidden_layers + 1  # +1 to include input embeddings
    layer_norm_changes = np.zeros(num_layers)  # Track activation change per layer

    # Store steering vectors and activations
    steering_vectors = {layer: [] for layer in range(num_layers)}
    activations_per_layer = {layer: {"wrong": [], "self_check": [], "corrected": []} for layer in range(num_layers)}

    for question, recheck_response, wrong_response in tqdm(dataset):
        # Get per-token activations for both responses
        wrong_acts = get_per_token_activations(wrong_response, model, tokenizer)
        recheck_acts = get_per_token_activations(recheck_response, model, tokenizer)

        # Tokenize to find key tokens for activation extraction
        wrong_tokens = tokenizer.tokenize(wrong_response)
        recheck_tokens = tokenizer.tokenize(recheck_response)

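        # Identify token positions (last token of each stage).
        # These indices assume the tokenizer call in get_per_token_activations adds no special tokens.
        idx_wrong = len(wrong_tokens) - 2  # Last token before the period
        # Subword tokenizers usually prefix word-initial tokens with a marker ("Ġ" or "▁"), so strip it before comparing
        idx_self_check = next(
            (i for i, tok in enumerate(recheck_tokens) if tok.lstrip("Ġ▁ ") == "Let"),
            idx_wrong,
        )
        idx_corrected = len(recheck_tokens) - 2  # Last token before final period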

        # Compute steering vectors per layer
        for layer in range(num_layers):
            act_wrong = wrong_acts[layer][idx_wrong]  # Activation at wrong answer
            act_self_check = recheck_acts[layer][idx_self_check]  # Activation at self-verification step
            act_corrected = recheck_acts[layer][idx_corrected]  # Activation at corrected answer

            # Store raw activations for later analysis
            activations_per_layer[layer]["wrong"].append(act_wrong)
            activations_per_layer[layer]["self_check"].append(act_self_check)
            activations_per_layer[layer]["corrected"].append(act_corrected)

            # Compute different steering vectors
            direct_correction_vector = act_corrected - act_wrong
            uncertainty_vector = act_self_check - act_wrong

            # Store steering vectors
            steering_vectors[layer].append((direct_correction_vector, uncertainty_vector))

            # Compute magnitude of change (Euclidean norm)
            norm_change = np.linalg.norm(direct_correction_vector) + np.linalg.norm(uncertainty_vector)
            layer_norm_changes[layer] += norm_change

    # Compute final mean steering vectors per layer
    final_steering_vectors = {}
    for layer in range(num_layers):
        direct_vectors = np.array([vec[0] for vec in steering_vectors[layer]])
        uncertainty_vectors = np.array([vec[1] for vec in steering_vectors[layer]])

        final_steering_vectors[layer] = {
            "direct": np.mean(direct_vectors, axis=0),
            "uncertainty": np.mean(uncertainty_vectors, axis=0),
        }

    # Find layer with maximum change
    max_layer = np.argmax(layer_norm_changes)

    return final_steering_vectors, max_layer, activations_per_layer
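
The steering vectors returned above are mean activation differences: for each layer, the activation at the wrong answer is subtracted from the activation at the corrected (or self-check) position, and the differences are averaged over the dataset. A toy sketch of that averaging with plain lists follows. The helper `toy_mean_difference` and the two-dimensional "activations" are hypothetical stand-ins for real hidden states.

```python
def toy_mean_difference(acts_from, acts_to):
    """Mean of (to - from) over paired activation vectors."""
    n = len(acts_from)
    dim = len(acts_from[0])
    return [
        sum(to[d] - frm[d] for frm, to in zip(acts_from, acts_to)) / n
        for d in range(dim)
    ]

# Two hypothetical examples with 2-dimensional activations
wrong = [[1.0, 0.0], [3.0, 2.0]]
corrected = [[2.0, 1.0], [4.0, 5.0]]
toy_vector = toy_mean_difference(wrong, corrected)
print(toy_vector)
```

Averaging over many examples is meant to cancel question-specific variation and keep the direction shared by all wrong-to-corrected transitions.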


In [None]:
# Load dataset
csv_path_mdset = "/content/drive/MyDrive/Articles /Math_Dset.xlsx"
dataset_df = pd.read_excel(csv_path_mdset, engine="openpyxl")
dataset = dataset_df.values.tolist()  # Convert to list of (Question, Positive Response, Negative Response)

In [None]:
model.to(device)
final_steering_vectors, max_layer, activations_per_layer = compute_steering_vector(dataset, model, tokenizer)


In [None]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from mpl_toolkits.mplot3d import Axes3D  # Import 3D plotting toolkit

def plot_pca_variance_per_layer_3d(activations_per_layer, n_components=3):
    """
    Performs PCA on the stored activations of each layer and plots a 3D scatter plot
    distinguishing wrong, self-check, and corrected responses for each layer.

    Parameters:
    - activations_per_layer: Dictionary {layer: {"wrong": [...], "self_check": [...], "corrected": [...]}} of per-example activation vectors
    - n_components: Number of PCA components (default=3)

    Returns:
    - None (displays separate 3D scatter plots per layer)
    """
    num_layers = len(activations_per_layer)

    for layer in range(num_layers):
        fig = plt.figure(figsize=(10, 8))
        ax = fig.add_subplot(111, projection='3d')  # 3D subplot

        # Extract activations for each stage
        layer_acts = activations_per_layer[layer]
        wrong_acts = np.array(layer_acts["wrong"])
        self_check_acts = np.array(layer_acts["self_check"])
        corrected_acts = np.array(layer_acts["corrected"])

        # Flatten if needed
        wrong_acts = wrong_acts.reshape(wrong_acts.shape[0], -1)
        self_check_acts = self_check_acts.reshape(self_check_acts.shape[0], -1)
        corrected_acts = corrected_acts.reshape(corrected_acts.shape[0], -1)

        print(f"Layer {layer}: Wrong Variance - {np.var(wrong_acts, axis=0).sum()}")

        # Perform PCA with normalization
        pca = PCA(n_components=n_components)
        scaler = StandardScaler()
        all_acts = scaler.fit_transform(np.vstack([wrong_acts, self_check_acts, corrected_acts]))  # Normalize activations
        reduced_acts = pca.fit_transform(all_acts)
        print(f"Layer {layer}: PCA Explained Variance Ratio - {pca.explained_variance_ratio_}")

        # Split transformed activations back
        split_1 = wrong_acts.shape[0]
        split_2 = split_1 + self_check_acts.shape[0]
        wrong_points = reduced_acts[:split_1]
        self_check_points = reduced_acts[split_1:split_2]
        corrected_points = reduced_acts[split_2:]

        # 3D Scatter plot
        ax.scatter(wrong_points[:, 0], wrong_points[:, 1], wrong_points[:, 2], color='blue', label="Wrong", alpha=0.7)
        ax.scatter(self_check_points[:, 0], self_check_points[:, 1], self_check_points[:, 2], color='orange', label="Self-Check", alpha=0.7)
        ax.scatter(corrected_points[:, 0], corrected_points[:, 1], corrected_points[:, 2], color='red', label="Corrected", alpha=0.7)

        # Labels & formatting
        ax.set_xlabel("PCA Component 1")
        ax.set_ylabel("PCA Component 2")
        ax.set_zlabel("PCA Component 3")
        ax.set_title(f"PCA 3D Scatter Plot - Layer {layer}")
        ax.legend()
        ax.grid(True)

        plt.show()

# Example usage:
# plot_pca_variance_per_layer_3d(activations_per_layer, n_components=3)
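
The stack, standardize, project, split-back pattern used in `plot_pca_variance_per_layer_3d` can be checked on synthetic data without loading a model. The following is a minimal sketch, assuming three made-up activation groups drawn from shifted Gaussians. The helper name `project_groups` and the data are hypothetical and not part of the notebook, and PCA is done with a plain SVD instead of scikit-learn to keep the sketch light on dependencies.

```python
import numpy as np

def project_groups(groups, n_components=3):
    """Standardize stacked groups, project with PCA (via SVD), and split back per group."""
    names = list(groups)
    stacked = np.vstack([groups[name] for name in names])

    # Standardize each feature; guard against zero-variance columns
    std = stacked.std(axis=0)
    std[std == 0] = 1.0
    z = (stacked - stacked.mean(axis=0)) / std

    # PCA via SVD on the centered data: rows of vt are the principal directions
    _, s, vt = np.linalg.svd(z, full_matrices=False)
    reduced = z @ vt[:n_components].T
    explained = (s ** 2) / np.sum(s ** 2)

    # Split the projected rows back into the original groups by size
    out, start = {}, 0
    for name in names:
        n = groups[name].shape[0]
        out[name] = reduced[start:start + n]
        start += n
    return out, explained[:n_components]

rng = np.random.default_rng(0)
groups = {  # hypothetical stand-ins for per-layer activations
    "wrong": rng.normal(0.0, 1.0, size=(20, 16)),
    "self_check": rng.normal(0.5, 1.0, size=(15, 16)),
    "corrected": rng.normal(1.0, 1.0, size=(25, 16)),
}
points, ratio = project_groups(groups)
for name, pts in points.items():
    print(name, pts.shape)
print("explained variance ratio:", ratio)
```

Splitting by cumulative group size only works because the rows are stacked in a fixed order, which is the same assumption the plotting function makes.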


In [None]:
np.save('/content/drive/MyDrive/Articles /final_steering_vectors.npy', final_steering_vectors)

## Applying steering

In [None]:
def generate_cot_with_steering(model, tokenizer, question, steering_vectors, layer_to_modify, alpha):
    """
    Generates CoT reasoning and applies activation steering **only when '=' is detected**.

    Returns:
    - str: Generated output with activation steering applied.
    """
    cot_prompt = (
        f"A conversation between User and Assistant. The user asks a question, and the Assistant solves it.",
        f"The assistant first thinks about the reasoning process in the mind and then provides the user with the answer.",
        f"The reasoning process and answer are enclosed within <think> </think> and ",
        f"<answer> </answer> tags, respectively, i.e., <think> reasoning process here </think> ",
        f"<answer> answer here </answer>.",
        f" The final answer must be a numeric or a decimal. ",
        f"User: {question}. Assistant: "
    )

    cot_prompt = " ".join(cot_prompt)

    input_ids = tokenizer.encode(cot_prompt, return_tensors="pt").to(model.device)
    generated_ids = []
    steering_triggered = False

    def hook_fn(module, input, output):
        if steering_triggered:
            return output + alpha * steering_vectors[layer_to_modify]  # Apply steering vector
        return output  # No modification before '=' appears

    handle = model.model.layers[layer_to_modify].mlp.register_forward_hook(hook_fn)

    try:
        with t.no_grad():  # torch is imported as `t` in the setup cell
            while True:
                outputs = model(input_ids)
                logits = outputs.logits[:, -1, :]
                next_token_id = t.argmax(logits, dim=-1).item()  # greedy decoding
                generated_ids.append(next_token_id)

                # Tokens often decode with a leading space (e.g. " ="), so strip before comparing
                if tokenizer.decode([next_token_id]).strip() == "=":
                    steering_triggered = True  # Activate steering after "="

                if next_token_id == tokenizer.eos_token_id or len(generated_ids) > 512:
                    break

                input_ids = t.cat([input_ids, t.tensor([[next_token_id]], device=model.device)], dim=-1)
    finally:
        handle.remove()  # Always detach the hook, even if generation raises

    return tokenizer.decode(generated_ids)



In [None]:
def generate_cot_answer(model, tokenizer, question, max_tokens=512):
    """
    Generates a step-by-step Chain-of-Thought (CoT) reasoning response for a given question.

    Args:
    - model: Language model.
    - tokenizer: Tokenizer for the model.
    - question (str): The input question.
    - max_tokens (int): Maximum number of tokens to generate.

    Returns:
    - str: The generated CoT response.
    """
    cot_prompt = (
        f"A conversation between User and Assistant. The user asks a question, and the Assistant solves it.",
        f"The assistant first thinks about the reasoning process in the mind and then provides the user with the answer.",
        f"The reasoning process and answer are enclosed within <think> </think> and ",
        f"<answer> </answer> tags, respectively, i.e., <think> reasoning process here </think> ",
        f"<answer> answer here </answer>.",
        f" The final answer must be a numeric or a decimal. ",
        f"User: {question}. Assistant: "
    )

    cot_prompt = " ".join(cot_prompt)

    input_ids = tokenizer.encode(cot_prompt, return_tensors="pt").to(model.device)

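    with t.no_grad():  # torch is imported as `t` in the setup cell
        # max_new_tokens limits generated tokens only; max_length would also count the prompt
        output_ids = model.generate(input_ids, max_new_tokens=max_tokens, pad_token_id=tokenizer.eos_token_id)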

    return tokenizer.decode(output_ids[0], skip_special_tokens=True)


In [None]:
def run_cot_with_activation_steering(
    dset: list,
    csv_path: str,
    model,
    tokenizer,
    steering_vectors,
    layer_to_modify: int,
    alpha: float,
    num_runs: int,
    total: int
):
    """
    Runs step-by-step reasoning with and without activation steering.

    Args:
    - dset (list): Dataset containing questions and answers.
    - csv_path (str): Path to save the results as a CSV.
    - model: Language model.
    - tokenizer: Tokenizer for the model.
    - steering_vectors (dict): Precomputed steering vectors per layer.
    - layer_to_modify (int): Layer where activation steering is applied.
    - alpha (float): Strength of steering intervention.
    - num_runs (int): Number of runs to repeat.
    - total (int): Number of problems to evaluate.

    Returns:
    - df (DataFrame): Final results dataframe.
    - accuracies_no_steering (list): Accuracy per run without steering.
    - accuracies_steering (list): Accuracy per run with steering.
    """
    correct_counts_no_steering = [0] * num_runs
    correct_counts_steering = [0] * num_runs
    results = []

    write_headers = not os.path.exists(csv_path)

    for k in range(num_runs):
        run_results = []

        for i in tqdm(range(total), desc=f"Run {k+1}/{num_runs}"):
            question = dset[i]["question"]
            true_answer = extract_number(dset[i]["answer"])

            # Generate normal CoT (without activation steering)
            original_output = generate_cot_answer(model, tokenizer, question)

            # Apply activation steering **only when '=' appears**
            steered_output = generate_cot_with_steering(
                model, tokenizer, question, steering_vectors, layer_to_modify, alpha
            )

            # Extract numerical answers
            original_answer = extract_number(original_output)
            steered_answer = extract_number(steered_output)

            # Check correctness
            is_correct_no_steering = (true_answer == original_answer)
            is_correct_steering = (true_answer == steered_answer)

            if is_correct_no_steering:
                correct_counts_no_steering[k] += 1
            if is_correct_steering:
                correct_counts_steering[k] += 1

            # Store results
            run_results.append({
                "run_id": k + 1,
                "sample_id": i + 1,
                "question": question,
                "answer_response": dset[i]["answer"],
                "ground_truth": true_answer,
                "cot_response_no_steering": original_output,
                "cot_response_steering": steered_output,
                "is_correct_no_steering": is_correct_no_steering,
                "is_correct_steering": is_correct_steering
            })

            # Save incrementally: append only the newest row so earlier rows are not written again
            pd.DataFrame([run_results[-1]]).to_csv(csv_path, mode="a", header=write_headers, index=False)
            write_headers = False  # Ensure headers are written only once

        results.append(run_results)

    # Compute and print accuracy per run
    accuracies_no_steering = [correct / total for correct in correct_counts_no_steering]
    accuracies_steering = [correct / total for correct in correct_counts_steering]

    for k in range(num_runs):
        print(f"Run {k+1} Accuracy (No Steering): {accuracies_no_steering[k]:.2%}")
        print(f"Run {k+1} Accuracy (With Steering): {accuracies_steering[k]:.2%}")

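    # Flatten the per-run lists so the dataframe has one row per sample
    all_rows = [row for run in results for row in run]
    return pd.DataFrame(all_rows), accuracies_no_steering, accuracies_steering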
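
The incremental-save step above is easy to get wrong: appending the whole accumulated list on every iteration duplicates earlier rows. The following is a minimal sketch of the append-one-row pattern, assuming a hypothetical helper `append_row` and a temporary file. It uses the standard-library `csv` module instead of pandas so it runs anywhere.

```python
import csv
import os
import tempfile

def append_row(csv_path, row):
    """Append one result row, writing the header only when the file is new or empty."""
    write_header = not os.path.exists(csv_path) or os.path.getsize(csv_path) == 0
    with open(csv_path, "a", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=list(row))
        if write_header:
            writer.writeheader()
        writer.writerow(row)

# Hypothetical results written one at a time, as in a long-running evaluation loop
csv_path = os.path.join(tempfile.mkdtemp(), "results.csv")
for i in range(3):
    append_row(csv_path, {"run_id": 1, "sample_id": i + 1, "is_correct": i % 2 == 0})

with open(csv_path, newline="") as f:
    rows = list(csv.DictReader(f))
print(len(rows), "rows written")
```

Checking the file size at write time, instead of carrying a flag through the loop, also keeps the header correct when a run is resumed against an existing file.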


In [None]:
# File paths in Google Drive
csv_path_3 = "/content/drive/MyDrive/Articles /with_steer.csv"

# Check if the CSV file exists
if os.path.exists(csv_path_3):
    print("File already exists. Loading previous results...")
    df_gsm8k = pd.read_csv(csv_path_3)

    print(df_gsm8k.head())  # Show first few rows

else:
    print("No existing file found. Running CoT reasoning...")
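    # The function returns (dataframe, accuracies_no_steering, accuracies_steering), so unpack all three
    df_gsm8k, acc_no_steering, acc_steering = run_cot_with_activation_steering(gsm8k_ds_test, csv_path_3, model, tokenizer, final_steering_vectors, layer_to_modify=27, alpha=10, num_runs=1, total=5)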



## Sample example- With partial steering- Token triggered

I attempted to run this but ran into difficulties due to large inconsistencies between the outputs of model.generate() and model(). Additionally, this code is very slow at inference because it cannot use the efficiencies of a single model.generate() call.
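
One candidate explanation for the mismatch, stated here as an assumption and not verified against this notebook's runs: `model.generate()` samples when its generation settings enable `do_sample`, whereas a manual loop that takes `argmax` over the logits from `model()` is always greedy. The toy sketch below uses made-up logits and only the standard library to show how the two selection rules diverge. If this is the cause, passing `do_sample=False` to `model.generate()` should bring the two paths closer together.

```python
import math
import random

def softmax(logits, temperature=1.0):
    """Temperature-scaled softmax, shifted by the max for numerical stability."""
    scaled = [x / temperature for x in logits]
    m = max(scaled)
    exps = [math.exp(x - m) for x in scaled]
    total = sum(exps)
    return [e / total for e in exps]

def greedy_pick(logits):
    """Deterministic choice: index of the largest logit."""
    return max(range(len(logits)), key=lambda i: logits[i])

def sample_pick(logits, temperature, rng):
    """Stochastic choice: draw an index from the softmax distribution."""
    probs = softmax(logits, temperature)
    return rng.choices(range(len(logits)), weights=probs, k=1)[0]

logits = [2.0, 1.8, 0.5, -1.0]  # made-up next-token scores
rng = random.Random(0)

greedy = [greedy_pick(logits) for _ in range(100)]
sampled = [sample_pick(logits, temperature=0.6, rng=rng) for _ in range(100)]

print("distinct greedy picks:", sorted(set(greedy)))
print("distinct sampled picks:", sorted(set(sampled)))
```

Because each token feeds the next step, one differing pick early in the sequence is enough to make the rest of the two generations diverge.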

In [None]:
import torch
# Load the steering vector
steering_vector = np.load("/content/drive/MyDrive/AI Assignment/MATS- Neel Nanda/Spring 2025/final_steering_vectors.npy", allow_pickle=True)
steering_vector = steering_vector.item()

print(type(steering_vector))  # Check if it's a dict or list
print(steering_vector)        # Print a portion of the data


In [None]:
selected_key = 28  # Change this to select a different vector
steering_vector = steering_vector[selected_key]["uncertainty"]

# Convert to a float32 tensor on the target device
steering_vector = torch.tensor(steering_vector, dtype=torch.float32, device=device)

In [None]:
import torch
import torch.nn.functional as F
from tqdm import tqdm  # Import tqdm for progress tracking

def generate_reasoning_output(prompt, alpha, apply_steering=False,max_tokens=200,num_steer=5):
    inputs = tokenizer(prompt, return_tensors="pt").to(device)
    generated_tokens = []  # Store generated token IDs

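    if apply_steering:
        # Ensure the steering vector is a float32 tensor on the target device, shaped to broadcast.
        # torch.as_tensor avoids the copy warning when steering_vector is already a tensor.
        steering_vector_torch = torch.as_tensor(
            steering_vector, dtype=torch.float32, device=device
        ).view(1, 1, -1)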

    current_input = inputs["input_ids"].to(device)  # Ensure input is on CUDA

    decoded = tokenizer.decode(current_input[0].tolist(), skip_special_tokens=True)
    print(decoded)

    with torch.no_grad():
      output = model.generate(
            input_ids=current_input,
            max_new_tokens=1,  # Generate only one token per step
            return_dict_in_generate=True,  # Get additional info like scores
            output_hidden_states=True  # Get hidden states if needed
            )

      current_input = output.sequences
      next_token = output.sequences[:, -1:]
      generated_tokens.append(next_token.item())

      print(type(output.sequences))
      print(output.sequences.shape)

      for _ in tqdm(range(max_tokens), desc="Generating Tokens", unit="token"):
            output = model.generate(
            input_ids=current_input,
            max_new_tokens=1,  # Generate only one token per step
            return_dict_in_generate=True,  # Get additional info like scores
            output_hidden_states=True  # Get hidden states if needed
            )

            # Extract the newly generated token
            next_token = output.sequences[:, -1:]  # Get only the new token

            # Append token to sequence
            generated_tokens.append(next_token.item())

            # Update current_input
            current_input = torch.cat([current_input, next_token], dim=1)

            # decoded_text = tokenizer.decode(current_input[0].tolist(), skip_special_tokens=True)
            # print(decoded_text)

            # Decode only the last `num_steer` generated tokens
            decoded_last_five_tokens = tokenizer.decode(generated_tokens[-(num_steer):], skip_special_tokens=True)

            # Check for sentence-ending punctuation within that window
            punctuation_flag = any(p in decoded_last_five_tokens for p in ".!?")

            if apply_steering and punctuation_flag:  # Apply steering at punctuation
                print("Applied steer")
                # output.hidden_states has one entry per generated step (one step here);
                # each entry is a tuple with one tensor per layer (batch_size x seq_len x hidden_dim)
                print("Output.hidden_states: ",len(output.hidden_states[-1]))
                hidden_states = output.hidden_states[-1]

                # Take the final layer's tensor from that per-layer tuple
                last_layer_hidden_states = hidden_states[-1]

                # Extract the last token's hidden state (index -1 for last token)
                last_token_hidden_state = last_layer_hidden_states[:, -1, :].to(device)
                print("Last_token_hidden_state_size: ", last_token_hidden_state.shape)

                # Apply steering
                steered_hidden_states = last_token_hidden_state + alpha * steering_vector_torch

                full_embeds = model.get_input_embeddings()(current_input).clone()
                full_embeds[:, -1, :] = steered_hidden_states.clone()

                new_outputs = model(
                                inputs_embeds=full_embeds,
                                attention_mask = torch.ones_like(current_input, device=device)  # No padding in this single sequence, so attend to every position
                                    )

                logits = new_outputs.logits[:, -1, :]  # Get updated logits

                # **Choose new token from steered logits**
                next_token = torch.argmax(logits, dim=-1, keepdim=True)

                # Append steered token to sequence
                generated_tokens[-1] = next_token.item()

                current_input[:, -1] = next_token  # Replace last token with next_token

              # Append instead of overwrite
            # print(current_input.shape)

            # Stop if EOS token is reached
            if next_token.item() == tokenizer.eos_token_id:
                break

    return tokenizer.decode(generated_tokens, skip_special_tokens=True)


In [None]:

# model.generate() takes its decoding defaults from model.generation_config, so inspect that object
gen_cfg = model.generation_config
print(f"do_sample: {getattr(gen_cfg, 'do_sample', 'Not set')}")
print(f"temperature: {getattr(gen_cfg, 'temperature', 'Not set')}")
print(f"top_k: {getattr(gen_cfg, 'top_k', 'Not set')}")
print(f"top_p: {getattr(gen_cfg, 'top_p', 'Not set')}")


In [None]:
# Example multi-step reasoning prompt
prompt = "Josh decides to try flipping a house.  He buys a house for $80,000 and then puts in $50,000 in repairs.  This increased the value of the house by 150%.  How much profit did he make? Solve step by step, giving final answer as a numeric"
baseline_output = generate_reasoning_output(prompt, alpha =0, apply_steering=False,max_tokens=50)

In [None]:
# Generate outputs
steered_output_1 = generate_reasoning_output(prompt, alpha = 0, apply_steering=True,max_tokens=50)

In [None]:
steered_output_2 = generate_reasoning_output(prompt, alpha = 0.01, apply_steering=True,max_tokens=350)

In [None]:
steered_output_3 = generate_reasoning_output(prompt, alpha = 0.0001, apply_steering=True,max_tokens=350,num_steer=2)

In [None]:
steered_output_4 = generate_reasoning_output(prompt, alpha = 0.00000001, apply_steering=True,max_tokens=350,num_steer=2)

In [None]:
steered_output_5 = generate_reasoning_output(prompt, alpha = 0, apply_steering=True,max_tokens=350)

In [None]:
steered_output_6 = generate_reasoning_output(prompt, alpha = 0.02, apply_steering=True,max_tokens=350,num_steer=3)

In [None]:
steered_output_7 = generate_reasoning_output(prompt, alpha = 0, apply_steering=True,max_tokens=350,num_steer=2)

In [None]:
steered_output_8 = generate_reasoning_output(prompt, alpha = 0.025, apply_steering=True,max_tokens=350,num_steer=6)

In [None]:
steered_output_9 = generate_reasoning_output(prompt, alpha = 0.025, apply_steering=True,max_tokens=350,num_steer=3)

In [None]:
# Display results
print("=== Baseline Output ===\n", baseline_output)
print("\n=== Steered Output 1, alpha =0 ===\n", steered_output_1)
#print("\n=== Steered Output 2, alpha =0.01 ===\n", steered_output_2)
#print("\n=== Steered Output 3, alpha =0.0001,num_steer=2 ===\n", steered_output_3)
#print("\n=== Steered Output 4, alpha =0.00000001,num_steer=2 ===\n", steered_output_4)
#print("\n=== Steered Output 5, alpha =0 ===\n", steered_output_5)
#print("\n=== Steered Output 6, alpha =0.02,num_steer=3 ===\n", steered_output_6)
#print("\n=== Steered Output 7, alpha =0,num_steer=2 ===\n", steered_output_7)
#print("\n=== Steered Output 8, alpha =0.025,num_steer=6 ===\n", steered_output_8)
#print("\n=== Steered Output 9, alpha =0.025,num_steer=3 ===\n", steered_output_9)

In [None]:
steered_output_2 = generate_reasoning_output(prompt, alpha = 1, apply_steering=True)
steered_output_3 = generate_reasoning_output(prompt, alpha = 0.00000001, apply_steering=True)

print("\n=== Steered Output 2 ===\n", steered_output_2)
print("\n=== Steered Output 3 ===\n", steered_output_3)

## Sample example- Full steering

Here steering is applied throughout the generation process, resulting in more stable outputs.
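
The always-on steering idea can be tried on a toy module before attaching it to the real model. The following is a minimal sketch, assuming a small `nn.Linear` as a stand-in for a transformer sub-module and a hypothetical context manager named `steering`. Neither is part of the notebook. It relies on the PyTorch behaviour that a forward hook returning a non-None value replaces the module's output.

```python
import contextlib
import torch
import torch.nn as nn

@contextlib.contextmanager
def steering(module, vector, alpha):
    """Temporarily add alpha * vector to the last position of the module's output."""
    def hook(mod, inputs, output):
        steered = output.clone()            # avoid mutating the original tensor in place
        steered[:, -1, :] += alpha * vector
        return steered                      # returned value replaces the module output
    handle = module.register_forward_hook(hook)
    try:
        yield
    finally:
        handle.remove()                     # always detach, even if the body raises

torch.manual_seed(0)
layer = nn.Linear(4, 4)        # toy stand-in for a transformer sub-module
x = torch.randn(1, 3, 4)       # (batch, seq_len, hidden)
vector = torch.ones(4)         # made-up steering direction

with torch.no_grad():
    baseline = layer(x)
    with steering(layer, vector, alpha=0.5):
        steered = layer(x)
    after = layer(x)

print(torch.allclose(steered[:, -1] - baseline[:, -1], 0.5 * vector))
print(torch.equal(after, baseline))
```

Wrapping registration and removal in one context manager means a failed generation cannot leave the hook attached and silently steer later runs.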

In [None]:
import torch
steering_vector = np.load("/content/drive/MyDrive/Articles /final_steering_vectors.npy", allow_pickle=True)
steering_vector = steering_vector.item()

selected_key = 28  # Change this to select a different vector
steering_vector = steering_vector[selected_key]["uncertainty"]

# Convert to a float32 tensor on the target device
steering_vector = torch.tensor(steering_vector, dtype=torch.float32, device=device)

In [None]:
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer

def apply_steering_hook(model, tokenizer, lookback=5, alpha=0.5):
    """
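    Registers a forward hook that adds `alpha * steering_vector` to the last token position on every forward pass.
    No punctuation check is performed here; `tokenizer` and `lookback` are accepted but currently unused.
    The hook reads the global `steering_vector` defined in the previous cell.

    Parameters:
        model (transformers.PreTrainedModel): The causal language model.
        tokenizer (transformers.PreTrainedTokenizer): Tokenizer for the model (unused).
        lookback (int): Reserved for a punctuation look-back window; unused (default: 5).
        alpha (float): Strength of steering modification (default: 0.5).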

    Returns:
        hook_handle (torch.utils.hooks.RemovableHandle): Handle to remove the hook after generation.
    """

    def forward_hook(module, input, output):
        output = output.to(steering_vector.device)
        output[:, -1, :] += alpha * steering_vector

        return output

    # Attach hook to the last layer's `post_attention_layernorm`
    last_layer = model.model.layers[-1].post_attention_layernorm
    return last_layer.register_forward_hook(forward_hook)


def generate_with_steering(model, tokenizer, prompt, lookback=5, alpha=0.5, max_new_tokens=50):
    """
    Generates text using `model.generate()` with the steering hook active on every forward pass.

    Parameters:
        model (transformers.PreTrainedModel): The causal language model.
        tokenizer (transformers.PreTrainedTokenizer): Tokenizer for the model.
        prompt (str): The input prompt.
        lookback (int): Passed through to `apply_steering_hook`, where it is currently unused (default: 5).
        alpha (float): Strength of steering modification (default: 0.5).
        max_new_tokens (int): Number of tokens to generate (default: 50).

    Returns:
        str: The generated text.
    """
    hook_handle = apply_steering_hook(model, tokenizer, lookback, alpha)
    print("Hook handle: ",hook_handle)

    input_ids = tokenizer(prompt, return_tensors="pt")["input_ids"].to(model.device)

    try:
        output = model.generate(
            input_ids=input_ids,
            max_new_tokens=max_new_tokens,
            return_dict_in_generate=True
        )
    finally:
        hook_handle.remove()  # Remove the hook even if generation fails, to avoid affecting future generations

    return tokenizer.decode(output.sequences[0], skip_special_tokens=True)


In [None]:
prompt = "Josh decides to try flipping a house.  He buys a house for $80,000 and then puts in $50,000 in repairs.  This increased the value of the house by 150%.  How much profit did he make? Solve step by step, giving final answer as a numeric"
gen_text_base = generate_with_steering(model, tokenizer, prompt, lookback=5, alpha=0, max_new_tokens=500)

In [None]:
gen_text_steer_1 = generate_with_steering(model, tokenizer, prompt, lookback=5, alpha=0.001, max_new_tokens=500)

In [None]:
gen_text_steer_2 = generate_with_steering(model, tokenizer, prompt, lookback=5, alpha=0.01, max_new_tokens=500)

In [None]:
gen_text_steer_3 = generate_with_steering(model, tokenizer, prompt, lookback=5, alpha=0.1, max_new_tokens=500)

In [None]:
gen_text_steer_4 = generate_with_steering(model, tokenizer, prompt, lookback=5, alpha=1, max_new_tokens=500)

In [None]:
gen_text_steer_5 = generate_with_steering(model, tokenizer, prompt, lookback=5, alpha=10, max_new_tokens=500)

In [None]:
print("_____________Base text_____________________\n",gen_text_base)
print("\n_____________Steer text1_____________________\n",gen_text_steer_1)
print("\n_____________Steer text2_____________________\n",gen_text_steer_2)
print("\n_____________Steer text3_____________________\n",gen_text_steer_3)
print("\n_____________Steer text4_____________________\n",gen_text_steer_4)
print("\n_____________Steer text5_____________________\n",gen_text_steer_5)