In [1]:
import os
import cv2
import numpy as np
from collections import defaultdict

from scipy.ndimage import zoom
import seaborn as sns
from matplotlib import pyplot as plt

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.utils import shuffle

import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout, LSTM, TimeDistributed
from tensorflow.keras.utils import to_categorical


Check Path

In [2]:
import os

# Set correct base paths
DROWSY_PATH = os.path.join("D:\VSCODE\DDD_project\DatasetNew", "DrowsySet")
EMOTION_PATH = os.path.join("D:\VSCODE\DDD_project\DatasetNew", "Emotion")

# Confirm folder existence
print("Drowsy path exists:", os.path.exists(DROWSY_PATH))
print("Emotion path exists:", os.path.exists(EMOTION_PATH))


Drowsy path exists: True
Emotion path exists: True


In [3]:
DROWSY_PATH = "D:\VSCODE\DDD_project\DatasetNew\DrowsySet"
EMOTION_PATH = "D:\VSCODE\DDD_project\DatasetNew\Emotion"

# Drowsy: ['Active', 'Fatigue']
# Emotion: ['Anger', 'Fear', 'Happy', 'Sad', 'Surprise']

DROWSY_CLASSES = ["Active", "Fatigue"]
EMOTION_CLASSES = ["Anger", "Fear", "Happy", "Sad", "Surprise"]



Shared preprocess function

In [4]:
import os
import cv2
import numpy as np
from collections import defaultdict

def preprocess_list(x):
    return int((x.split("-")[2]).split(".")[0])  # Assumes consistent naming: subject-seq-frame.jpg

def preprocess_dict(x):
    res = list(np.argsort(list(map(preprocess_list, x))))
    return [x[i] for i in res]

def img2array(files, path, size=(48, 48)):
    images = []
    for file in files:
        img = cv2.imread(os.path.join(path, file), cv2.IMREAD_GRAYSCALE)
        if img is not None:
            img = cv2.resize(img, size)
            images.append(img)
    return np.array(images)


Load Datasets

In [5]:
from collections import defaultdict

drowsy_data = defaultdict(lambda: defaultdict(list))
emotion_data = defaultdict(lambda: defaultdict(list))

# Load Drowsy Data
for label in DROWSY_CLASSES:
    folder_path = os.path.join(DROWSY_PATH, label)
    if not os.path.exists(folder_path):
        print(f"❌ Missing folder: {folder_path}")
    else:
        for f in os.listdir(folder_path):
            subject = f.split("-")[0]
            drowsy_data[label][subject].append(f)

# Load Emotion Data
for label in EMOTION_CLASSES:
    folder_path = os.path.join(EMOTION_PATH, label)
    if not os.path.exists(folder_path):
        print(f"❌ Missing folder: {folder_path}")
    else:
        for f in os.listdir(folder_path):
            subject = f.split("-")[0]
            emotion_data[label][subject].append(f)


In [6]:
# --- Drowsy Set ---
print("🟡 DrowsySet File Count:")
for label in DROWSY_CLASSES:
    folder_path = os.path.join(DROWSY_PATH, label)
    if os.path.exists(folder_path):
        file_count = len(os.listdir(folder_path))
        print(f"  {label}: {file_count} files")
    else:
        print(f"  ❌ Folder not found: {folder_path}")

# --- Emotion Set ---
print("\n🟢 Emotion File Count:")
for label in EMOTION_CLASSES:
    folder_path = os.path.join(EMOTION_PATH, label)
    if os.path.exists(folder_path):
        file_count = len(os.listdir(folder_path))
        print(f"  {label}: {file_count} files")
    else:
        print(f"  ❌ Folder not found: {folder_path}")


🟡 DrowsySet File Count:
  Active: 4560 files
  Fatigue: 4560 files

🟢 Emotion File Count:
  Anger: 135 files
  Fear: 75 files
  Happy: 207 files
  Sad: 84 files
  Surprise: 249 files


In [7]:
total_drowsy = sum(len(os.listdir(os.path.join(DROWSY_PATH, label))) for label in DROWSY_CLASSES if os.path.exists(os.path.join(DROWSY_PATH, label)))
total_emotion = sum(len(os.listdir(os.path.join(EMOTION_PATH, label))) for label in EMOTION_CLASSES if os.path.exists(os.path.join(EMOTION_PATH, label)))
print(f"\n🔢 Total files - DrowsySet: {total_drowsy}, Emotion: {total_emotion}")



🔢 Total files - DrowsySet: 9120, Emotion: 750


Convert Dataset to grayscale and uniform

In [8]:
import cv2  # OpenCV for image processing
from tqdm import tqdm

