In [None]:
# Shell escape: clone the project repository into the current working directory.
!git clone https://github.com/AsangCode/Pedestrian_Behaviour_Prediction

In [None]:
# Shell escape: install TensorFlow pinned to version 2.15.1.
!pip install tensorflow==2.15.1

In [None]:
# Run this to initialize the required components [YOLO, SORT and DenseNet]
# The `%tensorflow_version` line magic is attempted on a best-effort basis:
# any Exception it raises is swallowed so the rest of the cell still runs.
try:
  %tensorflow_version 2.x
except Exception:
  pass

# Install filterpy (unpinned).
# NOTE(review): presumably a dependency of the SORT tracker module imported below — confirm.
!pip install filterpy
# Change into the cloned repository; the imports and the relative model file
# names used later in this cell are resolved from there.
%cd Pedestrian_Behaviour_Prediction

import sys
from absl import app, logging, flags
from absl.flags import FLAGS
import time
import cv2
import numpy as np
import tensorflow as tf
from yolov3_tf2.models import (
    YoloV3, YoloV3Tiny
)
from yolov3_tf2.dataset import transform_images, load_tfrecord_dataset
from yolov3_tf2.utils import draw_outputs

from sortn import *

# Command-line style configuration, declared through absl flags.
# Paths are absolute and point inside the cloned repository under /content.
flags.DEFINE_string(
    name='classes',
    default='/content/Pedestrian_Behaviour_Prediction/data/coco.names',
    help='path to classes file',
)
flags.DEFINE_string(
    name='weights',
    default='/content/Pedestrian_Behaviour_Prediction/yolov3_train_3.weights.h5',
    help='path to weights file',
)
flags.DEFINE_boolean(name='tiny', default=False, help='yolov3 or yolov3-tiny')
flags.DEFINE_integer(name='size', default=416, help='resize images to')
flags.DEFINE_string(name='tfrecord', default=None, help='tfrecord instead of image')
flags.DEFINE_integer(name='num_classes', default=1, help='number of classes in the model')
flags.DEFINE_string(
    name='video',
    default='/content/Pedestrian_Behaviour_Prediction/data/JAAD_test_video_0339.mp4',
    help='path to video file or number for webcam)',
)
flags.DEFINE_string(name='output', default='Result.mp4', help='path to output video')
flags.DEFINE_string(
    name='output_format',
    default='mp4v',
    help='codec used in VideoWriter when saving video to file',
)

# Parse the flags against an argv that holds only a program name.
# NOTE(review): presumably done so every flag takes its default inside the
# notebook; `_run_init` is a private absl helper — confirm it is still available.
app._run_init(['yolov3'], app.parse_flags_with_usage)

# Rebuild the classifier in two steps: the architecture comes from the JSON
# description, the trained parameters from the HDF5 weights file. Both files
# are looked up relative to the current working directory.
with open('densenet_model.json') as json_file:
    json_savedModel = json_file.read()

model_j = tf.keras.models.model_from_json(json_savedModel)
model_j.load_weights('densenet_1.hdf5')

def pred_func(X_test):
  """Classify the first sample of ``X_test`` with the module-level ``model_j``.

  Only ``X_test[0:1]`` is passed to the model; any further samples are ignored.

  Returns:
    The index of the largest score in the first prediction row.
  """
  first_sample = X_test[0:1]
  class_scores = model_j.predict(first_sample, verbose=0)[0]
  return np.argmax(class_scores, axis=0)

In [None]:
import xml.etree.ElementTree as ET

