In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import TfidfVectorizer
from torch.utils.data import Dataset, DataLoader
import string
import torch.nn.functional as F
import xml.etree.ElementTree as ET

In [26]:
class TextDataset(Dataset):
    """Map-style dataset pairing rows of a sparse feature matrix with class labels.

    The feature matrix is stored as given; a row is converted to a dense
    tensor only when that sample is requested.
    """

    def __init__(self, texts, labels):
        """
        Args:
            texts: scipy.sparse.csr_matrix - TF-IDF feature matrix, one row per sample
            labels: array-like - class label of each row
        """
        self.texts = texts  # kept in sparse form
        self.labels = np.asarray(labels)

    def __len__(self):
        """Return the number of samples, taken from the label array."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return ``(features, label)`` for sample ``idx``.

        ``features`` is a float tensor built from the densified row and
        ``label`` is a zero-dimensional long tensor.
        """
        # Densify just this one row; ``toarray()`` yields a 2-D array, so take row 0.
        dense_row = self.texts[idx].toarray()[0]
        features = torch.FloatTensor(dense_row)

        # Wrap the label in a one-element tensor and index it to get a scalar tensor.
        label_value = self.labels[idx]
        target = torch.LongTensor([label_value])[0]
        return features, target

class TextCNN(nn.Module):
    """1-D convolutional classifier applied to a flat feature vector.

    Each input vector is treated as a single-channel sequence. Several
    ``Conv1d`` layers with different kernel sizes slide along the feature
    axis, each feature map is reduced by a global max-pool, and the pooled
    values are concatenated, passed through dropout and a linear layer.

    Args:
        input_dim: Length of the input feature vector. Stored on the module for
            reference only; none of the layers depends on it.
        num_classes: Number of output logits.
        filter_sizes: Sequence of convolution kernel sizes, one conv layer each.
        num_filters: Number of output channels of every conv layer.
        dropout: Dropout probability applied before the linear layer
            (default 0.5, the previously hard-coded value).
    """

    def __init__(self, input_dim, num_classes, filter_sizes, num_filters, dropout=0.5):
        super().__init__()

        # Not consumed by any layer; kept so the expected input width can be inspected.
        self.input_dim = input_dim

        # One convolution per kernel size, all reading a single input channel.
        self.convs = nn.ModuleList([
            nn.Conv1d(1, num_filters, filter_size)
            for filter_size in filter_sizes
        ])

        # Dropout on the concatenated pooled features.
        self.dropout = nn.Dropout(dropout)

        # Linear classifier over the concatenated pooled features.
        self.fc = nn.Linear(len(filter_sizes) * num_filters, num_classes)

    def forward(self, x):
        """Compute class logits.

        Args:
            x: Tensor of shape ``(batch_size, feature_dim)``.

        Returns:
            Tensor of shape ``(batch_size, num_classes)`` with unnormalised logits.
        """
        # Add the channel dimension Conv1d expects: (batch_size, 1, feature_dim).
        x = x.unsqueeze(1)

        # Apply every convolution followed by ReLU.
        feature_maps = [F.relu(conv(x)) for conv in self.convs]

        # Global max-pool: the pooling window spans the whole remaining length.
        pooled = [F.max_pool1d(fm, fm.size(2)).squeeze(2) for fm in feature_maps]

        # Concatenate along the channel axis.
        x = torch.cat(pooled, 1)

        x = self.dropout(x)

        logit = self.fc(x)

        return logit

