In [None]:
import os
import cv2
import json
import torch
import numpy as np
from numpy import uint8
from torch import nn
import torchvision
from scipy.io import loadmat
from torchvision import transforms
from torch.nn import functional as F
from torch import optim
from torch.utils.data import DataLoader, Dataset
from PIL import Image
from sklearn.metrics import roc_auc_score
from sklearn.preprocessing import label_binarize
from sklearn.metrics import confusion_matrix
import matplotlib.pyplot as plt
from sklearn.metrics import roc_curve
from sklearn.metrics import auc
from sklearn.metrics import recall_score, accuracy_score
from sklearn.metrics import precision_score, f1_score
from cluster import RLOMTFAGCluster

from sklearn.model_selection import train_test_split

In [None]:
# --- Run configuration -------------------------------------------------------
SEED = 42        # seed for the NumPy and PyTorch global random number generators
batch_size = 32  # mini-batch size used when building the DataLoaders
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
epochs = 30      # number of training epochs
# NOTE(review): the optimizer cell passes its own literal learning rate, so this
# value is currently unused — confirm which one is intended.
lr = 1e-2

# Weight initialisation, DataLoader shuffling and the random torchvision
# augmentations all draw from the global RNGs. Seeding them here, before any
# model or loader exists, makes a "Restart & Run All" repeatable.
np.random.seed(SEED)
torch.manual_seed(SEED)

In [None]:
# Read the UMIST .mat file; 'fea' holds the pixel data and 'gnd' the labels.
data = loadmat('./data/UMIST.mat')

# NOTE(review): the uint8 cast assumes 'fea' stores integer grey levels in
# 0..255 — confirm against the .mat file.
X = data['fea'].astype(np.uint8)

# Flatten the label array and shift it down by one.
# NOTE(review): presumably the stored labels are 1-based; a stored 0 would wrap
# around to 255 in uint8 — confirm.
y = data['gnd'].ravel().astype(np.uint8) - 1

# Turn every row of X.T into a 32x32 array.
# NOTE(review): this assumes each column of 'fea' is one flattened 32x32 image.
X_ = np.reshape(X.T, (-1, 32, 32))

In [None]:
# Hold out a quarter of the images for evaluation. The fixed random_state pins
# the shuffle so the same split is produced on every run.
X_train, X_test, y_train, y_test = train_test_split(
    X_,
    y,
    test_size=0.25,
    random_state=42,
)

In [None]:
class SE_Block(nn.Module):
    """Squeeze-and-Excitation channel gate.

    Each channel is global-average-pooled to a single value, the resulting
    vector is passed through a two-layer bottleneck that ends in a sigmoid,
    and the input channels are rescaled by those per-channel weights.

    Args:
        c: number of channels of the input feature map.
        r: reduction ratio. The bottleneck has ``c // r`` units, but never
            fewer than one.
    """

    def __init__(self, c, r=16):
        super().__init__()
        # c // r is 0 whenever r > c (e.g. the default r=16 with c=8). A
        # zero-width bottleneck makes the gate a constant 0.5 that cannot
        # learn, so keep at least one hidden unit.
        hidden = max(1, c // r)
        self.squeeze = nn.AdaptiveAvgPool2d(1)
        self.excitation = nn.Sequential(
            nn.Linear(c, hidden, bias=False),
            nn.ReLU(inplace=True),
            nn.Linear(hidden, c, bias=False),
            nn.Sigmoid()
        )

    def forward(self, x):
        """Return ``x`` with every channel scaled by its learned gate.

        Args:
            x: tensor of shape (batch, c, height, width).
        """
        bs, c, _, _ = x.shape
        # Squeeze: one scalar per (sample, channel).
        y = self.squeeze(x).view(bs, c)
        # Excite: per-channel weights in (0, 1), shaped for broadcasting.
        y = self.excitation(y).view(bs, c, 1, 1)
        return x * y


class Net(nn.Module):
    """LeNet-style CNN for single-channel 32x32 images.

    Feature-map shapes for an (N, 1, 32, 32) input:
    conv1 -> (N, 6, 28, 28), pool -> (N, 6, 14, 14),
    conv2 -> (N, 16, 10, 10), pool -> (N, 16, 5, 5).

    Args:
        num_classes: length of the output logit vector (default 20).
    """

    def __init__(self, num_classes=20):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 6, 5)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, num_classes)

    def forward(self, x):
        """Return raw class logits of shape (N, num_classes)."""
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        # Flatten per sample. view(-1, 400) would silently fold surplus
        # features into the batch dimension when the input is not 32x32;
        # this way fc1 raises a shape error instead.
        x = torch.flatten(x, 1)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x


