## Collecting images using opencv

In [15]:
import os
import time
import uuid
import cv2

In [3]:
uuid.uuid1()

UUID('b0f3fb66-4386-11ef-996e-04e8b9b51bf6')

In [4]:
Images_Path = os.path.join("data","images")
number_images = 30

In [28]:
cap = cv2.VideoCapture(1)

for i in range(number_images):
    print("collecting image: " + str(i))
    ret, frame = cap.read()
    imagename = os.path.join(Images_Path,f'{str(uuid.uuid1())}.jpg')
    cv2.imwrite(imagename,frame)
    cv2.imshow("frame",frame)
    time.sleep(0.5)

    if cv2.waitKey(1) & 0xFF == ord('q'):
        break
        
cap.release()
cv2.destroyAllWindows()

collecting image: 0
collecting image: 1
collecting image: 2
collecting image: 3
collecting image: 4
collecting image: 5
collecting image: 6
collecting image: 7
collecting image: 8
collecting image: 9
collecting image: 10
collecting image: 11
collecting image: 12
collecting image: 13
collecting image: 14
collecting image: 15
collecting image: 16
collecting image: 17
collecting image: 18
collecting image: 19
collecting image: 20
collecting image: 21
collecting image: 22
collecting image: 23
collecting image: 24
collecting image: 25
collecting image: 26
collecting image: 27
collecting image: 28
collecting image: 29


## Annotation and Augmentation

In [29]:
!labelme

2024-07-16 20:56:48,582 [INFO   ] __init__:get_config:67- Loading config file from: C:\Users\PMYLS\.labelmerc


In [17]:
import tensorflow as tf
import numpy as np
import json
import matplotlib.pyplot as plt

In [31]:
gpus = tf.config.experimental.list_physical_devices("GPU")
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu,True)

In [32]:
images = tf.data.Dataset.list_files('data\\images\\*.jpg',shuffle=False)

In [33]:
images.as_numpy_iterator().next()

b'data\\images\\618df3d3-4387-11ef-a177-04e8b9b51bf6.jpg'

In [34]:
def load_image(x):
    bytes = tf.io.read_file(x)
    img = tf.io.decode_jpeg(bytes)
    return img

In [35]:
images = images.map(load_image)

In [36]:
iterator = images.as_numpy_iterator()

In [40]:
# Moving labels to correct subsets

for folder in ['train','test','valid']:
    print(folder)
    for file in os.listdir(os.path.join("data",folder,"images")):
        filename = file.split(".")[0] + ".json"
        existing_path = os.path.join("data","labels",filename)
        if os.path.exists(existing_path):
            new_path = os.path.join("data",folder,"labels",filename)
            os.replace(existing_path,new_path)

train
test
valid


In [41]:
import albumentations as alb

In [42]:
augmentor = alb.Compose([alb.RandomCrop(width=450, height=450), 
                         alb.HorizontalFlip(p=0.5), 
                         alb.RandomBrightnessContrast(p=0.2),
                         alb.RandomGamma(p=0.2), 
                         alb.RGBShift(p=0.2), 
                         alb.VerticalFlip(p=0.5)], 
                       bbox_params=alb.BboxParams(format='albumentations', 
                                                  label_fields=['class_labels']))

In [43]:
for partition in ['train','test','valid']: 
    for image in os.listdir(os.path.join('data', partition, 'images')):
        img = cv2.imread(os.path.join('data', partition, 'images', image))

        coords = [0,0,0.00001,0.00001]
        label_path = os.path.join('data', partition, 'labels', f'{image.split(".")[0]}.json')
        if os.path.exists(label_path):
            with open(label_path, 'r') as f:
                label = json.load(f)

            coords[0] = label['shapes'][0]['points'][0][0]
            coords[1] = label['shapes'][0]['points'][0][1]
            coords[2] = label['shapes'][0]['points'][1][0]
            coords[3] = label['shapes'][0]['points'][1][1]
            coords = list(np.divide(coords, [640,480,640,480]))

        try: 
            for x in range(60):
                augmented = augmentor(image=img, bboxes=[coords], class_labels=['face'])
                cv2.imwrite(os.path.join('aug_data', partition, 'images', f'{image.split(".")[0]}.{x}.jpg'), augmented['image'])

                annotation = {}
                annotation['image'] = image

                if os.path.exists(label_path):
                    if len(augmented['bboxes']) == 0: 
                        annotation['bbox'] = [0,0,0,0]
                        annotation['class'] = 0 
                    else: 
                        annotation['bbox'] = augmented['bboxes'][0]
                        annotation['class'] = 1
                else: 
                    annotation['bbox'] = [0,0,0,0]
                    annotation['class'] = 0 


                with open(os.path.join('aug_data', partition, 'labels', f'{image.split(".")[0]}.{x}.json'), 'w') as f:
                    json.dump(annotation, f)

        except Exception as e:
            print(e)

