In [70]:
# Copy the custom_data_generator package out of ../input into the working
# directory so that the import below can resolve it.
!cp -r ../input/custom-data-generator/custom_data_generator* ./
import custom_data_generator

In [71]:
import warnings
# Silence every warning for the rest of the notebook.
# NOTE(review): this also hides deprecation warnings raised by the libraries used below.
warnings.filterwarnings("ignore")

In [72]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import os
import glob
import tensorflow as tf
import tensorflow.keras.backend as K
from tensorflow.keras import layers, models, Sequential
from tensorflow.keras.preprocessing import image_dataset_from_directory
from tensorflow.keras.preprocessing.image import ImageDataGenerator

from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input

from tensorflow.keras.applications.resnet50 import ResNet50

from custom_data_generator.custom_image_data_generator import CustomImageDataGenerator

from sklearn.metrics import classification_report , f1_score



In [73]:
# Tunable settings used throughout the notebook.
learning_rate = 0.01  # chosen by trial and error
batch_size = 64  # batch size handed to the train and test generators
input_shape  = (256, 256, 3)  # model input shape; its first two entries are the generators' target_size (trial and error)
temperature = 0.1  # temperature passed to SupervisedContrastiveLoss (trial and error)
output_dim = 256  # number of units of the last Dense layer in the projection head
# data_path  = "camera 2"
# NOTE(review): the prediction cell passes its own literal threshold (0.54) to
# predict_class -- confirm which of the two values is intended.
threshold = 0.85

In [74]:
# Show the GPU devices TensorFlow can see.
tf.config.list_physical_devices('GPU')

In [75]:
# Print the entries found in the training directory (one folder per class is expected).
folders_names = os.listdir(r"../input/driver-dataset/train/train")
print(folders_names)


Data Prep

In [76]:
def get_train_schema(dir_name, normal_str, csv_file_name, view="", skip_first=2):
    """
    Generate a csv file with two columns describing the training images:
        "Path": the path of each image
        "Label": 0 for images in a "normal" folder, 1 for every other folder

    The file is written to f"{csv_file_name}.csv"; nothing is returned.

    # Arguments
        dir_name: path of the dataset directory which contains the class folders
        normal_str: the starting string of the normal data folders
        csv_file_name: name (without ".csv") of the generated csv file
        view: One of "front_depth", "front_IR", "top_depth", "top_IR" or "" if we use AUC dataset
        skip_first: number of leading entries of os.listdir(dir_name) to ignore.
            The default (2) reproduces the previously hard-coded slice.
            NOTE: os.listdir returns entries in arbitrary order, so *which*
            entries are skipped depends on the file system; pass 0 to use
            every folder.

    # Raises
        ValueError: if skip_first is negative.
    """
    if skip_first < 0:
        # A negative value would silently turn the slice into "keep the last N".
        raise ValueError("skip_first must be >= 0")

    folders_names = os.listdir(dir_name)

    records = []
    for folder in folders_names[skip_first:]:
        label = 0 if folder.startswith(normal_str) else 1
        # Entries that are not directories simply match nothing here.
        records.extend(
            (path, label) for path in glob.glob(f'{dir_name}/{folder}/{view}/*')
        )

    df = pd.DataFrame(records, columns=['Path', 'Label'])
    df.to_csv(f"{csv_file_name}.csv", header=True, index=False)
    
    

In [77]:
def get_test_schema(dir_name, normal_str, csv_file_name, view=""):
    """
    Write f"{csv_file_name}.csv" listing every test image with its label.

    The csv has two columns:
        "Path": the path of the image
        "Label": 0 when the image's class folder starts with `normal_str`, else 1

    # Arguments
        dir_name: path of the dataset directory which contains the class folders
        normal_str: the starting string of the normal data folders
        csv_file_name: name (without ".csv") of the generated csv file
        view: One of "front_depth", "front_IR", "top_depth", "top_IR" or "" if we use AUC dataset
    """
    records = []
    for folder in os.listdir(dir_name):
        # Every folder is used here (no entries are skipped).
        if folder.startswith(normal_str):
            folder_label = 0
        else:
            folder_label = 1
        for image_path in glob.glob(f'{dir_name}/{folder}/{view}/*'):
            records.append((image_path, folder_label))

    schema = pd.DataFrame(records, columns=['Path', 'Label'])
    schema.to_csv(f"{csv_file_name}.csv", header=True, index=False)
    
    

