In [1]:
import torch
import torch.nn as nn
import numpy as np
import matplotlib.pyplot as plt
import os
import random
import pandas as pd
import glob
from torch.utils.data import TensorDataset, DataLoader
from mpl_toolkits.axes_grid1.inset_locator import inset_axes
from mpl_toolkits.axes_grid1 import host_subplot
from mpl_toolkits import axisartist

In [None]:
#选择在GPU或CPU上面运行
device=torch.device('cuda:0' if torch.cuda.is_available else 'cpu')
print(device)

In [3]:
N=10000
data = []
labels = []

# 拿到数据路径，方便后续读取
dir_path = 'E:\\Desktop\\Python\\Signal_reconstruction\\Numerical_simulation_Data\\test'
# 获取目录及其子目录下所有CSV文件的路径
dataPaths = sorted(glob.glob(os.path.join(dir_path, '**', '*.csv'), recursive=True))
random.seed(42)
random.shuffle(dataPaths)

# 遍历读取数据
for dataPath in dataPaths:
    # 读取数据
    data_1 = pd.read_csv(dataPath)
    data.append(data_1.iloc[1:N+1, 1])

# 将数据和标签转换为numpy数组
test_data = np.array(data, dtype="float")
test_labels = np.array(data, dtype="float")

In [None]:
test_datas=test_data.reshape(-1,test_data.shape[1],1)
print(test_datas.shape)

In [None]:
plt.style.use('default')
plt.figure(figsize=(10,6))
plt.rcParams['font.family'] = ['Times New Roman']
plt.plot(test_datas[0,:],linewidth=1.5)
plt.xlabel('Sample point',fontdict={'weight': 'normal', 'size': 18})
plt.ylabel('Amplitude(mm)',fontdict={'weight': 'normal', 'size': 18})
#坐标轴刻度大小设置
plt.tick_params(axis='both', which='major', labelsize=15)
plt.savefig('E:\\Desktop\\Python\\Signal_reconstruction\\Model\\ISCM\\MSE\\test_data.jpg', dpi=600, bbox_inches='tight')

In [6]:
def add_noise_with_snr(signal, snr_db):
    """
    根据信噪比（dB）为信号添加高斯噪声
    :param signal: 原始信号
    :param snr_db: 信噪比（dB）
    :return: 带噪声的信号
    """
    # 计算信号功率
    signal_power = np.mean(signal ** 2)
    
    # 将信噪比从分贝转换为线性比例
    snr_linear = 10 ** (snr_db / 10)
    
    # 计算噪声功率
    noise_power = signal_power / snr_linear
    
    # 生成高斯噪声
    noise = np.random.normal(0, np.sqrt(noise_power), signal.shape)
    
    # 添加噪声
    noisy_signal = signal + noise
    return noisy_signal

In [None]:
# 添加噪声，信噪比为20dB
snr_db = 20  # 信噪比（dB）
test_noisy_signal = add_noise_with_snr(test_datas, snr_db)
print(f'train_noisy_signal.shape:{test_noisy_signal.shape}')

In [8]:
def calculate_snr(x_prime, x):
    """
    计算信号的信噪比（SNR）。

    参数:
        x (numpy.ndarray): 干净信号。
        x_prime (numpy.ndarray): 噪声信号。

    返回:
        float: 信噪比，单位为 dB。
    """
    # 计算信号功率
    signal_power = torch.mean(x ** 2)
    
    noise = x_prime-x
    # 计算噪声功率
    noise_power = torch.mean(noise ** 2)
    
    # 计算信噪比
    if noise_power == 0:
        raise ValueError("噪声功率不能为零，这可能导致除以零的错误。")
    
    snr = 10 * torch.log10(signal_power / noise_power)
    
    snr_1 = snr.to(device)
    return snr_1.item()  # 将结果转换为 Python 标量

