In [1]:
import torch
import torch.nn as nn

class OneDimensional_SE_ResNet(nn.Module):

    def __init__(self, block, blocks_num, num_classes=40):
        super(OneDimensional_SE_ResNet, self).__init__()
        self.in_channel = 64
        self.conv1 = nn.Conv1d(1, self.in_channel, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm1d(self.in_channel)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv1d(self.in_channel, self.in_channel, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn2 = nn.BatchNorm1d(self.in_channel)
        self.relu = nn.ReLU(inplace=True)

        self.layer1 = self._make_layer(block, 64, blocks_num[0])
        self.layer2 = self._make_layer(block, 128, blocks_num[1], stride=2)
        self.layer3 = self._make_layer(block, 256, blocks_num[2], stride=2)
        self.layer4 = self._make_layer(block, 512, blocks_num[3], stride=2)

        self.avgpool = nn.AdaptiveAvgPool1d(1) 
        self.fc1 = nn.Linear(512 * block.expansion, num_classes)
        for m in self.modules():
            if isinstance(m, nn.Conv1d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')

    def _make_layer(self, block, channel, block_num, stride=1):
        downsample = None
        if stride != 1 or self.in_channel != channel * block.expansion:
            downsample = nn.Sequential(
                nn.Conv1d(self.in_channel, channel * block.expansion, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm1d(channel * block.expansion))

        layers = []
        layers.append(block(self.in_channel,
                            channel,
                            downsample=downsample,
                            stride=stride))
        self.in_channel = channel * block.expansion

        for _ in range(1, block_num):
            layers.append(block(self.in_channel,
                                channel))

        return nn.Sequential(*layers)

    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.conv2(x)
        x = self.bn2(x)
        x = self.relu(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.fc1(x)

        return x


class ResidualBlock(nn.Module):
    expansion = 1

    def __init__(self, in_channel, out_channel, stride=1, downsample=None):

        super(ResidualBlock, self).__init__()
        self.conv1 = nn.Conv1d(in_channels=in_channel, out_channels=out_channel,
                               kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm1d(out_channel)
        self.relu = nn.ReLU()
        self.conv2 = nn.Conv1d(in_channels=out_channel, out_channels=out_channel,
                               kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm1d(out_channel)

        self.downsample = downsample

        self.avg_pool = nn.AdaptiveAvgPool1d(1)
        self.fc = nn.Sequential(
            nn.Linear(out_channel, out_channel // 16, bias=False),
            nn.ReLU(inplace=True),
            nn.Linear(out_channel // 16, out_channel, bias=False),
            nn.Sigmoid()
        )

    def forward(self, x):
        identity = x
        if self.downsample is not None:
            identity = self.downsample(x)

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)

        original_out = out
        b, c, _ = out.size()
        out = self.avg_pool(out).view(b, c)
        out = self.fc(out).view(b, c, 1)
        out = out * original_out

        out += identity
        out = self.relu(out)

        return out

def se_resnet34(num_classes=40):
    return OneDimensional_SE_ResNet(ResidualBlock, [3, 3, 3, 3], num_classes=num_classes)

In [2]:
import matplotlib
matplotlib.use('TkAgg')
import matplotlib.pyplot as plt
import numpy as np

def plot_matrix(conf_matrix, dev_list, save_path):
    plt.figure(figsize=(20, 16))
    plt.imshow(conf_matrix, cmap=plt.cm.Blues)
    plt.title('Confusion Matrix')
    plt.colorbar()

    thresh = conf_matrix.max() / 2.
    for i in range(conf_matrix.shape[0]):
        for j in range(conf_matrix.shape[1]):
            plt.text(j, i, conf_matrix[i, j],
                     ha="center", va="center",
                     color="white" if conf_matrix[i, j] > thresh else "black", fontsize=6)

    tick_marks = np.arange(len(dev_list))
    plt.xticks(tick_marks, dev_list)
    plt.yticks(tick_marks, dev_list)
    plt.xticks(rotation=90)
    plt.yticks(rotation=0)
    plt.xlabel('Predicted Label')
    plt.ylabel('True Label')
    plt.savefig(save_path)
    plt.show()

In [None]:
import os
from PIL import Image
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import transforms
from torch.utils.data import DataLoader
import numpy as np
from sklearn.metrics import f1_score
from sklearn.metrics import recall_score
from sklearn.metrics import confusion_matrix

class CustomDataset(torch.utils.data.Dataset):
    def __init__(self, data_dir, transform=None):
        self.data_dir = data_dir
        self.categories = sorted(os.listdir(data_dir))
        self.data = []
        self.transform = transform
        for category in self.categories:
            category_dir = os.path.join(data_dir, category)
            category_data = sorted(os.listdir(category_dir))
            self.data.extend([(os.path.join(category_dir, file), self.categories.index(category)) for file in category_data])

    def __getitem__(self, index):
        file_path, label = self.data[index]
        data = np.load(file_path)
        image = Image.fromarray(data.astype(np.uint8))
        image = transform(image)
        return image, label

    def __len__(self):
        return len(self.data)
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print("using {} device.".format(device))

transform = transforms.Compose([ transforms.Grayscale(num_output_channels = 1),
                                     transforms.ToTensor(),
                                     transforms.Normalize((0.5), (0.5))])

train_dataset = CustomDataset("features/train_npy",transform=transform)
train_num = len(train_dataset)
dev_list = train_dataset.categories
batch_size = 32
train_loader = torch.utils.data.DataLoader(train_dataset,
                                               batch_size=batch_size, shuffle=True,
                                               num_workers=0)

validate_dataset = CustomDataset("features/val_npy",transform=transform)
val_num = len(validate_dataset)
validate_loader = torch.utils.data.DataLoader(validate_dataset,
                                                  batch_size=batch_size, shuffle=False,
                                                  num_workers=0)

print("using {} images for training, {} images for validation.".format(train_num, val_num))

net = se_resnet34(num_classes=40)
net.to(device)
loss_function = nn.CrossEntropyLoss()
optimizer = optim.Adam(net.parameters(), lr=0.0001)

epochs = 20
save_path = './TrimSENet_parameters.pth'
best_f1 = 0.0
train_accurate_list = []
val_accurate_list = []
f1_list = []
recall_list = []

for epoch in range(epochs):
    net.train()
    running_loss = 0.0
    train_acc = 0.0
    for step, data in enumerate(train_loader, start=0):
        images, labels = data
        images = images.reshape(images.shape[0], 1, 1500)
        optimizer.zero_grad()
        outputs = net(images.to(device))
        predict_y = torch.max(outputs, dim=1)[1]
        train_acc += torch.eq(predict_y, labels.to(device)).sum().item()
        loss = loss_function(outputs,labels.to(device))
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        rate = (step + 1) / len(train_loader)
        a = "*" * int(rate * 50)
        b = "." * int((1 - rate) * 50)
        print("\rtrain loss:{:^3.0f}%[{}->{}]{:.3f}".format(int(rate * 100), a, b, loss), end="")
    print()
    train_accurate = train_acc / train_num
    train_accurate_list.append(train_accurate)
    net.eval()
    acc = 0.0 
    val = torch.tensor([])
    pre = torch.tensor([])
    with torch.no_grad():
        for val_data in validate_loader:
            val_images, val_labels = val_data
            val_images = val_images.reshape(val_images.shape[0], 1, 1500)
            outputs = net(val_images.to(device))
            predict_y = torch.max(outputs, dim=1)[1]
            pre = torch.cat([pre.to(device), predict_y.to(device)])
            val = torch.cat([val.to(device), val_labels.to(device)])
            acc += torch.eq(predict_y, val_labels.to(device)).sum().item()
    val_accurate = acc / val_num
    val_accurate_list.append(val_accurate)
    f1 = f1_score(val.cpu(), pre.cpu(), average='macro')
    recall = recall_score(val.cpu(), pre.cpu(), average='macro')

    f1_list.append(f1)
    recall_list.append(recall)
    if f1 > best_f1:
        best_f1 = f1
        best_pre = pre
        best_val = val
        torch.save(net.state_dict(), save_path)
        torch.save(best_pre, 'pre_val_label/best_pre_TrimSENet.pt')
        torch.save(best_val, 'pre_val_label/best_val_TrimSENet.pt')
    print('[epoch %d] train_loss: %.3f train_accuracy: %.3f val_accuracy: %.3f  recall: %.3f  f1: %.3f' %
              (epoch + 1, running_loss / step, train_accurate, val_accurate, recall, f1))
    with open("TrimSENet_result_npy.txt", 'a') as file:
        file.write("[epoch " + str(epoch + 1) + "]" + "  " + "train_accuracy:" + str(train_accurate) + "  " + "val_accuracy:" + str(val_accurate) + "  " + "recall:" + str(recall) + "  " + "f1:" + str(f1) + '\n')
print('Finished Training')
iterations = range(1, len(train_accurate_list) + 1)
with open("TrimSENet_npy_plt_data.txt", 'a') as file:
    file.write("iterations:" + str(iterations) +
               "train_accurate_list:" + str(train_accurate_list) +
               "val_accurate_list:" + str(val_accurate_list) +
               "f1_list:" + str(f1_list) +
               "recall_list:" + str(recall_list) +
               "dev_list:" + str(dev_list) + '\n')
conf_matrix = confusion_matrix(best_val.cpu(),best_pre.cpu())
plot_matrix(conf_matrix,dev_list,"TrimSENet_confusion_matrix_npy.png")

using cuda:0 device.
using 312067 images for training, 78016 images for validation.
train loss:100%[**************************************************->]0.467
[epoch 1] train_loss: 0.320 train_accuracy: 0.909 val_accuracy: 0.951  recall: 0.938  f1: 0.942
train loss:100%[**************************************************->]0.386
[epoch 2] train_loss: 0.109 train_accuracy: 0.964 val_accuracy: 0.959  recall: 0.960  f1: 0.955
train loss:100%[**************************************************->]0.758
[epoch 3] train_loss: 0.070 train_accuracy: 0.976 val_accuracy: 0.978  recall: 0.979  f1: 0.979
train loss:100%[**************************************************->]1.491
[epoch 4] train_loss: 0.051 train_accuracy: 0.982 val_accuracy: 0.981  recall: 0.982  f1: 0.981
train loss:100%[**************************************************->]0.079
[epoch 5] train_loss: 0.040 train_accuracy: 0.986 val_accuracy: 0.986  recall: 0.986  f1: 0.986
train loss:100%[*********************************************