In [1]:
import pandas as pd

# Names of the benchmarks that `run_datasets` knows how to load.
datasets = ['multiArith1','gsm8k','svamp']
def run_datasets(data):
    """Load the evaluation DataFrame for the benchmark called ``data``.

    Args:
        data: One of ``'gsm8k'``, ``'multiArith1'`` or ``'svamp'``.

    Returns:
        A pandas DataFrame for the requested benchmark. ``gsm8k`` and ``svamp``
        are truncated to their first 100 rows; ``multiArith1`` is returned as
        read from the local Excel file. For ``svamp`` a ``question`` column is
        built from ``Body`` and ``Question`` and ``Answer`` is renamed to
        ``answer``.

    Raises:
        ValueError: If ``data`` is not a recognised benchmark name. (Previously
            this fell through to ``return df`` with ``df`` unbound.)
    """
    if data == 'gsm8k':
        splits = {'train': 'main/train-00000-of-00001.parquet', 'test': 'main/test-00000-of-00001.parquet'}
        df = pd.read_parquet("hf://datasets/openai/gsm8k/" + splits["train"]).head(100)

    elif data == 'multiArith1':
        df = pd.read_excel('data/multiArith_d0_l3.xlsx')

    elif data == "svamp":
        df = pd.read_parquet("hf://datasets/tongyx361/svamp/data/test-00000-of-00001.parquet").head(100)
        # `.assign` returns a new frame, so we never write into the `.head()` result.
        df = df.assign(question=df["Body"] + " " + df["Question"])
        df = df.rename(columns={"Answer":"answer"})

    else:
        raise ValueError(
            f"Unknown dataset {data!r}; expected one of 'gsm8k', 'multiArith1', 'svamp'"
        )

    return df

In [3]:
datasets

['svamp']

In [4]:
import torch
from transformers import PreTrainedModel, PreTrainedTokenizer
from typing import List, Tuple, Dict, Optional
import numpy as np

def get_device():
    """Return the torch device to use: MPS if available, else CUDA, else CPU."""
    # Probed in priority order; the first backend reporting availability wins.
    candidates = (
        ("mps", torch.backends.mps.is_available),
        ("cuda", torch.cuda.is_available),
    )
    for backend_name, is_available in candidates:
        if is_available():
            return torch.device(backend_name)
    return torch.device("cpu")

def calculate_confidence(logits: List[torch.Tensor], answer_ids: torch.Tensor) -> float:
    """
    Calculate the confidence score (Δ): the mean, over decoding steps, of the
    gap between the two highest softmax probabilities at that step.

    Only the *length* of ``answer_ids`` is used: the number of steps scored is
    ``min(len(logits), len(answer_ids))``. The token ids themselves are not
    looked up in the distributions.

    Args:
        logits: Per-step logits. Each entry may be 1-D ``(vocab,)`` or 2-D
            ``(rows, vocab)``; for 2-D input the last row is scored.
        answer_ids: Token ids of the answer (used only for its length).

    Returns:
        Confidence score (Δ) as a Python float, or ``0.0`` when there are no
        steps to score. A step whose vocabulary has a single entry counts as
        maximally confident (``1.0``).
    """
    num_steps = min(len(logits), len(answer_ids))
    if num_steps == 0:
        return 0.0

    margin_sum = 0.0
    for step_logits in logits[:num_steps]:
        vocab_size = step_logits.size(-1)
        if vocab_size < 2:
            # Only one possible token: there is no runner-up to compare against.
            margin_sum += 1.0
            continue
        # Flatten any leading dimension so 1-D and 2-D inputs share one code
        # path, then keep the last row.
        last_row = step_logits.reshape(-1, vocab_size)[-1]
        top_two = torch.topk(torch.softmax(last_row, dim=-1), 2).values
        margin_sum += (top_two[0] - top_two[1]).item()

    return margin_sum / num_steps

