# Local Navigation 

### Thymio connection

In [151]:
import tdmclient.notebook
await tdmclient.notebook.start()

### Import libraries

In [147]:
import math
import numpy as np
import serial
import time

In [152]:
await tdmclient.notebook.stop()

In [153]:
from tdmclient import ClientAsync, aw
client = ClientAsync()
node = await client.wait_for_node()
await node.lock()

Node 7ceccd72-4650-4181-812c-177085571dc0

In [154]:
await node.wait_for_variables()
node.var

{'_fwversion': [14, 0],
 '_id': [-1654],
 '_imot': [0, 0],
 '_integrator': [0, 0],
 '_productId': [8],
 '_vbat': [938, 946],
 'acc': [0, 0, 21],
 'acc._tap': [0],
 'button.backward': [0],
 'button.center': [0],
 'button.forward': [0],
 'button.left': [0],
 'button.right': [0],
 'buttons._mean': [11418, 9802, 14812, 14723, 16101],
 'buttons._noise': [97, 86, 123, 106, 113],
 'buttons._raw': [11426, 9761, 14808, 14730, 16104],
 'event.args': [0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0],
 'event.source': [-1654],
 'leds.bottom.left': [0, 0, 0],
 'leds.bottom.right': [0, 0, 0],
 'leds.circle': [0, 0, 0, 0, 0, 0, 0, 0],
 'leds.top': [0, 0, 0],
 'mic._mean': [259],
 'mic.intensity': [51],
 'mic.threshold': [0],
 'motor.left.pwm': [0],
 'motor.left.speed': [-7],
 'motor.left.target': [0],
 'motor.right.pwm': [0],
 'motor.right.speed': [0],
 'motor.right.target': [0],
 'prox.comm.r

In [155]:
# Connect to the Thymio robot
client.tdm_addr
client.tdm_port
th = node

### Desciption
Local navigation allows modulating the trajectory to avoid unforeseen, local obstacles. It often pushes the controller to leave the optimal path to make an avoidance manoeuvre. Once the obstacle is passed, the controller can go back to the optimal path or find a new one. An important issue is to decide when the controller has to start avoiding, and, more difficult, when the obstacle can be considered as passed.

### Parameters 

In [157]:
## Parameters for local navigation
pi = math.pi
threshold_obst = 3500 # Obstacles ahead (Global)
threshold_obst_local = 2500 # Obstacles locally 
threshold_obst_list = [3200, 3600, 3600, 3600, 3200] # Orient of robot during local nav

## Global variables
speed0 = 100       # nominal speed
speedGain = 2      # speed gain with obstacle
obstThrL = 10      # low obstacle threshold to switch state 1->0
obstThrH = 20     # high obstacle threshold to switch state 0->1
obstSpeedGainLocal = 5  # /100 (actual gain: 5/100=0.05)
obstSpeedGainGlobal = [6, 4, -2, -6, -8]  # /100

state = 1          # 0=towards goal, 1=obstacle avoidance
obst = [0,0]       # measurements from left and right prox sensors

timer_period[0] = 10   # 10ms sampling time

### Sensor readings

In [156]:
await node.wait_for_variables()
for i in range(10):
    # You can also replace "prox.horizontal" by an other prox sensor:
    print(list(node["prox.horizontal"]))
    await client.sleep(0.2)

[0, 0, 0, 2633, 4533, 0, 0]
[0, 0, 0, 2600, 4705, 0, 0]
[0, 0, 0, 2601, 4714, 0, 0]
[0, 0, 0, 2596, 4716, 0, 0]
[0, 0, 0, 2600, 4720, 0, 0]
[0, 0, 0, 2602, 4723, 0, 0]
[0, 0, 0, 2597, 4727, 0, 0]
[0, 0, 0, 2594, 4729, 0, 0]
[0, 0, 0, 2601, 4725, 0, 0]
[0, 0, 0, 2605, 4723, 0, 0]


In [158]:
# Reading sensor measurements
# sensor_prox = th["prox.horizontal"]
await node.wait_for_variables({"prox.horizontal"})
for i in range(10):
    sensor_prox = node.v.prox.horizontal 
    print(list(sensor_prox))
    await client.sleep(0.2)

[0, 0, 0, 2601, 4729, 0, 0]
[0, 0, 0, 2602, 4731, 0, 0]
[0, 0, 0, 2594, 4724, 0, 0]
[0, 0, 0, 2601, 4730, 0, 0]
[0, 0, 0, 2602, 4734, 0, 0]
[0, 0, 0, 2606, 4732, 0, 0]
[0, 0, 0, 2600, 4736, 0, 0]
[0, 0, 0, 2813, 4831, 0, 0]
[0, 0, 0, 2851, 4846, 0, 0]
[0, 0, 0, 2847, 4842, 0, 0]


### Functions

In [229]:
aw(node.lock())

Node 7ceccd72-4650-4181-812c-177085571dc0

In [230]:
@onevent
def move_forward(speed0):
#     global motor_left_target, motor_right_target
    v = {
        "motor.left.target": [speed0],
        "motor.right.target": [speed0],
    }
    aw(node.set_variables(v))

@onevent
def stop_moving():
#     global motor_left_target, motor_right_target
    x = {
        "motor.left.target": [0],
        "motor.right.target": [0],
    }
    aw(node.set_variables(x))

### Method 1
#### 1. ANN
- Train to find appropriate weights for each sensor

#### 2. Potential Field Navigation

In [234]:
@onevent 
def pt_field():
    global prox_ground_delta, speed0, speedGain, obstSpeedGainGlobal
    
    # speed based on nominal velocity and ground
    spLeft = speed0 - speedGain 
    spRight = speed0 + speedGain
    
    # adjustment for obstacles
    for i in range(5):
        print(sensor_prox[0])
        spLeft += sensor_prox[i] * obstSpeedGainGlobal[i] // 100
        spRight += sensor_prox[i] * obstSpeedGainGlobal[4 - i] // 100
#         await client.sleep(0.2)
    
    # motor control
    y = {
        "motor.left.target": [spLeft],
        "motor.right.target": [spRight],
    }
    aw(node.set_variables(y))
#     motor_left_target = spLeft
#     motor_right_target = spRight
    
    # Move in the same direction for 2 more seconds
#     move_forward(speed0)
#     time.sleep(0.2)
#     stop_moving()
    
#      # Continue towards goal
#     move_forward(speed0)   

In [232]:
# Test
# acquisition from the proximity sensors to detect obstacles
await node.wait_for_variables({"prox.horizontal"})

for i in range(1000): 
    sensor_prox = node["prox.horizontal"]
#     sensor_prox = node.v.prox.horizontal 
    print(list(sensor_prox)
    obst = [sensor_prox[0], sensor_prox[4]]

    if state == 0: 
        # switch from goal tracking to obst avoidance if obstacle detected
        if (obst[0] > obstThrH):
            state = 1
        elif (obst[1] > obstThrH):
            state = 1
    elif state == 1:
        if obst[0] < obstThrL:
            if obst[1] < obstThrL : 
                # switch from obst avoidance to goal tracking if obstacle got unseen
                state = 0
    if  state == 0 :
        # goal tracking: stop for now
        leds_top = [0,0,0]
        stop_moving()
    else:
        leds_top = [30,30,30]
        pt_field(sensor_prox)
        # obstacle avoidance: accelerate wheel near obstacle
        # motor_left_target = speed0 + obstSpeedGain * (obst[0] // 100)
        #motor_right_target = speed0 + obstSpeedGain * (obst[1] // 100)

    await client.sleep(0.2)

0
0
0
0
0


In [233]:
# Stop motion
move_forward(0)

In [200]:
aw(node.unlock())

In [180]:
# Test 2
move_forward(speed0)
while sum(sensor_prox[i] > threshold_obst_local for i in range(5)) > 0:
    pt_field()

## Workings

In [11]:
@onevent 
def combined_navigation():
    global prox_ground_delta, prox_horizontal, motor_left_target, motor_right_target, state, obst, speed0, speedGain, obstSpeedGainLocal, obstSpeedGainGlobal
    
    # acquisition from ground sensor for going toward the goal
    diffDelta = prox_ground_delta[1] - prox_ground_delta[0]

    # acquisition from the proximity sensors to detect obstacles
    obst = [prox_horizontal[0], prox_horizontal[4]]

    # tdmclient does not support yet multiple and/or in if statements:
    if state == 0: 
        # switch from goal tracking to obst avoidance if obstacle detected
        if (obst[0] > obstThrH) or (obst[1] > obstThrH):
            state = 1
    elif state == 1:
        if obst[0] < obstThrL and obst[1] < obstThrL:
            # switch from obst avoidance to goal tracking if obstacle got unseen
            state = 0

    if state == 0:
        # goal tracking: turn toward the goal
        leds_top = [0, 0, 0]
        motor_left_target = speed0 - speedGain * diffDelta
        motor_right_target = speed0 + speedGain * diffDelta
    else:
        leds_top = [30, 30, 30]
        # obstacle avoidance: accelerate wheel near obstacle
        motor_left_target = speed0 + obstSpeedGainLocal * (obst[0] // 100)
        motor_right_target = speed0 + obstSpeedGainLocal * (obst[1] // 100)

        # adjustment for obstacles ("gradient" due to obstacles)
        for i in range(5):
            motor_left_target += prox_horizontal[i] * obstSpeedGainGlobal[i] // 100
            motor_right_target += prox_horizontal[i] * obstSpeedGainGlobal[4 - i] // 100

In [31]:
## Artifical Neural Network on Sensors

# Sigmoid activation function
def sigmoid(x):
    return 1 / (1 + np.exp(-x))

# Derivative of the sigmoid function
def sigmoid_derivative(x):
    return x * (1 - x)

# Neural network class
class NeuralNetwork:
    def __init__(self, input_size, hidden_size, output_size):
        # Initialize weights randomly
        self.weights_input_hidden = np.random.rand(input_size, hidden_size)
        self.weights_hidden_output = np.random.rand(hidden_size, output_size)

    def forward(self, inputs):
        # Propagate inputs through the network
        self.hidden_layer_activation = sigmoid(np.dot(inputs, self.weights_input_hidden))
        self.output = sigmoid(np.dot(self.hidden_layer_activation, self.weights_hidden_output))
        return self.output

    def backward(self, inputs, targets, learning_rate):
        # Backpropagation to update weights
        output_error = targets - self.output
        output_delta = output_error * sigmoid_derivative(self.output)

        hidden_layer_error = output_delta.dot(self.weights_hidden_output.T)
        hidden_layer_delta = hidden_layer_error * sigmoid_derivative(self.hidden_layer_activation)

        # Update weights
        self.weights_hidden_output += self.hidden_layer_activation.T.dot(output_delta) * learning_rate
        self.weights_input_hidden += inputs.T.dot(hidden_layer_delta) * learning_rate

input_size = 5  # Number of sensors
hidden_size = 8  # Number of hidden layer neurons
output_size = 2  # Number of motor commands

# Create a neural network
neural_network = NeuralNetwork(input_size, hidden_size, output_size)

# Example input (replace with actual sensor readings)
input_data = np.random.rand(input_size)

# Forward pass
output = neural_network.forward(input_data)
print("Output before training:", output)

# Example target (replace with actual desired motor commands)
target = np.array([0.8, 0.2])

# Backward pass (learning)
neural_network.backward(input_data, target, learning_rate=0.1)

# Forward pass after training
output_after_training = neural_network.forward(input_data)
print("Output after training:", output_after_training)

Output before training: [0.93523729 0.95171766]


ValueError: shapes (8,) and (2,) not aligned: 8 (dim 0) != 2 (dim 0)

In [13]:
## Combined Method of Local Avoidance + Potential Field Navigation
@onevent 
def combined_navigation():
    global prox_ground_delta, prox_horizontal, motor_left_target, motor_right_target, state, obst, speed0, speedGain, obstSpeedGainLocal, obstSpeedGainGlobal
    
    # acquisition from ground sensor for going toward the goal
    diffDelta = prox_ground_delta[1] - prox_ground_delta[0]

    # acquisition from the proximity sensors to detect obstacles
    obst = [prox_horizontal[0], prox_horizontal[4]]

    if state == 0: 
        # switch from goal tracking to obst avoidance if obstacle detected
        if (obst[0] > obstThrH) or (obst[1] > obstThrH):
            state = 1
    elif state == 1:
        if obst[0] < obstThrL and obst[1] < obstThrL:
            # switch from obst avoidance to goal tracking if obstacle got unseen
            state = 0

    if state == 0:
        # goal tracking: turn toward the goal
        leds_top = [0, 0, 0]
        motor_left_target = speed0 - speedGain * diffDelta
        motor_right_target = speed0 + speedGain * diffDelta
    else:
        leds_top = [30, 30, 30]
        # obstacle avoidance: accelerate wheel near obstacle
        motor_left_target = speed0 + obstSpeedGainLocal * (obst[0] // 100)
        motor_right_target = speed0 + obstSpeedGainLocal * (obst[1] // 100)

        # adjustment for obstacles ("gradient" due to obstacles)
        for i in range(5):
            motor_left_target += prox_horizontal[i] * obstSpeedGainGlobal[i] // 100
            motor_right_target += prox_horizontal[i] * obstSpeedGainGlobal[4 - i] // 100
    
    # Add forward motion for 2 seconds after local avoidance
    if state == 0:
        time.sleep(2)
        stopmotors()  # Stop after 2 seconds of forward motion

In [16]:
# Initialize Thymio
# th = Thymio.serial_default_port()
motor_left_target = 0
motor_right_target = 0

# Define parameters
num_sensors = 7
num_actions = 2  # For simplicity, assuming left or right actions

# Initialize Q-table (replace with neural network weights in a real-world scenario)
q_table = np.random.rand(num_sensors, num_actions)

# Training parameters
learning_rate = 0.1
discount_factor = 0.9
exploration_rate = 1.0
min_exploration_rate = 0.01
exploration_decay = 0.995

# Training loop
for episode in range(100):
    num_bins = 5
    state = tuple(int(sensor_value/(1000/num_bins)) for sensor_value in prox_horizontal)

    total_reward = 0

    while True:
        # Choose action using epsilon-greedy strategy
        if np.random.uniform(0, 1) < exploration_rate:
            action = np.random.randint(num_actions)
        else:
            action = np.argmax(q_table[state, :])

        # Perform action in the environment
        if action == 0:
            # Move left
            motor_left_target = -100
            motor_rightt_target = 100
        else:
            # Move right
            motor_left_target = 100
            motor_rightt_target = -100

        # Simulate environment (replace with actual robot interaction)
        next_state = prox_horizontal

        # Reward function (adjust based on your specific scenario)
        reward = -1 if any(sensor_value > 500 for sensor_value in next_state) else 1

        # Update Q-table using Q-learning equation
        q_table[state, action] = (1 - learning_rate) * q_table[state, action] + \
                                 learning_rate * (reward + discount_factor * np.max(q_table[next_state, :]))

        state = next_state
        total_reward += reward

        # Check for episode termination (replace with your specific termination condition)
        if total_reward < -100:
            break

    # Decay exploration rate
    exploration_rate = max(min_exploration_rate, exploration_rate * exploration_decay)

    print(f"Episode {episode + 1}, Total Reward: {total_reward}")

# Once training is done, you can use the learned Q-table to determine actions in a real-world scenario.


IndexError: index 10 is out of bounds for axis 0 with size 7

In [17]:
import numpy as np

# Sigmoid activation function
def sigmoid(x):
    return 1 / (1 + np.exp(-x))

# Derivative of the sigmoid function
def sigmoid_derivative(x):
    return x * (1 - x)

# Neural network class
class NeuralNetwork:
    def __init__(self, input_size, hidden_size, output_size):
        # Initialize weights randomly
        self.weights_input_hidden = np.random.rand(input_size, hidden_size)
        self.weights_hidden_output = np.random.rand(hidden_size, output_size)

    def forward(self, inputs):
        # Propagate inputs through the network
        self.hidden_layer_activation = sigmoid(np.dot(inputs, self.weights_input_hidden))
        self.output = sigmoid(np.dot(self.hidden_layer_activation, self.weights_hidden_output))
        return self.output

    def backward(self, inputs, targets, learning_rate):
        # Backpropagation to update weights
        output_error = targets - self.output
        output_delta = output_error * sigmoid_derivative(self.output)

        hidden_layer_error = output_delta.dot(self.weights_hidden_output.T)
        hidden_layer_delta = hidden_layer_error * sigmoid_derivative(self.hidden_layer_activation)

        # Update weights
        self.weights_hidden_output += np.outer(self.hidden_layer_activation, output_delta) * learning_rate
        self.weights_input_hidden += np.outer(inputs, hidden_layer_delta) * learning_rate

# Example usage with training
input_size = 5  # Number of sensors
hidden_size = 8  # Number of hidden layer neurons
output_size = 2  # Number of motor commands

# Create a neural network
neural_network = NeuralNetwork(input_size, hidden_size, output_size)

# Training data (replace with your actual dataset)
training_inputs = np.array([[0.1, 0.2, 0.3, 0.4, 0.5], [0.6, 0.7, 0.8, 0.9, 1.0]])
training_outputs = np.array([[0.8, 0.2], [0.2, 0.8]])

# Training parameters
learning_rate = 0.1
epochs = 1000

# Training loop
for epoch in range(epochs):
    for i in range(len(training_inputs)):
        input_data = training_inputs[i]
        target = training_outputs[i]

        # Forward pass
        output = neural_network.forward(input_data)

        # Backward pass (learning)
        neural_network.backward(input_data, target, learning_rate)

    if epoch % 100 == 0:
        # Print training progress every 100 epochs
        error = np.mean(np.abs(training_outputs - neural_network.forward(training_inputs)))
        print(f"Epoch {epoch}, Error: {error}")

# Test the trained model
test_input = np.array([0.2, 0.3, 0.4, 0.5, 0.6])
predicted_output = neural_network.forward(test_input)
print(f"Predicted Output: {predicted_output}")


Epoch 0, Error: 0.4583485097727641
Epoch 100, Error: 0.30563568604187524
Epoch 200, Error: 0.29293927128374153
Epoch 300, Error: 0.29120255369946174
Epoch 400, Error: 0.2890043058379349
Epoch 500, Error: 0.2859616394972115
Epoch 600, Error: 0.28175705709210774
Epoch 700, Error: 0.2761508116890472
Epoch 800, Error: 0.26906361414992497
Epoch 900, Error: 0.260523120705205
Predicted Output: [0.50231524 0.51013992]


##### When met with unknown obstacles
- Sensor value: 
- Add weights to sensors

In [96]:
motor_right=0
motor_left=0

In [97]:
v = [32, 0, 32, 0, 32, 0, 32, 0]
leds_circle = v