Import Libraries

In [1]:
import os
import cv2
import math
import random
import numpy as np
import datetime as dt
import tensorflow as tf
import seaborn as sns
from collections import deque
import matplotlib.pyplot as plt


%matplotlib inline

from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix
from tensorflow.keras.models import Sequential 
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from tensorflow.keras.utils import plot_model
from tensorflow.keras.layers import LSTM, Dense, Conv2D, TimeDistributed, Flatten, GRU, Dropout, MaxPooling2D, Activation, BatchNormalization
from tensorflow.keras.optimizers import Adam

ModuleNotFoundError: No module named 'seaborn'

Set Numpy, Python, and TF seeds

In [3]:
model_input_size = (224,224,3)
seed_constant = 7
np.random.seed(seed_constant)
random.seed(seed_constant)
tf.random.set_seed(seed_constant)

batch_size = 8 # Change this shit pag sasabog na yung GPU

Visualize Data

In [10]:
import cv2
import numpy as np
from scipy import signal
from numba import jit
import numpy.lib.stride_tricks as stride_tricks

@jit(nopython=True)
def compute_flow_for_window(Ix, Iy, It, tau=0.01):
    """Compute flow for a single window using Numba acceleration."""
    # Build system of equations
    n = len(Ix)
    ATA = np.zeros((2, 2))
    ATb = np.zeros(2)
    
    # Manually compute A^T * A and A^T * b
    for i in range(n):
        ATA[0, 0] += Ix[i] * Ix[i]
        ATA[0, 1] += Ix[i] * Iy[i]
        ATA[1, 0] += Ix[i] * Iy[i]
        ATA[1, 1] += Iy[i] * Iy[i]
        ATb[0] += -Ix[i] * It[i]
        ATb[1] += -Iy[i] * It[i]
    
    # Check eigenvalues using determinant and trace
    det = ATA[0, 0] * ATA[1, 1] - ATA[0, 1] * ATA[1, 0]
    trace = ATA[0, 0] + ATA[1, 1]
    
    if det > tau and trace > tau:
        # Solve 2x2 system manually
        inv_det = 1.0 / det
        u = (ATA[1, 1] * ATb[0] - ATA[0, 1] * ATb[1]) * inv_det
        v = (-ATA[1, 0] * ATb[0] + ATA[0, 0] * ATb[1]) * inv_det
        return u, v
    return 0.0, 0.0

def sliding_window_view(arr, window_shape):
    """Create a sliding window view of an array."""
    shape = np.array(arr.shape)
    window_shape = np.array(window_shape)
    
    slices = tuple(slice(None, None, None) for _ in range(len(window_shape)))
    window_strides = np.array(arr.strides)
    
    indexing_strides = arr.strides
    win_indices = np.array(window_shape).reshape(-1, 1)
    new_shape = tuple(shape - window_shape + 1) + tuple(window_shape)
    new_strides = tuple(indexing_strides) + tuple(window_strides)
    
    return stride_tricks.as_strided(arr, new_shape, new_strides)

def optical_flow_optimized(I1g, I2g, window_size, tau=0.01):
    """
    Optimized implementation of Lucas-Kanade optical flow.
    """
    # Normalize and convert to float32 for better performance
    I1g = (I1g / 255.).astype(np.float32)
    I2g = (I2g / 255.).astype(np.float32)
    
    # Compute derivatives using Sobel for better accuracy and speed
    fx = cv2.Sobel(I1g, cv2.CV_32F, 1, 0, ksize=3)
    fy = cv2.Sobel(I1g, cv2.CV_32F, 0, 1, ksize=3)
    ft = I2g - I1g
    
    # Initialize motion components
    u = np.zeros(I1g.shape, dtype=np.float32)
    v = np.zeros(I1g.shape, dtype=np.float32)
    
    # Create sliding windows
    w = window_size // 2
    fx_windows = sliding_window_view(fx, (window_size, window_size))
    fy_windows = sliding_window_view(fy, (window_size, window_size))
    ft_windows = sliding_window_view(ft, (window_size, window_size))
    
    # Process each window
    for i in range(fx_windows.shape[0]):
        for j in range(fx_windows.shape[1]):
            Ix = fx_windows[i, j].flatten()
            Iy = fy_windows[i, j].flatten()
            It = ft_windows[i, j].flatten()
            
            flow = compute_flow_for_window(Ix, Iy, It, tau)
            u[i+w, j+w] = flow[0]
            v[i+w, j+w] = flow[1]
    
    return u, v