def aggregate_paths_based_on_scores(paths: List[Tuple[str, float]]) -> Tuple[str, float]:
    """Sum the confidence of identical answers and return the top (answer, total).

    When several answers share the highest total, the one that appeared first
    in ``paths`` is returned.
    """
    totals = {}
    for text, score in paths:
        totals.setdefault(text, 0)
        totals[text] += score
    winner, winning_total = max(totals.items(), key=lambda entry: entry[1])
    return winner, winning_total

def cot_decode(
    model: PreTrainedModel,
    tokenizer: PreTrainedTokenizer,
    messages: List[Dict[str, str]],
    k: int = 10,
    num_beams: int = 1,
    max_new_tokens: int = 512,
    temperature: float = 1.0,
    top_p: float = 1.0,
    repetition_penalty: float = 1.0,
    length_penalty: float = 1.0,
    no_repeat_ngram_size: int = 0,
    early_stopping: bool = False,
    aggregate_paths: bool = False,
) -> Tuple[str, float]:
    """
    Implement CoT-decoding for a given chat input.

    The prompt is run through the model once, the ``k`` most likely first
    tokens are taken, and one continuation is generated from each of them.
    Every continuation is scored with ``calculate_confidence`` and either the
    highest-scoring one is returned, or (``aggregate_paths=True``) the answer
    text with the largest summed score.

    Args:
        model: The Hugging Face transformer model.
        tokenizer: The associated tokenizer. Its ``pad_token_id`` is set to
            ``eos_token_id`` as a side effect when it is unset.
        messages: List of chat messages in the format [{"role": "user", "content": "..."}]
        k: The number of alternative tokens to consider at the first step.
            Clamped to the vocabulary size.
        num_beams: Number of beams for beam search.
        max_new_tokens: Maximum number of new tokens to generate.
        temperature: Sampling temperature (forwarded to ``generate``).
        top_p: Nucleus sampling probability (forwarded to ``generate``).
        repetition_penalty: Repetition penalty factor.
        length_penalty: Length penalty factor.
        no_repeat_ngram_size: Size of n-grams to avoid repeating.
        early_stopping: Whether to stop generation when all beams are finished.
        aggregate_paths: Whether to aggregate multiple paths.

    Returns:
        A tuple containing the best path (or aggregated result) and its confidence score.

    Raises:
        ValueError: If ``k`` is smaller than 1.
    """
    if k < 1:
        raise ValueError(f"k must be at least 1, got {k}")

    # A model loaded with `device_map=...` has already been placed by
    # accelerate and exposes `hf_device_map`; moving it again can warn or fail,
    # so use the device it already lives on. Otherwise keep the original
    # behaviour of moving it to the preferred local device.
    if getattr(model, "hf_device_map", None):
        device = model.device
    else:
        device = get_device()
        model.to(device)

    # Use the chat template to format the input
    if getattr(tokenizer, "chat_template", None):
        input_text = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
        # The rendered template already contains the special tokens it needs;
        # letting `encode` add them again would duplicate e.g. the BOS token.
        input_ids = tokenizer.encode(input_text, return_tensors="pt", add_special_tokens=False)
    else:
        # Fallback for tokenizers without chat templates
        input_text = "\n".join([f"{msg['role']}: {msg['content']}" for msg in messages])
        input_text += "\nassistant:"
        input_ids = tokenizer.encode(input_text, return_tensors="pt")

    input_ids = input_ids.to(device)
    attention_mask = torch.ones_like(input_ids)
    prompt_length = input_ids.size(1)

    # Set pad_token_id if it's not set
    if tokenizer.pad_token_id is None:
        tokenizer.pad_token_id = tokenizer.eos_token_id

    # Get the top-k tokens for the first decoding step
    with torch.no_grad():
        outputs = model(input_ids, attention_mask=attention_mask)
        first_token_logits = outputs.logits[0, -1, :]
        # topk raises if asked for more entries than exist.
        k = min(k, first_token_logits.size(-1))
        _, top_k_indices = torch.topk(first_token_logits, k)

    # NOTE(review): `generate` is called without `do_sample`, so `temperature`
    # and `top_p` probably have no effect unless the model's generation config
    # enables sampling. Confirm the intended decoding mode.
    paths = []
    for idx in top_k_indices:
        # Generate sequence starting with the selected token
        branch_token = idx.reshape(1, 1).to(input_ids.device)
        start_ids = torch.cat([input_ids, branch_token], dim=-1)
        start_mask = torch.ones_like(start_ids)

        output = model.generate(
            start_ids,
            attention_mask=start_mask,
            max_new_tokens=max_new_tokens,
            num_beams=num_beams,
            temperature=temperature,
            top_p=top_p,
            repetition_penalty=repetition_penalty,
            length_penalty=length_penalty,
            no_repeat_ngram_size=no_repeat_ngram_size,
            early_stopping=early_stopping,
            pad_token_id=tokenizer.pad_token_id,
            eos_token_id=tokenizer.eos_token_id,
            output_scores=True,
            return_dict_in_generate=True,
        )

        # Everything after the prompt, i.e. the forced branch token plus the
        # tokens produced by `generate`.
        generated_sequence = output.sequences[0]
        answer_ids = generated_sequence[prompt_length:]
        answer_text = tokenizer.decode(answer_ids, skip_special_tokens=True)

        # Calculate confidence score (Δ)
        confidence = calculate_confidence(output.scores, answer_ids)
        paths.append((answer_text, confidence))

    if aggregate_paths:
        return aggregate_paths_based_on_scores(paths)
    else:
        return max(paths, key=lambda x: x[1])

