In [1]:
# Import Library
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.layers import Input
from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import Conv2D
from tensorflow.keras.layers import Dropout
from tensorflow.keras.layers import Flatten
from tensorflow.keras.layers import Add

from tensorflow.keras.layers import Embedding
from tensorflow.keras.layers import MultiHeadAttention
from tensorflow.keras.layers import Layer
from tensorflow.keras.layers import Reshape
from tensorflow.keras.layers import Concatenate
from tensorflow.keras.layers import LayerNormalization
from tensorflow.keras.layers import GlobalAveragePooling2D
from tensorflow.keras.models import Model

from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras.callbacks import  CSVLogger
from tensorflow.keras.callbacks import  ReduceLROnPlateau
from tensorflow.keras.callbacks import  EarlyStopping

In [2]:
data = {}
data["num_layers"] = 7
data["hidden_dim"] = 512
data["mlp_dim"] = 1024
data["num_heads"] = 7
data["dropout_rate"] = 0.25
data["num_patches"] = 49
data["patch_size"] = 32
data["num_channels"] = 3
data["num_classes"] = 8

In [3]:
class ClassToken(Layer):
    def __init__(self):
        super().__init__()

    def build(self, input_shape):
        w_init = tf.random_normal_initializer()
        self.w = tf.Variable(
            initial_value = w_init(shape=(1, 1, input_shape[-1]), dtype=tf.float32),
            trainable = True
        )

    def call(self, inputs):
        batch_size = tf.shape(inputs)[0]
        hidden_dim = self.w.shape[-1]

        # change the shape
        cls = tf.broadcast_to(self.w, [batch_size, 1, hidden_dim]) 
        # change the datatype
        cls = tf.cast(cls, dtype=inputs.dtype) 
        return cls

In [4]:
def MLP(x, data):
    x = Dense(data["mlp_dim"], activation="gelu")(x)
    x = Dropout(data["dropout_rate"])(x)
    x = Dense(data["hidden_dim"])(x)
    x = Dropout(data["dropout_rate"])(x)
    return x

In [5]:
def Transformer(x, data):
    skip_1 = x
    x = LayerNormalization()(x)
    x = MultiHeadAttention(num_heads=data["num_heads"], key_dim=data["hidden_dim"])(x, x)
    x = Add()([x, skip_1])

    skip_2 = x
    x = LayerNormalization()(x)
    x = MLP(x, data)
    x = Add()([x, skip_2])

    return x

In [6]:
# Inputs
input_shape = (data["num_patches"], data["patch_size"]*data["patch_size"]*data["num_channels"])
print("input_shape",input_shape)
inputs = Input(input_shape)     

# Patch
patch_embed = Dense(data["hidden_dim"])(inputs)   
print("patches : ",patch_embed.shape)

# position Embedding
positions = tf.range(start=0, limit=data["num_patches"], delta=1)
pos_embed = Embedding(input_dim=data["num_patches"], output_dim=data["hidden_dim"])(positions) 
print("position Embedding : ",pos_embed.shape)

# patch + position Embedding
embed = patch_embed + pos_embed 
print("Embedding : ",embed.shape)


# Adding Class Token 
token = ClassToken()(embed)
print("token",token.shape)
x = Concatenate(axis=1)([token, embed]) 
print("Affter Add token",x.shape)

# Transfer Encoder
for _ in range(data["num_layers"]):
    x = Transformer(x, data)

# Classification
x = LayerNormalization()(x)    
x = x[:, 0, :]
x = Dense(data["num_classes"], activation="softmax")(x)

model = Model(inputs, x)

input_shape (49, 3072)
patches :  (None, 49, 512)
position Embedding :  (49, 512)
Embedding :  (None, 49, 512)

token (None, 1, 512)
Affter Add token (None, 50, 512)


In [7]:
model.summary()

In [8]:
# Import Library
import numpy as np
import cv2
from glob import glob
from sklearn.utils import shuffle
from sklearn.model_selection import train_test_split
from patchify import patchify

In [9]:
# Hyperparameters
data["image_size"] = 224
data["num_patches"] = (data["image_size"]**2) // (data["patch_size"]**2)
data["flatten_patches"] = (data["num_patches"], data["patch_size"]*data["patch_size"]*data["num_channels"])
data["lr"] = 1e-4
data["num_epochs"] = 50
data["num_classes"] = 8

