In [20]:
import tensorflow as tf
from tensorflow.keras import layers, Model
import numpy as np
import os
import cv2
from sklearn.model_selection import KFold
import matplotlib.pyplot as plt

# Define image size at the top
img_size = (227, 227)

# 1. Data Loading and Preprocessing
def load_and_preprocess_data(data_dir, img_size=(227, 227)):
    data = []
    labels = []
    # For CK+, use this emotion map
    emotion_map = {'angry': 0, 'contempt': 1, 'disgust': 2, 'fear': 3, 'happiness': 4, 
                   'sadness': 5, 'surprise': 6}
    print(f"Checking directory: {data_dir}")
    if not os.path.exists(data_dir):
        print(f"Directory {data_dir} does not exist!")
        return np.array(data), np.array(labels)
    
    # Check if there are subfolders
    has_subfolders = any(os.path.isdir(os.path.join(data_dir, item)) for item in os.listdir(data_dir))
    
    if has_subfolders:  # CK+ structure
        for emotion in os.listdir(data_dir):
            emotion_path = os.path.join(data_dir, emotion)
            if os.path.isdir(emotion_path):
                key = emotion.lower()
                if key not in emotion_map:
                    print(f"Warning: {emotion} not found in emotion_map; skipping.")
                    continue
                for img_name in os.listdir(emotion_path):
                    img_path = os.path.join(emotion_path, img_name)
                    #print(f"Processing image: {img_path}")
                    img = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)
                    if img is not None:
                        img = cv2.resize(img, img_size)
                        img = img / 255.0  # Normalize
                        data.append(img)
                        labels.append(emotion_map[key])
                    
    else:  # Flat structure such as KMU-FED
        # Here, assume the emotion code is embedded in the filename, e.g., "01_AN_mr_001.jpg"
        for file in os.listdir(data_dir):
            img_path = os.path.join(data_dir, file)
            if os.path.isfile(img_path):
                #print(f"Processing image: {img_path}")
                img = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)
                if img is not None:
                    img = cv2.resize(img, img_size)
                    img = img / 255.0
                    data.append(img)
                    # Assuming the emotion code is the second part when splitting by '_'
                    parts = file.split('_')
                    # Example: "01_AN_mr_001.jpg" -> emotion code "AN"
                    if len(parts) > 1:
                        code = parts[1].lower()
                        # Map known abbreviations to emotion indices
                        code_map = {'an': 0, 'di': 1, 'fe': 2, 'ha': 3, 'sa': 4, 'su': 5}
                        if code in code_map:
                            labels.append(code_map[code])
                        else:
                            labels.append(-1)
                            print(f"Warning: Unknown emotion code '{code}' in file {file}")
                    else:
                        labels.append(-1)
                        print(f"Warning: Could not extract emotion from file {file}")
                
    
    print(f"Loaded {len(data)} images with {len(labels)} labels")
    return np.array(data), np.array(labels)

# Load datasets
# Use absolute or relative paths based on your notebook's location
base_dir = os.path.join(os.path.expanduser("~"), "Desktop", "BTUBooks", "ResearchModule", "Research_Implementation")
ck_plus_dir = os.path.join(base_dir, "CK+")  # Path to CK+ folder
kmu_fed_dir = os.path.join(base_dir, "kmu_fed")  # Path to KMU-FED folder
X_ck, y_ck = load_and_preprocess_data(ck_plus_dir, img_size)
X_kmu, y_kmu = load_and_preprocess_data(kmu_fed_dir, img_size)

# Augmentation
data_augmentation = tf.keras.Sequential([
    layers.RandomFlip("horizontal"),
    layers.RandomBrightness(0.2),
    layers.RandomCrop(img_size[0], img_size[1])
])

# 2. SqueezeNext with DAC Block
def squeeze_next_block(x, filters, kernel_size=3):
    x_shortcut = x
    x = layers.Conv2D(filters // 2, 1, padding='same', activation='relu')(x)
    x = layers.DepthwiseConv2D(kernel_size, padding='same', activation='relu')(x)
    x = layers.Conv2D(filters, 1, padding='same', activation='relu')(x)
    x = layers.Add()([x, x_shortcut])  # Residual connection
    return x

# Custom Layer for Coordinate Space Attention (CSA)
class CSALayer(layers.Layer):
    def build(self, input_shape):
        channels = input_shape[-1]
        self.conv_relu = layers.Conv2D(channels, 3, padding='same', dilation_rate=2, activation='relu')
        self.conv_sigmoid = layers.Conv2D(channels, 1, padding='same', activation='sigmoid')
        super().build(input_shape)
    
    def call(self, x):
        # Average pooling along height and width
        x_csa_h = tf.reduce_mean(x, axis=2, keepdims=True)  # (batch, height, 1, channels)
        x_csa_w = tf.reduce_mean(x, axis=1, keepdims=True)  # (batch, 1, width, channels)

        # Tile to match dimensions
        x_csa_h = tf.tile(x_csa_h, [1, 1, tf.shape(x)[2], 1])
        x_csa_w = tf.tile(x_csa_w, [1, tf.shape(x)[1], 1, 1])

        # Concatenate along the channel axis
        x_csa = tf.concat([x_csa_h, x_csa_w], axis=-1)
        
        # Process using the layers created during build
        x_csa = self.conv_relu(x_csa)
        x_csa = self.conv_sigmoid(x_csa)
        return x * x_csa

    def compute_output_shape(self, input_shape):
        return input_shape
    
def dac_block(x, filters):
    # Hybrid Channel Attention (HCA)
    x_hca = layers.GroupNormalization(groups=32)(x)
    x_hca = layers.Conv2D(filters, 1, padding='same', activation='sigmoid')(x_hca)
    x = x * x_hca
    
    # Coordinate Space Attention (CSA)
    x = CSALayer()(x)
    return x

def daldl_model(input_shape=(227, 227, 1)):
    inputs = layers.Input(shape=input_shape)
    x = data_augmentation(inputs)
    x = layers.Conv2D(32, 3, padding='same', activation='relu')(x)
    x = layers.MaxPooling2D(2)(x)
    
    for _ in range(4):  # 4 dense blocks
        x = squeeze_next_block(x, 32)
        x = dac_block(x, 32)
        x = layers.MaxPooling2D(2)(x)
    
    x = layers.Flatten()(x)
    x = layers.Dense(128, activation='relu')(x)
    x = layers.Dropout(0.2)(x)
    outputs = layers.Dense(7, activation='softmax')(x)  # 7 classes for CK+
    model = Model(inputs, outputs)
    return model

# 3. Training and Evaluation
model = daldl_model()
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])