In [78]:
# Write "Camera 2_train_Schema.csv"; folders whose name starts with "c0" are labelled normal (0).
get_train_schema("../input/driver-dataset/train/train", "c0", "Camera 2_train_Schema")


In [79]:
# Write "Camera 2_test_Schema.csv"; folders whose name starts with "c0" are labelled normal (0).
get_test_schema("../input/driver-dataset/test/test", "c0", "Camera 2_test_Schema")



In [80]:
# Load each schema with every column as str and order its rows by label,
# producing a new sorted frame instead of sorting in place.
train_data_schema_cam2 = pd.read_csv(
    "Camera 2_train_Schema.csv", dtype=str
).sort_values("Label")

test_data_schema_cam2 = pd.read_csv(
    "Camera 2_test_Schema.csv", dtype=str
).sort_values("Label")

In [81]:
# Preview the training schema (columns "Path" and "Label").
train_data_schema_cam2

In [82]:
# Preview the test schema (columns "Path" and "Label").
test_data_schema_cam2

In [83]:
# Number of training images labelled "1" (labels are strings because the csv was read with dtype=str).
(train_data_schema_cam2["Label"] == "1").sum()

In [84]:
# Number of training images labelled "0" (normal).
(train_data_schema_cam2["Label"] == "0").sum()

In [85]:
# Keep the number of normal ("0") training images; it is passed to the train generator as positive_n.
train_normal_count_cam2 = (train_data_schema_cam2["Label"] == "0").sum()
train_normal_count_cam2

In [86]:
# Create the training generator with augmentation (values below were tuned by trial and error).
train_datagen =  CustomImageDataGenerator(
    rotation_range=20,  # trial and error
    channel_shift_range=20,  # trial and error
    horizontal_flip=True,
    preprocessing_function = preprocess_input  # scale input pixels between -1 and 1.
)

# Read the training images listed in the schema dataframe through the train generator.
train_ds = train_datagen.flow_from_dataframe(
    dataframe=train_data_schema_cam2,
    directory=".",
    x_col="Path",
    y_col="Label",
    # `subset` is intentionally not passed (no validation split is configured).
    batch_size=batch_size,
    seed=42,
    # `shuffle` is not passed, so the generator's default applies.
    class_mode="binary",
    target_size=input_shape[0:-1],  # (height, width) part of input_shape
    # Argument specific to CustomImageDataGenerator; presumably the number of
    # normal samples -- TODO confirm against the custom_data_generator package.
    positive_n = train_normal_count_cam2,
)

In [87]:
# Create the test generator: no augmentation, only the MobileNetV2 input preprocessing.
test_datagen = CustomImageDataGenerator(preprocessing_function = preprocess_input)


# Read the test images listed in the schema dataframe through the test generator.
# shuffle=False so the generator is asked not to reorder the rows of test_data_schema_cam2.
test_ds = test_datagen.flow_from_dataframe(
    dataframe=test_data_schema_cam2,
    directory=".",
    x_col="Path",
    y_col="Label",
    seed = 42,
    batch_size=batch_size,
    target_size=input_shape[0:-1],
    class_mode='binary', 
    shuffle=False)

In [88]:
  # base_encoder
def create_encoder(input_shape):
    """
    Build the base encoder: ImageNet-initialised MobileNetV2 (without its
    classification top), a 1x1 convolution with 512 filters and global
    average pooling.

    # Arguments
        input_shape: shape of one input image, passed to MobileNetV2 and to the model Input

    # Returns
        A tf.keras.Model named "base-encoder".
    """
    # The backbone is left trainable (no layer is frozen here).
    backbone = MobileNetV2(
        input_shape=input_shape,
        include_top=False,
        weights='imagenet',
    )

    image_in = tf.keras.Input(shape=input_shape)
    feature_map = backbone(image_in)
    feature_map = layers.Conv2D(512, kernel_size=1)(feature_map)
    pooled = layers.GlobalAvgPool2D()(feature_map)

    return tf.keras.Model(inputs=image_in, outputs=pooled, name="base-encoder")

