In [1]:
import cv2
import numpy as np
import math

import queue, threading

import RPi.GPIO as gpio
import time

def detect_edges(frame):
    hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
    lower_green = np.array([40, 40, 40])
    upper_green = np.array([70, 255, 255])
    mask = cv2.inRange(hsv, lower_green, upper_green)
    
    edges = cv2.Canny(mask, 200, 400)
    return edges

def region_of_interest(edges):
    height, width = edges.shape
    mask = np.zeros_like(edges)

    polygon = np.array([[
        (0, height * 1 / 2),
        (width, height * 1 / 2),
        (width, height),
        (0, height),
    ]], np.int32)

    cv2.fillPoly(mask, polygon, 255)
    cropped_edges = cv2.bitwise_and(edges, mask)
    return cropped_edges

def detect_line_segments(cropped_edges):
    rho = 1
    angle = np.pi / 180
    min_threshold = 10
    line_segments = cv2.HoughLinesP(cropped_edges, rho, angle, min_threshold, 
                                    np.array([]), minLineLength=8, maxLineGap=4)

    return line_segments

def average_slope_intercept(frame, line_segments):
    lane_lines = []
    if line_segments is None:
        print('No line_segment segments detected')
        return lane_lines

    height, width, _ = frame.shape
    left_fit = []
    right_fit = []

    boundary = 1/3
    left_region_boundary = width * (1 - boundary)
    right_region_boundary = width * boundary

    for line_segment in line_segments:
        for x1, y1, x2, y2 in line_segment:
            if x1 == x2:
                continue
            fit = np.polyfit((x1, x2), (y1, y2), 1)
            slope = fit[0]
            intercept = fit[1]
            if slope < 0:
                if x1 < left_region_boundary and x2 < left_region_boundary:
                    left_fit.append((slope, intercept))
            else:
                if x1 > right_region_boundary and x2 > right_region_boundary:
                    right_fit.append((slope, intercept))

    left_fit_average = np.average(left_fit, axis=0)
    if len(left_fit) > 0:
        lane_lines.append(make_points(frame, left_fit_average))

    right_fit_average = np.average(right_fit, axis=0)
    if len(right_fit) > 0:
        lane_lines.append(make_points(frame, right_fit_average))

    return lane_lines

def make_points(frame, line):
    height, width, _ = frame.shape
    slope, intercept = line
    y1 = height
    y2 = int(y1 * 1 / 2) 

    x1 = max(-width, min(2 * width, int((y1 - intercept) / slope)))
    x2 = max(-width, min(2 * width, int((y2 - intercept) / slope)))
    return [[x1, y1, x2, y2]]

def display_lines(frame, lines, line_color=(0, 255, 0), line_width=6):
    line_image = np.zeros_like(frame)
    if lines is not None:
        for line in lines:
            for x1, y1, x2, y2 in line:
                cv2.line(line_image, (x1, y1), (x2, y2), line_color, line_width)
    line_image = cv2.addWeighted(frame, 0.8, line_image, 1, 1)
    return line_image

def display_heading_line(frame, steering_angle, line_color=(0, 0, 255), line_width=5 ):
    heading_image = np.zeros_like(frame)
    height, width, _ = frame.shape

    steering_angle_radian = steering_angle / 180.0 * math.pi
    x1 = int(width / 2)
    y1 = height
    x2 = int(x1 - height / 2 / math.tan(steering_angle_radian))
    y2 = int(height / 2)

    cv2.line(heading_image, (x1, y1), (x2, y2), line_color, line_width)
    heading_image = cv2.addWeighted(frame, 0.8, heading_image, 1, 1)

    return heading_image

def determine_steering_angle(frame):
    muchii = detect_edges(frame)
    imagine_crop = region_of_interest(muchii)
    segmente = detect_line_segments(imagine_crop)
    marcaje = average_slope_intercept(frame, segmente)
    imagine_marcaje = display_lines(frame, marcaje)
    
    if len(marcaje) >= 2: # cazul in care am 2 marcaje
        height, width, _ = frame.shape
        _, _, left_x2, _ = marcaje[0][0]
        _, _, right_x2, _ = marcaje[1][0]
        mid = int(width / 2)
        x_offset = (left_x2 + right_x2) / 2 - mid
        y_offset = int(height / 2)
        
        angle_to_mid_radian = math.atan(x_offset / y_offset) 
        angle_to_mid_deg = int(angle_to_mid_radian * 180.0 / math.pi) 
        steering_angle = angle_to_mid_deg + 90
    elif len(marcaje) == 1: # cazul in care am un singur marcaj
        height, width, _ = frame.shape
        x1, _, x2, _ = marcaje[0][0]
        x_offset = x2 - x1
        y_offset = int(height / 2)

        angle_to_mid_radian = math.atan(x_offset / y_offset)  
        angle_to_mid_deg = int(angle_to_mid_radian * 180.0 / math.pi)  
        steering_angle = angle_to_mid_deg + 90
    else: 
        steering_angle = 90

    return steering_angle, imagine_marcaje

In [2]:
enA = 17
in1 = 27
in2 = 22

enB = 25
in3 = 23
in4 = 24

pwmA = None
pwmB = None
dutyCycleA = 50 
dutyCycleB = 50 

def init():    
    global pwmA, pwmB
    gpio.setmode(gpio.BCM)
    gpio.setup(enA, gpio.OUT)
    gpio.setup(in1, gpio.OUT)
    gpio.setup(in2, gpio.OUT)
    
    gpio.setup(in3, gpio.OUT)
    gpio.setup(in4, gpio.OUT)
    gpio.setup(enB, gpio.OUT)

    pwmA = gpio.PWM(enA, 100)  
    pwmA.start(0)

    pwmB = gpio.PWM(enB, 100) 
    pwmB.start(0)

