Import librairies and packages

In [1]:
from tdmclient import ClientAsync, aw

import os
import sys
import numpy as np
import math
import matplotlib.pyplot as plt
from IPython.display import display, clear_output
import Vision.CamCalib as calib
import Vision.EnvTracker as tracker
import GlobalNav.GlobalNavHelper as d_star
import time
import cv2
import threading
import importlib


importlib.reload(d_star)
importlib.reload(tracker)

sys.path.insert(0, os.path.join(os.getcwd(), 'LocalNav/sensors'))
from local_occupancy import thymio_coords

sys.path.insert(0, os.path.join(os.getcwd(), 'Kalman'))
from Kalman import Kalman

sys.path.insert(0, os.path.join(os.getcwd(), 'plot'))
from plot import plot

sys.path.insert(0, os.path.join(os.getcwd(), 'NMPC'))
from nmpc import nmpc
from find_limits import find_limits

sys.path.insert(0, os.path.join(os.getcwd(), 'Motion'))
from PdControl import pd_control

sys.path.insert(0, os.path.join(os.getcwd(), 'LocalNav'))
sys.path.insert(0, os.path.join(os.getcwd(), 'GlobalNav'))
from LocalNav import localNav
#from GlobalNav import GlobalNav
%matplotlib tk



Initialize components & objects

In [2]:
#Start Thymio
USE_THYMIO = True

if USE_THYMIO:
    #Create a Client Object:
    client = ClientAsync()
    node = await client.wait_for_node()
    await node.lock()

    client.process_waiting_messages()

In [3]:
# Parameters
MAP_W, MAP_H = (120, 120)  # In cm

# Position
# abs_pos = [[20, 20, 0]] # absolute x,y,theta coordinates of the robot
# Goal Position
# goal_position = [[20, 20]]

# Initialize camCalib object to retrieve camera parameters
cam_calib = calib.CamCalib(None, (16,9))
cam_mat, dist_coefs, rvecs, tvecs = cam_calib.load_camera_params()
quit = False
run = True
wait_for_map = True
map_created = False

# Initialize the environment tracker object to create global map and track thymio
env_tracker = tracker.EnvTracker((MAP_W, MAP_H), cam_mat, dist_coefs)

map_global = np.zeros((MAP_W, MAP_H))

NEXT_TARGET_DIST = 5
KIDNAPPING_DIST = 20

def set_motors(left_speed, right_speed):
    v = {
        "motor.left.target": [left_speed],
        "motor.right.target": [right_speed],
    }
    if USE_THYMIO:
        aw(node.set_variables(v))

#test_pose = np.array([10,10,3*math.pi/4])
#test_target = np.array([10,20])
#print(pd_control(test_pose, test_target))
set_motors(0,0)


In [4]:
def plot_mpc(time_points, states, controller, ax):

    for a in ax:
        a.clear()
    ax[0].plot(time_points, states['x'], label='x')
    ax[0].set_xlabel('Time')
    ax[0].set_ylabel('Position x')
    ax[0].legend()

    ax[1].plot(time_points, states['y'], label='y')
    ax[1].set_xlabel('Time')
    ax[1].set_ylabel('Position y')
    ax[1].legend()


    ax[2].plot(time_points, states['theta'], label='theta')
    ax[2].set_xlabel('Time')
    ax[2].set_ylabel('Angle theta')
    ax[2].legend()

    ax[3].plot(time_points, controller['u_r'], label='wr')
    ax[3].plot(time_points, controller['u_l'], label='wl')
    ax[3].set_xlabel('Time')
    ax[3].set_ylabel('Controller u')
    ax[3].legend()

    plt.tight_layout()


Main loop (run thymio)

In [5]:
# Open camera feed & initialize map
cv2.destroyAllWindows()
cv2.namedWindow("Thymio Tracking", cv2.WINDOW_NORMAL)
cv2.resizeWindow("Thymio Tracking", 1680, 1050)
cam_feed = cv2.VideoCapture(0)
cam_feed.set(cv2.CAP_PROP_FRAME_WIDTH, 1680)
cam_feed.set(cv2.CAP_PROP_FRAME_HEIGHT, 1050)
if not cam_feed.isOpened():
    print("ERROR :: Could not open camera video feed!")
map_created, map_global, goal_pose, thymio_init_pose = env_tracker.wait_for_map(cam_feed, cam_calib, window_name="Thymio Tracking")

print("Thymio init pose: ", thymio_init_pose)
print("GOAL: ", goal_pose)

# Switch map X,Y
map_global = np.transpose(map_global)

# Create figure to show the plots
plt.show()
quit = False
frame_count = 0
fig_main = plt.figure(3, figsize=(10,5))
ax_main = fig_main.add_subplot(121)
ax_main.set_title('Global View')
ax_dstar = fig_main.add_subplot(122)
ax_dstar.set_title('D* Best Path')
#fig_mpc, ax_mpc = plt.subplots(4,1)
#fig_mpc.set_size_inches((10,6))

