In [None]:
import os
import numpy as np
import cv2
import torch
import torch.nn as nn
import torchvision.transforms as transforms
import pandas as pd
from torch.utils.data import DataLoader, Dataset
from torch.autograd import Variable
import time

In [None]:
# Don't forget to update the label numbering and the output size.
def readfile2(path1, path2, image_size=128):
    """Load paired images from two directories into one two-plane array.

    The (sorted) listing of ``path1`` drives the loop; for every file name the
    image with the same name is also read from ``path2``. From each decoded
    image only channel 0 (``img[:, :, 0]``) is kept and resized to
    ``image_size`` x ``image_size``. Along the last axis of the result, index 0
    holds the ``path1`` image and index 1 the ``path2`` image.

    The label is parsed from the file name: the integer before the first
    underscore, minus one.

    Args:
        path1: Directory of the first image set.
        path2: Directory of the second image set; must contain every file
            name that appears in ``path1``.
        image_size: Side length (pixels) the images are resized to.

    Returns:
        ``(x, y)`` where ``x`` is a uint8 array of shape
        ``(n, image_size, image_size, 2)`` and ``y`` is a uint8 array of
        shape ``(n,)``.

    Raises:
        FileNotFoundError: A file from ``path1`` has no counterpart in ``path2``.
        ValueError: An image cannot be decoded, or a label does not fit in
            the uint8 label array.
    """
    file_names = sorted(os.listdir(path1))
    # Listing path2 up front still fails early when the directory is missing
    # and lets us report an absent counterpart before any image is decoded.
    counterpart_names = set(os.listdir(path2))

    count = len(file_names)
    x = np.zeros((count, image_size, image_size, 2), dtype=np.uint8)
    y = np.zeros(count, dtype=np.uint8)

    for i, file in enumerate(file_names):
        if file not in counterpart_names:
            raise FileNotFoundError(
                "{!r} exists in {!r} but not in {!r}".format(file, path1, path2)
            )

        label = int(file.split("_")[0]) - 1
        # y is uint8: reject values that would wrap around instead of storing garbage.
        if not 0 <= label <= 255:
            raise ValueError(
                "label {} parsed from {!r} does not fit in uint8".format(label, file)
            )

        for plane, folder in enumerate((path1, path2)):
            full_path = os.path.join(folder, file)
            img = cv2.imread(full_path)
            # cv2.imread signals failure by returning None rather than raising.
            if img is None:
                raise ValueError("could not read image {!r}".format(full_path))
            x[i, :, :, plane] = cv2.resize(img[:, :, 0], (image_size, image_size))

        y[i] = label
    return x, y

In [None]:
# Training pipeline: array -> PIL image -> tensor.
# ToTensor also rescales the pixel values to [0, 1] (data normalization).
train_transform = transforms.Compose(
    [transforms.ToPILImage(), transforms.ToTensor()]
)

# No data augmentation is needed at test time.
test_transform = transforms.Compose(
    [transforms.ToPILImage(), transforms.ToTensor()]
)
class ImgDataset(Dataset):
    """Dataset of two-plane image samples with optional integer labels.

    Args:
        x: Indexable collection of samples. When a transform is supplied,
            each sample is sliced as ``sample[:, :, 0]`` and ``sample[:, :, 1]``.
        y: Optional labels; converted to a ``torch.LongTensor`` when given.
        transform: Optional callable applied to each plane separately. Its
            results are given a new axis at position 3 and concatenated there.
    """

    def __init__(self, x, y=None, transform=None):
        self.x = x
        # Labels are required to be a LongTensor.
        self.y = None if y is None else torch.LongTensor(y)
        self.transform = transform

    def __len__(self):
        return len(self.x)

    def __getitem__(self, index):
        sample = self.x[index]
        if self.transform is not None:
            # Transform plane 0 then plane 1, and join them on a new last axis.
            planes = [self.transform(sample[:, :, c]).unsqueeze(3) for c in (0, 1)]
            sample = torch.cat(planes, 3)

        if self.y is None:
            return sample
        return sample, self.y[index]

