Load the dataset

In [None]:
from keras.preprocessing.image import ImageDataGenerator
from keras.models import Sequential
from keras.layers import Conv2D, MaxPooling2D
from keras.layers import Activation, Dropout, Flatten, Dense
from keras import backend as K
import tensorflow as tf
import tensorflow.keras as keras
from tensorflow.keras import layers
import numpy as np
import matplotlib.pyplot as plt
import tensorflow_addons as tfa
from tensorflow.keras.layers.experimental import preprocessing
from simclr import SimCLR
import tensorflow_datasets as tfds


In [None]:
import csv
import os

datapath = 'clothing-dataset/'
data = []
with open(os.path.join(datapath, 'images.csv')) as csv_file:
    reader = csv.reader(csv_file, delimiter=',')
    for line in reader:
        data.append(line)
    

In [None]:
print(data[:10])

In [None]:
labels = data[0]
data = data[1:]

In [None]:
print(labels)
print(data[:10])

In [None]:
num_examples = len(data)
print(len(data))

In [None]:
label_counts = dict()
for row in data:
    if row[2] not in label_counts:
        label_counts[row[2]] = 0
    label_counts[row[2]] += 1
    
valid_labels = dict()
for k, v in label_counts.items():
    if v >= 100 and k not in ['Not sure', 'Others', 'Skip']:
        valid_labels[k] = v

filename_to_label = dict()
cleaned_data = []
valid_filenames = []
for i in range(len(data)):
    if data[i][2] in valid_labels.keys():
        cleaned_data.append(data[i])
        valid_filenames.append(data[i][0])
        filename_to_label[data[i][0]] = data[i][2]



num_examples = len(cleaned_data)
print(num_examples)

In [None]:
import os
import shutil

imagespath = 'clothing-dataset/images'
images = [f for f in os.listdir(imagespath) if os.path.isfile(os.path.join(imagespath, f))]

num_images_moved = 0



for i in range(len(images)):
    filename = images[i].split(".")[0]
    if filename not in valid_filenames:
        continue
        
    if num_images_moved < num_train + num_validate:
        folder_name = 'train'
    else:
        folder_name = 'test'
    
    new_folder = os.path.join(imagespath, folder_name)
    
    classname = filename_to_label[filename]
    finalfoldername = os.path.join(new_folder, classname)
    
    if not os.path.exists(finalfoldername):
        os.makedirs(finalfoldername)
    
    old_image_path = os.path.join(imagespath, images[i])
    new_image_path = os.path.join(finalfoldername, images[i])
    shutil.move(old_image_path, new_image_path)
    num_images_moved += 1

In [None]:
num_train = 3600
num_validate = 900
num_test = 461
epochs = 50
batch_size = 16
train_dir = os.path.join(os.getcwd(), os.path.join(imagespath, 'train'))
img_height = 128
img_width = 128
input_shape = (128, 128, 3)
num_classes = len(valid_labels)
labels = ["Blazer", 'Dress', 'Hat', 'Hoodie', 'Longsleeve', 'Outwear', 'Pants', 'Polo', 'Shirt', 'Shoes', 'Shorts', 'Skirt', 'T-Shirt', 'Undershirt']
print(num_classes)

In [None]:
import random
seed = random.randint(1, 10000)
train_ds = tf.keras.utils.image_dataset_from_directory(
  train_dir,
  validation_split=0.2,
  subset="training",
  seed=seed,
  image_size=(img_height, img_width),
  batch_size=batch_size,
)

val_ds = tf.keras.utils.image_dataset_from_directory(
  train_dir,
  validation_split=0.2,
  subset="validation",
  seed=seed,
  image_size=(img_height, img_width),
  batch_size=batch_size)

