### Импорт модулей

In [1]:
import random
from collections import namedtuple
import gym
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import time
import matplotlib.pyplot as plt
from IPython import display
from IPython.display import clear_output

### Попытка запустить на GTX 1050. Но установлена CUDA 11.6, а нужна 11.8 :(

In [2]:
# Use the GPU when PyTorch reports one as available; otherwise stay on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
device

device(type='cpu')

### Задание констант для нейросети, обучения и протоколирования

In [3]:
# Widths of the two hidden layers handed to NeuralNet.
HIDDEN_LAYER1 = 128
HIDDEN_LAYER2 = 32
# Step size handed to the Adam optimiser.
LEARNING_RATE = 1e-3

# Number of finished episodes collected per batch.
BATCH_SIZE = 100
# Percentile of discounted episode reward used as the selection threshold.
PERCENTILE = 30
# Base of the per-step factor applied to an episode's total reward.
DISCOUNT = 0.9

# One finished episode: raw total reward, its discounted variant, and the steps taken.
Episode = namedtuple('Episode', 'reward reward_with_discount steps')
# One step of an episode: the state observed and the action chosen in it.
EpisodeStep = namedtuple('EpisodeStep', 'state action')

### Классы среды (environment) и нейросети, а также функции создания батчей и их фильтрации

In [4]:
class DiscreteOneHotWrapper(gym.ObservationWrapper):
    """Observation wrapper that replaces a discrete state index with a one-hot vector.

    The wrapped environment's observation space must expose ``n`` (the number
    of discrete states); the new observation space is a float32 ``Box`` of
    that length with values in ``[0, 1]``.
    """

    def __init__(self, env):
        super().__init__(env)
        n_states = env.observation_space.n
        self.observation_space = gym.spaces.Box(
            low=0.0, high=1.0, shape=(n_states,), dtype=np.float32
        )

    def observation(self, observation):
        """Return a vector of zeros with a single 1.0 at index ``observation``."""
        # The Box lower bound is all zeros, so a copy of it is a fresh blank vector.
        one_hot = self.observation_space.low.copy()
        one_hot[observation] = 1.0
        return one_hot

class NeuralNet(nn.Module):
    """Fully connected network with two ReLU-activated hidden layers.

    Maps inputs of width ``obs_size`` to ``n_actions`` raw scores; no softmax
    is applied inside the network.
    """

    def __init__(self, obs_size, hidden_layer1, hidden_layer2, n_actions):
        super().__init__()
        widths = (obs_size, hidden_layer1, hidden_layer2, n_actions)

        layers = []
        for fan_in, fan_out in zip(widths[:-1], widths[1:]):
            layers.append(nn.Linear(fan_in, fan_out))
            layers.append(nn.ReLU())
        # The output layer is linear, so discard the activation added after it.
        layers.pop()

        self.net = nn.Sequential(*layers)

    def forward(self, x):
        """Run ``x`` through the layer stack and return the raw action scores."""
        return self.net(x)

def generate_batches_of_episodes(env, net, batch_size, actions_n):
    """Play episodes with the current policy and yield them in batches, forever.

    At every step an action is sampled from the softmax of ``net``'s output
    for the current state. When an episode ends it is stored as an
    ``Episode`` (raw total reward, total reward multiplied by
    ``DISCOUNT ** number_of_steps``, and the list of ``EpisodeStep``), the
    environment is reset, and once ``batch_size`` episodes have accumulated
    the batch is yielded and a new one is started.

    Args:
        env: Environment whose ``reset()`` returns ``(state, info)`` and whose
            ``step(action)`` returns
            ``(next_state, reward, terminated, truncated, info)``.
        net: Module mapping a batch of states to one row of action scores
            per state.
        batch_size: Number of finished episodes per yielded batch.
        actions_n: Number of discrete actions to sample from.

    Yields:
        list[Episode]: ``batch_size`` finished episodes.
    """
    softmax = nn.Softmax(dim=1)

    # Build inputs on the device that holds the network's parameters so the
    # forward pass also works when the network has been moved to a GPU.
    first_param = next(net.parameters(), None)
    net_device = None if first_param is None else first_param.device

    episode_reward = 0.0
    episode_steps = []
    batch = []

    state, _ = env.reset()

    while True:
        # Convert via a single ndarray (a list of ndarrays is slow to convert)
        # and add the leading batch dimension the network expects.
        state_t = torch.as_tensor(
            np.asarray(state), dtype=torch.float32, device=net_device
        ).unsqueeze(0)

        # Acting needs no gradients; skip building the autograd graph.
        with torch.no_grad():
            action_probs = softmax(net(state_t))[0].cpu().numpy()
        action = np.random.choice(actions_n, p=action_probs)

        next_state, reward, terminated, truncated, _ = env.step(action)
        episode_reward += reward

        # Record the state the action was chosen in, together with that action.
        episode_steps.append(EpisodeStep(state=state, action=action))

        if terminated or truncated:
            # Scale the reward by episode length so that shorter successful
            # episodes score higher than longer ones.
            episode_reward_with_discount = episode_reward * (DISCOUNT ** len(episode_steps))
            batch.append(Episode(
                reward=episode_reward,
                reward_with_discount=episode_reward_with_discount,
                steps=episode_steps,
            ))

            # Start a fresh episode.
            episode_reward = 0.0
            episode_steps = []
            next_state, _ = env.reset()

            if len(batch) == batch_size:
                yield batch
                batch = []

        state = next_state

