# Preference based Reinforcement Learning  
Below are the equations describing the estimated probability according to the reward function that trajectory segment 1 will be preferable to trajectory segment 2 and the loss incurred based on that estimation and reality respectively. The last equation is the derivative for the loss function with respect to some parameter in the reward function.  
$$P(\sigma^1 > \sigma^2) = \frac{\exp{(r(\sigma^1))}}{\exp{(r(\sigma^1))} + \exp{(r(\sigma^2))}} = \text{sigmoid}(r_1 - r_2)$$
$$Loss(\hat r_\theta) = -\sum_{\sigma^1,\sigma^2,\mu} (1 - \mu) \ln{(P(\sigma^1,\sigma^2, \hat r))} + \mu\ln{(P(\sigma^2,\sigma^1, \hat r))}$$

In [1]:

import jax
import jax.numpy as jnp

from numpy.random import normal, uniform, seed
from numpy import sqrt, sum, abs, exp, where, array, zeros
from math import factorial

import torch
import torch.nn as nn
from torch.nn.utils import parameters_to_vector, vector_to_parameters

from RLHFPrefLib_old import CrossEntropyLoss

from RLHFPrefLib import pref_estimation

import matplotlib.pyplot as plt

In [2]:
# --- Helper Functions ---
def print_chart(dictionary):
    values = []
    for key, value in dictionary.items():
        print(f'{key:>20}', end="")
        values += [value]
    print()
    for row in zip(*values):
        for column in row:
            print(f'{column:>20.2f}', end="")
        print()
        

## Reward Function and Model Definition

## Basic Reward Function

In [3]:

def reward1(state, action, slope=1e-1):
    answer = state[0] + state[1]
    distance = answer - action
    y1 = 1 + slope*distance
    y2 = 1 - slope*distance
    return y1 if distance <=0 else y2

def reward2(state, action):
    answer = state[0] + state[1]
    distance = answer - action
    return (1 + torch.exp(-distance))**-1

def reward(state, action, theta):
    if not isinstance(state, torch.Tensor):
        state = torch.tensor(state, dtype=float)
    r1 = theta[0]*reward1(state, action)
    r2 = theta[1]*reward2(state, action)
    return r1 + r2

def max_action(state, reward_function, theta, init_action=0.0, lr=0.1, epochs=50):
    action = torch.tensor(init_action, requires_grad=True)

    for epoch in range(epochs):
        reward = reward_function(state, action, theta)
        reward.backward()
        
        with torch.no_grad():
            action += lr*action.grad
            action.grad.zero_()
    action.requires_grad_(False)
    return action.detach()

def evaluation(states, actions, reward, theta):
    return torch.mean((torch.sum(states, axis=1) - actions)**2)

In [4]:

# --- Initialize Parameters ---
theta = torch.tensor([0.0, 0.0], requires_grad=True)
lr = 1.5e-1

# --- Evaluate ---
seed(42)
states = torch.tensor(uniform(0, 10, size=(7, 2)).astype(int), requires_grad=False)

greedy_actions = torch.tensor([max_action(s, reward, theta, init_action=10.0) for s in states])

mse_loss = torch.mean((torch.sum(states, axis=1) - greedy_actions)**2) / 5

print(f'Parameters: {theta}')
print_chart({
    'Answers': torch.sum(states, axis=1),
    'Actions': greedy_actions
})
print(f'MSE Loss = {mse_loss:.3f}')

theta.grad.zero_()


# --- Train ---
seed(60)
epochs = 10 #200
for epoch in range(epochs):
    rand_range = exp(-(epoch - 300) / 500)
    if (epoch + 1) % 50 == 0:
        print(f'  {int((epoch+1)*100 / epochs)}% complete')
    # --- Set up States ---
    states = torch.tensor(uniform(0, 10, size=(20, 2)).astype(int))
    answers = torch.sum(states, axis=1)

    # --- Find Actions ---
    greedy_actions = torch.stack([max_action(s, reward, theta.detach(), init_action=10.0) for s in states]).detach()
    random_actions = greedy_actions + torch.tensor(uniform(-rand_range, rand_range, size=len(states)))

    # --- Get Preferences ---
    prefs = (torch.abs(answers - greedy_actions) < torch.abs(answers - random_actions)).int()

    # --- Calculate Loss ---
    reward_pred = lambda state, action: reward(state, action, theta)
    loss = CrossEntropyLoss(states, zip(greedy_actions, random_actions), prefs, reward_pred)

    loss.backward()

    gradient = theta.grad
    with torch.no_grad():
        theta += lr * gradient
    
    theta.grad.zero_()

