In [11]:
import torch
import torch.nn as nn
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader
from collections import Counter
import spacy
import torch.optim as optim
import math
from tqdm import tqdm
import zipfile  # ⭐️ 新增：壓縮功能
import os       # ⭐️ 新增：檢查與建立檔案

In [12]:
# === Dataset ===
class TextDataset(Dataset):
    def __init__(self, texts, labels, word_vocab, pos_vocab, ner_vocab, max_len=128):
        self.labels = labels
        self.word_vocab = word_vocab
        self.pos_vocab = pos_vocab
        self.ner_vocab = ner_vocab
        self.max_len = max_len

        self.tokenized_data = []

        for text in texts:
            doc = nlp(text)
            tokens = [token.text.lower() for token in doc][:max_len]
            pos_tags = [token.pos_ for token in doc][:max_len]
            ner_tags = [token.ent_type_ if token.ent_type_ != "" else "O" for token in doc][:max_len]

            # Padding
            pad_len = max_len - len(tokens)
            tokens += ['<pad>'] * pad_len
            pos_tags += ['<pad>'] * pad_len
            ner_tags += ['<pad>'] * pad_len

            self.tokenized_data.append((tokens, pos_tags, ner_tags))

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        tokens, pos_tags, ner_tags = self.tokenized_data[idx]
        word_ids = [self.word_vocab.get(t, 1) for t in tokens]  # UNK fallback 是 1
        pos_ids = [self.pos_vocab.get(p, 1) for p in pos_tags]
        ner_ids = [self.ner_vocab.get(n, 1) for n in ner_tags]
        return (torch.tensor(word_ids), torch.tensor(pos_ids), torch.tensor(ner_ids)), torch.tensor(self.labels[idx])


In [13]:

# === 工具函數 ===
def tokenize_with_features(text):
    doc = nlp(text)
    tokens, pos_tags, ner_tags = [], [], []
    for token in doc:
        if token.is_alpha:
            tokens.append(token.text.lower())
            pos_tags.append(token.pos_)
            ner_tags.append(token.ent_type_ if token.ent_type_ else "O")
    return tokens, pos_tags, ner_tags

def build_vocab_with_features(texts, min_freq=2):
    word_counter, pos_set, ner_set = Counter(), set(), set()
    for text in texts:
        tokens, pos_tags, ner_tags = tokenize_with_features(text)
        word_counter.update(tokens)
        pos_set.update(pos_tags)
        ner_set.update(ner_tags)
    
    # 建 word vocab，保留 PAD 和 UNK
    word_vocab = {'<PAD>': 0, '<UNK>': 1}
    for word, freq in word_counter.items():
        if freq >= min_freq:
            word_vocab[word] = len(word_vocab)
    
    # 建 pos vocab，加入 PAD 和 UNK
    pos_vocab = {'<PAD>': 0, '<UNK>': 1}
    for pos in sorted(pos_set):
        pos_vocab[pos] = len(pos_vocab)
    
    # 建 ner vocab，加入 PAD 和 UNK
    ner_vocab = {'<PAD>': 0, '<UNK>': 1}
    for ner in sorted(ner_set):
        ner_vocab[ner] = len(ner_vocab)
    
    return word_vocab, pos_vocab, ner_vocab



def encode_text_with_features(text, word_vocab, pos_vocab, ner_vocab):
    tokens, pos_tags, ner_tags = tokenize_with_features(text)
    word_ids = [word_vocab.get(t, 1) for t in tokens]
    pos_ids = [pos_vocab.get(p, 0) for p in pos_tags]
    ner_ids = [ner_vocab.get(n, 0) for n in ner_tags]
    
    pad_len = MAX_LEN - len(word_ids)
    if pad_len > 0:
        word_ids += [0] * pad_len
        pos_ids += [0] * pad_len
        ner_ids += [0] * pad_len
    else:
        word_ids, pos_ids, ner_ids = word_ids[:MAX_LEN], pos_ids[:MAX_LEN], ner_ids[:MAX_LEN]
    
    return word_ids, pos_ids, ner_ids
    
def load_glove_embedding(path, vocab, dim):
    embedding = np.random.randn(len(vocab), dim) * 0.05
    with open(path, 'r', encoding='utf-8') as f:
        for line in f:
            parts = line.strip().split()
            if len(parts) != dim + 1: continue
            word = parts[0]
            if word in vocab:
                embedding[vocab[word]] = np.array(parts[1:], dtype=np.float32)
    return torch.tensor(embedding, dtype=torch.float32)


