# System identification on joints

## Introduction

This notebook contains calculations to do what is called "system identification" on joints. It means finding transfer functions which describe the responses of these systems.

Input data is made with another notebook named "Motor control characterization". To know more about the origin of this data, please read the other notebook.

## Import of Python packages

### Built-in packages

In [None]:
import os
import glob

### Installed packages

In [None]:
import matplotlib.pyplot as plt
%matplotlib inline

import seaborn as sns
from scipy import signal as scipysig
import numpy as np

from tqdm import tqdm_notebook

### Local packages

In [None]:
import utils.data as datautils
import utils.signal as sigutils

## Import of input data

All data of all the joints cannot be loaded at once as it would require too much memory. Therefore, below is declared a function to quickly load data from a specific joint. Data must be located in the following file architecture:

```
.
├── build
   ├── <robot_name>
      ├── <joint_name>
      │  ├── <filename>.pickle
      └── <joint_name>
      │  ├── <filename>.pickle
      ⁝
```

Where `<robot_name>` and `<joint_name>` are arbitrary directory names.

And where `filename` matches the following format: `synthesis_<move_type>_<joint_name>_<current_min>-<current_max>_<temp_min>-<temp_max>_<load_dir>`, where:
* `move_type` is whether "step" or "sine",
* `joint_name` is a joint name ("HeadYaw", "RElbowRoll", …),
* `current_min` is the minimum current of the concerned operating point,
* `current_max` is the maximum current of the concerned operating point,
* `temp_min` is the minimum temperature of the concerned operating point,
* `temp_max` is the maximum temperature of the concerned operating point,
* `load_dir` is the load direction of the concerned operating point (whether 1, -1, or 0).


In [1]:
def get_joint_step_data(joint_name, base_dir='./build/myRobot'):
    """Return the synthesis object for every operating group of the joint."""
    joint_dir = os.path.join(base_dir, joint_name)
    all_pickle_files = glob.glob('{}/*step*.pickle'.format(joint_dir))
    data = [datautils.load_op_data(f) for f in all_pickle_files]
    return data

## System identification by performance parameters identification

### Theory

Let’s approximate our system as a second order linear system around an operating point. Its transfer function can be written as:

\begin{equation}
G(s) = \frac{K\omega_n^2}{s^2 + 2\zeta\omega_ns + \omega_n^2}
\end{equation}

Where:
* $K$ is the gain
* $\omega_n$ is the undamped natural angular frequency
* $\zeta$ is the damping ratio

For an overdamped system ($\zeta > 1$), **TODO**

For an underdamped system ($\zeta < 1$), the **step response** can be characterized by the following parameters:

<table width='75%'>
<col width="20%">
<thead>
<tr>
<th>Parameter</th>
<th>Definition</th>
<th>Link with transfer function</th>
</tr>
</thead>
    
<tbody>
<tr>
<td>Damped natural frequency</td>
<td>Frequency of oscillations in the step response.</td>
<td>\begin{equation}
    \omega_d = \omega_n \sqrt{1 - \zeta^2}
    \end{equation}
    </td>
</tr>
    
<tr>
<td>10-90% rising time</td>
    <td>In a step response, it is the required time to go from 10% to 90% of the final value $y(\infty)$.</td>
<td>\begin{equation}
    t_r \simeq \frac{2.3 \zeta^2 - 0.078 \zeta + 1.12}{\omega_n}
    \end{equation}
    </td>
</tr>

<tr>
<td>Peak time</td>
<td>Time corresponding to the first maximum value of the step response.</td>
<td>\begin{equation}
    t_p = \frac{\pi}{\omega_n \sqrt{1 - \zeta^2}}
    \end{equation}
    </td>
</tr>
    
<tr>
<td>2% settling time</td>
<td>The time for the step response to reach and stay within 2% of the steady-state value $y(\infty)$,
    it is therefore the lowest $t_s$ so that: 
    \begin{equation}
    |y(t) - y(\infty)| \leq 0.02 y(\infty), \enspace \forall t \geq t_s
    \end{equation}
    </td>
