# Lane Detector driver script

In [1]:
from pynq import allocate, Overlay, Clocks, MMIO
from pynq.lib import AxiGPIO
from matplotlib import pyplot as plot
from IPython.display import Video
import numpy as np
import cv2 as cv
import math

## Load the system bitstream into PL

In [2]:
# Load the system into the PL
system = Overlay("bd_wrapper.bit");
dma = system.axi_dma_0
vpu = system.vpu_accel_0

# For some reason this clock gets reset to 66 MHz
Clocks.fclk0_mhz = 100

# Check if the VPU IP core is the right component by chekcing if the registers exist
for reg in ["in_width", "in_height", "seg_thres", "g_sigma", "e_thres", "h_thres"]:
    assert(hasattr(vpu.register_map, reg))

vpu.register_map

RegisterMap {
  CTRL = Register(AP_START=0, AP_DONE=0, AP_IDLE=1, AP_READY=0, RESERVED_1=0, AUTO_RESTART=0, RESERVED_2=0, RESERVED_3=0, RESERVED_4=0),
  GIER = Register(Enable=0, RESERVED=0),
  IP_IER = Register(CHAN0_INT_EN=0, CHAN1_INT_EN=0, RESERVED_0=0, CHAN2_INT_EN=0, RESERVED_1=0),
  IP_ISR = Register(CHAN0_INT_ST=0, CHAN1_INT_ST=0, RESERVED_0=0, CHAN2_INT_ST=0, RESERVED_1=0),
  in_height = Register(in_height=write-only),
  in_width = Register(in_width=write-only),
  seg_thres = Register(seg_thres=write-only, RESERVED=write-only),
  g_sigma = Register(g_sigma=write-only),
  e_thres = Register(e_thres=write-only, RESERVED=write-only),
  h_thres = Register(h_thres=write-only, RESERVED=write-only)
}

## Configure the UART-Lite peripheral

In [3]:
# Access the UART registers via memory mapped IO
uart = MMIO(system.ip_dict["axi_uartlite_0"]["phys_addr"], 65535)

# Register offsets from base address
# https://www.xilinx.com/support/documents/ip_documentation/axi_uartlite/v2_0/pg142-axi-uartlite.pdf
reg_rx = 0x00
reg_tx = 0x04
reg_status = 0x08
reg_control = 0x0C

# Bit positions for flags
flag_interrupt_en = 4
flag_rx_empty = 0
flag_tx_empty_n = 2
flag_reset_tx = 0
flag_reset_rx = 1

# We want the RX/TX queues to be empty before we do anything
# If this is already the case, skip this step.
if uart.read(reg_status) & (1 << flag_rx_empty) or not uart.read(reg_status) & (1 << flag_tx_empty_n):
    uart.write(reg_control, (1 << flag_reset_tx) | (1 << flag_reset_rx))
    while uart.read(reg_status) & (1 << flag_rx_empty) or not uart.read(reg_status) & (1 << flag_tx_empty_n):
        pass

# We want interrupts to be disabled, check the status register
# If this is already the case, skip this step.
if uart.read(reg_status) & (1 << flag_interrupt_en):
    uart.write(reg_control, (1 << flag_interrupt_en))
    while uart.read(reg_status) & (1 << flag_interrupt_en):
        pass

# Write an array of bytes over UART
def uart_write_bytes(vector):
    for v in vector:
        while not uart.read(reg_status) & (1 << flag_tx_empty_n):
            pass
        uart.write(reg_tx, v)
        
# Write a string as ASCII characters over UART
def uart_write_ascii(string):
    uart_write_bytes(map(lambda char: ord(char), string))

# Use the PYNQ GPIO for controlling the RGB LEDs
leds = AxiGPIO(system.ip_dict["axi_gpio_0"])
leds[0:6].write(0b100100) # all red

## Open the dashcam capture

In [4]:
max_width = 1280
max_height = 720
max_lines = 32
theta_margin = 0.05

# The dashcam should be found by V4L2 and available as device 0 
vrd = cv.VideoCapture("night4.m4v")
vrd.set(cv.CAP_PROP_FRAME_WIDTH, max_width)
vrd.set(cv.CAP_PROP_FRAME_HEIGHT, max_height)

False

## Configure the memory controller

In [5]:
# Configure the DMA
in_buffer = allocate(shape=(max_height, max_width, 3), dtype=np.uint8, cacheable=1)
out_buffer = allocate(shape=(2, max_lines), dtype=np.int32, cacheable=1)
fl_buffer = allocate(shape=(2, max_lines), dtype=np.float32, cacheable=1)

# Default settings for most scenarios
#seg_thres = 140
seg_thres = 70
g_sigma = 2.5
e_thres = 20
h_thres = 150

# Configure the IP core
vpu.register_map.in_width = max_width
vpu.register_map.in_height = max_height
vpu.register_map.seg_thres = seg_thres
vpu.register_map.g_sigma = e_thres
vpu.register_map.e_thres = e_thres
vpu.register_map.h_thres = h_thres

## Run the main loop

In [6]:
# Amount of data points to take into account when averaging
roll_amount = 10
# 90 degrees in radians
deg90_rad = math.pi / 2
# Sensitivity of the angle detection algorithm
angle_sens = 1.75

