In [20]:
import os
import numpy as np
import cv2
from PIL import Image, ImageDraw, ImageFont
from scipy.interpolate import interp1d
import imageio

# Constants

# Size (in pixels) every input frame is resized to before drawing/annotation.
WIDTH, HEIGHT = 1280, 720

# Multiplier applied to every world-space quantity in this notebook
# (presumably metres -> centimetres; confirm against the calibration source).
SCALE = 100

# Z value assumed for every detected head when back-projecting image points.
FIXED_HEIGHT = 1.5 * SCALE

# Tracker tuning: a track is closed once its miss counter exceeds MAX_MISSED_FRAMES,
# COST_THRESHOLD is the association distance limit, and EDGE_BUFFER is the border
# margin used when deciding how much grace a newly created track gets.
MAX_MISSED_FRAMES = 4
COST_THRESHOLD = 300
EDGE_BUFFER = 50

# Bird's-eye-view canvas size and the box drawn/cropped inside it (scaled world units).
BEV_WIDTH = 15 * SCALE
BEV_HEIGHT = 10 * SCALE
BOX_START = (0 * SCALE, 0 * SCALE)
BOX_END = (15 * SCALE, 10 * SCALE)
BOX_WIDTH = BOX_END[0] - BOX_START[0]
BOX_HEIGHT = BOX_END[1] - BOX_START[1]

# Data set selection and the paths derived from it.
TEST_DATA_ID = '일도체육공원143'
TEST_DATA_NAME = "TS_2.시나리오_143.Outdoor_일도체육공원143(593)"
OUT_FILE_NAME = f'./saved_exp_results/{TEST_DATA_ID}_result.txt'
IMG_FOLDER = f'../ProcessedData/AIhub/images/Training/{TEST_DATA_NAME}/images'
# Delay handed to cv2.waitKey between displayed frames.
WAIT_TIME = 50

# Output options: whether to save stills, which frame number to capture, GIF frame rate.
SAVE_VISUALIZATION = True
SAVE_FRAME = 352
FPS = 6

# World and image points
# Fifteen 3D reference landmarks, multiplied by SCALE so they share the units of
# the other world-space constants. Row i corresponds to row i of image_points.
world_points = SCALE * np.array(
    [
        [0, 0, 0], [0.9, 0, 0], [5.05, 0, 0], [9.95, 0, 0], [14.1, 0, 0],
        [15, 0, 0], [0, 5.8, 0], [5.05, 5.8, 0], [9.95, 5.8, 0], [15, 5.8, 0],
        [7.5, 0, 3.05], [7.5, 0, 1.525], [7.5, 5.8, 0], [6, 9, 1.8], [5.2, 5.8, 1.6],
    ],
    dtype=np.float32,
)

# Pixel coordinates of the same landmarks
# (presumably picked on the WIDTH x HEIGHT resized frame; confirm).
image_points = np.array(
    [
        [274, 307], [304, 307], [465, 304], [672, 302], [840, 302],
        [877, 302], [113, 416], [392, 409], [711, 404], [1006, 397],
        [564, 171], [564, 229], [547, 406], [420, 409], [517, 298],
    ],
    dtype=np.float32,
)

# Function to normalize 2D points
def normalize_points_2d(points):
    """Translate and scale 2D points for a better-conditioned linear solve.

    The points are shifted so their centroid is at the origin and scaled so
    their mean distance from it is sqrt(2).

    Args:
        points: array of shape (N, 2).

    Returns:
        (normalized, T): ``normalized`` is an (N, 3) array of homogeneous
        normalized points and ``T`` is the 3x3 matrix that produced them.
    """
    centroid = np.mean(points, axis=0)
    mean_radius = np.mean(np.linalg.norm(points - centroid, axis=1))
    scale = np.sqrt(2) / mean_radius

    # Similarity transform: uniform scale followed by translation of the centroid.
    normalization_matrix = np.array([
        [scale, 0, -scale * centroid[0]],
        [0, scale, -scale * centroid[1]],
        [0, 0, 1],
    ])

    # Append a row of ones so the points can be multiplied as homogeneous columns.
    point_count = points.shape[0]
    homogeneous_columns = np.vstack((points.T, np.ones((1, point_count))))
    normalized_columns = normalization_matrix.dot(homogeneous_columns)
    return normalized_columns.T, normalization_matrix

