# 提取DQN进行训练

将DQN网络单独提取出来以测定性能，研究是网络的问题或者是模型的问题

In [2]:
# 包引用
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
from torch.utils.data import DataLoader, TensorDataset
import random
from torchvision.models import resnet34, ResNet34_Weights

## 进行模型定义

此处直接使用DQN的模型

In [141]:
class DQN(nn.Module):
    def __init__(self, n_actions_0):  # 考虑注入n_states
        super(DQN, self).__init__()
        
        # 添加一个卷积层，用于数据转化
        # self.fc1 = nn.Conv2d(in_channels=8, out_channels=3, kernel_size=12, stride=1, padding=1, groups=1, bias=True)
        # 定义第一个卷积核，将8通道234*234数据处理成3通道112*112数据，以输入ResNet-18网络
        # 参数定义：入通道8，出通道3，卷积核数量64，步长1，边缘1，不分组，添加可学习偏置
        # 利用F.interpolate函数动态调整图片大小为224*224，采用bilinear插值，在前向传播中实现
        
        # 定义激活函数
        # self.relu = nn.ReLU(inplace=True)  # 指定的激活函数ReLU
        
        # 定义ResNet-34网络
        self.resnet = resnet34(weights=ResNet34_Weights.DEFAULT)  # 引入ResNet-34
        for param in self.resnet.parameters():  # 遍历预训练的ResNet-18中的参数
            param.requires_grad = False  # 冻结预训练的ResNet-18中的参数
        self.resnet.fc = nn.Sequential(
            nn.Linear(self.resnet.fc.in_features, 512),
            nn.ReLU(inplace=True),
            nn.Linear(512, 256),
            nn.ReLU(inplace=True),
            nn.Linear(256, n_actions_0)
        )    # 重新定义网络的全连接层
        for param in self.resnet.fc.parameters():
            param.requires_grad = True
        
    def forward(self, x):  # 前向传播过程
        # 输入：以batch_size作为批次大小的8通道512*512图像，shape：(batch_size, 8, 512, 512)
        # 输出：以batch_size作为批次大小的分类（动作）数据，one-hot形式，shape：(batch_size, n_actions)
        # x = self.fc1(x)
        # x = F.interpolate(x, size=224, mode='bilinear', align_corners=False)
        # x = self.relu(x)
        x = self.resnet(x)  # 使用网络处理输入
        return x  # 输出ResNet-18网络的输出

In [148]:
# 超参数定义
num_classes = 10
batch_size = 16
num_epoches = 1000
learning_rate = 1e-3
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [5]:
# 数据处理函数定义

