In [1]:
import numpy as np
import time
import random
import os
import matplotlib.pyplot as plt
import cv2
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras.utils import load_img,img_to_array
from tensorflow.keras.applications import MobileNetV3Large
from tensorflow.keras.applications.mobilenet_v3 import preprocess_input, decode_predictions
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.layers import Dense,GlobalAveragePooling2D,Input,Dropout
from tensorflow.keras.models import Model
from tensorflow.keras.layers import RandomCrop, RandomFlip, RandomRotation, RandomContrast
from tensorflow.keras.callbacks import EarlyStopping,ModelCheckpoint
from tensorflow.keras.regularizers import l2
from tensorflow.keras.mixed_precision import experimental as mixed_precision
from tensorflow.keras import backend as K
import json
from PIL import Image


K.clear_session()



print("Num GPUs Available: ", len(tf.config.list_physical_devices('GPU')))

Num GPUs Available:  1


# Train MobileNetV3
To train our neural network we need:
 - Load the data
 - Downscale data and prepare it for input
 - create some neccessary functions to show image, show scores and etc
 - Load MobileNetV3 pretrained model without top
 - Prepare Data augmentation layers
 - Prepare output layers
 - Set neccessary parameters
 - Train the model
 - evaluate the model performance
 - create pipeline 

#### parameters initialization

In [2]:
# We need to set seed for any randomizing that will happen during process
SEED = 44
# choose how much times we want data to be shown
epochs = 5
BATCH_SIZE = 8
# validation frequency so validation wont be run every epoch
VAL_FREQ = 2
OPTIMIZER = 'adam'
shape_mobilenet = 224
VALIDATION_SPLIT = 0.05

# Data preparation

In [3]:
all_types = os.listdir("./jpgs/")
print(all_types)
all_images_dict = {}
all_full_names = {}
for type in all_types:
    all_images_dict[type[:2]] = []

for type in all_types:
    images_of_one_type = []
    for condition in os.listdir(f"./jpgs/{type}"):
        
        condition_images = os.listdir(f"./jpgs/{type}/{condition}")
        images_of_one_type += condition_images
        for img in condition_images:
            all_full_names[img] = f"./jpgs/{type}/{condition}/{img}"
    
    all_images_dict[type[:2]] = images_of_one_type
        
        
        
        # all_images_dict[type].append()
    

['01_alb_id', '02_aut_drvlic_new', '04_aut_id', '07_chl_id', '09_chn_id', '10_cze_id', '12_deu_drvlic_new', '13_deu_drvlic_old', '19_esp_drvlic', '22_est_id', '23_fin_drvlic', '24_fin_id', '26_hrv_drvlic', '29_irn_drvlic', '30_ita_drvlic', '31_jpn_drvlic', '33_mac_id', '35_nor_drvlic', '36_pol_drvlic', '37_prt_id', '38_rou_drvlic', '40_srb_id', '42_svk_id', '43_tur_id']


In [4]:
len(all_full_names)

7064

In [5]:
image_paths = []
labels = []
for key in all_images_dict:
    for image in all_images_dict[key]:
        image_paths.append(image)
        labels.append(image[:4])


## Create pairs for every image

In [6]:

def create_pairs(image_paths, labels, num_positive_pairs=15, num_negative_pairs=35):
    pairs = []
    pair_labels = []

    id_numbers = [label[2:] for label in labels]
    

    unique_ids = np.unique(id_numbers)
    
    # Group images by their ID numbers
    id_indices = {id_num: []  for id_num in unique_ids}
    for key in id_indices:
        for image in image_paths:
            if key == image[2:4]:
                id_indices[key].append(image)
    # print(id_indices)
    for idx1 in range(len(image_paths)):
        image = image_paths[idx1]
        id_1 = image[2:4]
        
        pos_indices = np.array(id_indices[id_1])
        pos_indices = pos_indices[pos_indices != image]
        # print(pos_images)
        selected_pos_indices = np.random.choice(pos_indices, size=min(len(pos_indices), num_positive_pairs), replace=False)
        # print(selected_pos_indices)
        for selected_image in selected_pos_indices:
            image_1 = all_full_names[image_paths[idx1]]
            image_2 = all_full_names[selected_image]
            pairs.append([image_1, image_2])
            pair_labels.append(1)  # Positive pair
        
        
        # print(pairs)
        # print(pair_labels)

        for _ in range(num_negative_pairs):
            id2 = np.random.choice(unique_ids)
            # print(id2)
            while id2 == id_1:  # Ensure the negative pair is from a different ID
                id2 = np.random.choice(unique_ids)
            neg_image = np.random.choice(id_indices[id2])
            # neg_image = random.choice(id_indices[idx2])
            # print(neg_image)
            image_1 = all_full_names[image_paths[idx1]]
            image_2 = all_full_names[neg_image]
            
            pairs.append([image_1, image_2])
            pair_labels.append(0)  # Negative pair
        
        
    return np.array(pairs), np.array(pair_labels)

