In [7]:
import torch
from transformers import PreTrainedModel, PreTrainedTokenizer
from typing import List, Tuple, Dict, Optional
import numpy as np

import json
import pandas as pd
from datasets import load_dataset
from pathlib import Path
import re

from transformers import AutoModelForCausalLM, AutoTokenizer

Model and Tokenizer (from HuggingFace)

In [105]:
model_name = "Qwen/Qwen2.5-0.5B-Instruct"

model = AutoModelForCausalLM.from_pretrained(
    model_name, attn_implementation="eager")
tokenizer = AutoTokenizer.from_pretrained(model_name)

For evaluation of Correctness of GSM8K (taken from the authors)

In [91]:
# from the authors' code

def _is_float(s):
    try:
        float(s)
        return True
    except:
        return False


def is_correct(target, ans):
    if _is_float(target) and _is_float(ans):
        if abs(float(target) - float(ans)) <= 1e-5:
            return True
    elif str(target) == str(ans):
        return True
    return False

For CoT-decoding (taken from GitHub)

In [8]:
def get_device():
    if torch.backends.mps.is_available():
        return torch.device("mps")
    elif torch.cuda.is_available():
        return torch.device("cuda")
    else:
        return torch.device("cpu")


def calculate_confidence(logits: List[torch.Tensor], answer_ids: torch.Tensor) -> float:
    """
    Calculate the confidence score (Δ) as specified in the paper.

    Args:
        logits: List of logits for each decoding step
        answer_ids: Tensor of token ids for the answer

    Returns:
        Confidence score (Δ)
    """
    confidence_sum = 0.0
    valid_tokens = 0
    for t, token_id in enumerate(answer_ids):
        if t >= len(logits):
            break
        token_logits = logits[t]
        probs = torch.softmax(token_logits, dim=-1)
        if probs.size(-1) > 1:
            top_2_probs, _ = torch.topk(probs, min(2, probs.size(-1)))
            if top_2_probs.size(-1) > 1:
                confidence_sum += (top_2_probs[-1]
                                   [0] - top_2_probs[-1][1]).item()
            else:
                confidence_sum += 1.0  # Max confidence if there's only one token
        else:
            confidence_sum += 1.0  # Max confidence if there's only one token
        valid_tokens += 1

    return confidence_sum / valid_tokens if valid_tokens > 0 else 0.0


def aggregate_paths_based_on_scores(paths: List[Tuple[str, float]]) -> Tuple[str, float]:
    """Aggregate multiple paths based on their confidence scores."""
    answer_scores = {}
    for answer, delta in paths:
        answer_scores[answer] = answer_scores.get(answer, 0) + delta
    best_answer = max(answer_scores, key=answer_scores.get)
    return best_answer, answer_scores[best_answer]


