<a href="https://colab.research.google.com/github/aniket21070994/wavenetProject/blob/main/Untitled.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# prompt: code to unzip this file /content/archive.zip

# Extract /content/c+.zip into /content/ck.
# NOTE(review): the prompt above names archive.zip, but the archive actually
# unpacked by this command is c+.zip -- confirm which file is intended.
!unzip /content/c+.zip -d /content/ck




In [None]:
pip  install mediapipe

# Merge dataset

In [None]:
import os
import cv2
import numpy as np
import pandas as pd
import mediapipe as mp
import albumentations as A
from tqdm import tqdm

# Create one MediaPipe face detector up front; it is configured with a
# minimum detection confidence of 0.6.
mp_face_detection = mp.solutions.face_detection
face_detection = mp_face_detection.FaceDetection(
    min_detection_confidence=0.6,
)

def align_face(image, output_size=(64, 64)):
    """Crop the first usable detected face and resize it to ``output_size``.

    Despite the name, no rotation or landmark-based alignment is performed:
    the face bounding box reported by the shared MediaPipe detector is simply
    cropped out of the image.

    Args:
        image: BGR image (it is converted with ``cv2.COLOR_BGR2RGB`` before
            detection).
        output_size: ``(width, height)`` passed to ``cv2.resize``.

    Returns:
        The resized face crop, or the whole image resized to ``output_size``
        when no detection yields a non-empty crop.
    """
    image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    results = face_detection.process(image_rgb)
    h_img, w_img = image.shape[:2]

    # `detections` is falsy when nothing was found.
    for detection in results.detections or ():
        bbox = detection.location_data.relative_bounding_box
        x_min = int(bbox.xmin * w_img)
        y_min = int(bbox.ymin * h_img)
        width = int(bbox.width * w_img)
        height = int(bbox.height * h_img)

        # Clamp the box to the frame instead of rejecting it: a box that
        # pokes past the border (presumably a face cut off at the edge --
        # confirm against the MediaPipe docs) still gives a usable crop.
        x_start, y_start = max(x_min, 0), max(y_min, 0)
        x_end = min(x_min + width, w_img)
        y_end = min(y_min + height, h_img)

        if x_end > x_start and y_end > y_start:
            return cv2.resize(image[y_start:y_end, x_start:x_end], output_size)

    # Default fallback: resize the original image if no face could be cropped.
    return cv2.resize(image, output_size)

def apply_histogram_equalization(image):
    """Equalize the histogram of the Y (first) channel of a BGR image.

    The image is converted to YUV, channel 0 is replaced by its
    ``cv2.equalizeHist`` result, and the image is converted back to BGR.
    """
    yuv = cv2.cvtColor(image, cv2.COLOR_BGR2YUV)
    luma = yuv[:, :, 0]
    yuv[:, :, 0] = cv2.equalizeHist(luma)
    equalized_bgr = cv2.cvtColor(yuv, cv2.COLOR_YUV2BGR)
    return equalized_bgr

# Augmentation pipeline.
# CoarseDropout has no `max_size` argument, so the hole size must be given
# through the height/width arguments. The `*_range` form is used here: between
# 1 and 3 holes, each 1 to 20 pixels on a side (integer bounds are pixel sizes).
# NOTE(review): requires an albumentations release that provides the `*_range`
# arguments -- confirm against the installed version.
transform = A.Compose([
    A.HorizontalFlip(p=0.5),
    A.RandomBrightnessContrast(p=0.3),
    A.CoarseDropout(
        num_holes_range=(1, 3),
        hole_height_range=(1, 20),
        hole_width_range=(1, 20),
        p=0.5,
    ),
])

def process_fer2013(csv_path):
    """Read the FER2013 CSV and return ``(images, labels)`` as NumPy arrays.

    Each row's ``pixels`` field is parsed as space-separated uint8 values,
    reshaped to 48x48, expanded to three BGR channels, passed through
    ``align_face`` and then ``apply_histogram_equalization``. The row's
    ``emotion`` value is kept as the label.
    """
    frame = pd.read_csv(csv_path)
    processed = []
    emotions = []

    progress = tqdm(frame.iterrows(), total=len(frame), desc="Processing FER2013")
    for _, record in progress:
        flat = np.fromstring(record['pixels'], sep=' ', dtype=np.uint8)
        gray = flat.reshape(48, 48)
        bgr = cv2.cvtColor(gray, cv2.COLOR_GRAY2BGR)  # grayscale -> 3-channel BGR
        processed.append(apply_histogram_equalization(align_face(bgr)))
        emotions.append(record['emotion'])

    return np.array(processed), np.array(emotions)

