In [1]:
import gym
import numpy as np
import random
import matplotlib.pyplot as plt
import time
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from collections import deque
from IPython.display import clear_output

In [18]:
def value_iteration(env, gamma=0.9, epsilon=1e-6):
    """Solve a tabular MDP by value iteration and return the greedy policy.

    Parameters
    ----------
    env : object
        Environment exposing a transition model ``P`` such that ``P[s][a]``
        is an iterable of ``(prob, next_state, reward, done)`` tuples, with
        states and actions numbered from 0.  If ``env`` has an ``unwrapped``
        attribute (e.g. an environment returned by ``gym.make``), the model
        is read from the unwrapped environment instead of relying on
        attribute forwarding through wrappers.
    gamma : float, optional
        Discount factor applied to the value of the successor state.
    epsilon : float, optional
        Convergence threshold: sweeps stop once the largest change of any
        state value within a sweep drops below this number.

    Returns
    -------
    numpy.ndarray
        Integer array with one entry per state holding the greedy action.
        Ties are resolved in favour of the lowest action index.
    """
    model = getattr(env, "unwrapped", env)
    P = model.P
    # Size the problem from the model itself instead of assuming a grid
    # world with exactly four actions.
    nS = len(P)
    V = np.zeros(nS)

    def action_values(s):
        """Return the one-step look-ahead value of every action in state s."""
        values = []
        for a in range(len(P[s])):
            q = 0
            for prob, next_state, reward, _done in P[s][a]:
                # The `done` flag is not used for masking: the backup relies
                # on V[next_state] itself, exactly as the tuple is given.
                q += prob * (reward + gamma * V[next_state])
            values.append(q)
        return values

    # In-place sweeps: V[s] is overwritten immediately, so later states in
    # the same sweep already see the updated values.
    while True:
        delta = 0
        for s in range(nS):
            best = max(action_values(s), default=-np.inf)
            delta = max(delta, np.abs(best - V[s]))
            V[s] = best

        if delta < epsilon:
            break

    # Extract the greedy policy with respect to the converged values.
    policy = np.zeros(nS, dtype=int)
    for s in range(nS):
        values = action_values(s)
        if values:
            # np.argmax returns the first maximum, matching a strict
            # "greater than" scan over the actions.
            policy[s] = int(np.argmax(values))

    return policy

In [75]:
# Create the FrozenLake environment (deterministic moves, on-screen rendering)
env = gym.make('FrozenLake-v1', is_slippery=False, render_mode='human')

try:
    # Run value iteration on the environment's transition model
    optimal_policy = value_iteration(env)
    print(optimal_policy)

    # Play one episode using the learned policy
    state, _ = env.reset()
    print(state)
    done = False
    while not done:
        action = optimal_policy[state]
        # step() returns (obs, reward, terminated, truncated, info). The
        # episode must also stop on truncation (time limit); otherwise a
        # policy that never reaches a terminal state would loop forever.
        state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        print(state)
        # No explicit env.render() here: with render_mode='human' the
        # window is refreshed by reset()/step() themselves.
finally:
    # Always release the render window, even if the rollout fails.
    env.close()


[1 2 1 0 1 0 1 0 2 1 1 0 0 2 2 0]
0


  if not isinstance(terminated, (bool, np.bool8)):


4
8
9
13
14
15


In [78]:
# Create the FrozenLake environment
env = gym.make('FrozenLake-v1', is_slippery=False)
env.reset(seed=0)
# env.reset(seed=...) only seeds the environment itself. Exploration draws
# from the action space and from NumPy's global generator, so seed those too
# to make the training run repeatable.
env.action_space.seed(0)
np.random.seed(0)

# Q-learning
number_of_states = env.observation_space.n
number_of_actions = env.action_space.n
print( "States = ", number_of_states)
print( "Actions = ", number_of_actions)

num_episodes = 1000
steps_total = []
rewards_total = []
egreedy_total = []

# PARAMS
# Discount on reward
gamma = 0.95
# Step size: how far each update moves Q[s, a] from its old estimate toward
# the newly observed target
learning_rate = 0.9

# Exploration rate: starts at `egreedy`, is multiplied by `egreedy_decay`
# after every episode and never drops below `egreedy_final`
egreedy = 0.7
egreedy_final = 0.1
egreedy_decay = 0.999

Q = np.zeros([number_of_states, number_of_actions])

