Домашнее задание 2.2

Исследуем алгоритм Кросс-Энтропии с использованием нейронных сетей.

In [24]:
import torch
from torch import nn 
import numpy as np
import pandas as pd
import gym
import plotly.express as px
import time 
import pickle

In [25]:
class CEMTwoLayer(nn.Module):
    """Cross-entropy-method agent backed by a two-hidden-layer MLP policy.

    The network maps a state vector to one logit per discrete action.
    Actions are sampled from the softmax of those logits, optionally mixed
    with a uniform distribution for exploration, and the policy is trained
    by minimising the cross-entropy between its logits and the actions
    taken in elite trajectories.

    Args:
        state_dim: Size of the state vector fed to the first linear layer.
        action_n: Number of discrete actions (size of the output layer).
        hidden1: Width of the first hidden layer.
        hidden2: Width of the second hidden layer.
        lr: Learning rate handed to the Adam optimizer.
        eps: Weight of the uniform distribution mixed into the policy in
            ``get_action``; ``0`` disables the mixing.
    """

    def __init__(self, state_dim, action_n, hidden1, hidden2, lr=0.01, eps=0):
        super().__init__()
        self.state_dim = state_dim
        self.action_n = action_n
        self.lr = lr
        self.eps = eps

        self.network = nn.Sequential(
            nn.Linear(self.state_dim, hidden1),
            nn.ReLU(),
            nn.Linear(hidden1, hidden2),
            nn.ReLU(),
            nn.Linear(hidden2, action_n),
        )

        # Normalise over the action axis. For a single 1-D state this is the
        # same as dim=0, but it stays correct if a batch of states is passed.
        self.softmax = nn.Softmax(dim=-1)
        self.optimizer = torch.optim.Adam(self.parameters(), lr=lr)
        self.loss = nn.CrossEntropyLoss()

    def forward(self, _input):
        """Return the raw action logits for ``_input``."""
        return self.network(_input)

    def get_action(self, state):
        """Sample an action index for a single state.

        Args:
            state: One state, convertible by ``torch.FloatTensor``.

        Returns:
            An integer in ``[0, action_n)`` drawn with ``np.random.choice``.
        """
        state = torch.FloatTensor(state)
        # Sampling needs no gradients, so skip building the autograd graph.
        with torch.no_grad():
            action_prob = self.softmax(self.forward(state)).numpy()

        # Use the agent's own exploration rate; a bare `eps` would resolve to
        # an unrelated module-level name (or fail with NameError).
        if self.eps:
            noise = np.ones(self.action_n) / self.action_n
            action_prob = (1 - self.eps) * action_prob + self.eps * noise
            # Renormalise to absorb floating-point drift before sampling.
            action_prob = action_prob / np.sum(action_prob)
        action = np.random.choice(self.action_n, p=action_prob)
        return action

    def update_policy(self, elite_states, elite_actions):
        """Run one optimizer step on the elite state/action pairs.

        Args:
            elite_states: Sequence of states, one row per sample.
            elite_actions: Sequence of integer action indices, aligned with
                ``elite_states``.

        Does nothing when ``elite_states`` is empty, which happens whenever
        no trajectory scores strictly above the reward quantile.
        """
        if len(elite_states) == 0:
            return

        elite_states_tensor = torch.FloatTensor(elite_states)
        elite_actions_tensor = torch.LongTensor(elite_actions)

        loss = self.loss(self.forward(elite_states_tensor), elite_actions_tensor)
        loss.backward()
        self.optimizer.step()
        self.optimizer.zero_grad()

In [26]:
def get_trajectory(env, agent, trajectory_len, visualize=False):
    """Roll out one episode and record its states, actions and total reward.

    Each recorded state is the observation the agent acted on, so
    ``trajectory['states'][k]`` always pairs with ``trajectory['actions'][k]``
    and both lists have the same length, whether the episode ends because
    the environment reports ``done`` or because the step limit is reached.

    Args:
        env: Environment whose ``reset()`` returns a state and whose
            ``step(action)`` returns ``(state, reward, done, info)``.
        agent: Object exposing ``get_action(state)``.
        trajectory_len: Maximum number of steps to take.
        visualize: When true, call ``env.render()`` after every
            non-terminal step.

    Returns:
        Dict with keys ``'states'``, ``'actions'`` and ``'total_reward'``.
    """
    trajectory = {'states': [], 'actions': [], 'total_reward': 0}

    state = env.reset()

    for _ in range(trajectory_len):
        # Store the state together with the action chosen in it; appending
        # the *next* state instead leaves a dangling, action-less state when
        # the loop runs out of steps.
        trajectory['states'].append(state)
        action = agent.get_action(state)
        trajectory['actions'].append(action)

        state, reward, done, _info = env.step(action)
        trajectory['total_reward'] += reward

        if done:
            break

        if visualize:
            env.render()

    return trajectory

