In [1]:
from rclpy.time import Time
from pathlib import Path
from rosbags.rosbag2 import Reader
from rosbags.typesys import Stores, get_typestore, get_types_from_msg
import numpy as np
import matplotlib.pyplot as plt
from builtin_interfaces.msg import Time as Time_msg
from matplotlib.patches import Circle, Rectangle
import ff_control.ff_control.transformer_controller.ff_scenario as ff
%matplotlib ipympl

typestore = get_typestore(Stores.ROS2_HUMBLE)
add_types = {}
add_types.update(get_types_from_msg(Path('ff_msgs/msg/Pose2D.msg').read_text(), 'ff_msgs/msg/Pose2D'))
add_types.update(get_types_from_msg(Path('ff_msgs/msg/Twist2D.msg').read_text(), 'ff_msgs/msg/Twist2D'))
add_types.update(get_types_from_msg(Path('ff_msgs/msg/FreeFlyerState.msg').read_text(), 'ff_msgs/msg/FreeFlyerState'))
add_types.update(get_types_from_msg(Path('ff_msgs/msg/FreeFlyerStateStamped.msg').read_text(), 'ff_msgs/msg/FreeFlyerStateStamped'))
add_types.update(get_types_from_msg(Path('ff_msgs/msg/ThrusterCommand.msg').read_text(), 'ff_msgs/msg/ThrusterCommand'))
typestore.register(add_types)

def import_bag(bag_path):
    # create reader instance and open for reading
    with Reader(bag_path) as reader:
        # topic and msgtype information is available on .connections list
        for connection in reader.connections:
            print(connection.topic, connection.msgtype)

        # iterate over messages
        goal_msg = []
        state_msg = []
        action_msg = []
        for connection, timestamp, rawdata in reader.messages():
            msg = typestore.deserialize_cdr(rawdata, connection.msgtype)
            if connection.topic == '/robot/robot/goal':
                goal_msg.append(msg)
            elif connection.topic == '/robot/commands/binary_thrust':
                action_msg.append(msg)
            elif connection.topic == '/robot/est/state':
                state_msg.append(msg)

    def time2float(time:Time_msg) -> float:
        return time.sec + time.nanosec*1e-9
    
    # Import goal and compute initial time
    goal = {
        'time' : np.array([time2float(g.header.stamp) for g in goal_msg]),
        'x' : np.array([g.state.pose.x for g in goal_msg]),
        'y' : np.array([g.state.pose.y for g in goal_msg]),
        'psi' : np.array([g.state.pose.theta for g in goal_msg]),
        'vx' : np.array([g.state.twist.vx for g in goal_msg]),
        'vy' : np.array([g.state.twist.vy for g in goal_msg]),
        'wz' : np.array([g.state.twist.wz for g in goal_msg])
    }
    time_i_goal = goal['time'][0]
    goal['time'] = goal['time'] - time_i_goal

    # Import state and actions from the initial time onwards
    state_time = np.array([time2float(s.header.stamp) for s in state_msg])
    state = {
        'time' : state_time[state_time >= time_i_goal] - time_i_goal,
        'x' : np.array([s.state.pose.x for s in state_msg])[state_time >= time_i_goal],
        'y' : np.array([s.state.pose.y for s in state_msg])[state_time >= time_i_goal],
        'psi' : np.array([s.state.pose.theta for s in state_msg])[state_time >= time_i_goal],
        'vx' : np.array([s.state.twist.vx for s in state_msg])[state_time >= time_i_goal],
        'vy' : np.array([s.state.twist.vy for s in state_msg])[state_time >= time_i_goal],
        'wz' : np.array([s.state.twist.wz for s in state_msg])[state_time >= time_i_goal]
    }

    action_time = np.array([time2float(a.header.stamp) for a in action_msg])
    action = {
        'time' : action_time[action_time >= time_i_goal]  - time_i_goal,
        'switches' : (np.array([a.switches for a in action_msg]).T)[:, action_time >= time_i_goal]
    }

    return state, action, goal

