In [None]:
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
import cv2 as cv
import os
import json

In [None]:
# Enable memory growth on every GPU that TensorFlow can see, then list the
# GPUs as the cell output (an empty list means no GPU was detected).
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus: 
    tf.config.experimental.set_memory_growth(gpu, True)
tf.config.list_physical_devices('GPU')

In [None]:
def byte_to_img(x):
    """Read a JPEG file and decode it into a 3-channel image tensor.

    Args:
        x: Path of the JPEG file (a string or scalar string tensor).

    Returns:
        The decoded image tensor with exactly three channels.
    """
    byteimg = tf.io.read_file(x)
    # Force 3 channels: without this a grayscale JPEG decodes to one channel,
    # which does not match the (120, 120, 3) model input and cannot be batched
    # together with colour images.
    return tf.io.decode_jpeg(byteimg, channels=3)

In [None]:
# Training image pipeline: list files -> decode -> resize to 120x120 -> scale by 1/255.
# shuffle=False keeps the file order deterministic; the images are later zipped
# element-by-element with the label files, so the two listings must line up.
train_images = tf.data.Dataset.list_files(
    # os.path.join instead of hard-coded "\\" so the glob also works off Windows.
    os.path.join("augm_data", "Train", "images", "*.jpg"),
    shuffle=False,
)
train_images = train_images.map(byte_to_img)
train_images = train_images.map(lambda x: tf.image.resize(x, (120, 120)))
train_images = train_images.map(lambda x: x / 255)


In [None]:
# Test image pipeline: list files -> decode -> resize to 120x120 -> scale by 1/255.
# shuffle=False keeps the file order deterministic; the images are later zipped
# element-by-element with the label files, so the two listings must line up.
test_images = tf.data.Dataset.list_files(
    # os.path.join instead of hard-coded "\\" so the glob also works off Windows.
    os.path.join("augm_data", "Test", "images", "*.jpg"),
    shuffle=False,
)
test_images = test_images.map(byte_to_img)
test_images = test_images.map(lambda x: tf.image.resize(x, (120, 120)))
test_images = test_images.map(lambda x: x / 255)

In [None]:
# Validation image pipeline: list files -> decode -> resize to 120x120 -> scale by 1/255.
# shuffle=False keeps the file order deterministic; the images are later zipped
# element-by-element with the label files, so the two listings must line up.
valid_images = tf.data.Dataset.list_files(
    # os.path.join instead of hard-coded "\\" so the glob also works off Windows.
    os.path.join("augm_data", "Val", "images", "*.jpg"),
    shuffle=False,
)
valid_images = valid_images.map(byte_to_img)
valid_images = valid_images.map(lambda x: tf.image.resize(x, (120, 120)))
valid_images = valid_images.map(lambda x: x / 255)

In [None]:
# Display the validation image dataset object as the cell output (quick sanity check).
valid_images

In [None]:
def load_labels(label_path):
    """Parse one JSON annotation file into its class and bounding box.

    Args:
        label_path: Object exposing ``.numpy()`` that yields the path of the
            JSON file (this is what ``tf.py_function`` hands in).

    Returns:
        A pair ``([class_value], bbox)`` taken from the ``'class'`` and
        ``'bbox'`` entries of the file; the class is wrapped in a
        one-element list.
    """
    json_path = label_path.numpy()
    with open(json_path, mode='r', encoding='utf-8') as handle:
        annotation = json.loads(handle.read())
    class_entry = [annotation['class']]
    box_entry = annotation['bbox']
    return class_entry, box_entry

In [None]:
# Training label pipeline: list the JSON files in a deterministic order
# (shuffle=False, so they pair up with the image listing) and parse each one
# in Python via tf.py_function, yielding (class as uint8, bbox as float16).
train_labels = tf.data.Dataset.list_files(
    # os.path.join instead of hard-coded "\\" so the glob also works off Windows.
    os.path.join('augm_data', 'Train', 'labels', '*.json'),
    shuffle=False,
)
train_labels = train_labels.map(lambda x: tf.py_function(load_labels, [x], [tf.uint8, tf.float16]))

