In [None]:
import tensorflow as tf
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

In [None]:
# Location of the zipped dataset. The path lives under /content/drive
# (presumably a mounted Google Drive in Colab -- confirm for your environment).
zip_path = "/content/drive/MyDrive/Datasets/car_detection.zip"

In [None]:
import os
import zipfile
import shutil

In [None]:
# Start from a clean extraction directory. Only the dataset folder is removed:
# deleting all of /tmp would also destroy unrelated temporary files that other
# processes on the machine may still be using.
extract_dir = '/tmp/car_detection'
shutil.rmtree(extract_dir, ignore_errors=True)  # a missing folder on first run is fine

local_zip = zip_path
# The context manager closes the archive even when extraction raises.
with zipfile.ZipFile(local_zip, 'r') as zip_ref:
    zip_ref.extractall(extract_dir)

In [None]:
# Folders produced by extracting the archive.
train_dir = "/tmp/car_detection/data/training_images"
test_dir = "/tmp/car_detection/data/testing_images"

# Sanity check: number of directory entries in each split (training first).
for split_dir in (train_dir, test_dir):
    print(len(os.listdir(split_dir)))

In [None]:
# CSV file inside the extracted dataset; judging by its name it holds the
# training bounding-box annotations (read into `label_df` below).
labels_path = "/tmp/car_detection/data/train_solution_bounding_boxes (1).csv"

In [None]:
# Read the annotation table and round its numeric columns to 3 decimals.
label_df = pd.read_csv(labels_path).round(3)

In [None]:
# label_df.head()

In [None]:
#  PREPROCESSING ======================

In [None]:
from PIL import Image

# image = Image.open(os.path.join(train_dir, "vid_4_600.jpg"))
# # summarize some details about the image
# print(image.format)
# print(image.size)
# print(image.mode)

In [None]:
# Load every file of the training folder into memory as a NumPy array,
# keyed by its file name.
x_train = {}
for fname in os.listdir(train_dir):
    x_train[fname] = np.array(Image.open(os.path.join(train_dir, fname)))
# x_test = {fname: np.array(Image.open(os.path.join(test_dir, fname))) for fname in os.listdir(test_dir) }

print(len(x_train))
# print(len(x_test))

In [None]:
# Pair every annotated bounding box with its image.
# Each label is [1, <remaining CSV columns>]: a leading 1 marks "car present",
# followed by every column after the first one (assumed to be the four box
# coordinates -- confirm against the CSV header). An image with several boxes
# is appended once per box.
train_image_arrays_with_cars = []
image_names_with_cars = []
labels_with_cars = []

# Index the annotation rows by image name in a single pass. The previous
# version re-scanned the whole DataFrame with iterrows() for every image,
# which is O(images x rows) and very slow; this keeps the same output order
# (images in x_train order, boxes in CSV order).
boxes_by_image = {}
for row in label_df.itertuples(index=False):
    boxes_by_image.setdefault(row.image, []).append(row[1:])

for image_name, image in x_train.items():
    for box in boxes_by_image.get(image_name, ()):
        new_label = np.insert(np.asarray(box, dtype='float64'), 0, 1.0)
        image_names_with_cars.append(image_name)
        labels_with_cars.append(new_label)
        train_image_arrays_with_cars.append(image)


In [None]:
# Collect the images that have no annotation and give each an all-zero label
# (flag 0 followed by four zeros).
train_image_arrays_empty = []
empty_image_names = []
empty_labels = []

# A set gives O(1) membership tests; scanning the list for every image made
# this loop quadratic.
names_with_cars = set(image_names_with_cars)
for img_name, image in x_train.items():
    if img_name in names_with_cars:
        continue
    train_image_arrays_empty.append(image)
    empty_image_names.append(img_name)
    empty_labels.append(np.zeros(5))

# Free the raw inputs; note this makes the cells above non re-runnable
# without reloading the data first.
del x_train
del label_df

In [None]:
# Merge the two collections, annotated images first and empty images after.
# The three lists stay index-aligned: entry i of each refers to the same sample.
train_slices = [*train_image_arrays_with_cars, *train_image_arrays_empty]
train_image_names = [*image_names_with_cars, *empty_image_names]
train_labels = [*labels_with_cars, *empty_labels]

