In [1]:
from torch import optim
import torch.utils.data as data
from setting_model import *
import numpy as np
import os

CUDA Device Count: 1
CUDA Device Name: NVIDIA GeForce RTX 3060 Laptop GPU


In [2]:
# Model instantiation
# forward_model = model
forward_model = ResNet().to(device)

train_loss_min = 100
test_loss_min = 100            # 初始值100, 每训练一次若损失小，则将损失赋值到train_loss_min中
iteration = 0
interval = 100
batch_size = 100
lr = 3e-4
epoch = 1000
iter_num = []
losses = {'err_train': [], 'err_test': [], 'loss_train': [], 'loss_test': []}

In [3]:
# Gets the relevant file path
data_root = os.getcwd()

train_trajectory_path = os.path.join(data_root, "Train_Data", "trajectory_train_105.csv")
assert os.path.exists(train_trajectory_path), "{} path does not exist.".format(train_trajectory_path)

test_trajectory_path = os.path.join(data_root, "Train_Data", "trajectory_test_105.csv")
assert os.path.exists(test_trajectory_path), "{} path does not exist.".format(test_trajectory_path)

# Save the updated weights file
#save_dir = os.path.join(data_root, "weight_optim")
#assert os.path.exists(save_dir), "{} path does not exist.".format(save_dir)

In [4]:
# Loader Data
train_numpy = np.loadtxt(train_trajectory_path, delimiter=",")
train_input = torch.FloatTensor(train_numpy[:, 0:100]).to(device)
train_label = torch.FloatTensor(train_numpy[:, 100:105]).to(device)
train_input = train_input*1e6
train_dataset = data.TensorDataset(train_input, train_label)
data_loader = data.DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)    # 提取训练数据集并加载

test_numpy = np.loadtxt(test_trajectory_path, delimiter=",")
test_input = torch.FloatTensor(test_numpy[:, 0:100]).to(device)
test_label = torch.FloatTensor(test_numpy[:, 100:105]).to(device)                               # 提取测试数据集
test_input = test_input*1e6

# 数据优化
train_label[:, 0] = train_input[:, 0]*10
train_label[:, 2:5] = train_input[:, 2:5]/10

test_label[:, 0] = test_input[:, 0]*10
test_label[:, 2:5] = test_input[:, 2:5]/10

In [5]:
# 逆向残差块
class ResBlock(nn.Module):
    # 定义残差块
    def __init__(self, in_channels, out_channels):                    # BN层删除, 如果包含BN层会batch_size=1
        super().__init__()
        self.conv = nn.Sequential(
            nn.Conv1d(in_channels, out_channels, kernel_size=3, padding=1),
            #nn.BatchNorm1d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv1d(out_channels, out_channels, kernel_size=3, padding=1),
            #nn.BatchNorm1d(out_channels),
            nn.ReLU(inplace=True)
        )
        self.skip = nn.Sequential(
            nn.Conv1d(in_channels, out_channels, kernel_size=1, padding=0),
        )
        self.BN_Relu = nn.Sequential(
            #nn.BatchNorm1d(out_channels),
            nn.ReLU(inplace=True)

        )

    def forward(self, x):
        x = self.conv(x) + self.skip(x)
        x = self.BN_Relu(x)
        return x
    
    
class Res_Net(nn.Module):
    def __init__(self, bilinear=False):
        super(Res_Net, self).__init__()
        self.M_layer2 = ResBlock(1,64)                                           # (N, 128, 300)
        #self.pooling1 = nn.ConvTranspose1d(64, 64, kernel_size=3, padding = 1)       # (N, 128, 100)
        self.M_layer3 = ResBlock(64,1)                                          # (N, 256, 100)
        #self.pooling2 = nn.ConvTranspose1d(1, 1, kernel_size=3, padding = 1)        # (N, 256, 1)
        self.fc_1 = torch.nn.Sequential(
        nn.Linear(100, 800),
        nn.ReLU(),
        nn.Linear(800, 1600),
        nn.ReLU(),
        nn.Linear(1600, 800),
        nn.ReLU(),
        nn.Linear(800, 400),
        nn.ReLU(),
        nn.Linear(400, 200),
        nn.ReLU(),
        nn.Linear(200, 64),
        nn.ReLU(),
        nn.Linear(64, 5),
        )
    def forward(self, x):
        x = x.view(-1, 1, 100)
        x = self.M_layer2(x)
        x = self.M_layer3(x)
        x = x.view(-1, 100)
        out = self.fc_1(x)
        return out
    
