# THYMIO PROJECT

Thomas Pierre Xavier Blanchard, Tanguy Pierre Cedoz, Maxime Sami Julien Leriche, Riccardo Vianello

In [None]:
from tdmclient import ClientAsync
client = ClientAsync()
node = await client.wait_for_node()
await node.lock()

In [None]:
test_functions = True

Functions

In [None]:
from threading import Timer

class RepeatedTimer(object):
    def __init__(self, interval, function, *args, **kwargs):
        self._timer     = None
        self.interval   = interval
        self.function   = function
        self.args       = args
        self.kwargs     = kwargs
        self.is_running = False
        self.start()

    def _run(self):
        self.is_running = False
        self.start()
        self.function(*self.args, **self.kwargs)

    def start(self):
        if not self.is_running:
            self._timer = Timer(self.interval, self._run)
            self._timer.start()
            self.is_running = True

    def stop(self):
        self._timer.cancel()
        self.is_running = False


In [None]:
#function to set the speed of the wheels
def motors(left, right):
    return {
        "motor.left.target": [left],
        "motor.right.target": [right],
    }

node.send_set_variables(motors(0, 0))

# Local Avoidance

In [None]:
#function to see if an obstacle is detected or not
def detection_obstacle(obstacle_threshold, verbose = False):
    # if one of the sensors in front of the robot has a value higher than the threshold we return true
    if any([x>obstacle_threshold for x in node['prox.horizontal'][:-2]]):
        if verbose: print("\t\t Saw an obstacle")
        return True
    
    return False

In [None]:
#function to see if an obstacle is detected or not with less influence from the right sensor
def detection_obstacle_turning(obstacle_threshold, verbose = False):
    # if one of the sensors in front of the robot has a value higher than the threshold we return true
    if any([x>obstacle_threshold for x in node['prox.horizontal'][:-3]]) or (node['prox.horizontal'][4] > obstacle_threshold + 200):
        if verbose: print("\t\t Saw an obstacle")
        return True
    
    return False

## instead of going straight we should implement the motion control function to follow the optimal path
## for now I've defined the going straight function to test the obstacle avoidance algorithm

In [None]:
async def go_straight(motor_speed=170, obstacle_threshold=3000, verbose=True):
    """
    Go Straight Behaviour we need to change it with the following of the optimal path 
    param motor_speed: the Thymio's motor speed
    param obstacle_threshold: threshold starting which it is considered that thymio has detected an object
    param verbose: whether to print status messages or not
    """
    if verbose: print("Starting go straight behaviour")
    
    # Move forward, i.e. set motor speeds
    # here we should put the module that follows the optimal path
    await node.set_variables(motors(motor_speed, motor_speed))
    
    # Until one of the prox sensor sees an obstacle
    obstacle = False
    
    await node.wait_for_variables({"prox.horizontal"})
    
    while not obstacle:
        # here we should put the module that follows the optimal path
        # as soon as an obstacle is detected it goes out from this module
        if detection_obstacle(obstacle_threshold, verbose=verbose):
            obstacle=True
            if verbose: print("\t Saw an obstacle in front, exiting go straight behaviour")
        await client.sleep(0.2) #otherwise, variables would not be updated
    return 

if test_functions:
    await go_straight(100,1000,True)
    await node.set_variables(motors(0, 0)) 

In [None]:
#for the moment it is not useful
def test_pass_wall(wall_threshold, verbose=True):
    """
    Tests whether one of the proximity sensors saw a wall
    param wall_threshold: threshold starting which it is considered that the sensor saw a wall
    param verbose: whether to print status messages or not
    """
    if any([x>wall_threshold for x in node['prox.horizontal'][:-2]]):
        if verbose: print("\t\t Still following the wall")
    else:
        #wait 3 seconds to really pass the wall
        if verbose: print("\t\t Wall passed")
        return True
    
    return False

In [None]:
# we need to implement a function that 
# detects when we're back on the optimal path
# for now i was trying to use this function to go out of local avoidance
def test_ground_white(white_threshold, verbose=False):
    """
    Tests whether the two ground sensors have seen white
    param white_threshold: threshold starting which it is considered that the sensor saw white
    param verbose: whether to print status messages or not
    """
    if all([x>white_threshold for x in node['prox.ground.reflected']]):
        if verbose: print("\t\t Saw white on the ground")
        return True
    return False

In [None]:
async def wall_following(motor_speed=170, obstacle_threshold=2900, wall_threshold=500, verbose=True):
    """
    Wall following behaviour of the local avoidance
    param motor_speed: the Thymio's motor speed
    param obstacle_threshold: threshold starting which it is considered that the sensor saw an object
    param wall_threshold: threshold starting which it is considered that the sensor saw an object (not used for the moment)
    param verbose: whether to print status messages or not
    """
    
    if verbose: print("Starting wall following behaviour")
    saw_black = False
    
    if verbose: print("\t Moving forward")
    await node.set_variables(motors(motor_speed, motor_speed))
           
    prev_state="forward"
    
    while not saw_black:
        
        if detection_obstacle(obstacle_threshold, verbose=False):
            if prev_state=="forward": 
                if verbose: print("\tSaw wall, turning clockwise")
                await node.set_variables(motors(-motor_speed+40, motor_speed-40))
                prev_state="turning"
        
        else:
            if prev_state=="turning": 
                if verbose: print("\t Moving forward")
                # first I move forward for a while
                await node.set_variables(motors(motor_speed, motor_speed))
                await client.sleep(1)
                # then I start turning right
                await node.set_variables(motors(motor_speed+20, motor_speed-90))
                prev_state="forward"
                
        # we need to define the function that checks that we've 
        #returned to the optimal path to go out of this function
        
        
        #if test_pass_wall(wall_threshold, verbose=True): 
            #await client.sleep(4.7)
            #saw_black = True
            
        await client.sleep(0.05) #otherwise, variables would not be updated
    return 

