## Unequal Game solution with CCEM method

In [24]:
import torch
import numpy as np
import matplotlib.pyplot as plt
from copy import deepcopy
from IPython.display import clear_output

Implement environment

In [25]:
class UnequalGame:

    def __init__(self, initial_x=1, dt=0.005, terminal_time=2, u_action_max=2, v_action_max=1):
        self.u_action_max = u_action_max
        self.v_action_max = v_action_max
        self.terminal_time = terminal_time
        self.dt = dt
        self.initial_x = initial_x
        self.state = self.reset()

    def reset(self):
        self.state = np.array([0, self.initial_x])
        return self.state

    def step(self, u_action, v_action):
        t, x = self.state
        x = x + (u_action - v_action) * self.dt
        t += self.dt
        self.state = np.array([t, x])

        reward = 0
        done = False
        if t >= self.terminal_time:
            reward = x ** 2
            done = True

        return self.state, reward, done, None


Implement agent

In [26]:
class Network(torch.nn.Module):

    def __init__(self, input_shape, output_shape):
        super().__init__()
        self.linear_1 = torch.nn.Linear(input_shape[0], 50)
        self.linear_2 = torch.nn.Linear(50, 30)
        self.linear_3 = torch.nn.Linear(30, output_shape[0])
        self.relu = torch.nn.ReLU()
        self.tang = torch.nn.Tanh()

    def forward(self, input_):
        hidden = self.relu(self.linear_1(input_))
        hidden = self.relu(self.linear_2(hidden))
        output = self.tang(self.linear_3(hidden))
        return output


class CCEMAgent(torch.nn.Module):

    def __init__(self, state_shape, action_shape, action_max, reward_param=1, percentile_param=70, noise_decrease=0.98,
                 tau=1e-2, learning_rate=1e-2, n_learning_per_fit=16):
        super().__init__()
        self.action_max = np.abs(action_max)
        self.reward_param = reward_param  # equal to 1 if agent wants to maximize reward otherwise -1
        self.percentile_param = percentile_param
        self.noise_decrease = noise_decrease
        self.noise_threshold = 1
        self.min_noise_threshold = 0.1
        self.tau = tau
        self.n_learning_per_fit = n_learning_per_fit
        self.network = Network(state_shape, action_shape)
        self.optimizer = torch.optim.Adam(params=self.network.parameters(), lr=learning_rate)

    def get_action(self, state):
        state = torch.FloatTensor(state)
        predicted_action = self.network(state).detach().numpy()
        noise = self.noise_threshold * np.random.uniform(low=-1, high=1)
        action = np.clip(predicted_action + noise, -self.action_max, self.action_max)
        return action

    def get_elite_states_and_actions(self, sessions, prefix):
        """
          Select sessions with the most or least reward
          by percentile
        """
        total_rewards = [session['total_reward'] for session in sessions]
        reward_threshold = np.percentile(total_rewards, self.percentile_param)

        elite_states = []
        elite_actions = []
        for session in sessions:
            if self.reward_param * (session['total_reward'] - reward_threshold) > 0:
                elite_states.extend(session['states'])
                elite_actions.extend(session[f'{prefix}actions'])

        return torch.FloatTensor(elite_states), torch.FloatTensor(elite_actions)

    def learn_network(self, loss):
        self.optimizer.zero_grad()
        old_network = deepcopy(self.network)
        loss.backward()
        self.optimizer.step()

        for new_parameter, old_parameter in zip(self.network.parameters(), old_network.parameters()):
            new_parameter.data.copy_((1 - self.tau) * new_parameter + self.tau * old_parameter)

        return None

    def fit(self, sessions, prefix=''):
        elite_states, elite_actions = self.get_elite_states_and_actions(sessions, prefix=prefix)

        for _ in range(self.n_learning_per_fit):
            predicted_action = self.network(elite_states)
            loss = torch.mean((predicted_action - elite_actions) ** 2)
            self.learn_network(loss)

        if self.noise_threshold > self.min_noise_threshold:
            self.noise_threshold *= self.noise_decrease

        return None


Train agents

