In [None]:
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

In [None]:
!pip3 install --no-build-isolation --no-cache https://github.com/openpifpaf/openpifpaf/archive/main.zip

In [None]:
!pip3 install -r /content/drive/MyDrive/TCC/Tracker/requirements.txt

In [None]:
!pip3 install fastapi nest-asyncio pyngrok uvicorn python-multipart

In [None]:
from tqdm import tqdm
from fastapi import FastAPI, File, UploadFile
from typing import List
from pyngrok import ngrok
from datetime import datetime

import os
import csv
import json
import copy
import shutil
import math
import uvicorn
import nest_asyncio
import numpy as np
import matplotlib.pyplot as plt

In [None]:
xh, yh = 1, 0
threshold = 0.7

neck = 0
left_shoulder, right_shoulder = 5, 6
left_hip, right_hip = 11, 12
joints = [0, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]
limbs = [(6, 8), (8, 10), (5, 7), (7, 9), (12, 14), (14, 16), (11, 13), (13, 15)]

total_bins = 16

In [None]:
def euclidean_1d(p1, p2):
    distance = 0

    for i in range(len(p1)):
        distance = distance + (float(p1[i]) - float(p2[i])) ** 2

    return distance ** 0.5

In [None]:
def save_files(path, files):
    path_output = os.path.join(path, 'Capture')
    
    os.mkdir(path_output)
    
    for image in tqdm(files, desc='Progress'):
        with open(os.path.join(path_output, image.filename), 'wb') as buffer:
            shutil.copyfileobj(image.file, buffer)

In [None]:
def detect_poses(path):
    path_input = os.path.join(path, 'Capture')
    path_output = os.path.join(path, 'Estimation')
    
    os.mkdir(path_output)

    path_images = ''

    for image in sorted(os.listdir(path_input)):
        path_images = path_images + os.path.join(path_input, image) + ' '

    get_ipython().system('python3 -m openpifpaf.predict ' + path_images + '--json-output ' + path_output  + ' --checkpoint shufflenetv2k30 --debug')

In [None]:
def track_poses(path):
    path_input = os.path.join(path, 'Capture')
    path_output = os.path.join(path, 'BoundingBoxes')
    
    os.mkdir(path_output)

    get_ipython().system('cd /content/drive/MyDrive/TCC/Tracker && python3 ./track.py --source ' + path_input + ' --out_path ' + path_output + ' --save-txt --yolo-weights ./yolov5/weights/crowdhuman_yolov5m.pt --classes 0')

In [None]:
def prepare_keypoints(path):
    dictionary = {
        'frame': 0,
        'people': []
    }

    people = {
        'keypoints': []
    }

    path_input = os.path.join(path, 'Estimation')
    path_output = os.path.join(path, 'Keypoints')
    
    os.mkdir(path_output)

    for estimation in tqdm(sorted(os.listdir(path_input)), desc='Progress'):
        keypoints = os.path.join(path_input, estimation)
        frame = estimation.split('.')[0]
        
        people['keypoints'] = []
        dictionary['frame'] = int(frame)
        dictionary['people'] = []

        with open(keypoints, 'r') as f:
            poses = json.load(f)

            if len(poses) != 0:
                for pose in poses:
                    people['keypoints'] = pose['keypoints']
                    dictionary['people'].append(copy.deepcopy(people))
    
        keypoints = frame + '.json'
        
        with open(os.path.join(path_output, keypoints), 'w') as f:
            json.dump(dictionary, f)

