In [1]:
# 导入必要的库
import os
import glob
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from mpl_toolkits.axes_grid1.inset_locator import inset_axes
import random
import csv

In [None]:
# 检查CUDA是否可用
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

Data_path = '/home/ubuntu/data/workspace/Li/Data/'
save_path = '/home/ubuntu/data/workspace/Li/Model/Python_code_FFT_Amp/'

In [3]:
M = 9900
batch_size = 1

# 定义数据集类
class SignalDataset(Dataset):
    def __init__(self, generation_signals, labels, experimental_signal):
        self.generation_signals = torch.tensor(generation_signals, dtype=torch.float32)
        self.labels = torch.tensor(labels, dtype=torch.float32)
        self.experimental_signal = torch.tensor(experimental_signal, dtype=torch.float32)

    def __len__(self):
        return len(self.generation_signals)

    def __getitem__(self, idx):
        return {
            'generation_signal': self.generation_signals[idx],
            'labels': self.labels[idx],
            'experimental_signal': self.experimental_signal[idx]
        }

In [4]:
class ConvAutoencoder(nn.Module):
    def __init__(self, signal_length, label_dim, output_shape, hidden_dim, dropout_rate):
        super(ConvAutoencoder, self).__init__()
        
        # 输入信号处理路径（编码器部分）
        self.encoder_signal = nn.Sequential(
            nn.Conv1d(in_channels=1, out_channels=hidden_dim//4, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=2, stride=2),
            nn.Conv1d(in_channels=hidden_dim//4, out_channels=hidden_dim//8, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=2, stride=2),
            nn.Conv1d(in_channels=hidden_dim//8, out_channels=hidden_dim//16, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=2, stride=2)
        )
        
        # 标签处理路径（MLP）
        self.label_fc = nn.Sequential(
            nn.Linear(label_dim, hidden_dim//4),
            nn.ReLU(),
            nn.Linear(hidden_dim//4, hidden_dim//8),
            nn.ReLU(),
            nn.Linear(hidden_dim//8, hidden_dim//16)
        )
        
        # 合并后的输出路径（解码器部分）
        self.decoder = nn.Sequential(
            nn.ConvTranspose1d(in_channels=hidden_dim//16 + hidden_dim//16, out_channels=hidden_dim//8, kernel_size=2, stride=2),
            nn.ReLU(),
            nn.ConvTranspose1d(in_channels=hidden_dim//8, out_channels=hidden_dim//4, kernel_size=2, stride=2),
            nn.ReLU(),
            nn.ConvTranspose1d(in_channels=hidden_dim//4, out_channels=1, kernel_size=6, stride=2)
        )
        
        # 最后的输出层
        self.output_layer = nn.Linear(signal_length, output_shape)

    def forward(self, signal, label):
        # 处理信号 [batch, 1, signal_length]
        signal_features = self.encoder_signal(signal)  # 编码后的特征 [batch, hidden_dim//16, signal_length // 8]
        
        # 处理标签 [batch, label_dim]
        label_features = self.label_fc(label)  # [batch, hidden_dim//64]
        
        # 将标签特征扩展为与信号特征相同的长度
        label_features = label_features.unsqueeze(2).repeat(1, 1, signal_features.size(2))
        
        # 合并特征 [batch, hidden_dim//16 + hidden_dim//64, signal_length // 8]
        combined = torch.cat([signal_features, label_features], dim=1)

        # 解码 [batch, 1, signal_length]
        decoded = self.decoder(combined)
        
        # 输出 [batch, output_shape]
        output = self.output_layer(decoded.view(decoded.size(0), -1))
        return output.view(-1, 1, M)  # 保持输入输出形状一致

In [5]:
def load_data(generation_signal_path, experimental_signal_path, M):
    generation_signals = []  # 存储生成信号
    labels = []  # 存储标签 (Force_label, HI_label, Distance_label, Time_label)
    experimental_signals = []  # 存储输出参数
    filenames = []  # 存储文件名

    # 加载生成信号数据
    generation_dataPaths = sorted(glob.glob(os.path.join(generation_signal_path, '**', '*.csv'), recursive=True))
    for dataPath in generation_dataPaths:

        filename = os.path.basename(dataPath)
        parts = filename.split('_')
        Force_label = float(parts[0])
        HI_label = float(parts[1])
        Distance_label = float(parts[3])
        Time_label = float(parts[2])

        data_1 = pd.read_csv(dataPath)
        for N in range(len(data_1)):
            if data_1.iloc[N, 1] != 0:
                break
        if N < len(data_1):
            generation_signal = data_1.iloc[N:N+M, 1].values
        else:
            continue

        # 分别存储 generation_signal 和 labels
        generation_signals.append(generation_signal)
        labels.append([Force_label, HI_label, Distance_label, Time_label])
        filenames.append(filename)

    # 加载实验信号数据
    experimental_dataPaths = sorted(glob.glob(os.path.join(experimental_signal_path, '**', '*.csv'), recursive=True))
    for dataPath in experimental_dataPaths:
        data_1 = pd.read_csv(dataPath)
        filename = os.path.basename(dataPath)

        for N in range(len(data_1)):
            if data_1.iloc[N, 1] != 0:
                break
        if N < len(data_1):
            experimental_signal = data_1.iloc[N:N+M, 1].values
            experimental_signals.append(experimental_signal)  # 将 experimental_signal 添加到对应的 outputs 元素中

    # 确保 generation_signals、labels 和 outputs 的长度一致
    if len(generation_signals) != len(experimental_signals) or len(labels) != len(experimental_signals):
        print("Error: The number of inputs and outputs does not match.")
        return None, None, None, None, None

    # 转换为 NumPy 数组
    generation_signals = np.array(generation_signals, dtype=float)
    labels = np.array(labels, dtype=float)
    experimental_signals = np.array(experimental_signals, dtype=float)

    return generation_signals, labels, experimental_signals, filenames

In [None]:
generation_signal_path = f'{Data_path}Hanning_signal_generation'
experimental_signal_path = f'{Data_path}Hanning_Signal_Experiment(10000)'

# 加载数据
filenames = os.listdir(generation_signal_path)  # 假设两个文件夹的文件名是一一对应的

# 控制加载的数据量
n = 100  # 假设只加载 n% 的数据
num_samples = int(len(filenames) * n / 100)
sampled_filenames = random.sample(filenames, num_samples)

# 根据采样的文件名加载数据
# 假设 load_data 函数已经定义好
generation_signals, labels, experimental_signal, filenames = load_data(generation_signal_path, experimental_signal_path, M)

# 将所有数据打乱
shuffled_indices = np.arange(len(generation_signals))
np.random.shuffle(shuffled_indices)

generation_signals = generation_signals[shuffled_indices]
labels = labels[shuffled_indices]
experimental_signal = experimental_signal[shuffled_indices]

# 随机划分训练集、验证集和测试集
# 比例为 4:2:4，即 40% 训练集，20% 验证集，40% 测试集
train_val_indices, test_indices = train_test_split(range(len(sampled_filenames)), test_size=0.4)
train_indices, val_indices = train_test_split(train_val_indices, test_size=0.166)  # 20% / (40% + 20%) = 1/3

# 提取测试集的文件名
test_filenames = [sampled_filenames[i] for i in test_indices]

# 其余代码保持不变
generation_signals_train = generation_signals[train_indices]
labels_train = labels[train_indices]
experimental_signal_train = experimental_signal[train_indices]

generation_signals_val = generation_signals[val_indices]
labels_val = labels[val_indices]
experimental_signal_val = experimental_signal[val_indices]

generation_signals_test = generation_signals[test_indices]
labels_test = labels[test_indices]
experimental_signal_test = experimental_signal[test_indices]

# 增加一个维度，从 [batch_size, N] 变为 [batch_size, 1, N]
generation_signals_train = np.expand_dims(generation_signals_train, axis=1)
experimental_signal_train = np.expand_dims(experimental_signal_train, axis=1)

generation_signals_val = np.expand_dims(generation_signals_val, axis=1)
experimental_signal_val = np.expand_dims(experimental_signal_val, axis=1)

generation_signals_test = np.expand_dims(generation_signals_test, axis=1)
experimental_signal_test = np.expand_dims(experimental_signal_test, axis=1)

print(f'generation_signals_train:{generation_signals_train.shape}')
print(f'generation_signals_val:{generation_signals_val.shape}')
print(f'generation_signals_test:{generation_signals_test.shape}')

# 创建数据集
train_dataset = SignalDataset(generation_signals_train, labels_train, experimental_signal_train)
val_dataset = SignalDataset(generation_signals_val, labels_val, experimental_signal_val)
test_dataset = SignalDataset(generation_signals_test, labels_test, experimental_signal_test)

# 创建数据加载器
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [None]:
generation_signals_test_1 = generation_signals_test.reshape(-1,M,1)
plt.style.use('default')
plt.figure(figsize=(10,6))
plt.rcParams['font.family'] = ['Times New Roman']
plt.plot(generation_signals_test_1[1],linewidth=1.5)
plt.xlabel('Sample point',fontdict={'weight': 'normal', 'size': 18})
plt.ylabel('Amplitude(v)',fontdict={'weight': 'normal', 'size': 18})
#坐标轴刻度大小设置
plt.tick_params(axis='both', which='major', labelsize=15)
plt.xlim([0,M])
plt.savefig(f'{save_path}Forward_problem/Generation_signal_test.jpg', dpi=600, bbox_inches='tight')

In [None]:
experimental_signal_test_1 = experimental_signal_test.reshape(-1,M,1)
plt.style.use('default')
plt.figure(figsize=(10,6))
plt.rcParams['font.family'] = ['Times New Roman']
plt.plot(experimental_signal_test_1[25],linewidth=1.5)
plt.xlabel('Sample point',fontdict={'weight': 'normal', 'size': 18})
plt.ylabel('Amplitude(V)',fontdict={'weight': 'normal', 'size': 18})
#坐标轴刻度大小设置
plt.tick_params(axis='both', which='major', labelsize=15)
plt.xlim([0,M])
plt.savefig(f'{save_path}Forward_problem/Experiment_signal_test.jpg', dpi=600, bbox_inches='tight')

In [None]:
# 训练模型
model=torch.load(f'{save_path}Forward_problem/Forward_Model.pth')  # 加载训练好的模型参数
model.to(device)

In [10]:
def calculate_fft(signal, sampling_rate):
    N = len(signal)  # 信号长度
    fft_values = np.fft.fft(signal)  # 计算FFT
    fft_magnitude = np.abs(fft_values)  # 取模
    fft_magnitude = fft_magnitude[:N // 2]  # 只取一半（正频率部分）
    freq = np.fft.fftfreq(N, d=1 / sampling_rate)[:N // 2]  # 频率轴
    return freq, fft_magnitude

In [11]:
def calculate_pcc(x, y):
    """
    计算两个信号的皮尔逊相关系数(PCC)。

    参数:
        x (torch.Tensor): 第一个信号，假定在 CUDA 上。
        y (torch.Tensor): 第二个信号，假定在 CUDA 上。

    返回:
        float: 皮尔逊相关系数。
    """
    
    # 计算均值
    mean_x = torch.mean(x)
    mean_y = torch.mean(y)
    
    # 计算协方差
    covariance = torch.mean((x - mean_x) * (y - mean_y))
    
    # 计算标准差
    std_x = torch.std(x)
    std_y = torch.std(y)
    
    # 计算皮尔逊相关系数
    if std_x == 0 or std_y == 0:
        raise ValueError("标准差不能为零，这可能导致除以零的错误。")
    
    pcc = covariance / (std_x * std_y)

    pcc_1 = pcc.to(device)

    return pcc_1.item()  # 将结果转换为 Python 标量

In [None]:
pcc_time = 0.0
pcc_fft = 0.0

Error_fft = 0.0

# 采样率
sampling_rate = 2.5e7

signal_predicted = []
signal_origin=[]

running_loss = 0.0
criterion = nn.MSELoss().to(device)

# 将模型设置为评估模式
model.eval()  
# 进行预测
with torch.no_grad():  # 关闭梯度计算
    for batch in test_loader:
        generation_signal = batch['generation_signal'].to(device)
        labels = batch['labels'].to(device)
        experimental_signal = batch['experimental_signal'].to(device)

        experimental_signal_pred = model(generation_signal, labels)
        loss = criterion(experimental_signal_pred, experimental_signal)

        running_loss += loss.item()
        pcc_time += calculate_pcc(experimental_signal_pred,experimental_signal)

        # 将重构信号移动到CPU并转换为NumPy数组
        Origin_x3 = experimental_signal.cpu().numpy().squeeze()
        predicted_x3 = experimental_signal_pred.cpu().numpy().squeeze()

        # 假设 Original_signal 和 predicted_signal 已经定义
        freq_origin, fft_magnitude_origin = calculate_fft(Origin_x3, sampling_rate)
        freq_predicted, fft_magnitude_predicted = calculate_fft(predicted_x3, sampling_rate)

        fft_magnitude_predicted,fft_magnitude_origin=torch.from_numpy(fft_magnitude_predicted),torch.from_numpy(fft_magnitude_origin),

        Error_fft += criterion(fft_magnitude_predicted,fft_magnitude_origin)
        pcc_fft += calculate_pcc(fft_magnitude_predicted,fft_magnitude_origin)
    
        signal_origin.append(experimental_signal)
        signal_predicted.append(experimental_signal_pred)

    test_loss_time = running_loss / len(test_loader)
    test_pcc_time = pcc_time / len(test_loader)

    test_fft_error = Error_fft / len(test_loader)
    test_fft_pcc = pcc_fft / len(test_loader)

    print(f'Time_error:{test_loss_time:.4f}')
    print(f'Time_PCC:{test_pcc_time:.4f}')
    print(f'FFT_error:{test_fft_error:.4f}')
    print(f'FFT_PCC:{test_fft_pcc:.4f}')

In [13]:
results_1 = os.path.join(Data_path, "Results_MSE/Predicted_signal/")

# 检查目录是否存在
if os.path.exists(results_1):
    # 遍历目录中的所有文件
    for filename in os.listdir(results_1):
        file_path = os.path.join(results_1, filename)
        try:
            # 如果是文件，则删除
            if os.path.isfile(file_path):
                os.remove(file_path)
        except Exception as e:
            print(f"Failed to delete {file_path}. Reason: {e}")
else:
    print(f"The directory {results_1} does not exist.")

results_2 = os.path.join(Data_path, "Results_MSE/Predicted_FFT/")

# 检查目录是否存在
if os.path.exists(results_2):
    # 遍历目录中的所有文件
    for filename in os.listdir(results_2):
        file_path = os.path.join(results_2, filename)
        try:
            # 如果是文件，则删除
            if os.path.isfile(file_path):
                os.remove(file_path)
        except Exception as e:
            print(f"Failed to delete {file_path}. Reason: {e}")
else:
    print(f"The directory {results_2} does not exist.")

results_3 = os.path.join(Data_path, "Results_MSE/Time_Difference/")

# 检查目录是否存在
if os.path.exists(results_3):
    # 遍历目录中的所有文件
    for filename in os.listdir(results_3):
        file_path = os.path.join(results_3, filename)
        try:
            # 如果是文件，则删除
            if os.path.isfile(file_path):
                os.remove(file_path)
        except Exception as e:
            print(f"Failed to delete {file_path}. Reason: {e}")
else:
    print(f"The directory {results_3} does not exist.")

# 定义文件名
file_name = f'{save_path}Forward_problem/Test_Error_Amp.csv'

# 检查文件是否存在，如果存在则删除
file_path = os.path.join(save_path, file_name)
if os.path.exists(file_path):
    os.remove(file_path)

In [14]:
# 寻找指定频率范围内的最大幅值
def find_max_in_range(freq, magnitude, freq_range):
    mask = (freq / 1e3 >= freq_range[0]) & (freq / 1e3 <= freq_range[1])
    max_index = np.argmax(magnitude[mask])
    max_freq = freq[mask][max_index] / 1e3
    max_magnitude = magnitude[mask][max_index]
    return max_freq, max_magnitude

In [None]:
# 遍历每个信号
for a in range(len(experimental_signal_test)):
    # 获取对应的 CSV 文件名
    csv_filename = test_filenames[a]
    # 去掉 CSV 文件名的扩展名，用于保存图片
    csv_filename_without_extension = os.path.splitext(csv_filename)[0]

    # 将重构信号移动到CPU并转换为NumPy数
    signal_origin_3 = signal_origin[a].cpu().numpy().squeeze()
    predicted_signal = signal_predicted[a].cpu().numpy().squeeze()

    # 计算信号的差值
    difference = signal_origin_3 - predicted_signal
    Time_RMSE = np.sqrt(np.mean((signal_origin_3 - predicted_signal) ** 2))

    # 采样率
    sampling_rate = 2.5e7

    # 假设 Original_signal 和 predicted_signal 已经定义
    freq_origin, fft_magnitude_origin = calculate_fft(signal_origin_3, sampling_rate)
    freq_predicted, fft_magnitude_predicted = calculate_fft(predicted_signal, sampling_rate)
    fft_magnitude_predicted, fft_magnitude_origin = torch.from_numpy(fft_magnitude_predicted)*1e-4, torch.from_numpy(fft_magnitude_origin)*1e-4

    # 绘制时域信号图
    plt.style.use('default')
    plt.figure(figsize=(10, 6))
    plt.rcParams['font.family'] = ['Times New Roman']
    plt.plot(signal_origin_3, linewidth=1.5, label='Experimental Signal')
    plt.plot(predicted_signal, linewidth=1.5, linestyle='--', label='Predicted Signal')

    plt.xlabel('Sample point', fontdict={'weight': 'normal', 'size': 18})
    plt.ylabel('Amplitude(V)', fontdict={'weight': 'normal', 'size': 18})
    plt.tick_params(axis='both', which='major', labelsize=15)
    plt.legend(loc='upper right', fontsize=20)
    plt.text(max(signal_origin_3), max(signal_origin_3), f'RMSE = {Time_RMSE:.4f}', fontsize=28, verticalalignment='top')
    plt.savefig(f'{Data_path}Results_FFT_Amp/Predicted_signal/{csv_filename_without_extension}.jpg', dpi=600, bbox_inches='tight')
    plt.close()

    # 绘制差值图
    plt.figure(figsize=(10, 6))
    plt.rcParams['font.family'] = ['Times New Roman']
    plt.plot(difference, linewidth=1.5)
    plt.xlim([0, M])
    plt.ylim([-max(difference), max(difference)])

    plt.xlabel('Sample point', fontdict={'weight': 'normal', 'size': 18})
    plt.ylabel('Difference (V)', fontdict={'weight': 'normal', 'size': 18})
    plt.tick_params(axis='both', which='major', labelsize=15)
    plt.legend(loc='upper right', fontsize=20)
    plt.savefig(f'{Data_path}Results_FFT_Amp/Time_Difference/{csv_filename_without_extension}.jpg', dpi=600, bbox_inches='tight')
    plt.close()


    # 原始信号
    A1_freq, Af = find_max_in_range(freq_origin, fft_magnitude_origin, (40, 60))
    As_freq, As = find_max_in_range(freq_origin, fft_magnitude_origin, (80, 120))

    # 预测信号
    Bf_freq, Bf = find_max_in_range(freq_predicted, fft_magnitude_predicted, (40, 60))
    Bs_freq, Bs = find_max_in_range(freq_predicted, fft_magnitude_predicted, (80, 120))

    Af_max = max(Af,Bf)
    As_max = max(As,Bs)

    # 绘制FFT频谱图
    fig, ax = plt.subplots(figsize=(10, 6))
    ax.plot(freq_origin / 1e3, fft_magnitude_origin, label='Experimental Signal')
    ax.plot(freq_predicted / 1e3, fft_magnitude_predicted, linestyle='--', label='Predicted Signal', color='orange')
    ax.set_xlabel('Frequency (kHz)', fontdict={'weight': 'normal', 'size': 18})
    ax.set_ylabel('FFT Magnitude', fontdict={'weight': 'normal', 'size': 18})
    ax.legend()
    ax.set_xlim([10, 120])
    ax.set_ylim([0, Af_max])
    ax.tick_params(axis='both', which='major', labelsize=15)
    ax.legend(loc='upper right', fontsize=20)

    # 创建局部放大图
    axins = inset_axes(ax, width="40%", height="40%", loc='lower left',
                       bbox_to_anchor=(0.55, 0.1, 1, 1), bbox_transform=ax.transAxes, borderpad=1)
    axins.plot(freq_origin / 1e3, fft_magnitude_origin, label='Experimental Signal')
    axins.plot(freq_predicted / 1e3, fft_magnitude_predicted, linestyle='--', label='Predicted Signal', color='orange')
    axins.set_xlim([80, 120])
    axins.set_ylim([0, As_max])
    axins.tick_params(axis='both', which='major', labelsize=10)

    FFT_RMSE = torch.sqrt(torch.mean((fft_magnitude_origin - fft_magnitude_predicted) ** 2))
    
    # 计算NP_true和NP_pre
    NP_true = As / (Af ** 2)
    NP_pre = Bs / (Bf ** 2)

    # 计算误差
    NP_Error = abs(NP_true - NP_pre) / NP_true * 100

    # 在图片中显示这些值
    ax.text(0.05, 0.95, f'RMSE = {FFT_RMSE:.4f}\nNP_true = {NP_true:.8f}\nNP_pre = {NP_pre:.8f}\nError = {NP_Error:.4f}%',
            transform=ax.transAxes, fontsize=22, verticalalignment='top', bbox=dict(boxstyle='round', facecolor='white', alpha=0.5))

    plt.savefig(f'{Data_path}Results_FFT_Amp/Predicted_FFT//{csv_filename_without_extension}.jpg', dpi=600, bbox_inches='tight')
    plt.close()

    print(f"{csv_filename_without_extension}, {a+1}/{len(experimental_signal_test)}, NP_Error: {NP_Error:.4f}%, Time_RMSE = {Time_RMSE:.4f},FFT_RMSE = {FFT_RMSE:.8f}")

    # 打开 CSV 文件，如果文件不存在会自动创建
    with open('Test_Error_Amp.csv', mode='a', newline='') as file:
        writer = csv.writer(file)
        # 写入每一行的数据
        writer.writerow([csv_filename_without_extension, f"{a+1}/{len(experimental_signal_test)}", f"{NP_Error:.4f}", f"{Time_RMSE:.4f}"])