In [None]:
# 基础库
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import warnings
import sys
import os
import datetime
import pickle
# 机器学习库
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVR
from sklearn.metrics import mean_squared_error
from sklearn.metrics.pairwise import rbf_kernel
# 数据归一化、逆归一化
from sklearn.preprocessing import MinMaxScaler
# 优化相关库
from skopt import gp_minimize
from scipy.optimize import minimize

# 深度学习库
import tensorflow as tf
import torch
import torch.nn as nn
import torch.optim as optim

# Silence UserWarning and FutureWarning messages for the rest of the session.
warnings.filterwarnings("ignore", category=UserWarning)
warnings.filterwarnings("ignore", category=FutureWarning)

# Font object passed as `fontproperties=` to the plots below so Chinese labels render.
# NOTE(review): absolute Windows font path; it must exist on the machine running the notebook.
from matplotlib.font_manager import FontProperties
font = FontProperties(fname=r"c:\windows\fonts\simsun.ttc", size=12)  # replace with the path to your own Chinese font file

# Extra module search path.
# NOTE(review): machine-specific absolute path.
sys.path.append(r"C:\Users\haokw\Documents\GitHub\gaolu\MPC\高炉")

# Project-local module (presumably found through the sys.path entry added above — confirm).
import base 


In [None]:
def check_if_NaN(data):
    """Print the shape of ``data`` and say whether it contains any NaN.

    Args:
        data: pandas DataFrame to inspect.

    Returns:
        None. Two lines are written to stdout: the shape tuple and a
        one-line verdict.
    """
    print(data.shape)
    # First .any() reduces per column, the second reduces across columns.
    has_missing = data.isna().any().any()
    print("数据包含 NaN 值" if has_missing else "数据不包含 NaN 值")


In [None]:
# Load sheet 'Y' of data.xlsx and report whether it contains NaN.
# (Sheet 'X' of the same workbook was loaded here in an earlier revision;
# the inputs now come from GAN_all_data.xlsx below.)
excel_path = f'C:\\Users\\haokw\\Documents\\GitHub\\gaolu\\up2\\data\\data.xlsx'
df_sheet_Y = pd.read_excel(excel_path, sheet_name='Y')
check_if_NaN(df_sheet_Y)

# Load the four sheets of GAN_all_data.xlsx and stack them row-wise.
# The original row labels are kept, so labels repeat across the stacked sheets.
excel_path = f'C:\\Users\\haokw\\Documents\\GitHub\\gaolu\\up2\\data\\GAN_all_data.xlsx'
(df_sheet_params1,
 df_sheet_params2,
 df_sheet_params3,
 df_sheet_params4) = (
    pd.read_excel(excel_path, sheet_name=sheet_name)
    for sheet_name in ('Sheet1', 'Sheet2', 'Sheet3', 'Sheet4')
)
df_sheet_params = pd.concat(
    [df_sheet_params1, df_sheet_params2, df_sheet_params3, df_sheet_params4],
    axis=0,
)
print('加载数据完成')




In [None]:
# Keep every 20th row of the stacked parameter sheets (positions 0, 20, 40, ...).
# reset_index() is called without drop=True, so the previous row labels are
# kept as an extra 'index' column in df_sheet_X.
df_sheet_X = df_sheet_params.iloc[::20,:].reset_index()  # down-sample, then renumber rows from 0
check_if_NaN(df_sheet_X)


In [None]:
# Hands a local .mp3 path to the OS shell — presumably an audible "cell finished"
# cue. NOTE(review): machine-specific Windows path; confirm it is still wanted.
os.system(r'C:\Users\haokw\Desktop\11111.mp3')


In [None]:
# Names of the four input columns taken from the parameter sheets.
input_term =  ['富氧流量', '设定喷煤量', '热风压力', '热风温度']
# Names of the two output columns taken from sheet 'Y'.
output_term = ['铁水温度[MIT]', '铁水硅含量[SI]']
# Name of the time column; both tables are indexed with it in the cells below.
time_term=  '时间戳h'


In [None]:
# Work on copies so the frames loaded above are not modified.
df_sheet_X_process = df_sheet_X.copy()
df_sheet_Y_process = df_sheet_Y.copy()


def IQR_process(df_IQR, columns):
    """Remove every row that is an IQR outlier in at least one listed column.

    For each column the whiskers are ``Q1 - 1.5 * IQR`` and ``Q3 + 1.5 * IQR``
    (quartiles taken from ``Series.describe()``). A row is discarded when any
    of its values in ``columns`` lies strictly outside its column's whiskers.

    Args:
        df_IQR: DataFrame to filter; it is not modified.
        columns: iterable of column names to test.

    Returns:
        A new DataFrame without the flagged rows, re-indexed from 0.
        The column list and the row counts before/after are printed.
    """
    print(columns)

    flagged_labels = set()  # index labels of rows that are outliers in any column
    for name in columns:
        series = df_IQR[name]
        summary = series.describe()
        q1, q3 = summary['25%'], summary['75%']
        spread = q3 - q1
        low, high = q1 - 1.5 * spread, q3 + 1.5 * spread

        is_outlier = (series < low) | (series > high)
        flagged_labels.update(series[is_outlier].index)

    # Drop by label, then renumber the surviving rows from 0.
    df_cleaned = df_IQR.drop(index=flagged_labels).reset_index(drop=True)

    print(f"原始数据行数: {df_IQR.shape[0]}")
    print(f"删除异常值后的数据行数: {df_cleaned.shape[0]}")
    return df_cleaned


# Filter inputs and outputs independently: each table drops its own outlier rows
# and is re-indexed from 0, so the two results can differ in length.
df_cleaned_X = IQR_process(df_sheet_X_process, input_term)
df_cleaned_Y = IQR_process(df_sheet_Y_process, output_term)


In [None]:
def plot_subplot(data_x_yuan,data_y_yuan,data_x,data_y,column):
    """Overlay the raw and the cleaned samples of one variable on the current axes.

    Args:
        data_x_yuan, data_y_yuan: x/y values before cleaning (red dots).
        data_x, data_y: x/y values after cleaning (magenta dots, drawn on top).
        column: text used as the y-axis label.
    """
    layers = (
        (data_x_yuan, data_y_yuan, 'r.'),
        (data_x, data_y, 'm.'),
    )
    for xs, ys, fmt in layers:
        plt.plot(xs, ys, fmt)
    # Module-level `font` lets the Chinese label render.
    plt.ylabel(column, fontproperties=font)



