# 第三部分：PyTorch与深度学习基础
本项目是**中南大学智能雷达实验室**为即将进组的同学准备的培训项目，涵盖从Python基础到深度学习在探地雷达领域的应用。项目设计与文档由智能雷达实验室成员精心编写，旨在帮助新成员快速融入实验室研究工作。

## Do-1 深度学习理论基础
 * 神经网络基本原理
 * 激活函数与非线性
 * 前向传播与反向传播
 * 梯度下降算法
 * 过拟合与正则化
 * 批量归一化
 * 卷积神经网络原理
 * 池化操作与降维

* 简单理解神经网络：写出一个带有未知参数的函数，这个函数可以对指定的目标任务做出预测，而该参数含有的未知参数需要通过数据去找出来。  
![](resource/线性模型.png)  
* 其中y是准备要预测的东西（输入），x1为输入信息，未知参数w为<span style="color:red">权重</span>，用来控制输入信息对最终输出的影响有多大，未知参数b为<span style="color:red">偏置</span>，该带有未知参数的函数称为<span style="color:red">模型</span>。<span style="color:red">特征</span>用来指代数据中的有用信息，如图像的边缘、纹理、颜色分布、形状等，这些信息有利于模型进行预测。


* 损失函数的输入就为未知参数b、w，作用是来代表在固定好某一对b和w时，该模型的输出好还是不好，例如平均绝对误差（Mean Absolute Error，MAE）:  
![](resource/MAE.png)  
* 还有均方误差（Mean Squared Error，MSE）：  
![](resource/MSE.png)    
* MAE、MSE用来计算预测值与你希望的输出（<span style="color:red">标签</span>）之间的差距。

In [None]:
import torch
import torch.nn as nn

# 假设模型输出和真实标签
y_predict = torch.tensor([2.5, 0.0, 2.1], requires_grad=True)
y_label = torch.tensor([3.0, -0.5, 2.0])

# MSE（均方误差）
mse_loss = nn.MSELoss()
mse = mse_loss(y_predict, y_label)

# MAE（平均绝对误差）
mae_loss = nn.L1Loss()
mae = mae_loss(y_predict, y_label)
print("MAE:", mae.item())

* 根据不同的b,w参数，损失函数的大小会发生变化，若能找出一对数值b、w，使得最终的损失值最小，这就是训练的目的，参数通过对输入数据的迭代学习进行更新，逐渐趋向于最优解。  
* 如何找出这个最优解，也就是学习策略，常用<span style="color:red">梯度下降法</span>，如下图：  
<p align="center">
  <img src="resource/梯度下降.png" width="35%">
</p>

* 为简化问题，先解释损失值L和参数w的关系，首先随机选取一个初始点，计算该点损失值L对参数w的偏导数，就能知道w往哪个方向走（增大或减小）使得损失值L下降，并且斜率大的地方步伐就跨大一点，斜率小的地方步伐就跨小一点。  
* <span style="color:red">学习率</span>η也会影响步伐大小，是自己设定的，如果学习率设大一点，每次参数更新就会量大，学习可能会比较快，但是容易使损失在学习的过程中震荡，如果学习率设小一点，每次参数更新只会改变一点点参数的数值，但容易陷入<span style="color:red">局部最小值</span>。这种需要自己设定，而不是机器自己找出来的，称之为<span style="color:red">超参数</span>。

<p align="center">
  <img src="resource/局部最小值&全局最小值.png" width="35%">
</p>  

* 当参数调整到在该点的梯度（偏导）为零时，有两种情况：<span style="color:red">局部最小值</span>、<span style="color:red">全局最小值</span>，参数更新在这两点都会停止，而我们希望找到的时全局最小值，但学习经常会在局部最小值点停止。
* 此外，损失函数是自己定义的，这个图中的曲线并不是一个真实的损失，这个损失的曲线可以是任何形状，而且并不仅仅只与一个或两个参数有关。例如这个模型有57个参数：  
![](resource/多参数线性模型.png)
* 但是这个模型依旧是个简单的线性模型，输入x与输出y之间可能有复杂的关系，我们可以将输入x与y的映射关系（红色曲线）看作是一组s形状的线（蓝色）与一个常数相加之和。  
<p align="center">
  <img src="resource/非线性模型.png" width="35%">
</p>    

* 并且我们常用<span style="color:red">Sigmoid函数</span>来逼近这些蓝色曲线（hard Sigmoid）：  
![](resource/Sigmoid函数.png)
<p align="center">
  <img src="resource/Sigmoid图像.png" width="35%">
</p>   

* 其中c，b，w是常数，我们可以通过改变这些常数，制造出不同的Sigmoid函数，把不同的Sigmoid函数叠加来逼近各种不同的分段函数，就可以变成任何连续的曲线。
  <p align="center">
  <img src="resource/不同参数下的Sigmoid函数.png" width="35%">
</p>   

* 所以在各种书上对<span style="color:red">激活函数</span>的描述为：激活函数（非线性函数）可以给神经网络增加“非线性能力”，让模型可以学习复杂的模式和关系。常见的激活函数还有<span style="color:red">ReLU</span>。在模型的各个环节增加激活函数，可以使得模型更加灵活：
  <p align="center">
  <img src="resource/激活函数给模型增加灵活性.png" width="35%">
</p>   

