In [1]:
import warnings
# Silence every warning category for the rest of the session. This also hides
# deprecation notices raised by the libraries imported in the following cell.
warnings.filterwarnings('ignore')

In [2]:
# Import the necessary packages
import os, sys
import torch
import torchvision

# Put the parent directory on sys.path, unless it is already listed there
module_path = os.path.abspath(os.pardir)
if sys.path.count(module_path) == 0:
    sys.path.append(module_path)

from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input
from tensorflow.keras.layers import Conv2D
from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import Dropout
from tensorflow.keras.layers import LSTM
from tensorflow.keras.layers import GlobalAveragePooling2D
from tensorflow.keras.layers import MaxPooling2D

import tensorflow.keras.backend as K
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import tensorflow as tf
import random
import IPython

from tensorflow.keras.layers import Input
from tensorflow.keras.layers import Lambda
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras.callbacks import EarlyStopping

from sklearn.model_selection import train_test_split

from datetime import datetime
from PIL import Image, ImageOps

In [3]:
from utils.dataset import load

In [22]:
# Specify the shape of the inputs for our network
IMG_SIZE = 224  # used for both spatial dimensions of IMG_SHAPE
CROP_SIZE = 110  # NOTE(review): not referenced in any visible cell - confirm it is still needed
IMG_SHAPE = (IMG_SIZE, IMG_SIZE, 3)  # passed to get_pairs_route as image_shape (3 selects RGB there)

In [5]:
# Specify the batch size and number of epochs
BATCH_SIZE = 256
EPOCHS = 8
EPOCHS_MLP = 50  # NOTE(review): presumably the epoch budget of a separate MLP stage not visible here - confirm

In [6]:
# Base directory that receives the artefacts written by this notebook
BASE_OUTPUT = "../../Results"
# Locations, inside the base directory, of the serialized model and of the
# training-history plot
MODEL_PATH = os.path.join(BASE_OUTPUT, "siamese_model")
PLOT_PATH = os.path.join(BASE_OUTPUT, "plot.png")

In [23]:
# Predefined functions

def contrastive_loss(y, preds, margin=1):
    """Contrastive loss for a siamese network whose output is a pair distance.

    Args:
        y: Pair labels using the convention of ``make_pairs``: 1 for a
            same-class (positive) pair, 0 for a different-class (negative) pair.
        preds: Predicted distance for each pair.
        margin: Distance beyond which a negative pair stops contributing.

    Returns:
        The mean loss over all pairs.
    """
    # Cast the labels to the dtype of the predictions so the arithmetic below
    # never mixes two tensor dtypes.
    y = tf.cast(y, preds.dtype)

    squaredPreds = K.square(preds)
    squaredMargin = K.square(K.maximum(margin - preds, 0))

    # Positive pairs (y == 1) pay their squared distance, so they are pulled
    # together; negative pairs (y == 0) pay only while they are closer than the
    # margin, so they are pushed apart. The previous form had the two terms
    # swapped, which trained the opposite of what the distance-based ranking in
    # get_recommendation expects.
    loss = K.mean(y * squaredPreds + (1 - y) * squaredMargin)

    return loss

def build_siamese_model(inputShape, embeddingDim=48):
    """Build the feature-extractor network applied to each image.

    Two identical CONV(64 filters, 2x2, relu) => POOL(2) => DROPOUT(0.3) stages
    are followed by global average pooling and a dense layer of size
    ``embeddingDim``. With "same" padding each convolution keeps the spatial
    size and each pooling halves it (e.g. a 28x28x1 input goes
    28x28x64 -> 14x14x64 -> 14x14x64 -> 7x7x64).

    Args:
        inputShape: Shape of a single input image.
        embeddingDim: Number of units of the final dense layer.

    Returns:
        A Keras ``Model`` mapping an image to its embedding.
    """
    inputs = Input(inputShape)

    # Stack the two CONV => RELU => POOL => DROPOUT stages
    features = inputs
    for _ in range(2):
        features = Conv2D(64, (2, 2), padding="same", activation="relu")(features)
        features = MaxPooling2D(pool_size=2)(features)
        features = Dropout(0.3)(features)

    # Collapse the feature maps and project them to the embedding
    pooled = GlobalAveragePooling2D()(features)
    embedding = Dense(embeddingDim)(pooled)

    return Model(inputs, embedding)