def filter_batch(batch, percentile):
    """Keep the episodes whose discounted reward is strictly above a percentile.

    Args:
        batch: Sequence of episodes; each must expose ``reward_with_discount``
            and ``steps``, and every step must expose ``state`` and ``action``.
        percentile: Percentile (0-100) of ``reward_with_discount`` over
            ``batch`` used as the selection threshold.

    Returns:
        tuple: ``(best_episodes, states, actions, threshold)`` where
        ``best_episodes`` holds at most the last 500 selected episodes,
        ``states`` is a float32 tensor and ``actions`` an int64 tensor built
        from the steps of *all* selected episodes (not only the last 500),
        and ``threshold`` is the computed percentile value. When no episode
        beats the threshold the list is empty and both tensors have zero
        elements.
    """
    episode_reward_threshold = np.percentile(
        [episode.reward_with_discount for episode in batch], percentile
    )

    # Strict comparison: episodes that merely equal the threshold are dropped.
    best_episodes = [
        episode for episode in batch
        if episode.reward_with_discount > episode_reward_threshold
    ]
    batch_states = [step.state for episode in best_episodes for step in episode.steps]
    batch_actions = [step.action for episode in best_episodes for step in episode.steps]

    # Stack into one ndarray first: constructing a tensor straight from a
    # Python list of ndarrays is very slow and triggers a PyTorch warning.
    batch_states_t = torch.as_tensor(np.asarray(batch_states, dtype=np.float32))
    batch_actions_t = torch.as_tensor(np.asarray(batch_actions, dtype=np.int64))

    return best_episodes[-500:], batch_states_t, batch_actions_t, episode_reward_threshold

def visualize(env):
    """Show the environment's current frame inline in the notebook.

    The rest of this file uses the gym >= 0.26 API, in which ``render()``
    takes no ``mode`` argument; the render mode is chosen when the
    environment is created. ``env`` is therefore expected to have been built
    with ``render_mode="rgb_array"`` so that ``render()`` returns an image.

    Args:
        env: Environment whose ``render()`` returns an image array.

    Raises:
        ValueError: If ``env.render()`` returns ``None`` (no frame to draw).
    """
    frame = env.render()
    if frame is None:
        raise ValueError(
            'env.render() returned no frame; create the environment with render_mode="rgb_array"'
        )

    plt.imshow(frame)
    display.display(plt.gcf())
    # wait=True defers the clear until new output arrives, which avoids flicker.
    display.clear_output(wait=True)

def render_n_steps(env, net, steps_n):
    """Step the environment ``steps_n`` times, rendering after every step.

    Args:
        env: Environment whose ``reset()`` returns ``(state, info)`` and whose
            ``step(action)`` returns
            ``(state, reward, terminated, truncated, info)``.
        net: Policy network mapping a batch of states to action scores, or
            ``None`` to take random actions from ``env.action_space``.
        steps_n: Total number of steps to take; the environment is reset
            whenever an episode ends before the budget is used up.
    """
    # Build inputs on the device that holds the network's parameters so the
    # forward pass also works when the network has been moved to a GPU.
    first_param = next(net.parameters(), None) if net is not None else None
    net_device = None if first_param is None else first_param.device

    state, _ = env.reset()

    for _ in range(steps_n):
        if net is None:
            # No policy given: take a random action.
            action = env.action_space.sample()
        else:
            state_t = torch.as_tensor(
                np.asarray(state), dtype=torch.float32, device=net_device
            ).unsqueeze(0)
            # Greedy choice: softmax is monotonic, so the argmax of the raw
            # scores picks the most probable action without computing it.
            with torch.no_grad():
                action = int(torch.argmax(net(state_t), dim=1).item())

        state, _, terminated, truncated, _ = env.step(action)

        # Render the step on the display
        env.render()

        if terminated or truncated:
            state, _ = env.reset()


