In [None]:
# This scripts standaridizes the data layout of UBNormal

In [2]:
import os
import subprocess
import pandas as pd
import json
import random
import shutil
import numpy as np
import pathlib
import math

In [3]:
def run_command(
   command: str,
   cwd: str,
   return_output: bool = False,
   strip_response: bool = True,
) -> list:
   """Prints output of subprocess command in real-time.


   Args:
       command (string): The command to run
       return_output (bool): Return output of command as an array or not.
       strip_response (bool): Whether or not to strip whitespaces from response.


   Returns:
       list|int|None: The output of the command
   """
   output = []
   with subprocess.Popen(
       command,
       cwd=cwd,
       stdout=subprocess.PIPE,
       shell=True,
   ) as process:  # nosec
       while True:
           response = process.stdout.readline()
           if response == b"" and process.poll() is not None:
               break
           if response:
               cleaned_output = response.strip() if strip_response else response
               if return_output:
                   output.append(cleaned_output.decode("UTF-8"))
       command_response = process.poll()
   return output if return_output else command_response

In [None]:
# Copy videos
video_source_dir = "/mnt/c/Users/ruben/Downloads/UBnormal_data"
video_dest_dir = './data/UBnormal/videos'
pathlib.Path(video_dest_dir).mkdir(parents=True, exist_ok=True)
videos = []
for root, dirs, files in os.walk(video_source_dir):
    for file in files:
        file_path = os.path.join(root,file)
        if file.endswith('.mp4'):
            videos.append(file_path)
for file in videos:
    file_name = file.split('/')[-1]
    dest_name = os.path.join(video_dest_dir, file_name)
    # shutil.copyfile(file, dest_name)

In [None]:
# Prepare ground truth annotations

In [5]:
annotations_folder = './raw_data/UBnormal/annotations_stg'
pose_dir =  f'./raw_data/UBnormal/poses_stg'

In [6]:
# Frame level annotatations:
annotations_dir = f'{annotations_folder}/frame_level'
pathlib.Path(annotations_dir).mkdir(parents=True, exist_ok=True)
for dataset_type in ['training', 'validation', 'test']:
    annotations = []
    source_dir = f"/mnt/d/VUB/CurrentTrendsOfAI/PoseGraphAnomalyDetection/UBnormal/data/frame_gt/{dataset_type}"
    for root, dirs, files in os.walk(source_dir):
        for file in files:
            file_path = os.path.join(root,file)
            if file.endswith('.txt'):
                annotations.append(file_path)
    for annotation_file in annotations:
        video_name = annotation_file.split('/')[-2]
        with open(annotation_file) as f:
            annotation_data = [int(float(line.strip())) for line in f.readlines()]
        np.save(file=f'{annotations_dir}/{video_name}.npy', arr=np.array(annotation_data), allow_pickle=True)

In [24]:
# Pose level annotatations
def get_pose_data(pose_data: str) -> dict:
    """Load in the pose data into a dict.
    This dict has as key the frame in which the poses where detected
    and as values as list of each detected obj_id + the bounding box
    """
    # Convert pose data so that you can fetch the poses found in a frame
    parsed_pose_data = {}
    for obj_id, frame_data in pose_data.items():
        for frame_id, kp_data in frame_data.items():
            max_x = 0
            min_x = 10000
            max_y = 0
            min_y = 10000
            
            # Format of keypoints is x,y,confidence * 17
            for i in range(0, len(kp_data['keypoints']), 3):
                co_x = kp_data['keypoints'][i]
                co_y = kp_data['keypoints'][i+1]
                # We don't need the confidence, 
                if co_x > max_x:
                    max_x = co_x
                elif co_x < min_x:
                    min_x = co_x
                if co_y > max_y:
                    max_y = co_y
                elif co_y < min_y:
                    min_y = co_y
            if not parsed_pose_data.get(frame_id):
                parsed_pose_data[frame_id] = []
            nose_coordinates = (kp_data['keypoints'][0], kp_data['keypoints'][1])
            parsed_pose_data[frame_id].append(
                (obj_id, [min_x, min_y, max_x, max_y], nose_coordinates)
            )
    return parsed_pose_data

