In [None]:
import numpy as np
import tensorflow as tf
import cv2
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix
from tensorflow.keras import Model
from tensorflow.keras.layers import Layer, Resizing, Rescaling, InputLayer, Conv2D, MaxPool2D, Activation, Add, GlobalAveragePooling2D
from tensorflow.keras.layers import BatchNormalization, Dropout, Flatten, Dense, RandomRotation, RandomFlip, RandomContrast
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras.regularizers import L2
from tensorflow.keras.losses import CategoricalCrossentropy
from tensorflow.keras.metrics import CategoricalAccuracy, TopKCategoricalAccuracy
from tensorflow.keras.optimizers import Adam

In [3]:
train_directory = "C:\ML\dataset\Emotions Dataset\Emotions Dataset\Train"
validation_directory = "C:\ML\dataset\Emotions Dataset\Emotions Dataset\Test"
class_names = ["angry", "happy", "sad"]

In [None]:
train_dataset = tf.keras.preprocessing.image_dataset_from_directory(
    train_directory,
    labels='inferred',
    label_mode='categorical',
    class_names=class_names,
    color_mode='rgb',
    batch_size=32,
    image_size=(256, 256),
    shuffle=True,
    seed=42,
)

In [None]:
validation_dataset = tf.keras.preprocessing.image_dataset_from_directory(
    validation_directory,
    labels='inferred',
    label_mode='categorical',
    class_names=class_names,
    color_mode='rgb',
    batch_size=32,
    image_size=(256, 256),
    shuffle=False,
)

In [6]:
augment_layers = tf.keras.Sequential([
    RandomRotation(factor = (-0.025, 0.025)),
    RandomFlip(mode = 'horizontal'),
    RandomContrast(factor = 0.1)
])

def augmentation(image, label):
    return augment_layers(image, training = True), label

In [7]:
train_dataset = train_dataset.map(augmentation, num_parallel_calls = tf.data.experimental.AUTOTUNE).prefetch(tf.data.experimental.AUTOTUNE)
validation_dataset = validation_dataset.map(augmentation, num_parallel_calls = tf.data.experimental.AUTOTUNE).prefetch(tf.data.experimental.AUTOTUNE)

In [27]:
CONFIGURATION = {
    "BATCH_SIZE" : 32,
    "IMAGE_SIZE" : 256,
    "LEARNING_RATE" : 0.001,
    "DROPOUT_RATE" : 0,
    "REGULARIZATION_RATE" : 0,
    "EPOCHS" : 25,
    "N_FILTERS" : 6,
    "KERNEL_SIZE" : 3,
    "N_STRIDES" : 1,
    "POOL_SIZE" : 2,
    "N_DENSE_1" : 1024,
    "N_DENSE_2" : 128,
    "NUM_CLASSES" : 3
}

In [14]:
def lenet_model(train_dataset, validation_dataset, CONFIGURATION):
    resize_rescale = tf.keras.Sequential([
        Resizing(CONFIGURATION["IMAGE_SIZE"], CONFIGURATION["IMAGE_SIZE"]),
        Rescaling(1/255)
    ])
    
    model = tf.keras.Sequential([
        InputLayer(input_shape = (None, None, 3)),
        resize_rescale,

        Conv2D(filters = CONFIGURATION["N_FILTERS"], kernel_size = CONFIGURATION["KERNEL_SIZE"], strides = CONFIGURATION["N_STRIDES"], 
            activation = "relu", kernel_regularizer = L2(CONFIGURATION["REGULARIZATION_RATE"])),
        BatchNormalization(),
        MaxPool2D(pool_size = CONFIGURATION["POOL_SIZE"], strides = CONFIGURATION["N_STRIDES"]*2),
        Dropout(rate = CONFIGURATION["DROPOUT_RATE"]),

        Conv2D(filters = CONFIGURATION["N_FILTERS"]*2 + 4, kernel_size = CONFIGURATION["KERNEL_SIZE"], strides = CONFIGURATION["N_STRIDES"], 
            activation = "relu", kernel_regularizer = L2(CONFIGURATION["REGULARIZATION_RATE"])),
        BatchNormalization(),
        MaxPool2D(pool_size = CONFIGURATION["POOL_SIZE"], strides = CONFIGURATION["N_STRIDES"]*2),

        Flatten(),

        Dense(CONFIGURATION["N_DENSE_1"], activation = "relu", kernel_regularizer = L2(CONFIGURATION["REGULARIZATION_RATE"])),
        BatchNormalization(),
        Dropout(rate = CONFIGURATION["DROPOUT_RATE"]),

        Dense(CONFIGURATION["N_DENSE_2"], activation = "relu", kernel_regularizer = L2(CONFIGURATION["REGULARIZATION_RATE"])),
        BatchNormalization(),

        Dense(CONFIGURATION["NUM_CLASSES"], activation = "softmax")
    ])

    loss_function = CategoricalCrossentropy()
    metrics = [CategoricalAccuracy(name = "Accuracy"), TopKCategoricalAccuracy(k = 2, name = "Top_K_Accuracy")]

    model.compile(
        optimizer = Adam(learning_rate = CONFIGURATION["LEARNING_RATE"]),
        loss = loss_function,
        metrics = metrics
    )

    history = model.fit(
        train_dataset,
        validation_data = validation_dataset,
        epochs = CONFIGURATION["EPOCHS"],
        verbose = 1
    )

    return model, history

