In [1]:
import os
import pandas as pd
import numpy as np
from PIL import Image
from sklearn.model_selection import train_test_split
import shutil

In [2]:
# The RAF-DB labels are not aligned with the universal emotion labels
RAF_DB_Label_Mapping = {1:'surprise', 2:'fear', 3:'disgust', 4:'happy', 5:'sad', 6:'angry', 7:'neutral'}
universal_emotion_mapping = {0: 'Angry', 1: 'Disgust', 2: 'Fear', 3: 'Happy', 4: 'Sad', 5: 'Surprise', 6: 'Neutral'}

# This aligns the mapping of the RAF-DB labels with the universal emotion labels
Updated_RAF_DB_Label_Mapping = {1:5, 2:2, 3:1, 4:3, 5:4, 6:0, 7:6}

In [3]:
def process_split(base_dir, rafdb_root, original_dir, split_name, target_dir, df=None):
    """
    Process a dataset split and create labels CSV
    original_dir: Path to original RAF-DB images (train/test)
    split_name: Prefix for filenames (train/val/test)
    target_dir: Output directory
    df: DataFrame containing labels (for test split)
    """
    os.makedirs(os.path.join(base_dir, target_dir), exist_ok=True)
    labels = []

    # For test split (validation/test), use integer labels
    if df is not None:
        for idx, row in df.iterrows():
            label = row['label']
            emotion = RAF_DB_Label_Mapping[int(label)]
            orig_path = os.path.join(rafdb_root, "DATASET", original_dir, str(label), row['image'])
            
            img = Image.open(orig_path)
            filename = f"{split_name}_{idx}_{emotion}.png"
            img.save(os.path.join(base_dir, target_dir, filename))
            
            # Store INTEGER label for validation/test
            labels.append({'filename': filename, 'label': Updated_RAF_DB_Label_Mapping[int(label)]})

    # For train split, keep emotion names in CSV
    else:
        idx = 0
        for label in os.listdir(os.path.join(rafdb_root, "DATASET", original_dir)):
            label_dir = os.path.join(rafdb_root, "DATASET", original_dir, label)
            if not os.path.isdir(label_dir):
                continue

            for img_file in os.listdir(label_dir):
                emotion = RAF_DB_Label_Mapping[int(label)]
                orig_path = os.path.join(label_dir, img_file)
                
                img = Image.open(orig_path)
                filename = f"{split_name}_{idx}_{emotion}.png"
                img.save(os.path.join(base_dir, target_dir, filename))
                
                # Store TEXT label for training
                labels.append({'filename': filename, 'label': Updated_RAF_DB_Label_Mapping[int(label)]})
                idx += 1

    # Order the entries and save labels CSV
    labels = pd.DataFrame(labels)
    labels['sort_key'] = labels['filename'].str.extract(r'_(\d+)_').astype(int)
    labels = labels.sort_values('sort_key').drop(columns='sort_key')

    pd.DataFrame(labels).to_csv(os.path.join(base_dir, target_dir, "labels.csv"), index=False)

In [4]:
def copy_images_to_label_subdirectories(base_dir):
    """
    For each subdirectory (test, train, validation) in base_dir,
    copy images into subdirectories based on their label, extracted from the filename.
    
    Expected filename format: {usecase}_{index}_{label}.{ext}
    """
    # Define the usage folders.
    usage_dirs = ['train', 'test', 'validation']
    
    for usage in usage_dirs:
        usage_path = os.path.join(base_dir, usage)
        if not os.path.isdir(usage_path):
            print(f"Directory {usage_path} does not exist. Skipping.")
            continue
        
        # Process each file in the usage folder.
        for filename in os.listdir(usage_path):
            file_path = os.path.join(usage_path, filename)
            if os.path.isfile(file_path):
                # Parse the filename. We expect at least 3 parts separated by '_'
                parts = filename.split('_')
                if len(parts) < 3:
                    print(f"Filename {filename} does not match expected format. Skipping.")
                    continue
                
                # The label is assumed to be the last part, with the file extension removed.
                label_with_ext = parts[-1]
                label, _ = os.path.splitext(label_with_ext)
                
                # Create the label subdirectory if it doesn't exist.
                label_dir = os.path.join(usage_path, label)
                os.makedirs(label_dir, exist_ok=True)
                
                # Copy the image into the label subdirectory.
                destination_file_path = os.path.join(label_dir, filename)
                shutil.copy2(file_path, destination_file_path)