* 在实际输入大量数据来寻找最优参数点时，通常会把这大量数据分成一个一个的<span style="color:red">批量（batch）</span>,本来是要把所有的数据拿出来算出一个损失，现在是只拿一个批量里边的数据来算出一个损失，算出梯度，来更新参数，然后继续下一个批量里边的数据算出损失、算出梯度、更新参数，当把所有数据都看过一次，称之为一个<span style="color:red">回合（epoch）</span>，每更新一次参数叫做一次更新。

* <span style="color:red">神经网络</span>由很多个神经元组成，<span style="color:red">神经元</span>通过多个输入与对应的权重相乘，加上一个偏置，然后通过一个激活函数，输出一个结果。神经元很多的模型，那些既不是模型直接接受输入的地方也不是直接输出预测结果的地方，称之为<span style="color:red">隐藏层</span>，人们把神经网络越叠越多越叠越深，例如残差网络（Residual Network,ResNet）可以有152层。
  <p align="center">
  <img src="resource/神经网络.png" width="35%">
</p>   

* 深度学习网络可以看作是一个很深层的神经网络，<span style="color:red">反向传播</span>是训练深度学习网络的核心算法之一，梯度从输出层开始，一层层“反向”传播回输入层，更新每一层的权重（参数）。而<span style="color:red">前向传播</span>：输入->经过神经网络->得到输出（预测）。

In [None]:
import torch
import torch.nn as nn

# 1. 输入和目标（注意：需要是 float 类型）
x = torch.tensor([[0.5, -0.2]], requires_grad=True)   # 输入特征
y_true = torch.tensor([[1.0]])                        # 目标输出

# 2. 简单的线性模型：输入(2) -> 输出(1)
model = nn.Linear(2, 1)  # 自动初始化权重和偏置

# 3. 损失函数：均方误差
criterion = nn.MSELoss()

# 4. 前向传播
y_pred = model(x)

# 5. 计算损失
loss = criterion(y_pred, y_true)

# 6. 反向传播
loss.backward()

* 反向传播更新参数时，梯度值可能变得非常小或非常大，就是所谓的<span style="color:red">梯度消失</span>或<span style="color:red">梯度爆炸</span>，梯度消失会导致深层特征并没有被学习到但参数不再更新，如前边提到的局部极小值，梯度爆炸会导致参数直接变成NaN，训练无法继续，造成梯度消失和梯度爆炸的原因有很多，比如学习率过高或过低，所以学习率这种超参数需要在实验过程中慢慢调试。
* <span style="color:red">鞍点</span>的梯度也为零，但区别于局部极小值和局部极大值，如图中红色的点在y轴方向比较高，但在x轴方向是比较低的。我们把梯度为零的点统称为<span style="color:red">临界点</span>。
  <p align="center">
  <img src="resource/局部极小值&鞍点.png" width="35%">
</p>   

* 应对梯度消失和梯度爆炸的策略有<span style="color:red">批量归一化（Batch Normalization，BN）</span>，如今几乎是神经网络的标配，网络每一层的输出如果数值分布变化太大，会让网络训练困难，BN在每一层后把输出重新“归一化”，让数值更加平稳、集中，模型更容易收敛。

In [None]:
import torch.nn as nn

# 用于全连接层输出
nn.BatchNorm1d(num_features)

# 用于卷积层输出（特征图）
nn.BatchNorm2d(num_channels)

* 除了批归一化以外，还有<span style="color:red">特征归一化（Feature Normalization,FN）</span>等。

* 深层网络很容易出现<span style="color:red">过拟合</span>，最明显的表现是模型在训练集上表现非常好，但在验证集和测试集上表现很差，也就是说模型记住了训练数据，而不是学会了泛化规律。为了解决该问题，很多时候采用<span style="color:red">正则化</span>的策略，通俗地讲，是给模型加上一些约束，防止它“死记硬背”，例如L1正则化和L2正则化，还有<span style="color:red">Dropout</span>，其策略是训练时随机“屏蔽”一部分神经元，防止模型对某些神经元太依赖，而测试时自动关闭Dropout。

In [None]:
import torch.nn as nn
nn.Dropout(p=0.5)  # 每次随机丢掉50%的神经元

* 另外一个解决过拟合的策略是<span style="color:red">数据增强</span>，例如通过对训练数据做旋转、缩放、加噪声等方式，人为扩种数据多样性，让模型学会适应更多的情况，所以可以讲训练数据集越小，网络越深，越容易产生过拟合。
* 另外一个解决过拟合的策略是<span style="color:red">早停</span>，当验证集的损失不再下降时，就提前停止训练。
* 除了过拟合以外，还有一个严重问题，就是<span style="color:red">数据泄露</span>，指的是训练过程中使用了测试集的信息，或者用了模型不该提前知道的特征，你以为模型预测很准，其实它是提前“偷看了答案”才那么准，真正面对新数据时完全不行，例如测试集的数据混入训练集，用测试集做特征选择、调参、归一化等。
* 相反，过于<span style="color:red">简单的模型</span>，会产生<span style="color:red">模型偏差</span>，即模型在学习过程中的系统误差，也就是它对真实关系的理解能力差，导致预测结果总是离正确值有偏差，就比如你用一个线性模型学习一个非线性函数，无论喂多少数据，调多少次参数，模型的训练结果都不够好，因为模型本身就太简单。表现在训练误差很大，并且测试误差也很大，随着训练继续也不会好转。
* 如果给模型的<span style="color:red">限制过多</span>也会导致模型偏差。
* 除了模型本身的复杂度以外，<span style="color:red">优化做得不好</span>也容易产生模型偏差，就比如前边讲到的梯度下降，既可能找到全局最小值，也可能找到局部最小值，优化梯度下降不给力，就会容易卡在局部最小值处。常用的优化方法是自适应学习率，如<span style="color:red">Adam</span>。

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim

