Tutorial: https://www.youtube.com/playlist?list=PL58zEckBH8fCMIVzQCRSZVPUp3ZAVagWi

In [1]:
# just running the game
import gymnasium as gym
import flappy_bird_gymnasium

# Play a single episode with uniformly random actions, rendering it on screen.
env = gym.make('FlappyBird-v0', render_mode='human', use_lidar=False)

try:
    obs, _ = env.reset()

    while True:
        action = env.action_space.sample()
        # Gymnasium's step() returns separate `terminated` and `truncated` flags;
        # the episode is over as soon as either one is set.
        obs, reward, terminated, truncated, info = env.step(action)
        if terminated or truncated:
            break
finally:
    # Always release the render window, even if the cell is interrupted.
    env.close()

Video 2 notes
A Deep Q-Network (DQN) is a regular deep neural network; the experience-replay mechanism appears to sit outside the network itself.

For Flappy Bird, the inputs are the position information for the pipes and the bird. The outputs are the Q-values for "flap" and "don't flap", i.e. the expected reward for each action.


In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class DQN(nn.Module):
    """Fully connected Q-network with a single hidden layer.

    Args:
        state_dim: number of input features (size of a state vector).
        action_dim: number of outputs (one value per action).
        hidden_dim: width of the hidden layer.
    """

    def __init__(self, state_dim, action_dim, hidden_dim=256):
        super().__init__()
        # fc1: state (input) -> hidden layer
        self.fc1 = nn.Linear(in_features=state_dim, out_features=hidden_dim)
        # fc2: hidden layer -> action values (output)
        self.fc2 = nn.Linear(in_features=hidden_dim, out_features=action_dim)

    def forward(self, x):
        """Return the network output for `x`: linear -> ReLU -> linear."""
        hidden = torch.relu(self.fc1(x))
        return self.fc2(hidden)

In [3]:
# Smoke test: push one random batch through a freshly initialised network.
state_dim, action_dim = 12, 2  # 12 inputs, 2 outputs
net = DQN(state_dim, action_dim)
# Random states; the first dimension is the batch size (number of random states).
state = torch.randn((10, state_dim))
# Forward pass through the network.
output = net(state)
print(state)