def parse_annotations(xml_file):
    """Read per-frame bounding boxes and crossing labels from an annotation XML.

    Every ``<box>`` element found anywhere in the document contributes one
    entry, grouped by the integer value of its ``frame`` attribute.

    Args:
        xml_file: A path or file object, passed straight to ``ET.parse``.

    Returns:
        dict mapping frame number (int) to a list of dicts, each holding
        ``"bbox"`` (``[xtl, ytl, xbr, ybr]`` as floats) and ``"cross"``
        (1 when the box's "cross" attribute reads "crossing", else 0).

    Raises:
        KeyError: If a box lacks ``frame``, ``xtl``, ``ytl``, ``xbr`` or ``ybr``.
        ValueError: If one of those attributes is not numeric.
    """
    root = ET.parse(xml_file).getroot()

    gt_data = {}

    for box in root.findall('.//box'):
        attrs = box.attrib
        frame = int(attrs['frame'])
        bbox = [float(attrs[key]) for key in ('xtl', 'ytl', 'xbr', 'ybr')]

        cross_element = box.find('.//attribute[@name="cross"]')
        cross_text = cross_element.text if cross_element is not None else None

        # Element text keeps any surrounding newlines/indentation from the
        # file, so strip it before comparing; a missing or empty label falls
        # back to non-crossing (0).
        is_crossing = (
            cross_text is not None
            and cross_text.strip().lower() == "crossing"
        )

        gt_data.setdefault(frame, []).append(
            {"bbox": bbox, "cross": 1 if is_crossing else 0}
        )

    return gt_data

# Build the ground-truth lookup for the test clip, then spot-check one frame
# (frame 127); the fallback string is shown when that frame has no boxes.
gt_annotations = parse_annotations(
    "/content/Pedestrian_Behaviour_Prediction/data/video_0339.xml"
)
print(gt_annotations.get(127, "No annotations for this frame"))


In [None]:
import pickle

# Persist the parsed ground truth to disk so later cells can reload it
# without re-parsing the XML.
with open("gt_annotations.pkl", "wb") as gt_sink:
    pickle.dump(gt_annotations, gt_sink)

# Read the same file straight back (round trip through the pickle).
with open("gt_annotations.pkl", "rb") as gt_source:
    gt_annotations = pickle.load(gt_source)


In [None]:
import time

# Run this
# Detector thresholds. These two flags are not declared in this notebook.
# NOTE(review): presumably yolov3_tf2 defines and reads them — confirm.
FLAGS.yolo_iou_threshold = 0.5
FLAGS.yolo_score_threshold = 0.5

# Module-level drawing defaults; run_model() reads `thickness` from here.
color = (255, 0, 0)
thickness = 2

# Build the detector and restore its trained weights.
# NOTE(review): FLAGS.tiny is declared but never consulted; the full YoloV3
# model is always used.
yolo = YoloV3(classes=FLAGS.num_classes)

yolo.load_weights(FLAGS.weights)
logging.info('weights loaded')

# One class name per line; the context manager closes the file handle that
# the previous bare open(...).readlines() left dangling.
with open(FLAGS.classes) as classes_file:
    class_names = [c.strip() for c in classes_file]
logging.info('classes loaded')