In [None]:
# Define paths and structure
rafdb_root = "RAF-DB" # Where the original dataset is stored
base_dir = "RAFDB_Structured_2" # Where you want the structured dataset to be saved
os.makedirs(base_dir, exist_ok=True)

# Process training set
process_split(base_dir, rafdb_root, "train", "train", "train")

# Process test set with stratified split
test_labels = pd.read_csv(os.path.join(rafdb_root, "test_labels.csv"))
train_labels = pd.read_csv(os.path.join(rafdb_root, "train_labels.csv"))

# Stratified split of test set into validation/test (50/50)
val_df, test_df = train_test_split(
    test_labels,
    test_size=0.5,
    stratify=test_labels['label'],
    random_state=42
)

# Create validation set (uses integer labels in CSV)
process_split(base_dir, rafdb_root, "test", "val", "validation", val_df)

# Create test set (uses integer labels in CSV)
process_split(base_dir, rafdb_root, "test", "test", "test", test_df)

print("Dataset restructuring complete!")

# Set the base directory containing the train, test, and validation subdirectories.
base_directory = 'RAF-DB_Structured'
copy_images_to_label_subdirectories(base_directory)

Dataset restructuring complete!
Filename labels.csv does not match expected format. Skipping.
Filename labels.csv does not match expected format. Skipping.
Filename labels.csv does not match expected format. Skipping.


In [70]:
# import os
# import shutil
# import pandas as pd
# import random
# from collections import defaultdict

# def organize_rafdb(dataset_path, output_path, train_ratio=0.6, val_ratio=0.2, test_ratio=0.2):
#     label_mapping = {1: "Surprised", 2: "Fear", 3: "Disgust", 4: "Happy", 5: "Sad", 6: "Angry", 7: "Neutral"}

#     assert train_ratio + val_ratio + test_ratio == 1.0, "Ratios must sum to 1.0"
    
#     # Load label data
#     train_labels = pd.read_csv(os.path.join(dataset_path, "train_labels.csv"))
#     test_labels = pd.read_csv(os.path.join(dataset_path, "test_labels.csv"))
    
#     all_labels = pd.concat([train_labels, test_labels], ignore_index=True)
#     all_labels.columns = ["image", "label"]
    
#     # Prepare directories
#     for split in ['train', 'validation', 'test']:
#         split_path = os.path.join(output_path, split)
#         os.makedirs(split_path, exist_ok=True)
    
#     # Full image paths
#     image_paths = []
#     for root, _, files in os.walk(os.path.join(dataset_path, "DATASET")):
#         for file in files:
#             if file.endswith(".jpg"):
#                 image_paths.append(os.path.join(root, file))
    
#     # Map image names to paths
#     image_dict = {os.path.basename(path): path for path in image_paths}
    
#     # Filter available images
#     all_labels = all_labels[all_labels["image"].isin(image_dict.keys())]

#     # Group by label
#     emotion_groups = defaultdict(list)
#     for _, row in all_labels.iterrows():
#         emotion_groups[row["label"]].append(row)
    
#     # Stratified splitting
#     train_files, val_files, test_files = [], [], []
    
#     for emotion, files in emotion_groups.items():
#         random.shuffle(files)
#         n = len(files)
#         train_end = int(n * train_ratio)
#         val_end = train_end + int(n * val_ratio)
        
#         train_files.extend(files[:train_end])
#         val_files.extend(files[train_end:val_end])
#         test_files.extend(files[val_end:])
    