In [14]:
# === Transformer Components ===
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        pos = torch.arange(0, max_len).unsqueeze(1).float()
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * -math.log(10000.0) / d_model)
        pe[:, 0::2] = torch.sin(pos * div_term)
        pe[:, 1::2] = torch.cos(pos * div_term)
        self.pe = pe.unsqueeze(0)

    def forward(self, x):
        return x + self.pe[:, :x.size(1)].to(x.device)

class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, nhead):
        super().__init__()
        assert d_model % nhead == 0, "d_model 必須能被 nhead 整除"
        self.nhead = nhead
        self.d_k = d_model // nhead
        self.qkv = nn.Linear(d_model, d_model * 3)
        self.out = nn.Linear(d_model, d_model)

    def forward(self, x):
        B, T, C = x.size()
        qkv = self.qkv(x).reshape(B, T, 3, self.nhead, self.d_k).permute(2, 0, 3, 1, 4)
        q, k, v = qkv[0], qkv[1], qkv[2]
        scores = q @ k.transpose(-2, -1) / math.sqrt(self.d_k)
        attn = scores.softmax(dim=-1)
        x = (attn @ v).transpose(1, 2).reshape(B, T, C)
        return self.out(x)

class TransformerEncoderLayer(nn.Module):
    def __init__(self, d_model, nhead, d_ff, dropout):
        super().__init__()
        self.attn = MultiHeadAttention(d_model, nhead)
        self.ff = nn.Sequential(
            nn.Linear(d_model, d_ff),
            nn.ReLU(),
            nn.Linear(d_ff, d_model)
        )
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        x2 = self.attn(x)
        x = self.norm1(x + self.dropout(x2))
        x2 = self.ff(x)
        x = self.norm2(x + self.dropout(x2))
        return x

class TransformerClassifier(nn.Module):
    def __init__(self, vocab_size, pos_size, ner_size, d_model=128, nhead=4, d_ff=256,
                 nlayers=2, num_classes=2, dropout=0.1, embedding_matrix=None):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, d_model, padding_idx=0)
        self.pos_embed = nn.Embedding(pos_size, POS_EMBED_DIM, padding_idx=0)
        self.ner_embed = nn.Embedding(ner_size, NER_EMBED_DIM, padding_idx=0)

        if embedding_matrix is not None:
            self.embedding.weight.data.copy_(embedding_matrix)
        self.embedding.weight.requires_grad = True  # 可訓練

        self.project = nn.Linear(EMBED_DIM + POS_EMBED_DIM + NER_EMBED_DIM, d_model)
        self.pos_enc = PositionalEncoding(d_model)
        self.layers = nn.ModuleList([
            TransformerEncoderLayer(d_model, nhead, d_ff, dropout)
            for _ in range(nlayers)
        ])
        self.classifier = nn.Linear(d_model, num_classes)

    def forward(self, inputs):
        word_ids, pos_ids, ner_ids = inputs
        word_emb = self.embedding(word_ids)
        pos_emb = self.pos_embed(pos_ids)
        ner_emb = self.ner_embed(ner_ids)
        x = torch.cat([word_emb, pos_emb, ner_emb], dim=-1)
        x = self.project(x)
        x = self.pos_enc(x)
        for layer in self.layers:
            x = layer(x)
        return self.classifier(x[:, 0])

In [15]:
def evaluate(model, dataloader, device):
    model.eval()
    correct, count = 0, 0
    with torch.no_grad():
        for (word_ids, pos_ids, ner_ids), labels in dataloader:
            word_ids, pos_ids, ner_ids, labels = word_ids.to(device), pos_ids.to(device), ner_ids.to(device), labels.to(device)
            outputs = model((word_ids, pos_ids, ner_ids))
            correct += (outputs.argmax(1) == labels).sum().item()
            count += labels.size(0)
    return correct / count

In [16]:
def expand_embedding_dim(embedding_matrix, target_dim=256):
    old_dim = embedding_matrix.shape[1]
    if old_dim >= target_dim:
        return embedding_matrix[:, :target_dim]
    else:
        pad = torch.randn(embedding_matrix.shape[0], target_dim - old_dim) * 0.05
        return torch.cat([embedding_matrix, pad], dim=1)



