In [None]:
import os
# List the entries of the dataset directory.
dataset_entries = os.listdir("kaggle/input/homework2dataset/libriphone")
print(dataset_entries)

In [None]:
import numpy as np
import torch
import random
import os
import gc

from tqdm import tqdm
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.optim as optim
import torch.optim.lr_scheduler as lr_scheduler

def same_seeds(seed):
    """Seed every random number generator used in this notebook.

    Seeds Python's ``random`` module, NumPy and PyTorch (CPU, and the CUDA
    generators when CUDA is available), then puts cuDNN in deterministic mode.

    Args:
        seed: Integer seed passed to every generator.
    """
    for seed_fn in (random.seed, np.random.seed, torch.manual_seed):
        seed_fn(seed)

    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)

    # Turning the cuDNN autotuner off may cost a little speed, but it keeps
    # the results identical from one run to the next.
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

def load_feat(path):
    """Load a saved feature object from ``path`` with ``torch.load``.

    Args:
        path: Location of the serialized file (anything ``torch.load`` accepts).

    Returns:
        Whatever object was stored in the file.
    """
    # NOTE(review): torch.load unpickles its input; only point this at trusted files.
    return torch.load(path)

def shift(x, n):
    """Shift the rows of a 2-D tensor along dim 0, replicating the edge row.

    For ``n > 0`` row ``t`` of the result is ``x[t + n]`` and the last ``n``
    rows are copies of ``x[-1]``. For ``n < 0`` row ``t`` is ``x[t + n]`` and
    the first ``-n`` rows are copies of ``x[0]``. The result always has the
    same number of rows as ``x``.

    Args:
        x: 2-D tensor whose rows are shifted.
        n: Signed shift amount. ``0`` returns ``x`` itself (no copy).

    Returns:
        A tensor with the same shape as ``x``.
    """
    seq_len = x.size(0)
    if n == 0 or seq_len == 0:
        # Nothing to shift; an empty tensor also has no edge row to replicate.
        return x

    # Clamp the shift so a sequence shorter than |n| still yields seq_len rows
    # (every row then equals the replicated edge row) instead of |n| rows.
    n = max(-seq_len, min(n, seq_len))

    if n < 0:
        left = x[0].repeat(-n, 1)
        right = x[:n]
    else:
        right = x[-1].repeat(n, 1)
        left = x[n:]
    return torch.cat((left, right), dim=0)


def concat_feat(x, concat_n):
    """Append neighbouring frames to every frame of a 2-D feature tensor.

    Each output row is the concatenation of ``concat_n`` frames centred on the
    corresponding input row: ``concat_n // 2`` frames before it, the frame
    itself, and ``concat_n // 2`` frames after it. Neighbours that fall outside
    the sequence are filled by ``shift`` (edge-row replication).

    Args:
        x: Tensor of shape ``(seq_len, feature_dim)``.
        concat_n: Odd number of frames per window; values below 2 return ``x``
            unchanged.

    Returns:
        Tensor of shape ``(seq_len, concat_n * feature_dim)``.
    """
    assert concat_n % 2 == 1
    if concat_n < 2:
        return x

    seq_len, feature_dim = x.size(0), x.size(1)
    half = concat_n // 2

    # Slot k of the window holds the sequence shifted by (k - half):
    # negative offsets look back in time, positive offsets look ahead.
    windows = torch.stack([shift(x, offset) for offset in range(-half, half + 1)], dim=1)

    # (seq_len, concat_n, feature_dim) -> (seq_len, concat_n * feature_dim)
    return windows.reshape(seq_len, concat_n * feature_dim)

