In [1]:
import os
import shutil
from sklearn.model_selection import train_test_split

# Paths
original_dataset_dir = r'D:\RuhunaNew\Academic\Research\Facial_Recog_Repo\Group_50_Repo\Datasets\Dataset_48_48_5000_per_each'  # Folder containing 9 emotion subfolders
output_dir = r'D:\RuhunaNew\Academic\Research\Facial_Recog_Repo\Group_50_Repo\Datasets\Preprocesssed_5000'  # Where train/val/test folders will be created

# Split ratios (must add up to 1.0)
train_ratio = 0.7
val_ratio = 0.15
test_ratio = 0.15
if abs(train_ratio + val_ratio + test_ratio - 1.0) > 1e-9:
    raise ValueError("train_ratio + val_ratio + test_ratio must equal 1.0")

# File extensions treated as images
image_extensions = ('.png', '.jpg', '.jpeg')

# Emotion classes are the sub-directories of the dataset root. Stray files in the
# root are skipped, and the list is sorted because os.listdir() returns entries in
# arbitrary order.
emotion_classes = sorted(
    entry for entry in os.listdir(original_dataset_dir)
    if os.path.isdir(os.path.join(original_dataset_dir, entry))
)

# Create output directories for train, val, test with class subfolders
for split in ['train', 'val', 'test']:
    for emotion_class in emotion_classes:
        class_dir = os.path.join(output_dir, split, emotion_class)
        os.makedirs(class_dir, exist_ok=True)

# For each class folder, split files and copy
for emotion_class in emotion_classes:
    class_path = os.path.join(original_dataset_dir, emotion_class)
    # Sorted so that random_state=42 yields the same split on every machine/filesystem;
    # the shuffle inside train_test_split depends on the input order.
    images = sorted(f for f in os.listdir(class_path) if f.lower().endswith(image_extensions))

    # First split train and temp (val+test)
    train_files, temp_files = train_test_split(images, train_size=train_ratio, random_state=42, shuffle=True)
    # Then split temp into val and test
    val_size_adjusted = val_ratio / (val_ratio + test_ratio)  # Adjust val ratio relative to temp
    val_files, test_files = train_test_split(temp_files, train_size=val_size_adjusted, random_state=42, shuffle=True)

    # Copy files to respective folders.
    # NOTE(review): files left in output_dir by an earlier run with different settings
    # are not removed here — start from an empty output_dir to avoid cross-split leakage.
    for split_name, file_list in zip(['train', 'val', 'test'], [train_files, val_files, test_files]):
        for file_name in file_list:
            src = os.path.join(class_path, file_name)
            dst = os.path.join(output_dir, split_name, emotion_class, file_name)
            shutil.copy2(src, dst)

    print(f"Class '{emotion_class}': {len(train_files)} train, {len(val_files)} val, {len(test_files)} test images.")

print("Dataset split completed.")


Class 'angry': 3500 train, 750 val, 750 test images.
Class 'boring': 3500 train, 750 val, 750 test images.
Class 'disgust': 3500 train, 750 val, 750 test images.
Class 'fear': 3500 train, 750 val, 750 test images.
Class 'happy': 3500 train, 750 val, 750 test images.
Class 'neutral': 3500 train, 750 val, 750 test images.
Class 'sad': 3500 train, 750 val, 750 test images.
Class 'stress': 3500 train, 750 val, 750 test images.
Class 'surprise': 3500 train, 750 val, 750 test images.
Dataset split completed.


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image
import cv2
import numpy as np
from ultralytics import YOLO
import time

