In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
import os
zip_base="/content/drive/MyDrive/CarCrash/CarCrash/videos"
os.listdir(zip_base)

['Crash-1500.txt',
 'Crash-1500.zip',
 'Normal.zip',
 'ytb_list.txt',
 'YouTube_download.py']

In [3]:
import zipfile
import os

zip_base = "/content/drive/MyDrive/CarCrash/CarCrash/videos"
extract_base = "/content/CarCrash_Videos"

os.makedirs(extract_base, exist_ok=True)

with zipfile.ZipFile(f"{zip_base}/Normal.zip", "r") as zip_ref:
    zip_ref.extractall(f"{extract_base}/normal")

with zipfile.ZipFile(f"{zip_base}/Crash-1500.zip", "r") as zip_ref:
    zip_ref.extractall(f"{extract_base}/Crash")


In [4]:
os.listdir("/content/CarCrash_Videos")

['Crash', 'normal']

In [5]:
BASE_PATH = "/content/CarCrash_Videos"

In [6]:
import os
import cv2
import numpy as np
import tensorflow as tf

In [7]:
IMG_SIZE = 112
NUM_FRAMES = 16

In [8]:
def load_video(video_path):
    # Convert TensorFlow string tensor to Python string
    video_path = video_path.decode("utf-8")

    cap = cv2.VideoCapture(video_path)

    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    total_frames = max(total_frames, 1)

    frame_idxs = np.linspace(0, total_frames - 1, NUM_FRAMES, dtype=int)

    grabbed_frames = {}
    idx = 0

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        if idx in frame_idxs:
            frame = cv2.resize(frame, (IMG_SIZE, IMG_SIZE))
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            frame = frame / 255.0
            grabbed_frames[idx] = frame

        idx += 1

    cap.release()

    frames = [grabbed_frames[i] for i in frame_idxs if i in grabbed_frames]

    # Handle short / corrupted videos safely
    if len(frames) < NUM_FRAMES:
        return np.zeros(
            (NUM_FRAMES, IMG_SIZE, IMG_SIZE, 3),
            dtype=np.float32
        )

    return np.array(frames, dtype=np.float32)


In [9]:

video_paths = []
labels = []

for cls, label in [("Crash", 1), ("normal", 0)]:
    class_dir = os.path.join(BASE_PATH, cls)
    for file in os.listdir(class_dir):
        if file.endswith(".mp4"):
            video_paths.append(os.path.join(class_dir, file))
            labels.append(label)

print("Total videos found:", len(video_paths))


Total videos found: 4500


In [10]:
def tf_video_loader(path, label):
    video = tf.numpy_function(
        load_video,
        [path],
        tf.float32
    )

    # Explicit shape is REQUIRED for TensorFlow
    video.set_shape((NUM_FRAMES, IMG_SIZE, IMG_SIZE, 3))

    return video, label


In [11]:
dataset = tf.data.Dataset.from_tensor_slices((video_paths, labels))

dataset = dataset.map(
    tf_video_loader,
    num_parallel_calls=tf.data.AUTOTUNE
)

dataset = dataset.shuffle(100)
dataset = dataset.batch(1)
dataset = dataset.prefetch(tf.data.AUTOTUNE)


In [12]:
for x, y in dataset.take(1):
    print("Video batch shape:", x.shape)
    print("Label batch shape:", y.shape)


Video batch shape: (1, 16, 112, 112, 3)
Label batch shape: (1,)


In [13]:
import numpy as np
import tensorflow as tf
from sklearn.model_selection import train_test_split


In [14]:
video_paths_np = np.array(video_paths)
labels_np = np.array(labels)

# 70% train, 15% validation, 15% test
X_train, X_temp, y_train, y_temp = train_test_split(
    video_paths_np,
    labels_np,
    test_size=0.3,
    random_state=42,
    stratify=labels_np
)

X_val, X_test, y_val, y_test = train_test_split(
    X_temp,
    y_temp,
    test_size=0.5,
    random_state=42,
    stratify=y_temp
)

print("Train:", len(X_train))
print("Validation:", len(X_val))
print("Test:", len(X_test))


Train: 3150
Validation: 675
Test: 675


In [15]:
def make_dataset(video_paths, labels, batch_size=4, shuffle=True):
    ds = tf.data.Dataset.from_tensor_slices((video_paths, labels))

    if shuffle:
        ds = ds.shuffle(buffer_size=len(video_paths))

    ds = ds.map(
        tf_video_loader,
        num_parallel_calls=tf.data.AUTOTUNE
    )

    ds = ds.batch(batch_size)
    ds = ds.prefetch(tf.data.AUTOTUNE)

    return ds


In [16]:
BATCH_SIZE = 1

train_ds = make_dataset(X_train, y_train, BATCH_SIZE, shuffle=True)
val_ds   = make_dataset(X_val, y_val, BATCH_SIZE, shuffle=False)
test_ds  = make_dataset(X_test, y_test, BATCH_SIZE, shuffle=False)


In [17]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import (
    TimeDistributed, Conv2D, MaxPooling2D,
    Flatten, LSTM, Dense, Dropout
)

model = Sequential([
    TimeDistributed(
        Conv2D(16, (3,3), activation="relu"),
        input_shape=(NUM_FRAMES, IMG_SIZE, IMG_SIZE, 3)
    ),
    TimeDistributed(MaxPooling2D((2,2))),

    TimeDistributed(
        Conv2D(32, (3,3), activation="relu")
    ),
    TimeDistributed(MaxPooling2D((2,2))),

    TimeDistributed(Flatten()),

    LSTM(128),
    Dropout(0.5),

    Dense(1, activation="sigmoid")
])


  super().__init__(**kwargs)


In [18]:
model.compile(
    optimizer="adam",
    loss="binary_crossentropy",
    metrics=["accuracy"]
)

model.summary()


In [19]:
EPOCHS = 10

history = model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=EPOCHS
)


Epoch 1/10
[1m3150/3150[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m712s[0m 223ms/step - accuracy: 0.6371 - loss: 0.6884 - val_accuracy: 0.6667 - val_loss: 0.6364
Epoch 2/10
[1m3150/3150[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m751s[0m 227ms/step - accuracy: 0.6716 - loss: 0.6443 - val_accuracy: 0.6667 - val_loss: 0.6413
Epoch 3/10
[1m3150/3150[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m694s[0m 220ms/step - accuracy: 0.6727 - loss: 0.6453 - val_accuracy: 0.6667 - val_loss: 0.6365
Epoch 4/10
[1m3150/3150[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m777s[0m 231ms/step - accuracy: 0.6584 - loss: 0.6554 - val_accuracy: 0.6667 - val_loss: 0.6375
Epoch 5/10
[1m3150/3150[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m740s[0m 231ms/step - accuracy: 0.6656 - loss: 0.6456 - val_accuracy: 0.6667 - val_loss: 0.6380
Epoch 6/10
[1m3150/3150[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m723s[0m 229ms/step - accuracy: 0.6802 - loss: 0.6303 - val_accuracy: 0.6667 - val_loss:

In [20]:
test_loss, test_acc = model.evaluate(test_ds)
print("Test accuracy:", test_acc)

[1m675/675[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m127s[0m 187ms/step - accuracy: 0.6934 - loss: 0.6286
Test accuracy: 0.6666666865348816
