<a href="https://www.kaggle.com/code/mloooo/notebook5a39d105b9?scriptVersionId=143799784" target="_blank"><img align="left" alt="Kaggle" title="Open in Kaggle" src="https://kaggle.com/static/images/open-in-kaggle.svg"></a>

In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
import cv2
import skimage
import numpy
import typing
import warnings 
import os
from tqdm import tqdm
import matplotlib.pyplot as plt
import pandas
from PIL import Image
import seaborn
warnings.filterwarnings('ignore')

In [None]:
dir_kaggle ='../input/face-mask-detection'
data_kaggle ='../input/face-mask-detection/dataset'
with_mask ='..../input/face-mask-detection/dataset/with_mask'
without_mask='../input/face-mask-detection/dataset/without_mask'

class_data= ['with_mask','without_mask']
len_class_data = len(class_data)

In [None]:
image_count = {}
train_data = []

for i , class_data in tqdm(enumerate(class_data)):
    class_folder = os.path.join(data_kaggle,class_data)
    label = class_data
    image_count[class_data] = []
    
    for path in os.listdir(os.path.join(class_folder)):
        image_count[class_data].append(class_data)
        train_data.append(['{}/{}'.format(class_data, path), i, class_data])

In [None]:
df = pandas.DataFrame(
    train_data, 
    columns=['path', 'class', 'name']
)
df['full_path'] = df['path'].apply(
    lambda item: os.path.join(data_kaggle, item)
)
df['image'] = df['full_path'].apply(func=lambda item: Image.open(item))


def validate_channels(img): 
    if numpy.array(img).shape[2] == 4:
        img = Image.fromarray(
        cv2.cvtColor(numpy.array(img), cv2.COLOR_RGBA2RGB))
    return img

df['image'] = df['image'].apply(func=validate_channels)

In [None]:
seaborn.countplot(x=df['class'])

# Smoothing and Filtering Noisy images

In [None]:
def bilateral_filtering(image, diameter: int, sigma_space: int, sigma_color: int):
    
    if not isinstance(image, numpy.ndarray):
        image = numpy.array(image)
        
    gray_image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    new_img = cv2.bilateralFilter(
        src=gray_image, 
        d=diameter,
        sigmaColor=sigma_color, 
        sigmaSpace=sigma_space,
    )
    rgb_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    return rgb_image

def sharpen_image(blur_image, orig_image, sharpen_factor):
    new_img = orig_image - blur_image 
    return orig_image + (sharpen_factor * new_img)

# Resizing Image

In [None]:
HEIGHT = 512
WIDTH = 512

# Adjusting image contrast

In [None]:
import numpy 

def adjust_object_contrast(img, alpha: float, beta: float):
    conv_img = cv2.convertScaleAbs(
    img, alpha=alpha, beta=beta)
    return conv_img

def gamma_correction(img, gamma):
    float_img = img.astype(numpy.float32) / 255.0
    corr_img = numpy.power(float_img, 1 / gamma)
    new_img = (corr_img * 255).astype(numpy.uint8)
    return new_img

def adjust_local_clahe(img, clip_limit: int=3.0, tile_grid_size: tuple=(8, 8)):
    
    lab_img = cv2.cvtColor(img, cv2.COLOR_RGB2LAB)
    l, a, b = cv2.split(lab_img)
    
    clahe = cv2.createCLAHE(clip_limit, tile_grid_size)
    clahed_l = clahe.apply(l)
    
    merged_img = cv2.merge((clahed_l, a, b))
    converted_rgb = cv2.cvtColor(img, cv2.COLOR_LAB2BGR)
    return converted_rgb


# Resizing images

In [None]:
def nearest_neighbor_interpolation(img: numpy.ndarray, height, width):
    new_img = cv2.resize(img, (height, width), cv2.INTER_NEAREST)
    return new_img

