TODO:
 * Math is wrong when moving by negative angles (moving by -90 deg reports a stepper angle of 111.6 deg)
 * Define rest of reset()
 * Define rest of step()
 * Create reward function (for reset() and step())
 * Test environment
 * Set up experiments (WandB?)
 * Train!

In [None]:
# !python -m pip install gymnasium==0.28.1
# !python -m pip install stable-baselines3[extra]==2.1.0

In [1]:
import time

import gymnasium as gym
import stable_baselines3 as sb3
import matplotlib.pyplot as plt
import numpy as np
from control_comms import ControlComms, StatusCode, DebugLevel

# Report the installed library versions, one per line
print(f"gym version: {gym.__version__}", f"sb3 version: {sb3.__version__}", sep="\n")

gym version: 0.28.1
sb3 version: 2.1.0


In [2]:
# Serial link configuration for the Arduino board
SERIAL_PORT: str = "COM6"       # Check your devices
BAUD_RATE: int = 500_000        # Must match what's in the Arduino code!
CTRL_TIMEOUT: float = 1.0       # Seconds
DEBUG_LEVEL = DebugLevel.DEBUG_ERROR

# Reinforcement learning settings (each min/max pair is kept together)
STP_MOVE_MIN, STP_MOVE_MAX = -10.0, 10.0        # Action space bounds (deg to move stepper by)
STP_ANGLE_MIN, STP_ANGLE_MAX = -360.0, 360.0    # Stepper angle bounds in the observation space
ENV_TIMEOUT = 10.0                              # Passed to the environment as its timeout

In [3]:
# Communication constants, grouped by purpose. Values are assigned in order from 0.

# Command codes
(
    CMD_SET_HOME,        # 0: Set current stepper position as home (0 deg)
    CMD_MOVE_TO,         # 1: Move stepper to a particular position (deg)
    CMD_MOVE_BY,         # 2: Move stepper by a given amount (deg)
    CMD_SET_STEP_MODE,   # 3: Set step mode
    CMD_SET_BLOCK_MODE,  # 4: Set blocking mode
    CMD_NOP,             # 5: Take no action, just receive observation
) = range(6)

# Step mode codes
(
    STEP_MODE_1,         # 0: 1 division per step
    STEP_MODE_2,         # 1: 2 divisions per step
    STEP_MODE_4,         # 2: 4 divisions per step
    STEP_MODE_8,         # 3: 8 divisions per step
    STEP_MODE_16,        # 4: 16 divisions per step
) = range(5)

# Status codes
(
    STATUS_OK,           # 0: Stepper idle
    STATUS_STP_MOVING,   # 1: Stepper is currently moving
) = range(2)

# Set to desired step mode
STEP_MODE = STEP_MODE_8

In [4]:
# Close connection to Arduino board (if open)
try:
    controller.close()
except NameError:
    # Fresh kernel: no controller object has been created yet, so nothing to close
    pass
except Exception as err:
    # Keep the cleanup best-effort, but report the failure instead of hiding it.
    # (A bare `except:` would also swallow KeyboardInterrupt/SystemExit.)
    print(f"WARNING: Could not close existing connection: {err}")

In [5]:
# Connect to Arduino board
controller = ControlComms(timeout=CTRL_TIMEOUT, debug_level=DEBUG_LEVEL)
ret = controller.connect(SERIAL_PORT, BAUD_RATE)
if ret is not StatusCode.OK:
    # Stop here: the setup commands below are pointless without a connection
    raise ConnectionError("ERROR: Could not connect to board")

# Apply the step mode chosen in the constants cell (STEP_MODE) and set the current
# position as "home". The last call's response is displayed as the cell output.
controller.step(CMD_SET_STEP_MODE, [STEP_MODE])
controller.step(CMD_SET_HOME, [0])

CTRL TEST: b'{"status":0,"timestamp":222285,"terminated":false,"observation":[0.00,0.00]}\r\n'
CTRL TEST: b'{"status":0,"timestamp":222289,"terminated":false,"observation":[0.00,0.00]}\r\n'


(0, 222289, False, [0.0, 0.0])

In [46]:
# TEST: move the stepper by -90 deg (the negative-angle case listed in the TODO at the top)
controller.step(CMD_MOVE_BY, [-90])

CTRL TEST: b'{"status":0,"timestamp":523794,"terminated":false,"observation":[4.80,0.00]}\r\n'


(0, 523794, False, [4.8, 0.0])

In [47]:
# Take no action; just request the current observation from the board
controller.step(CMD_NOP, [0])

CTRL TEST: b'{"status":0,"timestamp":526005,"terminated":false,"observation":[323.40,111.60]}\r\n'


(0, 526005, False, [323.4, 111.6])

## Build gym Environment

Subclass gymnasium.Env to create a custom environment. Learn more here:<br>
https://gymnasium.farama.org/tutorials/gymnasium_basics/environment_creation/

