In [None]:
import os
import pickle
import pandas as pd
import torch
from torch.utils.data import Dataset, DataLoader, random_split

class CustomDataset(Dataset):
    def __init__(self, csv_path):
        self.metadata = pd.read_csv(csv_path)
        self.labels = pd.factorize(self.metadata['type'])[0]
        
    def __len__(self):
        return len(self.metadata)
    
    def __getitem__(self, idx):
        feature_path = os.path.join('data', self.metadata.iloc[idx]['feature_path'])
        with open(feature_path, 'rb') as f:
            features = pickle.load(f)
        
        features = torch.from_numpy(features.squeeze(1)).float()
        
        label = self.labels[idx]
        return features, torch.tensor(label, dtype=torch.long)


def create_dataloaders(batch_size=32):
    full_dataset = CustomDataset(csv_path='data/metadata.csv')
    
    total = len(full_dataset)
    train_size = int(0.7 * total)
    val_size = int(0.2 * total)
    test_size = total - train_size - val_size
    
    train_dataset, val_dataset, test_dataset = random_split(
        full_dataset,
        [train_size, val_size, test_size],
        generator=torch.Generator().manual_seed(42)
    )
    
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size)
    test_loader = DataLoader(test_dataset, batch_size=batch_size)
    
    return train_loader, val_loader, test_loader

train_loader, val_loader, test_loader = create_dataloaders()

# 示例：获取一个batch的数据
example_batch = next(iter(train_loader))
print(f"Example batch shape (features): {example_batch[0].shape}")
print(f"Example batch shape (labels): {example_batch[1].shape}")


In [None]:
import torch
import torch.nn as nn
from torch.nn import TransformerEncoder, TransformerEncoderLayer

class SpectralTransformer(nn.Module):
    def __init__(self, input_dim=768, num_bands=128, d_model=256, 
                 nhead=4, num_layers=2, dim_feedforward=1024, num_classes=2):
        super().__init__()
        
        # 输入特征投影层
        self.input_proj = nn.Linear(input_dim, d_model)
        
        # 位置编码（可学习）
        self.pos_encoder = nn.Parameter(torch.randn(num_bands, 1, d_model))
        
        # Transformer编码器层
        encoder_layer = TransformerEncoderLayer(
            d_model=d_model,
            nhead=nhead,
            dim_feedforward=dim_feedforward,
            batch_first=False  # 使用 (seq, batch, features) 格式
        )
        self.transformer_encoder = TransformerEncoder(encoder_layer, num_layers=num_layers)
        
        # 分类头
        self.classifier = nn.Sequential(
            nn.LayerNorm(d_model),
            nn.Linear(d_model, num_classes)
        )
        
        # 初始化参数
        self._reset_parameters()

    def _reset_parameters(self):
        for p in self.parameters():
            if p.dim() > 1:
                nn.init.xavier_uniform_(p)

    def forward(self, x):
        """
        输入形状: (batch_size, num_bands, input_dim)
        输出形状: (batch_size, num_classes)
        """
        # 输入投影 [B, 128, 768] -> [B, 128, 256]
        x = self.input_proj(x)
        
        # 调整维度为 (seq_len, batch, features) -> [128, B, 256]
        x = x.permute(1, 0, 2)
        
        # 添加位置编码 [128, B, 256]
        x = x + self.pos_encoder
        
        # Transformer编码 [128, B, 256]
        x = self.transformer_encoder(x)
        
        # 平均池化 [B, 256]
        x = x.mean(dim=0)
        
        # 分类 [B, 2]
        return self.classifier(x)

# 初始化模型
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = SpectralTransformer(
    input_dim=768,
    num_bands=128,
    d_model=256,
    nhead=4,
    num_layers=2,
    dim_feedforward=1024
).to(device)

# 测试前向传播
example_input = torch.randn(32, 128, 768).to(device)  # 匹配示例输入形状, 并移动到设备
output = model(example_input)
print(f"输出形状: {output.shape}")  # 应输出 torch.Size([32, 2])

# 创建一个示例标签
example_labels = torch.randint(0, 2, (32,)).to(device)  # 0到1的随机整数，形状为(32,)，并移动到设备
print(f"Example batch shape (labels): {example_labels.shape}")

# 计算损失 (示例)
criterion = nn.CrossEntropyLoss()
loss = criterion(output, example_labels)
print(f"Loss: {loss.item()}")

In [None]:
from utils import train_model, test_model, plot_training_history
import torch
import torch.nn as nn
import torch.optim as optim

# 设置训练参数
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = SpectralTransformer(
    input_dim=768,
    num_bands=128,
    d_model=256,
    nhead=4,
    num_layers=2,
    dim_feedforward=1024
).to(device)

# 定义损失函数和优化器
criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(model.parameters(), lr=5e-4, weight_decay=1e-4)

# 学习率调度器 (可选)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(
    optimizer, mode='min', factor=0.5, patience=5, verbose=True
)

# 获取数据加载器
train_loader, val_loader, test_loader = create_dataloaders(batch_size=32)

# 训练模型
history, trained_model = train_model(
    model=model,
    train_loader=train_loader,
    val_loader=val_loader,
    test_loader=test_loader,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=None,  # 如果使用调度器，这里改为scheduler
    num_epochs=100,
    device=device,
    patience=100
)

# 保存训练好的模型 (可选)
torch.save(trained_model.state_dict(), 'spectral_transformer_model.pth')