#     # Function to copy images
#     def copy_images(data, split):
#         for cnt, row in enumerate(data):
#             image_name = row["image"]
#             label = row["label"]
#             src_path = image_dict[image_name]
#             new_name = f"{split}_{cnt}_{label_mapping[label]}.jpg"
#             dest_path = os.path.join(output_path, split, new_name)
#             shutil.copy(src_path, dest_path)
    
#     # Copy images to their respective directories
#     copy_images(train_files, "train")
#     copy_images(val_files, "validation")
#     copy_images(test_files, "test")
    
#     print("Dataset organized successfully!")

# # Example usage
# dataset_path = "RAF-DB"
# dataset_output_path = "RAF-DB_ResEmoteNet"
# organize_rafdb(dataset_path, dataset_output_path, train_ratio=0.8, val_ratio=0.1, test_ratio=0.1)

### Preparing RAF-DB Data - ResEmoteNet Format - Equal Class Distribution - Data Augmentation (Low Quantity Labels)

In [66]:
# import os
# import shutil
# import pandas as pd
# import random
# from collections import defaultdict

# def organize_rafdb(dataset_path, output_path, train_ratio=0.7, val_ratio=0.15, test_ratio=0.15):
#     assert train_ratio + val_ratio + test_ratio == 1.0, "Ratios must sum to 1.0"

#     # Load label data
#     train_labels = pd.read_csv(os.path.join(dataset_path, "train_labels.csv"))
#     test_labels = pd.read_csv(os.path.join(dataset_path, "test_labels.csv"))

#     all_labels = pd.concat([train_labels, test_labels], ignore_index=True)
#     all_labels.columns = ["image", "label"]

#     # Prepare directories
#     for split in ['train', 'validation', 'test']:
#         os.makedirs(os.path.join(output_path, split), exist_ok=True)

#     # Full image paths
#     image_paths = {os.path.basename(path): path for path in 
#                    [os.path.join(root, file) 
#                     for root, _, files in os.walk(os.path.join(dataset_path, "DATASET")) 
#                     for file in files if file.endswith(".jpg")]}

#     # Filter available images
#     all_labels = all_labels[all_labels["image"].isin(image_paths.keys())]

#     # Group images by label
#     emotion_groups = defaultdict(list)
#     for _, row in all_labels.iterrows():
#         emotion_groups[row["label"]].append(row)

#     # Stratified sampling: Ensure equal distribution across splits
#     train_files, val_files, test_files = [], [], []

#     for label, files in emotion_groups.items():
#         random.shuffle(files)  # Shuffle each label group
#         n = len(files)
#         train_end = int(n * train_ratio)
#         val_end = train_end + int(n * val_ratio)

#         train_files.extend(files[:train_end])
#         val_files.extend(files[train_end:val_end])
#         test_files.extend(files[val_end:])

#     # Function to copy images to respective directories
#     def copy_images(data, split):
#         for _, row in data.iterrows():
#             image_name = row["image"]
#             label = row["label"]
#             src_path = image_paths[image_name]
#             new_name = f"{split}_{os.path.splitext(image_name)[0]}_{label}.jpg"
#             dest_path = os.path.join(output_path, split, new_name)
#             shutil.copy(src_path, dest_path)

#     # Copy images to their respective directories
#     copy_images(pd.DataFrame(train_files), "train")
#     copy_images(pd.DataFrame(val_files), "validation")
#     copy_images(pd.DataFrame(test_files), "test")

#     # Display label distribution in each directory
#     def display_distribution(data, split):
#         label_counts = pd.DataFrame(data)["label"].value_counts().sort_index()
#         total = len(data)
#         percentages = (label_counts / total * 100).round(2)
#         distribution = pd.DataFrame({"Count": label_counts, "Percentage": percentages})
#         print(f"\nLabel distribution in {split} set:")
#         print(distribution)
    
#     display_distribution(train_files, "train")
#     display_distribution(val_files, "validation")
#     display_distribution(test_files, "test")

#     print("\nDataset organized successfully!")