# Function to normalize 3D points
def normalize_points_3d(points):
    """Translate and scale 3D points for a better-conditioned linear solve.

    The points are shifted so their centroid is at the origin and scaled so
    their mean distance from it is sqrt(3).

    Args:
        points: array of shape (N, 3).

    Returns:
        (normalized, T): ``normalized`` is an (N, 4) array of homogeneous
        normalized points and ``T`` is the 4x4 matrix that produced them.
    """
    centroid = np.mean(points, axis=0)
    mean_radius = np.mean(np.linalg.norm(points - centroid, axis=1))
    scale = np.sqrt(3) / mean_radius

    # Similarity transform: uniform scale followed by translation of the centroid.
    normalization_matrix = np.array([
        [scale, 0, 0, -scale * centroid[0]],
        [0, scale, 0, -scale * centroid[1]],
        [0, 0, scale, -scale * centroid[2]],
        [0, 0, 0, 1],
    ])

    # Append a row of ones so the points can be multiplied as homogeneous columns.
    point_count = points.shape[0]
    homogeneous_columns = np.vstack((points.T, np.ones((1, point_count))))
    normalized_columns = normalization_matrix.dot(homogeneous_columns)
    return normalized_columns.T, normalization_matrix

# Function to compute homography matrix (3x4 world-to-image mapping)
def compute_homography(src_points, dst_points):
    """Estimate the 3x4 matrix mapping 3D source points to 2D destination points.

    Each correspondence contributes two rows to a homogeneous linear system,
    which is solved by taking the right-singular vector belonging to the
    smallest singular value.

    Args:
        src_points: sequence of points whose first three entries are x, y, z.
        dst_points: sequence of points whose first two entries are u, v.

    Returns:
        A (3, 4) array scaled so that its bottom-right entry equals 1.
    """
    equations = []
    for world_pt, image_pt in zip(src_points, dst_points):
        x, y, z = world_pt[:3]
        u, v = image_pt[:2]
        # One equation for the u coordinate and one for the v coordinate.
        equations.extend((
            [-x, -y, -z, -1, 0, 0, 0, 0, u * x, u * y, u * z, u],
            [0, 0, 0, 0, -x, -y, -z, -1, v * x, v * y, v * z, v],
        ))

    _, _, right_vectors = np.linalg.svd(np.array(equations))
    solution = right_vectors[-1, :]
    # Remove the arbitrary scale (and sign) of the null vector.
    return (solution / solution[-1]).reshape(3, 4)

# Function to calculate homography matrix from world to image points
def calculate_homography(world_points, image_points):
    """Estimate the 3x4 world-to-image matrix from point correspondences.

    Both point sets are normalized first, the matrix is estimated in the
    normalized frames, and the normalizations are then undone.

    Args:
        world_points: (N, 3) array of world coordinates.
        image_points: (N, 2) array of matching pixel coordinates.

    Returns:
        A (3, 4) array that maps homogeneous world points to homogeneous
        image points in the original (un-normalized) coordinates.
    """
    world_normalized, world_transform = normalize_points_3d(world_points)
    image_normalized, image_transform = normalize_points_2d(image_points)
    normalized_solution = compute_homography(world_normalized, image_normalized)

    # H = T_image^-1 * H_normalized * T_world
    back_to_world_units = np.dot(normalized_solution, world_transform)
    return np.dot(np.linalg.inv(image_transform), back_to_world_units)

# Function to calculate X, Y coordinates from u, v, Z using homography matrix
def calculate_XY_from_uvZ(H, u, v, Z):
    """Back-project a pixel to world X, Y given its world height Z.

    With Z fixed, the projection equations are linear in X and Y, so the
    result comes from a 2x2 linear solve.

    Args:
        H: 3x4 world-to-image matrix.
        u, v: pixel coordinates.
        Z: assumed world height of the point.

    Returns:
        Tuple ``(X, Y)`` of world coordinates.

    Raises:
        numpy.linalg.LinAlgError: if the 2x2 system is singular.
    """
    row_u, row_v, row_w = H[0], H[1], H[2]

    # Contribution of the known Z (and the constant term) to the projective depth.
    depth_offset = row_w[2] * Z + row_w[3]

    coefficients = np.array([
        [row_u[0] - u * row_w[0], row_u[1] - u * row_w[1]],
        [row_v[0] - v * row_w[0], row_v[1] - v * row_w[1]],
    ])
    right_hand_side = np.array([
        [u * depth_offset - (row_u[2] * Z + row_u[3])],
        [v * depth_offset - (row_v[2] * Z + row_v[3])],
    ])

    X, Y = np.linalg.solve(coefficients, right_hand_side).flatten()
    return X, Y

