In [None]:
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
from sklearn.svm import SVR
from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error, mean_absolute_percentage_error
from sklearn.model_selection import train_test_split
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False

In [None]:
import random
import torch
import torch.nn as nn
import torch.optim as optim
device='cuda:0' if torch.cuda.is_available() else 'cpu'

def set_seed(seed):
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
    random.seed(seed)
    np.random.seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

set_seed(42)

Divide the training set and the test set

In [None]:
df_suzhou=pd.read_excel("./data/P1.xlsx")
df_suzhou['日期'] = pd.to_datetime(df_suzhou['日期'])
X1_suzhou=df_suzhou.drop(columns=['平均负荷','碳排放强度', '日期','机组'])
y1_suzhou=df_suzhou['碳排放强度']

df_maanshan=pd.read_excel("./data/P3.xlsx")
df_maanshan['日期'] = pd.to_datetime(df_maanshan['日期'])

X1 = df_maanshan.drop(columns=['平均负荷','碳排放强度', '日期'])
y1 = df_maanshan['碳排放强度']

scaler1 = MinMaxScaler()
scaler2 = MinMaxScaler()
scaler3 = MinMaxScaler()
scaler4 = MinMaxScaler()
X1_suzhou_scaled = scaler1.fit_transform(X1_suzhou)
y1_suzhou_scaled = scaler2.fit_transform(y1_suzhou.to_frame())
X1_scaled = scaler3.fit_transform(X1)
y1_scaled = scaler4.fit_transform(y1.to_frame())

y1_suzhou_scaled=pd.Series(y1_suzhou_scaled.flatten())
y1_scaled=pd.Series(y1_scaled.flatten())

X1_train, X1_test, y1_train, y1_test = train_test_split(X1_scaled, y1_scaled, test_size=0.3, random_state=42)
X1_train=np.vstack((X1_suzhou_scaled,X1_train))
y1_train=np.hstack((y1_suzhou_scaled,y1_train))
y1_test_recover=scaler4.inverse_transform(pd.DataFrame(y1_test))

Sorted feature importance

In [None]:
def plot_feature_importance(feature_importances):
    sorted_idx = np.argsort(feature_importances)
    sorted_feature_names = np.array(X1.columns)[sorted_idx]
    sorted_feature_importances = feature_importances[sorted_idx]
    sorted_correlations = df_maanshan.corr()['碳排放强度'].drop('碳排放强度')[sorted_idx]
    plt.figure(figsize=(10, 8))
    bars = plt.barh(sorted_feature_names, sorted_feature_importances,height = 0.6,color=np.where(sorted_correlations > 0, '#CA7373', '#4874CB'))
    plt.yticks(fontsize=13, fontweight='bold')
    plt.xticks(fontsize=13, fontweight='bold')
    plt.xlabel("特征重要性", fontsize=13, fontweight='bold')
    plt.grid(axis='x', linestyle='--', alpha=0.6)
    plt.show()

Prediction result graph

In [None]:
def plot_line(pred):
    import matplotlib.pyplot as plt
    import matplotlib.lines as mlines  # 导入 mlines
    plt.rcParams['font.sans-serif'] = ['SimHei']  # 使用黑体 SimHei
    plt.rcParams['axes.unicode_minus'] = False    # 解决负号显示问题
    plt.figure(figsize=(9, 5))
    plt.plot(pred, label='预测值', linewidth=1,color = "#4874CB")       # 添加预测值的标签
    plt.plot(y1_test_recover, label='实际值', linewidth=1, color='#CA7373')  # 添加真实值的标签
    plt.scatter(range(len(pred)), pred, label='预测值',  alpha=0.7,marker= "X",color = "#4874CB")  # 绘制预测值的散点图
    plt.scatter(range(len(y1_test_recover)), y1_test_recover, label='实际值', alpha=0.7, color='#CA7373')  # 绘制真实值的散点图
    line1 = mlines.Line2D([], [], color="#4874CB", marker="X", markersize=6, label="预测值")
    line2 = mlines.Line2D([], [], color="#CA7373", marker="o", markersize=6, label="实际值")
    
    plt.legend(handles=[line1, line2], fontsize=12.5)
    plt.xlabel("样本序列",fontsize = 15,fontweight='bold')
    plt.ylabel("碳排放强度/(g/kWh)",fontsize = 15,fontweight='bold')
    
    plt.tight_layout()  # 调整布局，确保所有元素不重叠
    # plt.savefig("rf1.png",dpi=500,bbox_inches='tight')
    plt.show()

