In [1]:
import cv2
import numpy as np
import time
from collections import deque
import math

# Tracking parameters
TRACKERS = {
    "CSRT": cv2.legacy.TrackerCSRT_create,
    "KCF": cv2.legacy.TrackerKCF_create,
    "MOSSE": cv2.legacy.TrackerMOSSE_create,
    "MIL": cv2.legacy.TrackerMIL_create
}

feature_params = dict(maxCorners=100, qualityLevel=0.3, minDistance=7, blockSize=7)
lk_params = dict(winSize=(15, 15), maxLevel=2, criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03))

# Configuration
MAX_TRAJECTORY_POINTS = 30
CONFIDENCE_THRESHOLD = 0.5
MOTION_PREDICTION_ENABLED = True
RECOVERY_MAX_ATTEMPTS = 3
TRACKER_TYPE = "CSRT"  # Change this to use different trackers

# Visualization settings
SHOW_TRAJECTORY = True
SHOW_PREDICTION = True
SHOW_FLOW_POINTS = True
SHOW_DIRECTION = True
SHOW_SPEED = True

COLORS = {
    'trajectory': (0, 0, 255),
    'prediction': (255, 0, 0),
    'direction': (0, 255, 255),
    'flow_points': (0, 0, 255),
    'bbox_good': (0, 255, 0),
    'bbox_poor': (0, 0, 255)
}

def recover_tracking(frame, last_bbox):
    """Attempt to recover lost tracking using mean shift"""
    try:
        hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
        x, y, w, h = map(int, last_bbox)
        
        # Validate dimensions
        h_frame, w_frame = frame.shape[:2]
        x = max(0, min(x, w_frame - 2))
        y = max(0, min(y, h_frame - 2))
        w = max(1, min(w, w_frame - x))
        h = max(1, min(h, h - y))
        
        roi_hsv = hsv[y:y+h, x:x+w]
        roi_hist = cv2.calcHist([roi_hsv], [0], None, [180], [0, 180])
        cv2.normalize(roi_hist, roi_hist, 0, 255, cv2.NORM_MINMAX)
        
        dst = cv2.calcBackProject([hsv], [0], roi_hist, [0, 180], 1)
        term_crit = (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 1)
        ret, track_window = cv2.meanShift(dst, (x, y, w, h), term_crit)
        
        return track_window if ret == 0 else last_bbox
    except:
        return last_bbox

def calculate_confidence(good_new, good_old):
    """Calculate tracking confidence based on optical flow"""
    if len(good_new) < 5:
        return 0.0
    
    flow_distances = np.linalg.norm(good_new - good_old, axis=1)
    consistency = 1.0 - (np.std(flow_distances) / (np.mean(flow_distances) + 1e-6))
    return max(0.0, min(1.0, consistency))

def calculate_speed(trajectory_points, fps):
    """Calculate speed in pixels per second"""
    if len(trajectory_points) < 2:
        return 0.0
    
    p1, p2 = trajectory_points[-2], trajectory_points[-1]
    distance = math.sqrt((p2[0] - p1[0])**2 + (p2[1] - p1[1])**2)
    return distance * (fps or 1.0)

def predict_next_position(trajectory_points):
    """Predict next position using velocity vector"""
    if len(trajectory_points) < 2:
        return None
    
    p1, p2 = trajectory_points[-2], trajectory_points[-1]
    velocity = (p2[0] - p1[0], p2[1] - p1[1])
    return (int(p2[0] + velocity[0]), int(p2[1] + velocity[1]))

def main(video_source=0):
    cap = cv2.VideoCapture(video_source)
    if not cap.isOpened():
        print("Error opening video source")
        return

    # Initial frame and ROI selection
    ret, frame = cap.read()
    if not ret:
        print("Error reading initial frame")
        return

    bbox = cv2.selectROI("Select Object", frame, False)
    cv2.destroyWindow("Select Object")
    x, y, w, h = map(int, bbox)

    # Initialize tracker and optical flow
    tracker = TRACKERS[TRACKER_TYPE]()
    tracker.init(frame, bbox)
    prev_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    roi = prev_frame[y:y+h, x:x+w]
    prev_pts = cv2.goodFeaturesToTrack(roi, mask=None, **feature_params)
    if prev_pts is not None:
        prev_pts += np.array([x, y], dtype=np.float32)

    trajectory = deque(maxlen=MAX_TRAJECTORY_POINTS)
    paused = False
    confidence = 1.0
    fps = cap.get(cv2.CAP_PROP_FPS)

    while True:
        if not paused:
            ret, frame = cap.read()
            if not ret:
                break

            # Tracking update
            success, bbox = tracker.update(frame)
            x, y, w, h = map(int, bbox)
            center = (int(x + w/2), int(y + h/2))
            trajectory.append(center)

            # Optical flow calculation
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            if prev_pts is not None and len(prev_pts) > 5:
                next_pts, status, _ = cv2.calcOpticalFlowPyrLK(prev_frame, gray, prev_pts, None, **lk_params)
                
                if next_pts is not None:
                    good_new = next_pts[status == 1]
                    good_old = prev_pts[status == 1]
                    confidence = calculate_confidence(good_new, good_old)
                    
                    if confidence > CONFIDENCE_THRESHOLD:
                        movement = np.mean(good_new - good_old, axis=0)
                        bbox = (x + movement[0], y + movement[1], w, h)
                        prev_pts = good_new.reshape(-1, 1, 2)
            
            prev_frame = gray.copy()

            # Visualization
            color = COLORS['bbox_good'] if confidence > CONFIDENCE_THRESHOLD else COLORS['bbox_poor']
            cv2.rectangle(frame, (x, y), (x+w, y+h), color, 2)

            if SHOW_TRAJECTORY and len(trajectory) > 1:
                for i in range(1, len(trajectory)):
                    alpha = i/len(trajectory)
                    cv2.line(frame, trajectory[i-1], trajectory[i], 
                            tuple(int(c*alpha) for c in COLORS['trajectory']), 2)

            if SHOW_PREDICTION and MOTION_PREDICTION_ENABLED:
                pred = predict_next_position(trajectory)
                if pred:
                    cv2.circle(frame, pred, 5, COLORS['prediction'], -1)

            if SHOW_DIRECTION and len(trajectory) >= 2:
                p1, p2 = trajectory[-2], trajectory[-1]
                angle = math.atan2(p2[1]-p1[1], p2[0]-p1[0])
                end = (int(x + 50*math.cos(angle)), int(y + 50*math.sin(angle)))
                cv2.arrowedLine(frame, (x, y), end, COLORS['direction'], 2)

            # Display info
            speed = calculate_speed(trajectory, fps)
            info = [
                f"Confidence: {confidence:.2f}",
                f"Speed: {speed:.1f} px/s",
                f"Position: ({x}, {y})"
            ]
            
            for i, text in enumerate(info):
                cv2.putText(frame, text, (10, 30 + i*30),
                           cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)

        cv2.imshow("Tracking", frame)
        key = cv2.waitKey(1) & 0xFF

        if key == ord('q'):
            break
        elif key == ord('p'):
            paused = not paused
        elif key == ord('r'):
            tracker = TRACKERS[TRACKER_TYPE]()
            tracker.init(frame, bbox)
            confidence = 1.0
        elif key == ord('c'):
            trajectory.clear()

    cap.release()
    cv2.destroyAllWindows()

