In [None]:
import preamble
from virtual_dynamics import SimpleVirtualDynamics
from robot import Robot
from app_loop import AppLoop
from vocalizer import Vocalizer
from plot_client import PlotClient
import numpy as np
from numpy.typing import NDArray
import pandas as pd

In [None]:
class RobotApp(AppLoop):
    MASS = 10.0
    DAMPENING = 20.0
    AXES = Robot.TRANSLATION

    def __init__(self, use_plotter: bool = False):
        super().__init__()
        self.use_plotter = use_plotter

    def startup(self) -> None:
        self.robot = Robot("169.254.9.43",
                           translational_force_deadband=6.0,
                           rotational_force_deadband=0.5)
        self.robot.control.zeroFtSensor()
        self.p0 = self.robot.INIT_POSE[:3]

        self.dynamics = SimpleVirtualDynamics(M=self.MASS, B=self.DAMPENING, K=0)

        self.path, self.v_path = self.load_path('../data/reference_path.csv')
        self.forward_state = True

        self.vocalizer = Vocalizer()

        self.P = 0.5
        self.V = 0.5
        self.v_modded_x_err_integral = 0.0
        self.p_modded_x_err_integral = 0.0

        self.v_signal = 0.0
        self.p_signal = 0.0

        self.v_ema = 0.0
        self.p_ema = 0.0
        
        self.lambda_V = 0.1
        self.lambda_P = 0.1

        self.prev_x_err = 0.0

        self.verbal_sample_interval = 1.0
        self.last_verbal_sample_time = -100.0
        self.is_speaking = True
        self.speaking_start_t = -100.0
        self.speaking_end_t = -100.0
        self.speaking_end_x_err = 0.0

        self.historical_dx_err = []

        if self.use_plotter:
            self.plotter = PlotClient()
            self.plotter.create_plot("v_ema", title="$v_ema$", xlabel="Time (s)", ylabel="v_ema")
            self.plotter.create_line("v_ema", "v_ema", plot_style='-')
            self.plotter.create_plot("p_ema", title="$p_ema$", xlabel="Time (s)", ylabel="p_ema")
            self.plotter.create_line("p_ema", "p_ema", plot_style='-')
            self.plotter.create_plot("V", title="$V$", xlabel="Time (s)", ylabel="Verbal Compliance")
            self.plotter.create_line("V", "V", plot_style='-')
            self.plotter.create_plot("P", title="$P$", xlabel="Time (s)", ylabel="Physical Compliance")
            self.plotter.create_line("P", "P", plot_style='-')

    def generate_path(self, start_point: NDArray, end_point: NDArray, N: int) -> NDArray:
        length = np.linalg.norm(end_point - start_point)
        direction = (end_point - start_point) / length
        steps = np.linspace(0.0, length, N)
        return np.array([start_point + direction * step for step in steps] + [end_point - direction * step for step in steps])

    def load_path(self, file_path: str) -> NDArray:
        data = pd.read_csv(file_path)

        path = np.stack([data['p_x'].values, data['p_y'].values, data['p_z'].values], axis=1)
        path += self.p0 - path[0]

        v_path = np.stack([data['v_x'].values, data['v_y'].values, data['v_z'].values], axis=1)

        return path, v_path
    
    def update_path(self) -> None:
        p = self.robot.get_pose(self.AXES)

        if p[0] <= self.path[-1, 0] if self.forward_state else p[0] >= self.path[0, 0]:
            self.forward_state = not self.forward_state
            self.v_path *= -1.0

    def get_target(self, p: NDArray) -> NDArray:
        distances = np.linalg.norm(self.path - p, axis=1)
        index = np.argmin(distances)
        return self.path[index], self.v_path[index]

    def update(self, t: float, dt: float) -> None:
        period_start = self.robot.control.initPeriod()
        self.update_path()

        c_p = self.P + self.V
        c_v = 1.0 - self.V
        k = 400.0
        b = 200.0

        p = self.robot.get_pose(self.AXES)
        v = self.robot.get_velocity(self.AXES)
        F_h = self.robot.get_force(self.AXES)
        p_ref, v_ref = self.get_target(p)

        F_total = -k * (p - p_ref) - b * (v - v_ref)
        F_r = c_v / (c_p + c_v) * F_total
        F_v = c_p / (c_p + c_v) * F_total
        
        x_err = np.linalg.norm(F_h - F_v)

        if t - self.last_verbal_sample_time >= self.verbal_sample_interval:
            if np.random.rand() < max(0.25, self.V) and np.linalg.norm(F_v) > 5.0:
            # if np.random.rand() < self.V and np.linalg.norm(F_v) > 5.0:
                word_table = [
                    ['left', 'right'],
                    ['backward', 'forward'],
                    ['down', 'up']
                ]
                axis = np.argmax(np.abs(F_v))
                sign = 0 if F_v[axis] < 0 else 1
                word = word_table[axis][sign]
                self.vocalizer.utter(word, interupt=True)
                self.speaking_start_t = t
                self.is_speaking = True

            self.last_verbal_sample_time = t

        if self.is_speaking and t - self.speaking_start_t >= 0.5:
            self.speaking_end_t = t
            self.is_speaking = False
            self.speaking_end_x_err = x_err

        self.v_modded_x_err_integral += (0.1 * x_err + 0.5 * (x_err - self.speaking_end_x_err)) * np.exp2(-1.0 * (t - self.speaking_end_t)) * dt
        self.v_modded_x_err_integral = np.clip(self.v_modded_x_err_integral, -20.0, 20.0)
        self.V = 0.5 * np.tanh(-0.1 * self.v_modded_x_err_integral) + 0.5

        self.p_modded_x_err_integral += (0.001 * x_err + 0.015 * ((x_err - self.prev_x_err) / dt)) * np.linalg.norm(F_r) * dt
        self.p_modded_x_err_integral = np.clip(self.p_modded_x_err_integral, -20.0, 20.0)
        self.prev_x_err = x_err
        self.P = 0.5 * np.tanh(-0.1 * self.p_modded_x_err_integral) + 0.5

        # self.v_signal = (0.4 * x_err + 1.0 * (x_err - self.speaking_end_x_err)) * np.exp2(-1.0 * (t - self.speaking_end_t))

        # self.p_signal = (0.01 * x_err + 0.03 * ((x_err - self.prev_x_err) / dt)) * np.linalg.norm(F_r)
        # self.prev_x_err = x_err

        # decay_factor_V = np.exp2(-self.lambda_V * dt)
        # alpha_dynamic_V = 1 - decay_factor_V

        # decay_factor_P = np.exp2(-self.lambda_P * dt)
        # alpha_dynamic_P = 1 - decay_factor_P

        # self.v_ema = (1 - alpha_dynamic_V) * self.v_ema + alpha_dynamic_V * self.v_signal
        # self.V = 0.5 * np.tanh(-0.1 * self.v_ema) + 0.5

        # self.p_ema = (1 - alpha_dynamic_P) * self.p_ema + alpha_dynamic_P * self.p_signal
        # self.P = 0.5 * np.tanh(-0.1 * self.p_ema) + 0.5

        self.dynamics.apply_force(F_h + F_r, dt)
        self.robot.set_velocity(self.dynamics.get_velocity(),
                                Robot.TRANSLATION, acceleration=10)

        self.robot.control.waitPeriod(period_start)

        if self.use_plotter:
            self.plotter.update_line("v_ema", "v_ema", (t, self.v_modded_x_err_integral), mode='append')
            self.plotter.config_plot("v_ema", xlim=(t - 10.0, t))
            self.plotter.update_line("p_ema", "p_ema", (t, self.p_modded_x_err_integral), mode='append')
            self.plotter.config_plot("p_ema", xlim=(t - 10.0, t))
            self.plotter.update_line("V", "V", (t, self.V), mode='append')
            self.plotter.config_plot("V", xlim=(t - 10.0, t), ylim=(0.0, 1.0))
            self.plotter.update_line("P", "P", (t, self.P), mode='append')
            self.plotter.config_plot("P", xlim=(t - 10.0, t), ylim=(0.0, 1.0))

    def shutdown(self) -> None:
        self.robot.set_velocity(Robot.zeroed_translation_rotation())
        if self.use_plotter:
            self.plotter.close()

In [None]:
app = RobotApp(use_plotter=True)
app.run()