# One figure per table (inputs, then outputs); one stacked subplot per column,
# showing the raw samples with the cleaned samples drawn over them.
for raw_df, cleaned_df, terms, figsize in (
    (df_sheet_X, df_cleaned_X, input_term, (15, 4)),
    (df_sheet_Y, df_cleaned_Y, output_term, (15, 2)),
):
    plt.figure(figsize=figsize)
    for idx, column in enumerate(terms):
        plt.subplot(len(terms), 1, idx+1)
        plot_subplot(
            raw_df[time_term].values, raw_df[column].values,
            cleaned_df[time_term].values, cleaned_df[column].values,
            column,
        )



In [None]:
def plot_subplot(data_x,data_y,column,index_predict,index_gaolu):
    """Plot one variable and highlight two index ranges on the current axes.

    Replaces the five-argument ``plot_subplot`` defined in the previous cell.

    Args:
        data_x, data_y: full x/y arrays (default-colour line, 'origin_data').
        column: text used as the y-axis label.
        index_predict: positions drawn in green as 'predict_data'.
        index_gaolu: positions drawn in red as 'gaolu_data'.
    """
    # None means "the whole series"; drawing order matches the legend order.
    selections = (
        (None, '-', 'origin_data'),
        (index_gaolu, 'r-', 'gaolu_data'),
        (index_predict, 'g-', 'predict_data'),
    )
    for rows, fmt, label in selections:
        xs = data_x if rows is None else data_x[rows]
        ys = data_y if rows is None else data_y[rows]
        plt.plot(xs, ys, fmt, label=label)
    plt.legend()
    plt.ylabel(column, fontproperties=font)

# 6509  (NOTE(review): looks like a row count noted by the author — confirm)

# Range used for the 'gaolu' model data: start row and length.
length1 = 400
start1 = 0

# Range used for the 'predict' data: start row and length.
length2 = 1
start2 = 6507


# Both ranges include the end point, so they hold length+1 positions
# (0..400 and 6507..6508 with the values above).
index_gaolu   = range(start1, start1+length1+1, 1)
index_predict     = range(start2, start2+length2+1, 1)
# index = range(1, 7572, 1)


# Inputs: one subplot per input column with the two index ranges highlighted.
# The grid is sized for len(input_term+output_term) rows in both figures,
# so every panel has the same height; only the first rows are filled.
plt.figure(figsize=(15, 6))
for idx, column in enumerate(input_term):
    plt.subplot(len(input_term+output_term), 1, idx+1)
    plot_subplot(df_cleaned_X[time_term].values, df_cleaned_X[column].values, column, index_predict, index_gaolu)



# Outputs: same layout, one subplot per output column.
plt.figure(figsize=(15, 6))
for idx, column in enumerate(output_term):
    plt.subplot(len(input_term+output_term), 1, idx+1)
    plot_subplot(df_cleaned_Y[time_term].values,df_cleaned_Y[column].values,column,index_predict,index_gaolu)



In [None]:
# Column name -> 1-D numpy array, for every input / output column (all cleaned rows).
# Built by iterating over input_term / output_term instead of hard-coded
# positions [0]..[3], so the dicts always follow those lists.
X_dict_original = {name: df_cleaned_X[name].values for name in input_term}
Y_dict_original = {name: df_cleaned_Y[name].values for name in output_term}

# Same layout, restricted to the row labels in index_gaolu.
X_dict_original_index_gaolu = {
    name: df_cleaned_X[name][index_gaolu].values for name in input_term
}
Y_dict_original_index_gaolu = {
    name: df_cleaned_Y[name][index_gaolu].values for name in output_term
}




In [None]:
# Per-column scalers, keyed by column name.
scalers_X = {}
scalers_Y = {}

# Fit one MinMaxScaler (target range [-1, 1]) per column on all cleaned rows.
for column, data in X_dict_original.items():
    scaler = MinMaxScaler(feature_range=(-1, 1))
    scaler.fit(data.reshape(-1, 1))  # reshape the 1-D array into a single column
    scalers_X[column] = scaler

for column, data in Y_dict_original.items():
    scaler = MinMaxScaler(feature_range=(-1, 1))
    scaler.fit(data.reshape(-1, 1))  # reshape the 1-D array into a single column
    scalers_Y[column] = scaler




# Normalize every column with its own scaler and flatten back to 1-D.
X_dict_normal = {}
Y_dict_normal = {}
for column, scaler in scalers_X.items():
    X_dict_normal[column] = scaler.transform(X_dict_original[column].reshape(-1, 1)).flatten()
for column, scaler in scalers_Y.items():
    Y_dict_normal[column] = scaler.transform(Y_dict_original[column].reshape(-1, 1)).flatten()

# Convert to DataFrames and check for NaN (all rows).
print('全部数据')
X_df_normal = pd.DataFrame(X_dict_normal)
check_if_NaN(X_df_normal)
Y_df_normal = pd.DataFrame(Y_dict_normal)
check_if_NaN(Y_df_normal)



# index_gaolu subset: normalized with the scalers fitted on ALL rows above,
# so both frames share the same scale.
X_dict_normal_index_gaolu = {}
Y_dict_normal_index_gaolu = {}
for column, scaler in scalers_X.items():
    X_dict_normal_index_gaolu[column] = scaler.transform(X_dict_original_index_gaolu[column].reshape(-1, 1)).flatten()
for column, scaler in scalers_Y.items():
    Y_dict_normal_index_gaolu[column] = scaler.transform(Y_dict_original_index_gaolu[column].reshape(-1, 1)).flatten()

# Convert to DataFrames and check for NaN (index_gaolu subset).
print('高炉部分数据')
X_df_normal_index_gaolu = pd.DataFrame(X_dict_normal_index_gaolu)
check_if_NaN(X_df_normal_index_gaolu)
Y_df_normal_index_gaolu = pd.DataFrame(Y_dict_normal_index_gaolu)
check_if_NaN(Y_df_normal_index_gaolu)








