Implementation of the Q-Learning algorithm using an $\epsilon$-greedy policy, discrete states and a linear model

In [None]:
import tensorflow as tf
import numpy as np
import gym
from gym import spaces
import matplotlib.pyplot as plt
from tqdm import tqdm
import math
import rpyc
import time
import pickle

# High level settings

In [None]:
# should the robots check for distance traveled?
#   True: The robot will take constant range mesurments and use them as reward for learning. 
#         If the measurments go outside the range specified by MOVEMENT_RANGE, any mition will stop until 
#         the robot is manually moved back to its starting position
MOVEMENT_RANGE_CHECK = True
# Range in wich it is save to move in. Measured from a wall to the ultra sound sensor at the back of the robot.
# Useful for ensuring there is enough space to move in both directions. 
# Can also be used to prevent the robot from falling of a desk.
# Valid range: [0,254]
MOVEMENT_RANGE = [20,125]

# Connecting to the robot

Perform these steps before attemptiong to connect:

1. start the robot
2. ensure the connection to the robot is working. Follow tutporial: https://www.ev3dev.org/docs/networking/
2. start the rpyc server on the robot. This can be done via command line:
    1. ssh into robot
```
$ ssh robot@ev3dev.lcoal
$ Password: maker
```
    2. start rpyc server script
```
$ ./rpyc_server.sh
```

In [None]:
# establish connection to rpyc server on robot
ROBOT_HOSTNAME = "ev3dev.local"
conn = rpyc.classic.connect(ROBOT_HOSTNAME)
conn.execute("print('Hello Slave. I am your master!')")
print("sucessfully connected to the robot")

# Robot Implementation
The `CrawlerRobot` class is used to control basic movements of the robot.

In [None]:
class CrawlerRobot:
    """
    Class for managing the one armed crawl robot build by Steffen.
    """
    def __init__(self, connection):
        """ Initialize the robot and calibrate the motors.
        """
        self.conn = connection
        motors = connection.modules['ev3dev2.motor'] 
        self.m1 = motors.LargeMotor('outA')
        self.m1.stop_action = 'hold'
        self.m2 = motors.LargeMotor('outB')
        self.m2.stop_action = 'hold'
        sensors = conn.modules['ev3dev2.sensor.lego'] 
        self.t1 = sensors.TouchSensor('in1')
        self.t2 = sensors.TouchSensor('in2')
        self.ult = sensors.UltrasonicSensor('in3')
        self.m1_pos = -1
        self.m2_pos = -1
        self.distance = -1
        sound = connection.modules['ev3dev2.sound'] 
        self.sound = sound.Sound()
        
        self.m1_rotation_levels = {0:-120, 1:-240, 2:-270}
        self.m2_rotation_levels = {0:120, 1:190, 2:220}
        
        self.calibrate()
        
    def calibrate(self):
        """calibration function
        """
        # calibrate m1
        self.m1.stop_action = 'hold'
        self.m1.run_direct(duty_cycle_sp=50) # move m1 up
        while True:
            if self.t1.is_pressed == True:
                self.m1.position = 0
                self.m1.reset()
                self.m1.stop_action = 'hold'
                break

        # calibrate m2
        self.m2.stop_action = 'coast'
        self.m2.run_direct(duty_cycle_sp=-30) # move m1 up
        while True:
            if self.t2.is_pressed == True:
                self.m2.reset()
                self.m2.position = 0
                break
        self.m2.stop_action = 'hold'
        
        self.m1_pos = -1
        self.m2_pos = -1
        self.reset_distance()
        return
    
    def move_mot1_to_pos(self, position):
        if position != self.m1_pos:
            self.m1.on_to_position(20, position=self.m1_rotation_levels[position])
            self.m1_pos = position
        return
    
    def move_mot2_to_pos(self, position):
        if position != self.m2_pos:
            self.m2.on_to_position(20, position=self.m2_rotation_levels[position])
            self.m2_pos = position
        return
    
    def reset_distance(self):
        """resets the distance measure. Call this after manually moving the robot"""
        self.distance = self.ult.distance_centimeters
        return
        
    def move(self, m1_pos, m2_pos):
        """moves motors to position
        """
        # move motors
        self.move_mot1_to_pos(m1_pos)
        self.move_mot2_to_pos(m2_pos)
        # measure distance
        dist_new = self.ult.distance_centimeters
        
        if MOVEMENT_RANGE_CHECK:
            # check for faulty measurments (happens approx 0.1% of the time)
            retry = 3
            while dist_new > 250 or abs(dist_new - self.distance) > 50:
                retry -= 1
                if retry <= 0:
                    # accept last trusted measurment
                    print("ignoring new faulty measurment, new: %i, last: %i" % (dist_new, self.distance))
                    dist_new = self.distance
                    break
                time.sleep(0.5)
                dist_new = self.ult.distance_centimeters


            # check for enough space remaining
            if dist_new > MOVEMENT_RANGE[1] or dist_new < MOVEMENT_RANGE[0]:
                self.out_of_space_stop()
                dist_new = self.distance
            
        
        dist_traveled = dist_new - self.distance
        self.distance = dist_new
        
        return dist_traveled
    
    def walk(self):
        self.reset_distance()
        dist = 0
        for i in range(4):
            dist += self.move(0,0)
            dist += self.move(1,0)
            dist += self.move(2,0)
            dist += self.move(2,1)
            dist += self.move(2,2)
            dist += self.move(1,2)
            dist += self.move(0,2)
            dist += self.move(0,1)
        return dist
    
    def out_of_space_stop(self):
        self.notify("Ran out of space. Move me and press button to restart")
        while True:
            if self.t1.is_pressed == True:
                break
            time.sleep(0.1)
            if self.t2.is_pressed == True:
                break
            time.sleep(0.1)
        time.sleep(0.5)
        self.reset_distance()
        return
        
    def notify(self, text):
        self.sound.speak(text)
        return

