In [1]:
import numpy as np
import cv2
import matplotlib.pyplot as plt
import math
import time
import tdmclient
import tdmclient.notebook
import threading

import camera as cam
import vision_utils as utils
import Global_tomerge as global_nav
import local_nav as local_nav

In [2]:
global robot_and_goal, object_corners
object_corners ={}
robot_and_goal = {}

object_corners = {
    'Quadrilateral_1': [(53, 107), (107, 107), (107, 53), (53, 53)],
    'Quadrilateral_2': [(120, 200), (200, 280), (280, 200), (200, 120)],
    'Quadrilateral_3': [(243, 87), (297, 87), (297, 33), (243, 33)]
}

robot_and_goal = {
"robot" : (50,15),
"goal" : (255, 250)
}


**Navigation Thread**

In [20]:
await tdmclient.notebook.start()

In [None]:
@tdmclient.notebook.sync_var
def thymio_speed(speed_left,speed_right):
    global speed_left_global
    global speed_right_global
    speed_right_global = speed_right
    speed_left_global = speed_left

@tdmclient.notebook.sync_var
def thymio_state(has_global_path):
    global state_global
    thymio_state = "Local"
    if state_global == 1:
        thymio_state = "Avoidance"
        has_global_path = False
    return thymio_state, has_global_path

def thymio_turn(angle_diff):
    if angle_diff > 0:
            thymio_speed(-50,50)
            time.sleep(0.05*angle_diff)
            thymio_speed(0,0)
    else:
        thymio_speed(50,-50)
        time.sleep(0.05*angle_diff)
        thymio_speed(0,0)

In [None]:
%%run_python
speed_left_global = 0
speed_right_global = 0
speedGain = 0    
obstSpeedGain = [6, 4, -2, -6, -8]
state_global = 0
DELTA = 1
timer_period[0] = 10  # 10ms sampling time

@onevent 
def timer0():
    global prox_ground_delta, prox_horizontal, speed0, speedGain,obstSpeedGain, motor_left_target, motor_right_target
    global speed_left_global, speed_right_global, state_global
    spLeft = speed_left_global - speedGain
    spRight = speed_right_global + speedGain
    
    # adjustment for obstacles ("gradient" due to obstacles)
    for i in range(5):
        spLeft += prox_horizontal[i] * obstSpeedGain[i] // 100
        spRight += prox_horizontal[i] * obstSpeedGain[4 - i] // 100
    # motor control
    motor_left_target = spLeft
    motor_right_target = spRight

    if spLeft - spRight < DELTA and spRight - spLeft < DELTA:
        state_global = 0
    else:
        state_global = 1



In [None]:
def nav_thread(path=[]):
    global robot_and_goal, object_corners, robot_angle
    state = 'Local'
    has_gobal_path = False
    robot_angle = 0
    robot_size = 23
    run = True
    counter = 0
    print('nav_thread')
    while run:
        state,has_gobal_path = thymio_state(has_gobal_path)
        if state == "Local":
            if not has_gobal_path:
                print('global')
                if robot_and_goal.get('robot') is not None:
                    print('robot')
                    expended_corners = global_nav.grow_obstacles(object_corners, robot_size)
                    points_name2coord = global_nav.name2coord(expended_corners, robot_and_goal)
                    adjacent_list = global_nav.generate_adjacency_list(expended_corners, robot_and_goal)
                    distances = global_nav.calculate_distances(adjacent_list, points_name2coord)
                    shortest_path = global_nav.find_path(adjacent_list, points_name2coord)
                    for name in shortest_path:
                        path.append(points_name2coord[name])
                    has_gobal_path = True
                    counter = 0
            
        # for local state
        x_pos = path[0][0]
        y_pos = path[0][1]
        theta = 0 # from camera
        initial_state = np.array([x_pos, y_pos, 0.0, 0.0, theta, 0])
        initial_covariance = np.diag([1.0, 1.0, 1.0, 1.0, 1.0, 1.0])  # Initial covariance matrix
        process_noise = np.diag([0.03, 0.03, 0.01, 0.01, 0.02, 0.01])  # Process noise covariance matrix
        measurement_noise = np.array([0.1, 0.1])  # Measurement noise variance

        # Initialize Kalman filter
        local_state = initial_state
        covariance = initial_covariance
        Ts = 0.5    
        motor_left_target = 0
        motor_right_target = 0
        cor = 2 #correction factor for the target coordinates (1cm)
        coordinate = path[counter+1]
        #while tup(state[0],state[1]) != coordinate:
        if (local_state[0]>(coordinate[0]+cor) or local_state[0]<(coordinate[0]-cor)) and (local_state[1]>(coordinate[1]+cor) or local_state[0]<(coordinate[1]-cor)):
            cam_working = robot_and_goal.get('robot') is not None 
            if cam_working:
                theta = robot_angle
            else:
                theta = local_state[4]
            input_val = np.array([motor_left_target, motor_right_target])               
            local_state, covariance = local_nav.predict(local_state, input_val, covariance, Ts, theta, process_noise)              
            local_state, covariance = local_nav.measure(local_state, covariance, cam_working)               
            angle_diff = local_nav.turn_to_target(local_state, coordinate, local_state[4])
            thymio_turn(angle_diff)
            motor_left_target, motor_right_target = local_nav.go_to_target(local_state, coordinate, local_state[4])
            thymio_speed(motor_left_target, motor_right_target)
        else:
            counter += 1
            if counter == len(path)-1:
                run = False
                thymio_speed(0,0)
                break


