Clone the GitHub repository

In [1]:
# Fetch the `scratch` branch of the project repository into ./IR_Repo (a large download).
!git clone --branch scratch https://github.com/cssaivishnu/IR_Repo.git

Cloning into 'IR_Repo'...
remote: Enumerating objects: 63783, done.[K
remote: Counting objects: 100% (96/96), done.[K
remote: Compressing objects: 100% (59/59), done.[K
remote: Total 63783 (delta 49), reused 71 (delta 37), pack-reused 63687[K
Receiving objects: 100% (63783/63783), 2.34 GiB | 35.43 MiB/s, done.
Resolving deltas: 100% (68/68), done.
Updating files: 100% (63288/63288), done.


Import the essential libraries

In [26]:
# Install TensorFlow Addons; it is imported below as `tfa` and provides `tfa.losses.npairs_loss`.
!pip install tensorflow-addons


TensorFlow Addons (TFA) has ended development and introduction of new features.
TFA has entered a minimal maintenance and release mode until a planned end of life in May 2024.
Please modify downstream libraries to take dependencies from other repositories in our TensorFlow community (e.g. Keras, Keras-CV, and Keras-NLP). 

For more information see: https://github.com/tensorflow/addons/issues/2807 



In [5]:
import numpy as np
import os
import shutil
import cv2
from google.colab.patches import cv2_imshow
import torchvision.transforms as transforms
from PIL import Image
import json
import random
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, accuracy_score, precision_score, recall_score, f1_score
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import tensorflow_addons as tfa
import pandas as pd

# Seed every random number generator used in this notebook so runs are repeatable.
# `seed` has to be *called*: assigning to it (`random.seed = 20`) merely replaces
# the function with an integer and leaves the generator unseeded.
tf.random.set_seed(20)
random.seed(20)
np.random.seed(20)

Make the cloned repository the current working directory

In [6]:
# Work from inside the cloned repository so the relative dataset paths used below resolve.
os.chdir('/content/IR_Repo')
cwd = os.getcwd()
print(cwd)

/content/IR_Repo


From the complete dataset of 63,285 images across 35 categories, we keep only the product categories with more than 150 images.

In [7]:
def func(name):
    """Return `name` with every '&' and '-' replaced by '_'.

    Used below to turn category directory names into underscore-only names.
    """
    return ''.join('_' if ch == '&' or ch == '-' else ch for ch in name)

In [8]:
# Build a per-category working copy of the data in ./dataset: every category of
# `atlas_dataset_full` with more than 150 images is copied (at most 600 images
# each) and its files are renamed 001.<ext>, 002.<ext>, ...
dir_path = 'atlas_dataset_full'
dir_count = 0            # categories found in the source dataset
active_dir_count = 0     # categories with more than 150 images (these are copied)
complete_dir_count = 0   # copied categories that reached the 600-image cap
total_images = 0         # images across all source categories
images_list = []         # number of images copied for each kept category

# exist_ok=True keeps the cell re-runnable; a plain os.mkdir raises
# FileExistsError the second time the cell is executed.
os.makedirs('dataset', exist_ok=True)

for name in sorted(os.listdir(dir_path)):
    if not os.path.isdir(os.path.join(dir_path, name)):
        continue
    path = os.path.join(dir_path, name, 'images')
    # os.listdir returns entries in arbitrary order; sorting makes both the
    # selection of the first 600 images and their numbering reproducible.
    files = sorted(f for f in os.listdir(path) if os.path.isfile(os.path.join(path, f)))
    num_files = len(files)
    if num_files > 150:
        active_dir_count += 1
        name = func(name)
        new_path = os.path.join('dataset', name)
        os.makedirs(new_path, exist_ok=True)
        i = 0
        for i, f in enumerate(files[:600], start=1):
            src = os.path.join(path, f)
            fname = '{:03d}'.format(i) + '.' + f.split('.')[-1]
            dst = os.path.join(new_path, fname)
            shutil.copy2(src, dst)
        # Count the category as complete whenever 600 images were copied; the
        # previous in-loop check missed a category holding exactly 600 files.
        if i == 600:
            complete_dir_count += 1
        images_list.append(i)
    total_images += num_files
    dir_count += 1

print("Total Number of categories:", dir_count)
print("Number of categories with atleast 150 images:", active_dir_count)
print("Number of categories with 600 images:", complete_dir_count)
print("Total Number of Images:", total_images)

Total Number of categories: 35
Number of categories with atleast 150 images: 22
Number of categories with 600 images: 18
Total Number of Images: 63285


We find that only 22 of the 35 categories have more than 150 images.

We want 600 images from each of the 22 product categories, but 4 of them have fewer than 600 images.

We therefore apply image augmentation to those 4 product categories, generating new images until each of them contains 600 images.

In [9]:
# Here, the image is flipped horizontally to create a new image

def horizontalflipping_augmentation(path, new_path):
    """Save a horizontally flipped copy of the image at `path` to `new_path`.

    Args:
        path: Path of the source image.
        new_path: Path the flipped image is written to.
    """
    # p=1 makes the "random" flip unconditional.
    transform = transforms.Compose([
        transforms.RandomHorizontalFlip(p=1),
    ])

    # The context manager closes the source file once the flip has been saved.
    with Image.open(path) as img:
        img_flipped = transform(img)
        # Save the transformed image. Saving `img` here (as the code previously
        # did) would only write an unmodified duplicate of the source.
        img_flipped.save(new_path)

In [10]:
# Here, the image is modified by jittering its colour properties (brightness, saturation, hue)

def colorgittering_augmentation(path,new_path):
    """Save a colour-jittered copy of the image at `path` to `new_path`.

    The image is converted to HSV, each channel is scaled by a fixed factor of
    1.1 (the S channel twice), and the result is converted back to BGR.

    Args:
        path: Path of the source image.
        new_path: Path the jittered image is written to.

    Raises:
        ValueError: If OpenCV cannot read the image at `path`.
    """
    # cv2.imread signals failure by returning None instead of raising.
    img = cv2.imread(path)
    if img is None:
        raise ValueError('Could not read image: {}'.format(path))

    # Fixed jitter strengths, applied as multiplicative factors (1 + value).
    brightness = 0.1
    contrast = 0.1
    saturation = 0.1
    hue = 0.1

    # Convert the image from BGR to HSV color space
    img_hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)

    # Brightness scales the V channel.
    img_hsv[:, :, 2] = np.clip(img_hsv[:, :, 2] * (1 + brightness), 0, 255)
    # HSV has no contrast channel; as before, the contrast factor is applied to
    # the S channel, which is then scaled again by the saturation factor.
    img_hsv[:, :, 1] = np.clip(img_hsv[:, :, 1] * (1 + contrast), 0, 255)
    img_hsv[:, :, 1] = np.clip(img_hsv[:, :, 1] * (1 + saturation), 0, 255)
    # For 8-bit images OpenCV stores hue in [0, 179], so clip to 179 rather than 255.
    img_hsv[:, :, 0] = np.clip(img_hsv[:, :, 0] * (1 + hue), 0, 179)

    # Convert the image back to BGR color space
    img_jittered = cv2.cvtColor(img_hsv, cv2.COLOR_HSV2BGR)

    # Save the jittered image. Writing `img` here (as the code previously did)
    # would only write an unmodified duplicate of the source.
    cv2.imwrite(new_path, img_jittered)

In [11]:
# Here, the image is randomly cropped and rescaled to a square of feasible size

def randomscaling_augmentation(path,new_path):
    """Save a randomly cropped and rescaled copy of the image at `path`.

    The output is a square whose side is 90% of the shorter side of the source
    image, produced by `RandomResizedCrop` with scale=(0.8, 1.0).

    Args:
        path: Path of the source image.
        new_path: Path the rescaled image is written to.
    """
    # The context manager closes the source file once the result has been saved.
    with Image.open(path) as img:
        width, height = img.size
        size = int(min(width,height)*0.9)

        # Define the random scaling transformation
        transform = transforms.Compose([
            transforms.RandomResizedCrop(size=size, scale=(0.8, 1.0)),
        ])

        img_scaled = transform(img)

        # Save the transformed image. Saving `img` here (as the code previously
        # did) would only write an unmodified duplicate of the source.
        img_scaled.save(new_path)

In [12]:
# Top up every category that does not hold exactly 600 images: each existing
# image numbered k yields augmented copies numbered k + n, k + 2n and k + 3n
# (n = current image count), stopping once the number would exceed 600.
dir_path = 'dataset'
images_list = []
name_list = sorted(os.listdir(dir_path))

# Augmentation applied for the 1st, 2nd and 3rd copy of an image, in that order.
augmenters = (
    horizontalflipping_augmentation,
    colorgittering_augmentation,
    randomscaling_augmentation,
)

for name in name_list:
    path = os.path.join(dir_path, name)
    if not os.path.isdir(path):
        continue
    existing = [f for f in os.listdir(path) if os.path.isfile(os.path.join(path, f))]
    num_files = len(existing)
    images_list.append(num_files)
    if num_files == 600:
        continue
    for f in existing:
        img_name = int(f.split('.')[0])
        extension = f.split('.')[-1]
        img_path = os.path.join(path, f)
        for step, augment in enumerate(augmenters, start=1):
            target_number = img_name + num_files * step
            if target_number > 600:
                break
            new_img_path = os.path.join(path, '{:03d}'.format(target_number) + '.' + extension)
            augment(img_path, new_img_path)

Create the label encodings that map each product category name to an integer label, and each label back to its category name

In [13]:
# Number the category directories in `name_list` order and keep both directions
# of the mapping: name -> integer label and integer label -> name.
dir_path = 'dataset'
category_dirs = [n for n in name_list if os.path.isdir(os.path.join(dir_path, n))]
name_to_label = {n: idx for idx, n in enumerate(category_dirs)}
label_to_name = dict(enumerate(category_dirs))

Load 500 of the 600 images from each product category and resize them to 56×56 pixels for training and evaluating the model

In [14]:
# Load the images numbered 001-500 of every category as (pixels, label) pairs.
# Images numbered above 500 are left out of `dataset`.
dir_path = 'dataset'
dataset = []
image_shape = (56, 56)

for name in name_list:
    path = os.path.join(dir_path, name)
    if not os.path.isdir(path):
        continue
    for f in os.listdir(path):
        img_path = os.path.join(path, f)
        if not os.path.isfile(img_path):
            continue
        if int(f.split('.')[0]) > 500:
            continue
        # The context manager closes the file handle once the pixels are loaded.
        with Image.open(img_path) as source:
            # convert('RGB') guarantees three channels for every file (grayscale,
            # palette or RGBA sources would otherwise break the array layout).
            # Image.LANCZOS is the same filter as Image.ANTIALIAS, which was
            # removed in Pillow 10.
            img = source.convert('RGB').resize(image_shape, Image.LANCZOS)
        # np.asarray yields (row, column, channel); the transpose restores the
        # (x, y, channel) layout that the previous per-pixel loop produced, and
        # astype(int) keeps its integer dtype, so downstream arrays are unchanged.
        pixels = np.asarray(img).astype(int).transpose(1, 0, 2)
        dataset.append((pixels, name_to_label[name]))

Shuffle the dataset for randomness and split the dataset into features(X) and labels(y)

In [15]:
# Shuffle the (pixels, label) pairs in place, then separate them into a feature
# array X and a column vector of labels y.
random.shuffle(dataset)

X = np.array([pixels for pixels, _ in dataset])
# reshape(-1, 1) derives the row count from the data instead of hard-coding
# 11000, so the cell keeps working if the number of loaded images changes.
y = np.array([label for _, label in dataset]).reshape(-1, 1)

# Just clear the space
# dataset = []

Initialisation

In [17]:
# Shape of one input image as fed to the models: 56 x 56 pixels, 3 colour channels.
input_shape = (56, 56, 3)
# Count the labels actually assigned: `name_list` holds every entry of the
# dataset folder, including any non-directory entries that receive no label.
num_categories = len(name_to_label)

Split the dataset into train and test data

In [18]:
# Hold out 20% of the samples for testing; stratifying on y keeps the class
# proportions equal in both parts, and random_state fixes the split.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)
for label, array in (
    ('X_train', X_train),
    ('X_test', X_test),
    ('y_train', y_train),
    ('y_test', y_test),
):
    print(label + ':', array.shape)

X_train: (8800, 56, 56, 3)
X_test: (2200, 56, 56, 3)
y_train: (8800, 1)
y_test: (2200, 1)


Image Data Augmentation

In [19]:
# Preprocessing/augmentation pipeline that `create_encoder` places in front of
# the ResNet backbone: a normalization layer followed by random flip, rotation
# and width/height changes.
data_augmentation = keras.Sequential(
    [
        layers.Normalization(),
        layers.RandomFlip('horizontal'),
        layers.RandomRotation(0.02),
        layers.RandomWidth(0.2),
        layers.RandomHeight(0.2)
    ]
)

# Fit the Normalization layer (layer 0) to the training images only.
data_augmentation.layers[0].adapt(X_train)

Define the Supervised Contrastive Loss Function that will be used in training the model 

In [20]:
class SupervisedContrastiveLoss(keras.losses.Loss):
    """Supervised contrastive loss built on `tfa.losses.npairs_loss`.

    The logits are the pairwise dot products of the L2-normalised feature
    vectors of a batch, divided by `temperature`.
    """

    def __init__(self, temperature=1, name=None):
        """Store the temperature that the similarity logits are divided by."""
        super().__init__(name=name)
        self.temperature = temperature

    def __call__(self, labels, feature_vectors, sample_weight=None):
        """Return the n-pairs loss for one batch.

        `sample_weight` is accepted for interface compatibility but is not used.
        """
        # Unit-length vectors make the dot products below cosine similarities.
        unit_vectors = tf.math.l2_normalize(feature_vectors, axis=1)
        similarities = tf.matmul(unit_vectors, tf.transpose(unit_vectors))
        logits = tf.divide(similarities, self.temperature)
        return tfa.losses.npairs_loss(tf.squeeze(labels), logits)

Create an encoder that encodes the image pixel data using the convolutional neural network (CNN) ResNet50V2

In [21]:
def create_encoder():
    """Build the image encoder: augmentation pipeline followed by ResNet50V2.

    The backbone is created without pretrained weights and without its
    classification head, and uses average pooling on its output.

    Returns:
        A `keras.Model` named 'encoder' that maps an image of `input_shape`
        to the backbone's pooled feature vector.
    """
    backbone = keras.applications.ResNet50V2(
        weights=None, include_top=False, pooling='avg', input_shape=input_shape
    )
    image_input = keras.Input(shape=input_shape)
    embedding = backbone(data_augmentation(image_input))
    return keras.Model(inputs=image_input, outputs=embedding, name='encoder')

Adding a Projection head

In [22]:
def add_projection_head(encoder, projection_units):
    """Wrap `encoder` with a dense ReLU projection layer.

    Args:
        encoder: Model mapping an image of `input_shape` to a feature vector.
        projection_units: Number of units in the projection layer.

    Returns:
        A `keras.Model` named 'encoder_with_projection_head'.
    """
    image_input = keras.Input(shape=input_shape)
    encoded = encoder(image_input)
    projected = layers.Dense(projection_units, activation='relu')(encoded)
    return keras.Model(
        inputs=image_input,
        outputs=projected,
        name='encoder_with_projection_head',
    )

Train the encoder with Supervised Contrastive Loss defined above for better encoding

In [27]:
# Hyperparameters for the contrastive pre-training of the encoder.
learning_rate = 0.001
batch_size = 100
projection_units = 256
epochs = 50
temperature = 0.05

# Build the encoder, attach the projection head and train the combined model
# with the supervised contrastive loss.
encoder = create_encoder()
encoder.summary()
encoder_with_projection_head = add_projection_head(encoder, projection_units)
encoder_with_projection_head.compile(
    optimizer=keras.optimizers.Adam(learning_rate),
    loss=SupervisedContrastiveLoss(temperature)
)
encoder_with_projection_head.summary()

# Request placement on the first GPU for training.
with tf.device('/gpu:0'):
    history = encoder_with_projection_head.fit(
        x=X_train, y=y_train, batch_size=batch_size, epochs=epochs
    )

Model: "encoder"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_5 (InputLayer)        [(None, 56, 56, 3)]       0         
                                                                 
 sequential (Sequential)     (None, None, None, 3)     7         
                                                                 
 resnet50v2 (Functional)     (None, 2048)              23564800  
                                                                 
Total params: 23,564,807
Trainable params: 23,519,360
Non-trainable params: 45,447
_________________________________________________________________
Model: "encoder_with_projection_head"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_6 (InputLayer)        [(None, 56, 56, 3)]       0         
                                                                 
 enc

Creating and training the classifier using the pretrained encoder, and saving the model into a .h5 file

In [29]:
def create_classifier(encoder, dropout_rate, hidden_units, learning_rate, trainable=True):
    """Build and compile a softmax classifier on top of `encoder`.

    Args:
        encoder: Model mapping an image of `input_shape` to a feature vector.
        dropout_rate: Dropout rate used before and after the hidden layer.
        hidden_units: Number of units in the hidden dense layer.
        learning_rate: Learning rate for the Adam optimizer.
        trainable: Value assigned to `trainable` on every layer of `encoder`;
            pass False to freeze the encoder.

    Returns:
        A compiled `keras.Model` named 'classifier' with `num_categories`
        softmax outputs.
    """
    # Freeze or unfreeze the encoder layer by layer.
    for encoder_layer in encoder.layers:
        encoder_layer.trainable = trainable

    image_input = keras.Input(shape=input_shape)
    x = encoder(image_input)
    x = layers.Dropout(dropout_rate)(x)
    x = layers.Dense(hidden_units, activation='relu')(x)
    x = layers.Dropout(dropout_rate)(x)
    probabilities = layers.Dense(num_categories, activation='softmax')(x)

    classifier_model = keras.Model(inputs=image_input, outputs=probabilities, name = 'classifier')
    classifier_model.compile(
        optimizer=keras.optimizers.Adam(learning_rate),
        loss=keras.losses.SparseCategoricalCrossentropy(),
        metrics=[keras.metrics.SparseCategoricalAccuracy()],
    )
    return classifier_model

In [30]:
# Hyperparameters for the classifier stage (these rebind learning_rate,
# batch_size and epochs from the encoder-training cell).
dropout_rate = 0.5
hidden_units = 512
learning_rate = 0.001
batch_size = 100
epochs = 100

# trainable=False freezes the pretrained encoder, so only the new dense layers are trained.
classifier = create_classifier(encoder, dropout_rate, hidden_units, learning_rate, trainable=False)
classifier.summary()
with tf.device('/gpu:0'):
    # validation_split=0.2 sets aside 20% of the training data for validation.
    history = classifier.fit(x=X_train, y=y_train, batch_size=batch_size, epochs=epochs, validation_split=0.2)

# Persist the trained classifier to classifier.h5.
classifier.save('classifier.h5')

Model: "classifier"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_7 (InputLayer)        [(None, 56, 56, 3)]       0         
                                                                 
 encoder (Functional)        (None, 2048)              23564807  
                                                                 
 dropout (Dropout)           (None, 2048)              0         
                                                                 
 dense_2 (Dense)             (None, 512)               1049088   
                                                                 
 dropout_1 (Dropout)         (None, 512)               0         
                                                                 
 dense_3 (Dense)             (None, 22)                11286     
                                                                 
Total params: 24,625,181
Trainable params: 1,060,374
Non