# # Example usage
# dataset_path = "RAF-DB"
# dataset_output_path = "RAF-DB_ResEmoteNet"
# organize_rafdb(dataset_path, dataset_output_path, train_ratio=0.8, val_ratio=0.1, test_ratio=0.1)

In [None]:
# import os
# import shutil
# import pandas as pd
# import random
# import numpy as np 
# import albumentations as A
# from albumentations.pytorch import ToTensorV2
# from collections import defaultdict
# from sklearn.utils.class_weight import compute_class_weight
# import cv2
# import matplotlib.pyplot as plt

# def organize_rafdb(dataset_path, output_path, train_ratio=0.7, val_ratio=0.15, test_ratio=0.15):
#     assert train_ratio + val_ratio + test_ratio == 1.0, "Ratios must sum to 1.0"

#     label_mapping = {1: "Surprised", 2: "Fear", 3: "Disgust", 4: "Happy", 5: "Sad", 6: "Angry", 7: "Neutral"}

#     # Load label data
#     train_labels = pd.read_csv(os.path.join(dataset_path, "train_labels.csv"))
#     test_labels = pd.read_csv(os.path.join(dataset_path, "test_labels.csv"))

#     all_labels = pd.concat([train_labels, test_labels], ignore_index=True)
#     all_labels.columns = ["image", "label"]

#     # Prepare directories
#     for split in ['train', 'validation', 'test']:
#         os.makedirs(os.path.join(output_path, split), exist_ok=True)

#     # Full image paths
#     image_paths = {os.path.basename(path): path for path in 
#                    [os.path.join(root, file) 
#                     for root, _, files in os.walk(os.path.join(dataset_path, "DATASET")) 
#                     for file in files if file.endswith(".jpg")]}

#     # Filter available images
#     all_labels = all_labels[all_labels["image"].isin(image_paths.keys())]

#     # Group images by label
#     emotion_groups = defaultdict(list)
#     for _, row in all_labels.iterrows():
#         emotion_groups[row["label"]].append(row)

#     # Stratified sampling: Ensure equal distribution across splits
#     train_files, val_files, test_files = [], [], []

#     for label, files in emotion_groups.items():
#         random.shuffle(files)  # Shuffle each label group
#         n = len(files)
#         train_end = int(n * train_ratio)
#         val_end = train_end + int(n * val_ratio)

#         train_files.extend(files[:train_end])
#         val_files.extend(files[train_end:val_end])
#         test_files.extend(files[val_end:])

#     # # Compute class weights for training
#     # train_labels_list = [row["label"] for row in train_files]
#     # unique_classes = np.array(sorted(set(train_labels_list))) 
#     # class_weights = compute_class_weight(
#     #     class_weight="balanced",
#     #     classes=unique_classes, 
#     #     y=train_labels_list
#     # )
#     # class_weights_dict = {i: weight for i, weight in zip(unique_classes, class_weights)}
#     # print("\nComputed Class Weights:", class_weights_dict)

#     # Define augmentation for minority classes (adjust labels as needed)
#     augmentation = A.Compose([
#         A.RandomRotate90(),
#         A.HorizontalFlip(),
#         A.ColorJitter(),
#         ToTensorV2()
#     ])

#     # Function to copy and augment images
#     def copy_images(data, split):
#         for index, row in data.iterrows():
#             image_name = row["image"]
#             label = row["label"]
#             src_path = image_paths[image_name]
#             new_name = f"{split}_{index}_{label_mapping[label]}.jpg"
#             dest_path = os.path.join(output_path, split, new_name)
            
#             # Read image using OpenCV (install with: pip install opencv-python)
#             image = cv2.imread(src_path)
#             image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)  # Convert to RGB
            
#             # Apply augmentation only to minority classes - Update this to auto select based on distribution
#             if label in [2, 3, 6]:  # Example minority classes
#                 augmented = augmentation(image=image)
#                 augmented_image = augmented["image"].numpy().transpose(1, 2, 0)
#                 cv2.imwrite(dest_path, cv2.cvtColor(augmented_image, cv2.COLOR_RGB2BGR))
#             else:
#                 # Directly copy without augmentation
#                 shutil.copy(src_path, dest_path)