# Function to read data from file
def read_data_from_file(file_path):
    """Return the lines of a text file as a list, line endings included.

    Args:
        file_path: path of the file to read.

    Returns:
        List of strings, one per line.
    """
    with open(file_path, 'r') as handle:
        return list(handle)

# Function to calculate cost matrix for given positions
def calculate_cost_matrix(prev_positions, curr_positions):
    """Build the matrix of Euclidean distances between two sets of positions.

    Args:
        prev_positions: sequence of positions (rows of the result).
        curr_positions: sequence of positions (columns of the result).

    Returns:
        Float array of shape ``(len(prev_positions), len(curr_positions))``
        where entry ``[i, j]`` is the distance between ``prev_positions[i]``
        and ``curr_positions[j]``.
    """
    # Row-major list of every pairwise distance, reshaped into the matrix.
    distances = [
        np.linalg.norm(np.array(previous) - np.array(current))
        for previous in prev_positions
        for current in curr_positions
    ]
    matrix_shape = (len(prev_positions), len(curr_positions))
    return np.array(distances, dtype=float).reshape(matrix_shape)

# Function to connect points based on smallest values in cost matrix first
def connect_small_values_first(cost_matrix, threshold):
    """Greedily pair rows with columns, always taking the cheapest free pair.

    Pairing stops as soon as the cheapest remaining cost is not below
    ``threshold`` or when every row or every column has been used. Ties are
    resolved in favour of the pair that comes first in row-major order.

    Args:
        cost_matrix: 2D array of costs (rows: previous, columns: current).
        threshold: costs greater than or equal to this are never paired.

    Returns:
        List of ``(row, column)`` tuples in the order they were chosen.
    """
    num_prev, num_curr = cost_matrix.shape
    associations = []
    used_prev, used_curr = set(), set()

    # Placed first so that a real pair is only selected if strictly cheaper.
    nothing_found = (np.inf, (-1, -1))

    while len(used_prev) < num_prev and len(used_curr) < num_curr:
        free_pairs = [
            (cost_matrix[i, j], (i, j))
            for i in range(num_prev) if i not in used_prev
            for j in range(num_curr) if j not in used_curr
        ]
        # min() keeps the first of equal candidates, i.e. the row-major earliest.
        min_value, min_pos = min([nothing_found] + free_pairs, key=lambda entry: entry[0])
        if min_value >= threshold:
            break
        associations.append(min_pos)
        used_prev.add(min_pos[0])
        used_curr.add(min_pos[1])

    return associations

# Function to track objects over multiple frames based on distance threshold
def track_objects(frame_data):
    """Link per-frame positions into per-person tracks.

    Detections of each frame are greedily associated with the last position of
    every active track (distance below COST_THRESHOLD). A track that stays
    unmatched for more than MAX_MISSED_FRAMES consecutive frames is closed;
    closed tracks with a single observation are discarded.

    Args:
        frame_data: iterable of ``(frame_number, num_heads, positions)``
            tuples; ``num_heads`` is not used here.

    Returns:
        Dict mapping person id to a list of ``(frame_number, position)``
        tuples, containing both the still-active and the closed tracks.
    """
    active = {}        # person id -> observations of a track still being followed
    miss_count = {}    # person id -> consecutive frames without a match
    closed = {}        # person id -> observations of a finished track
    next_person_id = 0

    for frame_idx, (frame_number, _num_heads, positions) in enumerate(frame_data):
        # The first frame simply seeds one track per detection.
        if frame_idx == 0:
            for pos in positions:
                active[next_person_id] = [(frame_number, pos)]
                miss_count[next_person_id] = 0
                next_person_id += 1
            continue

        prev_ids = list(active)
        prev_positions = [active[pid][-1][1] for pid in prev_ids]
        cost_matrix = calculate_cost_matrix(prev_positions, positions)
        associations = connect_small_values_first(cost_matrix, COST_THRESHOLD)

        # Extend every matched track with its detection.
        assigned_ids = set()
        matched_detections = set()
        for prev_idx, det_idx in associations:
            pid = prev_ids[prev_idx]
            active[pid].append((frame_number, positions[det_idx]))
            miss_count[pid] = 0
            assigned_ids.add(pid)
            matched_detections.add(det_idx)

        # Age the unmatched tracks and close the ones that ran out of grace.
        for pid in prev_ids:
            if pid in assigned_ids:
                continue
            miss_count[pid] += 1
            if miss_count[pid] <= MAX_MISSED_FRAMES:
                continue
            observations = active.pop(pid)
            del miss_count[pid]
            if len(observations) != 1:
                closed[pid] = observations

        # Every unmatched detection starts a new track.
        for det_idx, pos in enumerate(positions):
            if det_idx in matched_detections:
                continue
            # NOTE(review): main() passes world-plane positions here, whereas
            # WIDTH/HEIGHT are the image size used when resizing frames. Confirm
            # that the edge test is meant to use image bounds.
            near_edge = (pos[0] < EDGE_BUFFER or pos[0] > WIDTH - EDGE_BUFFER
                         or pos[1] < EDGE_BUFFER or pos[1] > HEIGHT - EDGE_BUFFER)
            active[next_person_id] = [(frame_number, pos)]
            # Tracks born away from the edge start with almost no grace left.
            miss_count[next_person_id] = 0 if near_edge else MAX_MISSED_FRAMES - 1
            next_person_id += 1

    return {**active, **closed}

