In [None]:
!pip install -U tensorflow-addons

In [None]:
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import tensorflow_addons as tfa
import matplotlib.pyplot as plt  # plot
import random

**Mount Google Drive**

In [None]:
# from google.colab import drive
# drive.mount('/content/drive')

**Get data**

In [None]:
# !unzip /content/drive/MyDrive/Exp/archive.zip -d /content/

In [None]:
import os
def countFile(link):
    """Print how many regular files sit directly inside the directory ``link``.

    Sub-directories are neither counted nor descended into.
    """
    file_total = 0
    for entry in os.listdir(link):
        if os.path.isfile(os.path.join(link, entry)):
            file_total += 1

    print(f"Number of files in directory {link} is ", file_total)

In [None]:
# Report how many image files are available for each emotion class.
for emotion_dir in (
    "/kaggle/input/affectnet-training-data/happy",
    "/kaggle/input/affectnet-training-data/sad",
):
    countFile(emotion_dir)

**Construct dataset**

In [None]:
import cv2
import numpy as np
from keras.utils import to_categorical
import os
import pandas as pd

In [None]:
#df = pd.read_csv("/content/labels.csv")

In [None]:
#happy_link = ['/content/' + ele for ele in df[df['label'] == 'happy']['pth'].to_list() if 'happy' in ele]

In [None]:
#sad_link = ['/content/' + ele for ele in df[df['label'] == 'sad']['pth'].to_list() if 'sad' in ele]

In [None]:
# Root folder of the dataset; the loader joins it with each name in EMOTIONS.
INPUT_PATH = '/kaggle/input/affectnet-training-data/'
# Class (sub-folder) names; a name's position in this list is its integer label.
EMOTIONS = ["happy","sad"]
# Image size handed to load_images / image_generator.
IMAGE_SIZE = (96, 96)

def image_generator(input_path, emotions, image_size):
    """Yield ``(image, label)`` pairs for every readable image of every emotion.

    Args:
        input_path: Directory that contains one sub-folder per emotion.
        emotions: Sequence of sub-folder names; an emotion's position in the
            sequence is the integer label yielded for its images.
        image_size: Currently unused; resizing and normalisation are left
            disabled, so images are yielded as OpenCV decoded them.

    Yields:
        Tuple ``(img, index)``: the image converted from BGR to RGB and the
        label of the folder it was read from. Entries OpenCV cannot decode
        are reported on stdout and skipped.
    """
    for index, emotion in enumerate(emotions):
        emotion_dir = os.path.join(input_path, emotion)
        # os.listdir returns entries in arbitrary order; sorting makes the
        # dataset order identical across runs and machines.
        for filename in sorted(os.listdir(emotion_dir)):
            file_path = os.path.join(emotion_dir, filename)
            img = cv2.imread(file_path)
            if img is None:
                # imread returns None (it does not raise) for corrupt or
                # non-image entries; cvtColor would crash on it.
                print(f"Skipping unreadable image: {file_path}")
                continue
            img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)  # OpenCV decodes to BGR
            #img = cv2.resize(img, image_size)
            #img = img.astype('float32') / 255.0  # Normalize
            yield img, index

def load_images(input_path, emotions, image_size):
    """Load the whole dataset into memory as two NumPy arrays.

    Drains ``image_generator`` and returns ``(X, y)``: the collected images
    and their integer labels, in the order the generator produced them.
    """
    samples = list(image_generator(input_path, emotions, image_size))
    X = np.array([img for img, _ in samples])
    y = np.array([label for _, label in samples])
    return X, y

In [None]:
# Read every image of every emotion into memory once, then record the shape
# of a single image; the model builders use it to size their input layers.
X, y = load_images(
    input_path=INPUT_PATH,
    emotions=EMOTIONS,
    image_size=IMAGE_SIZE,
)
input_shape = X[0].shape

In [None]:
# Pick one random sample and show it together with its emotion name.
idx = np.random.randint(len(X))