def data_processing(data_processed, label_processed, batch_size_0):
    # 输入数据：数据，这组数据的类别标签，及训练批次数据（用于适配训练方式）
    
    channels, freq, time = data_processed.shape  # 获取时频维度大小
    parts_num = time // (freq * 2)  # 确定分割的块数
    parts_num = (parts_num // batch_size_0) * batch_size_0 # 根据训练批次重新确定分割方式
    
    data_striped = []  # 定义数据存储器
    data_label = [label_processed for _ in range(parts_num)]
    
    for i in range(parts_num):  # 按块数分割
        data_striped.append(data_processed[:, :, i * freq:(i + 1) * freq])  # 向存储器里添加分割的数据

    # 上面的数据处理过程抛弃了多余的无法形成格式的数据块
    # 返回分割好的数据列表
    return data_striped, data_label

# 这个函数在使用时要分别获取数据和索引已知对应的标签。
# 本任务中数据的维度较高，二者的维度不同导致无法合并。

In [80]:
# 20Hz-0数据加载部分
# 本部分从./dataset/gearset中加载数据并添加分类标签

# 文件路径列表

files_20_0 = ['./dataset/gearset/Chipped_20_0.npy', './dataset/gearset/Health_20_0.npy', './dataset/gearset/Miss_20_0.npy', './dataset/gearset/Root_20_0.npy', './dataset/gearset/Surface_20_0.npy']

files_30_2 = ['./dataset/gearset/Chipped_30_2.npy', './dataset/gearset/Miss_30_2.npy', './dataset/gearset/Root_30_2.npy', './dataset/gearset/Surface_30_2.npy']

# files_30_2 = ['./dataset/gearset/Chipped_30_2.npy', './dataset/gearset/Health_30_2.npy']
# 标签列表

labels = np.array([1, 2, 3, 4, 5])
# 对应的类别：['Chipped', 'Health', 'Miss', 'Root', 'Surface']

# 指定所用路径

files = files_30_2

# 数据寄存器

data = []  # 定义初始化列表型数据集
label = []  # 定义初始化标签集
data_sample = []
label_sample = []

# 装载数据

for file in range(len(files)):
    data_sample, label_sample = data_processing(np.load(files[file]), labels[file], batch_size)  # 加载数据并整理数据形式
    sample_range = (len(label_sample) // batch_size) - 1  # 取样范围
    sample_num = (len(label_sample) // (batch_size * 2)) // batch_size * batch_size  # 取样数量
    random_index = random.sample(range(0, sample_range), sample_num)  # 制备索引，进行随机取样
    for index in random_index:
        data_sam = data_sample[index]
        label_sam = label_sample[index]
        data.append(data_sam) # 合并到总的数据集里
        label.append(label_sam)  # 合并到总的分类集里

# 列表转换成数组

data = np.array(data)  # 数据数组
label = np.array(label)  # 标签数组

# 将数据转换为PyTorch的张量

dataset = torch.tensor(data)
labelset = torch.tensor(label, dtype=torch.long).unsqueeze(1)

# 创建数据加载器

dataset_tensored = TensorDataset(dataset, labelset)  # 数据集合并，信息获取：data, label = dataset[0]
dataloader = DataLoader(dataset_tensored, batch_size=batch_size, shuffle=True)  # 将数据加载为批次


# 标准数据集测试


In [149]:
import torch
import torchvision
import torchvision.transforms as transforms

# 加载CIFAR-10数据集
transform = transforms.Compose(
    [transforms.ToTensor(),
     transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])

trainset = torchvision.datasets.CIFAR10(root='./dataset', train=True, download=False, transform=transform)
# 创建DataLoader
trainloader = torch.utils.data.DataLoader(trainset, batch_size=batch_size,shuffle=True)

In [150]:
data_test = next(iter(trainloader))
data_t = data_test[0]
label_t = data_test[1]
print(data_t.shape)
print(label_t)

torch.Size([16, 3, 32, 32])
tensor([7, 0, 7, 7, 1, 1, 9, 2, 5, 4, 0, 7, 8, 2, 7, 9])


# 模型初始化

In [142]:
model = DQN(num_classes)
model = model.to(device)
criterion = F.mse_loss
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

In [159]:
acc = []
loss_s = []
# model = DQN(num_classes)
# model.load_state_dict(torch.load('model.pth')) 
# model = model.to(device)

for epoch in range(num_epoches):
    model.train()
    
    dataiter = iter(trainloader)
    # for batch_idx, batch in enumerate(trainloader):
    for step in range(50):
        batch = next(dataiter)
        images = batch[0]
        labels = batch[1]
        images = images.to(device)
        labels = labels.to(device)
        
        # 前向传播
        output = model(images)
        output_get = output.max(1)[1].view(batch_size, 1)# .add_(1)
        output_get = output_get.to(dtype=torch.float32)
        labels_get = labels.to(dtype=torch.float32)
        loss = criterion(output_get.squeeze(1), labels_get)
        loss_temp = loss.detach()
    
        # 后向传播
        optimizer.zero_grad()
        loss.requires_grad_(True)
        loss.backward()
        optimizer.step()
        
        # 检查正确率
        index_corrects = (output_get.squeeze(1) == labels_get)
        num_corrects = torch.sum(index_corrects)
        acc_temp = 100.0 * num_corrects / batch_size
        
        acc.append(acc_temp)
        loss_s.append(loss_temp)
        
    model.eval()
    if (epoch + 1) % 100 == 0:
        print(f'代数：{epoch+1}/{num_epoches}，正确率：{sum(acc) / len(acc): .2f}%，Loss：{sum(loss_s) / len(loss_s): .5f}')
        acc = []
        loss_s = []

torch.save(model.state_dict(), './model.pth')

代数：100/1000，正确率： 9.48%，Loss： 23.71995
代数：200/1000，正确率： 9.48%，Loss： 23.67736
代数：300/1000，正确率： 9.41%，Loss： 23.76876
代数：400/1000，正确率： 9.67%，Loss： 23.71010
代数：500/1000，正确率： 9.70%，Loss： 23.62358
代数：600/1000，正确率： 9.50%，Loss： 23.74121
代数：700/1000，正确率： 9.51%，Loss： 23.76466
代数：800/1000，正确率： 9.54%，Loss： 23.76806
代数：900/1000，正确率： 9.28%，Loss： 23.78275
代数：1000/1000，正确率： 9.60%，Loss： 23.67884


In [135]:
test_model = DQN(num_classes)
test_model.load_state_dict(torch.load('model.pth')) 
test_model = test_model.to(device)
test_dl = next(iter(dataloader))
test_data, test_label = test_dl[0].to(device), test_dl[1].to(device)
test_output = test_model(test_data).max(1)[1].view(1, batch_size).add_(1)
print(test_output)
print(test_label.view(1, batch_size))

tensor([[4, 1, 4, 4, 1, 4, 4, 4, 4, 4, 4, 2, 3, 2, 2, 2, 2, 2, 4, 2, 2, 2, 2, 3,
         4, 2, 4, 4, 4, 2, 2, 4]], device='cuda:0')
tensor([[2, 2, 4, 4, 3, 3, 3, 4, 4, 1, 1, 2, 2, 4, 4, 4, 3, 2, 3, 3, 3, 4, 2, 1,
         3, 2, 4, 4, 4, 4, 3, 1]], device='cuda:0')


In [157]:
print(output_get.shape)
print(labels_get.shape)

torch.Size([16, 1])
torch.Size([16])