In [89]:
# projection_head
def add_projection_head(encoder, input_shape):
    """
    Stack the projection head used during contrastive training on top of `encoder`.

    The head is Dense(512, relu) -> Dense(output_dim) -> L2 normalisation
    along axis 1. `output_dim` is read from the notebook's configuration.

    # Arguments
        encoder: model mapping an image batch to feature vectors
        input_shape: shape of one input image

    # Returns
        A tf.keras.Model named "base-encoder_with_projection-head".
    """
    image_in = tf.keras.Input(shape=input_shape)
    embedding = encoder(image_in)

    hidden = layers.Dense(512, activation="relu")(embedding)
    projected = layers.Dense(output_dim, activation=None)(hidden)
    # Unit-length outputs, so dot products between rows are cosine similarities.
    unit_norm = layers.Lambda(lambda  v: tf.math.l2_normalize(v,axis=1))(projected)

    return tf.keras.Model(
        inputs=image_in,
        outputs=unit_norm,
        name="base-encoder_with_projection-head",
    )

In [90]:
import tensorflow.keras.backend as K

#Loss Function
class SupervisedContrastiveLoss(tf.keras.losses.Loss):
    """
    Contrastive loss that compares normal samples with each other and with anomalies.

    For every ordered pair (i, j), i != j, of samples labelled 0 (normal) the term is

        -log( exp(n_i . n_j / T) / (exp(n_i . n_j / T) + sum_m exp(n_i . a_m / T)) )

    where a_m runs over the samples labelled 1 (anomaly) and T is the
    temperature. The terms are summed and scaled by 1 / (k * (k - 1)), with k
    the number of normal samples in the batch, clamped to at least 2.

    NOTE(review): `__call__` is overridden directly rather than `call`, so
    whatever the base-class `__call__` does (presumably reduction and sample
    weighting -- confirm) is bypassed; `sample_weight` is accepted but unused.

    # Arguments
        temperature: divisor applied to the dot products
        name: optional name forwarded to tf.keras.losses.Loss
    """

    def __init__(self, temperature=1, name=None):
        super(SupervisedContrastiveLoss, self).__init__(name=name)
        self.temperature = temperature

    def __call__(self, labels, feature_vectors, sample_weight=None):
        """
        Compute the loss for one batch.

        # Arguments
            labels: tensor in which 0 marks a normal sample and 1 an anomaly
            feature_vectors: one row per sample (assumed 2-D and already
                L2-normalised by the model -- TODO confirm); no normalisation
                happens here
            sample_weight: ignored

        # Returns
            A scalar tensor.
        """
        # Row indices of the normal (label 0) and anomaly (label 1) samples,
        # reshaped to a column so they can be fed to gather_nd.
        n_idxs = tf.reshape(tf.where(labels == 0)[:,0], [-1, 1])
        a_idxs = tf.reshape(tf.where(labels == 1)[:,0], [-1, 1])
        
        n_vectors = tf.gather_nd(feature_vectors, n_idxs)
        a_vectors = tf.gather_nd(feature_vectors, a_idxs)
        # Temperature-scaled dot products: normal-vs-normal ...
        n_scores = tf.divide(
            tf.matmul(n_vectors, tf.transpose(n_vectors)),
            self.temperature,
        )
        
        # ... and normal-vs-anomaly (one row per normal sample).
        a_n_scores = tf.divide(
            tf.matmul(n_vectors, tf.transpose(a_vectors)),
            self.temperature,
        )
        pos_logits = tf.exp(n_scores)
        neg_logits = tf.exp(a_n_scores)
        
        # Each normal pair (i, j) is contrasted against all anomalies paired with i.
        denominator = pos_logits + tf.reduce_sum(neg_logits, axis=-1, keepdims=True)
        loss_steps = -1 * tf.math.log((pos_logits / denominator))
        loss_steps = tf.linalg.set_diag(loss_steps , tf.zeros(K.shape(loss_steps)[0])) # remove values for i==j(diagonal)
        # k = number of normal samples in the batch
        k = tf.cast(K.shape(n_scores), tf.float32)[0]
        k = tf.math.maximum(k, 2) # prevent divide by zero, when the batch contains no or one normal photos
        loss = (1/(k*(k-1))) * tf.reduce_sum(loss_steps)
        return loss