# ---------------------------
# Configurations and Globals
# ---------------------------
# Side length (pixels) of the square grayscale crop fed to the CNN
IMG_SIZE = 128
# Number of emotion classes the classifier predicts
NUM_CLASSES = 9
# Samples per mini-batch
BATCH_SIZE = 128
# Upper bound on training epochs
EPOCHS = 20
# Early stopping patience (epochs without validation-loss improvement)
PATIENCE = 5
# Prefer the GPU when one is available, otherwise run on the CPU
DEVICE = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# ---------------------------
# Small CNN for Grayscale Emotion Classification
# ---------------------------
class SmallEmotionCNN(nn.Module):
    """Three-stage convolutional classifier for single-channel (grayscale) images.

    Each stage is Conv2d(3x3, padding=1) -> ReLU -> MaxPool2d(2), so the spatial
    size is halved (floor division) three times before the fully connected head.

    Args:
        num_classes: number of output logits.
        img_size: side length of the square input images. The first Linear layer
            is sized from it, so it must match the data fed to ``forward``.

    Raises:
        ValueError: if ``img_size`` is smaller than 8 (nothing would be left after
            three 2x poolings).
    """

    def __init__(self, num_classes=NUM_CLASSES, img_size=IMG_SIZE):
        super().__init__()
        if img_size < 8:
            raise ValueError(f"img_size must be at least 8, got {img_size}")

        # Spatial side length after three MaxPool2d(2) layers
        pooled_side = img_size // 8

        self.features = nn.Sequential(
            nn.Conv2d(1, 16, kernel_size=3, padding=1),  # 1 x S x S -> 16 x S x S
            nn.ReLU(),
            nn.MaxPool2d(2),  # 16 x S/2 x S/2

            nn.Conv2d(16, 32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),  # 32 x S/4 x S/4

            nn.Conv2d(32, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),  # 64 x S/8 x S/8
        )
        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(64 * pooled_side * pooled_side, 128),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(128, num_classes)
        )

    def forward(self, x):
        """Return raw class logits for a batch ``x`` of shape (N, 1, img_size, img_size)."""
        x = self.features(x)
        x = self.classifier(x)
        return x

# ---------------------------
# Dataset Class with YOLOv11 Face Detection
# ---------------------------
class EmotionDataset(Dataset):
    """Dataset that crops the largest detected face from a grayscale image.

    Each item is loaded from disk, passed through the detector, cropped to the
    largest box with confidence above 0.5 (or the full image when nothing usable
    is detected), resized to IMG_SIZE x IMG_SIZE and converted to a tensor.
    """

    def __init__(self, image_paths, labels, yolo_model, transform=None):
        """
        image_paths: list of grayscale image file paths
        labels: list of integer labels (0 to NUM_CLASSES-1)
        yolo_model: loaded YOLOv11 model for face detection
        transform: torchvision transforms applied to cropped face
        """
        self.image_paths = image_paths
        self.labels = labels
        self.yolo_model = yolo_model
        self.transform = transform

    def __len__(self):
        """Return the number of samples."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Return ``(face_tensor, label)`` for sample ``idx``.

        Raises:
            FileNotFoundError: if OpenCV cannot read the image at the stored path.
        """
        img_path = self.image_paths[idx]
        label = self.labels[idx]

        # Load grayscale image with OpenCV
        gray_img = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)
        if gray_img is None:
            raise FileNotFoundError(f"Image not found: {img_path}")
        h, w = gray_img.shape

        # Convert grayscale to 3-channel by duplicating channels for YOLO
        img_rgb = cv2.cvtColor(gray_img, cv2.COLOR_GRAY2RGB)

        # Run YOLOv11 face detection
        results = self.yolo_model(img_rgb)

        # Collect bounding boxes with confidence > 0.5 as flat [x1, y1, x2, y2] arrays.
        # A single Ultralytics box exposes xyxy with a leading detection axis (1, 4),
        # so it has to be flattened before its coordinates can be indexed.
        boxes = []
        for r in results:
            for box in r.boxes:
                if box.conf > 0.5:
                    boxes.append(box.xyxy.cpu().numpy().reshape(-1)[:4].astype(int))

        if len(boxes) == 0:
            # If no face detected, use full image
            boxes = [np.array([0, 0, w, h])]

        # Take the largest box (assuming main face)
        largest = max(boxes, key=lambda b: (b[2] - b[0]) * (b[3] - b[1]))
        x1, y1, x2, y2 = (int(v) for v in largest)

        # Clamp to the image so negative coordinates cannot wrap around when slicing,
        # and fall back to the full image if the clamped box is empty
        # (cv2.resize raises on an empty array).
        x1, y1 = max(0, x1), max(0, y1)
        x2, y2 = min(w, x2), min(h, y2)
        if x2 <= x1 or y2 <= y1:
            x1, y1, x2, y2 = 0, 0, w, h

        # Crop face from grayscale image
        face = gray_img[y1:y2, x1:x2]
        face = cv2.resize(face, (IMG_SIZE, IMG_SIZE))

        face_pil = Image.fromarray(face)
        if self.transform:
            face_tensor = self.transform(face_pil)
        else:
            face_tensor = transforms.ToTensor()(face_pil)  # This will produce 1 channel tensor

        return face_tensor, label