def process_video_with_optical_flow(input_video_path, output_video_path, window_size=15, tau=0.01):
    """
    Process video with optimized Lucas-Kanade optical flow and create visualization.
    """
    cap = cv2.VideoCapture(input_video_path)
    if not cap.isOpened():
        print(f"Error: Could not open {input_video_path}")
        return

    # Get video properties
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS)
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

    # Create video writer
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))

    # Read first frame
    ret, old_frame = cap.read()
    if not ret:
        print("Error: Could not read first frame")
        return

    old_gray = cv2.cvtColor(old_frame, cv2.COLOR_BGR2GRAY)

    frame_num = 0
    while True:
        ret, frame = cap.read()
        if not ret:
            break

        frame_num += 1
        frame_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

        # Calculate optical flow using optimized version
        u, v = optical_flow_optimized(old_gray, frame_gray, window_size, tau)

        # Create visualization
        magnitude = np.sqrt(u**2 + v**2)
        magnitude_normalized = cv2.normalize(magnitude, None, 0, 255, cv2.NORM_MINMAX)

        # Create color visualization using HSV
        hsv = np.zeros((height, width, 3), dtype=np.uint8)
        hsv[..., 0] = cv2.normalize(np.arctan2(v, u) * 180 / np.pi, None, 0, 180, cv2.NORM_MINMAX)
        hsv[..., 1] = 255
        hsv[..., 2] = magnitude_normalized

        # Convert HSV to BGR for visualization
        flow_rgb = cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)

        # Write frame
        out.write(flow_rgb)

        # Update for next frame
        old_gray = frame_gray.copy()

        # Progress update
        print(f"\rProcessing frame {frame_num}/{total_frames}", end="")

    cap.release()
    out.release()
    print(f"\nOptical flow video saved to {output_video_path}")

if __name__ == "__main__":
    # Your specific video paths
    input_video_path = "/Users/patrickpadua/Downloads/Yango RRL/CSV Generator/CNN_GRU_usingRGBOF/RawDataset/Testing/LumbarSideBends/LSB_1_Testing.mp4"
    output_video_path = "/Users/patrickpadua/Downloads/Yango RRL/CSV Generator/CNN_GRU_usingRGBOF/OF_test/LSB_1_Testing_combined_opencv_improved.mp4"

    # Process the video with optimized Lucas-Kanade
    process_video_with_optical_flow(input_video_path, output_video_path, window_size=15, tau=0.01)

Processing frame 1/304

KeyboardInterrupt: 

In [17]:
import cv2
import numpy as np

