In [182]:
import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow import keras

In [183]:
import gym
# 4x4 FrozenLake with slipperiness disabled; "rgb_array" rendering is
# requested so that env.render() can hand back frames for later saving.
env = gym.make(
    "FrozenLake-v1",
    map_name="4x4",
    is_slippery=False,
    render_mode="rgb_array",
)

In [184]:
def stateToDQNInput(state, num_states=16):
    """One-hot encode a discrete state index as a float32 PyTorch tensor.

    Args:
        state: Index of the active state; used directly as a NumPy index.
        num_states: Length of the encoding vector (defaults to 16).

    Returns:
        A 1-D ``torch.float32`` tensor of length ``num_states`` holding 1 at
        position ``state`` and 0 everywhere else.
    """
    # Build the encoding in NumPy first, then hand a copy over to PyTorch.
    encoding = np.zeros(num_states, dtype=np.float32)
    encoding[state] = 1
    return torch.tensor(encoding, dtype=torch.float32)

In [185]:
import numpy as np

In [186]:
import torch

# Policy network: 16-dimensional one-hot state in, one hidden ReLU layer of
# width 16, and a softmax over 4 outputs that are used as action probabilities.
model = torch.nn.Sequential(
    torch.nn.Linear(in_features=16, out_features=16),
    torch.nn.ReLU(),
    torch.nn.Linear(in_features=16, out_features=4),
    torch.nn.Softmax(dim=-1),
)

# Adam updates every parameter of the policy network.
optimizer = torch.optim.Adam(params=model.parameters(), lr=0.01)

In [187]:
# dr: discount factor applied to future rewards inside train(); 1 means
#     rewards are not discounted.
# lr: learning-rate constant. NOTE(review): the Adam optimizer in this
#     notebook is built with a literal lr=0.01 and does not read this name.
dr, lr = 1, 0.01

In [188]:
# Sanity check: feed the one-hot encoding of state 0 through the network
# and display the resulting action probabilities.
model(stateToDQNInput(0))

tensor([0.2304, 0.1912, 0.2403, 0.3381], grad_fn=<SoftmaxBackward0>)

In [189]:
from torch.distributions import Categorical

In [190]:
# Keep the network's action probabilities for state 0 so a distribution
# can be built from them in the next cell.
probs=model(stateToDQNInput(0))
probs

tensor([0.2304, 0.1912, 0.2403, 0.3381], grad_fn=<SoftmaxBackward0>)

In [191]:
# Wrap the probabilities in a categorical distribution over the 4 outputs.
m = Categorical(probs)
m

Categorical(probs: torch.Size([4]))

In [192]:
# Draw one action index at random according to the distribution.
action = m.sample()
action

tensor(3)

In [193]:
# Reset the environment; the displayed result is an (observation, info) pair.
env.reset()   

(0, {'prob': 1})

In [194]:
def train(numEpisodes, log_every=1):
    """Train the global policy network with REINFORCE (Monte-Carlo policy gradient).

    Each episode is rolled out to completion by sampling actions from the
    policy, after which a single gradient step is taken on
    ``-sum_t log pi(a_t | s_t) * G_t`` where ``G_t`` is the discounted
    return from step ``t`` onwards.

    Relies on these notebook-level names: ``env``, ``model``, ``optimizer``,
    ``dr`` (discount factor) and ``stateToDQNInput``.

    Args:
        numEpisodes: Number of episodes to run (one optimizer step each).
        log_every: Print the episode loss every ``log_every`` episodes.
            The default of 1 prints every episode; 0 or None disables printing.

    Returns:
        None. The parameters of ``model`` are updated in place.
    """
    for episode in range(numEpisodes):
        logProbs = []
        rewards = []
        state = env.reset()[0]  # reset() returns (observation, info)
        terminated = False
        truncated = False

        # Roll out one full episode, remembering log pi(a|s) and the reward
        # of every step.
        while not (terminated or truncated):
            probs = model(stateToDQNInput(state))
            m = Categorical(probs)
            action = m.sample()
            logProbs.append(m.log_prob(action))
            state, reward, terminated, truncated, _ = env.step(action.item())
            rewards.append(reward)

        # Return-to-go via the recursion G_t = r_t + dr * G_{t+1}, filled in
        # from the last step backwards: one linear pass instead of re-summing
        # the tail of the reward list for every t.
        discountedReturns = [0.0] * len(rewards)
        G = 0.0
        for t in range(len(rewards) - 1, -1, -1):
            G = rewards[t] + dr * G
            discountedReturns[t] = G

        # Vectorised REINFORCE loss; the returns are constants with respect
        # to the network parameters, so they carry no gradient.
        stackedLogProbs = torch.stack(logProbs)
        returns = torch.as_tensor(
            discountedReturns,
            dtype=stackedLogProbs.dtype,
            device=stackedLogProbs.device,
        )
        policy_loss = -(stackedLogProbs * returns).sum()

        if log_every and (episode + 1) % log_every == 0:
            print(policy_loss.item())

        optimizer.zero_grad()
        policy_loss.backward()
        optimizer.step()

In [195]:
# Run 1000 training episodes; train() prints the policy loss of each one.
train(1000)

0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
15.695785522460938
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
25.697677612304688
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.

In [196]:
# Rendered frames are accumulated here by test() and written to disk later.
images = list()

In [197]:
def test(max_steps=15):
    """Roll out one episode with the current policy, recording a frame per step.

    Actions are sampled from the policy's categorical distribution, as in
    training. The initial frame and the frame after every step are appended
    to the notebook-level ``images`` list.

    Relies on these notebook-level names: ``env``, ``model``, ``images`` and
    ``stateToDQNInput``.

    Args:
        max_steps: Upper bound on the number of environment steps. If the
            episode has not ended by then, a failure message is printed and
            the rollout stops. Defaults to 15.

    Returns:
        None.
    """
    state = env.reset()[0]  # reset() returns (observation, info)
    terminated = False
    truncated = False
    steps = 0
    images.append(env.render())

    while not (terminated or truncated):
        steps += 1
        if steps > max_steps:
            print("Failed to reach home. Maximum steps taken")
            break

        # Pure inference: no gradient is needed here, so skip building the
        # autograd graph.
        with torch.no_grad():
            probs = model(stateToDQNInput(state))
        action = Categorical(probs).sample()

        state, _, terminated, truncated, _ = env.step(action.item())
        images.append(env.render())
            

In [198]:
# Roll out one episode with the current policy; frames are appended to images.
test()

In [199]:
import imageio
# Write every recorded frame, converted to a NumPy array, as one multi-image file.
# NOTE(review): passing fps together with a ".png" target looks unintended --
# an animated format such as GIF may have been meant; confirm before renaming.
imageio.mimsave("test.png", list(map(np.array, images)), fps=5)