# Imports

In [2]:
import cv2
import numpy as np
import time
import os
import matplotlib.pyplot as plt
import pandas as pd

# Constants
## For styling

In [3]:
colors = [(255,0,0), (127,0,255), (0,127,0), (0,127,255)]
line_padding = [0.7, 1.5,1.5,1.5]

font_scale = 1
text_position_cnt = (100, 100)
text_position_time = (100, 120)

output_folder_video = "../output/video"
output_folder_csv = "../output/csv"

## For algorithm tuning

In [4]:
# Are for optime
kernel = np.ones((5,5),np.uint8)
threshold_area_size = [80, 20, 10, 40]
video_path = "../IMG_7102.MOV"
frame_shift = 500
set_fps = 150 # I dont know if its work

video_name = "test_01_my_hand"

## Support functions

In [5]:
# Add text overlay into video frame
def add_text_to_frame(frame, text, position=(30, 30), font=cv2.FONT_HERSHEY_SIMPLEX, font_scale=0.2, color=(0, 255, 0), thickness=2):
    """
    Add text to a frame.

    Parameters:
    - frame (numpy.ndarray): Input frame.
    - text (str): Text to be added to the frame.
    - position (tuple): Position of the text (x, y).
    - font (int): Font type.
    - font_scale (float): Font scale.
    - color (tuple): Text color (B, G, R).
    - thickness (int): Text thickness.

    Returns:
    - numpy.ndarray: Frame with added text.
    """
    frame_with_text = frame.copy()
    cv2.putText(frame_with_text, text, position, font, font_scale, color, thickness)
    return frame_with_text

In [6]:
def calculate_angle(line1, line2):
    # Convert lines to numpy arrays
    line1 = np.array(line1)
    line2 = np.array(line2)

    # Calculate the vectors corresponding to the lines
    vector1 = line1[1] - line1[0]
    vector2 = line2[1] - line2[0]

    # Calculate the dot product and cross product of the vectors
    dot_product = np.dot(vector1, vector2)
    cross_product = np.cross(vector1, vector2)

    # Calculate the magnitudes of the vectors
    magnitude1 = np.linalg.norm(vector1)
    magnitude2 = np.linalg.norm(vector2)

    # Calculate the cosine of the angle between the vectors
    cosine_theta = dot_product / (magnitude1 * magnitude2)

    # Determine the sign of the dot product to determine the direction
    if dot_product > 0:
        angle_radians = np.arccos(cosine_theta)
        # Convert the angle to degrees
        angle_degrees = 180 - np.degrees(angle_radians)
        # Adjust angle for the cross product sign
        if cross_product < 0:
            angle_degrees = 360 - angle_degrees
    else:
        angle_radians = np.arccos(cosine_theta)
        # Convert the angle to degrees
        angle_degrees = np.degrees(angle_radians)

    return angle_degrees

In [7]:
def calculate_vector(point1, point2):
    return np.array(point2) - np.array(point1)

In [8]:
def segment_marker_by_color(frame_tmp):
    # Input must be a frame in the cielab color model from the OpenCV function

    # Extract color channels
    L_channel = frame_tmp[:, :, 0]
    a_channel = frame_tmp[:, :, 1]
    b_channel = frame_tmp[:, :, 2]

    # Color segmentation using NumPy array operations
    marker_blue = (a_channel > 140) & (a_channel < 170) & (b_channel > 160)
    marker_pink = (a_channel > 175) & (b_channel < 80)
    marker_green = (a_channel < 120) & (b_channel > 130)
    marker_yellow = (a_channel > 80) & (a_channel < 120) & (b_channel > 90) & (b_channel < 110)

    return marker_blue, marker_pink, marker_green, marker_yellow

