In [None]:
from models import *
from utils import *
from test import *

import torch
import torch.nn as nn
import numpy as np
import pandas as pd
import itertools

from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from torch.utils.data import DataLoader, TensorDataset
from sklearn.preprocessing import MinMaxScaler


In [None]:
X, y = occupant()
X_columns_to_normalize = X.columns.difference(["timestamp [dd/mm/yyyy HH:MM]"])
X_scaler = MinMaxScaler()
X[X_columns_to_normalize] = X_scaler.fit_transform(X[X_columns_to_normalize])

X_train = X[X["timestamp [dd/mm/yyyy HH:MM]"] <= '2013-09-30']
y_train = y[y["timestamp [dd/mm/yyyy HH:MM]"] <= '2013-09-30']

X_test = X[X["timestamp [dd/mm/yyyy HH:MM]"] >= '2013-09-30']
y_test = y[y["timestamp [dd/mm/yyyy HH:MM]"] >= '2013-09-30']


In [None]:
seq_length = 50
predict_length = 5
batch_size = 100

Train_dataset = TimeSeriesDataset_sep(X_train, y_train, seq_length, predict_length = predict_length, time = "timestamp [dd/mm/yyyy HH:MM]")
Train_dataloader = DataLoader(Train_dataset, batch_size=batch_size, shuffle=False)

Test_dataset = TimeSeriesDataset_sep(X_test, y_test, seq_length, predict_length = predict_length, time = "timestamp [dd/mm/yyyy HH:MM]")
Test_dataloader = DataLoader(Test_dataset, batch_size=batch_size, shuffle=False)


In [None]:
num_epochs = 50
input_size = 43
output_size = 1
hidden_size = 50
num_layers = 2
num_heads = 5
model = BiLSTMTransformer(input_size, hidden_size, num_layers, output_size, num_heads, predict_length).to(device)

In [None]:
teacher_forcing_ratio = 5
criterion = nn.MSELoss()
#criterion = nn.SmoothL1Loss()  # 用于回归任务
optimizer = torch.optim.Adam(model.parameters(), lr = 0.01)
loss_per_epoch = []
val_mse_per_epoch = []
val_r2_per_epoch = []

for epoch in range(num_epochs):
    model.train()  # 确保模型在训练模式下
    for external, internal, batch_y in Train_dataloader:
        external, internal, batch_y = external.to(device), internal.to(device), batch_y.to(device)

        # 前向传播
        total_loss = 0
        y = internal[:, -1:, :]
        for step in range(predict_length):
            outputs = model(external, internal, y)
            next_pred = outputs[:, -1:, :]
            #outputs = model(batch_X, batch_y.view(batch_y.shape[0], batch_y.shape[2]))
            loss = criterion(next_pred, batch_y[:, step:step+1, :])
            total_loss += loss
            if np.random.rand() < teacher_forcing_ratio:
                next_input = batch_y[:, step:step + 1, :]
            else:
                next_input = next_pred
            y = torch.cat([y, next_input], dim=1)
                
        total_loss = total_loss / predict_length
        # 反向传播和优化
        optimizer.zero_grad()
        total_loss.backward()
        optimizer.step()

    #scheduler.step()

    loss_per_epoch.append(loss.item())
    teacher_forcing_ratio -= 0.05

    # 评估验证集
    val_loss, (val_mse, val_mae, val_r2) = evaluate_Transformer(model, Test_dataloader, criterion, device, [mean_squared_error, mean_absolute_error, r2_score])
    val_mse_per_epoch.append(val_mse)
    val_r2_per_epoch.append(val_r2)

    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {total_loss.item():.4f}, Val Loss: {val_loss:.4f}, Val MSE: {val_mse:.4f}, Val R²: {val_r2:.4f}')