In [None]:
# Test label pipeline: list the JSON files in a deterministic order
# (shuffle=False, so they pair up with the image listing) and parse each one
# in Python via tf.py_function, yielding (class as uint8, bbox as float16).
test_labels = tf.data.Dataset.list_files(
    # os.path.join instead of hard-coded "\\" so the glob also works off Windows.
    os.path.join('augm_data', 'Test', 'labels', '*.json'),
    shuffle=False,
)
test_labels = test_labels.map(lambda x: tf.py_function(load_labels, [x], [tf.uint8, tf.float16]))

In [None]:
# Validation label pipeline: list the JSON files in a deterministic order
# (shuffle=False, so they pair up with the image listing) and parse each one
# in Python via tf.py_function, yielding (class as uint8, bbox as float16).
val_labels = tf.data.Dataset.list_files(
    # os.path.join instead of hard-coded "\\" so the glob also works off Windows.
    os.path.join('augm_data', 'Val', 'labels', '*.json'),
    shuffle=False,
)
val_labels = val_labels.map(lambda x: tf.py_function(load_labels, [x], [tf.uint8, tf.float16]))

In [None]:
# Pull the first parsed label from the training label pipeline as NumPy values
# to check that load_labels produces the expected (class, bbox) pair.
train_labels.as_numpy_iterator().next()

In [None]:
# Show the number of training images and labels; the two datasets are zipped
# pairwise afterwards, so these counts are expected to match.
len(train_images),len(train_labels)

In [None]:
# Pair every training image with its label, then shuffle with a 7000-element
# buffer, group into batches of 8 and keep 4 batches prefetched.
train = (
    tf.data.Dataset.zip((train_images, train_labels))
    .shuffle(7000)
    .batch(8)
    .prefetch(4)
)

In [None]:
# Pair every test image with its label, then shuffle with a 7000-element
# buffer, group into batches of 8 and keep 4 batches prefetched.
test = (
    tf.data.Dataset.zip((test_images, test_labels))
    .shuffle(7000)
    .batch(8)
    .prefetch(4)
)

In [None]:
# Pair every validation image with its label, then shuffle with a 7000-element
# buffer, group into batches of 8 and keep 4 batches prefetched.
val = (
    tf.data.Dataset.zip((valid_images, val_labels))
    .shuffle(7000)
    .batch(8)
    .prefetch(4)
)

In [None]:
# Inspect the label half (element [1]) of one training batch.
train.as_numpy_iterator().next()[1]

In [None]:
# Keep one training batch as NumPy arrays; the next cell plots from it.
# data_samples[0] holds the images and data_samples[1] the (class, bbox) labels.
data_samples = train.as_numpy_iterator().next()


In [None]:

# Preview the first four images of the sampled training batch with their
# ground-truth boxes drawn on top.
fig, ax = plt.subplots(ncols=4, figsize=(20,20))
for idx, panel in enumerate(ax):
    sample_image = data_samples[0][idx]
    sample_coords = data_samples[1][1][idx]

    # The first two box values form one corner and the last two the opposite
    # corner; both are scaled by 120 to land on the 120x120 image grid.
    corner_a = tuple(np.multiply(sample_coords[:2], [120,120]).astype(int))
    corner_b = tuple(np.multiply(sample_coords[2:], [120,120]).astype(int))
    cv.rectangle(sample_image, corner_a, corner_b, (255,0,0), 2)

    panel.imshow(sample_image)

In [None]:
from tensorflow.keras.applications import VGG16
from tensorflow.keras.models import Model

In [None]:
# Instantiate VGG16 without its top (classifier) layers and print the layer
# summary to inspect the backbone that the detector is built on.
vgg = VGG16(include_top = False)
vgg.summary()

