In [None]:
import cv2
import torch
import numpy as np
import random
from collections import deque
import time
import torch.nn as nn
import torch.optim as optim

In [None]:
# Deep Q-Network (DQN) Model
class DQN(nn.Module):
    """Feed-forward Q-network: two 128-unit ReLU hidden layers and a linear head.

    Takes inputs whose last dimension is ``state_size`` and produces
    ``action_size`` outputs (one Q-value estimate per action).
    """

    def __init__(self, state_size, action_size):
        super().__init__()
        # Attribute names are part of the state_dict keys, so they stay fc1/fc2/fc3.
        self.fc1 = nn.Linear(state_size, 128)
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, action_size)

    def forward(self, x):
        """Run ``x`` through both hidden layers and return the unactivated outputs."""
        hidden = x
        for layer in (self.fc1, self.fc2):
            hidden = torch.relu(layer(hidden))
        return self.fc3(hidden)

NameError: name 'nn' is not defined

In [None]:
# Deep Q-Network (DQN) Model
class DQN(nn.Module):
    """Feed-forward Q-network: two 128-unit ReLU hidden layers and a linear head.

    Takes inputs whose last dimension is ``state_size`` and produces
    ``action_size`` outputs (one Q-value estimate per action).
    """

    def __init__(self, state_size, action_size):
        super().__init__()
        # Attribute names are part of the state_dict keys, so they stay fc1/fc2/fc3.
        self.fc1 = nn.Linear(state_size, 128)
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, action_size)

    def forward(self, x):
        """Run ``x`` through both hidden layers and return the unactivated outputs."""
        hidden = x
        for layer in (self.fc1, self.fc2):
            hidden = torch.relu(layer(hidden))
        return self.fc3(hidden)

In [None]:
# Reinforcement Learning Agent with DQN
class DQNAgent:
    """DQN agent with an experience-replay buffer and a periodically synced target network.

    Besides the learning machinery, the agent stores ``confidence_threshold``,
    which callers read and adjust according to the action the agent selects.
    """

    def __init__(self, state_size, action_size, init_confidence=0.5):
        """Create the main/target networks, the optimizer and the replay buffer.

        Args:
            state_size: Length of the state vector fed to the networks.
            action_size: Number of discrete actions to choose from.
            init_confidence: Starting value of ``confidence_threshold``.
        """
        self.state_size = state_size
        self.action_size = action_size
        self.memory = deque(maxlen=5000)  # Experience replay buffer
        self.gamma = 0.95  # Discount factor for future rewards
        self.epsilon = 1.0  # Initial exploration rate
        self.epsilon_min = 0.01  # Floor for the exploration rate
        self.epsilon_decay = 0.995  # Multiplicative decay applied per training step
        self.learning_rate = 0.001
        self.model = DQN(state_size, action_size)  # Main network
        self.target_model = DQN(state_size, action_size)  # Target network
        # Start both networks from identical weights.
        self.target_model.load_state_dict(self.model.state_dict())
        self.optimizer = optim.Adam(self.model.parameters(), lr=self.learning_rate)
        self.confidence_threshold = init_confidence  # Dynamic detection threshold
        self.update_target_every = 100  # Sync target network every 100 training steps
        self.step_count = 0  # Number of training steps performed so far

    def remember(self, state, action, reward, next_state, done):
        """Append one ``(state, action, reward, next_state, done)`` transition to the buffer."""
        self.memory.append((state, action, reward, next_state, done))

    def choose_action(self, state):
        """Pick an action epsilon-greedily.

        With probability ``epsilon`` a uniformly random action index is
        returned; otherwise the index of the largest output of the main
        network for ``state``.
        """
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size)  # Explore
        state_tensor = torch.FloatTensor(state)
        with torch.no_grad():
            act_values = self.model(state_tensor)
        return torch.argmax(act_values).item()  # Exploit

    def replay(self, batch_size):
        """Run one training step on a random minibatch from the replay buffer.

        Does nothing until the buffer holds at least ``batch_size``
        transitions. After the gradient step the exploration rate is decayed
        (never below ``epsilon_min``) and, every ``update_target_every``
        steps, the target network is overwritten with the main network's
        weights.
        """
        if len(self.memory) < batch_size:
            return
        minibatch = random.sample(self.memory, batch_size)
        states = torch.FloatTensor([m[0] for m in minibatch])
        actions = torch.LongTensor([m[1] for m in minibatch])
        rewards = torch.FloatTensor([m[2] for m in minibatch])
        next_states = torch.FloatTensor([m[3] for m in minibatch])
        dones = torch.FloatTensor([float(m[4]) for m in minibatch])

        # Q-value of the action actually taken, from the main network.
        # squeeze(1) removes only the gathered column; a bare squeeze() would
        # also drop the batch dimension when batch_size == 1 and make the
        # loss broadcast a 0-dim input against a (1,) target.
        current_q = self.model(states).gather(1, actions.unsqueeze(1)).squeeze(1)

        # Best next-state Q-value from the target network (no gradients needed).
        with torch.no_grad():
            next_q = self.target_model(next_states).max(1)[0]

        # Bootstrapped target; (1 - dones) zeroes the future term on terminal steps.
        target_q = rewards + (1 - dones) * self.gamma * next_q

        # Calculate loss and optimize
        loss = nn.MSELoss()(current_q, target_q)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # Decay exploration rate, clamped so it never drops below the floor.
        if self.epsilon > self.epsilon_min:
            self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

        # Update target network periodically
        self.step_count += 1
        if self.step_count % self.update_target_every == 0:
            self.target_model.load_state_dict(self.model.state_dict())

