In [2]:
%matplotlib tk

import numpy as np
from typing import List, Optional, Tuple

def plot_multiple_y_axes(
    x: np.ndarray,
    y_data: List[np.ndarray],
    labels: List[str],
    colors: Optional[List[str]] = None,
    x_label: str = "X-axis",
    y_labels: Optional[List[str]] = None,
    title: Optional[str] = None,
    add_zero_lines: bool = True,
    vertical_lines: Optional[List[Tuple[float, str]]] = None
) -> None:
    """
    Plots multiple y-series each with their own y-axis on a single plot, with optional zero lines
    and vertical lines at specified timestamps. Vertical line labels (state names) are drawn vertically
    (rotated 90Â°) and positioned above the plot.

    Parameters:
    - x (array-like): Data for the x-axis.
    - y_data (list of array-like): List containing y-axis data series.
    - labels (list of str): List of labels for each y-axis series.
    - colors (list of str, optional): List of colors for each series. Defaults to Matplotlib's default colors.
    - x_label (str, optional): Label for the x-axis. Default is "X-axis".
    - y_labels (list of str, optional): List of labels for each y-axis. If not provided, `labels` are used.
    - title (str, optional): Title of the plot.
    - add_zero_lines (bool, optional): Whether to add horizontal zero lines for each y-axis. Default is True.
    - vertical_lines (list of tuples, optional): List of tuples (timestamp, state_name) where vertical lines
      will be drawn.

    Returns:
    - None. Displays the plot.
    """
    import matplotlib.pyplot as plt
    from mpl_toolkits.axes_grid1 import host_subplot
    import mpl_toolkits.axisartist as AA
    from matplotlib.transforms import blended_transform_factory

    # Parameter Validation
    if not isinstance(y_data, list):
        raise TypeError("y_data should be a list of y-series.")
    if not isinstance(labels, list):
        raise TypeError("labels should be a list of strings.")
    if len(y_data) != len(labels):
        raise ValueError("y_data and labels must have the same length.")
    if colors and len(colors) != len(y_data):
        raise ValueError("Length of colors must match number of y-series.")
    if y_labels and len(y_labels) != len(y_data):
        raise ValueError("Length of y_labels must match number of y-series.")
    if vertical_lines is not None and not isinstance(vertical_lines, list):
        raise TypeError("vertical_lines must be a list of tuples (timestamp, state_name).")

    num_series = len(y_data)
    if num_series < 1:
        raise ValueError("At least one y-series must be provided.")

    # Define default colors if not provided
    if not colors:
        prop_cycle = plt.rcParams['axes.prop_cycle']
        default_colors = prop_cycle.by_key()['color']
        colors = default_colors * ((num_series // len(default_colors)) + 1)
        colors = colors[:num_series]

    # Define default y_labels if not provided
    if not y_labels:
        y_labels = labels

    # Create Host Subplot
    fig = plt.figure(figsize=(10, 6))
    host = host_subplot(111, axes_class=AA.Axes)
    plt.subplots_adjust(right=0.75)

    # Create Additional Axes
    par_axes = []
    for i in range(1, num_series):
        par = host.twinx()
        par_axes.append(par)

    # Offset the additional axes
    offset = 60  # pixels
    for i, par in enumerate(par_axes):
        new_fixed_axis = par.get_grid_helper().new_fixed_axis
        par.axis["right"] = new_fixed_axis(loc="right",
                                           axes=par,
                                           offset=(offset * i, 0))
        par.axis["right"].toggle(all=True)

    # Hide the right spine of the host plot
    host.axis["right"].toggle(all=True)

    # Set Labels
    host.set_xlabel(x_label)
    host.set_ylabel(y_labels[0])
    for i, par in enumerate(par_axes):
        par.set_ylabel(y_labels[i+1])

    # Plot the Data
    lines = []
    # Plot on host
    p, = host.plot(x, y_data[0], color=colors[0], label=labels[0])
    lines.append(p)
    # Plot on additional axes
    for i, par in enumerate(par_axes):
        p, = par.plot(x, y_data[i+1], color=colors[i+1], label=labels[i+1])
        lines.append(p)

    # Set Colors for Y-axis Labels to Match the Plot Lines
    host.axis["left"].label.set_color(colors[0])
    for i, par in enumerate(par_axes):
        par.axis["right"].label.set_color(colors[i+1])

    # Adjust the Position of the Additional Axes Labels
    for i, par in enumerate(par_axes):
        par.axis["right"].label.set_position(("axes", 1 + 0.1 * i))

    # Add Zero Lines
    if add_zero_lines:
        host.axhline(0, color=colors[0], linestyle='--', linewidth=1)
        for i, par in enumerate(par_axes):
            par.axhline(0, color=colors[i+1], linestyle='--', linewidth=1)

    # Add Vertical Lines with Vertical (rotated) Labels Above the Plot
    if vertical_lines:
        # Create a blended transform: x in data coords, y in axes fraction.
        transform = blended_transform_factory(host.transData, host.transAxes)
        for timestamp, state_name in vertical_lines:
            host.axvline(x=timestamp, color='gray', linestyle='--', linewidth=1)
            host.text(
                timestamp, 1.02, state_name,
                transform=transform,
                rotation=90,
                verticalalignment='bottom',
                horizontalalignment='center',
                fontsize=9,
                bbox=dict(facecolor='white', edgecolor='none', pad=1.0)
            )

    # Create a Combined Legend
    host.legend(lines, labels, loc='upper left')

    # Create a grid
    host.grid()

    # Set Title if provided
    if title:
        host.set_title(title)

    # Improve Layout to Prevent Overlapping Labels
    fig.canvas.draw()
    plt.show()


In [3]:
import subprocess
from typing import Optional

def execute_shell_command(command: str) -> Optional[str]:
    """
    Executes a shell command, prints the command and its output.

    Parameters:
    - command (str): The shell command to execute.

    Returns:
    - Optional[str]: The standard output from the command if successful; otherwise, None.
    """
    print(f"Executing Command: {command}\n{'-' * (len('Executing Command: ') + len(command))}")

    try:
        # Execute the command
        result = subprocess.run(
            command,
            shell=True,               # Executes through the shell
            check=True,               # Raises CalledProcessError for non-zero exit codes
            capture_output=True,      # Captures stdout and stderr
            text=True                 # Returns output as string instead of bytes
        )

        # Print standard output
        if result.stdout:
            print("Output:")
            print(result.stdout)

        # Print standard error (if any)
        if result.stderr:
            print("Error Output:")
            print(result.stderr)

        return result.stdout

    except subprocess.CalledProcessError as e:
        # Handle errors in execution
        print(f"Command failed with return code {e.returncode}")
        if e.stdout:
            print("Output:")
            print(e.stdout)
        if e.stderr:
            print("Error Output:")
            print(e.stderr)
        return None
    except FileNotFoundError:
        # Handle the case where the command is not found
        print("Error: Command not found.")
        return None
    except Exception as e:
        # Handle any other exceptions
        print(f"An unexpected error occurred: {e}")
        return None


In [4]:
file_suffix = '2025-02-19-6'

sensor_result = execute_shell_command(rf'py scripts/decode_protobuf_bin.py sensor D:\sensor\dat_{file_suffix}.pb3 sim_out/sensor.csv')
state_result = execute_shell_command(rf'py scripts/decode_protobuf_bin.py state D:\state\fsl_{file_suffix}.pb3 sim_out/state.csv')


Executing Command: py scripts/decode_protobuf_bin.py sensor D:\sensor\dat_2025-02-19-6.pb3 sim_out/sensor.csv
-------------------------------------------------------------------------------------------------------------
Command failed with return code 1
Output:
Copying input file to temporary

Error Output:
Traceback (most recent call last):
  File "c:\Users\imomg\Programming\PSP-HA-Firmware\scripts\decode_protobuf_bin.py", line 124, in <module>
    with open(input_bin_path, 'rb') as original_file:
         ^^^^^^^^^^^^^^^^^^^^^^^^^^
FileNotFoundError: [Errno 2] No such file or directory: 'D:\\sensor\\dat_2025-02-19-6.pb3'

Executing Command: py scripts/decode_protobuf_bin.py state D:\state\fsl_2025-02-19-6.pb3 sim_out/state.csv
----------------------------------------------------------------------------------------------------------
Command failed with return code 1
Output:
Copying input file to temporary

Error Output:
Traceback (most recent call last):
  File "c:\Users\imomg\Program

In [8]:
import pandas as pd

sensor = pd.read_csv("sim_out/sensor.csv")
state = pd.read_csv("sim_out/state.csv")

combined = pd.merge_asof(
    sensor,
    state,
    on='timestamp',
    direction='nearest',  # Can be 'backward', 'forward', or 'nearest'
)

state_names = [
    "FP_INIT",
    "FP_WAIT",
    "FP_READY",
    "FP_BOOST",
    "FP_COAST",
    "FP_DROGUE",
    "FP_MAIN",
    "FP_LANDED",
    "FP_ERROR",
]

transitions = combined[combined["flight_phase"] != combined["flight_phase"].shift()]
transitions = [
    (transition["timestamp"]/1e6, state_names[int(transition["flight_phase"])])
    for idx, transition in transitions.iterrows()
]

def pressure_alt(p):
    return 44330 * (1 - (((p) / 1013.25)**( 1 / 5.255)))

presalt = pressure_alt(combined['pressure']) - pressure_alt(combined['pressure'].iloc[0])

times = combined['timestamp']/1e6
pressure = combined['pressure']
altitude = combined['pos_vert']
velocity = combined['vel_vert']
acceleration = combined['acc_vert']

plot_multiple_y_axes(
    times,
    [altitude, velocity, acceleration, pressure],
    ['altitude', 'velocity', 'acceleration', 'pressure'],
    vertical_lines=transitions
)
