In [None]:
import cv2
import math
import time
import queue
import threading

from utils.thresholding import *
from pynq import Overlay
from pynq import MMIO
import time

In [None]:
def set_bit(value, bit):
    return value | (1 << bit)

In [None]:
# Speed = PWM. Direciton of wheel rotation = GPIO
def control_motors(motor_en, l_forward, r_forward):
    gpio_val = 0

    # Set left motor bits
    gpio_val |= 0x4 if l_forward else 0x8

    # Set right motor bits
    gpio_val |= 0x1 if r_forward else 0x2

    # Write combined value once
    motor_en.write(0x0, gpio_val)

In [None]:
def defuzz(low, high):
    low_x = 0
    high_x = 0
    
    if low > 0:
        low_x = (low-1)/(-1/110)-100
    else: 
        low_x = 80
    
    if high > 0:
        high_x = (high-1)/(1/110)+100
    else:
        high_x = -80
    
    #print([low, high])
    #print([low_x, high_x])
    
    speed = ((low_x*low)+(high_x*high))/(low+high)
    if speed == 100:
        speed = speed - 2
    #print(speed)
    return speed

In [None]:
def fuzzy_control(angle):
    NF = 0
    VC = 0
    PF = 0
    
    # Fuzzification
    # 1) NF
    if angle <= 0:
        if angle <= -50:
            NF = 1 
        else: 
            NF = angle / -50
    
    # 2) VC
    if angle <= 45 and angle >= -45:
        if angle < 0:
            VC = (angle / 45) + 1
        else:
            VC = (angle / -45) + 1
    
    # 3) PF
    if angle >= 0:
        if angle >= 50: 
            PF = 1 
        else:
            PF = angle / 50
    
    #print([NF, VC, PF])
    
    # Inference
    L_LOW = NF
    L_HIGH = max(VC, PF)
    
    R_LOW = PF
    R_HIGH = max(NF, VC)
    
    # Defuzzification
    # 1) Left Wheels
    l_speed = defuzz(L_LOW, L_HIGH)
    
    if l_speed > 99:
        l_speed = 98;
    elif l_speed < 1:
        l_speed = 2;
        
    print("left wheels speed: " + str(l_speed))
    
    # 2) Right Wheels
    r_speed = defuzz(R_LOW, R_HIGH)
    
    if r_speed > 99:
        r_speed = 98;
    elif r_speed < 1:
        r_speed = 2;
        
    print("right wheels speed: " + str(r_speed))
    
    period = int((_period_ & 0x0ffff) *100);
    r_pulse = int((int(r_speed) & 0x07f)*period/100);
    l_pulse = int((int(l_speed) & 0x07f)*period/100);
    
    control_motors(motor_enable, l_forward, r_forward)

    # Set period and duty cycle
    right_motor.write(R_TLR0_address, period)
    right_motor.write(R_TLR1_address, r_pulse)
    left_motor.write(L_TLR0_address, period)
    left_motor.write(L_TLR1_address, l_pulse)
    
    # Define the LOAD bits (1 << bit_offset)
    R_LOAD0 = 1 << R_TCSR0_register['LOAD0']['bit_offset']
    R_LOAD1 = 1 << R_TCSR1_register['LOAD1']['bit_offset']
    L_LOAD0 = 1 << L_TCSR0_register['LOAD0']['bit_offset']
    L_LOAD1 = 1 << L_TCSR1_register['LOAD1']['bit_offset']

    # Pulse LOAD bits
    right_motor.write(R_TCSR0_address, R_temp_val_0 | R_LOAD0)
    right_motor.write(R_TCSR1_address, R_temp_val_1 | R_LOAD1)
    left_motor.write(L_TCSR0_address, L_temp_val_0 | L_LOAD0)
    left_motor.write(L_TCSR1_address, L_temp_val_1 | L_LOAD1)

    # Re-enable timer (in case LOAD cleared ENT bits)
    right_motor.write(R_TCSR0_address, R_temp_val_0)
    right_motor.write(R_TCSR1_address, R_temp_val_1)
    left_motor.write(L_TCSR0_address, L_temp_val_0)
    left_motor.write(L_TCSR1_address, L_temp_val_1)

    time.sleep(0.1)  # Prevent busy loop (optional)