In [9]:
def main_function(frame, swap):
    # Convert the input frame to the CIELAB color space
    cielab_frame = cv2.cvtColor(frame, cv2.COLOR_RGB2Lab)

    # Segment markers by color in the CIELAB color space
    marker_blue, marker_pink, marker_green, marker_yellow = segment_marker_by_color(cielab_frame)

    # Create a stack of masks for each color marker
    masks = np.stack([marker_blue, marker_pink, marker_green, marker_yellow], axis=0)

    # Define color names for visualization
    colors_name = ["blue", "pink", "green", "yellow"]

    # Initialize a list to store points per frame
    point_per_frame = []

    # Set the line padding value
    line_pad = 5  # Adjust this value as needed

    # Initialize the direction vector for the first line
    direction_vector_0_1 = None

    # Iterate over each color marker
    for mask, thr, color, color_name, direction_vector in zip(
            masks, threshold_area_size, colors, colors_name, [direction_vector_0_1, None, None, None]
    ):
        # Convert the mask to uint8
        mask = np.uint8(mask)

        # Find connected components in the mask
        num_labels, labels, stats, centroids = cv2.connectedComponentsWithStats(mask)

        # Filter regions based on area threshold
        filtered_regions = [index for index, stat in enumerate(stats[1:]) if stat[4] >= thr]

        # Initialize a list to store points per mask
        point_per_mask = []

        # Iterate over filtered regions in the mask
        for idx, index in enumerate(filtered_regions):
            # Access region properties from the stats array
            left, top, width, height, area = stats[index + 1]

            # Calculate the centroid
            centroid_x, centroid_y = int(left + width / 2), int(top + height / 2)

            # Append the centroid to the list of points for the mask
            point_per_mask.append((centroid_x, centroid_y))

        # Visualize circles for each point in the mask
        for idx, point in enumerate(point_per_mask):
            cv2.circle(frame, (point[0], point[1]), radius=idx * 10, color=color, thickness=5)

        # Visualize circles for each point with increased radius
        for idx, point in enumerate(point_per_mask):
            cv2.circle(frame, (point[0], point[1]), radius=idx * 10 + 10, color=color, thickness=5)

        # If direction vector is not initialized, calculate it from the first two points
        if direction_vector is None:
            direction_vector = calculate_vector(point_per_mask[1], point_per_mask[0])

        # Calculate points for the line based on the direction vector and line padding
        point1 = (
            int(point_per_mask[1][0] - line_pad * direction_vector[0]),
            int(point_per_mask[1][1] - line_pad * direction_vector[1]),
        )
        point2 = (
            int(point_per_mask[0][0] + line_pad * direction_vector[0]),
            int(point_per_mask[0][1] + line_pad * direction_vector[1]),
        )

        # Visualize the line connecting the two points
        cv2.line(frame, point1, point2, color, 3)

        # Append the points for the current mask to the list of points per frame
        point_per_frame.append(point_per_mask)

    # Calculate angles between consecutive lines
    angle_0 = calculate_angle(point_per_frame[0], point_per_frame[1])
    angle_1 = calculate_angle(point_per_frame[1], point_per_frame[2])
    angle_2 = calculate_angle(point_per_frame[2], point_per_frame[3])

    # Add text annotations to the frame with calculated angles
    frame = add_text_to_frame(frame, "ANGLE 0: {}".format(int(angle_0)), position=(1000, 810), font_scale=0.5, thickness=2, color=(255, 255, 0))
    frame = add_text_to_frame(frame, "ANGLE 1: {}".format(int(angle_1)), position=(1000, 830), font_scale=0.5, thickness=2, color=(255, 255, 0))
    frame = add_text_to_frame(frame, "ANGLE 2: {}".format(int(angle_2)), position=(1000, 850), font_scale=0.5, thickness=2, color=(255, 255, 0))

    return frame, angle_0, angle_1, angle_2

In [10]:
def store_video(frames, output_path, fps):
    # Function to store the video with updated frames
    fourcc = cv2.VideoWriter_fourcc(*'avc1')
    height, width, _ = frames[0].shape
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

    for frame in frames:
        out.write(frame)

    out.release()