if __name__ == "__main__":
    # Use 0 for webcam or file path for video
    main(video_source=0)

In [1]:
import cv2
import numpy as np
import math
from collections import deque
import time

# Tracking parameters
TRACKERS = {
    "CSRT": cv2.legacy.TrackerCSRT_create,
    "KCF": cv2.legacy.TrackerKCF_create,
    "MOSSE": cv2.legacy.TrackerMOSSE_create,
    "MIL": cv2.legacy.TrackerMIL_create
}

# Optical Flow parameters
feature_params = dict(maxCorners=100, qualityLevel=0.3, minDistance=7, blockSize=7)
lk_params = dict(winSize=(15, 15), maxLevel=2, criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03))

class ObjectTracker:
    def __init__(self, tracker_type='CSRT', max_trajectory=30, confidence_threshold=0.5):
        """
        Initialize the object tracker
        
        :param tracker_type: Tracking algorithm to use
        :param max_trajectory: Maximum number of trajectory points to store
        :param confidence_threshold: Minimum confidence to maintain tracking
        """
        self.tracker_type = tracker_type
        self.tracker = TRACKERS[tracker_type]()
        self.max_trajectory = max_trajectory
        self.confidence_threshold = confidence_threshold
        
        # Tracking variables
        self.trajectory_points = deque(maxlen=max_trajectory)
        self.prev_frame = None
        self.prev_pts = None
        self.confidence_score = 1.0
        self.speed = 0.0
        
    def initialize_tracker(self, frame, bbox):
        """
        Initialize tracker with initial bounding box
        
        :param frame: First video frame
        :param bbox: Initial bounding box (x, y, width, height)
        """
        # Validate and adjust bbox
        bbox = self._validate_bbox(bbox, frame.shape)
        x, y, w, h = map(int, bbox)
        
        # Convert frame to grayscale
        gray_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        
        # Initialize tracker
        self.tracker = TRACKERS[self.tracker_type]()
        self.tracker.init(frame, bbox)
        
        # Initialize optical flow points
        roi = gray_frame[y:y+h, x:x+w]
        self.prev_pts = cv2.goodFeaturesToTrack(roi, mask=None, **feature_params)
        if self.prev_pts is not None:
            self.prev_pts += np.array([x, y], dtype=np.float32)
        
        self.prev_frame = gray_frame
        
    def track(self, frame, fps=30):
        """
        Track object in the current frame
        
        :param frame: Current video frame
        :param fps: Frames per second for speed calculation
        :return: Tracking results (success, new bbox, processed frame)
        """
        # Convert frame to grayscale
        gray_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        
        try:
            # Update tracker
            success, new_bbox = self.tracker.update(frame)
            
            if not success:
                # Attempt recovery
                new_bbox = self._recover_tracking(frame, new_bbox)
                success = True
                self.confidence_score *= 0.5
            
            # Validate bbox
            new_bbox = self._validate_bbox(new_bbox, frame.shape)
            x, y, w, h = map(int, new_bbox)
            
            # Calculate center point and update trajectory
            center_point = (int(x + w/2), int(y + h/2))
            self.trajectory_points.append(center_point)
            
            # Calculate speed
            if len(self.trajectory_points) >= 2:
                self.speed = self._calculate_speed(fps)
            
            # Optical flow analysis
            if self.prev_frame is not None and self.prev_pts is not None and len(self.prev_pts) > 5:
                next_pts, status, _ = cv2.calcOpticalFlowPyrLK(
                    self.prev_frame, gray_frame, self.prev_pts, None, **lk_params
                )
                
                if next_pts is not None:
                    good_new = next_pts[status == 1]
                    good_old = self.prev_pts[status == 1]
                    
                    # Calculate confidence
                    self.confidence_score = self._calculate_confidence(good_new, good_old)
                    
                    # Update tracking if confidence is good
                    if len(good_new) > 5 and self.confidence_score > self.confidence_threshold:
                        movement = np.mean(good_new - good_old, axis=0)
                        new_x, new_y = x + movement[0], y + movement[1]
                        new_bbox = (new_x, new_y, w, h)
                        
                        self.prev_pts = good_new.reshape(-1, 1, 2)
                        self.prev_frame = gray_frame.copy()
            
            # Annotate frame with tracking information
            annotated_frame = frame.copy()
            cv2.rectangle(annotated_frame, (x, y), (x + w, y + h), (0, 255, 0), 2)
            
            return success, new_bbox, annotated_frame
        
        except Exception as e:
            print(f"Tracking error: {e}")
            return False, None, frame
    
    def _validate_bbox(self, bbox, frame_shape):
        """
        Validate and adjust bbox to be within frame bounds
        
        :param bbox: Bounding box coordinates
        :param frame_shape: Shape of the frame
        :return: Adjusted bbox
        """
        x, y, w, h = map(int, bbox)
        h_frame, w_frame = frame_shape[:2]
        
        x = max(0, min(x, w_frame - 2))
        y = max(0, min(y, h_frame - 2))
        w = max(1, min(w, w_frame - x))
        h = max(1, min(h, h_frame - y))
        
        return (x, y, w, h)
    
    def _recover_tracking(self, frame, last_bbox):
        """
        Attempt to recover lost tracking
        
        :param frame: Current video frame
        :param last_bbox: Last known bounding box
        :return: Recovered bbox
        """
        try:
            # Convert to HSV for better color-based detection
            hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
            x, y, w, h = map(int, last_bbox)
            
            # Ensure bbox dimensions are positive and within frame bounds
            h_frame, w_frame = frame.shape[:2]
            x = max(0, min(x, w_frame - 2))
            y = max(0, min(y, h_frame - 2))
            w = max(1, min(w, w_frame - x))
            h = max(1, min(h, h_frame - y))
            
            roi_hsv = hsv[y:y+h, x:x+w]
            
            # Calculate histogram of the ROI
            roi_hist = cv2.calcHist([roi_hsv], [0], None, [180], [0, 180])
            cv2.normalize(roi_hist, roi_hist, 0, 255, cv2.NORM_MINMAX)
            
            # Apply back projection
            dst = cv2.calcBackProject([hsv], [0], roi_hist, [0, 180], 1)
            
            # Apply meanshift to get the new location
            term_crit = (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 1)
            ret, track_window = cv2.meanShift(dst, (x, y, w, h), term_crit)
            
            return track_window if ret != 0 else last_bbox
        except Exception as e:
            print(f"Recovery failed: {e}")
            return last_bbox
    
    def _calculate_confidence(self, good_new, good_old):
        """
        Calculate tracking confidence based on optical flow consistency
        
        :param good_new: New feature points
        :param good_old: Old feature points
        :return: Confidence score
        """
        if len(good_new) < 5:
            return 0.0
        
        flow_distances = np.linalg.norm(good_new - good_old, axis=1)
        consistency = 1.0 - (np.std(flow_distances) / (np.mean(flow_distances) + 1e-6))
        return max(0.0, min(1.0, consistency))
    
    def _calculate_speed(self, fps):
        """
        Calculate object speed in pixels per second
        
        :param fps: Frames per second
        :return: Speed in pixels per second
        """
        if len(self.trajectory_points) < 2:
            return 0.0
        
        # Calculate distance between last two points
        p1, p2 = self.trajectory_points[-2], self.trajectory_points[-1]
        distance = math.sqrt((p2[0] - p1[0])**2 + (p2[1] - p1[1])**2)
        
        # Convert to pixels per second
        return distance * (fps or 1.0)
    
    def predict_next_position(self):
        """
        Predict next position based on recent motion
        
        :return: Predicted next position or None
        """
        if len(self.trajectory_points) < 2:
            return None
        
        # Calculate velocity vector from last two points
        p1, p2 = self.trajectory_points[-2], self.trajectory_points[-1]
        velocity = (p2[0] - p1[0], p2[1] - p1[1])
        
        # Predict next position
        predicted = (int(p2[0] + velocity[0]), int(p2[1] + velocity[1]))
        return predicted