In [8]:
class Pendulum(gym.Env):
    """
    Gymnasium environment wrapping the stepper/pendulum hardware behind a control comms object.

    This is the gym wrapper class that allows our agent to interact with our environment. It
    implements step() and reset(); render() and close() are not overridden here. The
    action_space and observation_space are defined as instance members in the constructor.

    Note: on Windows, time.sleep() is only accurate to around 10ms, so the delays used in
    reset() are "best effort".

    More information: https://gymnasium.farama.org/api/env/
    """

    def __init__(self, ctrl, timeout=0.0, stp_blocking=False):
        """
        Set up the environment, action, and observation shapes.

        Args:
            ctrl: Control comms object used to talk to the board. Only its
                step(command, values) method is called by this class.
            timeout: Optional timeout in seconds. Stored as self.timeout; not enforced yet.
            stp_blocking: If truthy, CMD_SET_BLOCK_MODE is sent with 1, otherwise with 0.
        """

        # Call superclass's constructor
        super().__init__()

        # Assign control comms object and keep the requested timeout for later use
        self.ctrl = ctrl
        self.timeout = timeout

        # Define action space (scalar signifying how many degrees to move stepper by)
        # NOTE(review): shape=(1, 1) produces 2-D actions; confirm whether (1,) was intended.
        self.action_space = gym.spaces.Box(
            low=STP_MOVE_MIN,
            high=STP_MOVE_MAX,
            shape=(1, 1),
            dtype=np.float32
        )

        # Define observation space
        # [encoder angle, encoder angular velocity, stepper angle, stepper angular velocity]
        # Bounds are created as float32 so they already match the space's dtype.
        self.observation_space = gym.spaces.Box(
            low=np.array([-180, -np.inf, STP_ANGLE_MIN, -np.inf], dtype=np.float32),
            high=np.array([180, np.inf, STP_ANGLE_MAX, np.inf], dtype=np.float32),
            dtype=np.float32
        )

        # Record timesteps from microcontroller
        self.timestamp = 0

        # Set current stepper position as "home" and optionally set blocking.
        # Use the injected comms object rather than a notebook-level global so the
        # environment works with whatever controller it was given.
        self.ctrl.step(CMD_SET_HOME, [0])
        self.ctrl.step(CMD_SET_BLOCK_MODE, [1 if stp_blocking else 0])

    def step(self, action):
        """
        Tell the stepper motor to move by `action` degrees, then report the outcome.

        Args:
            action: Degrees to move the stepper by. Either a plain number or an array-like
                (e.g. a sample from action_space); its first element is used.

        Returns:
            Tuple (obs, reward, terminated, truncated, info). These are still placeholder
            values (see TODO below); the board's response is not used for them yet.
        """

        # Unwrap array-like actions to a plain Python float before sending.
        # NOTE(review): presumably the comms layer serializes this list and would not
        # accept a NumPy array - confirm against ControlComms.
        move_deg = float(np.asarray(action, dtype=float).ravel()[0])

        # Move the stepper motor and wait for a response
        resp = self.ctrl.step(CMD_MOVE_BY, [move_deg])
        if resp:
            # Response layout: (status, timestamp, terminated, observation)
            _, self.timestamp, _, _ = resp
        else:
            print("ERROR: Could not communicate with Arduino")

        # TODO
        obs = self.observation_space.sample()
        reward = 0.0
        terminated = False
        truncated = False
        info = {}

        return obs, reward, terminated, truncated, info

    def reset(self, *, seed=None, options=None):
        """
        Return the pendulum to the starting position

        Args:
            seed: Optional seed, forwarded to gym.Env.reset().
            options: Optional reset options (currently unused).

        Returns:
            Tuple (obs, info). The observation is still a placeholder (see TODO below).
        """

        # Let the base class handle seeding, as the gymnasium reset() API expects
        super().reset(seed=seed)

        # Let the pendulum fall and return to the starting position
        time.sleep(1.0)
        self.ctrl.step(CMD_MOVE_TO, [0.0])
        time.sleep(1.0)

        # TODO
        obs = self.observation_space.sample()
        info = {}

        return obs, info

## Test gym Environment

Test the gym wrapper before training

In [15]:
# Create our environment
# Close any environment left over from an earlier run of this cell. If `env` has not
# been defined yet, the NameError is ignored.
try:
    env.close()
except NameError:
    pass
# Build the environment around the connected controller with stepper blocking enabled
env = Pendulum(controller, stp_blocking=True, timeout=ENV_TIMEOUT)

CTRL TEST: b'{"status":0,"timestamp":268541,"terminated":false,"observation":[344.40,0.00]}\r\n'
CTRL TEST: b'{"status":0,"timestamp":268545,"terminated":false,"observation":[344.40,0.00]}\r\n'


In [25]:
# Try running the environment for a few steps
env.reset()
for _ in range(10):
    # Request the same -9 deg move on every step
    obs, reward, terminated, truncated, info = env.step(-9)

CTRL TEST: b'{"status":0,"timestamp":400791,"terminated":false,"observation":[20.40,0.00]}\r\n'
CTRL TEST: b'{"status":0,"timestamp":401849,"terminated":false,"observation":[7.80,0.00]}\r\n'
CTRL TEST: b'{"status":0,"timestamp":401890,"terminated":false,"observation":[10.80,192.60]}\r\n'
CTRL TEST: b'{"status":0,"timestamp":401934,"terminated":false,"observation":[12.90,183.60]}\r\n'
CTRL TEST: b'{"status":0,"timestamp":401980,"terminated":false,"observation":[12.90,174.60]}\r\n'
CTRL TEST: b'{"status":0,"timestamp":402020,"terminated":false,"observation":[13.80,165.60]}\r\n'
CTRL TEST: b'{"status":0,"timestamp":402065,"terminated":false,"observation":[13.50,156.60]}\r\n'
CTRL TEST: b'{"status":0,"timestamp":402110,"terminated":false,"observation":[12.30,147.60]}\r\n'
CTRL TEST: b'{"status":0,"timestamp":402154,"terminated":false,"observation":[10.80,138.60]}\r\n'
CTRL TEST: b'{"status":0,"timestamp":402198,"terminated":false,"observation":[9.30,129.60]}\r\n'
CTRL TEST: b'{"status":0,"