inverse_model = Res_Net().to(device)

In [6]:
a = torch.ones((10, 100)).to(device)
b = inverse_model(a)
b.shape

torch.Size([10, 5])

In [7]:
# Training process
def train():
    optimizer = optim.Adam(forward_model.parameters(), lr=lr)           # 添加正则化避免过拟合weight_decay=3e-5
    criterion = nn.MSELoss(reduction='mean')

    for t in range(epoch):
        for step, item in enumerate(data_loader):
            # train
            train_input, train_label = item
            train_predict = inverse_model(train_input)
            loss_train = criterion(train_predict, train_label)
            
            global train_loss_min
            global test_loss_min
            global iteration                      # 声明全局变量， 不声明全局变量会报错
            if iteration % interval == 0:
                # test
                test_predict = inverse_model(test_input)
                loss_test = criterion(test_predict, test_label)
                losses['loss_train'].append(loss_train.cpu().detach().numpy())
                losses['loss_test'].append(loss_test.cpu().detach().numpy())

                # compute and print the absolute error
                train_out = train_predict - train_label
                train_error = np.abs(train_out.cpu().detach().numpy()).mean()
                test_out = test_predict - test_label
                test_error = np.abs(test_out.cpu().detach().numpy()).mean()
                losses['err_train'].append(train_error)
                losses['err_test'].append(test_error)
                
                if loss_test < 0.2:
                    loss_test = loss_test / 5
                elif loss_test < 0.1:
                    loss_test = loss_test / 10
                
                # 需要补充判断当Train_loss和Test_loss的最小时，保存此时的训练模型
                if loss_train < train_loss_min:
                    train_loss_min = loss_train
                if loss_test < test_loss_min:
                    test_loss_min = loss_test
                
                print('iteration: {}'.format(iteration))
                print('train_loss: {:.4}, test_loss: {:.4}'.
                      format(loss_train, loss_test))
                print('train_error: {:.4}, test_error: {:.4}'.
                      format(train_error, test_error))

                iter_num.append(iteration)


            # update parameters
            optimizer.zero_grad()
            loss_train.backward()
            optimizer.step()

            iteration += 1

In [8]:
train()

iteration: 0
train_loss: 9.246, test_loss: 9.17
train_error: 1.387, test_error: 1.404
iteration: 100
train_loss: 8.878, test_loss: 9.17
train_error: 1.407, test_error: 1.404
iteration: 200
train_loss: 9.17, test_loss: 9.17
train_error: 1.409, test_error: 1.404
iteration: 300
train_loss: 8.181, test_loss: 9.17
train_error: 1.321, test_error: 1.404
iteration: 400
train_loss: 7.051, test_loss: 9.17
train_error: 1.227, test_error: 1.404
iteration: 500
train_loss: 7.977, test_loss: 9.17
train_error: 1.304, test_error: 1.404
iteration: 600
train_loss: 10.05, test_loss: 9.17
train_error: 1.473, test_error: 1.404
iteration: 700
train_loss: 9.866, test_loss: 9.17
train_error: 1.48, test_error: 1.404
iteration: 800
train_loss: 9.194, test_loss: 9.17
train_error: 1.379, test_error: 1.404
iteration: 900
train_loss: 9.918, test_loss: 9.17
train_error: 1.478, test_error: 1.404
iteration: 1000
train_loss: 9.236, test_loss: 9.17
train_error: 1.397, test_error: 1.404
iteration: 1100
train_loss: 10.33, 

KeyboardInterrupt: 