# Main logic

In [11]:
cap = cv2.VideoCapture("../IMG_7102.MOV")
cap.set(cv2.CAP_PROP_POS_FRAMES, frame_shift)
cap.set(cv2.CAP_PROP_FPS, set_fps)


if not cap.isOpened():
    print("Error: Could not open the video file.")
    exit()

# Create a window to display the frames
cv2.namedWindow('Video Preview', cv2.WINDOW_NORMAL)

measure = [] # for storing angles
frames_to_store = []
cnt = frame_shift # for storing frame count
while True:
    strt = time.time()
    ret, frame = cap.read()

    if not ret:
        break

    # Use the original frame instead of creating a copy
    frame, angle_0, angle_1, angle_2  = main_function(frame, False)

    # Add text to the frame
    frame = add_text_to_frame(frame, str(cnt), position=text_position_cnt, font_scale=font_scale)

    # Calculate and add time information
    end = time.time()
    frame = add_text_to_frame(frame, str(end - strt), position=text_position_time, font_scale=font_scale)
    measure.append([cnt, angle_0,angle_1,angle_2])
    cv2.imshow('Video Preview', frame)
    frames_to_store.append(frame.copy())
    cnt += 1
    if cv2.waitKey(int(1000 / 1000)) & 0xFF == 27: # cv2.waitKey(1000) & 0xFF == ord('q')
        break

cap.release()
cv2.destroyAllWindows()



IndexError: list index out of range

## Store processed video

In [63]:
# Store the video with updated frames
output_video_path = os.path.join(output_folder_video,f"{video_name}.mp4")  # Set the desired output video path
store_video(frames_to_store, output_video_path, set_fps)


(<unknown>:18668): GStreamer-CRITICAL **: 15:00:15.501: gst_element_make_from_uri: assertion 'gst_uri_is_valid (uri)' failed
OpenCV: AVF: waiting to write video data.
OpenCV: AVF: waiting to write video data.
OpenCV: AVF: waiting to write video data.
OpenCV: AVF: waiting to write video data.
OpenCV: AVF: waiting to write video data.
OpenCV: AVF: waiting to write video data.
OpenCV: AVF: waiting to write video data.
OpenCV: AVF: waiting to write video data.
OpenCV: AVF: waiting to write video data.
OpenCV: AVF: waiting to write video data.
OpenCV: AVF: waiting to write video data.
OpenCV: AVF: waiting to write video data.
OpenCV: AVF: waiting to write video data.
OpenCV: AVF: waiting to write video data.
OpenCV: AVF: waiting to write video data.
OpenCV: AVF: waiting to write video data.
OpenCV: AVF: waiting to write video data.
OpenCV: AVF: waiting to write video data.
OpenCV: AVF: waiting to write video data.
OpenCV: AVF: waiting to write video data.
OpenCV: AVF: waiting to write vide

## Store csv - raw_angles

In [67]:
df_angle = pd.DataFrame(data=measure, columns=["frame", "angle_0", "angle_1", "angle_2"])
df_angle["time"] = df_angle["frame"] / set_fps
df_angle.to_csv(os.path.join(output_folder_csv,f"{video_name}.csv"), index=False)
df_angle

Unnamed: 0,frame,angle_0,angle_1,angle_2,time
0,500,108.411997,95.616618,107.868928,3.333333
1,501,108.070983,94.565037,109.502839,3.340000
2,502,108.070983,95.416016,109.118753,3.346667
3,503,108.132717,95.416016,108.271437,3.353333
4,504,109.105193,93.530827,109.122416,3.360000
...,...,...,...,...,...
583,1083,192.166393,189.383085,187.626994,7.220000
584,1084,192.166393,189.383085,186.952957,7.226667
585,1085,192.220577,187.434327,189.575753,7.233333
586,1086,192.220577,188.010393,188.325650,7.240000