plt.imshow(X[idx])
# `y` holds integer class ids (positions in EMOTIONS) at this point, not
# one-hot vectors, so the id indexes EMOTIONS directly. np.argmax of a scalar
# is always 0 and would title every image with EMOTIONS[0].
plt.title(EMOTIONS[int(y[idx])])
plt.axis('off')  # hide the axis ticks and frame
plt.show()

**Utility functions**

In [None]:
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Lambda, Dropout, LayerNormalization, MultiHeadAttention
from tensorflow.keras.activations import gelu
import numpy as np

# Split each image into square patches (the input format expected by the Vision Transformer)
class Patches(layers.Layer):
    """Cut every image of a batch into non-overlapping square patches.

    Args:
        patch_size: Side length, in pixels, of each square patch.
        **kwargs: Standard Keras layer arguments (``name``, ``dtype``, ...),
            forwarded to ``layers.Layer``.
    """

    def __init__(self, patch_size, **kwargs):
        super().__init__(**kwargs)
        self.patch_size = patch_size

    def call(self, images):
        """Return the flattened patches of ``images``.

        The result is reshaped to ``(batch, patches_per_image, values_per_patch)``.
        """
        batch_size = tf.shape(images)[0]
        patches = tf.image.extract_patches(
            images=images,
            sizes=[1, self.patch_size, self.patch_size, 1],
            # stride == patch size, so consecutive patches do not overlap
            strides=[1, self.patch_size, self.patch_size, 1],
            rates=[1, 1, 1, 1],
            padding="VALID",
        )
        patch_dims = patches.shape[-1]
        return tf.reshape(patches, [batch_size, -1, patch_dims])

    def get_config(self):
        """Return the constructor arguments so Keras can re-create the layer."""
        config = super().get_config()
        config.update({"patch_size": self.patch_size})
        return config
        
def get_angle(pos, i, d_model):
    """Return the positional-encoding angles ``pos / 10000 ** (2 * (i // 2) / d_model)``.

    ``pos`` and ``i`` may be arrays; the result follows NumPy broadcasting.
    """
    pair_index = i // 2  # dimensions 2k and 2k + 1 share one frequency
    exponent = (2 * pair_index) / np.float32(d_model)
    inverse_frequency = 1 / np.power(10000, exponent)
    return pos * inverse_frequency
    
def pos_encoding(pos, d_model):
    """Build the fixed sinusoidal position table of shape ``(pos, d_model)``.

    Even feature columns hold the sine of the angle and odd columns the
    cosine. The table is returned as a float32 tensor.
    """
    positions = np.arange(pos)[:, np.newaxis]
    dimensions = np.arange(d_model)[np.newaxis, :]
    angles = get_angle(positions, dimensions, d_model)

    # Column-wise selector, broadcast over the rows of `angles`.
    is_even_column = (np.arange(d_model) % 2) == 0
    table = np.where(is_even_column, np.sin(angles), np.cos(angles))

    return tf.cast(table, dtype=tf.float32)

class PatchEncoder(layers.Layer):
    """Project flattened patches linearly and add sinusoidal position encodings.

    Args:
        num_patches: Number of patches per image; sets the number of rows of
            the position table.
        projection_dim: Width of the dense projection and of the position
            table.
        **kwargs: Standard Keras layer arguments, forwarded to ``layers.Layer``.
    """

    def __init__(self, num_patches, projection_dim, **kwargs):
        super().__init__(**kwargs)
        self.num_patches = num_patches
        self.projection_dim = projection_dim
        self.projection = layers.Dense(units=projection_dim)
        # Constant table computed once by pos_encoding; it is not a trainable weight.
        self.position_embedding = pos_encoding(pos=num_patches, d_model=projection_dim)

    def call(self, patch):
        """Return ``projection(patch) + position_embedding``."""
        # The (num_patches, projection_dim) table broadcasts over the batch axis.
        return self.projection(patch) + self.position_embedding

    def get_config(self):
        """Return the constructor arguments so Keras can re-create the layer."""
        config = super().get_config()
        config.update(
            {
                "num_patches": self.num_patches,
                "projection_dim": self.projection_dim,
            }
        )
        return config

