## Packages import

In [1]:
import os
os.environ["WANDB_DISABLED"] = "true"

In [2]:
!pip install pytesseract

import os
import pandas as pd
from sklearn.model_selection import train_test_split
from transformers import BertTokenizerFast, BertForSequenceClassification, Trainer, TrainingArguments
import torch
import json

# Optional OCR tools
import pytesseract
from PIL import Image
# or use EasyOCR:
# import easyocr



In [3]:
# PyTorch
import torch

# Check GPU
if torch.cuda.is_available():
    device = torch.device('cuda')
    print(f'使用 GPU：{torch.cuda.get_device_name(0)}')
else:
    device = torch.device('cpu')
    print('使用 CPU')

使用 GPU：NVIDIA L4


## Pre Processing

In [4]:
# 1. Data Loading and Preparation

def load_and_merge(subject: str) -> pd.DataFrame:
    """
    Load question bank and chapter list CSVs for a given subject in current directory and merge on chapter ID.

    Args:
        subject: 'math' or 'social'
    Returns:
        DataFrame with columns: question_text, chapter_name, subject
    """
    qbank_path = f"{subject}_question_bank.csv"
    chap_path = f"{subject}_chapter_list.csv"
    qdf = pd.read_csv(qbank_path)
    cdf = pd.read_csv(chap_path)
    df = qdf.merge(cdf, on='section_name', how='left')
    df['subject'] = subject
    return df

# Load and combine data from current directory
df = pd.concat([load_and_merge('math'), load_and_merge('social')], ignore_index=True)

df.count()
df.head()