import re  # Import regex module

def is_valid_hex(s):
    """Return True when ``s`` is a non-empty run of hex digits (0-9, a-f, A-F)."""
    hex_only = re.fullmatch(r'[0-9a-fA-F]+', s)
    return hex_only is not None

def process_ck_plus(csv_path):
    """Load and preprocess the CK+ dataset from a CSV of hex-encoded images.

    Each ``pixels`` cell is expected to hold the hexadecimal dump of an
    encoded image file: it is turned into bytes and decoded with
    ``cv2.imdecode``. Decoded images go through ``align_face`` and
    ``apply_histogram_equalization``.

    Rows that are not valid hex, cannot be decoded, or raise during
    processing are skipped (best effort); each skip is reported and a total
    is printed at the end.

    NOTE(review): if the CSV stores pixels as space-separated intensities
    (the layout the FER2013 loader parses), every row fails the hex check and
    the result is empty -- confirm the file's encoding.

    Args:
        csv_path: Path to a CSV with ``pixels`` and ``emotion`` columns.

    Returns:
        ``(images, labels)`` as NumPy arrays; two empty arrays when a required
        column is missing or no row could be processed.
    """
    df = pd.read_csv(csv_path)

    if 'pixels' not in df.columns or 'emotion' not in df.columns:
        print("Error: Missing 'pixels' or 'emotion' column in CK+ dataset.")
        return np.array([]), np.array([])

    images, labels = [], []
    skipped = 0

    for idx, row in tqdm(df.iterrows(), total=len(df), desc="Processing CK+"):
        try:
            pixel_data = str(row['pixels']).strip()

            # Validate hexadecimal string before processing
            if not is_valid_hex(pixel_data):
                print(f"Skipping invalid hex data at index {idx}: {pixel_data[:20]}...")  # Show first 20 chars
                skipped += 1
                continue

            img_data = np.frombuffer(bytearray.fromhex(pixel_data), dtype=np.uint8)
            image = cv2.imdecode(img_data, cv2.IMREAD_COLOR)

            if image is None:
                # imdecode returns None when the bytes are not a decodable image.
                print(f"Warning: Skipping undecodable image at index {idx}.")
                skipped += 1
                continue

            image = align_face(image)
            image = apply_histogram_equalization(image)
            images.append(image)
            labels.append(row['emotion'])

        except Exception as e:
            # Deliberately broad: one bad row must not abort the whole import.
            print(f"Warning: Skipping invalid image at index {idx}. Error: {e}")
            skipped += 1

    if skipped:
        print(f"CK+: skipped {skipped} of {len(df)} rows.")

    return np.array(images), np.array(labels)

def augment_dataset(images, labels, augment_factor=0.5):
    """Create augmented copies of the leading part of the dataset.

    ``int(len(images) * augment_factor)`` samples are generated by running the
    module-level ``transform`` pipeline on the images in order, starting from
    index 0. When more samples are requested than there are images
    (``augment_factor > 1``) the source index wraps around to the start.

    Args:
        images: Array of images, first axis indexes samples.
        labels: Array of labels aligned with ``images``.
        augment_factor: Number of augmented samples as a fraction of
            ``len(images)``.

    Returns:
        ``(aug_images, aug_labels)``. When nothing is generated, the arrays
        are empty but keep the trailing shape and dtype of the inputs so they
        can be concatenated with the originals.
    """
    images = np.asarray(images)
    labels = np.asarray(labels)
    num_augment = int(len(images) * augment_factor)

    if num_augment <= 0:
        # Zero-length slices keep e.g. (0, H, W, C); a bare np.array([]) has
        # shape (0,) and cannot be concatenated with image batches.
        return images[:0], labels[:0]

    num_images = len(images)
    aug_images, aug_labels = [], []

    for i in tqdm(range(num_augment), desc="Applying Augmentations"):
        source = i % num_images  # wrap around when augment_factor > 1
        aug_images.append(transform(image=images[source])['image'])
        aug_labels.append(labels[source])

    return np.array(aug_images), np.array(aug_labels)

