In [None]:
"""
import_SteerboxEnv = Method to start the environment

Setting up a custom OpenAI Gym environment by inheriting gym.Env

Actual environment is class SteerboxEnv: 
    A. Must define action space and observation space in the constructor:
    
    B. Must implement theree methods as part of gym interface: 
        1. step
        2. reset 
        3. render

"""

def import_SteerboxEnv():
    """Define and return the ``SteerboxEnv`` gym environment class.

    All dependencies are imported inside this function, so they are only
    resolved when the function is called.
    """
    import os
    import time
    import math
    import serial
    import struct

    import random
    import numpy as np

    from typing import Callable, List, Tuple

    import gym
    from gym import logger, spaces
    from gym.utils import seeding

    class SteerboxEnv(gym.Env):
        """Gym environment that drives a motorised wheel over a serial link.

        Actions are discrete (0 or 1) and select the sign of the target motor
        voltage. An observation is the 4-tuple built in
        ``_compute_next_state``:

        1. wheel position, in fractions of a full turn,
        2. change in position since the previous step,
        3. the ramped motor voltage as it was before this step's command,
        4. the action taken on the previous step.
        """

        def __init__(self, mode="train"):
            """Create the spaces and open the serial connection.

            Parameters
            ----------
            mode : str
                ``"train"`` or ``"eval"``; this only changes ``max_steps``
                (100 and 300 respectively).

            Raises
            ------
            RuntimeError
                If no ``/dev/ttyUSB*`` device is present.
            """
            self.set_mode(mode)

            self.pos_success_range = 0.1  # |pos| below this is the goal region
            self.pos_threshold = 0.5  # |pos| above this ends the episode
            self.c_trans = 0.01  # per-step cost outside the goal region

            # Upper bound of the observation space. Only the position is
            # bounded; the other three entries use the float32 maximum.
            high = np.array(
                [
                    self.pos_threshold * 1.5,  # position of the wheel
                    np.finfo(np.float32).max,
                    np.finfo(np.float32).max,
                    np.finfo(np.float32).max,
                ],
                dtype=np.float32,
            )

            # Action: discrete variable that takes one of two values.
            # Observation: real-valued box spanning [-high, +high].
            self.action_space = spaces.Discrete(2)
            self.observation_space = spaces.Box(-high, high, dtype=np.float32)

            self.seed()  # does nothing
            self.viewer = None
            self.state = None

            self.voltage_mag = 0.8
            self.last_voltage = 0
            # Also set in reset(); defined here so the attributes always exist.
            self.last_action = 0
            self.episode_step = 0

            # Find the arduino's serial port. As before, the last matching
            # entry wins; the only change is a clear error when none exists.
            ports = [
                name for name in os.listdir("/dev") if name.startswith("ttyUSB")
            ]
            if not ports:
                raise RuntimeError("no /dev/ttyUSB* serial device found")
            port = "/dev/" + ports[-1]

            self.ser = serial.Serial(port, baudrate=115200, timeout=0.5)
            self._interact(0, reset=True, ignore_powerup=True)

        def set_mode(self, mode):
            """Switch between ``"train"`` (100 steps) and ``"eval"`` (300 steps)."""
            assert mode in ["train", "eval"]
            self.mode = mode
            self.max_steps = 100 if mode == "train" else 300

        def seed(self, seed=None):
            """Accept a seed for gym API compatibility; intentionally a no-op."""
            pass

        def _read_status(self):
            """Read one 4-byte status frame and unpack it as ``"<hBB"``.

            Raises
            ------
            RuntimeError
                If the serial read returned fewer than 4 bytes (read timeout),
                instead of letting ``struct.unpack`` fail on a short buffer.
            """
            frame = self.ser.read(4)
            if len(frame) != 4:
                raise RuntimeError(
                    "serial read timed out: expected 4 bytes, got %d" % len(frame)
                )
            return struct.unpack("<hBB", frame)

        def _interact(self, voltage, reset=False, ignore_powerup=False):
            """Read the current wheel position, then send a voltage command.

            Parameters
            ----------
            voltage : float
                Target motor voltage. The voltage actually sent moves toward
                it by at most 0.05 per call.
            reset : bool
                If true, first stop the motor, flush the receive buffer and
                acknowledge the hardware status; the target voltage is forced
                to 0.
            ignore_powerup : bool
                If true, do not raise when the status byte read during reset
                equals 0x81.

            Returns
            -------
            this_pos : float
                Wheel position in fractions of a full turn.

            Raises
            ------
            Exception
                If the hardware reports an error state, reset itself
                unexpectedly, or the wheel is rotated beyond 1.5 turns.
            RuntimeError
                If a status frame could not be read in full.
            """
            if reset:
                voltage = 0
                self.last_voltage = 0

                # tell hardware to stop a bunch
                self.ser.write(b'\x00\x00\x00\x00')

                time.sleep(0.1)  # ensure hardware times out
                # clear anything out of the serial receive buffer
                self.ser.reset_input_buffer()

                # tell it to stop again now that we know it's alive
                self.ser.write(b'\x00')

                # wait for the updated status
                _, _, error = self._read_status()

                if not ignore_powerup and error == 0x81:
                    raise Exception("hardware reset itself unexpectedly")

                # acknowledge the error
                self.ser.write(b'\x80')

            this_pos, last_quadrature_errors, last_time_ms = self._read_status()

            if last_time_ms & 0x80:  # error state
                raise Exception("error type: "+str(last_time_ms & 0x7F))

            if last_quadrature_errors > 1 or 20 < last_time_ms < 127:
                print("oh NO", last_quadrature_errors, last_time_ms)

            # convert encoder counts to fractions of a circle
            this_pos = float(this_pos)/(4*2802)

            # wheel is rotated too much, stop the experiment before damage
            if abs(this_pos) > 1.5:
                self.ser.write(b'\x00\x00\x00\x00')
                raise Exception("TOO FAR!")

            # move motor voltage closer to target voltage
            if self.last_voltage < voltage:
                self.last_voltage += min(0.05, voltage-self.last_voltage)
            else:
                self.last_voltage -= min(0.05, self.last_voltage-voltage)

            # send motor voltage and direction: magnitude is scaled by 127,
            # and 128 is added for negative voltages
            if self.last_voltage >= 0:
                cmd = int(self.last_voltage*127)
            else:
                cmd = 128 + int(-self.last_voltage*127)
            self.ser.write(bytes([cmd]))

            return this_pos

        def _compute_next_state(self, state, action):
            """Apply ``action`` on the hardware and return the next state tuple."""
            voltage = self.voltage_mag if action == 1 else -self.voltage_mag

            # scale voltage down a little to avoid too wild of an action
            voltage *= 14/15

            # Captured before _interact() updates the ramped voltage.
            v = self.last_voltage
            this_pos = self._interact(voltage)

            state = (this_pos, this_pos-state[0], v, self.last_action)
            self.last_action = action

            return state

        def step(self, action):
            """Execute one time step within the environment.

            Returns
            -------
            obs : np.ndarray
                The new state.
            cost : float
                1 in a forbidden state, 0 in the goal region, ``c_trans``
                otherwise.
            done : bool
                True only when a forbidden state was entered.
            info : dict
                ``{"time_limit": bool}``, true once ``max_steps`` is reached.
            """
            assert self.action_space.contains(action), "%r (%s) invalid" % (
                action,
                type(action),
            )
            self.state = self._compute_next_state(self.state, action)
            pos, vel, _, _ = self.state

            self.episode_step += 1

            # Forbidden States (S-)
            if pos < -self.pos_threshold or pos > self.pos_threshold:
                done = True
                cost = 1
            # Goal States (S+)
            elif (
                -self.pos_success_range < pos < self.pos_success_range
                and -0.01 < vel < 0.01
            ):
                done = False
                cost = 0
            else:
                done = False
                cost = self.c_trans

            # Check for time limit
            info = {"time_limit": self.episode_step >= self.max_steps}

            return np.array(self.state), cost, done, info

        def reset(self):
            """Reset the environment and return an initial state.

            The hardware is reset, the wheel is driven to within 0.1 of a
            random goal position, and the hardware is reset again before the
            final position is read.
            """
            self.last_action = 0

            self._interact(0, reset=True)
            time.sleep(0.5)
            self._interact(0, reset=True)
            pos = ppos = self._interact(0)

            # rotate wheel to random initial position
            goal = ((2*random.random())-1)*0.5*self.pos_threshold

            if pos > goal:
                while pos > goal+0.1:
                    pos = self._interact(-0.7)
            else:
                while pos < goal-0.1:
                    pos = self._interact(0.7)

            self._interact(0, reset=True)
            time.sleep(0.5)
            pos = self._interact(0, reset=True)
            print("goal:", ppos, "->", goal, "~", pos)

            self.episode_step = 0

            self.state = (pos, 0, 0, 0)

            return np.array(self.state)

        def render(self, mode="human"):
            """Do nothing: there is no virtual environment to render."""
            pass

        def close(self):
            """Close the serial connection."""
            self.ser.close()

        def get_goal_pattern_set(self, size: int = 200):
            """Use hint-to-goal heuristic to clamp network output.

            Each pattern is a 5-element array: a position drawn uniformly
            from the goal region, two zeros, and two random values in {0, 1}
            (the previous-action slot of the state and the action).

            Parameters
            ----------
            size : int
                The size of the goal pattern set to generate.

            Returns
            -------
            goal_state_action_b : list of np.ndarray
                Generated state-action patterns.
            goal_target_q_values : np.ndarray
                Target Q-values, all zero.

            """
            goal_state_action_b = [
                np.array(
                    [
                        # NOTE(seungjaeryanlee): The success state in hint-to-goal is not relaxed.
                        # TODO(seungjaeryanlee): What is goal velocity?
                        np.random.uniform(-self.pos_success_range, self.pos_success_range),
                        0,#np.random.uniform(-0.02, 0.02),
                        0,
                        np.random.randint(2),
                        np.random.randint(2),
                    ]
                )
                for _ in range(size)
            ]
            goal_target_q_values = np.zeros(size)

            return goal_state_action_b, goal_target_q_values

        def generate_rollout(
            self, get_best_action: Callable = None, render: bool = False
        ) -> Tuple[List[Tuple[np.ndarray, int, float, np.ndarray, bool]], float]:
            """
            Generate rollout using given action selection function.

            If no action selection function is given, generate a random
            rollout instead.

            Parameters
            ----------
            get_best_action : Callable
                Greedy policy.
            render: bool
                If true, render environment.

            Returns
            -------
            rollout : List of Tuple
                Generated rollout of (obs, action, cost, next_obs, done).
            episode_cost : float
                Cumulative cost throughout the episode.

            """
            rollout = []
            episode_cost = 0
            obs = self.reset()
            done = False
            info = {"time_limit": False}
            while not done and not info["time_limit"]:
                if get_best_action:
                    action = get_best_action(obs)
                else:
                    action = self.action_space.sample()

                next_obs, cost, done, info = self.step(action)
                rollout.append((obs, action, cost, next_obs, done))
                episode_cost += cost
                obs = next_obs

                if render:
                    self.render()

            return rollout, episode_cost

    return SteerboxEnv

