In [1]:
import os
import time
import uuid
import cv2

In [2]:
IMAGES_PATH = os.path.join("data", "images")
number_image = 30

In [None]:
cap = cv2.VideoCapture(0)
for imgnum in range(number_image):
    print(f'\rCollecting image {imgnum}', end="")
    ret, frame = cap.read()
    # imgname = os.path.join(IMAGES_PATH, f'{str(uuid.uuid1())}.jpg')
    # cv2.imwrite(imgname, frame)
    cv2.imshow('frame', frame)
    time.sleep(0.5)

    # if cv2.waitKey(1) & 0xFF == ord('q'):
    #     break

cap.release()
# cv2.destroyAllWindows()

In [2]:
import tensorflow as tf
import json
import numpy as np
import matplotlib.pyplot as plt

In [13]:
gpus = tf.config.experimental.list_physical_devices("GPU")
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)

In [41]:
# images = tf.data.Dataset.list_files("data\\images\\*.jpg", shuffle=False)
images = tf.data.Dataset.list_files("data\\images\\*.jpg")

In [3]:
def load_image(x):
    byte_img = tf.io.read_file(x)
    img = tf.io.decode_jpeg(byte_img)
    return img

In [42]:
images = images.map(load_image)

In [43]:
images_generator = images.batch(4).as_numpy_iterator()

In [49]:
plot_images = images_generator.next()

In [None]:
fig, ax = plt.subplots(ncols=4, figsize=(20, 20))
for idx, image in enumerate(plot_images):
    ax[idx].imshow(image)
plt.show()

In [None]:
for folder in ["train", "test", "val"]:
    for file in os.listdir(os.path.join("data", folder, "images")):
        filename = file.split(".")[0] + ".json"
        existing_filepath = os.path.join("data", "labels", filename)
        if os.path.exists(existing_filepath):
            new_filepath = os.path.join("data", folder, "labels", filename)
            os.replace(existing_filepath, new_filepath)

### مضاعفة الصور

In [3]:
import albumentations as alb

In [4]:
augmentor = alb.Compose([alb.RandomCrop(width=450, height=450), 
                         alb.HorizontalFlip(p=0.5), 
                         alb.RandomBrightnessContrast(p=0.2),
                         alb.RandomGamma(p=0.2), 
                         alb.RGBShift(p=0.2), 
                         alb.VerticalFlip(p=0.5)], 
                       bbox_params=alb.BboxParams(format='albumentations', 
                                                  label_fields=['class_labels']))

In [None]:
for partition in ['train','test','val']: 
    i = 0
    for image in os.listdir(os.path.join('data', partition, 'images')):
        i += 1
        print("image ", i)
        img = cv2.imread(os.path.join('data', partition, 'images', image))

        coords = [0,0,0.00001,0.00001]
        label_path = os.path.join('data', partition, 'labels', f'{image.split(".")[0]}.json')
        if os.path.exists(label_path):
            with open(label_path, 'r') as f:
                label = json.load(f)

            coords[0] = label['shapes'][0]['points'][0][0]
            coords[1] = label['shapes'][0]['points'][0][1]
            coords[2] = label['shapes'][0]['points'][1][0]
            coords[3] = label['shapes'][0]['points'][1][1]
            coords = list(np.divide(coords, [640,480,640,480]))

        try: 
            for x in range(60):
                print("\r\tx: ", x, end="")
                augmented = augmentor(image=img, bboxes=[coords], class_labels=['face'])
                cv2.imwrite(os.path.join('aug_data', partition, 'images', f'{image.split(".")[0]}.{x}.jpg'), augmented['image'])

                annotation = {}
                annotation['image'] = image

                if os.path.exists(label_path):
                    if len(augmented['bboxes']) == 0: 
                        annotation['bbox'] = [0,0,0,0]
                        annotation['class'] = 0 
                    else: 
                        annotation['bbox'] = augmented['bboxes'][0]
                        annotation['class'] = 1
                else: 
                    annotation['bbox'] = [0,0,0,0]
                    annotation['class'] = 0 


                with open(os.path.join('aug_data', partition, 'labels', f'{image.split(".")[0]}.{x}.json'), 'w') as f:
                    json.dump(annotation, f)
            print("")

        except Exception as e:
            print(e)

In [4]:
def load_res_image(x):
    img = load_image(x)
    img = tf.image.resize(img, (120, 120))
    img = img / 255
    return img

In [5]:
train_images = tf.data.Dataset.list_files("aug_data/train/images/*.jpg", shuffle=False)
train_images = train_images.map(load_res_image)

In [6]:
test_images = tf.data.Dataset.list_files("aug_data/test/images/*.jpg", shuffle=False)
test_images = test_images.map(load_res_image)

In [7]:
val_images = tf.data.Dataset.list_files("aug_data/val/images/*.jpg", shuffle=False)
val_images = val_images.map(load_res_image)

In [8]:
def load_labels(label_path):
    with open(label_path.numpy(), "r", encoding="utf-8") as f:
        label = json.load(f)
    return [label["class"]], label["bbox"]