def preprocess_data(split, feat_dir, phone_path, concat_nframes, train_ratio=0.8, split_seed=1337):
    """Load the utterances of one data split and apply frame-context concatenation.

    For ``'train'`` and ``'val'`` the ids listed in ``train_split.txt`` are
    shuffled and cut at ``train_ratio``: the first part is the training set,
    the remainder the validation set. The shuffle uses a private RNG seeded
    with ``split_seed`` so that separate ``'train'`` and ``'val'`` calls see the
    same permutation. Shuffling with the global RNG would give each call a
    different order, making the two sets overlap (validation leakage) and
    leaving some utterances unused. Both calls must therefore use the same
    ``train_ratio`` and ``split_seed``.

    Args:
        split: ``'train'``, ``'val'`` or ``'test'``.
        feat_dir: Directory containing ``train/`` and ``test/`` sub-directories
            with one ``<utterance id>.pt`` feature file per utterance.
        phone_path: Directory containing ``train_split.txt``,
            ``train_labels.txt`` and ``test_split.txt``.
        concat_nframes: Odd window size forwarded to ``concat_feat``.
        train_ratio: Fraction of the shuffled training list used for
            ``'train'``; the rest is used for ``'val'``.
        split_seed: Seed of the private RNG that shuffles the training list.

    Returns:
        For ``'train'``/``'val'``: ``(all_feats, all_labels)``, two lists with
        one feature tensor and one ``LongTensor`` of labels per utterance.
        For ``'test'``: ``all_feats`` only.

    Raises:
        ValueError: If ``split`` is not one of the three accepted values.
    """
    class_num = 41

    if split in ('train', 'val'):
        mode = 'train'
    elif split == 'test':
        mode = 'test'
    else:
        raise ValueError('Invalid \'split\' argument for dataset: PhoneDataset!')

    label_dict = {}
    if mode == 'train':
        # Each label line is "<utterance id> <label> <label> ...".
        with open(os.path.join(phone_path, f'{mode}_labels.txt')) as f:
            for line in f:
                fields = line.strip('\n').split(' ')
                label_dict[fields[0]] = [int(p) for p in fields[1:]]

        with open(os.path.join(phone_path, 'train_split.txt')) as f:
            usage_list = [line.strip('\n') for line in f]

        # A dedicated, fixed-seed RNG keeps the train/val partition identical
        # across calls and leaves the global random state untouched.
        random.Random(split_seed).shuffle(usage_list)
        train_len = int(len(usage_list) * train_ratio)
        usage_list = usage_list[:train_len] if split == 'train' else usage_list[train_len:]

    else:
        with open(os.path.join(phone_path, 'test_split.txt')) as f:
            usage_list = [line.strip('\n') for line in f]

    print(f'[Dataset] - # phone classes: {class_num}, number of utterances for {split}: {len(usage_list)}')

    all_feats = []
    all_labels = [] if mode == 'train' else None

    for fname in tqdm(usage_list, desc=f"Loading {split} data"):
        feat = load_feat(os.path.join(feat_dir, mode, f'{fname}.pt'))
        feat = concat_feat(feat, concat_nframes)  # add local context frames to every frame

        all_feats.append(feat)

        if mode == 'train':
            all_labels.append(torch.LongTensor(label_dict[fname]))

    print(f'[INFO] {split} set loaded. Total {len(all_feats)} utterances.')

    if mode == 'train':
        return all_feats, all_labels
    return all_feats

In [None]:
class LibriDataset(Dataset):
    """Dataset over per-utterance features with optional per-utterance labels.

    Args:
        X: Indexable collection of feature items, one per utterance.
        y: Optional indexable collection of labels aligned with ``X``. When
            omitted, items are returned without labels.
    """

    def __init__(self, X, y=None):
        self.data = X
        self.label = y

    def __getitem__(self, idx):
        """Return ``(feature, label)`` when labels exist, else just ``feature``."""
        sample = self.data[idx]
        if self.label is None:
            return sample
        return sample, self.label[idx]

    def __len__(self):
        """Number of utterances in the dataset."""
        return len(self.data)