SteerboxEnv = import_SteerboxEnv()

In [None]:
def import_NFQAgent():
    """Reinforcement learning agents."""
    from typing import List, Tuple

    import gym
    import numpy as np
    import torch
    import torch.nn as nn
    import torch.nn.functional as F
    import torch.optim as optim


    class NFQAgent:
        def __init__(self, nfq_net: nn.Module, optimizer: optim.Optimizer):
            """
            Neural Fitted Q-Iteration agent.

            Parameters
            ----------
            nfq_net : nn.Module
                The Q-Network that returns estimated cost given observation and action.
            optimizer : optim.Optimizer
                Optimizer for training the NFQ network.

            """
            self._nfq_net = nfq_net
            self._optimizer = optimizer

        def get_best_action(self, obs: np.array) -> int:
            """
            Return best action for given observation according to the neural network.

            Parameters
            ----------
            obs : np.array
                An observation to find the best action for.

            Returns
            -------
            action : int
                The action chosen by greedy selection.

            """
            obs_t = torch.FloatTensor(obs)
            # Pure inference: no gradients are needed for action selection.
            with torch.no_grad():
                q_left = self._nfq_net(
                    torch.cat([obs_t, torch.FloatTensor([0])], dim=0)
                )
                q_right = self._nfq_net(
                    torch.cat([obs_t, torch.FloatTensor([1])], dim=0)
                )

            # Best action has lower "Q" value since it estimates cumulative cost.
            return 1 if q_left.item() >= q_right.item() else 0

        def generate_pattern_set(
            self,
            rollouts: List[Tuple[np.array, int, int, np.array, bool]],
            gamma: float = 0.95,
        ):
            """Generate pattern set.

            Parameters
            ----------
            rollouts : list of tuple
                Generated rollouts, which is a tuple of state, action, cost, next state, and done.
            gamma : float
                Discount factor. Defaults to 0.95.

            Returns
            -------
            pattern_set : tuple of torch.Tensor
                Pattern set to train the NFQ network.

            Raises
            ------
            ValueError
                If ``rollouts`` is empty.

            """
            if not rollouts:
                raise ValueError("rollouts must contain at least one transition")

            # _b denotes batch
            state_b, action_b, cost_b, next_state_b, done_b = zip(*rollouts)

            # Stack through numpy first: building a tensor directly from a
            # sequence of ndarrays is slow.
            state_b = torch.as_tensor(np.array(state_b), dtype=torch.float32)
            action_b = torch.as_tensor(np.array(action_b), dtype=torch.float32)
            cost_b = torch.as_tensor(np.array(cost_b), dtype=torch.float32)
            next_state_b = torch.as_tensor(
                np.array(next_state_b), dtype=torch.float32
            )
            done_b = torch.as_tensor(np.array(done_b), dtype=torch.float32)

            num_samples = len(rollouts)
            state_action_b = torch.cat([state_b, action_b.unsqueeze(1)], 1)
            assert state_action_b.shape == (num_samples, state_b.shape[1] + 1)

            # If goal state (S+): target = 0 + gamma * min Q
            # If forbidden state (S-): target = 1
            # If neither: target = c_trans + gamma * min Q
            # NOTE(seungjaeryanlee): done is True only when the episode terminated
            #                        due to entering forbidden state. It is not
            #                        True if it terminated due to maximum timestep.
            with torch.no_grad():
                # Compute min_a Q(s', a). squeeze(1) drops only the output
                # dimension, so a single-sample batch keeps its batch axis.
                q_next_state_left_b = self._nfq_net(
                    torch.cat([next_state_b, torch.zeros(num_samples, 1)], 1)
                ).squeeze(1)
                q_next_state_right_b = self._nfq_net(
                    torch.cat([next_state_b, torch.ones(num_samples, 1)], 1)
                ).squeeze(1)
                q_next_state_b = torch.min(q_next_state_left_b, q_next_state_right_b)

                target_q_values = cost_b + gamma * q_next_state_b * (1 - done_b)

            return state_action_b, target_q_values

        def train(
            self, pattern_set: Tuple[torch.Tensor, torch.Tensor], epochs: int = 300
        ) -> float:
            """Train neural network with a given pattern set.

            Parameters
            ----------
            pattern_set : tuple of torch.Tensor
                Pattern set to train the NFQ network.
            epochs : int
                Number of optimizer steps over the full pattern set.
                Defaults to 300.

            Returns
            -------
            loss : float
                Training loss of the last optimizer step.

            Raises
            ------
            ValueError
                If ``epochs`` is smaller than 1.

            """
            if epochs < 1:
                raise ValueError("epochs must be at least 1")

            state_action_b, target_q_values = pattern_set
            for _ in range(epochs):
                # squeeze(-1) keeps the batch axis even for a single pattern.
                predicted_q_values = self._nfq_net(state_action_b).squeeze(-1)
                loss = F.mse_loss(predicted_q_values, target_q_values)

                self._optimizer.zero_grad()
                loss.backward()
                self._optimizer.step()

            return loss.item()

        def evaluate(self, eval_env: gym.Env, render: bool) -> Tuple[int, bool, float]:
            """Evaluate NFQ agent on evaluation environment.

            Parameters
            ----------
            eval_env : gym.Env
                Environment to evaluate the agent.
            render: bool
                If true, render environment.

            Returns
            -------
            episode_length : int
                Number of steps the agent took.
            success : bool
                True if the episode ran for ``eval_env.max_steps`` steps and
                the final observation lies in the goal region (position
                within ``pos_success_range``, position change within 0.01).
            episode_cost : float
                Total cost accumulated from the evaluation episode.

            """
            episode_length = 0
            obs = eval_env.reset()
            done = False
            info = {"time_limit": False}
            episode_cost = 0
            while not done and not info["time_limit"]:
                action = self.get_best_action(obs)
                obs, cost, done, info = eval_env.step(action)
                episode_cost += cost
                episode_length += 1

                if render:
                    eval_env.render()

            success = bool(
                episode_length == eval_env.max_steps
                and abs(obs[0]) <= eval_env.pos_success_range
                and abs(obs[1]) <= 0.01
            )

            return episode_length, success, episode_cost

    return NFQAgent