In [None]:
'''state_TTOSCP, action_TTOSCP, goal_TTOSCP = import_bag(bag_path='./../../bags/open_loop/corridoio')
state_TTORHC, action_TTORHC, goal_TTORHC = import_bag(bag_path='./../../bags/closed_loop/corridoioTTOMPC')
state_CVXSCP, action_CVXSCP, goal_CVXSCP = import_bag(bag_path='./../../bags/open_loop/corridoioSCP2') # AVEVAMO MESSO r=0.23 m per evitare sbattesse???????????????????????????????????????????''
state_CVXRHC, action_CVXRHC, goal_CVXRHC = import_bag(bag_path='./../../bags/closed_loop/corridoioSCPMPC') '''
state_ARTSCP40, action_ARTSCP40, goal_ARTSCP40 = import_bag(bag_path='./../../bags/open_loop/tf40_art')
state_ARTSCP80, action_ARTSCP80, goal_ARTSCP80 = import_bag(bag_path='./../../bags/open_loop/tf80_art')
state_ARTSCP_OA, action_ARTSCP_OA, goal_ARTSCP_OA = import_bag(bag_path='./../../bags/open_loop/tf40_art_var_obs')

In [3]:
def ff_plot(state_list, goal_list, case_list, color_list):
    fig, ax = plt.subplots(figsize=(15,10))
    for state, goal, color, case in zip(state_list, goal_list, color_list, case_list):
        ax.plot(state['x'], state['y'], color[0], linewidth=3,  label='state_'+case, zorder=3)
        ax.plot(goal['x'], goal['y'], color[1], linewidth=3, label='goal_'+case, zorder=3)
    ax.add_patch(Rectangle((0,0), ff.table['xy_up'][0], ff.table['xy_up'][1], fc=(0.5,0.5,0.5,0.2), ec='k', label='table', zorder=2.5))
    for n_obs in range(ff.obs_nominal['radius'].shape[0]):
        label_obs = 'obs' if n_obs == 0 else None
        label_robot = 'robot radius' if n_obs == 0 else None
        ax.add_patch(Circle(ff.obs_nominal['position'][n_obs,:], ff.obs_nominal['radius'][n_obs], fc='r', label=label_obs, zorder=2.5))
        ax.add_patch(Circle(ff.obs_nominal['position'][n_obs,:], ff.obs_nominal['radius'][n_obs]+ff.robot_radius, fc='r', alpha=0.2, label=label_robot, zorder=2.5))
    ax.grid(True)
    ax.set_aspect('equal')
    ax.set_xlabel('X [m]', fontsize=10)
    ax.set_ylabel('Y [m]', fontsize=10)
    ax.grid(True)
    ax.legend(loc='best', fontsize=10)
    plt.show()

In [13]:
def ff_plot_paper(state_list, goal_list, case_list, color_list, to_plot_list=[True,False], obs=ff.obs_nominal, final_time_list=None, savefig=None):
    fig, ax = plt.subplots(figsize=(7,6))
    box = ax.get_position()
    ax.set_position([box.x0, box.y0 + box.height * 0., box.width, box.height * 0.85])
    for n, (state, goal, color, case) in enumerate(zip(state_list, goal_list, color_list, case_list)):
        if to_plot_list[1] and ('MPC' not in case):
            label = case if final_time_list is None else (case + ' ($T_f$=' + final_time_list[n] +'s)')
            ax.plot(goal['x'], goal['y'], color[1], linewidth=2, label=label, zorder=3)
        if to_plot_list[0]:
            label = (case+'+PID') if final_time_list is None else (case + '+PID' + ' ($T_f$=' + final_time_list[n] + 's)')#if ('SCP' in case) else case
            ax.plot(state['x'], state['y'], color[0], linewidth=2,  label=label, zorder=3)
    ax.add_patch(Rectangle((0,0), ff.table['xy_up'][0], ff.table['xy_up'][1], fc=(0.5,0.5,0.5,0.2), ec='k', label='table', zorder=2.5))
    for n_obs in range(obs['radius'].shape[0]):
        label_obs = None# 'obstacles' if n_obs == 0 else None
        label_robot = 'keep-out-zones' if n_obs == 0 else None
        ax.add_patch(Circle(obs['position'][n_obs,:], obs['radius'][n_obs], fc='r', alpha=0.7, label=label_obs, zorder=2.5))
        ax.add_patch(Circle(obs['position'][n_obs,:], obs['radius'][n_obs]+ff.robot_radius, fc='r', alpha=0.2, label=label_robot, zorder=2.5))
    ax.set_xlabel('X [m]', fontsize=15)
    ax.set_ylabel('Y [m]', fontsize=15)
    '''ax.set_xticks([-1.8, -1.5, -1.2, -0.9, -0.6, -0.3, 0, 0.3])
    ax.set_yticks([-0.4, 0, 0.4])'''
    #ax.set_xlim([-0.5,4])
    ax.grid(True)
    ax.set_aspect('equal')
    ax.legend(loc='upper left', bbox_to_anchor=(-0.04, 1+2*0.13), fontsize=14, ncols=3)
    #ax.legend(loc='lower right', fontsize=12, ncols=1)
    ax.set_facecolor(np.array([250,250,250])/255)
    plt.show()
    if savefig is not None:
        plt.savefig(savefig + '.svg')