for i_episode in range(num_episodes):
    state, _ = env.reset()
    step = 0
    while True:
        step += 1

        # Take a random action with probability `egreedy` (exploration),
        # otherwise act greedily with respect to the current Q-table
        if np.random.rand() < egreedy:
            action = env.action_space.sample()
        else:
            action = np.argmax(Q[state, :])

        # step() returns (obs, reward, terminated, truncated, info)
        new_state, reward, terminated, truncated, info = env.step(action)

        # Update Q-Table with new knowledge (one-step temporal-difference update)
        Q[state, action] = Q[state, action] + learning_rate * (reward + gamma * np.max(Q[new_state, :]) - Q[state, action])

        state = new_state

        # End the episode on a terminal state or when the environment's
        # time limit truncates it
        if terminated or truncated:
            steps_total.append(step)
            rewards_total.append(reward)
            egreedy_total.append(egreedy)
            break

    # Reduce chance of random action as we train the model.
    egreedy *= egreedy_decay
    egreedy = max(egreedy_final, egreedy)

env.close()

# The success figures below are fractions in [0, 1], not percentages
print("Percent of episodes finished successfully: {0}".format(sum(rewards_total)/num_episodes))
print("Percent of episodes finished successfully (last 100 episodes): {0}".format(sum(rewards_total[-100:])/100))
print("Average number of steps: %.2f" % (sum(steps_total)/num_episodes))
print("Average number of steps (last 100 episodes): %.2f" % (sum(steps_total[-100:])/100))
print(Q)

States =  16
Actions =  4
Percent of episodes finished successfully: 0.451
Percent of episodes finished successfully (last 100 episodes): 0.68
Average number of steps: 7.91
Average number of steps (last 100 episodes): 6.32
[[0.73509189 0.77378094 0.77378094 0.73509189]
 [0.73509189 0.         0.81450625 0.77378094]
 [0.77378094 0.857375   0.77358935 0.81450625]
 [0.81449089 0.         0.69508966 0.        ]
 [0.77378094 0.81450625 0.         0.73509189]
 [0.         0.         0.         0.        ]
 [0.         0.9025     0.         0.81450616]
 [0.         0.         0.         0.        ]
 [0.81450625 0.         0.857375   0.77378094]
 [0.81450625 0.9025     0.9025     0.        ]
 [0.857375   0.95       0.         0.857375  ]
 [0.         0.         0.         0.        ]
 [0.         0.         0.         0.        ]
 [0.         0.9025     0.95       0.857375  ]
 [0.9025     0.95       1.         0.9025    ]
 [0.         0.         0.         0.        ]]


In [79]:
# Replay one greedy episode on screen using the Q-table learned above
env = gym.make('FrozenLake-v1', is_slippery=False, render_mode='human')

try:
    state, _ = env.reset(seed=0)

    done = False
    while not done:
        action = np.argmax(Q[state, :])
        # Stop on truncation as well as termination: a greedy policy that
        # cycles between non-terminal states would otherwise never finish.
        state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        # No explicit env.render(): render_mode='human' already redraws the
        # window inside reset()/step().
finally:
    # Always release the render window, even if the rollout fails.
    env.close()

In [64]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

class QNetwork(nn.Module):
    """Three-layer fully connected network that outputs one Q-value per action.

    Parameters
    ----------
    input_dim : int
        Size of the state encoding fed to the network.
    output_dim : int
        Number of actions, i.e. the number of Q-values produced.
    hidden_dim : int, optional
        Width of both hidden layers (64 by default).
    """

    def __init__(self, input_dim, output_dim, hidden_dim=64):
        super().__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        """Return the Q-values for ``x`` (last dimension of size ``input_dim``)."""
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        # No activation on the output layer: Q-values are unbounded.
        return self.fc3(x)