# GLOBAL PLANNING 
if np.sum(goal_pose) >= 0 and np.sum(thymio_init_pose[:2]) > 0:
    # If valid pose for goal and thymio
    best_path = d_star.FindGlobalPath(tuple(thymio_init_pose[:2].astype(int)), tuple(goal_pose.astype(int)), map_global.T, map_global.T, ax_dstar)

# Adjustments before tracking
track_thymio = True
show_markers = False
frame_counter = 0
text_pose = 'POS: ??'
text_angle = 'ANGLE: ??'
text_vel = 'VEL: ??'
text_ang_vel = 'ANG. VEL: ??'
text_goal = f'GOAL: {np.round(goal_pose, 1)}'
pose_hist = np.zeros((2,10))
angle_hist = np.zeros(10)
cv2.resizeWindow("Thymio Tracking", 1200, 1000)

reach_goal = False
L = []
T = []
abs_pos = np.array([[0,0,0]])

optimal_state_estimate_k = np.array([0.0,0.0,0.0])
covariance_estimate_k = np.array([[0.1,0,0],[0,0.1,0],[0,0,0.1]]) #à modifier

L.append(optimal_state_estimate_k)
T.append(covariance_estimate_k)
controller_r = 0
controller_l = 0
next_point_index = min(NEXT_TARGET_DIST+1, best_path.shape[1] - 1)
dest_point = np.array([[best_path[0,next_point_index], best_path[1,next_point_index]]])
print("NEXT STATE: ", dest_point)
while run and not quit:
    # -------------
    # VISION
    # ------------
    # Acquire cam image and perform thymio tracking
    success, frame = cam_feed.read()
    if not success:
        run = False
    # Detect map & thymio
    frame_corrected, roi = cam_calib.undistord(frame, cam_mat, dist_coefs)
    x,y,w,h = roi
    frame_corrected = frame_corrected[y:y+h, x:x+w, :]
    if frame_count % 100:
        env_tracker.detectMarkers(frame_corrected)
        env_tracker.updateMapROI(frame_corrected)
    if track_thymio:
        success, thymio_pose, thymio_vel, img_thymio = env_tracker.updateThymio(frame_corrected, show_markers)
        img_sidebar = np.zeros((img_thymio.shape[0], 200, 3), dtype=np.uint8)
        if success:
            # Correction Y
            abs_pos = np.array([thymio_pose[:]])
            #abs_pos[0,1] = 0.95*abs_pos[0,1]
            text_pose = f'POSE: {np.round(thymio_pose[:2], 1)}'
            text_angle = f'ANGLE: {round(thymio_pose[2]*180/math.pi, 0)}'
            text_vel = f'VEL: {np.round(thymio_vel[:2], 1)}'
            text_ang_vel = f'ANG. VEL: {round(thymio_vel[2]*180/math.pi, 1)}'
        cv2.putText(img_sidebar, text_pose, (10, 100), cv2.FONT_HERSHEY_TRIPLEX, 0.5, [0,255,0], 1)
        cv2.putText(img_sidebar, text_angle, (10, 175), cv2.FONT_HERSHEY_TRIPLEX, 0.5, [0,255,0], 1)
        cv2.putText(img_sidebar, text_vel, (10, 250), cv2.FONT_HERSHEY_TRIPLEX, 0.5, [0,255,0], 1)
        cv2.putText(img_sidebar, text_ang_vel, (10, 325), cv2.FONT_HERSHEY_TRIPLEX, 0.5, [0,255,0], 1)
        cv2.putText(img_sidebar, text_goal, (10, 400), cv2.FONT_HERSHEY_TRIPLEX, 0.5, [0,255,0], 1)
        # Concatenate images
        img_thymio = cv2.hconcat([img_thymio, img_sidebar])
    else:
        img_thymio = env_tracker.getProjectedMap(frame_corrected)
    # Get latest goal pose
    if np.linalg.norm(np.array([best_path[0,-1], best_path[1,-1]]) -  env_tracker._goal_pose) > 5:
        # Update the path if goal changes
        goal_pose = env_tracker._goal_pose
        best_path = d_star.FindGlobalPath(tuple(abs_pos[0][:2].astype(int)), tuple(goal_pose.astype(int)), map_global.T, temp.T, ax_dstar)
        next_point_index = min(10, best_path.shape[1] - 1)
        dest_point = np.array([[best_path[0,next_point_index], best_path[1,next_point_index]]])
        
    cv2.imshow("Thymio Tracking",img_thymio)
    key = cv2.waitKey(1)
    if key == ord('q'):
        run = False
    elif key == ord(' '):
        # Track thymio or not
        track_thymio = not track_thymio
    elif key == ord('m'):
        show_markers = True
    
    if USE_THYMIO:
        await node.wait_for_variables({"prox.horizontal"})
        prox_horizontal = list(node.v.prox.horizontal)
        #print(f'prox_hor: {prox_horizontal}')
    else:
        prox_horizontal = np.zeros(7)

    temp = map_global.copy()
    map_global = localNav(abs_pos, np.array([goal_pose[:]]), prox_horizontal, map_global)

    if not np.array_equal(map_global,temp):
        # Go back for the pose
        set_motors(0,0)
        #back_pos = abs_pos[0][:2] - 5*np.array([math.cos(abs_pos[0][2]), math.sin(abs_pos[0][2])])
        #print(abs_pos[0][:2])
        print("RECALCULATING PATH....")
        new_best_path = d_star.FindGlobalPath(tuple(abs_pos[0][:2].astype(int)), tuple(goal_pose.astype(int)), map_global.T, temp.T, ax_dstar)
        if len(new_best_path) > 0:
            best_path = new_best_path
            next_point_index = min(NEXT_TARGET_DIST, best_path.shape[1] - 1)
        dest_point = np.array([[best_path[0,next_point_index], best_path[1,next_point_index]]])
        
    # -----------
    # FILTERING
    # ----------
    optimal_state_estimate_k,covariance_estimate_k = Kalman(abs_pos[0],[math.sqrt(thymio_vel[0]**2+thymio_vel[0]**2), thymio_vel[0]],L[-1],T[-1])
    L.append(optimal_state_estimate_k)
    T.append(covariance_estimate_k)
    # --------
    # MOTION
    # -------
    if frame_count % 1 == 0:
        #xmin_bound, xmax_bound, ymin_bound, ymax_bound = find_limits(map_global, abs_pos)
        #controller_l, controller_r, states, time_points, controller = nmpc(abs_pos, np.array([goal_pose[:]]), 
        #                                                                   xmin_bound, ymin_bound, xmax_bound, ymax_bound)
        #plot_mpc(time_points, states, controller, ax_mpc)
        #fig_mpc.canvas.draw()
        #fig_mpc.canvas.flush_events()
        
        controls = pd_control(optimal_state_estimate_k, dest_point[0])
        controller_l, controller_r = (controls[0], controls[1])
    if reach_goal:
        min_dist = 2
    else:
        min_dist = NEXT_TARGET_DIST
    dist_to_tracker = np.linalg.norm(optimal_state_estimate_k[:2] - dest_point[0])
    if dist_to_tracker > KIDNAPPING_DIST:
        # Thymio was kidnapped, recompute global path
        set_motors(0,0)
        new_best_path = d_star.FindGlobalPath(tuple(optimal_state_estimate_k[:2].astype(int)), tuple(goal_pose.astype(int)), map_global.T, temp.T, ax_dstar)
        if len(new_best_path) > 0:
            best_path = new_best_path
            next_point_index = min(10, best_path.shape[1] - 1)
        dest_point = np.array([[best_path[0,next_point_index], best_path[1,next_point_index]]])

    elif dist_to_tracker > min_dist:
        left_m = int(controller_l) * 1
        right_m = int(controller_r) * 1
        set_motors(left_m, right_m)
        print(f'Left motor: {left_m} | Right motor: {right_m}')
    else:
        # GO to next point
        if reach_goal:
            set_motors(0,0)
        next_point_index = min(next_point_index + NEXT_TARGET_DIST, best_path.shape[1] - 1)
        reach_goal = next_point_index == (best_path.shape[1] - 1)
        dest_point = np.array([[best_path[0,next_point_index], best_path[1,next_point_index]]])
        print("NEXT TARGET: ", dest_point)
    # -------
    # PLOTTING
    # -------
    if frame_count % 1 == 0:
        ax_main.clear()
        plot(abs_pos, thymio_coords, np.array([goal_pose[:]]), map_global, abs_pos[0][2], 
             0, 0, 0, 0, ax_main, best_path=best_path, next_target=dest_point[0])
        fig_main.canvas.draw()
        fig_main.canvas.flush_events()

    frame_count += 1