x_max is less than or equal to x_min for bbox [0.3105244252873563, 0.004909003831417652, 0.0016163793103447955, 0.45270593869731807, 'face'].


In [44]:
train_images = tf.data.Dataset.list_files('aug_data\\train\\images\\*.jpg', shuffle=False)
train_images = train_images.map(load_image)
train_images = train_images.map(lambda x: tf.image.resize(x, (120,120)))
train_images = train_images.map(lambda x: x/255)

In [45]:
test_images = tf.data.Dataset.list_files('aug_data\\test\\images\\*.jpg', shuffle=False)
test_images = test_images.map(load_image)
test_images = test_images.map(lambda x: tf.image.resize(x, (120,120)))
test_images = test_images.map(lambda x: x/255)

In [46]:
val_images = tf.data.Dataset.list_files('aug_data\\valid\\images\\*.jpg', shuffle=False)
val_images = val_images.map(load_image)
val_images = val_images.map(lambda x: tf.image.resize(x, (120,120)))
val_images = val_images.map(lambda x: x/255)

In [51]:
def load_labels(label_path):
    with open(label_path.numpy(), 'r', encoding = "utf-8") as f:
        label = json.load(f)
        
    return [label['class']], label['bbox']

In [52]:
train_labels = tf.data.Dataset.list_files('aug_data\\train\\labels\\*.json', shuffle=False)
train_labels = train_labels.map(lambda x: tf.py_function(load_labels, [x], [tf.uint8, tf.float16]))

In [53]:
test_labels = tf.data.Dataset.list_files('aug_data\\test\\labels\\*.json', shuffle=False)
test_labels = test_labels.map(lambda x: tf.py_function(load_labels, [x], [tf.uint8, tf.float16]))

In [55]:
val_labels = tf.data.Dataset.list_files('aug_data\\valid\\labels\\*.json', shuffle=False)
val_labels = val_labels.map(lambda x: tf.py_function(load_labels, [x], [tf.uint8, tf.float16]))

In [68]:
train_img_iter = train_images.as_numpy_iterator()
train_lab_iter = train_labels.as_numpy_iterator()

In [79]:
sample_img = train_img_iter.next()
sample_lab = train_lab_iter.next()

In [82]:
print(len(train_images))
print(len(test_images))
print(len(val_images))

3720
840
780


In [83]:
train = tf.data.Dataset.zip((train_images, train_labels))
train = train.shuffle(5000)
train = train.batch(8)
train = train.prefetch(4)

In [87]:
test = tf.data.Dataset.zip((test_images, test_labels))
test = test.shuffle(1300)
test = test.batch(8)
test = test.prefetch(4)

In [88]:
val = tf.data.Dataset.zip((val_images, val_labels))
val = val.shuffle(1000)
val = val.batch(8)
val = val.prefetch(4)

## Neural Networks

In [1]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv2D, Dense, GlobalMaxPooling2D
from tensorflow.keras.applications import VGG16




In [2]:
vgg = VGG16(include_top = False)





In [3]:
vgg.summary()

Model: "vgg16"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_1 (InputLayer)        [(None, None, None, 3)]   0         
                                                                 
 block1_conv1 (Conv2D)       (None, None, None, 64)    1792      
                                                                 
 block1_conv2 (Conv2D)       (None, None, None, 64)    36928     
                                                                 
 block1_pool (MaxPooling2D)  (None, None, None, 64)    0         
                                                                 
 block2_conv1 (Conv2D)       (None, None, None, 128)   73856     
                                                                 
 block2_conv2 (Conv2D)       (None, None, None, 128)   147584    
                                                                 
 block2_pool (MaxPooling2D)  (None, None, None, 128)   0     

In [4]:
def make_model():
    input_layer = Input(shape=(120,120,3))
    vgg = VGG16(include_top = False)(input_layer)

    f1 = GlobalMaxPooling2D()(vgg)
    class1 = Dense(2048, activation = "relu")(f1)
    class2 = Dense(1, activation = "sigmoid")(class1)

    f2 = GlobalMaxPooling2D()(vgg)
    regress1 = Dense(2048, activation = "relu")(f2)
    regress2 = Dense(4, activation = "sigmoid")(regress1)

    return Model(inputs = input_layer, outputs = [class2,regress2])
    

In [5]:
facetracker = make_model()

In [6]:
facetracker.summary()

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_2 (InputLayer)        [(None, 120, 120, 3)]        0         []                            
                                                                                                  
 vgg16 (Functional)          (None, None, None, 512)      1471468   ['input_2[0][0]']             
                                                          8                                       
                                                                                                  
 global_max_pooling2d (Glob  (None, 512)                  0         ['vgg16[0][0]']               
 alMaxPooling2D)                                                                                  
                                                                                              