NFQAgent = import_NFQAgent()

In [None]:
def import_NFQNetwork():
    """Networks for NFQ."""
    import torch
    import torch.nn as nn


    class NFQNetwork(nn.Module):
        """Q-network mapping a 5-element state-action vector to one value in (0, 1)."""

        def __init__(self):
            """Build the 5-5-5-1 sigmoid network and initialise its weights."""
            super().__init__()

            self.layers = nn.Sequential(
                nn.Linear(5, 5),
                nn.Sigmoid(),
                nn.Linear(5, 5),
                nn.Sigmoid(),
                nn.Linear(5, 1),
                nn.Sigmoid(),
            )

            # Initialize weights to [-0.5, 0.5]
            def init_weights(m):
                if isinstance(m, nn.Linear):
                    torch.nn.init.uniform_(m.weight, -0.5, 0.5)
                    # TODO(seungjaeryanlee): What about bias?

            self.layers.apply(init_weights)

        def forward(self, x: torch.Tensor) -> torch.Tensor:
            """
            Forward propagation.

            Parameters
            ----------
            x : torch.Tensor
                Input tensor of observation and action concatenated.

            Returns
            -------
            y : torch.Tensor
                Forward-propagated observation predicting Q-value.

            """
            return self.layers(x)

    return NFQNetwork

