# autonomous

To compute and run trajectories between to points with the Dingo-Gen3 lite.

In [1]:
# Insérez ici le numéro de votre équipe
# (en) Insert here your team number
VOTRE_NUMERO_EQUIPE = 0
VOTRE_STATION = 1  # votre poste de travail au laboratoire [1, 2, 3 ou 4]
VOTRE_ROBOT_REEL = 2  # indiqué derrière et manette Playsation (pas 02, juste 2 par exemple)
# modifier cette variable selon que vous contrôler la simulation ou le robot réel
# (en) modify this variable according to whether you control the simulation or the real robot
SIM = True

## Importation des paquets requis (Importing required packages)

In [None]:
import sys
sys.path.append('/opt/ros/noetic/lib/python3/dist-packages')
sys.path.append('/usr/lib/python3/dist-packages')
sys.path.append('~/mobile_manip_ws/devel/lib/python3/dist-packages/')

In [2]:
from os import environ
import numpy as np
import math
from scipy.spatial.transform import Rotation as R
from lab_utils.plan_utils import *
import rospy
from jackal_msgs.msg import Drive
from nav_msgs.msg import Odometry, MapMetaData
from geometry_msgs.msg import Pose, PoseWithCovarianceStamped
from std_msgs.msg import Float64
from geometry_msgs.msg import Twist as GT
from matplotlib import pyplot as plt
from matplotlib import image as mpimg
from mobile_manip.srv import ReachName, GetValues, ReachValues

In [None]:
if SIM:
    #environ['ROS_MASTER_URI'] = "http://localhost:1137{}/".format(VOTRE_NUMERO_EQUIPE)
    environ['ROS_MASTER_URI'] = "http://localhost:11311/" #Ordi personnel
else:
    environ['ROS_MASTER_URI'] = "http://cpr-ets05-0{}:11311/".format(VOTRE_ROBOT_REEL)
    environ['ROS_IP'] = "192.168.0.8{}".format(VOTRE_STATION)

In [None]:
# Créer et démarrer un nouveau noeud
# (en) Create and start a new node
rospy.init_node('mobile_manip_navigation', anonymous=True)

## Utility functions

In [None]:
# Fonction pour le calcul de l'orientation à partir d'un quaternion
# (en) Function for calculating the orientation from a quaternion
def get_heading_from_quaternion(q):
    r = R.from_quat([q.x, q.y, q.z, q.w])
    angles = r.as_euler('xyz', degrees=False)
    return angles[2]

def get_quaternion_from_heading(z):
    r = R.from_euler('z', z)
    return r.as_quat()

# Returns the smallest difference between to angles in the range [-pi, pi]
# (en) Retourne la plus petite différence entre deux angles dans l'intervalle [-pi, pi]
def wraptopi(angle):
    xwrap=np.remainder(angle, 2*np.pi)
    if np.abs(xwrap)>np.pi:
        xwrap -= 2*np.pi * np.sign(xwrap)
    return xwrap
def angle_diff(a, b):
    diff = a - b
    if diff > math.pi: diff  -= 2*math.pi
    if diff < -math.pi: diff += 2*math.pi
    return diff

## Importation de la carte (format png)

In [None]:
start = Point(1380, 1130)
goal = Point(1625, 1550)
fig, ax = plt.subplots(figsize=(18,10))
image = mpimg.imread("a2230_map_closed_fliped.png")
plt.imshow(image, origin="lower")
ax.add_artist(plt.Circle((start.x, start.y), 6, color='r'))
ax.add_artist(plt.Circle((goal.x, goal.y), 6, color='y'))
plt.show()

## Création des 'callbacks' (create callbacks)

Chaque fonction est associée à un sujet ROS et enregistre en continue sont contenu dans une variable globale.

(en) Each function is associated with a ROS topic and continuously records its content in a global variable.

In [None]:
# Realsense Pose subscriber callback
pose_msg = Pose()
def pose_callback(msg):
    global pose_msg
    # Convert pose to the map frame
    x = msg.pose.pose.position.x
    y = msg.pose.pose.position.y
    theta = get_heading_from_quaternion(msg.pose.pose.orientation)
    pose_msg.position.x = x * math.cos(init_pose['theta']) - y * math.sin(init_pose['theta']) + init_pose['x']
    pose_msg.position.y = x * math.sin(init_pose['theta']) + y * math.cos(init_pose['theta']) + init_pose['y']
    quat = get_quaternion_from_heading(theta + init_pose['theta'])
    pose_msg.orientation.x = quat[0]
    pose_msg.orientation.y = quat[1]
    pose_msg.orientation.z = quat[2]
    pose_msg.orientation.w = quat[3]

