In [1]:
import torch as t
from torch.utils.data import Dataset, DataLoader

import torchvision as tv
from torchvision import  utils, datasets
from torchvision.transforms import *

import imutils
from imutils.video import VideoStream

import matplotlib.pyplot as plt
from PIL import Image
import numpy as np
import copy
import time
import os

import cv2
from torch.nn import *
from pathlib import Path
from dataset import MaskDataset
from tqdm import tqdm

In [2]:
EPOCHS = 10
INIT_LR = 1e-5
BATCH_SIZE = 128
IMG_SIZE = 100
device = t.device("cuda:0" if t.cuda.is_available() else "cpu")
t.manual_seed(42)

<torch._C.Generator at 0x7fb3a3bf7d80>

In [3]:
def train_model(model,dataloaders, criterion, optimizer, scheduler, num_epochs=25):
    since = time.time()

    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch, num_epochs - 1))
        print('-' * 10)

        # Each epoch has a training and validation phase
        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()  # Set model to training mode
            else:
                model.eval()   # Set model to evaluate mode

            running_loss = 0.0
            running_corrects = 0

            # Iterate over data.
            for data in tqdm(dataloaders[phase], desc=f"{phase} progress: \t"):
                inputs = data['image'].to(device)
                labels = data['mask'].to(device)

                # zero the parameter gradients
                optimizer.zero_grad()

                # forward
                # track history if only in train
                with t.set_grad_enabled(phase == 'train'):
                    outputs = model.forward(inputs)
                    conf, preds = t.max(outputs, 1)
                    
                    loss = criterion(outputs, labels)

                    # backward + optimize only if in training phase
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                # statistics
                running_loss += loss.item() * inputs.size(0)
                running_corrects += t.sum(preds == labels.data)
            if phase == 'train':
                scheduler.step()

            epoch_loss = running_loss / len(dataloaders[phase].dataset)
            epoch_acc = running_corrects.double() / len(dataloaders[phase].dataset)

            print('{} Loss: {:.4f} Acc: {:.4f}'.format(
                phase, epoch_loss, epoch_acc))

            # deep copy the model
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())

        print()

    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(
        time_elapsed // 60, time_elapsed % 60))
    print('Best val Acc: {:4f}'.format(best_acc))

    # load best model weights
    model.load_state_dict(best_model_wts)
    return model, best_acc

In [4]:
# https://discuss.pytorch.org/t/how-to-add-noise-to-mnist-dataset-when-using-pytorch/59745
class View(Module):
    def __init__(self):
        super().__init__()
        self.std = 0
        self.mean = 0
        return

    def forward(self, tensor):
        return tensor.view(-1, 2048)

    def __repr__(self):
        return self.__class__.__name__ + '(mean={0}, std={1})'.format(self.mean, self.std)


In [5]:
# https://discuss.pytorch.org/t/how-to-add-noise-to-mnist-dataset-when-using-pytorch/59745
class AddGaussianNoise(object):
    def __init__(self, mean=0., std=1.):
        self.std = std
        self.mean = mean
        
    def __call__(self, tensor):
        return tensor + t.randn(tensor.size()) * self.std + self.mean
    
    def __repr__(self):
        return self.__class__.__name__ + '(mean={0}, std={1})'.format(self.mean, self.std)

In [6]:
from MaskDetector import MaskDetector
val_trns = Compose([
    ToPILImage(),
    Resize((IMG_SIZE, IMG_SIZE)),
    ToTensor()
])

train_trns = Compose([
    ToPILImage(),
    RandomAffine(45, translate=(0.2, 0.2)),
    RandomHorizontalFlip(0.5),
    RandomResizedCrop(int(0.85*IMG_SIZE)),
    Resize((IMG_SIZE, IMG_SIZE)),

    ToTensor(),
    AddGaussianNoise(0., .2)
])

eval_trns = Compose([
    Resize((600, 600)),

    # RandomAffine(45, translate=(0.2, 0.2)),
    # RandomHorizontalFlip(0.5),
    # RandomResizedCrop(int(0.85*IMG_SIZE)),

    ToTensor(),
    # AddGaussianNoise(0., .2)
])


model = MaskDetector(
    Path("./data/mask_df.csv"),
    batch_size=BATCH_SIZE,
    lr=1e-3,
    train_trns=train_trns,
    val_trns=val_trns,
    img_size=(3,IMG_SIZE,IMG_SIZE)
)

model.prepare_data()

train_dl = model.train_dataloader()
val_dl = model.val_dataloader()


dls = {
    "train": train_dl,
    "val": val_dl
}

# print(model)


  stream(template_mgs % msg_args)


In [7]:
print(device)
# opt = model.configure_optimizers()

# if t.cuda.is_available():
#     model.cuda()

# model, best_acc = train_model(model, dls, model.crossEntropyLoss, opt, t.optim.lr_scheduler.StepLR(opt, 2),EPOCHS)
# mc = open(f"./checkpoints/SZ{IMG_SIZE}_EP{EPOCHS}_LR{INIT_LR}_BS{BATCH_SIZE}_BA{best_acc*100:.2f}%.ckpt", "wb")
# t.save(model.state_dict(), mc)
# mc = open("./face_mask.ckpt", "wb")
# t.save(model.state_dict(), mc)
# else:
model = MaskDetector(unlazy=True, batch_size=1,img_size=100)
checkpoint = t.load("./checkpoints/face_mask.ckpt", map_location=device)
model.load_state_dict(checkpoint['state_dict'],strict=False )
model.to(device)
#     model.load_state_dict(t.load("face_mask.ckpt", map_location=device),
#                         strict=False)

cuda:0


MaskDetector(
  (trainAcc): Accuracy()
  (valAcc): Accuracy()
  (convLayer1): Sequential(
    (0): Conv2d(3, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=(2, 2), stride=(2, 2), padding=0, dilation=1, ceil_mode=False)
  )
  (convLayer2): Sequential(
    (0): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=(2, 2), stride=(2, 2), padding=0, dilation=1, ceil_mode=False)
  )
  (convLayer3): Sequential(
    (0): Conv2d(64, 128, kernel_size=(3, 3), stride=(3, 3), padding=(1, 1))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=(2, 2), stride=(2, 2), padding=0, dilation=1, ceil_mode=False)
  )
  (linearLayers): Sequential(
    (0): Linear(in_features=2048, out_features=1024, bias=True)
    (1): ReLU()
    (2): Linear(in_features=1024, out_features=2, bias=True)
  )
)