In [None]:
def plot_scatter_matrix(X_df_normal, X_df_normal_index_gaolu, figsize=(10, 8),font=font, save_path=None):
    """Draw an overlaid scatter-plot matrix of two data sets.

    Off-diagonal panels show pairwise scatter plots of both data sets;
    diagonal panels show overlaid 50-bin histograms. Axis limits are fixed
    to [-1, 1], matching data scaled to that range.

    Args:
        X_df_normal (DataFrame): first data set, drawn in blue ("Left Data").
            Its columns define the grid.
        X_df_normal_index_gaolu (DataFrame): second data set, drawn in red
            ("Right Data"). Must contain the same column names.
        figsize (tuple): figure size passed to matplotlib.
        font (FontProperties, optional): font used for the axis labels.
        save_path (str, optional): when given, the figure is written there and
            closed; otherwise it is shown.
    """
    color_left = 'blue'
    color_right = 'red'
    marker_left = '.'
    marker_right = '.'

    df_left = X_df_normal
    df_right = X_df_normal_index_gaolu

    fig = plt.figure(figsize=figsize)
    num_cols = len(df_left.columns)

    # Legend entries come from the only panel whose artists carry labels
    # (row 0, column 1). Reading them from the last panel, a histogram with
    # no labelled artists, always produced an empty legend.
    legend_handles, legend_labels = [], []
    ax = None

    for i, col1 in enumerate(df_left.columns):
        for j, col2 in enumerate(df_left.columns):
            ax = plt.subplot(num_cols, num_cols, i * num_cols + j + 1)

            if i != j:
                is_legend_panel = (i == 0 and j == 1)
                ax.scatter(df_left[col1], df_left[col2], color=color_left, alpha=0.5,
                           marker=marker_left, label='Left Data' if is_legend_panel else "")
                ax.scatter(df_right[col1], df_right[col2], color=color_right, alpha=0.5,
                           marker=marker_right, label='Right Data' if is_legend_panel else "")
                ax.set_xlim([-1, 1])
                ax.set_ylim([-1, 1])
                if is_legend_panel:
                    legend_handles, legend_labels = ax.get_legend_handles_labels()
            else:
                ax.hist(df_left[col1], bins=50, alpha=0.5, color=color_left)
                ax.hist(df_right[col1], bins=50, alpha=0.5, color=color_right)
                ax.set_xlim([-1, 1])

            # Label only the outer edge of the grid.
            if i == num_cols - 1:
                ax.set_xlabel(col2, fontproperties=font)
            if j == 0:
                ax.set_ylabel(col1, fontproperties=font)

    # Attach the legend to the last panel (same placement as before), but only
    # when there is something to show; a one-column grid has no scatter panel.
    if legend_handles:
        ax.legend(legend_handles, legend_labels, loc='upper right', bbox_to_anchor=(1.5, 1))

    plt.suptitle('Overlaid Scatter Matrix of Features', y=1.02)
    plt.tight_layout()

    if save_path:
        # The legend and the suptitle sit outside the axes area; a tight
        # bounding box keeps them inside the saved image.
        plt.savefig(save_path, bbox_inches='tight')
        plt.close(fig)  # release the figure; this is called repeatedly during training
    else:
        plt.show()






In [None]:
# Inputs: all normalized rows (blue) against the index_gaolu subset (red).
plot_scatter_matrix(X_df_normal, X_df_normal_index_gaolu, font=font, figsize=(10, 8))
# Outputs: same comparison on the two output columns.
plot_scatter_matrix(Y_df_normal, Y_df_normal_index_gaolu, font=font, figsize=(5, 4))


In [None]:
# Rebuild the index_gaolu DataFrames from the normalized dicts and re-check for NaN.
# NOTE(review): these statements repeat the ones at the end of the normalization cell.
print('高炉部分数据')
X_df_normal_index_gaolu = pd.DataFrame(X_dict_normal_index_gaolu)
check_if_NaN(X_df_normal_index_gaolu)
Y_df_normal_index_gaolu = pd.DataFrame(Y_dict_normal_index_gaolu)
check_if_NaN(Y_df_normal_index_gaolu)



# Rebuild the all-rows DataFrames and re-check for NaN.
print('全部数据')
X_df_normal = pd.DataFrame(X_dict_normal)
check_if_NaN(X_df_normal)
Y_df_normal = pd.DataFrame(Y_dict_normal)
check_if_NaN(Y_df_normal)



In [None]:
# Number of rows the input and output frames have in common (the shorter of the two);
# make_data iterates over exactly this many positions.
print(min(X_df_normal.shape[0],Y_df_normal.shape[0]))


In [None]:
# Shapes of the normalized frames.
print(X_df_normal.shape)
print(Y_df_normal.shape)
# DataFrame.info() prints its own report and returns None, so it is called
# directly; wrapping it in print() appended a stray "None" line per call.
df_cleaned_X.info()
df_cleaned_Y.info()



In [None]:
# Shuffle switch for train_test_split. The second assignment wins, so shuffling
# is OFF; swap/delete a line to toggle.
isShuffle = True
isShuffle = False
# Number of consecutive input rows combined into one sample.
time_steps = 2
# Fractions of the full sample set used for test and validation; training gets the rest.
test_size = 0.15
val_size = 0.15
train_size = 1-val_size-test_size