In [None]:
class ChannelAttention(nn.Module):
    """Per-channel gate computed from globally pooled features.

    The input is reduced to 1x1 by average pooling and by max pooling; both
    results pass through the same bias-free 1x1-conv bottleneck
    (``in_planes -> in_planes // ratio -> in_planes``), are summed, and are
    squashed with a sigmoid.

    Args:
        in_planes: Number of input channels.
        ratio: Reduction factor of the bottleneck.
    """

    def __init__(self, in_planes, ratio=4):
        super().__init__()
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        self.max_pool = nn.AdaptiveMaxPool2d(1)

        squeezed = in_planes // ratio
        self.fc = nn.Sequential(
            nn.Conv2d(in_planes, squeezed, 1, bias=False),
            nn.ReLU(),
            nn.Conv2d(squeezed, in_planes, 1, bias=False),
        )
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Return the gate for ``x``; spatial size of the result is 1x1."""
        from_avg = self.fc(self.avg_pool(x))
        from_max = self.fc(self.max_pool(x))
        return self.sigmoid(from_avg + from_max)


class SpatialAttention(nn.Module):
    """Per-pixel gate computed from channel-wise mean and max maps.

    The mean and the max over the channel axis are stacked into a 2-channel
    map, passed through one bias-free convolution that outputs a single
    channel, and squashed with a sigmoid.

    Args:
        kernel_size: Kernel size of the convolution; padding is
            ``kernel_size // 2``.
    """

    def __init__(self, kernel_size=7):
        super().__init__()

        half = kernel_size // 2
        self.conv1 = nn.Conv2d(2, 1, kernel_size, padding=half, bias=False)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Return a single-channel gate for ``x``."""
        channel_mean = x.mean(dim=1, keepdim=True)
        channel_max = x.max(dim=1, keepdim=True)[0]
        stats = torch.cat((channel_mean, channel_max), dim=1)
        return self.sigmoid(self.conv1(stats))