In [8]:
# from facenet_pytorch import MTCNN
# mtcnn = MTCNN(image_size=IMG_SIZE, keep_all=True)

from FaceDetector import FaceDetector
mtcnn = FaceDetector(prototype='./checkpoints/deploy.prototxt.txt',
        model='./checkpoints/res10_300x300_ssd_iter_140000.caffemodel')

In [9]:
labels = ['Mask', 'No mask']
labelColor = [(10, 255, 0), (10, 0, 255)]
font = cv2.FONT_HERSHEY_SIMPLEX


def detect_frame(frame, face_detection_model, face_classifier_model, device):
    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    faces = face_detection_model.detect(frame)

    for face in faces:
        x_start, y_start, x_end, y_end = face

        x_start, y_start = max(x_start, 0), max(y_start, 0)

        faceImg = frame[y_start:y_end, x_start:x_end]

        output = face_classifier_model(
            val_trns(faceImg).unsqueeze(0).to(device))
        print(output)
        output = Softmax()(output)
        conf, predicted = t.max(output.data, 1)
        print(predicted)
        verdict = f"{labels[predicted]}: {conf.item()*100:.2f}%"

        cv2.rectangle(frame,
                      (x_start, y_start),
                      (x_end, y_end),
                      labelColor[predicted],
                      thickness=2)

        # draw prediction label
        cv2.putText(frame,
                    verdict,
                    (x_start, y_start-20),
                    font, 0.5, labelColor[predicted], 1) 

    frame = imutils.resize(frame, width=frame_shape[0]*2)
    frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
    return frame


In [10]:
vs = VideoStream(src=0).start()
cv2.destroyAllWindows()

try:
  while True:


    frame = vs.read()
    frame_shape = frame.shape
    frame = imutils.resize(frame, width=300)
    frame = detect_frame(frame,mtcnn,model,device)

    cv2.imshow("frame", frame)

    key = cv2.waitKey(1) & 0xFF
    if key == ord("q"):
      break
      
    # time.sleep(1)
finally:
  print("Exiting")
  cv2.destroyAllWindows()
  vs.stop()

tensor([[-0.4656,  0.0906]], device='cuda:0', grad_fn=<AddmmBackward>)
tensor([1], device='cuda:0')
tensor([[-0.6042,  0.2118]], device='cuda:0', grad_fn=<AddmmBackward>)
tensor([1], device='cuda:0')
tensor([[-0.6042,  0.2118]], device='cuda:0', grad_fn=<AddmmBackward>)
tensor([1], device='cuda:0')
tensor([[-0.5679,  0.1678]], device='cuda:0', grad_fn=<AddmmBackward>)
tensor([1], device='cuda:0')
tensor([[-0.5679,  0.1678]], device='cuda:0', grad_fn=<AddmmBackward>)
tensor([1], device='cuda:0')
tensor([[-0.5679,  0.1678]], device='cuda:0', grad_fn=<AddmmBackward>)
tensor([1], device='cuda:0')
tensor([[-0.6214,  0.2217]], device='cuda:0', grad_fn=<AddmmBackward>)
tensor([1], device='cuda:0')
tensor([[-0.6214,  0.2217]], device='cuda:0', grad_fn=<AddmmBackward>)
tensor([1], device='cuda:0')
tensor([[-0.6214,  0.2217]], device='cuda:0', grad_fn=<AddmmBackward>)
tensor([1], device='cuda:0')




tensor([[-0.5675,  0.1683]], device='cuda:0', grad_fn=<AddmmBackward>)
tensor([1], device='cuda:0')
tensor([[-0.5675,  0.1683]], device='cuda:0', grad_fn=<AddmmBackward>)
tensor([1], device='cuda:0')
tensor([[-0.5675,  0.1683]], device='cuda:0', grad_fn=<AddmmBackward>)
tensor([1], device='cuda:0')
tensor([[-0.3741,  0.0075]], device='cuda:0', grad_fn=<AddmmBackward>)
tensor([1], device='cuda:0')
tensor([[-0.3741,  0.0075]], device='cuda:0', grad_fn=<AddmmBackward>)
tensor([1], device='cuda:0')
tensor([[-0.3741,  0.0075]], device='cuda:0', grad_fn=<AddmmBackward>)
tensor([1], device='cuda:0')
tensor([[-0.5688,  0.1635]], device='cuda:0', grad_fn=<AddmmBackward>)
tensor([1], device='cuda:0')
tensor([[-0.5688,  0.1635]], device='cuda:0', grad_fn=<AddmmBackward>)
tensor([1], device='cuda:0')
tensor([[-0.5688,  0.1635]], device='cuda:0', grad_fn=<AddmmBackward>)
tensor([1], device='cuda:0')
tensor([[-0.4738,  0.0676]], device='cuda:0', grad_fn=<AddmmBackward>)
tensor([1], device='cuda:0')
