In [None]:
from collections import defaultdict

import cv2
import os
import subprocess
import time
import numpy as np
import pandas as pd

from ultralytics import YOLO

from IPython.display import clear_output

# Pedestrian trajectory tracking and prediction

Videos obtained from: https://jelenia-gora.webcamera.pl/

In [None]:
def download_video(record_duration=10):
    """Record a cropped clip from the live webcam stream using ffmpeg.

    The clip is written to ``data/<YYYYmmdd-HHMMSS>_<record_duration>s.mp4``
    (relative to the current working directory).

    Args:
        record_duration: Length of the recording in seconds.

    Returns:
        The path of the saved video on success, or ``None`` when ffmpeg
        failed, timed out or could not be started (the error is printed and
        any partially written file is removed).
    """
    stream_url = "https://hoktastream4.webcamera.pl/jeleniagora_cam_73a10c/jeleniagora_cam_73a10c.stream/playlist.m3u8"

    output_file = f"data/{time.strftime('%Y%m%d-%H%M%S')}_{record_duration}s.mp4"

    # ffmpeg does not create missing parent directories for its output file.
    os.makedirs(os.path.dirname(output_file), exist_ok=True)

    ffmpeg_command = [
        "ffmpeg",
        "-i",
        stream_url,
        "-t",
        str(record_duration),
        "-filter:v",
        "crop=700:600:350:400",  # keep a 700x600 region whose top-left corner is at (350, 400)
        output_file,
    ]

    try:
        # Allow 10 extra seconds on top of the recording time before giving up.
        subprocess.run(ffmpeg_command, timeout=record_duration + 10, check=True)
    except (subprocess.SubprocessError, OSError) as e:
        # ffmpeg may have failed before creating the file, so only remove
        # what actually exists instead of raising a second error here.
        if os.path.exists(output_file):
            os.remove(output_file)
        print(e)
        return None

    print(f"Recording completed. The video is saved as {output_file}.")
    return output_file

## Tracking

In [None]:
def track(model, cap, max_track_length=120):
    """Track objects of class 0 through a video and yield annotated frames.

    This is a generator: it reads ``cap`` frame by frame, runs
    ``model.track`` on each frame, draws the recent trajectory and the current
    speed of every tracked object, and yields after each processed frame.
    ``cap`` is released and all OpenCV windows are destroyed when the video
    ends, when 'q' is pressed, or when the generator is closed early.

    Args:
        model: Object exposing ``track(frame, persist=True, classes=[0])``
            (the notebook passes an Ultralytics ``YOLO`` model).
        cap: Opened ``cv2.VideoCapture``-like object to read frames from.
        max_track_length: Maximum number of most recent centre points kept
            per track ID for drawing the trajectory line.

    Yields:
        Tuple ``(annotated_frame, boxes, track_ids, speeds)`` where ``boxes``
        holds the ``xywh`` boxes of the frame, ``track_ids`` is the list of
        matching integer IDs (empty when the tracker reported no IDs) and
        ``speeds`` maps a track ID to its latest speed in pixels per second.
        The same ``speeds`` mapping object is yielded every time and is
        updated in place.
    """
    # Recent centre points per track ID
    track_history = defaultdict(list)

    fps = cap.get(cv2.CAP_PROP_FPS)

    # Last two frame positions at which each track ID was seen, and the
    # latest speed per track ID. ``speeds`` stays a defaultdict so existing
    # callers see the same mapping type as before.
    frames_list = defaultdict(list)
    speeds = defaultdict(list)

    try:
        while cap.isOpened():
            success, frame = cap.read()
            if not success:
                # End of the video (or a read error)
                break

            # Run tracking on the frame, persisting tracks between frames
            results = model.track(frame, persist=True, classes=[0])

            boxes = results[0].boxes.xywh.cpu()
            # ``boxes.id`` is None when no track IDs were assigned for this
            # frame; treat that as "nothing tracked" instead of crashing.
            ids = results[0].boxes.id
            track_ids = ids.int().cpu().tolist() if ids is not None else []

            annotated_frame = results[0].plot(conf=False)

            for box, track_id in zip(boxes, track_ids):
                x, y, w, h = box
                trail = track_history[track_id]
                trail.append((float(x), float(y)))  # x, y center point
                if len(trail) > max_track_length:
                    trail.pop(0)

                # Frame positions at which this object was detected
                frames = frames_list[track_id]
                frames.append(cap.get(cv2.CAP_PROP_POS_FRAMES))
                if len(frames) > 2:
                    frames.pop(0)

                if len(trail) > 1:
                    frame_gap = frames[-1] - frames[-2]
                    # Skip the update rather than divide by zero if the
                    # capture reports the same position twice.
                    if frame_gap > 0:
                        displacement = np.linalg.norm(np.array(trail[-1]) - np.array(trail[-2]))
                        speed = displacement * fps / frame_gap
                        speeds[track_id] = speed

                        # Draw the speed next to the box centre
                        cv2.putText(annotated_frame, f"{speed:.2f} px/s", (int(x), int(y) - 10),
                                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)

                # Draw the tracking line through the stored centre points
                points = np.hstack(trail).astype(np.int32).reshape((-1, 1, 2))
                cv2.polylines(annotated_frame, [points], isClosed=False, color=(230, 230, 230), thickness=10)

            yield (annotated_frame, boxes, track_ids, speeds)

            # Stop if 'q' is pressed
            if cv2.waitKey(1) & 0xFF == ord("q"):
                break
    finally:
        # Runs on normal completion, on 'q', and when the consumer stops
        # iterating early (generator close), so the capture is never leaked.
        cap.release()
        cv2.destroyAllWindows()

