In [1]:
# Let's import all the required modules
import cv2
import os
import numpy as np
import keras
from keras.optimizers import Adam
from keras.models import model_from_json
# flag == 1 means no saved model was found on disk and the network must be
# trained before the video feed can start; flag == 0 means it can be loaded.
flag=0
# Make sure the model directory exists: a missing directory is simply the
# "nothing trained yet" case, and the training step saves its files here.
os.makedirs("./json_model/",exist_ok=True)
# if no trained model is found, pull in everything needed to re-create it
if not (os.path.isfile("./json_model/trained_model.h5") and os.path.isfile("./json_model/trained_model.json")):
    print("No training Data Found!")
    # training-only dependencies are imported lazily so that running with an
    # already trained model does not require them
    from keras.models import Sequential
    from keras.layers import Dense,Flatten,MaxPooling2D,Conv2D,Dropout
    from keras.utils import to_categorical
    from sklearn.model_selection import train_test_split
    from keras.preprocessing.image import ImageDataGenerator
    flag=1

In [2]:
# Global Variables
# images / labels start empty and are filled in by load()
images,labels=[],[]
# When training is required (flag == 1) start from an empty Sequential model;
# otherwise keep a placeholder that video_feed() replaces with the loaded model.
model=Sequential() if flag==1 else []

In [8]:
# Define all functions here
# Let's import training and testing data
def load():
    """Read every image under ./trainingNtesting/ into the global arrays.

    Each sub-directory is treated as one class: images inside "with_mask"
    get label 1, images in any other sub-directory get label 0.  Every image
    is resized to 50x50 and passed through preprocess() before being stored.

    Entries that are not directories, and files OpenCV cannot decode
    (cv2.imread returns None for those), are skipped instead of aborting the
    whole load.  The globals are rebuilt from scratch on every call, so
    calling load() more than once does not accumulate duplicates.

    Side effects:
        Sets the module-level `images` and `labels` to NumPy arrays.

    Returns:
        0 on completion.
    """
    global images
    global labels
    root="./trainingNtesting/"
    loaded_images=[]
    loaded_labels=[]
    for i in os.listdir(root):
        class_dir=os.path.join(root,i)
        if not os.path.isdir(class_dir):
            # stray files next to the class folders are not class data
            continue
        print("Loading Directory '"+i+"' .",end="")
        val=1 if i=="with_mask" else 0
        for j in os.listdir(class_dir):
            img=cv2.imread(os.path.join(class_dir,j))
            if img is None:
                # not an image / unreadable file
                continue
            # resize first so every sample has the same shape, then convert
            loaded_images.append(preprocess(cv2.resize(img,(50,50))))
            loaded_labels.append(val)
        print()
    images=np.array(loaded_images) # convert to np array
    labels=np.array(loaded_labels) # same as above
    print("Done")
    return 0
# to convert images to grayscale
def preprocess(img):
    """Return `img` converted from BGR to grayscale and divided by 255."""
    return cv2.cvtColor(img,cv2.COLOR_BGR2GRAY)/255
# to insert layers into model
def insert_layers():
    """Append the CNN layers to the global `model`.

    The stack is three Conv2D(60, 3x3, relu) + MaxPooling2D(2x2) stages (the
    first one declares the 50x50x1 input shape), followed by Flatten,
    Dropout(0.3) and a 2-unit softmax Dense layer.

    Returns:
        0 on success, or the string "Unable to create model layers" if adding
        any layer raised an exception (the cause is printed).  Interrupts
        such as KeyboardInterrupt are deliberately not swallowed.
    """
    global model
    try:
        model.add(Conv2D(60,(3,3),input_shape=(50,50,1),activation="relu"))
        model.add(MaxPooling2D((2,2)))
        for _ in range(2):
            model.add(Conv2D(60,(3,3),activation="relu"))
            model.add(MaxPooling2D((2,2)))
        model.add(Flatten())
        model.add(Dropout(0.3))
        model.add(Dense(2,activation="softmax"))
    except Exception as exc:
        # report why it failed; callers only check for a non-zero return
        print("Layer creation failed:",exc)
        return "Unable to create model layers"
    return 0
# function to return whether a mask is worn or not
def getCalssName(classNo):
    """Map a predicted class number to its display text.

    0 -> 'No Mask worn.', 1 -> 'Mask is worn.'; any other value gives None.
    The value is compared with ==, so it does not have to be hashable.
    """
    for known_class,text in ((0,'No Mask worn.'),(1,'Mask is worn.')):
        if classNo == known_class:
            return text
    return None
# function to provide video playback
def video_feed():
    """Load the saved model and run live mask detection on the webcam feed.

    The architecture is read from ./json_model/trained_model.json and the
    weights from ./json_model/trained_model.h5; the result replaces the
    global `model`.  Frames are then read from camera 0, faces are located
    with the Haar cascade in ./pretrained/, each face crop is classified and
    the frame is shown with a rectangle and a label until 'q' is pressed.

    Returns:
        "Unable to load model<N>" if loading the model failed, where N is the
            last load stage reached (0 open, 1 read, 2 close, 3 build from
            JSON, 4 load weights, 5 install/summarise);
        "Error Occured" if the detector could not be loaded, a frame could
            not be read, or the display loop raised;
        "You Closed the Window" after a normal exit with 'q'.

    The camera is released and all OpenCV windows are closed on every exit
    path once the camera has been opened.
    """
    global model
    # import my model from json and h5 files
    print("Loading Required Model")
    count=0
    try:
        with open("./json_model/trained_model.json",'r') as json_file:
            count=1
            json_model=json_file.read()
            count=2
        count=3
        loaded_model=model_from_json(json_model)
        count=4
        loaded_model.load_weights("./json_model/trained_model.h5")
        count=5
        model=loaded_model
        # summary() prints by itself and returns None, so it is not wrapped in print()
        model.summary()
    except Exception as exc:
        print("Model loading failed:",exc)
        return str("Unable to load model"+str(count))
    print("Done...Staring Video")
    # video prerequisites
    print("starting Video...Please Make sure Webcam is enabled")
    cap=cv2.VideoCapture(0)
    try:
        cap.set(cv2.CAP_PROP_FRAME_WIDTH,640)
        cap.set(cv2.CAP_PROP_FRAME_HEIGHT,480)
        cap.set(cv2.CAP_PROP_BRIGHTNESS,180)
        # font
        font = cv2.FONT_HERSHEY_SIMPLEX
        # pretrained face recognition model
        face_cascade=cv2.CascadeClassifier("./pretrained/haarcascade_frontalface_default.xml")
        if face_cascade.empty():
            print("Unable to load the face detector")
            return "Error Occured"
        face_error_reported=False
        # Give Video feedback with required predictions
        while True:
            try:
                success,imgOriginal=cap.read()
                if not success or imgOriginal is None:
                    print("Unable to read a frame from the webcam")
                    return "Error Occured"
                faces=face_cascade.detectMultiScale(imgOriginal,1.2,6)
                for (x,y,w,h) in faces:
                    try:
                        # clamp the margins at 0: a negative start index would
                        # wrap around and give an empty or wrong crop
                        top=max(y-10,0)
                        left=max(x-20,0)
                        crop_img=np.array(imgOriginal[top:y+h+80,left:x+w+80])
                        crop_img=preprocess(crop_img)
                        crop_img=cv2.resize(crop_img,(50,50))
                        crop_img=crop_img.reshape(1,50,50,1)
                        predictions=model.predict(crop_img,verbose=0)
                        # argmax over the softmax output replaces predict_classes,
                        # which newer Keras versions no longer provide
                        classIndex=int(np.argmax(predictions,axis=-1)[0])
                        probabilityValue=np.amax(predictions)
                        cv2.rectangle(imgOriginal,(x-5,y-5),(x+w+5,y+h+5),(255,0,0),2)
                        if probabilityValue>0.5 :
                            cv2.putText(imgOriginal,str(getCalssName(classIndex)),(x,y-10),font,0.60,(0,0,255),2,cv2.LINE_AA)
                    except Exception as exc:
                        # one bad face must not stop the feed, but say why once
                        if not face_error_reported:
                            print("Skipping a face:",exc)
                            face_error_reported=True
                        continue
                cv2.imshow("Corona Mask Identifier ( Press 'q' to exit )",imgOriginal)
                # break when q is pressed (mask to the low byte: some platforms
                # set higher bits in the waitKey result)
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
            except Exception as exc:
                print("Video loop failed:",exc)
                return "Error Occured"
    finally:
        cap.release()
        cv2.destroyAllWindows()
    return "You Closed the Window"