In [None]:
class Classifier(nn.Module):
    """Two-branch CNN whose branch embeddings are fused by an outer product.

    Each branch (``a`` and ``b``) runs one single-channel image through three
    conv/BN/ReLU/max-pool stages, applies channel and then spatial attention
    with a residual connection, and projects the flattened features to
    ``hid_size`` values. A constant 1 is prepended to each embedding, the
    outer product of the two vectors is flattened, and ``fc2`` maps it to the
    class scores.

    The ``Linear(64*16*16, ...)`` layers fix the spatial input size: three
    2x2 poolings must leave a 16x16 map, i.e. the images must be 128x128.

    Args:
        hid_size: Size of each branch embedding (before the constant 1).
        num_classes: Number of output scores.

    NOTE: earlier revisions of ``forward`` routed branch ``b`` through the
    attention modules of branch ``a``; weights saved from such a run hold
    untrained ``ca_b`` / ``sa_b`` parameters.
    """

    def __init__(self, hid_size=8, num_classes=9):
        super(Classifier, self).__init__()
        # torch.nn.Conv2d(in_channels, out_channels, kernel_size, stride, padding)
        # torch.nn.MaxPool2d(kernel_size, stride, padding)
        # Model input: [batch, 1, 128, 128, 2]; the last axis selects the branch.
        # Module creation order is kept as a/b interleaved so seeded
        # initialisation and state_dict keys are unchanged.
        self.cnn_a = self._conv_branch()
        self.ca_a = ChannelAttention(in_planes=64, ratio=4)
        self.sa_a = SpatialAttention(kernel_size=7)
        self.cnn_b = self._conv_branch()
        self.ca_b = ChannelAttention(in_planes=64, ratio=4)
        self.sa_b = SpatialAttention(kernel_size=7)
        self.fc_a = self._embedding_head(hid_size)
        self.fc_b = self._embedding_head(hid_size)
        self.fc2 = nn.Sequential(
            # (hid_size + 1)^2 entries: outer product of two bias-augmented embeddings.
            nn.Linear((hid_size + 1) * (hid_size + 1), 32),
            nn.ReLU(),
            nn.Linear(32, num_classes)
        )

    @staticmethod
    def _conv_branch():
        """Build the conv stack of one branch: [1, 128, 128] -> [64, 16, 16]."""
        return nn.Sequential(
            nn.Conv2d(1, 16, 3, 1, 1),  # [16, 128, 128]
            nn.BatchNorm2d(16),
            nn.ReLU(),
            nn.MaxPool2d(2, 2, 0),

            nn.Conv2d(16, 32, 3, 1, 1), # [32, 64, 64]
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(2, 2, 0),

            nn.Conv2d(32, 64, 3, 1, 1), # [64, 32, 32]
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(2, 2, 0)       # [64, 16, 16]
        )

    @staticmethod
    def _embedding_head(hid_size):
        """Build the projection from flattened conv features to the embedding."""
        return nn.Sequential(
            nn.Linear(64*16*16, 32),
            nn.ReLU(),
            nn.Linear(32, hid_size)
        )

    @staticmethod
    def _embed(img, cnn, channel_gate, spatial_gate, head):
        """Run one branch and return its embedding with a leading constant 1."""
        feat = cnn(img)
        gated = channel_gate(feat) * feat
        gated = spatial_gate(gated) * gated
        feat = feat + gated  # residual connection around the attention block
        hidden = head(feat.view(feat.size(0), -1))
        # new_ones follows the embedding's dtype and device, so the model
        # also runs on CPU; the constant itself carries no gradient.
        ones = hidden.new_ones(hidden.size(0), 1)
        return torch.cat((ones, hidden), dim=1)

    def forward(self, x):
        """Return class scores of shape ``[batch, num_classes]``.

        Args:
            x: Tensor indexed as ``x[:, :, :, :, k]`` with ``k`` in {0, 1};
                slice 0 feeds branch ``a`` and slice 1 feeds branch ``b``.
        """
        batch_size = x.size(0)
        hid1 = self._embed(x[:, :, :, :, 0], self.cnn_a, self.ca_a, self.sa_a, self.fc_a)
        # Branch b uses its own attention modules (ca_b / sa_b).
        hid2 = self._embed(x[:, :, :, :, 1], self.cnn_b, self.ca_b, self.sa_b, self.fc_b)
        fusion_tensor = torch.bmm(hid1.unsqueeze(2), hid2.unsqueeze(1))
        return self.fc2(fusion_tensor.view(batch_size, -1))