In [None]:
# Assemble lagged samples and split them into train / validation / test sets.
#
# Sample layout:
#   X = [X(t), X(t-1), ..., Y(t-1)]   (time_steps input rows, newest first, then the previous output row)
#   Y = [Y(t)]
def make_data(X_df_normal,Y_df_normal,index_fanwei,ifprint):
    """Build lagged samples for the positions in ``index_fanwei`` and split them.

    For every position ``i`` in ``index_fanwei`` (below the common row count of
    the two frames) the output time is read from ``df_cleaned_Y`` and the
    ``time_steps`` most recent rows of ``df_cleaned_X`` at or before that time
    are selected. The sample is kept only when the oldest selected row has
    time ``Y_time - time_steps + 1``; otherwise it is reported as 'errloss'
    and skipped.

    Reads these notebook globals: ``df_cleaned_X``, ``df_cleaned_Y``,
    ``time_term``, ``time_steps``, ``test_size``, ``val_size``, ``train_size``
    and ``isShuffle``.

    Args:
        X_df_normal: normalized input DataFrame (row labels match df_cleaned_X).
        Y_df_normal: normalized output DataFrame (row labels match df_cleaned_Y).
        index_fanwei: container of row positions to consider (e.g. a range).
        ifprint: when true, print one log line per considered position.
            Summary shapes are always printed.

    Returns:
        ``(X_train, X_val, X_test, y_train, y_val, y_test)`` as numpy arrays.

    Raises:
        ValueError: if no position yields a valid sample.
    """
    X_modified = []
    y_modified = []

    for i in range(min(X_df_normal.shape[0], Y_df_normal.shape[0])):
        if i not in index_fanwei:
            continue

        Y_time = df_cleaned_Y[time_term][i]

        # The time_steps most recent input rows at or before the output time, newest first.
        closest_10 = df_cleaned_X[df_cleaned_X[time_term] <= Y_time].nlargest(time_steps, time_term)
        if closest_10.empty:
            if ifprint:
                print("No closest values found. closest_10 is empty.")
            continue

        index = closest_10.index

        # The oldest selected row must sit exactly time_steps-1 before the output
        # time; anything else means the window has a gap.
        if closest_10.iloc[-1][time_term] != Y_time - time_steps + 1:
            if ifprint:
                print(i,',t',Y_time,',||||  t',closest_10.iloc[0][time_term],',t-time_steps',closest_10.iloc[-1][time_term],'index',index[0],index[-1],'errloss')
            continue

        # Y(t-1) is part of the sample. Position 0 has no predecessor, and
        # .loc[-1] would raise KeyError, so such positions are skipped.
        if (i - 1) not in Y_df_normal.index:
            if ifprint:
                print(i, ',t', Y_time, 'skipped: no previous output row for Y(t-1)')
            continue

        # Concatenate the selected input rows end to end, then append Y(t-1).
        new_x_sample = np.concatenate([X_df_normal.loc[row, :].values for row in index], axis=0)
        y_last = Y_df_normal.loc[i-1]
        new_x_sample = np.concatenate([new_x_sample, y_last], axis=0)

        y_sample = Y_df_normal.loc[i]
        X_modified.append(new_x_sample)
        y_modified.append(y_sample)
        if ifprint:
            print(i,',t',Y_time,',t',closest_10.iloc[0][time_term],',t-time_steps',closest_10.iloc[-1][time_term],'index',index[0],index[-1])

    # Shape of the collected sample list.
    rows = len(X_modified)
    columns = len(X_modified[0]) if rows > 0 else 0
    print(f"二维列表的形状: ({rows}, {columns})")

    if rows == 0:
        raise ValueError("make_data: no valid samples were built for the requested index range")

    X_modified = np.array(X_modified)
    y_modified = np.array(y_modified)
    X_reshaped = X_modified.reshape((X_modified.shape[0], X_modified.shape[1]))

    print("Modified Input Shape:", X_reshaped.shape)
    print("Modified Output Shape:", y_modified.shape)

    # First split: hold out test_size of all samples for testing.
    X_train, X_test, y_train, y_test = train_test_split(X_reshaped, y_modified, 
                                                        test_size=test_size, 
                                                        random_state=42, 
                                                        shuffle=isShuffle)

    # Second split: divide the remainder so that validation ends up being
    # val_size of the ORIGINAL total (hence the rescaled fraction).
    X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, 
                                                        test_size=val_size/(train_size+val_size), 
                                                        random_state=42, 
                                                        shuffle=isShuffle)

    print('训练数量：',X_train.shape,y_train.shape)
    print('验证数量：',X_val.shape,y_val.shape)
    print('测试数量：',X_test.shape,y_test.shape)

    return X_train, X_val, X_test, y_train, y_val, y_test






In [None]:
# Build the train / validation / test arrays for the index_gaolu range.
print('高炉模型数据')
X_gaolu_train, X_gaolu_val, X_gaolu_test,\
y_gaolu_train, y_gaolu_val, y_gaolu_test = make_data(X_df_normal,Y_df_normal,
                                                    index_gaolu,ifprint = True)


In [None]:
# Synthetic test data: random points inside a triangle.
# (numpy and pyplot are already imported in the first cell; Polygon and Path are new here.)
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.patches import Polygon
from matplotlib.path import Path

# Seed NumPy's global RNG so the generated points are reproducible.
np.random.seed(42)

def generate_triangle_data(vertices, num_samples):
    """Rejection-sample ``num_samples`` points that fall inside a polygon.

    Candidates are drawn one at a time from the unit square with
    ``np.random.rand(2)`` and kept when ``Path(vertices)`` contains them.

    Args:
        vertices: array-like of 2-D vertices defining the region.
        num_samples: number of accepted points to return.

    Returns:
        numpy array holding the accepted points, one (x, y) pair per row.
    """
    region = Path(vertices)
    accepted = []

    while not len(accepted) >= num_samples:
        candidate = np.random.rand(2)
        # contains_points returns a one-element boolean array for the single candidate.
        inside = region.contains_points([candidate])
        if inside:
            accepted.append(candidate)

    return np.array(accepted)

# Triangle vertices (all inside the unit square, which is where candidates are drawn).
vertices = np.array([
    [0, 0],  # vertex A
    [1, 0],  # vertex B
    [0.5, 1]  # vertex C
])

# Generate the synthetic points.
num_samples = 10000
data_test = generate_triangle_data(vertices, num_samples)

# # Visualize the data
# plt.figure(figsize=(8, 8))
# plt.fill(vertices[:, 0], vertices[:, 1], 'lightgray', edgecolor='black', alpha=0.5)  # fill the triangle
# plt.scatter(data_test[:, 0], data_test[:, 1], c='blue', alpha=0.5, s=1)
# plt.title('Random Points Inside a Triangle')
# plt.xlabel('X')
# plt.ylabel('Y')
# plt.axis('equal')
# plt.show()


# Wrap the points in a DataFrame with columns 'X' and 'Y'.
data_test = pd.DataFrame(data_test, columns=['X', 'Y'])


In [2]:
# Training data for the GANs: the normalized input frame as a float32 tensor.
# Swap in the commented line to train on the synthetic triangle data instead.
data_item = X_df_normal
# data_item = data_test