# ---------------------------
# Data Transforms for Grayscale
# ---------------------------
# Preprocessing for each cropped face: convert the PIL image to a float tensor
# in [0, 1], then shift/scale the single channel with mean 0.5 and std 0.5.
transform = transforms.Compose(
    [
        transforms.ToTensor(),
        transforms.Normalize((0.5,), (0.5,)),
    ]
)

# ---------------------------
# Training and Validation Functions
# ---------------------------
def train_one_epoch(model, dataloader, criterion, optimizer, device):
    """Run one optimisation pass over ``dataloader``.

    Puts ``model`` in training mode, performs a forward/backward/step cycle for
    every batch and returns ``(mean_loss, accuracy)`` averaged over all samples seen.
    """
    model.train()
    loss_sum = 0
    n_correct = 0
    n_seen = 0

    for batch_x, batch_y in dataloader:
        batch_x, batch_y = batch_x.to(device), batch_y.to(device)
        batch_size = batch_x.size(0)

        optimizer.zero_grad()
        logits = model(batch_x)
        batch_loss = criterion(logits, batch_y)
        batch_loss.backward()
        optimizer.step()

        # Weight the batch loss by batch size so the epoch figure is a per-sample mean
        loss_sum += batch_loss.item() * batch_size
        predicted = logits.argmax(dim=1)
        n_correct += (predicted == batch_y).sum().item()
        n_seen += batch_size

    return loss_sum / n_seen, n_correct / n_seen

@torch.no_grad()
def validate(model, dataloader, criterion, device):
    """Evaluate ``model`` on ``dataloader`` without tracking gradients.

    Puts ``model`` in eval mode and returns ``(mean_loss, accuracy)`` averaged
    over all samples seen.
    """
    model.eval()
    loss_sum = 0
    n_correct = 0
    n_seen = 0

    for batch_x, batch_y in dataloader:
        batch_x, batch_y = batch_x.to(device), batch_y.to(device)
        batch_size = batch_x.size(0)

        logits = model(batch_x)
        batch_loss = criterion(logits, batch_y)

        # Weight the batch loss by batch size so the result is a per-sample mean
        loss_sum += batch_loss.item() * batch_size
        predicted = logits.argmax(dim=1)
        n_correct += (predicted == batch_y).sum().item()
        n_seen += batch_size

    return loss_sum / n_seen, n_correct / n_seen

# ---------------------------
# Early Stopping Class
# ---------------------------
class EarlyStopping:
    """Track validation loss and signal when it has stopped improving.

    Call the instance once per epoch with the validation loss and the model.
    When the loss improves, a snapshot of the model weights is stored in
    ``best_model_wts``; after ``patience`` consecutive calls without
    improvement, ``early_stop`` becomes True.
    """

    def __init__(self, patience=PATIENCE, verbose=False):
        """
        patience: number of consecutive non-improving epochs tolerated
        verbose: when True, print a message on every call
        """
        self.patience = patience
        self.verbose = verbose
        self.counter = 0             # consecutive epochs without improvement
        self.best_loss = None        # lowest validation loss seen so far
        self.early_stop = False      # set to True once patience is exhausted
        self.best_model_wts = None   # snapshot of the weights at best_loss

    def __call__(self, val_loss, model):
        """Record ``val_loss`` for this epoch and update the stopping state."""
        # Function-scope import so the class does not depend on a file-level import
        import copy

        if self.best_loss is None or val_loss < self.best_loss:
            self.best_loss = val_loss
            # state_dict() returns references to the live parameter tensors, which
            # the optimizer keeps updating in place; deep-copy so the snapshot
            # really holds the weights of the best epoch.
            self.best_model_wts = copy.deepcopy(model.state_dict())
            self.counter = 0
            if self.verbose:
                print(f"Validation loss improved to {val_loss:.4f}, saving model.")
        else:
            self.counter += 1
            if self.verbose:
                print(f"Validation loss did not improve. Counter: {self.counter}/{self.patience}")
            if self.counter >= self.patience:
                self.early_stop = True