In [None]:
lmao = train_ds.take(1)
fig, ax = plt.subplots(3, 3)
for x, y in tfds.as_numpy(lmao):
    for i in range(batch_size):
        if i < 9:
            ax[i // 3, i % 3].imshow(x[i] / 255)
            ax[i // 3, i % 3].set_title(labels[y[i]])
            ax[i // 3, i % 3].axis('off')

In [None]:
data_augmentation = tf.keras.Sequential([tf.keras.layers.RandomFlip("horizontal_and_vertical"), 
                                         tf.keras.layers.RandomRotation(0.2),])


In [None]:
model = tf.keras.Sequential([
    data_augmentation, 
  tf.keras.layers.Rescaling(1./255),
  tf.keras.layers.Conv2D(32, 3, activation='relu'),
  tf.keras.layers.MaxPooling2D(),
  tf.keras.layers.Conv2D(32, 3, activation='relu'),
  tf.keras.layers.MaxPooling2D(),
  tf.keras.layers.Flatten(),
  tf.keras.layers.Dense(128, activation='relu'),
  tf.keras.layers.Dense(num_classes)
])

In [None]:
model.compile(
  optimizer='adam',
  loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
  metrics=['accuracy'])

In [None]:
history = model.fit(
  train_ds,
  validation_data=val_ds,
  epochs=epochs
)

In [None]:
predictor = tf.keras.Sequential([
    model,
    tf.keras.layers.Softmax()
])
predictor.compile(
  optimizer='adam',
  loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False),
  metrics=['accuracy'])
accuracy = predictor.evaluate(test_ds)[1]
print(f"Test accuracy: {round(accuracy * 100, 2)}%")
# summarize history for accuracy
plt.plot(history.history['accuracy'])
plt.plot(history.history['val_accuracy'])
plt.title('model accuracy')
plt.ylabel('accuracy')
plt.xlabel('epoch')
plt.legend(['train', 'val'], loc='upper left')
plt.show()
# summarize history for loss
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('model loss')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['train', 'val'], loc='upper left')
plt.show()

In [None]:
test_dir = os.path.join(os.getcwd(), os.path.join(imagespath, 'test'))
test_ds = tf.keras.utils.image_dataset_from_directory(
  test_dir,
  seed=seed,
  image_size=(img_height, img_width),
  batch_size=batch_size)

In [None]:
def create_encoder():
    transfer_model = tf.keras.Sequential()
    resnet50 = tf.keras.applications.ResNet50V2(include_top=False,
                       input_shape=(img_height,img_width,3),
                       pooling='avg',classes=num_classes,
                       weights='imagenet')

    


    transfer_model.add(data_augmentation)

    transfer_model.add(resnet50)
    return transfer_model


def create_classifier(encoder, trainable=True):
    for layer in encoder.layers:
        layer.trainable=trainable
    encoder.add(Dropout(0.1))
    encoder.add(Flatten())
    encoder.add(Dense(128, activation='relu'))
    encoder.add(Dropout(0.1))
    encoder.add(Dense(num_classes))
    return encoder

    
    


In [None]:
transfer_model = tf.keras.Sequential()
resnet50 = tf.keras.applications.ResNet50V2(include_top=False,
                    input_shape=(img_height,img_width,3),
                    pooling='avg',classes=num_classes,
                    weights='imagenet')


transfer_model.add(data_augmentation)

for layer in resnet50.layers:
    layer.trainable = False

transfer_model.add(resnet50)
transfer_model.add(Flatten())
transfer_model.add(tf.keras.layers.BatchNormalization())
transfer_model.add(Dense(128, activation='relu'))
transfer_model.add(Dropout(0.1))
transfer_model.add(tf.keras.layers.BatchNormalization())
transfer_model.add(Dense(num_classes))


In [None]:

transfer_model.compile(optimizer='adam',loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),metrics=['accuracy'])
history = transfer_model.fit(train_ds, validation_data=val_ds, epochs=epochs)


In [None]:
accuracy = classifier.evaluate(test_ds)[1]
print(f"Test accuracy: {round(accuracy * 100, 2)}%")
# summarize history for accuracy
plt.plot(history.history['accuracy'])
plt.plot(history.history['val_accuracy'])
plt.title('model accuracy')
plt.ylabel('accuracy')
plt.xlabel('epoch')
plt.legend(['train', 'val'], loc='upper left')
plt.show()
# summarize history for loss
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('model loss')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['train', 'val'], loc='upper left')
plt.show()

In [None]:
evaluation = transfer_model.evaluate(test_ds)

In [None]:
labels = ["Blazer", 'Dress', 'Hat', 'Hoodie', 'Longsleeve', 'Outwear', 'Pants', 'Polo', 'Shirt', 'Shoes', 'Shorts', 'Skirt', 'T-Shirt', 'Undershirt']
len(labels)

In [None]:
pred_labels = np.argmax(predictions, axis=1)
pred_classes = np.array(labels)[pred_labels]


