In [3]:
import cv2
import numpy as np
import time
from deep_sort_realtime.deepsort_tracker import DeepSort
from ultralytics import YOLO
from collections import defaultdict, deque
import torch
import torchvision.transforms as T
from torchvision.models import resnet50
from sklearn.metrics.pairwise import cosine_similarity

# Select the compute device: use CUDA when PyTorch reports it as available,
# otherwise fall back to the CPU so the script can still start.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Appearance-embedding network: ResNet-50 backbone followed by a small projection head
class FeatureExtractor(torch.nn.Module):
    """Map an image batch to L2-normalised 256-dimensional embeddings.

    The network is a ``resnet50(pretrained=True)`` backbone with its last two
    child modules removed, followed by adaptive average pooling to 1x1 and a
    projection head (2048 -> 512 -> 256).

    NOTE(review): the projection head is created here from freshly
    initialised layers and nothing in this class loads weights for it, so
    unless a checkpoint is loaded elsewhere the embedding is an untrained
    projection of the backbone features.
    """

    def __init__(self):
        super().__init__()
        backbone = resnet50(pretrained=True)

        # Keep every child module except the final two (presumably the
        # global pooling and classification layers -- confirm against the
        # installed torchvision version).
        trunk_layers = list(backbone.children())[:-2]
        self.features = torch.nn.Sequential(*trunk_layers)

        # Collapse the spatial dimensions to a single 1x1 cell per channel.
        self.avgpool = torch.nn.AdaptiveAvgPool2d((1, 1))

        # Projection head producing the 256-d embedding.
        head_layers = [
            torch.nn.Linear(2048, 512),
            torch.nn.BatchNorm1d(512),
            torch.nn.ReLU(),
            torch.nn.Dropout(0.5),
            torch.nn.Linear(512, 256),
        ]
        self.fc = torch.nn.Sequential(*head_layers)

    def forward(self, x):
        """Return one unit-length (L2) embedding row per input sample."""
        feature_map = self.features(x)
        pooled = self.avgpool(feature_map)
        flat = pooled.flatten(1)
        embedding = self.fc(flat)
        return torch.nn.functional.normalize(embedding, p=2, dim=1)

# Build the embedding network, move it to the selected device, and switch it
# to evaluation mode (it is only used for inference in this script).
resnet = FeatureExtractor()
resnet = resnet.to(device)
resnet.eval()