# Enviroment Implementation

The `CrawlerEnv` implements and `openAI` Gym interface for the crawling robot learning problem. This abstraction layer allows using standardized reinforcement learning code for learning. It also allows us to test the same learning implementation on other (simulated) `openAI` enviroments. 

This implementation loosly based on the cliff walking gym: https://github.com/podondra/gym-gridworlds/blob/master/gym_gridworlds/envs/cliff_env.py

In [None]:
class CrawlerEnv(gym.Env):
    metadata = {'render.modes': []}
    
    def __init__(self):
        # states of each motor
        self.m0_states = 3
        self.m1_states = 3
        
        # action space
        self.action_space = spaces.Discrete(4)
        
        # state space
        self.observation_space = spaces.Tuple((
                spaces.Discrete(self.m0_states),
                spaces.Discrete(self.m1_states)
                ))
        self.observation_space.n = self.m0_states * self.m1_states
        
        # reward space
        self.reward_range = [- math.inf, math.inf]
        
        # action encodings
        self.moves = {
                0: (-1, 0),   # m0 down
                1: (1, 0),   # m0 up
                2: (0, -1),  # m1 down
                3: (0, 1),  # m1 up
                }
        
        # connect to robot
        self.robot = CrawlerRobot(conn)
        self.reset()
        
        
    def step(self, action):
        """Run one timestep of the environment's dynamics. When end of
        episode is reached, you are responsible for calling `reset()`
        to reset this environment's state.
        Accepts an action and returns a tuple (observation, reward, done, info).
        Args:
            action (object): an action provided by the environment
        Returns:
            observation (object): agent's observation of the current environment
            reward (float) : amount of reward returned after previous action
            done (boolean): whether the episode has ended, in which case further step() calls will return undefined results
            info (dict): contains auxiliary diagnostic information (helpful for debugging, and sometimes learning)
        """
        # calculate new state
        x, y = self.moves[action]
        self.state = self.state[0] + x, self.state[1] + y
        
        reward = 0

        # keep states in allowed Box
        for arm in [0,1]:
            if self.state[arm] < 0:
                reward -= 3
        self.state = max(0, self.state[0]), max(0, self.state[1])
        self.state = (min(self.state[0], self.m0_states - 1),
                      min(self.state[1], self.m1_states - 1))

        # perform movement
        reward += self.robot.move(*self.state)
        
        return self.state_as_int, reward, False, {}
    
    
    def reset(self):
        """Resets the state of the environment and returns an initial observation.
        
        Returns: observation (object): the initial observation of the
            space.
        """
        self.robot.reset_distance()
        self.robot.move(0, 0)
        self.state = (0, 0)
        return self.state_as_int
    
    
    def render(self, mode, close=False):
        """Renders the environment.
        """
        # raise NotImplementedError
        super(CrawlerEnv, self).render(mode=mode)
    
    @property
    def state_as_int(self):
        return self.state[0] + self.m0_states * self.state[1]
        
    