In [None]:
'''ff_plot_paper([state_CVXSCP, state_CVXRHC, state_TTOSCP, state_TTORHC], [goal_CVXSCP, goal_CVXRHC, goal_TTOSCP, goal_TTORHC], ['REL','REL-MPC','FT-TTO','FT-TTO-MPC'], [['b--','b:'],['b','none'],['c--','c:'],['c','none']], [True,True])'''
ff_plot_paper([state_ARTSCP40, state_ARTSCP80], [goal_ARTSCP40, goal_ARTSCP80], ['SCP ART','SCP ART'], [['b','b:'],['c','c:']], [True,True], final_time_list=['40','80'], savefig='FT_hardware_trajectory')
from ff_control.transformer_controller.optimization.ff_scenario_new import obs17 as obs_OA
obs_OA['radius'][:] = 0.2
ff_plot_paper([state_ARTSCP_OA, state_ARTSCP_OA], [goal_ARTSCP_OA, goal_ARTSCP_OA], ['SCP ART'], [['b','b:']], [True,True], obs=obs_OA, savefig='OA_hardware_trajectory')

In [6]:
def ff_plot_paper_x_legend(state_list, goal_list, case_list, color_list, to_plot_list=[True,False]):
    fig, ax = plt.subplots(figsize=(8,5))
    box = ax.get_position()
    ax.set_position([box.x0, box.y0 + box.height * 0., box.width, box.height * 0.85])
    for state, goal, color, case in zip(state_list, goal_list, color_list, case_list):
        if to_plot_list[1] and ('MPC' not in case):
            ax.plot(goal['x'], goal['y'], color[1], linewidth=2, label=case, zorder=3)
        if to_plot_list[0]:
            label = (case+'+PID') if ('SCP' in case) else case
            ax.plot(state['x'], state['y'], color[0], linewidth=2,  label=label, zorder=3)
    ax.add_patch(Rectangle((0,0), ff.table['xy_up'][0], ff.table['xy_up'][1], fc=(0.5,0.5,0.5,0.2), ec='k', label=None, zorder=2.5))
    ax.add_patch(Rectangle(ff.start_region['xy_low'], ff.start_region['xy_up'][0] - ff.start_region['xy_low'][0], ff.start_region['xy_up'][1] - ff.start_region['xy_low'][1], fc ='none', ec ='black', alpha=1., lw = 2, zorder=1, label='start region'))
    ax.add_patch(Rectangle(ff.goal_region['xy_low'], ff.goal_region['xy_up'][0] - ff.goal_region['xy_low'][0], ff.goal_region['xy_up'][1] - ff.goal_region['xy_low'][1], fc ='none', ec ='#00CC66', alpha=1., lw = 2, zorder=1, label='goal_region'))
    ax.add_patch(Rectangle(ff.start_region['xy_low'], ff.start_region['xy_up'][0] - ff.start_region['xy_low'][0], ff.start_region['xy_up'][1] - ff.start_region['xy_low'][1], fc ='black', ec ='black', alpha=0.2, lw = 2, zorder=1, label='start region'))
    ax.add_patch(Rectangle(ff.goal_region['xy_low'], ff.goal_region['xy_up'][0] - ff.goal_region['xy_low'][0], ff.goal_region['xy_up'][1] - ff.goal_region['xy_low'][1], fc ='#00CC66', ec ='#00CC66', alpha=0.2, lw = 2, zorder=1, label='goal region')) 
    for n_obs in range(ff.obs_nominal['radius'].shape[0]):
        label_obs = None if n_obs == 0 else None
        label_robot = None if n_obs == 0 else None
        ax.add_patch(Circle(ff.obs_nominal['position'][n_obs,:], ff.obs_nominal['radius'][n_obs], fc='r', alpha=0.7, label=label_obs, zorder=2.5))
        ax.add_patch(Circle(ff.obs_nominal['position'][n_obs,:], ff.obs_nominal['radius'][n_obs]+ff.robot_radius, fc='r', alpha=0.2, label=label_robot, zorder=2.5))
    ax.set_xlabel('X [m]', fontsize=15)
    ax.set_ylabel('Y [m]', fontsize=15)
    '''ax.set_xticks([-1.8, -1.5, -1.2, -0.9, -0.6, -0.3, 0, 0.3])
    ax.set_yticks([-0.4, 0, 0.4])'''
    #ax.set_xlim([-0.5,4])
    ax.grid(True)
    ax.set_aspect('equal')
    ax.legend(loc='upper left', bbox_to_anchor=(-0.04, 1+2*0.13), fontsize=14)
    ax.set_facecolor(np.array([250,250,250])/255)
    fig.savefig('bags_import.svg')
    plt.show()