class SENet(nn.Module):
    """LeNet-style CNN with a Squeeze-and-Excitation block after the second conv.

    Expects single-channel 32x32 inputs; the shape comments below are for an
    (N, 1, 32, 32) batch.

    Args:
        num_classes: length of the output logit vector (default 20).
    """

    def __init__(self, num_classes=20):
        super(SENet, self).__init__()
        self.net = nn.Sequential(
            nn.Conv2d(1, 6, 5), nn.ReLU(),  # 6, 28, 28
            nn.MaxPool2d(kernel_size=2, stride=2),  # 6, 14, 14
            nn.Conv2d(6, 16, kernel_size=5), nn.ReLU(),  # 16, 10, 10
            SE_Block(c=16, r=4),  # channel re-weighting, shape unchanged
            nn.MaxPool2d(kernel_size=2, stride=2),  # 16, 5, 5
            nn.Flatten(),  # 400
            nn.Linear(16 * 5 * 5, 120), nn.ReLU(),
            nn.Linear(120, 84), nn.ReLU(),
            nn.Linear(84, num_classes)
        )

    def forward(self, x):
        """Return raw class logits of shape (N, num_classes)."""
        return self.net(x)


class UMISTDataset(Dataset):
    """Index-based dataset pairing ``X[i]`` with ``y[i]``.

    Args:
        X: indexable collection of samples; its length is the dataset length.
        y: indexable collection of targets, indexed the same way as ``X``.
        transform: optional callable applied to each sample on access.
    """

    def __init__(self, X, y, transform=None):
        self.X, self.y = X, y
        self.transform = transform

    def __len__(self):
        """Number of samples, taken from ``X``."""
        return len(self.X)

    def __getitem__(self, idx):
        """Return ``(sample, target)`` for ``idx``; the target is never transformed."""
        sample = self.X[idx]
        target = self.y[idx]
        # A missing (or falsy) transform leaves the sample untouched.
        return (self.transform(sample) if self.transform else sample), target


# Training pipeline: array -> PIL image -> random flip / small rotation ->
# tensor -> Normalize with mean 0.5 and std 0.5 (maps [0, 1] to [-1, 1]).
train_transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(degrees=10),
    transforms.ToTensor(),
    transforms.Normalize(mean=(0.5,), std=(0.5,)),
])

# Evaluation pipeline: the same conversion and normalisation, no augmentation.
test_transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.ToTensor(),
    transforms.Normalize(mean=(0.5,), std=(0.5,)),
])

train_dataset = UMISTDataset(X=X_train, y=y_train, transform=train_transform)
test_dataset = UMISTDataset(X=X_test, y=y_test, transform=test_transform)

# Only the training batches are reshuffled every epoch; the evaluation order
# stays fixed.
train_loader = DataLoader(
    dataset=train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(
    dataset=test_dataset, batch_size=batch_size, shuffle=False)

In [None]:
# Fit the RLOMTFAG clustering model whose projection EnhancedNet.forward adds
# to its logits.
SEED = 42  # NOTE(review): not referenced by any live code in this cell.

# The model is fitted on the first 256 columns of the raw uint8 matrix X.
# NOTE(review): this slice is taken from the full data set, independently of
# the shuffled train/test split, so it may contain images that ended up in the
# test set — confirm this is acceptable.
# NOTE(review): X holds raw uint8 values, whereas EnhancedNet.forward projects
# tensors that already went through Normalize(); check that RLOMTFAGCluster
# handles this difference in scale.
target_images = X[:, 0:256]

# Hyper-parameters forwarded verbatim to RLOMTFAGCluster; their individual
# meaning is defined by that class and is not visible in this notebook.
param = [1e-9, 1e-3, 1e-2, 7, 27]
payload = dict(
    X=target_images,
    k=20,
    param=param,
    limiter=100,
    epsilon=1e-10,
    distance_metric='sqeuclidean',
)

rlomtfag_cls = RLOMTFAGCluster()
rlomtfag_cls.fit(payload)


class EnhancedNet(nn.Module):
    """LeNet-style CNN with an SE block whose logits are offset by cluster features.

    ``forward`` flattens every input image, passes the batch to the
    module-level ``rlomtfag_cls.project`` and adds the sigmoid of that
    projection to the 20 logits produced by the CNN. The projection is
    computed in NumPy on detached data, so no gradient flows through it.

    NOTE(review): the projection must yield 20 values per sample (or a shape
    broadcastable to (N, 20)) for the final addition to work — presumably this
    matches the ``k`` used to fit ``rlomtfag_cls``; confirm.
    """

    def __init__(self):
        super(EnhancedNet, self).__init__()
        self.conv1 = nn.Conv2d(1, 6, 5)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 20)
        self.se = SE_Block(c=16, r=4)

    def forward(self, x):
        """Return logits of shape (N, 20) for a batch ``x`` of (N, 1, 32, 32) images."""
        # One flattened image per column, as an independent NumPy copy so the
        # projection can never write into the input tensor's memory.
        flat = x.detach().flatten(start_dim=1).cpu().numpy().copy().T
        cls_fea = rlomtfag_cls.project(flat).T
        # Follow the input's device rather than a notebook-level global.
        cls_fea = torch.from_numpy(cls_fea).float().to(x.device)

        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(self.se(F.relu(self.conv2(x))))
        # Flatten per sample so a wrong input size raises a shape error at fc1
        # instead of silently altering the batch dimension.
        x = torch.flatten(x, 1)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)

        # Squash the cluster features into (0, 1) before adding them.
        cls_fea = torch.sigmoid(cls_fea)
        x = torch.add(x, cls_fea)
        return x