# Create enviroment, Calibrate

This creates the training enviroment and calibrates the motors of the robot.

### ! Ensure that the motors calibrate properly. Once hitting the buttons, they should not bounce back too much  !
Repeat the calibration process multiple times if neccessary.

In [None]:
env = CrawlerEnv()
number_of_actions = env.action_space.n
number_of_states = env.observation_space.n
print("|S| =", number_of_states)
print("|A| =", number_of_actions)

# Test deterministic movements

In [None]:
env.robot.walk()

In [None]:
env.robot.move(1,1)

# Implementation of tabular Q-learning

In [None]:
def save_model(f_name="model.p"):
    """ saves the current model parameters as the given file name
    """
    model_params = {"weights":trained_weights,
                    "epsilon":epsilon,
                    "eta" : eta,
                    "R_list" : R_list,
                    "E_list" : E_list,
                    "iter" : len(R_list),
                    "gamma" : gamma,
                    "initial_epsilon" : initial_epsilon
                   }
    pickle.dump( model_params, open( f_name, "wb" ) )
    return

def new_model():
    """ creates a new model with default parameters
    """
    return {"weights":None,
            "epsilon":0.5,
            "eta" : 0.5,
            "R_list" : [],
            "E_list" : [],
            "iter" : 0,
            "gamma" : 0.8
           }

def load_model(f_name="model.p", learning=False):
    """ loads a model from the given file. Sets exploration and learning rate to 0.
    """
    model = pickle.load( open( f_name, "rb" ) )
    if not learning:
        model["epsilon"] = 0
        model["eta"] = 0
    return model

In [None]:
tf.reset_default_graph()

# Define linear model
# y = W^T x     ,y is R^4 for 4 actions, x is R^48 for 48 states, W is R^48x4

# one-hot encoding of current state s
x = tf.placeholder(shape=[None, number_of_states],dtype=tf.float32)  # input state

# weights of the Q(s, a) function
W = tf.Variable(tf.zeros([number_of_states, number_of_actions]))

# y = values for all 4 actions in state s, represents Q(s, . )
y = tf.matmul(x, W) 

# get best action
argmax_y = tf.argmax(y, 1)

#  Loss is mesured difference between  target and predicted Q values.
# target = values to learn
target = tf.placeholder(shape=[None, number_of_actions], dtype=tf.float32)
lr = tf.placeholder(dtype=tf.float32)
loss = tf.reduce_sum(tf.square(target - y))
trainer = tf.train.GradientDescentOptimizer(learning_rate = lr)
train_step = trainer.minimize(loss)

# Q-Learning

load a model or create a new one

In [None]:
model = new_model()

In [None]:
model = load_model("./model_1_perfect/model_30min.p", learning=False)

The actual learning process

In [None]:
initial_epsilon = epsilon = model["epsilon"]   # epsilon for epsilon-greedy selection
number_of_episodes = 100
max_number_of_steps = 35

init = tf.global_variables_initializer()