# Load pose files in a dict for easy retrieval
poses = {}
for root, dirs, files in os.walk(pose_dir):
    for file in files:
        file_path = os.path.join(root,file)
        if file.endswith('_tracked_person.json'):
            video_name = '_'.join(file.split('_')[:-3])
            poses[video_name] = file_path

frame_level_annotations_dir = f'{annotations_folder}/frame_level'
annotations_dir = f'{annotations_folder}/pose_level'
pathlib.Path(annotations_dir).mkdir(parents=True, exist_ok=True)

annotations = []

bounding_boxes = {}

for dataset_type in ['training', 'validation', 'test']:
    source_dir = f"/mnt/d/VUB/CurrentTrendsOfAI/PoseGraphAnomalyDetection/UBnormal/data/frame_gt/{dataset_type}"
    for root, dirs, files in os.walk(source_dir):
        for file in files:
            file_path = os.path.join(root,file)
            if file.endswith('.txt'):
                video_name = file.split('.')[0]
                annotations.append(file_path)
    bounding_box_dir = f"/mnt/d/VUB/CurrentTrendsOfAI/PoseGraphAnomalyDetection/UBnormal/data/bounding_boxes/{dataset_type}"
    for root, dirs, files in os.walk(bounding_box_dir):
        for file in files:
            file_path = os.path.join(root,file)
            if file.endswith('.txt'):
                video_name = file.split('.')[0]
                bounding_boxes[video_name] = file_path

for annotation_file in annotations:
    video_name = annotation_file.split('/')[-2]
    try:
        bounding_box_file = bounding_boxes[video_name]
        with open(bounding_box_file) as f:            # Format: [new_track_id, frame_idx, bbox.x_min, bbox.y_min, bbox.x_max, bbox.y_max
            bounding_box_data = [[int(float(i)) for i in line.strip().split(',')]
                                 for line in f.readlines()]
    except KeyError as e:
        # Normal videos don't have anomalies and hence no bounding boxes
        if not video_name.startswith('normal'):
            raise e
        bounding_box_data = []
    # Map annotation objects to tracked poses
    with open(poses[video_name]) as f:
        tracking_data = json.load(f)
    pose_data = get_pose_data(tracking_data)
    pose_gt_dict = {}
    for bb_record in bounding_box_data:
        new_track_id, frame_idx, annon_bb_x_min, annon_bb_y_min, annon_bb_x_max, annon_bb_y_max = bb_record
        padded_frame_id = str(frame_idx).rjust(4, '0')
        found_obj_id = None
        min_distance = 100
        # It's possible that AlphaPose does not detect the pose in a specific frame
        for obj_id, obj_bb, nose_coordinates in pose_data.get(padded_frame_id, []):
            obj_bb_min_x, obj_bb_min_y, obj_bb_max_x, obj_bb_max_y = obj_bb
            if obj_bb_min_x < nose_coordinates[0] < obj_bb_max_x and obj_bb_min_y < nose_coordinates[1] < obj_bb_max_y:
                metric = (abs(annon_bb_x_min - obj_bb_min_x)
                          + abs(annon_bb_y_min - obj_bb_min_y)
                          + abs(annon_bb_x_max - obj_bb_max_x)
                          + abs(annon_bb_y_max - obj_bb_max_y))
                if metric < min_distance:
                    min_distance = metric
                    found_obj_id = obj_id
        if found_obj_id:
            if found_obj_id not in pose_gt_dict:
                pose_gt_dict[found_obj_id] = []
            pose_gt_dict[found_obj_id].append(frame_idx)
    video_length = len(np.load(f'{frame_level_annotations_dir}/{video_name}.npy', allow_pickle=True))
    for track_id in tracking_data:
        gt_array = [0] * video_length
        for frame in pose_gt_dict.get(track_id, []):
            gt_array[frame] = 1
        pathlib.Path(f'{annotations_dir}/{video_name}').mkdir(parents=True, exist_ok=True)
        np.save(file=f'{annotations_dir}/{video_name}/{track_id}.npy', arr=np.array(gt_array), allow_pickle=True)