# 1. 构造训练数据
x = torch.tensor([[1.0], [2.0], [3.0], [4.0]])
y = torch.tensor([[2.0], [4.0], [6.0], [8.0]])

# 2. 定义模型：简单线性回归 y = wx + b
model = nn.Linear(1, 1)

# 3. 损失函数：均方误差 MSE
criterion = nn.MSELoss()

# 4. 优化器：Adam（学习率可调）
optimizer = optim.Adam(model.parameters(), lr=0.01)

# 5. 训练过程
for epoch in range(200):
    # 前向传播
    y_pred = model(x)
    
    # 计算损失
    loss = criterion(y_pred, y)

    # 清空旧梯度
    optimizer.zero_grad()

    # 反向传播
    loss.backward()

    # 更新参数（Adam 优化）
    optimizer.step()

    # 打印训练过程
    if epoch % 20 == 0:
        print(f"Epoch {epoch}, Loss: {loss.item():.4f}")

* 除此以外，还有一个问题需要补充：<span style="color:red">不匹配</span>。例如训练数据与测试数据分布不同，实际使用的数据来源不同，模型训练的优化目标与实际任务目标不一致，训练阶段和测试阶段数据预处理方式不一致（如归一化），训练使用了dropout，但测试没有给关掉，导致测试时预测根本不准，这可能不是因为过拟合的问题，可能就是以上这些不匹配问题。

* 此外卷积神经网络还有一些补充概念。
* 对于机器，图像可以描述为三维<span style="color:red">张量</span>（张量可以想象成维度大于2的矩阵），一维代表图像的宽，一维代表图像的高，还有一维代表图像的<span style="color:red">通道（channel）</span>的数目。
* 卷积神经网络会设定一个区域，即<span style="color:red">感受野</span>，每个神经元都只关心自己的感受野里面发生的事情，感受野是由我们自己决定。
  <p align="center">
  <img src="resource/感受野.png" width="35%">
</p>   

* 我们把右上角的感受野往右移一个步幅，就制造出一个新的感受野，移动的量称之为<span style="color:red">步幅（stride）</span>，步幅是一个超参数，需要人为调整，一般不会设置太大。当感受野超出了图像的范围进行补零值，称之为<span style="color:red">填充（padding）</span>。
* 不同感受野的神经元可以进行<span style="color:red">参数共享</span>，所谓参数共享就是两个神经元的参数（权重）完全是一样的。
  <p align="center">
  <img src="resource/共享参数.png" width="35%">
</p>   

* 感受野、参数共享本质都是简化，<span style="color:red">全连接层</span>可以自己决定看整张图片还是一个小范围，加上感受野的概念后，只能看一个小范围，而加入参数共享后某一些神经元无论如何参数都要一模一样，这又增加了对神经元的限制，而感受野加上参数共享就是<span style="color:red">卷积层</span>，用到卷积层的网络就叫卷积神经网络。卷积层会改变图像的通道数而不会改变高和宽。

In [None]:
import torch.nn as nn

# 定义一个卷积层：输入通道 3（RGB图像），输出通道 16，卷积核大小 3x3
conv = nn.Conv2d(in_channels=3, out_channels=16, kernel_size=3, stride=1, padding=1)

* 卷积神经网络中，<span style="color:red">池化（pooling）</span>主要用于简化特征、降低过拟合的风险，常见的池化右最大池化（Max Pooling）、平均池化（Avg Pooling）和全局平局池化（Global Avg Pooling）。池化会改变图像的高和宽而不会改变通道数。
  <p align="center">
  <img src="resource/最大池化.png" width="35%">
</p>   


In [None]:
import torch.nn as nn

# 最大池化层：窗口大小 2x2，步长 2
maxpool = nn.MaxPool2d(kernel_size=2, stride=2)

# 平均池化层
avgpool = nn.AvgPool2d(kernel_size=2, stride=2)
通常池化层会接在卷积层之后，例如：
nn.Sequential(
    nn.Conv2d(3, 16, 3, padding=1),
    nn.ReLU(),
    nn.MaxPool2d(2, 2)
)

* 池化和<span style="color:red">下采样</span>都可以简化特征，而下采样指代更广义的尺寸缩小方法，不拘泥于池化。

## 实战：探地雷达数据反演实验
### 数据集构建

In [2]:
import h5py
import numpy as np
import os
from sklearn.model_selection import train_test_split
import torch
from scipy.ndimage import zoom
from torch.utils.data import Dataset, DataLoader

