In [None]:
from google.colab import drive

In [None]:
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
DATASET_PATH = '/content/drive/MyDrive/MyDataset'


In [None]:
import os
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, LSTM, Dense, TimeDistributed, Dropout
from tensorflow.keras.optimizers import Adam
from sklearn.model_selection import train_test_split


In [None]:
import os

# Check if the dataset path exists
print(f"Dataset Path Exists: {os.path.exists(DATASET_PATH)}")

# Print folders inside the dataset path
if os.path.exists(DATASET_PATH):
    print("Folders inside dataset:", os.listdir(DATASET_PATH))


Dataset Path Exists: True
Folders inside dataset: ['normal', 'rash']


In [None]:
for category in ["normal", "rash"]:
    category_path = os.path.join(DATASET_PATH, category)
    print(f"Checking {category_path} ...")

    if os.path.exists(category_path):
        print(f"Videos inside '{category}':", os.listdir(category_path))


Checking /content/drive/MyDrive/MyDataset/normal ...
Videos inside 'normal': ['video_791', 'video_792', 'video_793', 'video_794', 'video_795', 'video_796', 'video_797', 'video_798', 'video_799', 'video_800', 'video_801', 'video_802', 'video_803', 'video_804', 'video_805', 'video_806', 'video_807', 'video_808', 'video_809', 'video_810', 'video_811', 'video_812', 'video_813', 'video_814', 'video_815', 'video_816', 'video_817', 'video_818', 'video_819', 'video_820', 'video_821', 'video_822', 'video_823', 'video_824', 'video_825', 'video_826', 'video_827', 'video_828', 'video_829', 'video_830', 'video_831', 'video_832', 'video_833', 'video_834', 'video_835', 'video_836', 'video_837', 'video_838', 'video_839', 'video_840', 'video_841', 'video_842', 'video_843', 'video_844', 'video_845', 'video_846', 'video_847', 'video_848', 'video_849', 'video_850', 'video_851', 'video_852', 'video_853', 'video_854', 'video_855', 'video_856', 'video_857', 'video_858', 'video_859', 'video_860', 'video_861',

In [None]:
import os
from PIL import Image

# Set the path to your dataset folder
DATASET_PATH = "/content/drive/MyDrive/MyDataset"

def is_valid_image(image_path):
    """Check if an image is valid and can be opened."""
    try:
        with Image.open(image_path) as img:
            img.verify()  # Verify image integrity
        return True
    except Exception:
        return False

def repair_image(image_path):
    """Try to repair an image by re-saving it."""
    try:
        with Image.open(image_path) as img:
            img = img.convert("RGB")  # Convert to a valid format
            img.save(image_path)  # Overwrite the existing file
        print(f"Repaired: {image_path}")
        return True
    except Exception:
        return False

def clean_dataset():
    """Scan and attempt to repair images in the dataset."""
    corrupted_count = 0

    for root, _, files in os.walk(DATASET_PATH):
        for file in files:
            file_path = os.path.join(root, file)

            if not is_valid_image(file_path):
                print(f"Corrupted: {file_path}")
                if repair_image(file_path):
                    continue  # Skip to the next file if repair was successful
                else:
                    corrupted_count += 1

    print(f"Total unrepaired corrupted images: {corrupted_count}")

# Run the cleaning process
clean_dataset()


Corrupted: /content/drive/MyDrive/MyDataset/normal/video_1175/35296.jpg
Corrupted: /content/drive/MyDrive/MyDataset/normal/video_1175/35297.jpg
Corrupted: /content/drive/MyDrive/MyDataset/normal/video_1175/35298.jpg
Corrupted: /content/drive/MyDrive/MyDataset/normal/video_1175/35299.jpg
Corrupted: /content/drive/MyDrive/MyDataset/normal/video_1175/35300.jpg
Corrupted: /content/drive/MyDrive/MyDataset/normal/video_1175/35301.jpg
Corrupted: /content/drive/MyDrive/MyDataset/normal/video_1175/35302.jpg
Corrupted: /content/drive/MyDrive/MyDataset/normal/video_1175/35303.jpg
Corrupted: /content/drive/MyDrive/MyDataset/normal/video_1175/35304.jpg
Corrupted: /content/drive/MyDrive/MyDataset/normal/video_229/06887.jpg
Corrupted: /content/drive/MyDrive/MyDataset/normal/video_229/06888.jpg
Corrupted: /content/drive/MyDrive/MyDataset/normal/video_229/06889.jpg
Corrupted: /content/drive/MyDrive/MyDataset/normal/video_229/06890.jpg
Corrupted: /content/drive/MyDrive/MyDataset/normal/video_229/06891.j

In [None]:
import os
import numpy as np
import cv2
from tensorflow.keras.utils import to_categorical

# Paths
DATASET_PATH = "/content/drive/MyDrive/MyDataset"
OUTPUT_PATH = "/content/drive/MyDrive/preprocessed"
os.makedirs(OUTPUT_PATH, exist_ok=True)

# Config
CATEGORIES = ["normal", "rash"]
IMG_SIZE = (128, 128)  # Resize images to this size
SEQUENCE_LENGTH = 15   # Ensure each video has exactly this many frames

def preprocess_and_save():
    """Preprocess videos and save as .npy files."""
    for label, category in enumerate(CATEGORIES):
        category_path = os.path.join(DATASET_PATH, category)
        save_path = os.path.join(OUTPUT_PATH, category)
        os.makedirs(save_path, exist_ok=True)

        if not os.path.exists(category_path):
            print(f"❌ Error: {category_path} does not exist!")
            continue

        print(f"🔄 Processing category: {category}")

        for video_folder in os.listdir(category_path):
            video_path = os.path.join(category_path, video_folder)
            save_file = os.path.join(save_path, f"{video_folder}.npy")

            if not os.path.isdir(video_path):
                print(f"⚠️ Skipping {video_folder}, not a folder.")
                continue

            print(f"📂 Processing video: {video_folder}")
            frames = []

            # Load frames
            for img_name in sorted(os.listdir(video_path)):
                img_path = os.path.join(video_path, img_name)
                if not img_name.lower().endswith((".jpg", ".png")):
                    print(f"➡️ Skipping {img_name}, not an image file.")
                    continue

                img = cv2.imread(img_path)
                img = cv2.resize(img, IMG_SIZE) / 255.0  # Normalize
                frames.append(img)

            # Ensure uniform length
            if len(frames) < SEQUENCE_LENGTH:
                print(f"🛠️ Padding {video_folder} with black frames.")
                while len(frames) < SEQUENCE_LENGTH:
                    frames.append(np.zeros((IMG_SIZE[0], IMG_SIZE[1], 3)))  # Black frame

            frames = frames[:SEQUENCE_LENGTH]  # Trim if longer
            np.save(save_file, np.array(frames, dtype=np.float32))  # Save to disk

    print("✅ Preprocessing complete!")

# Run preprocessing
preprocess_and_save()


🔄 Processing category: normal
📂 Processing video: video_791
🛠️ Padding video_791 with black frames.
📂 Processing video: video_792
📂 Processing video: video_793
📂 Processing video: video_794
📂 Processing video: video_795
📂 Processing video: video_796
📂 Processing video: video_797
📂 Processing video: video_798
🛠️ Padding video_798 with black frames.
📂 Processing video: video_799
🛠️ Padding video_799 with black frames.
📂 Processing video: video_800
🛠️ Padding video_800 with black frames.
📂 Processing video: video_801
📂 Processing video: video_802
📂 Processing video: video_803
📂 Processing video: video_804
📂 Processing video: video_805
📂 Processing video: video_806
📂 Processing video: video_807
📂 Processing video: video_808
📂 Processing video: video_809
📂 Processing video: video_810
📂 Processing video: video_811
📂 Processing video: video_812
📂 Processing video: video_813
📂 Processing video: video_814
📂 Processing video: video_815
📂 Processing video: video_816
📂 Processing video: video_817


In [None]:
import numpy as np
import glob
from tensorflow.keras.utils import to_categorical
from sklearn.model_selection import train_test_split

# 📌 Define dataset path
DATA_PATH = "/content/drive/MyDrive/preprocessed"
CATEGORIES = ["normal", "rash"]

# 📌 Load file paths only (not the actual videos yet)
video_paths = []
labels = []

for category in CATEGORIES:
    video_files = glob.glob(f"{DATA_PATH}/{category}/*.npy")  # Get file paths
    label = 0 if category == "normal" else 1  # Assign labels

    for video in video_files:
        video_paths.append(video)
        labels.append(label)

# 📌 Convert labels to One-Hot Encoding
labels = to_categorical(labels, num_classes=2)

# 📌 Split file paths into train & test
train_videos, test_videos, y_train, y_test = train_test_split(video_paths, labels, test_size=0.2, random_state=42)

# 📌 Define a data generator to load files in batches
def data_generator(video_paths, labels, batch_size=16):
    while True:
        for i in range(0, len(video_paths), batch_size):
            batch_videos = video_paths[i:i + batch_size]
            batch_labels = labels[i:i + batch_size]

            # Load only the necessary videos
            X_batch = np.array([np.load(video) for video in batch_videos])
            y_batch = np.array(batch_labels)

            yield X_batch, y_batch

# ✅ Instead of loading all data, use generators
train_generator = data_generator(train_videos, y_train, batch_size=10)
test_generator = data_generator(test_videos, y_test, batch_size=10)


In [1]:
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, LSTM, Dense, Dropout, TimeDistributed, GlobalAveragePooling2D, Input

# Define parameters
SEQUENCE_LENGTH = 15  # Number of frames per video
IMG_HEIGHT, IMG_WIDTH = 128, 128  # Using your existing frame size
CHANNELS = 3  # RGB images

# Define the model
model = Sequential([
    Input(shape=(SEQUENCE_LENGTH, IMG_HEIGHT, IMG_WIDTH, CHANNELS)),

    # CNN Feature Extraction
    TimeDistributed(Conv2D(32, (3, 3), activation='relu', padding='same')),
    TimeDistributed(MaxPooling2D((2, 2))),

    TimeDistributed(Conv2D(64, (3, 3), activation='relu', padding='same')),
    TimeDistributed(MaxPooling2D((2, 2))),

    TimeDistributed(Conv2D(128, (3, 3), activation='relu', padding='same')),
    TimeDistributed(MaxPooling2D((2, 2))),

    TimeDistributed(Conv2D(256, (3, 3), activation='relu', padding='same')),
    TimeDistributed(MaxPooling2D((2, 2))),

    TimeDistributed(Conv2D(512, (3, 3), activation='relu', padding='same')),
    TimeDistributed(MaxPooling2D((2, 2))),

    TimeDistributed(GlobalAveragePooling2D()),

    LSTM(128, return_sequences=True),
    LSTM(64),

    Dense(128, activation='relu'),
    Dropout(0.5),
    Dense(64, activation='relu'),
    Dropout(0.3),

    Dense(2, activation='softmax')
])

model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# Print model summary
model.summary()

In [None]:
history=model.fit(train_generator, epochs=10, steps_per_epoch=len(train_videos)//64,
                  validation_data=test_generator, validation_steps=len(test_videos)//64)

Epoch 1/10
[1m37/37[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m707s[0m 19s/step - accuracy: 0.6876 - loss: 0.6093 - val_accuracy: 0.6778 - val_loss: 0.6427
Epoch 2/10
[1m37/37[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m743s[0m 20s/step - accuracy: 0.7837 - loss: 0.5417 - val_accuracy: 0.7889 - val_loss: 0.5422
Epoch 3/10
[1m37/37[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m744s[0m 20s/step - accuracy: 0.7588 - loss: 0.5644 - val_accuracy: 0.7556 - val_loss: 0.6196
Epoch 4/10
[1m37/37[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m738s[0m 20s/step - accuracy: 0.7685 - loss: 0.5739 - val_accuracy: 0.7778 - val_loss: 0.5147
Epoch 5/10
[1m37/37[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m706s[0m 19s/step - accuracy: 0.7486 - loss: 0.5583 - val_accuracy: 0.7333 - val_loss: 0.5783
Epoch 6/10
[1m37/37[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m744s[0m 20s/step - accuracy: 0.6997 - loss: 0.6096 - val_accuracy: 0.6889 - val_loss: 0.6061
Epoch 7/10
[1m37/37[0m [3

In [None]:
model.save('/content/drive/MyDrive/rash_driving_model.keras')

In [2]:
from tensorflow.keras.models import load_model
model = load_model('/content/drive/MyDrive/rash_driving_model.keras')

In [None]:
import math
steps = math.ceil(len(test_videos) / 10)
model.evaluate(test_generator,steps=steps)


[1m61/61[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m11s[0m 97ms/step - accuracy: 0.7462 - loss: 0.5572


[0.5761864185333252, 0.7413508892059326]

In [None]:
import tensorflow as tf
from tensorflow.keras import layers, models

# Adjust input shape based on your actual .npy file shape
INPUT_SHAPE = (15, 128, 128, 3)  # (frames, height, width, channels)

model2 = models.Sequential([
    layers.Input(shape=INPUT_SHAPE),

    layers.Conv3D(32, (3, 3, 3), activation='relu', padding='same'),
    layers.MaxPooling3D((1, 2, 2)),

    layers.Conv3D(64, (3, 3, 3), activation='relu', padding='same'),
    layers.MaxPooling3D((2, 2, 2)),

    layers.Conv3D(128, (3, 3, 3), activation='relu', padding='same'),
    layers.MaxPooling3D((2, 2, 2)),

    layers.Conv3D(256, (3, 3, 3), activation='relu', padding='same'),
    layers.MaxPooling3D((2, 2, 2)),

    # layers.Conv3D(512, (3, 3, 3), activation='relu', padding='same'),
    # layers.MaxPooling3D((2, 2, 2)),

    layers.Flatten(),
    layers.Dense(256, activation='relu'),
    layers.Dropout(0.5),
    layers.Dense(2, activation='softmax')  # You used one-hot labels
])


In [None]:
model2.compile(
    optimizer='adam',
    loss='categorical_crossentropy',  # since you're using to_categorical
    metrics=['accuracy']
)


In [None]:
from tensorflow.keras.callbacks import EarlyStopping
early_stopping = EarlyStopping(monitor='val_loss', patience=15, restore_best_weights=True)
h=model2.fit(
    train_generator,
    steps_per_epoch=len(train_videos) // 64,
    validation_data=test_generator,
    validation_steps=len(test_videos) // 10,
    epochs=100,
    callbacks=[early_stopping]
)


Epoch 1/100
[1m37/37[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m19s[0m 433ms/step - accuracy: 0.8460 - loss: 0.3506 - val_accuracy: 0.7923 - val_loss: 0.5176
Epoch 2/100
[1m37/37[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m19s[0m 534ms/step - accuracy: 0.7998 - loss: 0.4436 - val_accuracy: 0.7789 - val_loss: 0.5432
Epoch 3/100
[1m37/37[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m16s[0m 451ms/step - accuracy: 0.8376 - loss: 0.3615 - val_accuracy: 0.7621 - val_loss: 0.5389
Epoch 4/100
[1m37/37[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m18s[0m 485ms/step - accuracy: 0.8421 - loss: 0.3638 - val_accuracy: 0.7973 - val_loss: 0.4948
Epoch 5/100
[1m37/37[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m16s[0m 451ms/step - accuracy: 0.8095 - loss: 0.4319 - val_accuracy: 0.7588 - val_loss: 0.5888
Epoch 6/100
[1m37/37[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m14s[0m 397ms/step - accuracy: 0.7964 - loss: 0.4979 - val_accuracy: 0.7538 - val_loss: 0.5640
Epoch 7/100
[1m

In [None]:
model2.save('/content/drive/MyDrive/rash.keras')

In [4]:
from tensorflow.keras.models import load_model
model2 = load_model('/content/drive/MyDrive/rash.keras')

In [None]:
import math
steps = math.ceil(len(test_videos) / 10)
model2.evaluate(test_generator,steps=steps)

[1m61/61[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m9s[0m 136ms/step - accuracy: 0.8115 - loss: 0.4598


[0.47264060378074646, 0.8006590008735657]

In [None]:
import cv2
import numpy as np

def extract_frames_from_video(video_path, img_size=(128, 128), max_frames=15):
    cap = cv2.VideoCapture(video_path)
    frames = []
    while len(frames) < max_frames:
        ret, frame = cap.read()
        if not ret:
            break
        frame = cv2.resize(frame, img_size)
        frames.append(frame)
    cap.release()

    # Pad if too short
    while len(frames) < max_frames:
        frames.append(np.zeros((img_size[0], img_size[1], 3), dtype=np.uint8))

    return np.array(frames)
def predict_video_class(video_path):
    frames = extract_frames_from_video(video_path)  # shape: (30, 64, 64, 3)
    input_data = np.expand_dims(frames, axis=0)  # shape: (1, 30, 64, 64, 3)

    prediction = model2.predict(input_data)
    label = "Rash Driving" if prediction[0][0] > 0.5 else "Normal Driving"
    print(f"Prediction: {label} (Confidence: {prediction[0][0]:.2f})")
predict_video_class('/content/drive/MyDrive/truePositive.mp4')

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 42ms/step
Prediction: Normal Driving (Confidence: 0.01)


In [None]:
import cv2
import numpy as np

def extract_frames_from_video(video_path, img_size=(128, 128), max_frames=15):
    cap = cv2.VideoCapture(video_path)
    frames = []
    while len(frames) < max_frames:
        ret, frame = cap.read()
        if not ret:
            break
        frame = cv2.resize(frame, img_size)
        frames.append(frame)
    cap.release()

    # Pad if too short
    while len(frames) < max_frames:
        frames.append(np.zeros((img_size[0], img_size[1], 3), dtype=np.uint8))

    return np.array(frames)
def predict_video_class(video_path):
    frames = extract_frames_from_video(video_path)  # shape: (30, 64, 64, 3)
    input_data = np.expand_dims(frames, axis=0)  # shape: (1, 30, 64, 64, 3)

    prediction = model2.predict(input_data)
    label = "Rash Driving" if prediction[0][0] > 0.5 else "Normal Driving"
    print(f"Prediction: {label} (Confidence: {prediction[0][0]:.2f})")
predict_video_class('/content/drive/MyDrive/trueNegative.mp4')

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 44ms/step
Prediction: Rash Driving (Confidence: 0.90)


In [5]:
import cv2
import numpy as np

def extract_frames_from_video(video_path, img_size=(128, 128), max_frames=15):
    cap = cv2.VideoCapture(video_path)
    frames = []
    while len(frames) < max_frames:
        ret, frame = cap.read()
        if not ret:
            break
        frame = cv2.resize(frame, img_size)
        frames.append(frame)
    cap.release()

    # Pad if too short
    while len(frames) < max_frames:
        frames.append(np.zeros((img_size[0], img_size[1], 3), dtype=np.uint8))

    return np.array(frames)
def predict_video_class(video_path):
    frames = extract_frames_from_video(video_path)  # shape: (30, 64, 64, 3)
    input_data = np.expand_dims(frames, axis=0)  # shape: (1, 30, 64, 64, 3)

    prediction = model2.predict(input_data)
    label = "Rash Driving" if prediction[0][0] > 0.5 else "Normal Driving"
    print(f"Prediction: {label} (Confidence: {prediction[0][0]:.2f})")
predict_video_class('/content/drive/MyDrive/falseNegative.mp4')

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 3s/step
Prediction: Normal Driving (Confidence: 0.00)


In [None]:
import cv2
import os

# Path to your video file
video_path = "/content/drive/MyDrive/falsepositive.mp4"  # Replace with your video file path
output_folder = "/content/drive/MyDrive/out1"   # Folder to save extracted frames

# Create the output folder if it doesn't exist
if not os.path.exists(output_folder):
    os.makedirs(output_folder)

# Open the video file
cap = cv2.VideoCapture(video_path)

# Check if video was opened successfully
if not cap.isOpened():
    print("Error: Couldn't open video.")
    exit()

frame_count = 0

while True:
    ret, frame = cap.read()

    if not ret:
        break  # End of video

    # Save the current frame as an image
    frame_filename = os.path.join(output_folder, f"frame_{frame_count:04d}.jpg")
    cv2.imwrite(frame_filename, frame)

    # Increment frame count
    frame_count += 1

# Release the video capture object
cap.release()

print(f"Extracted {frame_count} frames from the video.")


Extracted 169 frames from the video.


In [None]:
import cv2
import os

# Path to your video file
video_path = "/content/drive/MyDrive/falseNegative.mp4"  # Replace with your video file path
output_folder = "/content/drive/MyDrive/out2"   # Folder to save extracted frames

# Create the output folder if it doesn't exist
if not os.path.exists(output_folder):
    os.makedirs(output_folder)

# Open the video file
cap = cv2.VideoCapture(video_path)

# Check if video was opened successfully
if not cap.isOpened():
    print("Error: Couldn't open video.")
    exit()

frame_count = 0

while True:
    ret, frame = cap.read()

    if not ret:
        break  # End of video

    # Save the current frame as an image
    frame_filename = os.path.join(output_folder, f"frame_{frame_count:04d}.jpg")
    cv2.imwrite(frame_filename, frame)

    # Increment frame count
    frame_count += 1

# Release the video capture object
cap.release()

print(f"Extracted {frame_count} frames from the video.")

Extracted 77 frames from the video.