data = torch.tensor(data_item.values, dtype=torch.float32)
df_WGAN = data_item                  # reference frame used by the training loops for column names and plots
test_sample_num = data.shape[0]      # number of samples generated for each progress plot



# Hyper-parameters
z_dim = 5  # dimension of the random noise vector
data_dim = data.shape[1]  # number of data columns (4 for X_df_normal)
learning_rate = 0.0002
num_epochs = 200
n_critic = 5  # critic updates per generator update

# A progress figure is saved every print_piture_d epochs.
print_piture_d = 10

# Mini-batch loader over the tensor rows.
batch_size = 512
data_loader = torch.utils.data.DataLoader(data, batch_size=batch_size, shuffle=True)



NameError: name 'X_df_normal' is not defined

In [None]:
# W-GAN模型定义和训练
import torch
import torch.nn as nn
import torch.optim as optim
import pandas as pd
import numpy as np
import os

class Generator(nn.Module):
    """WGAN generator: noise vector -> sample with every component in [-1, 1].

    Fully connected stack ``input_dim -> 128 -> 256 -> output_dim`` with
    in-place ReLU between the layers and Tanh on the output. The layers live
    in ``self.model`` (an ``nn.Sequential``), so parameter names are
    ``model.0.*``, ``model.2.*`` and ``model.4.*``.
    """

    def __init__(self, input_dim, output_dim):
        super().__init__()
        layers = []
        width = input_dim
        for hidden in (128, 256):
            layers += [nn.Linear(width, hidden), nn.ReLU(True)]
            width = hidden
        layers += [nn.Linear(width, output_dim), nn.Tanh()]
        self.model = nn.Sequential(*layers)

    def forward(self, x):
        """Map a batch of noise vectors to generated samples."""
        return self.model(x)

class Critic(nn.Module):
    """WGAN critic: sample -> one unbounded score.

    Fully connected stack ``input_dim -> 256 -> 128 -> 1`` with in-place
    LeakyReLU(0.2) between the layers and no output activation.
    """

    def __init__(self, input_dim):
        super().__init__()
        stack = [
            nn.Linear(input_dim, 256),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Linear(256, 128),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Linear(128, 1),
        ]
        self.model = nn.Sequential(*stack)

    def forward(self, x):
        """Return the critic score for each row of ``x``."""
        return self.model(x)

def weight_clipping(critic, clip_value=0.01):
    """Clamp every parameter of ``critic`` in place to [-clip_value, clip_value].

    Args:
        critic: module whose parameters are clipped.
        clip_value: symmetric bound applied to each parameter element.
    """
    # no_grad: the in-place clamp must not be recorded by autograd.
    with torch.no_grad():
        for tensor in critic.parameters():
            tensor.clamp_(min=-clip_value, max=clip_value)

# WGAN training loop (weight-clipping variant).
def train_wgan(generator, critic, data_loader, num_epochs, z_dim, clip_value, n_critic, optimizer_G, optimizer_C, output_dir):
    """Train ``generator`` against ``critic`` and save periodic progress figures.

    For every batch the critic is updated ``n_critic`` times (followed each
    time by weight clipping), then the generator is updated once.

    Besides its arguments the function reads these notebook globals:
    ``print_piture_d``, ``test_sample_num``, ``df_WGAN``, ``font`` and
    ``plot_scatter_matrix``.

    Args:
        generator, critic: the two networks; trained in place.
        data_loader: iterable yielding batches of real samples (tensors).
        num_epochs: number of passes over ``data_loader``.
        z_dim: dimension of the noise fed to the generator.
        clip_value: bound passed to ``weight_clipping``.
        n_critic: critic updates per generator update.
        optimizer_G, optimizer_C: optimizers of generator and critic.
        output_dir: directory for the progress figures (created if missing).
    """
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    for epoch in range(num_epochs):
        for real_data in data_loader:
            batch_size = real_data.size(0)

            # --- Critic updates (the same real batch is reused for all n_critic steps) ---
            for _ in range(n_critic):
                z = torch.randn(batch_size, z_dim)
                fake_data = generator(z)

                # Critic loss: mean score on fakes minus mean score on reals.
                real_output = critic(real_data)
                fake_output = critic(fake_data.detach())  # detach: no gradient into the generator here
                c_loss = -torch.mean(real_output) + torch.mean(fake_output)

                optimizer_C.zero_grad()
                c_loss.backward()
                optimizer_C.step()

                # Clip the critic weights after every step.
                weight_clipping(critic, clip_value)

            # --- Generator update: raise the critic score of fresh fakes ---
            z = torch.randn(batch_size, z_dim)
            fake_data = generator(z)
            fake_output = critic(fake_data)
            g_loss = -torch.mean(fake_output)

            optimizer_G.zero_grad()
            g_loss.backward()
            optimizer_G.step()

        # Losses shown are those of the LAST batch of the epoch; epoch is 0-based.
        print(f"WGAN_training Epoch [{epoch}/{num_epochs}], c_loss: {c_loss.item():.4f}, g_loss: {g_loss.item():.4f}")

        # Every print_piture_d epochs: sample the generator and save a comparison figure.
        if epoch % print_piture_d == print_piture_d-1:
            # Generate as many samples as there are reference rows.
            z = torch.randn(test_sample_num, z_dim)
            generated_data = generator(z).detach().numpy()

            # Wrap the samples in a DataFrame with the reference column names.
            generated_df = pd.DataFrame(generated_data, columns=df_WGAN.columns)

            # Timestamp for the file name.
            current_time = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
            # File name carries timestamp, 1-based epoch and the last losses.
            filename = f"{current_time}_WGAN_Epoch_{epoch+1}_c_loss_{c_loss.item():.4f}_g_loss_{g_loss.item():.4f}.png"
            save_path = os.path.join(output_dir, filename)

            # Reference data vs. generated data, written to save_path.
            plot_scatter_matrix(df_WGAN, generated_df, figsize=(10, 8), font=font, save_path=save_path)


