In [1]:
import os, re, csv
from collections import Counter
import torch
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
import torch.nn as nn

ROOT = r"./data/stanfordSentimentTreebank"

In [2]:
# 1) 读文件
phrase2id = {}
with open(os.path.join(ROOT, "dictionary.txt"), encoding="utf-8") as f:
    for line in f:
        phrase, pid = line.rstrip("\n").split("|")
        phrase2id[phrase] = int(pid)

def prob2label(p):
    if 0.0 <= p <= 0.2: return 1
    if 0.2 < p <= 0.4:  return 2
    if 0.4 < p <= 0.6:  return 3
    if 0.6 < p <= 0.8:  return 4
    return 5

pid2label = {}
with open(os.path.join(ROOT, "sentiment_labels.txt"), encoding="utf-8") as f:
    reader = csv.reader(f, delimiter="|")
    next(reader)
    for pid_str, prob_str in reader:
        pid2label[int(pid_str)] = prob2label(float(prob_str))

idx2sent = {}
with open(os.path.join(ROOT, "datasetSentences.txt"), encoding="utf-8") as f:
    next(f)
    for line in f:
        sid_str, sent = line.rstrip("\n").split("\t")
        idx2sent[int(sid_str)] = sent

sid2split = {}
with open(os.path.join(ROOT, "datasetSplit.txt"), encoding="utf-8") as f:
    next(f)
    for line in f:
        sid_str, sp_str = line.rstrip("\n").split(",")
        sid2split[int(sid_str)] = int(sp_str)  # 1=train,2=test,3=dev

In [3]:
# 2) 分词
TOKEN_RE = re.compile(r"\w+|[^\w\s]")
def tokenize(text): return TOKEN_RE.findall(text.lower())

In [4]:
# 3) 构造样本（句级，避免子树）
train, val, test = [], [], []
for sid, sent in idx2sent.items():
    pid = phrase2id.get(sent)
    if pid is None or pid not in pid2label:  # 过滤无标签项
        continue
    y = pid2label[pid]
    toks = tokenize(sent)
    sp = sid2split[sid]
    (train if sp==1 else test if sp==2 else val).append((toks, y))

In [5]:
# 4) 词表
counter = Counter()
for toks, _ in train: counter.update(toks)
itos = ["<unk>", "<pad>"] + [w for w,_ in counter.most_common()]
stoi = {w:i for i,w in enumerate(itos)}
pad_idx = stoi["<pad>"]
def numericalize(tokens): return torch.tensor([stoi.get(t, stoi["<unk>"]) for t in tokens], dtype=torch.long)

In [6]:
# 5) Dataset/DataLoader
class SSTDataset(Dataset):
    def __init__(self, samples): self.samples = samples
    def __len__(self): return len(self.samples)
    def __getitem__(self, i):
        toks, y = self.samples[i]
        return numericalize(toks), torch.tensor(y, dtype=torch.long)

def collate_fn(batch):
    xs, ys = zip(*batch)
    xs = pad_sequence(xs, batch_first=True, padding_value=pad_idx)
    ys = torch.stack(ys)
    return xs, ys

train_loader = DataLoader(SSTDataset(train), batch_size=64, shuffle=True, collate_fn=collate_fn)
val_loader   = DataLoader(SSTDataset(val),   batch_size=64, shuffle=False, collate_fn=collate_fn)
test_loader  = DataLoader(SSTDataset(test),  batch_size=64, shuffle=False, collate_fn=collate_fn)

print("sizes:", len(train), len(val), len(test))

sizes: 8117 1044 2125


In [7]:
# 6) 预训练词向量
from pathlib import Path

VECTOR_PATH = Path("./data/vector.txt")  # 如需改路径，请修改此变量

def is_float(s):
    try:
        float(s)
        return True
    except Exception:
        return False

