# Bump test following errors vs time

In [None]:
# Times Square parameters
# These module-level values are the notebook inputs; later cells read them
# by name, so the names must not change.

# Start and end of the bump test query window, as ISO 8601 strings. The last
# cell wraps both in astropy Time with scale='utc'.
t_start = "2024-09-01T12:00:00"
t_end = "2024-09-15T06:00:00"
# Force actuator ID to plot; passed to actuator_error() as fa_id and read as
# a global when plot titles are built.
# NOTE(review): this name shadows the builtin id() - presumably fixed by the
# Times Square parameter definition; confirm before renaming.
id = 409


In [None]:
import sys, time, os, asyncio, glob
from datetime import datetime
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from matplotlib.backends.backend_pdf import PdfPages
from matplotlib.ticker import FormatStrFormatter
from astropy.time import Time, TimeDelta
from lsst.ts.xml.tables.m1m3 import FATable, FAIndex, force_actuator_from_id, actuator_id_to_index
from lsst_efd_client import EfdClient

import lsst.summit.utils.butlerUtils as butlerUtils
from lsst.summit.utils.utils import dayObsIntToString
from lsst.summit.utils.efdUtils import calcNextDay

from lsst.summit.utils.efdUtils import getEfdData
from lsst.summit.utils.tmaUtils import TMAEvent
from lsst.ts.xml.enums.MTM1M3 import BumpTest


In [None]:
def rms_error(times, errors, t_min=3.0, t_max=4.0):
    """Return the RMS of ``errors`` for samples strictly inside a time window.

    Parameters
    ----------
    times : array-like of float
        Sample times. In this notebook they are seconds relative to the
        first measured sample of a bump test.
    errors : array-like of float
        Error values, one per entry of ``times``.
    t_min, t_max : float, optional
        Exclusive lower and upper bounds of the window. The defaults
        (3.0 and 4.0) reproduce the window that used to be hard-coded.

    Returns
    -------
    float
        ``sqrt(mean(errors**2))`` over the samples with
        ``t_min < t < t_max``, or ``nan`` when no sample falls inside
        the window.
    """
    times = np.asarray(times, dtype=float)
    errors = np.asarray(errors, dtype=float)

    # Both bounds are exclusive, matching the original `t > 3.0 and t < 4.0`.
    in_window = (times > t_min) & (times < t_max)
    if not in_window.any():
        return np.nan
    return np.sqrt(np.mean(np.square(errors[in_window])))

def plot_bumps_and_errors(axs, bump, bt_result, force, follow, applied, p_s):
    """Plot measured forces and RMS following errors for each bump test.

    For every row of ``bt_result`` whose ``bump`` column equals
    ``BumpTest.TESTINGPOSITIVE``, a ``BUMP_TEST_DURATION``-second window
    starting one second before that row's timestamp is fetched from the EFD.
    The measured force of each test is drawn against time in the top panel,
    together with the applied force of the first test, and the RMS following
    error of each test (see ``rms_error``) is drawn in the bottom panel.

    Parameters
    ----------
    axs : array of matplotlib Axes
        Indexed as ``axs[row][column]``. Row 0 receives the force curves and
        row 1 the RMS following errors. Column 0 is used when ``p_s`` is
        "Primary", column 1 otherwise.
    bump : str
        Name of the ``bt_result`` column holding the bump test state.
    bt_result : pandas.DataFrame
        Bump test status rows; must provide ``bump`` and "timestamp" columns.
    force : str
        Measured-force column of "lsst.sal.MTM1M3.forceActuatorData".
    follow : str
        Following-error column of "lsst.sal.MTM1M3.forceActuatorData".
    applied : str
        Applied-force column of "lsst.sal.MTM1M3.appliedForces".
    p_s : str
        "Primary", or a secondary orientation name. Names containing "PLUS"
        or "MINUS" have the measured force divided by +sqrt(2) or -sqrt(2).

    Notes
    -----
    The EFD client and the actuator number shown in the titles are read from
    the module-level names ``client`` and ``id``; both must be defined before
    this function is called.
    """
    BUMP_TEST_DURATION = 5.0  # seconds

    plot_index = 0 if p_s == "Primary" else 1
    force_ax = axs[0][plot_index]
    rms_ax = axs[1][plot_index]
    force_ax.set_title(f"Actuator {id} {p_s} forces vs time")
    rms_ax.set_title(f"Actuator {id} {p_s} following error RMS")

    start_labels = []
    measured_times = []
    measured_values = []
    follow_values = []
    applied_times = []
    applied_values = []

    # Select the rows that start a positive bump once, not once per iteration.
    positive_rows = bt_result[bt_result[bump] == BumpTest.TESTINGPOSITIVE]
    for stamp in positive_rows["timestamp"].values:
        # Begin one second before the bump so the baseline is visible.
        window_start = Time(stamp - 1.0, format="unix_tai", scale="tai")
        window_end = Time(
            window_start + TimeDelta(BUMP_TEST_DURATION, format="sec"),
            format="unix_tai",
            scale="tai",
        )

        measured_forces = getEfdData(
            client,
            "lsst.sal.MTM1M3.forceActuatorData",
            columns=[force, follow, "timestamp"],
            begin=window_start,
            end=window_end,
        )
        applied_forces = getEfdData(
            client,
            "lsst.sal.MTM1M3.appliedForces",
            columns=[applied, "timestamp"],
            begin=window_start,
            end=window_end,
        )
        # A window without telemetry cannot be plotted; skip it rather than
        # fail on the first-sample lookup below.
        if measured_forces.empty or applied_forces.empty:
            continue

        # Times are expressed relative to the first measured sample.
        t0 = measured_forces["timestamp"].values[0]
        measured_time = measured_forces["timestamp"].values - t0
        applied_time = applied_forces["timestamp"].values - t0

        measured_value = measured_forces[force].values
        if p_s != "Primary":
            if "MINUS" in p_s:
                measured_value = np.array(measured_value) / -np.sqrt(2.0)
            if "PLUS" in p_s:
                measured_value = np.array(measured_value) / np.sqrt(2.0)

        # Second resolution is enough to identify a bump test.
        start_labels.append(window_start.isot.split('.')[0])
        measured_times.append(measured_time)
        measured_values.append(measured_value)
        follow_values.append(measured_forces[follow].values)
        applied_times.append(applied_time)
        applied_values.append(applied_forces[applied].values)

    if not measured_times:
        # Nothing to draw: label the panels instead of raising IndexError.
        for ax in (force_ax, rms_ax):
            ax.text(
                0.5,
                0.5,
                "No bump test data",
                ha="center",
                va="center",
                transform=ax.transAxes,
            )
        return

    # Only the applied force of the first bump test is drawn as a reference.
    force_ax.plot(applied_times[0], applied_values[0])
    for label, times, values in zip(start_labels, measured_times, measured_values):
        # Labels are attached so a legend can be enabled if wanted.
        force_ax.plot(times, values, label=label)
    force_ax.set_xlim(0, 4.0)
    force_ax.set_xlabel("Time(sec.)")
    force_ax.set_ylim(-100, 400)
    force_ax.set_ylabel("Force(N)")

    rms_values = [
        rms_error(times, errors)
        for times, errors in zip(measured_times, follow_values)
    ]
    rms_ax.plot(start_labels, rms_values, marker='x')
    rms_ax.set_ylim(0, 100)
    # Linear up to 10, logarithmic above, so small errors stay readable.
    rms_ax.set_yscale('symlog', linthresh=10)
    rms_ax.set_yticks([0, 2, 4, 6, 8, 10, 50, 100])
    rms_ax.yaxis.set_major_formatter(FormatStrFormatter('%.1f'))
    rms_ax.tick_params(axis='x', rotation=90)
    rms_ax.set_ylabel("RMS following error 3.0<T<4.0 seconds")

