In [None]:
import os
from time import time

import cv2
import numpy as np
import torch
import tqdm
import tqdm.notebook  # `import tqdm` alone does not load the `tqdm.notebook` submodule used below
from matplotlib import pyplot as plt
from sklearn.preprocessing import LabelEncoder, OneHotEncoder
from torch import nn, optim
from torch.utils.data import SubsetRandomSampler, DataLoader, Dataset
from torchvision import datasets, transforms, models, get_image_backend


# Dataset

**Dataset download:** [Face Mask Dataset on Kaggle](https://www.kaggle.com/omkargurav/face-mask-dataset)

In [None]:
# Colab-only step: mount Google Drive at /content/drive so that files stored in Drive
# can be read through ordinary file paths.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
!unzip /content/drive/MyDrive/archive.zip

In [None]:
# Sanity check: list the dataset root that the loading code below reads from.
!ls /content/data/

# Data

## Transformation


In [None]:
# Preprocessing applied to every image before it reaches the network:
#   ndarray (H, W, C) -> PIL image -> resized to 64x64 -> float tensor (C, H, W) in [0, 1].
# `transforms.Resize` replaces `transforms.Scale`, which was deprecated and later removed
# from torchvision.
train_transforms = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((64, 64)),
    transforms.ToTensor(),
])

# The evaluation pipeline must also end in `ToTensor`: a DataLoader cannot collate
# PIL images into a batch.
test_transforms = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((64, 64)),
    transforms.ToTensor(),
])

In [None]:
def one_hot_encoder(data, words=True):
    """Encode labels as a column vector of integer class codes.

    Despite its name, this function does not produce one-hot vectors: it returns
    integer codes, one row per input label.

    Args:
        data: Sequence of labels.
        words: If True, the labels are mapped to integer codes with
            ``LabelEncoder``. If False, ``data`` is assumed to already hold the
            codes and is only reshaped.

    Returns:
        numpy.ndarray of shape ``(len(data), 1)``.
    """
    values = np.array(data)
    if words:
        integer_encoded = LabelEncoder().fit_transform(values)
    else:
        integer_encoded = values

    # The original also built an unused OneHotEncoder(sparse=False); it was dropped because
    # it had no effect on the result and the `sparse` keyword is rejected by newer
    # scikit-learn releases.
    return integer_encoded.reshape(len(integer_encoded), 1)

In [None]:
def imshow(img):
    """Draw a channels-first image array on the current matplotlib axes.

    Args:
        img: Array with three axes ordered (channels, height, width).
    """
    # matplotlib expects channels last, so move axis 0 to the end.
    channels_last = np.transpose(img, (1, 2, 0))
    plt.imshow(channels_last)
