In [None]:
from jaad_data import JAAD
# NOTE(review): jaad_path is assigned here but never passed to JAAD below,
# which is constructed with "./" instead — confirm which root is intended.
jaad_path = "../Foundation Models for Pedestrian Trajectory Prediction"

# Instantiate JAAD with "./" as its argument (presumably the dataset root — verify against jaad_data).
imdb = JAAD("./")
# Presumably dumps the video frames to image files (later cells read PNGs
# from ./images) — TODO confirm against jaad_data.
imdb.extract_and_save_images()

In [None]:
# Path to the JAAD dataset root folder.
# NOTE(review): this value is not used below — JAAD is constructed with "./" — confirm which root is intended.
jaad_path = "../Foundation Models for Pedestrian Trajectory Prediction"

# Instantiate the JAAD class with "./" (presumably the dataset root — verify against jaad_data).
jaad_dataset = JAAD("./")
# Generate the annotation database. Later cells load ./data_cache/jaad_database.pkl,
# which is presumably the cache this call writes — TODO confirm.
jaad_database = jaad_dataset.generate_database()

# To extract frames with bounding boxes

In [None]:
import json
import pickle

# Load annotations from the .pkl file.
# NOTE: pickle.load can execute arbitrary code; only open a cache you generated yourself.
annotations_file = "./data_cache/jaad_database.pkl"

with open(annotations_file, 'rb') as f:
    annotations = pickle.load(f)

# frame number -> list of records (pedestrian and vehicle entries share the list).
# Frames from different videos land under the same key, so every record carries
# its own 'video_id'.
combined_data = {}

# Pass 1: pedestrian annotations
for vid, vid_data in annotations.items():
    for ped_id, ped_data in vid_data['ped_annotations'].items():
        frames = ped_data['frames']
        bounding_boxes = ped_data['bbox']
        # 'behavior' is a mapping of label name -> values. Zipping it directly with
        # the frames would iterate over its *keys*, which both stores a key name as
        # the "action" and truncates the pedestrian to as many frames as there are
        # keys. Build the per-frame labels explicitly instead.
        # Assumes each value is a per-frame sequence aligned with 'frames' — TODO confirm.
        behavior = ped_data['behavior']

        for idx, (frame, bbox) in enumerate(zip(frames, bounding_boxes)):
            action = {
                name: values[idx]
                for name, values in behavior.items()
                if idx < len(values)  # tolerate label lists shorter than the track
            }
            combined_data.setdefault(frame, []).append({
                'pedestrian_action': action,
                'bounding_box': bbox,
                'video_id': vid,
                'pedestrian_id': ped_id
            })

# Pass 2: vehicle annotations (kept as a separate pass so that, within a frame,
# every pedestrian record precedes the vehicle records)
for vid, vid_data in annotations.items():
    for frame, action in vid_data.get('vehicle_annotations', {}).items():
        combined_data.setdefault(frame, []).append({
            'vehicle_action': action,
            'video_id': vid
        })

# Write the combined data to a JSON file (non-string keys are written as strings by json)
with open('combined_data.json', 'w') as json_file:
    json.dump(combined_data, json_file)

In [None]:
import os

# Resolve the working directory once, then enumerate its entries
current_directory = os.getcwd()
files = os.listdir(current_directory)

# Emit one entry name per line; an empty directory prints nothing at all
if files:
    print("\n".join(files))

# Splitting the Dataset

In [None]:
import json
import pickle
import os

# Location of the cached annotation database
annotations_file = "./data_cache/jaad_database.pkl"

# Folder with the raw frames (not read by this cell)
images_dir = "./images"

# Root folder of the annotated frames that the prompts point at
output_main_dir = "./images_with_boxes(Pedestrians Focused)"

with open(annotations_file, 'rb') as f:
    database = pickle.load(f)

# One prompt per pedestrian, accumulated across every video
all_video_prompts = []

for video_id, video_data in database.items():
    output_video_dir = os.path.join(output_main_dir, f"{video_id}")

    for pedestrian_id, pedestrian_data in video_data['ped_annotations'].items():
        output_pedestrian_dir = os.path.join(output_video_dir, f"Pedestrian_{pedestrian_id}")

        # Every 10th annotated frame, capped at the first five of those
        frames_to_process = pedestrian_data['frames'][::10][:5]

        # "imageN: <img>path</img>" fragments, in frame order, joined by single spaces.
        # The paths are only constructed here; nothing checks that the files exist.
        image_tags = []
        for position, frame_num in enumerate(frames_to_process):
            tagged_path = os.path.join(
                output_pedestrian_dir,
                f"Pedestrian_{pedestrian_id}_Image_{frame_num}.png",
            )
            image_tags.append(f"image{position}: <img>{tagged_path}</img>")
        input_images_line = ' '.join(image_tags)

        all_video_prompts.append({
            "from": "user",
            "value": f"Role: You are an autonomous vehicle that uses front-camera images to interact with pedestrians. Input: {input_images_line}"
        })

# Persist all prompts as a single JSON array
with open('output_prompts_revised.json', 'w') as f:
    json.dump(all_video_prompts, f, indent=4)

print("Prompts generated and saved to 'output_prompts_revised.json'.")

The following cell writes one annotated image per pedestrian for every annotated frame, without skipping any frames.

In [None]:
import os
import cv2
import pickle
import csv
import ast

# Path to annotations .pkl file
annotations_file = "./data_cache/jaad_database.pkl"

# Directory containing the extracted frames, one sub-folder per video
images_dir = "./images"

# Output directory to store images with bounding boxes
output_main_dir = "./images_with_boxes(Pedestrians Focused)"

# Load the annotation data.
# NOTE: pickle.load can execute arbitrary code; only open a cache you generated yourself.
with open(annotations_file, 'rb') as f:
    annotation_data = pickle.load(f)

# Load pedestrian annotations from the CSV file.
# NOTE(review): annotations_output.csv is written by a cell further down this
# notebook, so that cell has to be run before this one.
pedestrian_annotations = {}
with open('annotations_output.csv', mode='r', newline='') as csv_file:
    for row in csv.DictReader(csv_file):
        # Both columns hold the text form of a Python list. literal_eval parses them
        # the same way and, unlike splitting on commas, copes with an empty list ("[]").
        bounding_box = ast.literal_eval(row['Bounding Box'])
        frames = [int(frame) for frame in ast.literal_eval(row['Frames'])]
        pedestrian_annotations.setdefault(row['Video ID'], []).append({
            'Frames': frames,
            'Bounding Boxes': bounding_box,
            'Pedestrian ID': row['Pedestrian ID']
        })

# Range of videos to render, as 1-based positions in annotation_data's iteration
# order (positions, not the number embedded in the video id).
first_video_to_process = 163  # start processing from video number 163
num_videos_to_process = 346   # stop after video number 346
videos_processed = 0          # videos actually rendered by this run

for video_position, (video_id, video_data) in enumerate(annotation_data.items(), start=1):
    if video_position > num_videos_to_process:
        break
    # Previously the counter merely *started* at 162 and nothing was skipped, so the
    # run began at the first video instead of the 163rd.
    if video_position < first_video_to_process:
        continue

    # Create a directory for the current video
    output_video_dir = os.path.join(output_main_dir, f"{video_id}")
    os.makedirs(output_video_dir, exist_ok=True)

    # Get the number of frames in the video
    num_frames = video_data['num_frames']

    # Process each pedestrian in the video
    for pedestrian_data_list in pedestrian_annotations.get(video_id, []):
        pedestrian_id = pedestrian_data_list['Pedestrian ID']
        pedestrian_frames = pedestrian_data_list['Frames']
        bounding_boxes = pedestrian_data_list['Bounding Boxes']

        if len(pedestrian_frames) != len(bounding_boxes):
            print(f"Frame/box count mismatch for pedestrian {pedestrian_id} in video {video_id}. "
                  "Unpaired entries are ignored.")

        # Create a directory for the current pedestrian
        output_pedestrian_dir = os.path.join(output_video_dir, f"Pedestrian_{pedestrian_id}")
        os.makedirs(output_pedestrian_dir, exist_ok=True)

        # Process the frames for this pedestrian
        for frame_num, bbox in zip(pedestrian_frames, bounding_boxes):
            # Skip frames beyond the length recorded for the video
            if frame_num > num_frames:
                continue

            # Load the image for the current frame
            image_file = f"{frame_num:05d}.png"
            image_path = os.path.join(images_dir, video_id, image_file)
            if not os.path.exists(image_path):
                print(f"Image file {image_file} does not exist for video {video_id}. Skipping...")
                continue

            # cv2.imread signals failure by returning None rather than raising
            image = cv2.imread(image_path)
            if image is None:
                print(f"Error reading image file {image_file} for video {video_id}. Skipping...")
                continue

            # Draw bounding box for the pedestrian in the frame
            x1, y1, x2, y2 = map(int, bbox)
            cv2.rectangle(image, (x1, y1), (x2, y2), (0, 255, 0), 2)

            # Save the image with bounding box; imwrite reports failure via its return value
            output_filename = f"Pedestrian_{pedestrian_id}_Image_{frame_num}.png"
            output_path = os.path.join(output_pedestrian_dir, output_filename)
            if not cv2.imwrite(output_path, image):
                print(f"Failed to write {output_path}. Skipping...")
                continue

            print("Processed:", output_path)

    print("All images processed for video:", video_id)
    videos_processed += 1

print("Processing completed for all videos.")


In [None]:
import os
import cv2
import pickle
import csv
import ast

# Cached annotation database (pickle)
annotations_file = "./data_cache/jaad_database.pkl"

# Root folder of the raw frames, one sub-folder per video
images_dir = "./images"

# Root folder that receives the frames with boxes drawn on them
output_main_dir = "./images_with_boxes(Pedestrians Focused)"

with open(annotations_file, 'rb') as f:
    annotation_data = pickle.load(f)

# video id -> list of {'Frames', 'Bounding Boxes', 'Pedestrian ID'} records from the CSV
pedestrian_annotations = {}
with open('annotations_output.csv', mode='r') as csv_file:
    for row in csv.DictReader(csv_file):
        # 'Bounding Box' is the text form of a list and is parsed as a Python literal
        bounding_box = ast.literal_eval(row['Bounding Box'])
        # 'Frames' looks like "[0, 1, 2]": drop the brackets, split on commas
        frames = list(map(int, row['Frames'][1:-1].split(',')))
        per_video_records = pedestrian_annotations.setdefault(row['Video ID'], [])
        per_video_records.append({
            'Frames': frames,
            'Bounding Boxes': bounding_box,
            'Pedestrian ID': row['Pedestrian ID']
        })

# Inclusive range of video numbers to render; the number is the part of the
# video id after its last underscore
start_video_num = 165
end_video_num = 346

for video_id, video_data in annotation_data.items():
    video_num = int(video_id.split('_')[-1])

    # Ignore videos outside the requested range
    if not (start_video_num <= video_num <= end_video_num):
        continue

    output_video_dir = os.path.join(output_main_dir, f"{video_id}")
    os.makedirs(output_video_dir, exist_ok=True)

    num_frames = video_data['num_frames']

    for pedestrian_data_list in pedestrian_annotations.get(video_id, []):
        pedestrian_id = pedestrian_data_list['Pedestrian ID']
        pedestrian_frames = pedestrian_data_list['Frames']
        bounding_boxes = pedestrian_data_list['Bounding Boxes']

        output_pedestrian_dir = os.path.join(output_video_dir, f"Pedestrian_{pedestrian_id}")
        os.makedirs(output_pedestrian_dir, exist_ok=True)

        for i, frame_num in enumerate(pedestrian_frames):
            # Frames past the recorded video length are ignored
            if frame_num > num_frames:
                continue

            image_file = f"{frame_num:05d}.png"
            image_path = os.path.join(images_dir, video_id, image_file)
            if not os.path.exists(image_path):
                print(f"Image file {image_file} does not exist for video {video_id}. Skipping...")
                continue

            image = cv2.imread(image_path)
            if image is None:
                print(f"Error reading image file {image_file} for video {video_id}. Skipping...")
                continue

            # Box corners are truncated to ints before drawing a 2-px green rectangle
            left, top, right, bottom = (int(value) for value in bounding_boxes[i])
            cv2.rectangle(image, (left, top), (right, bottom), (0, 255, 0), 2)

            output_path = os.path.join(
                output_pedestrian_dir,
                f"Pedestrian_{pedestrian_id}_Image_{frame_num}.png",
            )
            cv2.imwrite(output_path, image)

            print("Processed:", output_path)

    print("All images processed for video:", video_id)

print("Processing completed for all videos.")


# Video Folders Stats

In [None]:
import os

# Root folder holding one sub-folder of frames per video
images_dir = "./images"

# Folder name -> number of .png files directly inside it, visited in sorted name
# order; entries of images_dir that are not directories are ignored
video_image_count = {
    video_folder: sum(
        entry.endswith('.png')
        for entry in os.listdir(os.path.join(images_dir, video_folder))
    )
    for video_folder in sorted(os.listdir(images_dir))
    if os.path.isdir(os.path.join(images_dir, video_folder))
}

# Report the counts in the same (sorted) order
for video_folder in video_image_count:
    num_images = video_image_count[video_folder]
    print(f"Video {video_folder}: {num_images} images")


In [None]:
import os

# Path to the directory containing images
images_dir = "./images"

# Number of .png files in each video folder (non-directory entries are ignored)
num_images_list = []
for video_folder in os.listdir(images_dir):
    video_path = os.path.join(images_dir, video_folder)
    if os.path.isdir(video_path):
        num_images_list.append(
            sum(1 for file in os.listdir(video_path) if file.endswith('.png'))
        )

# min()/max() raise ValueError on an empty sequence, so give them the same
# zero fallback the average already had.
min_images = min(num_images_list, default=0)
max_images = max(num_images_list, default=0)
avg_images = sum(num_images_list) / len(num_images_list) if num_images_list else 0

if not num_images_list:
    print(f"No video folders found in {images_dir}; reporting zeros.")

# Print the results
print(f"Minimum number of images: {min_images}")
print(f"Maximum number of images: {max_images}")
print(f"Average number of images: {avg_images:.2f}")

# Bounding box annotations for every 10th frame (first pedestrian of each video)

In [None]:
import os
import cv2
import pickle
import json

# Path to annotations .pkl file
annotations_file = "./data_cache/jaad_database.pkl"

# Output directory to store JSON file
output_dir = "./bounding_box_annotations"

# Load the annotation data.
# NOTE: pickle.load can execute arbitrary code; only open a cache you generated yourself.
with open(annotations_file, 'rb') as f:
    annotation_data = pickle.load(f)

# video id -> list of {"frame", "bbox"} records
all_bounding_boxes = {}

for video_id, video_data in annotation_data.items():
    # Only the first pedestrian listed for the video is exported; a video without
    # pedestrians gets an empty list.
    pedestrian_data = next(iter(video_data['ped_annotations'].values()), None)
    if pedestrian_data:
        frames = pedestrian_data['frames']
        bounding_boxes = pedestrian_data['bbox']
    else:
        frames = []
        bounding_boxes = []

    # Keep every 10th annotated entry, pairing each frame number with its box
    all_bounding_boxes[video_id] = [
        {"frame": frame, "bbox": bbox}
        for frame, bbox in zip(frames[::10], bounding_boxes[::10])
    ]