In [9]:
train_labels = tf.data.Dataset.list_files("aug_data/train/labels/*.json", shuffle=False)
train_labels = train_labels.map(lambda x: tf.py_function( load_labels, [x], [ tf.uint8, tf.float16 ] ))

test_labels = tf.data.Dataset.list_files("aug_data/test/labels/*.json", shuffle=False)
test_labels = test_labels.map(lambda x: tf.py_function( load_labels, [x], [ tf.uint8, tf.float16 ] ))

val_labels = tf.data.Dataset.list_files("aug_data/val/labels/*.json", shuffle=False)
val_labels = val_labels.map(lambda x: tf.py_function( load_labels, [x], [ tf.uint8, tf.float16 ] ))

In [10]:
train = tf.data.Dataset.zip((train_images, train_labels))
train = train.shuffle(5000)
train = train.batch(8)
train = train.prefetch(4)

In [11]:
test = tf.data.Dataset.zip((test_images, test_labels))
test = test.shuffle(1300)
test = test.batch(8)
test = test.prefetch(4)

In [12]:
val = tf.data.Dataset.zip((val_images, val_labels))
val = val.shuffle(1000)
val = val.batch(8)
val = val.prefetch(4)

In [22]:
data_samples = train.as_numpy_iterator()
res = data_samples.next()

In [None]:
sample_image = res[0][0]
sample_coords = res[1][1][0]

cv2.rectangle(sample_image,
    tuple(np.multiply(sample_coords[:2], [120, 120]).astype(int)),
    tuple(np.multiply(sample_coords[2:], [120, 120]).astype(int)),
    (255, 0, 0), 2
)
plt.imshow(sample_image)

In [17]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv2D, Dense, GlobalAvgPool2D, GlobalMaxPool2D
from tensorflow.keras.applications import VGG16

In [21]:
vgg = VGG16(include_top=False)

In [22]:
def build_model():
    input_layer = Input(shape=(120, 120, 3))
    vgg = VGG16(include_top=False)(input_layer)

    f1 = GlobalMaxPool2D()(vgg)
    class1 = Dense(2048, activation="relu")(f1)
    class2 = Dense(1, activation="sigmoid")(class1)

    f2 = GlobalMaxPool2D()(vgg)
    regress1 = Dense(2048, activation="relu")(f2)
    regress2 = Dense(4, activation="sigmoid")(regress1)

    facetracker = Model(inputs=input_layer, outputs=[ class2, regress2 ])
    return facetracker

In [23]:
facetracker = build_model()

In [None]:
facetracker.summary()

In [25]:
batches_per_epoch = len(train)
lr_decay = (1./.75 - 1) / batches_per_epoch

In [27]:
opt = tf.keras.optimizers.Adam(learning_rate=0.0001, decay=lr_decay)

In [28]:
def localization_loss(y_true, yhat):            
    delta_coord = tf.reduce_sum(tf.square(y_true[:,:2] - yhat[:,:2]))
                  
    h_true = y_true[:,3] - y_true[:,1] 
    w_true = y_true[:,2] - y_true[:,0] 

    h_pred = yhat[:,3] - yhat[:,1] 
    w_pred = yhat[:,2] - yhat[:,0] 
    
    delta_size = tf.reduce_sum(tf.square(w_true - w_pred) + tf.square(h_true-h_pred))
    
    return delta_coord + delta_size

In [29]:
classloss = tf.keras.losses.BinaryCrossentropy()
regressloss = localization_loss

In [30]:
class FaceTracker(Model): 
    def __init__(self, eyetracker,  **kwargs): 
        super().__init__(**kwargs)
        self.model = eyetracker

    def compile(self, opt, classloss, localizationloss, **kwargs):
        super().compile(**kwargs)
        self.closs = classloss
        self.lloss = localizationloss
        self.opt = opt

    def train_step(self, batch, **kwargs): 

        X, y = batch

        with tf.GradientTape() as tape: 
            classes, coords = self.model(X, training=True)

            batch_classloss = self.closs(y[0], classes)
            batch_localizationloss = self.lloss(tf.cast(y[1], tf.float32), coords)

            total_loss = batch_localizationloss+0.5*batch_classloss

            grad = tape.gradient(total_loss, self.model.trainable_variables)

        opt.apply_gradients(zip(grad, self.model.trainable_variables))

        return {"total_loss":total_loss, "class_loss":batch_classloss, "regress_loss":batch_localizationloss}

    def test_step(self, batch, **kwargs): 
        X, y = batch

        classes, coords = self.model(X, training=False)

        batch_classloss = self.closs(y[0], classes)
        batch_localizationloss = self.lloss(tf.cast(y[1], tf.float32), coords)
        total_loss = batch_localizationloss+0.5*batch_classloss

        return {"total_loss":total_loss, "class_loss":batch_classloss, "regress_loss":batch_localizationloss}

    def call(self, X, **kwargs): 
        return self.model(X, **kwargs)

In [31]:
model = FaceTracker(facetracker)

In [32]:
model.compile(opt, classloss, regressloss)

In [33]:
log_dir = "logs"
tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=log_dir)

In [None]:
hist = model.fit(train, epochs=40, validation_data=val, callbacks=[tensorboard_callback])