In [None]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import sklearn
import torch.utils.data
from torch.utils.data import TensorDataset
from tqdm import tqdm

In [None]:
torch.manual_seed(9876543210) 
device="cuda" if torch.cuda.is_available() else "cpu" 

In [None]:
data_path = r"D:\displaced prediction\data\####.xlsx"
df = pd.read_excel(data_path,index_col = 0) 
df.head(5)

In [None]:
original_data=np.array(df) 
original_data[np.isnan(original_data)]=0 

In [None]:
timestep = 3 
feature_size = 1 
#input_size = 4
output_size = 1 
batch_size = 64 
hidden_size = 32 
num_layers = 2 
epochs = 1000 
best_loss = 0
learning_rate = 0.01 
model_name = 'LSTM'
save_path = './{}.pt'.format(model_name) 

In [None]:
def normalization(data):
    mean=data.mean()
    shifted_data=data-mean
    scaler=data.var()
    scaled_data=shifted_data/scaler
    return mean,scaler,scaled_data 

In [None]:
def denormalization(data,mean,scaler):
    return data*scaler+mean

In [None]:
def preprocess_data(input_data,output_data,timestep,feature_size):
    data_number=input_data.shape[0]
    dataX=np.zeros([data_number-timestep,timestep,feature_size])
    dataY=np.zeros([data_number-timestep,1])
    
    for index in range(data_number - timestep):
        dataX[index,:,:]=input_data[index: index + timestep,:]
        dataY[index,:]=output_data[index+timestep,:]
    
    return dataX,dataY

In [None]:
def split_data(input_data,output_data, timestep,feature_size):
    dataX,dataY=preprocess_data(input_data,output_data,timestep,feature_size)

    train_size = int(np.round(0.8 * dataX.shape[0]))

    x_train = dataX[:train_size]
    y_train = dataY[:train_size]

    x_test = dataX[train_size:]
    y_test = dataY[train_size:]

    return [x_train, y_train, x_test, y_test]

In [None]:
input_mean,input_scale,input_data=normalization(original_data[:,:feature_size])
output_mean,output_scale,output_data=normalization(original_data[:,-1])
output_data=output_data.reshape(-1,1) 

In [None]:
x_train, y_train, x_test, y_test = split_data(input_data,output_data, timestep,feature_size)

In [None]:
x_train_tensor = torch.from_numpy(x_train).to(torch.float32).to(device)
y_train_tensor = torch.from_numpy(y_train).to(torch.float32).to(device)
x_test_tensor = torch.from_numpy(x_test).to(torch.float32).to(device)
y_test_tensor = torch.from_numpy(y_test).to(torch.float32).to(device)

In [None]:
train_data = TensorDataset(x_train_tensor, y_train_tensor)
test_data = TensorDataset(x_test_tensor, y_test_tensor)

In [None]:
train_loader = torch.utils.data.DataLoader(train_data,
                                           batch_size,
                                           False)

test_loader = torch.utils.data.DataLoader(test_data,
                                          batch_size,
                                          False)

In [None]:
class LSTM(nn.Module):  
    def __init__(self, feature_size, hidden_size, num_layers, output_size, timestep, device):  
        super(LSTM, self).__init__()  
        self.hidden_size = hidden_size  
        self.num_layers = num_layers 
        self.device = device  
        self.lstm = nn.LSTM(feature_size, hidden_size, num_layers, batch_first=True)  
        self.fc = nn.Linear(hidden_size, output_size)  
  
    def forward(self, x, hidden=None):  
    
        batch_size = x.shape[0]   
        seq_len = x.shape[1]    
  
        if hidden is None:  
            h_0 = torch.zeros(self.num_layers, batch_size, self.hidden_size).to(self.device)  
            c_0 = torch.zeros(self.num_layers, batch_size, self.hidden_size).to(self.device)  
            hidden = (h_0, c_0)  
  
        output, (hn, cn) = self.lstm(x, hidden)  
  
        output = output[:, -1, :] 
        output = output.reshape(-1, self.hidden_size)
  
        output = self.fc(output)  
        return output, (hn, cn) 
  
model = LSTM(feature_size, hidden_size, num_layers, output_size, timestep, device).to(device)  
loss_function = nn.MSELoss()  
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)  
  

In [None]:
'''
class GRU(nn.Module):
    def __init__(self, feature_size, hidden_size, num_layers, output_size,timestamp,device):
        super(GRU, self).__init__()
        self.hidden_size = hidden_size  
        self.num_layers = num_layers 
        self.timestamp=timestamp
        self.device=device
        self.gru = nn.GRU(feature_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)



    def forward(self, x, hidden=None):
        batch_size = x.shape[0]
        

        if hidden is None:
            h_0=torch.zeros(self.num_layers,batch_size,self.hidden_size).to(self.device)
        else:
            h_0 = hidden
            
        output, next_states = self.gru(x, h_0)
        output=output[:,0,:]#[:,-1,:]
        output=output.reshape(-1,self.hidden_size)
    
        output = self.fc(output)
        return output,next_states


model = GRU(feature_size, hidden_size, num_layers, output_size,timestep,device).to(device)
loss_function = nn.MSELoss() 
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate) 
'''

In [None]:

for epoch in range(epochs):
    model.train()
    running_loss = 0
    train_bar = tqdm(train_loader)  
    for data in train_bar:
        x_train, y_train = data  
        optimizer.zero_grad()
        y_train_pred,next_states = model(x_train) 
        loss = loss_function(y_train_pred.reshape(-1,1), y_train.reshape(-1, 1)) 
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        train_bar.desc = "train epoch[{}/{}] loss:{:.6f}".format(epoch + 1,
                                                                 epochs,
                                                                 loss)

torch.save(model, save_path)

In [None]:

model.eval()
test_loss = 0
with torch.no_grad():
    test_bar = tqdm(test_loader)
    for data in test_bar:
        x_test, y_test = data
        y_test_pred,next_states = model(x_test)
        test_loss = loss_function(y_test_pred.reshape(-1,1), y_test.reshape(-1, 1))

print('Finished Training')
print("total error:",test_loss)

In [None]:

X_all,Y_all=preprocess_data(input_data, output_data, timestep, feature_size)
x_all_torch=torch.from_numpy(X_all).to(torch.float32).to(device)
predict_y,_=model(x_all_torch)
predict_y=predict_y.detach().cpu().numpy()

plt.plot(denormalization(Y_all,output_mean,output_scale),color="r",label="origial")
plt.plot(denormalization(predict_y,output_mean,output_scale),color="b",label="prediction")
plt.legend()
plt.show()

In [None]:

true = denormalization(Y_all,output_mean,output_scale)
pre = denormalization(predict_y,output_mean,output_scale)
mse = np.sum((true-pre) ** 2) / len(pre)
rmse = np.sqrt(mse)
mae = np.sum(np.absolute(true-pre)) / len(pre)
r2 = 1-mse/ np.var(pre)
mape = np.mean(np.abs((pre - true) / true))
print(" mae:",mae,"mse:",mse," rmse:",rmse," r2:",r2," mape:",mape)


In [None]:

df_1 = pd.DataFrame(pre, columns=['pre'])  
df_2 = pd.DataFrame(true, columns=['true'])  

output_path1 = r'C:\Users\10512\Downloads\output_pre.xlsx'  
output_path2 = r'C:\Users\10512\Downloads\output_true.xlsx'  

df_1.to_excel(output_path1, index=False, engine='openpyxl')  
df_2.to_excel(output_path2, index=False, engine='openpyxl')  
  
print(f"Excel文件已保存到：{output_path1}")  
print(f"Excel文件已保存到：{output_path2}")
