# Reinforcement Learning
## Q-Learning 

The goal here is to implement a Reinforcement Learning algorithm: Q-learning.
### Description : 
An robot/agent can move in a m*n maze. The environment can have a random component: with each choice of movement (up, down, left or right), the agent can move in an unwanted direction. This behavior is configurable.

### Credits
Based on [Akka Zemmari](https://www.labri.fr/perso/zemmari/) RL courses at [ENSEIRB-MATMECA](https://enseirb-matmeca.bordeaux-inp.fr/fr)

Q-learning implemention by Fabien Couthouis.

In [1]:
import random
class Game:
    ACTION_UP = 0
    ACTION_LEFT = 1
    ACTION_DOWN = 2
    ACTION_RIGHT = 3
    
    ACTIONS = [ACTION_UP, ACTION_LEFT, ACTION_DOWN, ACTION_RIGHT]
    ACTIONS_NAMES = ['UP','LEFT','DOWN','RIGHT']
    
    MOVEMENTS = {
        ACTION_UP: (1, 0),
        ACTION_RIGHT: (0, 1),
        ACTION_LEFT: (0, -1),
        ACTION_DOWN: (-1, 0)
    }
    
    num_actions = len(ACTIONS)
    
    def __init__(self, n, m, nb_blocks,wrong_action_p=0.1, alea=False):
        self.n = n
        self.m = m
        self.nb_blocks = nb_blocks
        self.wrong_action_p = wrong_action_p
        self.alea = alea
        self.generate_game()
        
    def _position_to_id(self, x, y):
        """Return the square id"""
        return x + y * self.n
    
    def _id_to_position(self, id):
        """Return the square corresponding to the given id"""
        return (id % self.n, id // self.n)
    
    def generate_game(self):
        """Random game generation"""
        cases = [(x, y) for x in range(self.n) for y in range(self.m)]
        start = random.choice(cases)
        cases.remove(start)
        end = random.choice(cases)
        cases.remove(end)

        self.blocks = []
        for b in range(self.nb_blocks):
            block = random.choice(cases)
            cases.remove(block)
            self.blocks.append(block)
        
        self.position = start
        self.end = end
        
        self.counter = 0
        
        if not self.alea:
            self.start = start
        return self._get_state()
    
    def reset(self):
        if not self.alea:
            self.position = self.start
            self.counter = 0
            return self._get_state()
        else:
            return self.generate_game() 
    
    def _get_grid(self, x, y):
        grid = [
            [0] * self.n for i in range(self.m)
        ]
        grid[x][y] = 1
        return grid
    
    def _get_state(self):
        if self.alea:
            return [self._get_grid(x, y) for (x, y) in
                    [self.position, self.end, self.block]]
        return self._position_to_id(*self.position)
   
    def move(self, action):
        """
        Move the agent 
        :param action : action id
        :return ((state_id, end, hole, block), reward, is_final, actions)
        """
        self.counter += 1
        if action not in self.ACTIONS:
            raise Exception('Invalid action')
        
        choice = random.random()
        if choice < self.wrong_action_p :
            action = (action + 1) % 4
        elif choice < 2 * self.wrong_action_p:
            action = (action - 1) % 4
            
        d_x, d_y = self.MOVEMENTS[action]
        x, y = self.position
        new_x, new_y = x + d_x, y + d_y
        
        if (new_x, new_y) in self.blocks :
            return self._get_state(), -1, False, self.ACTIONS
        elif self.end == (new_x, new_y):
            self.position = new_x, new_y
            return self._get_state(), 10, True, self.ACTIONS
        elif new_x >= self.n or new_y >= self.m or new_x < 0 or new_y < 0:
            return self._get_state(), -1, False, self.ACTIONS
        elif self.counter > 190:
            self.position = new_x, new_y
            return self._get_state(), -10, True, self.ACTIONS
        else:
            self.position = new_x, new_y
            return self._get_state(), -1, False, self.ACTIONS
        
    def print(self):
        str = ""
        for i in range(self.n - 1, -1, -1):
            for j in range(self.m):
                if (i, j) == self.position:
                    str += "x"
                elif (i, j) in self.blocks:
                    str += "¤"
                elif (i, j) == self.end:
                    str += "@"
                else:
                    str += "."
            str += "\n"
        print(str)

It is assumed that the environment is fixed: the position of the elements will not be changed between each part. It is therefore a matter of learning the structure of the ground, to be able to move the agent correctly.

In the next part, we will implement Q-learning with table and with a simple neural network.


In [2]:
import numpy as np 
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, InputLayer

class Agent:
    """
    Parameters:
        game: Game object
        use_nn, optionnal (default=False): If True, train agent using a neural network, use Q-table otherwise
    """

    def __init__(self,game, use_nn=False):
        self.game = game
        self.num_states = self.game.n * self.game.m
        self.num_actions = Game.num_actions
        self.use_nn = use_nn
        self.max_training_iterations = self.num_states ** 3

        if self.use_nn:
            self.model = self._build_model()
        else:
            self.Q = np.zeros([self.num_states, self.num_actions])



    def _build_model(self):
        """Build nn keras model"""
        model = Sequential()
        model.add(InputLayer(batch_input_shape=(1,self.num_states)))
        model.add(Dense(8, activation='sigmoid'))
        model.add(Dense(self.num_actions, activation='linear'))
        #loss = ( reward + y * max( Q(s',a') - Q(s,a) ) )^2 
        model.compile(loss='mse', optimizer='adam', metrics=['mse'])

        return model


    def _act(self,s):
        """Select next action"""
        if self.use_nn:
            #np.identity is used to one hot encode state s
            a = np.argmax(self.model.predict(np.identity(self.num_states)[s:s + 1]))
        else:
            Q2 = self.Q[s,:]
            a = np.argmax(Q2)

        return a


    def _train_one_episode(self,lr,y):
        actions = []
        s = self.game.reset()
        states = []
        cumul_reward = 0
        nb_iterations = 0
        done = False

        while not done and nb_iterations < self.max_training_iterations:
            a = self._act(s)
            new_s,reward,done,_ = self.game.move(a)

            if self.use_nn:
                target = reward + y * np.max(self.model.predict(np.identity(self.num_states)[new_s:new_s + 1]))
                target_vector = self.model.predict(np.identity(self.num_states)[s:s + 1])[0]
                target_vector[a] = target
                self.model.fit(np.identity(self.num_states)[s:s + 1], target_vector.reshape(-1, self.num_actions), epochs=1,  verbose=False)

            else:
                self.Q[s,a] = self.Q[s,a] + lr * (reward + y * np.max(self.Q[new_s,:]) - self.Q[s,a])

            cumul_reward += reward
            s = new_s
            actions.append(a)
            states.append(s)
            nb_iterations += 1
        
        return states,actions,cumul_reward, nb_iterations



    def train(self,num_episodes = 1000, lr=0.85,y=0.99, use_nn=False):
        cumul_rewards_list,actions_list,states_list = [],[],[]

        print("Start of the game")
        self.game.print()


        for t in range(num_episodes):
            actions, states, cumul_reward, nb_iterations = self._train_one_episode(lr,y)

            actions_list.append(actions)
            states_list.append(states)
            cumul_rewards_list.append(cumul_reward)

            #No solution
            if nb_iterations >= self.max_training_iterations:
                print("No solution found after {} iterations".format(nb_iterations))
                break

        print("\nEnd of the game")
        self.game.print()
        print("Score over time: {}".format(sum(cumul_rewards_list[-100:])/100.0))

  _np_qint8 = np.dtype([("qint8", np.int8, 1)])
  _np_quint8 = np.dtype([("quint8", np.uint8, 1)])
  _np_qint16 = np.dtype([("qint16", np.int16, 1)])
  _np_quint16 = np.dtype([("quint16", np.uint16, 1)])
  _np_qint32 = np.dtype([("qint32", np.int32, 1)])
  np_resource = np.dtype([("resource", np.ubyte, 1)])
  _np_qint8 = np.dtype([("qint8", np.int8, 1)])
  _np_quint8 = np.dtype([("quint8", np.uint8, 1)])
  _np_qint16 = np.dtype([("qint16", np.int16, 1)])
  _np_quint16 = np.dtype([("quint16", np.uint16, 1)])
  _np_qint32 = np.dtype([("qint32", np.int32, 1)])
  np_resource = np.dtype([("resource", np.ubyte, 1)])


In [3]:
game = Game(8,8,15)
agent = Agent(game, use_nn=True)
agent.train(num_episodes=100)

Start of the game
........
.¤.¤.¤¤.
.¤...¤@.
¤..¤.x..
¤.......
¤¤.¤....
........
.¤¤..¤..


End of the game
........
.¤.¤.¤¤.
.¤...¤x.
¤..¤....
¤.......
¤¤.¤....
........
.¤¤..¤..

Score over time: 0.68