In [None]:
def actuator_error(
    fig: plt.Figure,
    client: object,
    fa_id: int,
    bt_results: pd.DataFrame,
) -> None:
    """Draw the bump test force and following-error plots for one actuator.

    The figure is split into a 2x2 grid of subplots. The left column is
    filled from the primary cylinder data. The right column is filled from
    the secondary cylinder data, and only when the actuator type is "DAA".

    Parameters
    ----------
    fig : matplotlib.figure.Figure
        The figure to draw into; it is modified in place.
    client : object
        The EFD client object. It is not used in this function body:
        ``plot_bumps_and_errors`` reads the module-level ``client`` instead.
    fa_id : int
        The ID of the force actuator.
    bt_results : pandas.DataFrame
        The bump test results data. Rows are selected where the
        "actuatorId" column equals ``fa_id``.

    Returns
    -------
    None

    Raises
    ------
    ValueError
        If a "DAA" actuator has an orientation other than X_PLUS, X_MINUS,
        Y_PLUS or Y_MINUS.
    """
    axs = fig.subplots(2, 2)
    # Adjust the figure that was passed in, not whichever one is current.
    fig.subplots_adjust(bottom=0.25, wspace=0.3, hspace=0.3)

    # Grab the Force Actuator Data from its ID
    fa_data = force_actuator_from_id(fa_id)
    bt_result = bt_results[bt_results["actuatorId"] == fa_id]

    # First the primary forces
    plot_bumps_and_errors(
        axs,
        f"primaryTest{fa_data.index}",
        bt_result,
        f"primaryCylinderForce{fa_data.index}",
        f"primaryCylinderFollowingError{fa_data.index}",
        f"zForces{fa_data.z_index}",
        "Primary",
    )

    # Now the secondary forces, for "DAA" actuators only
    if fa_data.actuator_type.name != "DAA":
        return

    secondary_name = fa_data.orientation.name
    if secondary_name in ("X_PLUS", "X_MINUS"):
        applied = f"xForces{fa_data.x_index}"
    elif secondary_name in ("Y_PLUS", "Y_MINUS"):
        applied = f"yForces{fa_data.y_index}"
    else:
        raise ValueError(f"Unknown secondary name {secondary_name}")

    plot_bumps_and_errors(
        axs,
        f"secondaryTest{fa_data.s_index}",
        bt_result,
        f"secondaryCylinderForce{fa_data.s_index}",
        f"secondaryCylinderFollowingError{fa_data.s_index}",
        applied,
        secondary_name,
    )


In [None]:
# IPython magic selecting the inline matplotlib backend for this notebook.
%matplotlib inline
# EFD client. The name `client` must stay module-level because
# plot_bumps_and_errors() reads it as a global.
client = EfdClient('usdf_efd')
# Fetch all fields ("*") of the bump test status events between t_start and
# t_end, both interpreted as UTC.
bumps = await client.select_time_series("lsst.sal.MTM1M3.logevent_forceActuatorBumpTestStatus", "*",\
                                        Time(t_start, scale='utc'), Time(t_end, scale='utc'))
# One figure, which actuator_error() divides into a 2x2 grid of subplots.
fig = plt.figure(figsize=(10,12))
# Plot the bump tests of the actuator selected by the `id` parameter.
actuator_error(fig, client, id, bumps)