num_frames = 0
total_ticks = 0
left_rho_roll = 0
left_theta_roll = 0
right_rho_roll = 0
right_theta_roll = 0
departing_state = False
departing_state_prev = False

# Approximate the rolling average
def roll_avg(avg, new):
    avg -= avg / roll_amount;
    avg += new / roll_amount;
    return avg;

# Set LEDs to green to indicate that video is being processed
leds[0:6].write(0b010010)

# While dashcam is capturing footage
while vrd.isOpened():
    ret, frame = vrd.read()
    
    # Check for frame capture error
    if not ret:
        break
    
    # Copy image into memory
    in_buffer[:] = frame
    
    start = cv.getTickCount()
    
    # Send a status update via UART to tell that we are processing
    uart_write_bytes([
        0b00000010, # Packet type STATUS_UPDATE
        0b00000001, # Indicate that processor is active
        seg_thres, # Segmentation threshold
        int(g_sigma * (1 << 6)), # Gaussian sigma in Q2.6
        e_thres, # Sobel edge threshold
        h_thres # Hough vote threshold
    ])
    
    # Start streaming the image
    dma.sendchannel.transfer(in_buffer)
    dma.recvchannel.transfer(out_buffer)

    # Signal the VPU to start processing
    vpu.write(0x00,0x81)

    # Wait until results are available
    dma.sendchannel.wait()
    dma.recvchannel.wait()
    
    end = cv.getTickCount()
    total_ticks += (end - start)
    num_frames += 1
    
    # Convert all fixed-point results to floats
    fl_buffer[:] = out_buffer / (1 << (32-10))
    
    # Categorize lines into left and right buckets
    left_lines = []
    right_lines = []
    
    for i in range(0, max_lines):
        rho = fl_buffer[0, i]
        theta = fl_buffer[1, i]
        if theta > deg90_rad + theta_margin:
            left_lines.append((rho, theta))
        elif theta < deg90_rad - theta_margin and theta != 0:
            right_lines.append((rho, theta))

    # Average the left lines
    if left_lines:
        rho, theta = np.average(left_lines, axis=0)
        
        if left_rho_roll == 0:
            left_rho_roll = rho
        else:
            left_rho_roll = roll_avg(left_rho_roll, rho)
        
        if left_theta_roll == 0:
            left_theta_roll = theta
        else:
            left_theta_roll = roll_avg(left_theta_roll, theta)
    
    # Average the right lines
    if right_lines:
        rho, theta = np.average(right_lines, axis=0)
        
        if right_rho_roll == 0:
            right_rho_roll = rho
        else:
            right_rho_roll = roll_avg(right_rho_roll, rho)
        
        if right_theta_roll == 0:
            right_theta_roll = theta
        else:
            right_theta_roll = roll_avg(right_theta_roll, theta)
    
    # We have a result
    if left_theta_roll != 0 and right_theta_roll != 0:
        
        # Send a packet with the resulting average lines
        uart_write_bytes([
            0b00000100, # Packet type FRAME_PROCESSED
            2, # Amount of Lines found (left avg and right avg)
            int(left_rho_roll * (1 << 1)), # rho of left line in Q7.1
            int(left_theta_roll * (1 << 6)), # theta in radians of left line in Q2.6
            int(right_rho_roll * (1 << 1)), # rho of right line in Q7.1
            int(right_theta_roll * (1 << 6)) # theta in radians of right line in Q2.6
        ])
        
        # relative difference in degrees of the left and the center line
        rel_left = left_theta_roll - deg90_rad
        # relative difference in degrees of the right and the center line
        rel_right = deg90_rad - right_theta_roll
        
        # Save the state of the previous iteration
        departing_state_prev = departing_state
        
        # If the angle of the {left,right} line is double that of the other line
        # Use a simple state machine for this
        if rel_left > rel_right * angle_sens or rel_right > rel_left * angle_sens:
            departing_state = True
        else:
            departing_state = False
            
        # React on the result of the state machine
        if not departing_state_prev and departing_state:
            # Set LEDs to blue to indicate departure
            leds[0:6].write(0b001001)
            
            # Write departure state update to UART
            uart_write_bytes([
                0b00001000, # Packet type DEPARTURE_WARNING
                0b00000001 # indicate that departure is imminent
            ])
        elif departing_state_prev and not departing_state:
            # Revert LEDs to green to indicate departure
            leds[0:6].write(0b010010)
            
            # Write departure state update to UART
            uart_write_bytes([
                0b00001000, # Packet type DEPARTURE_WARNING
                0b00000000 # indicate that departure is over
            ])
    
# Release resources
vrd.release()
del in_buffer
del out_buffer
del fl_buffer

# Set LEDs to red to indicate offline
leds[0:6].write(0b100100)

# Terminating, send status update to signal that we are offline
# Status info packet
uart_write_bytes([
    0b00000010, # Packet type STATUS_UPDATE
    0b00000000, # Indicate that processor is inactive
    seg_thres, # Segmentation threshold
    int(g_sigma * (1 << 6)), # Gaussian sigma in Q2.6
    e_thres, # Sobel edge threshold
    h_thres # Hough vote threshold
])

print("Avg: ", (total_ticks / num_frames) / cv.getTickFrequency(), " s over ", num_frames, " frames")

Avg:  0.019379453597222224  s over  720  frames