<td>A conservative estimate can be found from the decay envelope, that is by finding the time 
    for the envelop to decay to less than 2% of its initial value: 
    \begin{equation}
    \frac{\exp(-\zeta \omega_n t)}{\sqrt{1 - \zeta^2}} \leq 0.02
    \end{equation}
    Giving
    \begin{equation}
    t_s = - \frac{\ln(0.02\sqrt{1-\zeta^2})}{\zeta \omega_n}
    \end{equation}
    </td>
</tr>
    
<tr>
<td>Overshoot</td>
<td>If $y$ is the step response signal, it is defined by $O = \frac{y(t_p) - y(\infty)}{y(\infty) - y(0)}$.</td>
<td>\begin{equation}
    O = K.\exp(-\frac{\zeta\pi}{\sqrt{1 - \zeta^2}})
    \end{equation}
    </td>
</tr>
    
</tbody>
</table>

And the response of the system to a unit step can be expressed in time-domain by:

\begin{equation}
y(t) = K \bigg(1 - \exp(-\zeta \omega_n t)\Big(\cos(\omega_d t) + \frac{\zeta}{\sqrt{1 - \zeta^2}}\sin(\omega_d t)\Big)\bigg)
\end{equation}

Using the equation of the overshoot, we can find the expression of $\zeta$:

\begin{equation}
\zeta = \sqrt{\frac{\log\big(\frac{O}{K}\big)^2}{\pi^2 + \log\big(\frac{O}{K}\big)^2}}
\end{equation}

$\zeta$ can then be used in the equation of the peak time to find the natural frequency:

\begin{equation}
\omega_n = \frac{\pi}{t_p \sqrt{1 - \zeta^2}}
\end{equation}

A possible improvement of the model is the inclusion of a *dead time* $DT$.
Such a thing can be provoked by mechanical friction in the system for instance.

\begin{equation}
G(s) = \exp(-DT.s)\frac{K\omega_n^2}{s^2 + 2\zeta\omega_ns + \omega_n^2}
\end{equation}

---

**Some demonstrations…**

*Expression of the overshoot:*

\begin{equation}
y(t) = K \bigg(1 - \exp(-\zeta \omega_n t_p)\Big(\cos(\omega_d t_p) + \frac{\zeta}{\sqrt{1 - \zeta^2}}\sin(\omega_d t_p)\Big)\bigg) \\
y(t) = K \bigg(1 - \exp(-\zeta \omega_n t_p)\Big(\cos(\pi) + \frac{\zeta}{\sqrt{1 - \zeta^2}}\sin(\pi)\Big)\bigg) \\
y(t) = K \bigg(1 + \exp(-\zeta \omega_n t_p)\bigg) \\
y(t) = K \bigg(1 + \exp(-\zeta \omega_n \frac{\pi}{\omega_n \sqrt{1 - \zeta^2}})\bigg) \\
y(t) = K \bigg(1 + \exp(-\frac{\pi \zeta}{\sqrt{1 - \zeta^2}})\bigg) \\
y(t) = K (1 + O)
\end{equation}

Hence $O = K.\exp(-\frac{\zeta\pi}{\sqrt{1 - \zeta^2}})$

*Expression of the $\zeta$:*

\begin{equation}
O = K.\exp(-\frac{\zeta\pi}{\sqrt{1 - \zeta^2}}) \\
\implies \frac{\zeta \pi}{\sqrt{1 - \zeta^2}} = \log(\frac{O}{K}) \\
\implies (\zeta \pi)^2 = \log\big(\frac{O}{K}\big)^2 (1 - \zeta^2) \\
\implies \zeta^2 \Big(\pi^2 + \log\big(\frac{O}{K}\big)^2\Big) = \log\big(\frac{O}{K}\big)^2 \\
\implies \zeta = \sqrt{\frac{\log\big(\frac{O}{K}\big)^2}{\pi^2 + \log\big(\frac{O}{K}\big)^2}}
\end{equation}

### Processing

#### Functions

In [None]:
def get_zeta(overshoot, gain):
    return np.sqrt(
                (np.power(np.log(overshoot/gain), 2)) \
                /(np.power(np.pi, 2) + np.power(np.log(overshoot/gain), 2)))