class RandomSampling(layers.Layer):
    """Randomly split the patch positions into a masked and a visible subset.

    ``int(mask_ratio * num_patches)`` positions are drawn without replacement
    as masked; the remaining positions, in ascending order, are the visible
    ones.
    """

    def __init__(self, num_patches, mask_ratio=0.75):
        super().__init__()
        self.num_patches = num_patches
        self.mask_ratio = mask_ratio

        self.num_mask = int(mask_ratio * num_patches)  # int() truncates
        # Filled in by the most recent call().
        self.masked_indices = None
        self.un_masked_indices = None

    def get_indices(self):
        """Return ``[masked_indices, un_masked_indices]`` from the last call."""
        return [self.masked_indices, self.un_masked_indices]

    def call(self, patches):
        """Return ``(visible_patches, masked_indices, un_masked_indices)``."""
        # NOTE(review): the draw uses NumPy's global RNG in Python rather than
        # a TensorFlow op; if this layer is traced into a graph the selection
        # is presumably fixed at trace time - confirm that is intended.
        masked = np.random.choice(self.num_patches, size=self.num_mask, replace=False)
        visible = np.delete(np.arange(self.num_patches), masked)
        self.masked_indices = masked
        self.un_masked_indices = visible

        visible_patches = tf.gather(patches, visible, axis=1)
        return visible_patches, masked, visible

