# Imports

In [None]:
import os
import re
import numpy as np
import pandas as pd
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import f1_score
from transformers import (
    AutoTokenizer, 
    AutoModelForSequenceClassification,
    TrainingArguments,
    Trainer
)
import torch
from torch.utils.data import Dataset
from scipy.stats import mode

# Config

In [None]:
# Checkpoint identifier handed to the tokenizer/model `from_pretrained` calls.
MODEL_NAME = "tbs17/MathBERT"  
# Token length used as `max_length` when the dataset tokenizes a question
# (inputs are padded/truncated to this length).
MAX_LENGTH = 256
# Batch size setting; same value as the per-device train batch size used in
# the training cell further down.
BATCH_SIZE = 16
# Seed setting; same value as the seed given to the training arguments.
SEED = 42

# EDA

In [None]:
# Read the competition's train and test CSV files from the Kaggle input folder.
train_df = pd.read_csv('/kaggle/input/classification-of-math-problems-by-kasut-academy/train.csv')
test_df = pd.read_csv('/kaggle/input/classification-of-math-problems-by-kasut-academy/test.csv')
# Number of training rows per label (last expression, so it is the cell output).
train_df['label'].value_counts()

In [None]:
train_df.head()

In [None]:
train_df['Question']

# Augmentation

In [None]:
!pip install textattack

In [None]:
%%time
import random
import re
from textattack.augmentation import Augmenter
from textattack.transformations import (
    WordSwapRandomCharacterDeletion,
    WordSwapChangeLocation
)
from textattack.transformations import CompositeTransformation

class MathAugmenter:
    """Augment question text while keeping inline ``$...$`` math untouched.

    Every ``$...$`` span is swapped for an ``EQUATION_<i>`` placeholder before
    the text is handed to the TextAttack augmenter, and swapped back
    afterwards, so the perturbations only act on the surrounding words.
    """

    # Inline math delimited by dollar signs; DOTALL lets a span cross lines.
    _EQUATION_RE = re.compile(r'\$(.*?)\$', re.DOTALL)
    # A placeholder token together with the index of the equation it stands for.
    _PLACEHOLDER_RE = re.compile(r'EQUATION_(\d+)')

    def __init__(self):
        # Kept for compatibility; not read anywhere in this class.
        self.num_augments = 2
        transformation = CompositeTransformation([
            WordSwapRandomCharacterDeletion(random_one=True),
            WordSwapChangeLocation()
        ])
        self.augmenter = Augmenter(
            transformation=transformation,
            transformations_per_example=1  # Generate 1 augmented version per call
        )

    def augment_math_problem(self, text):
        """Return augmented variants of ``text`` with the equations restored.

        Args:
            text: Question text, possibly containing ``$...$`` spans.

        Returns:
            list: The augmented strings produced by ``self.augmenter``. If
            anything raises along the way, the failure is printed and
            ``[text]`` (the unmodified input) is returned instead.
        """
        try:
            equations = self._EQUATION_RE.findall(text)

            # Hand out placeholders in match order; the iterator avoids the
            # quadratic cost of repeatedly popping from the front of a list.
            placeholders = iter(
                [f' EQUATION_{i} ' for i in range(len(equations))]
            )
            template = self._EQUATION_RE.sub(
                lambda m: next(placeholders), text
            )

            augmented_texts = self.augmenter.augment(template)

            def restore(match):
                # Substituting whole tokens in a single pass keeps
                # 'EQUATION_1' from clobbering the prefix of 'EQUATION_10'
                # and never re-scans text that was already restored.
                index = int(match.group(1))
                if index < len(equations):
                    return f'${equations[index]}$'
                return match.group(0)  # not one of ours: leave untouched

            return [
                self._PLACEHOLDER_RE.sub(restore, aug_text)
                for aug_text in augmented_texts
            ]

        except Exception as e:
            # Deliberate best-effort: never let one bad sample stop the run.
            # str() keeps the message itself from raising on non-string input.
            print(f"Augmentation failed for text: {str(text)[:50]}... | Error: {e}")
            return [text]  # Return original as fallback

# Oversample the labels listed in `minority_classes` (6 and 7) by appending
# augmented copies of their questions to the training frame.
minority_classes = [6, 7]
augmenter = MathAugmenter()