def run_model():
  """Detect, track and classify pedestrians over the configured video.

  For every frame read from ``FLAGS.video`` the function:
    1. runs the module-level ``yolo`` detector,
    2. feeds class-0 detections to a fresh ``Sort`` tracker,
    3. classifies each track with ``pred_func`` from its buffered crops
       (up to 16 crops of 100x100; a shorter buffer is padded by repeating
       its newest crop),
    4. draws the box (red when the prediction is 1, green otherwise), the
       track id and the frame number onto the frame, and
    5. appends the crop of the current frame to the track's buffer.

  Annotated frames are written to ``FLAGS.output`` when that flag is set, and
  the per-frame predictions are pickled to ``predictions.pkl`` as
  ``{frame: [{"bbox": [x1, y1, x2, y2], "cross_pred": int}, ...]}``.

  Relies on module-level names: ``FLAGS``, ``yolo``, ``thickness``, ``Sort``,
  ``pred_func``, ``transform_images``, ``cv2``, ``np``, ``tf`` and ``time``.

  Returns:
    None.
  """
  import pickle

  seq_len = 16            # number of crops the classifier consumes per track
  crop_size = (100, 100)  # size every pedestrian crop is resized to

  print('Processing started.......')
  # NOTE(review): frames are numbered from 1 here. Confirm that the annotation
  # XML used for evaluation numbers its frames the same way; otherwise every
  # prediction is compared against the neighbouring frame's ground truth.
  frame = 0

  # FLAGS.video is either a webcam index ("0") or a file path.
  try:
    vid = cv2.VideoCapture(int(FLAGS.video))
  except (TypeError, ValueError):
    vid = cv2.VideoCapture(FLAGS.video)

  out = None
  rolling_data = {}  # track id -> list of the most recent crops (max seq_len)
  predictions = {}   # frame number -> list of prediction dicts

  try:
    if FLAGS.output:
      # by default VideoCapture returns float instead of int
      width = int(vid.get(cv2.CAP_PROP_FRAME_WIDTH))
      height = int(vid.get(cv2.CAP_PROP_FRAME_HEIGHT))
      fps = int(vid.get(cv2.CAP_PROP_FPS))
      codec = cv2.VideoWriter_fourcc(*FLAGS.output_format)
      out = cv2.VideoWriter(FLAGS.output, codec, fps, (width, height))

    # create instance of SORT
    mot_tracker = Sort()

    while True:
      _, img = vid.read()

      if img is None:
        break

      frame += 1

      preprocess_start = time.time()
      img_in = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
      img_orig = np.copy(img)  # untouched copy: crops must not contain drawings
      img_in = tf.expand_dims(img_in, 0)
      img_in = transform_images(img_in, FLAGS.size)
      preprocess_end = time.time()

      yolo_start = time.time()
      boxes, scores, classes, nums = yolo.predict(img_in)  # yolo prediction
      yolo_end = time.time()

      tracking_start = time.time()
      dets = boxes[:, :nums[0], :].reshape(nums[0], 4)
      # keep class 0 only (pedestrians) and hand those boxes to the tracker
      trackers = mot_tracker.update(dets[classes[0][:nums[0]] == 0])
      tracking_end = time.time()

      postprocess_start = time.time()
      wh = np.flip(img.shape[0:2])  # (width, height): scales tracker boxes to pixels

      for d in trackers:
        track_id = int(d[4])

        # Plain Python ints are accepted by both OpenCV drawing calls and slicing.
        x1y1 = tuple(int(v) for v in (np.array(d[0:2]) * wh).astype(np.int32))
        x2y2 = tuple(int(v) for v in (np.array(d[2:4]) * wh).astype(np.int32))

        # Classify from the crops buffered on earlier frames; a track seen
        # for the first time has no buffer yet and defaults to 0.
        y = 0
        history = rolling_data.get(track_id)
        if history:
          if len(history) == seq_len:
            frames = history
          else:
            frames = [history[-1]] * seq_len  # pad with the newest crop
          seq = np.stack(np.array(frames), axis=2)  # (100, 100, 16, 3)
          seq = np.expand_dims(seq, axis=0)
          y = pred_func(seq)  # classification output

        predictions.setdefault(frame, []).append(
            {"bbox": [x1y1[0], x1y1[1], x2y2[0], x2y2[1]], "cross_pred": int(y)})

        # risky pedestrian identification thru box color
        color = (0, 0, 255) if y == 1 else (0, 255, 0)

        cv2.rectangle(img, x1y1, x2y2, color, thickness)
        cv2.putText(img, str(track_id), org=(x1y1[0], x1y1[1] - 5),
                    fontFace=cv2.FONT_HERSHEY_SIMPLEX, fontScale=1,
                    color=color, thickness=thickness)

        # storing the data for last 16 frames
        crop = img_orig[x1y1[1]:x2y2[1], x1y1[0]:x2y2[0]]
        if crop.size == 0:
          continue  # the box has no usable area in this frame: nothing to buffer
        try:
          cropped_img = cv2.resize(crop, crop_size)
        except cv2.error:
          continue  # best effort: skip a crop OpenCV cannot resize

        # Only touch the buffer once the crop exists, so a failed crop can
        # never shrink the history.
        history = rolling_data.setdefault(track_id, [])
        if len(history) >= seq_len:
          del history[0]  # drop the oldest crop before appending the latest
        history.append(np.asarray(cropped_img))

      # Label every written frame, including those without any tracked pedestrian.
      cv2.putText(img, "Frame No: {}".format(frame), (0, 30),
                  cv2.FONT_HERSHEY_COMPLEX_SMALL, 1, (255, 0, 0), 2)
      postprocess_end = time.time()

      if out is not None:
        out.write(img)
      #cv2.imshow('output', img)

      print(f"Frame {frame}:")
      print(f"  Preprocessing Time: {preprocess_end - preprocess_start:.4f} sec")
      print(f"  YOLO Prediction Time: {yolo_end - yolo_start:.4f} sec")
      print(f"  Tracking Time: {tracking_end - tracking_start:.4f} sec")
      print(f"  Post-processing Time: {postprocess_end - postprocess_start:.4f} sec")

      if cv2.waitKey(1) == ord('q'):
        break
  finally:
    # Always give the capture and the writer back, even when a frame fails;
    # releasing the writer is what finalizes the output file.
    vid.release()
    if out is not None:
      out.release()

  # **Save predictions after loop**
  with open("predictions.pkl", "wb") as f:
    pickle.dump(predictions, f)

  print("Predictions saved successfully!")

  cv2.destroyAllWindows()
  print('\nProcessing completed.......!!!')
  print('Check video file in Pedestrian_Behaviour_Prediction folder!')

  return


