# 02 - Reinforcement Learning 

In [1]:
import os
import sys
# Record the interpreter version so the run can be reproduced later.
print(sys.version)

3.10.7 (main, Sep  6 2022, 21:22:27) [GCC 12.2.0]


In [16]:
import numpy as np
import torch
from gymnasium import Env, spaces

In [2]:
# Make the local SAC-Continuous-Pytorch checkout importable (SAC.py, ReplayBuffer.py).
# The path is relative, so it resolves against the notebook's working directory.
# Guarded so that re-running this cell does not append duplicate entries.
SAC_REPO_DIR = "./SAC-Continuous-Pytorch/"
if SAC_REPO_DIR not in sys.path:
    sys.path.append(SAC_REPO_DIR)

In [3]:
from SAC import SAC_Agent
from ReplayBuffer import RandomBuffer, device

In [8]:
class TranslatorEnv(Env):
    """Skeleton Gymnasium environment for the translator task.

    Action space: 5-dimensional ``Box``; the first four components are
    bounded to [0, 1] and the last one to [-1, 1].
    Observation space: 2-dimensional ``Box`` bounded to [-1, 1].

    The dynamics are not implemented yet: ``step`` returns the current state
    unchanged with a reward of 0 and never ends the episode.
    """

    def __init__(self):
        ac_low = np.array([0, 0, 0, 0, -1], dtype=np.float32)
        ac_high = np.array([1, 1, 1, 1, 1], dtype=np.float32)
        ob_low = np.array([-1, -1], dtype=np.float32)
        ob_high = np.array([1, 1], dtype=np.float32)

        self.action_space = spaces.Box(low=ac_low, high=ac_high, dtype=np.float32)
        self.observation_space = spaces.Box(low=ob_low, high=ob_high, dtype=np.float32)

        # None until reset() is called; step() uses this to detect a missing reset.
        self.state = None

    def reset(self, *, seed=None, options=None):
        """Reset the environment to the all-zeros observation.

        Args:
            seed: Optional seed forwarded to ``Env.reset`` so the environment's
                random generator is seeded, as the Gymnasium API expects.
            options: Optional reset options; accepted for API compatibility
                and currently unused.

        Returns:
            A ``(observation, info)`` tuple, where ``info`` is an empty dict.
        """
        super().reset(seed=seed)

        self.state = np.zeros(
            self.observation_space.shape, dtype=self.observation_space.dtype
        )

        info = dict()

        return self.state, info

    def step(self, action):
        """Advance the environment by one step (placeholder implementation).

        Intended flow:
        1 - Send the action
        2 - Receive the next observation and the loss

        Currently ``action`` is ignored: the stored state is returned
        unchanged together with a reward of 0.

        Returns:
            ``(observation, reward, terminated, truncated, info)``; both
            ``terminated`` and ``truncated`` are always False.

        Raises:
            ValueError: if ``reset()`` has not been called yet.
        """
        if self.state is None:
            raise ValueError("call reset() before stepping")

        reward = 0
        terminated = False
        truncated = False
        info = dict()

        return self.state, reward, terminated, truncated, info

    def render(self):
        """Rendering is not implemented yet; always returns None."""
        return

    def close(self):
        """End the environment simulation (currently a no-op)."""
        return

In [17]:
class Buffer:
    """Fixed-capacity ring buffer of (state, action, reward, next_state, dead) transitions.

    Transitions are stored in preallocated NumPy arrays. Once ``max_size``
    entries have been written, new entries overwrite the oldest ones.

    Args:
        state_dim: Length of a state vector.
        action_dim: Length of an action vector.
        max_size: Maximum number of transitions kept.
        device: Device that sampled tensors are created on.
    """

    def __init__(self, state_dim, action_dim, max_size=256, device="cpu"):
        self.max_size = max_size
        self.ptr = 0   # index the next transition will be written to
        self.size = 0  # number of valid transitions currently stored
        self.state = np.zeros((max_size, state_dim))
        self.action = np.zeros((max_size, action_dim))
        self.reward = np.zeros((max_size, 1))
        self.next_state = np.zeros((max_size, state_dim))
        self.dead = np.zeros((max_size, 1), dtype=np.uint8)
        self.device = device

    def add(self, state, action, reward, next_state, dead=False):
        """Store one transition at the write pointer, overwriting the oldest when full."""
        self.state[self.ptr] = state
        self.action[self.ptr] = action
        self.reward[self.ptr] = reward
        self.next_state[self.ptr] = next_state
        self.dead[self.ptr] = dead

        # Wrap around so the buffer behaves as a ring.
        self.ptr = (self.ptr + 1) % self.max_size
        self.size = min(self.size + 1, self.max_size)

    def sample(self, batch_size):
        """Draw ``batch_size`` stored transitions uniformly at random, with replacement.

        Returns:
            A tuple ``(state, action, reward, next_state, dead)`` of float32
            tensors on ``self.device``, each with ``batch_size`` rows.

        Raises:
            ValueError: if the buffer is empty.
        """
        if self.size == 0:
            # np.random.randint(0, 0) would raise an opaque "low >= high" error.
            raise ValueError("cannot sample from an empty buffer; call add() first")

        ind = np.random.randint(0, self.size, size=batch_size)

        # Fancy indexing copies the rows, so the tensors never alias the storage.
        fields = (self.state, self.action, self.reward, self.next_state, self.dead)
        return tuple(
            torch.as_tensor(field[ind], dtype=torch.float32, device=self.device)
            for field in fields
        )

In [22]:
# Instantiate the environment and reset it so that env.state is initialised.
# reset() returns (initial observation, info); the tuple is displayed as the cell output.
env = TranslatorEnv()
env.reset()

(array([0., 0.], dtype=float32), {})

In [19]:
# Size the buffer from the environment's spaces and create sampled batches on the
# `device` exported by ReplayBuffer.py instead of silently defaulting to "cpu".
# NOTE(review): presumably this is the same device SAC_Agent places its networks on — confirm.
replay_buffer = Buffer(
    state_dim=env.observation_space.shape[0],
    action_dim=env.action_space.shape[0],
    device=device,
)

In [18]:
# Constructor arguments for SAC_Agent. The state/action dimensions are read from
# the environment's spaces; the remaining entries are fixed hyperparameters.
kwargs = dict(
    state_dim=env.observation_space.shape[0],
    action_dim=env.action_space.shape[0],
    gamma=0.99,
    hid_shape=(12, 12),
    a_lr=1e-4,
    c_lr=1e-4,
    batch_size=1,
    alpha=0.12,
    adaptive_alpha=False,
)

In [13]:
# Build the SAC agent from the hyperparameter dict.
# NOTE: requires `kwargs` (and therefore `env`) to exist already — run the cells top to bottom.
model = SAC_Agent(**kwargs)

In [39]:
# Ask the agent for an action given the environment's current state.
# The second argument is presumably the `deterministic` flag named in the Flow notes below (False here).
# NOTE(review): the captured output has a negative fourth component, while the action space's
# lower bound for that component is 0 — actions likely need rescaling to the env bounds. Confirm.
model.select_action(env.state, False)

array([ 0.83334345,  0.28204784,  0.7860619 , -0.5530963 , -0.67943853],
      dtype=float32)

### Flow:

1. Get the current state and the loss of each user, where the loss is $\mathrm{dist}(z^{(i)}_t, O)$.
2. Predict each user's action for the next step, $y^{(i)}_t$, using `model.select_action(state[i], deterministic=True)`.
