In [18]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split
import random
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler
import joblib

In [None]:
# 加载数据
file_paths = [
    '/mnt/workspace/整合1.xlsx',
    '/mnt/workspace/整合2.xlsx',
    '/mnt/workspace/整合3.xlsx',
    '/mnt/workspace/整合4.xlsx'
]

data_frames = [pd.read_excel(file_path) for file_path in file_paths]
data = pd.concat(data_frames, ignore_index=True)

# 分组提取输入序列和目标序列
input_sequences = []
target_sequences = []

for i in range(0, len(data) - 63, 63):  # 确保每组包含63行，并避免越界
    input_sequence = data.iloc[i, :9].values  # 每组的第一行作为输入序列
    target_sequence = data.iloc[i:i + 63, 10].values  # 每组的第11列的第1到第63行作为目标序列
    input_sequences.append(input_sequence)
    target_sequences.append(target_sequence)

input_sequences = np.array(input_sequences)
target_sequences = np.array(target_sequences)

print("Input sequences shape:", input_sequences.shape)
print("Target sequences shape:", target_sequences.shape)

# 数据标准化
input_scaler = StandardScaler()
target_scaler = StandardScaler()

input_sequences = input_scaler.fit_transform(input_sequences.reshape(-1, input_sequences.shape[-1])).reshape(
    input_sequences.shape)
target_sequences = target_scaler.fit_transform(target_sequences.reshape(-1, 1)).reshape(target_sequences.shape)

# 保存标准化器以便预测时使用
joblib.dump(input_scaler, 'input_scaler.pkl1')
joblib.dump(target_scaler, 'target_scaler.pkl1')

In [3]:
# 创建自定义数据集类
class SimpleDataset(Dataset):
    def __init__(self, input_sequences, target_sequences):
        self.input_sequences = torch.tensor(input_sequences, dtype=torch.float32)
        self.target_sequences = torch.tensor(target_sequences, dtype=torch.float32)

    def __len__(self):
        return len(self.input_sequences)

    def __getitem__(self, idx):
        return self.input_sequences[idx], self.target_sequences[idx]

dataset = SimpleDataset(input_sequences, target_sequences)

# 划分训练集、验证集和测试集
train_size = int(0.8 * len(dataset))
val_size = int(0.1 * len(dataset))
test_size = len(dataset) - train_size - val_size

train_dataset, val_dataset, test_dataset = random_split(dataset, [train_size, val_size, test_size])

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

In [17]:

# 定义位置编码
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-np.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        return x + self.pe[:x.size(0), :]

# 定义Transformer模型
class TransformerModel(nn.Module):
    def __init__(self, input_dim, target_dim, d_model=512, nhead=8, num_encoder_layers=6, num_decoder_layers=6,
                 dim_feedforward=2048, dropout=0.1):
        super(TransformerModel, self).__init__()
        self.model_type = 'Transformer'
        self.src_embedding = nn.Linear(input_dim, d_model)
        self.tgt_embedding = nn.Linear(target_dim, d_model)
        self.pos_encoder = PositionalEncoding(d_model)
        self.transformer = nn.Transformer(d_model, nhead, num_encoder_layers, num_decoder_layers, dim_feedforward,
                                          dropout)
        self.d_model = d_model
        self.decoder = nn.Linear(d_model, target_dim)
        self._reset_parameters()

    def _reset_parameters(self):
        for p in self.parameters():
            if p.dim() > 1:
                nn.init.xavier_uniform_(p)

    def forward(self, src, tgt, tgt_mask):
        # 编码器输入
        src = self.src_embedding(src) * np.sqrt(self.d_model)
        src = self.pos_encoder(src)

        # 解码器输入
        tgt = self.tgt_embedding(tgt) * np.sqrt(self.d_model)
        tgt = self.pos_encoder(tgt)

        output = self.transformer(src, tgt, tgt_mask=tgt_mask)
        output = self.decoder(output)
        return output

# 生成上三角掩码
def generate_square_subsequent_mask(sz):
    mask = torch.triu(torch.ones(sz, sz)) == 1
    mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))
    return mask

# 模型参数
input_dim = input_sequences.shape[1]
target_dim = 1

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = TransformerModel(input_dim=input_dim, target_dim=target_dim).to(device)

# 定义损失函数和优化器
criterion = nn.SmoothL1Loss()
optimizer = optim.AdamW(model.parameters(), lr=0.0001, weight_decay=1e-5)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.5)

# 训练和验证过程
num_epochs = 50
teacher_forcing_ratio = 0.5

