# Speed Point Tracking


# LUKAS-KANADE

In [1]:
#######################LUKAS-KANADE
import numpy as np 
import cv2
import time

cap = cv2.VideoCapture(0)

end_time = 20  # in seconds
timer_duration = 5  # in seconds

corNum = 30
feature_params = dict(maxCorners=corNum, qualityLevel=0.3, minDistance=7, blockSize=7)
lk_params = dict(winSize=(15, 15), maxLevel=2, criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03))

color = np.random.randint(0, 255, (corNum, 3)) 

old_frame = None
old_gray = None
p0 = None
trajectory = []

start_time = time.time()

# Create a VideoWriter object to save the video
output_path = 'LUKAS-KANADE1.mp4'
fps = cap.get(cv2.CAP_PROP_FPS)  # Get the frames per second
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))  # Get the frame width
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))  # Get the frame height
fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # Codec
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))  # OUT FOR SAVING

while True:
    ret, frame = cap.read()
    if ret:
        gray_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        if old_frame is None:  # First iteration
            old_frame = frame.copy()
            old_gray = gray_frame.copy()
            p0 = cv2.goodFeaturesToTrack(old_gray, mask=None, **feature_params)
            for i in range(p0.shape[0]):
                trajectory.append({'trajectory': [], 'color': color[i].tolist(), 'absence_counter': 0})

        else:
            current_time = time.time()
            elapsed_time = current_time - start_time
            
            if elapsed_time <= timer_duration:
                fastest_speed = -1
                fastest_index = None
                
                # Calculate optical flow using Lucas-Kanade method
                p1, st, err = cv2.calcOpticalFlowPyrLK(old_gray, gray_frame, p0, None, **lk_params)
                good_new = p1[st == 1]
                good_old = p0[st == 1]

                # Calculate and compare movement distances for each point
                for i, (new, old) in enumerate(zip(good_new, good_old)):
                    a, b = new.ravel()
                    c, d = old.ravel()
                    a, b, c, d = int(a), int(b), int(c), int(d)
                    movement_distance = np.sqrt((c - a) ** 2 + (d - b) ** 2)

                    if movement_distance > fastest_speed:
                        
                        fastest_speed = movement_distance
                        fastest_index = i
                    frame = cv2.circle(frame, (a, b), 5, trajectory[i]['color'], -1)

            if elapsed_time >= timer_duration and fastest_index is not None:
                # Calculate optical flow using Lucas-Kanade method
                p1, st, err = cv2.calcOpticalFlowPyrLK(old_gray, gray_frame, p0, None, **lk_params)
                good_new = p1[st == 1]
                if fastest_index < len(good_new):
                    a, b = good_new[fastest_index].ravel()
                    a, b = int(a), int(b)
                    trajectory[fastest_index]['trajectory'].append((a, b))  # Accumulate points for the trajectory
    
                    frame = cv2.circle(frame, (a, b), 5, trajectory[fastest_index]['color'], -1)
    
                    # Draw the trajectory on the video
                    if len(trajectory[fastest_index]['trajectory']) > 1:
                        for i in range(1, len(trajectory[fastest_index]['trajectory'])):
                            cv2.line(frame, trajectory[fastest_index]['trajectory'][i - 1], trajectory[fastest_index]['trajectory'][i], trajectory[fastest_index]['color'], 2)

            # Update the previous frame and points for the next iteration
            old_gray = gray_frame.copy()
            p0 = good_new.reshape(-1, 1, 2)

            cv2.imshow('Fastest Moving Point', frame)
            out.write(frame)

            if elapsed_time >= end_time:
                break

            if cv2.waitKey(1) & 0xFF == 27:
                break

cap.release()
out.release()
cv2.destroyAllWindows()


[ WARN:0@0.044] global cap_v4l.cpp:982 open VIDEOIO(V4L2:/dev/video0): can't open camera by index
[ERROR:0@0.045] global obsensor_uvc_stream_channel.cpp:156 getStreamChannelGroup Camera index out of range


KeyboardInterrupt: 

# FARNEBACK

In [8]:
import numpy as np 
import cv2
import time

frame = cv2.VideoCapture(0)
status, old_frame = frame.read()
prvs = cv2.cvtColor(old_frame, cv2.COLOR_BGR2GRAY)

hsv_mask = np.zeros_like(old_frame)
hsv_mask[..., 1] = 255

numPoint = 10

# Create a VideoWriter object to save the video
output_path = 'FARNEBACK.mp4'
fps = frame.get(cv2.CAP_PROP_FPS)  # Get the frames per second
width = int(frame.get(cv2.CAP_PROP_FRAME_WIDTH))  # Get the frame width
height = int(frame.get(cv2.CAP_PROP_FRAME_HEIGHT))  # Get the frame height
fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # Codec
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))  # OUT FOR SAVING