# ---------------------------
# Main Training Loop
# ---------------------------
def main(train_image_paths, train_labels, val_image_paths, val_labels,
         yolo_weights="yolo11n.pt", num_workers=4, save_path="emotion_cnn_yolo_best.pth"):
    """Train SmallEmotionCNN with early stopping and save the best weights.

    Args:
        train_image_paths, train_labels: training image paths and integer labels.
        val_image_paths, val_labels: validation image paths and integer labels,
            used for early stopping / model selection.
        yolo_weights: path or name of the YOLO weights used to locate faces.
        num_workers: DataLoader worker processes for both loaders.
        save_path: file the best ``state_dict`` is written to.

    Returns:
        The trained model, loaded with the weights recorded by early stopping.
    """
    # Load YOLOv11 face detection model (YOLO11n lightweight)
    # NOTE(review): the stock "yolo11n.pt" checkpoint is a general COCO detector
    # without a dedicated face class — confirm face-specific weights are intended.
    yolo_model = YOLO(yolo_weights)

    # Create datasets and loaders
    train_dataset = EmotionDataset(train_image_paths, train_labels, yolo_model, transform=transform)
    val_dataset = EmotionDataset(val_image_paths, val_labels, yolo_model, transform=transform)

    # NOTE(review): with num_workers > 0 every worker process receives its own copy of
    # the dataset, including the YOLO model; if the loaders fail or hang (e.g. when run
    # from a notebook on Windows), call main(..., num_workers=0).
    train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=num_workers)
    val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=num_workers)

    # Initialize CNN model, loss, optimizer
    model = SmallEmotionCNN().to(DEVICE)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    early_stopping = EarlyStopping(patience=PATIENCE, verbose=True)

    for epoch in range(EPOCHS):
        start_time = time.time()

        train_loss, train_acc = train_one_epoch(model, train_loader, criterion, optimizer, DEVICE)
        val_loss, val_acc = validate(model, val_loader, criterion, DEVICE)

        elapsed = time.time() - start_time

        print(f"Epoch {epoch+1}/{EPOCHS} - "
              f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f} - "
              f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f} - "
              f"Time: {elapsed:.1f}s")

        early_stopping(val_loss, model)

        if early_stopping.early_stop:
            print("Early stopping triggered. Stopping training.")
            break

    # Load best model weights (nothing is recorded if no epoch was run)
    if early_stopping.best_model_wts is not None:
        model.load_state_dict(early_stopping.best_model_wts)
        print("Training complete. Best validation loss: {:.4f}".format(early_stopping.best_loss))
    else:
        print("Training complete. No validation loss was recorded.")

    # Save model; the message reports the file that is actually written
    torch.save(model.state_dict(), save_path)
    print(f"Best model saved as {save_path}")

    return model

import os

def get_image_paths_and_labels(root_dir):
    """
    Scans root_dir for subfolders (emotions), collects image paths and integer labels.

    Class folders and the files inside them are visited in sorted order, so the
    returned lists are identical on every run and filesystem (os.listdir itself
    returns entries in arbitrary order). Only files ending in .png, .jpg or .jpeg
    (case-insensitive) are collected; entries directly in root_dir that are not
    directories are ignored.

    Returns:
        image_paths: list of image file paths
        labels: list of integer labels corresponding to each path
        class_to_idx: dict mapping class name to label index
    """
    image_extensions = ('.png', '.jpg', '.jpeg')

    class_names = sorted(
        d for d in os.listdir(root_dir)
        if os.path.isdir(os.path.join(root_dir, d))
    )
    class_to_idx = {cls_name: idx for idx, cls_name in enumerate(class_names)}

    image_paths = []
    labels = []
    for cls_name in class_names:
        cls_folder = os.path.join(root_dir, cls_name)
        for fname in sorted(os.listdir(cls_folder)):
            if fname.lower().endswith(image_extensions):
                image_paths.append(os.path.join(cls_folder, fname))
                labels.append(class_to_idx[cls_name])
    return image_paths, labels, class_to_idx

# Example usage: collect the file lists for the train/val/test folders written by the
# dataset-split cell.
train_root = r"D:\RuhunaNew\Academic\Research\Facial_Recog_Repo\Group_50_Repo\Datasets\Preprocesssed_5000\train"
val_root = r"D:\RuhunaNew\Academic\Research\Facial_Recog_Repo\Group_50_Repo\Datasets\Preprocesssed_5000\val"
test_root = r"D:\RuhunaNew\Academic\Research\Facial_Recog_Repo\Group_50_Repo\Datasets\Preprocesssed_5000\test"

train_image_paths, train_labels, class_to_idx = get_image_paths_and_labels(train_root)
val_image_paths, val_labels, _ = get_image_paths_and_labels(val_root)
# The test split is only loaded here; keep it out of training and model selection so
# it remains an unbiased estimate for the final evaluation.
test_image_paths, test_labels, _ = get_image_paths_and_labels(test_root)

# Early stopping and best-model selection run on the validation split, not the test split.
trained_model = main(train_image_paths, train_labels, val_image_paths, val_labels)