In [5]:
import logging
from difflib import SequenceMatcher
from typing import Any, Dict, List

from transformers import pipeline

logger = logging.getLogger(__name__)

class AdvancedSelfConsistency:
    """Self-consistency baseline: sample several completions and cluster them.

    Completions are grouped greedily by character-level similarity
    (``difflib.SequenceMatcher``); the largest group's first member is treated
    as the consensus answer.
    """

    def __init__(self, model_name: str, num_samples: int = 5, similarity_threshold: float = 0.8):
        """Load a text-generation pipeline for ``model_name``.

        Args:
            model_name: Model identifier passed to ``transformers.pipeline``.
            num_samples: How many completions to draw per prompt.
            similarity_threshold: Minimum ``SequenceMatcher`` ratio for a
                response to join an existing cluster.
        """
        self.model_name = model_name
        self.num_samples = num_samples
        self.similarity_threshold = similarity_threshold
        # Initialised to 0 and not updated anywhere in this class.
        self.self_consistency_completion_tokens = 0

        # Load the Hugging Face model pipeline for text generation
        self.generator = pipeline("text-generation", model=self.model_name)

    def generate_responses(self, system_prompt: str, user_prompt: str) -> List[str]:
        """Generates multiple responses using a Hugging Face model.

        Returns ``num_samples`` strings, each the ``generated_text`` of one
        pipeline call on ``"{system_prompt}\\n\\n{user_prompt}"``.
        """
        full_prompt = f"{system_prompt}\n\n{user_prompt}"
        responses = []

        # NOTE(review): if the pipeline's default output includes the prompt in
        # `generated_text`, the shared prompt prefix will inflate similarity
        # between responses. Confirm whether `return_full_text=False` is wanted.
        for _ in range(self.num_samples):
            response = self.generator(
                full_prompt,
                max_length=512,  # Adjust based on model
                num_return_sequences=1,
                # Sampling must be enabled explicitly; otherwise decoding can
                # be deterministic and every sample comes back identical,
                # which defeats the purpose of self-consistency.
                do_sample=True,
                temperature=1.0
            )
            responses.append(response[0]['generated_text'])

        return responses

    def calculate_similarity(self, a: str, b: str) -> float:
        """Return the ``SequenceMatcher`` ratio (0.0-1.0) between ``a`` and ``b``."""
        return SequenceMatcher(None, a, b).ratio()

    def cluster_similar_responses(self, responses: List[str]) -> List[List[str]]:
        """Greedily group responses by similarity to each cluster's first member.

        Each response joins the first existing cluster whose representative
        (its first element) is at least ``similarity_threshold`` similar;
        otherwise it starts a new cluster. Order of first appearance is kept.
        """
        clusters = []
        for response in responses:
            added_to_cluster = False
            for cluster in clusters:
                if self.calculate_similarity(response, cluster[0]) >= self.similarity_threshold:
                    cluster.append(response)
                    added_to_cluster = True
                    break
            if not added_to_cluster:
                clusters.append([response])
        return clusters

    def aggregate_results(self, responses: List[str]) -> Dict[str, Any]:
        """Cluster ``responses`` and summarise the clusters, largest first.

        Returns:
            A dict with ``clusters`` (list of ``{"answer", "frequency",
            "variants"}`` dicts sorted by descending frequency),
            ``total_responses`` and ``num_unique_clusters``.
        """
        clusters = self.cluster_similar_responses(responses)

        cluster_info = [
            {"answer": cluster[0], "frequency": len(cluster), "variants": cluster}
            for cluster in clusters
        ]
        # The sort is stable, so equally sized clusters keep first-seen order.
        cluster_info.sort(key=lambda x: x['frequency'], reverse=True)

        return {
            "clusters": cluster_info,
            "total_responses": len(responses),
            "num_unique_clusters": len(clusters)
        }

    def evaluate(self, system_prompt: str, user_prompt: str) -> Dict[str, Any]:
        """Sample responses for the prompt and return them with their clustering.

        Returns:
            ``{"individual_responses": [...], "aggregated_result": {...}}`` where
            ``aggregated_result`` is the output of ``aggregate_results``.
        """
        responses = self.generate_responses(system_prompt, user_prompt)
        aggregated_result = self.aggregate_results(responses)
        return {"individual_responses": responses, "aggregated_result": aggregated_result}

