In [13]:
import numpy as np
import pandas as pd

In [14]:
import cv2
import os

In [15]:
import matplotlib.pyplot as plt
%matplotlib inline

In [16]:
import torch
from torch import nn, optim
import torchvision
from torchvision import transforms,datasets
from torch.utils.data import DataLoader, Dataset, random_split
import torch.nn.functional as F
import pytorch_lightning as pl
from argparse import ArgumentParser
from pytorch_lightning.callbacks import ModelCheckpoint

In [17]:
root = 'D:/Projects/Face recognition/face_mask_data/data/'

In [19]:
class LightningMaskDetector(pl.LightningModule):
    
    def __init__(self):
        super(LightningMaskDetector,self).__init__()
        
        #self.resnet = torchvision.models.resnet18(pretrained=True) 
        #self.resnet.fc = nn.Linear(512,128)
        self.dropout = nn.Dropout(p=0.3)
        self.network = torchvision.models.mobilenet_v2(pretrained=True)
        child_counter = 0
        for child in self.network.children():
            for param in child.parameters():
                param.requires_grad = False
        self.network.classifier = torch.nn.Sequential(nn.Linear(1280,256),
                                              nn.Tanh(),
                                              nn.Dropout(p=0.25),
                                              nn.Linear(256,2),
                                              nn.LogSoftmax())
        
    def forward(self,x):

        x = self.network(x)

        return x
    
    '''
    @staticmethod
    def add_model_specific_args(parent_parser):
        parser = ArgumentParser(parents=[parent_parser], add_help=False)
        parser.add_argument('--batch_size', type=int, default=64)
        parser.add_argument('--learning_rate', type=float, default=0.002)
        return parser'''
    
    def prepare_data(self):
        
        transform = transforms.Compose([transforms.ToTensor()])
        train_transform = transforms.Compose([transforms.RandomGrayscale(),
                                transforms.RandomHorizontalFlip(),
                                transforms.ColorJitter(brightness=(0.5,1.2)),
                                transforms.ToTensor()
                               ])
        trainset = datasets.ImageFolder(root+'train',transform=train_transform)
        self.testset = datasets.ImageFolder(root+'test',transform=transform)
        self.trainset, self.validset = random_split(trainset,[1700,207])
        
    def train_dataloader(self):
        
        trainloader = DataLoader(self.trainset,batch_size=8)
        return trainloader
    
    def val_dataloader(self):
        
        validloader = DataLoader(self.validset,batch_size=8)
        return validloader
        
    def test_dataloader(self):
        
        testloader = DataLoader(self.testset,batch_size=8)
        return testloader
    
    def configure_optimizers(self):
        
        optimizer = optim.Adam(self.parameters(),lr = 0.0001)
        return optimizer
    
    def nllloss(self,logits,labels):
        
        return F.nll_loss(logits, labels)
    
    def training_step(self,batch,batch_idx):
        
        x, y = batch
        logits = self.forward(x)
        loss = self.nllloss(logits,y)
        
        logs = {'train_loss':loss}
        output = {'loss':loss, 'log':logs}
        return output
        #return loss
        
    def validation_step(self,batch,batch_idx):
        x, y = batch
        logits = self.forward(x)
        loss = self.nllloss(logits,y)
        
        output = {'val_loss':loss}
        return output
    
    def validation_epoch_end(self, outputs):
        
        avg_loss = torch.stack([x['val_loss'] for x in outputs]).mean()
        tensorboard_logs = {'val_loss': avg_loss}
        output = {'avg_val_loss': avg_loss, 'log': tensorboard_logs}
        return output
    
    def test_step(self, batch, batch_idx):
        x, y = batch
        logits = self.forward(x)
        loss = self.nllloss(logits, y)
        return {'test_loss': loss}
        #return loss

    def test_epoch_end(self, outputs):
        avg_loss = torch.stack([x['test_loss'] for x in outputs]).mean()
        tensorboard_logs = {'test_loss': avg_loss}
        return {'test_loss': avg_loss, 'log': tensorboard_logs}
        #return avg_loss


