# DDPG PROJECT

Import gym and define pendulum environment

In [None]:
import gym
import numpy as np
from matplotlib import pyplot as plt

#env = gym.make('Pendulum-v1', g=9.81, render_mode = 'human')

Check API-conformity

In [None]:
from gym.utils.env_checker import check_env
check_env(env)

Import helpers

In [None]:
import helpers

## Heuristic policy

##### Wrap environment in a NormalizedEnv class

In [None]:
env = gym.make('Pendulum-v1', g=9.81)
#env = gym.make('Pendulum-v1', g=9.81, render_mode = 'human')
env

In [None]:
env = helpers.NormalizedEnv(env)
env

##### RandomAgent

In [None]:
random_agent = helpers.RandomAgent(env)

In [None]:
rewards = []

for _ in range(10):
    state = env.reset()
    state = state[0]
    
    trunc = False
    cur_reward = 0
    it = 0
    while not trunc:   
        it+=1
        action = random_agent.compute_action(state) 
        (next_state, reward, term, trunc, info) = env.step(action)
        state = next_state
        cur_reward += reward
        
        if term or trunc:
            observation, info = env.reset()
    
    #print(it)
    rewards.append(cur_reward)

rewards = [np.sum(rewards[:i+1])/(i+1) for i in range(len(rewards))]
print("Sum rewards: ",rewards)
# env.close()

##### Heuristic pendulum agent

In [None]:
class HeuristicPendulumAgent:
    def __init__(self, env, t):
        self.state_size = env.observation_space.shape[0]
        self.action_size = env.action_space.shape[0]
        self.t = t
    def compute_action(self, state):
        if state[0] <= 0:
            return self.t * np.sign(state[2])
        else:
            return -self.t * np.sign(state[2])

In [None]:
heur_agent = HeuristicPendulumAgent(env,0.8)
rewards = []

#Buffer = ReplayBuffer(max_size = 400)

for _ in range(10):
    state = env.reset()
    state = state[0]

    trunc = False
    cur_reward = 0
    it = 0
    while not trunc:
        it+= 1
        action = heur_agent.compute_action(state) 
        (next_state, reward, term, trunc, info) = env.step(action)
        #Buffer.store_transition((state, action, reward, next_state, trunc))
        state = next_state
        cur_reward += reward

        if term or trunc:
            observation, info = env.reset()
            
    rewards.append(cur_reward)
    #print(it)


#bat = Buffer.batch_buffer(3)
#print(bat)
rewards = [np.sum(rewards[:i+1])/(i+1) for i in range(len(rewards))]
print("Sum rewards: ",rewards)

Effect of fixed torque

In [None]:
torques = np.linspace(-1,1,30)
avg_rewards = []

for t in torques:
    heur_agent = HeuristicPendulumAgent(env,t)
    rewards = []

    for _ in range(10):
        state = env.reset()
        state = state[0]

        trunc = False
        cur_reward = 0

        while not trunc:
            action = heur_agent.compute_action(state) 
            (next_state, reward, term, trunc, info) = env.step(action)
            state = next_state
            cur_reward += reward

            if term or trunc:
                observation, info = env.reset()

        rewards.append(cur_reward)
    avg_rewards.append(rewards[-1])

plt.figure()
plt.plot(torques, avg_rewards)
plt.xlabel('Fixed torque')
plt.ylabel('Avg reward (over 10 runs)')

In [None]:
env.close()

# Q function of the heuristic policy

