In [1]:
import cv2
import os
import pandas as pd
from mtcnn.mtcnn import MTCNN
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras.models import load_model
from tensorflow.keras.applications.mobilenet_v2 import preprocess_input
import numpy as np

%pylab inline 
from IPython.display import clear_output
import time
import PIL.Image
from io import BytesIO
import IPython.display
from mtcnn.mtcnn import MTCNN
import keras

Using TensorFlow backend.


Populating the interactive namespace from numpy and matplotlib


In [2]:
NOTEBOOK_DIR = !pwd
NOTEBOOK_DIR = NOTEBOOK_DIR[0]
ROOT_DIR = os.path.abspath(os.path.join(NOTEBOOK_DIR, os.pardir))
DATA_DIR = os.path.join(ROOT_DIR, 'data')
IMAGES_DIR = os.path.join(DATA_DIR, 'images')
ANNOTATIONS_DIR = os.path.join(DATA_DIR, 'annotations')
SUB_IMAGES_DIR = os.path.join(IMAGES_DIR, 'sub')
MODEL_DIR = os.path.join(ROOT_DIR, 'models')
MNM_MODEL_DIR = os.path.join(MODEL_DIR, 'keras_mask_or_no_mask_model')
HC_MODEL_DIR = os.path.join(MODEL_DIR, 'mask_recog_ver2.h5')
HC_DATA_DIR = os.path.join(DATA_DIR, 'haar_cascades')

HC_FRONTAL_FACE_DIR = os.path.join(HC_DATA_DIR, 
                                   'haarcascade_frontalface_default.xml')
HC_FRONTAL_FACE_ALT1_DIR = os.path.join(HC_DATA_DIR,
                                     'haarcascade_frontalface_alt.xml')
HC_FRONTAL_FACE_ALT2_DIR = os.path.join(HC_DATA_DIR,
                                       'haarcascade_frontalface_alt2.xml')
HC_PROFILE_FACE_DIR = os.path.join(HC_DATA_DIR,
                                  'haarcascade_profileface.xml')


In [3]:
#Use 'jpeg' instead of 'png' (~5 times faster)
def array_to_image(a, fmt='jpeg'):
    #Create binary stream object
    f = BytesIO()

    #Convert array to binary stream object
    PIL.Image.fromarray(a).save(f, fmt)

    return IPython.display.Image(data=f.getvalue())

def get_frame(cap):
    # Capture frame-by-frame
    ret, frame = cap.read()

    #flip image for natural viewing
    frame = cv2.flip(frame, 1)

    return frame


frontal_face_cascade = cv2.CascadeClassifier(HC_FRONTAL_FACE_DIR)
frontal_face_alt1 = cv2.CascadeClassifier(HC_FRONTAL_FACE_ALT1_DIR)
frontal_face_alt2 = cv2.CascadeClassifier(HC_FRONTAL_FACE_ALT2_DIR)
profile_face_cascade = cv2.CascadeClassifier(HC_PROFILE_FACE_DIR)
cascades = [frontal_face_cascade, frontal_face_alt1, frontal_face_alt2, profile_face_cascade]

def detect_faces(frame, neighbors=5, scale=1.1):
    frame_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    frame_gray = cv2.equalizeHist(frame_gray)
    
    frontal = frontal_face_cascade.detectMultiScale(frame_gray, scale, neighbors)
#     frontalt1 = frontal_face_alt1.detectMultiScale(frame_gray, scale, neighbors)
#     frontalt2 = frontal_face_alt2.detectMultiScale(frame_gray, scale, neighbors)
    profile = profile_face_cascade.detectMultiScale(frame_gray, scale, neighbors)
    
    faces = {
        'frontal': frontal,
#         'frontal_alt1': frontalt1,
#         'frontal_alt2': frontalt2,
        'profile': profile
    }
    
    return faces
    
def print_faces(faces):
    print("Frontal (default):")
    for face in faces['frontal']:
        print(face)
    
#     print("\nFrontal (alt1):")
#     for face in faces['frontal_alt1']:
#         print(face)
        
#     print("\nFrontal (alt2):")
#     for face in faces['frontal_alt2']:
#         print(face)
    
    print("\nProfile:")
    for face in faces['profile']:
        print(faces)

In [4]:
train_df = pd.read_csv(os.path.join(DATA_DIR, 'img_data/kaggle_training.csv'), index_col=False)
train_df.rename(
    columns={'x1': 'x', 'x2': 'y', 'y1': 'w', 'y2': 'h'},
    inplace=True
)
train_df.sort_values('name', axis = 0, inplace = True)
features = [
    'face_with_mask', 
    'face_no_mask', 
]
train_df = train_df[train_df['classname'].isin(features)]
train_df['classname'].unique()

array(['face_no_mask', 'face_with_mask'], dtype=object)

In [5]:
from sklearn.preprocessing import LabelEncoder
from keras.utils import to_categorical

detector = MTCNN()
model = keras.models.load_model(MNM_MODEL_DIR)
labeler = LabelEncoder()

y = list(set(train_df['classname']))
y = labeler.fit_transform(y)
y = to_categorical(y)


In [None]:
CYAN = (0, 255, 255)
GREEN = (0, 255, 0)
MAGENTA = (255, 0, 255)

cap = cv2.VideoCapture('../../video_test_me.mov')
rval, frame = cap.read()

session_data = list()
count = 1
while cap.isOpened(): 
    #Capture frame-by-frame