In [None]:
origin = Point(0, 0)
map_resolution = 0
amcl_origin = { 'x': origin.x, 'y': origin.y, 'theta': 0.0}
def map_callback(msg):
    global origin, amcl_origin, map_resolution
    map_resolution = msg.resolution
    origin.x = np.abs(msg.origin.position.x) # en m
    origin.y = np.abs(msg.origin.position.y) # en m
    amcl_origin = { 'x': origin.x, 'y': origin.y, 'theta': 0.0} # en m

amcl_pose = Pose()
def amcl_callback(msg):
    global amcl_pose
    x = msg.pose.pose.position.x
    y = msg.pose.pose.position.y
    theta = get_heading_from_quaternion(msg.pose.pose.orientation)
    amcl_pose.position.x = x * math.cos(amcl_origin['theta']) - y * math.sin(amcl_origin['theta']) + amcl_origin['x']
    amcl_pose.position.y = x * math.sin(amcl_origin['theta']) + y * math.cos(amcl_origin['theta']) + amcl_origin['y']
    quat = get_quaternion_from_heading(theta + amcl_origin['theta'])
    amcl_pose.orientation.x = quat[0]
    amcl_pose.orientation.y = quat[1]
    amcl_pose.orientation.z = quat[2]
    amcl_pose.orientation.w = quat[3]

In [None]:
# ROS subscribers et publishers
cmd_drive_pub = rospy.Publisher('/mobile_manip/dingo_velocity_controller/cmd_drive', Drive, queue_size=1)
cmd_vel_pub = rospy.Publisher('/cmd_vel', GT, queue_size=1)
initial_pose_pub = rospy.Publisher('/initialpose', PoseWithCovarianceStamped, queue_size=1)
map_sub = rospy.Subscriber('/map_metadata', MapMetaData, map_callback) # Important before amcl_sub to get map_data
amcl_sub = rospy.Subscriber('/amcl_pose', PoseWithCovarianceStamped, amcl_callback)

## Contrôleur du mouvement (movement controller)

In [None]:
interwheel_distance = 0.3765
left_wheel_radius = 0.098
right_wheel_radius = 0.098
def move_robot(linear, angular):
    vel_left  = (linear - angular * interwheel_distance / 2.0) / left_wheel_radius
    vel_right = (linear + angular * interwheel_distance / 2.0) / right_wheel_radius

    if SIM:
        # Envoi des commandes au roues par topic ROS
        # (en) Sending commands to the wheels via ROS topic
        cmd_drive_msg = Drive()
        cmd_drive_msg.drivers[0] = vel_left
        cmd_drive_msg.drivers[1] = vel_right
        cmd_drive_pub.publish(cmd_drive_msg)
    else:
        # Envoi des commandes au controlleur du robot
        # (en) Sending commands to the robot controller
        cmd_vel_msg = GT()
        cmd_vel_msg.linear.x = linear
        cmd_vel_msg.angular.z = angular
        cmd_vel_pub.publish(cmd_vel_msg)

## Changer la position initiale du robot (AMCL)

### AMCL

Adaptive monte carlo localization (AMCL) is a probabilistic localization system for a robot moving in 2D. It implements the adaptive (or KLD-sampling) Monte Carlo localization approach (as described by Dieter Fox), which uses a particle filter to track the pose of a robot against a known map. The robot's pose is estimated by a sample set of weighted particles. Each particle represents a possible state of the robot, including its position and orientation.

The particel sample is revised at each time step, based on a motion model and observations. The observations used in AMCL are range measurements, provided by a range sensor. AMCL uses the occupancy grid mapping package to build a map of the environment from the sensor data. 

In [None]:
def pub_initial_position(x, y, theta):
        # Publishing new initial position (x, y, theta) --> for localization
        # :param x x-position of the robot
        # :param y y-position of the robot
        # :param theta theta-position of the robot

        initpose = PoseWithCovarianceStamped()
        initpose.header.stamp = rospy.get_rostime()
        initpose.header.frame_id = "map"
        initpose.pose.pose.position.x = x
        initpose.pose.pose.position.y = y
        quaternion = get_quaternion_from_heading(theta)

        initpose.pose.pose.orientation.w = quaternion[0]
        initpose.pose.pose.orientation.x = quaternion[1]
        initpose.pose.pose.orientation.y = quaternion[2]
        initpose.pose.pose.orientation.z = quaternion[3]

        initial_pose_pub.publish(initpose)

## Trajectoire avec A* (A* trajectory)

### A-Star (A*)

A* is a graph traversal and path search algorithm, which is often used in many fields of computer science due to its completeness, optimality, and optimal efficiency. One major practical drawback is its O(b^d) space complexity, as it stores all generated nodes in memory. Thus, in practical travel-routing systems, it is generally outperformed by algorithms which can pre-process the graph to attain better performance, as well as memory-bounded approaches; however, A* is still the best solution in many cases.

