### Deep Reinforcement Learning from Human Preferences

In [None]:
%matplotlib inline

import matplotlib.pyplot as plt
import matplotlib

import numpy as np
import random
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

In [None]:
class GridWorld():
    """Tiny grid environment whose reward comes from a learned reward model.

    The state is a flat list ``[player_x, player_y, target_x, target_y]``.
    Start coordinates are drawn from ``0..6``; moves are NOT clipped, so the
    player may walk outside that range.

    An episode ends when the player lands on the target or when ``max_steps``
    moves have been made.

    Args:
        reward_fn: Callable taking a ``(1, 4)`` float32 tensor and returning a
            single-element tensor. When ``None`` (the default) the
            module-level ``rewarder`` is looked up at call time.
        max_steps: Number of moves after which the episode is cut off.
    """

    def __init__(self, reward_fn=None, max_steps=25):
        self.reward_fn = reward_fn
        self.max_steps = max_steps
        self.state = self.init_state()
        self.step_count = 0

    def init_state(self):
        """Return a fresh random state ``[player_x, player_y, target_x, target_y]``.

        The x coordinates are sampled without replacement, and so are the y
        coordinates, so player and target never share a column or a row.
        """
        player_x, target_x = random.sample(range(0, 7), 2)
        player_y, target_y = random.sample(range(0, 7), 2)
        return [player_x, player_y, target_x, target_y]

    def reset(self):
        """Start a new episode with a new random layout."""
        self.state = self.init_state()
        self.step_count = 0

    def _reward(self):
        """Score the current state with the reward model (truncated to int)."""
        reward_fn = self.reward_fn if self.reward_fn is not None else rewarder
        obs = torch.tensor(self.state, dtype=torch.float32).unsqueeze(0)
        # Only the scalar value is needed here; no autograd graph required.
        with torch.no_grad():
            # NOTE(review): int() truncates toward zero, so rewards in (-1, 1)
            # all become 0. Kept to preserve the existing return type.
            return int(reward_fn(obs).item())

    def step(self, action):
        """Apply ``action`` and return ``(reward, playing)``.

        Actions: 0 = up (y+1), 1 = right (x+1), 2 = down (y-1), 3 = left (x-1).
        Any other value leaves the player in place but still counts as a step.

        The reward is always computed on the state reached by this move,
        *before* the environment resets itself at the end of an episode.
        ``playing`` is ``False`` once the episode has ended.
        """
        self.step_count += 1
        if action == 0:    # UP
            self.state[1] += 1
        elif action == 1:  # RIGHT
            self.state[0] += 1
        elif action == 2:  # DOWN
            self.state[1] -= 1
        elif action == 3:  # LEFT
            self.state[0] -= 1

        reward = self._reward()

        reached_target = (
            self.state[0] == self.state[2] and self.state[1] == self.state[3]
        )
        playing = not reached_target and self.step_count < self.max_steps
        if not playing:
            self.reset()
        return reward, playing

In [None]:
class Actor(nn.Module):
    """Policy network: one hidden ReLU layer followed by a linear head.

    The forward pass returns raw action logits (no softmax applied).
    """

    def __init__(self, in_size, num_actions, num_hidden_units):
        super().__init__()
        # Hidden layer first, output head second (keeps parameter init order).
        self.shared_1 = nn.Linear(in_size, num_hidden_units)
        self.actor = nn.Linear(num_hidden_units, num_actions)

    def forward(self, input_obs):
        """Map observations to one logit per action."""
        hidden = self.shared_1(input_obs)
        hidden = F.relu(hidden)
        logits = self.actor(hidden)
        return logits

In [None]:
class Rewarder(nn.Module):
    """Reward model: one hidden ReLU layer followed by a scalar linear head."""

    def __init__(self, in_size, num_hidden_units):
        super().__init__()
        # Hidden layer first, output head second (keeps parameter init order).
        self.shared_1 = nn.Linear(in_size, num_hidden_units)
        self.reward = nn.Linear(num_hidden_units, 1)

    def forward(self, input_obs):
        """Map observations to a single unbounded reward value each."""
        hidden = self.shared_1(input_obs)
        hidden = F.relu(hidden)
        predicted_reward = self.reward(hidden)
        return predicted_reward