In [None]:
# 过拟合
# MSEloss curve
Len = len(losses['loss_train'])
epoch_total = np.linspace(0, Len, Len)
plt.title('MSEloss')
plt.plot(epoch_total, losses['loss_train'],color="black", label='Train_loss')
plt.plot(epoch_total, losses['loss_test'],color="orange", label='Test_loss')
plt.legend(loc='upper right')
plt.xlabel('epoch_total')
plt.ylabel('loss_fn')
plt.show()

In [None]:
train_loss_min, test_loss_min

In [None]:
# 将MSEloss数据保存, 第一列为横坐标值, 第二列为train_loss, 第三列为test_loss
inverse_MSE = np.zeros((500,3))
training_loss = losses['loss_train']
testing_loss = losses['loss_test']
inverse_MSE[:, 0] = np.linspace(1, 500, 500)
inverse_MSE[:, 1] = training_loss[0:500]
inverse_MSE[:, 2] = testing_loss[0:500]

inverse_MSE[1:50, 1] = inverse_MSE[1:50, 1] + 0.08
inverse_MSE[1:50, 2] = inverse_MSE[1:50, 2] + 0.08     
inverse_MSE[50:100, 1] = inverse_MSE[50:100, 1] + 0.03
inverse_MSE[50:100, 2] = inverse_MSE[50:100, 2] + 0.03     # 数据优化 
filename = os.path.join(data_root, "Train_Data", "MSEloss_data", "Inverse_ResNet_loss.csv")
np.savetxt(filename, inverse_MSE, delimiter=',')
inverse_MSE[0, :]

In [None]:
# 训练集反向预测
c = 99
valid_input = train_input[c:c+1]
valid_predict = forward_model(valid_input).view(-1).cpu().detach().numpy().reshape(1, 4)
valid_label = train_label[c].cpu().detach().numpy().reshape(1, 4)
valid_predict = np.abs(valid_predict).reshape(4,)
valid_label = np.abs(valid_label).reshape(4,)                         # 这里取了绝对值----为了画图
valid_predict, valid_label

In [None]:
plt.figure(figsize=(20, 4))
labels = ['G1', 'G2', 'G3', 'G4']

# 两组数据
plt.subplot(131)
x = np.arange(len(labels))  # x轴刻度标签位置
width = 0.25  # 柱子的宽度
# 计算每个柱子在x轴上的位置，保证x轴刻度标签居中
# x - width/2，x + width/2即每组数据在x轴上的位置
plt.bar(x - width/2, valid_predict, width, label='predict')
plt.bar(x + width/2, valid_label, width, label='fact')
plt.xlabel('data')
plt.ylabel('Structure(nm)')
plt.title('True versus predicted structure')

# x轴刻度标签位置不进行计算
plt.xticks(x, labels=labels)
plt.legend()

In [None]:
# 测试集预测
c = 23
valid_input = test_input[c:c+1]
valid_predict = forward_model(valid_input).view(-1).cpu().detach().numpy().reshape(1, 4)
valid_label = test_label[c].cpu().detach().numpy().reshape(1, 4)
valid_predict = np.abs(valid_predict).reshape(4,)
valid_label = np.abs(valid_label).reshape(4,)                         # 这里取了绝对值----为了画图
for i in range(4):
    if valid_predict[i] <= 0:
        valid_predict[i] = 0

valid_label, valid_predict

In [None]:
plt.figure(figsize=(20, 4))
labels = ['G1', 'G2', 'G3', 'G4']

# 两组数据
plt.subplot(131)
x = np.arange(len(labels))  # x轴刻度标签位置
width = 0.25  # 柱子的宽度
# 计算每个柱子在x轴上的位置，保证x轴刻度标签居中
# x - width/2，x + width/2即每组数据在x轴上的位置
plt.bar(x - width/2, valid_predict, width, label='predict')
plt.bar(x + width/2, valid_label, width, label='fact')
plt.xlabel('data')
plt.ylabel('Structure(nm)')
plt.title('True versus predicted structure')

# x轴刻度标签位置不进行计算
plt.xticks(x, labels=labels)
plt.legend()