In [None]:
import numpy as np

def compute_iou(box1, box2):
    """Return the intersection-over-union of two ``(x1, y1, x2, y2)`` boxes.

    Boxes that do not overlap score 0, and 0 is also returned when the
    union area is zero (so the division is never attempted on it).
    """
    ax1, ay1, ax2, ay2 = box1
    bx1, by1, bx2, by2 = box2

    # Extent of the overlap along each axis, clamped at zero when disjoint.
    overlap_w = max(0, min(ax2, bx2) - max(ax1, bx1))
    overlap_h = max(0, min(ay2, by2) - max(ay1, by1))
    inter_area = overlap_w * overlap_h

    union_area = (ax2 - ax1) * (ay2 - ay1) + (bx2 - bx1) * (by2 - by1) - inter_area

    if not union_area:
        return 0
    return inter_area / union_area


In [None]:
def match_predictions(gt_data, pred_data, iou_threshold=0.5):
    """Pair each predicted box with the ground-truth box it overlaps most.

    Only frames present in both mappings are considered. A prediction yields
    a pair when its best IoU (via ``compute_iou``) is positive and at least
    ``iou_threshold``; several predictions may pair with the same
    ground-truth box.

    Args:
        gt_data: frame -> list of dicts with ``"bbox"`` and ``"cross"``.
        pred_data: frame -> list of dicts with ``"bbox"`` and ``"cross_pred"``.
        iou_threshold: Minimum IoU for a pair to be accepted.

    Returns:
        List of ``(ground-truth label, predicted label)`` tuples.
    """
    label_pairs = []

    for frame_id in pred_data:
        if frame_id not in gt_data:
            continue

        for detection in pred_data[frame_id]:
            top_iou, top_gt = 0, None

            for candidate in gt_data[frame_id]:
                overlap = compute_iou(detection["bbox"], candidate["bbox"])
                if overlap > top_iou:
                    top_iou, top_gt = overlap, candidate

            if top_iou >= iou_threshold and top_gt:
                # (GT label, Prediction)
                label_pairs.append((top_gt["cross"], detection["cross_pred"]))

    return label_pairs


In [None]:
import pickle

# Restore the ground-truth labels from the pickle written earlier in the notebook.
with open("gt_annotations.pkl", "rb") as gt_file:
    gt_annotations = pickle.load(gt_file)

# Restore the per-frame predictions. This file is written by run_model(), so
# run_model() must have been executed before this cell.
with open("predictions.pkl", "rb") as pred_file:
    predictions = pickle.load(pred_file)


In [None]:
import pandas as pd
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

# Pair every prediction with its best-overlapping ground-truth box.
matched_labels = match_predictions(gt_annotations, predictions)

# Split the (ground truth, prediction) pairs into two parallel label lists.
y_true = [pair[0] for pair in matched_labels]
y_pred = [pair[1] for pair in matched_labels]

# Score the matched labels with each metric, keeping the display order.
metrics = {
    label: scorer(y_true, y_pred)
    for label, scorer in (
        ("Accuracy", accuracy_score),
        ("Precision", precision_score),
        ("Recall", recall_score),
        ("F1-Score", f1_score),
    )
}

# Two-column table: one row per metric.
metrics_df = pd.DataFrame(metrics.items(), columns=["Metric", "Value"])

print(metrics_df)