In [None]:
def association(path_keypoints, path_boundingboxes):
    with open(os.path.join(path_boundingboxes, 'bbox.csv'), 'r') as f:
        reader = csv.reader(f)
        boundingboxes_list = list(reader)
        
    ids = []

    frames = []
    num_frames = len(os.listdir(path_keypoints))

    boundingboxes = [[] for i in range(num_frames)]

    for boundingbox in boundingboxes_list:
        frame = int(boundingbox[0])

        if frame >= num_frames:
            continue

        id = int(float(boundingbox[1]))
        ids.append(id)

        x1, y1, x2, y2 = float(boundingbox[2]), float(boundingbox[3]), float(boundingbox[4]), float(boundingbox[5])
        confbox = float(boundingbox[6])

        boundingboxes[frame].append([id, x1, y1, x2, y2, confbox])
        frames.append(frame)

    ids = set(ids)
    num_ids = len(ids)
    indexes = {}

    for i, id in enumerate(ids):
        indexes[id] = i

    frames = set(frames)
    
    subjects_keypoints = [[] for i in range(num_ids)]
    
    for frame in frames:
        frame_string = str(frame)
        keypoints_file = frame_string.zfill(8) + '.json'
        
        with open(os.path.join(path_keypoints, keypoints_file), 'r') as f:
            keypoints_list = json.load(f)['people']

        num_boundingboxes = len(boundingboxes[frame])
        boundingbox_used = [False for i in range(num_boundingboxes)]

        for i, keypoints in enumerate(keypoints_list):
            distances = []

            for j, boundingbox in enumerate(boundingboxes[frame]):
                if boundingbox_used[j] == True:
                    continue
                    
                id = int(boundingbox[0])
                x1, y1, x2, y2 = float(boundingbox[1]), float(boundingbox[2]), float(boundingbox[3]), float(boundingbox[4])
                x3, y3, x4, y4 = x1, y2, x2, y1
                confbox = float(boundingbox[5])

                key = keypoints['keypoints']

                for joint in joints:
                    x, y = float(key[3 * joint]), float(key[3 * joint + 1])
                    conf = float(key[3 * joint + 2])

                    if (x == 0. and y == 0.):
                        distances.append((float('inf'), -1, -1))
                    else:
                        d1 = euclidean_1d([x, y], [x1, y1])
                        d2 = euclidean_1d([x, y], [x2, y2])
                        d3 = euclidean_1d([x, y], [x3, y3])
                        d4 = euclidean_1d([x, y], [x4, y4])
                        distances.append((max(d1, d2, d3, d4), id, j))

            distances = sorted(distances)
            
            if len(distances) == 0 or distances[0][0] == float('inf'):
                continue

            id = distances[0][1]
            subjects_keypoints[indexes[id]].append(keypoints['keypoints'])
            
            boundingbox_used[distances[0][2]] = True
    
    return subjects_keypoints

In [None]:
def extract_features(keypoints_list):
    angles_list = []
    distances_list = []
    anthropometrics_list = []
    
    for limb in limbs:
        angles = []
        distances = []
        anthropometrics = []

        for keypoints in keypoints_list:
            x1, y1 = float(keypoints[3 * limb[0]]), float(keypoints[3 * limb[0] + 1])
            conf1 = float(keypoints[3 * limb[0] + 2])
            
            x2, y2 = float(keypoints[3 * limb[1]]), float(keypoints[3 * limb[1] + 1])
            conf2 = float(keypoints[3 * limb[1] + 2])
            
            xw, yw = x2 - x1, y2 - y1
            
            xneck, yneck = float(keypoints[3 * neck]), float(keypoints[3 * neck + 1])
            confneck = float(keypoints[3 * neck + 2])
            
            xleftshoulder, yleftshoulder = float(keypoints[3 * left_shoulder]), float(keypoints[3 * left_shoulder + 1])
            confleftshoulder = float(keypoints[3 * left_shoulder + 2])

            xrightshoulder, yrightshoulder = float(keypoints[3 * right_shoulder]), float(keypoints[3 * right_shoulder + 1])
            confrightshoulder = float(keypoints[3 * right_shoulder + 2])

            xlefthip, ylefthip = float(keypoints[3 * left_hip]), float(keypoints[3 * left_hip + 1])
            conflefthip = float(keypoints[3 * left_hip + 2])

            xrighthip, yrighthip = float(keypoints[3 * right_hip]), float(keypoints[3 * right_hip + 1])
            conflrighthip = float(keypoints[3 * right_hip + 2])
                
            torso = max(euclidean_1d([xleftshoulder, yleftshoulder], [xlefthip, ylefthip]), euclidean_1d([xrightshoulder, yrightshoulder], [xrighthip, yrighthip]))

            if (x1 == 0 and y1 == 0) or (x2 == 0 and y2 == 0) or (xw == 0 and yw == 0):
                rho = -1
            else:
                rho = math.acos((xw * xh + yw * yh) / (((xw * xw + yw * yw) ** 0.5) * ((xh * xh + yh * yh) ** 0.5)))

            if (x1 == 0 and y1 == 0) or (x2 == 0 and y2 == 0) or (xw == 0 and yw == 0) or (xneck == 0 and yneck == 0):
                norm = 0
            else:
                xv, yv = xneck - x1, yneck - y1
                scalar = (xw * xv + yw * yv) / (((xw * xw + yw * yw) ** 0.5) ** 2)
                projx, projy = xw * scalar, yw * scalar
                dx, dy = xv - projx, yv - projy
                norm = (dx * dx + dy * dy) ** 0.5

            if (x1 == 0 and y1 == 0) or conf1 < threshold or (x2 == 0 and y2 == 0) or conf2 < threshold or torso == 0:
                dist = 0
            else:
                dist = euclidean_1d([x1, y1], [x2, y2]) / torso

            angles.append(rho)
            distances.append(math.log2(norm + 1))
            anthropometrics.append(dist)

        angles_list.append(angles)
        distances_list.append(distances)
        anthropometrics_list.append(anthropometrics)

    return angles_list, distances_list, anthropometrics_list

