# Edge Detection (720p) on Arty Z7
This notebook implements a real-time Canny Edge Detection pipeline using the PYNQ framework.

In [None]:
from pynq.overlays.base import BaseOverlay
from pynq.lib.video import VideoMode
import cv2
import numpy as np
import threading
import queue
import time
import gc

# Configuration
WIDTH, HEIGHT = 1280, 720
PROCESS_SCALE = 0.5  # Downscale to run faster
BUFFER_SIZE = 2
TARGET_FPS = 30
CANNY_TH1 = 100
CANNY_TH2 = 200

In [None]:
# Setup FPGA Overlay and HDMI Output
print("Initializing FPGA...")
base = BaseOverlay("base.bit")
hdmi_out = base.video.hdmi_out

mode = VideoMode(WIDTH, HEIGHT, 24)
hdmi_out.configure(mode)
hdmi_out.start()
print("HDMI Output Initialized")

In [None]:
# Setup USB Camera
print("Initializing Camera...")
cap = cv2.VideoCapture(0, cv2.CAP_V4L2)

# Force MJPEG to save USB bandwidth
cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc('M', 'J', 'P', 'G'))
cap.set(cv2.CAP_PROP_FRAME_WIDTH, WIDTH)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, HEIGHT)
cap.set(cv2.CAP_PROP_FPS, TARGET_FPS)
cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) # Low latency buffer

if not cap.isOpened():
    raise RuntimeError("Failed to open camera!")

print(f"Camera Initialized: {WIDTH}x{HEIGHT} @ {TARGET_FPS}FPS")

In [None]:
# Shared State and Queues
class StreamState:
    def __init__(self):
        self.running = True
        self.fps_capture = 0.0
        self.fps_process = 0.0
        self.fps_display = 0.0
        self.dropped_frames = 0

state = StreamState()
frame_queue = queue.Queue(maxsize=BUFFER_SIZE)
result_queue = queue.Queue(maxsize=BUFFER_SIZE)

In [None]:
def capture_thread():
    frame_count = 0
    last_reset = time.perf_counter()
    
    while state.running:
        ret, frame = cap.read()
        if not ret:
            time.sleep(0.001)
            continue
        
        # Ensure correct dimensions
        if frame.shape[:2] != (HEIGHT, WIDTH):
            frame = cv2.resize(frame, (WIDTH, HEIGHT))
        
        try:
            frame_queue.put(frame, block=False)
            frame_count += 1
        except queue.Full:
            state.dropped_frames += 1
        
        # Update Capture FPS
        if frame_count % 30 == 0:
            now = time.perf_counter()
            elapsed = now - last_reset
            state.fps_capture = 30 / elapsed
            last_reset = now

In [None]:
def processing_thread():
    process_w = int(WIDTH * PROCESS_SCALE)
    process_h = int(HEIGHT * PROCESS_SCALE)
    
    frame_count = 0
    last_reset = time.perf_counter()
    
    while state.running:
        try:
            frame = frame_queue.get(timeout=0.1)
        except queue.Empty:
            continue
        
        # Downscale for faster processing
        small = cv2.resize(frame, (process_w, process_h))
        
        # Canny Edge Detection
        edges = cv2.Canny(small, CANNY_TH1, CANNY_TH2)
        
        # Upscale mask to original resolution
        mask_full = cv2.resize(edges, (WIDTH, HEIGHT), interpolation=cv2.INTER_NEAREST)
        
        # Overlay edges in Green (BGR: 0, 128, 0)
        frame[mask_full > 0] = (0, 128, 0)
        
        try:
            result_queue.put(frame, block=False)
            frame_count += 1
        except queue.Full:
            pass
        
        # Update Processing FPS
        if frame_count % 30 == 0:
            now = time.perf_counter()
            elapsed = now - last_reset
            state.fps_process = 30 / elapsed
            last_reset = now

In [None]:
def display_thread():
    prev_time = time.perf_counter()
    fps_smooth = 0.0
    hdmi_buffer = hdmi_out.newframe()
    
    while state.running:
        try:
            frame = result_queue.get(timeout=0.1)
        except queue.Empty:
            continue
        
        # Calculate Display FPS
        current_time = time.perf_counter()
        time_diff = current_time - prev_time
        prev_time = current_time
        
        if time_diff > 0:
            instant_fps = 1.0 / time_diff
            fps_smooth = 0.9 * fps_smooth + 0.1 * instant_fps
        state.fps_display = fps_smooth
        
        # Draw OSD
        cv2.putText(frame, f"Cap: {state.fps_capture:.1f}", (20, 50), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 0), 2)
        cv2.putText(frame, f"Proc: {state.fps_process:.1f}", (20, 85), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 0), 2)
        cv2.putText(frame, f"Disp: {fps_smooth:.1f}", (20, 120), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
        cv2.putText(frame, "Edge Mode", (20, 155), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 128, 0), 2)
        
        # Convert to RGB and Output to HDMI
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        hdmi_buffer[:] = frame_rgb
        hdmi_out.writeframe(hdmi_buffer)

In [None]:
# Main Execution Loop
print("Starting Pipeline...")

try:
    threads = [
        threading.Thread(target=capture_thread, daemon=True),
        threading.Thread(target=processing_thread, daemon=True),
        threading.Thread(target=display_thread, daemon=True)
    ]
    
    for t in threads:
        t.start()
    
    # Monitor loop - prints status every 5 seconds
    while True:
        time.sleep(5)
        print(f"[Status] FPS: Cap={state.fps_capture:.1f} | Proc={state.fps_process:.1f} | Disp={state.fps_display:.1f} | Drops={state.dropped_frames}")

except KeyboardInterrupt:
    print("\nStopping...")
    state.running = False
    for t in threads:
        t.join(timeout=1.0)
finally:
    cap.release()
    hdmi_out.stop()
    del hdmi_out
    gc.collect()
    print("Resources Released.")