def main():
    # Example usage
    cap = cv2.VideoCapture(0)  # Use 0 for webcam or provide video file path
    
    # Wait for the first frame
    ret, frame = cap.read()
    if not ret:
        print("Failed to capture first frame")
        return
    
    # Select initial bounding box
    bbox = cv2.selectROI("Select Object", frame, fromCenter=False, showCrosshair=True)
    cv2.destroyWindow("Select Object")
    
    # Initialize tracker
    tracker = ObjectTracker(tracker_type='CSRT')
    tracker.initialize_tracker(frame, bbox)
    
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        
        # Track object
        success, new_bbox, annotated_frame = tracker.track(frame)
        
        if success and new_bbox is not None:
            # Optional: add visualization or logging here
            cv2.imshow("Tracking", annotated_frame)
        
        # Optional: predict next position
        predicted_pos = tracker.predict_next_position()
        if predicted_pos:
            print(f"Predicted next position: {predicted_pos}")
        
        # Exit on 'q' key press
        key = cv2.waitKey(1) & 0xFF
        if key == ord('q'):
            break
    
    cap.release()
    cv2.destroyAllWindows()

if __name__ == "__main__":
    main()

Predicted next position: (304, 287)
Predicted next position: (308, 287)
Predicted next position: (311, 279)
Predicted next position: (346, 279)
Predicted next position: (409, 299)
Predicted next position: (508, 288)
Predicted next position: (528, 287)
Predicted next position: (517, 280)
Predicted next position: (516, 278)
Predicted next position: (392, 259)
Predicted next position: (326, 266)
Predicted next position: (232, 284)
Predicted next position: (298, 288)
Predicted next position: (324, 280)
Predicted next position: (364, 291)
Predicted next position: (329, 292)
Predicted next position: (284, 297)
Predicted next position: (305, 289)
Predicted next position: (369, 113)
Predicted next position: (320, 388)
Predicted next position: (317, 287)
Predicted next position: (303, 307)
Predicted next position: (267, 301)
Predicted next position: (274, 298)
Predicted next position: (264, 303)
Predicted next position: (255, 309)
Predicted next position: (246, 301)
Predicted next position: (23

In [1]:
import cv2
import numpy as np
from collections import deque
import math

# Core tracking parameters
TRACKERS = {
    "CSRT": cv2.legacy.TrackerCSRT_create,
    "KCF": cv2.legacy.TrackerKCF_create,
    "MOSSE": cv2.legacy.TrackerMOSSE_create,
    "MIL": cv2.legacy.TrackerMIL_create
}

# Optical flow parameters
feature_params = dict(maxCorners=100, qualityLevel=0.3, minDistance=7, blockSize=7)
lk_params = dict(winSize=(15, 15), maxLevel=2, 
                criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03))

# Initialize video capture
cap = cv2.VideoCapture(0)
ret, frame = cap.read()
if not ret:
    print("Failed to open camera")
    exit()

# Select ROI and initialize tracker
bbox = cv2.selectROI("Select Object", frame, fromCenter=False)
cv2.destroyWindow("Select Object")

tracker = TRACKERS["CSRT"]()
tracker.init(frame, bbox)

# Initialize tracking variables
prev_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
x, y, w, h = map(int, bbox)
roi = prev_frame[y:y+h, x:x+w]
prev_pts = cv2.goodFeaturesToTrack(roi, mask=None, **feature_params)
if prev_pts is not None:
    prev_pts += np.array([x, y], dtype=np.float32)

trajectory_points = deque(maxlen=30)
confidence_score = 1.0

