In [3]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader, random_split
from transformers import AutoTokenizer, AutoModel, get_scheduler
from sklearn.metrics import accuracy_score
from sklearn.model_selection import KFold
from tqdm.auto import tqdm
import re
import nltk
from nltk.corpus import wordnet
import random
nltk.download("wordnet")
nltk.download("omw-1.4")

[nltk_data] Downloading package wordnet to /usr/share/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package omw-1.4 to /usr/share/nltk_data...


True

In [4]:
import random
import numpy as np
import torch

SEED = 42  # ✅ 你可以修改为任意整数
def set_seed(seed=SEED):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)  # 多卡也能一致
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

set_seed()


In [5]:
# ========== Step 1. 加载数据 ==========
df_train = pd.read_csv("/kaggle/input/classification-of-math-problems-by-kasut-academy/train.csv")
# new_data = pd.read_csv("/content/train_augmented.csv")
# df = pd.concat([df_train, new_data]).reset_index(drop=True).sample(frac=1).reset_index(drop=True)
df_test = pd.read_csv("/kaggle/input/classification-of-math-problems-by-kasut-academy/test.csv")
df = df_train.copy()
print(df.columns)
print(df.head())
print(df.shape)

Index(['Question', 'label'], dtype='object')
                                            Question  label
0  A solitaire game is played as follows.  Six di...      3
1  2. The school table tennis championship was he...      5
2  Given that $x, y,$ and $z$ are real numbers th...      0
3  $25 \cdot 22$ Given three distinct points $P\l...      1
4  I am thinking of a five-digit number composed ...      5
(10189, 2)


In [6]:
import random
import pandas as pd
from nltk.corpus import wordnet
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader, random_split
import torch
from tqdm.auto import tqdm
from sklearn.metrics import accuracy_score

# ---------- 1. 定义增强函数 ----------
def synonym_replacement(text, n=2):
    words = text.split()
    new_words = words.copy()
    random_word_list = list(set([word for word in words if word.isalpha()]))
    random.shuffle(random_word_list)
    num_replaced = 0
    for word in random_word_list:
        synonyms = set()
        for syn in wordnet.synsets(word):
            for lemma in syn.lemmas():
                synonym = lemma.name().replace("_", " ").lower()
                if synonym != word and synonym.isalpha():
                    synonyms.add(synonym)
        if synonyms:
            synonym = random.choice(list(synonyms))
            new_words = [synonym if w == word else w for w in new_words]
            num_replaced += 1
        if num_replaced >= n:
            break
    return ' '.join(new_words)

def random_char_replacement(text):
    chars = list(text)
    indices = [i for i, c in enumerate(chars) if c.isalpha() or c.isdigit()]
    if not indices:
        return text
    idx = random.choice(indices)
    if chars[idx].isdigit():
        chars[idx] = random.choice('0123456789'.replace(chars[idx], ''))
    elif chars[idx].isalpha():
        replacement_pool = 'abcdefghijklmnopqrstuvwxyz'
        chars[idx] = random.choice(replacement_pool.replace(chars[idx].lower(), ''))
    return ''.join(chars)

def augment_dataframe_full(df):
    augmented_rows = []
    for _, row in df.iterrows():
        q = row["Question"]
        label = row["label"]

        # 原始 + 2种变换
        aug_list = [
            q,
            synonym_replacement(q, n=2),                     # 仅同义词替换
            random_char_replacement(q),                     # 仅字符扰动
        ]

        for aug_q in aug_list:
            augmented_rows.append({"Question": aug_q, "label": label})

    return pd.DataFrame(augmented_rows)


In [7]:
# ========== Step 2. 模型与参数设置 ==========
MODEL_NAME = "microsoft/deberta-v3-base"
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
BATCH_SIZE = 32
EPOCHS = 30
PATIENCE = 3
MAX_LEN = 128

In [8]:
from huggingface_hub import login

# 替换为你的 Huggingface Token
login("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")