def get_elite_trajectories(trajectories, q_param):
    """Keep the trajectories whose total reward beats the reward quantile.

    Args:
        trajectories: Sequence of dicts carrying a ``'total_reward'`` entry.
        q_param: Quantile level passed to ``np.quantile``.

    Returns:
        The trajectories with ``'total_reward'`` strictly greater than the
        ``q_param`` quantile of all total rewards, in their original order.
    """
    threshold = np.quantile(
        [candidate['total_reward'] for candidate in trajectories],
        q=q_param,
    )

    elite = []
    for candidate in trajectories:
        if candidate['total_reward'] > threshold:
            elite.append(candidate)
    return elite

In [27]:
def generate_batch(env, agent, batch_size, trajectory_len):
    """Play ``batch_size`` episodes and collect their states, actions, rewards.

    Every episode that takes at least one step is recorded, whether it ends
    because the environment reports ``done`` or because ``trajectory_len``
    steps were reached.

    Args:
        env: Environment whose ``reset()`` returns a state and whose
            ``step(action)`` returns ``(state, reward, done, info)``.
        agent: Object exposing ``get_action(state)``.
        batch_size: Number of episodes to play.
        trajectory_len: Maximum number of steps per episode.

    Returns:
        Tuple ``(batch_states, batch_actions, batch_rewards)``: per-episode
        lists of visited states, per-episode lists of chosen actions, and the
        total reward of each episode. ``batch_states[i][j]`` is the state in
        which ``batch_actions[i][j]`` was chosen.
    """
    batch_states, batch_actions, batch_rewards = [], [], []

    for _ in range(batch_size):
        states, actions = [], []
        total_reward = 0

        state = env.reset()
        for _step in range(trajectory_len):
            action = agent.get_action(state)
            new_state, reward, done, _info = env.step(action)
            states.append(state)
            actions.append(action)
            total_reward += reward
            state = new_state

            if done:
                break

        # Record the episode after the loop so that one cut off by the step
        # limit is kept too; recording only on `done` silently drops it and
        # can leave the whole batch empty.
        if states:
            batch_actions.append(actions)
            batch_states.append(states)
            batch_rewards.append(total_reward)
    return batch_states, batch_actions, batch_rewards

In [28]:
def get_elite_states(batch_states, batch_actions, batch_rewards, q_param):
    """Flatten the state/action pairs of the best-scoring episodes.

    Args:
        batch_states: Per-episode lists of states.
        batch_actions: Per-episode lists of actions, aligned with
            ``batch_states``.
        batch_rewards: Total reward of each episode.
        q_param: Quantile level passed to ``np.quantile``.

    Returns:
        Tuple ``(elite_states, elite_actions)`` holding, in order, every
        state and its action from the episodes whose reward is strictly
        greater than the ``q_param`` quantile of ``batch_rewards``.
    """
    threshold = np.quantile(batch_rewards, q=q_param)

    elite_states, elite_actions = [], []
    for episode, episode_reward in enumerate(batch_rewards):
        # Episodes at or below the threshold contribute nothing.
        if not episode_reward > threshold:
            continue
        for step, visited in enumerate(batch_states[episode]):
            elite_states.append(visited)
            elite_actions.append(batch_actions[episode][step])

    return elite_states, elite_actions

In [36]:
# Environment shared by the experiments below.
env = gym.make('MountainCar-v0')

# Policy-network input/output sizes and optimisation settings.
state_dim, action_n = 2, 3
lr, eps = 1e-2, 0.9

# Training schedule: iterations, episodes per iteration, step cap per
# episode, and the reward quantile used to select elite episodes.
MAX_ITER, BATCH_SIZE = 300, 50
TRAJECTORY_LEN = 1000
Q = 0.75

In [37]:
# Per-epoch reward log; each entry is a dict appended by the training loop.
best_model_total_rewards: list = []

In [39]:
Q = 0.75
lr = 1e-2

# Hand the configured exploration rate to the agent; without it the agent
# keeps its default eps=0 and the decay below has nothing to act on.
agent_two_layer = CEMTwoLayer(state_dim, action_n, hidden1=32, hidden2=32, lr=lr, eps=eps)

time_start = time.time()

for i in range(MAX_ITER):
    # Decay exploration in place (a bare `eps * 0.99` discards the result).
    agent_two_layer.eps *= 0.99
    batch_states, batch_actions, batch_rewards = generate_batch(env, agent_two_layer, BATCH_SIZE, TRAJECTORY_LEN)
    elite_states, elite_actions = get_elite_states(batch_states, batch_actions, batch_rewards, Q)

    mean_reward = np.mean(batch_rewards)
    # NOTE(review): the label says 16_16 while the hidden sizes above are
    # 32/32 - confirm which one is intended before comparing runs.
    best_model_total_rewards.append({'agent': 'two_layer_16_16', 'epoch': i, 'q': Q, 'reward': mean_reward})
    # No episode beats the quantile when all rewards are equal; skip the
    # update then instead of feeding empty tensors to the network.
    if len(elite_states) > 0:
        agent_two_layer.update_policy(elite_states, elite_actions)
    print(f'{i}: mean_reward={mean_reward}')

time_end = time.time()
print(f'training took {time_end - time_start:.1f} s')

KeyboardInterrupt: 

In [14]:
# Plot the logged mean reward against the training epoch.
px.line(
    best_model_total_rewards,
    x='epoch',
    y='reward',
)