def load_image_sequence_from_folder(folder_path):
    """Load and return all images from a given folder as grayscale 48x48."""
    frames = []
    for filename in sorted(os.listdir(folder_path)):
        img_path = os.path.join(folder_path, filename)
        img = cv2.imread(img_path)  # Reads in color (BGR)
        if img is None:
            continue
        img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)  # Convert to grayscale
        img = cv2.resize(img, (48, 48))              # Resize if needed
        frames.append(img)
    return np.array(frames)


In [9]:
from collections import defaultdict
import cv2
import os
import numpy as np
from tqdm import tqdm

drowsy_data = defaultdict(lambda: defaultdict(list))

for label in DROWSY_CLASSES:  # ["Active", "Fatigue"]
    folder_path = os.path.join(DROWSY_PATH, label)
    if os.path.exists(folder_path):
        print(f"📂 Loading {label} from {folder_path}")
        
        filenames = sorted(os.listdir(folder_path))  # ✅ Sort files
        
        for f in tqdm(filenames):
            subject = f.split("-")[0]  # Or however your subjects are separated
            f_path = os.path.join(folder_path, f)
            img = cv2.imread(f_path)
            if img is None:
                continue
            img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)  # Grayscale
            img = cv2.resize(img, (48, 48))              # Resize to uniform size
            drowsy_data[label][subject].append(img)
    else:
        print(f"❌ Missing folder: {folder_path}")


📂 Loading Active from D:\VSCODE\DDD_project\DatasetNew\DrowsySet\Active


100%|██████████| 4560/4560 [00:39<00:00, 116.66it/s]


📂 Loading Fatigue from D:\VSCODE\DDD_project\DatasetNew\DrowsySet\Fatigue


100%|██████████| 4560/4560 [00:35<00:00, 129.71it/s]


In [10]:
emotion_data = defaultdict(lambda: defaultdict(list))

for label in EMOTION_CLASSES:  # ["Anger", "fear", "happy", "sad", "surprise"]
    folder_path = os.path.join(EMOTION_PATH, label)
    if os.path.exists(folder_path):
        print(f"📂 Loading {label} from {folder_path}")
        for f in tqdm(os.listdir(folder_path)):
            subject = f.split("-")[0]
            f_path = os.path.join(folder_path, f)
            img = cv2.imread(f_path)  # Load image (even if already grayscale, OpenCV returns 3D sometimes)
            if img is None:
                continue
            img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)  # Convert to grayscale (idempotent if already grayscale)
            img = cv2.resize(img, (48, 48))              # Resize
            emotion_data[label][subject].append(img)
    else:
        print(f"❌ Missing folder: {folder_path}")


📂 Loading Anger from D:\VSCODE\DDD_project\DatasetNew\Emotion\Anger


100%|██████████| 135/135 [00:00<00:00, 3745.86it/s]


📂 Loading Fear from D:\VSCODE\DDD_project\DatasetNew\Emotion\Fear


100%|██████████| 75/75 [00:00<00:00, 3947.51it/s]


📂 Loading Happy from D:\VSCODE\DDD_project\DatasetNew\Emotion\Happy


100%|██████████| 207/207 [00:00<00:00, 3392.64it/s]


📂 Loading Sad from D:\VSCODE\DDD_project\DatasetNew\Emotion\Sad


100%|██████████| 84/84 [00:00<00:00, 2624.53it/s]


📂 Loading Surprise from D:\VSCODE\DDD_project\DatasetNew\Emotion\Surprise


100%|██████████| 249/249 [00:00<00:00, 4290.96it/s]


Pre LSTM

In [11]:
import numpy as np
from tensorflow.keras.utils import to_categorical
from sklearn.model_selection import train_test_split

# Define max sequence length
SEQ_LENGTH = 10

def pad_or_truncate(seq, desired_len=SEQ_LENGTH):
    if len(seq) < desired_len:
        # Pad with zeros
        pad_len = desired_len - len(seq)
        padding = np.zeros((pad_len, 48, 48), dtype=np.uint8)
        return np.concatenate([seq, padding], axis=0)
    else:
        # Truncate
        return seq[:desired_len]

def process_dataset(data_dict, label_map):
    X, y = [], []
    for label, subjects in data_dict.items():
        for subj_frames in subjects.values():
            seq = pad_or_truncate(subj_frames)
            X.append(seq)
            y.append(label_map[label])
    X = np.array(X)
    y = np.array(y)
    return X, y


DrowsySet Preparation

In [12]:
DROWSY_LABELS = {"Active": 0, "Fatigue": 1}
X_drowsy, y_drowsy = process_dataset(drowsy_data, DROWSY_LABELS)