# Main tracking loop
while True:
    ret, frame = cap.read()
    if not ret:
        break

    gray_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    success, bbox = tracker.update(frame)

    if success:
        x, y, w, h = map(int, bbox)
        center_point = (int(x + w/2), int(y + h/2))
        trajectory_points.append(center_point)

        # Update optical flow
        if prev_pts is not None and len(prev_pts) > 5:
            next_pts, status, _ = cv2.calcOpticalFlowPyrLK(
                prev_frame, gray_frame, prev_pts, None, **lk_params)
            
            if next_pts is not None:
                good_new = next_pts[status == 1]
                good_old = prev_pts[status == 1]
                
                # Calculate tracking confidence
                if len(good_new) > 5:
                    flow_distances = np.linalg.norm(good_new - good_old, axis=1)
                    confidence_score = 1.0 - (np.std(flow_distances) / (np.mean(flow_distances) + 1e-6))
                    confidence_score = max(0.0, min(1.0, confidence_score))
                    
                    if confidence_score > 0.5:
                        movement = np.mean(good_new - good_old, axis=0)
                        bbox = (x + movement[0], y + movement[1], w, h)
                
                prev_pts = good_new.reshape(-1, 1, 2)

        # Draw tracking visualization
        cv2.rectangle(frame, (x, y), (x + w, y + h), 
                     (0, int(255 * confidence_score), 0), 2)
        
        # Draw trajectory
        if len(trajectory_points) > 1:
            for i in range(1, len(trajectory_points)):
                cv2.line(frame, trajectory_points[i-1], 
                        trajectory_points[i], (0, 0, 255), 2)

        # Draw optical flow points
        if 'good_new' in locals():
            for pt in good_new:
                cv2.circle(frame, tuple(map(int, pt)), 3, (0, 0, 255), -1)

    prev_frame = gray_frame.copy()
    cv2.imshow("Tracking", frame)
    
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

cap.release()
cv2.destroyAllWindows()

In [None]:
import cv2
import numpy as np

cap = cv2.VideoCapture(0)
tracker = None
bbox = None
tracking = False

# Optical flow parameters
feature_params = dict(maxCorners=100, qualityLevel=0.3, minDistance=7, blockSize=7)
lk_params = dict(winSize=(15, 15), maxLevel=2, criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03))

# Tracking variables
prev_frame = None
prev_pts = None
tracking_history = []  # Store recent tracking positions

# Select ROI
ret, frame = cap.read()
if not ret or frame is None:
    print('Error: Unable to capture video')
    cap.release()
    cv2.destroyAllWindows()
else:
    cv2.putText(frame, "Select ROI & press ENTER", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
    bbox = cv2.selectROI("Select ROI", frame, fromCenter=False, showCrosshair=True)
    cv2.destroyWindow("Select ROI")

    if bbox != (0, 0, 0, 0):
        # Initialize CSRT tracker
        tracker = cv2.TrackerCSRT_create()
        tracker.init(frame, bbox)
        tracking = True
        
        # Initialize optical flow
        prev_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        x, y, w, h = map(int, bbox)
        roi = prev_frame[y:y+h, x:x+w]
        prev_pts = cv2.goodFeaturesToTrack(roi, mask=None, **feature_params)
        if prev_pts is not None:
            prev_pts = prev_pts + np.array([x, y], dtype=np.float32)  # Adjust coordinates to global frame

def update_tracker_with_optical_flow(frame, gray_frame):
    global prev_frame, prev_pts, bbox
    
    if prev_frame is None or prev_pts is None or len(prev_pts) < 5:
        return False, bbox
    
    # Calculate optical flow
    next_pts, status, _ = cv2.calcOpticalFlowPyrLK(prev_frame, gray_frame, prev_pts, None, **lk_params)
    
    # Filter good points
    if next_pts is not None:
        good_new = next_pts[status == 1]
        good_old = prev_pts[status == 1]
        
        if len(good_new) > 5:  # Enough points to estimate movement
            # Calculate bounding box movement
            movement = np.mean(good_new - good_old, axis=0)
            x, y, w, h = bbox
            
            # Update bounding box position
            new_x = x + movement[0]
            new_y = y + movement[1]
            
            # Update previous points for next iteration
            prev_pts = good_new.reshape(-1, 1, 2)
            prev_frame = gray_frame.copy()
            
            # Update bbox
            bbox = (new_x, new_y, w, h)
            return True, bbox
    
    # Not enough points to track
    return False, bbox

# Main loop
while True:
    ret, frame = cap.read()
    if not ret or frame is None:
        print('Error: Unable to capture video')
        break
    
    gray_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    
    if tracking and tracker:
        # 1. Try CSRT tracker
        tracking_success, new_bbox = tracker.update(frame)
        
        # 2. Apply optical flow
        of_success, of_bbox = update_tracker_with_optical_flow(frame, gray_frame)
            
        # Decide which result to use
        if tracking_success and of_success:
            # Blend CSRT and optical flow results (weighted average)
            csrt_weight = 0.7
            of_weight = 0.3
            x1, y1, w1, h1 = new_bbox
            x2, y2, w2, h2 = of_bbox
            
            blended_x = x1 * csrt_weight + x2 * of_weight
            blended_y = y1 * csrt_weight + y2 * of_weight
            bbox = (blended_x, blended_y, w1, h1)  # Keep original width/height
            
            # Update optical flow points occasionally for long-term stability
            if len(tracking_history) % 10 == 0:
                x, y, w, h = map(int, bbox)
                roi = gray_frame[max(0, y):min(frame.shape[0], y+h), max(0, x):min(frame.shape[1], x+w)]
                if roi.size > 0:
                    pts = cv2.goodFeaturesToTrack(roi, mask=None, **feature_params)
                    if pts is not None and pts.size > 0:
                        prev_pts = pts + np.array([x, y], dtype=np.float32)
                        prev_frame = gray_frame.copy()
        elif tracking_success:
            bbox = new_bbox
        elif of_success:
            bbox = of_bbox
        
        # Store tracking history
        if tracking_success or of_success:
            x, y, w, h = map(float, bbox)
            center_x, center_y = x + w/2, y + h/2
            tracking_history.append((center_x, center_y))
            if len(tracking_history) > 30:  # Keep last 30 positions
                tracking_history.pop(0)
            
            print("Tracking: Success")
        else:
            print("Tracking: Lost")
        
    # Draw tracking information on frame
    if bbox is not None:
        x, y, w, h = map(int, bbox)
        cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)
        
        # Draw tracking history
        if len(tracking_history) > 1:
            for i in range(1, len(tracking_history)):
                pt1 = tuple(map(int, tracking_history[i-1]))
                pt2 = tuple(map(int, tracking_history[i]))
                cv2.line(frame, pt1, pt2, (0, 0, 255), 1)
    
    # Display the frame
    cv2.imshow("Tracking", frame)
    
    # Exit on 'q' key
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