In [9]:
# ========== Step 3. 定义 Dataset ==========
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)

class MathDataset(Dataset):
    def __init__(self, questions, labels=None):
        self.questions = ["Classify the topic of this math problem: " + q for q in questions]
        self.labels = labels

    def __len__(self):
        return len(self.questions)

    def __getitem__(self, idx):
        enc = tokenizer(self.questions[idx], padding='max_length', truncation=True, max_length=MAX_LEN, return_tensors="pt")
        item = {key: val.squeeze(0) for key, val in enc.items()}
        if self.labels is not None:
            item['labels'] = torch.tensor(self.labels[idx])
        return item

tokenizer_config.json:   0%|          | 0.00/52.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/579 [00:00<?, ?B/s]

spm.model:   0%|          | 0.00/2.46M [00:00<?, ?B/s]



In [11]:
class MathClassifier(nn.Module):
    def __init__(self):
        super().__init__()
        self.backbone = AutoModel.from_pretrained(MODEL_NAME)
        hidden_size = self.backbone.config.hidden_size
        self.fc = nn.Sequential(
            nn.Linear(hidden_size, hidden_size),
            nn.GELU(),
            nn.Dropout(0.3),
            nn.Linear(hidden_size, 8)
        )

    def forward(self, input_ids, attention_mask):
        out = self.backbone(input_ids=input_ids, attention_mask=attention_mask)
        cls = out.last_hidden_state[:, 0]  # [CLS] token
        logits = self.fc(cls)
        return logits


In [12]:
def clean_math_text_final(text):

    text = str(text)
    text = re.sub(r'^\s*\d+\.\s*', '', text)
    text = re.sub(r'https?://\S+|www\.\S+', ' ', text)
    text = re.sub(r'#\w+', ' ', text)
    emoji_pattern = re.compile("["
                           u"\U0001F600-\U0001F64F"
                           u"\U0001F300-\U0001F5FF"
                           u"\U0001F680-\U0001F6FF"
                           u"\U0001F1E0-\U0001F1FF"
                           u"\U00002702-\U000027B0"
                           u"\U000024C2-\U0001F251"
                           "]+", flags=re.UNICODE)
    text = emoji_pattern.sub(r' ', text)
    text = re.sub(r'\s+', ' ', text).strip().lower()

    return text

print("\n--- Applying Text Cleaning ---")
df['Question'] = df['Question'].apply(clean_math_text_final)
print("Cleaning done.")


--- Applying Text Cleaning ---
Cleaning done.


In [13]:
df.shape

(10189, 2)

In [14]:
# ---------- 2. 先划分再增强 ----------
train_df, val_df = train_test_split(df, test_size=0.1, random_state=42, stratify=df["label"])

# 只增强训练集
train_df_aug = augment_dataframe_full(train_df)
print(f"Train size after augmentation: {train_df_aug.shape}")
print(f"Validation size: {val_df.shape}")

# ---------- 3. 构建 Dataset ----------
dataset = MathDataset(train_df_aug["Question"].tolist(), train_df_aug["label"].tolist())
val_dataset = MathDataset(val_df["Question"].tolist(), val_df["label"].tolist())

train_loader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE)

Train size after augmentation: (27510, 2)
Validation size: (1019, 2)


In [18]:
# ---------- 4. 模型训练 ----------
model = MathClassifier().to(DEVICE)
optimizer = torch.optim.AdamW(model.parameters(), lr=4e-5)
loss_fn = torch.nn.CrossEntropyLoss()
scheduler = torch.optim.lr_scheduler.LambdaLR(optimizer, lr_lambda=lambda step: 1.0)

best_acc = 0
patience_counter = 0