if cam_feed:
    cam_feed.release()
cv2.destroyAllWindows()
plt.close('all')

set_motors(0,0)

Pixels per grid cell: 6.866666666666666 x 5.275
Goal detected @:  [85.922 24.739]
Thymio init pose:  [60 99 -1]
GOAL:  [85.922 24.739]
initialization
state =  1
path found !
NEXT STATE:  [[63 93]]
Left motor: 129 | Right motor: 156
Left motor: 124 | Right motor: 151
NEXT TARGET:  [[65 88]]
Left motor: 158 | Right motor: 177
Left motor: 146 | Right motor: 166
Left motor: 121 | Right motor: 147
NEXT TARGET:  [[67 83]]
Left motor: 153 | Right motor: 167
Left motor: 136 | Right motor: 153
Left motor: 116 | Right motor: 142
NEXT TARGET:  [[70 78]]
Left motor: 147 | Right motor: 166
Left motor: 131 | Right motor: 154
Left motor: 114 | Right motor: 142
NEXT TARGET:  [[72 73]]
Left motor: 150 | Right motor: 151
Left motor: 132 | Right motor: 132
RECALCULATING PATH....
update
state =  2
path found !
Left motor: 142 | Right motor: 139
RECALCULATING PATH....
update
state =  2
path found !
Left motor: 122 | Right motor: 153
RECALCULATING PATH....
update
state =  2
path found !
Left motor: -100 | R

In [6]:
cam_feed.release()
set_motors(0,0)