<a href="https://colab.research.google.com/github/WenjunYuAnny/Artificial_Intelligence_Assignment/blob/main/Assignment_3_Part_2/Assignment3_Part2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [None]:
!pip install filterpy

Collecting filterpy
  Downloading filterpy-1.4.5.zip (177 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/178.0 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m[90m━━━[0m [32m163.8/178.0 kB[0m [31m5.0 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m178.0/178.0 kB[0m [31m4.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: filterpy
  Building wheel for filterpy (setup.py) ... [?25l[?25hdone
  Created wheel for filterpy: filename=filterpy-1.4.5-py3-none-any.whl size=110458 sha256=c862f68e419cd32069085039b74b33d9443971bf1b1796e69b8e21a373acbfbe
  Stored in directory: /root/.cache/pip/wheels/0f/0c/ea/218f266af4ad626897562199fbbcba521b8497303200186102
Successfully built filterpy
Installing collected packages: filterpy
Successfully installed filterpy-1.4.5


In [None]:
import cv2
import pandas as pd
from filterpy.kalman import KalmanFilter
import numpy as np
import os

## Retrive meaturement from the csv file from Task 1
The threshold is set to 0.35 (to have more measurements included)

In [None]:
#retrieve center measurement from the csv
def detect_object(bbox):
    if not bbox.empty:
        detected_center = np.array([
            (bbox['x1'].iloc[0] + bbox['x2'].iloc[0])/2,
            (bbox['y1'].iloc[0] + bbox['y2'].iloc[0])/2,
        ])
        drone_detected = True
    else:
        detected_center = np.array([])
        drone_detected = False
    return detected_center, drone_detected

##Initialize the Karman Filter

In [None]:
def init_kalman_filter(video_fps, init_location, init_est_error, process_noise, measurement_noise):
    dt = 1.0 / video_fps
    kf = KalmanFilter(dim_x=4, dim_z=2) #state: x, y, vx, vy; measurement: x, y
    #state transition matrix
    kf.F = np.array([
        [1,0,dt,0],
        [0,1,0,dt],
        [0,0,1,0],
        [0,0,0,1]
    ])
    #measurement matrix
    kf.H = np.array([
        [1,0,0,0],
        [0,1,0,0]
    ])
    #initial state
    kf.x = np.array([init_location[0], init_location[1], 0, 0])
    kf.P *= init_est_error
    kf.Q = np.eye(4) * process_noise
    kf.R = np.eye(2) * measurement_noise
    return kf

##Processing single video to output trajectories

In [None]:
def tracking_drone_in_video(video_path, csv_path, output_dir):
    #parameter initialization
    init_est_error = 1
    process_noise = 0.01
    measurement_noise = 2

    #read the video and the csv file of bounding boxes
    video_reader = cv2.VideoCapture(video_path)
    bboxes = pd.read_csv(csv_path)
    print(os.path.basename(video_path) + " is being processed")

    is_track_initialized = False
    max_frames_without_detection = 60 #tolerate 2s of no detection before reinitialization
    frames_without_detection = 0
    clip_counter = 0

    video_width = int(video_reader.get(cv2.CAP_PROP_FRAME_WIDTH))
    video_height = int(video_reader.get(cv2.CAP_PROP_FRAME_HEIGHT))
    video_fps = video_reader.get(cv2.CAP_PROP_FPS)

    fourcc = cv2.VideoWriter_fourcc(*'XVID')
    video_writer = None

    while video_reader.isOpened():
        ret, frame = video_reader.read()
        if not ret:
            break

        frame_number = int(video_reader.get(cv2.CAP_PROP_POS_FRAMES))
        bbox = bboxes[bboxes['frame'] == frame_number]

        detected_center, drone_detected = detect_object(bbox)
        if not bbox.empty:#drones detected
            frames_without_detection = 0
            if not is_track_initialized:
                kf = init_kalman_filter(video_fps, detected_center, init_est_error, process_noise, measurement_noise)
                is_track_initialized = True
                kf.update(detected_center)
                tracked_center = kf.x
                trajectory = [tracked_center] #start a new trajectory when the tracking is initialized

            else:
                kf.predict()
                kf.update(detected_center)
                tracked_center = kf.x
                trajectory.append(tracked_center)

        else:#no detection
            frames_without_detection += 1
            if frames_without_detection > max_frames_without_detection:
                is_track_initialized = False
            else:
                if is_track_initialized:
                    kf.predict() #only predict, no update
                    tracked_center = kf.x
                    trajectory.append(tracked_center)

        label = 'Drone detected' if drone_detected else 'Drone NOT detected'
        if drone_detected:
            #plot the bounding box from the detection
            frame = cv2.rectangle(
                frame,
                (int(bbox['x1'].iloc[0]), int(bbox['y1'].iloc[0])),
                (int(bbox['x2'].iloc[0]), int(bbox['y2'].iloc[0])),
                color=(0,255,0),
                thickness=2
            )

        if video_writer is None and is_track_initialized: #when tracking is initialized, a video clip starts
            clip_counter += 1
            video_filename = os.path.splitext(os.path.basename(video_path))[0] + f"_tracked_{clip_counter}.mp4"
            output_path = os.path.join(output_dir, video_filename)
            video_writer = cv2.VideoWriter(output_path, fourcc, video_fps, (video_width, video_height))

        if not is_track_initialized and video_writer is not None: #when the drone cannot be detected for too long, stop the video clip
            video_writer.release()
            video_writer = None

        if is_track_initialized:#plot all the centers estimated in the trajectory
            for location in trajectory:
                center = (int(location[0]), int(location[1]))
                cv2.circle(frame, center, radius=3, color=(0,255,255), thickness=1)

        if is_track_initialized and len(trajectory) > 1:#plot the lines in the trajectory
            for i in range(1, len(trajectory)):
                cv2.line(frame, (int(trajectory[i-1][0]), int(trajectory[i-1][1])),
                        (int(trajectory[i][0]), int(trajectory[i][1])), (0,0,255), thickness=1)

        frame = cv2.putText(frame, label, (10,30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2, cv2.LINE_AA)
        if video_writer is not None:
            video_writer.write(frame)

    if video_writer is not None:
        video_writer.release()
    video_reader.release()
    print(os.path.basename(video_path) + "tracking process is done")

In [None]:
video_dir = "/content/gdrive/MyDrive/2023 Fall Artificial Intelligence/Drone_detection/video"
csv_dir = "/content/gdrive/MyDrive/2023 Fall Artificial Intelligence/Drone_detection/bbox"
output_dir = "/content/gdrive/MyDrive/2023 Fall Artificial Intelligence/Drone_detection/video_tracking"

#Processing video directory

In [None]:
for filename in os.listdir(video_dir):
  if filename.endswith('.mp4'):
    video_path = os.path.join(video_dir, filename)
    csv_path = os.path.join(csv_dir, os.path.splitext(filename)[0] + ".csv")
    tracking_drone_in_video(video_path, csv_path, output_dir)

Drone Tracking 1.mp4 is being processed
Drone Tracking 1.mp4tracking process is done
Drone tracking 2.mp4 is being processed
Drone tracking 2.mp4tracking process is done