for epoch in range(EPOCHS):
    model.train()
    total_loss, preds, trues = 0, [], []
    for batch in tqdm(train_loader, desc=f"Epoch {epoch+1} Train"):
        input_ids = batch['input_ids'].to(DEVICE)
        attention_mask = batch['attention_mask'].to(DEVICE)
        labels = batch['labels'].to(DEVICE).long()

        outputs = model(input_ids, attention_mask)
        loss = loss_fn(outputs, labels)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        scheduler.step()

        total_loss += loss.item()
        preds.extend(outputs.argmax(dim=1).cpu().tolist())
        trues.extend(labels.cpu().tolist())

    train_acc = accuracy_score(trues, preds)
    print(f"Epoch {epoch+1} Train Loss: {total_loss:.4f}, Acc: {train_acc:.4f}")

    model.eval()
    val_preds, val_trues = [], []
    with torch.no_grad():
        for batch in val_loader:
            input_ids = batch['input_ids'].to(DEVICE)
            attention_mask = batch['attention_mask'].to(DEVICE)
            labels = batch['labels'].to(DEVICE)
            outputs = model(input_ids, attention_mask)
            val_preds.extend(outputs.argmax(dim=1).cpu().tolist())
            val_trues.extend(labels.cpu().tolist())

    val_acc = accuracy_score(val_trues, val_preds)
    print(f"Epoch {epoch+1} Val Acc: {val_acc:.4f}")

    if val_acc > best_acc:
        best_acc = val_acc
        patience_counter = 0
        torch.save(model.state_dict(), "best_mathbert.pt")
    else:
        patience_counter += 1
        if patience_counter >= PATIENCE:
            print("Early stopping triggered.")
            break

Epoch 1 Train:   0%|          | 0/860 [00:00<?, ?it/s]

Epoch 1 Train Loss: 588.1787, Acc: 0.7755
Epoch 1 Val Acc: 0.8302


Epoch 2 Train:   0%|          | 0/860 [00:00<?, ?it/s]

Epoch 2 Train Loss: 269.2387, Acc: 0.9012
Epoch 2 Val Acc: 0.8420


Epoch 3 Train:   0%|          | 0/860 [00:00<?, ?it/s]

Epoch 3 Train Loss: 155.9744, Acc: 0.9428
Epoch 3 Val Acc: 0.8322


Epoch 4 Train:   0%|          | 0/860 [00:00<?, ?it/s]

Epoch 4 Train Loss: 96.2365, Acc: 0.9647
Epoch 4 Val Acc: 0.8342


Epoch 5 Train:   0%|          | 0/860 [00:00<?, ?it/s]

Epoch 5 Train Loss: 67.8975, Acc: 0.9753
Epoch 5 Val Acc: 0.8165
Early stopping triggered.


In [15]:
# ========== Step 8. 最终预测 ==========
print("🧠 Step 8: 使用增强模型预测 test.csv...")
model = MathClassifier().to(DEVICE)
model.load_state_dict(torch.load("/kaggle/working/best_mathbert.pt"))
model.eval()
df_test['Question'] = df_test['Question'].apply(clean_math_text_final)
test_dataset = MathDataset(df_test['Question'].tolist())
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE)

final_preds = []
with torch.no_grad():
    for batch in tqdm(test_loader, desc="Predicting"):
        input_ids = batch['input_ids'].to(DEVICE)
        attention_mask = batch['attention_mask'].to(DEVICE)
        outputs = model(input_ids, attention_mask)
        final_preds.extend(outputs.argmax(dim=1).cpu().tolist())

submission = pd.DataFrame({"id": df_test.index, "label": final_preds})
submission.to_csv("submission.csv", index=False)
print("✅ 提交文件已生成 submission.csv")

🧠 Step 8: 使用增强模型预测 test.csv...


2025-05-06 12:06:19.060887: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1746533179.511831      31 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1746533179.632294      31 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered


pytorch_model.bin:   0%|          | 0.00/371M [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/371M [00:00<?, ?B/s]

  model.load_state_dict(torch.load("/kaggle/working/best_mathbert.pt"))


FileNotFoundError: [Errno 2] No such file or directory: '/kaggle/working/best_mathbert.pt'