In [91]:
# Build the model: base encoder followed by the projection head.
base_encoder = create_encoder(input_shape)
encoder_with_projection_head = add_projection_head(base_encoder, input_shape)

# Exponentially decaying learning rate (factor 0.9 per 1000 optimizer steps).
# It starts from `learning_rate` in the configuration cell (0.01, the same
# value that used to be hard-coded here) so the setting lives in one place.
lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
    initial_learning_rate=learning_rate,
    decay_steps=1000,
    decay_rate=0.9,
)

# Plain SGD (no momentum) with the contrastive loss at the configured temperature.
encoder_with_projection_head.compile(
    optimizer=tf.keras.optimizers.SGD(learning_rate=lr_schedule),
    loss=SupervisedContrastiveLoss(temperature),
)

encoder_with_projection_head.summary()



In [None]:
# Write a checkpoint every 50 epochs; the epoch number is formatted into the file name.
# NOTE(review): `period` is deprecated in tf.keras in favour of `save_freq`
# (counted in batches) -- migrate once the number of steps per epoch is confirmed.
checkpoint = tf.keras.callbacks.ModelCheckpoint('BestModelCam2{epoch:02d}.h5', period=50)

# train_ds was built with batch_size=batch_size and already yields batches, so
# `batch_size` must not be passed to fit() as well.
history = encoder_with_projection_head.fit(
    train_ds,
    epochs=150,
    callbacks=[checkpoint],
)

In [None]:
# Build the inference model. The Lambda layer only L2-normalises its input and
# holds no weights, so nothing in this wrapper needs fitting.
def get_test_model(encoder, input_shape):
    """
    Wrap `encoder` so that its output vectors are L2-normalised along axis 1.

    # Arguments
        encoder: model mapping an image batch to feature vectors
        input_shape: shape of one input image

    # Returns
        A tf.keras.Model named "test_model".
    """
    image_in = tf.keras.Input(shape=input_shape)
    embedding = encoder(image_in)
    unit_norm = layers.Lambda(lambda  v: tf.math.l2_normalize(v,axis=1))(embedding)
    return tf.keras.Model(inputs=image_in, outputs=unit_norm, name="test_model")

In [None]:
def predict_class(test_data, normal_template_vector, therashold):
    """
    Flag samples whose similarity to the normal template is below a threshold.

    # Arguments
        test_data: array in which each row is the feature vector of one photo
        normal_template_vector: vector the rows are compared with (dot product)
        therashold: similarity value below which a sample counts as an anomaly

    # Returns
        Integer array of the same shape as the dot product: 1 for an anomaly, 0 otherwise.
    """
    similarity = np.dot(test_data, normal_template_vector)
    is_anomaly = similarity < therashold
    return is_anomaly.astype(int)

In [None]:
# Keep only the training rows labelled "0" (normal); they are used to build the template vector.
normal_data_schema = train_data_schema_cam2.loc[train_data_schema_cam2["Label"] == "0"]

In [None]:
# Read the normal training images with the (augmentation-free) test generator.
# batch_size and shuffle are not passed here, so the generator's defaults apply.

normal_ds = test_datagen.flow_from_dataframe(
    dataframe=normal_data_schema,
    directory=".",
    seed = 42,
    x_col="Path",
    y_col="Label",
    class_mode="categorical",
    target_size=input_shape[0:-1])