def generate_random_color():
    return tuple(np.random.randint(0, 256, 3))

def retrieve_fastest_points(flow):
    fastest_speed = -1
    fastest_points = []
    mag, ang = cv2.cartToPolar(flow[..., 0], flow[..., 1])
    
    hsv_mask[..., 0] = ang * 180 / np.pi / 2
    hsv_mask[..., 2] = cv2.normalize(mag, None, 0, 255, cv2.NORM_MINMAX)
    rgb_representation = cv2.cvtColor(hsv_mask, cv2.COLOR_HSV2BGR)

    # Find the coordinates of the 10 fastest moving points
    for i in range(numPoint):  # Repeat 10 times to find top 10 points
        minVal, maxVal, minLoc, maxLoc = cv2.minMaxLoc(mag)
        fastest_speed = maxVal
        #print(fastest_speed)
        fastest_points.append(maxLoc)  # Store the fastest point
        mag[maxLoc[1], maxLoc[0]] = 0  # Remove this maximum value from further consideration

    return fastest_points


while status:
    ret, img = frame.read()
   
    next = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    
    flow = cv2.calcOpticalFlowFarneback(prvs, next, None, 0.5, 3, 15, 3, 5, 1.2, 0)
    # Compute magnite and angle of 2D vector
    
    # Assuming you have already defined variables like 'flow', 'hsv_mask', 'img', 'fastest_speed', 'fastest_points'
    fastest_points = retrieve_fastest_points(flow)

    for point in fastest_points:
       cv2.circle(img, point, 5, (0,0,255), -1) 
    
    cv2.imshow('farneback', img)
    out.write(img)
   
    prvs = next
    
    if cv2.waitKey(1) & 0xFF == 27:
        break 

            
    
frame.release()
out.release()
cv2.destroyAllWindows()

# YOLO

In [2]:
import torch
import cv2
from ultralytics import YOLO

# Model
#model = YOLO('yolov8x-pose-p6.pt')
model=YOLO('yolov8n-pose.pt') #más rápido

traiettoria = []

# Captura desde lawebcam
vid = cv2.VideoCapture(0)
ret, frame = vid.read()

# Create a VideoWriter object to save the video
output_path = 'YOLOrealtime.mp4'
fps = vid.get(cv2.CAP_PROP_FPS)  # Get the frames per second
width = int(vid.get(cv2.CAP_PROP_FRAME_WIDTH))  # Get the frame width
height = int(vid.get(cv2.CAP_PROP_FRAME_HEIGHT))  # Get the frame height
fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # Codec
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))  # OUT FOR SAVING
  
while(True):      
    # fotograma a fotograma
    ret, frame = vid.read()
  
    # si hay imagen válida
    if ret:  
        # Perform inference on an image
        #results = model(img, stream=True)
        puntos =[]
   
        # Ejecutar seguimiento YOLOv8 en el fotograma, persistiendo los rastreos entre fotogramas
        results = model.track(frame)[0]
    
        for r in results:
            kpts = r.keypoints
            nk = kpts.shape[1]
            #print(kpts)
            #for i in range(nk):            
            keypoint=kpts.xy[0,10]    
            x, y = (int(keypoint[0])),(int(keypoint[1]))
            puntos.append([x,y])
            # Disegna il punto corrente  
            cv2.circle(frame, (x,y),5,(0,255,0),-1)
            
            # Aggiungi le coordinate alla traiettoria
            traiettoria.append((x, y))
              
            # Disegna la traiettoria     
            for i in range(1, len(traiettoria)):        
                cv2.line(frame, traiettoria[i-1], traiettoria[i], (0, 0, 255), 2)

        # Muestra fotograma # Mostra il frame con la traiettoria
        cv2.imshow("Traiectoria del Punto", frame)
        out.write(frame)
    
    # Detenemos pulsado ESC
    if cv2.waitKey(20) == 27:
        break
  
# Libera el objeto de captura
vid.release()
out.release()
# Destruye ventanas
cv2.destroyAllWindows()


0: 480x640 1 person, 206.2ms
Speed: 4.4ms preprocess, 206.2ms inference, 3.3ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 1 person, 283.4ms
Speed: 4.1ms preprocess, 283.4ms inference, 4.3ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 1 person, 265.2ms
Speed: 7.9ms preprocess, 265.2ms inference, 3.8ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 1 person, 312.1ms
Speed: 3.9ms preprocess, 312.1ms inference, 3.9ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 1 person, 302.9ms
Speed: 8.7ms preprocess, 302.9ms inference, 3.7ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 1 person, 239.2ms
Speed: 5.7ms preprocess, 239.2ms inference, 3.1ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 1 person, 243.1ms
Speed: 4.4ms preprocess, 243.1ms inference, 3.0ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 1 person, 246.5ms
Speed: 5.0ms preprocess, 246.5ms inference, 3.0ms postprocess per image at