def stop_robot_motion():
    pwmA.stop()
    pwmB.stop()
    gpio.cleanup() 

def stop_motion():
    pwmA.ChangeDutyCycle(0)
    pwmB.ChangeDutyCycle(0)

def forward(dutyCycleA, dutyCycleB):
    pwmA.ChangeDutyCycle(dutyCycleA)
    pwmB.ChangeDutyCycle(dutyCycleB)
    
    gpio.output(in1, False)
    gpio.output(in2, True)
    
    gpio.output(in3, False)
    gpio.output(in4, True)
    
def reverse(dutyCycleA, dutyCycleB):
    pwmA.ChangeDutyCycle(dutyCycleA)
    pwmB.ChangeDutyCycle(dutyCycleB)
    
    gpio.output(in1, True)
    gpio.output(in2, False)
    
    gpio.output(in3, True)
    gpio.output(in4, False)
    
def left_turn(dutyCycleA, dutyCycleB):
    pwmA.ChangeDutyCycle(dutyCycleA)
    pwmB.ChangeDutyCycle(dutyCycleB) 

    # Motorul drept se roteste spre fata
    gpio.output(in1, True)
    gpio.output(in2, False)

    # Motorul stang se roteste spre spate
    gpio.output(in3, False)
    gpio.output(in4, True)
    
def right_turn(dutyCycleA, dutyCycleB):
    pwmA.ChangeDutyCycle(dutyCycleA)  
    pwmB.ChangeDutyCycle(dutyCycleB)

    # Motorul stang se roteste spre spate
    gpio.output(in1, False) 
    gpio.output(in2, True)

    # Motorul drept se roteste spre fata
    gpio.output(in3, True)
    gpio.output(in4, False)

In [3]:
class VideoCapture:

  def __init__(self, name):
    self.cap = cv2.VideoCapture(name)
    self.q = queue.Queue()
    t = threading.Thread(target=self._reader)
    t.daemon = True
    t.start()

  def _reader(self):
    while True:
      ret, frame = self.cap.read()
      if not ret:
        break
      if not self.q.empty():
        try:
          self.q.get_nowait()   
        except queue.Empty:
          pass
      self.q.put(frame)

  def read(self):
    return self.q.get()

In [4]:
from IPython.display import display
import ipywidgets.widgets as widgets

image_widget = widgets.Image(format='jpeg', width=300, height=300)
speed_widget = widgets.FloatSlider(value = 0.5, min = 0.0, max = 1.0, description = 'speed')
turn_gain_widget = widgets.FloatSlider(value = 0.3, min = 0.0, max = 2.0, description = 'turn gain')

display(widgets.VBox([
    # widgets.HBox([image_widget]),
    speed_widget,
    turn_gain_widget,
]))     

VBox(children=(FloatSlider(value=0.5, description='speed', max=1.0), FloatSlider(value=0.3, description='turn â€¦

In [5]:
display(image_widget)

Image(value=b'', format='jpeg', height='300', width='300')

In [6]:
cam_port = 0
cam = VideoCapture(cam_port) 

In [29]:
init()
try:
    while(True):
        image = cam.read() 
        image_widget.value = cv2.imencode('.jpeg', image)[1].tobytes()
        cv2.imwrite("poze7_ne/" + timp + ".png", image) 
        
        steering_angle, line_image = determine_steering_angle(image)
        heading_image = display_heading_line(line_image, steering_angle, (0,0,255), 6)
        timp = str(time.time())
        cv2.imwrite("poze7/" + timp + ".png", heading_image) 
    
        center = (90 - steering_angle) / 90
    
        stanga = float((speed_widget.value - turn_gain_widget.value * center) * 100)
        dreapta = float((speed_widget.value + turn_gain_widget.value * center) * 100)
        print("steering angle: " + str(steering_angle) + " stanga: " + str(stanga) + " dreapta: " + str(dreapta) + " timp: " + timp)
        
        if stanga >= 0 and dreapta >= 0:
            forward(stanga, dreapta)
        elif stanga >= 0 and dreapta < 0:
            right_turn(stanga, -dreapta)
        elif stanga <0 and dreapta >= 0:
            left_turn(-stanga, dreapta)
        
        
except KeyboardInterrupt:
    stop_robot_motion()

steering angle: 63 stanga: 25.0 dreapta: 55.00000000000001 timp: 1711104088.02062
steering angle: 145 stanga: 70.55555555555556 dreapta: 9.444444444444445 timp: 1711104088.1456144
steering angle: 147 stanga: 71.66666666666667 dreapta: 8.333333333333337 timp: 1711104088.2736976
steering angle: 120 stanga: 56.666666666666664 dreapta: 23.333333333333336 timp: 1711104088.3951018
steering angle: 159 stanga: 78.33333333333334 dreapta: 1.6666666666666663 timp: 1711104088.5035257
steering angle: 24 stanga: 3.3333333333333384 dreapta: 76.66666666666666 timp: 1711104088.6061194
steering angle: 28 stanga: 5.555555555555558 dreapta: 74.44444444444444 timp: 1711104088.7001755
No line_segment segments detected
steering angle: 90 stanga: 40.0 dreapta: 40.0 timp: 1711104088.7909894
steering angle: 160 stanga: 78.8888888888889 dreapta: 1.1111111111111127 timp: 1711104088.8845084
steering angle: 152 stanga: 74.44444444444444 dreapta: 5.555555555555558 timp: 1711104088.980392
steering angle: 150 stanga: 

In [28]:
stop_robot_motion()

  gpio.cleanup()