def train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs, device,
                save_path='best_textcnn.pth', log_interval=100):
    """Train ``model`` and validate it after every epoch.

    Whenever the validation accuracy exceeds the best value seen so far, the
    model's ``state_dict`` is written to ``save_path``.

    Args:
        model: Module to train; it is expected to already live on ``device``.
        train_loader: Sized iterable of ``(inputs, labels)`` training batches.
        val_loader: Sized iterable of ``(inputs, labels)`` validation batches.
        criterion: Loss callable taking ``(outputs, labels)``.
        optimizer: Optimizer updating ``model``'s parameters.
        num_epochs: Number of passes over ``train_loader``.
        device: Device each batch is moved to.
        save_path: Where the best ``state_dict`` is saved
            (default ``'best_textcnn.pth'``).
        log_interval: Print the batch loss every this many batches; a falsy
            value disables per-batch logging (default 100).

    Returns:
        The same ``model`` object, holding the weights of the LAST epoch. The
        best-validation weights are only available in ``save_path``.
    """
    best_val_acc = 0.0

    # Averages are per batch; clamp to 1 so an empty loader reports 0 instead of raising.
    num_train_batches = max(len(train_loader), 1)
    num_val_batches = max(len(val_loader), 1)

    for epoch in range(num_epochs):
        # Training phase
        model.train()
        train_loss = 0.0
        train_correct = 0
        train_total = 0

        for batch_idx, (texts, labels) in enumerate(train_loader):
            texts, labels = texts.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(texts)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            train_loss += loss.item()
            _, predicted = outputs.max(1)
            train_total += labels.size(0)
            train_correct += predicted.eq(labels).sum().item()

            if log_interval and (batch_idx + 1) % log_interval == 0:
                print(f'Epoch [{epoch+1}/{num_epochs}], Batch [{batch_idx+1}], Loss: {loss.item():.4f}')

        # Guard against a loader that produced no samples.
        train_acc = 100. * train_correct / train_total if train_total else 0.0

        # Validation phase
        model.eval()
        val_loss = 0.0
        val_correct = 0
        val_total = 0

        with torch.no_grad():
            for texts, labels in val_loader:
                texts, labels = texts.to(device), labels.to(device)
                outputs = model(texts)
                loss = criterion(outputs, labels)

                val_loss += loss.item()
                _, predicted = outputs.max(1)
                val_total += labels.size(0)
                val_correct += predicted.eq(labels).sum().item()

        val_acc = 100. * val_correct / val_total if val_total else 0.0

        print(f'Epoch [{epoch+1}/{num_epochs}]')
        print(f'Train Loss: {train_loss/num_train_batches:.4f} | Train Acc: {train_acc:.2f}%')
        print(f'Val Loss: {val_loss/num_val_batches:.4f} | Val Acc: {val_acc:.2f}%')
        print('--------------------')

        # Checkpoint only on a strict improvement of validation accuracy.
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            torch.save(model.state_dict(), save_path)

    return model

def main(texts, labels, num_epochs=10, batch_size=32, lr=0.001):
    """Vectorize ``texts`` with TF-IDF, train a ``TextCNN`` on them and return both.

    Args:
        texts: Iterable of raw document strings.
        labels: Array-like of integer class ids, one per document. Ids are used
            directly as ``CrossEntropyLoss`` targets, so they must be >= 0.
        num_epochs: Number of training epochs (default 10).
        batch_size: Batch size of both data loaders (default 32).
        lr: Adam learning rate (default 0.001).

    Returns:
        ``(model, vectorizer)``: the trained model as returned by
        ``train_model`` and the fitted ``TfidfVectorizer``.
    """
    # Text feature extraction.
    # No custom ``preprocessor`` is passed: supplying one replaces the built-in
    # preprocessing stage, which silently disables ``strip_accents``. The
    # built-in ``lowercase=True`` does the lower-casing instead, and the
    # vectorizer stays picklable because no lambda is captured.
    vectorizer = TfidfVectorizer(
        max_features=10000,          # cap the vocabulary size
        min_df=5,                    # minimum document frequency
        max_df=0.8,                  # maximum document frequency
        lowercase=True,              # lower-case before tokenizing
        strip_accents='unicode',     # fold accented characters
        token_pattern=r'[a-zA-Z]+',  # keep alphabetic tokens only
    )

    X = vectorizer.fit_transform(texts)

    # Train / validation split.
    X_train, X_val, y_train, y_val = train_test_split(
        X, labels, test_size=0.2, random_state=42
    )

    # Datasets and loaders.
    train_dataset = TextDataset(X_train, y_train)
    val_dataset = TextDataset(X_val, y_val)

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size)

    # Model hyperparameters.
    input_dim = X.shape[1]  # TF-IDF feature dimension
    # Size the output layer from the largest class id rather than the number of
    # distinct ids, so a class absent from ``labels`` cannot push a target
    # index out of range.
    num_classes = int(np.max(labels)) + 1
    filter_sizes = [3, 4, 5]  # convolution kernel sizes
    num_filters = 100         # filters per kernel size

    # Build the model on the available device.
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = TextCNN(input_dim, num_classes, filter_sizes, num_filters).to(device)

    # Loss and optimizer.
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)

    # Train.
    model = train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=num_epochs, device=device)

    return model, vectorizer

