In [None]:
import cv2
import glob
import numpy as np
import matplotlib.pyplot as plt
import sys
import os
import json
from time import time, sleep

sys.path.append('..')
%load_ext autoreload
%autoreload 2

In [None]:
#connection with robot
from tdmclient import ClientAsync, aw
client = ClientAsync()
node = await client.wait_for_node()
await node.lock()

In [None]:
#===============ROBOT INSTRUCTIONS==================================
#function to get the value of the prox
async def get_prox_value():
    await node.wait_for_variables({"prox.horizontal"})
    return list(node["prox.horizontal"])

#set the speed of the motors
async def set_motor_speed(left, right):
    await node.set_variables({"motor.left.target": [left], "motor.right.target": [right]})

async def get_motor_speed():
    await node.wait_for_variables({"motor.left.speed", "motor.right.speed"})
    return node["motor.left.speed"], node["motor.right.speed"]

In [None]:
await set_motor_speed(0, 0)

## Initialize detector

This part initializes the detector. It calibrates the camera, and computes the perspective transformation from the first image.

Make sure to setup the camera properly and still. The first image is the most important!

In [None]:
# https://www.geeksforgeeks.org/python-opencv-capture-video-from-camera/
# Initialize the camera
cam = cv2.VideoCapture(0, cv2.CAP_DSHOW)
cam.set(cv2.CAP_PROP_FRAME_WIDTH, 1920)
cam.set(cv2.CAP_PROP_FRAME_HEIGHT, 1080)

# Get the default frame width and height
frame_width = int(cam.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cam.get(cv2.CAP_PROP_FRAME_HEIGHT))

In [None]:
# take a picture
ret, initial_image = cam.read()
plt.imshow(cv2.cvtColor(initial_image, cv2.COLOR_BGR2RGB))
#save the image
# cv2.imwrite('calibration_image_test.jpg', initial_image)

In [None]:
# save the image
cv2.imwrite('calibration_image.jpg', initial_image)

Loading the detector

In [None]:
from src.vision import VisionDetector
from src.utils import add_robot
# initial_image= cv2.imread('calibration_image_test.jpg')
aruco_info = json.load(open('../assets/aruco/state.json', 'r'))

map_size = (1179, 830) # (width, height) in mm
detector = VisionDetector(initial_image, map_size)

Showing the initial corrected image

In [None]:
# take a picture
ret, initial_image = cam.read()
corrected_image = detector.get_corrected_image(initial_image)
plt.imshow(cv2.cvtColor(corrected_image, cv2.COLOR_BGR2RGB))

Now we can process this image with our computer vision pipeline, to compute the shortest path to follow for the robot to collect all the red circle objectives. 

In [None]:
from src.path_planning import path_planning, create_obstacle_map
from src.vision import extend_polygons
from src.utils import add_route, add_circles, add_polygons

robot_size = 100

ret, initial_image = cam.read()
corrected_image = detector.get_corrected_image(initial_image)

robot_position_cv, robot_orientation_cv, robot_corners = detector.get_robot_pose(corrected_image, is_corrected=True)

# add a white circle around the robot
hidden_robot_image = cv2.circle(corrected_image.copy(), tuple(robot_position_cv.astype(int)), robot_size, (255, 255, 255), -1)

polygons = detector.find_polygons(hidden_robot_image, min_area=200, thresh_1=127, thresh_2=255)
extended_polygons = extend_polygons(polygons, robot_size)
polygon_image = add_polygons(corrected_image, extended_polygons)

circles = detector.find_circles(polygon_image, 1, 300, 300, 40) #215, 24
circle_positions = circles.squeeze().reshape(-1, 3)[:, :2]
circle_image = add_circles(polygon_image, circles)

image_aruco = add_robot(circle_image, robot_position_cv, robot_orientation_cv, robot_corners)
image_circles = add_circles(image_aruco, circles)
optimal_route, optimal_route_positions, points, adjacency_matrix = path_planning(robot_position_cv, circle_positions, extended_polygons, corrected_image.shape)
route_image = add_route(image_circles, np.int32(optimal_route_positions))
all_target_cv = np.array(optimal_route_positions)[1:, :]

plt.imshow(cv2.cvtColor(route_image, cv2.COLOR_BGR2RGB))

We can also visualize the obstacle map and the path that was found to convince ourselves that it is indeed the shortest possible path!