# The per-category lists are no longer needed.
del train_image_arrays_with_cars, train_image_arrays_empty
del image_names_with_cars, empty_image_names
del labels_with_cars, empty_labels

In [None]:


# Stack the Python lists into float32 arrays; pixel values are divided by 255.
# NOTE(review): stacking assumes every image has the same shape -- confirm.
pixel_scale = 255
train_slices = np.asarray(train_slices).astype('float32') / pixel_scale
train_labels = np.asarray(train_labels).astype('float32')

In [None]:
# Shuffle images and labels together (fixed seed, so the order is repeatable)
from sklearn.utils import shuffle
x_shuffled, y_shuffled = shuffle(train_slices, train_labels, random_state=0)

# Drop the unshuffled arrays.
del train_slices, train_labels

In [None]:
# Split the labels along the last axis: column 0 is the car/no-car flag,
# columns 1-4 are the remaining label values (the box).
prob_labels = y_shuffled[..., :1].astype('float32')
box_labels = y_shuffled[..., 1:5].astype('float32')

for part in (prob_labels, box_labels):
    print(part.shape)

del part
del y_shuffled

In [None]:

# import cv2

# pt1 = (box_labels[20][0], box_labels[20][1])
# pt2 = (box_labels[20][2], box_labels[20][3])
  
# x = np.asarray(x_shuffled)*255
# cv2.imwrite('test_image.jpg', x[20])

# img = cv2.imread('test_image.jpg', 1)
# img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

# img	=	cv2.rectangle(img, pt1, pt2, (255,0,0), 2)
# plt.imshow(img)

In [None]:
#  BUILDING THE MODEL

from tensorflow.keras.applications.inception_v3 import InceptionV3

In [None]:
# Instantiate InceptionV3 without its classification head for 150x150x3 input
# and look up the output tensor of its 'mixed8' layer so it can be printed.
# This instance is only inspected here; MyModel.build() creates its own
# extractor from the actual input shape.
pretrained_model = InceptionV3(input_shape= (150, 150, 3), include_top=False)
output = pretrained_model.get_layer('mixed8').output

print(output)

In [None]:
from tensorflow.keras.layers import Dense, Flatten, Dropout, BatchNormalization, Conv2D, MaxPooling2D
from tensorflow.keras.models import Model
from tensorflow.keras import regularizers 
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint

In [None]:
class MyModel(Model):
    """Two-headed network on top of a frozen InceptionV3 feature extractor.

    The extractor is InceptionV3 (no top) truncated at its 'mixed8' layer.
    Its features go through two 1x1 convolutions, are flattened and
    batch-normalised, and then pass through a stack of L2-regularised dense
    layers with dropout. Two heads share the final 64-unit layer:

    * ``out_probs`` -- 1 sigmoid unit.
    * ``out_boxes`` -- 4 linear units.
    """

    def __init__(self, **kwargs):
        super(MyModel, self).__init__(**kwargs)
        # Created lazily in build(), once the input shape is known.
        self.pretrained_extractor = None
        self.flatten = Flatten()
        self.batch_norm = BatchNormalization()
        self.conv1 = Conv2D(1000, 1, activation="relu")
        self.conv2 = Conv2D(800, 1, activation="relu")
        self.dp1 = Dropout(0.4)
        self.dp2 = Dropout(0.3)
        self.fc1 = Dense(2048, activation="relu", kernel_regularizer=regularizers.l2(0.01))
        # A Dense(1024) used to be assigned to `fc2` and was immediately
        # overwritten by this layer, so it never took part in the forward
        # pass. The dead assignment is removed; the effective architecture
        # (2048 -> 512 -> 256 -> 64) is unchanged.
        self.fc2 = Dense(512, activation="relu", kernel_regularizer=regularizers.l2(0.01))
        self.fc3 = Dense(256, activation="relu", kernel_regularizer=regularizers.l2(0.01))
        self.fc4 = Dense(64, activation="relu", kernel_regularizer=regularizers.l2(0.01))
        self.out_probs = Dense(1, activation="sigmoid", name="out_probs")
        self.out_boxes = Dense(4, name="out_boxes")

    def build(self, input_shape):
        """Create the frozen InceptionV3 extractor for the given input shape.

        Args:
            input_shape: shape of the model input; its last three entries are
                used as the image shape passed to InceptionV3.
        """
        pretrained_model = InceptionV3(input_shape=input_shape[-3:], include_top=False)
        # Freeze every backbone layer so only the new layers are trained.
        for layer in pretrained_model.layers:
            layer.trainable = False
        self.pretrained_extractor = Model(inputs=pretrained_model.input,
                                          outputs=pretrained_model.get_layer('mixed8').output)

    def call(self, inputs, training=False):
        """Run the forward pass.

        Args:
            inputs: batch of images accepted by the InceptionV3 extractor.
            training: whether dropout and batch normalisation run in training
                mode. It is forwarded explicitly to those layers instead of
                relying on Keras' implicit propagation.

        Returns:
            Tuple ``(out_probs, out_boxes)`` with the outputs of the two heads.
        """
        # Every backbone layer is frozen, so it is always run in inference mode.
        last_output = self.pretrained_extractor(inputs, training=False)
        x = self.conv1(last_output)
        x = self.conv2(x)
        x = self.flatten(x)
        x = self.batch_norm(x, training=training)
        x = self.fc1(x)
        x = self.dp1(x, training=training)
        x = self.fc2(x)
        # dp1 is deliberately reused here: Dropout holds no weights.
        x = self.dp1(x, training=training)
        x = self.fc3(x)
        x = self.dp2(x, training=training)
        x = self.fc4(x)
        out_probs = self.out_probs(x)
        out_boxes = self.out_boxes(x)
        return (out_probs, out_boxes)


