In [None]:
!pip install datasets
!pip install textstat
# !pip install bitsandbytes

Collecting datasets
  Downloading datasets-3.5.1-py3-none-any.whl.metadata (19 kB)
Collecting dill<0.3.9,>=0.3.0 (from datasets)
  Downloading dill-0.3.8-py3-none-any.whl.metadata (10 kB)
Collecting xxhash (from datasets)
  Downloading xxhash-3.5.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (12 kB)
Collecting multiprocess<0.70.17 (from datasets)
  Downloading multiprocess-0.70.16-py311-none-any.whl.metadata (7.2 kB)
Collecting fsspec<=2025.3.0,>=2023.1.0 (from fsspec[http]<=2025.3.0,>=2023.1.0->datasets)
  Downloading fsspec-2025.3.0-py3-none-any.whl.metadata (11 kB)
Downloading datasets-3.5.1-py3-none-any.whl (491 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m491.4/491.4 kB[0m [31m8.1 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading dill-0.3.8-py3-none-any.whl (116 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m116.3/116.3 kB[0m [31m6.3 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading fsspec-2025.3.0-py3-none-any.whl (1

In [None]:
from huggingface_hub import hf_hub_download
# from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig, pipeline
from datasets import load_dataset
import re
import numpy as np
from tqdm import tqdm
import torch
import nltk
import textstat
import random
import pickle
# import torch.nn.functional as F
# import torch.optim as optim
# import torch.nn as nn
import spacy
from collections import defaultdict
import pandas as pd

In [None]:
!python -m spacy download en_core_web_lg

Collecting en-core-web-lg==3.8.0
  Downloading https://github.com/explosion/spacy-models/releases/download/en_core_web_lg-3.8.0/en_core_web_lg-3.8.0-py3-none-any.whl (400.7 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m400.7/400.7 MB[0m [31m3.9 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: en-core-web-lg
Successfully installed en-core-web-lg-3.8.0
[38;5;2m✔ Download and installation successful[0m
You can now load the package via spacy.load('en_core_web_lg')
[38;5;3m⚠ Restart to reload dependencies[0m
If you are in a Jupyter or Colab notebook, you may need to restart Python in
order to load all the package's dependencies. You can do this by selecting the
'Restart kernel' or 'Restart runtime' option.


In [None]:
# Load the large English spaCy pipeline once at module level;
# Stats_Helpers.__init__ parses every question with this shared `nlp` object.
nlp = spacy.load("en_core_web_lg")

In [None]:
# Download NLTK's 'punkt_tab' tokenizer data.
# NOTE(review): no code visible in this notebook calls nltk directly — presumably
# a dependency (e.g. textstat) needs it; confirm before removing.
nltk.download('punkt_tab')

[nltk_data] Downloading package punkt_tab to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt_tab.zip.


True

In [None]:
def get_dataset():
    """Load the GSM8K "main" configuration via ``load_dataset``.

    Returns:
        tuple: ``(train_dataset, test_dataset)`` — the ``'train'`` and
        ``'test'`` splits, in that order.
    """
    train_dataset, test_dataset = (
        load_dataset("openai/gsm8k", "main", split=split_name)
        for split_name in ('train', 'test')
    )
    return train_dataset, test_dataset

In [None]:
class Stats_Helpers():
    """Feature extractors for a single math word problem.

    The question is parsed once with the module-level spaCy pipeline ``nlp``;
    each ``get_*`` method then derives one family of features from either the
    raw text (``self.question``) or the parse (``self.doc``).

    Keyword, stopword and unit matching are all case-insensitive, so the
    question should be passed with its ORIGINAL capitalisation: that is what
    the spaCy tagger/parser/NER components work best on.
    """

    # Read-only lookup data lives on the class so it is built once rather than
    # once per question.
    keywords = ('sum', 'total', 'difference', 'product', 'per')
    stopwords = frozenset([
        'the', 'is', 'in', 'at', 'of', 'a', 'an', 'and', 'to', 'for', 'on',
        'with', 'as', 'by', 'that', 'from', 'it', 'this', 'be', 'or', 'are',
        'was', 'were', 'but', 'not', 'have', 'has', 'had', 'if', 'then', 'so'
    ])
    unit_pattern = re.compile(r'\b(k|kg|km|m|cm|mm|ml|l|lb|oz|g|mg|hr|min|sec|miles?|feet|inches?)\b', re.IGNORECASE)

    # NOTE: '|' binds loosest, so the leading word boundary and the 4-digit
    # look-ahead guard only the comma-grouped alternative, and the trailing
    # word boundary only the plain alternative. In practice plain 4-digit
    # numbers (e.g. 1500) ARE captured, which is what a math problem needs;
    # the pattern is deliberately left unchanged so extracted values stay the same.
    _number_pattern = re.compile(r'''
        \b              # Word boundary
        (?!\d{4}\b)     # Exclude 4-digit standalone numbers (likely years)
        \d{1,3}         # 1-3 digits
        (?:,\d{3})+     # Thousands separators (e.g., 1,000)
        (?:\.\d+)?      # Optional decimal portion
        |
        \d+             # Standard integers
        (?:\.\d+)?      # Decimals without commas
        \b              # Word boundary
    ''', re.VERBOSE)

    def __init__(self, question):
        """Store the question text and parse it with the shared spaCy pipeline."""
        self.question = question
        self.doc = nlp(question)

    def get_numeric_tokens(self):
        """Return the distinct numeric values in the question as a sorted list of floats.

        Thousands separators are stripped and values are de-duplicated after
        conversion, so ``5``, ``5.0`` and ``5.00`` count once.
        """
        matches = self._number_pattern.findall(self.question)
        # Every match consists of digits, commas and at most one '.', so
        # float() cannot fail once the commas are removed.
        return sorted({float(match.replace(',', '')) for match in matches})

    def get_keyword_frequencies(self):
        """Count whole-word, case-insensitive occurrences of each math keyword.

        Returns:
            dict: ``{'keyword-<word>': count}`` for every entry of ``self.keywords``.
        """
        return {
            f'keyword-{keyword}': len(
                re.findall(rf'\b{re.escape(keyword)}\b', self.question, re.IGNORECASE)
            )
            for keyword in self.keywords
        }

    def get_stopword_ratio(self):
        """Return the fraction of word tokens that are stopwords, rounded to 4 places."""
        words = re.findall(r'\b\w+\b', self.question)
        if not words:
            return 0.0
        # Lower-case each token so capitalised stopwords ("The") are counted too.
        stop_count = sum(1 for word in words if word.lower() in self.stopwords)
        return round(stop_count / len(words), 4)

    def get_pos_distribution(self):
        """Return the share of NOUN / VERB / NUM / ADJ tokens (spaCy universal tags)."""
        pos_counts = defaultdict(int)
        for token in self.doc:
            pos_counts[token.pos_] += 1

        total = len(self.doc)
        tracked = (
            ('prop_noun', 'NOUN'),
            ('prop_verb', 'VERB'),
            ('prop_num', 'NUM'),
            ('prop_adj', 'ADJ'),
        )
        return {name: (pos_counts[tag] / total if total else 0) for name, tag in tracked}

    def get_dependency_metrics(self):
        """Return syntactic-complexity metrics derived from the dependency parse.

        Returns:
            dict: ``max_tree_depth`` (a root token has depth 1; 0 for an empty
            document) and ``avg_dep_distance`` (mean absolute index distance
            between each non-root token and its head; 0 when there is none).
        """
        def depth_of(token):
            # Walk up the head chain until the root, which is its own head.
            depth = 1
            while token.head != token:
                token = token.head
                depth += 1
            return depth

        depths = [depth_of(token) for token in self.doc]
        dep_distances = [
            abs(token.i - token.head.i) for token in self.doc if token.head != token
        ]

        return {
            # default=0 keeps an empty question from raising ValueError.
            'max_tree_depth': max(depths, default=0),
            'avg_dep_distance': sum(dep_distances) / len(dep_distances) if dep_distances else 0
        }

    def get_named_entities(self):
        """Count quantity-like entities, PERSON entities and unit mentions.

        Quantities and persons come from spaCy NER labels; units come from the
        ``unit_pattern`` regex applied to the document text.
        """
        entities = {
            'quantity_count': 0,
            'unit_count': 0,
            'person_count': 0,
        }

        for ent in self.doc.ents:
            if ent.label_ in ['QUANTITY', 'CARDINAL', 'MONEY']:
                entities['quantity_count'] += 1
            elif ent.label_ == 'PERSON':
                entities['person_count'] += 1

        entities['unit_count'] = len(self.unit_pattern.findall(self.doc.text))
        return entities


def get_problem_stats(problem):
    """Compute the full feature dictionary for one problem.

    Args:
        problem: any mapping with a ``'question'`` string (a dataset record or
            a DataFrame row both work).

    Returns:
        dict: readability, numeric, keyword, POS, dependency and entity
        features keyed by feature name.
    """
    original_question = problem['question']
    question = original_question.lower()

    # Parse the ORIGINAL casing: lowercasing the text first removes the
    # capitalisation cues the spaCy tagger and NER rely on (names stop being
    # recognised as PERSON). The helper's lexical lookups are case-insensitive.
    stat_helper = Stats_Helpers(original_question)

    stats = {}

    # readability metrics
    # Split after sentence-ending punctuation followed by whitespace, so that
    # decimals such as "3.5" are not broken apart and "?" / "!" end a sentence.
    sentences = [s.strip() for s in re.split(r'(?<=[.!?])\s+', question) if s.strip()]
    words = question.split()
    stats['word_count'] = len(words)
    stats['sentence_count'] = len(sentences)
    # Guard the empty case: np.mean([]) would return NaN and emit a warning.
    stats['avg_sentence_length'] = (
        float(np.mean([len(s.split()) for s in sentences])) if sentences else 0.0
    )
    stats['avg_word_length'] = sum(len(word) for word in words) / len(words) if words else 0
    stats['flesch_reading_ease'] = textstat.flesch_reading_ease(question)   # 0 to 100, higher is more readable

    # numerical metrics
    numeric_values = stat_helper.get_numeric_tokens()
    stats.update({
        'num_numeric_values': len(numeric_values),
        'min_value': min(numeric_values) if numeric_values else 0,
        'mean_value': np.mean(numeric_values) if numeric_values else 0,
        'max_value': max(numeric_values) if numeric_values else 0
    })
    # NOTE: this counts every '+', '-', '*', '/', '^' character, so hyphens in
    # words such as "120-page" are included.
    stats['num_operations'] = len(re.findall(r'[+\-*/^]', question))

    # keyword metrics
    stats.update(stat_helper.get_keyword_frequencies())
    stats['stopword_ratio'] = stat_helper.get_stopword_ratio()

    # pos / dependency metrics
    stats.update(stat_helper.get_pos_distribution())
    stats.update(stat_helper.get_dependency_metrics())

    # semantic metrics
    stats.update(stat_helper.get_named_entities())

    return stats

In [None]:
def get_model(model_name):
    """Load a 4-bit quantized causal LM and its tokenizer.

    Args:
        model_name: ``"wizardmath"`` or ``"phi2"``.

    Returns:
        dict: ``{'model', 'model_name', 'tokenizer', 'cost_per_token'}``.

    Raises:
        ValueError: if ``model_name`` is not one of the supported names
            (previously this fell through and returned ``None``).

    Note:
        The quantization config is built with ``BitsAndBytesConfig``, so the
        ``bitsandbytes`` package must be installed for loading to succeed.
    """
    model_specs = {
        "wizardmath": {
            'repo_id': "WizardLM/WizardMath-7B-V1.1",
            'cost_per_token': 0.7,
            'extra_kwargs': {},
        },
        "phi2": {
            'repo_id': "microsoft/phi-2",
            'cost_per_token': 0.13,  # Lower cost since it's a smaller model
            # SECURITY: trust_remote_code executes Python shipped in the model repo.
            'extra_kwargs': {'trust_remote_code': True},
        },
    }
    # Validate before importing/allocating anything heavy.
    if model_name not in model_specs:
        raise ValueError(
            f"Unknown model_name {model_name!r}; expected one of {sorted(model_specs)}"
        )
    spec = model_specs[model_name]

    # Imported here because the notebook's top-level transformers import for
    # these names is commented out; without this the function raised NameError.
    from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig

    quantization_config = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_compute_dtype=torch.float16,  # Match the float16 model dtype below
        bnb_4bit_quant_type="nf4",
        bnb_4bit_use_double_quant=True
    )

    tokenizer = AutoTokenizer.from_pretrained(spec['repo_id'], **spec['extra_kwargs'])
    model = AutoModelForCausalLM.from_pretrained(
        spec['repo_id'],
        quantization_config=quantization_config,
        device_map={"": 0},  # place the whole model on device 0
        torch_dtype=torch.float16,
        **spec['extra_kwargs'],
    )
    return {
        'model': model,
        'model_name': model_name,
        'tokenizer': tokenizer,
        'cost_per_token': spec['cost_per_token'],
    }

In [None]:
def extract_answer(answer_text):
    """Return the final answer that follows ``####`` in a GSM8K solution.

    Handles an optional leading minus sign, thousands separators and a decimal
    part; separators are removed from the result (``"#### 1,250"`` -> ``"1250"``).

    Args:
        answer_text: the reference solution text.

    Returns:
        str | None: the number as a string, or ``None`` when no ``####`` answer
        is present.
    """
    match = re.search(
        r'####\s*(-?(?:\d{1,3}(?:,\d{3})+|\d+)(?:\.\d+)?)',
        answer_text,
    )
    if match is None:
        return None
    return match.group(1).replace(',', '')

In [None]:
# One number: optional minus (only when it is not glued to a preceding word
# character or ')', so "x-5" reads as 5), then either a comma-grouped or a
# plain integer, then an optional decimal part.
_ANSWER_NUMBER = r'(?:(?<![\w)])-)?(?:\d{1,3}(?:,\d{3})+|\d+)(?:\.\d+)?'


def _extract_numeric_answer(model_response):
    """Find the final numeric answer in a model's free-text solution.

    Tries, in order: a ``#### <number>`` line, a "final answer" / "the answer
    is" phrase, and finally the first number on one of the last four lines
    (scanned bottom-up, skipping blank lines and lines mentioning "step" or
    "explanation"). Thousands separators are removed from the result.

    Returns:
        str | None: the number as a string, or ``None`` if nothing matched.
    """
    searches = (
        # Phi-2 style: "#### 42" (a currency symbol before the number is tolerated).
        (r'####\s*\$?\s*(' + _ANSWER_NUMBER + r')', model_response),
        # WizardMath style: "... the answer is 42".
        (r'(?:final answer|the answer is)[^0-9]*?(' + _ANSWER_NUMBER + r')',
         model_response.lower()),
    )
    for pattern, text in searches:
        found = re.search(pattern, text)
        if found:
            return found.group(1).replace(',', '')

    # Slicing (instead of an index range with an exclusive lower bound) makes
    # sure the first line is inspected when the response is short.
    for line in reversed(model_response.split('\n')[-4:]):
        lowered = line.lower()
        if not line.strip() or 'step' in lowered or 'explanation' in lowered:
            continue
        found = re.search(_ANSWER_NUMBER, line)
        if found:
            return found.group(0).replace(',', '')

    return None


def process_problem(problem, model_index, models):
    """Prompt one model with a problem and extract its numeric answer.

    Args:
        problem: mapping with a ``'question'`` string.
        model_index: index into ``models``.
        models: list of dicts, each holding at least ``'model'`` and ``'tokenizer'``.

    Returns:
        dict: ``'prompt'`` (the text sent to the model), ``'response'`` (the
        decoded output with the echoed prompt removed when it can be located)
        and ``'answer'`` (numeric string, or ``None`` if none could be extracted).
    """
    question_marker = f"NOW SOLVE THE PROBLEM CORRECTLY: {problem['question']}"
    prompt = f"""

Follow these instructions:
1. Work through the problem step by step
2. Calculate the numerical answer
3. On the last line, write ONLY: #### <numerical answer>. Do not add any units like "kg" or "m", or any currency symbols like "$".
4. Do not write anything after the final answer

-------------------
EXAMPLE FORMAT:
Step 1: [explanation]
Step 2: [explanation]
Final calculation: [calculation]
#### [numerical answer]
-------------------

{question_marker}
"""
    entry = models[model_index]
    model_obj = entry['model']
    tokenizer = entry['tokenizer']

    inputs = tokenizer(prompt, return_tensors="pt").to(model_obj.device)
    outputs = model_obj.generate(
        inputs.input_ids,
        max_new_tokens=1024,
        temperature=0.1,
        do_sample=True,
        attention_mask=inputs.attention_mask,
    )
    full_output = tokenizer.decode(outputs[0], skip_special_tokens=True)

    # The decoded text echoes the prompt; keep only what follows the question.
    marker_start = full_output.find(question_marker)
    if marker_start != -1:
        model_response = full_output[marker_start + len(question_marker):].strip()
    else:
        # Fallback if the exact prompt ending cannot be found.
        model_response = full_output

    # 'response' is the prompt-stripped text on every path (including the
    # no-answer path), so token/cost accounting does not count the prompt twice.
    return {
        'prompt': prompt,
        'response': model_response,
        'answer': _extract_numeric_answer(model_response)
    }


In [None]:
# get_dataset() returns (train, test); expose both splits under named keys.
temp_set = get_dataset()
gsm8k_dataset = dict(zip(('train', 'test'), temp_set))
# models = [get_model('phi2'), get_model('wizardmath')]

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


README.md:   0%|          | 0.00/7.94k [00:00<?, ?B/s]

train-00000-of-00001.parquet:   0%|          | 0.00/2.31M [00:00<?, ?B/s]

test-00000-of-00001.parquet:   0%|          | 0.00/419k [00:00<?, ?B/s]

Generating train split:   0%|          | 0/7473 [00:00<?, ? examples/s]

Generating test split:   0%|          | 0/1319 [00:00<?, ? examples/s]

In [None]:
def calculate_cost(prediction_data, model):
    """Price one prediction from its prompt and response token counts.

    Args:
        prediction_data: dict with ``'prompt'`` and ``'response'`` strings.
        model: dict with a ``'tokenizer'`` and a ``'cost_per_token'`` rate.

    Returns:
        Total number of tokens (prompt + response) multiplied by the rate.
    """
    encode = model['tokenizer'].encode
    rate = model['cost_per_token']

    # Prompt and response are tokenized separately and their lengths summed.
    token_total = sum(
        encode(prediction_data[field], return_tensors='pt').shape[1]
        for field in ('prompt', 'response')
    )
    return token_total * rate

# Logistic Regression

In [None]:
# features = ['word_count', 'sentence_count', 'avg_sentence_length', 'avg_word_length',
#             'flesch_reading_ease', 'num_numeric_values', 'min_value', 'mean_value',
#             'max_value', 'stopword_ratio', 'prop_noun', 'prop_verb', 'prop_num',
#             'prop_adj', 'max_tree_depth', 'avg_dep_distance', 'quantity_count',
#             'unit_count', 'person_count', 'keyword-sum',
#             'keyword-total', 'keyword-difference', 'keyword-per',
#             'keyword-product', 'num_operations', 'label']

# Columns of the feature table: the selected predictors plus the target 'label'.
features = [
    'flesch_reading_ease',
    'num_numeric_values',
    'stopword_ratio',
    'max_tree_depth',
    'avg_dep_distance',
    'quantity_count',
    'unit_count',
    'person_count',
    'num_operations',
    'label',
]

# Start from an empty frame that already carries those columns.
df = pd.DataFrame(columns=features)

# for problem in gsm8k_dataset['train']:
#     stats = get_problem_stats(problem)
#     df = pd.concat([df, pd.DataFrame([stats])], ignore_index=True)

In [None]:
# Load the saved phi-2 predictions. The rendered frame below shows 'question',
# 'answer' and 'is_correct' columns plus an 'Unnamed: 0' column.
# NOTE(review): 'Unnamed: 0' looks like a previously saved index — confirm, and
# consider reading with index_col=0.
phi2_df = pd.read_csv('phi2_preds_dataset.csv')

In [None]:
phi2_df

Unnamed: 0,question,answer,is_correct
0,Natalia sold clips to 48 of her friends in Apr...,Natalia sold 48/2 = <<48/2=24>>24 clips in May...,True
1,Weng earns $12 an hour for babysitting. Yester...,Weng earns 12/60 = $<<12/60=0.2>>0.2 per minut...,False
2,Betty is saving money for a new wallet which c...,"In the beginning, Betty has only 100 / 2 = $<<...",True
3,"Julie is reading a 120-page book. Yesterday, s...",Maila read 12 x 2 = <<12*2=24>>24 pages today....,False
4,James writes a 3-page letter to 2 different fr...,He writes each friend 3*2=<<3*2=6>>6 pages a w...,True
...,...,...,...
295,Pam has some bags of apples. Each of her bags ...,Each of Pam's bags contain 40*3=<<40*3=120>>12...,False
296,Chelsea has 24 kilos of sugar. She divides the...,Each bag has 24/4=<<24/4=6>>6 kilos of sugar.\...,False
297,Bert fills out the daily crossword puzzle in t...,"At 7 days a week, it takes Bert 2 * 7 = <<2*7=...",False
298,There were sweets on the table. Jack came and ...,"Jack took 4 more candies than the half, and Pa...",True


In [None]:
# Build the feature table in a single pass. Rows are collected as plain dicts
# and turned into a DataFrame once: this avoids the quadratic cost of
# pd.concat inside the loop, keeps each label attached to its own row, and
# makes the cell idempotent (re-running it no longer appends duplicate rows).
feature_columns = [name for name in features if name != 'label']
records = []
for _, row in phi2_df.iterrows():
    stats = get_problem_stats(row)
    # Keep only the stats that are part of the selected feature set.
    record = {name: stats[name] for name in feature_columns if name in stats}
    record['label'] = 1 if row['is_correct'] else 0
    records.append(record)

df = pd.DataFrame(records, columns=features)
df['label'] = pd.to_numeric(df['label'])

In [None]:
df.head()

In [None]:
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

# Separate predictors from the target.
X = df.drop('label', axis=1)
y = df['label']

# 80/20 hold-out split with a fixed seed.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Flag any feature whose absolute correlation with an earlier feature exceeds
# 0.8; only the strict upper triangle is inspected so each pair counts once.
corr_matrix = X_train.corr().abs()
upper_triangle = np.triu(np.ones(corr_matrix.shape), k=1).astype(bool)
high_corr = corr_matrix.where(upper_triangle) > 0.8
to_drop = [column for column in high_corr.columns if high_corr[column].any()]
X_train = X_train.drop(columns=to_drop)
X_test = X_test.drop(columns=to_drop)

# Standardize using statistics from the training portion only.
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

model = LogisticRegression(max_iter=1000)
model.fit(X_train, y_train)

In [None]:
# Score the fitted classifier on the held-out split.
y_pred = model.predict(X_test)
accuracy = (y_pred == y_test).mean()
print(f"Accuracy: {accuracy}")

In [None]:
# Per-class probabilities for every held-out row (one column per class).
probabilities = model.predict_proba(X_test)

# Extract confidence for positive class (label=1)
# NOTE(review): column 1 is taken to be label 1 — this assumes the classifier's
# classes are ordered [0, 1]; confirm via model.classes_.
positive_class_confidence = probabilities[:, 1]
print(positive_class_confidence)

In [None]:
# Side-by-side view: predicted label, true label, positive-class confidence.
for predicted, actual, confidence in zip(y_pred, y_test, positive_class_confidence):
    print(predicted, actual, confidence)

In [None]:
# Derive the text-classification frame from the predictions: drop the
# reference 'answer' column and rename 'question' to 'text'
# (drop/rename return new frames, so phi2_df itself is left untouched).
t_df = (
    phi2_df
    .drop('answer', axis=1)
    .rename(columns={'question': 'text'})
)
# Encode is_correct as 0 / 1.
t_df['is_correct'] = t_df['is_correct'].apply(lambda x: 1 if x else 0)

In [None]:
from sklearn.model_selection import train_test_split
from transformers import DistilBertTokenizerFast, DistilBertForSequenceClassification, Trainer, TrainingArguments
from datasets import Dataset
import torch
import numpy as np
from sklearn.metrics import accuracy_score, f1_score

# 1. Prepare datasets: the classifier needs a 'text' column and a 0/1 'labels' column.
# The frame built above carries 'text' and 'is_correct'; the older column names
# ('problem', 'model correctness') are still accepted. rename() ignores keys that
# are absent, so this is safe to re-run.
t_df = t_df.rename(columns={
    'question': 'text',
    'problem': 'text',
    'is_correct': 'labels',
    'model correctness': 'labels',  # Ensure this is 0/1
})

# Fail early with a clear message instead of a KeyError further down.
missing_columns = {'text', 'labels'} - set(t_df.columns)
if missing_columns:
    raise KeyError(f"t_df is missing required column(s): {sorted(missing_columns)}")

train_df, val_df = train_test_split(t_df, test_size=0.2, random_state=42)

# 2. Convert to Dataset WITHOUT pandas index
train_dataset = Dataset.from_pandas(train_df[['text', 'labels']], preserve_index=False)
val_dataset = Dataset.from_pandas(val_df[['text', 'labels']], preserve_index=False)

# 3. Tokenize both splits with dataset.map()
tokenizer = DistilBertTokenizerFast.from_pretrained('distilbert-base-uncased')

def tokenize_function(examples):
    """Tokenize a batch's "text" entries with truncation and 'max_length' padding."""
    return tokenizer(examples["text"], padding='max_length', truncation=True)

# The raw 'text' column is removed after tokenization in both splits.
tokenized_train, tokenized_val = (
    split.map(tokenize_function, batched=True, remove_columns=['text'])
    for split in (train_dataset, val_dataset)
)

# 4. Metrics reported at every evaluation
def compute_metrics(p):
    """Return accuracy and F1 for an evaluation result.

    ``p.predictions`` holds the per-class scores and ``p.label_ids`` the true
    labels; the predicted class is the arg-max over axis 1.
    """
    predicted_labels = np.argmax(p.predictions, axis=1)
    true_labels = p.label_ids
    return {
        'accuracy': accuracy_score(true_labels, predicted_labels),
        'f1': f1_score(true_labels, predicted_labels)
    }

# 5. Initialize model and training configuration
model = DistilBertForSequenceClassification.from_pretrained(
    'distilbert-base-uncased',
    num_labels=2
)

training_args = TrainingArguments(
    output_dir='./results',
    learning_rate=1e-5,  # Reduced from 2e-5
    per_device_train_batch_size=4,  # Smaller batches
    num_train_epochs=3,  # Fewer epochs
    weight_decay=0.01,  # Regularization
    eval_strategy='steps',
    eval_steps=50,  # More frequent checks
    # Log the training loss on the same cadence as evaluation: the default
    # logging interval (500 steps) is longer than this whole run, which is why
    # the results table showed "No log" in the Training Loss column.
    logging_steps=50,
    report_to='none',
)

from sklearn.utils.class_weight import compute_class_weight
import torch
import torch.nn as nn

# 'balanced' class weights computed from the training labels, stored as a
# float32 tensor for use in the weighted loss.
class_labels = np.unique(train_df['labels'])
class_weights = torch.tensor(
    compute_class_weight('balanced', classes=class_labels, y=train_df['labels']),
    dtype=torch.float32,
)

class CustomTrainer(Trainer):
    """Trainer whose loss is class-weighted cross-entropy.

    The weights are read from the module-level ``class_weights`` tensor.
    """

    def compute_loss(self, model, inputs, return_outputs=False, **kwargs):
        """Compute weighted cross-entropy over the model's logits.

        Args:
            model: the model being trained.
            inputs: batch dict; its ``"labels"`` entry is removed before the
                forward pass and used as the loss target.
            return_outputs: when true, return ``(loss, outputs)``.
            **kwargs: extra arguments passed by the Trainer; ignored here.
        """
        labels = inputs.pop("labels")
        outputs = model(**inputs)
        logits = outputs.logits

        # Weights must live on the same device as the logits.
        loss_fct = nn.CrossEntropyLoss(weight=class_weights.to(logits.device))

        # Use the logits' own last dimension rather than a hard-coded class
        # count, so the loss keeps working if num_labels changes.
        loss = loss_fct(logits.view(-1, logits.size(-1)), labels.view(-1))
        return (loss, outputs) if return_outputs else loss


# Wire the model, configuration, tokenized splits and metrics into the trainer.
trainer_kwargs = dict(
    model=model,
    args=training_args,
    train_dataset=tokenized_train,
    eval_dataset=tokenized_val,
    compute_metrics=compute_metrics,
)
trainer = CustomTrainer(**trainer_kwargs)

# 6. Fine-tune (evaluation runs periodically as configured in training_args)
trainer.train()

# 7. Final evaluation on the validation split
val_results = trainer.evaluate()
print(f"Validation Accuracy: {val_results['eval_accuracy']:.2f}")
print(f"Validation F1-score: {val_results['eval_f1']:.2f}")


Map:   0%|          | 0/240 [00:00<?, ? examples/s]

Map:   0%|          | 0/60 [00:00<?, ? examples/s]

Some weights of DistilBertForSequenceClassification were not initialized from the model checkpoint at distilbert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight', 'pre_classifier.bias', 'pre_classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Step,Training Loss,Validation Loss,Accuracy,F1
50,No log,0.697627,0.333333,0.393939
100,No log,0.685535,0.666667,0.166667
150,No log,0.682955,0.683333,0.095238


Validation Accuracy: 0.62
Validation F1-score: 0.15