# Load both source datasets
fer_images, fer_labels = process_fer2013('/content/FER/fer2013.csv')
ck_images, ck_labels = process_ck_plus('/content/ck/ckextended.csv')

# Merge CK+ into FER2013 only when CK+ produced both images and labels;
# otherwise warn and continue with FER2013 alone.
if ck_images.size > 0 and ck_labels.size > 0:
    all_images = np.concatenate((fer_images, ck_images), axis=0)
    all_labels = np.concatenate((fer_labels, ck_labels), axis=0)
else:
    print("Warning: CK+ dataset is empty. Proceeding with FER2013 only.")
    all_images, all_labels = fer_images, fer_labels

# Generate augmented copies for half of the merged data
aug_images, aug_labels = augment_dataset(
    all_images,
    all_labels,
    augment_factor=0.5,
)

# Final dataset: originals first, augmented samples appended after them
final_images = np.concatenate((all_images, aug_images), axis=0)
final_labels = np.concatenate((all_labels, aug_labels), axis=0)

# Write everything to a single compressed .npz archive
np.savez_compressed(
    'final_dataset.npz',
    images=final_images,
    labels=final_labels,
)

print(f"✅ Final dataset saved with {final_images.shape[0]} samples! 🚀")


# train model


In [None]:
import os
import tensorflow as tf
import numpy as np

# -------------------------------------------
# 1. Load & Preprocess the Merged Dataset
# -------------------------------------------
# mmap_mode is not passed: np.load does not memory-map members of a .npz
# archive. The context manager closes the archive once both arrays are read.
with np.load('/content/final_dataset.npz') as data:
    images, labels = data['images'], data['labels']

# Shuffle once over the whole dataset. The archive stores the original samples
# followed by the augmented ones, and the 1000-element shuffle buffer used
# below only mixes neighbouring samples. Done on the raw arrays, before the
# float conversion, to keep the extra copy small.
permutation = np.random.permutation(len(images))
images = images[permutation]
labels = labels[permutation]

images = images.astype('float32') / 255.0
# to_categorical needs num_classes > max label id; counting unique labels
# would be too small if any class id were missing from the data.
num_classes = int(labels.max()) + 1
labels_onehot = tf.keras.utils.to_categorical(labels, num_classes)

# Use dummy intensity values (0.5) since ground truth isn't provided
intensity_targets = np.full((labels.shape[0], 1), 0.5, dtype='float32')

# Get image dimensions (assumes images are stored as (batch, 64, 64, 3))
print("Original image shape:", images.shape)
# DO NOT reshape images: we want to keep the shape (batch, 64, 64, 3)

# Create a tf.data.Dataset pipeline for efficient training
BATCH_SIZE = 16
dataset = tf.data.Dataset.from_tensor_slices(
    (images, {'emotion': labels_onehot, 'intensity': intensity_targets})
)
# The per-epoch shuffle stays local (buffer of 1000); global mixing was done above.
dataset = dataset.shuffle(1000).batch(BATCH_SIZE).prefetch(tf.data.AUTOTUNE)

# -------------------------------------------
# 2. Define the Simple FER Model without Attention
# -------------------------------------------
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, Flatten, Dense, Dropout
from tensorflow.keras.models import Model