In [9]:
# Main
def main():
    """Train and save the model if none exists yet, then start the video feed.

    When the global `flag` is 1 (no saved model was found) the images are
    loaded, split 80/20, augmented and used to train the global `model`; the
    architecture and weights are then written to ./json_model/ and `flag` is
    reset to 0 so that a later call goes straight to the video feed instead
    of training again.  If the layers cannot be created, "Error in model" is
    printed and the video feed is not started.

    Returns:
        0 in all cases.
    """
    global flag,model
    # make some variables global
    global images,labels
    start_video=True
    if(flag==1):
        # load all images into memory
        load()
        # categorize images data and divide them
        train_feature,test_feature,train_target,test_target=train_test_split(images,labels,test_size=0.2)
        # -1 lets NumPy infer the sample count, so any dataset size works
        train_feature=train_feature.reshape(-1,50,50,1)
        test_feature=test_feature.reshape(-1,50,50,1)
        # num_classes is fixed so a split missing one class still yields 2 columns
        train_target=to_categorical(train_target,num_classes=2)
        test_target=to_categorical(test_target,num_classes=2)
        # create augmented images
        aug_data_gen=ImageDataGenerator(width_shift_range=0.3,height_shift_range=0.3,zoom_range=0.8,shear_range=0.3,rotation_range=180)
        aug_data_gen.fit(train_feature)
        # prepare model,compile and fit values
        if (insert_layers() == 0):
            model.compile(Adam(learning_rate=0.0001),loss="categorical_crossentropy",metrics=["accuracy"])
            # the held-out split is reported as validation metrics each epoch
            model.fit(aug_data_gen.flow(train_feature,train_target,batch_size=120),validation_data=(test_feature,test_target),epochs=500)
            # Let's save the trained model to reduce runtime at next run
            os.makedirs('./json_model/',exist_ok=True)
            json_model=model.to_json()
            with open('./json_model/trained_model.json', 'w') as json_file:
                json_file.write(json_model)
            model.save_weights('./json_model/trained_model.h5')
            # summary() prints by itself and returns None
            model.summary()
            print("Saved model to drive")
            # a trained model now exists on disk
            flag=0
        else:
            print("Error in model")
            start_video=False
    if start_video:
        print(video_feed())
    return 0
if(__name__=="__main__"):
    main()

Loading Required Model
Model: "sequential_2"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
conv2d_6 (Conv2D)            (None, 48, 48, 60)        600       
_________________________________________________________________
max_pooling2d_6 (MaxPooling2 (None, 24, 24, 60)        0         
_________________________________________________________________
conv2d_7 (Conv2D)            (None, 22, 22, 60)        32460     
_________________________________________________________________
max_pooling2d_7 (MaxPooling2 (None, 11, 11, 60)        0         
_________________________________________________________________
conv2d_8 (Conv2D)            (None, 9, 9, 60)          32460     
_________________________________________________________________
max_pooling2d_8 (MaxPooling2 (None, 4, 4, 60)          0         
_________________________________________________________________
flatten_2 (Flatten)          (N