def _load_image_array(img_path, image_shape):
    """Read one image file and return it as an array sized per ``image_shape``."""
    if image_shape[2] == 1:
        mode = "L"
    elif image_shape[2] == 3:
        mode = "RGB"
    else:
        raise ValueError(f"Unexpected number of channels in image: {img_path}")

    # The context manager releases the file handle once the pixels are copied
    # out; without it one handle per image stays open until garbage collection.
    with Image.open(img_path) as image:
        # PIL takes (width, height). image_shape is treated as
        # (height, width, channels) so the array rows/columns match it.
        resized = image.resize((image_shape[1], image_shape[0]))
        return np.asarray(resized.convert(mode))


def get_pairs_route(images_dir, data_df, image_shape):
    """Load every image under ``images_dir/<category>/<style>/`` with its label.

    A file is kept only when exactly one row of ``data_df`` has an ``ImgPath``
    equal to ``<category>/<style>/<file>`` (joined with the OS separator); the
    label is that row's ``Category``. Files that cannot be matched or read are
    reported with ``print`` and skipped.

    All images are stacked into a single in-memory array, so the memory needed
    grows linearly with the number of files.

    Args:
        images_dir: Root directory of the image tree.
        data_df: DataFrame with at least the columns ``ImgPath`` and ``Category``.
        image_shape: ``(height, width, channels)`` with channels 1 (grayscale)
            or 3 (RGB).

    Returns:
        Tuple ``(images, labels, names)`` of numpy arrays with one entry per
        loaded image; ``names`` holds the relative paths.

    Raises:
        ValueError: If no image at all could be loaded.
    """
    # Index the metadata once: each file lookup becomes a dict access instead
    # of a scan over the whole DataFrame.
    categories_by_path = {}
    for rel_path, category in zip(data_df["ImgPath"], data_df["Category"]):
        categories_by_path.setdefault(rel_path, []).append(category)

    images = []
    labels = []
    names = []

    # sorted() makes the traversal order, and therefore the output order,
    # independent of the file system.
    for category in sorted(os.listdir(images_dir)):
        category_path = os.path.join(images_dir, category)
        if not os.path.isdir(category_path):
            continue
        for style in sorted(os.listdir(category_path)):
            style_path = os.path.join(category_path, style)
            if not os.path.isdir(style_path):
                continue
            for file_name in sorted(os.listdir(style_path)):
                img_path = os.path.join(style_path, file_name)
                img_name = os.path.join(category, style, file_name)
                try:
                    matches = categories_by_path.get(img_name, [])
                    if len(matches) != 1:
                        raise ValueError(f"Expected one match for {img_name}, but found {len(matches)}")
                    data = _load_image_array(img_path, image_shape)
                except Exception as e:
                    # Best effort by design: report the file and keep going
                    print(f"Error processing {img_path}: {e}")
                    continue

                images.append(data)
                labels.append(matches[0])
                names.append(img_name)

    if not images:
        raise ValueError(f"No images could be loaded from {images_dir}")

    return np.stack(images), np.array(labels), np.array(names)

def make_pairs(images, labels):
    """Build one positive and one negative image pair for every image.

    For each image a partner with the same label and a partner with a
    different label are drawn with ``np.random.choice`` (global NumPy RNG).
    The positive partner may be the image itself.

    Args:
        images: Indexable collection of images.
        labels: Label of each image; any array-like accepted by ``np.asarray``.

    Returns:
        Tuple ``(pairImages, pairLabels)`` of numpy arrays. Entry ``2*i`` is the
        positive pair of image ``i`` (label ``[1]``) and entry ``2*i + 1`` its
        negative pair (label ``[0]``).

    Raises:
        ValueError: If fewer than two distinct labels are present, because no
            negative pair can be formed.
    """
    # The element-wise comparisons below need an ndarray; on a plain list
    # `labels == i` collapses to a single boolean.
    labels = np.asarray(labels)

    classes = np.unique(labels)
    if len(images) > 0 and len(classes) < 2:
        raise ValueError("make_pairs needs at least two distinct labels to build negative pairs")

    # idx[c] holds the positions of every example labelled classes[c]
    idx = [np.where(labels == i)[0] for i in classes]

    pairImages = []
    pairLabels = []

    for idxA in range(len(images)):
        currentImage = images[idxA]
        label = labels[idxA]
        label_idx = np.where(classes == label)[0][0]

        # Positive pair: a random image carrying the *same* label
        idxB = np.random.choice(idx[label_idx])
        pairImages.append([currentImage, images[idxB]])
        pairLabels.append([1])

        # Negative pair: a random image carrying any *other* label
        negIdx = np.where(labels != label)[0]
        pairImages.append([currentImage, images[np.random.choice(negIdx)]])
        pairLabels.append([0])

    return (np.array(pairImages), np.array(pairLabels))



