## Deep QNetworks

Deep Qlearning with experience replay.

In [4]:
import sys
sys.path.insert(0, '/home/alexserra98/uni/r_l/project/Deep-QNetworks/src_code')

### Init

In [5]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import matplotlib.pyplot as plt
import numpy as np
import copy
from qnetworks import ReplayBuffer
from deep_qnetworks import DQN #, SnakeEnv
from tqdm import tqdm


In [6]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
batch_size = 32  # Size of batch taken from replay buffer

## Env

In [42]:
import numpy as np
import pygame

import gymnasium as gym
from gymnasium import spaces

class SnakeEnv(gym.vector.VectorEnv):

    def __init__(self, Lx, Ly, start = None, end = None):
        
        # World shape
        self.Ly, self.Lx = Lx, Ly
        #self.observation_space = spaces.Box(low=0, high=3, shape=[Lx, Ly])
        # start and end positions
        self.start = [0,0] if start is None else start
        self.end = None if end is None else end
        self.current_state = self.start
        self.state_in = None
        
        self.done = False

        # space of actions  [Down,  Up,  Right,Left]
        #self.action_space = spaces.Discrete(4) 
        self.actions = np.array([[1,0],[-1,0],[0,1],[0,-1]])
        self.num_actions = len(self.actions)
        self.body = []
        
    def set_state_in(self, state_in):
        self.state_in = state_in

    def reset(self):
        """
        Restart snake by setting current state to start
        """
        self.current_state = self.start
        self.body = []
        self.done = False
        
    def step(self, action):
        """
        Evolves the environment given action A and current state.
        """
        #state = self.state_in.cpu()
        a = self.actions[action] # action is an integer in [0,1,2,3]
        
        S_new = copy.deepcopy(state)

        S_new[:2] += a

        # add a penalty for moving
        reward = -1

        # if the snake eats itself, add penalty 
        if S_new[:2] in self.body:
            self.done = True
            reward = -1000

        # update all the body segments in reverse order
        for i in range(len(self.body)-1,0,-1):
            self.body[i] = self.body[i-1]
        
        # update the first segment
        if len(self.body) > 0:
            self.body[0] = self.current_state

        # If we go out of the world, we enter from the other side
        if (S_new[0] == self.Ly):
            S_new[0] = 0
        elif (S_new[0] == -1):
            S_new[0] = self.Ly - 1
        elif (S_new[1] == self.Lx):
            S_new[1] = 0
        elif (S_new[1] == -1):
            S_new[1] = self.Lx - 1

        elif np.all(S_new[:2] == S_new[2:]):
            self.done = True       
            reward = 100  # if we reach the reward we get a reward of 100
            # add an element to the body
            new_segment = self.body[-1] if len(self.body) > 0 else S_new[:2]
            self.body.append(new_segment)
        
        # change the current position
        self.current_state = S_new[:2]
        return S_new, reward, self.done
    


    def get_image(self,state):
        """
        Represent the game as an image, state input is a tuple of 4 elements
        (x,y,x_food,y_food)
        """
        image = np.zeros((self.Lx,self.Ly))
        if state[2] >= 0 and state[2] < self.Lx and state[3] >= 0 and state[3] < self.Ly:
            image[int(state[2]), int(state[3])] = 1

        if state[0] >= 0 and state[0] < self.Lx and state[1] >= 0 and state[1] < self.Ly:
            image[int(state[0]), int(state[1])] = 1
        else:
            # if the agent is out of the world, it is dead and so we cancel the food as well
            # this check is just for safety reasons, if we allow the snake to go through the walls
            # this should never happen
            image[int(state[2]), int(state[3])] = 0 
            
        for i in range(len(self.body)):
            if self.body[i][0] >= 0 and self.body[i][0] < self.Lx and self.body[i][1] >= 0 and self.body[i][1] < self.Ly:
                image[int(self.body[i][0]), int(self.body[i][1])] = 1
            
        return image

    def select_epsilon_greedy_action(self, model, state, epsilon):
        """
        Take random action with probability epsilon, 
        else take best action.
        """
        result = np.random.uniform()
        if result < epsilon:
            return np.random.choice(np.arange(self.num_actions)) 
        else:
            # input is a tensor of floats
            images = self.get_image(state[0]) 
            input = torch.as_tensor(images, dtype=torch.float32).unsqueeze(0).unsqueeze(0).to(model.device)

            qs = model(input).cpu().data.numpy()
            return np.argmax(qs)

## Init Env

In [43]:
# initialize the environment 
Lx = 20
Ly = 20

env = SnakeEnv(Lx,Ly)

# The first model makes the predictions for Q-values which are used to make a action.
model = DQN(in_channels =1, num_actions=env.num_actions, input_size=env.Lx)
# The target model makes the prediction of future rewards.
# The weights of a target model get updated every 10000 steps thus when the
# loss between the Q-values is calculated the target Q-value is stable.
model_target = DQN(in_channels = 1, num_actions=env.num_actions, input_size=env.Lx)

model.to(device)
model_target.to(device)

# In the Deepmind paper they use RMSProp however then Adam optimizer
# improves training time
optimizer = torch.optim.Adam(model.parameters(), lr=0.00025)
# huber loss
loss_function = nn.HuberLoss()

num_actions = env.num_actions
action_space = np.arange(num_actions)

In [24]:
# brief check to see the snake moves as expected