In [None]:
from tensorflow.keras.losses import MeanSquaredError, BinaryCrossentropy
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.models import load_model

In [None]:
# Training hyper-parameters: number of passes over the data (used by the
# training loop) and the batch size used to split the data into batches.
num_epochs = 50
batch_size = 32

# input_shape = x_shuffled[0].shape

In [None]:
# Images and both label arrays must describe the same samples.
assert len(x_shuffled) == len(prob_labels) and len(prob_labels) == len(box_labels)

# Number of batches. Ceil keeps every batch at or below `batch_size`, and
# max(1, ...) avoids asking array_split for 0 sections on tiny datasets.
# This replaces np.round_, which returned a float and no longer exists in
# NumPy 2.0.
N = max(1, int(np.ceil(len(x_shuffled) / batch_size)))

# array_split tolerates lengths that are not a multiple of N, so all three
# lists get the same batch boundaries.
x_shuffled_batched = np.array_split(x_shuffled, N)
prob_labels_batched = np.array_split(prob_labels, N)
box_labels_batched = np.array_split(box_labels, N)

del x_shuffled
del prob_labels
del box_labels

In [None]:

# One loss and one optimizer per output head.
prob_loss = BinaryCrossentropy()
box_loss = MeanSquaredError()
prob_optim = Adam(learning_rate=0.0005)
box_optim = Adam(learning_rate=0.01)

model = MyModel()

weights_path = "/content/drive/MyDrive/Models/yoloDetection_model"
# save_weights() with a suffix-less path writes a TensorFlow checkpoint, i.e.
# "<prefix>.index" plus "<prefix>.data-*" files. The bare prefix itself never
# exists on disk, so testing only for it meant previously saved weights were
# never reloaded. The original check is kept for any other layout.
if os.path.exists(weights_path) or os.path.exists(weights_path + ".index"):
  print("loaded weigths")
  model.load_weights(weights_path)


In [None]:
# tensor1 = tf.constant([[1, 2], [3, 4], [5, 6], [7,8]]) # Out boxes
# tensor2 = tf.constant([[0, 0], [3, 4], [0, 0], [1,5]]) # Box labels
# mask = tensor2[:,-1] > 0

# x = tf.boolean_mask(tensor1, mask)
# print(x)

# del tensor1, tensor2, mask, x

In [None]:
# COMPUTE GRADIENTS FUNCTION

def filter_boxes(out_boxes, box_labels):
    """Drop the rows that belong to samples without a box.

    A sample is kept when the last value of its label row is greater than 0.

    Args:
        out_boxes: predicted boxes, one row per sample.
        box_labels: ground-truth boxes, one row per sample (2-D).

    Returns:
        Tuple ``(kept_predictions, kept_labels)`` restricted to the kept rows.
    """
    has_box = box_labels[:, -1] > 0
    kept_predictions = tf.boolean_mask(out_boxes, has_box)
    kept_labels = tf.boolean_mask(box_labels, has_box)
    return kept_predictions, kept_labels

