![masks](https://images.pexels.com/photos/3951882/pexels-photo-3951882.jpeg?auto=compress&cs=tinysrgb&dpr=1&w=500)

# Dependencies

In [1]:
import pandas as pd
import os
from sklearn.model_selection import train_test_split
import glob

import torch
from torch import nn
import torchvision
from torch.utils.data import Dataset,DataLoader
import cv2
import pytorch_lightning as pl
import torchvision.transforms as transforms

# Preprocessing

In [2]:
incorrect_mask = glob.glob(f"../input/face-mask-detection/Dataset/mask_weared_incorrect/*.png")#returns a list of paths of images in incorrect masks folder
mask = glob.glob(f"../input/face-mask-detection/Dataset/with_mask/*.png")#returns a list of paths of images in masks correct folder
no_mask =  glob.glob(f"../input/face-mask-detection/Dataset/without_mask/*.png")#returns a list of paths of images in no masks folder

In [3]:
data1 = {
    "mask_id": mask,
    "mask":"masks_correct"
}
data2 = {
    "mask_id": incorrect_mask,
    "mask":"incorrectly_worn"
}
data3 = {
    "mask_id": no_mask,
    "mask":"No_mask"
}
masks = pd.DataFrame(data1)# a dataframe with the path and labels
incorrect_masks = pd.DataFrame(data2)
no_masks = pd.DataFrame(data3)

df  = pd.concat([masks,incorrect_masks],ignore_index= True)#combining the two datasets
df = pd.concat([df,no_masks],ignore_index= True)
df

Unnamed: 0,mask_id,mask
0,../input/face-mask-detection/Dataset/with_mask...,masks_correct
1,../input/face-mask-detection/Dataset/with_mask...,masks_correct
2,../input/face-mask-detection/Dataset/with_mask...,masks_correct
3,../input/face-mask-detection/Dataset/with_mask...,masks_correct
4,../input/face-mask-detection/Dataset/with_mask...,masks_correct
...,...,...
8977,../input/face-mask-detection/Dataset/without_m...,No_mask
8978,../input/face-mask-detection/Dataset/without_m...,No_mask
8979,../input/face-mask-detection/Dataset/without_m...,No_mask
8980,../input/face-mask-detection/Dataset/without_m...,No_mask


In [4]:
def label(x):
    if x == "masks_correct":
        return 1
    if x == "No_mask":
        return 0
    if x == "incorrectly_worn":
        return 2

In [5]:
df["mask"] = df["mask"].apply(label)# encoding the label numerically
df

Unnamed: 0,mask_id,mask
0,../input/face-mask-detection/Dataset/with_mask...,1
1,../input/face-mask-detection/Dataset/with_mask...,1
2,../input/face-mask-detection/Dataset/with_mask...,1
3,../input/face-mask-detection/Dataset/with_mask...,1
4,../input/face-mask-detection/Dataset/with_mask...,1
...,...,...
8977,../input/face-mask-detection/Dataset/without_m...,0
8978,../input/face-mask-detection/Dataset/without_m...,0
8979,../input/face-mask-detection/Dataset/without_m...,0
8980,../input/face-mask-detection/Dataset/without_m...,0


Dividing the dataframe into train test and validation sets

In [6]:
X  = df["mask_id"]
y = df["mask"]
x_train , x_test , y_train, y_test = train_test_split(X,y,test_size = 0.25,random_state = 42 ,)
x_val , x_test , y_val, y_test = train_test_split(x_test,y_test,test_size = 0.25,random_state = 42 ,)


# Dataset

In [7]:
class MaskDataset(torch.utils.data.Dataset):
    def __init__(self,image,targets = None):
        self.image = image
        self.targets = targets
        
    def __len__(self):
        return len(self.image)
    
    def __getitem__(self,index):
        path = self.image[index]
        image = cv2.imread(path)
        image = cv2.resize(image,(256,256))
        #image = transformations(image)
        if self.targets is None:
            return torch.tensor(image).float().reshape(3,256,256)
        return torch.tensor(image).float().reshape(3,256,256) , torch.tensor(self.targets[index])

# Sanity Check

In [8]:
mask = MaskDataset(image = x_train.values , targets = y_train.values)
image , label = next(iter(DataLoader(mask,batch_size = 6,shuffle = True)))
image.shape

torch.Size([6, 3, 256, 256])

In [9]:
train_dataset = MaskDataset(
                x_train.values,
                y_train.values
)

test_dataset = MaskDataset(
                    x_test.values,
                    y_test.values
)
prediction_dataset = MaskDataset(
                    x_test.values,
)

validation_dataset = MaskDataset(
                    x_val.values,
                    y_val.values
)



#  Datamodule

In [10]:
class Mask_DataModule(pl.LightningDataModule):
    def __init__(self):
        super().__init__()
        self.train = train_dataset
        self.val = validation_dataset
        self.test = test_dataset
        self.prediction = prediction_dataset
        
    def train_dataloader(self):
        return DataLoader(self.train,batch_size = 64,shuffle = True,num_workers=2)
    def val_dataloader(self):  
        return DataLoader(self.val,batch_size = 32,shuffle = False,num_workers=1)
    def test_dataloader(self):
        return DataLoader(self.test,batch_size = 32,shuffle = False,num_workers=1)
    def predict_dataloader(self):
        return DataLoader(self.prediction,batch_size = 1,shuffle = False,num_workers=1)

# The Model

In [11]:
neural_network = torchvision.models.resnet50(pretrained= True)
neural_network.fc = torch.nn.Linear(2048,3)# changing the number of output features to 2

Downloading: "https://download.pytorch.org/models/resnet50-19c8e357.pth" to /root/.cache/torch/hub/checkpoints/resnet50-19c8e357.pth


  0%|          | 0.00/97.8M [00:00<?, ?B/s]

# Model

In [12]:
loss_func = torch.nn.CrossEntropyLoss()

In [13]:
class Mask_Model(pl.LightningModule):
    def __init__(self):
        super().__init__()
        self.neural_net = neural_network
        
    def forward(self,x):
        return self.neural_net(x)
    
    def configure_optimizers(self):
        optimizer = torch.optim.Adam(self.parameters() , lr = 1e-4)
        #choosing a optimizer
        sch = torch.optim.lr_scheduler.StepLR(optimizer, step_size = 20, gamma=0.5, last_epoch=-1, verbose=False)
        return {
        "optimizer": optimizer,
        "lr_scheduler": {
            "scheduler": sch,
            "monitor": "val_loss",
        },
    }
    
    def training_step(self,batch,batch_idx):
        x,y = batch
        y_pred = self(x)
        #y = y.unsqueeze(-1)
        loss = loss_func(y_pred,y)
        return loss
    
    def validation_step(self,batch,batch_idx):
        x,y = batch
        y_pred = self(x)
        #y = y.unsqueeze(-1)
        loss = loss_func(y_pred,y)
        self.log("val_loss" ,loss )
        return loss
    def test_step(self,batch,batch_idx):
        x,y = batch
        y_pred = self(x)
        #y = y.unsqueeze(-1)
        loss = loss_func(y_pred,y)
        self.log("test_loss : " , loss)
        return loss

In [14]:
from pytorch_lightning.callbacks import ModelCheckpoint
checkpoint_callback = ModelCheckpoint(
                            monitor = "val_loss",#monitors val loss
                            mode = "min",#Picks the fold with the lowest val_loss
)

# Trainer

In [15]:
%%time 
from pytorch_lightning import Trainer
model = Mask_Model()
module = Mask_DataModule()
trainer = Trainer(max_epochs=2,gpus= 1,callbacks = [checkpoint_callback])
trainer.fit(model,module)

Validation sanity check: 0it [00:00, ?it/s]

Training: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

CPU times: user 1min 24s, sys: 43.3 s, total: 2min 8s
Wall time: 2min 36s


# Testing

In [16]:
trainer.test()# testing the loss on the test set

Testing: 0it [00:00, ?it/s]

--------------------------------------------------------------------------------
DATALOADER:0 TEST RESULTS
{'test_loss : ': 0.08899669349193573}
--------------------------------------------------------------------------------


[{'test_loss : ': 0.08899669349193573}]

Checking the Acuuracy

In [17]:
predictons = trainer.predict()

Predicting: 106it [00:00, ?it/s]

In [18]:
probs = nn.Softmax() # since the outputs are logits we use the softmax function to obtain probabilities

def get_score(y_pred,y):
    probabilities = []
    accuracy = 0
    for x in y_pred:
        prob = probs(x)
        top_p, top_class = prob.topk(1, dim = -1)
        probabilities.append(float(top_class))
    i = 0
    for x in y:
        if int(x) == int(probabilities[i]):
            accuracy = accuracy + 1
        i = i +1
    score = accuracy/len(probabilities)
    return score # returns accuracy

In [19]:
get_score(predictons,y_test.values)

  import sys


0.9661921708185054

In [20]:
trainer.save_checkpoint("Mask_Detection.pth")

In [21]:
model = model.load_from_checkpoint("./Mask_Detection.pth")
model

Mask_Model(
  (neural_net): ResNet(
    (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
    (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (relu): ReLU(inplace=True)
    (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
    (layer1): Sequential(
      (0): Bottleneck(
        (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu): ReLU(inplace=True)
        (downsample): Sequential(
    

In [22]:
def label_decode(x):
    if x == 1 :
        return "masks_correct"
    if x ==  0 :
        return "No_mask"
    if x == 2 :
        return "incorrectly_worn"

In [23]:
def get_prediction(img_path):
    img = cv2.imread(img_path)
    img = cv2.resize(img,(256,256))
    img = torch.tensor(img).float().reshape(1,3,256,256)
    logits = model(img)
    prob = probs(logits)
    top_p, top_class = prob.topk(1, dim = -1)
    
    return label_decode(int(top_class))# returns accuracy

In [24]:
get_prediction("../input/face-mask-detection/Dataset/with_mask/1002.png")

  


'masks_correct'

In [25]:
import shutil
shutil.rmtree("./lightning_logs")

In [26]:
torch.save(model, "model.pt")