#     ret, frame = cap.read()
#     frame = get_frame(cap)
    
    if frame is not None:
        im = np.asarray(bytearray(frame), dtype="uint8")
        frame = cv2.imdecode(img_arr, cv2.IMREAD_COLOR)
#         frame = cv2.cvtColor(frame, cv2.IMREAD_COLOR)
        rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
#         image = array_to_image(frame)

        #Use MTCNN to detect faces
        faces = detector.detect_faces(rgb)
        
        # Use Haar Cascade to detect faces
#         faces = detect_faces(frame)
#         faces = frontal_face_cascade.detectMultiScale(gray,
#                                              scaleFactor=1.1,
#                                              minNeighbors=5,
#                                              minSize=(60, 60),
#                                              flags=cv2.CASCADE_SCALE_IMAGE)
#         print_faces(faces)
        
        if faces != []:
            
            data = list()
            results = list()
            frame_df = pd.DataFrame()
            
#             print(faces)
            for face in faces:
#                 data.append([count, face['box']])
                bounding_box = face['box']       # for use with MTCNN
                keypoints = face['keypoints']    # for use with MTCNN
#                 bounding_box = face
                x, y, w, h = bounding_box        # for use with MTCNN
                img = gray[y:y+h,x:x+w]

                try:
                    new_img = cv2.resize(img, (50, 50))
                    new_img = new_img.reshape(-1, 50, 50, 1)
                except Exception as e:
                    print(str(e))
                prediction = model.predict(new_img)
#                 print(prediction)
#                 print("PREDICTION: {}".format(labeler.inverse_transform(prediction)))
                data.append([count, bounding_box, prediction])
    
                cv2.circle(rgb,(keypoints['left_eye']), 2, CYAN, 2)
                cv2.circle(rgb,(keypoints['right_eye']), 2, CYAN, 2)
                cv2.circle(rgb,(keypoints['nose']), 2, CYAN, 2)
                cv2.circle(rgb,(keypoints['mouth_left']), 2, CYAN, 2)
                cv2.circle(rgb,(keypoints['mouth_right']), 2, CYAN, 2)
            
#             for face in faces['profile']:
# #                 data.append([count, face['box']])
# #                 bounding_box = face['box']       # for use with MTCNN
# #                 keypoints = face['keypoints']    # for use with MTCNN
#                 bounding_box = face
#                 x, y, w, h = bounding_box        # for use with MTCNN
#                 img = gray[y:y+h,x:x+w]

#                 try:
#                     new_img = cv2.resize(img, (50, 50))
#                     new_img = new_img.reshape(-1, 50, 50, 1)
#                 except Exception as e:
#                     print(str(e))
#                 prediction = model.predict(new_img)
# #                 print(prediction)
# #                 print("PREDICTION: {}".format(labeler.inverse_transform(prediction)))
#                 data.append([count, bounding_box, prediction])
    
#             for face in faces['frontal']:
# #                 data.append([count, face['box']])
# #                 bounding_box = face['box']       # for use with MTCNN
# #                 keypoints = face['keypoints']    # for use with MTCNN
#                 bounding_box = face
#                 x, y, w, h = bounding_box        # for use with MTCNN
#                 img = gray[y:y+h,x:x+w]

#                 try:
#                     new_img = cv2.resize(img, (50, 50))
#                     new_img = new_img.reshape(-1, 50, 50, 1)
#                 except Exception as e:
#                     print(str(e))
#                 prediction = model.predict(new_img)
# #                 print(prediction)
# #                 print("PREDICTION: {}".format(labeler.inverse_transform(prediction)))
#                 data.append([count, bounding_box, prediction])
                
            print(data)
            results += [i for i in data if len(i) == 1]
            results += [[j for j in i] for i in data if len(i) > 1]

            print(results)
            image = []
            classname = []
            for k, i, j in results:
                cn = np.argmax(j)
                classname.append(cn)# if cn < 2 else 0)
                image.append(i)
            
            
            print("FRAME NO.: {}\nCLASSNAME ARGMAX: {}\nINVERSE TRANSFORM: {}\n".format(
                count, classname, labeler.inverse_transform(classname)
            ))
            frame_df['image'] = image
            frame_df['classname'] = classname
            frame_df['classname'] = labeler.inverse_transform(frame_df['classname'])
            print(frame_df)


            for i in range(len(frame_df)):
                x, y, w, h = frame_df.iloc[i]['image']
                cname = str(frame_df.iloc[i]['classname'])
               
                if cname == 'face_with_mask':
                    label = "Mask"
                else:
                    label = "No Mask"
                color = (0, 255, 0) if label == 'Mask' else (255,0,0)
                cv2.putText(rgb, label, (x, y- 10),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.45, color, 1)
                        
                cv2.rectangle(rgb, (x, y), (x + w, y + h),color, 3)

            session_data.append(results)


        #display resulting frame
#         frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        cv2.imshow('frame', rgb)
#         plt.imshow(frame, interpolation = 'bicubic')
#         print(frame.shape)
        

    ret, frame = cap.read()
    clear_output(wait=True)
    count += 1
    if cv2.waitKey(1) &0xFF == ord('q'):
        break
#When everything's done, release capture
cap.release()
cv2.destroyAllWindows()

# exception at frame 16-17

[255 241 248 ... 113 119 135]


In [None]:
cap.release()
cv2.destroyAllWindows()

In [None]:
session_data