# Clean up
cap.release()
cv2.destroyAllWindows()

error: OpenCV(4.11.0) D:\a\opencv-python\opencv-python\opencv\modules\imgproc\src\resize.cpp:4208: error: (-215:Assertion failed) !ssize.empty() in function 'cv::resize'


: 

In [None]:
import cv2
import numpy as np

# Initialize video capture
cap = cv2.VideoCapture(0)
tracker = None
bbox = None
tracking = False

# Optical flow parameters
feature_params = dict(maxCorners=100, qualityLevel=0.3, minDistance=7, blockSize=7)
lk_params = dict(winSize=(15, 15), maxLevel=2, criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03))

# Kalman Filter setup
kalman = cv2.KalmanFilter(4, 2)  # 4 state variables (x, y, vx, vy), 2 measurement variables (x, y)
kalman.transitionMatrix = np.array([[1, 0, 1, 0],  [0, 1, 0, 1],  [0, 0, 1, 0],  [0, 0, 0, 1]], np.float32)
kalman.measurementMatrix = np.array([[1, 0, 0, 0],  [0, 1, 0, 0]], np.float32)
kalman.processNoiseCov = np.eye(4, dtype=np.float32) * 0.03
kalman.measurementNoiseCov = np.eye(2, dtype=np.float32) * 0.3
kalman.statePre = np.zeros((4, 1), np.float32)
kalman.statePost = np.zeros((4, 1), np.float32)

# Tracking variables
prev_frame = None
prev_pts = None

# Select ROI
ret, frame = cap.read()
if not ret or frame is None:
    print('Error: Unable to capture video')
    cap.release()
    cv2.destroyAllWindows()
else:
    cv2.putText(frame, "Select ROI & press ENTER", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
    bbox = cv2.selectROI("Select ROI", frame, fromCenter=False, showCrosshair=True)
    cv2.destroyWindow("Select ROI")

    if bbox != (0, 0, 0, 0):
        tracker = cv2.TrackerCSRT_create()
        tracker.init(frame, bbox)
        tracking = True
        prev_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        x, y, w, h = map(int, bbox)
        roi = prev_frame[y:y+h, x:x+w]
        prev_pts = cv2.goodFeaturesToTrack(roi, mask=None, **feature_params)
        if prev_pts is not None:
            prev_pts = prev_pts + np.array([x, y], dtype=np.float32)
    
    # Initialize Kalman Filter
    kalman.statePre[:2] = np.array([[x + w / 2], [y + h / 2]], np.float32)
    kalman.statePost = kalman.statePre.copy()

def update_tracker_with_optical_flow(frame, gray_frame):
    global prev_frame, prev_pts, bbox
    if prev_frame is None or prev_pts is None or len(prev_pts) < 5:
        return False, bbox
    
    next_pts, status, _ = cv2.calcOpticalFlowPyrLK(prev_frame, gray_frame, prev_pts, None, **lk_params)
    if next_pts is not None:
        good_new = next_pts[status == 1]
        good_old = prev_pts[status == 1]
        if len(good_new) > 5:
            movement = np.mean(good_new - good_old, axis=0)
            x, y, w, h = bbox
            new_x = x + movement[0]
            new_y = y + movement[1]
            prev_pts = good_new.reshape(-1, 1, 2)
            prev_frame = gray_frame.copy()
            bbox = (new_x, new_y, w, h)
            return True, bbox
    return False, bbox

# Main loop
while True:
    ret, frame = cap.read()
    if not ret or frame is None:
        print('Error: Unable to capture video')
        break
    
    gray_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    
    if tracking:
        # Predict next position using Kalman Filter
        prediction = kalman.predict()
        pred_x, pred_y = prediction[0], prediction[1]
        
        # Update tracker
        tracking_success, new_bbox = tracker.update(frame)
        of_success, of_bbox = update_tracker_with_optical_flow(frame, gray_frame)
        
        if tracking_success and of_success:
            x1, y1, w1, h1 = new_bbox
            x2, y2, w2, h2 = of_bbox
            measured_x = (x1 + x2) / 2
            measured_y = (y1 + y2) / 2
        elif tracking_success:
            measured_x, measured_y = new_bbox[0], new_bbox[1]
        elif of_success:
            measured_x, measured_y = of_bbox[0], of_bbox[1]
        else:
            measured_x, measured_y = pred_x, pred_y
        
        measurement = np.array([[np.float32(measured_x)], [np.float32(measured_y)]])
        kalman.correct(measurement)
        bbox = (measured_x, measured_y, bbox[2], bbox[3])
        
        x, y, w, h = map(int, bbox)
        cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)

    
    cv2.imshow("Tracking", frame)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

cap.release()
cv2.destroyAllWindows()

In [None]:
import cv2
import numpy as np

cap = cv2.VideoCapture(0)
tracker = None
bbox = None
tracking = False

# Optical flow parameters
feature_params = dict(maxCorners=100, qualityLevel=0.3, minDistance=7, blockSize=7)
lk_params = dict(winSize=(15, 15), maxLevel=2, criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03))

# Tracking variables
prev_frame = None
prev_pts = None
tracking_history = []  # Store recent tracking positions