def build_simple_fer_model(input_shape=(64, 64, 3), num_classes=7):
    """Build a small CNN with two heads: emotion class and intensity.

    Three Conv2D(3x3, relu, same) + MaxPooling2D(2x2) stages with 16, 32 and
    64 filters feed a Flatten -> Dense(64, relu) -> Dropout(0.3) trunk. With
    the default 64x64 input the spatial size halves per stage (32, 16, 8).

    Returns:
        A Keras ``Model`` whose outputs are, in order, the ``emotion`` softmax
        over ``num_classes`` and the single-unit sigmoid ``intensity``.
    """
    image_in = Input(shape=input_shape, name='input_image')

    # Feature extractor: one conv + pool stage per filter count
    features = image_in
    for n_filters in (16, 32, 64):
        features = Conv2D(n_filters, (3, 3), activation='relu', padding='same')(features)
        features = MaxPooling2D(pool_size=(2, 2))(features)

    # Shared dense trunk
    features = Flatten()(features)
    features = Dense(64, activation='relu')(features)
    features = Dropout(0.3)(features)

    # Two output heads on the shared trunk
    heads = [
        Dense(num_classes, activation='softmax', name='emotion')(features),
        Dense(1, activation='sigmoid', name='intensity')(features),
    ]

    return Model(image_in, heads, name="Simple_FER_Model_NoAttention")

# -------------------------------------------
# 3. Build, Compile, and Summarize the Model
# -------------------------------------------
# Take the class count from the one-hot targets so the emotion head always
# matches the training labels; a hard-coded 7 fails in fit() whenever the
# dataset contains a different number of classes.
num_classes = labels_onehot.shape[-1]
model = build_simple_fer_model(input_shape=(64, 64, 3), num_classes=num_classes)
model.compile(optimizer='adam',
              loss={'emotion': 'categorical_crossentropy', 'intensity': 'mse'},
              metrics={'emotion': 'accuracy', 'intensity': 'mae'})
model.summary()

# -------------------------------------------
# 4. Train the Model
# -------------------------------------------
# NOTE: the intensity targets are a constant placeholder (0.5), so the
# intensity head learns nothing meaningful from this run.
model.fit(dataset, epochs=20)


In [None]:
# Save the current model as an HDF5 file on Google Drive
# (assumes Drive is already mounted at /content/drive -- TODO confirm).
model.save("/content/drive/MyDrive/FER.h5")

In [None]:

def build_simple_fer_model(input_shape=(64, 64, 3), num_classes=7):
    """Build a small CNN with two heads: emotion class and intensity.

    Three Conv2D(3x3, relu, same) + MaxPooling2D(2x2) stages with 16, 32 and
    64 filters feed a Flatten -> Dense(64, relu) -> Dropout(0.3) trunk.

    Args:
        input_shape: Shape of one input image, ``(height, width, channels)``.
        num_classes: Width of the softmax emotion head.

    Returns:
        A Keras ``Model`` whose outputs are, in order, the ``emotion`` softmax
        and the single-unit sigmoid ``intensity``.
    """
    # Imported here so the function does not depend on imports made in other
    # notebook cells (`layers` in particular).
    from tensorflow.keras import layers
    from tensorflow.keras.layers import Input
    from tensorflow.keras.models import Model

    inputs = Input(shape=input_shape, name='input_image')

    # Convolutional layers for feature extraction
    x = layers.Conv2D(16, (3, 3), activation='relu', padding='same')(inputs)
    x = layers.MaxPooling2D(pool_size=(2, 2))(x)  # Output: 32x32x16 for 64x64 input

    x = layers.Conv2D(32, (3, 3), activation='relu', padding='same')(x)
    x = layers.MaxPooling2D(pool_size=(2, 2))(x)  # Output: 16x16x32 for 64x64 input

    x = layers.Conv2D(64, (3, 3), activation='relu', padding='same')(x)
    x = layers.MaxPooling2D(pool_size=(2, 2))(x)  # Output: 8x8x64 for 64x64 input

    # Flatten and add dense layers for feature processing
    x = layers.Flatten()(x)
    x = layers.Dense(64, activation='relu')(x)
    x = layers.Dropout(0.3)(x)

    # Dual output head:
    # Emotion classification branch
    emotion_output = layers.Dense(num_classes, activation='softmax', name='emotion')(x)
    # Intensity regression branch
    intensity_output = layers.Dense(1, activation='sigmoid', name='intensity')(x)

    # Create and return the model
    model = Model(inputs, [emotion_output, intensity_output], name="Simple_FER_Model_NoAttention")
    return model