class PadSequence:
    """Collate function that pads variable-length utterances to a common length.

    Accepts either a batch of ``(features, labels)`` pairs or a batch of bare
    feature tensors. Features are zero-padded along dim 0 to the longest
    utterance in the batch; labels are padded with ``-100`` so padded positions
    can be ignored by the loss and the accuracy computation.
    """

    def __call__(self, batch):
        """Pad and stack one batch.

        Args:
            batch: List of ``(features, labels)`` tuples/lists, or a list of
                2-D feature tensors of shape ``(seq_len, feature_dim)``.

        Returns:
            ``(padded_features, padded_labels)`` with shapes
            ``(batch, max_len, feature_dim)`` and ``(batch, max_len)`` for a
            labelled batch, otherwise ``padded_features`` alone.
        """
        # Decide by item type, not by len(): an unlabeled feature tensor with
        # exactly two frames also has len() == 2 and would be mistaken for a
        # (features, labels) pair.
        has_labels = isinstance(batch[0], (tuple, list))

        if has_labels:
            features = [item[0] for item in batch]
            labels = [item[1] for item in batch]
        else:
            features = list(batch)

        max_len = max(f.size(0) for f in features)

        padded_features = []
        for f in features:
            pad_tensor = torch.zeros(max_len - f.size(0), f.size(1), dtype=f.dtype)
            padded_features.append(torch.cat([f, pad_tensor], dim=0))
        padded_features = torch.stack(padded_features)  # (batch_size, max_len, feature_dim)

        if not has_labels:
            return padded_features

        # Pad label sequences with -100, the ignore index used downstream.
        padded_labels = []
        for l in labels:
            pad_tensor = torch.full((max_len - l.size(0),), -100, dtype=l.dtype)
            padded_labels.append(torch.cat([l, pad_tensor], dim=0))
        padded_labels = torch.stack(padded_labels)
        return padded_features, padded_labels

# --- LSTM classifier model ---
class LSTMClassifier(nn.Module):
    """Frame-level classifier: a (bi)directional LSTM followed by a linear layer.

    Args:
        input_feature_dim: Size of each input frame vector.
        hidden_dim: LSTM hidden size per direction.
        num_layers: Number of stacked LSTM layers.
        output_dim: Number of output classes.
        dropout_p: Dropout between stacked LSTM layers; forced to 0 when there
            is only one layer.
        bidirectional: Whether the LSTM runs in both directions.
    """

    def __init__(self, input_feature_dim, hidden_dim, num_layers, output_dim, dropout_p=0.5, bidirectional=True):
        super().__init__()

        self.hidden_dim = hidden_dim
        self.num_layers = num_layers
        self.bidirectional = bidirectional

        # Dropout is only applied when there is more than one layer.
        inter_layer_dropout = dropout_p if num_layers > 1 else 0
        self.lstm = nn.LSTM(
            input_size=input_feature_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True,  # inputs are (batch, seq, feature)
            dropout=inter_layer_dropout,
            bidirectional=bidirectional,
        )

        # A bidirectional LSTM emits both directions concatenated.
        directions = 2 if bidirectional else 1
        self.fc = nn.Linear(hidden_dim * directions, output_dim)

    def forward(self, x):
        """Classify every frame of a padded batch.

        Args:
            x: Tensor of shape ``(batch, seq_len, input_feature_dim)``.

        Returns:
            Logits of shape ``(batch * seq_len, output_dim)``; rows are ordered
            utterance by utterance, frame by frame.
        """
        directions = 2 if self.bidirectional else 1
        state_shape = (self.num_layers * directions, x.size(0), self.hidden_dim)

        # Zero initial hidden and cell states, created on the input's device.
        h0 = torch.zeros(state_shape, device=x.device)
        c0 = torch.zeros(state_shape, device=x.device)

        output, _ = self.lstm(x, (h0, c0))
        # Flatten (batch, seq, features) to (batch * seq, features) for the linear layer.
        return self.fc(output.reshape(-1, output.size(2)))

# ---------------- Hyperparameters ----------------
concat_nframes = 5   # frames per context window; concat_feat requires an odd value
train_ratio = 0.75   # share of the training list used for training (rest is validation)

seed = 42
batch_size = 32
num_epoch = 30
learning_rate = 1e-3
model_path = '/kaggle/working/lstm_phone_classifier.ckpt'

original_feature_dim = 39  # per-frame feature size before context concatenation
input_feature_dim_for_lstm = original_feature_dim * concat_nframes
lstm_hidden_dim = 256
lstm_num_layers = 3
lstm_dropout_p = 0.3
bidirectional = True

class_num = 41  # number of phoneme classes