In [None]:
nav_thread = threading.Thread(target=nav_thread)

***Camera Thread***

In [8]:

def camera_thread():
    global robot_and_goal, object_corners, robot_angle, cov_noise        


    try:
        cap = cv2.VideoCapture(0)
        # data = cam.capture_data(cap)
        # cov_noise = cam.calculate_covariance_matrix(data)


        if not cap.isOpened():
            print("Error opening video stream or file")
            return
        mode = 'all'
        color_type = 'RGB'

        thresold  = [cam.COLOR_THREASHOLD,cam.SATURATION_THRESHOLD,cam.BRIGHTNESS_THRESHOLD]

        cv2.namedWindow("Camera Vision", cv2.WINDOW_NORMAL)
        cv2.createTrackbar("Color Threshold", "Camera Vision", thresold[0], 200, lambda x: utils.on_trackbar(x,thresold))
        cv2.createTrackbar("Saturation Threshold", "Camera Vision", thresold[1], 200, lambda x: utils.on_trackbar(x,thresold))
        cv2.createTrackbar("Brightness Threshold", "Camera Vision", thresold[2], 200, lambda x: utils.on_trackbar(x,thresold))
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            frame = cam.zoom_frame(frame,zoom_factor=2)
            frame, end, start, object_corners = cam.detection(frame,mode,color_type,thresold[0],thresold[1],thresold[2])
            
            robot_and_goal["robot"] = start
            robot_and_goal["goal"] = end
            print(robot_and_goal)
            print(object_corners)

            cv2.imshow("Camera Vision", frame)

            key = cv2.waitKey(25) & 0xFF
            if key == 27:  # ESC key to exit
                break
            elif key == ord('1'):  # '1' key to switch to blue mode
                mode = 'blue'
            elif key == ord('2'):  # '2' key to switch to green mode
                mode = 'green'
            elif key == ord('3'):  # '3' key to switch to black mode
                mode = 'black'
            elif key == ord('4'):  # '4' key to switch to black mode
                mode = 'all'
            elif key == ord('5'):  # '5' key to switch to RGB mode
                color_type = 'RGB'
            elif key == ord('6'):  # '6' key to switch to BGR mode
                color_type = 'BGR'

            time.sleep(0.01)

    except Exception as e:
        print(f"An error occurred: {e}")
    finally:
        cap.release()
        cv2.destroyAllWindows()
        print("Done displaying the video")


In [9]:
camera_thread = threading.Thread(target=camera_thread)

In [10]:
camera_thread.start()

{'robot': {}, 'goal': {}}
{}
{'robot': {}, 'goal': {}}
{}
{'robot': {}, 'goal': {}}
{}
{'robot': {}, 'goal': {}}
{}
{'robot': {}, 'goal': {}}
{}
{'robot': {}, 'goal': {}}
{}
{'robot': {}, 'goal': {}}
{}
{'robot': {}, 'goal': {}}
{}
{'robot': {}, 'goal': {}}
{}
{'robot': {}, 'goal': {}}
{}
{'robot': {}, 'goal': {}}
{}
{'robot': {}, 'goal': {}}
{}
{'robot': {}, 'goal': {}}
{}
{'robot': {}, 'goal': {}}
{}
{'robot': {'Goal': (133, 30)}, 'goal': {}}
{}
{'robot': {'Goal': (169, 3)}, 'goal': {}}
{}
{'robot': {'Goal': (165, 4)}, 'goal': {}}
{}
{'robot': {'Goal': (62, 4)}, 'goal': {'Position': (177, 0)}}
{}
{'robot': {'Goal': (62, 4)}, 'goal': {'Blue_Circle_1': ((42, 76), 0)}}
{}
{'robot': {'Goal': (57, 2)}, 'goal': {'Blue_Circle_1': ((18, 87), 0), 'Blue_Circle_2': ((41, 75), 1), 'Blue_Circle_3': ((138, 26), 0), 'Blue_Circle_4': ((162, 10), 1), 'Blue_Circle_5': ((175, 1), 2)}}
{}
{'robot': {'Goal': (61, 6)}, 'goal': {'Blue_Circle_1': ((9, 93), 1), 'Blue_Circle_2': ((103, 43), 3), 'Blue_Circle_3

In [None]:
nav_thread.start()

In [None]:
camera_thread.join()

In [None]:
nav_thread.join()