In [None]:
model, history = lenet_model(train_dataset, validation_dataset, CONFIGURATION)

In [32]:
def predicting(model, validation_dataset, class_names):
    labels = []
    predictions = []

    for im, lb in validation_dataset:
        for i in range(32):

            test_image = tf.expand_dims(im[i], axis = 0)
            pred = class_names[tf.argmax(model(test_image).numpy()[0])]
            predictions = np.append(predictions, pred)

            labels = np.append(labels, class_names[tf.argmax(lb[i].numpy())])
            if predictions.shape[0] == 2278:
                break

    return predictions, labels

In [17]:
def plot_cm(predictions, labels):
    cm = confusion_matrix(labels, predictions)
    plt.figure(figsize=(8,8))
    sns.heatmap(cm,annot=True)

In [None]:
predictions, labels = predicting(model, validation_dataset, class_names)
plot_cm(predictions, labels)

In [19]:
class CustomConv2D(Layer):
    def __init__(self, n_filters, kernel_size, n_strides, padding = "same"):
        super(CustomConv2D, self).__init__(name = "Custom_Conv_Layer")

        self.conv = Conv2D(filters = n_filters,kernel_size = kernel_size, activation = 'relu', 
                           strides = n_strides, padding = padding)
        self.batchnorm = BatchNormalization()

    def call(self, input, training = True):
        x = self.conv(input)
        x = self.batchnorm(x, training = True)

        return x

In [20]:
class ResidualBlock(Layer):
    def __init__(self, n_channels, n_strides = 1):
        super(ResidualBlock, self).__init__(name = "Residual_block")

        self.dotted = (n_strides!=1)

        self.custom_conv_1 = CustomConv2D(n_channels, 3, n_strides, padding = "same")
        self.custom_conv_2 = CustomConv2D(n_channels, 3, 1, padding = "same")
        self.activation = Activation("relu")

        if self.dotted:
            self.custom_conv_3 = CustomConv2D(n_channels, 1, n_strides)

    def call(self, input, training = True):
        x = self.custom_conv_1(input, training = True)
        x = self.custom_conv_2(x, training = True)

        if self.dotted:
            x_add = self.custom_conv_3(input, training = True)
            x_add = Add()([x,x_add])
        else:
            x_add = Add()([x,input])
        
        return self.activation(x_add)