In [None]:
class ReplayBuffer:
    def __init__(self, max_size):
        self.max_size = max_size
        self.buffer = []
        
    def store_transition(self, trans):
        if len(self.buffer)<self.max_size:           
            self.buffer.append(trans)
    
    def batch_buffer(self, batch_size):
        n = len(self.buffer)
        indexes = np.random.permutation(n)[:min(batch_size,self.max_size)]
        return [self.buffer[i] for i in indexes]

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class QNetwork(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.layer1 = nn.Linear(4, 32)
        self.layer2 = nn.Linear(32, 32)
        self.output = nn.Linear(32, 1)

    def forward(self, x):
        x = self.layer1(x)
        x = F.relu(x)
        x = self.layer2(x)
        x = F.relu(x)
        x = self.output(x)
        return x

In [None]:
heur_agent = HeuristicPendulumAgent(env,0.8)
max_size = 1e4
Buffer = ReplayBuffer(max_size = max_size)

for _ in range(int(max_size/200)):
    state = env.reset()
    state = state[0]

    trunc = False
    
    while not trunc:
        action = heur_agent.compute_action(state) 
        (next_state, reward, term, trunc, info) = env.step(action)
        Buffer.store_transition((state, action, reward, next_state, trunc))
        state = next_state

        if term or trunc:
            observation, info = env.reset()


In [None]:
def train_epoch(model, agent, optimizer, criterion, gamma, Buffer, epoch, device):
    model.train()
    b = Buffer.batch_buffer(128)
    data = np.zeros((128,4))
    reward = np.zeros(128)
    next_data = np.zeros((128,4))
    trunc = np.zeros(128)
    for i,transition in enumerate(b):        
        data[i] = np.concatenate([transition[0], np.array([transition[1]])])
        reward[i] = transition[2]
        trunc[i] = transition[4]
        next_data[i] = np.concatenate([transition[3], np.array([agent.compute_action(transition[3])])])
    
    data = torch.tensor(data, dtype=torch.float32).to(device)   
    reward = torch.tensor(reward, dtype=torch.float32).to(device)
    next_data = torch.tensor(next_data, dtype=torch.float32).to(device)
    trunc = torch.tensor(trunc, dtype = torch.bool)
    optimizer.zero_grad()
    output = model(data)
    with torch.no_grad():
        target = (reward + trunc*model(next_data).flatten()).reshape(128,1)
    loss = criterion(output, target)
    loss.backward()
    optimizer.step()
    
    return loss

    

In [None]:
device = "cpu"
model = QNetwork().to(device)
lr = 1e-4
gamma = 0.99
optimizer = torch.optim.Adam(model.parameters(), lr = lr)
criterion = torch.nn.MSELoss()
num_epochs = 1000
history = []
for epoch in range(num_epochs):
    loss = train_epoch(model, heur_agent, optimizer, criterion, gamma, Buffer, epoch, device)
    if epoch%100 == 0:
        print(loss)
    history.append(loss.detach().numpy())
    

In [None]:
plt.plot(np.arange(1000), history)

In [None]:
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D

fig = plt.figure()
ax = Axes3D(fig)
rad = np.linspace(0, 5, 100)
azm = np.linspace(-np.pi, np.pi, 100)
r, th = np.meshgrid(rad, azm)
speed = 0
torque = 0.8
speed_array = speed*np.ones(r.shape)
torque_array = torque*np.ones(r.shape)
data = np.stack([np.cos(th), np.sin(th), speed_array, torque_array], axis = 2)
data = torch.tensor(data, dtype = torch.float32)
z = model(data)
z = z.detach().numpy().squeeze()
print(shape(z))
plt.subplot(projection="polar")

plt.pcolormesh(th, r, z)
#plt.pcolormesh(th, z, r)

plt.plot(azm, r, color='k', ls='none') 
plt.grid()
plt.colorbar()

plt.show()



In [None]:
def reward(alpha, alpha_speed, torque):
    return -(alpha**2 + 0.1*alpha_speed**2 + 0.001*torque**2)

fig = plt.figure()
ax = Axes3D(fig)
rad = np.linspace(0, 5, 100)
azm = np.linspace(-np.pi, np.pi, 100)
r, th = np.meshgrid(rad, azm)
speed = 0.5
torque = 2
plt.subplot(projection="polar")
z_heur = reward(th, speed, torque*np.sign(np.cos(th))*np.sign(speed))
#print(z_heur)
plt.pcolormesh(th, r, z_heur)
#plt.pcolormesh(th, z, r)

plt.plot(azm, r, color='k', ls='none') 
plt.grid()
plt.colorbar()

plt.show()