# NOTE(review): these dataset paths are relative while model_path is absolute;
# confirm the notebook's working directory actually contains 'kaggle/input/...'.
data_root = 'kaggle/input/homework2dataset/libriphone'
feat_root = 'kaggle/input/homework2dataset/libriphone/feat'

# ---------------- Data ----------------
same_seeds(seed)
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(f'DEVICE: {device}')
print("--- Preparing Data ---")
train_X, train_y = preprocess_data(split='train', feat_dir=feat_root, phone_path=data_root,
                                   concat_nframes=concat_nframes, train_ratio=train_ratio)
val_X, val_y = preprocess_data(split='val', feat_dir=feat_root, phone_path=data_root,
                               concat_nframes=concat_nframes, train_ratio=train_ratio)

train_set = LibriDataset(train_X, train_y)
val_set = LibriDataset(val_X, val_y)

# The datasets keep their own references; drop the extra names.
del train_X, train_y, val_X, val_y
gc.collect()

train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True, collate_fn=PadSequence())
val_loader = DataLoader(val_set, batch_size=batch_size, shuffle=False, collate_fn=PadSequence())
print("--- Data Prepared ---")

# ---------------- Model, loss, optimizer ----------------
model = LSTMClassifier(
    input_feature_dim=input_feature_dim_for_lstm,
    hidden_dim=lstm_hidden_dim,
    num_layers=lstm_num_layers,
    output_dim=class_num,
    dropout_p=lstm_dropout_p,
    bidirectional=bidirectional
).to(device)

# -100 is the label value PadSequence uses for padded frames.
criterion = nn.CrossEntropyLoss(ignore_index=-100)
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# mode='max' because the scheduler is stepped with validation accuracy.
# NOTE(review): `verbose` is reportedly deprecated in recent PyTorch releases;
# confirm the installed version still accepts it.
scheduler = lr_scheduler.ReduceLROnPlateau(optimizer, mode='max', factor=0.5, patience=5, verbose=True)

print(f"--- Model Initialized: {model.__class__.__name__} ---")
print(model)

# train_set / val_set / train_loader / val_loader must stay alive here: the
# training cell iterates over the loaders. Deleting them at this point made
# the training cell fail with NameError.
gc.collect()
torch.cuda.empty_cache()


In [None]:
print("--- Start Training ---")
best_val_acc = 0.0

for epoch in range(num_epoch):
    # ---------------- training pass ----------------
    model.train()
    total_train_loss, correct_train_predictions, total_train_effective_labels = 0.0, 0, 0

    train_bar = tqdm(train_loader, desc=f"Epoch {epoch + 1}/{num_epoch} [Train]")
    for batch_idx, (features, labels) in enumerate(train_bar):
        features, labels = features.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(features)  # (batch * max_len, class_num)

        # Keep only real frames; padded positions carry the label -100.
        flat_labels = labels.view(-1)
        keep = flat_labels != -100
        active_outputs, active_labels = outputs[keep], flat_labels[keep]
        if active_labels.numel() == 0:
            continue

        loss = criterion(active_outputs, active_labels)
        loss.backward()
        optimizer.step()

        train_pred = torch.max(active_outputs, 1)[1]
        n_active = len(active_labels)
        correct_train_predictions += (train_pred == active_labels).sum().item()
        # criterion returns a mean over frames; weight it back by the frame count.
        total_train_loss += loss.item() * n_active
        total_train_effective_labels += n_active

    # ---------------- validation pass ----------------
    model.eval()
    total_val_loss, correct_val_predictions, total_val_effective_labels = 0.0, 0, 0

    with torch.no_grad():
        val_bar = tqdm(val_loader, desc=f"Epoch {epoch + 1}/{num_epoch} [Val]")
        for batch_idx, (features, labels) in enumerate(val_bar):
            features, labels = features.to(device), labels.to(device)
            outputs = model(features)

            flat_labels = labels.view(-1)
            keep = flat_labels != -100
            active_outputs, active_labels = outputs[keep], flat_labels[keep]
            if active_labels.numel() == 0:
                continue

            loss = criterion(active_outputs, active_labels)

            val_pred = torch.max(active_outputs, 1)[1]
            n_active = len(active_labels)
            correct_val_predictions += (val_pred == active_labels).sum().item()
            total_val_loss += loss.item() * n_active
            total_val_effective_labels += n_active

    # ---------------- epoch summary ----------------
    if total_train_effective_labels > 0:
        avg_train_acc = correct_train_predictions / total_train_effective_labels
        avg_train_loss = total_train_loss / total_train_effective_labels
    else:
        avg_train_acc = avg_train_loss = 0

    if total_val_effective_labels > 0:
        avg_val_acc = correct_val_predictions / total_val_effective_labels
        avg_val_loss = total_val_loss / total_val_effective_labels
    else:
        avg_val_acc = avg_val_loss = 0

    epoch_summary = (f'[{epoch + 1:03d}/{num_epoch:03d}] '
                     f'Train Acc: {avg_train_acc:.5f} Loss: {avg_train_loss:.5f} | '
                     f'Val Acc: {avg_val_acc:.5f} Loss: {avg_val_loss:.5f}')
    print(epoch_summary)

    # The scheduler tracks validation accuracy.
    scheduler.step(avg_val_acc)

    # Checkpoint only when validation accuracy improves.
    if avg_val_acc > best_val_acc:
        best_val_acc = avg_val_acc
        torch.save(model.state_dict(), model_path)
        print(f'Saving model with Val Acc: {best_val_acc:.5f}')