In [17]:
# === 訓練函數 ===
def train_model(model, train_loader, val_loader, epochs, optimizer, criterion, device):
    model.to(device)
    scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=epochs)
    for epoch in range(epochs):
        model.train()
        total_loss, correct, count = 0, 0, 0

        for (word_ids, pos_ids, ner_ids), labels in train_loader:
            word_ids, pos_ids, ner_ids, labels = word_ids.to(device), pos_ids.to(device), ner_ids.to(device), labels.to(device)
            
            outputs = model((word_ids, pos_ids, ner_ids))
            loss = criterion(outputs, labels)
            
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
    
            total_loss += loss.item() * labels.size(0)
            correct += (outputs.argmax(1) == labels).sum().item()
            count += labels.size(0)
        
        train_acc = correct / count
        val_acc = evaluate(model, val_loader, device)
        print(f"[Epoch {epoch+1}] Train Acc: {train_acc:.4f}, Val Acc: {val_acc:.4f}")

        scheduler.step()


In [None]:
# === 參數 ===
EMBED_DIM = 128
N_HEAD = 4              # 注意力頭數
BATCH_SIZE = 32
DROPOUT = 0.1
EPOCHS = 10
D_FF = 512              # Feedforward 隱藏層大小
N_LAYERS = 4             # Transformer layer 數量
NUM_CLASSES = 15         # 分類類別數（根據資料集調整）
MAX_LEN = 256           # 文字最大長度
GLOVE_PATH = "glove.6B.100d.txt"  # 確保你有這個檔案
POS_EMBED_DIM = 96
NER_EMBED_DIM = 96
nlp = spacy.load("en_core_web_sm")



In [19]:
from sklearn.preprocessing import LabelEncoder
# === 主流程 ===

def train_main():
    df = pd.read_json("News_train.json", lines=True)
    texts = (df['headline'] + " " + df['short_description']).tolist()
    labels = LabelEncoder().fit_transform(df['label'].tolist())
    
    word_vocab, pos_vocab, ner_vocab = build_vocab_with_features(texts)
    embedding_matrix = load_glove_embedding(GLOVE_PATH, word_vocab, dim=100)
    embedding_matrix = expand_embedding_dim(embedding_matrix, target_dim=EMBED_DIM)
    X_train, X_val, y_train, y_val = train_test_split(texts, labels, test_size=0.2, random_state=42)
    train_ds = TextDataset(X_train, y_train, word_vocab, pos_vocab, ner_vocab)
    val_ds = TextDataset(X_val, y_val, word_vocab, pos_vocab, ner_vocab)
    train_loader = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True)
    val_loader = DataLoader(val_ds, batch_size=BATCH_SIZE)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    model = TransformerClassifier(
        vocab_size=len(word_vocab),
        pos_size=len(pos_vocab),
        ner_size=len(ner_vocab),
        d_model=EMBED_DIM,
        nhead=N_HEAD,
        d_ff=D_FF,
        nlayers=N_LAYERS,
        dropout=DROPOUT,
        num_classes=NUM_CLASSES,
        embedding_matrix=embedding_matrix
    ).to(device)

    optimizer = optim.Adam(model.parameters(), lr=2e-4)
    criterion = nn.CrossEntropyLoss(label_smoothing=0.1)

    train_model(model, train_loader, val_loader, EPOCHS, optimizer, criterion, device)
    val_acc = evaluate(model, val_loader, device)
    torch.save({
        'model_state_dict': model.state_dict(),
        'word_vocab': word_vocab,
        'pos_vocab': pos_vocab,
        'ner_vocab': ner_vocab
    }, 'model.pt')


# 執行訓練
train_main()

[Epoch 1] Train Acc: 0.6620, Val Acc: 0.7412
[Epoch 2] Train Acc: 0.7694, Val Acc: 0.7659
[Epoch 3] Train Acc: 0.8001, Val Acc: 0.7563
[Epoch 4] Train Acc: 0.8242, Val Acc: 0.7731
[Epoch 5] Train Acc: 0.8471, Val Acc: 0.7748
[Epoch 6] Train Acc: 0.8693, Val Acc: 0.7708
[Epoch 7] Train Acc: 0.8882, Val Acc: 0.7643
[Epoch 8] Train Acc: 0.9049, Val Acc: 0.7649
[Epoch 9] Train Acc: 0.9162, Val Acc: 0.7611
[Epoch 10] Train Acc: 0.9226, Val Acc: 0.7602