In [None]:
def my_model():
    """Build the two-headed detector on top of a headless VGG16 backbone.

    The network takes 120x120x3 images and produces two outputs from the
    shared VGG16 feature map:

    * a classification head ending in a single sigmoid unit, and
    * a regression head ending in four sigmoid units (the box coordinates).

    Returns:
        A Keras functional model whose outputs are
        ``[classification, regression]``.
    """
    layers = tf.keras.layers

    image_in = layers.Input(shape=(120,120,3))
    features = VGG16(include_top = False)(image_in)

    # Classification head: global max pool -> 2048 ReLU -> 1 sigmoid.
    cls_pooled = layers.GlobalMaxPooling2D()(features)
    cls_hidden = layers.Dense(2048, activation='relu')(cls_pooled)
    cls_out = layers.Dense(1, activation='sigmoid')(cls_hidden)

    # Regression head: global max pool -> 2048 ReLU -> 4 sigmoid.
    box_pooled = layers.GlobalMaxPooling2D()(features)
    box_hidden = layers.Dense(2048, activation='relu')(box_pooled)
    box_out = layers.Dense(4, activation='sigmoid')(box_hidden)

    return tf.keras.models.Model(inputs=image_in, outputs=[cls_out, box_out])

In [None]:
# Build the detector network (VGG16 backbone + classification and regression heads).
model = my_model()

In [None]:
# Print the layer-by-layer summary of the detector.
model.summary()

In [None]:
# Grab one training batch: x receives the images, y the (class, bbox) labels.
# NOTE(review): x and y are not used by any later cell visible here.
x,y = train.as_numpy_iterator().next()

In [None]:
# Number of batches in the training dataset (used below as batches per epoch).
len(train)

In [None]:
# One epoch is one pass over every training batch.
batches_per_epoch = len(train)
# Decay coefficient: under an inverse-time rule lr0 / (1 + lr_decay * step),
# this value gives 0.75 * lr0 after batches_per_epoch steps.
# NOTE(review): the compile cell passes this as Adam's `weight_decay`, which is
# not a learning-rate decay -- confirm the intended use.
lr_decay = (1.0/0.75-1)/batches_per_epoch

In [None]:
def localization_loss(y_true,y_pred):
    """Sum-of-squares loss between true and predicted boxes.

    Two terms are added together:

    * the squared error of the first two box values (one corner), and
    * the squared error of the box width (column 2 minus column 0) and
      height (column 3 minus column 1).

    Args:
        y_true: Ground-truth boxes, indexed as ``[batch, 4]``.
        y_pred: Predicted boxes, indexed as ``[batch, 4]``.

    Returns:
        A scalar tensor: both terms summed over the whole batch.
    """
    # Corner term: squared difference of the first two coordinates.
    corner_error = tf.reduce_sum(tf.square(y_true[:,:2] - y_pred[:,:2]))

    # Size term: compare box extents rather than the second corner directly.
    true_height = y_true[:,3] - y_true[:,1]
    true_width = y_true[:,2] - y_true[:,0]
    pred_height = y_pred[:,3] - y_pred[:,1]
    pred_width = y_pred[:,2] - y_pred[:,0]

    width_sq = tf.square(true_width - pred_width)
    height_sq = tf.square(true_height - pred_height)
    size_error = tf.reduce_sum(width_sq + height_sq)

    return corner_error + size_error