# Release the training data before the test data is loaded.
del train_set, val_set, train_loader, val_loader
gc.collect()
torch.cuda.empty_cache()
print("--- Training Finished ---")

In [None]:
print("--- Start Testing ---")
test_X = preprocess_data(split='test', feat_dir='kaggle/input/homework2dataset/libriphone/feat', phone_path='kaggle/input/homework2dataset/libriphone', concat_nframes=concat_nframes)

# True (unpadded) frame count of every test utterance, in loader order.
# Needed to discard predictions the model makes on zero-padded frames.
test_lengths = [feat.size(0) for feat in test_X]

test_set = LibriDataset(test_X, None)
# shuffle=False keeps batches aligned with test_lengths.
test_loader = DataLoader(test_set, batch_size=batch_size, shuffle=False, collate_fn=PadSequence())

model = LSTMClassifier(
    input_feature_dim=input_feature_dim_for_lstm,
    hidden_dim=lstm_hidden_dim,
    num_layers=lstm_num_layers,
    output_dim=class_num,
    dropout_p=lstm_dropout_p,
    bidirectional=bidirectional
).to(device)
# map_location lets a checkpoint saved on GPU be loaded on a CPU-only machine.
model.load_state_dict(torch.load(model_path, map_location=device))


model.eval()
all_predictions = []
utt_offset = 0  # index of the first utterance of the current batch

with torch.no_grad():
    for batch_idx, features in enumerate(tqdm(test_loader, desc="[Test Prediction]")):
        features = features.to(device)
        n_utts, max_len = features.size(0), features.size(1)
        outputs = model(features)  # (n_utts * max_len, class_num)

        _, test_pred = torch.max(outputs, 1)
        # Restore the per-utterance layout, then keep only the real frames of
        # each utterance; the remainder are predictions on padding.
        test_pred = test_pred.view(n_utts, max_len).cpu()
        batch_lengths = test_lengths[utt_offset:utt_offset + n_utts]
        for utt_pred, n_frames in zip(test_pred, batch_lengths):
            all_predictions.extend(utt_pred[:n_frames].tolist())
        utt_offset += n_utts

# Exactly one prediction per real test frame.
assert len(all_predictions) == sum(test_lengths)

with open('/kaggle/working/prediction.csv', 'w') as f:
    f.write('Id,Class\n')
    for i, y in enumerate(all_predictions):
        f.write(f'{i},{y}\n')

print("Prediction file 'prediction.csv' generated.")
print("--- Testing Finished ---")

In [None]:
import os
# List the entries of the working directory's .virtual_documents folder.
virtual_document_entries = os.listdir("/kaggle/working/.virtual_documents")
print(virtual_document_entries)