Comparison between the predicted value and the true value

In [None]:
def plot_compare_line(pred,r2):
    z = np.polyfit(y1_test_recover.T[0], pred, 1)  # 线性拟合
    p = np.poly1d(z)  # 得到拟合的多项式函数
    y_fit = p(y1_test_recover.T[0])
    residuals = pred - y_fit
    std_err = np.std(residuals)
    confidence_interval = 1.96 * std_err  # 95%置信区间对应的倍数是1.96
    data = pd.DataFrame({'y_test': y1_test_recover.T[0], 'y_pred': pred, 'y_fit': y_fit})
    data_sorted = data.sort_values(by='y_test')
    sorted_y_test = data_sorted['y_test'].values
    sorted_y_pred = data_sorted['y_pred'].values
    sorted_y_fit = data_sorted['y_fit'].values
    plt.scatter(sorted_y_test, sorted_y_pred, color='blue', alpha=0.6, edgecolor='k', label='Data point')
    plt.plot(sorted_y_test, sorted_y_fit, color='orange', alpha=0.6, label=f"Fit line\n$R^2$ = {r2:.2f}")
    plt.fill_between(sorted_y_test, sorted_y_fit - confidence_interval, sorted_y_fit + confidence_interval, color='orange', alpha=0.2, label='95% Confidence interval')
    max_val = max(sorted_y_test.max(), sorted_y_pred.max())  # 找到最大值，确保参考线绘制完整
    min_val = min(sorted_y_test.min(), sorted_y_pred.min())  # 找到最小值
    plt.plot([min_val, max_val], [min_val, max_val], 'r--', label='y = x')  # 画出y=x的对角线
    plt.title("Comparison of model predictions with real values", fontsize=14)
    plt.xlabel("Real values of CEI / (g/kWh)", fontsize=14)
    plt.ylabel("Predictions of CEI / (g/kWh)", fontsize=14)
    plt.legend(fontsize=12)
    plt.tight_layout()
    plt.show()

1.SVM

In [None]:
svr_model = SVR(gamma=0.005,C=4600,epsilon=0.09)
svr_model.fit(X1_train, y1_train)
svm_y_pred = svr_model.predict(X1_test)

svm_y_pred=scaler4.inverse_transform(pd.DataFrame({'pred':svm_y_pred}))
y1_test_recover=scaler4.inverse_transform(pd.DataFrame(y1_test))
svm_mse = mean_squared_error(y1_test_recover, svm_y_pred)
svm_rmse = np.sqrt(svm_mse)
svm_mae = mean_absolute_error(y1_test_recover, svm_y_pred)
svm_mape = mean_absolute_percentage_error(y1_test_recover, svm_y_pred)
svm_r2 = r2_score(y1_test_recover, svm_y_pred)

print(f"svm_MSE: {svm_mse:.6f}")
print(f"svm_RMSE: {svm_rmse:.6f}")
print(f"svm_MAE: {svm_mae:.6f}")
print(f"svm_MAPE: {svm_mape*100:.6f}")
print(f"svm_R Square: {svm_r2:.6f}")

In [None]:
# plot_feature_importance(np.abs(svr_model.coef_).flatten())
plot_compare_line(svm_y_pred.T[0],svm_r2)
plot_line(svm_y_pred)

In [None]:
'''import shap
plt.rcParams['font.sans-serif'] = ['SimHei']  # 使用黑体 SimHei
plt.rcParams['axes.unicode_minus'] = False    # 解决负号显示问题
# Use the trained SVR model to create an explainer
explainer = shap.KernelExplainer(svr_model.predict, X1_train)
# Calculate SHAP values for the test set
shap_values = explainer.shap_values(X1_test)
# Visualize feature importance using SHAP
shap.summary_plot(shap_values, X1_test, feature_names=X1.columns)'''

In [None]:
df_X1_train=pd.DataFrame(X1_train,columns=X1.columns)