## Create dataset

In [7]:
X,Y = create_pairs(image_paths, labels)
print(len(X))
# print(X[219850],Y[0])

353200


In [8]:
# Set the seed for reproducibility
np.random.seed(SEED)

shuffled_indices = np.random.permutation(len(X))
if len(X) == len(Y):
    # Shuffle both X and Y using the same indices
    image_pairs = X[shuffled_indices]
    labels = Y[shuffled_indices]
    print("shuffled successfully")
else:
    print("X and Y are not the same length")

shuffled successfully


In [9]:
# Define your image dimensions and batch size
IMG_HEIGHT = 300
IMG_WIDTH = 300

# Preprocessing function for a single image
def preprocess_image(image_path):
    
    # Load and decode image from file
    image = tf.io.read_file(image_path)
    image = tf.image.decode_jpeg(image, channels=3) 

    # Resize image
    image = tf.image.resize(image, [IMG_HEIGHT, IMG_WIDTH])

    # Normalize pixel values to the range [0.0, 1.0]
    image = image / 255.0

    return image


# Preprocessing function for image pairs
def preprocess_image_pair(image_pair, label):
    image_path1, image_path2 = image_pair[0], image_pair[1]

    # Preprocess both images in the pair
    image1 = preprocess_image(image_path1)
    image2 = preprocess_image(image_path2)

    return (image1, image2), label
    
# Convert the list of image pairs and labels into a tf.data.Dataset
dataset = tf.data.Dataset.from_tensor_slices((image_pairs, labels))

# Map the preprocessing function to the dataset
dataset = dataset.map(lambda x, y: preprocess_image_pair(x, y), num_parallel_calls=tf.data.AUTOTUNE)

# Calculate the number of samples for the train/val split
total_samples = len(image_pairs)
train_size = int((1 - VALIDATION_SPLIT) * total_samples)

# Split the dataset into training and validation
train_dataset = dataset.take(train_size).batch(BATCH_SIZE).prefetch(tf.data.AUTOTUNE)
validation_dataset = dataset.skip(train_size).batch(BATCH_SIZE).prefetch(tf.data.AUTOTUNE)


# Add Data augmentation to our model

In [10]:
def data_augment(X):
    X = RandomCrop(height=shape_mobilenet,width=shape_mobilenet,seed=SEED)(X)
    X = RandomFlip(mode="horizontal_and_vertical",seed=SEED)(X)
    X = RandomRotation(factor=0.9)(X)
    X = RandomContrast(0.8,seed=SEED)(X)
    return X

# Load the model

In [11]:
base_model = MobileNetV3Large(input_shape=(shape_mobilenet,shape_mobilenet,3),weights='imagenet',include_top=False,include_preprocessing=False)


In [12]:
num_neurons = 1024
# define model neurons for the last Dense layer and model name for saving it during training and later 
model_name = f'MobileNetV3_{num_neurons}_v2_siamese.h5'

In [13]:
# Define input shape for each image in the pair (300x300 RGB image)
input_shape = (IMG_HEIGHT, IMG_WIDTH, 3)

# Input layers for the two images so inputs can be passed through the same network branch later
input_1 = layers.Input(shape=input_shape, name='input_image_1')
input_2 = layers.Input(shape=input_shape, name='input_image_2')

# Data augmentation for both inputs
x1 = data_augment(input_1)
x2 = data_augment(input_2)

# Pass both inputs through the same base model
x1 = base_model(x1)
x2 = base_model(x2)

# Global spatial avg pooling layer for both outputs
x1 = GlobalAveragePooling2D()(x1)
x2 = GlobalAveragePooling2D()(x2)

# Calculate the L1 distance between the two outputs
x = layers.Lambda(lambda tensors: tf.abs(tensors[0] - tensors[1]))([x1, x2])

# Fully connected layer
x = Dense(num_neurons,activation='relu',kernel_regularizer=l2(0.02))(x) #
x = Dropout(0.7)(x)


output_len = 2  # two classes 0 = not the same , 1 = the same 
predictions = Dense(output_len, activation='softmax')(x)

# This is the model we will train
model = Model(inputs=[input_1, input_2], outputs=predictions)

In [14]:
# first: train only the top layers (which were randomly initialized) for them to properly initialize
# i.e. freeze all convolutional MobileNetV3 layers
for layer in base_model.layers:
    layer.trainable = False