In [9]:
def calculate_pcc(x, y):
    """
    计算两个信号的皮尔逊相关系数（PCC）。

    参数:
        x (torch.Tensor): 第一个信号，假定在 CUDA 上。
        y (torch.Tensor): 第二个信号，假定在 CUDA 上。

    返回:
        float: 皮尔逊相关系数。
    """
    
    # 计算均值
    mean_x = torch.mean(x)
    mean_y = torch.mean(y)
    
    # 计算协方差
    covariance = torch.mean((x - mean_x) * (y - mean_y))
    
    # 计算标准差
    std_x = torch.std(x)
    std_y = torch.std(y)
    
    # 计算皮尔逊相关系数
    if std_x == 0 or std_y == 0:
        raise ValueError("标准差不能为零，这可能导致除以零的错误。")
    
    pcc = covariance / (std_x * std_y)

    pcc_1 = pcc.to(device)

    return pcc_1.item()  # 将结果转换为 Python 标量


In [10]:
BATCH_SIZE = 1

In [11]:
# 准备数据
#torch.from_numpy将 NumPy 数组转换为 PyTorch 张量
#TensorDataset用于将张量数据和标签组合成一个数据集
#DataLoader用于从数据集中加载批次数据，并进行训练或测试

test_dataset = TensorDataset(torch.from_numpy(test_noisy_signal),torch.from_numpy(test_labels))
test_loader = DataLoader(test_dataset, batch_size = BATCH_SIZE, shuffle=False)

In [12]:
class Autoencoder(nn.Module):
    def __init__(self, N_target):
        super(Autoencoder, self).__init__()
        # 编码器部分
        self.encoder = nn.Sequential(
            nn.Conv1d(in_channels=1, out_channels=32, kernel_size=3, stride=1, padding=1),  # 保持长度不变
            nn.Tanh(),
            nn.MaxPool1d(kernel_size=2),  # 长度减半
            nn.Conv1d(in_channels=32, out_channels=64, kernel_size=3, stride=1, padding=1),  # 保持长度不变
            nn.Tanh(),
            nn.MaxPool1d(kernel_size=2),  # 长度再减半
            nn.Conv1d(in_channels=64, out_channels=128, kernel_size=3, stride=1, padding=1),  # 保持长度不变
            nn.Tanh(),
            nn.MaxPool1d(kernel_size=2),  # 长度再减半
            nn.Conv1d(in_channels=128, out_channels=256, kernel_size=3, stride=1, padding=1),  # 保持长度不变
            nn.Tanh(),
            nn.AdaptiveMaxPool1d(64)  # 自适应池化，确保输出长度为64
        )
        # 解码器部分
        self.decoder = nn.Sequential(
            nn.ConvTranspose1d(in_channels=256, out_channels=128, kernel_size=2, stride=2),  # 长度加倍
            nn.Tanh(),
            nn.ConvTranspose1d(in_channels=128, out_channels=64, kernel_size=2, stride=2),  # 长度加倍
            nn.Tanh(),
            nn.ConvTranspose1d(in_channels=64, out_channels=32, kernel_size=2, stride=2),  # 长度加倍
            nn.Tanh(),
            nn.ConvTranspose1d(in_channels=32, out_channels=1, kernel_size=2, stride=2),  # 长度加倍
            nn.Tanh()
        )
        # 最后一层线性层，确保输出长度为N_target
        self.final_linear = nn.Linear(1024, N_target)  # 输入维度调整为1024

    def forward(self, x):
        # 编码过程
        encoded = self.encoder(x)
        # 解码过程
        decoded = self.decoder(encoded)
        # 调整长度到N_target
        decoded = decoded.view(decoded.size(0), -1)  # 展平
        decoded = self.final_linear(decoded)
        y = decoded.view(decoded.size(0), 1, -1)  # 调整形状为 [batch_size, 1, N_target]
        return y

In [13]:
def downsample_signal(x, compression_ratio):
    """
    对信号进行间隔取样压缩
    :param x: 输入信号，维度为 [batch_size, 1, N]
    :param compression_ratio: 压缩比
    :return: 压缩后的信号，维度为 [batch_size, 1, N/compression_ratio]
    """
    batch_size, channels, length = x.size()
    indices = torch.arange(0, length, compression_ratio, device=x.device)
    return torch.index_select(x, 2, indices)