def euclidean_distance(vectors):
    """Euclidean distance between two batches of feature vectors.

    Args:
        vectors: Pair ``(featsA, featsB)`` of tensors.

    Returns:
        Tensor of distances reduced over axis 1, with that axis kept (size 1).
    """
    left, right = vectors

    # Sum of squared differences along the feature axis
    squared_diff = K.square(left - right)
    total = K.sum(squared_diff, axis=1, keepdims=True)

    # Clamp to epsilon so the square root is never taken of a smaller value
    clamped = K.maximum(total, K.epsilon())
    return K.sqrt(clamped)
     

def plot_training(H, plotPath):
    """Plot the loss and F1 curves of a training run and save the figure.

    Args:
        H: Object whose ``history`` mapping contains the keys ``loss``,
            ``val_loss``, ``f1_m`` and ``val_f1_m``.
        plotPath: Destination path handed to ``plt.savefig``.

    Raises:
        KeyError: If one of the four series is missing from ``H.history``.
    """
    plt.style.use("ggplot")
    plt.figure()

    curves = (
        ("loss", "train_loss"),
        ("val_loss", "val_loss"),
        ("f1_m", "train_f1"),
        ("val_f1_m", "val_f1"),
    )
    for key, legend_label in curves:
        plt.plot(H.history[key], label=legend_label)

    # The plotted metric is F1, not accuracy, so the title now says so
    plt.title("Training Loss and F1")
    plt.xlabel("Epoch #")
    plt.ylabel("Loss/F1")
    plt.legend(loc="lower left")
    plt.savefig(plotPath)
     

def recall_m(y_true, y_pred):
    """Recall: rounded true positives over rounded actual positives (plus epsilon)."""
    # Both factors are clipped to [0, 1] and rounded before being counted
    hits = K.round(K.clip(y_true * y_pred, 0, 1))
    actual = K.round(K.clip(y_true, 0, 1))
    return K.sum(hits) / (K.sum(actual) + K.epsilon())

def precision_m(y_true, y_pred):
    """Precision: rounded true positives over rounded predicted positives (plus epsilon)."""
    # Both factors are clipped to [0, 1] and rounded before being counted
    hits = K.round(K.clip(y_true * y_pred, 0, 1))
    predicted = K.round(K.clip(y_pred, 0, 1))
    return K.sum(hits) / (K.sum(predicted) + K.epsilon())

def f1_m(y_true, y_pred):
    """F1 score: harmonic mean of ``precision_m`` and ``recall_m``."""
    p = precision_m(y_true, y_pred)
    r = recall_m(y_true, y_pred)
    # Epsilon keeps the ratio defined when precision and recall are both zero
    ratio = (p * r) / (p + r + K.epsilon())
    return 2 * ratio
     

def visualize_test(pairTrain_visualize, model_visualize, size=10, seed=42):
    """Display a random sample of image pairs with the model's score for each.

    Args:
        pairTrain_visualize: Numpy array of image pairs; its first axis indexes
            the pairs and each entry unpacks into two images.
        model_visualize: Model exposing ``predict`` and ``loss``. When ``loss``
            equals ``"binary_crossentropy"`` the score is titled "Similarity",
            otherwise "Distance".
        size: Number of pairs to draw (indices may repeat).
        seed: Seed of the sampling, so the same pairs are shown on every call.
    """
    # A private generator yields the same indices as seeding the global one,
    # without resetting NumPy's global random state for the rest of the notebook.
    rng = np.random.RandomState(seed)
    sample_index = rng.randint(pairTrain_visualize.shape[0], size=size)

    score_name = "Similarity" if model_visualize.loss == "binary_crossentropy" else "Distance"

    for (i, (imageA, imageB)) in enumerate(pairTrain_visualize[sample_index]):
        # Keep untouched copies for display
        origA = imageA.copy()
        origB = imageB.copy()

        # Drop a trailing single-channel axis (3-D or 4-D input) before
        # adding the batch axis; other shapes pass through unchanged.
        if imageA.ndim in (3, 4) and imageA.shape[-1] == 1:
            imageA = imageA[..., 0]
            imageB = imageB[..., 0]

        imageA = np.expand_dims(imageA, axis=0)
        imageB = np.expand_dims(imageB, axis=0)

        # Score the pair with the siamese model
        preds = model_visualize.predict([imageA, imageB])
        proba = preds[0][0]

        fig = plt.figure("Pair #{}".format(i + 1), figsize=(4, 2))
        plt.suptitle("{}: {:.2f}".format(score_name, proba))

        # First image on the left, second on the right
        fig.add_subplot(1, 2, 1)
        plt.imshow(origA)
        plt.axis("off")

        fig.add_subplot(1, 2, 2)
        plt.imshow(origB)
        plt.axis("off")

        plt.show()
     

