In [None]:
import numpy as np ##用于数值计算，特别是处理数组和矩阵。
import pandas as pd ## 用于数据处理和分析，特别是表格数据。
import matplotlib.pyplot as plt ##用于绘制静态、动态和交互式可视化图形。
import seaborn as sns ##基于matplotlib的可视化库，提供更美观的统计图形。
import torch ## PyTorch库，用于深度学习。
import torch.nn as nn ## PyTorch中用于构建神经网络的模块。
from sklearn.preprocessing import MinMaxScaler ##用于数据归一化，将数据缩放到指定范围。
from sklearn.metrics import mean_squared_error, r2_score ##用于评估模型性能的指标，分别计算均方误差和R²分数。
import math ##提供数学函数。
import time ##用于获取当前时间。

# Load the raw observations from disk.
filepath = '2years.csv'
data = pd.read_csv(filepath)
# Parse the timestamp column so it can be used to label the time axis below.
data['Date'] = pd.to_datetime(data['Date'], format='%Y/%m/%d %H:%M')
print(data.head())   # first rows, as a quick sanity check
print(data.shape)    # (rows, columns)

# Plot the observed water-level series against its row position.
sns.set_style("darkgrid")
fig, ax = plt.subplots(figsize=(15, 9))
ax.plot(data[['water level']])
# Label every 200th sample with its timestamp; the labels are rotated so
# that neighbouring dates do not overlap.
tick_step = 200
ax.set_xticks(range(0, data.shape[0], tick_step))
ax.set_xticklabels(data['Date'].loc[::tick_step], rotation=100)
ax.set_title("tide_level", fontsize=18, fontweight='bold')
plt.show()

In [None]:
# 1. Feature engineering

# Columns used by the pipeline. 'water level' (the quantity being predicted)
# must stay LAST: the windowing code drops the last column from the model
# inputs, and the inverse-scaling code reads the last column back.
feature_columns = [
    'waterlevel1hourago', 'tide', 'wind',
    'atmosphericpressure',
    'temperaturaair',
    'temperaturaacqua', 'rainfall', 'radiation',
    'humidity', 'water level',
]
features = data[feature_columns]
# DataFrame.info() prints its own report and returns None, so it is called
# directly; wrapping it in print() would emit a stray "None" line.
features.info()

# Scale every column to [-1, 1].
# NOTE(review): the scaler is fitted on the full series, i.e. including the
# rows that later become the test split. Fit on the training rows only if a
# strictly leak-free evaluation is required.
scaler = MinMaxScaler(feature_range=(-1, 1))
scaled_features = scaler.fit_transform(features)
# Back to a DataFrame so the column names survive the scaling step.
scaled_features_df = pd.DataFrame(scaled_features, columns=features.columns)
print(scaled_features_df.head())


In [None]:
# 2. 数据集制作
def split_data(level, lookback, test_ratio=0.2):
    """Turn a multivariate series into sliding-window train/test arrays.

    Every window spans ``lookback + 1`` consecutive rows: the first
    ``lookback`` rows are the model input and the final row supplies the
    target. The LAST column of ``level`` is treated as the target; it is
    removed from the inputs and read from the final row of each window.
    (The previous version read the target from column 0, which disagreed
    with both the input slicing and the downstream inverse scaling.)

    Parameters
    ----------
    level : pandas.DataFrame
        Two-dimensional table, shape (n_rows, n_columns), target column last.
    lookback : int
        Number of past rows in each input window (must be >= 1).
    test_ratio : float, default 0.2
        Fraction of windows, taken from the END of the series, used as the
        test split. The split is chronological; nothing is shuffled.

    Returns
    -------
    list
        ``[x_train, y_train, x_test, y_test]`` where the ``x`` arrays have
        shape (n_windows, lookback, n_columns - 1) and the ``y`` arrays have
        shape (n_windows,).

    Raises
    ------
    ValueError
        If ``level`` is not two-dimensional, ``lookback`` is below 1, or the
        series has too few rows to build a single window.
    """
    data_raw = level.to_numpy()
    if data_raw.ndim != 2:
        raise ValueError("level must be two-dimensional (rows x columns)")

    n_windows = len(data_raw) - lookback
    if lookback < 1 or n_windows < 1:
        raise ValueError(
            "need lookback >= 1 and more than `lookback` rows; "
            "got lookback=%d with %d rows" % (lookback, len(data_raw))
        )

    # Each window holds `lookback` input rows plus one trailing target row.
    windows = np.stack(
        [data_raw[start:start + lookback + 1] for start in range(n_windows)]
    )

    test_set_size = int(np.round(test_ratio * windows.shape[0]))
    train_set_size = windows.shape[0] - test_set_size

    inputs = windows[:, :-1, :-1]   # past rows, every column except the target
    targets = windows[:, -1, -1]    # target column at the step after the window

    x_train, x_test = inputs[:train_set_size], inputs[train_set_size:]
    y_train, y_test = targets[:train_set_size], targets[train_set_size:]

    return [x_train, y_train, x_test, y_test]