# Function to interpolate missing data points in tracks
def interpolate_missing_data(tracks):
    """Fill the frame gaps of every track by linear interpolation.

    For each track, a position is produced for every integer frame between
    its first and last observation. Coordinates are truncated to ``int``.

    Tracks with a single observation cannot be interpolated and are passed
    through with their coordinates truncated to ``int``; empty tracks map to
    an empty list.

    Args:
        tracks: dict mapping person id to a list of
            ``(frame_number, (x, y))`` tuples ordered by frame number.

    Returns:
        Dict mapping person id to a list of ``(frame_number, (x, y))`` tuples
        with one entry per frame in the track's span.
    """
    interpolated_tracks = {}
    for person_id, track in tracks.items():
        if not track:
            interpolated_tracks[person_id] = []
            continue

        frames = [frame for frame, _ in track]
        positions_x = [position[0] for _, position in track]
        positions_y = [position[1] for _, position in track]

        # interp1d cannot build a usable line from one sample, so keep it as is.
        if len(track) == 1:
            interpolated_tracks[person_id] = [
                (frames[0], (int(positions_x[0]), int(positions_y[0])))
            ]
            continue

        frames_interp = np.arange(frames[0], frames[-1] + 1)
        interp_x = interp1d(frames, positions_x, kind='linear', fill_value='extrapolate')
        interp_y = interp1d(frames, positions_y, kind='linear', fill_value='extrapolate')

        # Evaluate each interpolant once for the whole span.
        interpolated_positions = [
            (int(x), int(y))
            for x, y in zip(interp_x(frames_interp), interp_y(frames_interp))
        ]
        interpolated_tracks[person_id] = list(zip(frames_interp, interpolated_positions))
    return interpolated_tracks

# Function to visualize head positions before perspective transformation
def visualize_positions(frame_data, img_folder):
    """Show and record the raw head detections on top of the input frames.

    Every frame is loaded from ``img_folder`` as ``<frame_number>.jpg``,
    resized to WIDTH x HEIGHT, annotated with one green circle per position,
    appended to an animated GIF and shown in an OpenCV window. Pressing ``q``
    stops the playback early. When SAVE_VISUALIZATION is set, the annotated
    frame whose number equals SAVE_FRAME is also saved as a JPEG.

    Args:
        frame_data: iterable of ``(frame_number, num_heads, positions)``
            tuples; positions are pixel coordinates usable by ``cv2.circle``.
        img_folder: directory containing the frame images.
    """
    gif_path = f'./saved_exp_results/{TEST_DATA_ID}_positions.gif'
    with imageio.get_writer(gif_path, mode='I', fps=FPS, loop=0) as writer:
        for frame_number, _num_heads, positions in frame_data:
            source_path = os.path.join(img_folder, f"{frame_number}.jpg")
            frame_rgb = np.array(Image.open(source_path).resize((WIDTH, HEIGHT)))

            # OpenCV draws and displays in BGR order.
            frame_bgr = cv2.cvtColor(frame_rgb, cv2.COLOR_RGB2BGR)
            for head in positions:
                cv2.circle(frame_bgr, head, 10, (0, 255, 0), 2)

            # PIL and imageio expect RGB, so convert the annotated frame back.
            annotated_rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)
            if SAVE_VISUALIZATION and frame_number == SAVE_FRAME:
                Image.fromarray(annotated_rgb).save(f'./saved_exp_results/{TEST_DATA_ID}_position_screenshot.jpg')
            writer.append_data(annotated_rgb)

            cv2.imshow('Head Positions', frame_bgr)
            pressed_key = cv2.waitKey(WAIT_TIME) & 0xFF
            if pressed_key == ord('q'):
                break
        cv2.destroyAllWindows()