# Inference-time preprocessing for the embedding network.
# The pipeline is deliberately deterministic: a random horizontal flip is a
# training-time augmentation and, applied here, would make the embedding of
# the same crop vary from call to call.
transform = T.Compose([
    T.ToPILImage(),
    T.Resize((256, 128)),  # (height, width) of the network input
    T.ToTensor(),
    # Per-channel mean/std (the commonly used ImageNet statistics).
    T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

def extract_deep_features(frame, bbox, padding=10):
    """Return a unit-norm appearance embedding for the region inside *bbox*.

    Args:
        frame: Image array indexed as ``frame[y, x]``. It is treated as a BGR
            image (as returned by ``cv2.VideoCapture.read``) and the crop is
            converted to RGB before preprocessing.
        bbox: ``(x1, y1, x2, y2)`` corner coordinates; values are converted
            to ``int``.
        padding: Number of pixels added on every side of the box before
            cropping. The padded box is clamped to the frame.

    Returns:
        The embedding as a NumPy array divided by its L2 norm, or ``None``
        when the crop is empty or feature extraction raises.
    """
    x1, y1, x2, y2 = map(int, bbox)
    frame_h, frame_w = frame.shape[:2]

    # Clamp every edge to [0, frame size]. Negative values must not reach the
    # slice below, because NumPy would interpret them as "from the end".
    x1 = min(max(x1 - padding, 0), frame_w)
    y1 = min(max(y1 - padding, 0), frame_h)
    x2 = min(max(x2 + padding, 0), frame_w)
    y2 = min(max(y2 + padding, 0), frame_h)

    person_roi = frame[y1:y2, x1:x2]
    if person_roi.size == 0:
        return None

    try:
        # ToPILImage interprets a 3-channel array as RGB, so reorder the
        # OpenCV BGR channels first; otherwise the normalisation statistics
        # are applied to the wrong channels.
        rgb_roi = cv2.cvtColor(person_roi, cv2.COLOR_BGR2RGB)
        img = transform(rgb_roi).unsqueeze(0).to(device)
        with torch.no_grad():
            features = resnet(img).squeeze().cpu().numpy()
        # The epsilon guards against division by zero for an all-zero vector.
        return features / (np.linalg.norm(features) + 1e-12)
    except Exception as e:
        # Best effort: a failed crop must not stop the tracking loop.
        print(f"Feature extraction error: {e}")
        return None

def extract_clothing_histogram(frame, bbox):
    """Return a colour descriptor for the lower part of a bounding box.

    The region used starts 40% of the way down the box and extends to its
    bottom edge. It is converted from BGR to HSV, split into a 2x2 grid, and
    an 8x8 hue/saturation histogram is computed for each cell. Each histogram
    is normalised with ``cv2.normalize`` (default arguments) and the four
    flattened histograms are concatenated (4 * 64 = 256 values).

    Args:
        frame: BGR image array indexed as ``frame[y, x]``.
        bbox: ``(x1, y1, x2, y2)`` corner coordinates; values are converted
            to ``int``. The box may extend outside the frame.

    Returns:
        A 1-D NumPy array, or ``None`` when the visible region is smaller
        than 2x2 pixels (in which case a 2x2 grid cannot be formed).
    """
    x1, y1, x2, y2 = map(int, bbox)
    frame_h, frame_w = frame.shape[:2]

    # The 40% offset is measured on the box as given, before clamping.
    region_top = y1 + int(0.4 * (y2 - y1))

    # Clamp to the frame: negative indices would otherwise wrap around in
    # NumPy slicing and select an unrelated part of the image.
    x1, x2 = (min(max(v, 0), frame_w) for v in (x1, x2))
    region_top, y2 = (min(max(v, 0), frame_h) for v in (region_top, y2))

    torso = frame[region_top:y2, x1:x2]
    h, w = torso.shape[:2]
    if h < 2 or w < 2:
        # Too small to split into four non-empty cells.
        return None

    hsv = cv2.cvtColor(torso, cv2.COLOR_BGR2HSV)
    histograms = []
    for i in range(2):
        for j in range(2):
            cell = hsv[i * h // 2:(i + 1) * h // 2, j * w // 2:(j + 1) * w // 2]
            hist = cv2.calcHist([cell], [0, 1], None, [8, 8], [0, 180, 0, 256])
            histograms.append(cv2.normalize(hist, hist).flatten())

    return np.concatenate(histograms)

def compare_features(feat1, feat2, thresholds=(0.8, 0.7)):
    """Decide whether two feature pairs describe the same person.

    Args:
        feat1, feat2: Indexable pairs ``(deep_embedding, colour_histogram)``.
            Either element of a pair may be ``None``; a similarity that
            cannot be computed counts as 0.
        thresholds: Accepted for interface compatibility but not used by the
            body; the decision threshold is the fixed value 0.85.

    Returns:
        A boolean (not a score): ``True`` when
        ``0.7 * cosine_similarity + 0.3 * histogram_correlation`` exceeds
        0.85. ``False`` when either argument is ``None``.
    """
    if feat1 is None or feat2 is None:
        return False

    # Cosine similarity of the deep embeddings.
    deep_sim = 0
    if feat1[0] is not None and feat2[0] is not None:
        deep_sim = cosine_similarity([feat1[0]], [feat2[0]])[0][0]

    # Correlation of the colour histograms (OpenCV requires float32 input).
    hist_sim = 0
    if feat1[1] is not None and feat2[1] is not None:
        first_hist = feat1[1].astype(np.float32)
        second_hist = feat2[1].astype(np.float32)
        hist_sim = cv2.compareHist(first_hist, second_hist, cv2.HISTCMP_CORREL)

    combined = 0.7 * deep_sim + 0.3 * hist_sim
    return combined > 0.85

# Custom detection function with GPU/CPU fallback
def run_detection(model, frame, device):
    """Run person detection on *frame*, with a manual fallback path.

    The normal path calls ``model`` directly, restricted to class 0 with a
    confidence threshold of 0.7 and ``half=True``.

    If that call raises and the error message mentions "nms" or "cuda"
    (case-insensitive), the fallback runs ``model.model`` on a tensor built
    from the frame, moves the raw predictions to the CPU, applies
    ``non_max_suppression`` there, and wraps each result in a ``Results``
    object. Any other exception is re-raised unchanged.

    Args:
        model: Detector; must be callable and, for the fallback, expose
            ``.model`` and ``.names``.
        frame: H x W x C image array.
        device: ``torch.device`` the fallback tensor is moved to.

    Returns:
        Whatever ``model(...)`` returns on the normal path, or a list of
        ``Results`` objects on the fallback path.

    NOTE(review): the fallback converts the raw frame directly (only an axis
    permutation and a division by 255 are done here; no resizing or channel
    reordering) -- confirm this matches what ``model.model`` expects.
    """
    try:
        # Preferred path: let the model wrapper handle the whole pipeline.
        return model(frame, classes=[0], conf=0.7, verbose=False, half=True)
    except Exception as err:
        message = str(err).lower()
        if "nms" not in message and "cuda" not in message:
            raise

        # Fallback: forward pass on `device`, NMS on the CPU.
        with torch.no_grad():
            tensor = torch.from_numpy(frame).to(device).float().permute(2, 0, 1) / 255.0
            if tensor.ndimension() == 3:
                tensor = tensor.unsqueeze(0)  # add the batch dimension

            net_input = tensor.half() if device.type == 'cuda' else tensor
            raw = model.model(net_input)
            if isinstance(raw, tuple):
                raw = raw[0]
            raw = raw.cpu().float()

            from ultralytics.utils.ops import non_max_suppression
            kept = non_max_suppression(
                raw,
                conf_thres=0.7,
                iou_thres=0.45,
                classes=[0],
                agnostic=False,
                max_det=100,
            )

            from ultralytics.engine.results import Results
            return [
                Results(orig_img=frame, path=None, names=model.names, boxes=det)
                for det in kept
            ]

# Detector: load the "yolov8n.pt" weights and move the model to the selected device.
model = YOLO("yolov8n.pt")
model = model.to(device)

# Multi-object tracker that assigns per-track IDs to the detections.
tracker = DeepSort(
    max_age=20,
    nn_budget=100,
    max_cosine_distance=0.4,
    max_iou_distance=0.7,
)

# Tracking system state
total_unique_people = 0  # number of identities created so far; also the last ID handed out
known_persons = []       # one dict per identity, appended by the main loop

# Per-track rolling buffers holding the most recent 25 feature samples.
pending_features = defaultdict(
    lambda: dict(deep=deque(maxlen=25), clothing=deque(maxlen=25))
)
# Per-track record with empty defaults.
feature_memory = defaultdict(lambda: dict(deep=None, clothing=None, count=0))

# Performance metrics: per-stage timings in seconds, appended by the main loop.
frame_count, start_time = 0, time.time()
metrics = {
    stage: []
    for stage in ('detection_times', 'tracking_times', 'feature_times')
}

# Open video source 0 and prepare the writer for the annotated recording.
cap = cv2.VideoCapture(0)
if not cap.isOpened():
    # The main loop exits on the first failed read; say why up front.
    print("Warning: could not open video source 0; no frames will be processed")
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = 20  # nominal frame rate written into the recording (not measured from the source)

# XVID is used so that the codec matches the .avi container of the output file.
fourcc = cv2.VideoWriter_fourcc(*'XVID')
out = cv2.VideoWriter('output_test_video.avi', fourcc, fps, (frame_width, frame_height))


# Main loop: capture -> detect -> track -> re-identify -> annotate -> record/display.
# The try/finally guarantees that the camera, the video writer and the display
# windows are released even if a frame raises an exception.
min_samples = 10  # feature samples a track must accumulate before identity matching

try:
    while True:
        frame_start_time = time.time()
        ret, frame = cap.read()
        if not ret:
            break

        frame_count += 1

        # Every 100 frames, forget identities that have not been seen for 300 s.
        if frame_count % 100 == 0:
            now = time.time()
            known_persons = [p for p in known_persons if now - p['last_seen'] < 300]

        # --- Detection ---
        det_start = time.time()
        results = run_detection(model, frame, device)
        detections = []
        for box in results[0].boxes:
            x1, y1, x2, y2 = map(int, box.xyxy[0].tolist())
            w, h = x2 - x1, y2 - y1
            # Reject boxes with an implausible height or aspect ratio. The
            # height test comes first, so w/h is never evaluated with h == 0.
            if h < 50 or h > 500 or w/h < 0.3 or w/h > 1.5:
                continue
            # The tracker is given boxes as [left, top, width, height].
            detections.append([[x1, y1, w, h], box.conf.item()])
        metrics['detection_times'].append(time.time() - det_start)

        # --- Tracking ---
        # NOTE(review): the tracker is not updated on frames without
        # detections, so existing tracks do not age on those frames --
        # confirm this is intended.
        track_start = time.time()
        tracks = tracker.update_tracks(detections, frame=frame) if detections else []
        current_persons = set()
        metrics['tracking_times'].append(time.time() - track_start)

        for track in tracks:
            if not track.is_confirmed():
                continue

            feat_start = time.time()
            x1, y1, x2, y2 = map(int, track.to_ltrb())
            deepsort_id = track.track_id

            # Feature extraction
            deep_feat = extract_deep_features(frame, (x1, y1, x2, y2))
            clothing_feat = extract_clothing_histogram(frame, (x1, y1, x2, y2))

            # Update the per-track rolling buffers
            buffers = pending_features[deepsort_id]
            if deep_feat is not None:
                buffers['deep'].append(deep_feat)
            if clothing_feat is not None:
                buffers['clothing'].append(clothing_feat)

            # Wait until both buffers hold enough samples for a stable average
            if len(buffers['deep']) < min_samples or len(buffers['clothing']) < min_samples:
                continue

            current_features = (
                np.mean(buffers['deep'], axis=0),
                np.mean(buffers['clothing'], axis=0),
            )

            # Person matching. compare_features returns a boolean rather than
            # a score, so the first known person that matches is used.
            person = None
            for known in known_persons:
                if compare_features(current_features, (known["features"], known["clothing"])):
                    person = known
                    break

            now = time.time()
            if person is not None:
                # Blend the new observation into the stored identity
                # (exponential moving average, 10% new / 90% old).
                person["features"] = 0.1 * current_features[0] + 0.9 * person["features"]
                person["clothing"] = 0.1 * current_features[1] + 0.9 * person["clothing"]
                person["last_seen"] = now
            else:
                total_unique_people += 1
                person = {
                    "id": total_unique_people,
                    "features": current_features[0],
                    "clothing": current_features[1],
                    "last_seen": now,
                    "first_seen": now,
                }
                known_persons.append(person)

            person_id = person["id"]
            current_persons.add(person_id)
            metrics['feature_times'].append(time.time() - feat_start)

            # Visualization: green once the identity has existed for more
            # than 2 s, orange before that.
            color = (0, 255, 0) if time.time() - person["first_seen"] > 2.0 else (0, 165, 255)
            cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
            cv2.putText(frame, f"ID: {person_id}", (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

        # Calculate and display metrics (averages over the last 10 samples, in ms)
        fps = 1.0 / (time.time() - frame_start_time)
        avg_det = np.mean(metrics['detection_times'][-10:]) * 1000 if metrics['detection_times'] else 0
        avg_track = np.mean(metrics['tracking_times'][-10:]) * 1000 if metrics['tracking_times'] else 0
        avg_feat = np.mean(metrics['feature_times'][-10:]) * 1000 if metrics['feature_times'] else 0
        avg_age = np.mean([time.time() - p['first_seen'] for p in known_persons]) if known_persons else 0

        stats = [
            f"FPS: {fps:.1f}",
            f"Current: {len(current_persons)}",
            f"Total Unique: {total_unique_people}",
            f"Track Age: {avg_age:.1f}s",
            f"Detection: {avg_det:.1f}ms",
            f"Tracking: {avg_track:.1f}ms",
            f"Features: {avg_feat:.1f}ms",
            f"Device: {device.type.upper()}"
        ]

        # The frame is recorded before the stats overlay is drawn, so the
        # saved video contains the boxes and IDs but not the stats text.
        out.write(frame)
        for i, stat in enumerate(stats):
            cv2.putText(frame, stat, (10, 30 + i * 25), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2)

        cv2.imshow("People Tracking", frame)
        # Mask to the low byte so the comparison also works on platforms
        # where waitKey returns extra high bits.
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    out.release()
    cap.release()
    cv2.destroyAllWindows()

# Final performance report
def _avg_ms(samples):
    """Return the mean of *samples* (seconds) in milliseconds, or 0 if empty."""
    # np.mean([]) would yield nan and emit a RuntimeWarning.
    return np.mean(samples) * 1000 if samples else 0


_elapsed = time.time() - start_time
print("\n=== Performance Summary ===")
print(f"Total Frames: {frame_count}")
print(f"Average FPS: {(frame_count / _elapsed if _elapsed > 0 else 0):.1f}")
# The value printed here is the running total of identities created.
print(f"Peak Unique People: {total_unique_people}")
print(f"Avg Detection Time: {_avg_ms(metrics['detection_times']):.1f}ms")
print(f"Avg Tracking Time: {_avg_ms(metrics['tracking_times']):.1f}ms")
print(f"Avg Feature Time: {_avg_ms(metrics['feature_times']):.1f}ms")


Using device: cuda





=== Performance Summary ===
Total Frames: 266
Average FPS: 15.5
Peak Unique People: 1
Avg Detection Time: 23.5ms
Avg Tracking Time: 6.2ms
Avg Feature Time: 2.8ms