class MaskToken(layers.Layer):
    """Re-insert a trainable mask token at every masked patch position.

    The layer receives the tokens of the visible patches together with the
    masked / visible position indices and returns a sequence in which each
    position holds either its visible token or the shared mask token.

    Args:
        **kwargs: Standard Keras layer arguments, forwarded to ``layers.Layer``.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Set on every call() / in build().
        self.mask_indices = None
        self.un_masked_indices = None
        self.indices = None
        self.mst = None
        self.hidden_size = None

    def build(self, input_shape):
        """Create the single trainable mask token of width ``input_shape[-1]``."""
        self.hidden_size = input_shape[-1]
        self.mst = tf.Variable(
            name="mst",
            initial_value=tf.random.normal(
                shape=(1, 1, self.hidden_size), dtype='float32'),
            trainable=True
        )

    def call(self, inputs, mask_indices, un_masked_indices):
        """Return the sequence with mask tokens placed at ``mask_indices``.

        Args:
            inputs: Tokens of the visible patches, ordered like
                ``un_masked_indices``.
            mask_indices: Positions that were masked out.
            un_masked_indices: Positions that were kept.
        """
        self.mask_indices = mask_indices
        self.un_masked_indices = un_masked_indices

        batch_size = tf.shape(inputs)[0]
        mask_num = self.mask_indices.shape[0]
        # Repeat the single mask token for every sample and every masked slot.
        mst_broadcasted = tf.cast(
            tf.broadcast_to(self.mst, [batch_size, mask_num, self.hidden_size]),
            dtype=inputs.dtype,
        )
        # Slot k of `updates` holds the token that belongs at position indices[k].
        self.indices = tf.concat([self.mask_indices, self.un_masked_indices], axis=0)
        updates = tf.concat([mst_broadcasted, inputs], axis=1)
        # Putting slot k back at position indices[k] needs the inverse
        # permutation, i.e. argsort(indices). Gathering with `indices` itself
        # applies the permutation a second time and scrambles the order.
        restore_order = tf.argsort(self.indices)
        out = tf.gather(updates, restore_order, axis=1, batch_dims=0)
        return out

class TransformerBlock(layers.Layer):
    """Self-attention followed by a GELU MLP, each wrapped in a residual connection.

    Args:
        num_heads: Number of attention heads.
        mlp_dim: Width of the hidden dense layer of the MLP.
        dropout: Dropout rate used after attention and inside the MLP.
        **kwargs: Standard Keras layer arguments, forwarded to ``layers.Layer``.
    """

    def __init__(self, num_heads, mlp_dim, dropout, **kwargs):
        super().__init__(**kwargs)
        self.num_heads = num_heads
        self.mlp_dim = mlp_dim
        self.dropout = dropout

    def build(self, input_shape):
        """Create the sub-layers once the model width ``input_shape[-1]`` is known."""
        self.att = MultiHeadAttention(
            num_heads=self.num_heads,
            key_dim=input_shape[-1] // self.num_heads  # d_model is input_shape[-1]
        )

        self.mlpBlock = Sequential([
            Dense(self.mlp_dim, activation="linear"),
            Lambda(lambda x: gelu(x, approximate=False)),
            Dropout(self.dropout),
            Dense(input_shape[-1]),  # project back to the model width
            Dropout(self.dropout),
        ])

        self.layerNorm1 = LayerNormalization(epsilon=1e-6)
        self.layerNorm2 = LayerNormalization(epsilon=1e-6)
        self.layerDropout = Dropout(self.dropout)

    def call(self, inputs, training=None):
        """Apply the block; ``training`` toggles the dropout layers.

        ``training`` defaults to ``None`` so the layer can also be called
        without the flag, and it is forwarded explicitly to every sub-layer
        that accepts it instead of relying on implicit propagation.
        """
        x = self.att(inputs, inputs, training=training)
        x = self.layerDropout(x, training=training)
        x = x + inputs  # residual around attention
        y = self.layerNorm2(x)
        y = self.mlpBlock(y, training=training)
        x = x + y  # residual around the MLP
        x = self.layerNorm1(x)
        return x

    def get_config(self):
        """Return the constructor arguments so Keras can re-create the layer."""
        config = super().get_config()
        config.update(
            {
                "num_heads": self.num_heads,
                "mlp_dim": self.mlp_dim,
                "dropout": self.dropout,
            }
        )
        return config

def mlp(x, hidden_units, dropout_rate):
    """Stack one GELU ``Dense`` + ``Dropout`` pair per entry of ``hidden_units``.

    Returns the output of the last pair, or ``x`` unchanged when
    ``hidden_units`` is empty.
    """
    hidden = x
    for width in hidden_units:
        hidden = layers.Dense(units=width, activation=tf.nn.gelu)(hidden)
        hidden = layers.Dropout(dropout_rate)(hidden)
    return hidden

**train, val, test split**

In [None]:
# Stratified 70 / 20 / 10 split: the indices of each class are shuffled
# separately and cut at the same fractions, so train, validation and test
# keep the class balance of the full dataset.
SPLIT_SEED = 42            # fixed seed -> the same split (and test set) on every run
TRAIN_FRACTION = 0.7       # first 70 % of each class -> train
TRAIN_VAL_FRACTION = 0.9   # next 20 % -> validation, last 10 % -> test

split_rng = np.random.default_rng(SPLIT_SEED)

id_pos = np.where(y == 1)[0]
id_neg = np.where(y == 0)[0]

split_rng.shuffle(id_pos)
split_rng.shuffle(id_neg)

# Cut points are computed once per class instead of repeating the literals.
neg_cuts = [int(len(id_neg) * TRAIN_FRACTION), int(len(id_neg) * TRAIN_VAL_FRACTION)]
pos_cuts = [int(len(id_pos) * TRAIN_FRACTION), int(len(id_pos) * TRAIN_VAL_FRACTION)]

id_train_neg, id_val_neg, id_test_neg = np.split(id_neg, neg_cuts)
id_train_pos, id_val_pos, id_test_pos = np.split(id_pos, pos_cuts)

id_train = np.concatenate((id_train_neg, id_train_pos), axis=0)
id_val = np.concatenate((id_val_neg, id_val_pos), axis=0)
id_test = np.concatenate((id_test_neg, id_test_pos), axis=0)

In [None]:
# Select images and labels for each subset; every label vector is turned into
# a column of shape (n_samples, 1).
x_train, y_train = X[id_train], y[id_train].reshape((-1, 1))
x_val, y_val = X[id_val], y[id_val].reshape((-1, 1))
x_test, y_test = X[id_test], y[id_test].reshape((-1, 1))

print(f"x_train shape: {x_train.shape} - y_train shape: {y_train.shape}")
print(f"x_val shape: {x_val.shape} - y_val shape: {y_val.shape}")
print(f"x_test shape: {x_test.shape} - y_test shape: {y_test.shape}")

In [None]:
# One-hot encode the integer labels, one column per entry of EMOTIONS.
# - num_classes is passed explicitly so the width does not depend on which
#   labels happen to be present in a subset.
# - The labels are re-read from `y[id_*]` instead of overwriting `y_*` with an
#   encoding of itself, so re-running this cell gives the same result rather
#   than encoding already one-hot arrays a second time.
y_train = to_categorical(y[id_train], num_classes=len(EMOTIONS), dtype='int32')
y_val = to_categorical(y[id_val], num_classes=len(EMOTIONS), dtype='int32')
y_test = to_categorical(y[id_test], num_classes=len(EMOTIONS), dtype='int32')

In [None]:
# Sanity check: display the shape of the encoded training labels.
y_train.shape

**Hyperparameters**

In [None]:
# --- Optimisation ---
learning_rate = 0.001   # passed to the AdamW optimizer
weight_decay = 0.0001   # passed to the AdamW optimizer
batch_size = 256        # mini-batch size used for model.fit
# NOTE(review): num_epochs is not referenced anywhere else in the visible
# notebook - confirm whether training is meant to use it.
num_epochs = 100
# --- Vision Transformer geometry ---
image_size = 72  # We'll resize input images to this size
patch_size = 6  # Size of the patches to be extracted from the input images
num_patches = (image_size // patch_size) ** 2  # patches per image: (72 // 6) ** 2 = 144
projection_dim = 64  # width of each patch token
num_heads = 4        # attention heads per transformer block
# NOTE(review): transformer_units is not referenced anywhere else in the
# visible notebook.
transformer_units = [
    projection_dim * 2,
    projection_dim,
]  # Size of the transformer layers
transformer_layers = 8  # number of stacked transformer blocks
mlp_head_units = [2048, 1024]  # Size of the dense layers of the final classifier
drop_out = 0.1  # dropout rate handed to every transformer block
num_class = 2   # width of each model's softmax output layer

**Augmentation**

In [None]:
# Preprocessing / augmentation pipeline placed in front of the ViT:
# normalise, resize to image_size, then apply random flip / rotation / zoom.
data_augmentation = keras.Sequential(name="data_augmentation")
data_augmentation.add(layers.Normalization())
data_augmentation.add(layers.Resizing(image_size, image_size))
data_augmentation.add(layers.RandomFlip("horizontal"))
data_augmentation.add(layers.RandomRotation(factor=0.02))
data_augmentation.add(layers.RandomZoom(height_factor=0.2, width_factor=0.2))

# Fit the Normalization layer's statistics on the training images only.
data_augmentation.layers[0].adapt(x_train)

**Create model**

In [None]:
def vit_model():
    """Assemble the Vision Transformer classifier from the notebook's hyperparameters.

    Pipeline: augmentation -> patch extraction -> patch encoding ->
    ``transformer_layers`` transformer blocks -> flatten -> MLP head ->
    softmax over ``num_class`` classes.
    """
    inputs = layers.Input(input_shape)
    augmented = data_augmentation(inputs)
    patch_sequence = Patches(patch_size)(augmented)
    encoded = PatchEncoder(num_patches, projection_dim)(patch_sequence)

    encoder_blocks = [
        TransformerBlock(num_heads=num_heads, mlp_dim=projection_dim * 2, dropout=drop_out)
        for _ in range(transformer_layers)
    ]
    for block in encoder_blocks:
        encoded = block(encoded)

    # Classification head on the flattened token sequence.
    flattened = layers.Flatten()(encoded)
    flattened = layers.Dropout(0.2)(flattened)
    features = mlp(flattened, hidden_units=mlp_head_units, dropout_rate=0.2)
    probabilities = layers.Dense(num_class, activation="softmax")(features)

    return keras.Model(inputs=inputs, outputs=probabilities)

In [None]:
from tensorflow.keras.applications.resnet50 import ResNet50

def resnet_model():
    """Build a ResNet50 (ImageNet weights, no top) with a softmax classification head."""
    # NOTE(review): images reach the backbone without
    # `resnet50.preprocess_input`; confirm whether the ImageNet weights are
    # meant to see raw pixel values.
    backbone = ResNet50(weights='imagenet', include_top=False, input_shape=input_shape)

    pooled = layers.GlobalAveragePooling2D()(backbone.output)
    # Output has shape (None, num_class).
    class_probs = layers.Dense(num_class, activation="softmax")(pooled)

    return keras.Model(inputs=backbone.input, outputs=class_probs)

In [None]:
from tensorflow.keras.applications.vgg16 import VGG16

# function to build VGG model
def vgg_model():
    """Build a VGG16 (ImageNet weights, no top) with a dense classification head.

    All backbone layers stay trainable; the freezing loop is kept below,
    commented out, as an option.
    """
    backbone = VGG16(weights='imagenet', include_top=False, input_shape=input_shape)
    # for layer in backbone.layers:
    #     layer.trainable = False

    # New classifier on top of the last backbone layer's output.
    last_feature_map = backbone.layers[-1].output
    flattened = layers.Flatten()(last_feature_map)
    hidden = layers.Dense(1024, activation='relu')(flattened)
    class_probs = layers.Dense(num_class, activation='softmax')(hidden)

    return keras.Model(inputs=backbone.inputs, outputs=class_probs)

**Optimizer, training and evaluation**

In [None]:
def run_experiment(model, name, epochs=15):
    """Compile, train, checkpoint and evaluate ``model``.

    The model is trained on ``x_train`` / ``y_train`` with AdamW and
    categorical cross-entropy, validated on ``x_val`` / ``y_val`` after each
    epoch, and the weights with the best validation accuracy are restored
    before evaluating on ``x_test`` / ``y_test``.

    Args:
        model: Keras model with a ``num_class``-wide softmax output.
        name: Experiment name; used as the checkpoint sub-folder.
        epochs: Number of training epochs. Defaults to 15, the value that
            used to be hard-coded.

    Returns:
        The ``History`` object returned by ``model.fit``.
    """
    optimizer = tfa.optimizers.AdamW(
        learning_rate=learning_rate, weight_decay=weight_decay
    )

    model.compile(
        optimizer=optimizer,
        loss='categorical_crossentropy',
        metrics=[
            keras.metrics.CategoricalAccuracy(name="accuracy"),
            # Follow the notebook-wide class count instead of a literal 2.
            tfa.metrics.F1Score(num_classes=num_class)
        ],
    )

    # Only the best weights (by validation accuracy) are written to disk.
    checkpoint_filepath = f"/content/tmp/{name}/checkpoint"
    checkpoint_callback = keras.callbacks.ModelCheckpoint(
        checkpoint_filepath,
        monitor="val_accuracy",
        save_best_only=True,
        save_weights_only=True,
    )

    history = model.fit(
        x=x_train,
        y=y_train,
        batch_size=batch_size,
        epochs=epochs,
        validation_data=(x_val, y_val),
        callbacks=[checkpoint_callback],
    )

    # Evaluate the best checkpoint rather than the weights of the last epoch.
    model.load_weights(checkpoint_filepath)
    _, accuracy, f1_score_res = model.evaluate(x_test, y_test)
    print(f"Test accuracy: {round(accuracy * 100, 2)}%")
    print("f1_score: ", f1_score_res)

    return history

In [None]:
# Build the Vision Transformer; printing its summary and training it are
# left disabled in the two commented-out lines below.
vit_classifier = vit_model() 
# vit_classifier.summary()
# history = run_experiment(vit_classifier,"vit")

In [None]:
# Build the ResNet50-based classifier, then train and evaluate it.
# `history` holds what run_experiment returns; the VGG cell further down
# assigns `history` again, so the plotting cells show whichever run
# executed last.
resnet_classifier = resnet_model()
# resnet_classifier.summary()
history = run_experiment(resnet_classifier, "RESNET")

In [None]:
# Build the VGG16-based classifier, then train and evaluate it.
# NOTE(review): this rebinds the name `vgg_model` from the builder function to
# the model instance it returns, so re-running this cell without first
# re-running the cell that defines the function calls the model object
# instead of the builder. The prediction cell at the end of the notebook
# reads `vgg_model.predict`, so a rename has to be made in both places.
vgg_model = vgg_model()
# vgg_model.summary()
history = run_experiment(vgg_model, "VGG")

In [None]:
# Optional "keep the session alive" barrier.
# An unconditional `while 1: continue` here kept a CPU core spinning and
# stopped "Run All" from ever reaching the cells below. The wait is now
# opt-in and sleeps instead of busy-looping; interrupt the kernel to leave it.
KEEP_SESSION_ALIVE = False  # set to True to block here on purpose

if KEEP_SESSION_ALIVE:
    import time

    while True:
        time.sleep(60)

In [None]:
# Training vs. validation loss per epoch for the most recent `history`.
fig, ax = plt.subplots()
ax.plot(history.history['loss'], label='train')
ax.plot(history.history['val_loss'], label='val')
ax.set(title='Loss per epoch', xlabel='epoch', ylabel='loss')
ax.legend()
plt.show()  # also suppresses the Legend repr a bare plt.legend() leaves as cell output

In [None]:
# Training vs. validation accuracy per epoch for the most recent `history`.
fig, ax = plt.subplots()
ax.plot(history.history['accuracy'], label='train')
ax.plot(history.history['val_accuracy'], label='val')
ax.set(title='Accuracy per epoch', xlabel='epoch', ylabel='accuracy')
ax.legend()
plt.show()  # also suppresses the Legend repr a bare plt.legend() leaves as cell output

# Plot predictions on the test set

In [None]:
# Predict the test set with the trained model and compare the predicted class
# counts with the ground truth.
y_pred = vgg_model.predict(x_test)
y_pred = np.argmax(y_pred, axis=1)   # class id with the highest probability
y_true = np.argmax(y_test, axis=1)   # y_test is one-hot encoded

print("GROUND TRUTH: ")
print("Label 1: ", np.count_nonzero(y_true == 1))
print("Label 0: ", np.count_nonzero(y_true == 0))


print("PREDICTION: ")
print("Label 1: ", np.count_nonzero(y_pred == 1))
print("Label 0: ", np.count_nonzero(y_pred == 0))

# Show 20 randomly chosen test images with their predicted and true labels.
fig, axs = plt.subplots(4, 5, figsize=(15, 10))
for ax in axs.flat:
    # randrange excludes the upper bound. random.randint(0, len(x_test)) is
    # inclusive and could return len(x_test) itself, raising IndexError.
    index = random.randrange(len(x_test))
    ax.imshow(x_test[index])
    ax.set_title(f"Predicted label: {y_pred[index]}\nTrue label: {y_true[index]}")
    ax.axis('off')