# Build a fresh model and compile it with one loss/metric per output head.
# NOTE(review): this rebinds `model` to a newly initialised network, so any
# previously trained `model` object is replaced -- confirm that is intended
# before saving `model` afterwards.
num_classes = 7
model = build_simple_fer_model(num_classes=num_classes, input_shape=(64, 64, 3))
model.compile(
    optimizer='adam',
    loss=dict(emotion='categorical_crossentropy', intensity='mse'),
    metrics=dict(emotion='accuracy', intensity='mae'),
)
model.summary()

# Mapping function

In [None]:
import os
import tensorflow as tf

def save_model(model, model_path):
    """Save ``model`` to ``model_path``, creating the parent directory if needed.

    Args:
        model: Object exposing ``save(path)`` (e.g. a Keras model).
        model_path: Destination path. May be a bare file name, in which case
            no directory is created.
    """
    # os.makedirs('') raises, so only create a directory when the path has one.
    target_dir = os.path.dirname(model_path)
    if target_dir:
        os.makedirs(target_dir, exist_ok=True)

    model.save(model_path)
    print(f"Model saved successfully at {model_path}")

# Example usage:
# save_model(model, 'models/fer_model.h5')
# NOTE(review): the path below has no file extension; some Keras versions
# require `.keras` or `.h5` when saving -- confirm against the installed version.
save_model(model=model, model_path="/content/project_FRM/fer_model")


In [None]:
import tensorflow as tf
from tensorflow.keras import layers

# Dense layers used by map_to_wavenet_condition, keyed by
# (input width, embedding_dim) so each configuration is created only once.
_CONDITION_LAYERS = {}


def _get_condition_layers(input_dim, embedding_dim):
    """Return the (hidden, projection) Dense pair for this size, creating it on first use."""
    key = (input_dim, embedding_dim)
    if key not in _CONDITION_LAYERS:
        _CONDITION_LAYERS[key] = (
            layers.Dense(32, activation='relu'),
            layers.Dense(embedding_dim, activation='relu'),
        )
    return _CONDITION_LAYERS[key]


def map_to_wavenet_condition(emotion_probs, intensity, num_classes=7, embedding_dim=16):
    """
    Maps FER model outputs (emotion probabilities and intensity) into a conditioning vector
    for WaveNet.

    The two Dense layers that perform the mapping are created once per
    (input width, embedding_dim) and reused on later calls, so repeated calls
    apply the same weights instead of a new random initialisation each time.
    The weights are still untrained unless these layers are trained elsewhere.

    Parameters:
      emotion_probs: Tensor of shape (batch_size, num_classes) with emotion probabilities.
      intensity: Tensor of shape (batch_size, 1) with intensity scores.
      num_classes: Number of emotion classes (default is 7). Not used by the
        computation; the input width is read from the concatenated tensor.
      embedding_dim: Desired dimension for the conditioning vector (default is 16).

    Returns:
      A tensor of shape (batch_size, embedding_dim) that can be used as conditioning input for WaveNet.
    """
    # Concatenate emotion probabilities and intensity along the last axis.
    conditioning_input = tf.concat([emotion_probs, intensity], axis=-1)  # Shape: (batch_size, num_classes + 1)

    # Pass the concatenated vector through the two shared dense layers.
    hidden_layer, projection_layer = _get_condition_layers(
        conditioning_input.shape[-1], embedding_dim
    )
    conditioning_vector = projection_layer(hidden_layer(conditioning_input))

    return conditioning_vector

# --- Example Usage ---
# Stand-in FER outputs built from random numbers, for demonstration only.
batch_size = 4
num_classes = 7

# Uniform random scores, normalised with softmax so each row sums to 1
dummy_emotion_logits = tf.random.uniform(shape=(batch_size, num_classes))
dummy_emotion_probs = tf.nn.softmax(dummy_emotion_logits, axis=-1)

# One random intensity score per sample, drawn between 0 and 1
dummy_intensity = tf.random.uniform(shape=(batch_size, 1), minval=0, maxval=1)

# Turn the stand-in FER outputs into a 16-dimensional conditioning vector
conditioning_vector = map_to_wavenet_condition(
    dummy_emotion_probs,
    dummy_intensity,
    num_classes,
    embedding_dim=16,
)
print("Conditioning vector shape:", conditioning_vector.shape)


# WaveNet: