<a href="https://colab.research.google.com/github/Stefano-Previti/Pedestrian_Intention_Estimation/blob/main/Pedestrian_intention_estimation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

▶**LOADING AND PREPARING THE DATASET**

**⏰Citation**:

In [None]:
# BibTeX entries for the JAAD dataset papers; printed so the citation is
# visible in the rendered notebook output.
bibtex_entries = """
@inproceedings{rasouli2017they,
  title={Are They Going to Cross? A Benchmark Dataset and Baseline for Pedestrian Crosswalk Behavior},
  author={Rasouli, Amir and Kotseruba, Iuliia and Tsotsos, John K},
  booktitle={ICCVW},
  pages={206--213},
  year={2017}
}

@inproceedings{rasouli2018role,
  title={It is Not All About Size: On the Role of Data Properties in Pedestrian Detection},
  author={Rasouli, Amir and Kotseruba, Iuliia and Tsotsos, John K},
  booktitle={ECCVW},
  year={2018}
}
"""

print(bibtex_entries)

⚓Loading the first 10 videos of the JAAD dataset.

In [None]:
# Mounting Google Drive so the JAAD clips stored there become readable
# under /content/drive (Colab-only; prompts for authorization).
from google.colab import drive
drive.mount('/content/drive')

# Importing necessary modules
import os
import shutil

# Source folder on Google Drive that holds the JAAD .mp4 clips
dataset = '/content/drive/My Drive/JAAD_clips'

# Local working directory the clips are copied into; created if missing.
# `dataset_dir` is reused by later cells.
dataset_dir = '/content/data/JAAD_dataset'
os.makedirs(dataset_dir, exist_ok=True)

# Defining a function to copy the video clips from Drive to local storage
def copy_videos(src_dir, dst_dir, max_videos=None, extension='.mp4'):
    """
    Copy video files from `src_dir` into `dst_dir` in alphabetical order.

    Parameters:
    - src_dir (str): Directory that is scanned (non-recursively) for videos.
    - dst_dir (str): Directory the videos are copied into; created if missing.
    - max_videos (int | None): If given, only the first `max_videos` files
      (after sorting by name) are copied. `None` copies every match.
    - extension (str): File-name suffix that identifies a video.

    Returns:
    - list[str]: Names of the files that were copied, in copy order.
    """
    os.makedirs(dst_dir, exist_ok=True)

    # Keeping only regular files with the requested suffix, sorted by name so
    # that "the first N videos" is deterministic.
    video_files = sorted(
        name for name in os.listdir(src_dir)
        if name.endswith(extension) and os.path.isfile(os.path.join(src_dir, name))
    )

    if max_videos is not None:
        video_files = video_files[:max_videos]

    # Copying the videos
    for video in video_files:
        print(f"Copying: {video}")
        shutil.copy(os.path.join(src_dir, video), os.path.join(dst_dir, video))

    return video_files

# Copying the clips from the Drive folder into the local dataset directory
copy_videos(dataset, dataset_dir)


In [None]:
# Sanity check: print a short preview of the files found under a directory tree
def verify_extraction(directory, num_files_to_check=5):
    """
    Print every directory under `directory` together with a preview of its files.

    Parameters:
    - directory (str): Root of the tree to inspect.
    - num_files_to_check (int): Maximum number of files (in name order)
      listed for each directory.
    """
    for current_dir, _subdirs, names in os.walk(directory):
        print(f'Checking directory: {current_dir}')
        # Only the alphabetically first few files are shown per directory
        preview = sorted(names)[:num_files_to_check]
        for position, name in enumerate(preview, start=1):
            print(f'File {position}: {os.path.join(current_dir, name)}')

# Listing the first few copied files to confirm the local dataset directory is populated
verify_extraction(dataset_dir)

⚓Downloading the annotations from the official JAAD repository: https://github.com/ykotseruba/JAAD?tab=readme-ov-file.

In [None]:
import zipfile

# Cloning the repository from GitHub
!git clone https://github.com/ykotseruba/JAAD.git

# Compressing the 'annotations' directories into ZIP files
import shutil
shutil.make_archive('/content/annotations', 'zip', 'JAAD/', 'annotations')

# Extracting the ZIP file for Bounding Box and frame annotations
with zipfile.ZipFile('annotations.zip', 'r') as zip_ref:
    zip_ref.extractall('/content/annotations')

⚓Installing MediaPipe for pose estimation because it is faster in an environment like Colab.

In [None]:
# Installing MediaPipe into the Colab runtime; JAADDataset uses its Pose solution.
!pip install mediapipe

⚓Custom Dataset class for the JAAD dataset. This class extracts local context, 2D location trajectory and pose keypoints from the dataset for pedestrian analysis.

In [None]:
!pip install Pillow
import os
import xml.etree.ElementTree as ET

import cv2
import mediapipe as mp
import numpy as np
import PIL.Image as Image
import torch
from torch.utils.data import Dataset
from torchvision import transforms

class JAADDataset(Dataset):
    """
    Per-frame pedestrian samples built from JAAD video clips and their XML annotations.

    Every annotated bounding box becomes one sample. For each sample the
    dataset extracts the local context (a resized crop around the pedestrian),
    the bounding box coordinates and the pedestrian's pose keypoints.

    Bounding boxes are stored everywhere in this class in the order
    [xbr, xtl, ybr, ytl] (bottom-right x, top-left x, bottom-right y, top-left y).
    """

    # Side length in pixels of the square local-context crop.
    CONTEXT_SIZE = 224

    # MediaPipe Pose landmark indices that are extracted (18 keypoints).
    # MediaPipe Pose exposes 33 landmarks numbered 0..32; the heels are 29 and 30.
    POSE_LANDMARK_INDICES = (
        2,   # Left Eye
        5,   # Right Eye
        7,   # Left Ear
        8,   # Right Ear
        11,  # Left Shoulder
        12,  # Right Shoulder
        13,  # Left Elbow
        14,  # Right Elbow
        15,  # Left Wrist
        16,  # Right Wrist
        23,  # Left Hip
        24,  # Right Hip
        25,  # Left Knee
        26,  # Right Knee
        27,  # Left Ankle
        28,  # Right Ankle
        29,  # Left Heel
        30,  # Right Heel
    )

    # Detection thresholds tried in order until a pose is found. Listed
    # explicitly so the schedule does not depend on float subtraction.
    POSE_CONFIDENCE_STEPS = (0.50, 0.45, 0.40, 0.35, 0.30, 0.25)

    def __init__(self, data_dir,annotations_dir, image_extension='.png'):
        """
        Initializing the JAADDataset.

        Parameters:
        - data_dir (str): Directory containing the video clips (video_XXXX.mp4).
        - annotations_dir (str): Directory containing annotations XML files.
        - image_extension (str): The extension of image files in the dataset
          (default: '.png'). Stored but not used by the methods of this class.
        """
        self.data_dir = data_dir
        self.image_extension = image_extension

        # Parsing general annotations
        self.annotations = self._parse_annotations(annotations_dir)

    def __getitem__(self, idx):
        """
        Retrieving a data sample at the specified index.

        Parameters:
        - idx (int): Index of the data sample to retrieve.

        Returning:
        - dict: A dictionary containing the following keys:
            - 'local_context' (Tensor): float32 crop around the pedestrian,
              resized to CONTEXT_SIZE x CONTEXT_SIZE (height, width, channels).
            - 'location_trajectory' (Tensor): float32 bounding box [xbr, xtl, ybr, ytl].
            - 'pose_keypoints' (Tensor): float32 (x, y) pairs of the selected pose landmarks.
            - 'cross' (int): 1 if the pedestrian is crossing in this frame, else 0.
            - 'pedestrian' (str): Per-video pedestrian name ('pedestrian_N').
            - 'frame_id' (int): Frame number inside the video.
            - 'video_id' (int): Numeric id of the video.
        """
        annotation = self.annotations[idx]
        bbox = annotation['bbox']

        # Loading images and related data
        frame = self._load_frame(annotation)
        local_context = self._get_local_context(frame, bbox)
        location_trajectory = self._get_location_trajectory(annotation)
        pose_keypoints = self._get_pose_keypoints(frame, bbox)

        return {
            'local_context': torch.tensor(local_context, dtype=torch.float32),
            'location_trajectory': torch.tensor(location_trajectory, dtype=torch.float32),
            # Already a tensor; as_tensor avoids the copy-construct warning
            'pose_keypoints': torch.as_tensor(pose_keypoints, dtype=torch.float32),
            'cross': annotation['cross'],
            'pedestrian': annotation['pedestrian'],
            'frame_id': annotation['frame_id'],
            'video_id': annotation['video_id'],
        }

    def __len__(self):
        """
        Returns the length of the general annotations.
        """
        return len(self.annotations)

    @staticmethod
    def _video_index_from_filename(filename):
        """
        Return the number at the end of an annotation file name
        (e.g. 'video_0007.xml' -> 7), or None if the name has no such number.
        """
        stem = os.path.splitext(filename)[0]
        suffix = stem.rsplit('_', 1)[-1]
        return int(suffix) if suffix.isdecimal() else None

    @staticmethod
    def _cross_label(box):
        """
        Return 1 if the box is labelled 'crossing', else 0.

        The label is looked up as an XML attribute of the <box> element first
        and, if absent, in a child <attribute name="cross"> element.
        """
        value = box.get('cross')
        if value is None:
            for attribute in box.findall('attribute'):
                if attribute.get('name') == 'cross':
                    value = attribute.text
                    break
        return 1 if (value or '').strip() == 'crossing' else 0

    def _parse_annotations(self, directory, max_files=10):
        """
        Parse annotations from XML files into one flat list of per-box records.

        Parameters:
        - directory (str): Directory (searched recursively) containing annotations XML files.
        - max_files (int): Maximum number of XML files to process.

        Returns:
        - list[dict]: One dictionary per bounding box with the keys
          'frame_id', 'bbox' ([xbr, xtl, ybr, ytl]), 'video_id', 'cross'
          and 'pedestrian'.
        """
        general_annotations = []
        files_parsed = 0

        for root_dir, dirs, files in os.walk(directory):
            # Stopping the whole walk (not just the current directory) once
            # enough files have been processed
            if files_parsed >= max_files:
                break

            for filename in sorted(files):
                if files_parsed >= max_files:
                    break
                if not filename.endswith('.xml'):
                    continue

                file_path = os.path.join(root_dir, filename)

                # The video id comes from the file name so that a skipped file
                # cannot shift the ids of the files after it; the running
                # count is only a fallback for unexpected names.
                video_index = self._video_index_from_filename(filename)
                if video_index is None:
                    video_index = files_parsed + 1

                try:
                    root = ET.parse(file_path).getroot()

                    # Collected separately so a file that fails half-way
                    # contributes no partial annotations
                    file_annotations = []
                    for pedestrian_count, track in enumerate(root.findall('track'), start=1):
                        pedestrian = f'pedestrian_{pedestrian_count}'
                        for box in track.findall('box'):
                            file_annotations.append({
                                'frame_id': int(box.get('frame')),
                                'bbox': [
                                    float(box.get('xbr')),
                                    float(box.get('xtl')),
                                    float(box.get('ybr')),
                                    float(box.get('ytl')),
                                ],
                                'video_id': video_index,
                                'cross': self._cross_label(box),
                                'pedestrian': pedestrian,
                            })
                except (ET.ParseError, OSError, TypeError, ValueError) as e:
                    # Best effort: report the broken file and carry on
                    print(f"Error processing file {file_path}: {e}")
                    continue

                general_annotations.extend(file_annotations)
                files_parsed += 1

        return general_annotations

    def _load_frame(self, annotation):
        """
        Loads a frame from a video based on the annotation data.

        Parameters:
        - annotation (dict): Annotation data containing 'frame_id' and 'video_id'.

        Returns:
        - frame (numpy array): Extracted frame from the video, in RGB channel order.

        Raises:
        - ValueError: If the video cannot be opened or the frame cannot be read.
        """
        frame_id = annotation['frame_id']
        video_id = annotation['video_id']

        # Building the path to the video file (video_0001.mp4, video_0002.mp4, ...)
        video_name = f"video_{video_id:04d}.mp4"
        video_path = os.path.join(self.data_dir, video_name)

        # Opening the video file; the capture is always released, even on error
        cap = cv2.VideoCapture(video_path)
        try:
            if not cap.isOpened():
                raise ValueError(f"Cannot open the video file: {video_path}")

            # Setting the current frame position
            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_id)
            ret, frame = cap.read()
        finally:
            cap.release()

        if not ret:
            raise ValueError(f"Cannot read frame {frame_id} from video: {video_path}")

        # Converting the image from BGR to RGB
        return cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

    def _get_local_context(self, frame, bbox):
        """
        Extracting the local context around the pedestrian by cropping and resizing the frame.

        Parameters:
        - frame (numpy array): Image frame containing the pedestrian.
        - bbox (list): Bounding box coordinates [xbr, xtl, ybr, ytl] around the pedestrian.

        Returning:
        - numpy array: Crop resized to CONTEXT_SIZE x CONTEXT_SIZE with Lanczos resampling.

        Raises:
        - ValueError: If the bounding box does not overlap the frame.
        """
        # Unpacking the bounding box coordinates
        xbr, xtl, ybr, ytl = bbox

        # Clamping to the frame so negative coordinates cannot wrap around in
        # the slice below
        frame_height, frame_width = frame.shape[:2]
        left = max(int(xtl), 0)
        top = max(int(ytl), 0)
        right = min(int(xbr), frame_width)
        bottom = min(int(ybr), frame_height)
        if right <= left or bottom <= top:
            raise ValueError(
                f"Bounding box {bbox} does not overlap the {frame_width}x{frame_height} frame"
            )

        # Cropping the region around the pedestrian using the bounding box
        cropped_frame = frame[top:bottom, left:right]

        # Resizing through PIL, then converting back to numpy for MediaPipe
        new_size = (self.CONTEXT_SIZE, self.CONTEXT_SIZE)
        resized = Image.fromarray(cropped_frame).resize(new_size, Image.LANCZOS)
        return np.array(resized)

    def _get_location_trajectory(self, annotation):
        """
        Extracting the 2D location of the pedestrian from the annotation data.

        Parameters:
        - annotation (dict): Annotation data containing the bounding box.

        Returning:
        - bbox (list): Bounding box coordinates [xbr, xtl, ybr, ytl].
        """
        return annotation['bbox']

    def _get_pose_keypoints(self, frame, bbox):
        """
        Extracting the (x, y) keypoints of the pedestrian's pose using MediaPipe Pose.

        Detection runs on the local-context crop and is retried with
        decreasing confidence thresholds (POSE_CONFIDENCE_STEPS) until a pose
        with at least one non-zero keypoint is found.

        Parameters:
        - frame (numpy array): Image frame containing the pedestrian.
        - bbox (list): Bounding box with coordinates [xbr, xtl, ybr, ytl] around the pedestrian.

        Returning:
        - Tensor: A float32 tensor of shape (36,) with the (x, y) pixel
          coordinates of the 18 selected keypoints inside the crop. Keypoints
          that fall outside the crop are set to (0, 0); the whole tensor is
          zero when no pose is detected.
        """
        # The crop does not depend on the threshold, so it is computed once
        image = self._get_local_context(frame, bbox)
        height, width = image.shape[:2]
        num_values = 2 * len(self.POSE_LANDMARK_INDICES)

        # Initializing the pose detection model
        mp_pose = mp.solutions.pose

        for confidence in self.POSE_CONFIDENCE_STEPS:
            with mp_pose.Pose(static_image_mode=True, min_detection_confidence=confidence) as pose:
                # Processing the image to detect pose landmarks
                results = pose.process(image)

                if results.pose_landmarks is None:
                    # Nothing detected at this threshold; trying a lower one
                    continue

                landmarks = results.pose_landmarks.landmark
                landmarks_xy = []
                for idx in self.POSE_LANDMARK_INDICES:
                    x = y = 0.0
                    if idx < len(landmarks):
                        # Landmarks are normalized; scaling to crop pixels
                        x = landmarks[idx].x * width
                        y = landmarks[idx].y * height
                    # Discarding keypoints that fall outside the crop
                    if x < 0 or y < 0 or x > width or y > height:
                        x = y = 0.0
                    landmarks_xy.extend([x, y])

                pose_keypoints = torch.tensor(landmarks_xy, dtype=torch.float32)

                # Accepting the result only if at least one keypoint is usable
                if torch.sum(pose_keypoints) > 0:
                    return pose_keypoints

        # If no valid pose_keypoints were found, return the zero vector
        return torch.zeros(num_values, dtype=torch.float32)


**▶INPUT ACQUISITION**

⚓Extracting the input features for every annotated pedestrian and saving them to Google Drive.

In [None]:
import os
import torch
from torch.utils.data import DataLoader
from google.colab import drive
import numpy as np

# Mounting Google Drive (Colab-only) so the extracted samples can be written to it
drive.mount('/content/drive')

# Path in Google Drive where tensors will be saved
input_tensors = '/content/drive/My Drive/pedestrian_input_tensors/'

# Ensuring the directory exists
os.makedirs(input_tensors, exist_ok=True)

# Directory the annotation archive was extracted into by an earlier cell
annotations_dir = '/content/annotations'

# Building the dataset from the locally copied videos (`dataset_dir`, defined
# in an earlier cell) and the annotation XML files. JAADDataset is the custom
# Dataset class defined earlier in this notebook.
JAAD_dataset = JAADDataset(dataset_dir, annotations_dir)

# Creating the DataLoader; shuffle=False keeps samples in annotation order
batch_size = 2
data_loader = DataLoader(JAAD_dataset, batch_size=batch_size, shuffle=False)

def extract_and_save(data_loader, input_tensors_dir):
    """
    Split every batch of `data_loader` into single samples and save each one
    as `sample_<n>.pt` inside `input_tensors_dir`.

    Parameters:
    - data_loader (iterable): Yields batches as dictionaries with the keys
      'video_id', 'frame_id', 'location_trajectory', 'pose_keypoints',
      'local_context', 'pedestrian' and 'cross'; every value is indexable
      along the batch dimension.
    - input_tensors_dir (str): Output directory; created if it doesn't exist.

    Returns:
    - int: Number of samples written.
    """
    sample_keys = (
        'video_id',
        'frame_id',
        'location_trajectory',
        'pose_keypoints',
        'local_context',
        'pedestrian',
        'cross',
    )

    def _own_copy(value):
        # A slice of a batch tensor shares the batch's storage, and torch.save
        # serializes that whole storage. Cloning gives the sample its own
        # storage so each file holds exactly one sample.
        return value.clone() if torch.is_tensor(value) else value

    # Creating the save directory if it doesn't exist
    os.makedirs(input_tensors_dir, exist_ok=True)
    sample_count = 0

    # Iterating over each batch from the DataLoader
    for batch in data_loader:
        # Iterating over each element in the batch
        for i in range(len(batch['video_id'])):
            sample = {key: _own_copy(batch[key][i]) for key in sample_keys}

            # Saving the sample
            torch.save(sample, os.path.join(input_tensors_dir, f'sample_{sample_count}.pt'))
            sample_count += 1

    print(f"Saved {sample_count} samples to {input_tensors_dir}")
    return sample_count

# Running the extraction over the whole dataset and writing the samples to Drive
extract_and_save(data_loader, input_tensors)