#     # Copy images to their respective directories
#     copy_images(pd.DataFrame(train_files), "train")
#     copy_images(pd.DataFrame(val_files), "validation")
#     copy_images(pd.DataFrame(test_files), "test")

#     # Display label distribution in each directory
#     def display_distribution(data, split):
#         label_counts = pd.DataFrame(data)["label"].value_counts().sort_index()
#         total = len(data)
#         percentages = (label_counts / total * 100).round(2)
#         distribution = pd.DataFrame({"Count": label_counts, "Percentage": percentages})
#         print(f"\nLabel distribution in {split} set:")
#         print(distribution)
    
#     display_distribution(train_files, "train")
#     display_distribution(val_files, "validation")
#     display_distribution(test_files, "test")

#     print("\nDataset organized successfully!")

# # Example usage
# dataset_path = "RAF-DB"
# dataset_output_path = "RAF-DB_ResEmoteNet"
# organize_rafdb(dataset_path, dataset_output_path, train_ratio=0.8, val_ratio=0.1, test_ratio=0.1)


Label distribution in train set:
       Count  Percentage
label                   
1       1295       10.56
2        284        2.31
3        701        5.71
4       4765       38.84
5       1968       16.04
6        693        5.65
7       2563       20.89

Label distribution in validation set:
       Count  Percentage
label                   
1        161       10.52
2         35        2.29
3         87        5.69
4        595       38.89
5        246       16.08
6         86        5.62
7        320       20.92

Label distribution in test set:
       Count  Percentage
label                   
1        163       10.58
2         36        2.34
3         89        5.78
4        597       38.77
5        246       15.97
6         88        5.71
7        321       20.84

Dataset organized successfully!


### Convert To GrayScale

In [None]:
# import os
# import cv2

# def convert_to_grayscale(input_dir):
#     """
#     Converts all images in a directory to grayscale and saves them in a new directory.
    
#     Args:
#         input_dir (str): Path to directory containing images
#     """
#     # Create output directory path
#     base_dir = os.path.dirname(input_dir)
#     dir_name = os.path.basename(input_dir)
#     output_dir = os.path.join(base_dir, f"{dir_name}_grayscale")
    
#     # Create output directory if it doesn't exist
#     os.makedirs(output_dir, exist_ok=True)
    
#     # Supported image extensions
#     valid_extensions = ('.jpg', '.jpeg', '.png', '.bmp', '.tif', '.tiff')
    
#     # Process all files
#     for root, dirs, files in os.walk(input_dir):
#         for file in files:
#             if file.lower().endswith(valid_extensions):
#                 # Create input and output paths
#                 input_path = os.path.join(root, file)
#                 relative_path = os.path.relpath(root, input_dir)
#                 output_subdir = os.path.join(output_dir, relative_path)
                
#                 # Create subdirectories if needed
#                 os.makedirs(output_subdir, exist_ok=True)
#                 output_path = os.path.join(output_subdir, file)
                
#                 try:
#                     # Read and convert image
#                     img = cv2.imread(input_path)
#                     gray_img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
                    
#                     # Save grayscale image
#                     cv2.imwrite(output_path, gray_img)
#                     # print(f"Converted: {input_path} -> {output_path}")
                    
#                 except Exception as e:
#                     print(f"Error processing {input_path}: {str(e)}")

# # Usage example
# convert_to_grayscale("RAF-DB_ResEmoteNet\\test")
# convert_to_grayscale("RAF-DB_ResEmoteNet\\train")
# convert_to_grayscale("RAF-DB_ResEmoteNet\\validation")