# Function to precompute the outside area mask and box lines for visualization
def precompute_outside_area_and_box_lines(H):
    """Prepare the static overlays used when drawing tracks.

    Args:
        H: 3x4 world-to-image matrix.

    Returns:
        Tuple ``(outside_area_mask, pale_red, box_lines)``:
        ``outside_area_mask`` is a (BEV_HEIGHT, BEV_WIDTH) uint8 array holding
        1 for every ground cell of the box whose projection falls outside the
        WIDTH x HEIGHT image; ``pale_red`` is the colour tuple used to paint
        that area; ``box_lines`` lists the four box corners projected to
        integer pixel coordinates.
    """
    pale_red = (204, 204, 255)

    def project_ground_point(world_x, world_y):
        # Project a point lying on the Z = 0 plane and dehomogenize it.
        projected = np.dot(H, np.array([world_x, world_y, 0, 1]))
        return projected[0] / projected[2], projected[1] / projected[2]

    outside_area_mask = np.zeros((BEV_HEIGHT, BEV_WIDTH), dtype=np.uint8)
    for y in range(BOX_START[1], BOX_END[1]):
        for x in range(BOX_START[0], BOX_END[0]):
            u, v = project_ground_point(x, y)
            visible = 0 <= u < WIDTH and 0 <= v < HEIGHT
            if not visible:
                outside_area_mask[y, x] = 1

    # Corners in drawing order: start, top-right, end, bottom-left.
    box_corners = [BOX_START, (BOX_END[0], BOX_START[1]), BOX_END, (BOX_START[0], BOX_END[1])]
    box_lines = []
    for corner_x, corner_y in box_corners:
        u_trans, v_trans = project_ground_point(corner_x, corner_y)
        box_lines.append((int(u_trans), int(v_trans)))

    return outside_area_mask, pale_red, box_lines