In [None]:
import torch
import cv2
from ultralytics import YOLO
# Model
model = YOLO('yolov8x-pose-p6.pt')
#model=YOLO('yolov8n-pose.pt') más rápido
# Images
video = cv2.VideoCapture("video_para_yolo.mp4")

success, frame = video.read()
size=(frame.shape[1], frame.shape[0])
fourcc = cv2.VideoWriter_fourcc(*'DIVX')
video_out = cv2.VideoWriter('YOLO2.mp4', fourcc, 30.0, size)

# Bucle a través de los fotogramas del video
while (success):
    puntos =[]
   
    # Ejecutar seguimiento YOLOv8 en el fotograma, persistiendo los rastreos entre fotogramas
    results = model.track(frame)[0]
   
    for r in results:
        kpts = r.keypoints
        nk = kpts.shape[1]
        for i in range(nk):
            keypoint=kpts.xy[0,i]    
            x, y = (int(keypoint[0])),(int(keypoint[1]))
            puntos.append([x,y])
            cv2.circle(frame, (x,y),5,(0,255,0),-1)
                
    # Mostrar el fotograma anotado
    cv2.imshow('salida',frame)
    video_out.write(frame)
    success, frame = video.read()
    # Romper el bucle si se presiona 'q'
    if cv2.waitKey(1) & 0xFF == ord("q"):
         success = False
         break
    

# Liberar el objeto de captura de video y cerrar la ventana de visualización
video.release()
video_out.release()
cv2.destroyAllWindows()

# RAFT

In [None]:
import numpy as np
import torch
import matplotlib.pyplot as plt
import torchvision.transforms.functional as F
import torchvision.transforms as T
from pathlib import Path
from urllib.request import urlretrieve
import cv2


plt.rcParams["savefig.bbox"] = "tight"



def plot(imgs, **imshow_kwargs):
    if not isinstance(imgs[0], list):
        # Make a 2d grid even if there's just 1 row
        imgs = [imgs]

    num_rows = len(imgs)
    num_cols = len(imgs[0])
    _, axs = plt.subplots(nrows=num_rows, ncols=num_cols, squeeze=False)
    for row_idx, row in enumerate(imgs):
        for col_idx, img in enumerate(row):
            ax = axs[row_idx, col_idx]
            img = F.to_pil_image(img.to("cpu"))
            ax.imshow(np.asarray(img), **imshow_kwargs)
            ax.set(xticklabels=[], yticklabels=[], xticks=[], yticks=[])

    plt.tight_layout()

from torchvision.io import read_video

frames, _, _ = read_video("video_para_yolo.mp4")
frames = frames.permute(0, 3, 1, 2)  # (N, H, W, C) -> (N, C, H, W)

img1_batch = torch.stack([frames[100], frames[150]])
img2_batch = torch.stack([frames[101], frames[151]])

plot(img1_batch)

def preprocess(batch):
    transforms = T.Compose(
        [
            T.ConvertImageDtype(torch.float32),
            T.Normalize(mean=0.5, std=0.5),  # map [0, 1] into [-1, 1]
            T.Resize(size=(520, 960)),
        ]
    )
    batch = transforms(batch)
    return batch

#verificamos el tipo de imagen de entrada
print(f"shape = {img1_batch.shape}, dtype = {img1_batch.dtype}")

#usamos cuda para ir más rapido
device = "cuda" if torch.cuda.is_available() else "cpu"

#preposesamos la imágen
img1_batch = preprocess(img1_batch).to(device)
img2_batch = preprocess(img2_batch).to(device)

#verificamos que el tipo de imagen que obtenemos cumple con el tipo que necesita de entrada el modelo
print(f"shape = {img1_batch.shape}, dtype = {img1_batch.dtype}")

In [None]:
#from torchvision.models.optical_flow import raft_large #se puede cambiar "raft_large" por "raft_small" para ir más rápido, pero peor resultado
from torchvision.models.optical_flow import raft_small

model = raft_small(pretrained=True, progress=False).to(device)
model = model.eval()

list_of_flows = model(img1_batch.to(device), img2_batch.to(device))
print(f"type = {type(list_of_flows)}")
print(f"length = {len(list_of_flows)} = number of iterations of the model")
predicted_flows = list_of_flows[-1]
print(f"dtype = {predicted_flows.dtype}")
print(f"shape = {predicted_flows.shape} = (N, 2, H, W)")
print(f"min = {predicted_flows.min()}, max = {predicted_flows.max()}")
from torchvision.utils import flow_to_image

flow_imgs = flow_to_image(predicted_flows)

# The images have been mapped into [-1, 1] but for plotting we want them in [0, 1]
img1_batch = [(img1 + 1) / 2 for img1 in img1_batch]

grid = [[img1, flow_img] for (img1, flow_img) in zip(img1_batch, flow_imgs)]
plot(grid)