In [None]:
import numpy as np
import torch
import torch.nn as nn
import gymnasium as gym

In [None]:
# Build the FrozenLake environment with slipping disabled.
env = gym.make(
    'FrozenLake-v1',
    is_slippery=False,
)

In [None]:
class OneHotWrapper(gym.ObservationWrapper):
    """Observation wrapper that turns a discrete state index into a one-hot vector.

    The wrapped environment must have a discrete observation space exposing
    ``n``; the wrapper advertises a float32 ``Box`` of shape ``(n,)`` with
    bounds [0.0, 1.0] instead.
    """

    def __init__(self, env):
        super().__init__(env)
        n_states = env.observation_space.n
        self.observation_space = gym.spaces.Box(
            low=0.0, high=1.0, shape=(n_states,), dtype=np.float32
        )

    def observation(self, observation):
        """Return a float32 vector that is 1.0 at ``observation`` and 0.0 elsewhere."""
        # The lower bound of the Box is all zeros, so it doubles as a template
        # for the shape and dtype of the encoded observation.
        one_hot = np.zeros_like(self.observation_space.low)
        one_hot[observation] = 1.0
        return one_hot


# NOTE: re-running this statement wraps the environment a second time.
env = OneHotWrapper(env)

In [None]:
# Policy network: observation vector -> one sigmoid hidden layer -> one
# unnormalised score (logit) per action.
HIDDEN_SIZE = 32
obs_size = env.observation_space.shape[0]
n_actions = env.action_space.n

hidden_layer = nn.Linear(obs_size, HIDDEN_SIZE)
output_layer = nn.Linear(HIDDEN_SIZE, n_actions)
net = nn.Sequential(hidden_layer, nn.Sigmoid(), output_layer)

In [None]:
# Loss over the network's action logits and plain SGD over its parameters.
LEARNING_RATE = 0.001

objective = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(net.parameters(), lr=LEARNING_RATE)

In [None]:
# Converts the network's logits into a probability distribution over actions.
sm = nn.Softmax(dim=1)


def select_action(state, model=None):
    """Sample an action from the policy's softmax distribution for one state.

    Args:
        state: Observation for a single step, as a 1-D array-like of floats.
        model: Optional callable mapping a batch of observations to action
            logits. Defaults to the notebook-level ``net``.

    Returns:
        The index of the sampled action, as returned by ``np.random.choice``.
    """
    policy = net if model is None else model
    # Convert through a single ndarray: building a tensor from a Python list
    # of ndarrays is slow and makes PyTorch emit a warning.
    state_t = torch.as_tensor(np.asarray(state, dtype=np.float32)).unsqueeze(0)
    # Action selection is pure inference, so skip building an autograd graph.
    with torch.no_grad():
        act_probs = sm(policy(state_t)).numpy()[0]
    return np.random.choice(len(act_probs), p=act_probs)

In [None]:
from collections import namedtuple

# --- Hyperparameters ---------------------------------------------------------
BATCH_SIZE = 100    # number of finished episodes collected before each update
GAMMA = 0.9         # discount base applied per step when scoring an episode
PERCENTILE = 30     # percentile of episode scores used as the elite threshold
REWARD_GOAL = 0.8   # training stops once the batch mean reward reaches this

# --- Record types ------------------------------------------------------------
# One (observation, action) pair taken during an episode.
EpisodeStep = namedtuple('EpisodeStep', ['observation', 'action'])
# A finished episode: its total reward and the list of steps taken.
Episode = namedtuple('Episode', ['reward', 'steps'])

In [None]:
# Cross-entropy-method training loop.
#
# Episodes are played with the current policy until BATCH_SIZE of them have
# finished. The episodes whose discounted score lies strictly above the
# PERCENTILE threshold ("elite" episodes) are then used as supervised examples:
# the network is trained to reproduce the actions taken in them. The loop ends
# once the mean reward of a batch reaches REWARD_GOAL.


def _select_elite(episodes, percentile, gamma):
    """Return (elite_episodes, reward_bound) for one batch of finished episodes.

    Each episode is scored as ``reward * gamma ** len(steps)``. The bound is
    the given percentile of those scores, and an episode is elite only when
    its score is strictly greater than the bound.
    """
    scores = [ep.reward * (gamma ** len(ep.steps)) for ep in episodes]
    bound = np.percentile(scores, percentile)
    elite = [ep for ep, score in zip(episodes, scores) if score > bound]
    return elite, bound


iter_no = 0
reward_mean = 0.0
full_batch = []
batch = []
episode_steps = []
episode_reward = 0.0
state, info = env.reset()

