In [1]:
# model_name = "yolo"
# model_name = "imagenet"
# model_name = "cnn"
# model_name = "wider"
model_name = "vgg"
# model_name = "vggface"
#model_name = "vggface_resnet"
#model_name = "vggface_senet"
#model_name = "vggface"
use_gray = False
only_one = False
isTrainable = False
batch_size = 32
learning_rate = 5e-5
epochs = 80

import warnings
warnings.filterwarnings("ignore")

image_shape = (224,224, 1 if use_gray else 3)

In [2]:
#%env CUDA_DEVICE_ORDER=PCI_BUS_ID
#%env CUDA_VISIBLE_DEVICES=-1

In [3]:
import tensorflow as tf
import utils

2023-05-26 07:13:47.602353: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.10.1


In [4]:
import tensorflow_datasets as tfds
import tensorflow as tf
from tensorflow.image import resize
import matplotlib.pyplot as plt
import numpy as np
from PIL import Image
import cv2
import albumentations as alb

augmentor = alb.Compose([#alb.RandomCrop(width=150, height=150), 
                        alb.HorizontalFlip(p=0.5), 
                        alb.RandomBrightnessContrast(p=0.2),
                        alb.RandomGamma(p=0.2), 
                        alb.RGBShift(p=0.2), 
                        alb.VerticalFlip(p=0.5)], 
                      bbox_params=alb.BboxParams(format='albumentations', 
                                                 label_fields=['class_labels'],
                                                 min_area=0.1))


def aug_fn(image, bbox, img_size):
  
    
    try:
      aug_data = augmentor(image = image, bboxes = bbox, class_labels=['face'])
    except Exception as e:
      display(f"Bbox makes problems: {bbox} \n{e}")
      aug_data = augmentor(image = image, bboxes = tf.convert_to_tensor([[0.,0.,1e-2,1e-2]]), class_labels=['face'])
    aug_img = aug_data["image"]
    aug_img = tf.cast(aug_img/255.0, tf.float32)
    aug_img = tf.image.resize(aug_img, size=[img_size, img_size])

    aug_bbox = aug_data["bboxes"]
    aug_bbox = tf.cast(aug_bbox, tf.float32)
    
    #display(aug_bbox)

    return aug_img, aug_bbox

def process_data(image, bbox, img_size):
  
    image, bbox = tf.numpy_function(func=aug_fn, inp=[image, bbox, img_size], Tout=(tf.float32, tf.float32))

    return image, bbox #tf.convert_to_tensor([bbox, [is_bbox]]) 

def aug_fn_test(aug_img,  img_size):
    
    aug_img = tf.cast(aug_img/255.0, tf.float32)
    aug_img = tf.image.resize(aug_img, size=[img_size, img_size])

    return aug_img

def process_data_test(image, img_size):

    image = tf.numpy_function(func=aug_fn_test, inp=[image, img_size], Tout=tf.float32)
    
    return image

def set_shapes(img, bbox=None, img_shape=(224,224,3)):
    
    img = tf.reshape(img, img_shape)    
    
    if bbox is not None:
      bbox = tf.reshape(bbox,[-1,4])
      return img, bbox
    else: 
      return img
    

  