for class_id in minority_classes:
    # Questions that currently carry this label
    class_samples = train_df.loc[train_df['label'] == class_id, 'Question'].tolist()

    # At most three augmented variants are kept per source question
    augmented_samples = [
        variant
        for question in class_samples
        for variant in augmenter.augment_math_problem(question)[:3]
    ]

    # New rows inherit the label of the questions they were derived from
    new_rows = pd.DataFrame({
        'Question': augmented_samples,
        'label': [class_id] * len(augmented_samples),
    })
    train_df = pd.concat([train_df, new_rows], ignore_index=True)
# Label counts after augmentation (cell output)
train_df['label'].value_counts()

In [None]:
train_df[train_df['label'] == 7]['Question']

# Data Processing

In [None]:
def load_data(train_df, test_df):
    """Return preprocessed copies of the train and test frames.

    Each frame's ``Question`` column is replaced by a ``cleaned`` column in
    which every ``$...$`` span is wrapped in ``[MATH]`` markers and every
    LaTeX command (a backslash followed by word characters) is padded with
    spaces. The input frames are not modified.

    Args:
        train_df: DataFrame with a ``Question`` column of strings.
        test_df: DataFrame with a ``Question`` column of strings.

    Returns:
        tuple: ``(train, test)`` DataFrames holding the remaining original
        columns plus ``cleaned``.
    """
    # DOTALL lets an equation span several lines, matching how the augmenter
    # locates equations; without it a multi-line equation is skipped and the
    # following dollar signs get paired up wrongly.
    inline_math = re.compile(r'\$(.*?)\$', re.DOTALL)
    latex_command = re.compile(r'\\\w+')

    def clean_math_text(text):
        # Preserve mathematical notation
        text = inline_math.sub(r' [MATH] \1 [MATH] ', text)
        text = latex_command.sub(lambda m: ' ' + m.group(0) + ' ', text)
        return text.strip()

    def with_cleaned(frame):
        # assign/drop both return new frames, so the caller's data stays intact.
        cleaned = frame['Question'].apply(clean_math_text)
        return frame.assign(cleaned=cleaned).drop(columns=['Question'])

    return with_cleaned(train_df), with_cleaned(test_df)

class MathDataset(Dataset):
    """Map-style dataset that tokenizes one question per item on access.

    Args:
        texts: Indexable collection of question strings.
        labels: Indexable collection of integer class labels, aligned with
            ``texts``.
        tokenizer: Callable tokenizer; it is invoked with ``max_length``,
            ``padding='max_length'``, ``truncation=True`` and
            ``return_tensors='pt'`` and must return a mapping containing
            ``input_ids`` and ``attention_mask``.
        max_length: Sequence length to pad/truncate to. Defaults to the
            module-level ``MAX_LENGTH`` when omitted.
    """

    def __init__(self, texts, labels, tokenizer, max_length=None):
        self.texts = texts
        self.labels = labels
        self.tokenizer = tokenizer
        # Fall back to the notebook-wide setting only when no value is given.
        self.max_length = MAX_LENGTH if max_length is None else max_length

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        """Return ``input_ids``, ``attention_mask`` and ``labels`` for row ``idx``."""
        encoding = self.tokenizer(
            self.texts[idx],
            max_length=self.max_length,
            padding='max_length',
            truncation=True,
            return_tensors='pt'
        )
        return {
            # squeeze(0) removes only the leading batch dimension added by
            # return_tensors='pt'; a bare squeeze() would also collapse the
            # sequence dimension whenever it happens to have length 1.
            'input_ids': encoding['input_ids'].squeeze(0),
            'attention_mask': encoding['attention_mask'].squeeze(0),
            # Class indices are pinned to int64 regardless of how the labels
            # are stored.
            'labels': torch.tensor(self.labels[idx], dtype=torch.long)
        }

# Training and Submission

In [None]:
def _f1_micro_metric(eval_pred):
    """Return the micro-averaged F1 score for one evaluation pass.

    Args:
        eval_pred: Object exposing ``predictions`` (per-class scores) and
            ``label_ids`` (true labels), as passed to ``compute_metrics``.

    Returns:
        dict: ``{'f1_micro': <score>}``.
    """
    predicted = eval_pred.predictions.argmax(-1)
    return {
        'f1_micro': f1_score(eval_pred.label_ids, predicted, average='micro')
    }


def _majority_vote(fold_predictions):
    """Return, for every test row, the label predicted by the most folds.

    Args:
        fold_predictions: Sequence holding one 1-D array of predicted labels
            per fold, all of equal length.

    Returns:
        numpy.ndarray: 1-D integer array with one voted label per test row.
    """
    stacked = np.array(fold_predictions)  # rows are folds, columns are samples
    voted, _ = mode(stacked, axis=0)      # vote across folds
    return np.asarray(voted).flatten().astype(int)