# Inference helper
def predict(model, vectorizer, text, device):
    """Return the predicted class index for a single raw ``text``.

    Args:
        model: Trained module; switched to eval mode here.
        vectorizer: Fitted object whose ``transform`` returns a matrix with ``toarray()``.
        text: The document to classify.
        device: Device the feature tensor is moved to before the forward pass.

    Returns:
        The index of the highest-scoring output as a Python int.
    """
    model.eval()

    # Vectorize a one-document batch and densify it into a float tensor.
    dense_features = vectorizer.transform([text]).toarray()
    batch = torch.FloatTensor(dense_features).to(device)

    with torch.no_grad():
        scores = model(batch)
        best_index = scores.max(1)[1]

    return best_index.item()

# 数据预处理部分

In [27]:
def remove_punctuation(text):
    """Return ``text`` with every character listed in ``string.punctuation`` removed."""
    kept = []
    for ch in text:
        if ch in string.punctuation:
            continue
        kept.append(ch)
    return ''.join(kept)

def extract_train_party_text_pairs(xml_string: str) -> np.ndarray:
    """Parse a corpus XML string into an array of ``[text, party]`` rows.

    For every ``<doc>`` element, the party is the ``valeur`` attribute of the
    first ``<PARTI>`` descendant and the text is the space-joined content of
    the ``<p>`` children of the first ``<texte>`` descendant.

    Documents lacking a ``<PARTI>`` element, a ``valeur`` attribute or a
    ``<texte>`` element are skipped.

    Args:
        xml_string: The complete XML document as a string.

    Returns:
        A string ndarray of shape ``(n_docs, 2)``; column 0 is the document
        text and column 1 the party name. The shape is ``(0, 2)`` when no
        document qualifies, so column indexing keeps working.

    Raises:
        xml.etree.ElementTree.ParseError: If ``xml_string`` is not well-formed XML.
    """
    root = ET.fromstring(xml_string)
    pairs = []

    for doc in root.findall('.//doc'):
        parti_elem = doc.find('.//PARTI')
        text_elem = doc.find('.//texte')
        if parti_elem is None or text_elem is None:
            continue

        party = parti_elem.get('valeur')
        if party is None:
            # Without this guard the missing label would be stored as the string 'None'.
            continue

        # ``p.text`` stops at the first nested element; ``itertext()`` also
        # collects the text that follows nested elements inside the paragraph.
        paragraphs = [''.join(p.itertext()) for p in text_elem.findall('p')]
        full_text = ' '.join(paragraph for paragraph in paragraphs if paragraph)
        pairs.append([full_text, party])

    if not pairs:
        # np.array([]) would be 1-D and break ``[:, 0]`` indexing in callers.
        return np.empty((0, 2), dtype=str)

    return np.array(pairs, dtype=str)

# Read the whole training corpus XML into memory as UTF-8 text.
with open("./Corpus d_apprentissage/deft09_parlement_appr_en.xml", encoding="utf-8") as f:
    train_text = f.read()

# Parse the XML into an array of [text, party] rows.
data_raw = extract_train_party_text_pairs(train_text)

In [28]:
# Integer class id assigned to each party name.
label_mapping = {
    "ELDR": 0,
    "GUE-NGL": 1,
    "PPE-DE": 2,
    "PSE": 3,
    "Verts-ALE": 4
}

# Column 0 of data_raw is the document text, column 1 the party name.
texts = data_raw[:, 0]
y_data_raw = data_raw[:, 1]

# Encode party names as integer ids; a name missing from label_mapping raises KeyError.
labels = np.array([label_mapping[name] for name in y_data_raw])

# Fit the TF-IDF vectorizer and train the classifier.
model, vectorizer = main(texts, labels)

# Predict on new text
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
prediction = predict(model, vectorizer, "new text", device)

Epoch [1/10], Batch [100], Loss: 1.5751
Epoch [1/10], Batch [200], Loss: 1.7138
Epoch [1/10], Batch [300], Loss: 1.5145
Epoch [1/10], Batch [400], Loss: 1.6598
Epoch [1/10]
Train Loss: 1.5017 | Train Acc: 34.32%
Val Loss: 1.4970 | Val Acc: 35.13%
--------------------
Epoch [2/10], Batch [100], Loss: 1.4082
Epoch [2/10], Batch [200], Loss: 1.5157
Epoch [2/10], Batch [300], Loss: 1.5958
Epoch [2/10], Batch [400], Loss: 1.5668
Epoch [2/10]
Train Loss: 1.4970 | Train Acc: 34.81%
Val Loss: 1.4965 | Val Acc: 35.13%
--------------------
Epoch [3/10], Batch [100], Loss: 1.5112
Epoch [3/10], Batch [200], Loss: 1.5417


KeyboardInterrupt: 