# Implementation of DPG 

Implementing the COPDAQ algorithm. 
Reference paper: <a href="https://proceedings.mlr.press/v32/silver14.pdf">link</a>

In [1]:
import torch
import gymnasium as gym
import numpy as np
import torch.nn as nn
import matplotlib.pyplot as plt

from torch import Tensor

device = 'cuda' if torch.cuda.is_available() else 'cpu'
device

'cpu'

In [2]:
# linear function approximator
class Critic(nn.Module):
    
    def __init__(
        self, 
        in_features: int = 2, 
        out_features: int = 1,
        *args, 
        **kwargs
    ):
        super().__init__(*args, **kwargs)
        
        self.linear = nn.Linear(in_features, out_features, bias=False)
    
    def forward(self, action, value, policy_action, policy_grad):
        q_value = (action - policy_action) * self.linear(policy_grad) + value
        return q_value
    
class Actor(nn.Module):
    
    def __init__(
        self, 
        env, 
        in_features: int = 2, 
        out_features: int = 1, 
        *args, 
        **kwargs
    ):
        super().__init__(*args, **kwargs)
        
        self.linear = nn.Linear(in_features, out_features, bias=False)
        self.env = env
        
    def forward(self, obs):
        return self.linear(obs)
    
    def get_action(self, obs, scale=0.5):
        if isinstance(obs, np.ndarray):
            obs = torch.as_tensor(obs, dtype=torch.float32)
        
        with torch.no_grad():
            action = self(obs).cpu().detach().numpy()
        return self.env.action_space.sample()
    
        

class Baseline(nn.Module):
    
    def __init__(
        self, 
        in_features: int = 2, 
        out_features: int = 1,
        *args, 
        **kwargs
    ):
        super().__init__(*args, **kwargs)    
        
        self.linear = nn.Linear(in_features, out_features, bias=False)
        
    def forward(self, obs):
        return self.linear(obs)

In [3]:
from collections import deque

class RollingAverage:
    def __init__(self, window_size):
        self.window = deque(maxlen=window_size)
        self.averages = []
        self.all_rewards = []

    def update(self, value):
        self.window.append(value)
        self.averages.append(self.get_average)
        self.all_rewards.append(value)

    @property
    def get_average(self):
        return sum(self.window) / len(self.window) if self.window else 0.0

In [4]:
import random

class BasicExperienceReplay:
    
    def __init__(self, buffer_len=5000):
        self.store = {
            'states' : deque(maxlen=buffer_len),
            'actions' : deque(maxlen=buffer_len),
            'rewards' : deque(maxlen=buffer_len),
            'next_states' : deque(maxlen=buffer_len),
            'next_actions' : deque(maxlen=buffer_len),
            'dones' : deque(maxlen=buffer_len)
        }
    
    def update(
        self, 
        state, 
        action, 
        reward, 
        next_state,
        next_action, 
        done
    ):
        self.store['states'].append(state)
        self.store['actions'].append(action)
        self.store['rewards'].append(reward)
        self.store['next_states'].append(next_state)
        self.store['next_actions'].append(next_action)
        self.store['dones'].append(done)
    
    def sample(self, buffer_size):
        states = random.choices(self.store['states'], k=buffer_size)
        actions = random.choices(self.store['actions'], k=buffer_size)
        rewards = random.choices(self.store['rewards'], k=buffer_size)
        next_states = random.choices(self.store['next_states'], k=buffer_size)
        next_actions = random.choices(self.store['next_actions'], k=buffer_size)
        dones = random.choices(self.store['dones'], k=buffer_size)
        
        return (
            torch.as_tensor(np.array(states), dtype=torch.float32),
            torch.as_tensor(np.array(actions), dtype=torch.float32),
            torch.as_tensor(np.array(rewards), dtype=torch.float32),
            torch.as_tensor(np.array(next_states), dtype=torch.float32),
            torch.as_tensor(np.array(next_actions), dtype=torch.float32),
            torch.as_tensor(np.array(dones), dtype=torch.bool)
        )
        
    def __len__(self):
        return len(self.store['states'])