### Обучение

In [5]:
# Non-slippery 4x4 FrozenLake with a custom map passed via `desc`;
# observations are one-hot encoded by the wrapper.
env = DiscreteOneHotWrapper(gym.make("FrozenLake-v1", desc=["SFFF", "FHHF", "FFHF", "HFGF"], map_name="4x4", is_slippery=False))

observation_shape = env.observation_space.shape[0]
actions_n = env.action_space.n

net = NeuralNet(observation_shape, HIDDEN_LAYER1, HIDDEN_LAYER2, actions_n).to(device)
objective = nn.CrossEntropyLoss()
optimiser = optim.Adam(params=net.parameters(), lr=LEARNING_RATE)

# Selected episodes carried over from earlier iterations; they are filtered
# again together with every new batch.
best_episodes_memory = []

for iteration, batch in enumerate(generate_batches_of_episodes(env, net, BATCH_SIZE, actions_n)):

    mean_episode_reward = float(np.mean([episode.reward for episode in batch]))
    mean_episode_reward_with_discount = float(np.mean([episode.reward_with_discount for episode in batch]))

    # Stop once the fresh batch alone averages above the target reward.
    if mean_episode_reward > 0.85:
        print("Environment solved!")
        break

    best_episodes_memory, batch_states_t, batch_actions_t, episode_reward_threshold = filter_batch(best_episodes_memory + batch, PERCENTILE)

    # Nothing beat the threshold (e.g. every episode scored the same): skip the update.
    if not best_episodes_memory:
        continue

    # The network lives on `device`; its inputs and targets must be there too,
    # otherwise the forward pass fails when CUDA is selected.
    batch_states_t = batch_states_t.to(device)
    batch_actions_t = batch_actions_t.to(device)

    # One supervised step: make the policy imitate the actions of the selected episodes.
    optimiser.zero_grad()
    action_predictions = net(batch_states_t)
    loss = objective(action_predictions, batch_actions_t)
    loss.backward()
    optimiser.step()

    # Report performance
    print(f"{iteration}:\tLoss: {round(loss.item(), 4)}\tMean ep reward: {round(mean_episode_reward, 4)}\tMean ep reward with disc: {round(mean_episode_reward_with_discount, 4)}")


  state_t = torch.FloatTensor([state])
  if not isinstance(terminated, (bool, np.bool8)):


2:	Loss: 1.3719	Mean ep reward: 0.01	Mean ep reward with disc: 0.0019
3:	Loss: 1.3729	Mean ep reward: 0.01	Mean ep reward with disc: 0.0017
4:	Loss: 1.3617	Mean ep reward: 0.02	Mean ep reward with disc: 0.0079
5:	Loss: 1.3494	Mean ep reward: 0.02	Mean ep reward with disc: 0.0031
6:	Loss: 1.3469	Mean ep reward: 0.02	Mean ep reward with disc: 0.0027
7:	Loss: 1.3445	Mean ep reward: 0.02	Mean ep reward with disc: 0.0084
8:	Loss: 1.3538	Mean ep reward: 0.02	Mean ep reward with disc: 0.0035
9:	Loss: 1.3501	Mean ep reward: 0.02	Mean ep reward with disc: 0.0054
10:	Loss: 1.3531	Mean ep reward: 0.04	Mean ep reward with disc: 0.0108
11:	Loss: 1.3461	Mean ep reward: 0.03	Mean ep reward with disc: 0.0094
12:	Loss: 1.3408	Mean ep reward: 0.04	Mean ep reward with disc: 0.0095
13:	Loss: 1.3382	Mean ep reward: 0.02	Mean ep reward with disc: 0.0074
14:	Loss: 1.334	Mean ep reward: 0.04	Mean ep reward with disc: 0.0103
15:	Loss: 1.332	Mean ep reward: 0.05	Mean ep reward with disc: 0.0115
16:	Loss: 1.3285

### Тестирование

In [6]:
# Rebuild the training map, this time requesting the "human" render mode,
# and let the trained network act for 30 steps.
env = DiscreteOneHotWrapper(
    gym.make(
        "FrozenLake-v1",
        desc=["SFFF", "FHHF", "FFHF", "HFGF"],
        map_name="4x4",
        is_slippery=False,
        render_mode="human",
    )
)
render_n_steps(env, net, steps_n=30)

In [7]:
# Close the environment created for the rendered test run.
env.close()