def bilinear_interpolation(img: numpy.ndarray, height, width):
    new_img = cv2.resize(img, (height, width), cv2.INTER_LINEAR)
    return new_img

def bicubic_interpolation(img: numpy.ndarray, height, width):
    new_img = cv2.resize(img, (height, width), cv2.INTER_CUBIC)
    return new_img

In [None]:
%%time

blurred_images = []
for image in df['image'].tolist():
    
    contrasted_img = adjust_local_clahe(numpy.array(image))
    corrected_img = gamma_correction(numpy.array(image), gamma=1.8)
    blurred_images.append(corrected_img)

# Data Augmentation

In [None]:
from torchvision import transforms 
from torchvision.transforms import v2 
from PIL import Image

train_transformations = [
    transforms.ToTensor(),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomRotation(degrees=15),
    transforms.Resize(size=(HEIGHT, WIDTH), 
    interpolation=Image.NEAREST),
    v2.RandomAdjustSharpness(sharpness_factor=1.5),
]

validation_transformations = [
    transforms.ToTensor(),
    transforms.Resize(size=(HEIGHT, WIDTH), 
    interpolation=Image.NEAREST),
]

# Dataset

In [None]:
from torch.utils import data
import torch

class ImageDataset(data.Dataset):
    
    def __init__(self, labels, images, transform=None):
        self.labels = labels 
        self.images = images 
        self.transform = transforms.Compose(transform) if transform is not None else None

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, key):
        if key >= len(self.labels): return 
        label = self.labels[key]
        if self.transform is not None:
            image = self.transform(self.images[key])
        else: image = self.images[key]
        return label, image

In [None]:
dataset = ImageDataset(
    labels=df['class'].tolist(),
    images=blurred_images,
    transform=None
)

In [None]:
len(dataset)

# Splitting data

In [None]:
from torch.utils import data
training_size = int(0.7 * len(dataset))
validation_size = len(dataset) - training_size
train_set, valid_set = data.random_split(dataset, [training_size, validation_size])

In [None]:
%%time 

training_set = ImageDataset(
    labels=numpy.array(dataset.labels)[train_set.indices],
    images=numpy.array(dataset.images)[train_set.indices],
    transform=train_transformations
)

validation_set = ImageDataset(
    labels=numpy.array(dataset.labels)[valid_set.indices],
    images=numpy.array(dataset.images)[valid_set.indices],
    transform=validation_transformations
)

In [None]:
len(training_set)

In [None]:
len(validation_set)

In [None]:
fg, ax = plt.subplots(1, 2)
seaborn.countplot(ax=ax[0], x=training_set.labels)
seaborn.countplot(ax=ax[1], x=validation_set.labels)

In [None]:
fg, ax = plt.subplots(3, 2)
ax[0, 0].imshow(training_set.images[0])
ax[0, 1].imshow(training_set.images[1])
ax[1, 0].imshow(training_set.images[2])
ax[1, 1].imshow(training_set.images[3])
ax[2, 0].imshow(training_set.images[4])
ax[2, 1].imshow(training_set.images[5])

# Evaluating Image Processing

In [None]:
# for Transformed images
from skimage.metrics import structural_similarity as ssim_scr
from skimage import measure

def ssim_score(old_img, new_img, channel_axis):
    return ssim_scr(old_img, new_img, channel_axis=channel_axis)

def normalized_cross_correlation(old_img, new_img):
    """
    Function computes standard Normalized Cross Correlation
    for given old and modified versions of the same image 
    
    Args:
        old_img (numpy.ndarray) - numpy.array object of old image
        new_img (numpy.ndarray) - numpy.array object of the modified image
    """
    img1 = (old_img - old_img.mean()) / old_img.std()
    img2 = (new_img - new_img.mean()) / new_img.std()
    return numpy.sum(img1 * img2) / (img1.shape[0] - 1)

# For Noise-Recovered images

def niqe_score(distorted_image):
    return measure.niqe(distorted_image)