data["class_names"] = ["anger", "contempt", "disgust", "fear", "happy","neutral","sad","surprise"]

In [10]:
def create_dir(path):
    if not os.path.exists(path):
        os.makedirs(path)

In [11]:
def load_data(path, split=0.1):
    images = shuffle(glob(os.path.join(path, "*", "*.jpg")))

    split_size = int(len(images) * split)
    train_x, valid_x = train_test_split(images, test_size=split_size, random_state=42)
    train_x, test_x = train_test_split(train_x, test_size=split_size, random_state=42)

    return train_x, valid_x, test_x

In [12]:
def Convert_dataset(images, batch=32):
    ds = tf.data.Dataset.from_tensor_slices((images))
    ds = ds.map(parse).batch(batch).prefetch(8)
    return ds

In [13]:
def process_image_label(path):
    # Reading images
    path = path.decode() # Convert tensor to string
    image = cv2.imread(path, cv2.IMREAD_COLOR)
    image = cv2.resize(image, (data["image_size"], data["image_size"]))
    image = image/255.0

    # Preprocessing to patches
    patch_shape = (data["patch_size"], data["patch_size"], data["num_channels"])
    patches = patchify(image, patch_shape, data["patch_size"])
    # print("patches",patches.shape)

    patches = np.reshape(patches, (49, 32, 32, 3))
    #for i in range(64):
    #     cv2.imwrite(f"files/{i}.png", patches[i])

    patches = np.reshape(patches, data["flatten_patches"])
    patches = patches.astype(np.float32)

    # Label 
    class_name = path.split("\\")[-2]
    
    class_idx = data["class_names"].index(class_name)
    class_idx = np.array(class_idx, dtype=np.int32)

    return patches, class_idx

In [14]:
def parse(path):
    patches, labels = tf.numpy_function(process_image_label, [path], [tf.float32, tf.int32])
    labels = tf.one_hot(labels, data["num_classes"])

    patches.set_shape(data["flatten_patches"])
    labels.set_shape(data["num_classes"])

    return patches, labels

In [15]:
# Seeding "
import os
np.random.seed(42)
tf.random.set_seed(42)

# Paths 
dataset_path = r"D:\AffectNet_Young"

model_path = os.path.join("files", "model.keras")
csv_path = os.path.join("files", "log.csv")

# Dataset 
train_x, valid_x, test_x = load_data(dataset_path)
print(f"Train: {len(train_x)} - Valid: {len(valid_x)} - Test: {len(test_x)}")

train_ds = Convert_dataset(train_x, batch=32)
valid_ds = Convert_dataset(valid_x, batch=32)

Train: 8515 - Valid: 1064 - Test: 1064


In [16]:
# Compile the Model
model.compile(
    loss="categorical_crossentropy",
    optimizer=tf.keras.optimizers.Adam(data["lr"]),
    metrics=["acc"]
)

In [17]:
callbacks = [
    ModelCheckpoint(model_path, monitor='val_loss', verbose=1, save_best_only=True),
    ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=10, min_lr=1e-10, verbose=1),
    CSVLogger(csv_path),
    EarlyStopping(monitor='val_loss', patience=50, restore_best_weights=False),
]

In [18]:
# Fit the Model
model.fit(
    train_ds,
    epochs=data["num_epochs"],
    validation_data=valid_ds,
    callbacks=callbacks
)

Epoch 1/50
[1m138/267[0m [32m━━━━━━━━━━[0m[37m━━━━━━━━━━[0m [1m19:45[0m 9s/step - acc: 0.1620 - loss: 2.4119

KeyboardInterrupt: 

In [None]:
#Plot The Training And Testing
# summarize history for accuracy
import matplotlib.pyplot as plt
plt.figure(1)
plt.subplot(211)
plt.plot(history.history['accuracy'])
plt.plot(history.history['val_accuracy'])
plt.title('model accuracy')
plt.ylabel('accuracy')
plt.xlabel('epoch')
plt.ylim([0.75,1])
plt.legend(['train', 'test'], loc='lower right')
plt.savefig('plot1')

In [None]:
# summarize history for loss
plt.subplot(212)
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('model loss')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.ylim([0,15])
plt.legend(['train', 'test'], loc='upper right')
plt.savefig('plot2')