class OneStageDataset(Dataset):
    def __init__(self, bscan_folder, diel_label_folder, mode='train', num_class = 4, target_size=(200, 375)):
        self.bscan_folder = bscan_folder
        self.diel_label_folder = diel_label_folder
        self.bscan_files = self.load_h5_files(bscan_folder)
        self.diel_label_files = self.load_h5_files(diel_label_folder)
        self.target_size = target_size
        self.num_classes = num_class

        # 检查文件数量是否一致
        if not (len(self.bscan_files) == len(self.diel_label_files)):
            raise ValueError("每个文件夹中的文件数量必须相同！")

        # 划分文件名索引
        self.train_indices, self.val_indices, self.test_indices = self.split_dataset()

        # 根据模式选择索引
        if mode == 'train':
            self.indices = self.train_indices
        elif mode == 'val':
            self.indices = self.val_indices
        elif mode == 'test':
            self.indices = self.test_indices
        elif mode == "all":
            self.indices = np.arange(len(self.bscan_files))
        else:
            raise ValueError("模式必须是 'train', 'val' 或 'test'")

    def load_h5_files(self, folder_path):
        files = []
        filenames = os.listdir(folder_path)
        filenames.sort()
        for file_name in filenames:
            if file_name.endswith('.h5') or file_name.endswith('.out'):
                files.append(file_name)
        return files

    def split_dataset(self):
        # 记录所有文件的索引
        all_indices = np.arange(len(self.bscan_files))
        # 划分索引
        # train_indices, temp_indices = train_test_split(all_indices, test_size=0.2, random_state=42)
        # val_indices, test_indices = train_test_split(temp_indices, test_size=0.5, random_state=42)
        _, temp_indices = train_test_split(all_indices, test_size=0.02, random_state=42)
        train_indices, temp_indices = train_test_split(temp_indices, test_size=0.5, random_state=42)
        val_indices, test_indices = train_test_split(temp_indices, test_size=0.5, random_state=42)

        return train_indices, val_indices, test_indices

    def __len__(self):
        return len(self.indices)  # 返回当前模式下样本数量

    def __getitem__(self, idx):
        # 根据索引获取文件名
        bscan_file = self.bscan_files[self.indices[idx]]
        diel_file = self.diel_label_files[self.indices[idx]]

        # 加载数据
        bscan_data = self.load_h5_data(os.path.join(self.bscan_folder, bscan_file), 'rxs/rx1/Ez')
        diel_data = self.load_h5_data(os.path.join(self.diel_label_folder, diel_file), 'data')

        bscan_data = np.squeeze(bscan_data)
        diel_data = np.squeeze(diel_data)

        # Resize 数据
        bscan_data = self.resize(bscan_data) # / 100000
        diel_data = self.resize(diel_data)


        return (
            torch.tensor(bscan_data, dtype=torch.float32).unsqueeze(0),
            torch.tensor(diel_data, dtype=torch.float32).unsqueeze(0),
            bscan_file,
        )

    def load_h5_data(self, file_path, dataset_name):
        with h5py.File(file_path, 'r') as f:
            data = f[dataset_name][:]
        return data

    def resize(self, data):
        # 使用zoom函数进行二维缩放
        data_dims = data.ndim
        
        # 计算缩放因子
        if data_dims == 2:
            zoom_factor = (self.target_size[0] / data.shape[0], self.target_size[1] / data.shape[1])
        elif data_dims == 3:
            zoom_factor = (1, self.target_size[0] / data.shape[1], self.target_size[1] / data.shape[2])  # 保持第一个维度不变
        else:
            raise ValueError(f"Unexpected B-scan dimensions: {data_dims}")
        

        resized_data = zoom(data, zoom_factor, order=0)  # order=1 表示双线性插值

        return resized_data

# 示例用法
# dataset = TwoStageDataset('path/to/bscan_folder', 'path/to/cls_label_folder', 'path/to/diel_label_folder')

def test():
    # 设置文件夹路径
    bscan_folder = r'D:\Test\Z-GprMax\DeepLearning\dataset\twostageinversion\OneCircleAndRectOBJ\bscan'
    cls_label_folder = r'D:\Test\Z-GprMax\DeepLearning\dataset\twostageinversion\OneCircleAndRectOBJ\cls_label'
    diel_label_folder = r'D:\Test\Z-GprMax\DeepLearning\dataset\twostageinversion\OneCircleAndRectOBJ\diel_label'
    
    # 检查文件夹是否存在
    if not (os.path.exists(bscan_folder) and os.path.exists(cls_label_folder) and os.path.exists(diel_label_folder)):
        raise FileNotFoundError("请确保所有文件夹路径都存在！")

    # 创建数据集实例
    train_dataset = OneStageDataset(bscan_folder, cls_label_folder, diel_label_folder, mode='train')
    val_dataset = OneStageDataset(bscan_folder, cls_label_folder, diel_label_folder, mode='val')
    test_dataset = OneStageDataset(bscan_folder, cls_label_folder, diel_label_folder, mode='test')

    # 打印数据集长度
    print("Train dataset length:", len(train_dataset))
    print("Validation dataset length:", len(val_dataset))
    print("Test dataset length:", len(test_dataset))

    # 创建数据加载器
    train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=4, shuffle=False)
    test_loader = DataLoader(test_dataset, batch_size=4, shuffle=False)

    # 测试数据集
    for bscan, cls_label, diel_label, filename in train_loader:
        print("Train B-scan shape:", bscan.shape)
        print("Train Classification label shape:", cls_label.shape)
        print("Train Dielectric constant label shape:", diel_label.shape)
        print("Train File name:", filename)
        break  # 只打印一个批次的数据

    for bscan, cls_label, diel_label, filename in val_loader:
        print("Validation B-scan shape:", bscan.shape)
        print("Validation Classification label shape:", cls_label.shape)
        print("Validation Dielectric constant label shape:", diel_label.shape)
        print("Validation File name:", filename)
        break  # 只打印一个批次的数据

    for bscan, cls_label, diel_label, filename in test_loader:
        print("Test B-scan shape:", bscan.shape)
        print("Test Classification label shape:", cls_label.shape)
        print("Test Dielectric constant label shape:", diel_label.shape)
        print("Test File name:", filename)
        break  # 只打印一个批次的数据