In [None]:
def get_omegan(overshoot_time=None, zeta=None, omegad=None):
    if zeta and omegad:
        res = omegad / np.sqrt(1 - np.power(zeta, 2))
    elif zeta and overshoot_time:
        res = np.pi / (overshoot_time * np.sqrt(1 - np.power(zeta, 2)))
    else:
        raise ValueError('Cannot compute omega_n with given arguments.')
    return res

In [None]:
class NoPeakFound(RuntimeError):
    pass

def get_omegad_possibilities(signal, sample_rate):
    """Get possible values of omega_d using a FFT on the signal.
    
    It is not possible to get the real value of omegad directly, because it
    is hard to find the right peak among others in the FFT.
    """
    spectrum = np.fft.fft(signal)
    frequencies = np.fft.fftfreq(len(spectrum), sample_rate)
    positive_spectrum = spectrum[np.where(frequencies > 0)]
    peaks, peak_prop = scipysig.find_peaks(positive_spectrum, prominence=0.01)
    if len(peaks) == 0:
        raise NoPeakFound
    # Keep only the first 10 peaks as it **very unlikely** that the damped
    # frequency is after the 10th peak in the FFT
    if len(peaks) > 10:
        peaks = peaks[:10]
    main_freqs = frequencies[peaks]
    omegad_possibilities = main_freqs * 2.0 * np.pi
    return omegad_possibilities

In [None]:
def get_dead_time(signal, time):
    """Find the initial dead time in a step signal.
    
    A signal is considered as out of the dead time when it moves more than 0.001 rad
    (the noise limit).
    """
    signal = np.array(signal)
    time = np.array(time)
    indice_of_first_move = np.where((abs(signal - signal[0]) > 0.001))[0][0]
    start_time = time[indice_of_first_move - 1]
    dead_time = start_time - time[0]
    return dead_time