# --- Evaluate ---
seed(42)
states = torch.tensor(uniform(0, 10, size=(7, 2)).astype(int))
greedy_actions = torch.tensor([max_action(s, reward, theta, init_action=10.0) for s in states])

mse_loss = torch.mean((torch.sum(states, axis=1) - greedy_actions)**2) / 5

print_chart({
    'Answers': torch.sum(states, axis=1),
    'Actions': greedy_actions
})
print(f'MSE Loss = {mse_loss:.3f}')

theta.grad.zero_()
print(f'Parameters: {theta}')

Parameters: tensor([0., 0.], requires_grad=True)
             Answers             Actions
               12.00               10.00
               12.00               10.00
                2.00               10.00
                8.00               10.00
               13.00               10.00
                9.00               10.00
               10.00               10.00
MSE Loss = 2.457
             Answers             Actions
               12.00               10.55
               12.00               10.55
                2.00                9.39
                8.00                9.33
               13.00               10.58
                9.00                9.29
               10.00               10.00
MSE Loss = 1.902
Parameters: tensor([1.2151, 0.0941], requires_grad=True)


## NN Reward Function

In [5]:
class RewardNet(nn.Module):

    def __init__(self, device = 'mps'):
        super().__init__()
        combos = factorial(3)
        self.net = nn.Sequential(
            nn.Linear(3, combos), nn.LeakyReLU(),
            nn.Linear(combos, combos), nn.LeakyReLU(),
            nn.Linear(combos, 1)
        )
        self.device = device
    
    def forward(self, x):
        if not torch.is_tensor(x):
            x = torch.tensor(x).float()
        return self.net(x).flatten()

    def action_max(self, x_init, lr=0.02, epochs=100):

        if not torch.is_tensor(x_init):
            x = torch.tensor(x_init, dtype=torch.float32, requires_grad=True)
        else:
            x = x_init.clone().detach()
            x.requires_grad_(True)
        
        for i in range(epochs):
            y = self.forward(x)
            y.backward()
            gradient = x.grad
            with torch.no_grad():
                addition = [0, 0, gradient[2]]
                x += lr*torch.tensor(addition)
            self.net.zero_grad()
        return x[2]

    def rand_weights(self, weight_var, bias_var, additive = False, seed = False):
        state = self.state_dict()

        if seed != False:
            torch.manual_seed(seed)

        weight_var = sqrt(weight_var)
        bias_var = sqrt(bias_var)

        for key in state.keys():
            params = state[key]
            center = 0.0 if not additive else params
            if 'weight' in key and not torch.is_tensor(center):
                state[key] = torch.normal(center, weight_var, size=params.shape)
            elif 'bias' in key and not torch.is_tensor(center):
                state[key] = torch.normal(center, bias_var, size=params.shape)
            elif 'weight' in key and torch.is_tensor(center):
                state[key] = torch.normal(center, weight_var)
            elif 'bias' in key and torch.is_tensor(center):
                state[key] = torch.normal(center, bias_var)
                
        self.load_state_dict(state)

    def residual(self, x, y):
        return y - self.forward(x)

    def freeze_params(self):
        flags = [p.requires_grad for p in self.parameters()]
        for p in self.parameters: p.requires_grad_(False)
        return flags
    
    def unfreeze_params(model, flags):
        for p, f in zip(model.parameters(), flags): p.requires_grad_(f)

We're going to use `BCEWithLogitsLoss(reduction="sum").(x, y)` which basically calculates:
$$\sum_{\sigma_1 ... \sigma_N}^N y_i*\ln(\sigma(x_i)) + (1 - y_i)*\ln(1 - \sigma(x_i))$$