## 网络模型搭建MRFUNet
该网络来源于[DMRF-UNet: A Two-Stage Deep Learning Scheme for GPR Data Inversion Under Heterogeneous Soil Conditions](https://ieeexplore.ieee.org/document/9782091):

复现其网络代码结构如下，分为两个文件：
- net_parts.py
- net_model.py

In [4]:
""" Parts of the MRFU-Net model """

import torch
import torch.nn as nn
import torch.nn.functional as F


class MRFBlock(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(MRFBlock, self).__init__()
        
        # Assuming 1/4 channels means splitting into 4 groups
        quarter_channels = out_channels // 4

        # 1x1 convolution for channel reduction
        self.conv1x1 = nn.Conv2d(in_channels, quarter_channels, kernel_size=1, stride=1, padding=0, bias=False)

        # Parallel 3x3 convolutions (each group processes quarter_channels)
        self.branch1 = nn.Sequential(
            nn.Conv2d(in_channels, quarter_channels, kernel_size=3, stride=1, padding=1, bias=False),
            nn.ReLU(),
        )
        self.branch2 = nn.Sequential(
            nn.Conv2d(in_channels, quarter_channels, kernel_size=3, stride=1, padding=1, bias=False),
            nn.ReLU(),
            nn.Conv2d(quarter_channels, quarter_channels, kernel_size=3, stride=1, padding=1, bias=False),
            nn.ReLU(),
        )
        self.branch3 = nn.Sequential(
            nn.Conv2d(in_channels, quarter_channels, kernel_size=3, stride=1, padding=1, bias=False),
            nn.ReLU(),
            nn.Conv2d(quarter_channels, quarter_channels, kernel_size=3, stride=1, padding=1, bias=False),
            nn.ReLU(),
            nn.Conv2d(quarter_channels, quarter_channels, kernel_size=3, stride=1, padding=1, bias=False),
            nn.ReLU(),
        )

        # Final 3x3 convolution after concatenation
        self.final_conv = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False)

    def forward(self, x):

        out1 = self.branch1(x)
        out2 = self.branch2(x)
        out3 = self.branch3(x)
        out4 = self.conv1x1(x)

        # Concatenate results from all branches
        out = torch.cat([out1, out2, out3, out4], dim=1)

        # Final 3x3 convolution
        out = self.final_conv(out)

        return out

class DoubleConv(nn.Module):
    """(convolution => [BN] => ReLU) * 2"""

    def __init__(self, in_channels, out_channels, mid_channels=None):
        super().__init__()
        if not mid_channels:
            mid_channels = out_channels
        self.double_conv = nn.Sequential(
            MRFBlock(in_channels, out_channels),
            MRFBlock(out_channels, out_channels)
        )

    def forward(self, x):
        return self.double_conv(x)


class Down(nn.Module):
    """Downscaling with maxpool then double conv"""

    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.maxpool_conv = nn.Sequential(
            nn.MaxPool2d(2),
            DoubleConv(in_channels, out_channels)
        )

    def forward(self, x):
        return self.maxpool_conv(x)


class Up(nn.Module):
    """Upscaling then double conv"""

    def __init__(self, in_channels, out_channels, bilinear=True):
        super().__init__()

        # if bilinear, use the normal convolutions to reduce the number of channels
        if bilinear:
            self.up = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)
            self.conv = DoubleConv(in_channels, out_channels, in_channels // 2)
        else:
            self.up = nn.ConvTranspose2d(in_channels, in_channels // 2, kernel_size=2, stride=2)
            self.conv = DoubleConv(in_channels, out_channels)

    def forward(self, x1, x2):
        # print(x1.shape)
        x1 = self.up(x1)
        # input is CHW
        diffY = x2.size()[2] - x1.size()[2]
        diffX = x2.size()[3] - x1.size()[3]

        x1 = F.pad(x1, [diffX // 2, diffX - diffX // 2,
                        diffY // 2, diffY - diffY // 2])
        # if you have padding issues, see
        # https://github.com/HaiyongJiang/U-Net-Pytorch-Unstructured-Buggy/commit/0e854509c2cea854e247a9c615f175f76fbb2e3a
        # https://github.com/xiaopeng-liao/Pytorch-UNet/commit/8ebac70e633bac59fc22bb5195e513d5832fb3bd
        x = torch.cat([x2, x1], dim=1)
        # double = nn.Conv2d(x1.shape[1], x1.shape[1] * 2, kernel_size=3, padding=1).cuda()
        # x = double(x1)
        return self.conv(x)


class OutConv(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(OutConv, self).__init__()
        self.conv = nn.Conv2d(in_channels, out_channels, kernel_size=1)

    def forward(self, x):
        return self.conv(x)


In [5]:
#|output
""" Full assembly of the parts to form the complete network """
import sys
sys.path.append('../..')
from net.mrf_unet.unet_parts import *


class MRFUNet(nn.Module):
    _name = "MRFUNet"
    def __init__(self, n_channels, n_classes, bilinear=False):
        super(MRFUNet, self).__init__()
        self.n_channels = n_channels
        self.n_classes = n_classes
        self.bilinear = bilinear

        self.inc = DoubleConv(n_channels, 64)
        self.down1 = Down(64, 128)
        self.down2 = Down(128, 256)
        self.down3 = Down(256, 512)
        factor = 2 if bilinear else 1
        self.down4 = Down(512, 1024 // factor)
        self.up1 = Up(1024, 512 // factor, bilinear)
        self.up2 = Up(512, 256 // factor, bilinear)
        self.up3 = Up(256, 128 // factor, bilinear)
        self.up4 = Up(128, 64, bilinear)
        self.outc = OutConv(64, n_classes)

    def forward(self, x):

        x1 = self.inc(x)

        x2 = self.down1(x1)

        x3 = self.down2(x2)

        x4 = self.down3(x3)
  
        x5 = self.down4(x4)
  
        x = self.up1(x5, x4)
        x = self.up2(x, x3)
        x = self.up3(x, x2)
        x = self.up4(x, x1)
        logits = self.outc(x)
        # logits = torch.sigmoid(logits)
        return logits

if __name__ == '__main__':
    model = MRFUNet(2,1)
    data = torch.rand((2, 2, 200, 375))

    y = model(data)
    print(y.shape)

ModuleNotFoundError: No module named 'net'

### 完整训练流程

In [6]:
import os
import numpy as np
import matplotlib.pyplot as plt
import matplotlib
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from pytorch_msssim import ssim
from torch.utils.data import DataLoader
from tqdm import tqdm  # 导入 tqdm
import sys
import matplotlib.colors as mcolors
from datetime import datetime


sys.path.append('..')
from dataset import dataset_onestage1
from net.segmentnet import SegmentNet
from net.unet.unet_model import UNet as NET
# from net.ManyUNet import AttU_Net as NET
from net.mrf_unet.unet_model import MRFUNet as NET
# from net.unet_with_mlp.mlp_net import Mlp_Net as NET
# from net.firstnet import FirstNet
# from net.ManyUNet import AttU_Net as NET
matplotlib.rcParams['font.sans-serif'] = ['SimHei']  # 用黑体
matplotlib.rcParams['axes.unicode_minus'] = False  # 解决保存图像是负号'-'显示为方块的问题

# 配置信息函数
def get_config():
    config = {
        'bscan_folder': r'\\lei_nas\home\dataset\process data\twostageinversion\TwoObj\bscan',
        'diel_label_folder': r'\\lei_nas\home\dataset\process data\twostageinversion\TwoObj\bk6_diel',
        'model_name': NET._name,
        'batch_size': 16,
        'learning_rate': 0.0001,
        'num_epochs': 2,
        'device': 'cuda' if torch.cuda.is_available() else 'cpu',
        'model_save_path': 'best_model.pth'  # 保存模型的路径
    }
    return config

# 训练函数
def train(model, train_loader, criterion_diel, optimizer, device):
    model.train()
    running_loss = 0.0
    for bscan, diel_label, _ in tqdm(train_loader, desc="Training", unit="batch"):
        bscan, diel_label = bscan.to(device), diel_label.to(device)

        optimizer.zero_grad()
        # classification_output, dielectric_constant_output = model(bscan)
        # classification_output = model(bscan)
        dielectric_constant_output = model(bscan)
        # loss_cls = criterion_cls(classification_output, cls_label)
        loss_diel = criterion_diel(dielectric_constant_output, diel_label)
        # loss = loss_cls + loss_diel  # 可以根据需要调整权重
        loss = loss_diel
        # loss = loss_cls
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
    return running_loss / len(train_loader)

# 验证函数
def validate(model, val_loader, criterion_diel, device):
    model.eval()
    running_loss = 0.0
    with torch.no_grad():
        for bscan, diel_label, _ in val_loader:
            bscan, diel_label = bscan.to(device), diel_label.to(device)

            # classification_output, dielectric_constant_output = model(bscan)
            # classification_output = model(bscan)
            dielectric_constant_output = model(bscan)
            # loss_cls = criterion_cls(classification_output, cls_label)
            loss_diel = criterion_diel(dielectric_constant_output, diel_label)

            # loss = loss_cls + loss_diel
            loss = loss_diel
            # loss = loss_cls
            running_loss += loss.item()
    return running_loss / len(val_loader)

# 测试函数
def test(model, test_loader, device):
    from utils import psnr, normalize_tensor_to_0_255, calculate_smape_torch
    model.eval()
    results = []
    mse_total = 0
    mae_total = 0
    ssim_total = 0
    mape_total = 0
    psnr_total = 0
    num_samples = 0
    with torch.no_grad():
        for bscan,  diel_label, filename in tqdm(test_loader, desc="Testing"):
            bscan, diel_label = bscan.to(device), diel_label.to(device)
            # classification_output, dielectric_constant_output = model(bscan)
            # classification_output = model(bscan)
            dielectric_constant_output = model(bscan)

            results.append((filename, dielectric_constant_output.cpu().numpy(), diel_label.cpu().numpy()))
            dc_mse = F.mse_loss(dielectric_constant_output, diel_label, reduction='mean')
            mse_total += dc_mse.item() 
            dc_mae = F.l1_loss(dielectric_constant_output, diel_label, reduction='mean')
            mae_total += dc_mae.item()
            dc_mape = calculate_smape_torch(dielectric_constant_output, diel_label)
            mape_total += dc_mape.item()


            dielectric_constant_output = normalize_tensor_to_0_255(dielectric_constant_output, 1, 100)
            diel_label = normalize_tensor_to_0_255(diel_label, 1, 100)

            dc_ssim = ssim(dielectric_constant_output, diel_label, 255, size_average=True)
            ssim_total += dc_ssim.item() 

            psnr_total += psnr(diel_label, dielectric_constant_output).item() 
            num_samples += 1
    avg_mse_total = mse_total / num_samples
    avg_mae_total = mae_total / num_samples
    avg_ssim_total = ssim_total / num_samples
    avg_mape_total = mape_total / num_samples
    avg_psnr_total = psnr_total / num_samples

    metrics_variables = ['avg_mse_total', 'avg_mae_total', 'avg_ssim_total', 'avg_mape_total', 'avg_psnr_total', ]
    metrics = {var : eval(var) for var in metrics_variables}

    return results, metrics

def test_use_pth():
    def test_other_data(model, pth_path, test_loader, results_folder="results", device = 'cuda' if torch.cuda.is_available() else 'cpu'):
        model.load_state_dict(torch.load(pth_path, weights_only=False))
        test_results, metrics_dict = test(model, test_loader, device)
        filenames, diel_outputs, diel_labels = zip(*test_results)

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        folder = f'test_onestage_{timestamp}_{model._name}_3obj'
        os.makedirs(os.path.join(results_folder, folder), exist_ok=False)

        with open(os.path.join(results_folder, folder, 'matrix.txt'), 'w') as file:
            # 写入内容
            contents = []
            for k, v in metrics_dict.items():
                contents.append(f"{k}:{v}\n")
            content = ''.join(contents)
            file.write(content)

       
        # 保存结果
        save_results(filenames, diel_outputs, diel_labels, os.path.join(results_folder, folder)) 
        # save_results(filenames, cls_outputs, small_outputs, great_outputs, cls_labels, small_labels, great_labels, os.path.join(results_folder, folder)) 

    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    model = NET(1, 1).to(device)
    # dataset = dateset_twostage2.TwoStageDataset(r'D:\Test\Z-GprMax\test\inversion2step\two_obj\pec_air\out',
    #                                             r'D:\Test\Z-GprMax\test\inversion2step\two_obj\pec_air\cls_label',
    #                                             r'D:\Test\Z-GprMax\test\inversion2step\two_obj\pec_air\smaller_label',
    #                                             r'D:\Test\Z-GprMax\test\inversion2step\two_obj\pec_air\greater_label',
    #                                             mode='all')
    dataset = dataset_onestage1.OneStageDataset(r'D:\Test\Z-GprMax\datas\ThreeobjTest\process\bscan',
                                                r'D:\Test\Z-GprMax\datas\ThreeobjTest\process\bk6_diel',
                                                mode='all')
    config = get_config()
    train_loader = DataLoader(dataset, batch_size=config['batch_size'], shuffle=False)

    pth_path = r'D:\两阶段电磁反演\results\normal\20250114_041030_MRFUNet\best_model.pth'


    test_other_data(model, pth_path, train_loader)

# 绘制损失曲线
def plot_loss(train_losses, val_losses, save_dir):
    plt.figure()
    plt.plot(train_losses, label='Train Loss')
    plt.plot(val_losses, label='Validation Loss')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.title('Training and Validation Loss')
    plt.legend()
    plt.savefig(os.path.join(save_dir, 'loss_curve.png'))
    plt.close()

# 保存结果函数
def save_results(filenames, diel_outputs, diel_labels, save_folder):
    diel_res_folder = os.path.join(save_folder, "test_output", "diel_res")
    os.makedirs(diel_res_folder, exist_ok=True)
    from matplotlib.colors import LinearSegmentedColormap
    # 定义颜色和区间
    # colors = [
    #             (0, "blue"),  # 区间起点和颜色
    #             (0.005, "white"),
    #             (0.06, "pink"),
    #             (0.18, "cyan"),  # 1到20的细致渐变
    #             (0.36, "green"),  # 20到40的细致渐变
    #             (0.64, "yellow"),  # 40到70的细致渐变
    #             (0.82, "orange"),  # 70到90的细致渐变
    #             (0.91, "red"),  # 90到100的细致渐变
    #             (1, "darkred")  # 100到110的颜色
    #         ]

    # # 创建自定义颜色映射
    # custom_cmap = mcolors.LinearSegmentedColormap.from_list("custom_cmap", colors)
    bounds = [0, 0.5, 2.5, 4.5, 7.5, 10, 15, 25, 50, 85, 110]

    colors = [
        'darkblue',      # 深蓝
        'mediumblue',    # 中蓝
        'royalblue',     # 皇家蓝
        'dodgerblue',    # 道奇蓝
        'deepskyblue',   # 深天蓝
        'mediumseagreen', # 中海绿
        'hotpink',       # 亮粉
        'red',           # 红色
        'orange',        # 橙色
        'yellow'         # 黄色
    ]
    n_bins = 100  # 颜色分布的细分数量
    cmap_name = 'custom_cmap'
    # 创建自定义颜色映射
    cmap = LinearSegmentedColormap.from_list(cmap_name, colors, N=n_bins)

    # 创建 BoundaryNorm 对象，确保每个区间的长度在 colorbar 中相等
    norm = mcolors.BoundaryNorm(boundaries=bounds, ncolors=n_bins)
    

    with torch.no_grad():
        for filename, diel_output, diel_label in tqdm(zip(filenames, diel_outputs, diel_labels), desc="GenerateImg"):
            # diel_output = diel_output.squeeze()
            diel_label = diel_label.squeeze()
            for f, do, dl in zip(filename, diel_output, diel_label):
                f = f.replace(".out", ".jpg")
                do = do.squeeze()
                dl = dl.squeeze()
                vmin, vmax = 0, dl.max()
                # 绘制介电常数结果与标签对比
                fig, axs = plt.subplots(1, 2, figsize=(12, 6))
                im = axs[0].imshow(do, cmap=cmap, norm=norm, interpolation="none", extent=[0, 1.5, 0.8, 0])
                axs[0].set_title('Predicted Dielectric Constant')
                axs[0].set_ylabel("Depth/m")
                axs[0].set_xlabel("Detection distance/m")
                # plt.colorbar(axs[0].imshow(do, cmap=custom_cmap, aspect='equal'), ax=axs[0])
                
                im = axs[1].imshow(dl, cmap=cmap, norm=norm, interpolation="none", extent=[0, 1.5, 0.8, 0])
                axs[1].set_title('True Dielectric Constant')
                axs[1].set_ylabel("Depth/m")
                axs[1].set_xlabel("Detection distance/m")
                cbar = fig.colorbar(im, ticks=bounds, ax=axs)
                cbar.set_ticks(bounds)  # 设置 colorbar 的刻度为指定的范围
                cbar.set_ticklabels([0, 1, 3, 5, 8, 10, 15, 25, 50, 85, 110])  # 设置标签
                
                plt.savefig(os.path.join(diel_res_folder, f'diel_{f}'))
                plt.close(fig)

# 主函数
def main():
    config = get_config()
    OneStageDataset = dataset_onestage1.OneStageDataset
    
    # 创建结果保存文件夹
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    results_folder = f'results/{timestamp}_{config["model_name"]}'
    os.makedirs(results_folder, exist_ok=True)

    # 创建数据集和数据加载器
    train_dataset = OneStageDataset(config['bscan_folder'], config['diel_label_folder'], mode='train')
    val_dataset = OneStageDataset(config['bscan_folder'], config['diel_label_folder'], mode='val')
    test_dataset = OneStageDataset(config['bscan_folder'], config['diel_label_folder'], mode='test')

    train_loader = DataLoader(train_dataset, batch_size=config['batch_size'], shuffle=True, num_workers=8, pin_memory=True)
    val_loader = DataLoader(val_dataset, batch_size=config['batch_size'], shuffle=False, num_workers=8, pin_memory=True)
    test_loader = DataLoader(test_dataset, batch_size=config['batch_size'], shuffle=False, num_workers=8, pin_memory=True)

    # 初始化模型、损失函数和优化器
    device = config['device']
    # model = FirstNet().to(device)
    model = NET(1, 1).to(device)
    criterion_diel = nn.MSELoss()  # 第二阶段使用均方误差损失
    optimizer = optim.Adam(model.parameters(), lr=config['learning_rate'])

    train_losses = []
    val_losses = []
    best_val_loss = float('inf')  # 初始化最佳验证损失

    # 训练过程
    for epoch in range(config['num_epochs']):
        train_loss = train(model, train_loader, criterion_diel, optimizer, device)
        val_loss = validate(model, val_loader, criterion_diel, device)

        train_losses.append(train_loss)
        val_losses.append(val_loss)

        print(f'Epoch [{epoch+1}/{config["num_epochs"]}], Train Loss: {train_loss:.4f}, Validation Loss: {val_loss:.4f}')

        # 保存验证损失最低的模型权重
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model.state_dict(), os.path.join(results_folder, config['model_save_path']))
            print(f'Saved best model with validation loss: {best_val_loss:.4f}')

    # 绘制损失曲线
    plot_loss(train_losses, val_losses, results_folder)

    # 测试并保存结果
    # 加载最佳模型权重
    model.load_state_dict(torch.load(os.path.join(results_folder, config['model_save_path']), weights_only=False))
    test_results, metrics_dict = test(model, test_loader, device)
    filenames, diel_outputs, diel_labels = zip(*test_results)
    with open(os.path.join(results_folder, 'matrix.txt'), 'w') as file:
        # 写入内容
        contents = []
        for k, v in metrics_dict.items():
            contents.append(f"{k}:{v}\n")
        content = ''.join(contents)
        file.write(content)

    # 保存结果
    save_results(filenames, diel_outputs, diel_labels, results_folder) 


# if __name__ == '__main__':
#     main()

ImportError: cannot import name 'dataset_onestage1' from 'dataset' (unknown location)