In [None]:
# Build the cluster-augmented CNN and move its parameters to the chosen device.
net = EnhancedNet()
net = net.to(device)

# SGD with momentum.
# NOTE(review): the learning rate is the literal 0.05, while the configuration
# cell defines lr = 1e-2 — confirm which value is intended.
optimizer = optim.SGD(params=net.parameters(), lr=0.05, momentum=0.9)

In [None]:
def train_model(model, device, train_loader, optimizer, epoch):
    """Train ``model`` for one pass over ``train_loader``.

    Args:
        model: network to optimise; switched to training mode.
        device: device every batch is moved to before the forward pass.
        train_loader: iterable yielding ``(data, label)`` batches.
        optimizer: optimizer that owns the model parameters.
        epoch: epoch number, used only in the progress output.

    Returns:
        None. The cross-entropy loss of every third batch is printed.
    """
    model.train()
    for step, (inputs, targets) in enumerate(train_loader, start=1):
        inputs = inputs.to(device)
        targets = targets.to(device).long()
        # PyTorch accumulates gradients across backward() calls, so they have
        # to be cleared before every step.
        optimizer.zero_grad()
        logits = model(inputs)
        # Compute the loss.
        loss = F.cross_entropy(logits, targets)
        # Backward pass.
        loss.backward()
        # Update the parameters.
        optimizer.step()
        # Progress report every third batch.
        if step % 3 == 0:
            print(
                f'Epoch: {epoch}\tBatch: {step}\tLoss: {loss.item():.6f}')


def test_model(model, device, test_loader):
    """Evaluate ``model`` on ``test_loader`` and print classification metrics.

    Prints the confusion matrix, macro precision / recall / F1, accuracy, the
    mean per-sample cross-entropy loss and the macro one-vs-one ROC AUC.

    Args:
        model: network returning one row of 20 logits per sample; switched to
            evaluation mode.
        device: device every batch is moved to before the forward pass.
        test_loader: iterable yielding ``(data, label)`` batches.

    Raises:
        ValueError: if ``test_loader`` yields no samples.
    """
    # Evaluation mode.
    model.eval()
    # Class ids 0..19, matching the 20-way output layer of the networks above.
    labels = list(range(20))
    loss_sum = 0.0
    label_all = []
    pred_all = []
    prob_all = []
    # No gradients or back-propagation are needed during evaluation.
    with torch.no_grad():
        for data, label in test_loader:
            data, label = data.to(device), label.to(device).long()
            result = model(data)
            # Sum the per-sample losses; dividing a sum of batch means by the
            # sample count would under-report the loss by roughly the batch size.
            loss_sum += F.cross_entropy(result, label, reduction='sum').item()
            # Class probabilities in float64 so every row sums to 1 within
            # the tolerance roc_auc_score checks.
            prob_all.append(F.softmax(result.double(), dim=1).cpu().numpy())
            # Index of the largest logit is the predicted class.
            pred_all.extend(result.argmax(dim=1).cpu().numpy())
            # Plain integers rather than 0-d tensors.
            label_all.extend(label.cpu().numpy())

    if not label_all:
        raise ValueError('test_loader yielded no samples')
    test_loss = loss_sum / len(label_all)

    y_true = np.asarray(label_all)
    y_pred = np.asarray(pred_all)
    y_prob = np.concatenate(prob_all, axis=0)

    confmat = confusion_matrix(y_true, y_pred)
    print(f'{confmat}')

    # One-hot indicator matrices over all 20 classes, so the macro averages
    # always run over the full label set.
    pred_bin = label_binarize(y_pred, classes=labels)
    true_bin = label_binarize(y_true, classes=labels)
    precision = precision_score(true_bin, pred_bin, average='macro')
    recall = recall_score(true_bin, pred_bin, average='macro')
    f1 = f1_score(true_bin, pred_bin, average='macro')
    acc = accuracy_score(true_bin, pred_bin)

    print("\nPrecision Score: ", precision)
    print("Recall Score: ", recall)
    print("F1 Score: ", f1)
    print("Accuracy Score: ", acc)
    print(f'\nTest Average Loss: {test_loss:.6f}')

    # AUC needs continuous scores: with hard 0/1 predictions every ROC curve
    # collapses to a single operating point, and for indicator input
    # scikit-learn ignores `multi_class` and `labels` altogether.
    try:
        auc_text = f"{roc_auc_score(y_true, y_prob, average='macro', multi_class='ovo', labels=labels):.4f}"
    except ValueError as err:
        # For example non-finite scores after a diverged epoch. Report it
        # instead of aborting the surrounding training loop.
        auc_text = f'undefined ({err})'
    print(f"\nAUC: {auc_text}")

In [None]:
# Alternate one training epoch with a full evaluation pass. The epoch count is
# read from the `epochs` configuration value instead of repeating the literal.
for epoch in range(1, epochs + 1):
    train_model(net, device, train_loader, optimizer, epoch)
    test_model(net, device, test_loader)