# Test Training
The training below adjusts the reward based on an optimal reward function and the loss compared to that. So far I have been having issues :(

In [6]:
# Test w/ MSELoss

model = RewardNet()
model.rand_weights(0.1, 0.1, seed=81)
opt = torch.optim.Adam(model.parameters(), lr=1e-2)
MSELoss = nn.MSELoss(reduction='sum')
BCELoss = nn.BCEWithLogitsLoss()

def true_reward(state, action, slope=15e-2, intercept=3):
    answer = state[0] + state[1]
    distance = answer - action
    y1 = intercept + slope*distance
    y2 = intercept - slope*distance
    return y1 if distance <= 0 else y2

# --- Init Evaluation ---
model.eval()

seed(42)
states = torch.from_numpy(uniform(0, 20, size=(5,2)).astype(int)).float()
actions = torch.from_numpy(uniform(0, 40, size=5).astype(int)).float()
X = torch.column_stack((states, actions))

y_pred = model.forward(X).flatten()
y = torch.tensor([true_reward(s, a) for s,a in zip(states, actions)])


print_chart({
    'Distance': torch.abs(torch.sum(states, axis=1) - actions),
    'Predicted Reward': y_pred,
    'Actual Reward': y
})

print(f'loss: {MSELoss(y, y_pred)}')

# --- Training ---
model.train()
seed(81)
batch = 10
for epoch in range(300):
    
    # --- Set up ---
    states = torch.from_numpy(uniform(0, 20, size=(batch,2))).int().float()
    actions = torch.from_numpy(uniform(0, 40, size=batch)).int().float()
    y = torch.tensor([true_reward(s, a) for s,a in zip(states, actions)])
    opt.zero_grad()
    # --- Prediction --- 
    y_pred = model.forward(torch.column_stack((states, actions))).flatten()

    # --- adjust model --- 
    loss = MSELoss(y, y_pred)
    loss.backward()

    opt.step()

# --- Final Evaluation ---
model.eval()

seed(42)
states = torch.from_numpy(uniform(0, 20, size=(5,2)).astype(int))
actions = torch.from_numpy(uniform(0, 40, size=5).astype(int))
X = torch.column_stack((states, actions)).float()

y_pred = model.forward(X).flatten()
y = torch.tensor([true_reward(s, a) for s,a in zip(states, actions)])

print_chart({
    'Distance': torch.abs(torch.sum(states, axis=1) - actions),
    'Predicted Reward': y_pred,
    'Actual Reward': y
})
print(f'loss: {MSELoss(y, y_pred)}')

            Distance    Predicted Reward       Actual Reward
               26.00                1.21               -0.90
               13.00               -0.56                1.05
               27.00               -2.84               -1.05
               10.00                0.89                1.50
               19.00                1.72                0.15
loss: 13.070858001708984
            Distance    Predicted Reward       Actual Reward
               26.00               -0.70               -0.90
               13.00                1.41                1.05
               27.00               -0.97               -1.05
               10.00                1.50                1.50
               19.00                0.12                0.15
loss: 0.18015436828136444


# Future Training

In [25]:
model = RewardNet()
model.rand_weights(0.1, 0.1, seed=81)
opt = torch.optim.Adam(model.parameters(), lr=1e-3)
MSELoss = nn.MSELoss(reduction='sum')
BCELoss = nn.BCEWithLogitsLoss()

def true_reward(state, action, slope=15e-2, intercept=3):
    answer = state[0] + state[1]
    distance = answer - action
    y1 = intercept + slope*distance
    y2 = intercept - slope*distance
    return y1 if distance <= 0 else y2

model.eval()
seed(42)

test_size = 120
with torch.no_grad():
    states = torch.from_numpy(uniform(0, 20, size=(test_size,2)).astype(int)).float()
answers = torch.sum(states, axis=1)

greedy_actions = torch.tensor([model.action_max([s[0], s[1], 20.0]) for s in states])
random_actions = greedy_actions + torch.empty(greedy_actions.size()).uniform_(-2, 2)

X_greedy = torch.column_stack((states, greedy_actions))
X_random = torch.column_stack((states, random_actions))

r1 = model.forward(X_greedy)
r2 = model.forward(X_random)

prefs = (torch.abs(answers - greedy_actions) <= torch.abs(answers - random_actions)).float()

est_prefs = pref_estimation((X_greedy, X_random), model.forward)

accuracy = sum([torch.abs(p - ep) <= 0.5 for p, ep in zip(prefs, est_prefs)]) / test_size

print_chart({
    'Answers': answers[:5],
    'Greedy Actions': greedy_actions[:5],
    'Random Actions': random_actions[:5],
    'Real Prefs': prefs[:5],
    'Estimated Prefs': est_prefs[:5]
})
print(f'Accuracy: {accuracy:.3f}')

# --- training --- 
model.train()
seed(81)
batch=10
loss_history = []
for epoch in range(600):
    dist = exp(-((epoch - 200) / 100)) + .2
    # --- set up ---
    with torch.no_grad():
        states = torch.from_numpy(uniform(0, 20, size=(batch, 2))).int().float()
    answers = torch.sum(states, axis=1)

    # --- Action Calc ---
    greedy_actions = torch.tensor([model.action_max([s[0], s[1], 20.0]) for s in states])
    random_actions = greedy_actions + torch.empty(greedy_actions.size()).uniform_(-dist, dist)

    # --- Reward Calc ---
    X_greedy = torch.column_stack((states, greedy_actions))
    X_random = torch.column_stack((states, random_actions))

    r1 = model.forward(X_greedy)
    r2 = model.forward(X_random)

    # --- Calculate Loss + Update ---
    prefs = (torch.abs(answers - greedy_actions) <= torch.abs(answers - random_actions)).float()

    opt.zero_grad()
    loss = BCELoss(r1 - r2, prefs)
    loss_history += [loss]
    loss.backward()
    opt.step()

model.eval()
seed(42)

test_size = 300
with torch.no_grad():
    states = torch.from_numpy(uniform(0, 20, size=(test_size,2)).astype(int)).float()
answers = torch.sum(states, axis=1)

greedy_actions = torch.tensor([model.action_max([s[0], s[1], 20.0]) for s in states])
random_actions = greedy_actions + torch.empty(greedy_actions.size()).uniform_(-2, 2)

X_greedy = torch.column_stack((states, greedy_actions))
X_random = torch.column_stack((states, random_actions))

r1 = model.forward(X_greedy)
r2 = model.forward(X_random)

prefs = (torch.abs(answers - greedy_actions) <= torch.abs(answers - random_actions)).float()

est_prefs = pref_estimation((X_greedy, X_random), model.forward)

accuracy = sum([torch.abs(p - ep) <= 0.5 for p, ep in zip(prefs, est_prefs)]) / test_size

print_chart({
    'Answers': answers[:5],
    'Greedy Actions': greedy_actions[:5],
    'Random Actions': random_actions[:5],
    'Real Prefs': prefs[:5],
    'Estimated Prefs': est_prefs[:5]
})
print(f'Accuracy: {accuracy:.3f}')


             Answers      Greedy Actions      Random Actions          Real Prefs     Estimated Prefs
               26.00               12.34               11.86                1.00                0.50
               25.00               21.55               23.20                0.00                0.51
                6.00                7.62                6.51                0.00                0.46
               18.00                6.60                7.03                0.00                0.50
               26.00               21.55               19.66                1.00                0.51
Accuracy: 0.450
             Answers      Greedy Actions      Random Actions          Real Prefs     Estimated Prefs
               26.00               27.90               26.95                0.00                0.46
               25.00               30.82               29.59                0.00                0.45
                6.00                3.08                3.20               

In [40]:
state = [100.0, 221.0]

seg1 = state + [301.0]
seg2 = state + [400.0]

pref_estimation((seg1, seg2), model.forward)

tensor([1.0000], grad_fn=<SigmoidBackward0>)