In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical
from google.colab.patches import cv2_imshow
from PIL import Image, ImageTk
from tkinter import *
import pandas as pd
import numpy as np
import json
import os
import cv2
import csv
import re

# checking the classes exists or need to create their folders

In [3]:
main_data_folder = '/content/drive/MyDrive/Colab Notebooks/RL-AutoNav/training_processed_frames'
classes = ['Stop', 'Right', 'Left', 'Forward']

if not os.path.exists(main_data_folder):
    os.makedirs(main_data_folder)
    print(f"Created data folder: {main_data_folder}")
else:
    print(f"Data folder already exists: {main_data_folder}")

for data_class in classes:
  data_class_path = os.path.join(main_data_folder, data_class)
  if not os.path.exists(data_class_path):
    os.makedirs(data_class_path)
    print(f"Created '{data_class}' subfolder: {data_class_path}")
  else:
    print(f"Subfolder '{data_class}' already exists: {data_class_path}")

Created data folder: /content/drive/MyDrive/Colab Notebooks/RL-AutoNav/training_processed_frames
Created 'Stop' subfolder: /content/drive/MyDrive/Colab Notebooks/RL-AutoNav/training_processed_frames/Stop
Created 'Right' subfolder: /content/drive/MyDrive/Colab Notebooks/RL-AutoNav/training_processed_frames/Right
Created 'Left' subfolder: /content/drive/MyDrive/Colab Notebooks/RL-AutoNav/training_processed_frames/Left
Created 'Forward' subfolder: /content/drive/MyDrive/Colab Notebooks/RL-AutoNav/training_processed_frames/Forward


In [4]:
csv_data = '/content/drive/MyDrive/Colab Notebooks/RL-AutoNav/training_frame_metadata.csv'
if not os.path.exists(csv_data):
    print(f"The csv file '{csv_data}' does not exist. Creating the file...")
    data = pd.DataFrame(columns=["video_id", "frame_number", "label", "frame_id", "file_path"])
    data.to_csv(csv_data, index=False)
    print(f"CSV File '{csv_data}' created successfully.")
else:
    print(f"The csv file '{csv_data}' already exists.")
    data = pd.read_csv(csv_data)
    print(f"Resuming from the last processed entry.")

The csv file '/content/drive/MyDrive/Colab Notebooks/RL-AutoNav/training_frame_metadata.csv' does not exist. Creating the file...
CSV File '/content/drive/MyDrive/Colab Notebooks/RL-AutoNav/training_frame_metadata.csv' created successfully.


# function to get labels ('Stop', 'Right', 'Left', 'Forward') for frames

In [5]:
def Get_label(prev_frame, next_frame):
    # Calculates a flow field
    flow = cv2.calcOpticalFlowFarneback(prev_frame, next_frame, None, # Placeholder for the output flow
                                        pyr_scale=0.5, # Scale between image pyramid levels (0.5 means each level is 1/2 size of previous one)
                                        levels=3, # No. of pyramid levels (including the initial image); more levels help detect large motions
                                        winsize=15, iterations=3, poly_n=5, # Pixel neighborhood size to fit a polynomial.
                                        poly_sigma=1.2, # Standard deviation of the Gaussian used for polynomial fitting
                                        flags=0 # Operation flags (for extra optimizations; 0 is the default and standard)
                                        )

    speed_magnitude, angle_for_direction = cv2.cartToPolar(flow[..., 0], flow[..., 1])
    min_motion_threshold = 1.0

    if np.mean(speed_magnitude) < min_motion_threshold:
      return "Stop"

    avg_horizontal_motion, avg_vertical_motion = np.mean(flow[..., 0]), np.mean(flow[..., 1])

    if abs(avg_horizontal_motion) > abs(avg_vertical_motion):
        if avg_horizontal_motion > 0:
          return "Right"
        else:
          return "Left"
    else:
      return "Forward"

# Extracting frames and doing their preprocessing