In [None]:
# showing the obstacle map
obstacle_map = create_obstacle_map(extended_polygons, corrected_image.shape)
circled = add_circles(obstacle_map, circles)
route_image = add_route(circled, np.int32(optimal_route_positions))
plt.imshow(circled)

## Main loop

### Initializing useful variables

Before anything, let's light up the thymio :)

In [None]:
leds_top = [25,0,25]
aw(node.set_variables({"leds.top" : leds_top}))

Now, we can convert the different variables we need in the mathematical basis:

In [None]:
from src.geometry import convert_points, convert_vec, get_angle, convert_angle_to_vector
from src.utils import add_line, add_route, add_circles, add_polygons, add_robot, add_point

robot_pos_m = convert_points(robot_position_cv, y_max=map_size[1])
robot_orientation_m = convert_vec(robot_orientation_cv)
all_target_m = convert_points(all_target_cv, y_max=map_size[1])
if circle_positions.squeeze().ndim == 1:
    circle_positions = circle_positions.reshape(-1)
else:
    circle_positions = circle_positions.squeeze()[:, :2]
objectives_m = convert_points(circle_positions, y_max=map_size[1])
robot_angle_m = get_angle(np.array([1, 0]), robot_orientation_m)

### Controller initialization

In [None]:
from src.navigation import FSMController

fsm = FSMController(
    robot_pos_m = robot_pos_m,
    all_target_m = all_target_m,
    objectives_m = objectives_m,
    extended_polygons = extended_polygons,
    img_shape = corrected_image.shape,
    first_prox = [0, 25, 0],
    prox_obstacle_threshold = 100,
    intensity = 10
)

### Kalman Filter initialization

In [None]:
from src.kalman_filter import KalmanFilter

dt_loop = 100 / 1000         # in s
DIST_WHEEL =  95        # in mm

speed_convertor = lambda x: 0.33*x - 0.89

kf = KalmanFilter(
    robot_pos_m=robot_pos_m,
    robot_angle_m=robot_angle_m,
    speed_convertor = speed_convertor,
    DIST_WHEEL = DIST_WHEEL,
    CAM_POS_MEASUREMENT_VAR = 0.1,
    CAM_ANGLE_MEASUREMENT_VAR=0.1,
    PROCESS_POS_VAR=1.5,
    PROCESS_ANGLE_VAR=0.003,
)

speed_left, speed_right = 0, 0

In [None]:
from src.utils import draw_kalman_pos, draw_kalman_angle
from time import time

kidnapping_threshold = 100
robot_in_vue = True
y_max = map_size[1]
start = time()
aw(node.set_variables({"leds.top" : leds_top}))
vec_to_print = np.array([0.,0.])

cv2.namedWindow("real_image", cv2.WINDOW_NORMAL)
cv2.resizeWindow("real_image", *map_size)

cv2.namedWindow("kalman_pos", cv2.WINDOW_NORMAL)
cv2.resizeWindow("kalman_pos", *map_size)

cv2.namedWindow("kalman_angle", cv2.WINDOW_NORMAL)
cv2.resizeWindow("kalman_angle", *map_size)

cv2.namedWindow("kalman_angle", cv2.WINDOW_NORMAL)
cv2.resizeWindow("kalman_angle", (640, 480))

video_writer = cv2.VideoWriter('video.avi',  
                         cv2.VideoWriter_fourcc(*'MJPG'), 
                         10, map_size) 
kalman_video_writer = cv2.VideoWriter('kalman_position.avi',  
                         cv2.VideoWriter_fourcc(*'MJPG'), 
                         10, map_size)
kalman_angle_writer = cv2.VideoWriter('kalman_angle.avi',  
                         cv2.VideoWriter_fourcc(*'MJPG'), 
                         10, (640, 480))