In [None]:
# Object detector setup: YOLOv5s from torch.hub, placed on the CPU
default_confidence = 0.5  # Starting confidence threshold handed to the agent
device = torch.device("cpu")
# Fetch (or reuse the cached) ultralytics/yolov5 hub repo and build the
# "yolov5s" model with pretrained weights.
model = torch.hub.load(
    "ultralytics/yolov5",
    "yolov5s",
    pretrained=True,
)
model.to(device)
model.eval()  # Inference mode

Using cache found in /home/abdullahalazmi/.cache/torch/hub/ultralytics_yolov5_master
YOLOv5 🚀 2025-2-5 Python-3.13.2 torch-2.6.0+cu124 CPU

Fusing layers... 
YOLOv5s summary: 213 layers, 7225885 parameters, 0 gradients, 16.4 GFLOPs
Adding AutoShape... 


In [7]:
# Utility Functions
def calculate_frame_variation(prev_frame, current_frame):
    """Return the mean absolute pixel difference of two frames, divided by 255.

    For 8-bit frames this lies in [0, 1]. When ``prev_frame`` is ``None``
    (no earlier frame to compare against) the result is 0.
    """
    if prev_frame is not None:
        abs_delta = cv2.absdiff(prev_frame, current_frame)
        return np.mean(abs_delta) / 255.0
    return 0


def update_detector_confidence(threshold):
    """Store ``threshold``, limited to the range [0.1, 0.9], in the global ``model.conf``."""
    upper_limited = min(0.9, threshold)  # Cap at 0.9 first ...
    model.conf = max(0.1, upper_limited)  # ... then raise to at least 0.1

In [8]:
# Agent and webcam setup
# The state vector has three entries:
#   [normalized_detections, confidence, normalized_frame_variation]
# and there are two actions: 0 raises the confidence threshold, 1 lowers it.
state_size, action_size = 3, 2
agent = DQNAgent(
    state_size,
    action_size,
    init_confidence=default_confidence,
)

# Open camera index 0 and fail immediately if it is unavailable.
cap = cv2.VideoCapture(0)
if not cap.isOpened():
    raise OSError("Cannot open webcam")

# Training bookkeeping
batch_size = 32
episode_reward = 0

# Nothing to compare against or learn from before the first frame.
prev_gray = None
prev_state = prev_action = None

# FPS measurement window
frame_count = 0
fps_start_time = time.time()

In [None]:
# Main Processing Loop
# Per frame: detect objects, build the state, reward the previous action,
# train the agent, let it nudge the confidence threshold, then draw the overlay.
# Press "q" in the display window to stop.

# Loop-invariant settings, defined once instead of on every frame.
max_expected_detections = 50  # Adjust based on use case
target_min, target_max = 5, 15  # Detection-count band that earns the full reward
step_size = 0.05  # Confidence adjustment step
fps = 0.0  # Most recent FPS measurement; shown until the next one is ready