In [None]:
def pianYiLai(f1,f2,s1,s2):
    from sklearn.inspection import PartialDependenceDisplay
    import warnings
    warnings.filterwarnings("ignore")
    t2_features3 = [f1, f2,(f1,f2)]
    plt.rcParams['font.sans-serif'] = ['SimHei']  # 使用黑体 SimHei
    plt.rcParams['axes.unicode_minus'] = False    # 解决负号显示问题
    # 生成双向部分依赖图
    disp = PartialDependenceDisplay.from_estimator(svr_model, X1, t2_features3)

    disp.axes_.ravel()[0].set_ylabel('偏依赖量/(g/kWh)', fontsize=13, fontweight='bold')  # Set y-axis label and make it bold
    disp.axes_.ravel()[2].set_ylabel(s2, fontsize=13, fontweight='bold')  # Set y-axis label and make it bold
    disp.axes_.ravel()[0].set_xlabel(s1, fontsize=13, fontweight='bold')  # Set y-axis label and make it bold
    disp.axes_.ravel()[1].set_xlabel(s2, fontsize=13, fontweight='bold')  # Set y-axis label and make it bold
    disp.axes_.ravel()[2].set_xlabel(s1, fontsize=13, fontweight='bold')  # Set y-axis label and make it bold
    plt.subplots_adjust(wspace=0.7)

    # Save the plot to an image file
    #plt.savefig('partial_dependence_plot3.png', dpi=1000, bbox_inches='tight')  # Save as PNG with 500 dpi
    plt.show()

In [None]:
# pianYiLai('负荷率','排烟温度','负荷率平均值(%)','排烟温度/°C')

2.RF

In [None]:
correlation_matrix = df_maanshan.drop(columns=['排汽温度', '日期']).corr()
from sklearn.ensemble import RandomForestRegressor
rf_model = RandomForestRegressor(n_jobs=28,max_depth=26,n_estimators=178,random_state=42)
rf_model.fit(X1_train, y1_train)
rf_y1_pred = rf_model.predict(X1_test)

rf_y1_pred=scaler4.inverse_transform(pd.DataFrame({'pred':rf_y1_pred}))
# y1_test_recover=scaler4.inverse_transform(pd.DataFrame(y1_test))

rf_mse = mean_squared_error(y1_test_recover, rf_y1_pred)
rf_rmse = np.sqrt(rf_mse)  # Calculating RMSE
rf_mae = mean_absolute_error(y1_test_recover, rf_y1_pred)
rf_mape = mean_absolute_percentage_error(y1_test_recover, rf_y1_pred)
rf_r2 = r2_score(y1_test_recover, rf_y1_pred)

print(f"rf_MSE: {rf_mse:.6f}")
print(f"rf_RMSE: {rf_rmse:.6f}")
print(f"rf_MAE: {rf_mae:.6f}")
print(f"rf_MAPE: {rf_mape*100:.6f}")
print(f"rf_R Square: {rf_r2:.6f}")

In [None]:
plot_feature_importance(rf_model.feature_importances_)
plot_compare_line(rf_y1_pred.T[0],rf_r2)
plot_line(rf_y1_pred)

3.XGB

In [None]:
from xgboost import XGBRegressor
# 初始化XGBoost回归器
xgb_model = XGBRegressor(
    n_estimators=48,        # 树的数量
    max_depth=7,             # 树的最大深度
    learning_rate=0.1,       # 学习率
    subsample=0.8,           # 每棵树使用的样本比例
    colsample_bytree=0.8,    # 每棵树使用的特征比例
    random_state=42
)
# 训练模型
xgb_model.fit(X1_train, y1_train)
# Predict on the test set
xgb_y1_pred =  xgb_model.predict(X1_test)
xgb_y1_pred=scaler4.inverse_transform(pd.DataFrame({'pred':xgb_y1_pred}))
# y1_test_recover=scaler4.inverse_transform(pd.DataFrame(y1_test))
# Evaluate the model
xgb_mse = mean_squared_error(y1_test_recover, xgb_y1_pred)
xgb_rmse = np.sqrt(xgb_mse)  # Calculating RMSE
xgb_mae = mean_absolute_error(y1_test_recover, xgb_y1_pred)
xgb_mape = mean_absolute_percentage_error(y1_test_recover, xgb_y1_pred)
xgb_r2 = r2_score(y1_test_recover, xgb_y1_pred)
print(f"xgb_MSE: {xgb_mse:.6f}")
print(f"xgb_RMSE: {xgb_rmse:.6f}")
print(f"xgb_MAE: {xgb_mae:.6f}")
print(f"xgb_MAPE: {xgb_mape*100:.6f}")
print(f"xgb_R Square: {xgb_r2:.6f}")

In [None]:
plot_feature_importance(xgb_model.feature_importances_)
plot_compare_line(xgb_y1_pred.T[0],xgb_r2)
plot_line(xgb_y1_pred)

4.LR