print("Bounding box annotations collected for all videos.")

# open(..., 'w') does not create missing parent folders, so make sure the
# output directory exists before writing into it.
os.makedirs(output_dir, exist_ok=True)

# Save the bounding box annotations to a JSON file
json_output_file = os.path.join(output_dir, "all_videos_bounding_boxes_with_frames.json")
with open(json_output_file, 'w') as json_file:
    json.dump(all_bounding_boxes, json_file, indent=4)

print("Bounding box annotations saved to JSON file:", json_output_file)


# Extracting CSV Features for every pedestrian in Videos

In [None]:
import pickle
import csv

# Read the cached annotation database
annotations_file = "./data_cache/jaad_database.pkl"
with open(annotations_file, 'rb') as f:
    annotation_data = pickle.load(f)

# Destination of the per-pedestrian export
output_csv_file = "annotations_output.csv"

# Keys looked up in each pedestrian's 'behavior', 'appearance' and 'attributes'
# mappings, in column order.
# NOTE(review): a key that is absent from the database silently yields [] in its
# column — confirm these names (e.g. "hand_gesture") match what the database stores.
behavior_keys = ["cross", "reaction", "hand_gesture", "look", "action", "nod"]
appearance_keys = ["pose_front", "pose_back", "pose_left", "pose_right",
                   "clothes_below_knee", "clothes_upper_light", "clothes_upper_dark",
                   "clothes_lower_light", "clothes_lower_dark", "backpack", "bag_hand",
                   "bag_elbow", "bag_shoulder", "bag_left_side", "bag_right_side",
                   "cap", "hood", "sunglasses", "umbrella", "phone", "baby", "object",
                   "stroller_cart", "bicycle_motorcycle"]
attribute_keys = ["age", "old_id", "num_lanes", "crossing", "gender", "crossing_point",
                  "decision_point", "intersection", "designated", "signalized",
                  "traffic_direction", "group_size", "motion_direction"]

# Column titles: identity/track columns first, then one column per key above
csv_header = (
    ["Video ID", "Pedestrian ID", "Old Pedestrian ID", "Frames", "Bounding Box", "Occlusion"]
    + ["Behavior (Cross)", "Behavior (Reaction)", "Behavior (Hand Gesture)", "Behavior (Look)",
       "Behavior (Action)", "Behavior (Nod)"]
    + ["Appearance (Pose Front)", "Appearance (Pose Back)",
       "Appearance (Pose Left)", "Appearance (Pose Right)", "Appearance (Clothes Below Knee)",
       "Appearance (Clothes Upper Light)", "Appearance (Clothes Upper Dark)", "Appearance (Clothes Lower Light)",
       "Appearance (Clothes Lower Dark)", "Appearance (Backpack)", "Appearance (Bag Hand)",
       "Appearance (Bag Elbow)", "Appearance (Bag Shoulder)", "Appearance (Bag Left Side)",
       "Appearance (Bag Right Side)", "Appearance (Cap)", "Appearance (Hood)", "Appearance (Sunglasses)",
       "Appearance (Umbrella)", "Appearance (Phone)", "Appearance (Baby)", "Appearance (Object)",
       "Appearance (Stroller/Cart)", "Appearance (Bicycle/Motorcycle)"]
    + ["Attributes (Age)",
       "Attributes (Old ID)", "Attributes (Num Lanes)", "Attributes (Crossing)", "Attributes (Gender)",
       "Attributes (Crossing Point)", "Attributes (Decision Point)", "Attributes (Intersection)",
       "Attributes (Designated)", "Attributes (Signalized)", "Attributes (Traffic Direction)",
       "Attributes (Group Size)", "Attributes (Motion Direction)"]
)