# compile the model (should be done *after* setting layers to non-trainable)
model.compile(optimizer='adam',
              loss=tf.losses.SparseCategoricalCrossentropy(),
              metrics=['accuracy'])



In [15]:
# Define callbacks
checkpoint = ModelCheckpoint(model_name, monitor='val_loss', save_best_only=True, mode='min')
early_stopping = EarlyStopping(monitor='val_loss', patience=2, mode='min')

In [16]:
BATCH_SIZE = 16

In [None]:
# train the model on the new data for one epoch
epochs = 1

hist = model.fit(train_dataset,
                 epochs = epochs,
                 callbacks = [checkpoint,early_stopping]
                )

In [19]:
# at this point, the top layers are well trained and we can start fine-tuning
# convolutional layers from MobileNet V3. We will freeze the bottom N layers
# and train the remaining top layers.

# let's visualize layer names and layer indices to see how many layers
# we should freeze:
# for i, layer in enumerate(model.layers):
#    print(i, layer.name)

In [20]:
def load_model(model_name):
    model = tf.keras.models.load_model(model_name)
    return model

model = load_model("./MobileNetV3_512_512_siamese.h5")


OSError: No file or directory found at ./MobileNetV3_512_512_siamese.h5

In [21]:
# we chose to train the top 2 inception blocks, i.e. we will freeze
# the first 249 layers and unfreeze the rest:
for layer in model.layers:
   layer.trainable = True

In [22]:
checkpoint = ModelCheckpoint(model_name, monitor='val_loss', save_best_only=True, mode='min')

In [24]:
# we need to recompile the model for these modifications to take effect
# we use adam with a low learning rate
BATCH_SIZE = 8
model.compile(optimizer="adam",
              loss=tf.losses.SparseCategoricalCrossentropy(),
              metrics=['accuracy']
             )

In [25]:
# we train our model again, this time fine-tuning the convolutional layers
# alongside the top Dense layers
epochs = 5
hist_unfreezed = model.fit(train_dataset,
                 epochs = epochs,
                 validation_data=validation_dataset,
                 callbacks = [checkpoint,early_stopping]
                 # ,validation_freq=VAL_FREQ
                )


#  https://keras.io/api/applications/#usage-examples-for-image-classification-models

Epoch 1/5
Epoch 2/5
 2887/41943 [=>............................] - ETA: 1:26:23 - loss: 0.6132 - accuracy: 0.6974


KeyboardInterrupt



In [None]:
def load_model(model_name):
    model = tf.keras.models.load_model(model_name)
    return model

# model = load_model('mobilenet.h5')

In [None]:

def predict(img_1_path,img_2_path,model):
    # img_path = 'dawg.jpg'
    img_1 = np.array(preprocess_image(img_1_path))
    # img_2 = np.array(preprocess_image(img_2_path))
    # print(img_1.shape)
    # print(img_2.shape)
    img_1 = np.expand_dims(img_1, axis=0)  # Add batch dimension, shape becomes (1, 300, 300, 3)
    img_2 = np.expand_dims(img_2_path, axis=0)  # Add batch dimension, shape becomes (1, 300, 300, 3)

    
    
    preds = model.predict((img_1,img_2))
    print(preds)
    
    # decode the results into a list of tuples (class, description, probability)
    # (one such list for each sample in the batch)
    if preds[0][0] > 0.7:
        predicted_idx = 0
    elif preds[0][1] > 0.7:
        predicted_idx = 1
    else: 
        predicted_idx = 2
    # predicted_idx = np.argmax(preds)
    print(predicted_idx)
    return predicted_idx, preds

In [None]:
model_name = f'MobileNetV3_1024_siamese.h5'
model = load_model(model_name)

In [None]:
myimg_path = "./saba_1.jpg"
toko_img_path = "./toko_1.jpg"
prava = "./saba_prava.jpg"
predict(myimg_path,prava,model)
img_1 = preprocess_image(myimg_path)
# img_2 = preprocess_image(toko_img_path)

In [24]:
import os
import imghdr

# Define the folder path and the valid image types
folder_path = './jpgs/'
valid_image_types = {'jpeg', 'png', 'gif', 'bmp'}

def check_image_files(folder):
    for root, dirs, files in os.walk(folder):
        for file in files:
            file_path = os.path.join(root, file)
            file_type = imghdr.what(file_path)
            if file_type in valid_image_types:
                # print(f"{file} is a valid image type ({file_type.upper()})")
                pass
            else:
                os.remove(f"{root}/{file}")
                print(f"{root}/{file} is NOT a valid image type")

# Run the function
check_image_files(folder_path)