def brisque_score(distorted_image):
    return measure.brisque(distorted_image)

# Initializing data loaders

In [None]:
%%time

training_loader = data.DataLoader(
    training_set, 
    batch_size=32, 
    shuffle=True,
)

validation_loader = data.DataLoader(
    validation_set,
    batch_size=32,
    shuffle=True
)

In [None]:
torch.cuda.empty_cache()
max_allocated = torch.cuda.max_memory_allocated()
(max_allocated ** 2) / 1024

# Initializing Classification Neural Network

In [None]:
from torchvision import models 
from torch import optim 
from torch import nn
from tqdm import tqdm
from torch import backends

class MaskRecNet(object):
    
    """
    Implementation of the ResNet50 Neural Network 
    for classifying human as 'with' or 'without' face mask
    """
    def __init__(self, 
        weights, 
        num_classes: int,
        learning_rate: float, 
        weight_decay: float, 
        loss_function,
        max_epochs: int,
    ):
        self.network = models.resnet50(weights=weights)
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        
        self.network.fc = nn.Linear(
            in_features=self.network.fc.in_features,
            out_features=num_classes
        )
        
        self.optimizer = optim.Adam(
            self.network.parameters(), 
            lr=learning_rate,
            weight_decay=weight_decay
        )
        
        self.loss_function = loss_function 
        self.max_epochs = max_epochs
    
    def train(self, dataset: data.DataLoader):
        """
        Function trains neural network on a given
        training set of images
        
        Args:
            - dataset (ImageDataset) - training set of images
        """
        self.network.train()
        
        model = nn.DataParallel(self.network)
        total_loss = []
        
        for epoch in range(self.max_epochs):
            epoch_losses = []
            
            for labels, images in tqdm(dataset):
                cuda_imgs = images.to(self.device)
                predictions = model.forward(cuda_imgs).cpu()
                loss = self.loss_function(predictions, labels)
                
                epoch_losses.append(loss.item())
                
                loss.backward()
                self.optimizer.step()
            
            total_loss.append(sum(epoch_losses) / len(epoch_losses))
            print('epoch - %s;' % (str(epoch + 1)))
        return sum(total_loss) / len(total_loss)
    
    def evaluate(self, dataset: data.DataLoader):
        """
        Function evaluates model on a given validation set
        Args:
            dataset - ImageDataset - non-augmented dataset with images
        """
        self.network.eval()
        model = nn.DataParallel(self.network)
        
        if len(dataset) == 0: return []
        
        predictions = []
        with torch.no_grad():
            
            losses = []
            for labels, images in tqdm(dataset):
                cuda_imgs = images.to(self.device)
                predictions = model.forward(cuda_imgs).cpu()
                loss = self.loss_function(predictions, labels)
                losses.append(loss.item())
                
        return sum(losses) / len(losses)
    
    
    def predict(self, images: typing.List[Image.Image]):
        """
        Function used for predicting
        binary class of having 'face mask' put on or off
        
        Args:
            images - list of PIL image objects
        Returns:
            list of predicted classes
        """
        if not len(images): return 
        predictions = []
        for image in dataset:
            prediction = self.network.forward(image)
            predictions.append(prediction)
        return predictions
    
def backward_trace_hook(module, grad_input, grad_output):
    print('module - %s' % module)
    print('grad input - ', grad_input)
    print('grad output - ', grad_output)


In [None]:
model = MaskRecNet(
    weights=models.ResNet50_Weights.DEFAULT,
    num_classes=2,
    learning_rate=3e-6,
    loss_function=nn.CrossEntropyLoss(),
    max_epochs=60,
    weight_decay=0.01,
)

# Training Neural Network

In [None]:
avg_loss = model.train(training_loader)

In [None]:
avg_loss

# Evaluating model on validation set

In [None]:
eval_loss = model.evaluate(validation_loader)

In [None]:
eval_loss