# Select ROI
ret, frame = cap.read()
cv2.putText(frame, "Select ROI & press ENTER", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
bbox = cv2.selectROI("Select ROI", frame, fromCenter=False, showCrosshair=True)
cv2.destroyWindow("Select ROI")

if bbox != (0, 0, 0, 0):
    # Initialize CSRT tracker
    tracker = cv2.TrackerCSRT_create()
    tracker.init(frame, bbox)
    tracking = True
    
    # Initialize optical flow
    prev_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    x, y, w, h = map(int, bbox)
    roi = prev_frame[y:y+h, x:x+w]
    prev_pts = cv2.goodFeaturesToTrack(roi, mask=None, **feature_params)
    if prev_pts is not None:
        prev_pts = prev_pts + np.array([x, y], dtype=np.float32)  # Adjust coordinates to global frame
    print('bbox:', bbox)
    print('roi:', roi)
    print('prev_pts:', prev_pts)

def update_tracker_with_optical_flow(frame, gray_frame):
    global prev_frame, prev_pts, bbox
    
    if prev_frame is None or prev_pts is None or len(prev_pts) < 5:
        return False, bbox
    
    # Calculate optical flow
    next_pts, status, _ = cv2.calcOpticalFlowPyrLK(prev_frame, gray_frame, prev_pts, None, **lk_params)
    
    # Filter good points
    if next_pts is not None:
        good_new = next_pts[status == 1]
        good_old = prev_pts[status == 1]
        
        if len(good_new) > 5:  # Enough points to estimate movement
            # Calculate bounding box movement
            movement = np.mean(good_new - good_old, axis=0)
            x, y, w, h = bbox
            
            # Update bounding box position
            new_x = x + movement[0]
            new_y = y + movement[1]
            
            # Update previous points for next iteration
            prev_pts = good_new.reshape(-1, 1, 2)
            prev_frame = gray_frame.copy()
            
            # Update bbox
            bbox = (new_x, new_y, w, h)
            return True, bbox
    
    # Not enough points to track
    return False, bbox

# Main loop
while True:
    ret, frame = cap.read()
    if not ret:
        break
    
    gray_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    
    if tracking and tracker:
        # 1. Try CSRT tracker
        tracking_success, new_bbox = tracker.update(frame)
        
        # 2. Apply optical flow
        of_success, of_bbox = update_tracker_with_optical_flow(frame, gray_frame)
            
        # Decide which result to use
        if tracking_success and of_success:
            # Blend CSRT and optical flow results (weighted average)
            csrt_weight = 0.7
            of_weight = 0.3
            x1, y1, w1, h1 = new_bbox
            x2, y2, w2, h2 = of_bbox
            
            blended_x = x1 * csrt_weight + x2 * of_weight
            blended_y = y1 * csrt_weight + y2 * of_weight
            bbox = (blended_x, blended_y, w1, h1)  # Keep original width/height
            
            # Update optical flow points occasionally for long-term stability
            if len(tracking_history) % 10 == 0:
                x, y, w, h = map(int, bbox)
                roi = gray_frame[max(0, y):min(frame.shape[0], y+h), max(0, x):min(frame.shape[1], x+w)]
                if roi.size > 0:
                    pts = cv2.goodFeaturesToTrack(roi, mask=None, **feature_params)
                    if pts is not None and pts.size > 0:
                        prev_pts = pts + np.array([x, y], dtype=np.float32)
                        prev_frame = gray_frame.copy()
        elif tracking_success:
            bbox = new_bbox
        elif of_success:
            bbox = of_bbox
        
        # Store tracking history
        if tracking_success or of_success:
            x, y, w, h = map(float, bbox)
            center_x, center_y = x + w/2, y + h/2
            tracking_history.append((center_x, center_y))
            if len(tracking_history) > 30:  # Keep last 30 positions
                tracking_history.pop(0)
            
            print("Tracking: Success")
        else:
            print("Tracking: Lost")
        
    # Draw tracking information on frame
    if bbox is not None:
        x, y, w, h = map(int, bbox)
        cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)
        
        # Draw tracking history
        if len(tracking_history) > 1:
            for i in range(1, len(tracking_history)):
                pt1 = tuple(map(int, tracking_history[i-1]))
                pt2 = tuple(map(int, tracking_history[i]))
                cv2.line(frame, pt1, pt2, (0, 0, 255), 1)
    
    # Display the frame
    cv2.imshow("Tracking", frame)
    
    # Exit on 'q' key
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

# Clean up
cap.release()
cv2.destroyAllWindows()


 [[327. 403.]]

 [[298. 416.]]

 [[312. 420.]]

 [[290. 413.]]]
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success

In [8]:
import cv2
import numpy as np

# Initialize video capture
cap = cv2.VideoCapture(0)

# Initialize tracking variables
tracker = None
bbox = None
tracking = False

# Optical flow parameters
feature_params = dict(maxCorners=100, qualityLevel=0.3, minDistance=7, blockSize=7)
lk_params = dict(winSize=(15, 15), maxLevel=2, 
                 criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03))

# Tracking variables
prev_frame = None
prev_pts = None
tracking_history = []  # Store recent tracking positions

# Kalman Filter setup
kalman = cv2.KalmanFilter(4, 2)  # 4 state variables (x, y, dx, dy), 2 measurement variables (x, y)
kalman.measurementMatrix = np.array([[1, 0, 0, 0], [0, 1, 0, 0]], np.float32)  # Measurement matrix
kalman.transitionMatrix = np.array([[1, 0, 1, 0], [0, 1, 0, 1], [0, 0, 1, 0], [0, 0, 0, 1]], np.float32)  # State transition matrix
kalman.processNoiseCov = np.array([[1, 0, 0, 0], [0, 1, 0, 0], [0, 0, 1, 0], [0, 0, 0, 1]], np.float32) * 0.03  # Process noise
kalman.measurementNoiseCov = np.array([[1, 0], [0, 1]], np.float32) * 0.1  # Measurement noise