In [None]:
from sklearn.linear_model import LinearRegression
# 创建线性回归模型
linear_model = LinearRegression()
# 拟合模型
linear_model.fit(X1_train, y1_train)
# 在测试集上进行预测
linear_y1_pred = linear_model.predict(X1_test)
linear_y1_pred=scaler4.inverse_transform(pd.DataFrame({'pred':linear_y1_pred}))
# y1_test_recover=scaler4.inverse_transform(pd.DataFrame(y1_test))
lr_mse = mean_squared_error(y1_test_recover, linear_y1_pred)
lr_rmse = np.sqrt(xgb_mse)  # Calculating RMSE
lr_mae = mean_absolute_error(y1_test_recover, linear_y1_pred)
lr_mape = mean_absolute_percentage_error(y1_test_recover, linear_y1_pred)
lr_r2 = r2_score(y1_test_recover, linear_y1_pred)
print(f"lr_MSE: {lr_mse:.6f}")
print(f"lr_RMSE: {lr_rmse:.6f}")
print(f"lr_MAE: {lr_mae:.6f}")
print(f"lr_MAPE: {lr_mape*100:.6f}")
print(f"lr_R Square: {lr_r2:.6f}")
print(f"回归系数: {linear_model.coef_}")
print(f"截距: {linear_model.intercept_}")

In [None]:
plot_feature_importance(np.abs(linear_model.coef_).flatten())
plot_compare_line(linear_y1_pred.T[0],lr_r2)
plot_line(linear_y1_pred)

5.DNN

In [None]:
import random
import torch
import torch.nn as nn
import torch.optim as optim
device='cuda:0' if torch.cuda.is_available() else 'cpu'

def set_seed(seed):
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
    random.seed(seed)
    np.random.seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

set_seed(42)

X1_train_tensor=torch.FloatTensor(X1_train).to(device)
y1_train_tensor=torch.FloatTensor(y1_train).view(-1,1).to(device)
X1_test_tensor=torch.FloatTensor(X1_test).to(device)
y1_test_tensor=torch.FloatTensor(y1_test).view(-1,1).to(device)

class SimpleNN(nn.Module):
    def __init__(self, input_size):
        super(SimpleNN, self).__init__()
        self.fc1 = nn.Linear(input_size, 64)  # 第一个隐藏层
        self.fc2 = nn.Linear(64, 32)          # 第二个隐藏层
        self.fc3 = nn.Linear(32, 1)           # 输出层

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = self.fc3(x)
        return x

# 初始化模型
input_size = X1_train.shape[1]
model = SimpleNN(input_size).to(device)
# 3. 定义损失函数和优化器
criterion = nn.MSELoss().to(device)  # 均方误差损失
optimizer = optim.SGD(model.parameters(), lr=0.12)

# 4. 训练模型
num_epochs = 10000
for epoch in range(num_epochs):
    model.train()  # 设定训练模式
    optimizer.zero_grad()  # 梯度清零
    outputs = model(X1_train_tensor)  # 前向传播
    loss = criterion(outputs, y1_train_tensor)  # 计算损失
    loss.backward()  # 反向传播
    optimizer.step()  # 更新权重

    if (epoch + 1) % 1000 == 0:
        print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {loss.item():.4f}')

# 5. 评估模型
model.eval()  # 切换到评估模式
with torch.no_grad():
    predictions = model(X1_test_tensor)  # 预测
    test_loss = criterion(predictions, y1_test_tensor)  # 计算测试损失
    print(f'Test Loss: {test_loss.item():.4f}')

predictions=predictions.cpu().numpy().flatten()
predictions=scaler4.inverse_transform(pd.DataFrame({'pred':predictions}))
# Evaluate the model
dnn_mse = mean_squared_error(y1_test_recover, predictions)
dnn_rmse = np.sqrt(dnn_mse)  # Calculating RMSE
dnn_mae = mean_absolute_error(y1_test_recover, predictions)
dnn_mape = mean_absolute_percentage_error(y1_test_recover, predictions)
dnn_r2 = r2_score(y1_test_recover, predictions)
print(f"dnn_MSE: {dnn_mse:.6f}")
print(f"dnn_RMSE: {dnn_rmse:.6f}")
print(f"dnn_MAE: {dnn_mae:.6f}")
print(f"dnn_MAPE: {dnn_mape*100:.6f}")
print(f"dnn_R Square: {dnn_r2:.6f}")

In [None]:
# plot_feature_importance(model.)
plot_compare_line(predictions.T[0],dnn_r2)
plot_line(predictions)