def _drop_single_channel(img):
    """Return ``img`` without its last axis when that axis has length 1 (3-D or 4-D)."""
    if img.ndim in (3, 4) and img.shape[-1] == 1:
        return img[..., 0]
    return img


def get_recommendation(model, target, data, rec_number, printable=True):
    """Rank the items of ``data`` against ``target`` and return the best indices.

    Every candidate is scored with one ``model.predict`` call on the pair
    (target, candidate). When ``model.loss`` equals ``"binary_crossentropy"``
    the score is treated as a similarity (higher is better), otherwise as a
    distance (lower is better).

    Args:
        model: Model exposing ``predict`` and ``loss``.
        target: Query image.
        data: Iterable of candidate images.
        rec_number: Maximum number of recommendations to return.
        printable: When true, each recommendation is shown next to the query.

    Returns:
        List with the positions (in ``data``) of the recommended items, best first.
    """
    higher_is_better = model.loss == "binary_crossentropy"
    query = np.expand_dims(target, axis=0)

    scored = []
    for index, img in enumerate(data):
        preds = model.predict([query, np.expand_dims(img, axis=0)], verbose=0)
        # A display copy is only needed when plotting; skipping it otherwise
        # avoids duplicating the whole candidate set on every call.
        shown = _drop_single_channel(img.copy()) if printable else None
        scored.append((shown, preds[0][0], index))

    # list.sort is stable, so ties keep their original order
    scored.sort(key=lambda item: item[1], reverse=higher_is_better)

    origA = _drop_single_channel(target.copy()) if printable else None
    score_name = "Similarity" if higher_is_better else "Distance"

    recommended_list_index = []
    for rank, (rec_img, rec_pred, aux_index) in enumerate(scored):
        if rank == rec_number:
            break

        # Position of the recommended item in the original list
        recommended_list_index.append(aux_index)

        if printable:
            fig = plt.figure("Recommendations", figsize=(4, 2))
            plt.suptitle("{}: {:.2f}".format(score_name, rec_pred))

            # Query on the left, recommendation on the right
            fig.add_subplot(1, 2, 1)
            plt.imshow(origA)
            plt.axis("off")

            fig.add_subplot(1, 2, 2)
            plt.imshow(rec_img)
            plt.axis("off")

            plt.show()

    return recommended_list_index
     

def recall_and_precission_at_k(model, images, labels, k, p_groups=0.05):
    """Estimate mean recall@k and precision@k of the recommender on a sample.

    A fraction ``p_groups`` of the items (at least one) is drawn without
    replacement with ``random.sample``. Each drawn item is used as a query
    against all the other items through ``get_recommendation``; a retrieved
    item is relevant when it has the same label as the query.

    Args:
        model: Forwarded unchanged to ``get_recommendation``.
        images: Array-like of images; values are divided by 255.0 (assumes
            0-255 pixel values - confirm against the caller). A stack of 2-D
            images gets a trailing channel axis; a stack that already has a
            channel axis is left as it is.
        labels: Array-like with the label of each image.
        k: Number of recommendations requested per query (positive).
        p_groups: Fraction of the items used as queries.

    Returns:
        Tuple ``(mean_recall_at_k, mean_precision_at_k)``. A query whose label
        is carried by no other item contributes a recall of 0.0.

    Raises:
        ValueError: If ``k`` is not positive or fewer than two images are given.
    """
    if k <= 0:
        raise ValueError("k must be a positive integer")

    # np.asarray so plain lists work too (the previous np.array(...) result
    # was discarded)
    images = np.asarray(images) / 255.0
    labels = np.asarray(labels)

    n_items = len(images)
    if n_items < 2:
        raise ValueError("at least two images are required to evaluate recommendations")

    # Only stacks of 2-D images need the explicit channel axis
    if images.ndim == 3:
        images = np.expand_dims(images, axis=-1)

    # range(n_items) so item 0 can be drawn as well; at least one query so the
    # averages below are always defined.
    n_queries = min(n_items, max(1, int(n_items * p_groups)))
    items_index = random.sample(range(n_items), n_queries)

    recall_values = []
    precision_values = []

    for index in items_index:
        # The query itself is withheld from the candidates
        target = images[index]
        target_label = labels[index]
        data = np.delete(images, index, axis=0)
        data_labels = np.delete(labels, index, axis=0)

        top_index = get_recommendation(model, target, data, k, printable=False)

        # Relevant items overall, and relevant items among the top k
        rel = int(np.count_nonzero(data_labels == target_label))
        rel_at_k = int(np.count_nonzero(data_labels[top_index] == target_label))

        recall_values.append(rel_at_k / rel if rel else 0.0)
        precision_values.append(rel_at_k / k)

    return sum(recall_values) / len(recall_values), sum(precision_values) / len(precision_values)