def load_vectors_flex(path, itos):
    path = Path(path)
    if not path.exists():
        raise FileNotFoundError(f"向量文件不存在: {path}")

    # 读取前两行进行格式判定
    with path.open(encoding="utf-8", errors="ignore") as f:
        first = f.readline().strip()
        second = f.readline().strip()
    first_parts = first.split() if first else []
    second_parts = second.split() if second else []

    # 情况1：word2vec 文本（第一行 header，第二行以词开头）
    if (len(first_parts) >= 2 and is_float(first_parts[0]) and is_float(first_parts[1])
        and len(second_parts) >= 2 and not is_float(second_parts[0]) and is_float(second_parts[1])):
        from gensim.models.keyedvectors import KeyedVectors
        kv = KeyedVectors.load_word2vec_format(str(path), binary=False)
        dim = kv.vector_size
        emb = torch.empty(len(itos), dim).uniform_(-0.05, 0.05)
        for i, tok in enumerate(itos):
            if tok in kv:
                emb[i] = torch.tensor(kv[tok])
        return emb, dim

    # 情况2：GloVe 文本（无 header，每行以词开头）
    if (len(first_parts) >= 2 and not is_float(first_parts[0]) and is_float(first_parts[1])):
        from gensim.models.keyedvectors import KeyedVectors
        # GloVe 风格无 header
        kv = KeyedVectors.load_word2vec_format(str(path), binary=False, no_header=True)
        dim = kv.vector_size
        emb = torch.empty(len(itos), dim).uniform_(-0.05, 0.05)
        for i, tok in enumerate(itos):
            if tok in kv:
                emb[i] = torch.tensor(kv[tok])
        return emb, dim

    # 情况3：纯矩阵（每行只有浮点数，无词）
    rows = []
    dim = None
    with path.open(encoding="utf-8", errors="ignore") as f:
        for line in f:
            parts = line.strip().split()
            if not parts:
                continue
            if not all(is_float(x) for x in parts):
                raise ValueError("无法识别向量文件格式：既不是 word2vec（有 header），也不是 GloVe（词+向量），也不是纯矩阵。")
            if dim is None:
                dim = len(parts)
            elif len(parts) != dim:
                raise ValueError("纯矩阵每行维度不一致。")
            rows.append([float(x) for x in parts])
    mat = torch.tensor(rows, dtype=torch.float32)
    if mat.shape[0] != len(itos):
        raise ValueError(f"纯矩阵行数({mat.shape[0]})与词表大小({len(itos)})不一致，无法按顺序对齐。")
    return mat, dim

# 加载向量
try:
    emb_matrix, embedding_dim = load_vectors_flex(VECTOR_PATH, itos)
    print("Embedding matrix shape:", emb_matrix.shape)
except Exception as e:
    print("向量文件解析失败:", e)
    embedding_dim = 100
    emb_matrix = torch.empty(len(itos), embedding_dim).uniform_(-0.05, 0.05)

# 构建模型并拷贝
class TinyModel(nn.Module):
    def __init__(self, vocab_size, emb_dim, num_classes=5, pad_idx=pad_idx):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, emb_dim, padding_idx=pad_idx)
        self.encoder = nn.GRU(emb_dim, 128, batch_first=True, bidirectional=True)
        self.fc = nn.Linear(256, num_classes)
    def forward(self, x):
        x = self.embedding(x)
        out, _ = self.encoder(x)
        return self.fc(out[:, -1, :])

model = TinyModel(len(itos), emb_matrix.shape[1])
with torch.no_grad():
    model.embedding.weight.copy_(emb_matrix)

xb, yb = next(iter(train_loader))
logits = model(xb)
print("batch:", xb.shape, yb.min().item(), yb.max().item(), "logits:", logits.shape)

Embedding matrix shape: torch.Size([14746, 300])
batch: torch.Size([64, 46]) 1 5 logits: torch.Size([64, 5])


In [41]:
# 7) 训练循环（LSTM)
import torch.optim as optim

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

class LSTMClassifier(nn.Module):
    def __init__(self, vocab_size, emb_dim, hidden_size=256, num_layers=1, num_classes=5, pad_idx=pad_idx, dropout=0.5, bidirectional=True):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, emb_dim, padding_idx=pad_idx)
        self.lstm = nn.LSTM(
            emb_dim,
            hidden_size,
            num_layers=num_layers,
            batch_first=True,
            bidirectional=bidirectional,
            dropout=dropout if num_layers > 1 else 0.0,
        )
        enc_dim = hidden_size * (2 if bidirectional else 1)
        self.layer_norm = nn.LayerNorm(enc_dim)
        self.dropout = nn.Dropout(dropout)
        self.fc = nn.Linear(enc_dim, num_classes)
        self.pad_idx = pad_idx
    def forward(self, x):
        x_emb = self.embedding(x)
        out, _ = self.lstm(x_emb)  # (B, T, enc_dim)
        # 取每个样本的最后一个非 pad 时间步
        mask = (x != self.pad_idx)
        lengths = mask.sum(dim=1).clamp_min(1)  # (B,)
        last_idx = lengths - 1  # (B,)
        B = out.size(0)
        feat = out[torch.arange(B, device=out.device), last_idx, :]  # (B, enc_dim)
        feat = self.layer_norm(feat)
        feat = self.dropout(feat)
        return self.fc(feat)

# 初始化模型并拷贝预训练向量
model = LSTMClassifier(len(itos), emb_matrix.shape[1], bidirectional=True).to(device)
with torch.no_grad():
    model.embedding.weight.copy_(emb_matrix.to(device))

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3, weight_decay=5e-4)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer, mode='min', patience=3, factor=0.5, threshold=0.0001, min_lr=1e-5
)


def evaluate(loader):
    model.eval()
    total, correct, loss_sum = 0, 0, 0.0
    with torch.no_grad():
        for xb, yb in loader:
            xb = xb.to(device)
            yb = yb.to(device) - 1  # 标签从 [1..5] 映射到 [0..4]
            logits = model(xb)
            loss = criterion(logits, yb)
            loss_sum += loss.item() * xb.size(0)
            preds = logits.argmax(dim=1)
            correct += (preds == yb).sum().item()
            total += xb.size(0)
    return loss_sum / max(total, 1), correct / max(total, 1)