# Normalize images
X_drowsy = X_drowsy / 255.0

# Flatten to (samples, timesteps, features)
X_drowsy = X_drowsy.reshape((X_drowsy.shape[0], SEQ_LENGTH, -1))

# Train/Test Split
X_drowsy_train, X_drowsy_test, y_drowsy_train, y_drowsy_test = train_test_split(
    X_drowsy, y_drowsy, test_size=0.2, random_state=42
)


EmotionSet Preparation

In [13]:
EMOTION_LABELS = {"Anger": 0, "Fear": 1, "Happy": 2, "Sad": 3, "Surprise": 4}
X_emotion, y_emotion = process_dataset(emotion_data, EMOTION_LABELS)

# Normalize and reshape
X_emotion = X_emotion / 255.0
X_emotion = X_emotion.reshape((X_emotion.shape[0], SEQ_LENGTH, -1))

# One-hot encode labels
y_emotion_cat = to_categorical(y_emotion, num_classes=5)

# Train/Test Split
X_emotion_train, X_emotion_test, y_emotion_train, y_emotion_test = train_test_split(
    X_emotion, y_emotion_cat, test_size=0.2, random_state=42
)


Drowsiness detection

In [14]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense

model_drowsy = Sequential([
    LSTM(64, return_sequences=True, input_shape=(SEQ_LENGTH, 48*48)),
    LSTM(32),
    Dense(16, activation='relu'),
    Dense(1, activation='sigmoid')
])

model_drowsy.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
model_drowsy.summary()


  super().__init__(**kwargs)


Emotion Classifier

In [15]:
model_emotion = Sequential([
    LSTM(64, return_sequences=True, input_shape=(SEQ_LENGTH, 48*48)),
    LSTM(32),
    Dense(32, activation='relu'),
    Dense(5, activation='softmax')
])

model_emotion.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
model_emotion.summary()


In [16]:
# Drowsy Model
model_drowsy.fit(X_drowsy_train, y_drowsy_train, epochs=10, batch_size=32, validation_split=0.2)

# Emotion Model
model_emotion.fit(X_emotion_train, y_emotion_train, epochs=10, batch_size=32, validation_split=0.2)