In [7]:
from scipy.interpolate import interp1d
def eval_RMSE(state, goal, onlypos=False):
    tt = np.arange(goal['time'][0], 51, 0.01)
    if onlypos:
        f_goal = interp1d(goal['time'], np.vstack((goal['x'], goal['y'])), bounds_error=False, fill_value=np.vstack((goal['x'], goal['y']))[:,-1])
        f_state = interp1d(state['time'], np.vstack((state['x'], state['y'])), fill_value="extrapolate")
    else:
        f_goal = interp1d(goal['time'], np.vstack((goal['x'], goal['y'], goal['psi'], goal['vx'], goal['vy'], goal['wz'])), bounds_error=False, fill_value=np.vstack((goal['x'], goal['y'], goal['psi'], goal['vx'], goal['vy'], goal['wz']))[:,-1])
        f_state = interp1d(state['time'], np.vstack((state['x'], state['y'], state['psi'], state['vx'], state['vy'], state['wz'])), fill_value="extrapolate")
    interp_state = f_state(tt)
    interp_goal = f_goal(tt)
    return np.linalg.norm(interp_goal - interp_state, axis=0).mean()

In [None]:
rmse_CVXSCP = eval_RMSE(state_CVXSCP, goal_CVXSCP, onlypos=True)
rmse_TTOSCP = eval_RMSE(state_TTOSCP, goal_TTOSCP, onlypos=True)
rmse_CVXSCP, rmse_TTOSCP

In [None]:
state_to_plot = 'vy'
i_to_plot = 4
plt.figure()
plt.plot(goal_CVXSCP['time'], goal_CVXSCP[state_to_plot],'-*')
#plt.plot(tt, interp_goal[i_to_plot,:], '-')

In [15]:
import copy
def compute_consumption(action):
    tt = action['time']
    switches = action['switches']

    T_ON = np.zeros((switches.shape[0],))
    for i in range(switches.shape[0]):
        mask = switches[i,:-1]
        dts = tt[1:] - tt[:-1]
        T_ON[i] = np.sum(dts[mask])
    
    return T_ON

def cut_times(list_msg, last_index):
    list_out = []
    for msg in list_msg:
        mask = msg['time'] <= last_index
        msg = copy.deepcopy(msg)
        for key in msg.keys():
            if key == 'switches':
                msg[key] = msg[key][:,mask]
            else:
                msg[key] = msg[key][mask]
        list_out.append(msg)

    return list_out