NFQNetwork = import_NFQNetwork()

In [None]:
import torch
import torch.optim as optim
import random
import numpy as np

EPOCH = 500
# NOTE: the two *_MAX_STEPS constants are not referenced below; the episode
# length actually used is the max_steps chosen by env.set_mode().
TRAIN_ENV_MAX_STEPS = 100
EVAL_ENV_MAX_STEPS = 3000
DISCOUNT = 0.95
INIT_EXPERIENCE = 1
RANDOM_SEED = 4 # does not actually matter...

INCREMENT_EXPERIENCE = True
HINT_TO_GOAL = True

# Evaluation runs until the first failed episode, or until this many
# consecutive episodes have succeeded.
MAX_EVAL_EPISODES = 50
# Training stops once at least this many consecutive evaluations succeeded.
# NOTE(review): the previous check was `num_evals == 5`, which never fired
# for an agent that succeeded more than 5 times - confirm 5 is the intended
# threshold rather than MAX_EVAL_EPISODES.
REQUIRED_EVAL_SUCCESSES = 5

env = SteerboxEnv(mode="train")

if RANDOM_SEED is not None: # does not actually matter...
    random.seed(RANDOM_SEED ^ 0xdf9b89026423c)
    s = random.randrange(2**32)
    np.random.seed(s)
    torch.manual_seed(s)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    #train_env.seed(s)
    #eval_env.seed(s)