for epoch in range(num_epochs):
    model.train()
    train_loss = 0
    for input_seq, target_seq in train_loader:
        optimizer.zero_grad()
        input_seq, target_seq = input_seq.to(device), target_seq.to(device)
        input_seq = input_seq.unsqueeze(1).permute(1, 0, 2)  # [1, batch_size, input_dim]

        # 编码器生成上下文向量
        encoder_output = model.src_embedding(input_seq) * np.sqrt(model.d_model)
        encoder_output = model.pos_encoder(encoder_output)
        memory = model.transformer.encoder(encoder_output)

        # 准备解码器输入和掩码
        decoder_input = torch.zeros((target_seq.size(1), target_seq.size(0), 1), device=target_seq.device)
        tgt_mask = generate_square_subsequent_mask(decoder_input.size(0)).to(input_seq.device)

        # 自回归预测或Teacher Forcing
        for t in range(1, target_seq.size(1)):
            if random.random() < teacher_forcing_ratio:
                decoder_input[t] = target_seq[:, t-1:t]
            else:
                decoder_input[t] = model.decoder(model.pos_encoder(model.tgt_embedding(decoder_input[t-1:t]))).detach()

        decoder_input = model.tgt_embedding(decoder_input) * np.sqrt(model.d_model)
        decoder_input = model.pos_encoder(decoder_input)
        output = model.transformer.decoder(decoder_input, memory, tgt_mask=tgt_mask)
        output = model.decoder(output)

        loss = criterion(output.permute(1, 0, 2).squeeze(2), target_seq)
        loss.backward()
        optimizer.step()
        train_loss += loss.item()

    train_loss /= len(train_loader)
    scheduler.step()

    # 验证过程
    model.eval()
    val_loss = 0
    with torch.no_grad():
        for input_seq, target_seq in val_loader:
            input_seq, target_seq = input_seq.to(device), target_seq.to(device)
            input_seq = input_seq.unsqueeze(1).permute(1, 0, 2)  # [1, batch_size, input_dim]

            # 编码器生成上下文向量
            encoder_output = model.src_embedding(input_seq) * np.sqrt(model.d_model)
            encoder_output = model.pos_encoder(encoder_output)
            memory = model.transformer.encoder(encoder_output)

            # 准备解码器输入和掩码
            decoder_input = torch.zeros((target_seq.size(1), target_seq.size(0), 1), device=target_seq.device)
            tgt_mask = generate_square_subsequent_mask(decoder_input.size(0)).to(input_seq.device)

            # 自回归预测
            for t in range(1, target_seq.size(1)):
                decoder_input[t] = model.decoder(model.pos_encoder(model.tgt_embedding(decoder_input[t-1:t]))).detach()

            decoder_input = model.tgt_embedding(decoder_input) * np.sqrt(model.d_model)
            decoder_input = model.pos_encoder(decoder_input)
            output = model.transformer.decoder(decoder_input, memory, tgt_mask=tgt_mask)
            output = model.decoder(output)

            loss = criterion(output.permute(1, 0, 2).squeeze(2), target_seq)
            val_loss += loss.item()

    val_loss /= len(val_loader)

    print(f"Epoch {epoch + 1}/{num_epochs}, Train Loss: {train_loss}, Validation Loss: {val_loss}")

# 保存模型
model_path = 'transformer_model_no_teacher_forcing_smooth_l1.pth'
torch.save(model.state_dict(), model_path)

RuntimeError: the batch number of src and tgt must be equal

In [1]:
import matplotlib.pyplot as plt
import random

# 测试过程
test_loss = 0
model.eval()
test_predictions = []
test_targets = []
with torch.no_grad():
    for input_seq, target_seq in test_loader:
        input_seq, target_seq = input_seq.to(device), target_seq.to(device)
        input_seq = input_seq.unsqueeze(1).permute(1, 0, 2)  # [1, batch_size, input_dim]

        # 编码器生成上下文向量
        encoder_output = model.src_embedding(input_seq) * np.sqrt(model.d_model)
        encoder_output = model.pos_encoder(encoder_output)
        memory = model.transformer.encoder(encoder_output)

        # 准备解码器输入和掩码
        decoder_input = torch.zeros((target_seq.size(1), target_seq.size(0), 1), device=target_seq.device)
        tgt_mask = generate_square_subsequent_mask(decoder_input.size(0)).to(input_seq.device)

        # 自回归预测
        for t in range(1, target_seq.size(1)):
            decoder_input[t] = model.decoder(model.pos_encoder(model.tgt_embedding(decoder_input[t-1:t]))).detach()

        decoder_input = model.tgt_embedding(decoder_input) * np.sqrt(model.d_model)
        decoder_input = model.pos_encoder(decoder_input)
        output = model.transformer.decoder(decoder_input, memory, tgt_mask=tgt_mask)
        output = model.decoder(output)

        loss = criterion(output.permute(1, 0, 2).squeeze(2), target_seq)
        test_loss += loss.item()

        test_predictions.append(output.permute(1, 0, 2).squeeze(2).cpu().numpy())
        test_targets.append(target_seq.cpu().numpy())

test_loss /= len(test_loader)
print(f"Test Loss: {test_loss}")

# 将所有测试集的预测结果和真实结果拼接起来
test_predictions = np.concatenate(test_predictions, axis=0)
test_targets = np.concatenate(test_targets, axis=0)

# 反标准化预测值和真实值
test_predictions = target_scaler.inverse_transform(test_predictions)
test_targets = target_scaler.inverse_transform(test_targets)

# 绘制图像
num_samples_per_fig = 2
num_figs = 4

for fig_idx in range(num_figs):
    fig, ax = plt.subplots(figsize=(15, 10))
    random_indices = random.sample(range(test_predictions.shape[0]), num_samples_per_fig)

    for idx in random_indices:
        ax.plot(range(test_targets.shape[1]), test_targets[idx], 'b-', alpha=0.5, label='True' if idx == random_indices[0] else "")
        ax.plot(range(test_predictions.shape[1]), test_predictions[idx], 'ro', alpha=0.5, label='Predicted' if idx == random_indices[0] else "")

    ax.legend(fontsize=12, prop={'family': 'Times New Roman'})
    ax.set_title(f'Test Samples Comparison - Figure {fig_idx + 1}', fontsize=14, fontname='Times New Roman')
    ax.tick_params(direction='in', labelsize=12)
    ax.grid(True)
    plt.tight_layout()
    plt.show()

NameError: name 'model' is not defined