<a href="https://colab.research.google.com/github/Atharva647/DAta/blob/main/Proj.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!git clone https://github.com/Atharva647/DAta.git


Cloning into 'DAta'...
remote: Enumerating objects: 50, done.[K
remote: Total 50 (delta 0), reused 0 (delta 0), pack-reused 50 (from 1)[K
Receiving objects: 100% (50/50), 107.82 MiB | 27.85 MiB/s, done.


In [2]:
import cv2
import os

def extract_frames(video_path, output_dir, frame_rate=5):
    """
    Extract frames from a video and save them as images.

    Args:
        video_path (str): Path to the video file.
        output_dir (str): Directory to save the frames.
        frame_rate (int): Save one frame every `frame_rate` frames.
    """
    os.makedirs(output_dir, exist_ok=True)
    cap = cv2.VideoCapture(video_path)

    frame_count = 0
    success = True
    while success:
        success, frame = cap.read()
        if success and frame_count % frame_rate == 0:
            frame_name = f"frame{frame_count:04d}.jpg"
            frame_path = os.path.join(output_dir, frame_name)
            cv2.imwrite(frame_path, frame)
        frame_count += 1

    cap.release()
    print(f"Frames saved to {output_dir}")

def process_videos(input_dir, output_dir, frame_rate=1):
    """
    Process all videos in a directory and extract frames.

    Args:
        input_dir (str): Directory containing videos.
        output_dir (str): Directory to save extracted frames.
        frame_rate (int): Save one frame every `frame_rate` frames.
    """
    for label in os.listdir(input_dir):
        label_dir = os.path.join(input_dir, label)
        if not os.path.isdir(label_dir):
            continue

        output_label_dir = os.path.join(output_dir, label)
        os.makedirs(output_label_dir, exist_ok=True)

        for video_file in os.listdir(label_dir):
            if video_file.endswith(('.mp4', '.avi', '.mkv')):
                video_path = os.path.join(label_dir, video_file)
                video_name = os.path.splitext(video_file)[0]
                output_video_dir = os.path.join(output_label_dir, video_name)
                extract_frames(video_path, output_video_dir, frame_rate=frame_rate)

# Example usage:
input_dir = r"/content/DAta/Data"
output_dir = "Frames"
frame_rate = 1  # Save every frame
process_videos(input_dir, output_dir, frame_rate)


Frames saved to Frames/Non acciden/negative_samples_147
Frames saved to Frames/Non acciden/negative_samples_139
Frames saved to Frames/Non acciden/negative_samples_1110
Frames saved to Frames/Non acciden/negative_samples_1106
Frames saved to Frames/Non acciden/negative_samples_1094
Frames saved to Frames/Non acciden/negative_samples_150
Frames saved to Frames/Non acciden/negative_samples_1093
Frames saved to Frames/Non acciden/negative_samples_136
Frames saved to Frames/Non acciden/negative_samples_138
Frames saved to Frames/Non acciden/negative_samples_149
Frames saved to Frames/Non acciden/negative_samples_1105
Frames saved to Frames/Non acciden/negative_samples_1097
Frames saved to Frames/Non acciden/negative_samples_152
Frames saved to Frames/Non acciden/negative_samples_1096
Frames saved to Frames/Non acciden/negative_samples_134
Frames saved to Frames/Non acciden/negative_samples_1108
Frames saved to Frames/Non acciden/negative_samples_1098
Frames saved to Frames/Non acciden/nega

In [16]:
import os
import numpy as np
import torch
from ultralytics import YOLO
from PIL import Image

def extract_features_yolo(frame_path, model):
    """
    Extract YOLO features for a single frame.

    Args:
        frame_path (str): Path to the frame image.
        model: YOLO model object.

    Returns:
        np.ndarray: Array of features for the frame.
    """
    # Load the image
    img = Image.open(frame_path)
    img = np.array(img)  # Convert the image to a numpy array

    # Run inference using the YOLO model
    results = model(img)  # Make prediction on the image

    frame_features = []
    # Access the detections for the image
    for det in results[0].boxes:  # Iterate over detections in the result
        # Extract bounding box in xyxy format, class, and confidence
        x1, y1, x2, y2 = det.xyxy[0].tolist()
        conf = det.conf[0].item()
        cls = int(det.cls[0].item())
        frame_features.append([x1, y1, x2, y2, cls, conf])

    return np.array(frame_features)

def process_video_frames(frame_dir, output_dir, model):
    """
    Process all frames of a single video and save YOLO features.

    Args:
        frame_dir (str): Directory containing frames of a video.
        output_dir (str): Directory to save extracted features.
        model: YOLO model object.
    """
    os.makedirs(output_dir, exist_ok=True)

    for frame_name in sorted(os.listdir(frame_dir)):
        frame_path = os.path.join(frame_dir, frame_name)

        # Check if the file is a valid image
        if frame_path.lower().endswith(('png', 'jpg', 'jpeg')):
            # Extract features for the frame
            features = extract_features_yolo(frame_path, model)

            # Save features as a .npy file
            frame_feature_file = os.path.join(output_dir, frame_name.replace('.jpg', '.npy'))
            np.save(frame_feature_file, features)

def process_all_videos(input_dir, output_dir, model):
    """
    Process all videos' frames and extract YOLO features.

    Args:
        input_dir (str): Root directory of frames (e.g., "Frames").
        output_dir (str): Directory to save features (e.g., "Features").
        model: YOLO model object.
    """
    for label in os.listdir(input_dir):
        label_dir = os.path.join(input_dir, label)
        if not os.path.isdir(label_dir):
            continue

        output_label_dir = os.path.join(output_dir, label)
        os.makedirs(output_label_dir, exist_ok=True)

        for video_name in os.listdir(label_dir):
            frame_dir = os.path.join(label_dir, video_name)
            if not os.path.isdir(frame_dir):
                continue

            output_video_dir = os.path.join(output_label_dir, video_name)
            print(f"Processing frames for video: {video_name}")
            process_video_frames(frame_dir, output_video_dir, model)
            print(f"Features saved to: {output_video_dir}")

# Load YOLOv5 Model using ultralytics
device = 'cuda' if torch.cuda.is_available() else 'cpu'
model = YOLO('yolov8n.pt')  # Load YOLOv8 nano model (small and fast)

# Set Directories
input_dir = "/content/Frames"  # Root directory containing frames
output_dir = "Features100"  # Root directory to save features

# Extract Features
process_all_videos(input_dir, output_dir, model)


[1;30;43mStreaming output truncated to the last 5000 lines.[0m
Speed: 2.4ms preprocess, 10.0ms inference, 1.4ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 6 cars, 1 truck, 8.9ms
Speed: 2.4ms preprocess, 8.9ms inference, 1.5ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 7 cars, 1 truck, 8.0ms
Speed: 2.6ms preprocess, 8.0ms inference, 1.5ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 7 cars, 1 truck, 9.5ms
Speed: 2.4ms preprocess, 9.5ms inference, 2.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 7 cars, 1 truck, 10.2ms
Speed: 2.4ms preprocess, 10.2ms inference, 2.9ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 7 cars, 1 truck, 12.3ms
Speed: 2.2ms preprocess, 12.3ms inference, 1.6ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 7 cars, 1 truck, 9.0ms
Speed: 2.2ms preprocess, 9.0ms inference, 1.6ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 7 cars, 1 truck, 8.9ms
Speed: 2.9ms 

In [6]:
import os
import numpy as np
from tqdm import tqdm


In [18]:
import os
import numpy as np

def prepare_sequences(features_dir, sequence_length, output_dir):
    """
    Prepare sequences from extracted YOLO features for LSTM.

    Args:
        features_dir (str): Directory containing YOLO features (input).
        sequence_length (int): Number of frames per sequence.
        output_dir (str): Directory to save prepared sequences.
    """
    os.makedirs(output_dir, exist_ok=True)

    for label in os.listdir(features_dir):  # Iterate over labels (e.g., "accident", "non_accident")
        label_dir = os.path.join(features_dir, label)
        if not os.path.isdir(label_dir):
            continue

        output_label_dir = os.path.join(output_dir, label)
        os.makedirs(output_label_dir, exist_ok=True)

        for video_name in os.listdir(label_dir):  # Iterate over videos in each label
            video_dir = os.path.join(label_dir, video_name)
            if not os.path.isdir(video_dir):
                continue

            output_video_dir = os.path.join(output_label_dir, video_name)
            os.makedirs(output_video_dir, exist_ok=True)  # Create directory for each video

            # Load all frame features for the video
            frame_features = []
            for frame_file in sorted(os.listdir(video_dir)):
                if frame_file.endswith('.npy'):
                    frame_path = os.path.join(video_dir, frame_file)
                    features = np.load(frame_path)
                    frame_features.append(features)

            # Convert to numpy array for processing
            frame_features = np.array(frame_features, dtype=object)

            # Prepare sequences
            num_frames = len(frame_features)
            for i in range(num_frames - sequence_length + 1):
                sequence = frame_features[i:i + sequence_length]

                # Save the sequence as a .npy file in the correct video directory
                sequence_file = os.path.join(output_video_dir, f"sequence_{i:04d}.npy")
                np.save(sequence_file, sequence)

            print(f"Sequences saved for video: {video_name}")

# Set Directories
features_dir = "/content/Features100"  # Directory with YOLO features
output_dir = "Sequencesfckfck"  # Directory to save prepared sequences

# Sequence Parameters
sequence_length = 10  # Number of frames per sequence

# Prepare Sequences
prepare_sequences(features_dir, sequence_length, output_dir)


Sequences saved for video: negative_samples_148
Sequences saved for video: negative_samples_137
Sequences saved for video: negative_samples_134
Sequences saved for video: negative_samples_1098
Sequences saved for video: negative_samples_1108
Sequences saved for video: negative_samples_1100
Sequences saved for video: negative_samples_1095
Sequences saved for video: negative_samples_1094
Sequences saved for video: negative_samples_146
Sequences saved for video: negative_samples_149
Sequences saved for video: negative_samples_1093
Sequences saved for video: negative_samples_1111
Sequences saved for video: negative_samples_1099
Sequences saved for video: negative_samples_1105
Sequences saved for video: negative_samples_1101
Sequences saved for video: negative_samples_1109
Sequences saved for video: negative_samples_140
Sequences saved for video: negative_samples_138
Sequences saved for video: negative_samples_139
Sequences saved for video: negative_samples_136
Sequences saved for video: ne

In [19]:
import os
import shutil
from sklearn.model_selection import train_test_split

def split_data(sequences_dir, output_dir, train_ratio=0.7, val_ratio=0.15, test_ratio=0.15):
    """
    Split sequences into training, validation, and test sets.

    Args:
        sequences_dir (str): Directory containing sequences.
        output_dir (str): Directory to save split datasets.
        train_ratio (float): Ratio of data for training.
        val_ratio (float): Ratio of data for validation.
        test_ratio (float): Ratio of data for testing.
    """
    assert train_ratio + val_ratio + test_ratio == 1.0, "Ratios must sum to 1.0"
    os.makedirs(output_dir, exist_ok=True)

    # Create subdirectories for train, val, and test
    splits = ['train', 'val', 'test']
    for split in splits:
        split_dir = os.path.join(output_dir, split)
        os.makedirs(split_dir, exist_ok=True)

    for label in os.listdir(sequences_dir):
        label_dir = os.path.join(sequences_dir, label)
        if not os.path.isdir(label_dir):
            continue

        sequences = []
        for video_name in os.listdir(label_dir):
            video_dir = os.path.join(label_dir, video_name)
            if os.path.isdir(video_dir):
                sequences.append(video_dir)

        # Split the data
        train_data, temp_data = train_test_split(sequences, test_size=(1 - train_ratio), random_state=42)
        val_data, test_data = train_test_split(temp_data, test_size=(test_ratio / (val_ratio + test_ratio)), random_state=42)

        # Copy sequences to respective directories
        for split_name, split_data in zip(splits, [train_data, val_data, test_data]):
            split_label_dir = os.path.join(output_dir, split_name, label)
            os.makedirs(split_label_dir, exist_ok=True)
            for video_dir in split_data:
                shutil.copytree(video_dir, os.path.join(split_label_dir, os.path.basename(video_dir)))

        print(f"Data split completed for label: {label}")

# Set Directories
sequences_dir = "/content/Sequencesfckfck"  # Directory with prepared sequences
output_dir = "Dataset_Split"  # Directory to save split data

# Split Data
split_data(sequences_dir, output_dir)


Data split completed for label: Non acciden
Data split completed for label: Accident


In [26]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import os
import numpy as np

class AccidentDataset(Dataset):
    def __init__(self, data_dir, max_detections=20, feature_size=6):
        self.data = []
        self.labels = []
        self.max_detections = max_detections
        self.feature_size = feature_size
        self.classes = sorted(os.listdir(data_dir))

        for label, class_name in enumerate(self.classes):
            class_dir = os.path.join(data_dir, class_name)
            for video_name in os.listdir(class_dir):
                video_dir = os.path.join(class_dir, video_name)
                for sequence_file in os.listdir(video_dir):
                    if sequence_file.endswith('.npy'):
                        sequence_path = os.path.join(video_dir, sequence_file)
                        self.data.append(sequence_path)
                        self.labels.append(label)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        sequence_path = self.data[idx]
        label = self.labels[idx]

        # Load the sequence with allow_pickle=True
        sequence = np.load(sequence_path, allow_pickle=True)

        # Process each frame in the sequence
        processed_sequence = []
        for frame_features in sequence:
            # Convert to a 2D array (num_detections x feature_size)
            frame_features = np.array(frame_features, dtype=np.float32)
            if len(frame_features) == 0:
                # No detections: Add a zero-padded array
                frame_features = np.zeros((1, self.feature_size), dtype=np.float32)

            # Pad or truncate to match max_detections
            if len(frame_features) > self.max_detections:
                frame_features = frame_features[:self.max_detections]
            elif len(frame_features) < self.max_detections:
                padding = np.zeros((self.max_detections - len(frame_features), self.feature_size), dtype=np.float32)
                frame_features = np.vstack((frame_features, padding))

            # Flatten the frame features
            flattened_features = frame_features.flatten()
            processed_sequence.append(flattened_features)

        # Convert processed sequence to a tensor
        processed_sequence = np.array(processed_sequence, dtype=np.float32)
        processed_sequence = torch.tensor(processed_sequence, dtype=torch.float32)

        return processed_sequence, label



# Define the LSTM Model
class AccidentLSTM(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super(AccidentLSTM, self).__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        # LSTM expects input of shape (batch_size, sequence_length, input_size)
        _, (hidden, _) = self.lstm(x)  # Take the last hidden state
        out = self.fc(hidden[-1])  # Pass through the fully connected layer
        return out


# Training Function
def train_model(model, dataloaders, criterion, optimizer, num_epochs, device):
    model = model.to(device)
    for epoch in range(num_epochs):
        print(f"Epoch {epoch + 1}/{num_epochs}")
        print("-" * 20)

        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()
            else:
                model.eval()

            running_loss = 0.0
            correct_predictions = 0

            for sequences, labels in dataloaders[phase]:
                sequences = sequences.to(device)
                labels = labels.to(device)

                optimizer.zero_grad()

                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(sequences)
                    loss = criterion(outputs, labels)
                    _, preds = torch.max(outputs, 1)

                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                running_loss += loss.item() * sequences.size(0)
                correct_predictions += torch.sum(preds == labels.data)

            epoch_loss = running_loss / len(dataloaders[phase].dataset)
            epoch_acc = correct_predictions.double() / len(dataloaders[phase].dataset)

            print(f"{phase.capitalize()} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}")

    print("Training complete")
    return model


# Data Loaders
batch_size = 16
data_dir = "/content/Dataset_Split"

datasets = {phase: AccidentDataset(os.path.join(data_dir, phase)) for phase in ['train', 'val']}
dataloaders = {phase: DataLoader(datasets[phase], batch_size=batch_size, shuffle=True) for phase in ['train', 'val']}

# Model Parameters
input_size = 120  # x1, y1, x2, y2, class, confidence
hidden_size = 128
num_layers = 2
num_classes = len(os.listdir(os.path.join(data_dir, 'train')))
num_epochs = 20
learning_rate = 0.001

# Instantiate Model, Loss, and Optimizer
model = AccidentLSTM(input_size, hidden_size, num_layers, num_classes)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

# Device Configuration
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Train the Model
trained_model = train_model(model, dataloaders, criterion, optimizer, num_epochs, device)


Epoch 1/20
--------------------
Train Loss: 0.1437 Acc: 0.9479
Val Loss: 0.4177 Acc: 0.8748
Epoch 2/20
--------------------
Train Loss: 0.1071 Acc: 0.9645
Val Loss: 0.3926 Acc: 0.8996
Epoch 3/20
--------------------
Train Loss: 0.0953 Acc: 0.9690
Val Loss: 0.3120 Acc: 0.8943
Epoch 4/20
--------------------
Train Loss: 0.0775 Acc: 0.9757
Val Loss: 0.4230 Acc: 0.8570
Epoch 5/20
--------------------
Train Loss: 0.0758 Acc: 0.9762
Val Loss: 0.4967 Acc: 0.8810
Epoch 6/20
--------------------
Train Loss: 0.0728 Acc: 0.9760
Val Loss: 0.4387 Acc: 0.9032
Epoch 7/20
--------------------
Train Loss: 0.0668 Acc: 0.9775
Val Loss: 0.5354 Acc: 0.8774
Epoch 8/20
--------------------
Train Loss: 0.0632 Acc: 0.9787
Val Loss: 0.5795 Acc: 0.8641
Epoch 9/20
--------------------
Train Loss: 0.0692 Acc: 0.9755
Val Loss: 0.4941 Acc: 0.9014
Epoch 10/20
--------------------
Train Loss: 0.0701 Acc: 0.9766
Val Loss: 0.5988 Acc: 0.8641
Epoch 11/20
--------------------
Train Loss: 0.0521 Acc: 0.9831
Val Loss: 0.867

In [42]:
# Load the testing dataset
test_dataset = AccidentDataset(os.path.join(data_dir, 'test'))
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)


In [43]:
# Testing Function
def test_model(model, dataloader, criterion, device):
    model = model.to(device)
    model.eval()  # Set the model to evaluation mode

    running_loss = 0.0
    correct_predictions = 0

    with torch.no_grad():  # No gradients are needed for testing
        for sequences, labels in dataloader:
            sequences = sequences.to(device)
            labels = labels.to(device)

            outputs = model(sequences)
            loss = criterion(outputs, labels)
            _, preds = torch.max(outputs, 1)

            running_loss += loss.item() * sequences.size(0)
            correct_predictions += torch.sum(preds == labels.data)

    test_loss = running_loss / len(dataloader.dataset)
    test_acc = correct_predictions.double() / len(dataloader.dataset)

    print(f"Test Loss: {test_loss:.4f} Acc: {test_acc:.4f}")


In [44]:
# Test the Model
test_model(trained_model, test_dataloader, criterion, device)


Test Loss: 0.0388 Acc: 0.9913