# Select ROI
ret, frame = cap.read()
cv2.putText(frame, "Select ROI & press ENTER", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
bbox = cv2.selectROI("Select ROI", frame, fromCenter=False, showCrosshair=True)
cv2.destroyWindow("Select ROI")

if bbox != (0, 0, 0, 0):
    # Initialize CSRT tracker
    tracker = cv2.TrackerCSRT_create()
    tracker.init(frame, bbox)
    tracking = True
    
    # Initialize optical flow
    prev_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    x, y, w, h = map(int, bbox)
    roi = prev_frame[y:y+h, x:x+w]
    prev_pts = cv2.goodFeaturesToTrack(roi, mask=None, **feature_params)
    if prev_pts is not None:
        prev_pts = prev_pts + np.array([x, y], dtype=np.float32)  # Adjust coordinates to global frame
    
    # Initialize Kalman filter with current position
    center_x, center_y = x + w/2, y + h/2
    kalman.statePre = np.array([[center_x], [center_y], [0], [0]], np.float32)
    kalman.statePost = np.array([[center_x], [center_y], [0], [0]], np.float32)

def update_tracker_with_optical_flow(frame, gray_frame):
    global prev_frame, prev_pts, bbox
    
    if prev_frame is None or prev_pts is None or len(prev_pts) < 5:
        return False, bbox
    
    # Calculate optical flow
    next_pts, status, _ = cv2.calcOpticalFlowPyrLK(prev_frame, gray_frame, prev_pts, None, **lk_params)
    
    # Filter good points
    if next_pts is not None:
        good_new = next_pts[status == 1]
        good_old = prev_pts[status == 1]
        
        if len(good_new) > 5:  # Enough points to estimate movement
            # Calculate bounding box movement
            movement = np.mean(good_new - good_old, axis=0)
            x, y, w, h = bbox
            
            # Update bounding box position
            new_x = x + movement[0]
            new_y = y + movement[1]
            
            # Update previous points for next iteration
            prev_pts = good_new.reshape(-1, 1, 2)
            prev_frame = gray_frame.copy()
            
            # Update bbox
            bbox = (new_x, new_y, w, h)
            return True, bbox
    
    # Not enough points to track
    return False, bbox

def apply_kalman_filter(bbox):
    # Predict
    prediction = kalman.predict()
    
    # Get the current center from bbox
    x, y, w, h = bbox
    center_x, center_y = x + w/2, y + h/2
    
    # Correct with measurement
    measurement = np.array([[center_x], [center_y]], np.float32)
    kalman.correct(measurement)
    
    # Get the filtered position
    filtered_x = kalman.statePost[0][0]
    filtered_y = kalman.statePost[1][0]
    
    # Calculate the new bbox position
    new_x = filtered_x - w/2
    new_y = filtered_y - h/2
    
    return (new_x, new_y, w, h)

# Main loop
while True:
    ret, frame = cap.read()
    if not ret:
        break
    
    gray_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    
    if tracking and tracker:
        # 1. Try CSRT tracker
        tracking_success, new_bbox = tracker.update(frame)
        
        # 2. Apply optical flow
        of_success, of_bbox = update_tracker_with_optical_flow(frame, gray_frame)
            
        # Decide which result to use
        if tracking_success and of_success:
            # Blend CSRT and optical flow results (weighted average)
            csrt_weight = 0.7
            of_weight = 0.3
            x1, y1, w1, h1 = new_bbox
            x2, y2, w2, h2 = of_bbox
            
            blended_x = x1 * csrt_weight + x2 * of_weight
            blended_y = y1 * csrt_weight + y2 * of_weight
            bbox = (blended_x, blended_y, w1, h1)  # Keep original width/height
            
            # Apply Kalman filter to smooth the result
            bbox = apply_kalman_filter(bbox)
            
            # Update optical flow points occasionally for long-term stability
            if len(tracking_history) % 10 == 0:
                x, y, w, h = map(int, bbox)
                roi = gray_frame[max(0, y):min(frame.shape[0], y+h), 
                               max(0, x):min(frame.shape[1], x+w)]
                if roi.size > 0:
                    pts = cv2.goodFeaturesToTrack(roi, mask=None, **feature_params)
                    if pts is not None and pts.size > 0:
                        prev_pts = pts + np.array([x, y], dtype=np.float32)
                        prev_frame = gray_frame.copy()
        elif tracking_success:
            bbox = new_bbox
            # Apply Kalman filter to smooth the result
            bbox = apply_kalman_filter(bbox)
        elif of_success:
            bbox = of_bbox
            # Apply Kalman filter to smooth the result
            bbox = apply_kalman_filter(bbox)
        
        # Store tracking history
        if tracking_success or of_success:
            x, y, w, h = map(float, bbox)
            center_x, center_y = x + w/2, y + h/2
            tracking_history.append((center_x, center_y))
            if len(tracking_history) > 30:  # Keep last 30 positions
                tracking_history.pop(0)
            
            print("Tracking: Success")
        else:
            print("Tracking: Lost")
        
    # Draw tracking information on frame
    if bbox is not None:
        x, y, w, h = map(int, bbox)
        cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)
        
        # Draw tracking history
        if len(tracking_history) > 1:
            for i in range(1, len(tracking_history)):
                pt1 = tuple(map(int, tracking_history[i-1]))
                pt2 = tuple(map(int, tracking_history[i]))
                cv2.line(frame, pt1, pt2, (0, 0, 255), 1)
    
    # Display the frame
    cv2.imshow("Tracking", frame)
    
    # Exit on 'q' key
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

# Clean up
cap.release()
cv2.destroyAllWindows()

Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: Success
Tracking: 

In [37]:
# CSRT Tracker & Optical Flow & Kalman Filter  & Tracking History
import cv2
import numpy as np
from collections import deque

cap = cv2.VideoCapture(0)
tracker = cv2.TrackerCSRT_create()
ret, frame = cap.read()
bbox = cv2.selectROI("Select Object", frame, False)
tracker.init(frame, bbox)

prev_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

kalman = cv2.KalmanFilter(4, 2)
kalman.measurementMatrix = np.array([[1, 0, 0, 0],[0, 1, 0, 0]], np.float32)
kalman.transitionMatrix = np.array([[1, 0, 1, 0],[0, 1, 0, 1],[0, 0, 1, 0],[0, 0, 0, 1]], np.float32)
kalman.processNoiseCov = np.eye(4, dtype=np.float32) * 0.03


tracking_history = deque(maxlen=10)