@tf.function
def loss_and_grads(inputs, prob_labels, box_labels):
    """Compute both head losses and their gradients for one batch.

    Args:
        inputs: batch of images fed to the global `model`.
        prob_labels: target values for the probability head.
        box_labels: target boxes; rows whose last value is not positive are
            excluded from the box loss.

    Returns:
        Tuple ``(p_loss, b_loss, p_grads, b_grads)``. Both gradient lists are
        aligned with ``model.trainable_variables``; entries are ``None`` for
        variables a loss does not depend on.
    """
    # NOTE(review): the kernel_regularizer penalties declared on the dense
    # layers (model.losses) are not added to either loss here -- confirm
    # whether that is intended.
    with tf.GradientTape() as prob_tape, tf.GradientTape() as box_tape:
        # Get the model outputs.
        out_probs, out_boxes = model(inputs, training=True)
        # Filter out images without boxes so they don't affect the box loss.
        out_boxes, box_labels = filter_boxes(out_boxes, box_labels)
        # Compute the losses.
        p_loss = prob_loss(prob_labels, out_probs)
        b_loss = box_loss(box_labels, out_boxes)
    # Gradients are taken after leaving the tape context: inside it, the
    # still-recording box_tape would also record the ops of prob_tape's
    # backward pass for no benefit.
    p_grads = prob_tape.gradient(p_loss, model.trainable_variables)
    b_grads = box_tape.gradient(b_loss, model.trainable_variables)
    return (p_loss, b_loss, p_grads, b_grads)

In [None]:
import logging

def remove_warnings(record):
    """Logging filter that rejects WARNING records.

    Args:
        record: the ``logging.LogRecord`` being filtered.

    Returns:
        0 when the record's level name is 'WARNING' (record is dropped),
        1 otherwise (record is kept).
    """
    is_warning = record.levelname == 'WARNING'
    return 0 if is_warning else 1


# Attach the filter to the 'tensorflow' logger.
tf_logger = logging.getLogger('tensorflow')
tf_logger.addFilter(remove_warnings)

In [None]:
#  CALLBACKS

def change_lr(optimizer, epoch_num, base_lr=0.01, decay=0.001):
    """Set the optimizer's learning rate with an inverse-time decay schedule.

    The new rate is ``base_lr / (1 + decay * epoch_num)``.

    Args:
        optimizer: optimizer whose ``learning_rate`` is updated in place.
        epoch_num: index of the epoch that has just finished.
        base_lr: learning rate at epoch 0 (defaults to the previously
            hard-coded 0.01).
        decay: decay factor per epoch (defaults to the previously hard-coded
            0.001).
    """
    new_lr = base_lr / (1 + decay * epoch_num)
    # Assign through the canonical `learning_rate` attribute; `lr` is only a
    # deprecated alias and is missing from newer Keras optimizers.
    optimizer.learning_rate = new_lr
    
def earlyStopping(LossList, min_delta=0.1, patience=8):
    """Decide whether training has stalled over the last `patience` epochs.

    Args:
        LossList: per-epoch losses, oldest first.
        min_delta: minimum drop below the first loss of the window that
            counts as an improvement.
        patience: size of the trailing window that is inspected.

    Returns:
        False while fewer than 2 * patience losses are available, or when any
        loss in the window is at most ``window[0] - min_delta``; True
        otherwise (no sufficient improvement, so training should stop).
    """
    # No early stopping for the first 2*patience epochs.
    if len(LossList) // patience < 2:
        return False
    window = LossList[-patience:]
    target = window[0] - min_delta
    improved = any(loss <= target for loss in window)
    return not improved

    
def save_best_model():
    """Write the weights of the global `model` to the fixed checkpoint path."""
    model.save_weights("/content/drive/MyDrive/Models/yoloDetection_model")
    

In [None]:
# TRAINING

epoch_losses_p = []
epoch_losses_b = []
# Lowest combined (box + probability) epoch loss seen so far.
best_total_loss = float("inf")