def cot_decode(
    model: PreTrainedModel,
    tokenizer: PreTrainedTokenizer,
    messages: List[Dict[str, str]],
    k: int = 10,
    num_beams: int = 1,
    max_new_tokens: int = 512,
    temperature: float = 1.0,
    top_p: float = 1.0,
    repetition_penalty: float = 1.0,
    length_penalty: float = 1.0,
    no_repeat_ngram_size: int = 0,
    early_stopping: bool = False,
    aggregate_paths: bool = False,
) -> Tuple[str, float]:
    """
    Implement CoT-decoding for a given chat input.

    Args:
        model: The Hugging Face transformer model.
        tokenizer: The associated tokenizer.
        messages: List of chat messages in the format [{"role": "user", "content": "..."}]
        k: The number of alternative tokens to consider at the first step.
        num_beams: Number of beams for beam search.
        max_new_tokens: Maximum number of new tokens to generate.
        temperature: Sampling temperature.
        top_p: Nucleus sampling probability.
        repetition_penalty: Repetition penalty factor.
        length_penalty: Length penalty factor.
        no_repeat_ngram_size: Size of n-grams to avoid repeating.
        early_stopping: Whether to stop generation when all beams are finished.
        aggregate_paths: Whether to aggregate multiple paths.

    Returns:
        A tuple containing the best path (or aggregated result) and its confidence score.
    """
    device = get_device()
    model.to(device)

    # Use the chat template to format the input
    if tokenizer.chat_template:
        input_text = tokenizer.apply_chat_template(
            messages, tokenize=False, add_generation_prompt=True)
    else:
        # Fallback for tokenizers without chat templates
        input_text = "\n".join(
            [f"{msg['role']}: {msg['content']}" for msg in messages])
        input_text += "\nassistant:"

    input_ids = tokenizer.encode(input_text, return_tensors="pt").to(device)
    attention_mask = torch.ones_like(input_ids).to(device)

    # Set pad_token_id if it's not set
    if tokenizer.pad_token_id is None:
        tokenizer.pad_token_id = tokenizer.eos_token_id

    # Get the top-k tokens for the first decoding step
    with torch.no_grad():
        outputs = model(input_ids, attention_mask=attention_mask)
        first_token_logits = outputs.logits[0, -1, :]
        top_k_logits, top_k_indices = torch.topk(first_token_logits, k)

    paths = []
    for idx in top_k_indices:
        # Generate sequence starting with the selected token
        start_ids = torch.cat(
            [input_ids, idx.unsqueeze(0).unsqueeze(0)], dim=-1)
        start_mask = torch.cat([attention_mask, torch.ones(
            (1, 1), dtype=torch.long, device=device)], dim=-1)

        output = model.generate(
            start_ids,
            attention_mask=start_mask,
            max_new_tokens=max_new_tokens,
            num_beams=num_beams,
            temperature=temperature,
            top_p=top_p,
            repetition_penalty=repetition_penalty,
            length_penalty=length_penalty,
            no_repeat_ngram_size=no_repeat_ngram_size,
            early_stopping=early_stopping,
            pad_token_id=tokenizer.pad_token_id,
            eos_token_id=tokenizer.eos_token_id,
            output_scores=True,
            return_dict_in_generate=True,
        )

        generated_sequence = output.sequences[0]
        answer_ids = generated_sequence[len(input_ids[0]):]
        print(f"answer_ids: {answer_ids}")
        answer_text = tokenizer.decode(answer_ids, skip_special_tokens=True)
        print(f"answer_text: {answer_text}")

        # Calculate confidence score (Δ)
        confidence = calculate_confidence(output.scores, answer_ids)
        paths.append((answer_text, confidence))

    if aggregate_paths:
        return aggregate_paths_based_on_scores(paths)
    else:
        return max(paths, key=lambda x: x[1])

In [None]:
# Usage example
from transformers import AutoModelForCausalLM, AutoTokenizer

model_name = "Qwen/Qwen2.5-0.5B-Instruct"
model = AutoModelForCausalLM.from_pretrained(
    model_name, attn_implementation="eager")
tokenizer = AutoTokenizer.from_pretrained(model_name)

messages = [
    {"role": "user", "content": "In a dance class of 20 students, 20% enrolled in contemporary dance, 25% of the remaining enrolled in jazz dance, and the rest enrolled in hip-hop dance. What percentage of the entire students enrolled in hip-hop dance?"}
]

# # Generate the response using CoT decoding
print(f"Using device: {get_device()}")
result, confidence = cot_decode(
    model, tokenizer, messages, aggregate_paths=True, max_new_tokens=512)
print(f"CoT Decoding:\n {result}")

For setting up the data set (from Nilofar)

In [18]:
def setup_gsm8k():
    """
    Sets up the GSM8K dataset and provides basic analysis functionality.
    Returns train and test datasets as pandas DataFrames.
    """
    # Load the dataset using the Hugging Face datasets library
    dataset = load_dataset("gsm8k", "main")

    # Convert to pandas DataFrames for easier manipulation
    train_df = pd.DataFrame(dataset['train'])
    test_df = pd.DataFrame(dataset['test'])

    # Create a directory for saving the data
    data_dir = Path('C:\\Users\\roegn\\Documents\\Universität\\Master_Semester_III\\ML4NLP_Übung\\ML4NLP\\gsm8k_data')
    data_dir.mkdir(exist_ok=True)

    # Save the datasets locally
    train_df.to_csv(data_dir / 'train.csv', index=False)
    test_df.to_csv(data_dir / 'test.csv', index=False)

    return train_df, test_df