In [None]:
# Embed every normal training image with the projection-free test model and
# average the embeddings into a single column vector (the normal template).
test_model = get_test_model(base_encoder, input_shape)
normal_v = test_model.predict(normal_ds)
normal_template_vector = normal_v.mean(axis=0).reshape(-1, 1)

In [None]:
# Embed the test images (test_ds was created with shuffle=False).
test_v = test_model.predict(test_ds)

In [None]:
# Classify every test embedding (1 = anomaly). The threshold is the literal
# 0.54 here, not the `threshold` constant from the configuration cell.
result = predict_class(test_v, normal_template_vector,therashold=0.54)

In [None]:
# Ground-truth labels as integers, taken from the same dataframe test_ds was
# built from. Defined here because this cell runs before the later cell that
# assigns y_true; without it a fresh "Run All" fails with a NameError.
# Assumes the generator yields rows in dataframe order (shuffle=False) -- TODO confirm.
y_true = test_data_schema_cam2["Label"].astype('int')

# AUC of the hard 0/1 predictions against the ground truth.
m = tf.keras.metrics.AUC()
m.update_state(y_true, result)
m.result().numpy()

In [None]:
# Persist the normal template vector to "Normal Vector11.npy".
np.save("Normal Vector11.npy", normal_template_vector)



In [None]:
# Reload the template vector from the file written by the np.save cell.
# The previous file name ("Normal Vector.npy") is not produced anywhere in
# this notebook, so loading it fails on a fresh run.
loaded_array = np.load("Normal Vector11.npy")


In [None]:
# Ground-truth test labels as integers (the schema was read with dtype=str).
y_true = test_data_schema_cam2["Label"].astype('int')

In [None]:
# AUC computed from the hard 0/1 predictions in `result` (not from similarity scores).
m = tf.keras.metrics.AUC()
m.update_state(y_true, result)
m.result().numpy()

In [None]:
# Accuracy of the predicted classes against the ground truth.
m = tf.keras.metrics.Accuracy()
m.update_state(y_true, result)
m.result().numpy()

In [None]:
# Recall of the positive class (label 1, anomaly).
m = tf.keras.metrics.Recall()
m.update_state(y_true, result)
m.result().numpy()

In [None]:
# F1 score of the predictions (scikit-learn).
print(f1_score(y_true, result))

In [None]:
# Per-class text report from scikit-learn; note that `m` now holds a string, not a Keras metric.
m = classification_report(y_true, result)
print(m)

In [None]:
from sklearn.metrics import confusion_matrix
import seaborn as sns

In [None]:
# Confusion matrix of ground truth vs. predictions.
cm = (confusion_matrix(y_true, result))
print(cm)

In [None]:
# Attach readable names to the matrix: rows are actual classes, columns are
# predicted classes (scikit-learn convention), with label 0 = Normal and 1 = Anomaly.
cm_df = pd.DataFrame(cm,
                    index=['Normal' , 'Anomaly'],
                    columns=['Normal' , 'Anomaly']
)

In [None]:
# Heatmap of the confusion matrix.
plt.figure(figsize=(8, 6))
# fmt="d" prints the integer counts in full; the default format abbreviates
# large counts to two significant digits (e.g. 1.2e+03).
sns.heatmap(cm_df, annot=True, fmt="d")
plt.title('Confusion Matrix')
plt.ylabel('Actual Values')  # fixed typo: was 'Actal Values'
plt.xlabel('Predicted Values')
plt.show()

In [None]:
# Plot the per-epoch training loss recorded by fit() in `history`.
plt.figure(figsize=(8, 6))

plt.plot(history.history['loss'], 'r')
plt.legend(['Loss'])
plt.xlabel('Epochs', color = 'b')
plt.ylabel('Loss', color = 'b')

plt.title('Training Loss vs Epochs' , color = 'b')
plt.show();

In [None]:
# Save the inference model under a path without a file extension
# (presumably the SavedModel directory format under TF 2.x -- confirm for the installed version).
test_model.save('Contrastive Model11')

In [None]:
# Save the same inference model again as a single HDF5 (.h5) file.
test_model.save('Contrastive Final Model11.h5')