Epoch 1/10
[1m183/183[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m17s[0m 20ms/step - accuracy: 0.6527 - loss: 0.6113 - val_accuracy: 0.7603 - val_loss: 0.4275
Epoch 2/10
[1m183/183[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 16ms/step - accuracy: 0.7430 - loss: 0.4578 - val_accuracy: 0.7945 - val_loss: 0.3731
Epoch 3/10
[1m183/183[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 15ms/step - accuracy: 0.7784 - loss: 0.4054 - val_accuracy: 0.7822 - val_loss: 0.4128
Epoch 4/10
[1m183/183[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 15ms/step - accuracy: 0.7697 - loss: 0.4092 - val_accuracy: 0.8014 - val_loss: 0.3638
Epoch 5/10
[1m183/183[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 15ms/step - accuracy: 0.7980 - loss: 0.3862 - val_accuracy: 0.8007 - val_loss: 0.3687
Epoch 6/10
[1m183/183[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 15ms/step - accuracy: 0.8037 - loss: 0.3818 - val_accuracy: 0.7521 - val_loss: 0.4148
Epoch 7/10
[1m183/18

<keras.src.callbacks.history.History at 0x1866c4c4a10>

In [17]:
def predict_driver_state(X_seq):
    X_seq = X_seq / 255.0
    X_seq = X_seq.reshape((1, SEQ_LENGTH, 48*48))

    drowsy_pred = model_drowsy.predict(X_seq)[0][0]

    if drowsy_pred >= 0.5:
        emotion_pred = model_emotion.predict(X_seq)
        emotion_label = np.argmax(emotion_pred)
        print(f"🟢 Status: Active - Emotion: {list(EMOTION_LABELS.keys())[emotion_label]}")
    else:
        print("🔴 Status: Drowsy")


Build Sequence

In [18]:
import os
import cv2
import numpy as np
from collections import defaultdict
from tqdm import tqdm

def load_data_from_dir(base_dir, img_size=(48, 48), grayscale=True):
    data = []
    for label in os.listdir(base_dir):
        class_dir = os.path.join(base_dir, label)
        if not os.path.isdir(class_dir):
            continue
        for fname in sorted(os.listdir(class_dir)):
            fpath = os.path.join(class_dir, fname)
            img = cv2.imread(fpath)
            if img is None:
                continue
            if grayscale:
                img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
            img = cv2.resize(img, img_size)
            data.append((img, label))
    return data

# Load both datasets
emotion_data = load_data_from_dir(EMOTION_PATH)
drowsy_data = load_data_from_dir(DROWSY_PATH)

def group_into_sequences(data, sequence_length=3, relabel_fn=lambda l: l):
    sequences = []
    labels = []
    for i in range(0, len(data) - sequence_length + 1, sequence_length):
        seq = [data[j][0] for j in range(i, i + sequence_length)]
        label = relabel_fn(data[i][1])
        sequences.append(np.array(seq))
        labels.append(label)
    return sequences, labels

# Define label mapping logic
emotion_sequences, emotion_labels = group_into_sequences(
    emotion_data, sequence_length=3, relabel_fn=lambda l: f"Active:{l}"
)
drowsy_sequences, drowsy_labels = group_into_sequences(
    drowsy_data, sequence_length=15, relabel_fn=lambda l: "Drowsy" if l == "Fatigue" else "Active"
)

# Combine sequences and labels


print(f"✅ Emotion sequences: {len(emotion_sequences)}")
print(f"✅ Drowsy sequences: {len(drowsy_sequences)}")


✅ Emotion sequences: 250
✅ Drowsy sequences: 608


In [25]:
# Combine emotion and drowsy sequences and labels
all_sequences = emotion_sequences + drowsy_sequences
all_labels = emotion_labels + drowsy_labels

print(f"Total combined sequences: {len(all_sequences)}")


Total combined sequences: 858


In [31]:
print("Example data structure from emotion_sequences:")
print(type(emotion_sequences[0]))
print(f"Length: {len(emotion_sequences[0])}")
print(f"First element type: {type(emotion_sequences[0][0])}")
print(f"Last element: {emotion_sequences[0][-1]}")


print(f"Length: {len(emotion_sequences[0])}")
print(f"Length: {len(drowsy_sequences[0])}")

Example data structure from emotion_sequences:
<class 'numpy.ndarray'>
Length: 3
First element type: <class 'numpy.ndarray'>
Last element: [[ 27  13  10 ...  17 123 240]
 [ 19   9  14 ...   5  64 209]
 [ 14   7  17 ...   4  25 166]
 ...
 [222 224 226 ... 152 154 159]
 [229 230 232 ... 150 152 156]
 [233 233 233 ... 148 152 156]]
Length: 3
Length: 15


In [27]:
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.utils import to_categorical

# Convert sequences and labels to NumPy arrays
X = np.array(all_sequences)
y = np.array(all_labels)

# Encode string labels to integers
le = LabelEncoder()
y_encoded = le.fit_transform(y)
y_categorical = to_categorical(y_encoded)

# Split into training and test sets
X_train, X_test, y_train, y_test = train_test_split(
    X, y_categorical, test_size=0.2, random_state=42, stratify=y_categorical
)

# Show label mapping
label_map = dict(zip(le.classes_, le.transform(le.classes_)))
print("✅ Label mapping:", label_map)
print("✅ Training samples:", X_train.shape)
print("✅ Test samples:", X_test.shape)


ValueError: setting an array element with a sequence. The requested array has an inhomogeneous shape after 1 dimensions. The detected shape was (858,) + inhomogeneous part.

Merge and shuffle sequence

In [None]:
from sklearn.utils import shuffle

# Merge both datasets
all_sequences = emotion_sequences + drowsy_sequences
print(f"📦 Total sequences: {len(all_sequences)}")

# Shuffle the sequences
all_sequences = shuffle(all_sequences, random_state=42)


📦 Total sequences: 858


Splitting

In [None]:
from sklearn.model_selection import train_test_split

X = [seq[0] for seq in all_sequences]
y = [seq[1] for seq in all_sequences]

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

print(f"📊 Train: {len(X_train)} sequences")
print(f"📊 Test:  {len(X_test)} sequences")


📊 Train: 686 sequences
📊 Test:  172 sequences


Check before converting to numpy array

In [None]:
from sklearn.model_selection import train_test_split

# 1. First split raw lists
X_train_raw, X_test_raw, y_train_raw, y_test_raw = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

# 2. Function to filter and fix sequences
def filter_valid_sequences(X_list, y_list):
    filtered_X, filtered_y = [], []
    for seq, label in zip(X_list, y_list):
        if isinstance(seq, list) and len(seq) > 0:
            frame_shapes = [f.shape for f in seq]
            if all(s == frame_shapes[0] for s in frame_shapes):  # all same shape
                filtered_X.append(np.array(seq))  # convert list of frames to 3D array
                filtered_y.append(label)
    return filtered_X, filtered_y