In [None]:
def compute_lane_heading_angle(left_fit=None, right_fit=None, y_eval=200):
    angles = []

    def angle_from_fit(fit):
        # Derivative of the 2nd-order polynomial at y = y_eval
        dy = 1.0  # pixel change in y
        dx = 2 * fit[0] * y_eval + fit[1]  # derivative at y_eval
        angle_rad = np.arctan(dx)  # slope to angle
        return -np.degrees(angle_rad)

    if left_fit is not None:
        angles.append(angle_from_fit(left_fit))
    if right_fit is not None:
        angles.append(angle_from_fit(right_fit))

    if angles:
        print(angles)
        # Average heading angle from both lanes
        return np.mean(angles)
    else:
        return None

In [None]:
def detect_lane_lines_connected_components(binary_warped):
    # Ensure the input is binary (0 and 255)
    binary = np.uint8(binary_warped * 255) if binary_warped.max() <= 1 else np.uint8(binary_warped)

    # Apply connected components
    num_labels, labels, stats, centroids = cv2.connectedComponentsWithStats(binary, connectivity=8)

    # Image dimensions
    height, width = binary.shape

    # Store points
    leftx, lefty, rightx, righty = [], [], [], []

    for i in range(1, num_labels):  # Label 0 is background
        x, y, w, h, area = stats[i]
        cx, cy = centroids[i]

        # Heuristic filters for likely lane lines
        if area > 1000 and h > 30:
            if cx < width // 2:
                coords = np.column_stack(np.where(labels == i))
                for pt in coords:
                    lefty.append(pt[0])
                    leftx.append(pt[1])
            else:
                coords = np.column_stack(np.where(labels == i))
                for pt in coords:
                    righty.append(pt[0])
                    rightx.append(pt[1])

    # Convert lists to numpy arrays
    leftx = np.array(leftx)
    lefty = np.array(lefty)
    rightx = np.array(rightx)
    righty = np.array(righty)

    return leftx, lefty, rightx, righty

In [None]:
WIDTH = 640
HEIGHT = 480

frame_in_w = 640
frame_in_h = 480

# Change these variables based on our car.
wheelbase = 0.1524 # Length of car from front to back in meters
n = 5 # Angle gain

ym_per_pix = 0.1524 / 72.0
xm_per_pix = 0.2286 / 600.0
y_eval = 210
midx = 320

y_bottom = HEIGHT
y_top = int(HEIGHT * 0.4)
#y_top = 0

angle = 0
kernel = np.ones((3, 3), np.uint8)  # Try 5x5 instead of 3x3

In [None]:
scale_factor_h = frame_in_w / 1280
scale_factor_v = frame_in_h / 720
offset = 200 * scale_factor_h
src = np.float32([
    [10, 300],
    [630, 300],
    [640, 410],
    [0, 410]
])

dst = np.float32([[offset, 0], [frame_in_w - offset, 0], [frame_in_w - offset, frame_in_h], [offset, frame_in_h]])
M = cv2.getPerspectiveTransform(src,dst)

In [None]:
def capture_frames(videoIn, frame_queue):
    while True:
        ret, frame = videoIn.read()
        if not ret:
            print("Failed to grab frame.")
            break
        if frame_queue.full():
            # If the queue is full (only 1 frame), remove the oldest frame
            frame_queue.get()  # Discard the old frame to keep the queue size at 1
        frame_queue.put(frame)

In [None]:
# PWM SETUP (Using MMIO).....
# Load your custom overlay
motor_control = Overlay("motor_control.bit")

right_motor = motor_control.axi_timer_0 #PMODB P3 for right PWM
left_motor = motor_control.axi_timer_1  #PMODB P5 for left PWM