# Window length in rows: each sample feeds the previous 12 time steps to the
# model (12 hours if the series is hourly — TODO confirm the sampling rate).
lookback = 12
x_train, y_train, x_test, y_test = split_data(scaled_features_df, lookback)

# Report the shape of every split as a quick sanity check.
for label, array in (
    ('x_train.shape = ', x_train),
    ('y_train.shape = ', y_train),
    ('x_test.shape = ', x_test),
    ('y_test.shape = ', y_test),
):
    print(label, array.shape)


In [None]:
# 3. 模型构建 —— CNN_LSTM_KAN
import torch
import torch.nn as nn
from kan import KAN

class CNN_LSTM_KAN(nn.Module):
    """1-D convolution, then a stacked LSTM, then a KAN output head.

    Expected input: ``(batch, seq_len, in_channels)``.

    Shape flow through ``forward``:
      * permute                        -> (batch, in_channels, seq_len)
      * Conv1d(kernel_size=2)          -> (batch, out_channels, seq_len - 1)
      * MaxPool1d(kernel_size=2, s=1)  -> (batch, out_channels, seq_len - 2)
      * permute                        -> (batch, seq_len - 2, out_channels)
      * LSTM (batch_first)             -> (batch, seq_len - 2, hidden_size)
      * last time step                 -> (batch, hidden_size)
      * KAN head                       -> presumably (batch, output_size);
                                          depends on the external ``kan`` package

    Parameters
    ----------
    in_channels : int
        Number of input features per time step.
    hidden_size : int
        LSTM hidden state size (also the KAN input width).
    num_layers : int
        Number of stacked LSTM layers.
    out_channels : int
        Number of convolution filters (also the LSTM input size).
    output_size : int
        Width of the KAN output layer.
    """

    def __init__(self, in_channels,hidden_size,num_layers,out_channels,output_size):
        super().__init__()
        # Not used by forward(); kept so the module's attributes are unchanged.
        self.relu = nn.ReLU(inplace=True)

        # Local feature extractor applied along the time axis.
        conv_stage = [
            nn.Conv1d(in_channels=in_channels, out_channels=out_channels, kernel_size=2),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=2, stride=1),
        ]
        self.conv = nn.Sequential(*conv_stage)

        # Sequence model over the convolved features.
        self.lstm = nn.LSTM(
            input_size=out_channels,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
        )

        # KAN head used in place of a plain nn.Linear(hidden_size, output_size).
        self.kan = KAN(width=[hidden_size, output_size])

    def forward(self, x):
        """Map ``(batch, seq_len, in_channels)`` to the KAN head's output."""
        # Conv1d wants channels before time.
        channels_first = x.permute(0, 2, 1)
        conv_features = self.conv(channels_first)
        # The batch_first LSTM wants time before channels again.
        lstm_out, _ = self.lstm(conv_features.permute(0, 2, 1))
        # Only the final time step feeds the output head.
        last_step = lstm_out[:, -1, :]
        return self.kan(last_step)




# Hyper-parameters
input_dim = scaled_features_df.shape[1] - 1   # every column except the target
hidden_dim = 64
num_layers = 3
output_dim = 1
num_epochs = 100

# Seed PyTorch before the layers are created so weight initialisation is
# repeatable across Restart & Run All.
SEED = 42
torch.manual_seed(SEED)

# Model, loss and optimiser
model = CNN_LSTM_KAN(in_channels=input_dim, hidden_size=hidden_dim, num_layers=num_layers, out_channels=64, output_size=output_dim)
criterion = nn.MSELoss()
# weight_decay adds an L2 penalty on the parameters.
optimiser = torch.optim.Adam(model.parameters(), lr=0.01, weight_decay=1e-4)

# Convert the splits to float32 tensors.
# torch.as_tensor accepts both NumPy arrays and tensors, so re-running this
# cell no longer fails once `x_train` / `x_test` have been rebound to tensors
# (torch.from_numpy raises TypeError on a tensor).
# NOTE: the validation cell later rebinds `y_train` / `y_test` to un-scaled
# arrays; re-run the dataset cell before re-running this one after that.
x_train = torch.as_tensor(x_train, dtype=torch.float32)
x_test = torch.as_tensor(x_test, dtype=torch.float32)
# Targets become column vectors to match the model's (batch, 1) output.
y_train_lstm = torch.as_tensor(y_train, dtype=torch.float32).reshape(-1, 1)
y_test_lstm = torch.as_tensor(y_test, dtype=torch.float32).reshape(-1, 1)