def advanced_self_consistency_approach(system_prompt: str, initial_query: str, model_name: str):
    """Run self-consistency for one query and return the most frequent answer.

    Builds an ``AdvancedSelfConsistency`` for ``model_name``, evaluates the
    prompt, logs a summary (info) and per-cluster details (debug), and returns
    the representative answer of the top-ranked cluster, or the string
    ``"No consistent answer found."`` when there are no clusters.
    """
    outcome = AdvancedSelfConsistency(model_name).evaluate(system_prompt, initial_query)
    summary = outcome['aggregated_result']
    ranked_clusters = summary['clusters']

    logger.info("Advanced Self-Consistency Results:")
    logger.info(f"Total responses: {summary['total_responses']}")
    logger.info(f"Number of unique clusters: {summary['num_unique_clusters']}")

    for rank, group in enumerate(ranked_clusters, start=1):
        logger.debug(f"\nCluster {rank}:")
        logger.debug(f"  Representative answer: {group['answer']}")
        logger.debug(f"  Frequency: {group['frequency']}")
        logger.debug(f"  Variants: {group['variants']}")

    if not ranked_clusters:
        return "No consistent answer found."
    return ranked_clusters[0]['answer']


In [6]:
from transformers import pipeline, AutoTokenizer
import pandas as pd
import os