## Tracking visualisation

In [None]:
def create_sequence_data(sequence_frames, sequence_ids, sequence_x, sequence_y, output_path='sequence.csv'):
    """Write a recorded trajectory to a CSV file.

    The four sequences are stored as the columns ``Frame``, ``PersonID``,
    ``X`` and ``Y`` (one row per recorded point, no index column).

    Args:
        sequence_frames: Values for the ``Frame`` column.
        sequence_ids: Values for the ``PersonID`` column.
        sequence_x: Values for the ``X`` column.
        sequence_y: Values for the ``Y`` column.
        output_path: Destination CSV file; defaults to ``sequence.csv`` in
            the current working directory.
    """
    data = pd.DataFrame({'Frame': sequence_frames, 'PersonID': sequence_ids, 'X': sequence_x, 'Y': sequence_y})
    data.to_csv(output_path, index=False)

In [None]:
# Detector and input video
model = YOLO("yolov8n.pt")

video_path = "data/videos/20231108-190653_10s.mp4"
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
    # cv2.VideoCapture does not raise for a missing or unreadable file, so
    # fail here with a clear message instead of producing an empty result.
    raise FileNotFoundError(f"Cannot open video file: {video_path}")

height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
fps = cap.get(cv2.CAP_PROP_FPS)
VIDEO_CODEC = "MP4V"

# The annotated video is first written with OpenCV to a temporary file and
# then re-encoded to H.264 with ffmpeg under the final name.
output_video_name = "result.mp4"
tmp_output_path = "tmp_" + output_video_name

output_video = cv2.VideoWriter(
    tmp_output_path, cv2.VideoWriter_fourcc(*VIDEO_CODEC), fps, (width, height)
)

# Only the trajectory of this tracker ID is exported to the CSV file.
TARGET_TRACK_ID = 4

sequence_frames = []
sequence_ids = []
sequence_x = []
sequence_y = []
frame_number = 0  # number of frames processed so far

try:
    for frame_index, (frame, boxes, track_ids, speeds) in enumerate(track(model, cap)):
        clear_output(wait=True)
        cv2.imshow("YOLOv8 Tracking", frame)

        for box, track_id in zip(boxes, track_ids):
            x, y, w, h = box
            x, y = int(x), int(y)

            # add data for only one person
            if track_id == TARGET_TRACK_ID:
                sequence_frames.append(frame_index)
                sequence_ids.append(track_id)
                sequence_x.append(x)
                sequence_y.append(y)
            print(
                f"Person {track_id} is at ({x}, {y}) "
                + (
                    f"commuting at {speeds[track_id]:.2f} px/s"
                    if track_id in speeds
                    else ""
                )
            )

        frame_number = frame_index + 1
        output_video.write(frame)
finally:
    # Finalise the temporary video even if tracking fails part-way through.
    output_video.release()

create_sequence_data(sequence_frames, sequence_ids, sequence_x, sequence_y)

# Remove a previous result so ffmpeg does not stop to ask about overwriting.
if os.path.exists(output_video_name):
    os.remove(output_video_name)