In [None]:
# 4. Model training (one full-batch update per epoch)
hist = np.zeros(num_epochs)   # training loss recorded for each epoch
start_time = time.time()
lstm = []                     # not read anywhere in the visible notebook

for epoch in range(num_epochs):
    # Clear gradients left over from the previous update.
    optimiser.zero_grad()

    # Forward pass over the whole training set.
    y_train_pred = model(x_train)
    loss = criterion(y_train_pred, y_train_lstm)

    epoch_loss = loss.item()
    print("Epoch ", epoch, "MSE: ", epoch_loss)
    hist[epoch] = epoch_loss

    # Backward pass and parameter update.
    loss.backward()
    optimiser.step()

training_time = time.time() - start_time
print("Training time: {}".format(training_time))


In [None]:
# 5. Visualise the training fit
# Bring the scaled water-level values back to their original units. The
# scaler was fitted on all columns, so each target vector is padded with
# zero-filled columns for the inputs and only the last column is kept.

train_pred_scaled = y_train_pred.detach().numpy()
train_true_scaled = y_train_lstm.detach().numpy()

pred_padded = np.concatenate(
    (np.zeros((train_pred_scaled.shape[0], input_dim)), train_pred_scaled), axis=1
)
true_padded = np.concatenate(
    (np.zeros((train_true_scaled.shape[0], input_dim)), train_true_scaled), axis=1
)

predict = pd.DataFrame(scaler.inverse_transform(pred_padded)[:, -1]).reset_index(drop=True)
original = pd.DataFrame(scaler.inverse_transform(true_padded)[:, -1]).reset_index(drop=True)

# Range check on the un-scaled values.
print("Original min/max:", original.min()[0], original.max()[0])
print("Predicted min/max:", predict.min()[0], predict.max()[0])

sns.set_style("darkgrid")

fig, (fit_ax, loss_ax) = plt.subplots(1, 2)
fig.subplots_adjust(hspace=0.2, wspace=0.2)

# Left panel: observed series vs. the model's fit on the training split.
sns.lineplot(x=original.index, y=original[0], label="Data", color='royalblue', ax=fit_ax)
sns.lineplot(x=predict.index, y=predict[0], label="Training Prediction (CNN_LSTM_kans)", color='tomato', ax=fit_ax)
fit_ax.set_title('water level', size=14, fontweight='bold')
fit_ax.set_xlabel("hours", size=14)
fit_ax.set_ylabel("Water Level(m)", size=14)

# Right panel: training loss per epoch.
sns.lineplot(data=hist, color='royalblue', ax=loss_ax)
loss_ax.set_xlabel("Epoch", size=14)
loss_ax.set_ylabel("Loss", size=14)
loss_ax.set_title("Training Loss", size=14, fontweight='bold')

fig.set_size_inches(16, 6)
plt.show()

In [None]:
# 6. Model evaluation
def inverse_scale_water_level(scaled_values):
    """Map scaled water-level values back to their original units.

    The scaler was fitted on every column, so the values are placed in the
    last column of an otherwise zero-filled matrix with ``input_dim + 1``
    columns, inverse-transformed, and the last column is returned.

    Parameters
    ----------
    scaled_values : torch.Tensor or array-like
        Scaled targets or predictions, shape (n,) or (n, 1).

    Returns
    -------
    numpy.ndarray
        One-dimensional array of length n in the scaler's original units.
    """
    if torch.is_tensor(scaled_values):
        scaled_values = scaled_values.detach().cpu().numpy()
    column = np.asarray(scaled_values, dtype=float).reshape(-1, 1)
    padded = np.concatenate((np.zeros((column.shape[0], input_dim)), column), axis=1)
    return scaler.inverse_transform(padded)[:, -1]


# Predict with the final weights. Gradients are not needed for evaluation.
# The training predictions are recomputed here rather than reused from the
# training loop: those were produced before the last optimiser step, and
# recomputing also keeps this cell re-runnable.
with torch.no_grad():
    train_pred_scaled = model(x_train)
    test_pred_scaled = model(x_test)

# Un-scale predictions and targets. These names are deliberately rebound to
# NumPy arrays in original units because the plotting code below reads them.
y_train_pred = inverse_scale_water_level(train_pred_scaled)
y_train = inverse_scale_water_level(y_train_lstm)
y_test_pred = inverse_scale_water_level(test_pred_scaled)
y_test = inverse_scale_water_level(y_test_lstm)