In [None]:
start = Point(int(amcl_pose.position.x/map_resolution), int(amcl_pose.position.y/map_resolution))
print(start.x,start.y, get_heading_from_quaternion(amcl_pose.orientation))

In [None]:
from lab_utils.astart import AStarPlanner
# Create the points for the algorithm
start = Point(int(amcl_pose.position.x/map_resolution), int(amcl_pose.position.y/map_resolution))
map_img = 1-image[:,:,0]
mat_map = map_img
map = BMPMap(width=map_img.shape[1], height=map_img.shape[0], mat=mat_map)
astarPlanner = AStarPlanner(map=map, step_size=6, collision_radius=12, heuristic_dist='Euclidean')
astarPlanner.plan(start=start, target=goal)

In [None]:
fig = plt.figure(figsize=(16,8))
ax2 = fig.add_subplot(1, 1, 1)
plt.imshow(image, origin="lower")
ax2.add_artist(plt.Circle((start.x, start.y), 10, color='r'))
ax2.add_artist(plt.Circle((goal.x, goal.y), 10, color='y'))
for i in range(len(astarPlanner.finalPath)-1):
    pt = astarPlanner.finalPath[i].tuple()
    ax2.add_artist(plt.Circle((pt[0], pt[1]), 5, color='g'))
plt.show()

## Suivre la trajectoire (Attention le robot va bouger!)

Gardez vos mains sur la télécommande.


## (en) Follow the trajectory (Attention the robot will move!)

(en) Keep your hands on the remote control.

In [None]:
current_wp = 0
while current_wp < len(astarPlanner.finalPath):
    target_x, target_y = astarPlanner.finalPath[current_wp].tuple()

    target_x *= map_resolution
    target_y *= map_resolution

    #print("From {}, {} to {}, {}".format(amcl_pose.position.x, amcl_pose.position.y, target_x, target_y))

    theta_target = math.atan2(target_y - amcl_pose.position.y, target_x - amcl_pose.position.x)
    theta = get_heading_from_quaternion(amcl_pose.orientation)

    dist = math.sqrt((target_x-amcl_pose.position.x)**2 + (target_y-amcl_pose.position.y)**2)
    dtheta = angle_diff(theta_target, theta)

    # print(dist)

    if dist > 0.4:
        if math.fabs(dtheta) > 0.785:
            linear = 0.0
        else:
            linear = 0.5

        angular = min(dtheta, 1.0) if dtheta >= 0 else max(dtheta, -1.0)
        move_robot(linear, angular)
    else:
        current_wp += 1

move_robot(0, 0)

### Seconde destination !

In [None]:
print(amcl_pose.position.x/map_resolution, amcl_pose.position.y/map_resolution)
goal = Point(700, 350)
start = Point(int(amcl_pose.position.x/map_resolution)-300, int(amcl_pose.position.y/map_resolution))
astarPlanner.plan(start=start, target=goal)

In [None]:
fig = plt.figure(figsize=(16,8))
ax2 = fig.add_subplot(1, 1, 1)
plt.imshow(image, origin="lower")
ax2.add_artist(plt.Circle((start.x, start.y), 10, color='r'))
ax2.add_artist(plt.Circle((goal.x, goal.y), 10, color='y'))
for i in range(len(astarPlanner.finalPath)-1):
    pt = astarPlanner.finalPath[i].tuple()
    ax2.add_artist(plt.Circle((pt[0], pt[1]), 5, color='g'))
plt.show()

## Suivre la trajectoire (Attention le robot va bouger!)

Gardez vos mains sur la télécommande.

## (en) Follow the trajectory (Attention the robot will move!)

(en) Keep your hands on the remote control.

In [None]:
current_wp = 0
while current_wp < len(astarPlanner.finalPath):
    target_x, target_y = astarPlanner.finalPath[current_wp].tuple()

    target_x *= map_resolution
    target_y *= map_resolution

    #print("From {}, {} to {}, {}".format(amcl_pose.position.x, amcl_pose.position.y, target_x, target_y))

    theta_target = math.atan2(target_y - amcl_pose.position.y, target_x - amcl_pose.position.x)
    theta = get_heading_from_quaternion(amcl_pose.orientation)

    dist = math.sqrt((target_x-amcl_pose.position.x)**2 + (target_y-amcl_pose.position.y)**2)
    dtheta = angle_diff(theta_target, theta)

    # print(dist)

    if dist > 0.4:
        if math.fabs(dtheta) > 0.785:
            linear = 0.0
        else:
            linear = 0.3

        angular = min(dtheta, 1.0) if dtheta >= 0 else max(dtheta, -1.0)
        move_robot(linear, angular)
    else:
        current_wp += 1

move_robot(0, 0)