def process_video_with_fast_flow(input_video_path, output_video_path):
    """
    Optimized version with faster processing using:
    - Sparse feature points
    - Vectorized operations
    - Reduced computational complexity
    - Hardware acceleration (if available)
    """
    cap = cv2.VideoCapture(input_video_path)
    if not cap.isOpened():
        print(f"Error: Could not open {input_video_path}")
        return

    # Get video properties
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS)
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

    # Create video writer
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))

    # Read first frame
    ret, old_frame = cap.read()
    if not ret:
        print("Error: Could not read first frame")
        return

    # Preprocessing optimized with Gaussian blur instead of bilateral
    old_gray = cv2.cvtColor(old_frame, cv2.COLOR_BGR2GRAY)
    old_gray = cv2.GaussianBlur(old_gray, (5, 5), 0)

    # Create sparse grid (reduced density)
    grid_step = 20
    y, x = np.mgrid[grid_step//2:height:grid_step, grid_step//2:width:grid_step]
    p0 = np.vstack((x.ravel(), y.ravel())).T.reshape(-1, 1, 2).astype(np.float32)

    # Optimized LK parameters
    lk_params = dict(
        winSize=(15, 15),        # Balanced window size
        maxLevel=3,              # Increased pyramid levels
        criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 7, 0.03),
        flags=cv2.OPTFLOW_USE_INITIAL_FLOW,
        minEigThreshold=1e-4
    )

    # Preallocate arrays for flow visualization
    hsv = np.zeros((height, width, 3), dtype=np.uint8)
    hsv[..., 1] = 255  # Saturation is always maximum

    frame_num = 0
    while True:
        ret, frame = cap.read()
        if not ret:
            break

        frame_num += 1
        
        # Optimized preprocessing
        frame_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        frame_gray = cv2.GaussianBlur(frame_gray, (5, 5), 0)

        # Check if p0 is empty or invalid
        if p0 is None or p0.shape[0] == 0:
            # Reinitialize p0 if it's empty
            y, x = np.mgrid[grid_step//2:height:grid_step, grid_step//2:width:grid_step]
            p0 = np.vstack((x.ravel(), y.ravel())).T.reshape(-1, 1, 2).astype(np.float32)

        # Calculate optical flow with pyramid
        p1, st, _ = cv2.calcOpticalFlowPyrLK(old_gray, frame_gray, p0, None, **lk_params)

        # Vectorized flow field calculation
        if p1 is not None and st is not None:
            good_new = p1[st == 1]
            good_old = p0[st == 1]
            
            # Calculate flow vectors in vectorized manner
            flow_vectors = good_new - good_old
            x_old = good_old[:, 0, 0].astype(int)
            y_old = good_old[:, 0, 1].astype(int)
            
            # Create mask for valid coordinates
            valid = (x_old >= 0) & (x_old < width) & (y_old >= 0) & (y_old < height)
            x_old = x_old[valid]
            y_old = y_old[valid]
            flow_vectors = flow_vectors[valid]

            # Create sparse flow field
            u_sparse = np.zeros_like(old_gray, dtype=np.float32)
            v_sparse = np.zeros_like(old_gray, dtype=np.float32)
            u_sparse[y_old, x_old] = flow_vectors[:, 0, 0]
            v_sparse[y_old, x_old] = flow_vectors[:, 0, 1]

            # Fast Gaussian blur with smaller kernel
            u = cv2.GaussianBlur(u_sparse, (3, 3), 0)
            v = cv2.GaussianBlur(v_sparse, (3, 3), 0)
        else:
            u = np.zeros_like(old_gray, dtype=np.float32)
            v = np.zeros_like(old_gray, dtype=np.float32)

        # Create HSV visualization
        magnitude, angle = cv2.cartToPolar(u, v, angleInDegrees=True)
        hsv[..., 0] = (angle / 2).astype(np.uint8)  # Hue range is 0-180 for OpenCV
        hsv[..., 2] = cv2.normalize(magnitude, None, 0, 255, cv2.NORM_MINMAX)
        flow_rgb = cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)

        # Sparse vector visualization (every 5th frame)
        if frame_num % 5 == 0 and p1 is not None:
            mask = magnitude[y_old, x_old] > np.median(magnitude[y_old, x_old])
            for x_p, y_p, dx, dy in zip(x_old[mask], y_old[mask], 
                                      flow_vectors[mask, 0, 0], 
                                      flow_vectors[mask, 0, 1]):
                cv2.arrowedLine(flow_rgb, 
                              (x_p, y_p),
                              (int(x_p + dx*3), int(y_p + dy*3)),
                              (0, 255, 0), 1, tipLength=0.2)

        # Write frame
        out.write(flow_rgb)

        # Update for next frame
        old_gray = frame_gray.copy()
        if p1 is not None and st is not None:
            p0 = good_new.reshape(-1, 1, 2)
        else:
            # Reinitialize p0 if tracking failed
            y, x = np.mgrid[grid_step//2:height:grid_step, grid_step//2:width:grid_step]
            p0 = np.vstack((x.ravel(), y.ravel())).T.reshape(-1, 1, 2).astype(np.float32)

        # Maintain point density by reinitializing periodically
        if frame_num % 30 == 0:
            y, x = np.mgrid[grid_step//2:height:grid_step*2, grid_step//2:width:grid_step*2]
            new_points = np.vstack((x.ravel(), y.ravel())).T.reshape(-1, 1, 2).astype(np.float32)
            p0 = np.vstack((p0, new_points))
            p0 = p0[:len(x.ravel())]  # Maintain original point count

        print(f"\rProcessed frame {frame_num}/{total_frames}", end="")

    cap.release()
    out.release()
    print(f"\nOptimized optical flow video saved to {output_video_path}")

if __name__ == "__main__":
    input_video_path = "/Users/patrickpadua/Downloads/Yango RRL/CSV Generator/CNN_GRU_usingRGBOF/RawDataset/Testing/LumbarSideBends/LSB_1_Testing.mp4"
    output_video_path = "/Users/patrickpadua/Downloads/Yango RRL/CSV Generator/CNN_GRU_usingRGBOF/OF_test/LSB_1_Testing_combined_opencv_improved.mp4"
    process_video_with_fast_flow(input_video_path, output_video_path)

error: OpenCV(4.10.0) /Users/xperience/GHA-Actions-OpenCV/_work/opencv-python/opencv-python/opencv/modules/video/src/lkpyramid.cpp:1274: error: (-215:Assertion failed) nextPtsMat.checkVector(2, CV_32F, true) == npoints in function 'calc'


In [13]:
    input_video_path = "/Users/patrickpadua/Downloads/Yango RRL/CSV Generator/CNN_GRU_usingRGBOF/RawDataset/Testing/LumbarSideBends/LSB_1_Testing.mp4"
    output_video_path = "/Users/patrickpadua/Downloads/Yango RRL/CSV Generator/CNN_GRU_usingRGBOF/OF_test/LSB_1_Testing_combined_opencv_improved.mp4"

PreProcess the Dataset

In [None]:
IMAGE_HEIGHT, IMAGE_WIDTH =  64, 64

SEQUENCE_LENGTH = 20

DATASET_DIR = r"D:\MAPUA\CNN-GRU_exp\UCF50"

CLASSES_LIST = ["BenchPress", "CleanAndJerk", "JumpingJack", "Lunges", "PushUps", "Taichi"]

In [None]:
import cv2
import numpy as np
from argparse import ArgumentParser

def dense_optical_flow(method, video_path, params=[], to_gray=False):
    # read the video
    cap = cv2.VideoCapture(video_path)
    # Read the first frame
    ret, old_frame = cap.read()

    # crate HSV & make Value a constant
    hsv = np.zeros_like(old_frame)
    hsv[..., 1] = 255

    # Preprocessing for exact method
    if to_gray:
        old_frame = cv2.cvtColor(old_frame, cv2.COLOR_BGR2GRAY)

    while True:
        # Read the next frame
        ret, new_frame = cap.read()
        frame_copy = new_frame
        if not ret:
            break
        # Preprocessing for exact method
        if to_gray:
            new_frame = cv2.cvtColor(new_frame, cv2.COLOR_BGR2GRAY)
        # Calculate Optical Flow
        flow = method(old_frame, new_frame, None, *params)

        # Encoding: convert the algorithm's output into Polar coordinates
        mag, ang = cv2.cartToPolar(flow[..., 0], flow[..., 1])
        # Use Hue and Saturation to encode the Optical Flow
        hsv[..., 0] = ang * 180 / np.pi / 2
        hsv[..., 2] = cv2.normalize(mag, None, 0, 255, cv2.NORM_MINMAX)
        # Convert HSV image into BGR for demo
        bgr = cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)
        cv2.imshow("frame", frame_copy)
        cv2.imshow("optical flow", bgr)
        k = cv2.waitKey(25) & 0xFF
        if k == 27:
            break
        old_frame = new_frame

def lucas_kanade_method(video_path):
    cap = cv2.VideoCapture(video_path)
    # params for ShiTomasi corner detection
    feature_params = dict(maxCorners=100, qualityLevel=0.3, minDistance=7, blockSize=7)
    # Parameters for lucas kanade optical flow
    lk_params = dict(
        winSize=(15, 15),
        maxLevel=2,
        criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03),
    )
    # Create some random colors
    color = np.random.randint(0, 255, (100, 3))
    # Take first frame and find corners in it
    ret, old_frame = cap.read()
    old_gray = cv2.cvtColor(old_frame, cv2.COLOR_BGR2GRAY)
    p0 = cv2.goodFeaturesToTrack(old_gray, mask=None, **feature_params)
    # Create a mask image for drawing purposes
    mask = np.zeros_like(old_frame)
    while True: 
        ret, frame = cap.read()
        if not ret:
            break
        frame_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        # calculate optical flow
        p1, st, err = cv2.calcOpticalFlowPyrLK(
            old_gray, frame_gray, p0, None, **lk_params
        )
        # Select good points
        good_new = p1[st == 1]
        good_old = p0[st == 1]
        # draw the tracks
        for i, (new, old) in enumerate(zip(good_new, good_old)):
            a, b = new.ravel()
            c, d = old.ravel()
            mask = cv2.line(mask, (a, b), (c, d), color[i].tolist(), 2)
            frame = cv2.circle(frame, (a, b), 5, color[i].tolist(), -1)
        img = cv2.add(frame, mask)
        cv2.imshow("frame", img)
        k = cv2.waitKey(25) & 0xFF
        if k == 27:
            break
        if k == ord("c"):
            mask = np.zeros_like(old_frame)
        # Now update the previous frame and previous points
        old_gray = frame_gray.copy()
        p0 = good_new.reshape(-1, 1, 2)

def main():
    parser = ArgumentParser()
    parser.add_argument(
        "--algorithm",
        choices=["farneback", "lucaskanade", "lucaskanade_dense", "rlof"],
        required=True,
        help="Optical flow algorithm to use",
    )
    parser.add_argument(
        "--video_path", default="videos/cat.mp4", help="Path to the video",
    )

    args = parser.parse_args()
    video_path = args.video_path
    if args.algorithm == "lucaskanade":
        lucas_kanade_method(video_path)
    elif args.algorithm == "lucaskanade_dense":
        method = cv2.optflow.calcOpticalFlowSparseToDense
        dense_optical_flow(method, video_path, to_gray=True)
    elif args.algorithm == "farneback":
        method = cv2.calcOpticalFlowFarneback
        params = [0.5, 3, 15, 3, 5, 1.2, 0]  # Farneback's algorithm parameters
        dense_optical_flow(method, video_path, params, to_gray=True)
    elif args.algorithm == "rlof":
        method = cv2.optflow.calcOpticalFlowDenseRLOF
        dense_optical_flow(method, video_path)


if __name__ == "__main__":
    main()