# def analyze_problem(problem):
#     """
#     Analyzes a single GSM8K problem and its solution.
#     """
#     lines = problem['answer'].split('\n')
#     solution_steps = [line for line in lines if line.startswith('<<')]
#     final_answer = lines[-1] if lines else "No answer found"
# 
#     return {
#         'question': problem['question'],
#         'solution_steps': solution_steps,
#         'final_answer': final_answer
#     }

 def analyze_problem(problem):
     """
     Extracts the question and answer from a single GSM8K problem.
     Problems without answers are not returned.
     """
#     if re.findall("[0-9]/[0-9]", problem['answer']):
#         return
     lines = problem['answer'].split('\n')
     #solution_steps = [line for line in lines if line.startswith('<<')]
     
     if lines:
         final_answer = lines[-1].replace("#### ", '')
         return {
         'question': problem['question'],
         'final_answer': final_answer
     }
     
     #final_answer="No answer found"
#     return {}

For a single instance:

In [42]:
# Set up the dataset
train_df, test_df = setup_gsm8k()

print(f"Training set size: {len(train_df)}")
print(f"Test set size: {len(test_df)}")

# Example: Analyze the first problem
if len(train_df) > 0:
    first_problem = train_df.iloc[0]
    analysis = analyze_problem(first_problem)

    print("\nExample Problem Analysis:")
    print("Question:", analysis['question'])

    # print("\nSolution Steps:")
    # for step in analysis['solution_steps']:
    #    print(step)

    print("\nFinal Answer:", analysis['final_answer'])

Training set size: 7473
Test set size: 1319

Example Problem Analysis:


TypeError: 'NoneType' object is not subscriptable

In [111]:
# from the example in the file on GitHub
messages = [
    {"role": "user", "content": f"Q: {analysis['question']}\nA:"}
]

# Generate the response using CoT decoding
print(f"Using device: {get_device()}")
result, confidence = cot_decode(
    model, tokenizer, messages, aggregate_paths=True, max_new_tokens=512)
print(f"CoT Decoding:\n {result}")

TypeError: 'NoneType' object is not subscriptable

In [116]:
# identifying the actual answer to the question
found = re.findall(r"\\boxed\{.+\}|\\(.+\\)", result)

In [141]:
# evaluation
if found:

    answer_span = re.sub(r"\\|\(|\)|\{|\}|boxed", "", found[-1])

    if is_correct(final_answer, answer_span):
        print("Correct Answer", answer_span)
    else:
        print(
            f"Incorrect Answer. Correct Answer: {final_answer}, Answer given: {answer_span}")
else:
    print("No answer was found.")

Incorrect Answer. Correct Answer: 10, Answer given: 75


For iterating through the data set:

In [106]:
# Set up the dataset
train_df, test_df = setup_gsm8k()

print(f"Training set size: {len(train_df)}")
print(f"Test set size: {len(test_df)}")


# Prepare lists for questions and answers
question_list = []
answer_list = []

for i in range(len(test_df)):
    problem = test_df.iloc[i]

    # eliminate tasks that include division as described in the original paper
    if re.findall("[0-9]/[0-9]", problem['answer']):
        continue

    question_list.append(problem['question'])
    answer_list.append(problem['answer'].split('\n')[-1].replace("#### ", ''))

Training set size: 7473
Test set size: 1319


In [None]:
correct: int = 0
no_answer: int = 0
# for analysis, if a sample comes without "boxed" we can retrieve it
indexes_no_answer = []
incorrect: int = 0
indexes_incorrect = []  # for analysis