if test_functions:
    #await wall_following(verbose=True)
    await node.set_variables(motors(0, 0))

In [None]:
#still to be checked
async def alternative_wall_following(motor_speed=170, obstacle_threshold=3000, wall_threshold=500, verbose=True):
    """
    Wall following behaviour of the local avoidance
    param motor_speed: the Thymio's motor speed
    param obstacle_threshold: threshold starting which it is considered that the sensor saw an object
    param wall_threshold: threshold starting which it is considered that the sensor saw an object (not used for the moment)
    param verbose: whether to print status messages or not
    """
    
    if verbose: print("Starting wall following behaviour")
    saw_black = False
    
    if verbose: print("\t Moving forward")
    await node.set_variables(motors(motor_speed, motor_speed))
           
    prev_state="forward"
    second_prev_state = "forward"
    
    while not saw_black:
        
        if detection_obstacle(obstacle_threshold, verbose=False):
            if prev_state=="forward": 
                if verbose: print("\tSaw wall, turning clockwise")
                await node.set_variables(motors(-motor_speed+40, motor_speed-40))
                prev_state="turning"
        
        else:
            if prev_state=="turning":
                while not detection_obstacle(obstacle_threshold, verbose=False):
                    if verbose: print("\t Moving forward")
                    if second_prev_state=="forward": 
                        await node.set_variables(motors(motor_speed, motor_speed-90))
                        second_prev_state="turning"
                    else:
                        await node.set_variables(motors(motor_speed, motor_speed))
                        second_prev_state="forward"
                    # first I move forward for a while
                    
                    await client.sleep(0.5)
                    # then I start turning right
                prev_state="forward"
                
        # we need to define the function that checks that we've 
        #returned to the optimal path to go out of this function
        # that must be written in the following commented if
        
        #if test_pass_wall(wall_threshold, verbose=True): 
            #await client.sleep(4.7)
            #saw_black = True
            
        await client.sleep(0.05) #otherwise, variables would not be updated
    return 

if test_functions:
    #await wall_following(verbose=True)
    await node.set_variables(motors(0, 0))

In [None]:
async def Local_avoidance(speed = 170, verbose=True):
    while True:
        # Step 1: go straight (should be follow the path once we have it)
        await go_straight(speed, verbose=verbose)
        
        # Step 2: wall following
        await wall_following(speed, verbose=verbose)
        
        # Step 3: it should be return following the optimal path
        #await go_straight(speed, verbose=verbose)

In [None]:
await node.set_variables(motors(0, 0)) 

Parameters that are working
- speed 170
- time going straight 1 second
- turning write +20 left speed, - 90 right speed
- obstacle_threshold = 3000

In [None]:
if test_functions:
    await Local_avoidance(170, verbose=True)

In [None]:
# for the moment I'm not using this
async def turn_right(motor_speed=100, obstacle_threshold=2000, wall_threshold=500, verbose=True):
    """
    Wall following behaviour of the FSM
    param motor_speed: the Thymio's motor speed
    param wall_threshold: threshold starting which it is considered that the sensor saw a wall
    param white_threshold: threshold starting which it is considered that the ground sensor saw white
    param verbose: whether to print status messages or not
    """
    
    if verbose: print("Starting wall following behaviour")
    saw_black = False
    
    if verbose: print("\t Moving forward")
    await node.set_variables(motors(motor_speed, motor_speed))
           
    prev_state="forward"
    
    while not saw_black:
        
        if detection_obstacle(obstacle_threshold, verbose=False):
            if prev_state=="forward": 
                if verbose: print("\tSaw wall, turning clockwise")
                await node.set_variables(motors(-motor_speed, motor_speed))
                prev_state="turning"
        
        else:
            if prev_state=="turning": 
                if verbose: print("\t Moving forward")
                await node.set_variables(motors(motor_speed, motor_speed))
                prev_state="forward"

        if test_pass_wall(wall_threshold, verbose=True): 
            await client.sleep(4.2)
            saw_black = True
        await client.sleep(0.1) #otherwise, variables would not be updated
    return 

if test_functions:
    await wall_following(verbose=True)
    await node.set_variables(motors(0, 0))

In [None]:
# just to print something (Riccardo)

async def print_sensor_values(sensor_id, print_range=10, delta_time=0.2):
    """
    Print the sensor value sensor_id print_range times, every delta_time seconds
    """
    await node.wait_for_variables({str(sensor_id)})
    for i in range(print_range):
        print(list(node[sensor_id][:-2]))
        
        await client.sleep(delta_time)
        
await print_sensor_values('prox.horizontal')

In [None]:
# it might not be useful, I need to modify it (Riccardo)

acquire_data = True
Ts = 0.1
thymio_data = []

def motors(left, right):
    return {
        "motor.left.target": [left],
        "motor.right.target": [right],
    }

def get_data():
    thymio_data.append({"prox":list(node["prox.horizontal"]), 
                        "left_speed":node["motor.left.speed"],
                        "right_speed":node["motor.right.speed"]})
    

if acquire_data:
    await node.wait_for_variables() # wait for Thymio variables values
    rt = RepeatedTimer(Ts, get_data) # it auto-starts, no need of rt.start()

    try:
        # time.sleep would not work here, use asynchronous client.sleep method instead
        await client.sleep(5)
        node.send_set_variables(motors(55, 50))
        await client.sleep(10) # your long-running job goes here...
    finally:
        rt.stop() # better in a try/finally block to make sure the program ends!
        node.send_set_variables(motors(0, 0))
else:
    node.send_set_variables(motors(0, 0))

# Filtering

From the current probability map of position, get the next one based on measurements and instructions.