def displaying_data(dataiter, dic, max_images=20):
    """Plot the first images of the next batch in a two-row grid with their labels.

    Args:
        dataiter: Iterator yielding ``(images, labels)`` batches, where ``images`` is a
            tensor of channels-first images.
        dic: Mapping from integer label to the title shown above each image.
        max_images: Upper bound on the number of images drawn (default 20). Fewer are
            drawn when the batch is smaller.
    """
    # The builtin next() works on every iterator; the `.next()` method is not available
    # on current DataLoader iterators.
    images, labels = next(dataiter)
    images = images.numpy()  # convert images to numpy for display

    n_images = min(max_images, len(images))
    n_rows = 2
    # Ceiling division, kept as an int: add_subplot rejects float grid sizes.
    n_cols = max(1, -(-n_images // n_rows))

    fig = plt.figure(figsize=(25, 4))
    for idx in range(n_images):
        ax = fig.add_subplot(n_rows, n_cols, idx + 1, xticks=[], yticks=[])
        # The original first tried imshow(images[idx][0]) under a bare `except`; for a
        # (C, H, W) image that slice is 2-D, so the 3-axis transpose always failed and
        # this call was the one that actually ran.
        imshow(images[idx])
        ax.set_title(dic[int(labels[idx])])

## DataParser

In [None]:
class Data:
    """Index of the image files under a dataset root, with integer class labels.

    Attributes set by ``__init__``:
        images_path: list of file paths, ``with_mask`` files first, then ``without_mask``.
        labels: array returned by ``one_hot_encoder`` for the matching label strings.
    """

    def __init__(self, path):
        """Scan ``path/with_mask`` and ``path/without_mask``."""
        with_mask_dir = "with_mask"
        without_mask_dir = "without_mask"
        self.images_path, self.labels = self.load_data(path, with_mask_dir, without_mask_dir)

    def load_data(self, path, with_mask_dir, without_mask_dir):
        """Collect file paths and encoded labels for both class folders.

        Args:
            path: Dataset root directory.
            with_mask_dir: Sub-directory holding the "withMask" images.
            without_mask_dir: Sub-directory holding the "withOutMask" images.

        Returns:
            Tuple ``(images_path, labels)``.
        """
        labelled_dirs = (
            (os.path.join(path, with_mask_dir), "withMask"),
            (os.path.join(path, without_mask_dir), "withOutMask"),
        )

        images_path = []
        labels = []
        for directory, label in labelled_dirs:
            # os.listdir returns entries in arbitrary order. Sorting makes sample indices
            # stable, which matters because index lists are shared between separately
            # constructed datasets.
            file_names = sorted(os.listdir(directory))
            images_path.extend(os.path.join(directory, name) for name in file_names)
            labels.extend([label] * len(file_names))

        return images_path, one_hot_encoder(labels)

## DataLoader

In [None]:
def default_loader(image_path):
    """Read an image file into an RGB array of shape (height, width, 3).

    Args:
        image_path: Path of the image file.

    Returns:
        numpy.ndarray with channels in RGB order.

    Raises:
        IOError: If OpenCV cannot read the file.
    """
    image = cv2.imread(image_path)
    if image is None:
        # cv2.imread signals failure by returning None instead of raising.
        raise IOError(f"Could not read image file: {image_path}")
    # OpenCV decodes to BGR, while ToPILImage and matplotlib both interpret 3-channel
    # arrays as RGB; convert so that colors are not swapped.
    return cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

In [None]:
class ImageDataSets(Dataset):
    """Map-style dataset yielding ``(image, label)`` pairs for the files indexed by ``Data``."""

    def __init__(self, data_path, transform=None, image_loader=default_loader):
        """
        Args:
            data_path: Dataset root directory handed to ``Data``.
            transform: Optional callable applied to each loaded image.
            image_loader: Callable that turns a file path into an image.
        """
        self.data = Data(data_path)
        self.loader = image_loader
        self.transform = transform

    def __len__(self):
        """Number of indexed image files."""
        return len(self.data.images_path)

    def __getitem__(self, indx):
        """Load, optionally transform, and return the sample at position ``indx``."""
        file_path = self.data.images_path[indx]
        image = self.loader(file_path)
        label = self.data.labels[indx]
        if not self.transform:
            return image, label
        return self.transform(image), label

In [None]:

# Split configuration: fractions of the full dataset reserved for validation and test.
valid_size = 0.2
test_size = 0
batch_size = 32
split_seed = 42  # fixed seed so the train/validation/test split is reproducible

# Both datasets index the same files and differ only in their transform pipeline,
# so a single list of indices is used for both.
train_dataset = ImageDataSets("/content/data", train_transforms)
test_dataset = ImageDataSets("/content/data", test_transforms)

train_size = len(train_dataset)

# Shuffle with a dedicated generator rather than NumPy's global, unseeded state.
indices = list(range(train_size))
np.random.RandomState(split_seed).shuffle(indices)

valid_split_size = int(valid_size * train_size)
test_split_size = int(test_size * train_size)
holdout_size = test_split_size + valid_split_size

# Layout of the shuffled index list: [ test | validation | train ].
test_indices = indices[:test_split_size]
valid_indices = indices[test_split_size:holdout_size]
train_indices = indices[holdout_size:]

train_sampler = SubsetRandomSampler(train_indices)
valid_sampler = SubsetRandomSampler(valid_indices)
test_sampler = SubsetRandomSampler(test_indices)

In [None]:
# One loader per split; each sampler restricts its loader to that split's indices.
train_loader = DataLoader(train_dataset, batch_size=batch_size, sampler=train_sampler)
valid_loader = DataLoader(train_dataset, batch_size=batch_size, sampler=valid_sampler)
# The test loader previously reused `valid_sampler`, so "test" batches were validation
# samples. It now draws from the test split, which is empty while test_size == 0.
test_loader = DataLoader(test_dataset, batch_size=batch_size, sampler=test_sampler)

In [None]:
# Preview one training batch, titled with a readable name for each integer label.
label_names = {0: 'Mask', 1: "No Mask"}
dataiter = iter(train_loader)
displaying_data(dataiter, label_names)


# Model

In [None]:
class SeparableConv2d(nn.Module):
    """Depthwise-separable 2-D convolution.

    A per-channel (``groups=in_channels``) convolution followed by a 1x1 convolution
    that mixes channels and sets the output width.
    """

    def __init__(self, in_channels, out_channels, kernel_size=1, stride=1, padding=0, dilation=1, bias=False):
        """
        Args:
            in_channels: Number of input channels.
            out_channels: Number of output channels.
            kernel_size, stride, padding, dilation: Settings of the depthwise convolution.
            bias: Whether both convolutions use a bias term.
        """
        super().__init__()

        # Depthwise stage: one filter per input channel.
        self.conv1 = nn.Conv2d(
            in_channels,
            in_channels,
            kernel_size=kernel_size,
            stride=stride,
            padding=padding,
            dilation=dilation,
            groups=in_channels,
            bias=bias,
        )
        # Pointwise stage: 1x1 convolution across channels.
        self.pointwise = nn.Conv2d(
            in_channels,
            out_channels,
            kernel_size=1,
            stride=1,
            padding=0,
            dilation=1,
            groups=1,
            bias=bias,
        )

    def forward(self, x):
        """Apply the depthwise convolution, then the pointwise convolution."""
        return self.pointwise(self.conv1(x))

In [None]:
class Block(nn.Module):
    """Residual block built from ReLU -> SeparableConv2d -> BatchNorm units.

    The main path stacks ``reps`` such units and, when ``strides != 1``, ends with a max
    pool. The shortcut is the identity, or a strided 1x1 convolution plus batch norm
    when the channel count or the resolution changes.
    """

    def __init__(self, in_filters, out_filters, reps, strides=2, start_with_relu=True, grow_first=True):
        """
        Args:
            in_filters: Channels of the input.
            out_filters: Channels of the output.
            reps: Number of separable-convolution units in the main path.
            strides: Stride of the final max pool and of the shortcut convolution.
            start_with_relu: If False, the leading ReLU of the main path is dropped.
            grow_first: If True the first unit widens to ``out_filters``; otherwise the
                last unit does.
        """
        super().__init__()

        needs_projection = out_filters != in_filters or strides != 1
        if needs_projection:
            self.skip = nn.Conv2d(in_filters, out_filters, 1, stride=strides, bias=False)
            self.skipbn = nn.BatchNorm2d(out_filters)
        else:
            self.skip = None

        self.relu = nn.ReLU(inplace=True)

        def unit(channels_in, channels_out):
            # One ReLU -> separable 3x3 conv -> batch norm step of the main path.
            return [
                self.relu,
                SeparableConv2d(channels_in, channels_out, 3, stride=1, padding=1, bias=False),
                nn.BatchNorm2d(channels_out),
            ]

        layers = []
        width = in_filters
        if grow_first:
            layers += unit(in_filters, out_filters)
            width = out_filters

        for _ in range(reps - 1):
            layers += unit(width, width)

        if not grow_first:
            layers += unit(in_filters, out_filters)

        if start_with_relu:
            # The leading ReLU must not be in-place: the block input is still needed
            # unmodified by the shortcut branch.
            layers[0] = nn.ReLU(inplace=False)
        else:
            layers = layers[1:]

        if strides != 1:
            layers.append(nn.MaxPool2d(3, strides, 1))
        self.rep = nn.Sequential(*layers)

    def forward(self, inp):
        """Return main path output plus shortcut."""
        x = self.rep(inp)

        if self.skip is None:
            skip = inp
        else:
            skip = self.skipbn(self.skip(inp))

        x += skip
        return x

In [None]:
class FeatureExtractor(nn.Module):
    """Convolutional encoder: two unpadded 3x3 convolutions followed by four ``Block``s.

    Channel widths go 3 -> 8 -> 8 -> 16 -> 32 -> 64 -> 128. Each ``Block`` uses its
    default stride of 2, so a 64x64 input leaves the encoder as a 128 x 4 x 4 map.
    """

    def __init__(self):
        super().__init__()

        # Settings shared by the two stem convolutions.
        stem_conv = dict(kernel_size=(3, 3), stride=(1, 1), padding=0, bias=False)

        self.con2d_1 = nn.Conv2d(in_channels=3, out_channels=8, **stem_conv)
        self.bn_1 = nn.BatchNorm2d(8)

        self.con2d_2 = nn.Conv2d(in_channels=8, out_channels=8, **stem_conv)
        self.bn_2 = nn.BatchNorm2d(8)

        self.block_1 = Block(in_filters=8, out_filters=16, reps=2)
        self.block_2 = Block(in_filters=16, out_filters=32, reps=2)
        self.block_3 = Block(in_filters=32, out_filters=64, reps=2)
        self.block_4 = Block(in_filters=64, out_filters=128, reps=2)

        self.relu = nn.ReLU()

    def forward(self, x):
        """Encode a batch of 3-channel images into a 128-channel feature map."""
        # Stem: conv -> batch norm -> ReLU, twice.
        x = self.relu(self.bn_1(self.con2d_1(x)))
        x = self.relu(self.bn_2(self.con2d_2(x)))

        # Residual stages, each halving the spatial resolution.
        for stage in (self.block_1, self.block_2, self.block_3, self.block_4):
            x = stage(x)

        return x

In [None]:
class Classifier(nn.Module):
    """Classification head: a 3x3 convolution to ``num_classes`` maps, then global average pooling.

    ``forward`` returns raw, un-normalized scores of shape ``(batch, num_classes, 1, 1)``.
    Softmax is deliberately not applied; the notebook trains this head with
    ``nn.CrossEntropyLoss``, which expects logits.
    """

    def __init__(self, num_classes):
        """
        Args:
            num_classes: Number of output classes.
        """
        super(Classifier, self).__init__()

        self.conv2d_f = nn.Conv2d(in_channels=128, out_channels=num_classes,
                                  kernel_size=(3, 3), stride=(1, 1), padding=1)
        # A true global average pool. The previous AvgPool2d(kernel_size=3) averaged only
        # one 3x3 window of the 4x4 feature map (ignoring the last row and column) and
        # produced more than one output position for larger inputs.
        self.glob_avg_bool = nn.AdaptiveAvgPool2d(1)

        # Not used by forward(); kept for callers that want probabilities. `dim=1` is the
        # class axis (an implicit dim is deprecated in PyTorch).
        self.softmax = nn.Softmax(dim=1)

    def forward(self, x):
        """Return class logits of shape ``(batch, num_classes, 1, 1)``."""
        x = self.conv2d_f(x)
        x = self.glob_avg_bool(x)
        return x

In [None]:
class Model(nn.Module):
    """Full classification network: ``FeatureExtractor`` followed by ``Classifier``."""

    def __init__(self, num_classes):
        """
        Args:
            num_classes: Number of classes scored by the classification head.
        """
        super().__init__()

        self.feature_extractor = FeatureExtractor()
        self.classifier = Classifier(num_classes)

    def forward(self, x):
        """Extract features from ``x`` and return the classifier's output for them."""
        features = self.feature_extractor(x)
        return self.classifier(features)

In [None]:
class Decoder(nn.Module):
    """Upsampling decoder made of four transposed convolutions.

    Every layer uses kernel size 2 and stride 2, so each one doubles the spatial size
    (16x overall) while the channels go 128 -> 64 -> 32 -> 16 -> 3. There is no
    non-linearity between the layers.
    """

    def __init__(self):
        super().__init__()

        self.up1 = nn.ConvTranspose2d(128, 64, kernel_size=2, stride=2)
        self.up2 = nn.ConvTranspose2d(64, 32, kernel_size=2, stride=2)
        self.up3 = nn.ConvTranspose2d(32, 16, kernel_size=2, stride=2)
        self.up4 = nn.ConvTranspose2d(16, 3, kernel_size=2, stride=2)

    def forward(self, x):
        """Apply the four upsampling layers in order."""
        for upsample in (self.up1, self.up2, self.up3, self.up4):
            x = upsample(x)
        return x

In [None]:
class AutoEncoder(nn.Module):
    """Auto-encoder pairing the ``FeatureExtractor`` encoder with the ``Decoder``."""

    def __init__(self):
        super().__init__()
        self.encoder = FeatureExtractor()
        self.decoder = Decoder()

    def forward(self, x):
        """Encode ``x`` and decode the result back to image space."""
        latent = self.encoder(x)
        return self.decoder(latent)

# Train AutoEncoder

In [None]:
def train(train_loader, model, optimizer, criterion):
    """Run one reconstruction-training epoch and return the mean per-batch loss.

    Args:
        train_loader: Iterable of batches whose first element is the image tensor;
            labels, if present, are ignored.
        model: Network trained to reproduce its input.
        optimizer: Optimizer over the model's parameters.
        criterion: Loss called as ``criterion(output, images)``.

    Returns:
        Sum of the batch losses divided by the number of batches.
    """
    model.train()
    # Follow the model's device instead of hard-coding CUDA, so the same function also
    # works for a model kept on the CPU.
    device = next(model.parameters()).device
    train_loss = 0
    for data in tqdm.notebook.tqdm(train_loader):
        images = data[0].to(device)
        optimizer.zero_grad()
        output = model(images)
        # The input itself is the reconstruction target.
        loss = criterion(output, images)
        loss.backward()
        optimizer.step()
        train_loss += loss.item()
    return train_loss / len(train_loader)

In [None]:
def validation(valid_loader, model, criterion):
    """Evaluate reconstruction loss without gradient tracking; return the mean per-batch loss.

    Args:
        valid_loader: Iterable of batches whose first element is the image tensor;
            labels, if present, are ignored.
        model: Network whose output is compared with its input.
        criterion: Loss called as ``criterion(output, images)``.

    Returns:
        Sum of the batch losses divided by the number of batches.
    """
    model.eval()
    # Follow the model's device instead of hard-coding CUDA.
    device = next(model.parameters()).device
    validation_loss = 0
    with torch.no_grad():
        for data in tqdm.notebook.tqdm(valid_loader):
            images = data[0].to(device)
            output = model(images)
            validation_loss += criterion(output, images).item()
    return validation_loss / len(valid_loader)

In [None]:
# Train the auto-encoder with early stopping on the validation loss.
criterion = nn.MSELoss()
auto_encoder = AutoEncoder()

auto_encoder = auto_encoder.cuda()
optimizer = optim.Adam(auto_encoder.parameters(), lr=0.001)

max_epochs = 1000
patience = 10  # stop after this many consecutive epochs without improvement

min_loss = np.inf
stop_counter = 0
train_losses = []
validatoin_losses = []  # (sic) spelling kept: a later cell plots this variable by name
for i in tqdm.notebook.tqdm(range(max_epochs)):
    t_loss = train(train_loader, auto_encoder, optimizer, criterion)
    v_loss = validation(valid_loader, auto_encoder, criterion)

    train_losses.append(t_loss)
    validatoin_losses.append(v_loss)

    print("loss: ", v_loss, end=" ")
    if v_loss <= min_loss:
        # New best (or equal) validation loss: checkpoint and reset the patience counter.
        print("Better")
        min_loss = v_loss
        stop_counter = 0
        torch.save(auto_encoder.state_dict(), "/content/model.pth")
    else:
        print("BAD ", stop_counter)
        stop_counter += 1

    # An unconditional `break` used to follow this check, which ended training after a
    # single epoch and made the early-stopping logic unreachable.
    if stop_counter >= patience:
        break


In [None]:
def display_graph(train_losses, valid_losses):
    """Plot the training and validation loss curves on the current figure.

    Args:
        train_losses: Sequence of training losses, one per epoch.
        valid_losses: Sequence of validation losses, one per epoch.
    """
    curves = (
        (train_losses, 'Training loss'),
        (valid_losses, 'Validation loss'),
    )
    for history, curve_label in curves:
        plt.plot(history, label=curve_label)
    plt.legend(frameon=False)


In [None]:
# Plot the loss histories collected by the auto-encoder training loop.
display_graph(train_losses, validatoin_losses)

# Classifier

In [None]:
# Rebuild the auto-encoder on the GPU, restore the checkpoint written during training,
# and export only the encoder's weights for reuse by the classifier.
auto_encoder = AutoEncoder()
auto_encoder.cuda()
auto_encoder.load_state_dict(torch.load(f="/content/model.pth"))
torch.save(obj=auto_encoder.encoder.state_dict(), f="/content/encoder.pth")

In [None]:
# Two-class classifier whose feature extractor starts from the exported encoder weights.
model = Model(num_classes=2)
model.cuda()
model.feature_extractor.load_state_dict(torch.load(f="/content/encoder.pth"))
# Freeze the encoder so that only the classification head receives gradient updates.
model.feature_extractor.requires_grad_(False)

In [None]:
def train_classifier(train_loader, model, optimizer, criterion):
    """Run one classification-training epoch, print accuracy, and return the mean per-batch loss.

    Args:
        train_loader: Iterable of ``(images, labels)`` batches; labels are integer class
            indices.
        model: Network whose output can be flattened to ``(batch, num_classes)``.
        optimizer: Optimizer over the model's parameters.
        criterion: Loss called as ``criterion(logits, labels)``.

    Returns:
        Sum of the batch losses divided by the number of batches. The training accuracy
        (integer percent) is printed as a side effect.
    """
    # NOTE(review): train() mode also applies to any frozen sub-module, so BatchNorm layers
    # inside a frozen feature extractor keep updating their running statistics. Confirm
    # whether that is intended.
    model.train()
    # Follow the model's device instead of hard-coding CUDA.
    device = next(model.parameters()).device
    train_loss = 0
    correct = 0
    total = 0

    for data in tqdm.notebook.tqdm(train_loader):
        images = data[0].to(device)
        labels = data[1].to(device).reshape(-1)
        optimizer.zero_grad()
        # (batch, C, 1, 1) -> (batch, C); works for any number of classes.
        output = model(images).flatten(1)
        loss = criterion(output, labels)

        correct += (torch.argmax(output, dim=1) == labels).sum().item()
        total += labels.numel()
        loss.backward()
        optimizer.step()
        train_loss += loss.item()
    # Divide by the number of samples actually seen; assuming 32 samples per batch
    # under-reports accuracy whenever the last batch is smaller.
    print(int(correct / total * 100))
    return train_loss / len(train_loader)

In [None]:
def validation_classifer(valid_loader, model, criterion):
    """Evaluate the classifier, print accuracy, and return the mean per-batch loss.

    Args:
        valid_loader: Iterable of ``(images, labels)`` batches; labels are integer class
            indices.
        model: Network whose output can be flattened to ``(batch, num_classes)``.
        criterion: Loss called as ``criterion(logits, labels)``.

    Returns:
        Sum of the batch losses divided by the number of batches. The validation accuracy
        (integer percent) is printed as a side effect.
    """
    model.eval()
    # Follow the model's device instead of hard-coding CUDA.
    device = next(model.parameters()).device
    validation_loss = 0
    correct = 0
    total = 0
    with torch.no_grad():
        for data in tqdm.notebook.tqdm(valid_loader):
            images = data[0].to(device)
            labels = data[1].to(device).reshape(-1)
            # (batch, C, 1, 1) -> (batch, C); works for any number of classes.
            output = model(images).flatten(1)
            correct += (torch.argmax(output, dim=1) == labels).sum().item()
            total += labels.numel()
            validation_loss += criterion(output, labels).item()
    # Divide by the number of samples actually seen rather than assuming 32 per batch.
    print(int(correct / total * 100))

    return validation_loss / len(valid_loader)

In [None]:
# Train the classifier with early stopping on the validation loss.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

max_epochs = 1000
patience = 10  # stop after this many consecutive epochs without improvement

min_loss = np.inf
stop_counter = 0
train_losses = []
validatoin_losses = []  # (sic) spelling kept: a later cell plots this variable by name
for i in tqdm.notebook.tqdm(range(max_epochs)):
    t_loss = train_classifier(train_loader, model, optimizer, criterion)
    v_loss = validation_classifer(valid_loader, model, criterion)

    train_losses.append(t_loss)
    validatoin_losses.append(v_loss)

    print("loss: ", v_loss, end=" ")
    if v_loss <= min_loss:
        # New best (or equal) validation loss: checkpoint and reset the patience counter.
        print("Better")
        min_loss = v_loss
        stop_counter = 0
        torch.save(model.state_dict(), "/content/classifer_model.pth")
    else:
        print("BAD ", stop_counter, i)
        stop_counter += 1

    # An unconditional `break` used to follow this check, which ended training after a
    # single epoch and made the early-stopping logic unreachable.
    if stop_counter >= patience:
        break

In [None]:
# Plot the loss histories collected by the classifier training loop.
display_graph(train_losses, validatoin_losses)