In [None]:
from google.colab import drive
import pandas as pd
import os
import tensorflow as tf
import numpy as np
import random
from sklearn.preprocessing import StandardScaler
import pickle
import matplotlib.pyplot as plt
import cv2

from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Layer, Conv2D, MaxPooling2D, Input, Flatten, Dense, Lambda, BatchNormalization, Dropout, GlobalAveragePooling2D, Concatenate, Activation, Add
from tensorflow.keras.regularizers import l2
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
import tensorflow as tf
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import load_model, save_model
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.preprocessing import image


# Mount Google Drive
drive.mount('/content/drive')

# Load the labels
labels_path = '/content/drive/MyDrive/Contrastive Learning/contrastive_learning_labels.csv'
labels_df = pd.read_csv(labels_path)

# Directory path where images are stored
image_dir = '/content/drive/MyDrive/Contrastive Learning/contrastive_learning_images'

# Check if the image directory exists
if not os.path.exists(image_dir):
    print("Image directory not found!")
else:
    print("Images are ready for training.")

Mounted at /content/drive
Images are ready for training.


In [None]:
def residual_block(x, filters, kernel_size=3, stride=1, activation='relu', dropout_rate=0.3):
    res = Conv2D(filters, kernel_size, strides=stride, padding='same', activation=activation)(x)
    res = BatchNormalization()(res)
    res = Dropout(dropout_rate)(res)

    res = Conv2D(filters, kernel_size, strides=1, padding='same', activation=None)(res)
    res = BatchNormalization()(res)

    if x.shape[-1] != filters:
        x = Conv2D(filters, kernel_size=(1, 1), strides=stride, padding='same', activation=None)(x)

    res = Add()([x, res])
    res = Activation(activation)(res)
    return res

def make_embedding_model(input_shape=(224, 224, 3), scalar_shape=(2,), embedding_dim=128, dropout_rate=0.3):
    inp = Input(shape=input_shape, name='input_image')

    c1 = Conv2D(32, (7, 7), strides=2, activation='relu', padding='same', name='conv_layer_1')(inp)
    c1 = BatchNormalization()(c1)
    m1 = MaxPooling2D((3, 3), strides=2, padding='same', name='pool_layer_1')(c1)

    r1 = residual_block(m1, 64, dropout_rate=dropout_rate)
    r2 = residual_block(r1, 128, dropout_rate=dropout_rate)
    r3 = residual_block(r2, 256, dropout_rate=dropout_rate)

    f1 = GlobalAveragePooling2D(name='global_avg_pool')(r3)

    scalar_input = Input(shape=scalar_shape, name='input_scalar')
    scalar_dense = Dense(64, activation='relu')(scalar_input)
    scalar_dense = BatchNormalization()(scalar_dense)
    scalar_dense = Dropout(dropout_rate)(scalar_dense)

    combined = Concatenate(name='concat_image_scalar')([f1, scalar_dense])

    dense_1 = Dense(256, activation='relu', name='dense_layer_1')(combined)
    dense_1 = BatchNormalization()(dense_1)
    dense_1 = Dropout(dropout_rate)(dense_1)

    embedding = Dense(embedding_dim, name='embedding_layer')(dense_1)
    embedding = BatchNormalization(name='embedding_batch_norm')(embedding)

    return Model(inputs=[inp, scalar_input], outputs=embedding, name='embedding_model')

# Adjust the data generator to include both scalar features (pothole_area_mm2 and mm_to_pixel_ratio)
def data_generator(save_dir, batch_size=32, augment=False):
    pairs_batches = sorted([f for f in os.listdir(save_dir) if f.startswith('pairs_batch_') and f.endswith('.npy')])
    scalar_features_1_batches = sorted([f for f in os.listdir(save_dir) if f.startswith('scalar_features_1_batch_') and f.endswith('.npy')])
    scalar_features_2_batches = sorted([f for f in os.listdir(save_dir) if f.startswith('scalar_features_2_batch_') and f.endswith('.npy')])
    labels_batches = sorted([f for f in os.listdir(save_dir) if f.startswith('labels_batch_') and f.endswith('.npy')])

    while True:
        # Shuffle the batches together
        combined_batches = list(zip(pairs_batches, scalar_features_1_batches, scalar_features_2_batches, labels_batches))
        np.random.shuffle(combined_batches)

        for pair_file, sf1_file, sf2_file, label_file in combined_batches:
            pairs = np.load(os.path.join(save_dir, pair_file))
            scalar_features_1 = np.load(os.path.join(save_dir, sf1_file))  # Include both area and ratio
            scalar_features_2 = np.load(os.path.join(save_dir, sf2_file))  # Include both area and ratio
            labels = np.load(os.path.join(save_dir, label_file))

            num_batches = len(pairs) // batch_size
            for i in range(num_batches):
                batch_pairs = pairs[i * batch_size:(i + 1) * batch_size]
                batch_sf1 = scalar_features_1[i * batch_size:(i + 1) * batch_size]
                batch_sf2 = scalar_features_2[i * batch_size:(i + 1) * batch_size]
                batch_labels = labels[i * batch_size:(i + 1) * batch_size]

                if augment:
                    batch_pairs[:, 0] = data_augmentation(batch_pairs[:, 0])
                    batch_pairs[:, 1] = data_augmentation(batch_pairs[:, 1])

                # Yield the correctly shaped data
                yield (
                    (
                        tf.convert_to_tensor(batch_pairs[:, 0], dtype=tf.float32),
                        tf.convert_to_tensor(batch_sf1, dtype=tf.float32),
                        tf.convert_to_tensor(batch_pairs[:, 1], dtype=tf.float32),
                        tf.convert_to_tensor(batch_sf2, dtype=tf.float32)
                    ),
                    tf.convert_to_tensor(batch_labels, dtype=tf.float32)
                )

