# Driver Drowsiness Pipeline — Updated Notebook (with Streamlit)

In [None]:
!pip install --upgrade pip
!pip install opencv-python-headless==4.7.0.72 numpy tqdm imutils tensorflow keras kagglehub scipy
!pip install dlib || echo "dlib pip install failed — use conda install -c conda-forge dlib"

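Download `shape_predictor_68_face_landmarks.dat` from the dlib model zoo (it is distributed compressed as `shape_predictor_68_face_landmarks.dat.bz2` at http://dlib.net/files/), decompress it, and place the resulting `.dat` file in the notebook's working directory. The EAR and CNN demo cells look for the file under exactly this name.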

In [None]:
# Dataset download (kagglehub first, Kaggle CLI as fallback)
import os
import shutil
import subprocess

OUT_DIR = 'kaggle_data/yawn_eye'
handle = 'serenaraju/yawn-eye-dataset-new'
os.makedirs(OUT_DIR, exist_ok=True)

try:
    import kagglehub
    kagglehub.login()
    # dataset_download() takes no `unzip` argument, and its `path` argument
    # selects a file inside the dataset rather than a destination folder.
    # It returns the local directory that holds the downloaded files, so that
    # directory is mirrored into OUT_DIR, where the split cell looks for images.
    cached_dir = kagglehub.dataset_download(handle)
    shutil.copytree(cached_dir, OUT_DIR, dirs_exist_ok=True)
    print("Downloaded with kagglehub")
except Exception as e:
    print("kagglehub failed:", e)
    try:
        # check=True turns a non-zero exit status into CalledProcessError, and a
        # missing `kaggle` executable raises FileNotFoundError, so the success
        # message below is only printed when the CLI really succeeded.
        subprocess.run(
            ['kaggle', 'datasets', 'download', '-d', handle, '-p', OUT_DIR, '--unzip'],
            check=True,
        )
        print("Downloaded with Kaggle CLI")
    except Exception as e2:
        print("Kaggle CLI error:", e2)
        print("Kaggle CLI failed. Please download manually and place files under", OUT_DIR)


In [None]:
# Split dataset into train/val
import os, shutil, random
# Source tree filled by the download cell, and destination root of the split.
src = 'kaggle_data/yawn_eye'
dst_base = 'data'
train_open = os.path.join(dst_base,'train','open')
train_closed = os.path.join(dst_base,'train','closed')
val_open = os.path.join(dst_base,'val','open')
val_closed = os.path.join(dst_base,'val','closed')
for p in [train_open, train_closed, val_open, val_closed]:
    os.makedirs(p, exist_ok=True)

# Collect every image below `src`. os.walk() returns entries in an arbitrary,
# filesystem-dependent order, so the list is sorted to give the shuffle a
# stable starting point.
image_files = sorted(
    os.path.join(root_dir, f)
    for root_dir, dirs, files in os.walk(src)
    for f in files
    if f.lower().endswith(('.jpg','.jpeg','.png'))
)
if not image_files:
    print('No images found under', src, '- run the download cell first.')

# A private, seeded RNG makes the train/val assignment identical on every run
# without touching the global `random` state. An unseeded shuffle would send
# different files to val each time the cell is re-executed.
SPLIT_SEED = 42
random.Random(SPLIT_SEED).shuffle(image_files)

# Size of the validation slice: 20% of the images, but never fewer than one.
nval = max(1, int(0.2 * len(image_files)))

def label_from_name(fname):
    """Infer the eye-state label for an image path.

    The path is split into components and inspected from the file name
    outwards (file name, then parent folder, then grandparent, ...). The first
    component that mentions "open" or "close" decides the label, so the most
    specific part of the path wins over an unrelated ancestor folder.

    Args:
        fname: path of the image file (relative or absolute).

    Returns:
        'open' or 'closed'. When no component mentions either state the
        function falls back to 'open'.
        NOTE(review): this fallback also labels any image that belongs to
        neither eye state as 'open' - confirm that is intended for the dataset.
    """
    components = [c for c in os.path.normpath(fname).lower().split(os.sep) if c]
    for component in reversed(components):
        if 'open' in component:
            return 'open'
        # 'close' also covers 'closed'
        if 'close' in component:
            return 'closed'
    return 'open'

# Copy each image into data/<split>/<label>/ : the first `nval` entries of the
# shuffled list form the validation set, the remainder the training set.
for i, fpath in enumerate(image_files):
    lab = label_from_name(fpath)
    split = 'val' if i < nval else 'train'
    # Files in different source folders can share a base name. Naming the copy
    # by basename alone would let a later file silently overwrite an earlier
    # one, so the path relative to `src` is flattened into the file name.
    unique_name = os.path.relpath(fpath, src).replace(os.sep, '_')
    dst = os.path.join(dst_base, split, lab, unique_name)
    shutil.copy(fpath, dst)

print('Copied. Train/Val created under data/')

In [None]:
# Train small CNN
import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.preprocessing.image import ImageDataGenerator
import os

# Training configuration: network input size, batch size and split folders.
img_size = (64,64)
batch = 32
train_dir = 'data/train'
val_dir = 'data/val'

if os.path.exists(train_dir) and os.path.exists(val_dir):
    # Only the training stream is augmented; validation images are just rescaled.
    train_augmenter = ImageDataGenerator(
        rescale=1./255,
        rotation_range=10,
        zoom_range=0.1,
        horizontal_flip=True,
    )
    val_rescaler = ImageDataGenerator(rescale=1./255)
    flow_options = dict(target_size=img_size, batch_size=batch, class_mode='binary')
    train_gen = train_augmenter.flow_from_directory(train_dir, **flow_options)
    val_gen = val_rescaler.flow_from_directory(val_dir, **flow_options)

    # Three conv/pool stages (32, 64, 128 filters) followed by a dense head
    # ending in a single sigmoid unit.
    model = models.Sequential()
    model.add(layers.Conv2D(32, (3,3), activation='relu',
                            input_shape=(img_size[0], img_size[1], 3)))
    model.add(layers.MaxPooling2D())
    for n_filters in (64, 128):
        model.add(layers.Conv2D(n_filters, (3,3), activation='relu'))
        model.add(layers.MaxPooling2D())
    model.add(layers.Flatten())
    model.add(layers.Dense(128, activation='relu'))
    model.add(layers.Dropout(0.5))
    model.add(layers.Dense(1, activation='sigmoid'))

    model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
    history = model.fit(train_gen, validation_data=val_gen, epochs=10)

    # Persist the trained network for the realtime demo cell.
    model.save('eye_model.h5')
    print('Saved model to eye_model.h5')
else:
    print('Train/Val directories not found under data/.')

In [None]:
# EAR demo (requires dlib predictor)
import cv2, dlib, numpy as np
from scipy.spatial import distance
from imutils import face_utils
import os
PRED_PATH = 'shape_predictor_68_face_landmarks.dat'
EAR_THRESH = 0.25        # EAR below this value counts as a "closed" frame
EAR_CONSEC_FRAMES = 20   # consecutive closed frames required before alerting


def eye_aspect_ratio(eye):
    """Return the eye aspect ratio (EAR) of one eye.

    Args:
        eye: sequence of six (x, y) landmark points. Points 1-5 and 2-4 give
            the two distances in the numerator, points 0-3 the distance in
            the denominator.

    Returns:
        (|p1 - p5| + |p2 - p4|) / (2 * |p0 - p3|)
    """
    A = distance.euclidean(eye[1], eye[5])
    B = distance.euclidean(eye[2], eye[4])
    C = distance.euclidean(eye[0], eye[3])
    return (A + B) / (2.0 * C)


if not os.path.exists(PRED_PATH):
    print('Missing dlib predictor:', PRED_PATH)
else:
    detector = dlib.get_frontal_face_detector()
    predictor = dlib.shape_predictor(PRED_PATH)
    (lStart, lEnd) = face_utils.FACIAL_LANDMARKS_IDXS['left_eye']
    (rStart, rEnd) = face_utils.FACIAL_LANDMARKS_IDXS['right_eye']

    cap = cv2.VideoCapture(0)
    counter = 0  # number of consecutive frames with EAR below the threshold
    try:
        if not cap.isOpened():
            print('Could not open webcam (device 0).')
        else:
            print('Starting EAR demo — press q to quit')
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            rects = detector(gray, 0)
            for rect in rects:
                shape = face_utils.shape_to_np(predictor(gray, rect))
                leftEye = shape[lStart:lEnd]
                rightEye = shape[rStart:rEnd]
                # Average both eyes into a single score per face.
                ear = (eye_aspect_ratio(leftEye) + eye_aspect_ratio(rightEye)) / 2.0
                for eye_pts in (leftEye, rightEye):
                    cv2.drawContours(frame, [cv2.convexHull(eye_pts)], -1, (0,255,0), 1)
                if ear < EAR_THRESH:
                    counter += 1
                    if counter >= EAR_CONSEC_FRAMES:
                        cv2.putText(frame, 'DROWSINESS ALERT!', (10,30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0,0,255), 2)
                else:
                    counter = 0
            cv2.imshow('EAR Demo', frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    finally:
        # Always give the camera back, even when the loop is interrupted from
        # the notebook or an OpenCV call raises; otherwise the device stays
        # locked until the kernel is restarted.
        cap.release()
        cv2.destroyAllWindows()

In [None]:
# CNN realtime demo (requires eye_model.h5 and dlib predictor)
import cv2, dlib, numpy as np
from imutils import face_utils
import os
from tensorflow.keras.models import load_model

MODEL_PATH = 'eye_model.h5'
PRED_PATH = 'shape_predictor_68_face_landmarks.dat'

if not os.path.exists(MODEL_PATH):
    print('Trained model not found:', MODEL_PATH)
elif not os.path.exists(PRED_PATH):
    print('dlib predictor not found:', PRED_PATH)
else:
    model = load_model(MODEL_PATH)
    detector = dlib.get_frontal_face_detector()
    predictor = dlib.shape_predictor(PRED_PATH)
    (lStart, lEnd) = face_utils.FACIAL_LANDMARKS_IDXS['left_eye']
    (rStart, rEnd) = face_utils.FACIAL_LANDMARKS_IDXS['right_eye']

    def crop_eye(frame, eye_pts):
        """Cut a 64x64 patch around one eye out of `frame`.

        Args:
            frame: image array indexed as frame[y, x].
            eye_pts: array of (x, y) landmark points for one eye.

        Returns:
            The patch resized to 64x64 (same channel order as `frame`), or
            None when the padded bounding box is empty or cannot be resized.
        """
        # 5-pixel margin around the landmark bounding box; the top/left edge is
        # clamped to the image, slicing already truncates the bottom/right edge.
        x1 = max(0, int(np.min(eye_pts[:,0])) - 5)
        y1 = max(0, int(np.min(eye_pts[:,1])) - 5)
        x2 = int(np.max(eye_pts[:,0])) + 5
        y2 = int(np.max(eye_pts[:,1])) + 5
        crop = frame[y1:y2, x1:x2]
        if crop.size == 0:
            return None
        try:
            return cv2.resize(crop, (64,64))
        except cv2.error:
            # Only OpenCV failures are treated as "no crop"; a bare except
            # would also swallow KeyboardInterrupt and keep the loop running.
            return None

    cap = cv2.VideoCapture(0)
    closed_count = 0   # consecutive frames classified as closed
    THRESH_PROB = 0.6
    CONSEC_FRAMES = 20
    try:
        if not cap.isOpened():
            print('Could not open webcam (device 0).')
        else:
            print('Starting CNN demo — press q to quit.')
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            rects = detector(gray, 0)
            for rect in rects:
                shape = face_utils.shape_to_np(predictor(gray, rect))
                leftEye = shape[lStart:lEnd]
                rightEye = shape[rStart:rEnd]
                le = crop_eye(frame, leftEye)
                re = crop_eye(frame, rightEye)
                if le is None or re is None:
                    continue
                # Camera frames are BGR, while the training cell feeds the
                # network images loaded by Keras in RGB order, so the channels
                # are swapped before inference. Both eyes go through a single
                # predict() call; verbose=0 avoids one progress bar per frame.
                eyes_rgb = np.stack([cv2.cvtColor(eye, cv2.COLOR_BGR2RGB) for eye in (le, re)])
                open_probs = model.predict(eyes_rgb.astype('float32') / 255.0, verbose=0)[:, 0]
                p_left, p_right = float(open_probs[0]), float(open_probs[1])
                # The network output is used as P(open); invert the mean of
                # both eyes to obtain P(closed).
                closed_prob = 1 - ((p_left + p_right) / 2.0)
                if closed_prob > THRESH_PROB:
                    closed_count += 1
                    if closed_count >= CONSEC_FRAMES:
                        cv2.putText(frame, 'DROWSINESS ALERT!', (10,30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0,0,255), 2)
                else:
                    closed_count = 0
            cv2.imshow('CNN Demo', frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    finally:
        # Release the camera even if the loop is interrupted or raises.
        cap.release()
        cv2.destroyAllWindows()

In [None]:
# The next cell will write streamlit_app.py
import json


In [None]:
import json
# Source of the Streamlit front end, kept as a readable multi-line literal so it
# can be reviewed and edited in place. The text is the same as the previous
# JSON-escaped blob once decoded (\u2014 is an em dash).
# NOTE: the generated app is only a scaffold. `run_pipeline` merely reports a
# missing predictor and nothing calls it, and the Start/Stop button values are
# never read, so launching the app shows the sidebar controls but does not
# process any video yet.
app_code = """\
import streamlit as st
import cv2
import numpy as np
import time
import threading
import os
from tensorflow.keras.models import load_model
from imutils import face_utils
import dlib
from scipy.spatial import distance
from collections import deque

st.set_page_config(page_title='Drowsiness Demo \u2014 Dual Models', layout='wide')
st.title('Driver Drowsiness \u2014 Streamlit Frontend (Two Models & Live Graphs)')

st.sidebar.header('Settings')
mode = st.sidebar.selectbox('Mode', ['Compare Models (CNNs)', 'EAR (fast) + CNN'])
use_camera = st.sidebar.checkbox('Use webcam (cv2.VideoCapture)', value=True)
video_file = st.sidebar.file_uploader('Or upload video file (.mp4)', type=['mp4','avi','mov'])

predictor_file = st.sidebar.file_uploader('Upload dlib shape_predictor_68_face_landmarks.dat', type=['dat'])
modelA_file = st.sidebar.file_uploader('Upload model A (eye_model_A.h5)', type=['h5'])
modelB_file = st.sidebar.file_uploader('Upload model B (eye_model_B.h5)', type=['h5'])

start_button = st.sidebar.button('Start')
stop_button = st.sidebar.button('Stop')

placeholder = st.empty()
col1, col2 = st.columns([2,1])

# session state
if 'running' not in st.session_state:
    st.session_state.running = False
if 'cap' not in st.session_state:
    st.session_state.cap = None
if 'modelA' not in st.session_state:
    st.session_state.modelA = None
if 'modelB' not in st.session_state:
    st.session_state.modelB = None
if 'predictor_path' not in st.session_state:
    st.session_state.predictor_path = None
if 'histA' not in st.session_state:
    st.session_state.histA = deque(maxlen=200)
if 'histB' not in st.session_state:
    st.session_state.histB = deque(maxlen=200)
if 'histEAR' not in st.session_state:
    st.session_state.histEAR = deque(maxlen=200)

# Save uploaded files
os.makedirs('uploads', exist_ok=True)
if predictor_file is not None:
    ppath = os.path.join('uploads', predictor_file.name)
    with open(ppath, 'wb') as f:
        f.write(predictor_file.getbuffer())
    st.session_state.predictor_path = ppath

if modelA_file is not None:
    mpath = os.path.join('uploads', modelA_file.name)
    with open(mpath, 'wb') as f:
        f.write(modelA_file.getbuffer())
    st.session_state.modelA = load_model(mpath)

if modelB_file is not None:
    mpath = os.path.join('uploads', modelB_file.name)
    with open(mpath, 'wb') as f:
        f.write(modelB_file.getbuffer())
    st.session_state.modelB = load_model(mpath)

# helpers
def eye_aspect_ratio(eye):
    A = distance.euclidean(eye[1], eye[5])
    B = distance.euclidean(eye[2], eye[4])
    C = distance.euclidean(eye[0], eye[3])
    return (A + B) / (2.0 * C)

def crop_eye(frame, eye_pts):
    x1 = np.min(eye_pts[:,0]) - 5
    y1 = np.min(eye_pts[:,1]) - 5
    x2 = np.max(eye_pts[:,0]) + 5
    y2 = np.max(eye_pts[:,1]) + 5
    x1, y1 = max(0,int(x1)), max(0,int(y1))
    x2, y2 = int(x2), int(y2)
    crop = frame[y1:y2, x1:x2]
    try:
        crop = cv2.resize(crop, (64,64))
    except:
        return None
    return crop

# processing thread
def run_pipeline(cap, mode, modelA, modelB, predictor_path):
    if predictor_path is None:
        st.error('Dlib predictor required. Upload it in the sidebar.')
    """

# Write the app next to the notebook; UTF-8 so the em dashes survive on every platform.
with open('streamlit_app.py','w',encoding='utf-8') as f:
    f.write(app_code)
print('Wrote streamlit_app.py')


## Two-model demo: LogisticRegression vs RandomForestClassifier

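This cell runs a small, self-contained demonstration on a synthetic classification dataset; it is independent of the eye-state data and models above. It trains a Logistic Regression and a Random Forest, plots ROC and Precision-Recall curves for both models so you can compare them visually, saves the figure to `two_model_comparison_roc_pr.png`, and prints each model's confusion matrix and classification report.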

In [None]:
# Two-model training + plots
try:
    import numpy as np
    import matplotlib.pyplot as plt
    from sklearn.datasets import make_classification
    from sklearn.model_selection import train_test_split
    from sklearn.preprocessing import StandardScaler
    from sklearn.linear_model import LogisticRegression
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.metrics import roc_curve, auc, precision_recall_curve, average_precision_score, classification_report, confusion_matrix
except Exception as e:
    print("Missing required packages. You can install them with: pip install scikit-learn matplotlib numpy")
    raise

# Create synthetic dataset: 2000 samples, 20 features, roughly 70/30 class balance.
X, y = make_classification(n_samples=2000, n_features=20, n_informative=10, n_redundant=2,
                           n_clusters_per_class=2, weights=[0.7, 0.3], flip_y=0.01, random_state=42)

# Stratified 70/30 split so both parts keep the class ratio.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, stratify=y, random_state=42)

# The scaler is fitted on the training part only and then applied to the test part.
scaler = StandardScaler()
X_train_s = scaler.fit_transform(X_train)
X_test_s = scaler.transform(X_test)

# NOTE: `models` (and the loop variable `model`) rebind names that the CNN
# training cell uses for `tensorflow.keras.models` and the trained network.
# Re-run that cell's imports before retraining after this cell has executed.
models = {
    'LogisticRegression': LogisticRegression(max_iter=1000, random_state=42),
    'RandomForest': RandomForestClassifier(n_estimators=200, random_state=42)
}

results = {}
for name, model in models.items():
    # Logistic regression is trained and evaluated on standardized features;
    # the random forest uses the raw features.
    if name == 'LogisticRegression':
        X_fit, X_eval = X_train_s, X_test_s
    else:
        X_fit, X_eval = X_train, X_test
    model.fit(X_fit, y_train)

    if hasattr(model, 'predict_proba'):
        probs = model.predict_proba(X_eval)[:, 1]
        cutoff = 0.5
    else:
        # Fallback for models without predict_proba: decision_function returns
        # signed scores whose class boundary is 0, not 0.5.
        probs = model.decision_function(X_eval)
        cutoff = 0.0
    preds = (probs >= cutoff).astype(int)

    fpr, tpr, _ = roc_curve(y_test, probs)
    roc_auc = auc(fpr, tpr)
    precision, recall, _ = precision_recall_curve(y_test, probs)
    ap = average_precision_score(y_test, probs)

    # Everything the plotting and reporting code below needs, keyed by model name.
    results[name] = {
        'model': model,
        'probs': probs,
        'preds': preds,
        'fpr': fpr,
        'tpr': tpr,
        'roc_auc': roc_auc,
        'precision': precision,
        'recall': recall,
        'ap': ap,
        'report': classification_report(y_test, preds, digits=4),
        'confusion': confusion_matrix(y_test, preds)
    }

# Comparison figure: ROC curves in the left panel, precision-recall curves in the right one.
fig, (ax_roc, ax_pr) = plt.subplots(1, 2, figsize=(12, 5))

for name, res in results.items():
    ax_roc.plot(res['fpr'], res['tpr'], label=f"{name} (AUC={res['roc_auc']:.3f})")
# Dashed diagonal as the chance-level reference line.
ax_roc.plot([0, 1], [0, 1], 'k--', alpha=0.4)
ax_roc.set_xlabel('False Positive Rate')
ax_roc.set_ylabel('True Positive Rate')
ax_roc.set_title('ROC Curve')
ax_roc.legend(loc='lower right')

for name, res in results.items():
    ax_pr.plot(res['recall'], res['precision'], label=f"{name} (AP={res['ap']:.3f})")
ax_pr.set_xlabel('Recall')
ax_pr.set_ylabel('Precision')
ax_pr.set_title('Precision-Recall Curve')
ax_pr.legend(loc='lower left')

fig.tight_layout()

# Keep a copy of the figure on disk before showing it inline.
out_path = 'two_model_comparison_roc_pr.png'
fig.savefig(out_path, dpi=150)
plt.show()

# Text summary per model: confusion matrix first, then the classification report.
for name, res in results.items():
    print(f"--- {name} ---")
    for heading, key in (("Confusion matrix:\n", 'confusion'),
                         ("Classification report:\n", 'report')):
        print(heading, res[key])
    print()

print(f"Saved ROC/PR plot to {out_path}")