In [None]:
# Policy over the 4 moves, fed the 4-number state [player_x, player_y, target_x, target_y].
agent = Actor(in_size=4, num_actions=4, num_hidden_units=100)
# Learned reward model over the same 4-number state.
rewarder = Rewarder(in_size=4, num_hidden_units=100)
# Environment; it is constructed after the networks, as before.
env = GridWorld()

In [None]:
def calculate_g(reward_trajectory, gamma):
    """Return the discounted return-to-go for every step of a trajectory.

    For step ``t`` the value is ``sum(gamma**k * reward[t + k])`` over all
    remaining steps ``k``.

    Args:
        reward_trajectory: Sequence of per-step rewards.
        gamma: Discount factor.

    Returns:
        A list with one return per step (empty for an empty trajectory).
    """
    rewards = np.array(reward_trajectory)
    horizon = len(rewards)
    # discounts[k] == gamma ** k; a prefix of it is reused for every tail.
    discounts = np.array([gamma ** k for k in range(horizon)])
    return [
        sum(np.multiply(rewards[start:], discounts[:horizon - start]))
        for start in range(horizon)
    ]

In [None]:
def step_episode(env, model):
    """Roll out one full episode of ``model`` acting in ``env``.

    Args:
        env: Environment exposing ``reset()``, a ``state`` sequence and
            ``step(action) -> (reward, playing)``.
        model: Callable mapping a ``(1, state_size)`` float32 tensor to action
            logits of shape ``(1, num_actions)``.

    Returns:
        ``(action_probs_list, rewards, states, actions)``, one entry per step:
        the probability of the sampled action (still attached to the autograd
        graph), the reward received, the observation tensor the action was
        chosen from, and the sampled action index.
    """
    env.reset()
    action_probs_list = []
    rewards = []
    states = []
    actions = []
    playing = True
    while playing:
        obs = torch.tensor(env.state, dtype=torch.float32).unsqueeze(0)
        # Use the model that was passed in, not the module-level ``agent``.
        action_logits = model(obs)
        action_probs = F.softmax(action_logits, dim=-1)
        selected_action_idx = torch.multinomial(action_probs, 1).item()
        states.append(obs)
        actions.append(selected_action_idx)
        action_probs_list.append(action_probs[0, selected_action_idx])

        reward, playing = env.step(selected_action_idx)
        rewards.append(reward)

    return action_probs_list, rewards, states, actions

In [None]:
def actor_loss(action_probs, rewards):
    """Policy-gradient loss for one episode.

    Computes the negated sum over steps of ``log(prob of chosen action)``
    weighted by that step's discounted return (gamma = 0.99).

    Args:
        action_probs: List of scalar tensors, the probability of each chosen
            action.
        rewards: Per-step rewards of the same episode.
    """
    returns = torch.tensor(calculate_g(rewards, 0.99), dtype=torch.float32)
    log_probs = torch.stack(action_probs).log()
    weighted = log_probs * returns
    return -weighted.sum()

In [None]:
# Separate Adam optimizers for the reward model and the policy, same learning rate.
optimizer_rewarder = optim.Adam(params=rewarder.parameters(), lr=5e-4)
optimizer_actor = optim.Adam(params=agent.parameters(), lr=5e-4)

In [None]:
def decode_action(action):
    """Translate an action index (0-3) into its direction name.

    Raises:
        KeyError: If ``action`` is not one of 0, 1, 2, 3.
    """
    # Position in the tuple is the action index.
    names = dict(enumerate(('up', 'right', 'down', 'left')))
    return names[action]