def deep_q_learning(env, num_episodes=1000, gamma=0.99, epsilon=1.0, epsilon_decay=0.99, epsilon_min=0.01, batch_size=32, memory_size=10000):
    """Train a Q-network with experience replay and return its greedy policy.

    Discrete states are fed to the network as one-hot vectors.  A single
    network provides both the predictions and the bootstrap targets (there
    is no separate target network).

    Parameters
    ----------
    env : object
        Environment with discrete ``observation_space.n`` / ``action_space.n``,
        whose ``reset()`` returns ``(state, info)`` and whose ``step(action)``
        returns ``(state, reward, terminated, truncated, info)``.
    num_episodes : int
        Number of training episodes.
    gamma : float
        Discount factor.
    epsilon : float
        Initial probability of taking a random action.
    epsilon_decay : float
        Factor applied to ``epsilon`` after every environment step.
    epsilon_min : float
        Lower bound for ``epsilon``.
    batch_size : int
        Number of transitions sampled per gradient step; training starts
        once the replay memory holds at least this many transitions.
    memory_size : int or None
        Maximum number of transitions kept in the replay memory (oldest are
        dropped first); ``None`` keeps every transition.

    Returns
    -------
    numpy.ndarray
        Array with one greedy action per state.
    """
    input_dim = env.observation_space.n
    output_dim = env.action_space.n

    # Build the Deep Q-network
    model = QNetwork(input_dim, output_dim)
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    # Row s is the one-hot encoding of state s; built once and reused
    # instead of recreating an identity matrix on every step.
    one_hot = torch.eye(input_dim)

    # Bounded replay memory so long runs cannot grow it without limit
    replay_memory = deque(maxlen=memory_size)

    for _ in range(num_episodes):
        state, _info = env.reset()
        episode_over = False

        while not episode_over:
            # Epsilon-greedy exploration strategy
            if np.random.uniform(0, 1) < epsilon:
                action = env.action_space.sample()
            else:
                # Acting needs no gradients
                with torch.no_grad():
                    action = torch.argmax(model(one_hot[state])).item()

            next_state, reward, terminated, truncated, _info = env.step(action)
            # The episode ends on a terminal state OR when the environment
            # truncates it (e.g. a time limit); ignoring truncation would
            # let a non-terminating policy run forever.
            episode_over = terminated or truncated

            # Store only `terminated` as the bootstrap mask: a truncated
            # episode has not reached a terminal state, so its successor
            # state still carries value.
            replay_memory.append((state, action, reward, next_state, terminated))

            state = next_state

            # Sample a random minibatch from replay memory
            if len(replay_memory) >= batch_size:
                picks = np.random.choice(len(replay_memory), batch_size, replace=False)
                states, actions, rewards, next_states, terminals = zip(
                    *(replay_memory[int(i)] for i in picks)
                )

                states_mb = one_hot[torch.as_tensor(np.asarray(states), dtype=torch.long)]
                next_states_mb = one_hot[torch.as_tensor(np.asarray(next_states), dtype=torch.long)]
                actions_mb = torch.as_tensor(np.asarray(actions), dtype=torch.long)
                rewards_mb = torch.as_tensor(np.asarray(rewards), dtype=torch.float32)
                terminals_mb = torch.as_tensor(np.asarray(terminals, dtype=np.float32))

                # Q(s, a) predicted for the actions that were actually taken
                q_pred = model(states_mb).gather(1, actions_mb.unsqueeze(1)).squeeze(1)

                # Bellman target r + gamma * max_a' Q(s', a'), with the
                # bootstrap term zeroed for terminal transitions; no
                # gradient flows through the target.
                with torch.no_grad():
                    q_next = model(next_states_mb).max(1)[0]
                q_target = rewards_mb + gamma * (1 - terminals_mb) * q_next

                # Update the Q-network
                loss = F.mse_loss(q_pred, q_target)
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

            # Decay epsilon
            # NOTE(review): this runs once per environment step, not once
            # per episode, so exploration fades quickly for decay factors
            # well below 1 -- confirm this is the intended schedule.
            epsilon = max(epsilon * epsilon_decay, epsilon_min)

    # Extract the greedy policy from the trained Q-network
    with torch.no_grad():
        q_table = model(one_hot).numpy()
    policy = np.argmax(q_table, axis=1)

    return policy

In [80]:
# Create the FrozenLake environment
env = gym.make('FrozenLake-v1', is_slippery=False)
env.reset(seed=0)
# env.reset(seed=...) only seeds the environment itself. Training also draws
# from the action space, NumPy's global generator (exploration and replay
# sampling) and PyTorch (weight initialisation), so seed those too to make
# the run repeatable.
env.action_space.seed(0)
np.random.seed(0)
torch.manual_seed(0)

# params
num_episodes = 200
batch_size = 8
gamma = 0.95
epsilon = 1.0
epsilon_decay = 0.97
epsilon_min = 0.01

# Keyword arguments keep the call correct even if the parameter order of
# deep_q_learning changes.
optimal_policy = deep_q_learning(
    env,
    num_episodes=num_episodes,
    gamma=gamma,
    epsilon=epsilon,
    epsilon_decay=epsilon_decay,
    epsilon_min=epsilon_min,
    batch_size=batch_size,
)
env.close()
print(optimal_policy)

[1 0 0 0 1 1 3 2 2 2 1 2 2 3 2 0]


In [74]:
# Replay one episode on screen using the policy stored in `optimal_policy`
env = gym.make('FrozenLake-v1', is_slippery=False, render_mode='human')

try:
    state, _ = env.reset(seed=0)
    done = False
    while not done:
        action = optimal_policy[state]
        # Stop on truncation as well as termination: a learned policy is
        # not guaranteed to reach a terminal state, and without this check
        # such a policy would keep the loop running forever.
        state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        # No explicit env.render(): render_mode='human' already redraws the
        # window inside reset()/step().
finally:
    # Always release the render window, even if the rollout fails.
    env.close()