# Set learning parameters
gamma = model["gamma"]  # gamma
eta = model["eta"]  # (initial) learning rate
R_list = model["R_list"]  # list gathering returns (accumulated rewards) for each episode
E_list = model["E_list"]  # list gathering exploration factors
with tf.Session() as sess:
    sess.run(init)
    if not (model["weights"] is None):
        W.load(model["weights"])
    for i in tqdm(range(number_of_episodes)):
        s = env.reset()  # reset environment and get first state
        R = 0  # return (accumulated reward)
        for t in range(max_number_of_steps):  # maximum number of steps
            # Choose an action greedily (with e chance of random action) from the Q-network, based on current state
            # x: one-hot encoding of current state
            # a is a skalar represented as a list eg: [1]
            # Q is current approximation of the action-value function, vector over all states
            a, Q = sess.run([argmax_y, y], feed_dict={x:np.eye(1, number_of_states, s)}) 
            if np.random.rand(1) < epsilon:
                # epsilon-greedy with random exploration here
                a[0] = env.action_space.sample()
            # Observe new state and reward from environment
            # s_prime = new state
            # r = reward of action
            # d = boolean, true if terminal state reached
            s_prime, r, d, _ = env.step(a[0])
            # Compute Q' by feeding the new state into the network, get vector of values of actions of new state
            Q_prime = sess.run(y, feed_dict={x:np.eye(1, number_of_states, s_prime)})
            # Compute maximum value of Q_prime and set  target value for chosen action
            max_Q_prime = np.max(Q_prime)  # get maximum value of new state
            Q_target = Q
            Q_target[0, a[0]] = r + gamma * max_Q_prime  # update Q by updating the value of the action sampled
            # Train network using target and predicted Q values
            _ = sess.run([train_step], feed_dict={x:np.eye(1, number_of_states, s), target:Q_target, lr:eta})
            R += r
            s = s_prime
            if t >= max_number_of_steps - 1:  # episode end
                # Reduce probability of random actions over time
                if initial_epsilon > 0.:
                    epsilon = 1./((i/10) + (1./initial_epsilon))
                    eta = 1./((i/10) + (1./initial_epsilon))
                break
        
        R_list.append(R) # reward accumulated in episode
        E_list.append(epsilon) # exploration factor
        
        if i%10 == 0:
            # take model snapshot
            trained_weights = sess.run(W)
            save_model("model_%s_epocs.p" % i)
        
    # save weights
    trained_weights = sess.run(W)

Save model

In [None]:
save_model("model.p")

Utility functions

In [None]:
def visualize_weights(weights):
    def best_action(actions):
        a = np.argmax(actions)
        if a == 0:
            return "m1 up   "
        elif a == 1:
            return "m1 down "
        elif a == 2:
            return "m2 away "
        elif a == 3:
            return "m2 close"
        
    m1_pos = {0: "up     ", 1: "mid    ", 2: "down   "}
    m2_pos = {0: "away   ", 1: "mid    ", 2: "close  "}
        
    print("M1 pos   | M2 pos   | best action | w m1 up  | m1 down  | m2 away  | m2 close ")
    print("---------|----------|-------------|----------|----------|----------|----------")
    for i in range(9):
        print(" %s | %s  | %s    | %6.3f   | %6.3f   | %6.3f   | %6.3f  " % (m1_pos[(int(i%3))], m2_pos[(int(i/3))], best_action(weights[i,:]), weights[i,0], weights[i,1], weights[i,2], weights[i,3]))
        
        
visualize_weights(trained_weights)

In [None]:
plt.plot(R_list, 'g.')
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.title("Reward gathered")
plt.show()

plt.plot(E_list, 'r')
plt.xlabel('Episode')
plt.ylabel('Exploration factor')
plt.title("Exploration factor")
plt.show()

In [None]:
xs = []
for i in range(100):
    xs.append(1./((i/10) + (1./initial_epsilon)))
    
plt.plot(xs, 'r')
plt.xlabel('Episode')
plt.ylabel('Exploration factor')
plt.title("Exploration factor")
plt.show()