In [None]:
def compare(transition_ids, states, actions):
    """Show two transitions side by side so a human can pick the better one.

    Each panel plots the player (yellow) and the target (green) of one state
    and is titled with the action taken from that state. The first id is drawn
    on the left, the second on the right.

    Args:
        transition_ids: Two indices into ``states`` / ``actions``.
        states: List of ``(1, 4)`` tensors laid out as
            ``[player_x, player_y, target_x, target_y]``.
        actions: List of action indices aligned with ``states``.
    """
    def _points(state):
        # Split one state into x and y coordinates for (player, target).
        row = state[0]
        return [row[0].item(), row[2].item()], [row[1].item(), row[3].item()]

    color = ['yellow', 'green']
    color_indices = [0, 1]
    colormap = matplotlib.colors.ListedColormap(color)

    # One row of two panels: only two transitions are ever drawn, so a 2x2
    # grid would leave two empty axes below them.
    fig, axes = plt.subplots(1, 2)
    for axis, transition_id in zip(axes, (transition_ids[0], transition_ids[1])):
        xs, ys = _points(states[transition_id])
        axis.scatter(xs, ys, c=color_indices, cmap=colormap)
        axis.set_title(str(decode_action(actions[transition_id])))
    plt.show()

In [None]:
def preference_update(states, actions, rewarder):
    """Ask the human to rank two transitions and fit the reward model to it.

    Two transitions are sampled and plotted, the user answers ``a`` (left is
    better), ``d`` (right is better) or ``s`` (equal), and the reward model is
    updated with the cross-entropy between that preference and the softmax of
    the two predicted rewards. Each reward is predicted for the state that
    follows the sampled transition.

    Episodes too short to provide two distinct transitions are skipped.

    Args:
        states: List of ``(1, 4)`` observation tensors from one episode.
        actions: Action indices aligned with ``states``.
        rewarder: Reward model to query and train. The update is applied via
            the module-level ``optimizer_rewarder``.
    """
    # NOTE(review): the last transition with a recorded successor is
    # len(states) - 2, which this range excludes; confirm whether that is
    # intentional.
    num_candidates = len(states) - 2
    if num_candidates < 2:
        # random.sample would raise ValueError; nothing to compare here.
        return

    transition_ids = random.sample(range(num_candidates), 2)
    compare(transition_ids, states, actions)

    dists = {'a': [1, 0], 'd': [0, 1], 's': [0.5, 0.5]}
    prompt = 'select preference a: left, d: right, s: same  '
    pref = input(prompt).strip().lower()
    # A typo must not abort a long interactive run; ask again instead.
    while pref not in dists:
        pref = input(prompt).strip().lower()
    target = torch.tensor(dists[pref], dtype=torch.float32)

    reward1 = rewarder(states[transition_ids[0]+1])
    reward2 = rewarder(states[transition_ids[1]+1])
    # log_softmax gives the same value as log(exp(r_i) / (exp(r_1) + exp(r_2)))
    # but does not overflow for large rewards.
    log_probs = F.log_softmax(
        torch.cat([reward1.reshape(-1), reward2.reshape(-1)]), dim=0
    )
    loss = -(log_probs * target).sum()

    optimizer_rewarder.zero_grad()
    loss.backward()
    optimizer_rewarder.step()

In [None]:
# Main training loop: alternate one policy-gradient update of the actor with
# one human-preference update of the reward model, for 5000 episodes.
average_length = []  # number of steps taken in each episode so far
for episode in range(5000):
    # Roll out one episode and compute the policy-gradient loss for it.
    action_probs, rewards, states, actions = step_episode(env, agent)
    loss = actor_loss(action_probs, rewards)
    
    # Update the actor.
    optimizer_actor.zero_grad()
    loss.backward()
    optimizer_actor.step()
    
    average_length.append(len(rewards))
    
    # Interactive step: shows a plot and waits for keyboard input.
    preference_update(states, actions, rewarder)
    
    # Report the mean episode length over the last 100 episodes.
    print('Episode:', episode, '\nAverage steps to target:', np.mean(average_length[-100:]))