class SimpleChainOfThought:
    """Zero-shot chain-of-thought baseline built on a text-generation pipeline."""

    def __init__(self, model_name: str):
        """Remember ``model_name`` and load its text-generation pipeline."""
        self.model_name = model_name
        self.generator = pipeline("text-generation", model=self.model_name)

    def evaluate(self, user_prompt: str, system_prompt: str = "") -> str:
        """Generate one completion for the prompt with a step-by-step cue appended.

        The prompt sent to the pipeline is the system prompt, a blank line, the
        user prompt, and then the line "Let's think step by step.". The
        pipeline's ``generated_text`` for the first returned sequence is
        returned unchanged.
        """
        prompt = "{}\n\n{}\nLet's think step by step.".format(system_prompt, user_prompt)
        generation_kwargs = {
            "max_length": 512,
            "num_return_sequences": 1,
            "temperature": 0.7,
        }
        sequences = self.generator(prompt, **generation_kwargs)
        first_sequence = sequences[0]
        return first_sequence['generated_text']

Run all baselines: chain-of-thought prompting, self-consistency, and CoT-decoding (no prompting)

In [7]:
import pandas as pd
import os
import re

# --- Import your classes / functions ---
# from your_files import SimpleChainOfThought, AdvancedSelfConsistency, cot_decode

# Baseline wrappers for the most recently used model name. Building them loads
# a text-generation pipeline each, so they are reused across questions instead
# of being rebuilt on every call. Only one model's wrappers are kept at a time.
_BASELINE_CACHE = {}


def _get_baselines(model_name: str):
    """Return ``(SimpleChainOfThought, AdvancedSelfConsistency)`` for ``model_name``, built once per model."""
    if model_name not in _BASELINE_CACHE:
        # Drop the previous model's wrappers before loading new ones.
        _BASELINE_CACHE.clear()
        _BASELINE_CACHE[model_name] = (
            SimpleChainOfThought(model_name),
            AdvancedSelfConsistency(model_name, num_samples=5, similarity_threshold=0.8),
        )
    return _BASELINE_CACHE[model_name]


def run_all_pipelines(model, tokenizer, model_name: str, question: str, system_prompt: str = ""):
    """Answer one question with all three baselines.

    Args:
        model: Loaded causal LM, used for CoT-decoding.
        tokenizer: Tokenizer matching ``model``.
        model_name: Model identifier used to build the prompting baselines.
        question: The question text.
        system_prompt: Optional system prompt for the two prompting baselines.

    Returns:
        A dict with keys ``chain_of_thought``, ``self_consistency`` (``"N/A"``
        when no cluster was produced), ``cot_no_prompting`` and
        ``cot_no_prompting_confidence``.
    """
    results = {}
    cot, sc = _get_baselines(model_name)

    # 1. Chain of Thought
    results["chain_of_thought"] = cot.evaluate(user_prompt=question, system_prompt=system_prompt)

    # 2. Self Consistency
    sc_out = sc.evaluate(system_prompt=system_prompt, user_prompt=question)
    sc_clusters = sc_out["aggregated_result"]["clusters"]
    results["self_consistency"] = sc_clusters[0]["answer"] if sc_clusters else "N/A"

    # 3. CoT Decoding (no prompting)
    messages = [{"role": "user", "content": question}]
    cot_dec_out, confidence = cot_decode(
        model=model,
        tokenizer=tokenizer,
        messages=messages,
        k=5,
        max_new_tokens=256,
        aggregate_paths=True
    )
    results["cot_no_prompting"] = cot_dec_out
    results["cot_no_prompting_confidence"] = confidence

    return results

