In [42]:
import certifi
import ssl
import os

# Update the SSL certificates
os.environ['SSL_CERT_FILE'] = certifi.where()

# Now try loading the models again
from tensorflow.keras.applications import VGG19, Xception, MobileNetV2, InceptionV3, EfficientNetB7

# Create a simple model to test the download
try:
    vgg19_model = VGG19(weights='imagenet', include_top=False)
    print("VGG19 model loaded successfully")
except Exception as e:
    print("Error loading VGG19 model:", e)

try:
    xception_model = Xception(weights='imagenet', include_top=False)
    print("Xception model loaded successfully")
except Exception as e:
    print("Error loading Xception model:", e)

try:
    mobilenetv2_model = MobileNetV2(weights='imagenet', include_top=False)
    print("MobileNetV2 model loaded successfully")
except Exception as e:
    print("Error loading MobileNetV2 model:", e)

try:
    inception_model = InceptionV3(weights='imagenet', include_top=False)
    print("InceptionV3 model loaded successfully")
except Exception as e:
    print("Error loading InceptionV3 model:", e)

try:
    efficient_model = EfficientNetB7(weights='imagenet', include_top=False)
    print("EfficientNetB7 model loaded successfully")
except Exception as e:
    print("Error loading EfficientNetB7 model:", e)


Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/vgg19/vgg19_weights_tf_dim_ordering_tf_kernels_notop.h5
[1m80134624/80134624[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 0us/step
VGG19 model loaded successfully
Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/xception/xception_weights_tf_dim_ordering_tf_kernels_notop.h5
[1m83683744/83683744[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 0us/step
Xception model loaded successfully
Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/mobilenet_v2/mobilenet_v2_weights_tf_dim_ordering_tf_kernels_1.0_224_no_top.h5
[1m9406464/9406464[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 0us/step
MobileNetV2 model loaded successfully
Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/inception_v3/inception_v3_weights_tf_dim_ordering_tf_kernels_notop.h5
[1m87910968/87910968[0m [32m━━━━━━━━━━━━

In [1]:
import os
import numpy as np
from PIL import Image
from concurrent.futures import ThreadPoolExecutor
from sklearn.model_selection import train_test_split
from tensorflow.keras.preprocessing.image import ImageDataGenerator

def is_image_file(filename):
    valid_extensions = ('.jpg', '.jpeg', '.png', '.bmp', '.tiff')
    return filename.lower().endswith(valid_extensions)

def load_image(img_path, img_size=(96, 96)):
    try:
        with Image.open(img_path) as img:
            img = img.resize(img_size)
            return np.array(img)
    except Exception as e:
        print(f"Warning: Unable to read image {img_path}: {e}")
        return None

def load_images_and_labels(data_dir, img_size=(96, 96)):
    images = []
    labels = []
    class_names = sorted(os.listdir(data_dir))  # Ensure class names are sorted
    img_paths = []
    img_labels = []

    for label, class_name in enumerate(class_names):
        class_dir = os.path.join(data_dir, class_name)
        if os.path.isdir(class_dir):
            for img_file in os.listdir(class_dir):
                img_path = os.path.join(class_dir, img_file)
                if is_image_file(img_file):
                    img_paths.append(img_path)
                    img_labels.append(label)

    # Use ThreadPoolExecutor for parallel image loading
    with ThreadPoolExecutor() as executor:
        loaded_images = list(executor.map(load_image, img_paths, [img_size] * len(img_paths)))

    for img, label in zip(loaded_images, img_labels):
        if img is not None:
            images.append(img)
            labels.append(label)

    return np.array(images), np.array(labels)

data_dir = 'Datasets/AffectNet'  

# Load and preprocess the data
images, labels = load_images_and_labels(data_dir, img_size=(96, 96))

# Normalize the images
images = images / 255.0

# Split the dataset into training, validation, and test sets
X_train, X_test, y_train, y_test = train_test_split(images, labels, test_size=0.2, random_state=42)
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.25, random_state=42)  

# Data augmentation
datagen = ImageDataGenerator(
    rotation_range=20,
    width_shift_range=0.2,
    height_shift_range=0.2,
    horizontal_flip=True,
)
datagen.fit(X_train)


In [2]:
import tensorflow as tf
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D, GlobalMaxPooling2D, Conv2D, Multiply, Add, Reshape, Lambda, Input
from tensorflow.keras.models import Model

def channel_attention(input_feature, ratio=8):
    channel = input_feature.shape[-1]
    print(f'Input feature shape (channel attention): {input_feature.shape}')
    
    shared_layer_one = Dense(channel // ratio, activation='relu', kernel_initializer='he_normal', use_bias=True, bias_initializer='zeros')
    shared_layer_two = Dense(channel, kernel_initializer='he_normal', use_bias=True, bias_initializer='zeros')
    
    avg_pool = GlobalAveragePooling2D()(input_feature)
    print(f'Avg pool shape: {avg_pool.shape}')
    avg_pool = Reshape((1, 1, channel))(avg_pool)
    avg_pool = shared_layer_one(avg_pool)
    avg_pool = shared_layer_two(avg_pool)
    print(f'Avg pool after dense layers shape: {avg_pool.shape}')
    
    max_pool = GlobalMaxPooling2D()(input_feature)
    print(f'Max pool shape: {max_pool.shape}')
    max_pool = Reshape((1, 1, channel))(max_pool)
    max_pool = shared_layer_one(max_pool)
    max_pool = shared_layer_two(max_pool)
    print(f'Max pool after dense layers shape: {max_pool.shape}')
    
    # Ensure both tensors have the same shape
    assert avg_pool.shape == max_pool.shape, f"Shape mismatch: avg_pool {avg_pool.shape}, max_pool {max_pool.shape}"
    
    cbam_feature = Add()([avg_pool, max_pool])
    cbam_feature = tf.nn.sigmoid(cbam_feature)
    print(f'CBAM feature shape (channel attention): {cbam_feature.shape}')
    
    return Multiply()([input_feature, cbam_feature])

def spatial_attention(input_feature):
    print(f'Input feature shape (spatial attention): {input_feature.shape}')
    avg_pool = Lambda(lambda x: tf.reduce_mean(x, axis=3, keepdims=True))(input_feature)
    max_pool = Lambda(lambda x: tf.reduce_max(x, axis=3, keepdims=True))(input_feature)
    concat = tf.concat([avg_pool, max_pool], axis=3)
    print(f'Concat shape: {concat.shape}')
    cbam_feature = Conv2D(filters=1, kernel_size=7, strides=1, padding='same', activation='sigmoid', kernel_initializer='he_normal', use_bias=False)(concat)
    print(f'CBAM feature shape (spatial attention): {cbam_feature.shape}')
    
    return Multiply()([input_feature, cbam_feature])

def attention_block(input_feature):
    cbam_feature = channel_attention(input_feature)
    cbam_feature = spatial_attention(cbam_feature)
    return cbam_feature


In [3]:
from tensorflow.keras.applications import VGG16
from tensorflow.keras.layers import Flatten, Dense, Dropout

def create_dan_model(input_shape=(96, 96, 3), num_classes=8):
    input_layer = Input(shape=input_shape)
    
    base_model = VGG16(weights='imagenet', include_top=False, input_tensor=input_layer)
    for layer in base_model.layers:
        layer.trainable = False
    
    x = base_model.output
    print(f'Base model output shape: {x.shape}')
    
    x = attention_block(x)
    x = Flatten()(x)
    x = Dense(512, activation='relu')(x)
    x = Dropout(0.5)(x)
    output_layer = Dense(num_classes, activation='softmax')(x)
    
    model = Model(inputs=input_layer, outputs=output_layer)
    return model

# Instantiate the model
model = create_dan_model()

# Compile the model
model.compile(optimizer=tf.keras.optimizers.Adam(lr=0.0001), loss='sparse_categorical_crossentropy', metrics=['accuracy'])

# Print the model summary to verify the architecture
model.summary()


Base model output shape: (None, 3, 3, 512)
Input feature shape (channel attention): (None, 3, 3, 512)
Avg pool shape: (None, 512)
Avg pool after dense layers shape: (None, 1, 1, 512)
Max pool shape: (None, 512)
Max pool after dense layers shape: (None, 1, 1, 512)


ValueError: A KerasTensor cannot be used as input to a TensorFlow function. A KerasTensor is a symbolic placeholder for a shape and dtype, used when constructing Keras Functional models or Keras Functions. You can only use it as input to a Keras layer or a Keras operation (from the namespaces `keras.layers` and `keras.operations`). You are likely doing something like:

```
x = Input(...)
...
tf_fn(x)  # Invalid.
```

What you should do instead is wrap `tf_fn` in a layer:

```
class MyLayer(Layer):
    def call(self, x):
        return tf_fn(x)

x = MyLayer()(x)
```


In [4]:
import os
import numpy as np
from PIL import Image
from concurrent.futures import ThreadPoolExecutor
from sklearn.model_selection import train_test_split
from tensorflow.keras.preprocessing.image import ImageDataGenerator

def is_image_file(filename):
    valid_extensions = ('.jpg', '.jpeg', '.png', '.bmp', '.tiff')
    return filename.lower().endswith(valid_extensions)

def load_image(img_path, img_size=(96, 96)):
    try:
        with Image.open(img_path) as img:
            img = img.resize(img_size)
            return np.array(img)
    except Exception as e:
        print(f"Warning: Unable to read image {img_path}: {e}")
        return None

def load_images_and_labels(data_dir, img_size=(96, 96), max_workers=8):
    images = []
    labels = []
    class_names = sorted(os.listdir(data_dir))  # Ensure class names are sorted
    img_paths = []
    img_labels = []

    for label, class_name in enumerate(class_names):
        class_dir = os.path.join(data_dir, class_name)
        if os.path.isdir(class_dir):
            for img_file in os.listdir(class_dir):
                img_path = os.path.join(class_dir, img_file)
                if is_image_file(img_file):
                    img_paths.append(img_path)
                    img_labels.append(label)

    # Use ThreadPoolExecutor for parallel image loading
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        loaded_images = list(executor.map(load_image, img_paths, [img_size] * len(img_paths)))

    for img, label in zip(loaded_images, img_labels):
        if img is not None:
            images.append(img)
            labels.append(label)

    return np.array(images), np.array(labels)

data_dir = 'Datasets/AffectNet'  

# Load and preprocess the data
images, labels = load_images_and_labels(data_dir, img_size=(96, 96), max_workers=8)

# Normalize the images
images = images / 255.0

# Split the dataset into training, validation, and test sets
X_train, X_test, y_train, y_test = train_test_split(images, labels, test_size=0.2, random_state=42)
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.25, random_state=42)

# Data augmentation
datagen = ImageDataGenerator(
    rotation_range=20,
    width_shift_range=0.2,
    height_shift_range=0.2,
    horizontal_flip=True,
)
datagen.fit(X_train)


In [5]:
import tensorflow as tf
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D, GlobalMaxPooling2D, Conv2D, Multiply, Add, Reshape, Lambda, Input, Flatten, Dropout
from tensorflow.keras.models import Model
from tensorflow.keras.applications import VGG16

def channel_attention(input_feature, ratio=8):
    channel = input_feature.shape[-1]
    print(f'Input feature shape (channel attention): {input_feature.shape}')
    
    shared_layer_one = Dense(channel // ratio, activation='relu', kernel_initializer='he_normal', use_bias=True, bias_initializer='zeros')
    shared_layer_two = Dense(channel, kernel_initializer='he_normal', use_bias=True, bias_initializer='zeros')
    
    avg_pool = GlobalAveragePooling2D()(input_feature)
    print(f'Avg pool shape: {avg_pool.shape}')
    avg_pool = Reshape((1, 1, channel))(avg_pool)
    avg_pool = shared_layer_one(avg_pool)
    avg_pool = shared_layer_two(avg_pool)
    print(f'Avg pool after dense layers shape: {avg_pool.shape}')
    
    max_pool = GlobalMaxPooling2D()(input_feature)
    print(f'Max pool shape: {max_pool.shape}')
    max_pool = Reshape((1, 1, channel))(max_pool)
    max_pool = shared_layer_one(max_pool)
    max_pool = shared_layer_two(max_pool)
    print(f'Max pool after dense layers shape: {max_pool.shape}')
    
    # Ensure both tensors have the same shape
    assert avg_pool.shape == max_pool.shape, f"Shape mismatch: avg_pool {avg_pool.shape}, max_pool {max_pool.shape}"
    
    cbam_feature = Add()([avg_pool, max_pool])
    cbam_feature = tf.nn.sigmoid(cbam_feature)
    print(f'CBAM feature shape (channel attention): {cbam_feature.shape}')
    
    return Multiply()([input_feature, cbam_feature])

def spatial_attention(input_feature):
    print(f'Input feature shape (spatial attention): {input_feature.shape}')
    avg_pool = Lambda(lambda x: tf.reduce_mean(x, axis=3, keepdims=True))(input_feature)
    max_pool = Lambda(lambda x: tf.reduce_max(x, axis=3, keepdims=True))(input_feature)
    concat = tf.concat([avg_pool, max_pool], axis=3)
    print(f'Concat shape: {concat.shape}')
    cbam_feature = Conv2D(filters=1, kernel_size=7, strides=1, padding='same', activation='sigmoid', kernel_initializer='he_normal', use_bias=False)(concat)
    print(f'CBAM feature shape (spatial attention): {cbam_feature.shape}')
    
    return Multiply()([input_feature, cbam_feature])

def attention_block(input_feature):
    cbam_feature = channel_attention(input_feature)
    cbam_feature = spatial_attention(cbam_feature)
    return cbam_feature

def create_dan_model(input_shape=(96, 96, 3), num_classes=8):
    input_layer = Input(shape=input_shape)
    
    base_model = VGG16(weights='imagenet', include_top=False, input_tensor=input_layer)
    for layer in base_model.layers:
        layer.trainable = False
    
    x = base_model.output
    print(f'Base model output shape: {x.shape}')
    
    x = attention_block(x)
    x = Flatten()(x)
    x = Dense(512, activation='relu')(x)
    x = Dropout(0.5)(x)
    output_layer = Dense(num_classes, activation='softmax')(x)
    
    model = Model(inputs=input_layer, outputs=output_layer)
    return model

# Instantiate the model
model = create_dan_model()

# Compile the model
model.compile(optimizer=tf.keras.optimizers.Adam(lr=0.0001), loss='sparse_categorical_crossentropy', metrics=['accuracy'])

# Print the model summary to verify the architecture
model.summary()


Base model output shape: (None, 3, 3, 512)
Input feature shape (channel attention): (None, 3, 3, 512)
Avg pool shape: (None, 512)
Avg pool after dense layers shape: (None, 1, 1, 512)
Max pool shape: (None, 512)
Max pool after dense layers shape: (None, 1, 1, 512)


ValueError: A KerasTensor cannot be used as input to a TensorFlow function. A KerasTensor is a symbolic placeholder for a shape and dtype, used when constructing Keras Functional models or Keras Functions. You can only use it as input to a Keras layer or a Keras operation (from the namespaces `keras.layers` and `keras.operations`). You are likely doing something like:

```
x = Input(...)
...
tf_fn(x)  # Invalid.
```

What you should do instead is wrap `tf_fn` in a layer:

```
class MyLayer(Layer):
    def call(self, x):
        return tf_fn(x)

x = MyLayer()(x)
```


In [6]:
import cv2
import os
import numpy as np 
import matplotlib.pyplot as plt
import pandas as pd
import math
from PIL import Image
from tqdm import tqdm
import seaborn as sns
from deepface import DeepFace

from keras.models import Model
from keras.optimizers import Adam
from tensorflow.keras.preprocessing.image import ImageDataGenerator

import plotly.graph_objects as go
import plotly.figure_factory as ff
import plotly.offline as offline
from plotly.subplots import make_subplots
import random

from keras.applications.inception_v3 import preprocess_input
from tensorflow.keras.applications.inception_v3 import InceptionV3
from tensorflow.keras.applications.xception import Xception
from tensorflow.keras.models import Model,load_model
from tensorflow.keras.optimizers import Adam
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.utils import to_categorical


from tensorflow import keras
from keras.layers import *
from keras.models import *
from keras.applications import *
from keras.optimizers import *
from keras import layers
from keras.regularizers import *
from tensorflow.keras.applications import EfficientNetB7
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2
from tensorflow.keras.applications.vgg19 import VGG19
from keras import regularizers
from sklearn.metrics import confusion_matrix,classification_report
import plotly.graph_objs as go
from sklearn.metrics import  roc_auc_score
from sklearn.metrics import confusion_matrix,classification_report
from sklearn.metrics import roc_curve, auc
from sklearn.preprocessing import LabelBinarizer



from tensorflow.keras.layers import Input, Dense, BatchNormalization, Dropout, GlobalAveragePooling2D, Conv2D, MaxPooling2D
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from deepface import DeepFace
import matplotlib.cm as cm
plt.style.use('fivethirtyeight')

In [7]:
class Align_Face():
    def __init__(self):
        #definning opencv path for cascades
        opencv_home = cv2.__file__
        folders = opencv_home.split(os.path.sep)[0:-1]
        path = folders[0]
        for folder in folders[1:]:
            path = path + "/" + folder

        path_for_face = path+"/data/haarcascade_frontalface_default.xml"
        path_for_eyes = path+"/data/haarcascade_eye.xml"
        #loading Cascades
        self.face_detection_cascade = cv2.CascadeClassifier(path_for_face)
        self.eye_detection_cascade = cv2.CascadeClassifier(path_for_eyes)

    def face_detection(self,img):
        #getting bounding box of face
        faces = self.face_detection_cascade.detectMultiScale(img, 1.5, 5)
        if (len(faces) <= 0):
            img_gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
            return img, img_gray
        else:
            X, Y, W, H = faces[0]
            #getting Face ROI
            img = img[int(Y):int(Y+H), int(X):int(X+W)]
            return img, cv2.cvtColor(img, cv2.COLOR_BGR2BGRA)


    def trignometry_for_distance(self,a, b):
        return math.sqrt(((b[0] - a[0]) * (b[0] - a[0])) +\
                ((b[1] - a[1]) * (b[1] - a[1])))

    # Find eyes
    def Face_Alignment(self,img_path):
        img_raw = cv2.imread(img_path).copy()
        img, gray_img = self.face_detection(cv2.imread(img_path))
        #detecting Eyes
        eyes = self.eye_detection_cascade.detectMultiScale(gray_img)

        #checking if both eyes are present in the image
        if len(eyes) >= 2:
            eye = eyes[:, 2]
            container1 = []
            for i in range(0, len(eye)):
                container = (eye[i], i)
                container1.append(container)
            df = pd.DataFrame(container1, columns=[
                  "length", "idx"]).sort_values(by=['length'])
            eyes = eyes[df.idx.values[0:2]]
            

            eye_1 = eyes[0]
            eye_2 = eyes[1]
            #selecting left and right eye on the basis of their position
            if eye_1[0] > eye_2[0]:
                left_eye = eye_2
                right_eye = eye_1
            else:
                left_eye = eye_1
                right_eye = eye_2


            right_eye_center = (
            int(right_eye[0] + (right_eye[2]/2)),
            int(right_eye[1] + (right_eye[3]/2)))
            right_eye_x = right_eye_center[0]
            right_eye_y = right_eye_center[1]
            cv2.circle(img, right_eye_center, 2, (255, 0, 0), 3)

            left_eye_center = (int(left_eye[0] + (left_eye[2] / 2)),int(left_eye[1] + (left_eye[3] / 2)))
            left_eye_x = left_eye_center[0]
            left_eye_y = left_eye_center[1]
            cv2.circle(img, left_eye_center, 2, (255, 0, 0), 3)


            if left_eye_y > right_eye_y:
#                 print("Rotate image to clock direction")
                point_3rd = (right_eye_x, left_eye_y)
                direction = -1 
            else:
#                 print("Rotate to inverse clock direction")
                point_3rd = (left_eye_x, right_eye_y)
                direction = 1 # rotate inverse direction of clock

            cv2.circle(img, point_3rd, 2, (255, 0, 0), 2)
            #calculating trignometric distance
            a = self.trignometry_for_distance(left_eye_center,
                        point_3rd)
            b = self.trignometry_for_distance(right_eye_center,
                        point_3rd)
            c = self.trignometry_for_distance(right_eye_center,
                        left_eye_center)
            cos_a = (b*b + c*c - a*a)/(2*b*c)
            angle = (np.arccos(cos_a) * 180) / math.pi
            
            #checking if we have to rotate image clock wise or anti clock wise
            if direction == -1:
                angle = 90 - angle
            else:
                angle = -(90-angle)

            new_img = Image.fromarray(img_raw)
            new_img = np.array(new_img.rotate(direction * angle))

        return new_img

In [8]:
import os
import cv2
import numpy as np
from tqdm import tqdm

class Load_Datasets:
    def __init__(self, path="/young-affectnet-hq", size_images=(96, 96), gray=False, align_face=False, minimum_images=None, thresh_minimum=None):
        self.path = path
        self.size_images = size_images
        self.gray = gray
        self.align_face = align_face
        self.al_face = Align_Face() if align_face else None
        self.minimum_images = minimum_images
        self.thresh_minimum = thresh_minimum
        self.flags = []

    def load_images(self, directory):
        images, labels = [], []
        for emotion in tqdm(os.listdir(directory)):
            if emotion.startswith('.'):
                continue
            emotion_path = os.path.join(directory, emotion)
            image_files = [f for f in os.listdir(emotion_path) if not f.startswith('.')]
            if self.thresh_minimum and len(image_files) < self.thresh_minimum:
                self.flags.append(emotion)
                continue
            print(f"{emotion}: {len(image_files)} images")
            for i, image_file in enumerate(image_files):
                if self.minimum_images and i >= self.minimum_images:
                    break
                try:
                    img_path = os.path.join(emotion_path, image_file)
                    if self.align_face:
                        aligned_face = self.al_face.Face_Alignment(img_path)
                        face_img, gray_face_img = self.al_face.face_detection(aligned_face)
                        image = gray_face_img if self.gray else face_img[:, :, ::-1]
                    else:
                        image = cv2.imread(img_path)[:, :, ::-1]
                    image = cv2.resize(image, self.size_images)
                    images.append(image)
                    labels.append(emotion)
                except Exception as e:
                    print(f"Error processing {img_path}: {e}")
        images = np.array(images)
        labels = np.array(labels)
        if self.gray:
            images = images.reshape(images.shape[0], -1).astype("float") / 255
        return images, labels

    def load_data(self):
        train_x, train_y, test_x, test_y = None, None, None, None
        for subdir in ["train", "test"]:
            subdir_path = os.path.join(self.path, subdir)
            if os.path.exists(subdir_path):
                print(f"Loading {subdir.capitalize()} Dataset")
                if subdir == "train":
                    train_x, train_y = self.load_images(subdir_path)
                else:
                    test_x, test_y = self.load_images(subdir_path)
        if train_x is None or test_x is None:
            print("Loading Dataset Directly | No Train-Test Folders")
            x, y = self.load_images(self.path)
            return x, y, None, None
        return train_x, train_y, test_x, test_y



In [9]:
data = Load_Datasets(path="Datasets/AffectNet/",minimum_images=1000,size_images = (96, 96),align_face=False)
openfacex,openfacey,test_x,test_y = data.load_data()



Loading Dataset Directly | No Train-Test Folders


  0%|          | 0/8 [00:00<?, ?it/s]

happy: 5044 images


 12%|█▎        | 1/8 [00:00<00:06,  1.09it/s]

contempt: 2871 images


 25%|██▌       | 2/8 [00:01<00:03,  1.51it/s]

sad: 3091 images


 38%|███▊      | 3/8 [00:01<00:02,  1.79it/s]

fear: 3176 images


 50%|█████     | 4/8 [00:02<00:02,  1.99it/s]

surprise: 4039 images


 62%|██████▎   | 5/8 [00:02<00:01,  2.02it/s]

neutral: 5126 images


 75%|███████▌  | 6/8 [00:03<00:01,  1.57it/s]

anger: 3218 images


 88%|████████▊ | 7/8 [00:04<00:00,  1.74it/s]

disgust: 2477 images


100%|██████████| 8/8 [00:04<00:00,  1.77it/s]


In [10]:
df= pd.DataFrame()

df["labels"] = openfacey


In [11]:
import pandas as pd
import plotly.express as px

def plot_data_distribution(data, label, emotion_labels):
    # Create a DataFrame with the labels and counts
    data_df = pd.DataFrame({
        'Emotion': emotion_labels,
        'Count': data['labels']
    })
    
    # Create a pie chart using Plotly Express
    fig = px.pie(
        data_df, 
        names='Emotion', 
        values='Count', 
        title=f"{label} Data Distribution",
        color_discrete_sequence=px.colors.qualitative.Pastel
    )

    # Update layout for better aesthetics
    fig.update_layout(
        title_font=dict(size=24, color='#333', family="Lato, sans-serif"),
        font=dict(color='#333', family="Lato, sans-serif"),
        hoverlabel=dict(bgcolor="#FFF", font_size=13, font_family="Lato, sans-serif"),
        legend=dict(
            title="Emotions",
            font=dict(
                family="Lato, sans-serif",
                size=12,
                color="black"
            ),
            bgcolor="#E2E2E2",
            bordercolor="#FFFFFF",
            borderwidth=2
        )
    )

    fig.show()

# Assuming 'df' is your DataFrame containing the data
emotion_labels = ['happy', 'contempt', 'sad', 'fear', 'surprise', 'neutral', 'anger', 'disgust']
data_dist = pd.DataFrame(df["labels"].value_counts()).reset_index()
data_dist.columns = ['index', 'labels']
plot_data_distribution(data_dist, "AffectNet", emotion_labels)


In [12]:
import numpy as np
import random
import plotly.express as px
import plotly.subplots as sp
import plotly.graph_objects as go

def get_random_unique_sample(images, labels):
    unique_labels = np.unique(labels)
    selected_images = []
    selected_labels = []
    seen_labels = set()
    
    while len(seen_labels) < len(unique_labels):
        index = random.randint(0, len(images) - 1)
        label = labels[index]
        if label not in seen_labels:
            seen_labels.add(label)
            selected_images.append(images[index])
            selected_labels.append(label)
    
    return selected_images, selected_labels

def show_images(images, rows=1, titles=None, main_title=None):
    # Convert titles to a list if it's a NumPy array
    if isinstance(titles, np.ndarray):
        titles = titles.tolist()
    assert(titles is None or len(images) == len(titles))
    
    n_images = len(images)
    titles = titles if titles else [f'Image {i+1}' for i in range(n_images)]
    
    # Create a subplot with the number of rows and columns
    cols = int(np.ceil(n_images / rows))
    fig = sp.make_subplots(rows=rows, cols=cols, subplot_titles=titles)
    
    for i, image in enumerate(images):
        row = i // cols + 1
        col = i % cols + 1
        fig.add_trace(go.Image(z=image), row=row, col=col)

    fig.update_layout(height=300*rows, width=300*cols, title_text=main_title, title_x=0.5)
    fig.show()

imgss, labelss = get_random_unique_sample(openfacex,openfacey)
show_images(np.array(imgss), rows=2, titles=np.array(labelss), main_title="AffectNet Dataset")


In [13]:
import numpy as np
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical

def preprocess(x, y, name):
    print("*" * 20, name, "*" * 20)
    
    x, y = np.array(x), np.array(y)
    unique_labels = np.unique(y)
    
    # Creating a dictionary to map labels to numeric encoding
    label_to_index = {label: index for index, label in enumerate(unique_labels)}
    etol = {index: label for index, label in enumerate(unique_labels)}  # Inverse mapping for later use
    
    # Encoding each label
    encoded_labels = [label_to_index[label] for label in y]
    
    # One hot encoding of labels
    y_encoded = to_categorical(encoded_labels)
    
    # Splitting training and testing dataset
    train_x, test_x, train_y, test_y = train_test_split(x, y_encoded, test_size=0.2, shuffle=True)
    
    print(f"Training set size: {len(train_x)} samples")
    print(f"Testing set size: {len(test_x)} samples")
    print("Shapes - Train X:", train_x.shape, "Train Y:", train_y.shape)
    print("Shapes - Test X:", test_x.shape, "Test Y:", test_y.shape)

    return train_x, test_x, train_y, test_y, etol

# Example usage with sample data
# Assuming `openfacex` and `openfacey` are your dataset images and labels
open_train_x, open_test_x, open_train_y, open_test_y, etol = preprocess(openfacex, openfacey, "OpenFace")


******************** OpenFace ********************
Training set size: 6400 samples
Testing set size: 1600 samples
Shapes - Train X: (6400, 96, 96, 3) Train Y: (6400, 8)
Shapes - Test X: (1600, 96, 96, 3) Test Y: (1600, 8)


In [14]:
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.applications import VGG19, Xception, MobileNetV2, InceptionV3, EfficientNetB7
from tensorflow.keras.layers import Input, Lambda, GlobalAveragePooling2D, Dense, BatchNormalization, Dropout
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ModelCheckpoint
from sklearn.model_selection import train_test_split

class Model_Building:
    def __init__(self, model_name=None, train_x=None, train_y=None, test_x=None, test_y=None, model_check_point="Model_Checkpoints"):
        print("-" * 20, "Model Name:", model_name.upper(), "-" * 20)
        self.model_name = model_name
        self.check_point = model_check_point
        self.weights_path = 'Model_Checkpoints/pretrained_models/'
        
        if self.model_name == "vgg19":
            self.preprocess_input = tf.keras.applications.vgg19.preprocess_input
            self.model = VGG19
            
        elif self.model_name == "xception":
            self.preprocess_input = tf.keras.applications.xception.preprocess_input
            self.model = Xception
            
        elif self.model_name == "mobilenetv2":
            self.preprocess_input = tf.keras.applications.mobilenet_v2.preprocess_input
            self.model = MobileNetV2
            
        elif self.model_name == "efficient":
            self.preprocess_input = tf.keras.applications.efficientnet.preprocess_input
            self.model = EfficientNetB7
            
        else:
            self.preprocess_input = tf.keras.applications.inception_v3.preprocess_input
            self.model = InceptionV3
            
        
        self.train_x = train_x
        self.train_y = train_y
        self.test_x = test_x
        self.test_y = test_y
        self.main_model = None

    def base_model(self, data, pr_model):
        w, h = data.shape[1:3]
        cnn_model = pr_model(include_top=False, input_shape=(w, h, 3), weights=None)
        cnn_model.load_weights(os.path.join(self.weights_path, self.weights_file))
        
        inputs = Input((w, h, 3))
        x = Lambda(self.preprocess_input, name='preprocessing')(inputs)
        x = cnn_model(x)
        x = GlobalAveragePooling2D()(x)
        
        model = Model(inputs, x)
        features = model.predict(data, batch_size=5, verbose=1)
        return features

    def bottom_model(self, features):
        model = Sequential()
        model.add(Dense(300, activation="relu", input_shape=(features.shape[1],)))
        model.add(Dense(200, activation="relu"))
        model.add(BatchNormalization())
        model.add(Dense(100, activation="relu", kernel_regularizer=tf.keras.regularizers.l2(0.01)))
        model.add(Dropout(0.2))
        model.add(Dense(len(np.unique(self.train_y, axis=0)), activation="softmax"))
        model.compile(optimizer=Adam(0.0001), loss="categorical_crossentropy", metrics=["accuracy"])
        return model

    def forward(self, batch_size=4, epochs=10, train_percent=0.90, verbose=1):
        f = self.base_model(self.train_x, self.model)
        percent = int(len(f) * train_percent)

        path1 = os.path.join(self.check_point, self.model_name)
        if not os.path.exists(path1):
            os.makedirs(path1)
        
        cp_callback = ModelCheckpoint(
            filepath=os.path.join(path1, "cp-{epoch:04d}.h5"), 
            verbose=verbose, 
            save_weights_only=True,  
            save_freq=5 * batch_size
        )

        self.main_model = self.bottom_model(f)
        self.main_model.save_weights(self.check_point.format(epoch=0))
        
        hist = self.main_model.fit(
            f[:percent], self.train_y[:percent], 
            validation_data=(f[percent:], self.train_y[percent:]),
            batch_size=batch_size, epochs=epochs
        )
        
        return hist, self.main_model

    def predict(self, sample_test=None):
        if sample_test is None:
            test_f = self.base_model(self.test_x, self.model)
            p = self.main_model.predict(test_f)
        else:
            test_f = self.base_model(sample_test, self.model)
            p = self.main_model.predict(test_f)

        p = np.argmax(p, axis=1)
        return p

    def evaluate(self):
        test_f = self.base_model(self.test_x, self.model)
        loss, acc = self.main_model.evaluate(test_f, self.test_y)

        print("=" * 50, self.model_name, "=" * 50)
        print("On Test Data")
        print("Loss:", round(loss, 4))
        print("Accuracy:", round(acc, 4))

        return loss, acc


In [15]:
import plotly.graph_objects as go

def plot_Accuracy_Loss(history, name):
    fig = go.Figure()
    
    # Plot training and validation loss
    fig.add_trace(go.Scatter(
        y=history.history['loss'], mode='lines', name='Training Loss',
        line=dict(color='blue')))
    
    if 'val_loss' in history.history:
        fig.add_trace(go.Scatter(
            y=history.history['val_loss'], mode='lines', name='Validation Loss',
            line=dict(color='red')))

    fig.update_layout(
        title=f"{name} Model Loss",
        xaxis_title='Epochs',
        yaxis_title='Loss',
        template='plotly_white'
    )

    # Plot training and validation accuracy
    fig2 = go.Figure()

    fig2.add_trace(go.Scatter(
        y=history.history['accuracy'], mode='lines', name='Training Accuracy',
        line=dict(color='blue')))

    if 'val_accuracy' in history.history:
        fig2.add_trace(go.Scatter(
            y=history.history['val_accuracy'], mode='lines', name='Validation Accuracy',
            line=dict(color='red')))

    fig2.update_layout(
        title=f"{name} Model Accuracy",
        xaxis_title='Epochs',
        yaxis_title='Accuracy',
        template='plotly_white'
    )

    fig.show()
    fig2.show()


In [16]:
from sklearn.metrics import roc_curve, roc_auc_score
from sklearn.preprocessing import LabelBinarizer
import plotly.graph_objects as go

def plot_ROC(y_pred, y_test, etol=None):
    if etol is None:
        etol = {0: 'anger', 1: 'contempt', 2: 'disgust', 3: 'fear', 4: 'happy', 5: 'neutral', 6: 'sad', 7: 'surprise'}
    
    lb = LabelBinarizer()
    y_test = lb.fit_transform(y_test)
    y_pred = lb.transform(y_pred)

    fig = go.Figure()
    fig.add_shape(
        type='line', line=dict(dash='dash'),
        x0=0, x1=1, y0=0, y1=1
    )

    for i in range(y_test.shape[1]):
        fpr, tpr, _ = roc_curve(y_test[:, i], y_pred[:, i])
        auc_score = roc_auc_score(y_test[:, i], y_pred[:, i])

        name = f"{etol[i]} (AUC={auc_score:.2f})"
        fig.add_trace(go.Scatter(x=fpr, y=tpr, name=name, mode='lines'))

    fig.update_layout(
        title='Multiclass ROC Curve',
        xaxis_title='False Positive Rate',
        yaxis_title='True Positive Rate',
        width=700, height=500,
        template='plotly_white'
    )

    fig.show()


In [17]:
incept = Model_Building("inception", train_x=open_train_x, train_y=open_train_y, test_x=open_test_x, test_y=open_test_y)
incept_hist, incept_model = incept.forward(verbose=1, batch_size=16, epochs=50)
incept_loss, incept_acc = incept.evaluate()

xcept = Model_Building("xception", open_train_x, open_train_y, open_test_x, open_test_y)
xcept_hist, xcept_model = xcept.forward(verbose=1, batch_size=16, epochs=50)
xcept_loss, xcept_acc = xcept.evaluate()

mobilenet = Model_Building("mobilenetv2", open_train_x, open_train_y, open_test_x, open_test_y)
mobilenet_hist, mobilenet_model = mobilenet.forward(verbose=1, batch_size=16, epochs=50)
mobilenet_loss, mobilenet_acc = mobilenet.evaluate()

vgg = Model_Building("vgg19", open_train_x, open_train_y, open_test_x, open_test_y)
vgg_hist, vgg_model = vgg.forward(verbose=1, batch_size=16, epochs=50)
vgg_loss, vgg_acc = vgg.evaluate()

efficient = Model_Building("efficient", open_train_x, open_train_y, open_test_x, open_test_y)
efficient_hist, efficient_model = efficient.forward(verbose=1, batch_size=16, epochs=50)
efficient_loss, efficient_acc = efficient.evaluate()


-------------------- Model Name: INCEPTION --------------------


AttributeError: 'Model_Building' object has no attribute 'weights_file'