In [None]:
from Go2Py.sim.mujoco import Go2Sim
import numpy as np

In [None]:
robot = Go2Sim()
robot.sitDownReset()

In [None]:
import threading
from pynput import keyboard
class BaseRemote:
    def __init__(self):
        pass
    def startSeq(self):
        return False
    def standUpDownSeq(self):
        return False

    def flushStates(self):
        pass

remote = BaseRemote()

class KeyboardRemote(BaseRemote):
    def __init__(self):
        super().__init__()
        self.start_seq_flag = False
        self.stand_up_down_seq_flag = False
        self.listener_thread = threading.Thread(target=self._listen_to_keyboard)
        self.listener_thread.daemon = True
        self.listener_thread.start()

    def _on_press(self, key):
        try:
            if key.char == 's':  # Start sequence
                self.start_seq_flag = True
            elif key.char == 'u':  # Stand up/down sequence
                self.stand_up_down_seq_flag = True
        except AttributeError:
            pass  # Special keys (like space) will be handled here

    
    def _on_release(self, key):
        try:
            if key.char == 's':  # Start sequence
                self.start_seq_flag = False
            elif key.char == 'u':  # Stand up/down sequence
                self.stand_up_down_seq_flag = False
        except AttributeError:
            pass  # Special keys (like space) will be handled here

    def _listen_to_keyboard(self):
        with keyboard.Listener(on_press=self._on_press, on_release=self._on_release) as listener:
            listener.join()

    def startSeq(self):
        if self.start_seq_flag:
            self.start_seq_flag = False
            return True
        return False

    def standUpDownSeq(self):
        if self.stand_up_down_seq_flag:
            self.stand_up_down_seq_flag = False
            return True
        return False

    def flushStates(self):
        self.stand_up_down_seq_flag = False
        self.start_seq_flag = False

In [None]:
remote = KeyboardRemote()

In [None]:
import time
class SafetyHypervisor():
    def __init__(self, robot):
        self.robot = robot

    def unsafe(self):
        return False
    
    def controlTimeout(self):
        if time.time() - robot.latest_command_stamp > 0.1:
            print('controller timeout')
            return True
        else:
            return False 

safety = SafetyHypervisor(robot)

In [None]:
import threading
import numpy as np
import time
from enum import Enum


class FSM:
    def __init__(self, robot, remote, safety_hypervisor, user_controller_callback=None):
        self.robot = robot
        self.remote = remote
        self.remote.flushStates()
        self.safety = safety_hypervisor
        self.user_controller_callback = user_controller_callback

        self.state = "damping"
        self.tracking_kp = np.array(4*[150, 150, 150.]).reshape(12)
        self.tracking_kv = np.array(12*[3.])
        self.damping_kv = np.array(12*[2.])


        self.tracking_complete = True
        self.robot.setCommands(np.zeros(12), np.zeros(12), np.zeros(12), self.damping_kv, np.zeros(12))
        self.fsm_dT = 1./50.
        self.control_dT = 1./400.
        self.dT = self.fsm_dT

        self.modes = {"tracking":self.trackingControlUpdate,
                      "damping" :self.dampingControlUpdate,
                      "user": self.userControlUpdate,
                     }
        self.setMode("damping")

        self.running = True
        self.fsm_thread = threading.Thread(target = self.update)
        self.fsm_thread.start()
        # if the robot is a simulation, create a thread for stepping it
        if robot.simulated:
            self.sim_thread = threading.Thread(target=self.simUpdate)
            self.sim_thread.start()

    def setMode(self, mode):
        assert mode in self.modes.keys(), 'the requested control update mode is not implemented'
        self.updateCommands = self.modes[mode]
        # print(f'setting mode to {mode}')
        
    def moveTo(self, target, duration=0.5):
        # assert self.tracking_complete, 'The previous moveTo command is not completed yet!'
        self.q_start = self.robot.getJointStates()['q']
        self.q_target = target
        self.time = 0.0
        self.move_duration = duration
        self.q_des = lambda t: [self.q_start + np.clip((t)/self.move_duration,0, 1)*(self.q_target - self.q_start), \
            True if np.clip((t)/self.move_duration,0, 1)==1 else False]  # q_des(t), Movement finished
        self.tracking_complete = False
        self.setMode("tracking")

    def trackingControlUpdate(self):
        self.time +=self.dT
        q_des, done = self.q_des(self.time)
        self.robot.setCommands(q_des, np.zeros(12), self.tracking_kp, self.tracking_kv, np.zeros(12))
        self.tracking_complete = done

    def dampingControlUpdate(self):
        self.robot.setCommands(np.zeros(12), np.zeros(12), np.zeros(12), self.damping_kv, np.zeros(12))

    def userControlUpdate(self):
        if self.user_controller_callback is not None:
            user_controller_callback(robot, remote)

    def simUpdate(self):
        while self.running:
            self.robot.step()
            time.sleep(robot.dt)
    
    def update(self):
        while self.running:
            getattr(self, self.state)()
            time.sleep(self.dT)
            self.updateCommands()
    
    def close(self):
        self.running = False

    # The following functions each are the states of the FSM
    def damping(self):
        # print('damping')
        if self.remote.standUpDownSeq():
            self.moveTo(robot.prestanding_q, 1)
            self.state = "pre_standing"
        else:
            self.setMode("damping")
            self.state = "damping"

    def pre_standing(self):
        # print("pre_stance")
        if self.tracking_complete:
            self.moveTo(robot.standing_q, duration=1.5)
            self.state = 'standing'
        else:
            self.state = "pre_standing"
            
    def standing(self):
        # print("standing")
        if self.tracking_complete:
            # self.moveTo(robot.standing_q, duration=1)
            self.state = 'locked_stance'
        else:
            self.state = "standing"

    def locked_stance(self):
        # print("locked_stance")
        if self.remote.startSeq():
            self.setMode("user")
            self.dT = self.control_dT
            self.state = "user_loop"
            self.robot.setCommands(np.zeros(12), np.zeros(12), np.zeros(12), np.zeros(12), np.zeros(12))
        elif self.remote.standUpDownSeq() or robot.overheat():
            self.moveTo(robot.sitting_q, duration = 1.5)
            self.state = "sitting"

    def user_loop(self):
        # print("user_loop")
        if self.safety.unsafe():
            self.dT = self.fsm_dT
            self.setMode("damping")
        elif self.remote.standUpDownSeq() or self.safety.controlTimeout():
            self.dT = self.fsm_dT
            self.moveTo(robot.standing_q, duration = 1)
            self.timer = time.time()
            self.state = "switch_back_to_locked_stance"
        else:
            self.state = "user_loop"

    def sitting(self):
        # print('sitting')
        if self.tracking_complete:
            self.setMode("damping")
            self.state = 'damping'
        else:
            self.state = "sitting"

    def switch_back_to_locked_stance(self):
        if time.time()-self.timer > 0.5:
            # print("going back to locked stance")
            self.state = "locked_stance"

In [None]:
fsm = FSM(robot, remote, safety)

In [None]:
fsm.moveTo(robot.sitting_q, 1)

In [None]:
fsm.moveTo(robot.standing_q, 2)

In [None]:
fsm.setMode("damping")

In [None]:
fsm.close()
robot.sitDownReset()