# Mean squared error, computed once and reused for the RMSE.
trainMSE = mean_squared_error(y_train, y_train_pred)
testMSE = mean_squared_error(y_test, y_test_pred)

# Root mean squared error
trainScore = math.sqrt(trainMSE)
print('Train Score: %.2f RMSE' % (trainScore))
testScore = math.sqrt(testMSE)
print('Test Score: %.2f RMSE' % (testScore))

# Coefficient of determination
trainr2Score = r2_score(y_train, y_train_pred)
print('Train Score: %.2f R2' % (trainr2Score))
testr2Score = r2_score(y_test, y_test_pred)
print('Test Score: %.2f R2' % (testr2Score))

# Mean squared error
print('Train Score: %.5f MSE' % (trainMSE))
print('Test Score: %.5f MSE' % (testMSE))

# 计算 Nash-Sutcliffe Efficiency (NSE)
def nash_sutcliffe_efficiency(observed, predicted):
    """Return the Nash-Sutcliffe efficiency of ``predicted`` against ``observed``.

    Computed as ``1 - SSE / SST``, where SSE is the sum of squared
    prediction errors and SST is the sum of squared deviations of the
    observations from their own mean. A perfect prediction gives 1, and
    predicting the observed mean everywhere gives 0.

    Both arguments must support element-wise arithmetic (e.g. NumPy arrays
    of equal shape).
    """
    squared_error_sum = np.sum((observed - predicted) ** 2)
    squared_deviation_sum = np.sum((observed - np.mean(observed)) ** 2)
    return 1 - (squared_error_sum / squared_deviation_sum)

# Nash-Sutcliffe efficiency for the training split, then the test split.
trainNSE, testNSE = (
    nash_sutcliffe_efficiency(observed, predicted)
    for observed, predicted in ((y_train, y_train_pred), (y_test, y_test_pred))
)
print('Train Score: %.2f NSE' % (trainNSE))
print('Test Score: %.2f NSE' % (testNSE))

In [None]:
##7. 绘制图像
import plotly.graph_objects as go
import numpy as np
import pandas as pd  # 确保导入 pandas

# Collect actual and predicted water levels for each split.
train_results = pd.DataFrame({
    'Actual': y_train.flatten(),
    'Predicted': y_train_pred.flatten()
})

test_results = pd.DataFrame({
    'Actual': y_test.flatten(),
    'Predicted': y_test_pred.flatten()
})

# Give both splits one continuous index: test rows continue where the
# training rows end, so the two prediction traces sit end to end.
train_results.index = range(len(train_results))
test_results.index = range(len(train_results), len(train_results) + len(test_results))

# Full series (train followed by test) for the "actual" trace.
results = pd.concat([train_results, test_results], axis=0)

fig = go.Figure()

# Model output on the training split.
fig.add_trace(go.Scatter(x=train_results.index,
                         y=train_results['Predicted'],
                         mode='lines',
                         name='Train Fitting',
                         line=dict(color='tomato', width=2)))

# Model output on the test split.
fig.add_trace(go.Scatter(x=test_results.index,
                         y=test_results['Predicted'],
                         mode='lines',
                         name='Test Prediction',
                         line=dict(color='royalblue', width=2)))

# Observed values across both splits.
fig.add_trace(go.Scatter(x=results.index,
                         y=results['Actual'],
                         mode='lines',
                         name='Actual Value',
                         line=dict(color='gray', width=2)))

# Layout. Axis titles use the nested `title=dict(text=..., font=...)` form:
# the old flat `titlefont` attribute is deprecated and rejected by current
# Plotly releases.
fig.update_layout(
    title='Water Level Prediction vs Actual Values',
    xaxis=dict(
        title=dict(text='Time (Hours)'),
        showline=True,
        showgrid=True,
        linecolor='white',
        linewidth=2,
        showticklabels=True
    ),
    yaxis=dict(
        title=dict(
            text='Water Level (m)',
            font=dict(
                family='Rockwell',
                size=12,
                color='white'
            )
        ),
        showline=True,
        showgrid=True,
        linecolor='white',
        linewidth=2,
        ticks='outside',
        tickfont=dict(
            family='Rockwell',
            size=12,
            color='white'
        ),
    ),
    template='plotly_dark',
    legend=dict(x=1.05, y=1, traceorder='normal', orientation='v')  # legend to the right of the plot
)

# Heading placed above the plotting area, in paper coordinates.
annotations = [
    dict(xref='paper', yref='paper', x=0.0, y=1.05,
         xanchor='left', yanchor='bottom',
         text='Results (CNN_LSTM_kans)',
         font=dict(family='Rockwell', size=26, color='white'),
         showarrow=False)
]
fig.update_layout(annotations=annotations)

# Render the figure.
fig.show()