for i in range(len(question_list)):

    # Inference
    messages = [
        {"role": "user", "content": f"Q: {question_list[i]}\nA:"}
    ]

    result, confidence = cot_decode(
        model, tokenizer, messages, aggregate_paths=True, max_new_tokens=512)

    # Finding the specific answer to the question in output
    # Right now: work-around with \boxed{}

    found = re.findall(r"\\boxed\{.+\}|\\(.+\\)", result)

    # comparing given answer to actual answer from the data set
    if found:
        # removing \boxing ans brackets for comparison
        answer_span = re.sub(r"\\|\(|\)|\{|\}|boxed", "", found[-1])
        if is_correct(answer_list[i], answer_span):
            correct += 1
        else:
            incorrect += 1
            indexes_incorrect.append(f"{i}: {answer_span}, {result}")
    else:
        no_answer += 1
        indexes_no_answer.append(result)

    # to save time, limit number of operations
    if i >= 200:
        break

# Calculating accuracy:
print(f"Accuracy: {correct/len(question_list) * 100} %")

# Cases were workaround did not work
print(f"No answers found: {no_answer}")

In [144]:
result

'To determine how far John is from home at the end of 4 hours, we can break down the problem into several steps:\n\n1. Calculate the distance John travels during the first 2 hours:\n\\[ \\text{Distance} = \\text{Speed} \\times \\text{Time} = 60 \\text{ mph} \\times 2 \\text{ hours} = 120 \\text{ miles} \\]\n\n2. Calculate the remaining distance after the first 2 hours:\n\\[ \\text{Distance after first 2 hours} = \\text{Total distance} - \\text{Distance during first 2 hours} = 280 \\text{ miles} - 120 \\text{ miles} = 160 \\text{ miles} \\]\n\n3. Calculate the distance John travels during the next half-hour:\n\\[ \\text{Distance} = \\text{Speed} \\times \\text{Time} = 30 \\text{ mph} \\times 0.5 \\text{ hours} = 15 \\text{ miles} \\]\n\n4. Calculate the remaining distance after driving 0.5 hours:\n\\[ \\text{Distance after next half-hour} = \\text{Remaining distance} - \\text{Distance traveled} = 160 \\text{ miles} - 15 \\text{ miles} = 145 \\text{ miles} \\]\n\n5. Sum the remaining dis

In [145]:
messages

[{'role': 'user',
  'content': 'Q: John drives for 3 hours at a speed of 60 mph and then turns around because he realizes he forgot something very important at home.  He tries to get home in 4 hours but spends the first 2 hours in standstill traffic.  He spends the next half-hour driving at a speed of 30mph, before being able to drive the remaining time of the 4 hours going at 80 mph.  How far is he from home at the end of those 4 hours?\nA:'}]

In [146]:
indexes_no_answer

[0, 2, 4]

In [152]:
indexes_incorrect.append(f"{i}: {answer_span}, {result}")

In [153]:
indexes_incorrect

[1,
 3,
 5,
 '5: result',
 '5: [ ',
 '5: [ , To determine how far John is from home at the end of 4 hours, we can break down the problem into several steps:\n\n1. Calculate the distance John travels during the first 2 hours:\n\\[ \\text{Distance} = \\text{Speed} \\times \\text{Time} = 60 \\text{ mph} \\times 2 \\text{ hours} = 120 \\text{ miles} \\]\n\n2. Calculate the remaining distance after the first 2 hours:\n\\[ \\text{Distance after first 2 hours} = \\text{Total distance} - \\text{Distance during first 2 hours} = 280 \\text{ miles} - 120 \\text{ miles} = 160 \\text{ miles} \\]\n\n3. Calculate the distance John travels during the next half-hour:\n\\[ \\text{Distance} = \\text{Speed} \\times \\text{Time} = 30 \\text{ mph} \\times 0.5 \\text{ hours} = 15 \\text{ miles} \\]\n\n4. Calculate the remaining distance after driving 0.5 hours:\n\\[ \\text{Distance after next half-hour} = \\text{Remaining distance} - \\text{Distance traveled} = 160 \\text{ miles} - 15 \\text{ miles} = 145 \\