In [1]:
import pandas as pd
import plotly.express as px
from compile import compile_csv
from decode import decode_frame
from main import read_frames_from_file

# Signal-definition CSV handed to compile_csv() to build the decode table.
DBC_PATH = "testDBC.csv"
# Frame log iterated by read_frames_from_file().
NFR_PATH = "testData/4_MSG.NFR"

In [2]:
# Compile the DBC and decode every frame into a flat table of signal readings
# (one row per decoded signal value).
decode_table = compile_csv(DBC_PATH)

# Column schema of the readings table. Passing it explicitly to the DataFrame
# constructor keeps the columns present even when nothing was decoded, so the
# summary line below and the plotting cells do not fail with a KeyError.
READING_COLUMNS = [
    "signal_name",
    "timestamp_ms",
    "value",
    "message_id",
    "message_name",
    "unit",
]

# Unit lookup keyed by (frame id, signal name); a missing unit is stored as "".
signal_units = {}
for msg in decode_table.values():
    for sig in msg.signals:
        signal_units[(msg.frame_id, sig.name)] = sig.unit or ""

# NOTE(review): the saved output of this cell shows identical readings repeated
# at the same timestamp. Confirm whether the log really contains duplicate
# frames or whether the reader yields them more than once.
rows = []
for timestamp, frame_id, data in read_frames_from_file(NFR_PATH):
    decoded = decode_frame(frame_id, data, decode_table)
    if not decoded:
        # decode_frame returned nothing usable for this frame; skip it.
        continue
    msg = decode_table[frame_id]
    for signal_name, value in decoded.items():
        unit = signal_units.get((frame_id, signal_name), "")
        rows.append({
            "signal_name": signal_name,
            "timestamp_ms": timestamp,
            "value": value,
            "message_id": f"0x{frame_id:X}",
            "message_name": msg.name,
            "unit": unit,
        })

df = pd.DataFrame(rows, columns=READING_COLUMNS)
print(f"{len(df)} signal readings from {df['signal_name'].nunique()} unique signals")
df.head(10)

6826 signal readings from 11 unique signals


Unnamed: 0,signal_name,timestamp_ms,value,message_id,message_name,unit
0,APPS1_Throttle,132,-21589.0,0x202,ECU_Throttle,
1,APPS2_Throttle,132,9490.0,0x202,ECU_Throttle,
2,APPS1_Throttle,132,-21589.0,0x202,ECU_Throttle,
3,APPS2_Throttle,132,9490.0,0x202,ECU_Throttle,
4,APPS1_Throttle,132,-21589.0,0x202,ECU_Throttle,
5,APPS2_Throttle,132,9490.0,0x202,ECU_Throttle,
6,APPS1_Throttle,19746,-21589.0,0x202,ECU_Throttle,
7,APPS2_Throttle,19746,9490.0,0x202,ECU_Throttle,
8,APPS1_Throttle,19746,-21589.0,0x202,ECU_Throttle,
9,APPS2_Throttle,19746,9490.0,0x202,ECU_Throttle,


In [3]:
# Plot all signals from a single message — one subplot per signal, shared x-axis
def plot_message(df, message_name):
    """Plot every signal of one message, one facet row per signal.

    Args:
        df: Readings frame containing the columns "message_name",
            "signal_name", "timestamp_ms" and "value".
        message_name: Value of the "message_name" column to select.

    Raises:
        ValueError: If ``df`` contains no rows for ``message_name``.
    """
    msg_df = df[df["message_name"] == message_name]
    if msg_df.empty:
        # Without this guard the figure height computed below would be 0,
        # which plotly rejects with an unhelpful validation error.
        known = sorted(str(name) for name in df["message_name"].dropna().unique())
        raise ValueError(
            f"No readings for message {message_name!r}; available messages: {known}"
        )
    signals = msg_df["signal_name"].unique()

    fig = px.line(
        msg_df,
        x="timestamp_ms",
        y="value",
        facet_row="signal_name",
        title=f"{message_name} signals",
        labels={"timestamp_ms": "Time (ms)", "value": "Value"},
    )
    # Give each facet its own y-range; signals in one message can differ in scale.
    fig.update_yaxes(matches=None)
    # Scale the figure so every facet row gets 250 px of height.
    fig.update_layout(height=250 * len(signals))
    fig.show()

# Plot a single signal across time
def plot_signal(df, signal_name, message_name=None):
    """Draw one signal's values against time as a single line chart.

    Args:
        df: Readings frame containing the columns "signal_name",
            "message_name", "timestamp_ms", "value" and "unit".
        signal_name: Value of the "signal_name" column to plot.
        message_name: Optional value of the "message_name" column; when
            given (and truthy) only readings from that message are kept.
    """
    keep = df["signal_name"] == signal_name
    if message_name:
        keep = keep & (df["message_name"] == message_name)
    selected = df[keep]

    # The unit is taken from the first selected reading; no readings means no unit.
    if len(selected) == 0:
        unit = ""
    else:
        unit = selected["unit"].iloc[0]

    if unit:
        axis_title = f"{signal_name} ({unit})"
    else:
        axis_title = signal_name

    axis_labels = {"timestamp_ms": "Time (ms)", "value": axis_title}
    figure = px.line(
        selected,
        x="timestamp_ms",
        y="value",
        title=signal_name,
        labels=axis_labels,
    )
    figure.show()

In [4]:
# Graph all signals from ECU_Throttle, one facet row per signal.
plot_message(df, "ECU_Throttle")

In [5]:
# Graph all signals from Rear_Inverter_Motor_Status, one facet row per signal.
plot_message(df, "Rear_Inverter_Motor_Status")

In [6]:
# Graph a single signal: RPM, restricted to readings from Rear_Inverter_Motor_Status.
plot_signal(df, "RPM", "Rear_Inverter_Motor_Status")