Unnamed: 0,subject,ques_no,chapter_name_x,section_name,ques_detl,year_grade,book,chapter_num,chapter_name_y,section_num
0,math,2MA0810001,整數的運算,負數與數線,以中午12時為基準，下午3時記作＋3，那麼上午10時可以記作多少？(A)＋10(B)－10(...,8.0,1.0,1.0,整數的運算,1.0
1,math,2MA0810002,整數的運算,負數與數線,若數學科成績以30分為基準，得40分記作＋10，那麼考25分可記作多少？(A)－25　(B...,8.0,1.0,1.0,整數的運算,1.0
2,math,2MA0810003,整數的運算,負數與數線,如果以正午12時為基準，當日下午4時用＋4表示，則當日上午8時應記為多少？(A)－8　(B...,8.0,1.0,1.0,整數的運算,1.0
3,math,2MA0810004,整數的運算,負數與數線,如果以中午12時為基準，當日下午2時記為＋8，則當日上午6時應記為多少？(A)－24(B)－...,8.0,1.0,1.0,整數的運算,1.0
4,math,2MA0810005,整數的運算,負數與數線,如果＋40°表示北緯40°，則南緯30°該如何表示？(A)30°　(B)－30° (C)...,8.0,1.0,1.0,整數的運算,1.0


In [5]:
df = df[['subject','chapter_name_x','section_name','ques_detl']]
df.head()

Unnamed: 0,subject,chapter_name_x,section_name,ques_detl
0,math,整數的運算,負數與數線,以中午12時為基準，下午3時記作＋3，那麼上午10時可以記作多少？(A)＋10(B)－10(...
1,math,整數的運算,負數與數線,若數學科成績以30分為基準，得40分記作＋10，那麼考25分可記作多少？(A)－25　(B...
2,math,整數的運算,負數與數線,如果以正午12時為基準，當日下午4時用＋4表示，則當日上午8時應記為多少？(A)－8　(B...
3,math,整數的運算,負數與數線,如果以中午12時為基準，當日下午2時記為＋8，則當日上午6時應記為多少？(A)－24(B)－...
4,math,整數的運算,負數與數線,如果＋40°表示北緯40°，則南緯30°該如何表示？(A)30°　(B)－30° (C)...


In [6]:
df.count()

Unnamed: 0,0
subject,33839
chapter_name_x,33839
section_name,33839
ques_detl,33839


In [17]:
len(df)

33839

In [7]:
# 2. Label Encoding
# Combine subject and chapter_name as classification label

df['label_str'] = df['subject'] + '::' + df['chapter_name_x']
label2id = {lab: i for i, lab in enumerate(sorted(df['label_str'].unique()))}
id2label = {i: lab for lab, i in label2id.items()}
df['label'] = df['label_str'].map(label2id)

In [8]:
# 3. Train/Test Split
train_df, test_df = train_test_split(df, test_size=0.1, stratify=df['label'], random_state=42)

# 4. Tokenization and Dataset Definition
tokenizer = BertTokenizerFast.from_pretrained('bert-base-uncased')

class QDataset(torch.utils.data.Dataset):
    def __init__(self, texts, labels, tokenizer, max_len=128):
        self.texts = texts
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        text = str(self.texts[idx])
        inputs = self.tokenizer(text,
                                 padding='max_length',
                                 truncation=True,
                                 max_length=self.max_len,
                                 return_tensors='pt')
        item = {k: v.squeeze() for k, v in inputs.items()}
        item['labels'] = torch.tensor(self.labels[idx], dtype=torch.long)
        return item

train_dataset = QDataset(train_df['ques_detl'].tolist(), train_df['label'].tolist(), tokenizer)
test_dataset = QDataset(test_df['ques_detl'].tolist(), test_df['label'].tolist(), tokenizer)


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


In [9]:
# 5. Model Initialization
n_labels = len(label2id)
model = BertForSequenceClassification.from_pretrained('bert-base-uncased', num_labels=n_labels)

# 6. Training Arguments and Trainer Setup
training_args = TrainingArguments(
    output_dir='./outputs',
    num_train_epochs=3,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=32,
    eval_strategy='epoch',
    save_strategy='epoch',
    logging_steps=50,
    load_best_model_at_end=True,
    metric_for_best_model='accuracy'
)

# Define metric computation
def compute_metrics(eval_pred):
    logits, labels = eval_pred
    preds = logits.argmax(axis=-1)
    acc = (preds == labels).mean()
    return {'accuracy': acc}

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=test_dataset,
    compute_metrics=compute_metrics,
    tokenizer=tokenizer
)


Some weights of BertForSequenceClassification were not initialized from the model checkpoint at bert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
Using the `WANDB_DISABLED` environment variable is deprecated and will be removed in v5. Use the --report_to flag to control the integrations used for logging result (for instance --report_to none).
  trainer = Trainer(


In [10]:
# 7. Train and Evaluate
if __name__ == '__main__':
    trainer.train()
    metrics = trainer.evaluate()
    print(f"Test Accuracy: {metrics['eval_accuracy']:.4f}")

# 8. Save Model and Label Mappings
os.makedirs('./saved_model', exist_ok=True)
model.save_pretrained('./saved_model')
tokenizer.save_pretrained('./saved_model')
with open('./saved_model/label2id.json', 'w') as f:
    json.dump(label2id, f)
with open('./saved_model/id2label.json', 'w') as f:
    json.dump(id2label, f)

Epoch,Training Loss,Validation Loss,Accuracy
1,1.3953,1.218611,0.623227
2,0.8542,0.878935,0.73552


Epoch,Training Loss,Validation Loss,Accuracy
1,1.3953,1.218611,0.623227
2,0.8542,0.878935,0.73552
3,0.6516,0.792598,0.763593


Test Accuracy: 0.7636


In [None]:
# 7. Train and Evaluate
def train():
    trainer.train()
    metrics = trainer.evaluate()
    print(f"Test Accuracy: {metrics['eval_accuracy']:.4f}")

# 8. Save Model and Label Mappings
def save_model():
    os.makedirs('./saved_model', exist_ok=True)
    model.save_pretrained('./saved_model')
    tokenizer.save_pretrained('./saved_model')
    with open('./saved_model/label2id.json', 'w') as f:
        json.dump(label2id, f)
    with open('./saved_model/id2label.json', 'w') as f:
        json.dump(id2label, f)

In [14]:
# 9. Inference Pipeline
def ocr_to_text(image_path: str) -> str:
    # Using pytesseract
    img = Image.open(image_path)
    text = pytesseract.image_to_string(img, lang='eng')
    return text.strip()
    # Or using EasyOCR:
    # reader = easyocr.Reader(['en'])
    # result = reader.readtext(image_path, detail=0)
    # return ' '.join(result)


def load_classifier(model_dir: str = './saved_model'):
    tokenizer = BertTokenizerFast.from_pretrained(model_dir)
    model = BertForSequenceClassification.from_pretrained(model_dir)
    model.eval()
    return tokenizer, model


def predict_text(text: str, tokenizer, model):
    inputs = tokenizer(text, padding='max_length', truncation=True, max_length=128, return_tensors='pt')
    with torch.no_grad():
        outputs = model(**inputs)
        logits = outputs.logits
        pred_id = logits.argmax(dim=-1).item()
    label = id2label[pred_id]
    subject, chapter = label.split('::')
    return subject, chapter


def test_image(image_path: str):
    print("Processing:", image_path)
    text = ocr_to_text(image_path)
    print("OCR Result:\n", text)
    tokenizer, model = load_classifier()
    subject, chapter_name = predict_text(text, tokenizer, model)
    print(f"Predicted Subject: {subject}\nPredicted Chapter: {chapter_name}")

def test_text(text):
    tokenizer, model = load_classifier()
    subject, chapter_name = predict_text(text, tokenizer, model)
    print(f"Predicted Subject: {subject}\nPredicted Chapter: {chapter_name}")

# Usage examples
if __name__ == '__main__':
    # 1. Train and save
    #train()
    #save_model()

    # 2. Test with an example image
    test_image('test_math.png')

    test = "根據經濟部水利署的統計，截至2019年底，臺灣40座主要水庫中，淤積率超過30%的共有15座，例如霧社水庫淤積率達74.8%、烏山頭水庫達49.2%。顯示臺灣水庫淤積程度嚴重，影響水庫蓄水功能。下列何項策略最能有效改善上述現象"
    test_text(test)


Processing: test_math.png
OCR Result:
 NU E#m "To, Aga 120 tal -
gee $5 RANA BA REF AR

, eT o +s an ARH A AG > PLT 66
CPE rC a) KALI S 2 SRA?

  

¢ C
Predicted Subject: math
Predicted Chapter: 相似形
Predicted Subject: social
Predicted Chapter: 地理：基本概念與臺灣


## textCNN pipeline

In [None]:
import os
import pandas as pd
from sklearn.model_selection import train_test_split
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
from transformers import BertTokenizerFast
import json
from tqdm import tqdm

In [None]:
# 3. TextCNN Model Definition
class TextCNN(nn.Module):
    def __init__(self, vocab_size, embed_dim, num_classes, kernel_sizes=[3,4,5], num_filters=100):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=tokenizer.pad_token_id)
        self.convs = nn.ModuleList([
            nn.Conv1d(in_channels=embed_dim,
                      out_channels=num_filters,
                      kernel_size=k)
            for k in kernel_sizes
        ])
        self.dropout = nn.Dropout(0.5)
        self.fc = nn.Linear(num_filters * len(kernel_sizes), num_classes)

    def forward(self, input_ids, attention_mask=None):
        x = self.embedding(input_ids)  # [B, L, D]
        x = x.permute(0, 2, 1)        # [B, D, L]
        convs = [torch.relu(conv(x)) for conv in self.convs]  # list of [B, F, L-k+1]
        pools = [torch.max(c, dim=2)[0] for c in convs]     # list of [B, F]
        out = torch.cat(pools, dim=1)                       # [B, F*len]
        out = self.dropout(out)
        return self.fc(out)

# 4. Training and Evaluation Functions
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model_cnn = TextCNN(vocab_size=len(tokenizer), embed_dim=128,
                    num_classes=len(label2id)).to(device)

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model_cnn.parameters(), lr=1e-3)

def train_epoch(model, loader, optimizer, criterion):
    model.train()
    total_loss, total_correct = 0, 0
    for batch in tqdm(loader, desc='Train'):
        input_ids = batch['input_ids'].to(device)
        labels = batch['label'].to(device)
        optimizer.zero_grad()
        outputs = model(input_ids)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        total_loss += loss.item() * input_ids.size(0)
        preds = outputs.argmax(dim=1)
        total_correct += (preds == labels).sum().item()
    return total_loss/len(loader.dataset), total_correct/len(loader.dataset)

def eval_epoch(model, loader, criterion):
    model.eval()
    total_loss, total_correct = 0, 0
    with torch.no_grad():
        for batch in tqdm(loader, desc='Eval'):
            input_ids = batch['input_ids'].to(device)
            labels = batch['label'].to(device)
            outputs = model(input_ids)
            loss = criterion(outputs, labels)
            total_loss += loss.item() * input_ids.size(0)
            preds = outputs.argmax(dim=1)
            total_correct += (preds == labels).sum().item()
    return total_loss/len(loader.dataset), total_correct/len(loader.dataset)

# 5. Run Training and Compare
epochs = 5
best_acc = 0
for epoch in range(epochs):
    train_loss, train_acc = train_epoch(model_cnn, train_loader, optimizer, criterion)
    val_loss, val_acc = eval_epoch(model_cnn, test_loader, criterion)
    print(f"Epoch {epoch+1}/{epochs} | "
          f"Train Loss: {train_loss:.4f}, Acc: {train_acc:.4f} | "
          f"Val Loss: {val_loss:.4f}, Acc: {val_acc:.4f}")
    if val_acc > best_acc:
        best_acc = val_acc
        torch.save(model_cnn.state_dict(), 'best_textcnn.pt')

print(f"Best TextCNN Val Acc: {best_acc:.4f}")

# 6. Save Mappings for CNN
with open('tokenizer_vocab.json', 'w') as f:
    json.dump(tokenizer.vocab, f)
with open('label2id.json', 'w') as f:
    json.dump(label2id, f)