# Function to visualize the tracks on images and transformed positions
def visualize_tracks(tracks, frame_data, img_folder, H):
    """Render the tracks on the camera frames and on the bird's-eye view.

    For every frame number in ``frame_data`` two images are produced: the
    camera frame with the projected box outline and track markers, and a
    bird's-eye canvas with the track positions. Both are appended to animated
    GIFs and shown in OpenCV windows; pressing ``q`` stops the playback. When
    SAVE_VISUALIZATION is set, stills of frame SAVE_FRAME and a summary image
    with every track drawn as a polyline are saved as well.

    Args:
        tracks: dict mapping person id to a list of
            ``(frame_number, (x, y))`` tuples in bird's-eye coordinates;
            positions equal to ``(-1, -1)`` are skipped.
        frame_data: iterable of ``(frame_number, num_heads, positions)``
            tuples; only the frame numbers are used.
        img_folder: directory containing ``<frame_number>.jpg`` images.
        H: 3x4 world-to-image matrix.
    """
    # Track colours in OpenCV's BGR channel order (they are drawn with cv2 below).
    colors = [(255, 128, 128), (128, 255, 128), (128, 128, 255), (255, 255, 128),
              (255, 128, 255), (128, 255, 255), (192, 192, 128), (128, 192, 192)]
    outside_area_mask, pale_red, box_lines = precompute_outside_area_and_box_lines(H)
    frame_numbers = sorted({frame_number for frame_number, _num_heads, _positions in frame_data})

    # Canvas of the PIL summary image; PIL works in RGB, hence the reversed colour.
    visualization_img = np.ones((BEV_HEIGHT, BEV_WIDTH, 3), dtype=np.uint8) * 255
    visualization_img[outside_area_mask == 1] = pale_red[::-1]

    # Group the track points by frame once instead of rescanning every track
    # for every frame. Insertion order keeps the original drawing order.
    points_by_frame = {}
    for person_id, track in tracks.items():
        for track_frame_number, pos in track:
            if pos != (-1, -1):
                points_by_frame.setdefault(track_frame_number, []).append((person_id, pos))

    try:
        with imageio.get_writer(f'./saved_exp_results/{TEST_DATA_ID}_tracks_original.gif', mode='I', fps=FPS, loop=0) as writer_original, \
             imageio.get_writer(f'./saved_exp_results/{TEST_DATA_ID}_tracks_transformed.gif', mode='I', fps=FPS, loop=0) as writer_transformed:

            for frame_number in frame_numbers:
                img_path = os.path.join(img_folder, f"{frame_number}.jpg")
                img = Image.open(img_path)
                img = img.resize((WIDTH, HEIGHT))
                img = np.array(img)
                img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
                transformed_img = np.ones((BEV_HEIGHT, BEV_WIDTH, 3), dtype=np.uint8) * 255

                # Closed outline of the projected box on the camera frame.
                for corner_idx in range(4):
                    cv2.line(img, box_lines[corner_idx], box_lines[(corner_idx + 1) % 4], (0, 0, 255), 2)

                for person_id, pos in points_by_frame.get(frame_number, ()):
                    u, v = pos
                    color = colors[person_id % len(colors)]
                    # Marker and label on the bird's-eye canvas.
                    cv2.circle(transformed_img, (int(u), int(v)), 10, color, -1)
                    cv2.circle(transformed_img, (int(u), int(v)), 10, (0, 0, 0), 2)
                    cv2.putText(transformed_img, str(person_id), (int(u) + 15, int(v) + 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 2, cv2.LINE_AA)
                    # Re-project the position at head height onto the camera frame.
                    X, Y, Z = pos[0], pos[1], FIXED_HEIGHT
                    U = np.dot(H, np.array([X, Y, Z, 1]))
                    u_trans, v_trans = U[0] / U[2], U[1] / U[2]
                    cv2.circle(img, (int(u_trans), int(v_trans)), 10, color, 2)
                    cv2.putText(img, str(person_id), (int(u_trans) + 15, int(v_trans) + 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 2, cv2.LINE_AA)

                # Painted after the markers, so the unseen area hides anything drawn in it.
                transformed_img[outside_area_mask == 1] = pale_red
                transformed_img = transformed_img[BOX_START[1]:BOX_END[1], BOX_START[0]:BOX_END[0]]

                img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
                transformed_rgb = cv2.cvtColor(transformed_img, cv2.COLOR_BGR2RGB)
                if SAVE_VISUALIZATION and frame_number == SAVE_FRAME:
                    Image.fromarray(img_rgb).save(f'./saved_exp_results/{TEST_DATA_ID}_track_screenshot.jpg')
                    Image.fromarray(transformed_rgb).save(f'./saved_exp_results/{TEST_DATA_ID}_track_transformed_screenshot.jpg')
                writer_original.append_data(img_rgb)
                writer_transformed.append_data(transformed_rgb)

                # Half-size previews, placed side by side on screen.
                img = cv2.resize(img, (WIDTH // 2, HEIGHT // 2))
                transformed_img = cv2.resize(transformed_img, (BOX_WIDTH // 2, BOX_HEIGHT // 2))
                cv2.imshow('Tracked Heads - Original', img)
                cv2.moveWindow('Tracked Heads - Original', 0, 0)
                cv2.imshow('Tracked Heads - Transformed', transformed_img)
                cv2.moveWindow('Tracked Heads - Transformed', WIDTH // 2, 0)
                if cv2.waitKey(WAIT_TIME) & 0xFF == ord('q'):
                    break

        if SAVE_VISUALIZATION:
            visualization_img = visualization_img[BOX_START[1]:BOX_END[1], BOX_START[0]:BOX_END[0]]
            visualization_pil_img = Image.fromarray(visualization_img)
            draw = ImageDraw.Draw(visualization_pil_img)
            font = ImageFont.load_default()
            for person_id, track in tracks.items():
                # PIL interprets colour tuples as RGB; reverse the BGR tuple so a
                # person has the same colour here as in the saved GIFs.
                line_color = colors[person_id % len(colors)][::-1]
                for i in range(1, len(track)):
                    start_point = track[i-1][1]
                    end_point = track[i][1]
                    draw.line([start_point, end_point], fill=line_color, width=2)
                    mid_point = ((start_point[0] + end_point[0]) // 2, (start_point[1] + end_point[1]) // 2)
                    draw.text(mid_point, str(person_id), fill=(0, 0, 0), font=font)
            visualization_pil_img.save(f'./saved_exp_results/{TEST_DATA_ID}_track_visualization.jpg')
    finally:
        # Close the preview windows even if loading or saving an image failed.
        cv2.destroyAllWindows()

def main():
    """Run the whole pipeline: load detections, track them and visualize.

    The detection file OUT_FILE_NAME holds one frame per line in the form
    ``frame_number num_heads u1 v1 u2 v2 ...`` (whitespace-separated
    integers). Each pixel position is back-projected to the plane at
    FIXED_HEIGHT, the back-projected positions are tracked, and both the raw
    detections and the resulting tracks are visualized.
    """
    data = read_data_from_file(OUT_FILE_NAME)
    frame_data = []
    transformed_frame_data = []
    H = calculate_homography(world_points, image_points)
    for line in data:
        parts = list(map(int, line.split()))
        # A blank line (e.g. a trailing one) carries no frame; skip it
        # instead of failing on parts[0].
        if not parts:
            continue
        frame_number = parts[0]
        num_heads = parts[1]
        # Remaining fields are consecutive (u, v) pixel pairs.
        positions = [(parts[i], parts[i + 1]) for i in range(2, len(parts), 2)]
        transformed_positions = [calculate_XY_from_uvZ(H, pos[0], pos[1], FIXED_HEIGHT) for pos in positions]
        frame_data.append((frame_number, num_heads, positions))
        transformed_frame_data.append((frame_number, num_heads, transformed_positions))
    visualize_positions(frame_data, IMG_FOLDER)
    tracks = track_objects(transformed_frame_data)
    # Gap filling is currently disabled:
    # interpolated_tracks = interpolate_missing_data(tracks)
    visualize_tracks(tracks, frame_data, IMG_FOLDER, H)

# Entry point: run the full pipeline when this code is executed as the main module.
if __name__ == '__main__':
    main()

In [98]:
import cv2
from PIL import Image
import numpy as np

# List to store click positions
click_positions = []

# Define the mouse callback function
def mouse_callback(event, x, y, flags, param):
    """Record a left click and mark it on the displayed image.

    On a left-button press the pixel ``[x, y]`` is appended to
    ``click_positions``, a red dot and the running click number are drawn on
    the global ``open_cv_image``, and the "Image" window is refreshed. All
    other mouse events are ignored.

    Args:
        event: OpenCV mouse event code.
        x, y: pixel coordinates of the event.
        flags, param: unused; present because OpenCV passes them.
    """
    if event != cv2.EVENT_LBUTTONDOWN:
        return

    click_positions.append([x, y])
    marker_color = (0, 0, 255)
    click_label = str(len(click_positions))
    cv2.circle(open_cv_image, (x, y), 5, marker_color, -1)
    cv2.putText(open_cv_image, click_label, (x + 10, y - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, marker_color, 1, cv2.LINE_AA)
    cv2.imshow("Image", open_cv_image)

# Read the image using PIL
# The folder comes from IMG_FOLDER (defined with the other constants) so this
# annotation tool always looks at the same data set as the tracking pipeline.
image_path = IMG_FOLDER + "/360.jpg"
image = Image.open(image_path)

# Resize the image
# Use the pipeline's WIDTH x HEIGHT so clicked coordinates are expressed in the
# same pixel frame the pipeline resizes its images to.
resized_image = image.resize((WIDTH, HEIGHT))

# Convert the resized image to OpenCV format
open_cv_image = cv2.cvtColor(np.array(resized_image), cv2.COLOR_RGB2BGR)


# Create a window and set the mouse callback function
cv2.namedWindow("Image")
cv2.setMouseCallback("Image", mouse_callback)

# Show the image
# Blocks until any key is pressed; clicks made meanwhile are recorded.
cv2.imshow("Image", open_cv_image)
cv2.waitKey(0)
cv2.destroyAllWindows()

print(click_positions)


[[517, 298]]