# Define the input shapes
image_shape = (224, 224, 3)
scalar_shape = (2,)  # Two scalar inputs: pothole_area_mm2 and mm_to_pixel_ratio

# Load the embedding model with scalar features included
base_network = make_embedding_model(input_shape=image_shape, scalar_shape=scalar_shape, embedding_dim=128)

# Input tensors for the two images and scalar features
input_a = Input(shape=image_shape, name='input_img_a')
input_b = Input(shape=image_shape, name='input_img_b')

scalar_input_a = Input(shape=scalar_shape, name='input_scalar_a')
scalar_input_b = Input(shape=scalar_shape, name='input_scalar_b')

# Generate embeddings for both inputs (image + scalar features)
embedding_a = base_network([input_a, scalar_input_a])
embedding_b = base_network([input_b, scalar_input_b])

# Calculate the distance between the embeddings
distance_layer = Lambda(lambda x: tf.math.square(x[0] - x[1]), name='distance_layer')
distance_output = distance_layer([embedding_a, embedding_b])

# Add classification layer
classifier = Dense(1, activation='sigmoid', name='classifier')(distance_output)

# Define the contrastive model with scalar inputs
model = Model(inputs=[input_a, scalar_input_a, input_b, scalar_input_b], outputs=[classifier, distance_output])

# Define the contrastive loss function
class ContrastiveLoss(tf.keras.losses.Loss):
    def __init__(self, margin=1.0):
        super().__init__()
        self.margin = margin

    def call(self, y_true, y_pred):
        label = tf.cast(y_true, tf.float32)
        neg_dist = tf.maximum(self.margin - y_pred, 0)
        return tf.reduce_mean(label * y_pred + (1.0 - label) * neg_dist, axis=-1)

# Instantiate loss functions
loss_contrastive = ContrastiveLoss(margin=1.0)
loss_classifier = tf.keras.losses.BinaryCrossentropy()

optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)  # Set a constant learning rate

# Compile the model
model.compile(
    loss=[loss_classifier, loss_contrastive],
    optimizer=optimizer,
    loss_weights=[1.0, 1.0],
    metrics=[['accuracy'], []]  # accuracy for classifier, no metrics for the contrastive loss
)

# Training dataset (no validation dataset)
train_dataset = tf.data.Dataset.from_generator(
    lambda: data_generator(save_dir='/content/drive/MyDrive/pairs_batches', batch_size=32, augment=True),
    output_signature=(
        (
            tf.TensorSpec(shape=(None, 224, 224, 3), dtype=tf.float32),  # Input image A
            tf.TensorSpec(shape=(None, 2), dtype=tf.float32),            # Scalar features A (area and ratio)
            tf.TensorSpec(shape=(None, 224, 224, 3), dtype=tf.float32),  # Input image B
            tf.TensorSpec(shape=(None, 2), dtype=tf.float32)             # Scalar features B (area and ratio)
        ),
        tf.TensorSpec(shape=(None,), dtype=tf.float32)                  # Labels
    )
).prefetch(tf.data.AUTOTUNE)

# Train the model
history = model.fit(
    train_dataset,
    epochs=30,  # Run for 30 epochs
    steps_per_epoch=100,  # Adjust this based on the size of your dataset
    verbose=1  # Print training progress
)

Epoch 1/30
[1m100/100[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m154s[0m 710ms/step - classifier_accuracy: 0.5395 - loss: 1.3719
Epoch 2/30
[1m100/100[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m116s[0m 1s/step - classifier_accuracy: 0.5310 - loss: 0.8797
Epoch 3/30
[1m100/100[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m67s[0m 671ms/step - classifier_accuracy: 0.5641 - loss: 0.7641
Epoch 4/30
[1m100/100[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m62s[0m 620ms/step - classifier_accuracy: 0.6063 - loss: 0.7106
Epoch 5/30
[1m100/100[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m62s[0m 625ms/step - classifier_accuracy: 0.5899 - loss: 0.6733
Epoch 6/30
[1m100/100[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m67s[0m 675ms/step - classifier_accuracy: 0.5632 - loss: 0.6977
Epoch 7/30
[1m100/100[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m62s[0m 620ms/step - classifier_accuracy: 0.5954 - loss: 0.6671
Epoch 8/30
[1m100/100[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[

In [None]:
base_network.save('/content/drive/MyDrive/Contrastive Learning/embedding_model_with_scalars.keras')