while True:
    #get the prox sensors values
    prox = await get_prox_value()

    # If we can't get the robot pose for any reason, we only use kalman estimation. If we get vision back and the robot is far enough from previous viewed position, we have a kidnapping
    try:
        ret, frame = cam.read()
        corrected_image = detector.get_corrected_image(frame)
        robot_pos_cv, robot_orientation_cv, robot_corners_cv = detector.get_robot_pose(corrected_image, is_corrected=True)
        robot_pos_m = convert_points(robot_pos_cv, y_max)
        robot_orientation_m = convert_vec(robot_orientation_cv)
        robot_angle_m = get_angle(np.array([1, 0]), robot_orientation_m)
        img_aruco = add_robot(corrected_image, robot_pos_cv, np.array(robot_orientation_cv), np.array(robot_corners_cv))
        if not robot_in_vue:
            print('robot back in vue')
            if np.linalg.norm(robot_pos_m - old_robot_pos_m) > kidnapping_threshold:
                print('kidnapping\nReplanning the route given the attained objectives')
                fsm.recompute_global_path(robot_pos_m)
        robot_in_vue = True
    except:
        img_aruco = corrected_image
        robot_pos_m = None
        robot_orientation_m = None
        robot_in_vue = False

    # kalman filter step
    speed_measure_left, speed_measure_right = await get_motor_speed()
    
    z_measure = np.append(robot_pos_m, robot_angle_m)
    if robot_pos_m is None or robot_orientation_m is None:
        z_measure = None

    end = time()
    dt = end - start

    motor_speeds = np.array([speed_measure_left, speed_measure_right])
    robot_pos_kalman, robot_angle_kalman, covariance_kalman = kf.estimate(motor_speeds, z_measure, dt)
    robot_orientation_kalman = convert_angle_to_vector(robot_angle_kalman)
    
    # do everything :=) for getting the command
    leds_top, speed_left, speed_right, stop = fsm.get_command(prox, robot_pos_kalman, robot_orientation_kalman)
    
    
    # setting the speed of the motors
    start = time()
    await set_motor_speed(speed_left, speed_right)
    aw(node.set_variables({"leds.top" : leds_top}))

    if stop: break #all target are reached so end the program

    if robot_in_vue:
        old_robot_pos_m = robot_pos_m

    # ----------- DRAWINGS ----------- #
    #display the image
    route_cv = convert_points(fsm.current_path, y_max)
    img_final = add_route(img_aruco, route_cv.astype(int))
    cv2.imshow('real_image', img_final)
    video_writer.write(img_final)

    # Kalman estimated position density
    img_kalman_pos      = draw_kalman_pos(robot_pos_kalman, covariance_kalman[:2, :2], map_size)
    cv2.imshow('kalman_pos', img_kalman_pos.T[::-1, :]) 
    kalman_video_writer.write(cv2.cvtColor(img_kalman_pos.T[::-1, :], cv2.COLOR_GRAY2BGR))
    
    # kalman estimated angle density
    img_kalman_angle = draw_kalman_angle(robot_angle_kalman, covariance_kalman[2, 2])
    cv2.imshow('kalman_angle', img_kalman_angle)
    kalman_angle_writer.write(cv2.cvtColor(img_kalman_angle, cv2.COLOR_RGBA2BGR))
    # print(img_kalman_angle.shape)
    # break


    # ----------- KEYBOARD INPUT ----------- #
    key = cv2.waitKey(1)

    # if 'space' -> pause the loop
    if key == ord(' '):
        print("programme en pause")
        await set_motor_speed(0, 0)
        while(1):
            if cv2.waitKey(1) == ord(' '):
                break
            if cv2.waitKey(1) == ord('q'):
                stop = 1
                break
                
    # if 'q' -> exit the loop
    if key == ord('q') or stop:
        print("terminated by user")
        break

await set_motor_speed(0, 0)

# recording 20 frames at the end of the loop
for i in range(20):
    ret, frame = cam.read()
    corrected_image = detector.get_corrected_image(frame)
    try:
        robot_pos_cv, robot_orientation_cv, robot_corners_cv = detector.get_robot_pose(corrected_image, is_corrected=True)
        robot_pos_m = convert_points(robot_pos_cv, y_max)
        robot_orientation_m = convert_vec(robot_orientation_cv)
        robot_angle_m = get_angle(np.array([1, 0]), robot_orientation_m)
        img_aruco = add_robot(corrected_image, robot_pos_cv, np.array(robot_orientation_cv), np.array(robot_corners_cv))
        cv2.imshow('real_image', img_aruco)
        video_writer.write(img_aruco)
    except:
        cv2.imshow('real_image', corrected_image)
        video_writer.write(corrected_image)

    cv2.imshow('kalman_pos', img_kalman_pos.T[::-1, :])
    kalman_video_writer.write(cv2.cvtColor(img_kalman_pos.T[::-1, :], cv2.COLOR_GRAY2BGR))
    cv2.imshow('kalman_angle', img_kalman_angle)
    kalman_angle_writer.write(cv2.cvtColor(img_kalman_angle, cv2.COLOR_RGBA2BGR))

    await client.sleep(0.2)
# Once finished, set the speed of the motors to 0
video_writer.release()
kalman_video_writer.release()
kalman_angle_writer.release()
cv2.destroyAllWindows()