In [None]:
def TrainModel(path1, path2, path):
    """Train a ``Classifier`` on paired images and export validation predictions.

    Images are read with ``readfile2`` from the ``training`` and ``validation``
    sub-directories of ``path1`` and ``path2``. After training, the model is
    run once more over the validation data and the results are written to
    ``Multi-dim_3_layer.csv`` inside ``path`` with the columns ``Id`` (true
    label) and ``Category`` (predicted label).

    Training runs for at most 150 epochs and stops early once, after
    ``patience`` epochs, the last five training accuracies sum to
    ``threshold`` (i.e. five consecutive perfect epochs). The epoch that
    triggers the stop skips its validation pass.

    Args:
        path1: Root directory of the first image set.
        path2: Root directory of the second image set.
        path: Output directory for the CSV file (also printed at start).

    Raises:
        ValueError: No training images were found.
    """
    # print(path1)
    # print(path2)
    print(path)
    train_x, train_y = readfile2(os.path.join(path1, "training"), os.path.join(path2, "training"))
    val_x, val_y = readfile2(os.path.join(path1, "validation"), os.path.join(path2, "validation"))
    print("Reading data")
    print("Size of training data = {}".format(len(train_x)))
    print("Size of validation data = {}".format(len(val_x)))
    if len(train_x) == 0:
        raise ValueError("no training images found under {!r}".format(os.path.join(path1, "training")))

    # Use the GPU when there is one; otherwise stay on the CPU.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    batch_size = 32
    train_set = ImgDataset(train_x, train_y, train_transform)
    val_set = ImgDataset(val_x, val_y, test_transform)
    train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_set, batch_size=batch_size, shuffle=False)
    n_train = len(train_set)
    n_val = max(len(val_set), 1)  # avoid dividing by zero when there is no validation data

    model = Classifier().to(device)
    loss = nn.CrossEntropyLoss()  # classification task, hence cross-entropy
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    num_epoch = 150
    monitor_acc = []
    threshold = 5
    patience = 10
    scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=num_epoch)

    for epoch in range(num_epoch):
        epoch_start_time = time.time()
        train_acc = 0.0
        train_loss = 0.0
        val_acc = 0.0
        val_loss = 0.0

        model.train()  # training mode (BatchNorm uses batch statistics)
        for images, labels in train_loader:
            optimizer.zero_grad()  # reset the gradients of the model parameters
            train_pred = model(images.to(device))  # calls the model's forward()
            batch_loss = loss(train_pred, labels.to(device))  # prediction and labels must share a device
            batch_loss.backward()  # back-propagate to get the gradients
            optimizer.step()  # update the parameters
            train_acc += np.sum(np.argmax(train_pred.detach().cpu().numpy(), axis=1) == labels.numpy())
            # batch_loss is a per-batch mean; weight it by the batch size so
            # that dividing by the sample count yields a per-sample mean.
            train_loss += batch_loss.item() * labels.size(0)
        scheduler.step()

        monitor_acc.append(train_acc / n_train)
        if epoch > patience and sum(monitor_acc[-5:]) >= threshold:
            break

        model.eval()
        with torch.no_grad():
            for images, labels in val_loader:
                val_pred = model(images.to(device))
                batch_loss = loss(val_pred, labels.to(device))

                val_acc += np.sum(np.argmax(val_pred.cpu().numpy(), axis=1) == labels.numpy())
                val_loss += batch_loss.item() * labels.size(0)
            # Report every second epoch.
            if (epoch + 1) % 2 == 0:
                print('[%03d/%03d] %2.2f sec(s) Train Acc: %3.6f Loss: %3.6f | Val Acc: %3.6f loss: %3.6f' % \
                    (epoch + 1, num_epoch, time.time()-epoch_start_time, \
                     train_acc/n_train, train_loss/n_train, val_acc/n_val, val_loss/n_val))

    # Final pass over the validation data to produce the exported predictions.
    test_set = ImgDataset(val_x, val_y, test_transform)
    test_loader = DataLoader(test_set, batch_size=64, shuffle=False)
    model.eval()
    prediction = []
    alabels = []
    with torch.no_grad():
        test_start_time = time.time()
        for images, labels in test_loader:
            test_pred = model(images.to(device))
            test_label = np.argmax(test_pred.cpu().numpy(), axis=1)
            # Store plain ints: formatting a 0-dim tensor would write "tensor(3)" into the CSV.
            alabels.extend(labels.tolist())
            prediction.extend(test_label.tolist())
        elapsed = time.time() - test_start_time
        fps = len(prediction) / elapsed if elapsed > 0 else float("inf")
        print("Time: %3.6f fps:%3.6f" % (elapsed, fps))

    with open(os.path.join(path, "Multi-dim_3_layer.csv"), 'w') as f:
        f.write('Id,Category\n')
        for i, y in zip(alabels, prediction):
            f.write('{},{}\n'.format(i, y))

In [None]:
# Root directory that contains the dataset folders; replace the placeholder
# before running. The CSV with the predictions is written here as well.
path = r'your path'
entries = os.listdir(path)
print(entries)
# Keep only the "AT..." folders and pick the two image sets by the text that
# follows the first underscore ("I..." and "LP...").
# partition() yields an empty string when a name has no underscore, so such
# names are skipped instead of raising IndexError.
entries = [x for x in entries if x.startswith("AT")]
entries1 = [y for y in entries if y.partition('_')[2].startswith('I')]
entries2 = [x for x in entries if x.partition('_')[2].startswith('LP')]
if not entries1 or not entries2:
    raise FileNotFoundError(
        "expected folders named 'AT_I...' and 'AT_LP...' in {!r}; found {}".format(path, entries)
    )
Path1 = os.path.join(path, entries1[0])
Path2 = os.path.join(path, entries2[0])
TrainModel(Path1, Path2, path)