Converted: RAF-DB_ResEmoteNet\test\test_10001_Neutral.jpg -> RAF-DB_ResEmoteNet\test_grayscale\.\test_10001_Neutral.jpg
Converted: RAF-DB_ResEmoteNet\test\test_10003_Neutral.jpg -> RAF-DB_ResEmoteNet\test_grayscale\.\test_10003_Neutral.jpg
Converted: RAF-DB_ResEmoteNet\test\test_10004_Neutral.jpg -> RAF-DB_ResEmoteNet\test_grayscale\.\test_10004_Neutral.jpg
Converted: RAF-DB_ResEmoteNet\test\test_1000_Angry.jpg -> RAF-DB_ResEmoteNet\test_grayscale\.\test_1000_Angry.jpg
Converted: RAF-DB_ResEmoteNet\test\test_10021_Neutral.jpg -> RAF-DB_ResEmoteNet\test_grayscale\.\test_10021_Neutral.jpg
Converted: RAF-DB_ResEmoteNet\test\test_10029_Neutral.jpg -> RAF-DB_ResEmoteNet\test_grayscale\.\test_10029_Neutral.jpg
Converted: RAF-DB_ResEmoteNet\test\test_10047_Neutral.jpg -> RAF-DB_ResEmoteNet\test_grayscale\.\test_10047_Neutral.jpg
Converted: RAF-DB_ResEmoteNet\test\test_10054_Neutral.jpg -> RAF-DB_ResEmoteNet\test_grayscale\.\test_10054_Neutral.jpg
Converted: RAF-DB_ResEmoteNet\test\test_10068_

### Create Label CSV File

In [None]:
# import os
# import csv

# # Original mapping provided
# label_mapping = {
#     1: "Surprised",
#     2: "Fear", 
#     3: "Disgust",
#     4: "Happy",
#     5: "Sad",
#     6: "Angry",
#     7: "Neutral"
# }

# # Create inverted mapping for label conversion
# emotion_to_id = {v: k for k, v in label_mapping.items()}

# def create_label_csvs(root_dir):
#     """
#     Creates CSV files with numerical labels based on filename patterns.
#     File naming convention: [prefix]_[number]_[EMOTION_NAME].jpg
#     Example: test_21_Happy.jpg → 4
#     """
#     valid_extensions = ('.jpg', '.jpeg', '.png', '.bmp')
    
#     for dirpath, _, filenames in os.walk(root_dir):
#         csv_data = []
        
#         for filename in filenames:
#             if not filename.lower().endswith(valid_extensions):
#                 continue
                
#             base_name = os.path.splitext(filename)[0]
#             parts = base_name.split('_')
            
#             if len(parts) < 3:
#                 print(f"Skipping invalid filename: {filename}")
#                 continue
                
#             # Get emotion name and convert to title case
#             emotion_name = parts[-1].capitalize()
            
#             # Convert to numerical label
#             label = emotion_to_id.get(emotion_name)
            
#             if label is None:
#                 print(f"Skipping unknown emotion: {emotion_name} in {filename}")
#                 continue
                
#             csv_data.append([filename, label])
        
#         if csv_data:
#             csv_path = os.path.join(dirpath, "labels.csv")
#             with open(csv_path, 'w', newline='') as f:
#                 writer = csv.writer(f)
#                 writer.writerow(["Image_name", "label"])
#                 writer.writerows(csv_data)
                
#             print(f"Created {csv_path} with {len(csv_data)} entries")

# # Usage
# # create_label_csvs("RAF-DB_ResEmoteNet\\test")
# # create_label_csvs("RAF-DB_ResEmoteNet\\train")
# # create_label_csvs("RAF-DB_ResEmoteNet\\validation")

# create_label_csvs("RAF-DB_ResEmoteNet\\test_grayscale")
# create_label_csvs("RAF-DB_ResEmoteNet\\train_grayscale")
# create_label_csvs("RAF-DB_ResEmoteNet\\validation_grayscale")

Created RAF-DB_ResEmoteNet\test_grayscale\labels.csv with 1540 entries
Created RAF-DB_ResEmoteNet\train_grayscale\labels.csv with 13329 entries
Created RAF-DB_ResEmoteNet\validation_grayscale\labels.csv with 1530 entries