# extract register addresses (will be the same for every Axi Timer)
R_TCSR0 = motor_control.ip_dict['axi_timer_0']['registers']['TCSR0']
R_TCSR1 = motor_control.ip_dict['axi_timer_0']['registers']['TCSR1']
R_TCSR0_address = R_TCSR0['address_offset']
R_TCSR1_address = R_TCSR1['address_offset']
R_TCSR0_register = R_TCSR0['fields'] # bit_offset for address
R_TCSR1_register = R_TCSR1['fields']
R_TLR0 = motor_control.ip_dict['axi_timer_0']['registers']['TLR0']
R_TLR1 = motor_control.ip_dict['axi_timer_0']['registers']['TLR1']
R_TLR0_address = R_TLR0['address_offset']
R_TLR1_address = R_TLR1['address_offset']

# create the configuration values for the control register
R_temp_val_0 = 0
R_temp_val_1 = 0

# The PWMA0 bit in TCSR0 and PWMB0 bit in TCSR1 must be set to 1 to enable PWM mode.
R__temp_val_0 = set_bit(R_temp_val_0, R_TCSR0_register['PWMA0']['bit_offset'])
R_temp_val_1 = set_bit(R_temp_val_1, R_TCSR1_register['PWMA1']['bit_offset'])

# The GenerateOut signals must be enabled in the TCSR (bit GENT set to 1). The PWM0
# signal is generated from the GenerateOut signals of Timer 0 and Timer 1, so these
# signals must be enabled in both timer/counters
R_temp_val_0 = set_bit(R_temp_val_0, R_TCSR0_register['GENT0']['bit_offset'])
R_temp_val_1 = set_bit(R_temp_val_1, R_TCSR1_register['GENT1']['bit_offset'])

# The counter can be set to count up or down. UDT
R_temp_val_0 = set_bit(R_temp_val_0, R_TCSR0_register['UDT0']['bit_offset'])
R_temp_val_1 = set_bit(R_temp_val_1, R_TCSR1_register['UDT1']['bit_offset'])

# set Autoreload (ARHT0 = 1)
R_temp_val_0 = set_bit(R_temp_val_0, R_TCSR0_register['ARHT0']['bit_offset'])
R_temp_val_1 = set_bit(R_temp_val_1, R_TCSR1_register['ARHT1']['bit_offset'])

# enable timer (ENT0 = 1)
R_temp_val_0 = set_bit(R_temp_val_0, R_TCSR0_register['ENT0']['bit_offset'])
R_temp_val_1 = set_bit(R_temp_val_1, R_TCSR1_register['ENT1']['bit_offset'])

In [None]:
L_TCSR0 = motor_control.ip_dict['axi_timer_1']['registers']['TCSR0']
L_TCSR1 = motor_control.ip_dict['axi_timer_1']['registers']['TCSR1']
L_TCSR0_address = L_TCSR0['address_offset']
L_TCSR1_address = L_TCSR1['address_offset']
L_TCSR0_register = L_TCSR0['fields'] # bit_offset for address
L_TCSR1_register = L_TCSR1['fields']
L_TLR0 = motor_control.ip_dict['axi_timer_1']['registers']['TLR0']
L_TLR1 = motor_control.ip_dict['axi_timer_1']['registers']['TLR1']
L_TLR0_address = L_TLR0['address_offset']
L_TLR1_address = L_TLR1['address_offset']

# create the configuration values for the control register
L_temp_val_0 = 0
L_temp_val_1 = 0

# The PWMA0 bit in TCSR0 and PWMB0 bit in TCSR1 must be set to 1 to enable PWM mode.
L_temp_val_0 = set_bit(L_temp_val_0, L_TCSR0_register['PWMA0']['bit_offset'])
L_temp_val_1 = set_bit(L_temp_val_1, L_TCSR1_register['PWMA1']['bit_offset'])

# The GenerateOut signals must be enabled in the TCSR (bit GENT set to 1). The PWM0
# signal is generated from the GenerateOut signals of Timer 0 and Timer 1, so these
# signals must be enabled in both timer/counters
L_temp_val_0 = set_bit(L_temp_val_0, L_TCSR0_register['GENT0']['bit_offset'])
L_temp_val_1 = set_bit(L_temp_val_1, L_TCSR1_register['GENT1']['bit_offset'])