In [None]:
# --- Inference ---
def inference_main():
    df = pd.read_json("News_test.json", lines=True)
    texts = (df['headline'] + " " + df['short_description']).tolist()

    checkpoint = torch.load("model.pt", map_location="cpu")
    word_vocab, pos_vocab, ner_vocab = checkpoint['word_vocab'], checkpoint['pos_vocab'], checkpoint['ner_vocab']

    embedding_matrix = load_glove_embedding(GLOVE_PATH, word_vocab, dim=100)
    embedding_matrix = expand_embedding_dim(embedding_matrix, target_dim=EMBED_DIM)

    model = TransformerClassifier(
    vocab_size=len(word_vocab),
    pos_size=len(pos_vocab),
    ner_size=len(ner_vocab),
    d_model=EMBED_DIM,
    nhead=N_HEAD,
    d_ff=D_FF,
    nlayers=N_LAYERS,
    dropout=DROPOUT,
    num_classes=NUM_CLASSES,
    embedding_matrix=embedding_matrix
)
    model.load_state_dict(checkpoint['model_state_dict'])
    model.eval()

    test_ds = TextDataset(texts, [0]*len(texts), word_vocab, pos_vocab, ner_vocab)
    test_loader = DataLoader(test_ds, batch_size=BATCH_SIZE)

    preds = []
    

    with torch.no_grad():
        for x, _ in test_loader:
            out = model(x)
            pred = out.argmax(dim=1).cpu().numpy()
            preds.extend(pred)

    # 生成 submission.csv
    pd.DataFrame({"ID": list(range(len(preds))), "label": preds}).to_csv("submission.csv", index=False)

# 執行推論
inference_main()


  checkpoint = torch.load("model.pt", map_location="cpu")


: 

Tokenizer: 使用 spaCy 的 en_core_web_sm 模型
理由：spaCy 是一個強大的 NLP 工具包，提供高品質的 tokenization、POS 標註、NER 辨識等功能。選擇 en_core_web_sm 是因為它是輕量、適用於英文的模型，效能與準確率在小型任務中表現良好。

僅保留 英文單詞（is_alpha），過濾數字、標點等無意義 token。

對每個 token 擷取：
token.text.lower()：小寫化單詞（標準化）
token.pos_：詞性（POS）標註
token.ent_type_：命名實體（NER）標註，若無則標為 "O"（代表非實體）

特徵編碼與填充：

每筆文字會被轉換為
word_ids, pos_ids, ner_ids → 對應 vocab 索引，支援 <UNK> (1), <PAD> (0)
長度不足則用 0 補齊（padding），超過最大長度 MAX_LEN=256 則截斷

詞彙表建構：
word_vocab 根據詞頻（最小出現次數 min_freq=2）建立，避免雜訊
pos_vocab 與 ner_vocab 則為集合，不考慮頻率（因為類別較少）

Embedding：
Word embedding 可載入 GloVe (load_glove_embedding)，維度由外部參數指定（如 100d）(kaggle 公開dataset )



Model Structure 與 Hyperparameters 設計說明  

主架構：基於 Transformer Encoder 的分類模型

<pre>
超參數                  值                                    說明與理由  

EMBED_DIM              256	                      word embedding 維度（若用 GloVe 可為 100，這裡為內部投影後總維度）  

POS_EMBED_DIM	        16	                      POS 類別較少，維度不需太高；足以捕捉語法資訊  

NER_EMBED_DIM	        16	                      同 POS。加入實體資訊有助於語義理解  

d_model	               256	                      Transformer 的內部維度，也是每個 token 的最終表示向量大小  

nhead	                4	                      將 d_model=256 分成 4 頭，每頭 64 維，利於模型捕捉多樣化注意力模式  

d_ff	               512	                      Feedforward 隱藏層大小，設為 2×d_model，常見設計  

nlayers	                4	                      疊加 4 層 Encoder；足以學習中階語義特徵，平衡效能與計算成本  

dropout	               0.1	                      防止過擬合，常見的 Transformer 預設值  

MAX_LEN	               256	                      限定句子長度，兼顧記憶體與資訊保留，對大部分文檔已足夠  

classifier	      Linear(d_model → num_classes)   使用 [CLS] token 對整句分類
<pre>