In [None]:
import cv2
import numpy as np
import os
import time
import tkinter as tk
from tkinter import filedialog
from tkinter import messagebox
from datetime import datetime

Base Code Kanade-Lucas-Tomasi Tracker 

In [26]:
video_path = r'D:\MAPUA\Thesis\Video_Sample1\Video1.mp4'

# Parameters for Shi-Tomasi corner detection
feature_params = dict(maxCorners=1000, qualityLevel=0.01, minDistance=10, blockSize=3)

# Parameters for Lucas-Kanade optical flow
lk_params = dict(winSize=(15, 15), maxLevel=3, criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03))

# Read the video
cap = cv2.VideoCapture(video_path)
# Create a background subtractor
background_subtractor = cv2.createBackgroundSubtractorMOG2()

# Read the first frame
ret, prev_frame = cap.read()
prev_gray = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)

# Detect the arm region using color segmentation
hsv_frame = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2HSV)
lower_skin = np.array([0, 20, 70], dtype=np.uint8)
upper_skin = np.array([20, 255, 255], dtype=np.uint8)
arm_mask = cv2.inRange(hsv_frame, lower_skin, upper_skin)

# Apply morphological operations to refine the mask
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
arm_mask = cv2.morphologyEx(arm_mask, cv2.MORPH_OPEN, kernel)

# Detect features in the arm region
prev_corners = cv2.goodFeaturesToTrack(prev_gray, mask=arm_mask, **feature_params)

# Create an empty mask for drawing purposes
mask = np.zeros_like(prev_frame)

while True:
    # Read the current frame
    ret, frame = cap.read()
    if not ret:
        break

    # Apply background subtraction
    fg_mask = background_subtractor.apply(frame)

    # Convert the frame to grayscale
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

    # Perform bitwise AND operation to extract arm regions
    arm_mask = cv2.bitwise_and(fg_mask, gray)

    # Track features using Lucas-Kanade optical flow
    next_corners, status, err = cv2.calcOpticalFlowPyrLK(prev_gray, gray, prev_corners, None, **lk_params)

    # Filter out invalid and low-quality tracks
    good_old = prev_corners[status == 1]
    good_new = next_corners[status == 1]
    good_err = err[status == 1]

    # Refine feature set based on error threshold
    good_old = good_old[good_err < 10]
    good_new = good_new[good_err < 10]

    # Draw the tracks
    for i, (new, old) in enumerate(zip(good_new, good_old)):
        a, b = new.ravel()
        c, d = old.ravel()
        mask = cv2.line(mask, (int(a), int(b)), (int(c), int(d)), (0, 255, 0), 2)
        frame = cv2.circle(frame, (int(a), int(b)), 5, (0, 255, 0), -1)

    # Overlay the optical flow tracks on the frame
    img = cv2.add(frame, mask)

    # Display the resulting image
    cv2.imshow('KLT Tracker', img)
    if cv2.waitKey(int(1000 / cap.get(cv2.CAP_PROP_FPS))) & 0xFF == ord('q'):
        break

    # Update the previous frame, corners, and arm mask
    prev_gray = gray.copy()
    prev_corners = good_new.reshape(-1, 1, 2)
    arm_mask = np.zeros_like(gray)
    arm_mask = cv2.bitwise_or(arm_mask, arm_mask)

# Release resources
cap.release()
cv2.destroyAllWindows()

Tkinter Integration

In [None]:
video_save_directory = r'D:\MAPUA\Thesis\RecordedVideo'
exercise_videos = r'D:\MAPUA\Thesis\ExerciseVideos'
output_folder = r'D:\MAPUA\Thesis\OpticalFlowVideos'

# Create the main GUI window
window = tk.Tk()
window.title("Kanade-Lucas-Tomasi Tracking")
window.geometry("400x250")

window_width = 400
window_height = 300

# Calculate the screen dimensions
screen_width = window.winfo_screenwidth()
screen_height = window.winfo_screenheight()

# Calculate the position for centering the window
x = (screen_width - window_width) // 2
y = (screen_height - window_height) // 2

# Set the window geometry to center the window on the screen
window.geometry(f"{window_width}x{window_height}+{x}+{y}")
# Variable to store the video file name
video_file_name = ""