# Loading the Dataset

In [8]:
# Root folder of the raw dataset (relative path); passed to load() and
# get_pairs_route() in the cells below
rawDataPath = '../../Data/Furniture_Data/'

In [9]:
# Build the image-metadata DataFrame with the project helper utils.dataset.load.
# In the recorded run one file could not be read (permission denied) and was reported.
furniture_dataset = load(rawDataPath)

Error processing ../../Data/Furniture_Data/lamps\Modern\11286modern-lighting.jpg: [Errno 13] Permission denied: '../../Data/Furniture_Data/lamps\\Modern\\11286modern-lighting.jpg'


In [10]:
# Display the metadata table (pandas truncates the middle rows in the rendered output)
furniture_dataset

Unnamed: 0,ImgPath,FileType,Width,Height,Ratio,Mode,Bands,Transparency,Animated,Category,Interior_Style
0,beds\Asian\19726asian-daybeds.jpg,jpg,350,350,1.0,RGB,R G B,False,False,beds,Asian
1,beds\Asian\20027asian-canopy-beds.jpg,jpg,350,350,1.0,RGB,R G B,False,False,beds,Asian
2,beds\Asian\20109asian-panel-beds.jpg,jpg,350,350,1.0,RGB,R G B,False,False,beds,Asian
3,beds\Asian\20508asian-platform-beds.jpg,jpg,350,350,1.0,RGB,R G B,False,False,beds,Asian
4,beds\Asian\20750asian-comforters-and-comforter...,jpg,350,350,1.0,RGB,R G B,False,False,beds,Asian
...,...,...,...,...,...,...,...,...,...,...,...
90078,tables\Victorian\5victorian-side-tables-and-en...,jpg,350,350,1.0,RGB,R G B,False,False,tables,Victorian
90079,tables\Victorian\6victorian-side-tables-and-en...,jpg,350,350,1.0,RGB,R G B,False,False,tables,Victorian
90080,tables\Victorian\7victorian-side-tables-and-en...,jpg,350,350,1.0,RGB,R G B,False,False,tables,Victorian
90081,tables\Victorian\8victorian-dining-tables.jpg,jpg,350,350,1.0,RGB,R G B,False,False,tables,Victorian


In [11]:
# Show the first relative paths; get_pairs_route matches files on disk against this column
print(furniture_dataset["ImgPath"].head())

0                    beds\Asian\19726asian-daybeds.jpg
1                beds\Asian\20027asian-canopy-beds.jpg
2                 beds\Asian\20109asian-panel-beds.jpg
3              beds\Asian\20508asian-platform-beds.jpg
4    beds\Asian\20750asian-comforters-and-comforter...
Name: ImgPath, dtype: object


In [24]:
# Load the images found under rawDataPath into memory, then hold out 20% as a test split.
# NOTE(review): the recorded run of this cell stopped with a MemoryError (12.6 GiB needed
# for 90083 images of 224x224x3 uint8), so the full dataset does not fit at this
# resolution on that machine.
images, labels, names = get_pairs_route(rawDataPath, furniture_dataset, IMG_SHAPE)
X_train, X_test, y_train, y_test = train_test_split(images, labels, test_size=0.2, random_state=42)

Error processing ../../Data/Furniture_Data/lamps\Modern\11286modern-lighting.jpg: Expected one match for lamps\Modern\11286modern-lighting.jpg, but found 0


MemoryError: Unable to allocate 12.6 GiB for an array with shape (90083, 224, 224, 3) and data type uint8

In [19]:
# Inspect the loaded images.
# NOTE(review): this cell carries execution count 19, lower than the cells above it,
# so the recorded output predates the current function definitions - re-run to refresh it.
images

array([None, None, None, ..., None, None, None], dtype=object)