In [8]:
X,y = train.as_numpy_iterator().next()

NameError: name 'train' is not defined

In [7]:
facetracker.predict(X)

NameError: name 'X' is not defined

In [104]:
batches_per_epoch = len(train)
lr_decay = (1./0.75 -1)/batches_per_epoch
opt = tf.keras.optimizers.legacy.Adam(learning_rate=0.0001, decay=lr_decay)

In [105]:
def localization_loss(y_true, yhat):            
    delta_coord = tf.reduce_sum(tf.square(y_true[:,:2] - yhat[:,:2]))
                  
    h_true = y_true[:,3] - y_true[:,1] 
    w_true = y_true[:,2] - y_true[:,0] 

    h_pred = yhat[:,3] - yhat[:,1] 
    w_pred = yhat[:,2] - yhat[:,0] 
    
    delta_size = tf.reduce_sum(tf.square(w_true - w_pred) + tf.square(h_true-h_pred))
    
    return delta_coord + delta_size

In [128]:
classloss = tf.keras.losses.BinaryCrossentropy()
regressloss = localization_loss

In [9]:
class FaceTracker(Model):
    def __init__(self,facetracker,**kwargs):
        super().__init__(**kwargs)
        self.model = facetracker

    def compile(self,opt,classloss,regressloss,**kwargs):
        super().compile(**kwargs)
        self.closs = classloss
        self.rloss = regressloss
        self.opt = opt

    def train_step(self,batch,**kwargs):

        X,y = batch
        with tf.GradientTape() as tape:

            classes, coords = self.model(X,training = True)
            classloss = self.closs(y[0],classes)
            regressloss = self.rloss(tf.cast(y[1],tf.float32),coords)

            total_loss = regressloss + 0.5 * classloss

            grad = tape.gradient(total_loss,self.model.trainable_variables)
            opt.apply_gradients(zip(grad,self.model.trainable_variables))

            return {"total_loss":total_loss, "class_loss":classloss, "regress_loss":regressloss}

    def test_step(self,batch,**kwargs):

        X,y = batch
        
        classes, coords = self.model(X,training = False)
        classloss = self.closs(y[0],classes)
        regressloss = self.rloss(tf.cast(y[1],tf.float32),coords)

        total_loss = regressloss + 0.5 * classloss
    
        return {"total_loss":total_loss, "class_loss":classloss, "regress_loss":regressloss}      
            
    def call(self,X,**kwargs):
        return self.model(X,**kwargs)
        
        

In [10]:
model = FaceTracker(facetracker)

In [11]:
model.compile(opt,classloss,regressloss)

NameError: name 'opt' is not defined

## Train

In [135]:
logdir = "logs"

In [136]:
tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=logdir)

In [137]:
hist = model.fit(train, epochs=10, validation_data=val, callbacks=[tensorboard_callback])

Epoch 1/10
 38/465 [=>............................] - ETA: 5:40 - total_loss: 1.1934 - class_loss: 0.3021 - regress_loss: 1.0423

KeyboardInterrupt: 

In [22]:
facetracker.load_weights("facetracker.h5")

In [24]:
cap = cv2.VideoCapture(1)
while cap.isOpened():
    _ , frame = cap.read()
    frame = frame[50:500, 50:500,:]
    
    rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    resized = tf.image.resize(rgb, (120,120))
    
    yhat = facetracker.predict(np.expand_dims(resized/255,0))
    sample_coords = yhat[1][0]
    
    if yhat[0] > 0.2: 
        # Controls the main rectangle
        cv2.rectangle(frame, 
                      tuple(np.multiply(sample_coords[:2], [450,450]).astype(int)),
                      tuple(np.multiply(sample_coords[2:], [450,450]).astype(int)), 
                            (255,0,0), 2)
        # Controls the label rectangle
        cv2.rectangle(frame, 
                      tuple(np.add(np.multiply(sample_coords[:2], [450,450]).astype(int), 
                                    [0,-30])),
                      tuple(np.add(np.multiply(sample_coords[:2], [450,450]).astype(int),
                                    [80,0])), 
                            (255,0,0), -1)
        
        # Controls the text rendered
        cv2.putText(frame, 'face', tuple(np.add(np.multiply(sample_coords[:2], [450,450]).astype(int),
                                               [0,-5])),
                    cv2.FONT_HERSHEY_SIMPLEX, 1, (255,255,255), 2, cv2.LINE_AA)
    
    cv2.imshow('FaceTrack', frame)
    
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break
cap.release()
cv2.destroyAllWindows()