num_epochs = 10
for epoch in range(1, num_epochs + 1):
    model.train()
    total, loss_sum, correct = 0, 0.0, 0
    for xb, yb in train_loader:
        xb = xb.to(device)
        yb = (yb - 1).to(device)  # [0..4]
        optimizer.zero_grad()
        logits = model(xb)
        loss = criterion(logits, yb)
        loss.backward()
        optimizer.step()
        loss_sum += loss.item() * xb.size(0)
        preds = logits.argmax(dim=1)
        correct += (preds == yb).sum().item()
        total += xb.size(0)
    train_loss = loss_sum / max(total, 1)
    train_acc = correct / max(total, 1)
    val_loss, val_acc = evaluate(val_loader)
    scheduler.step(val_loss)
    current_lr = optimizer.param_groups[0]['lr']
    print(f"Epoch {epoch}: train_loss={train_loss:.4f} train_acc={train_acc:.4f} val_loss={val_loss:.4f} val_acc={val_acc:.4f} lr={current_lr:.2e}")

# 验证集指标
val_loss, val_acc = evaluate(val_loader)
print(f"Final val_loss={val_loss:.4f} val_acc={val_acc:.4f}")

Epoch 1: train_loss=1.5543 train_acc=0.3198 val_loss=1.4002 val_acc=0.3477 lr=1.00e-03
Epoch 2: train_loss=1.2965 train_acc=0.4218 val_loss=1.2755 val_acc=0.4330 lr=1.00e-03
Epoch 3: train_loss=1.1860 train_acc=0.4779 val_loss=1.2986 val_acc=0.4167 lr=1.00e-03
Epoch 4: train_loss=1.0887 train_acc=0.5246 val_loss=1.3222 val_acc=0.4128 lr=1.00e-03
Epoch 5: train_loss=0.9796 train_acc=0.5784 val_loss=1.3692 val_acc=0.3918 lr=1.00e-03
Epoch 6: train_loss=0.8833 train_acc=0.6189 val_loss=1.4887 val_acc=0.3937 lr=5.00e-04
Epoch 7: train_loss=0.6313 train_acc=0.7551 val_loss=1.8970 val_acc=0.3956 lr=5.00e-04
Epoch 8: train_loss=0.4212 train_acc=0.8470 val_loss=2.2039 val_acc=0.4013 lr=5.00e-04
Epoch 9: train_loss=0.3199 train_acc=0.8924 val_loss=2.5465 val_acc=0.3918 lr=5.00e-04
Epoch 10: train_loss=0.2684 train_acc=0.9046 val_loss=2.8975 val_acc=0.3803 lr=2.50e-04
Final val_loss=2.8975 val_acc=0.3803


In [42]:
# 8) 测试集评估
# 说明：使用已训练好的当前模型，在 test_loader 上计算损失与准确率，并展示少量预测样例。

model.eval()
import math

def evaluate_on_test(loader):
    total, correct, loss_sum = 0, 0, 0.0
    samples = []  # 采样若干条用于展示
    with torch.no_grad():
        for xb, yb in loader:
            xb = xb.to(device)
            yb0_4 = (yb - 1).to(device)  # [0..4]
            logits = model(xb)
            loss = criterion(logits, yb0_4)
            loss_sum += loss.item() * xb.size(0)
            preds = logits.argmax(dim=1)
            correct += (preds == yb0_4).sum().item()
            total += xb.size(0)

            # 收集前若干样本展示（最多 8 条）
            if len(samples) < 8:
                # 回到 CPU，取若干条
                for i in range(min(xb.size(0), 8 - len(samples))):
                    samples.append({
                        'pred': int(preds[i].cpu().item()) + 1,
                        'label': int(yb0_4[i].cpu().item()) + 1,
                        'len': int((xb[i] != pad_idx).sum().cpu().item())
                    })
    return loss_sum / max(total, 1), correct / max(total, 1), samples

test_loss, test_acc, sample_preds = evaluate_on_test(test_loader)
print(f"Test: loss={test_loss:.4f} acc={test_acc:.4f} (N={len(test_loader.dataset)})")
print("Samples (pred/label/len):")
for s in sample_preds:
    print(s)

Test: loss=2.7641 acc=0.3845 (N=2125)
Samples (pred/label/len):
{'pred': 2, 'label': 3, 'len': 6}
{'pred': 3, 'label': 4, 'len': 21}
{'pred': 4, 'label': 5, 'len': 26}
{'pred': 4, 'label': 3, 'len': 27}
{'pred': 5, 'label': 5, 'len': 9}
{'pred': 2, 'label': 4, 'len': 20}
{'pred': 3, 'label': 4, 'len': 23}
{'pred': 4, 'label': 4, 'len': 7}