# The counter can be set to count up or down. UDT
L_temp_val_0 = set_bit(L_temp_val_0, L_TCSR0_register['UDT0']['bit_offset'])
L_temp_val_1 = set_bit(L_temp_val_1, L_TCSR1_register['UDT1']['bit_offset'])

# set Autoreload (ARHT0 = 1)
L_temp_val_0 = set_bit(L_temp_val_0, L_TCSR0_register['ARHT0']['bit_offset'])
L_temp_val_1 = set_bit(L_temp_val_1, L_TCSR1_register['ARHT1']['bit_offset'])

# enable timer (ENT0 = 1)
L_temp_val_0 = set_bit(L_temp_val_0, L_TCSR0_register['ENT0']['bit_offset'])
L_temp_val_1 = set_bit(L_temp_val_1, L_TCSR1_register['ENT1']['bit_offset'])

In [None]:
# GPIO SETUP (Using MMIO)......
motor_enable_address = motor_control.ip_dict['axi_gpio_0']['phys_addr']
motor_enable = MMIO(motor_enable_address, 8)
motor_enable.write(0x4, 0x0) # Write to tri-state register to configure IO as OUTPUT

_period_ = 10000000 # 50Hz, 20ms

In [None]:
# Load your custom overlay
motor_control = Overlay("motor_control.bit")

# GPIO SETUP (Using MMIO)......
motor_enable_address = motor_control.ip_dict['axi_gpio_0']['phys_addr']
#RANGE = ??? Should be number of bytes. 8/4 = 2x 32-bit locations
motor_enable = MMIO(motor_enable_address, 8) # Add RANGE if needed as second argument
motor_enable.write(0x4, 0x0) # Write to tri-state register to configure IO as OUTPUT

print("Program running")

time.sleep(15)

videoIn = cv2.VideoCapture(0)
videoIn.set(cv2.CAP_PROP_FRAME_WIDTH, frame_in_w)
videoIn.set(cv2.CAP_PROP_FRAME_HEIGHT, frame_in_h)
print("capture device is open: " + str(videoIn.isOpened()))

frame_queue = queue.Queue(maxsize=1)

capture_thread = threading.Thread(target=capture_frames, args=(videoIn, frame_queue))
capture_thread.daemon = True
capture_thread.start()

In [None]:
while (True):
    if frame_queue.empty():
        continue    
        
    frame_vga = frame_queue.get()
    
    b_thresholded = threshold(frame_vga)
    binary_warped = cv2.warpPerspective(b_thresholded,M, (frame_in_w, frame_in_h))[y_top:y_bottom, :]
    kernel = np.ones((20, 20), np.uint8)
    
     # Clean small blobs
    binary_cleaned = cv2.morphologyEx(binary_warped, cv2.MORPH_OPEN, kernel)

    # Fill small gaps
    binary_cleaned = cv2.morphologyEx(binary_cleaned, cv2.MORPH_CLOSE, kernel)
    
    leftx, lefty, rightx, righty = detect_lane_lines_connected_components(binary_cleaned)
    
    if leftx.size > 0 and lefty.size > 0 and rightx.size > 0 and righty.size > 0:
        # Fit a second order polynomial to each
        left_fit = np.polyfit(lefty, leftx, 2)
        right_fit = np.polyfit(righty, rightx, 2)

        angle = compute_lane_heading_angle(left_fit=left_fit, right_fit=right_fit, y_eval=y_eval)
    elif leftx.size > 0 and lefty.size > 0:
        # Fit a second order polynomial to each
        left_fit = np.polyfit(lefty, leftx, 2)
        right_fit = [0,0,0]

        angle = compute_lane_heading_angle(left_fit=left_fit, y_eval=y_eval)
    elif rightx.size > 0 and righty.size > 0:
        # Fit a second order polynomial to each
        left_fit = [0,0,0]
        right_fit = np.polyfit(righty, rightx, 2)

        angle = compute_lane_heading_angle(right_fit=right_fit, y_eval=y_eval)

    print("Steering angle: " + str(angle))
    fuzzy_control(angle)    