# check=True: if re-encoding fails, raise and keep the temporary file instead
# of deleting the only copy of the annotated video.
subprocess.run(
    [
        "ffmpeg",
        "-i",
        tmp_output_path,
        "-crf",
        "18",
        "-preset",
        "veryfast",
        "-hide_banner",
        "-loglevel",
        "error",
        "-vcodec",
        "libx264",
        output_video_name,
    ],
    check=True,
)
os.remove(tmp_output_path)

## Trajectory prediction

In [None]:
# Load the exported trajectory and keep only the coordinate columns as an
# array with one (X, Y) row per recorded point.
df = pd.read_csv('sequence.csv')
xy_values = df.loc[:, ['X', 'Y']].to_numpy()

def split_sequence(sequence, n_steps):
    """Turn a sequence into sliding-window samples for next-step prediction.

    Every sample consists of ``n_steps`` consecutive elements of ``sequence``
    as input and the element that immediately follows them as target.

    Args:
        sequence: Array-like whose first axis is time, e.g. an array of
            shape ``(n_points, n_features)`` or a flat list of numbers.
        n_steps: Number of consecutive elements per input window (>= 1).

    Returns:
        Tuple ``(X, y)`` of NumPy arrays with shapes
        ``(n_samples, n_steps, ...)`` and ``(n_samples, ...)``, where
        ``n_samples = max(len(sequence) - n_steps, 0)`` and ``...`` stands for
        the trailing dimensions of ``sequence``. When the sequence is too
        short to form a single sample, the arrays are empty but still carry
        these shapes.

    Raises:
        ValueError: If ``n_steps`` is smaller than 1.
    """
    if n_steps < 1:
        raise ValueError(f"n_steps must be at least 1, got {n_steps}")

    sequence = np.asarray(sequence)
    n_samples = max(len(sequence) - n_steps, 0)
    item_shape = sequence.shape[1:]

    # Pre-allocating keeps the window dimensions even when n_samples == 0,
    # so callers can always rely on X.shape[1] == n_steps.
    X = np.empty((n_samples, n_steps) + item_shape, dtype=sequence.dtype)
    y = np.empty((n_samples,) + item_shape, dtype=sequence.dtype)

    for start in range(n_samples):
        stop = start + n_steps
        X[start] = sequence[start:stop]
        y[start] = sequence[stop]

    return X, y

n_steps = 19  # number of consecutive (X, Y) points used to predict the next one

# Chronological split: the first 80% of the points are used for training,
# the remaining 20% for testing.
train_size = int(0.8 * len(xy_values))

# A segment must hold more than n_steps points to yield at least one sample;
# report that clearly instead of failing later on an empty array.
if train_size <= n_steps or len(xy_values) - train_size <= n_steps:
    raise ValueError(
        f"Not enough trajectory points ({len(xy_values)}) to build train and test "
        f"samples with n_steps={n_steps}"
    )

X_train, y_train = split_sequence(xy_values[:train_size], n_steps)
X_test, y_test = split_sequence(xy_values[train_size:], n_steps)

n_features = 2  # X and Y coordinates

# Make the (samples, time steps, features) layout explicit.
X_train = X_train.reshape((-1, n_steps, n_features))
X_test = X_test.reshape((-1, n_steps, n_features))


In [None]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

# Seed Python, NumPy and TensorFlow so that weight initialisation and
# training give repeatable results on Restart & Run All.
SEED = 42
tf.keras.utils.set_random_seed(SEED)

# NOTE: this rebinds `model`, which the tracking cell uses for the YOLO
# detector; re-run that cell before tracking again.
# Single LSTM layer followed by a dense layer that outputs the next (X, Y).
model = tf.keras.Sequential()
model.add(layers.LSTM(50, activation='relu', input_shape=(n_steps, n_features)))
model.add(layers.Dense(2))

model.compile(optimizer=tf.keras.optimizers.Adam(0.01), loss=tf.keras.losses.MeanSquaredError())

model.fit(X_train, y_train, epochs=300, verbose=1)

predicted_values = model.predict(X_test, verbose=1)

# Print actual vs. predicted coordinates for every test sample
# (the message text is Polish: "Rzeczywiste" = actual, "Prognozowane" = predicted).
for actual, predicted in zip(y_test, predicted_values):
    print(f"Rzeczywiste (X, Y): ({actual[0]}, {actual[1]}), Prognozowane (X, Y): ({predicted[0]}, {predicted[1]})")