def start_recording():
    global video_file_name
    video_file_name = "Video_" + str(len(os.listdir(video_save_directory)) + 1) + ".mp4"

    # Prompt the user to select a video size
    video_sizes = {
        "640x480": (640, 480),
        "1280x720": (1280, 720),
        "1920x1080": (1920, 1080)
    }
    selected_size = tk.StringVar()
    size_selection_window = tk.Toplevel(window)
    size_selection_window.title("Select Video Size")
    size_selection_window.geometry("200x150")
    size_selection_label = tk.Label(size_selection_window, text="Select Video Size:")
    size_selection_label.pack(pady=10)
    size_selection = tk.OptionMenu(size_selection_window, selected_size, *video_sizes.keys())
    size_selection.pack(pady=10)

    def start_recording_with_size():
        size_selection_window.destroy()

        # Get the selected video size
        width, height = video_sizes[selected_size.get()]

        # Start a countdown timer with a pop-up message
        # Create a countdown timer pop-up
        countdown_duration = 5

        countdown_window = tk.Toplevel(window)
        countdown_window.geometry("200x100")
        countdown_window.title("Countdown")

        countdown_label = tk.Label(countdown_window, text=f"Recording starts in {countdown_duration} seconds")
        countdown_label.pack(pady=20)

        for countdown in range(countdown_duration, 0, -1):
            countdown_label.config(text=f"Recording starts in {countdown} seconds")
            countdown_window.update()
            time.sleep(1)

        countdown_window.destroy()  # Close the countdown window

        # Start recording the video using the camera
        video_path = os.path.join(video_save_directory, video_file_name)

        # Define the codec and create a VideoWriter object
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(video_path, fourcc, 20.0, (width, height))

        cap = cv2.VideoCapture(0)
        cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)

        start_time = time.time()
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            

            # Display the recording timer on the frame
            elapsed_time = time.time() - start_time
            timer_text = f"Recording Time: {int(elapsed_time)}s / 10s"
            cv2.putText(frame, timer_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
            cv2.imshow('Recording', frame)

            # Write the frame to the video file
            out.write(frame)

            if elapsed_time >= 10:
                break

            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

        # Release resources
        cap.release()
        out.release()
        cv2.destroyAllWindows()

        messagebox.showinfo("Recording Finished", "Video recording has been completed.")

    start_button = tk.Button(size_selection_window, text="Start Recording", command=start_recording_with_size)
    start_button.pack(pady=10)

def show_recordings():
    os.startfile(video_save_directory)

def get_optical_flow():
    # Select the video to extract landmarks from
    video_file = filedialog.askopenfilename(initialdir=exercise_videos, title="Select Video File",
                                            filetypes=(("Video Files", "*.mp4"), ("All Files", "*.*")))
    if not video_file:
        return

    # Get the video file name and create the output folder
    video_file_name = os.path.basename(video_file)
    output_folder_path = os.path.join(output_folder, os.path.splitext(video_file_name)[0])
    os.makedirs(output_folder_path, exist_ok=True)

    # Parameters for Shi-Tomasi corner detection
    feature_params = dict(maxCorners=1000, qualityLevel=0.01, minDistance=10, blockSize=3)

    # Parameters for Lucas-Kanade optical flow
    lk_params = dict(winSize=(15, 15), maxLevel=3, criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03))

    # Read the video
    cap = cv2.VideoCapture(video_file)
    # Create a background subtractor
    background_subtractor = cv2.createBackgroundSubtractorMOG2()

    # Read the first frame
    ret, prev_frame = cap.read()
    prev_gray = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)

    # Detect the arm region using color segmentation
    hsv_frame = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2HSV)
    lower_skin = np.array([0, 20, 70], dtype=np.uint8)
    upper_skin = np.array([20, 255, 255], dtype=np.uint8)
    arm_mask = cv2.inRange(hsv_frame, lower_skin, upper_skin)

    # Apply morphological operations to refine the mask
    kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
    arm_mask = cv2.morphologyEx(arm_mask, cv2.MORPH_OPEN, kernel)

    # Detect features in the arm region
    prev_corners = cv2.goodFeaturesToTrack(prev_gray, mask=arm_mask, **feature_params)

    # Create an empty mask for drawing purposes
    mask = np.zeros_like(prev_frame)

    # Output video path
    output_video_filename = f"LK_{os.path.splitext(video_file_name)[0]}.mp4"
    output_video_path = os.path.join(output_folder_path, output_video_filename)
    output_fps = cap.get(cv2.CAP_PROP_FPS)
    output_size = (int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)))
    output_writer = cv2.VideoWriter(output_video_path, cv2.VideoWriter_fourcc(*'mp4v'), output_fps, output_size)

    while True:
        # Read the current frame
        ret, frame = cap.read()
        if not ret:
            break

        # Apply background subtraction
        fg_mask = background_subtractor.apply(frame)

        # Convert the frame to grayscale
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

        # Perform bitwise AND operation to extract arm regions
        arm_mask = cv2.bitwise_and(fg_mask, gray)

                # Track features using Lucas-Kanade optical flow
        next_corners, status, err = cv2.calcOpticalFlowPyrLK(prev_gray, gray, prev_corners, None, **lk_params)

        # Filter out invalid and low-quality tracks
        good_old = prev_corners[status == 1]
        good_new = next_corners[status == 1]
        good_err = err[status == 1]

        # Refine feature set based on error threshold
        good_old = good_old[good_err < 10]
        good_new = good_new[good_err < 10]

        # Draw the tracks
        for i, (new, old) in enumerate(zip(good_new, good_old)):
            a, b = new.ravel()
            c, d = old.ravel()
            mask = cv2.line(mask, (int(a), int(b)), (int(c), int(d)), (0, 255, 0), 2)
            frame = cv2.circle(frame, (int(a), int(b)), 5, (0, 255, 0), -1)

        # Overlay the optical flow tracks on the frame
        img = cv2.add(frame, mask)

        # Write frame to the output video
        output_writer.write(img)

        # Display the resulting image
        cv2.imshow('KLT Tracker', img)
        cv2.waitKey(1)  # Set a small delay (1 millisecond)

        # Update the previous frame, corners, and arm mask
        prev_gray = gray.copy()
        prev_corners = good_new.reshape(-1, 1, 2)
        arm_mask = np.zeros_like(gray)
        arm_mask = cv2.bitwise_or(arm_mask, arm_mask)

    # Release resources
    cap.release()
    output_writer.release()
    cv2.destroyAllWindows()
    print(f"Optical Flow video saved to {output_video_path}")