In [20]:
class print_loss(pl.callbacks.Callback):
    
    def on_init_start(self,trainer):
        print('Starting to init trainer')
        
    def on_init_end(self,trainer):
        print('trainer is init now')
        
    def on_epoch_start(self, trainer, pl_module):
        print('Started training')
        
    def on_validation_end(self, trainer, pl_module):
        out = trainer.progress_bar_dict['loss']
        #print(out)
        #loss = trainer.output['avg_val_loss']
        #print(trainer.output['avg_val_loss'])
        print(f"Validation is completed. Validation loss is {out}")

        
    def on_train_end(self, trainer, pl_module):
        #print('Train loss:',self.outputs['loss'])
        loss = trainer.progress_bar_dict['loss']
        print(trainer.callback_metrics)
        print(f"Training is done. Train loss: {loss}")
        

In [22]:
checkpoint_callback = ModelCheckpoint(filepath='D:/Projects/Face recognition/model_{epoch:02d}-{val_loss:.2f}',
                                      save_weights_only=False,
                                      save_top_k=2)

In [23]:
callbacks = [print_loss()]

### Training and validating

In [24]:
model = LightningMaskDetector()

trainer = pl.Trainer(gpus=1,max_epochs=10,callbacks=callbacks,checkpoint_callback=checkpoint_callback)
trainer.fit(model)

GPU available: True, used: True
No environment variable for node rank defined. Set as 0.
CUDA_VISIBLE_DEVICES: [0]


Starting to init trainer
trainer is init now



    | Name                         | Type             | Params
--------------------------------------------------------------
0   | dropout                      | Dropout          | 0     
1   | network                      | MobileNetV2      | 2 M   
2   | network.features             | Sequential       | 2 M   
3   | network.features.0           | ConvBNReLU       | 928   
4   | network.features.0.0         | Conv2d           | 864   
5   | network.features.0.1         | BatchNorm2d      | 64    
6   | network.features.0.2         | ReLU6            | 0     
7   | network.features.1           | InvertedResidual | 896   
8   | network.features.1.conv      | Sequential       | 896   
9   | network.features.1.conv.0    | ConvBNReLU       | 352   
10  | network.features.1.conv.0.0  | Conv2d           | 288   
11  | network.features.1.conv.0.1  | BatchNorm2d      | 64    
12  | network.features.1.conv.0.2  | ReLU6            | 0     
13  | network.features.1.conv.1    | Conv2d           

HBox(children=(FloatProgress(value=1.0, bar_style='info', description='Validation sanity check', layout=Layout…

  input = module(input)


HBox(children=(FloatProgress(value=1.0, bar_style='info', description='Training', layout=Layout(flex='2'), max…

Started training


HBox(children=(FloatProgress(value=1.0, bar_style='info', description='Validating', layout=Layout(flex='2'), m…

Validation is completed. Validation loss is 0.091
Started training


Detected KeyboardInterrupt, attempting graceful shutdown...


{'loss': tensor(0.6253, device='cuda:0'), 'train_loss': tensor(0.6253, device='cuda:0'), 'avg_val_loss': tensor(0.0396, device='cuda:0'), 'val_loss': tensor(0.0396, device='cuda:0'), 'epoch': 0}
Training is done. Train loss: 0.145



1

In [None]:
%load_ext tensorboard
%tensorboard --logdir lightning_logs/

### Testing the lightning model

In [None]:
trainer.test()

### Inferencing on a new image

In [None]:
img = cv2.imread('D:/Projects/Face recognition/050720_RW_Mass_General_159-1100x0-c-default.jpg')

In [None]:
plt.imshow(img)

In [None]:
trans = transforms.Compose([transforms.ToPILImage(),
                            transforms.Resize((200,200)),
                           transforms.ToTensor()])

imgx = trans(img)
imgx = imgx.unsqueeze(0)

In [None]:
imgx.shape

In [None]:
model_path = 'D:/Projects/Face recognition/Lightning_models/model_epoch=02-val_loss=0.09.ckpt'

In [None]:
model = LightningMaskDetector.load_from_checkpoint(model_path)

out = model(imgx)

In [None]:
prob = torch.exp(out)

In [None]:
_, class_labels = torch.topk(prob, k=1, dim=1)

In [None]:
class_labels