In [None]:
class SupervisedContrastiveLoss(keras.losses.Loss):
    def __init__(self, temperature=1, name=None):
        super(SupervisedContrastiveLoss, self).__init__(name=name)
        self.temperature = temperature

    def __call__(self, labels, feature_vectors, sample_weight=None):
        # Normalize feature vectors
        feature_vectors_normalized = tf.math.l2_normalize(feature_vectors, axis=1)
        # Compute logits
        logits = tf.divide(
            tf.matmul(
                feature_vectors_normalized, tf.transpose(feature_vectors_normalized)
            ),
            self.temperature,
        )
        return tfa.losses.npairs_loss(tf.squeeze(labels), logits)


def add_projection_head(encoder):
    inputs = keras.Input(shape=input_shape)
    features = encoder(inputs)
    outputs = layers.Dense(projection_units, activation="relu")(features)
    model = keras.Model(
        inputs=inputs, outputs=outputs, name="cifar-encoder_with_projection-head"
    )
    return model

In [None]:
learning_rate = 0.01
temperature = 0.05
projection_units = 128

In [None]:
encoder = create_encoder()

encoder_with_projection_head = add_projection_head(encoder)
encoder_with_projection_head.compile(
    optimizer=keras.optimizers.Adam(learning_rate),
    loss=SupervisedContrastiveLoss(temperature),
)

encoder_with_projection_head.summary()

epochs = 15

history = encoder_with_projection_head.fit(
    train_ds, validation_data=val_ds, batch_size=batch_size, epochs=epochs
)

classifier = create_classifier(encoder, trainable=False)

classifier.compile(
        optimizer=keras.optimizers.Adam(learning_rate),
        loss=keras.losses.SparseCategoricalCrossentropy(),
        metrics=[keras.metrics.SparseCategoricalAccuracy()],
    )

epochs = 15

history = classifier.fit(train_ds, validation_data=val_ds, batch_size=batch_size, epochs=epochs)



In [None]:
predictor = tf.keras.Sequential([classifier, tf.keras.layers.Softmax()])

predictor.compile(
  optimizer='adam',
  loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False),
  metrics=['accuracy'])

accuracy = classifier.evaluate(test_ds)[1]
print(f"Test accuracy: {round(accuracy * 100, 2)}%")

In [None]:
Algorithm = SimCLR

width = 128
num_epochs = 10

hyperparams = {SimCLR: {"temperature": 0.1}}

# architecture
model = Algorithm(
    contrastive_augmenter=keras.Sequential(
        [
            layers.Input(shape=(img_width, img_height, 3)),
            preprocessing.Rescaling(1 / 255),
            preprocessing.RandomFlip("horizontal"),
            
        ],
        name="contrastive_augmenter",
    ),
    classification_augmenter=keras.Sequential(
        [
            layers.Input(shape=(img_width, img_height, 3)),
            preprocessing.Rescaling(1 / 255),
            preprocessing.RandomFlip("horizontal"),
            
        ],
        name="classification_augmenter",
    ),
    encoder=keras.Sequential(
        [
            layers.Input(shape=(img_width, img_height, 3)),
            layers.Conv2D(width, kernel_size=3, strides=2, activation="relu"),
            layers.Conv2D(width, kernel_size=3, strides=2, activation="relu"),
            layers.Conv2D(width, kernel_size=3, strides=2, activation="relu"),
            layers.Conv2D(width, kernel_size=3, strides=2, activation="relu"),
            layers.Flatten(),
            layers.Dense(width, activation="relu"),
        ],
        name="encoder",
    ),
    projection_head=keras.Sequential(
        [
            layers.Input(shape=(width,)),
            layers.Dense(width, activation="relu"),
            layers.Dense(width),
        ],
        name="projection_head",
    ),
    linear_probe=keras.Sequential(
        [
            layers.Input(shape=(width,)),
            layers.Dense(num_classes),
        ],
        name="linear_probe",
    ),
    **hyperparams[Algorithm],
)

In [None]:
model.compile(
    contrastive_optimizer=keras.optimizers.Adam(),
    probe_optimizer=keras.optimizers.Adam(),
)

# run training

np_train_ds = tfds.as_numpy(train_ds)
np_val_ds = tfds.as_numpy(val_ds)


    
history = model.fit(np_train_ds, epochs=num_epochs, validation_data=np_val_ds)