### Libs

In [1]:
import torch 
import os
from torch.utils.data import Dataset,DataLoader
import torch.nn as nn
import torch.nn.functional as F
import torchvision
from torchvision import datasets,transforms
from torch.autograd import Variable
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from sklearn.model_selection import train_test_split
import cv2 as cv
import random  

### Crteat Dataset

In [2]:
'''---Load Data From Local---'''
list_Data_Mask = []
list_Data_Nomask = []

#使用列表记录图像的路径、特征和target
for i in os.listdir('CMFD'):
    for j in os.listdir(f'CMFD/{i}'):    #遍历CMFD
        list_Data_Mask.append((f'CMFD/{i}/{j}',j[6:10],0))     #加入列表，元素的是图片存储地址
#print(CMFD)

for i in os.listdir('IMFD'):
    for j in os.listdir(f'IMFD/{i}'):    #遍历IMFD
        if (j[6:-4]=='Mask_Mouth_Chin'):
            list_Data_Nomask.append((f'IMFD/{i}/{j}',j[6:-4],1))
        elif  (j[6:-4]=='Mask_Nose_Mouth'):
            list_Data_Nomask.append((f'IMFD/{i}/{j}',j[6:-4],2))
        elif  (j[6:-4]=='Mask_Chin'):
            list_Data_Nomask.append((f'IMFD/{i}/{j}',j[6:-4],3))   #加入列表，元素的是图片存储地址
#print(CMFD)

for i in os.listdir('NMFD'):
    for j in os.listdir(f'NMFD/{i}'):    #遍历NMFD
        list_Data_Nomask.append((f'NMFD/{i}/{j}','NoMask',4))     #将没戴口罩的图片也存入列表中
#print(CMFD)



'''---creat a custom Dataset---'''
class MaskDataSet(Dataset):
        def __init__(self,path):
        #self.path = path
            self.labels = pd.read_csv(path)
    
        def __len__(self):
            return len(self.labels)
    
        def __getitem__(self, index):
            img_nparr = plt.imread(self.labels.iloc[index,1])  #这里返回tensor型1024x1024x3(H,W,C)的图像
            img_chw = np.transpose(img_nparr)         #此处进行转换
            img = torch.FloatTensor(img_chw)
            tar = torch.tensor(self.labels.iloc[index,3])
            return img,tar                       #这里返回tensor型3x1024x1024(C,H,W)的图像数据和对应的target

'''---load the train&test data from dataset---'''
train_transform = transforms.Compose([
    transforms.Resize((224,224))
])

def train_test_dataload():

    list_Train_Mask,list_Test_Mask = train_test_split(list_Data_Mask,train_size=0.01,test_size=0.01)

    list_Train_Nomask,list_Test_Nomask = train_test_split(list_Data_Nomask,train_size=0.01,test_size=0.01)

    list_Train = list_Train_Mask + list_Train_Nomask
    list_Test = list_Test_Mask + list_Test_Nomask

#创建DataFrame
    df_Train = pd.DataFrame(list_Train,columns=['Addr','Feature','Target'])
    df_Test = pd.DataFrame(list_Test,columns=['Addr','Feature','Target'])
#print(df_NOMASK)

#将DataFrame转换为.csv文件，方便在DataSet中读取和操作

    df_Train.to_csv('train.csv')
    df_Test.to_csv('test.csv')

#重写DataSet类，特别是其中的__len__和__getitem__方法
    
    dataset_Train = MaskDataSet('train.csv')
    dataset_Test = MaskDataSet('test.csv')
    return DataLoader(dataset_Train,3,shuffle=True),DataLoader(dataset_Test,3,shuffle=True)   #此处确定一个超参数batch_size ,由于本机GPU内存太小，只能接受3个图像



### Build Model

In [3]:

class MASKmodel(nn.Module):
    def __init__(self):
        super(MASKmodel,self).__init__()

        self.flatten = nn.Flatten()
       
        self.conv1 = nn.Sequential(
            nn.Conv2d(in_channels=3,out_channels = 30,kernel_size = 4,stride = 1,groups=1),  #这里输入的图像数据要是（C,H,W）
            nn.ReLU(),
            nn.MaxPool2d(4,4)
        )

        self.conv2 = nn.Sequential(
            nn.Conv2d(in_channels=30,out_channels = 100,kernel_size = 4,stride = 1,groups=1),  #这里输入的图像数据要是（C,H,W）
            nn.ReLU(),
            nn.MaxPool2d(4,4)
        )

        self.conv3 = nn.Sequential(
            nn.Conv2d(in_channels=100,out_channels = 300,kernel_size = 4,stride = 1,groups=1),  #这里输入的图像数据要是（C,H,W）
            nn.ReLU(),
            nn.MaxPool2d(4,4)
        )

        self.conv4 = nn.Sequential(
            nn.Conv2d(in_channels=300,out_channels = 500,kernel_size = 4,stride = 1,groups=1),  #这里输入的图像数据要是（C,H,W）
            nn.ReLU(),
            nn.MaxPool2d(4,4)
        )

        self.fc1 = nn.Linear(500*3*3,500)

        self.fc2 = nn.Linear(500,120)

        self.fc3 = nn.Linear(120,5)
    
    def forward(self,x):
        #x = self.flatten(x)
        ###卷积
        x=self.conv1(x)       #size:3x30x254x254
        x=self.conv2(x)       #size:3x100x62x62
        x=self.conv3(x)       #size:3x300x15x15
        x=self.conv4(x)       #size:3x500x3x3
        ###全连接
        x = x.view(x.size()[0], -1)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        x = F.softmax(x,1)
        return  x             #size:
    



### Train Model

In [4]:

def train_loop(dataloader, model, loss_fn, optimizer):
    size = len(dataloader.dataset)
    for batch, (X, y) in enumerate(dataloader):
        # Compute prediction and loss
        X = X.to(device)
        y = y.to(device)
        pred = model(X)
        loss = loss_fn(pred, y)

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if batch % 100 == 0:
            loss, current = loss.item(), (batch + 1) * len(X)
            print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")


def test_loop(dataloader, model, loss_fn):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    test_loss, correct = 0, 0

    with torch.no_grad():
        for X, y in dataloader:
            X = X.to(device)
            y = y.to(device)
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()

    test_loss /= num_batches
    correct /= size
    print(f"Test Result: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")




device = torch.device("cuda:0")
model = MASKmodel()
model.to(device)
cost = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(),lr=0.008,momentum=0.9)#,weight_decay=0.8,nesterov=True)  
#print(model)

dataload_Data_train,dataload_Data_test = train_test_dataload()

epochs = 10
for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    train_loop(dataload_Data_train, model, cost, optimizer)
    test_loop(dataload_Data_test, model, cost)
print("Done!")





MASKmodel(
  (flatten): Flatten(start_dim=1, end_dim=-1)
  (conv1): Sequential(
    (0): Conv2d(3, 30, kernel_size=(9, 9), stride=(1, 1))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=4, stride=4, padding=0, dilation=1, ceil_mode=False)
  )
  (conv2): Sequential(
    (0): Conv2d(30, 100, kernel_size=(5, 5), stride=(1, 1))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=4, stride=4, padding=0, dilation=1, ceil_mode=False)
  )
  (conv3): Sequential(
    (0): Conv2d(100, 300, kernel_size=(3, 3), stride=(1, 1))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=4, stride=4, padding=0, dilation=1, ceil_mode=False)
  )
  (dense): Sequential(
    (0): Linear(in_features=110, out_features=10, bias=True)
    (1): ReLU()
    (2): Dropout(p=0.5, inplace=False)
    (3): Linear(in_features=10, out_features=5, bias=True)
  )
  (fc1): Linear(in_features=67500, out_features=300, bias=True)
  (fc2): Linear(in_features=300, out_features=120, bias=True)
  (fc3): Linear(in_features=120, out_features=5, bias=T