# K-fold Cross Validation
kfold = KFold(n_splits=5, shuffle=True, random_state=42)
acc_per_fold = []
loss_per_fold = []

for train_idx, val_idx in kfold.split(X_ck):
    X_train, X_val = X_ck[train_idx], X_ck[val_idx]
    y_train, y_val = y_ck[train_idx], y_ck[val_idx]
    X_train = X_train.reshape(-1, 227, 227, 1)
    X_val = X_val.reshape(-1, 227, 227, 1)
    history = model.fit(X_train, y_train, validation_data=(X_val, y_val),
                        epochs=10, batch_size=64, verbose=0)
    scores = model.evaluate(X_val, y_val, verbose=0)
    acc_per_fold.append(scores[1] * 100)
    loss_per_fold.append(scores[0])

print(f'Average Accuracy: {np.mean(acc_per_fold):.2f}% (+/- {np.std(acc_per_fold):.2f})')
print(f'Average Loss: {np.mean(loss_per_fold):.2f}')

# 4. Visualization
plt.figure(figsize=(12, 4))
plt.subplot(1, 2, 1)
plt.plot(history.history['accuracy'], label='Train Accuracy')
plt.plot(history.history['val_accuracy'], label='Val Accuracy')
plt.title('Model Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend()

plt.subplot(1, 2, 2)
plt.plot(history.history['loss'], label='Train Loss')
plt.plot(history.history['val_loss'], label='Val Loss')
plt.title('Model Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.show()

Checking directory: /Users/adicadi/Desktop/BTUBooks/ResearchModule/Research_Implementation/CK+
Loaded 321 images with 321 labels
Checking directory: /Users/adicadi/Desktop/BTUBooks/ResearchModule/Research_Implementation/kmu_fed
Loaded 1106 images with 1106 labels


KeyboardInterrupt: 

In [15]:
import os 

kmu_fed_dir = os.path.join(base_dir, "kmu_fed")
print("\nKMU-FED folder structure:")
if os.path.exists(kmu_fed_dir):
    for img in sorted(os.listdir(kmu_fed_dir)):
        print(f"  {img}")
else:
        print(f"Directory {kmu_fed_dir} does not exist!")


KMU-FED folder structure:
  01_AN_mr_001.jpg
  01_AN_mr_002.jpg
  01_AN_mr_003.jpg
  01_AN_mr_004.jpg
  01_AN_mr_005.jpg
  01_AN_mr_006.jpg
  01_AN_mr_007.jpg
  01_AN_mr_008.jpg
  01_AN_mr_009.jpg
  01_AN_mr_010.jpg
  01_AN_mr_011.jpg
  01_AN_mr_012.jpg
  01_AN_mr_013.jpg
  01_AN_mr_014.jpg
  01_AN_mr_015.jpg
  01_AN_mr_016.jpg
  01_AN_mr_017.jpg
  01_AN_mr_018.jpg
  01_AN_mr_019.jpg
  01_AN_mr_020.jpg
  01_DI_mr_001.jpg
  01_DI_mr_002.jpg
  01_DI_mr_003.jpg
  01_DI_mr_004.jpg
  01_DI_mr_005.jpg
  01_DI_mr_006.jpg
  01_DI_mr_007.jpg
  01_DI_mr_008.jpg
  01_DI_mr_009.jpg
  01_DI_mr_010.jpg
  01_DI_mr_011.jpg
  01_DI_mr_012.jpg
  01_DI_mr_013.jpg
  01_DI_mr_014.jpg
  01_DI_mr_015.jpg
  01_DI_mr_016.jpg
  01_DI_mr_017.jpg
  01_DI_mr_018.jpg
  01_DI_mr_019.jpg
  01_DI_mr_020.jpg
  01_FE_mr_001.jpg
  01_FE_mr_002.jpg
  01_FE_mr_003.jpg
  01_FE_mr_004.jpg
  01_FE_mr_005.jpg
  01_FE_mr_006.jpg
  01_FE_mr_007.jpg
  01_FE_mr_008.jpg
  01_FE_mr_009.jpg
  01_FE_mr_010.jpg
  01_FE_mr_011.jpg
  01