env.reset()
S_new, reward, done = env.step([0,0,2,2], 2)
S_new, reward, done = env.step(S_new, 2)
S_new, reward, done = env.step(S_new, 0)
S_new, reward, done = env.step(S_new, 0)
print(env.body)
S_new, reward, done = env.step([S_new[0],S_new[1],3,3], 2)
S_new, reward, done = env.step(S_new, 0)
print(reward)

S_new, reward, done = env.step(S_new, 0)
S_new, reward, done = env.step(S_new, 0)
S_new, reward, done = env.step(S_new, 0)
S_new, reward, done = env.step(S_new, 0)
S_new, reward, done = env.step(S_new, 1)
print(reward)


env.get_image(S_new)

TypeError: SnakeEnv.step() takes 2 positional arguments but 3 were given

## Train

In [44]:
def train_step(states, actions, rewards, next_states, dones, discount):
    """
    Perform a training iteration on a batch of data sampled from the experience
    replay buffer.

    Takes as input:
        - states: a batch of states
        - actions: a batch of actions
        - rewards: a batch of rewards
        - next_states: a batch of next states
        - dones: a batch of dones
        - discount: the discount factor, standard discount factor in RL to evaluate less long term rewards
    """

    # compute targets for Q-learning
    # the max Q-value of the next state is the target for the current state
    # the image to be fed to the network is a grey scale image of the world
    images = [env.get_image(next_state) for next_state in next_states]
    input = torch.as_tensor(np.array(images), dtype=torch.float32).unsqueeze(1).to(device)
    max_next_qs = model_target(input).max(-1).values

    # if the next state is terminal, then the Q-value is just the reward
    # otherwise, we add the discounted max Q-value of the next state
    target = rewards + (1.0 - dones) * discount * max_next_qs

    # then to compute the loss, we also need the Q-value of the current state
    images = [env.get_image(state) for state in states]
    input = torch.as_tensor(np.array(images), dtype=torch.float32).unsqueeze(1).to(device)
    qs = model(input)

    # for each state, we update ONLY the Q-value of the action that was taken

    #action_masks = F.one_hot(torch.as_tensor(np.array(actions)).long(), num_actions)
    action_masks = F.one_hot(actions.long(), num_actions)
    masked_qs = (action_masks * qs).sum(dim=-1)
    loss = loss_function(masked_qs, target.detach())

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    return loss

### train

In [45]:
# initialize the buffer, with a size of 100000, when it is full, it will remove the oldest element
buffer = ReplayBuffer(size = 100000, device=device) 

cur_frame = 0
last_100_ep_rewards = []
max_steps_per_episode = 100
max_num_episodes = 10000

epsilon = 1.0
epsilon_min = 0.1  # Minimum epsilon greedy parameter
epsilon_max = 1.0  # Maximum epsilon greedy parameter

# Train the model after 4 actions
update_after_actions = 4
# How often to update the target network
update_target_network = 10000

# Number of frames to take random action and observe output
epsilon_random_frames = 50000
# Number of frames for exploration
epsilon_greedy_frames = 100000.0

filename = 'dqn_results.txt'

env.start = np.array([0,0])

for episode in tqdm(range(max_num_episodes)):
    env.reset()
    episode_reward = 0

    # state is a tuple of 4 values made of starting position and goal position
    # start of an episode is always [0,0] for snake and a random position for goal
    start_x = env.start[0]
    start_y = env.start[1]
    goal_x = np.random.randint(0,env.Lx)
    goal_y = np.random.randint(0,env.Ly)
        
    state = [start_x, start_y, goal_x, goal_y]


    #done = False
    timestep = 0

    while timestep < max_steps_per_episode:
    
        cur_frame += 1

        state_in = torch.from_numpy(np.expand_dims(state, axis=0)).to(device)
        action = env.select_epsilon_greedy_action(model, state_in, epsilon)
        
        env.set_state_in(state)
        next_state, reward, done = env.step(action)
        episode_reward += reward
    

        # Save actions and states in replay buffer
        buffer.add(state, action, reward, next_state, done)
        state = next_state
        cur_frame += 1
    
        # Train neural network.
        if len(buffer) > batch_size and cur_frame % update_after_actions == 0:
            states, actions, rewards, next_states, dones = buffer.sample(batch_size)
            loss = train_step(states, actions, rewards, next_states, dones, discount=0.99)
        
        # Update target network every update_target_network steps.
        if cur_frame % update_target_network == 0:
            model_target.load_state_dict(model.state_dict())

        timestep += 1

        if timestep > epsilon_random_frames:
            epsilon -= (epsilon_max - epsilon_min) / epsilon_greedy_frames
            epsilon = max(epsilon, epsilon_min)
    

    
    if len(last_100_ep_rewards) == 100:
        last_100_ep_rewards = last_100_ep_rewards[1:]
    last_100_ep_rewards.append(episode_reward)

    running_reward = np.mean(last_100_ep_rewards)

    if episode+1 % 100 == 0:
        """ print(f'Episode {episode}/{max_num_episodes}. Epsilon: {epsilon:.3f}.'
        f' Reward in last 100 episodes: {running_reward:.2f}') """

        # write on file current average reward
        with open(filename, 'a') as f:
            f.write(f'{episode},{running_reward:.2f}, {epsilon:.3f}\n')

    # Condition to consider the task solved
    # e.g. to eat at least 6 consecutive food items
    # without eating itself, considering also the moves to reach the food
    if running_reward > 500: 
        print("Solved at episode {}!".format(episode))
        break

  0%|          | 6/10000 [00:33<15:42:12,  5.66s/it]


KeyboardInterrupt: 