In [None]:
def create_histograms(angles, distances, anthropometrics):
    max_distance = 141.87993213380608
    max_anthropometrics = 1.0035907765532568

    histogram_angles, bins, patches = plt.hist(angles, total_bins, range=(-1, math.pi))
    plt.close()
    histogram_distances, bins, patches = plt.hist(distances, total_bins, range=(0, math.log2(max_distance + 1)))
    plt.close()
    histogram_anthropometrics, bins, patches = plt.hist(anthropometrics, total_bins, range=(0, max_anthropometrics))
    plt.close()

    for arr in range(len(histogram_angles)):
        for val in range(len(histogram_angles[arr])):
            if len(angles[arr]) != 0:
                histogram_angles[arr][val] = histogram_angles[arr][val] / len(angles[arr])

    for arr in range(len(histogram_distances)):
        for val in range(len(histogram_distances[arr])):
            if len(distances[arr]) != 0:
                histogram_distances[arr][val] = histogram_distances[arr][val] / len(distances[arr])

    for arr in range(len(histogram_anthropometrics)):
        for val in range(len(histogram_anthropometrics[arr])):
            if len(anthropometrics[arr]) != 0:
                histogram_anthropometrics[arr][val] = histogram_anthropometrics[arr][val] / len(anthropometrics[arr])
                
    return histogram_angles, histogram_distances, histogram_anthropometrics

In [None]:
class HTTPServer():
    def __init__(self, app=FastAPI()):
        self.app = app

        @app.post('/createsingledescriptors')
        async def single_descriptors(files: List[UploadFile] = File(...)):
            now = datetime.now()
            folder = now.strftime('%H-%M-%S')
            
            path = os.path.join('/content/drive/MyDrive/TCC', folder)
            os.mkdir(path)

            try:
                save_files(path, files)
                
                detect_poses(path)

                prepare_keypoints(path)

                keypoints_list = list()

                for keypoints in sorted(os.listdir(os.path.join(path, 'Keypoints'))):
                    with open(os.path.join(os.path.join(path, 'Keypoints'), keypoints), 'r') as f:
                        poses = json.load(f)['people']

                        if len(poses) != 0:
                            for pose in poses:
                                keypoints_list.append(pose['keypoints'])

                angles, distances, anthropometrics = extract_features(keypoints_list)
                histogram_angles, histogram_distances, histogram_anthropometrics = create_histograms(angles, distances, anthropometrics)
            finally:
                shutil.rmtree(path)
                
            return {'angles': histogram_angles.tolist(), 'distances': histogram_distances.tolist(), 'anthropometrics': histogram_anthropometrics.tolist()}

        @app.post('/createmultidescriptors')
        async def multi_descriptors(files: List[UploadFile] = File(...)):
            now = datetime.now()
            folder = now.strftime('%H-%M-%S')
            
            path = os.path.join('/content/drive/MyDrive/TCC', folder)
            os.mkdir(path)
            
            response = list()

            try:
                save_files(path, files)
                
                detect_poses(path)
                track_poses(path)

                prepare_keypoints(path)
                
                subject_keypoints = association(os.path.join(path, 'Keypoints'), os.path.join(path, 'BoundingBoxes'))
                
                for keypoints_list in subject_keypoints:
                    angles, distances, anthropometrics = extract_features(keypoints_list)
                    histogram_angles, histogram_distances, histogram_anthropometrics = create_histograms(angles, distances, anthropometrics)
                    response.append({'angles': histogram_angles.tolist(), 'distances': histogram_distances.tolist(), 'anthropometrics': histogram_anthropometrics.tolist()})
            finally:
                shutil.rmtree(path)

            return {'list': response}

    def run(self, port=8000):
        ngrok_tunnel = ngrok.connect(port)
        print('Public URL:', ngrok_tunnel.public_url)
        nest_asyncio.apply()
        uvicorn.run(self.app, port=port)
        

In [None]:
app = HTTPServer()
app.run()