In [21]:
class Resnet_Model(Model):
    def __init__(self):
        super(Resnet_Model, self).__init__(name = "resnet34")

        self.conv_1 = CustomConv2D(64, 7, 2)
        self.maxpool = MaxPool2D(3, 2)

        self.conv_2_1 = ResidualBlock(64)
        self.conv_2_2 = ResidualBlock(64)
        self.conv_2_3 = ResidualBlock(64)

        self.conv_3_1 = ResidualBlock(128, 2)
        self.conv_3_2 = ResidualBlock(128)
        self.conv_3_3 = ResidualBlock(128)
        self.conv_3_4 = ResidualBlock(128)

        self.conv_4_1 = ResidualBlock(256, 2)
        self.conv_4_2 = ResidualBlock(256)
        self.conv_4_3 = ResidualBlock(256)
        self.conv_4_4 = ResidualBlock(256)
        self.conv_4_5 = ResidualBlock(256)
        self.conv_4_6 = ResidualBlock(256)

        self.conv_5_1 = ResidualBlock(512, 2)
        self.conv_5_2 = ResidualBlock(512)
        self.conv_5_3 = ResidualBlock(512)

        self.global_pool = GlobalAveragePooling2D()
        self.fc_3 = Dense(CONFIGURATION["NUM_CLASSES"], activation = "softmax")

    def call(self, x, training = True):
        x = self.conv_1(x, training = True)
        x = self.maxpool(x)
        
        x = self.conv_2_1(x, training = True)
        x = self.conv_2_2(x, training = True)
        x = self.conv_2_3(x, training = True)

        x = self.conv_3_1(x, training = True)
        x = self.conv_3_2(x, training = True)
        x = self.conv_3_3(x, training = True)
        x = self.conv_3_4(x, training = True)

        x = self.conv_4_1(x, training = True)
        x = self.conv_4_2(x, training = True)
        x = self.conv_4_3(x, training = True)
        x = self.conv_4_4(x, training = True)
        x = self.conv_4_5(x, training = True)
        x = self.conv_4_6(x, training = True)

        x = self.conv_5_1(x, training = True)
        x = self.conv_5_2(x, training = True)
        x = self.conv_5_3(x, training = True)

        x = self.global_pool(x)
        return self.fc_3(x)

In [25]:
def resnet_model_run(train_dataset, validation_dataset, CONFIGURATION):
    resnet = Resnet_Model()

    checkpoint_callback = ModelCheckpoint(
        'best_weights.keras',
        monitor = 'val_accuracy',
        mode = 'max',
        verbose = 1,
        save_best_only = True
    )

    loss_function = CategoricalCrossentropy()
    metrics = [CategoricalAccuracy(name = "Accuracy"), TopKCategoricalAccuracy(k = 2, name = "Top_K_Accuracy")]

    resnet.compile(
            optimizer = Adam(learning_rate = CONFIGURATION["LEARNING_RATE"]*10),
            loss = loss_function,
            metrics = metrics
        )

    history = resnet.fit(
            train_dataset,
            validation_data = validation_dataset,
            epochs = 1,
            # epochs = CONFIGURATION["EPOCHS"],
            verbose = 1,
            callbacks = [checkpoint_callback]
        )
    
    return resnet, history

In [None]:
resnet, history = resnet_model_run(train_dataset.take(1), validation_dataset, CONFIGURATION)

In [36]:
class EfficientNet(Model):
    def __init__(self):
        super(EfficientNet, self).__init__(name = "EfficientNet")

        self.backbone = tf.keras.applications.EfficientNetB4(
            include_top=False,
            weights='imagenet',
            input_shape=(CONFIGURATION["IMAGE_SIZE"], CONFIGURATION["IMAGE_SIZE"], 3),
        )
        self.backbone.trainable = False

        self.efficientnet = tf.keras.Sequential([
            InputLayer(shape=(CONFIGURATION["IMAGE_SIZE"], CONFIGURATION["IMAGE_SIZE"], 3)),
            self.backbone,
            GlobalAveragePooling2D(),
            Dense(CONFIGURATION["N_DENSE_1"], activation="relu"),
            BatchNormalization(),
            Dense(CONFIGURATION["N_DENSE_2"], activation="relu"),
            Dense(CONFIGURATION["NUM_CLASSES"], activation="softmax")
        ])

    def call(self, x):
        x = self.efficientnet(x)
        return x

In [39]:
def efficientnet_model_run(train_dataset, validation_dataset, CONFIGURATION):
    efficientnet = EfficientNet()

    checkpoint_callback = ModelCheckpoint(
        'best_weights.keras',
        monitor = 'val_accuracy',
        mode = 'max',
        verbose = 1,
        save_best_only = True
    )

    loss_function = CategoricalCrossentropy()
    metrics = [CategoricalAccuracy(name = "Accuracy"), TopKCategoricalAccuracy(k = 2, name = "Top_K_Accuracy")]

    efficientnet.compile(
            optimizer = Adam(learning_rate = CONFIGURATION["LEARNING_RATE"]*10),
            loss = loss_function,
            metrics = metrics
        )
    
    history = efficientnet.fit(
            train_dataset,
            validation_data = validation_dataset,
            epochs = 1,
            # epochs = CONFIGURATION["EPOCHS"],
            verbose = 1,
            callbacks = [checkpoint_callback]
        )
    
    return efficientnet, history

In [None]:
efficientnet, history = efficientnet_model_run(train_dataset, validation_dataset, CONFIGURATION)