In [1]:
import numpy as np

In [3]:
 # Toy Markov decision process with 3 states and up to 3 actions per state.
 # transition_probabilities[s][a][sp] is read below as T(s, a, s'), the
 # probability of landing in state sp after taking action a in state s.
 # An action that is not listed for a state in possible_actions is stored as
 # None instead of a probability row.
 transition_probabilities = [  # shape=[s, a, s']
                                [[0.7, 0.3, 0.0], [1.0, 0.0, 0.0], [0.8, 0.2, 0.0]],
                                [[0.0, 1.0, 0.0], None, [0.0, 0.0, 1.0]],
                                [None, [0.8, 0.1, 0.1], None]
                            ]
 # rewards[s][a][sp] is read below as R(s, a, s'), the reward attached to the
 # transition s --a--> sp. Only three transitions carry a non-zero reward:
 # (s=0, a=0, sp=0) -> +10, (s=1, a=2, sp=2) -> -50, (s=2, a=1, sp=0) -> +40.
 rewards = [  # shape=[s, a, s']
            [[+10, 0, 0], [0, 0, 0], [0, 0, 0]],
            [[0, 0, 0], [0, 0, 0], [0, 0, -50]],
            [[0, 0, 0], [+40, 0, 0], [0, 0, 0]]
        ]
 # possible_actions[s] lists the action indices that may be taken in state s.
 # shape = [s]
 possible_actions = [[0, 1, 2], [0, 2], [1]]

## Q-value Iteration Algorithm :)
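$$Q_{k+1}(s,a) = \sum_{s'} T(s,a,s')\,\bigl[R(s,a,s') + \gamma \max_{a'} Q_k(s',a')\bigr] \quad \text{for all } (s,a)$$

Here $T$ is `transition_probabilities`, $R$ is `rewards`, and $\gamma$ is the discount factor `gamma`.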


In [206]:
# Start every Q-value at -inf so that an impossible action can never win a
# max(); then reset only the actions that are actually available in each
# state to 0.
q_values = np.full((3, 3), -np.inf)
for state, actions in enumerate(possible_actions):
    # `actions` is the list of valid action indices for this state, so one
    # fancy-indexed assignment zeroes all of them at once.
    q_values[state, actions] = 0

In [207]:
gamma = 0.90  # discount factor
for iteration in range(50):
    # Freeze the estimates of the previous sweep so that every update in this
    # sweep reads Q_k and never a partially updated Q_{k+1}.
    q_prev = q_values.copy()
    # Best achievable value of each successor state under Q_k.
    best_next = q_prev.max(axis=1)
    for s, actions in enumerate(possible_actions):
        for a in actions:
            # One term per successor state: T(s,a,s') * (R(s,a,s') + gamma * max_a' Q_k(s',a'))
            backups = [
                prob * (reward + gamma * value)
                for prob, reward, value in zip(
                    transition_probabilities[s][a], rewards[s][a], best_next
                )
            ]
            q_values[s][a] = np.sum(backups)
q_values

array([[18.91891892, 17.02702702, 13.62162162],
       [ 0.        ,        -inf, -4.87971488],
       [       -inf, 50.13365013,        -inf]])

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import gymnasium as gym

# Create the Gymnasium CartPole-v1 environment, requesting the 'rgb_array'
# render mode.
env = gym.make('CartPole-v1', render_mode='rgb_array')
# Reset with a fixed seed; reset() hands back the first observation and an
# info object.
obs, info = env.reset(seed=42)

class SimpleModel(nn.Module):
    """Small fully connected network mapping a 4-feature input to 2 outputs.

    The layers live in ``self.model`` (an ``nn.Sequential``): two hidden
    ``Linear`` layers of width 32, each followed by ``ELU``, then a final
    ``Linear`` layer with 2 outputs.
    """

    def __init__(self):
        super().__init__()
        hidden_width = 32
        # Layers are created in input-to-output order and wrapped in a single
        # Sequential so callers can index them (e.g. ``self.model[-1]``).
        layers = [
            nn.Linear(4, hidden_width),
            nn.ELU(),
            nn.Linear(hidden_width, hidden_width),
            nn.ELU(),
            nn.Linear(hidden_width, 2),
        ]
        self.model = nn.Sequential(*layers)

    def forward(self, x):
        """Return the network output for ``x`` (last dimension must be 4)."""
        outputs = self.model(x)
        return outputs

# Build the Q-network and print the shape of each of its parameter tensors
# (a weight and a bias for every Linear layer).
model = SimpleModel()
shapes = [tensor.shape for tensor in model.parameters()]
print(shapes)

# Problem dimensions, kept for reference by the training code.
input_shape = [4]  # == env.observation_space.shape
n_outputs = 2  # == env.action_space.n

def epsilon_greedy_policy(state, epsilon=0, q_network=None):
    """Pick an action index for ``state`` with an epsilon-greedy strategy.

    With probability ``epsilon`` a uniformly random action index is returned;
    otherwise the index of the largest predicted Q-value is returned.

    Args:
        state: A single observation (array-like). It is converted to a
            float32 tensor and given a leading batch dimension of 1.
        epsilon (float): Probability of exploring with a random action.
        q_network: Optional network used to score the actions. Defaults to
            the module-level ``model``. Its last layer must be reachable as
            ``q_network.model[-1]`` and expose ``out_features``.

    Returns:
        int: The chosen action index, usable directly in ``env.step`` and
        storable in the replay buffer as a plain integer.
    """
    network = model if q_network is None else q_network
    if np.random.rand() < epsilon:
        # Number of actions == width of the last Linear layer.
        return int(np.random.randint(network.model[-1].out_features))

    state_batch = torch.tensor(state, dtype=torch.float32).unsqueeze(0)
    # Choosing an action needs no gradients, so skip building the graph.
    with torch.no_grad():
        action_values = network(state_batch)
    # Return the greedy action index rather than the raw Q-value tensor:
    # callers pass the result to env.step() and later convert the stored
    # actions to an integer tensor.
    return int(action_values.argmax(dim=1).item())
# Smoke test: query the policy once for a hand-made 4-element state with a
# 10% exploration rate and print whatever it returns.
state = np.array([0.1, 0.2, 0.3, 0.8])  # Example state
action = epsilon_greedy_policy(state, epsilon=0.1)
print(action)

from collections import deque
# Bounded replay buffer: once 2000 experiences are stored, appending a new one
# discards the oldest.
replay_buffer = deque(maxlen=2000)

# Each experience is a tuple (s, a, r, s', done, truncated)

def sample_experiences(batch_size):
    """Draw a random mini-batch from the replay buffer, grouped by field.

    ``batch_size`` indices are drawn uniformly at random from the buffer
    (the same experience may be picked more than once). The sampled
    experiences are then regrouped column-wise, so instead of a list of
    6-tuples such as

        [
            ([1, 2, 3, 4], 0, 1.0, [5, 6, 7, 8], False, False),
            ([17, 18, 19, 20], 0, -1.0, [21, 22, 23, 24], False, True),
            ([33, 34, 35, 36], 0, 0.0, [37, 38, 39, 40], True, True),
        ]

    the caller receives one array per field:

        states      : [[1, 2, 3, 4], [17, 18, 19, 20], [33, 34, 35, 36]]
        actions     : [0, 0, 0]
        rewards     : [1.0, -1.0, 0.0]
        next_states : [[5, 6, 7, 8], [21, 22, 23, 24], [37, 38, 39, 40]]
        dones       : [False, False, True]
        truncated   : [False, True, True]

    Args:
        batch_size (int): Number of experiences to sample.

    Returns:
        list: Six NumPy arrays, in the order
        ``[states, actions, rewards, next_states, dones, truncateds]``.
    """
    picks = np.random.randint(len(replay_buffer), size=batch_size)
    sampled = [replay_buffer[position] for position in picks]

    columns = []
    for field in range(6):
        # Collect the same field from every sampled experience.
        column = np.array([experience[field] for experience in sampled])
        columns.append(column)
    return columns

def play_one_step(env, state, epsilon):
    """Act once in ``env`` and record the transition in the replay buffer.

    Args:
        env: Environment whose ``step`` returns
            ``(next_state, reward, done, truncated, info)``.
        state: Current observation, used to choose the action.
        epsilon (float): Exploration rate passed to the epsilon-greedy policy.

    Returns:
        tuple: ``(next_state, reward, done, truncated, info)`` exactly as
        produced by the environment.
    """
    chosen_action = epsilon_greedy_policy(state, epsilon)
    # Apply the action to the environment.
    next_state, reward, done, truncated, info = env.step(action=chosen_action)
    # Store the full transition so it can be replayed during training.
    transition = (state, chosen_action, reward, next_state, done, truncated)
    replay_buffer.append(transition)
    return next_state, reward, done, truncated, info


batch_size = 32  # number of experiences sampled for each training step
discount_factor = 0.95  # weight of the next state's best Q-value in the target
# NAdam optimizer over all parameters of the Q-network.
optimizer = torch.optim.NAdam(model.parameters(), lr=1e-2)
# Mean squared error between predicted and target Q-values.
loss_fn = nn.MSELoss()

def training_step(batch_size):
    """Run one gradient update of the Q-network on a sampled mini-batch.

    Targets are ``reward + discount_factor * max_a' Q(next_state, a')``, where
    the bootstrap term is dropped for transitions flagged as either done or
    truncated. The loss compares these targets with the Q-value the network
    predicts for the action that was actually taken.

    Args:
        batch_size (int): Number of experiences to sample from the replay
            buffer.
    """
    (state_arr, action_arr, reward_arr,
     next_state_arr, done_arr, truncated_arr) = sample_experiences(batch_size)

    states = torch.FloatTensor(state_arr)
    actions = torch.LongTensor(action_arr)
    rewards = torch.FloatTensor(reward_arr)
    next_states = torch.FloatTensor(next_state_arr)
    dones = torch.FloatTensor(done_arr)
    truncateds = torch.FloatTensor(truncated_arr)

    # Targets are treated as constants: nothing in this block is trained.
    with torch.no_grad():
        best_next_q = model(next_states).max(dim=1).values

        # Either flag ends the episode for bootstrapping purposes.
        episode_over = dones.bool() | truncateds.bool()
        still_running = 1.0 - episode_over.float()

        targets = rewards + still_running * discount_factor * best_next_q
        targets = targets.unsqueeze(1)

    # One-hot mask that keeps only the Q-value of the action taken.
    taken_mask = F.one_hot(actions, num_classes=n_outputs)

    # This forward pass is the trainable one.
    predicted_all = model(states)
    predicted_taken = (predicted_all * taken_mask).sum(dim=1, keepdim=True)

    loss = loss_fn(predicted_taken, targets)

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

# Main loop: 600 episodes of at most 200 steps each.
for episode in range(600):
    obs, info = env.reset()
    # Exploration rate shrinks linearly with the episode number and is
    # clipped at 0.01; it is constant within an episode.
    epsilon = max(1 - episode / 500, 0.01)
    for step in range(200):
        obs, reward, done, truncated, info = play_one_step(env, obs, epsilon)
        if done or truncated:
            break
    # The first 51 episodes only fill the replay buffer; after that, one
    # training step is run at the end of every episode.
    if episode > 50:
        training_step(batch_size)