In [None]:
class MyModel(tf.keras.models.Model):
    """Keras model wrapper with a custom train/test step for the detector.

    The wrapped network must return ``(classes, coordinates)`` when called on
    a batch of images. Each step combines a classification loss on the first
    output with a localization loss on the second:
    ``total = localization + 0.5 * classification``.
    """

    def __init__(self,modelx,**kwargs):
        """Store the network to train.

        Args:
            modelx: The network returning ``(classes, coordinates)``.
            **kwargs: Forwarded to ``tf.keras.models.Model.__init__``.
        """
        super().__init__(**kwargs)
        self.model = modelx

    def compile(self,optimizer,classification_loss,locatization_loss,**kwargs):
        """Configure the optimizer and the two loss functions.

        Args:
            optimizer: Optimizer used to apply the gradients.
            classification_loss: Callable ``(y_true, y_pred) -> loss`` for the
                class output.
            locatization_loss: Callable ``(y_true, y_pred) -> loss`` for the
                box output. The misspelled name is kept so that existing
                keyword callers keep working.
            **kwargs: Forwarded to ``tf.keras.models.Model.compile``.
        """
        super().compile(**kwargs)
        self.closs = classification_loss
        # Use the loss that was passed in. Previously the module-level
        # `localization_loss` function was bound here and the argument was
        # silently ignored.
        self.lloss = locatization_loss
        self.opt = optimizer

    def _detection_losses(self,batch,training):
        """Run the network on a batch and return (total, class, localization) losses."""
        x,y = batch
        classes,coordinates = self.model(x,training = training)

        # y[0] holds the class labels, y[1] the box coordinates; the boxes are
        # cast to float32 before being compared with the network output.
        class_loss = self.closs(y[0],classes)
        loc_loss = self.lloss(tf.cast(y[1],tf.float32),coordinates)
        total_loss = loc_loss+0.5*class_loss
        return total_loss,class_loss,loc_loss

    def train_step(self,batch,**kwargs):
        """Run one optimization step on a batch and return the loss values."""
        with tf.GradientTape() as tape:
            total_loss,batch_classloss,batch_localizationloss = self._detection_losses(batch,training = True)

        # Differentiate outside the tape context so the gradient computation
        # itself is not recorded.
        grad = tape.gradient(total_loss,self.model.trainable_variables)
        self.opt.apply_gradients(zip(grad,self.model.trainable_variables))

        return {"total_loss":total_loss,"class_loss":batch_classloss,"regression_loss":batch_localizationloss}

    def test_step(self,batch,**kwargs):
        """Evaluate the losses on a batch without updating any weights."""
        # Evaluation must run the network in inference mode (was training=True).
        total_loss,batch_classloss,batch_locatizationloss = self._detection_losses(batch,training = False)
        # NOTE(review): this key is "regress_loss" while train_step reports
        # "regression_loss"; left unchanged because downstream code may read
        # these history keys.
        return {"total_loss":total_loss, "class_loss":batch_classloss,"regress_loss":batch_locatizationloss}

    def call(self, x,**kwargs):
        """Forward pass: delegate to the wrapped network."""
        return self.model(x,**kwargs)

    


In [None]:
# Wrap the network in the custom training class.
# NOTE(review): this rebinds `model`, so running this cell a second time wraps
# the wrapper again; re-run the `model = my_model()` cell first.
model = MyModel(model)

In [None]:
# Decay the learning rate as 0.0001 / (1 + lr_decay * step). `lr_decay` was
# previously passed as Adam's `weight_decay`, which shrinks the weights rather
# than lowering the learning rate.
lr_schedule = tf.keras.optimizers.schedules.InverseTimeDecay(
    initial_learning_rate=0.0001,
    decay_steps=1,
    decay_rate=lr_decay,
)
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=lr_schedule),
    classification_loss=tf.keras.losses.BinaryCrossentropy(),
    locatization_loss=localization_loss,
)

In [None]:
# Write TensorBoard event files to the relative 'logs' directory during training.
logdir = 'logs'
tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir = logdir)

In [None]:
# Train for 20 epochs, validating on `val` and logging to TensorBoard.
# `mod` receives the object returned by fit().
mod = model.fit(train,epochs =20 , validation_data = val, callbacks = [tensorboard_callback])

In [None]:
# Draw one batch from the test pipeline as NumPy arrays and run the detector
# on its images (element [0] of the batch).
test_data = test.as_numpy_iterator()
test_samples = next(test_data)
test_batch_images = test_samples[0]
predictions = model.predict(test_batch_images)

In [None]:
# Show the first four test images; draw the predicted box on an image when the
# classification score exceeds 0.5.
fig,ax = plt.subplots(ncols = 4,figsize = (20,20))
for i in range(4):
    # Index with the loop variable `i`. The original used `idx`, a stale
    # variable left over from an earlier cell, so all four panels showed the
    # same sample.
    # Copy so that drawing does not modify the batch held in `test_samples`.
    sample_img = test_samples[0][i].copy()
    sample_coords = predictions[1][i]

    if predictions[0][i]>0.5:
        # Box values are scaled by 120 to match the 120x120 image grid.
        cv.rectangle(sample_img,tuple(np.multiply(sample_coords[:2],[120,120]).astype(int)),
                     tuple(np.multiply(sample_coords[2:],[120,120]).astype(int)),
                     (0,255,0),2)
    ax[i].imshow(sample_img)