In [None]:
def compute_model_error(spr, tf, dead_time):
    joint = spr.metadata['joint_name']
    norm_df = sigutils.control.get_normalized_position_timeseries(spr.dataframe, spr.kpis['InitialCommandValue'], spr.kpis['CommandStepAmplitude'])
    tsim, yout, xout = scipysig.lsim(system=tf, 
                                      U=norm_df[joint + 'PositionActuatorValue'],
                                      T=norm_df['Time'],
                                      interp=False)
    dead_time_in_samples = int(dead_time // spr.dataframe['TimeDispersion'].mean())
    # Compute absolute error signal
    if dead_time_in_samples > 0:
        error_sig = abs(norm_df[joint + 'PositionSensorValue'].iloc[dead_time_in_samples:] - yout[:-dead_time_in_samples])
    else:
        error_sig = abs(norm_df[joint + 'PositionSensorValue'] - yout)
    error_sum = error_sig.sum()

    return error_sum
    

In [None]:
def get_2nd_order_tf(gain, omegan, zeta):
    num = [gain * np.power(omegan, 2)]
    den = [1, 
           2 * zeta * omegan, 
           np.power(omegan, 2)]

    tf = scipysig.TransferFunction(num, den)
    return tf

In [None]:
class CannotFindTF(RuntimeError):
    pass

def get_2nd_order_tf_for_spr(spr):
    """Return a tf and a delay for a step
    
    Args:
        spr: (ProcessingResult) a ProcessingResult object of a step.        
    Return:
        tf: scipy.signal.TransferFunction object.
        dead_time: the dead time delay to add to the response
    """    
    joint_name = spr.metadata['joint_name']
    
    # Compute parameters ---------------------------------------------------------------
    dead_time = get_dead_time(spr.dataframe[joint_name + 'PositionSensorValue'], spr.dataframe['Time'])
    
    gain = ((spr.kpis['FinalSensorValue'] - spr.kpis['InitialCommandValue']) 
            / spr.kpis['CommandStepAmplitude'])
    
    zeta = get_zeta(spr.kpis['OvershootPercentage']/100.0, gain)
    
    try:
        omegad_possibilities = get_omegad_possibilities(
                                    spr.dataframe[joint_name + 'PositionSensorValue'], 
                                    spr.dataframe['TimeDispersion'].mean())
    except NoPeakFound:
        raise CannotFindTF
    
    # Find the best estimation of omegad -----------------------------------------------
    
    # Compute the error for every possible omegad
    error_list = []
    for omegad in omegad_possibilities:
        omegan = get_omegan(zeta=zeta, omegad=omegad)
        tf = get_2nd_order_tf(gain, omegan, zeta)
        # Compute error between real curve and modeled one
        error_list.append(compute_model_error(spr, tf, dead_time))
    
    # Get the omegad implying the minimum error
    min_error_index = error_list.index(min(error_list))
    best_omegad = omegad_possibilities[min_error_index]

    # Compute the most suitable transfer function --------------------------------------
    omegan = get_omegan(zeta=zeta, omegad=best_omegad)
    tf = get_2nd_order_tf(gain, omegan, zeta)

    return tf, dead_time

In [None]:
def add_tf_to_every_step_data(op_group_step_data):
    """Add transfer functions to all steps of an operating point.
    
    Nothing is returned because transfer functions are added as a new entry
    in the 'kpis' dictionary of ProcessingResult objects.
    
    Args:
        op_group_step_data: (List[dict]) synthesis of steps for a single operating group
    """
    for step_processing_result in tqdm_notebook(op_group_step_data['processing_results'], leave=False):
        try:
            tf, dead_time = get_2nd_order_tf_for_spr(step_processing_result)
        except CannotFindTF:
            step_processing_result.kpis['transfer_function'] = None
            step_processing_result.kpis['dead_time'] = None
        else:
            step_processing_result.kpis['transfer_function'] = tf
            step_processing_result.kpis['dead_time'] = dead_time

#### Processing on joint data

In [None]:
# Get data
joint_name = 'Joint1'
all_operating_groups = get_joint_step_data(joint_name)

In [None]:
# Compute and save transfer function for every single step
for op_group_step_data in tqdm_notebook(all_operating_groups, leave=False):
    add_tf_to_every_step_data(op_group_step_data)

In [None]:
def select_random_steps(all_operating_groups, quantity):
    """Select random steps with an identified transfer function"""
    step_list = []
    while len(step_list) < quantity:
        # select random group
        group_index = np.random.randint(0, len(all_operating_groups) - 1)
        step_index = np.random.randint(0, len(all_operating_groups[group_index]['processing_results']) - 1)
        new_step = all_operating_groups[group_index]['processing_results'][step_index]
        if new_step.kpis['transfer_function'] is None:
            # This step does not have an identification… try again
            continue
        step_list.append(new_step)
    return step_list

In [None]:
# Select some random steps
random_steps = select_random_steps(all_operating_groups, 5)

In [None]:
# Display the identification result of these random steps
fig, axes = plt.subplots(5,1, figsize=(15,10))
for index, step in enumerate(random_steps):
    joint = step.metadata['joint_name']
    ax = axes[index]
    norm_df = sigutils.control.get_normalized_position_timeseries(step.dataframe, step.kpis['InitialCommandValue'], step.kpis['CommandStepAmplitude'])
    half_data_size = step.dataframe['Time'].size // 8
    simulation_result = scipysig.lsim(system=step.kpis['transfer_function'], 
                                      U=norm_df[joint + 'PositionActuatorValue'],
                                      T=norm_df['Time'],
                                      interp=False)
    
    ax.plot(norm_df['Time'].iloc[:half_data_size], 
            norm_df[joint + 'PositionActuatorValue'].iloc[:half_data_size],
            c='r')
    ax.plot(norm_df['Time'].iloc[:half_data_size], 
            norm_df[joint + 'PositionSensorValue'].iloc[:half_data_size],
            c='b')
    ax.plot(simulation_result[0][:half_data_size] + step.kpis['dead_time'], 
            simulation_result[1][:half_data_size],
            c='orange')

fig.tight_layout()

## Identification by non-linear regression

In [None]:
# Todo!