def getDataset(name: str, batch_size: int = 32, image_shape: tuple[int, int, int] = (224, 224, 3)) -> tuple[tf.data.Dataset, None]:
  
  ds, ds_info = tfds.load('wider_face', split=name, shuffle_files=True, with_info=True, data_dir="/scratch/fs2/Fimilak/tensorflow_datasets/")

  if name != "test":
    
    #ds = ds.map(lambda x: [x["image"], tf.convert_to_tensor([[0.,0.,1e-2,1e-2]]) if x["faces"]["bbox"].shape[0] is None else x["faces"]["bbox"]],
    
    #display(f"BEFORE LENGTH OF {name} is {len(list(ds))}")
    
    #ds.map(lambda x,y: [x, y] if display(y.shape) is None else [x, y])
        
    ds_1 = ds.filter(lambda x: tf.shape(x["faces"]["bbox"])[0] == 1)
    ds_1 = ds_1.map(lambda x: [x["image"], x["faces"]["bbox"]])
   
    #for y, x in tfds.as_numpy(ds_1.take(2)):
    ##  
    #  display(x.shape)
    #  display(x)
    
    ds_0 = ds.filter(lambda x: tf.shape(x["faces"]["bbox"])[0] == 0)
    ds_0 = ds_0.map(lambda x: [x["image"], tf.convert_to_tensor([[0.,0.,1e-2,1e-2]])])
    
    ds = ds_0.concatenate(ds_1)
    
    ds = ds.map(lambda x, y: process_data(x, y, img_size=image_shape[0]),
                      num_parallel_calls=tf.data.AUTOTUNE).prefetch(tf.data.AUTOTUNE)

    ds = ds.map(set_shapes,
                      num_parallel_calls=tf.data.AUTOTUNE).batch(batch_size).prefetch(tf.data.AUTOTUNE)
    
  else:
    ds = ds.map(lambda x: process_data_test(x["image"], img_size=image_shape[0]),
                      num_parallel_calls=tf.data.AUTOTUNE).prefetch(tf.data.AUTOTUNE)
    ds = ds.map(set_shapes,
                      num_parallel_calls=tf.data.AUTOTUNE).batch(batch_size).prefetch(tf.data.AUTOTUNE)
  return ds

In [None]:
ds_train = getDataset('train', batch_size=batch_size, image_shape=image_shape)
ds_eval = getDataset('validation', batch_size=batch_size, image_shape=image_shape)
ds_test = getDataset('test', batch_size=batch_size, image_shape=image_shape)

for _ in range(60):
    ds_train = ds_train.concatenate(getDataset('train', batch_size=batch_size, image_shape=image_shape))
    ds_eval = ds_eval.concatenate(getDataset('validation', batch_size=batch_size, image_shape=image_shape))

In [6]:
#display(len(list(ds_train)))
#display(len(list(ds_eval)))
#display(len(list(ds_test)))

In [7]:
from tensorflow.keras.models import load_model, Model
from tensorflow.keras.applications import VGG16
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import MeanSquaredError
#from akida_models import yolo_base, yolo_widerface_pretrained
from tensorflow.keras.utils import get_file
from tensorflow.keras.layers import Dense, Flatten, Conv2D, ReLU, BatchNormalization, Input, GlobalMaxPooling2D, Dropout
from tensorflow.keras import Sequential
#import keras_vggface
#from keras_vggface.vggface import VGGFace

In [8]:
def getVGGCustom(model, isTrainable: bool = True):
    
    model.trainable = isTrainable
    #last_layer = model.get_layer('avg_pool').output
    
    inputs = tf.keras.Input(shape=(image_shape))
    
    x = tf.keras.Sequential([
    tf.keras.layers.RandomFlip('horizontal'),
    tf.keras.layers.RandomRotation(0.2),
    ])(inputs)
    
    x = model(x)
    
    x = Flatten(name='flatten')(x)
    
    x = Dropout(.2)(x)
    
    out = Dense(4, name='regressor')(x)
    
    return Model(inputs, out)

In [9]:
if not "vgg" in model_name:

    model : Sequential = Sequential()

    if(model_name == "yolo"):
        yolo_model = yolo_base()
        
        print(yolo_model.summary())
        
        model.add(yolo_model)
        
    elif model_name == "wider":
        
        model, anchors = yolo_widerface_pretrained()
        
        print(model.predict(ds_eval))
        
        #utils.predict(model=model, model_name=model_name, ds)
        
    elif(model_name == "imagenet"):
        
        model_file = get_file(
            "akidanet_imagenet_224_alpha_50.h5",
            "http://data.brainchip.com/models/akidanet/akidanet_imagenet_224_alpha_50.h5",
            cache_subdir='models/akidanet_imagenet')
        model.add(load_model(model_file))

    elif(model_name == "cnn"):
        
        input_shapes = [
        image_shape,
        (224,224,16),
        (112,112,32),
        (56,56,64),
        (28,28,128),
        (14,14,1024),
        (14,14,512),
        (7,7,256),
        (7,7,128),
        (7,7,64),
        (7,7,32),
        (4,)]

        for input_shape in input_shapes:
        
            model.add(Conv2D(2,7, input_shape=input_shape)) 
            model.add(BatchNormalization(input_shape=input_shape)) 
            model.add(ReLU(input_shape=input_shape))

    model.add(Flatten())
    model.add(Dense(4, activation='relu'))

    model.summary()
    