In [14]:
# 计算FFT
def calculate_fft(signal, sampling_rate):
    N = len(signal)  # 信号长度
    fft_values = np.fft.fft(signal)  # 计算FFT
    fft_magnitude = np.abs(fft_values)  # 取模
    fft_magnitude = fft_magnitude[:N // 2]  # 只取一半（正频率部分）
    freq = np.fft.fftfreq(N, d=1 / sampling_rate)[:N // 2]  # 频率轴
    return freq, fft_magnitude

In [15]:
A = 1
loss = nn.MSELoss()

In [None]:
# 进行预测
TIME_LOSS=[]
TIME_SNR=[]
TIME_PCC=[]
FFT_SNR=[]
FFT_PCC=[]
FFT_LOSS=[]

n=[2,4,8,16,32]
for k in n:
    print(f'Compression ratio:{k}')
    model=torch.load(f'E:\\Desktop\\Python\\Signal_reconstruction\\Model\\ISCM\\MSE\\model_signal_reconstruction_MSE_{k}.pth')  # 加载训练好的模型参数
    model.to(device)

    Error_time  = 0.0
    snr_time = 0.0
    pcc_time = 0.0

    Error_fft  = 0.0
    snr_fft = 0.0
    pcc_fft = 0.0

    # 将模型设置为评估模式
    model.eval()  
    # 进行预测

    signal_reconstruction = []
    signal_origin=[]

    for i, (x, y) in enumerate(test_loader):
        
        x1=x.type(torch.FloatTensor)
        y1=y.type(torch.FloatTensor)

        x2=x1.reshape(BATCH_SIZE,1,-1)
        y2=y1.reshape(BATCH_SIZE,1,-1) 

        x3, y3=x2.to(device),y2.to(device)
        x4 = downsample_signal(x3,k)

        with torch.no_grad():  # 关闭梯度计算
            y_hat = model(x4)
            Error_time += A * loss(y_hat,x3)
            snr_time += calculate_snr(y_hat,x3)
            pcc_time += calculate_pcc(y_hat,x3)

            signal_origin.append(y3)
            signal_reconstruction.append(y_hat)

            # 将重构信号移动到CPU并转换为NumPy数组
            Origin_x3 = x3.cpu().numpy().squeeze()
            Reconstructed_x3= y_hat.cpu().numpy().squeeze()

            # 采样率
            sampling_rate = 1e7

            # 假设 Original_signal 和 reconstructed_signal 已经定义
            freq_origin, fft_magnitude_origin = calculate_fft(Origin_x3, sampling_rate)
            freq_reconstructed, fft_magnitude_reconstructed = calculate_fft(Reconstructed_x3, sampling_rate)

            fft_magnitude_reconstructed,fft_magnitude_origin=torch.from_numpy(fft_magnitude_reconstructed),torch.from_numpy(fft_magnitude_origin),

            Error_fft += loss(fft_magnitude_reconstructed,fft_magnitude_origin)
            snr_fft += calculate_snr(fft_magnitude_reconstructed,fft_magnitude_origin)
            pcc_fft += calculate_pcc(fft_magnitude_reconstructed,fft_magnitude_origin)

    test_time_error = Error_time / len(test_loader.dataset)
    TIME_LOSS.append([test_time_error])

    test_time_snr = snr_time / len(test_loader.dataset)
    TIME_SNR.append([f"{test_time_snr:.2f}"])

    test_time_pcc = pcc_time / len(test_loader.dataset)
    TIME_PCC.append([f"{test_time_pcc:.2f}"])

    test_fft_error = Error_fft / len(test_loader.dataset)
    FFT_LOSS.append([test_fft_error])

    test_fft_snr = snr_fft / len(test_loader.dataset)
    FFT_SNR.append([f"{test_fft_snr:.2f}"])

    test_fft_pcc = pcc_fft / len(test_loader.dataset)
    FFT_PCC.append([f"{test_fft_pcc:.2f}"])

    a = 0
    signal_origin_1 = signal_origin[a]
    reconstructed_signal_1 = signal_reconstruction[a]

    # 将重构信号移动到CPU并转换为NumPy数组
    signal_origin_3 = signal_origin_1.cpu().numpy().squeeze()
    reconstructed_signal = reconstructed_signal_1.cpu().numpy().squeeze()

    # 采样率
    sampling_rate = 1e7
    # 假设 Original_signal 和 reconstructed_signal 已经定义
    freq_origin, fft_magnitude_origin = calculate_fft(signal_origin_3, sampling_rate)
    freq_reconstructed, fft_magnitude_reconstructed = calculate_fft(reconstructed_signal, sampling_rate)
    fft_magnitude_reconstructed,fft_magnitude_origin=torch.from_numpy(fft_magnitude_reconstructed),torch.from_numpy(fft_magnitude_origin),

    plt.style.use('default')
    plt.figure(figsize=(10,6))
    plt.rcParams['font.family'] = ['Times New Roman']
    plt.plot(signal_origin_3,linewidth=1.5, label='Original Signal')
    plt.plot(reconstructed_signal,linewidth=1.5, linestyle='--',label='Reconstructed Signal')

    plt.xlabel('Sample point',fontdict={'weight': 'normal', 'size': 18})
    plt.ylabel('Amplitude(mm)',fontdict={'weight': 'normal', 'size': 18})

    plt.xlim([10,1e4])
    plt.ylim([-0.05,0.05])
    #坐标轴刻度大小设置
    plt.tick_params(axis='both', which='major', labelsize=15)
    plt.legend(loc='upper right', fontsize=20)
    plt.savefig(f'E:\\Desktop\\Python\\Signal_reconstruction\\Model\\ISCM\\MSE\\Reconstruction_signal_{k}.jpg', dpi=600, bbox_inches='tight')

        # 绘制FFT频谱图
    fig, ax = plt.subplots(figsize=(10, 6))
    # 信号的FFT
    ax.plot(freq_origin/1e3, fft_magnitude_origin, label='Original Signal')
    ax.plot(freq_reconstructed/1e3, fft_magnitude_reconstructed, linestyle='--', label='Reconstructed Signal', color='orange')
    ax.set_xlabel('Frequency (kHz)', fontdict={'weight': 'normal', 'size': 18})
    ax.set_ylabel('Magnitude', fontdict={'weight': 'normal', 'size': 18})
    ax.legend()
    ax.set_xlim([0, 120])
    ax.tick_params(axis='both', which='major', labelsize=15)
    ax.legend(loc='upper right', fontsize=20)

    # 创建局部放大图，手动指定位置
    # bbox_to_anchor 是一个四元组 (x, y, width, height)，表示局部放大图的位置和大小
    # bbox_transform=ax.transAxes 表示使用主图的坐标系
    axins = inset_axes(ax, width="40%", height="40%", loc='lower left',
                    bbox_to_anchor=(0.55, 0.1, 1, 1), bbox_transform=ax.transAxes, borderpad=1)
    axins.plot(freq_origin/1e3, fft_magnitude_origin, label='Original Signal')
    axins.plot(freq_reconstructed/1e3, fft_magnitude_reconstructed, linestyle='--', label='Reconstructed Signal', color='orange')
    axins.set_xlim([80, 120])
    axins.set_ylim([0, 0.7])
    axins.tick_params(axis='both', which='major', labelsize=10)

    plt.savefig(f'E:\\Desktop\\Python\\Signal_reconstruction\\Model\\ISCM\\MSE\\test_reconstruction_FFT_{k}.jpg', dpi=600, bbox_inches='tight')
    plt.show()


In [None]:
#设置图形大小、像素
fig=plt.figure(figsize=(10,6))
plt.rcParams['figure.figsize']=(10,6)
plt.rcParams['savefig.dpi'] = 600 #图片像素
plt.rcParams['figure.dpi'] = 600
#设置图形中坐标轴标签大小、刻度值字体及大小
params = {'axes.labelsize': 18, 'axes.titlesize':18, 'legend.fontsize': 14, 'xtick.labelsize': 16, 'ytick.labelsize': 16,'font.family':'Times New roman'}
plt.rcParams.update(params)

host = host_subplot(111, axes_class=axisartist.Axes)
plt.subplots_adjust(right=0.75)

par1 = host.twinx()
par2 = host.twinx()
#设置第三根Y轴位置，这里选择是右边
par2.axis["right"] = par2.new_fixed_axis(loc="right", offset=(80, 0))
#设置Y轴刻度值、坐标轴标签显示
par1.axis["right"].toggle(all=True)
par2.axis["right"].toggle(all=True)

TIME_LOSS_np = [tensor[0].cpu().numpy() for tensor in TIME_LOSS]
TIME_SNR_np = [item[0] for item in TIME_SNR]
TIME_PCC_np = [item[0] for item in TIME_PCC]

TIME_SNR_np = [float(value) for value in TIME_SNR_np]
TIME_PCC_np = [float(value) for value in TIME_PCC_np]

#进行绘图
p1, = host.plot(n,TIME_LOSS_np, 'ro-', linewidth=2.5, label='Reconstruction Error')

p2, = par1.plot(n,TIME_SNR_np, 'go-', linewidth=2.5, label='Reconstruction SNR')

p3, = par2.plot(n,TIME_PCC_np, 'ko-', linewidth=2.5, label='Reconstruction PCC')


#设置x轴、y轴标签
host.set_xlabel("Compression ratio")
host.set_ylabel("Reconstruction Error")
par1.set_ylabel("SNR(dB)")
par2.set_ylabel("PCC")
#显示图例
host.legend(frameon=False,fontsize='x-large')
#设置y轴标签颜色
host.axis["left"].label.set_color(p1.get_color())
par1.axis["right"].label.set_color(p2.get_color())
par2.axis["right"].label.set_color(p3.get_color())
#将刻度线设置为向外
host.axis[:].major_ticks.set_tick_out(True)
par1.axis[:].major_ticks.set_tick_out(True)
par2.axis[:].major_ticks.set_tick_out(True)

# 保存图像
plt.savefig(f'E:\\Desktop\\Python\\Signal_reconstruction\\Model\\ISCM\\MSE\\Reconstruction_error_TIME.jpg', dpi=600, bbox_inches='tight')
#显示图形
plt.show()

In [None]:
#设置图形大小、像素
fig=plt.figure(figsize=(10,6))
plt.rcParams['figure.figsize']=(10,6)
plt.rcParams['savefig.dpi'] = 600 #图片像素
plt.rcParams['figure.dpi'] = 600
#设置图形中坐标轴标签大小、刻度值字体及大小
params = {'axes.labelsize': 18, 'axes.titlesize':18, 'legend.fontsize': 14, 'xtick.labelsize': 16, 'ytick.labelsize': 16,'font.family':'Times New roman'}
plt.rcParams.update(params)

host = host_subplot(111, axes_class=axisartist.Axes)
plt.subplots_adjust(right=0.75)

par1 = host.twinx()
par2 = host.twinx()
#设置第三根Y轴位置，这里选择是右边
par2.axis["right"] = par2.new_fixed_axis(loc="right", offset=(80, 0))
#设置Y轴刻度值、坐标轴标签显示
par1.axis["right"].toggle(all=True)
par2.axis["right"].toggle(all=True)

FFT_LOSS_np = [tensor[0].cpu().numpy() for tensor in FFT_LOSS]
FFT_SNR_np = [item[0] for item in FFT_SNR]
FFT_PCC_np = [item[0] for item in FFT_PCC]

FFT_SNR_np = [float(value) for value in FFT_SNR_np]
FFT_PCC_np = [float(value) for value in FFT_PCC_np]

#进行绘图
p1, = host.plot(n,FFT_LOSS_np, 'ro-', linewidth=2.5, label='Reconstruction Error_FFT')

p2, = par1.plot(n,FFT_SNR_np, 'go-', linewidth=2.5, label='Reconstruction SNR_FFT') 

p3, = par2.plot(n,FFT_PCC_np, 'ko-', linewidth=2.5, label='Reconstruction PCC_FFT')  

#设置x轴、y轴标签
host.set_xlabel("Compression ratio")
host.set_ylabel("Reconstruction Error")
par1.set_ylabel("SNR(dB)")
par2.set_ylabel("PCC")
#显示图例
host.legend(frameon=False,fontsize='x-large')
#设置y轴标签颜色
host.axis["left"].label.set_color(p1.get_color())
par1.axis["right"].label.set_color(p2.get_color())
par2.axis["right"].label.set_color(p3.get_color())
#将刻度线设置为向外
host.axis[:].major_ticks.set_tick_out(True)
par1.axis[:].major_ticks.set_tick_out(True)
par2.axis[:].major_ticks.set_tick_out(True)

# 保存图像
plt.savefig(f'E:\\Desktop\\Python\\Signal_reconstruction\\Model\\ISCM\\MSE\\Reconstruction_error_FFT.jpg', dpi=600, bbox_inches='tight')
#显示图形
plt.show()