# Setup agent
nfq_net = NFQNetwork()
optimizer = optim.Rprop(nfq_net.parameters())
nfq_agent = NFQAgent(nfq_net, optimizer)

# NFQ Main loop
# A set of transition samples denoted as D
all_rollouts = []
total_cost = 0
# Seed D with random rollouts (no policy given -> random actions).
for _ in range(INIT_EXPERIENCE):
    rollout, episode_cost = env.generate_rollout(
        None, render=False
    )
    all_rollouts.extend(rollout)
    total_cost += episode_cost

for epoch in range(EPOCH + 1):
    # Variant 1: Incrementally add transitions (Section 3.4)
    # TODO(seungjaeryanlee): Done before or after training?

    env.set_mode("train")

    # Targets are computed with the agent from the previous epoch.
    state_action_b, target_q_values = nfq_agent.generate_pattern_set(
        all_rollouts, gamma=DISCOUNT
    )

    # Variant 2: Clamp function to zero in goal region
    # TODO(seungjaeryanlee): Since this is a regulator setting, should it
    #                        not be clamped to zero?
    if HINT_TO_GOAL:
        goal_state_action_b, goal_target_q_values = env.get_goal_pattern_set()
        goal_state_action_b = torch.FloatTensor(goal_state_action_b)
        goal_target_q_values = torch.FloatTensor(goal_target_q_values)
        state_action_b = torch.cat([state_action_b, goal_state_action_b], dim=0)
        target_q_values = torch.cat([target_q_values, goal_target_q_values], dim=0)

    # A fresh network is trained from scratch on the new pattern set.
    nfq_net = NFQNetwork()
    optimizer = optim.Rprop(nfq_net.parameters())
    nfq_agent = NFQAgent(nfq_net, optimizer)

    env.reset()
    loss = nfq_agent.train((state_action_b, target_q_values))

    if INCREMENT_EXPERIENCE:
        new_rollout, episode_cost = env.generate_rollout(
            nfq_agent.get_best_action, render=False
        )
        all_rollouts.extend(new_rollout)
        total_cost += episode_cost

    env.set_mode("eval")

    # TODO(seungjaeryanlee): Evaluation should be done with 3000 episodes
    # Count consecutive successful evaluation episodes; stop at the first failure.
    num_evals = 0
    while num_evals < MAX_EVAL_EPISODES:
        eval_episode_length, eval_success, eval_episode_cost = nfq_agent.evaluate(
            env, False
        )
        if not eval_success:
            break
        print(eval_episode_cost)
        num_evals += 1

    if INCREMENT_EXPERIENCE:
        print(
            "Epoch {:4d} | Train {:3d} / {:4.2f} | Eval {:4d} / {:5.2f} | Train Loss {:.4f}".format(  # noqa: B950
                epoch,
                len(new_rollout),
                episode_cost,
                eval_episode_length,
                eval_episode_cost,
                loss,
            )
        )
    else:
        print(
            "Epoch {:4d} | Eval {:4d} / {:5.2f} | Train Loss {:.4f}".format(
                epoch, eval_episode_length, eval_episode_cost, loss
            )
        )

    if num_evals > 0:
        print(
            "Epoch {:4d} | Total Cycles {:6d} | Total Cost {:4.2f} | Times {}".format(
                epoch, len(all_rollouts), total_cost, num_evals
            )
        )
        if num_evals >= REQUIRED_EVAL_SUCCESSES:
            break

#env.close()