for epoch in range(num_epochs):
    batch_losses_p = []
    batch_losses_b = []

    for x, probs, boxes in zip(x_shuffled_batched, prob_labels_batched, box_labels_batched):
        p_loss, b_loss, p_grads, b_grads = loss_and_grads(x, probs, boxes)
        # Each head has its own optimizer; both update the same variables.
        prob_optim.apply_gradients(zip(p_grads, model.trainable_variables))
        box_optim.apply_gradients(zip(b_grads, model.trainable_variables))

        batch_losses_p.append(p_loss)
        batch_losses_b.append(b_loss)

    epoch_losses_p.append(np.mean(batch_losses_p))
    epoch_losses_b.append(np.mean(batch_losses_b))

    # Computed for the (currently disabled) early-stopping check below.
    stop_p = earlyStopping(epoch_losses_p, min_delta=0.2, patience=8)
    stop_b = earlyStopping(epoch_losses_b, min_delta=100, patience=8)

    change_lr(box_optim, int(epoch))

    # Checkpoint only when the combined loss is the best so far. The previous
    # condition was inverted: it saved whenever the loss was *worse* than in
    # the preceding epoch.
    total_loss = epoch_losses_b[-1] + epoch_losses_p[-1]
    if total_loss < best_total_loss:
        best_total_loss = total_loss
        save_best_model()

    # if stop_p or stop_b:
    #     break

    print(f"epoch {epoch} loss p: " + str(epoch_losses_p[-1]))
    print(f"epoch {epoch} loss b: " + str(epoch_losses_b[-1]))


In [None]:
%matplotlib inline

# Use dedicated names for the loss histories: `prob_loss` and `box_loss` are
# the loss objects used by loss_and_grads(), and rebinding them to lists would
# break any later training step.
prob_loss_history = epoch_losses_p
box_loss_history = epoch_losses_b

epochs = range(len(prob_loss_history))

# One panel per loss: each curve gets its own y-axis.
fig, (ax_prob, ax_box) = plt.subplots(1, 2, figsize=(12, 4))

ax_prob.plot(epochs, prob_loss_history, 'r', label='Probabilities loss')
ax_prob.set_title('Probability loss')
ax_prob.set_xlabel('epoch')
ax_prob.set_ylabel('loss')
ax_prob.legend(loc=0)

ax_box.plot(epochs, box_loss_history, 'b', label='boxes loss')
ax_box.set_title('Box loss')
ax_box.set_xlabel('epoch')
ax_box.set_ylabel('loss')
ax_box.legend(loc=0)

fig.suptitle('Probability and box losses')
plt.show()

In [None]:
# Persist the final weights to the same path prefix that the set-up cell
# checks when deciding whether to load weights.
model.save_weights("/content/drive/MyDrive/Models/yoloDetection_model")

In [None]:
# Testing the model: load every file of the test folder as a NumPy array.
x_test = []
for fname in os.listdir(test_dir):
    x_test.append(np.array(Image.open(os.path.join(test_dir, fname))))

In [None]:
# Number of loaded test images and the array shape of the first one.
print(len(x_test))
print(x_test[0].shape)

In [None]:
# Scale the test images exactly like the training data (float32, divided by
# 255); the raw integer arrays were previously fed to the model unscaled.
test_batch = np.asarray(x_test[:3]).astype('float32') / 255

test_probs, test_boxes = model(test_batch)

# Number of images predicted to contain a car. The head is a sigmoid, so its
# output is always > 0; 0.5 is the decision threshold. (The earlier loop also
# stopped after the first image because of a mis-indented `break`.)
count = int(np.sum(test_probs.numpy() > 0.5))
print(test_probs.shape)
print(test_boxes.shape)

In [None]:

import cv2

# pt1 = (box_labels[20][0], box_labels[20][1])
# pt2 = (box_labels[20][2], box_labels[20][3])
  
x = np.asarray(x_test[:32])
# The arrays come from PIL, while cv2.imwrite expects BGR channel order.
# Convert first, otherwise red and blue end up swapped in the saved file and
# in the plot below. Assumes a 3-channel RGB image -- confirm for other data.
cv2.imwrite('test_image.jpg', cv2.cvtColor(x[0], cv2.COLOR_RGB2BGR))

img = cv2.imread('test_image.jpg', 1)
# imread returns BGR; matplotlib expects RGB.
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

# img	=	cv2.rectangle(img, pt1, pt2, (255,0,0), 2)
plt.imshow(img)