def view_OpticalFlow():
    # Open the output f older containing the extracted landmarks
    os.startfile(output_folder)

def exit_program():
    window.destroy()

def on_closing():
    # Delete the recorded video file if the GUI window is closed
    global video_file_name
    if video_file_name:
        video_path = os.path.join(video_save_directory, video_file_name)
        if os.path.exists(video_path):
            os.remove(video_path)
    window.destroy()

# Bind the window closing event to the on_closing function
window.protocol("WM_DELETE_WINDOW", on_closing)

# Create buttons for recording, viewing recordings, extracting landmarks, and exiting
record_button = tk.Button(window, text="Start Recording", command=start_recording)
record_button.pack(pady=10)

view_button = tk.Button(window, text="View Recordings", command=show_recordings)
view_button.pack(pady=10)

get_OpticalFlow_button = tk.Button(window, text="Get Optical Flow", command=get_optical_flow)
get_OpticalFlow_button.pack(pady=10)

# Create the "View Extracted Landmarks" button
view_OpticalFlow_button = tk.Button(window, text="View Optical Flow", command=view_OpticalFlow)
view_OpticalFlow_button.pack(pady=10)

exit_button = tk.Button(window, text="Exit", command=exit_program)
exit_button.pack(pady=10)

# Run the GUI main loop
window.mainloop()


Gstreamer Version

In [None]:
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst
Gst.init(None)

# Specify the video save directory
video_save_directory = r'D:\MAPUA\Thesis\RecordedVideo'
output_folder = r'D:\MAPUA\Thesis\OpticalFlowVideos'

# Create the main GUI window
window = tk.Tk()
window.title("Kanade-Lucas-Tomasi Tracking")
window.geometry("400x250")

window_width = 400
window_height = 300

# Calculate the screen dimensions
screen_width = window.winfo_screenwidth()
screen_height = window.winfo_screenheight()

# Calculate the position for centering the window
x = (screen_width - window_width) // 2
y = (screen_height - window_height) // 2

# Set the window geometry to center the window on the screen
window.geometry(f"{window_width}x{window_height}+{x}+{y}")
# Variable to store the video file name
video_file_name = ""

def start_recording():
    global video_file_name
    video_file_name = "Video_" + str(len(os.listdir(video_save_directory)) + 1) + ".mp4"

    # Prompt the user to select a video size
    video_sizes = {
        "640x480": (640, 480),
        "1280x720": (1280, 720),
        "1920x1080": (1920, 1080)
    }
    selected_size = tk.StringVar()
    size_selection_window = tk.Toplevel(window)
    size_selection_window.title("Select Video Size")
    size_selection_window.geometry("200x150")
    size_selection_label = tk.Label(size_selection_window, text="Select Video Size:")
    size_selection_label.pack(pady=10)
    size_selection = tk.OptionMenu(size_selection_window, selected_size, *video_sizes.keys())
    size_selection.pack(pady=10)

    def start_recording_with_size():
        size_selection_window.destroy()

        # Get the selected video size
        width, height = video_sizes[selected_size.get()]

        # Start recording the video using GStreamer
        video_path = os.path.join(video_save_directory, video_file_name)

        pipeline = Gst.parse_launch(f"nvarguscamerasrc ! video/x-raw(memory:NVMM), width={width}, height={height}, format=(string)NV12, framerate=(fraction)30/1 ! nvvidconv ! video/x-raw, format=(string)BGRx ! videoconvert ! video/x-raw, format=(string)BGR ! videoconvert ! video/x-raw, format=(string)BGR ! appsink")

        pipeline.set_state(Gst.State.PLAYING)

        out = cv2.VideoWriter(video_path, cv2.VideoWriter_fourcc(*'mp4v'), 30.0, (width, height))

        start_time = time.time()
        while True:
            sample = pipeline.get_by_name("sink").emit("pull-sample")
            buf = sample.get_buffer()
            success, frame = buf.map(Gst.MapFlags.READ)
            if not success:
                break

            # Display the recording timer on the frame
            elapsed_time = time.time() - start_time
            timer_text = f"Recording Time: {int(elapsed_time)}s / 10s"
            cv2.putText(frame, timer_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
            cv2.imshow('Recording', frame)

            # Write the frame to the video file
            out.write(frame)

            if elapsed_time >= 10:
                break

            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

        # Release resources
        out.release()
        cv2.destroyAllWindows()
        pipeline.set_state(Gst.State.NULL)

        messagebox.showinfo("Recording Finished", "Video recording has been completed.")

    start_button = tk.Button(size_selection_window, text="Start Recording", command=start_recording_with_size)
    start_button.pack(pady=10)