In [None]:
class LSGANGenerator(nn.Module):
    """LSGAN generator: noise vector -> sample with every component in [-1, 1].

    Layers: ``input_dim -> 128 -> 256 -> output_dim`` with in-place ReLU
    between them and Tanh on the output, held in ``self.model``.
    """

    def __init__(self, input_dim, output_dim):
        super().__init__()
        first_hidden, second_hidden = 128, 256
        self.model = nn.Sequential(
            nn.Linear(input_dim, first_hidden),
            nn.ReLU(inplace=True),
            nn.Linear(first_hidden, second_hidden),
            nn.ReLU(inplace=True),
            nn.Linear(second_hidden, output_dim),
            nn.Tanh(),
        )

    def forward(self, x):
        """Map a batch of noise vectors to generated samples."""
        return self.model(x)

class LSGANDiscriminator(nn.Module):
    """LSGAN discriminator: sample -> one unbounded score.

    Layers: ``input_dim -> 256 -> 128 -> 1`` with in-place LeakyReLU(0.2)
    between them and no output activation.
    """

    def __init__(self, input_dim):
        super().__init__()
        widths = (input_dim, 256, 128)
        layers = []
        for fan_in, fan_out in zip(widths[:-1], widths[1:]):
            layers.append(nn.Linear(fan_in, fan_out))
            layers.append(nn.LeakyReLU(0.2, inplace=True))
        layers.append(nn.Linear(widths[-1], 1))
        self.model = nn.Sequential(*layers)

    def forward(self, x):
        """Return the discriminator score for each row of ``x``."""
        return self.model(x)

def generator_loss(fake_output):
    """LSGAN generator loss: mean squared distance of the fake scores from 1."""
    residual = fake_output - 1
    return residual.pow(2).mean()

def discriminator_loss(real_output, fake_output):
    """LSGAN discriminator loss.

    Sum of two mean-squared terms: real scores are pulled towards 1,
    fake scores towards 0.
    """
    toward_one = (real_output - 1).pow(2).mean()
    toward_zero = fake_output.pow(2).mean()
    return toward_one + toward_zero

# LSGAN training loop.
def train_lsgan(generator, discriminator, data_loader, num_epochs, z_dim, optimizer_G, optimizer_D, output_dir):
    """Train ``generator`` against ``discriminator`` with least-squares losses.

    Each batch performs one discriminator step followed by one generator step
    on the same batch of fakes. Progress figures are saved periodically.

    Besides its arguments the function reads these notebook globals:
    ``print_piture_d``, ``test_sample_num``, ``df_WGAN``, ``font`` and
    ``plot_scatter_matrix``.

    Args:
        generator, discriminator: the two networks; trained in place.
        data_loader: iterable yielding batches of real samples (tensors).
        num_epochs: number of passes over ``data_loader``.
        z_dim: dimension of the noise fed to the generator.
        optimizer_G, optimizer_D: optimizers of generator and discriminator.
        output_dir: directory for the progress figures (created if missing).
    """
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    for epoch in range(num_epochs):
        for real_data in data_loader:
            batch_size = real_data.size(0)

            # Generate one batch of fakes, used by both updates below.
            z = torch.randn(batch_size, z_dim)
            fake_data = generator(z)

            # Discriminator forward pass; detach keeps this step out of the generator's graph.
            real_output = discriminator(real_data)
            fake_output = discriminator(fake_data.detach())

            # Discriminator update.
            d_loss = discriminator_loss(real_output, fake_output)
            optimizer_D.zero_grad()
            d_loss.backward()
            optimizer_D.step()

            # Re-score the same fakes with the just-updated discriminator, this time
            # keeping the graph so gradients reach the generator.
            fake_output = discriminator(fake_data)

            # Generator update.
            g_loss = generator_loss(fake_output)
            optimizer_G.zero_grad()
            g_loss.backward()
            optimizer_G.step()

        # Losses shown are those of the LAST batch of the epoch; epoch is 0-based.
        print(f"LS-GAN_training Epoch [{epoch}/{num_epochs}], D_loss: {d_loss.item():.4f}, G_loss: {g_loss.item():.4f}")

        # Every print_piture_d epochs: sample the generator and save a comparison figure.
        if epoch % print_piture_d == print_piture_d-1:
            # Generate test_sample_num samples.
            z = torch.randn(test_sample_num, z_dim)
            generated_data = generator(z).detach().numpy()

            # Wrap the samples in a DataFrame with the reference column names.
            generated_df = pd.DataFrame(generated_data, columns=df_WGAN.columns)

            # Timestamp for the file name.
            current_time = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

            # File name carries timestamp, 1-based epoch and the last losses.
            filename = f"{current_time}_LSGAN_Epoch_{epoch+1}_D_loss_{d_loss.item():.4f}_G_loss_{g_loss.item():.4f}.png"
            save_path = os.path.join(output_dir, filename)

            # Reference data vs. generated data, written to save_path.
            plot_scatter_matrix(df_WGAN, generated_df, figsize=(10, 8), font=font, save_path=save_path)


In [None]:
# Build the WGAN generator and critic.
wgan_generator = Generator(input_dim=z_dim, output_dim=data_dim)
wgan_critic = Critic(input_dim=data_dim)
# One Adam optimizer per network, same learning rate and betas.
optimizer_G = optim.Adam(wgan_generator.parameters(), lr=learning_rate, betas=(0.2, 0.999))
optimizer_C = optim.Adam(wgan_critic.parameters(), lr=learning_rate, betas=(0.2, 0.999))
# Progress figures go here (relative to the working directory).
output_dir = r"data\train_output_picture\WGAN_training_output_all_data"
# Train the WGAN. NOTE(review): n_critic is passed as the literal 5, not the
# n_critic variable defined with the other hyper-parameters.
train_wgan(wgan_generator, wgan_critic, data_loader, 
                num_epochs=num_epochs, z_dim=z_dim, clip_value=0.01, 
                n_critic = 5, 
                optimizer_G=optimizer_G, optimizer_C=optimizer_C, 
                output_dir=output_dir)


In [None]:
# Hands a local .mp3 path to the OS shell — presumably an audible "training finished"
# cue. NOTE(review): machine-specific Windows path; confirm it is still wanted.
os.system(r'C:\Users\haokw\Desktop\11111.mp3')


In [None]:
# Build a fresh LSGAN generator and discriminator.
# NOTE(review): nothing from the WGAN run is reused here; both networks start
# from their default initialization.
lsgan_generator = LSGANGenerator(input_dim=z_dim, output_dim=data_dim)
lsgan_discriminator = LSGANDiscriminator(input_dim=data_dim)