tensor([[ 9.9993e-01,  1.1834e-01,  6.9033e-01, -3.9294e-01, -2.2587e-01,
          1.1529e+00,  4.5699e-01,  3.4491e-01,  2.7865e-01,  5.3891e-01,
         -1.3032e+00, -3.8859e-01],
        [ 3.1827e-01, -1.8708e+00, -1.2252e+00, -9.7853e-01,  1.0051e+00,
          1.6186e+00, -5.1980e-02,  2.2819e-01,  4.5173e-02, -2.9757e-01,
         -1.2731e-01, -7.0373e-01],
        [ 1.8289e+00,  3.7351e+00, -1.0253e+00, -4.2256e-01, -1.1814e+00,
          1.3474e+00, -2.1806e-01,  1.2536e+00,  7.2125e-02, -7.5097e-01,
          2.4802e-01, -1.5918e+00],
        [ 4.5894e-01, -6.3050e-01, -6.0852e-01, -9.9601e-01,  2.7270e-01,
         -2.9998e-01,  1.7015e+00,  2.7607e-01, -1.6556e+00, -3.9673e-03,
          6.1247e-01, -2.5779e-01],
        [-1.9561e-01, -6.8119e-01,  9.7846e-01,  2.6073e-01, -4.0109e-01,
         -6.4016e-01, -4.1082e-01,  1.0213e+00, -3.4293e-01, -2.7927e-01,
         -1.0036e-01, -2.0914e-01],
        [-1.7060e+00,  1.9428e+00,  4.8418e-01, -5.6570e-04,  1.4472e-01,
      

In [4]:
# make a class for the agent

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

class Agent:
    """Minimal agent scaffold: builds the policy network and plays one random episode."""

    def run(self, is_train, render=False):
        """Create the environment and network, then play one episode of random actions.

        Args:
            is_train: accepted for interface compatibility; not used by this version.
            render: when True the game is rendered on screen ('human' render mode).
        """
        env = gym.make('FlappyBird-v0', render_mode='human' if render else None, use_lidar=False)

        try:
            state_dim = env.observation_space.shape[0]
            action_dim = env.action_space.n

            # nn.Module has no `to_device` method; `.to(device)` moves the parameters.
            # The network is only constructed here -- actions below are still random.
            policy_net = DQN(state_dim, action_dim).to(device)

            obs, _ = env.reset()
            while True:
                action = env.action_space.sample()
                # The episode ends when the environment reports termination OR truncation.
                obs, reward, terminated, truncated, info = env.step(action)
                if terminated or truncated:
                    break
        finally:
            # Release the environment even if the loop raises or is interrupted.
            env.close()

Video 3 notes

Experience replay:
  - an experience is defined as a tuple of (state, action, reward, next_state, terminated); note that the code below stores the fields in the order (state, action, next_state, reward, done)
  - save these experiences in a replay buffer (first in first out)
  - the epsilon ( $\epsilon$ ) greedy policy is used to select the action
    - $\epsilon$ is the probability of selecting a random action, else the best action is selected
    - this is loosely similar to simulated annealing: lots of random exploration at first, less as $\epsilon$ is decayed
  

In [5]:
# replay memory
from collections import deque
import random

class ReplayMemory:
    """Fixed-capacity FIFO buffer of transitions with uniform random sampling.

    Args:
        capacity: maximum number of stored transitions; once full, the oldest
            entry is dropped for each new one.
        seed: if given, seeds Python's module-level `random` generator.
    """

    def __init__(self, capacity, seed=None):
        if seed is not None:
            random.seed(seed)
        self.memory = deque(maxlen=capacity)

    def append(self, transition):
        """Store one transition, a tuple of (state, action, next_state, reward, done)."""
        self.memory.append(transition)

    def sample(self, batch_size):
        """Return `batch_size` distinct stored transitions chosen at random."""
        return random.sample(self.memory, k=batch_size)

    def __len__(self):
        """Number of transitions currently stored."""
        stored = self.memory
        return len(stored)

In [6]:
import itertools
import yaml

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

class Agent:
    """Epsilon-greedy agent that plays FlappyBird and records experience.

    Hyperparameters are read from the `hyperparam_option` section of
    'hyperparameters.yml'. This version only collects transitions into a replay
    memory; it does not train the network.
    """

    def __init__(self, hyperparam_option):
        """Load the hyperparameter set named `hyperparam_option` from 'hyperparameters.yml'."""
        with open('hyperparameters.yml', 'r') as f:
            all_hyperparams = yaml.safe_load(f)

        self.hyperparams = all_hyperparams[hyperparam_option]
        self.replay_memory_size = self.hyperparams['replay_memory_size']  # size of the replay memory
        self.mini_batch_size = self.hyperparams['mini_batch_size']  # size of the training data set sampled from the replay memory
        self.epsilon_init = self.hyperparams['epsilon_init']  # initial proportion of actions that are random
        self.epsilon_decay = self.hyperparams['epsilon_decay']  # decay rate of epsilon
        self.epsilon_min = self.hyperparams['epsilon_min']  # minimum value of epsilon

    def run(self, is_train, render=False):
        """Play episodes indefinitely (stop by interrupting the kernel).

        Args:
            is_train: when True, actions are epsilon-greedy and every transition
                is stored in a replay memory; when False, the greedy action of
                the policy network is always taken and nothing is stored.
            render: when True the game is rendered on screen ('human' render mode).
        """
        env = gym.make('FlappyBird-v0', render_mode='human' if render else None, use_lidar=False)

        try:
            state_dim = env.observation_space.shape[0]
            action_dim = env.action_space.n

            rewards_per_episode = []
            epsilon_history = []

            policy_net = DQN(state_dim, action_dim).to(device)

            if is_train:
                # Honour the configured capacity instead of a hard-coded value.
                memory = ReplayMemory(capacity=self.replay_memory_size)
                epsilon = self.epsilon_init

            for episode in itertools.count():
                state, _ = env.reset()
                # convert anything going into the network to a tensor
                state = torch.tensor(state, dtype=torch.float, device=device)

                episode_reward = 0.0
                terminated = False
                truncated = False
                while not (terminated or truncated):
                    # Picking an action: random with probability epsilon, otherwise greedy
                    if is_train and random.random() < epsilon:
                        action = env.action_space.sample()
                        action = torch.tensor(action, dtype=torch.int64, device=device)
                    else:
                        with torch.no_grad():
                            # add a batch dimension for the network, then drop it again
                            action = policy_net(state.unsqueeze(dim=0)).squeeze(dim=0).argmax()

                    # Processing
                    new_state, reward, terminated, truncated, info = env.step(action.item())

                    # accumulate reward
                    episode_reward += reward

                    # convert new state and reward to tensors on device
                    new_state = torch.tensor(new_state, dtype=torch.float, device=device)
                    reward = torch.tensor(reward, dtype=torch.float, device=device)

                    if is_train:
                        # Store `terminated` (not truncation) as the done flag.
                        memory.append((state, action, new_state, reward, terminated))

                    state = new_state

                rewards_per_episode.append(episode_reward)

                if is_train:
                    # epsilon only exists in training mode, so only decay it there.
                    # Geometric decay: multiply the current epsilon by epsilon_decay.
                    # A linear decay is another option, decreasing epsilon by a fixed
                    # amount each episode (adjust the epsilon_decay hyperparameter accordingly).
                    epsilon = max(epsilon * self.epsilon_decay, self.epsilon_min)
                    epsilon_history.append(epsilon)
        finally:
            # Release the environment when the loop is interrupted or fails.
            env.close()
        

In [12]:
# Build an agent from the 'cartpole1' hyperparameter set and start training without rendering.
agent = Agent(hyperparam_option='cartpole1')
agent.run(is_train=True, render=False)

KeyboardInterrupt: 

Video 5 notes
- Now we have a policy network, which determines the action we should take
- We need a target network, which is used to calculate future Q values
- Every once in a while we copy the policy network's weights over to the target network
- This stabilizes the training process, so that the network used to estimate outcomes doesn't change every time we try to make a decision

In [11]:
import itertools
import yaml

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

class Agent:
    """DQN agent for FlappyBird with experience replay and a target network.

    Hyperparameters are read from the `hyperparam_option` section of
    'hyperparameters.yml'.
    """

    def __init__(self, hyperparam_option):
        """Load the hyperparameter set named `hyperparam_option` from 'hyperparameters.yml'."""
        with open('hyperparameters.yml', 'r') as f:
            all_hyperparams = yaml.safe_load(f)

        self.hyperparams = all_hyperparams[hyperparam_option]
        self.replay_memory_size = self.hyperparams['replay_memory_size']  # size of the replay memory
        self.mini_batch_size = self.hyperparams['mini_batch_size']  # size of the training data set sampled from the replay memory
        self.epsilon_init = self.hyperparams['epsilon_init']  # initial proportion of actions that are random
        self.epsilon_decay = self.hyperparams['epsilon_decay']  # decay rate of epsilon
        self.epsilon_min = self.hyperparams['epsilon_min']  # minimum value of epsilon
        self.network_sync_rate = self.hyperparams['network_sync_rate']  # how often to update the target network
        self.learning_rate_a = self.hyperparams['learning_rate_a']  # learning rate passed to the optimizer
        self.discount_factor_g = self.hyperparams['discount_factor_g']  # how much to discount future rewards vs sooner rewards
        self.loss_fn = nn.MSELoss()  # loss function (mean squared error)
        self.optimizer = None  # created in run() once the policy network exists

    def run(self, is_train, render=False):
        """Play episodes indefinitely (stop by interrupting the kernel).

        Args:
            is_train: when True, actions are epsilon-greedy, transitions are stored
                in a replay memory and the policy network is optimised after each
                episode; when False, the greedy action is always taken and no
                learning takes place.
            render: when True the game is rendered on screen ('human' render mode).
        """
        env = gym.make('FlappyBird-v0', render_mode='human' if render else None, use_lidar=False)

        try:
            state_dim = env.observation_space.shape[0]
            action_dim = env.action_space.n

            rewards_per_episode = []
            epsilon_history = []

            policy_net = DQN(state_dim, action_dim).to(device)

            if is_train:
                # Honour the configured capacity instead of a hard-coded value.
                memory = ReplayMemory(capacity=self.replay_memory_size)
                epsilon = self.epsilon_init

                # The target network starts as an exact copy of the policy network.
                target_net = DQN(state_dim, action_dim).to(device)
                target_net.load_state_dict(policy_net.state_dict())

                # environment steps taken since the last target-network sync
                step_count = 0

                # policy network optimizer
                self.optimizer = torch.optim.Adam(policy_net.parameters(), lr=self.learning_rate_a)

            for episode in itertools.count():
                state, _ = env.reset()
                # convert anything going into the network to a tensor
                state = torch.tensor(state, dtype=torch.float, device=device)

                episode_reward = 0.0
                terminated = False
                truncated = False
                while not (terminated or truncated):
                    # Picking an action: random with probability epsilon, otherwise greedy
                    if is_train and random.random() < epsilon:
                        action = env.action_space.sample()
                        action = torch.tensor(action, dtype=torch.int64, device=device)
                    else:
                        with torch.no_grad():
                            # add a batch dimension for the network, then drop it again
                            action = policy_net(state.unsqueeze(dim=0)).squeeze(dim=0).argmax()

                    # Processing
                    new_state, reward, terminated, truncated, info = env.step(action.item())

                    # accumulate reward
                    episode_reward += reward

                    # convert new state and reward to tensors on device
                    new_state = torch.tensor(new_state, dtype=torch.float, device=device)
                    reward = torch.tensor(reward, dtype=torch.float, device=device)

                    if is_train:
                        # Store `terminated` (not truncation) as the done flag, because
                        # optimize() uses it to switch off bootstrapping.
                        memory.append((state, action, new_state, reward, terminated))

                        step_count += 1

                    state = new_state

                rewards_per_episode.append(episode_reward)

                # epsilon, memory and target_net only exist in training mode.
                if not is_train:
                    continue

                # Geometric decay: multiply the current epsilon by epsilon_decay.
                # A linear decay is another option, decreasing epsilon by a fixed
                # amount each episode (adjust the epsilon_decay hyperparameter accordingly).
                epsilon = max(epsilon * self.epsilon_decay, self.epsilon_min)
                epsilon_history.append(epsilon)

                if len(memory) >= self.mini_batch_size:
                    mini_batch = memory.sample(self.mini_batch_size)

                    self.optimize(mini_batch, policy_net, target_net)

                    # Periodically copy the policy weights into the target network.
                    if step_count > self.network_sync_rate:
                        target_net.load_state_dict(policy_net.state_dict())
                        step_count = 0
        finally:
            # Release the environment when the loop is interrupted or fails.
            env.close()

    def optimize(self, mini_batch, policy_net, target_net):
        """Run one gradient step on `policy_net` using a batch of transitions.

        For each transition the target is `reward` when `done` is set, otherwise
        `reward + discount_factor_g * max(target_net(new_state))`. The loss is
        `self.loss_fn` between that target and the policy network's output for
        the action that was actually taken.

        Args:
            mini_batch: sequence of (state, action, new_state, reward, done) tuples,
                where all but `done` are tensors and `done` is a bool.
            policy_net: network being trained; its parameters must be the ones
                registered with `self.optimizer`.
            target_net: network used to estimate the value of `new_state`.
        """
        # transpose the batch of experiences
        states, actions, new_states, rewards, dones = zip(*mini_batch)

        # stack tensors to create batch tensors
        states = torch.stack(states)
        actions = torch.stack(actions)
        new_states = torch.stack(new_states)
        rewards = torch.stack(rewards)
        dones = torch.tensor(dones, dtype=torch.float, device=device)

        with torch.no_grad():
            # calculate target q values (expected future rewards);
            # (1 - dones) zeroes the bootstrap term for finished episodes
            target_q = rewards + (1 - dones) * self.discount_factor_g * target_net(new_states).max(dim=1)[0]

        # Q value the current policy assigns to each action that was taken.
        # Squeeze only dim 1 so a batch of one keeps shape (1,) and matches target_q.
        current_q = policy_net(states).gather(dim=1, index=actions.unsqueeze(dim=1)).squeeze(dim=1)

        loss = self.loss_fn(current_q, target_q)

        self.optimizer.zero_grad()  # clear the gradients
        loss.backward()  # compute gradients (backpropagation)
        self.optimizer.step()
        
        
            
        