while reward_mean < REWARD_GOAL:
    action = select_action(state)
    next_state, reward, terminated, truncated, info = env.step(action)

    episode_steps.append(EpisodeStep(observation=state, action=action))
    episode_reward += reward

    # An episode ends when the environment terminates it OR truncates it
    # (e.g. a step limit); both cases require a reset before stepping again.
    if terminated or truncated:
        batch.append(Episode(reward=episode_reward, steps=episode_steps))
        next_state, info = env.reset()
        episode_steps = []
        episode_reward = 0.0

        if len(batch) == BATCH_SIZE:  # a full batch is ready: pick the elite
            reward_mean = float(np.mean([ep.reward for ep in batch]))
            full_batch, reward_bound = _select_elite(batch, PERCENTILE, GAMMA)

            if full_batch:  # may be empty, e.g. when every score is equal
                train_obs = [step.observation
                             for ep in full_batch for step in ep.steps]
                train_act = [step.action
                             for ep in full_batch for step in ep.steps]
                # Stack into single ndarrays first; creating tensors from
                # lists of ndarrays is slow.
                state_t = torch.as_tensor(np.asarray(train_obs, dtype=np.float32))
                acts_t = torch.as_tensor(np.asarray(train_act, dtype=np.int64))

                optimizer.zero_grad()
                action_scores_t = net(state_t)
                loss_t = objective(action_scores_t, acts_t)
                loss_t.backward()
                optimizer.step()
                print("%d: loss=%.3f, reward_mean=%.3f" % (iter_no, loss_t.item(), reward_mean))
                iter_no += 1
            batch = []
    state = next_state


[1;30;43mDie letzten 5000Â Zeilen der Streamingausgabe wurden abgeschnitten.[0m
96407: loss=0.492, reward_mean=0.620
96408: loss=0.492, reward_mean=0.570
96409: loss=0.484, reward_mean=0.680
96410: loss=0.501, reward_mean=0.660
96411: loss=0.485, reward_mean=0.640
96412: loss=0.486, reward_mean=0.610
96413: loss=0.479, reward_mean=0.600
96414: loss=0.472, reward_mean=0.590
96415: loss=0.468, reward_mean=0.680
96416: loss=0.466, reward_mean=0.630
96417: loss=0.512, reward_mean=0.580
96418: loss=0.477, reward_mean=0.560
96419: loss=0.479, reward_mean=0.650
96420: loss=0.514, reward_mean=0.540
96421: loss=0.470, reward_mean=0.680
96422: loss=0.462, reward_mean=0.670
96423: loss=0.489, reward_mean=0.590
96424: loss=0.505, reward_mean=0.610
96425: loss=0.455, reward_mean=0.590
96426: loss=0.456, reward_mean=0.520
96427: loss=0.459, reward_mean=0.600
96428: loss=0.479, reward_mean=0.640
96429: loss=0.496, reward_mean=0.660
96430: loss=0.480, reward_mean=0.570
96431: loss=0.484, reward_mean

In [None]:
# Roll out a single evaluation episode with the trained policy, printing each
# chosen action and the environment after every step.
# render_mode='ansi' makes render() return a text drawing of the grid; without
# a render mode, render() does nothing except emit a warning.
test_env = OneHotWrapper(
    gym.make('FrozenLake-v1', is_slippery=False, render_mode='ansi')
)
state, info = test_env.reset()
print(test_env.render())

reward = 0.0  # defined up front so the final print is always valid
is_done = False
while not is_done:
    action = select_action(state)
    print(action)
    state, reward, terminated, truncated, info = test_env.step(action)
    # Stop on truncation as well as termination; otherwise a policy that never
    # reaches a terminal state would keep this loop running forever.
    is_done = terminated or truncated
    print(test_env.render())
print("reward = ", reward)
test_env.close()

1
1
2
1
2
1
1
1
1
2
reward =  1.0


In [None]:
# Store the trained network weights on Google Drive (requires Google Colab).
from google.colab import drive

save_path = '/content/drive/My Drive/model_weights.pth'

drive.mount('/content/drive')
torch.save(net.state_dict(), save_path)

Mounted at /content/drive


In [None]:
# Full training checkpoint: iteration counter, model and optimizer state, and
# the loss tensor from the most recent update.
checkpoint = dict(
    epoch=iter_no,
    model_state_dict=net.state_dict(),
    optimizer_state_dict=optimizer.state_dict(),
    loss=loss_t,
)
torch.save(checkpoint, '/content/drive/My Drive/checkpoint.pth')