with open(output_csv_file, mode='w', newline='') as csv_file:
    writer = csv.writer(csv_file)
    writer.writerow(csv_header)

    # One row per pedestrian track, across all videos
    for video_id, video_data in annotation_data.items():
        for ped_id, ped_data in video_data['ped_annotations'].items():
            ped_attributes = [video_id, ped_id, ped_data['old_id'], ped_data['frames'],
                              ped_data['bbox'], ped_data['occlusion']]
            behaviors = ped_data['behavior']
            appearances = ped_data['appearance']
            attributes = ped_data['attributes']

            # Append the three groups in header order; list-valued cells are written
            # by csv.writer in their text form
            ped_attributes.extend(behaviors.get(key, []) for key in behavior_keys)
            ped_attributes.extend(appearances.get(key, []) for key in appearance_keys)
            ped_attributes.extend(attributes.get(key, []) for key in attribute_keys)

            writer.writerow(ped_attributes)

print("Annotations saved to CSV file:", output_csv_file)


In [None]:
import pickle
import csv

def main(annotations_file="./data_cache/jaad_database.pkl",
         output_csv_file="traffic_annotations_output.csv"):
    """Export the per-frame traffic annotations of every video to a CSV file.

    Args:
        annotations_file: Path of the pickled annotation database to read.
        output_csv_file: Path of the CSV file to (over)write.

    One row is written per frame entry of each video's 'traffic_annotations'
    mapping. Videos without that mapping contribute no rows.
    """
    # NOTE: pickle.load can execute arbitrary code; only open a cache you generated yourself.
    with open(annotations_file, 'rb') as f:
        annotation_data = pickle.load(f)

    csv_header = ["Video ID", "Frame ID", "Road Type", "Pedestrian Crossing", "Pedestrian Sign", "Stop Sign", "Traffic Light"]

    with open(output_csv_file, mode='w', newline='') as csv_file:
        writer = csv.writer(csv_file)
        writer.writerow(csv_header)

        for video_id, video_data in annotation_data.items():
            traffic_annotations = video_data.get('traffic_annotations', {})

            # The mapping mixes per-frame dicts with scalar entries. Presumably the
            # scalar one is the video-wide 'road_type' (verify against the database
            # layout); use it as the fallback so the "Road Type" column is not left
            # blank when the per-frame dicts do not repeat it.
            video_road_type = traffic_annotations.get('road_type', '')

            for frame_id, attributes in traffic_annotations.items():
                # Only per-frame dicts become rows; scalar entries are not frames
                if not isinstance(attributes, dict):
                    continue
                writer.writerow([
                    video_id,
                    frame_id,
                    attributes.get('road_type', video_road_type),
                    attributes.get('ped_crossing', ''),
                    attributes.get('ped_sign', ''),
                    attributes.get('stop_sign', ''),
                    attributes.get('traffic_light', ''),
                ])

    print("Annotations saved to CSV file:", output_csv_file)

# Run the traffic-annotation export when this cell executes as the main program.
if __name__ == "__main__":
    main()


In [None]:
import pickle
import csv

def main():
    """Dump the per-frame vehicle actions of every video to vehicle_attributes.csv.

    Reads the pickled database at ./data_cache/jaad_database.pkl and writes one
    row (video id, frame id, action) per entry of each video's
    'vehicle_annotations' mapping. Videos without that mapping add no rows.
    """
    source_pickle = "./data_cache/jaad_database.pkl"
    target_csv = "vehicle_attributes.csv"

    with open(source_pickle, 'rb') as handle:
        database = pickle.load(handle)

    with open(target_csv, mode='w', newline='') as handle:
        out = csv.writer(handle)
        # Header first, then every (video, frame, action) triple in database order
        out.writerow(["Video ID", "Frame ID", "Vehicle Action"])
        out.writerows(
            [vid, frame, act]
            for vid, record in database.items()
            for frame, act in record.get('vehicle_annotations', {}).items()
        )

    print("Vehicle attributes saved to CSV file:", target_csv)

# Run the vehicle-annotation export when this cell executes as the main program.
if __name__ == "__main__":
    main()