while True:
    ret, frame = cap.read()
    if not ret:
        break

    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    success, bbox = tracker.update(frame)

    if success:
        x, y, w, h = [int(v) for v in bbox]
        center_x, center_y = x + w // 2, y + h // 2

        measurement = np.array([[np.float32(center_x)], [np.float32(center_y)]])
        kalman.correct(measurement)

        tracking_history.append((center_x, center_y, w, h))

    else:
        
        if len(tracking_history) > 1:
            # الحصول على آخر موقعين معروفين
            p0 = np.array([[tracking_history[-2][:2]]], dtype=np.float32)
            p1, st, err = cv2.calcOpticalFlowPyrLK(prev_gray, gray, p0, None)

            if p1 is not None and st[0] == 1:
                # Optical Flow
                pred_x, pred_y = int(p1[0][0][0]), int(p1[0][0][1])
            else:
                #  Kalman Filter 
                prediction = kalman.predict()
                pred_x, pred_y = int(prediction[0]), int(prediction[1])

            # إعادة إنشاء التراكر عند الموقع المتوقع
            bbox = (pred_x - w // 2, pred_y - h // 2, w, h)
            tracker = cv2.TrackerCSRT_create()
            tracker.init(frame, bbox)

    prev_gray = gray.copy()

    if len(tracking_history) > 0:
        last_x, last_y, last_w, last_h = tracking_history[-1]
        cv2.rectangle(frame, (last_x - last_w // 2, last_y - last_h // 2),
                      (last_x + last_w // 2, last_y + last_h // 2), (0, 255, 0), 2)

    for i in range(1, len(tracking_history)):
        cv2.line(frame, tracking_history[i - 1][:2], tracking_history[i][:2], (0, 0, 255), 2)

    cv2.imshow("Tracking", frame)

    if cv2.waitKey(1) & 0xFF == 27:
        break

cap.release()
cv2.destroyAllWindows()

In [24]:
import cv2
import numpy as np
from collections import deque

# Initialize webcam
cap = cv2.VideoCapture(0)

# Read first frame
ret, frame = cap.read()

# Let user select ROI (Region of Interest)
bbox = cv2.selectROI("Select Object to Track", frame, False)
cv2.destroyWindow("Select Object to Track")

# Initialize the CSRT tracker
tracker = cv2.legacy.TrackerCSRT_create()
tracker.init(frame, bbox)

# Initialize variables for tracking
tracking_active = True
bbox_history = deque(maxlen=10)  # Store last 10 positions
bbox_history.append(bbox)

# Initialize Kalman filter
kalman = cv2.KalmanFilter(4, 2)
kalman.measurementMatrix = np.array([[1, 0, 0, 0], [0, 1, 0, 0]], np.float32)
kalman.transitionMatrix = np.array([[1, 0, 1, 0], [0, 1, 0, 1], [0, 0, 1, 0], [0, 0, 0, 1]], np.float32)
kalman.processNoiseCov = np.array([[1, 0, 0, 0], [0, 1, 0, 0], [0, 0, 1, 0], [0, 0, 0, 1]], np.float32) * 0.03

# Initialize variables for optical flow
old_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
old_points = None
    
while True:
    # Read a new frame
    ret, frame = cap.read()
        
    # Create a copy of the frame for visualization
    frame_display = frame.copy()
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    
    # 1. CSRT Tracker
    if tracking_active:
        success, bbox = tracker.update(frame)
        
        # If tracking is successful, update history and Kalman filter
        if success:
            x, y, w, h = [int(v) for v in bbox]
            bbox_history.append((x, y, w, h))
            
            # Kalman measurement update
            kalman.correct(np.array([[x + w/2], [y + h/2]], np.float32))
            
            # Save points for optical flow
            if old_points is None or len(old_points) < 10:
                # Define region for feature detection
                roi = old_gray[y:y+h, x:x+w]
                # Find good features to track
                try:
                    points = cv2.goodFeaturesToTrack(roi, 100, 0.01, 10)
                    if points is not None:
                        old_points = points + np.array([x, y], np.float32)
                except cv2.error:
                    old_points = None
            
            # Draw the bbox
            cv2.rectangle(frame_display, (x, y), (x + w, y + h), (0, 255, 0), 2)
            cv2.putText(frame_display, "Tracking", (x, y - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
        else:
            tracking_active = False
            
    # 2. If tracking fails, use prediction and attempt reinitialization
    if not tracking_active:
        # Predict using Kalman filter
        prediction = kalman.predict()
        pred_x, pred_y = int(prediction[0]), int(prediction[1])
        
        # Use the last known size from history
        if bbox_history:
            last_x, last_y, last_w, last_h = bbox_history[-1]
            predicted_bbox = (pred_x - last_w//2, pred_y - last_h//2, last_w, last_h)
            
            # Attempt to reinitialize tracker at the predicted position
            tracker = cv2.legacy.TrackerCSRT_create()
            success = tracker.init(frame, predicted_bbox)
            if success:
                tracking_active = True
                bbox = predicted_bbox
            
            # Draw the predicted position
            cv2.rectangle(frame_display, (pred_x - last_w//2, pred_y - last_h//2), 
                            (pred_x + last_w//2, pred_y + last_h//2), (0, 0, 255), 2)
            cv2.putText(frame_display, "Predicted", (pred_x - last_w//2, pred_y - last_h//2 - 10), 
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)
    
    # 3. Lucas-Kanade Optical Flow for additional tracking
    if old_points is not None and len(old_points) > 0:
        new_points, status, _ = cv2.calcOpticalFlowPyrLK(old_gray, gray, old_points, None)
        
        if new_points is not None:
            # Keep only good points
            good_new = new_points[status == 1]
            good_old = old_points[status == 1]
            
            # If we have enough points, use them to refine the bbox
            if len(good_new) > 5 and tracking_active:
                # Calculate the median shift
                shifts = good_new - good_old
                if len(shifts) > 0:
                    median_shift = np.median(shifts, axis=0)
                    x, y, w, h = [int(v) for v in bbox]
                    
                    # Apply the shift to the current bbox
                    shifted_bbox = (x + int(median_shift[0]), y + int(median_shift[1]), w, h)
                    
                    # Update Kalman with optical flow information
                    center_x = shifted_bbox[0] + shifted_bbox[2]//2
                    center_y = shifted_bbox[1] + shifted_bbox[3]//2
                    kalman.correct(np.array([[center_x], [center_y]], np.float32))
                
            # Draw optical flow tracks
            for i, (new, old) in enumerate(zip(good_new, good_old)):
                a, b = new.ravel().astype(int)
                c, d = old.ravel().astype(int)
                cv2.line(frame_display, (a, b), (c, d), (0, 255, 255), 2)
                cv2.circle(frame_display, (a, b), 3, (0, 255, 255), -1)
            
            # Update old_points for next frame
            old_points = good_new.reshape(-1, 1, 2)
    
    # Update old_gray for next optical flow calculation
    old_gray = gray.copy()
    
    # Draw tracking history
    for i in range(1, len(bbox_history)):
        prev_x, prev_y = bbox_history[i-1][0] + bbox_history[i-1][2]//2, bbox_history[i-1][1] + bbox_history[i-1][3]//2
        curr_x, curr_y = bbox_history[i][0] + bbox_history[i][2]//2, bbox_history[i][1] + bbox_history[i][3]//2
        cv2.line(frame_display, (prev_x, prev_y), (curr_x, curr_y), (255, 0, 0), 1)
    
    # Show the result
    cv2.imshow("Object Tracking", frame_display)
    
    # Break on ESC key
    if cv2.waitKey(1) == 27:  # ESC key
        break
        
cap.release()
cv2.destroyAllWindows()


  pred_x, pred_y = int(prediction[0]), int(prediction[1])