In [27]:
def generate_session(u_agent, v_agent, env, t_max=200):
    """
    Generate session on environment with agent
    """
    states = []
    u_actions = []
    v_actions = []
    total_reward = 0
    done = False
    state = env.reset()
    while not done:
        u_action = u_agent.get_action(state)
        v_action = v_agent.get_action(state)
        next_state, reward, done, _ = env.step(u_action[0], v_action[0])
        states.append(state)
        u_actions.append(u_action)
        v_actions.append(v_action)
        total_reward += reward
        state = next_state

    return {'states': states, 'u_actions': u_actions, 'v_actions': v_actions, 'total_reward': total_reward}


def train_agents(u_agent, v_agent, env, n_epochs, n_sessions, epsilon, test=False):
    last_mean_reward = 0
    mean_rewards = []
    
    for epoch in range(n_epochs):
        sessions = [generate_session(u_agent, v_agent, env) for _ in range(n_sessions)]
        mean_reward = np.mean([session['total_reward'] for session in sessions])
        if not test:
            u_agent.fit(sessions, prefix='u_')
        v_agent.fit(sessions, prefix='v_')
        mean_rewards.append(mean_reward)
        print('epoch: {0}, mean reward: {1}'.format(epoch, mean_reward))
        # if epoch > 0:
        #    clear_output(True)
        #    plt.plot(range(len(mean_rewards)), mean_rewards)
        #    plt.show()
        if np.abs(last_mean_reward - mean_reward) < epsilon:
            break
        last_mean_reward = mean_reward
        
    return np.array(mean_rewards)

In [28]:
env = UnequalGame()
u_agent = CCEMAgent((2,), (1,), percentile_param=30, action_max=env.u_action_max, reward_param=-1)
v_agent = CCEMAgent((2,), (1,), percentile_param=70, action_max=env.v_action_max, reward_param=1)
n_epochs = 100
n_sessions = 100
epsilon = 1e-5

In [None]:
mean_rewards = train_agents(u_agent, v_agent, env, n_epochs, n_sessions, epsilon)

epoch: 0, mean reward: 0.3351087898760319
epoch: 1, mean reward: 0.46109226925460683
epoch: 2, mean reward: 0.5056249891227123
epoch: 3, mean reward: 0.5204197324090394
epoch: 4, mean reward: 0.5625592511373864
epoch: 5, mean reward: 0.6083741915019903
epoch: 6, mean reward: 0.635795213083863
epoch: 7, mean reward: 0.6102800791928601
epoch: 8, mean reward: 0.5727067590023898
epoch: 9, mean reward: 0.5227293809481021
epoch: 10, mean reward: 0.4606627875835747
epoch: 11, mean reward: 0.430565317711096
epoch: 12, mean reward: 0.4049933114357043
epoch: 13, mean reward: 0.3777978850006759
epoch: 14, mean reward: 0.3506407173768569
epoch: 15, mean reward: 0.34042857352315126
epoch: 16, mean reward: 0.3424351156831399
epoch: 17, mean reward: 0.3460571928285636
epoch: 18, mean reward: 0.3736614338788269
epoch: 19, mean reward: 0.41264125599221657
epoch: 20, mean reward: 0.4469050544761336
epoch: 21, mean reward: 0.5086008940904685
epoch: 22, mean reward: 0.5329486013959746
epoch: 23, mean rewa

In [None]:
plt.plot(range(len(mean_rewards)), mean_rewards, '-o')
plt.xticks(range(len(mean_rewards)))
plt.xlabel('epoch')
plt.ylabel('Mean reward')
plt.title('Mean train rewards over epochs')
plt.show()

Test u_gent

In [None]:
v_test_agent = CCEMAgent((2,), (1,), percentile_param=70, action_max=env.v_action_max, reward_param=1)

In [None]:
test_rewards = train_agents(u_agent, v_test_agent, env, n_epochs, n_sessions, epsilon, test=True)

In [None]:
plt.plot(range(len(test_rewards)), test_rewards, '-o')
plt.xticks(range(len(test_rewards)))
plt.xlabel('epoch')
plt.ylabel('Mean reward')
plt.title('Mean test rewards over epochs')
plt.show()