elif model_name == "vgg":
    input_layer = Input(shape=image_shape)
    
    vgg = VGG16(include_top=False)
    
    if not isTrainable:
        for layer in vgg.layers:
            layer.trainable = False
            
    vgg = vgg(input_layer)

    # Classification Model  
    f1 = GlobalMaxPooling2D()(vgg)
    class1 = Dense(2048, activation='relu')(f1)
    class2 = Dense(1, activation='sigmoid')(class1)
    
    # Bounding box model
    f2 = GlobalMaxPooling2D()(vgg)
    regress1 = Dense(2048, activation='relu')(f2)
    regress2 = Dense(4, activation='sigmoid')(regress1)
    
    model = Model(inputs=input_layer, outputs=regress2)
    #model = Model(inputs=input_layer, outputs=[regress2, class2])
    
elif model_name == "vggface":  
    model = getVGGCustom(VGGFace(model='vgg16', include_top=False, input_shape = image_shape), isTrainable=isTrainable)
elif model_name == "vggface_resnet":  
    model = getVGGCustom(VGGFace(model='resnet50', include_top=False, input_shape = image_shape), isTrainable=isTrainable)
elif model_name == "vggface_senet" :  
    model = getVGGCustom(VGGFace(model='senet50', include_top=False, input_shape = image_shape), isTrainable=isTrainable)

In [10]:
def localization_loss(y_true, yhat):  
    display(y_true)
    display(yhat)
    delta_coord = tf.reduce_sum(tf.square(y_true[...,:2] - yhat[...,:2]))
                  
    h_true = y_true[...,3] - y_true[...,1] 
    w_true = y_true[...,2] - y_true[...,0] 

    h_pred = yhat[...,3] - yhat[...,1] 
    w_pred = yhat[...,2] - yhat[...,0] 
    
    delta_size = tf.reduce_sum(tf.square(w_true - w_pred) + tf.square(h_true-h_pred))
    
    return delta_coord + delta_size

In [11]:
model.compile(optimizer=Adam(learning_rate=learning_rate),
              loss=localization_loss, #MeanSquaredError(),#
              metrics=['accuracy'])#[tf.keras.metrics.MeanIoU(num_classes=1)])


callbacks = [tf.keras.callbacks.EarlyStopping(monitor='loss', patience=5)]

In [12]:
model.fit(
    x=ds_train,
    epochs=epochs,
    validation_data=ds_eval,
    callbacks=callbacks
)

   1037/Unknown - 560s 532ms/step - loss: 147.0355 - accuracy: 0.5178

In [None]:
tf.config.list_physical_devices('GPU')

[PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU'),
 PhysicalDevice(name='/physical_device:GPU:1', device_type='GPU')]

In [None]:
from pathlib import Path

results = model.evaluate(
  ds_eval
)

print("test loss, test acc:", results)

Path(f"models/{model_name}_model").mkdir(parents=True, exist_ok=True) 
model.save(f"models/{model_name}_model")

NameError: name 'model' is not defined

In [None]:
#try:
model = tf.keras.models.load_model(f"models/{model_name}_model", custom_objects={'localization_loss' : utils.localization_loss})
#except Exception as e:
#display(f"There is no model for {model_name} with the error: \n\n{e}")
  

JSONDecodeError: Expecting value: line 1 column 1 (char 0)

In [None]:
utils.predict(model=model, ds=ds_test)

NameError: name 'model' is not defined

In [None]:
utils.predict(model=model, ds=ds_eval)

: 