In [12]:
input_dir = '/content/drive/MyDrive/Colab Notebooks/RL-AutoNav/BDDA/training'
output_dir = '/content/drive/MyDrive/Colab Notebooks/RL-AutoNav/training_processed_frames'

In [10]:
last_frame_indices = {}
if not data.empty:
    for _, row in data.iterrows():
        video_name = row['video_id']
        frame_index = row['frame_number']
        if video_name not in last_frame_indices:
            last_frame_indices[video_name] = frame_index
        else:
            last_frame_indices[video_name] = max(last_frame_indices[video_name], frame_index)

last_frame_indices

{}

In [11]:
for video_name in sorted(os.listdir(input_dir), key=lambda x: [int(part) if part.isdigit() else part for part in re.split(r'(\d+)', x)]):
    if video_name.lower().endswith(('.mp4', '.avi', '.mov')):
        video_path = os.path.join(input_dir, video_name)
        cap = cv2.VideoCapture(video_path)

        fps = cap.get(cv2.CAP_PROP_FPS)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        duration = total_frames / fps if fps > 0 else 0

        print(f"Processing video: {video_name} | FPS: {fps} | Total Frames: {total_frames} | Video Duration: {duration:.2f}s")

        # Skip empty videos
        if total_frames == 0 or fps == 0:
            print(f"Skipping {video_name}: No frames available.")
            cap.release()
            continue

        processed_frame_count = 0
        temp_frame = None  # Placeholder for the first frame
        frame_count = 0

        success, frame = cap.read()
        while success:
            frame_count += 1
            gray_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

            if temp_frame is not None:
                # Get the label using your predefined function
                label = Get_label(temp_frame, gray_frame)

                # Resize and save the frame
                frame_resized = cv2.resize(frame, (224, 224))
                frame_name = f"{video_name}_{label.lower()}_{processed_frame_count+1}.jpg"
                frame_path = os.path.join(output_dir, label, frame_name)
                os.makedirs(os.path.dirname(frame_path), exist_ok=True)
                cv2.imwrite(frame_path, frame_resized)

                # Append frame data to CSV
                frame_data = {
                    "video_id": video_name,
                    "frame_number": processed_frame_count+1,
                    "label": label,
                    "frame_id": frame_name,
                    "file_path": frame_path
                }
                frame_df = pd.DataFrame([frame_data])
                frame_df.to_csv(csv_data, mode='a', header=False, index=False)

                processed_frame_count += 1

                # Stop if 5 frames are processed
                if processed_frame_count >= 5:
                    break

            # Update temp_frame to the current frame for the next comparison
            temp_frame = gray_frame.copy()

            # Read the next frame
            success, frame = cap.read()

        cap.release()

print(f"..................................Preprocessing is done..................................")


Processing video: 2.mp4 | FPS: 29.97 | Total Frames: 300 | Video Duration: 10.01s
Processing video: 3.mp4 | FPS: 29.97 | Total Frames: 300 | Video Duration: 10.01s
Processing video: 4.mp4 | FPS: 29.97 | Total Frames: 300 | Video Duration: 10.01s
Processing video: 5.mp4 | FPS: 30.0 | Total Frames: 300 | Video Duration: 10.00s
Processing video: 10.mp4 | FPS: 59.94 | Total Frames: 600 | Video Duration: 10.01s
Processing video: 11.mp4 | FPS: 30.0 | Total Frames: 300 | Video Duration: 10.00s
Processing video: 12.mp4 | FPS: 30.0 | Total Frames: 300 | Video Duration: 10.00s
Processing video: 13.mp4 | FPS: 29.97 | Total Frames: 300 | Video Duration: 10.01s
Processing video: 15.mp4 | FPS: 29.97 | Total Frames: 300 | Video Duration: 10.01s
Processing video: 17.mp4 | FPS: 30.0 | Total Frames: 300 | Video Duration: 10.00s
Processing video: 18.mp4 | FPS: 29.97 | Total Frames: 780 | Video Duration: 26.03s
Processing video: 20.mp4 | FPS: 29.97 | Total Frames: 300 | Video Duration: 10.01s
Processing v