try:
    while True:
        # Read and validate frame
        ret, frame = cap.read()
        if not ret:
            break

        # Resize for efficiency and convert to RGB for YOLO
        resized_frame = cv2.resize(frame, (640, 480))
        rgb_frame = cv2.cvtColor(resized_frame, cv2.COLOR_BGR2RGB)

        # Grayscale copy, used only for the frame-variation measure
        current_gray = cv2.cvtColor(resized_frame, cv2.COLOR_BGR2GRAY)

        # Calculate frame variation (normalized 0-1)
        frame_variation = calculate_frame_variation(prev_gray, current_gray)
        prev_gray = current_gray

        # Push the agent's threshold into the detector before inference.
        # Without this the detector keeps its own default `conf` cut-off
        # (0.25 per the YOLOv5 hub docs), so agent thresholds below that
        # value would not change which detections come back.
        update_detector_confidence(agent.confidence_threshold)
        results = model(rgb_frame)
        detections = results.pandas().xyxy[0]
        valid_detections = detections[
            detections["confidence"] >= agent.confidence_threshold
        ]
        total_detections = len(valid_detections)

        # Normalize state components
        normalized_detections = total_detections / max_expected_detections
        normalized_variation = frame_variation  # Already normalized

        # Current state vector
        current_state = [
            normalized_detections,
            agent.confidence_threshold,
            normalized_variation,
        ]

        # Store experience and train once a previous state/action exists.
        # The reward judges the previous action by the detections it produced.
        if prev_state is not None:
            if total_detections < target_min:
                # Too few detections: penalty grows linearly towards -1 at zero.
                reward = -(target_min - total_detections) / target_min
            elif total_detections > target_max:
                # Too many detections: penalty proportional to the excess.
                reward = -(total_detections - target_max) / target_max
            else:
                reward = 1.0

            episode_reward += reward
            agent.remember(prev_state, prev_action, reward, current_state, False)
            agent.replay(batch_size)

        # Choose and execute action (0: raise threshold, otherwise lower it)
        action = agent.choose_action(current_state)
        if action == 0:
            new_conf = agent.confidence_threshold + step_size
        else:
            new_conf = agent.confidence_threshold - step_size
        agent.confidence_threshold = max(0.1, min(0.9, new_conf))

        # Update previous state/action for next iteration
        prev_state = current_state
        prev_action = action

        # Visualization and Display
        # Draw detections and info on frame
        for _, det in valid_detections.iterrows():
            x1, y1, x2, y2 = map(int, det[["xmin", "ymin", "xmax", "ymax"]])
            cv2.rectangle(resized_frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
            label = f"{det['name']} {det['confidence']:.2f}"
            cv2.putText(
                resized_frame,
                label,
                # Keep the label inside the image for boxes touching the top edge.
                (x1, max(y1 - 10, 15)),
                cv2.FONT_HERSHEY_SIMPLEX,
                0.5,
                (0, 255, 0),
                2,
            )

        # Display performance metrics
        y_offset = 30
        info_lines = [
            f"Confidence: {agent.confidence_threshold:.2f}",
            f"Detections: {total_detections}",
            f"Episode Reward: {episode_reward:.1f}",
            f"Epsilon: {agent.epsilon:.2f}",
        ]
        for line in info_lines:
            cv2.putText(
                resized_frame,
                line,
                (10, y_offset),
                cv2.FONT_HERSHEY_SIMPLEX,
                0.7,
                (0, 255, 255),
                2,
            )
            y_offset += 30

        # Calculate FPS roughly once per second. Between measurements the
        # previous value is kept instead of being reset to 0.
        frame_count += 1
        elapsed = time.time() - fps_start_time
        if elapsed >= 1:
            fps = frame_count / elapsed
            fps_start_time = time.time()
            frame_count = 0
        cv2.putText(
            resized_frame,
            f"FPS: {fps:.1f}",
            (10, 470),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.7,
            (0, 255, 0),
            2,
        )

        # Display frame
        cv2.imshow("Adaptive Object Detection", resized_frame)
        if cv2.waitKey(1) & 0xFF == ord("q"):
            break
finally:
    # Cleanup runs even if the loop raises or the kernel is interrupted,
    # so the camera is not left locked and no window is left open.
    cap.release()
    cv2.destroyAllWindows()

  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with a