# These rebind optimizer_G, which previously held the WGAN generator's optimizer.
optimizer_G = optim.Adam(lsgan_generator.parameters(), lr=learning_rate, betas=(0.2, 0.999))
optimizer_D = optim.Adam(lsgan_discriminator.parameters(), lr=learning_rate, betas=(0.2, 0.999))
output_dir = r"data\train_output_picture\LS-GAN_training_output_all_data"
# Train the LSGAN on the same data_loader (the real data) as the WGAN.
train_lsgan(lsgan_generator, lsgan_discriminator, data_loader, 
                num_epochs=num_epochs, z_dim=z_dim, 
                optimizer_G=optimizer_G, optimizer_D=optimizer_D, 
                output_dir=output_dir)


In [None]:
# Sampling check: draw 10000 noise vectors and push them through the trained LSGAN generator.
z = torch.randn(10000, z_dim)
print(z.shape)

generated_data = lsgan_generator(z).detach().numpy()
print(generated_data.shape)

# Wrap the samples in a DataFrame with the reference column names.
generated_df = pd.DataFrame(generated_data, columns=df_WGAN.columns)

# Compare reference data and generated samples on screen (no save_path, so the figure is shown).
plot_scatter_matrix(df_WGAN, generated_df, figsize=(10, 8), font=font)


In [None]:
# Persist fitted objects and parameters.

# Scalers: open(..., 'wb') does not create missing directories, so make sure
# the target folder exists before writing.
os.makedirs(r'data\scalers', exist_ok=True)
with open(r'data\scalers\scalers_X.pkl', 'wb') as f:
    pickle.dump(scalers_X, f)
with open(r'data\scalers\scalers_Y.pkl', 'wb') as f:
    pickle.dump(scalers_Y, f)


def save_model(generator, discriminator, output_dir, epoch):
    """Write the state dicts of both networks into ``output_dir``.

    Files are always named ``generator.pth`` and ``discriminator.pth``;
    existing files are overwritten.

    Args:
        generator: module saved as ``generator.pth``.
        discriminator: module saved as ``discriminator.pth``.
        output_dir: target directory, created if missing.
        epoch: accepted for the caller's convenience; not used in the file names.
    """
    os.makedirs(output_dir, exist_ok=True)

    generator_filename, discriminator_filename = (
        os.path.join(output_dir, basename)
        for basename in ("generator.pth", "discriminator.pth")
    )

    torch.save(generator.state_dict(), generator_filename)
    print(f"Generator model saved to {generator_filename}")

    torch.save(discriminator.state_dict(), discriminator_filename)
    print(f"Discriminator model saved to {discriminator_filename}")

# Save the LSGAN pair; files land in data\model_params\lsgan_model as
# generator.pth / discriminator.pth.
save_model(lsgan_generator, lsgan_discriminator, 
        output_dir=r"data\model_params\lsgan_model", 
        epoch=num_epochs)

# Save the WGAN pair (the critic goes into the 'discriminator' slot).
save_model(wgan_generator, wgan_critic, 
        output_dir=r"data\model_params\wgan_model", 
        epoch=num_epochs)


In [None]:
class LSGANGenerator_item(nn.Module):
    """Stand-alone copy of the LSGAN generator used to reload saved weights.

    Layers: ``input_dim -> 128 -> 256 -> output_dim`` with in-place ReLU
    between them and Tanh on the output. Sub-modules are registered under
    the names "0".."5" inside ``self.model``, so a saved generator state dict
    (keys ``model.0.*``, ``model.2.*``, ``model.4.*``) loads directly.
    """

    def __init__(self, input_dim, output_dim):
        super().__init__()
        ordered_layers = (
            nn.Linear(input_dim, 128),
            nn.ReLU(True),
            nn.Linear(128, 256),
            nn.ReLU(True),
            nn.Linear(256, output_dim),
            nn.Tanh(),
        )
        self.model = nn.Sequential()
        for position, layer in enumerate(ordered_layers):
            self.model.add_module(str(position), layer)

    def forward(self, x):
        """Map a batch of noise vectors to generated samples."""
        return self.model(x)



# Output width of the generator to rebuild.
# NOTE(review): hard-coded to 4, overriding the data_dim derived from the training data.
data_dim = 4

# Rebuild the generator architecture (only the generator is needed here).
lsgan_generator_item = LSGANGenerator_item(input_dim=z_dim, output_dim=data_dim)

# Load the weights written by save_model(), which always names the file
# "generator.pth" inside its output directory. The previous path pointed at
# "lsgan_generator_epoch_200.pth", a file this notebook never creates.
generator_path = r"data\model_params\lsgan_model\generator.pth"
lsgan_generator_item.load_state_dict(torch.load(generator_path))
lsgan_generator_item.eval()  # switch to evaluation mode
print(f"Generator model loaded from {generator_path}")




def extract_pytorch_params(model):
    """Return the parameters of ``model`` as ``{name: numpy array}``.

    Names are those reported by ``model.named_parameters()``. The arrays are
    produced with ``param.data.numpy()``.
    """
    return {name: tensor.data.numpy() for name, tensor in model.named_parameters()}
# Pull the reloaded generator's weights and biases out as numpy arrays.
lsgan_generator_item_params = extract_pytorch_params(lsgan_generator_item)





class LSGANGeneratorNumpy:
    """NumPy-only forward pass of the three-layer generator.

    Computes ``tanh(relu(relu(x W1 + b1) W2 + b2) W3 + b3)`` from parameters
    exported out of the PyTorch model.
    """

    def __init__(self, input_dim, output_dim, params):
        """Store the dimensions and unpack the layer parameters.

        Args:
            input_dim: noise dimension (stored only).
            output_dim: output dimension (stored only).
            params: mapping with the keys ``model.{0,2,4}.weight`` and
                ``model.{0,2,4}.bias`` holding numpy arrays.
        """
        self.input_dim = input_dim
        self.output_dim = output_dim

        weight_keys = ('model.0.weight', 'model.2.weight', 'model.4.weight')
        bias_keys = ('model.0.bias', 'model.2.bias', 'model.4.bias')
        # Weights are transposed so that `x @ W` is the layer's linear map.
        self.W1, self.W2, self.W3 = (params[key].T for key in weight_keys)
        self.b1, self.b2, self.b3 = (params[key] for key in bias_keys)

    def relu(self, x):
        """Element-wise max(0, x)."""
        return np.maximum(0, x)

    def tanh(self, x):
        """Element-wise hyperbolic tangent."""
        return np.tanh(x)

    def forward(self, x):
        """Run the network on ``x`` (one noise vector per row)."""
        hidden = x
        for weight, bias in ((self.W1, self.b1), (self.W2, self.b2)):
            hidden = self.relu(np.dot(hidden, weight) + bias)
        return self.tanh(np.dot(hidden, self.W3) + self.b3)
    