def train_mathbert():
    """Fine-tune ``MODEL_NAME`` with stratified 3-fold CV and write a submission.

    Reads the module-level ``train_df`` / ``test_df`` frames, preprocesses
    them with ``load_data``, trains one fresh model per fold, predicts the
    test set with each of them and combines the fold predictions by majority
    vote. The result is written to ``submission.csv`` in the current working
    directory with the columns ``id`` and ``label``.

    Returns:
        None
    """
    import gc  # local import: only needed for the between-fold cleanup

    train, test = load_data(train_df, test_df)

    # Initialize tokenizer with math special tokens
    tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
    tokenizer.add_special_tokens({'additional_special_tokens': ['[MATH]']})

    # The test labels are placeholders; only the predictions are used.
    test_dataset = MathDataset(
        test['cleaned'].tolist(),
        [0] * len(test),
        tokenizer
    )

    # Cross-validation setup
    N_SPLITS = 3
    NUM_LABELS = 8
    # Mixed precision is requested only when a CUDA device is present, so the
    # function also runs on a CPU-only machine.
    use_fp16 = torch.cuda.is_available()
    skf = StratifiedKFold(n_splits=N_SPLITS)
    all_preds = []

    for fold, (train_idx, val_idx) in enumerate(skf.split(
        train['cleaned'], train['label']
    )):
        print(f"\nTraining Fold {fold+1}/{N_SPLITS}")

        # Fresh model per fold; the embedding matrix is resized to cover the
        # added [MATH] token.
        model = AutoModelForSequenceClassification.from_pretrained(
            MODEL_NAME,
            num_labels=NUM_LABELS,
            ignore_mismatched_sizes=True
        )
        model.resize_token_embeddings(len(tokenizer))

        # NOTE(review): newer transformers releases rename
        # `evaluation_strategy` to `eval_strategy` -- confirm against the
        # installed version.
        args = TrainingArguments(
            num_train_epochs=5,
            output_dir=f'./fold_{fold}',
            evaluation_strategy='epoch',
            save_strategy='epoch',
            save_total_limit=1,
            learning_rate=2e-5,
            per_device_train_batch_size=BATCH_SIZE,
            per_device_eval_batch_size=32,
            fp16=use_fp16,
            gradient_accumulation_steps=1,
            dataloader_pin_memory=True,
            dataloader_num_workers=2,
            logging_dir='./logs',
            logging_steps=100,
            report_to='none',
            warmup_ratio=0.1,
            weight_decay=0.01,
            seed=SEED,
            load_best_model_at_end=True,
            metric_for_best_model='f1_micro'
        )

        fold_train = train.iloc[train_idx]
        fold_val = train.iloc[val_idx]
        trainer = Trainer(
            model=model,
            args=args,
            train_dataset=MathDataset(
                fold_train['cleaned'].tolist(),
                fold_train['label'].values,
                tokenizer
            ),
            eval_dataset=MathDataset(
                fold_val['cleaned'].tolist(),
                fold_val['label'].values,
                tokenizer
            ),
            compute_metrics=_f1_micro_metric
        )

        # Training
        trainer.train()

        # Prediction
        fold_preds = trainer.predict(test_dataset).predictions.argmax(-1)
        all_preds.append(fold_preds)
        print(f"\nFold {fold+1} Predictions Sample:", fold_preds[:5])
        print("Class Distribution:", np.bincount(fold_preds))

        # The trainer keeps its own reference to the model (and optimizer
        # state), so both names must be dropped before the cached memory can
        # actually be released for the next fold.
        del trainer, model
        gc.collect()
        torch.cuda.empty_cache()

    # Ensemble predictions: majority vote across folds
    final_preds = _majority_vote(all_preds)

    # Create submission
    submission = pd.DataFrame({
        'id': test['id'].values,
        'label': final_preds
    })
    submission.to_csv('submission.csv', index=False)

In [None]:
#train_dummy, test_dummy = load_data(train_df, test_df)
#train_dummy

In [None]:
train_mathbert()

In [None]:
# Read the submission CSV back from the Kaggle working directory and show it
# as the cell output. NOTE(review): presumably the file written by
# train_mathbert(), which saves to a relative 'submission.csv' -- this matches
# only when the working directory is /kaggle/working.
submission = pd.read_csv('/kaggle/working/submission.csv')
submission