In [36]:
def train(
    env: gym.Env, 
    actor: Actor, 
    critic: Critic, 
    baseline: Baseline, 
    batch_size: int | bool = 1,
    gamma: float = 0.99,  
    timesteps: int = 1000,
    lr_w: float = 0.01,
    lr_theta: float = 0.001, 
    lr_v: float = 0.01,
):
        
    
    obs, _ = env.reset()
    ep_reward = 0
    metrics = RollingAverage(20)
    replay = BasicExperienceReplay()
    action = actor.get_action(obs)
    for step in range(1, timesteps):
        obs_prime, reward, terminated, truncated, _ = env.step(action)
        ep_reward += reward
        
        next_action = actor.get_action(obs)
        replay.update(obs, action, reward, obs_prime, next_action, terminated or truncated)
        
        obs = obs_prime
        action = next_action 
        
        if len(replay) > batch_size:
            batch_states, batch_actions, batch_rewards, batch_state_primes, batch_next_actions, _ = replay.sample(batch_size)
            actor_actions = actor(batch_states)
            values = baseline(batch_states)
            
            actor.linear.weight.grad = None
            loss = actor_actions.clone().squeeze(dim=0)
            # compute for each action dim the gradients
            for dim in loss:
                dim.backward(retain_graph=True)
            policy_grad = actor.linear.weight.grad.clone()
            
            q_values = critic(batch_actions, values, actor_actions, policy_grad.detach())
            
            actor_actions_next = actor(batch_state_primes)
            actor.linear.weight.grad = None
            loss2 = actor_actions_next.clone().squeeze(dim=0)
            for dim in loss2:
                loss2.backward(retain_graph=True)
            policy_grad_next = actor.linear.weight.grad.clone()
            
            with torch.no_grad():
                values_next = baseline(batch_state_primes)
                q_values_prime = critic(batch_next_actions, values_next, actor_actions_next, policy_grad_next.detach())
            
            # td error
            td_error = batch_rewards.view(batch_size, -1) + gamma * q_values_prime - q_values
            
            # update actor weights
            actor.linear.weight = nn.Parameter(actor.linear.weight.grad + lr_theta * critic.linear.weight)
            
            # update critic and baseline weights (1, 1) @ (1, 2) 
            phi = ((batch_actions - actor_actions) @ policy_grad)
            critic.linear.weight = nn.Parameter(critic.linear.weight + lr_w * td_error.item())
            baseline.linear.weight = nn.Parameter(baseline.linear.weight + lr_v * policy_grad)
            
            # print(td_error, phi, batch_actions, batch_rewards)
            
        # actor.update_epsilon(step)
        
        if terminated or truncated:
            obs, _ = env.reset()
            action = actor.get_action(obs)
            metrics.update(ep_reward)
            ep_reward = 0 
        action_step = action[0]
        print(f'Step: {step} | Avg Reward: {metrics.get_average:.3f} | state-action: {obs} {action_step:.3f}', end='\r')
            
        
         
    return metrics   

In [None]:
env = gym.make("Pendulum-v1")

TIMESTEPS = 10000
infeatures = 3
actor = Actor(env, in_features=infeatures)
critic = Critic(in_features=infeatures)
baseline = Baseline(in_features=infeatures)

metric_store = train(
    env, 
    actor, 
    critic, 
    baseline, 
    batch_size=1,
    timesteps=TIMESTEPS, 
    lr_theta=0.001, 
    lr_v=0.003, 
    lr_w=0.003,
)

Step: 320 | Avg Reward: -1219.051 | state-action: [-0.11023503 -0.99390554 -5.2233324 ] -0.308.695

In [39]:
env_test = gym.make("Pendulum-v1", render_mode='human')
rewards = []
# actor.eps = 0.99
for ep in range(1):
    obs, _ = env_test.reset()
    done = False
    ep_reward = 0
    
    while not done:
        action = actor(torch.as_tensor(obs, dtype=torch.float32).view(1, -1)).cpu().detach().numpy()
        # action = actor.get_action(obs)
        obs_prime, reward, terminated, truncated, _ = env_test.step(action)
        ep_reward += reward
        
        obs = obs_prime
        
        print(f'reward: {ep_reward}', end='\r')
    rewards.append(ep_reward)
rewards

  c = pygame.math.Vector2(c).rotate_rad(self.state[0] + np.pi / 2)


ValueError: cannot convert float NaN to integer