# Build the NumPy generator from the exported PyTorch parameters.
lsgan_generator_numpy = LSGANGeneratorNumpy(z_dim, data_dim, lsgan_generator_item_params)

print('参数已迁移')



# Reload scalers_X and scalers_Y from the pickle files written earlier.
# NOTE(review): pickle.load can execute arbitrary code; only load files this
# notebook produced itself.
with open(r'data\scalers\scalers_X.pkl', 'rb') as f:
    scalers_X = pickle.load(f)

with open(r'data\scalers\scalers_Y.pkl', 'rb') as f:
    scalers_Y = pickle.load(f)

print('scalers参数已迁移')



In [None]:
def series2U(z, M, scalers_X, input_term, isprint = True):
    """Turn a flat noise vector into control inputs with the PyTorch LSGAN generator.

    ``z`` is reshaped to ``(M, z_dim)`` and fed through the notebook-level
    ``lsgan_generator``. The first four output columns are returned both as
    one concatenated normalized sequence and individually mapped back to
    original units with the matching scaler.

    Args:
        z: 1-D numpy array with ``M * z_dim`` entries.
        M: number of samples encoded in ``z``.
        scalers_X: mapping column name -> fitted scaler.
        input_term: column names; entries 0..3 pick the scaler for U1..U4.
        isprint: when true, print every intermediate result.

    Returns:
        ``(sequence, U1_inverse, U2_inverse, U3_inverse, U4_inverse)``.
    """
    def log(*items):
        if isprint:
            print(*items)

    log(z.shape)

    # (M, z_dim) float32 tensor for the torch model.
    z_tensor = torch.from_numpy(z.reshape(M, z_dim)).float()
    log(z_tensor.shape)

    generated_data = lsgan_generator(z_tensor).detach().numpy()
    log(generated_data.shape)
    log(generated_data)

    # One 1-D array per generated column, then all four laid end to end.
    channels = [generated_data[:, k] for k in range(4)]
    sequence = np.concatenate(channels)

    for tag, values in zip(("U1:", "U2:", "U3:", "U4:"), channels):
        log(tag, values)
    log("Connected sequence:", sequence)

    # Back to original units, each column with its own scaler.
    restored = [
        scalers_X[input_term[k]].inverse_transform(channels[k].reshape(-1, 1)).flatten()
        for k in range(4)
    ]
    for tag, values in zip(("U1_inverse:", "U2_inverse:", "U3_inverse:", "U4_inverse:"), restored):
        log(tag, values)

    U1_inverse, U2_inverse, U3_inverse, U4_inverse = restored
    return sequence,U1_inverse,U2_inverse,U3_inverse,U4_inverse


def series2U_numpy(z, M, scalers_X, input_term, isprint = True):
    """Turn a flat noise vector into control inputs with the NumPy generator.

    Same contract as ``series2U`` but the forward pass runs through the
    notebook-level ``lsgan_generator_numpy`` instead of the torch model.

    Args:
        z: 1-D numpy array with ``M * z_dim`` entries.
        M: number of samples encoded in ``z``.
        scalers_X: mapping column name -> fitted scaler.
        input_term: column names; entries 0..3 pick the scaler for U1..U4.
        isprint: when true, print every intermediate result.

    Returns:
        ``(sequence, U1_inverse, U2_inverse, U3_inverse, U4_inverse)``.
    """
    verbose = bool(isprint)

    if verbose:
        print(z.shape)

    # One noise vector per row.
    noise = z.reshape(M, z_dim)
    generated_data = lsgan_generator_numpy.forward(noise)

    if verbose:
        print("Generated data shape:", generated_data.shape)
        print("Generated data:\n", generated_data)

    # Split into the four columns and lay them end to end.
    U1, U2, U3, U4 = (generated_data[:, k] for k in range(4))
    sequence = np.concatenate((U1, U2, U3, U4))

    if verbose:
        print("U1:", U1)
        print("U2:", U2)
        print("U3:", U3)
        print("U4:", U4)
        print("Connected sequence:", sequence)

    def to_original_units(values, position):
        # Undo the scaling with the scaler fitted on the matching column.
        column_scaler = scalers_X[input_term[position]]
        return column_scaler.inverse_transform(values.reshape(-1, 1)).flatten()

    U1_inverse = to_original_units(U1, 0)
    U2_inverse = to_original_units(U2, 1)
    U3_inverse = to_original_units(U3, 2)
    U4_inverse = to_original_units(U4, 3)

    if verbose:
        print("U1_inverse:", U1_inverse)
        print("U2_inverse:", U2_inverse)
        print("U3_inverse:", U3_inverse)
        print("U4_inverse:", U4_inverse)

    return sequence,U1_inverse,U2_inverse,U3_inverse,U4_inverse



In [None]:

# Number of samples to generate (z_dim comes from the hyper-parameter cell).
M = 3      # sample count


# Flat standard-normal noise vector of length z_dim * M.
z = np.random.randn(z_dim * M)
# NOTE(review): the commented-out calls below use an older signature without
# scalers_X / input_term and would need updating before being re-enabled.
# generated_data = series2U(z,M,isprint=True)
# generated_data_numpy = series2U_numpy(z,M,isprint=True)

# print("Check that the torch model and the numpy model give the same output:")
# result_d_state = np.fabs(generated_data-generated_data_numpy)<1e-6
# # print(result_d_state)
# print('mismatches:',np.sum(result_d_state==False),', matches:',np.sum(result_d_state==True))


# Run the NumPy generator and map its four outputs back to original units.
generated_data,U1_inverse,U2_inverse,U3_inverse,U4_inverse = series2U_numpy(z,M, scalers_X, input_term,isprint=True)