def performances_analysis(state_list, goal_list, action_list, case_list, final_times_list=None):
    n_case = len(case_list)
    mosaic = [['pos'+str(n) for n in range(n_case)],
              ['vel'+str(n) for n in range(n_case)],
              ['act'+str(n) for n in range(n_case)],
              ['act'+str(n) for n in range(n_case)],
              ['act'+str(n) for n in range(n_case)]]
    if final_times_list is None:
        final_times_list = 40.0*np.ones((n_case,))
    else:
        final_times_list = np.array(final_times_list)
    last_index_list = (1.25*final_times_list).astype(int)
    fig,ax = plt.subplot_mosaic(mosaic, figsize=(20,20))#subplots(4,n_case,figsize=(20,10))
    for n, (state, goal, action, case, last_index) in enumerate(zip(state_list, goal_list, action_list, case_list, last_index_list)):
        state, goal, action = cut_times((state, goal, action), last_index)
        for i in range(2):
            state2show = ['x','y'] if i==0 else ['vx','vy']
            for j in range(len(state2show)):
                ax[mosaic[i][n]].plot(state['time'], state[state2show[j]], linewidth=2, label=state2show[j]+'_state_'+case)
                ax[mosaic[i][n]].plot(goal['time'], goal[state2show[j]], linewidth=2, label=state2show[j]+'_goal_'+case)
                ax[mosaic[i][n]].grid(True)
                ax[mosaic[i][n]].set_ylabel('Position [m]' if i==0 else 'Velocity [m/s]')
                ax[mosaic[i][n]].legend(loc='best')
        
        T_ON = compute_consumption(action)
        for i in range(action['switches'].shape[0]):
            #ax['act'+str(n)].stem(action['time'], i+action['switches'][i]/2, linefmt='C'+str(i), markerfmt='C'+str(i)+'o', bottom = i)
            ax['act'+str(n)].stem(action['time'], i+action['switches'][i]/2, linefmt='C'+str(i), markerfmt='C'+str(i)+'o', bottom = i)
            ax['act'+str(n)].text(-1.5, i+0.15, str(np.round(T_ON[i],2)), rotation='vertical')
        ax['act'+str(n)].set_title('Total firing time: '+str(np.round(T_ON.sum(),2))+' s')
        ax['act'+str(n)].set_ylabel('Thrustes [ON/OFF]')
        ax['act'+str(n)].set_xlabel('Time [s]')
        ax['act'+str(n)].grid(True)
    plt.show()

In [None]:
'''performances_analysis([state_CVXSCP, state_TTOSCP], [goal_CVXSCP, goal_TTOSCP], [action_CVXSCP, action_TTOSCP], ['CVXSCP','TTOSCP'])'''
performances_analysis([state_ARTSCP40, state_ARTSCP80], [goal_ARTSCP40, goal_ARTSCP80], [action_ARTSCP40, action_ARTSCP80], ['$T_f=40s$', '$T_f=80s$'], [40,80])
performances_analysis([state_ARTSCP_OA], [goal_ARTSCP_OA], [action_ARTSCP_OA], ['$T_f=40s$'])

In [None]:
fig,ax = plt.subplots(figsize=(10,4))#subplots(4,n_case,figsize=(20,10))
for n, (state, goal, case) in enumerate(zip([state_ARTSCP40, state_ARTSCP80], [goal_ARTSCP40, goal_ARTSCP80],  ['$T_f=40s$','$T_f=80s$'])):
    state2show = ['vx','vy']
    statelabel = ['$V_x$', '$V_y$']
    colors = [[['C3', 'darkred'], ['C1', 'chocolate']],
              [['C0', 'darkblue'], ['C2', 'darkgreen']]]
    for j in range(len(state2show)):
        ax.plot(state['time'], 100*state[state2show[j]], colors[n][j][0], alpha=0.5, linewidth=1, label=statelabel[j]+'_real, '+case)
        ax.plot(goal['time'], 100*goal[state2show[j]], colors[n][j][1], linewidth=3, label=statelabel[j]+'_ref, '+case)
        ax.grid(True)
        ax.set_ylabel('Velocity [cm/s]', fontsize=15)
        ax.set_xlabel('Time [s]', fontsize=15)
        ax.set_ylim([-2.5, 17.5])
        ax.legend(loc='best', ncols=2, fontsize=13)
plt.show()
plt.savefig('FT_velocity_profile.svg')