In [1]:
import time

import gymnasium as gym
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical

# Hyperparameters (module-level values read by the Policy class and by the environment setup cells)
learning_rate = 0.001  # step size handed to the Adam optimizer
gamma = 0.99  # discount factor applied to future rewards when computing returns
map_size = 4  # side length of the FrozenLake grid; formatted into map_name as "4x4" or "8x8"

class Policy(nn.Module):
    """REINFORCE policy network for environments with a discrete state space.

    A state index is embedded, passed through one hidden layer and mapped to a
    probability distribution over actions. The object also owns its optimizer
    and a per-episode buffer of ``(reward, chosen_action_probability)`` pairs
    that ``train_net`` consumes.

    Args:
        state_size: Number of distinct discrete states (embedding rows).
        action_size: Number of discrete actions (output width).
        lr: Optional Adam learning rate. When ``None`` the notebook-level
            ``learning_rate`` is used.
        discount: Optional discount factor. When ``None`` the notebook-level
            ``gamma`` is read each time ``train_net`` runs.
    """

    def __init__(self, state_size, action_size, lr=None, discount=None):
        super().__init__()
        # Per-episode buffer of (reward, probability of the action taken).
        self.data = []
        # None means "fall back to the global `gamma` at training time".
        self.discount = discount

        # Network architecture for discrete states
        self.embed = nn.Embedding(state_size, 16)
        self.fc1 = nn.Linear(16, 128)
        self.fc2 = nn.Linear(128, action_size)

        self.optimizer = optim.Adam(
            self.parameters(),
            lr=learning_rate if lr is None else lr,
        )

    def forward(self, x):
        """Return action probabilities for the integer state tensor ``x``."""
        # Convert discrete state to embedding
        x = self.embed(x)
        x = F.relu(self.fc1(x))
        x = F.softmax(self.fc2(x), dim=-1)
        return x

    def put_data(self, item):
        """Append one ``(reward, chosen_action_probability)`` pair to the buffer."""
        self.data.append(item)

    @staticmethod
    def _discounted_returns(rewards, discount):
        """Return the discounted return G_t for every step of one episode."""
        returns = []
        running = 0
        # Walk backwards so each return reuses the one after it; append and
        # reverse once instead of inserting at the front on every step.
        for reward in reversed(rewards):
            running = reward + discount * running
            returns.append(running)
        returns.reverse()
        return returns

    def train_net(self):
        """Run one REINFORCE update from the buffered episode and clear the buffer.

        The loss is ``sum_t -log(prob_t) * G_t`` and is back-propagated in a
        single pass. Calling this with an empty buffer does nothing.
        """
        if not self.data:
            return

        discount = gamma if self.discount is None else self.discount
        returns = self._discounted_returns(
            [reward for reward, _ in self.data], discount
        )

        # Each stored probability is a single-element tensor that still carries
        # its autograd graph; flatten to 0-dim so stacking yields shape (T,).
        probs = torch.stack([prob.reshape(()) for _, prob in self.data])
        returns_t = torch.as_tensor(returns, dtype=probs.dtype, device=probs.device)
        loss = -(torch.log(probs) * returns_t).sum()

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        self.data = []


In [2]:

# Seed torch before the policy is created so that weight initialisation and
# the action sampling in the training loop are reproducible across
# Restart & Run All.
SEED = 0
torch.manual_seed(SEED)

# Training environment: non-slippery FrozenLake on a map_size x map_size grid,
# created without a render mode.
env = gym.make(
    'FrozenLake-v1',
    map_name=f"{map_size}x{map_size}",
    is_slippery=False,
)

# Both spaces are discrete; their sizes fix the network's input and output widths.
state_size = env.observation_space.n
action_size = env.action_space.n
pi = Policy(state_size, action_size)

# Number of episodes averaged into each progress line of the training loop.
print_interval = 100




In [3]:
# REINFORCE training: roll out one full episode with the current policy,
# then perform a single policy-gradient update from that episode.
score = 0.0  # reward accumulated over the current logging window
for n_epi in range(10000):
    s, _ = env.reset()
    done = False

    while not done:
        # Convert state to tensor
        s_tensor = torch.tensor(s)

        # Get action probabilities and sample an action from them
        prob = pi(s_tensor)
        m = Categorical(prob)
        a = m.sample()

        # Take action; the episode ends on termination or truncation
        s_prime, r, terminated, truncated, _ = env.step(a.item())
        done = terminated or truncated

        # Store the reward with the probability of the action actually taken
        # (it keeps its autograd graph for the update below).
        pi.put_data((r, prob[a]))
        s = s_prime
        score += r

    # Update policy after each episode
    pi.train_net()

    # Report once per completed window of `print_interval` episodes and reset
    # the accumulator only then, so the printed value is a true average.
    if (n_epi + 1) % print_interval == 0:
        print(f"Ep {n_epi + 1:5d} | Avg Score: {score/print_interval:.2f}")
        score = 0.0

env.close()



In [None]:
# Evaluation settings
episodes = 10     # number of greedy rollouts to watch
sleep_time = 0.3  # seconds to pause after each step so the rendering is watchable

env = gym.make(
    'FrozenLake-v1',
    map_name=f"{map_size}x{map_size}",
    is_slippery=False,
    render_mode="human",
)
state_size = env.observation_space.n
action_size = env.action_space.n

# Evaluate the policy trained earlier in this notebook. To evaluate a saved
# checkpoint instead, build a fresh Policy(state_size, action_size) and call
# load_state_dict on it before this point.
policy = pi
policy.eval()

successes = 0
try:
    for ep in range(episodes):
        s, _ = env.reset()
        done = False
        total_reward = 0
        steps = 0
        while not done:
            s_tensor = torch.tensor(s)
            with torch.no_grad():
                probs = policy(s_tensor)
            action = torch.argmax(probs).item()  # Greedy action
            # With render_mode="human" the environment draws itself on every
            # step, so no explicit env.render() call is needed.
            s, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            total_reward += reward
            steps += 1
            time.sleep(sleep_time)
        # The reward of the final step tells whether the goal was reached.
        print(f"Episode {ep+1}: Reward {total_reward}, Steps {steps}, {'Success' if reward == 1 else 'Fail'}")
        if reward == 1:
            successes += 1
finally:
    # Close the render window even if a rollout is interrupted.
    env.close()

print(f"\nSuccess rate: {successes}/{episodes} ({100*successes/episodes:.1f}%)")