def save_results_to_excel(model, tokenizer, model_name: str, questions: list, out_path: str):
    """Run every question through all baselines and write the results to Excel.

    Args:
        model: Loaded causal LM forwarded to ``run_all_pipelines``.
        tokenizer: Tokenizer matching ``model``.
        model_name: Model identifier forwarded to ``run_all_pipelines``.
        questions: Iterable of question strings.
        out_path: Destination ``.xlsx`` path. Its parent directory is created
            when the path has one.

    The sheet has one row per question with the columns ``question``,
    ``chain_of_thought``, ``self_consistency``, ``cot_no_prompting`` and
    ``cot_no_prompting_confidence``.
    """
    columns = [
        "question",
        "chain_of_thought",
        "self_consistency",
        "cot_no_prompting",
        "cot_no_prompting_confidence",
    ]

    rows = []
    for q in questions:
        outputs = run_all_pipelines(model, tokenizer, model_name, q)
        row = {"question": q}
        row.update({name: outputs[name] for name in columns[1:]})
        rows.append(row)

    # Passing `columns` keeps the header row even when `questions` is empty.
    df = pd.DataFrame(rows, columns=columns)

    # ✅ Clean illegal Excel characters
    illegal_chars = re.compile(r'[\x00-\x08\x0B-\x0C\x0E-\x1F]')

    def _strip_illegal(value):
        return illegal_chars.sub('', value) if isinstance(value, str) else value

    # `DataFrame.applymap` is deprecated in favour of `DataFrame.map`; fall
    # back to it only on pandas versions that do not have `map` yet.
    elementwise = df.map if hasattr(df, "map") else df.applymap
    df = elementwise(_strip_illegal)

    # `os.makedirs('')` raises, so only create a directory when the path has one.
    out_dir = os.path.dirname(out_path)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
    df.to_excel(out_path, index=False)
    print(f"✅ Results saved to {out_path}")






In [9]:
!huggingface-cli login


    _|    _|  _|    _|    _|_|_|    _|_|_|  _|_|_|  _|      _|    _|_|_|      _|_|_|_|    _|_|      _|_|_|  _|_|_|_|
    _|    _|  _|    _|  _|        _|          _|    _|_|    _|  _|            _|        _|    _|  _|        _|
    _|_|_|_|  _|    _|  _|  _|_|  _|  _|_|    _|    _|  _|  _|  _|  _|_|      _|_|_|    _|_|_|_|  _|        _|_|_|
    _|    _|  _|    _|  _|    _|  _|    _|    _|    _|    _|_|  _|    _|      _|        _|    _|  _|        _|
    _|    _|    _|_|      _|_|_|    _|_|_|  _|_|_|  _|      _|    _|_|_|      _|        _|    _|    _|_|_|  _|_|_|_|

    To log in, `huggingface_hub` requires a token generated from https://huggingface.co/settings/tokens .
Enter your token (input will not be visible): 
Add token as git credential? (Y/n) n
Token is valid (permission: read).
The token `hf_token` has been saved to /root/.cache/huggingface/stored_tokens
Your token has been saved to /root/.cache/huggingface/token
Login successful.
The current active token is: `hf_token`


In [None]:
# --- Usage ---
# For each model: load the HF model + tokenizer once, then run every dataset
# through all baselines and write one Excel file per (model, dataset).

import gc
import warnings

import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
# Aliased so it does not shadow the stdlib `logging` module used in an earlier cell.
from transformers import logging as hf_logging

warnings.filterwarnings("ignore")
hf_logging.set_verbosity_error()   # only show errors


models = ['deepseek-ai/deepseek-llm-7b-chat','microsoft/Phi-3.5-mini-instruct','mistralai/Mistral-7B-Instruct-v0.3']

model = tokenizer = None
for MODEL_NAME in models:
  print(MODEL_NAME)

  # Drop the previous model before loading the next one; otherwise both are
  # referenced at the same time while `from_pretrained` runs.
  model = tokenizer = None
  gc.collect()
  if torch.cuda.is_available():
    torch.cuda.empty_cache()

  tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
  model = AutoModelForCausalLM.from_pretrained(MODEL_NAME, torch_dtype="auto", device_map="auto")

  for i in datasets:
    print(f'------------- starting {i}---------')
    x = run_datasets(i)

    # NOTE(review): the directory is named after the part of MODEL_NAME before
    # '/', so two models from the same organisation would write to the same
    # files. Confirm this is intended.
    file_path = f"outputs/{MODEL_NAME.split('/')[0]}/{i}_baselines.xlsx"

    questions = x['question']
    save_results_to_excel(model, tokenizer, MODEL_NAME, questions, file_path)
