# NN Categorical Signals

Notebook used to evaluate categorical signal data.

## Imports

In [1]:
from pathlib import Path
import os
import subprocess
from datetime import datetime
from typing import List
import pandas as pd
from copy import deepcopy
from revcan.reverse_engineering.models.experiment import Extern_Signal, Value, Experiment, Signal
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
from colorama import Fore, Style

import revcan.reverse_engineering.models.NNs.SignalMatchingNN_CategoricalSignals as smnn

## Set experiment

Set the experiment name and folder of the combined experiment file (containing all measurements for an experiment including ground truth values).

In [None]:
# TODO: Set name and folder of experiment
# Base name (without the ".json" extension) of the combined experiment file.
experiment_name = "experiment_non_constant_signals"

# Folder that holds the experiment file; the path is relative to the notebook's working directory.
experiment_folder = "../../data/experiments/car/ride_height/2025-06-02_17_25_17"


# Create the folder (including parents) if it does not exist yet; an existing folder is left untouched.
Path(experiment_folder).mkdir(parents=True, exist_ok=True)

# Full path of the experiment JSON file, then load it into an Experiment object.
experiment_file = os.path.join(experiment_folder, f"{experiment_name}.json")
experiment = Experiment.load(experiment_file)

# IPython shell escape: runs the metadata script on the loaded file ({experiment_file} is interpolated by IPython).
!python ../scripts_for_doip_new/display_experiment_metadata.py --experiment_file_path "{experiment_file}"

## Preprocessing & Training

### Data Loading

In [None]:
# Load data
# Build the combined data set from the experiment via the project helper,
# then report its shape and the total number of NaN cells it still contains.
full_data = smnn.load_data(experiment)
print("full_data Shape:", full_data.shape)
print("Remaining NaN values:", full_data.isna().sum().sum())

### Split data into Train and Test set

In [None]:
# Split the full data set into a train and a test frame using the project's custom split,
# then print both shapes as a quick sanity check.
train_df, test_df = smnn.custom_train_test_split(full_data)
print("Train Shape:", train_df.shape)
print("Test Shape:", test_df.shape)

### Preprocessing

Splits the data into one data frame per signal, expands the signal values and one-hot encodes the ground truth.
This is done for both the train and the test set.

In [None]:
# Expand values of the train set: one frame per signal, expanded and one-hot encoded
signal_data_train = smnn.split_df_by_signal(train_df)
skipped_signals_train = 0

# Iterate over a snapshot of the keys; entries are replaced in place on success.
# NOTE(review): a signal that raises ValueError keeps its original (unexpanded)
# frame in the dict — confirm that downstream cells tolerate this.
for signal_key in list(signal_data_train):
    try:
        encoded_df = smnn.one_hot_encode_ground_truth(  # One-Hot Encoding
            smnn.expand_signal_df(signal_data_train[signal_key])
        )
        signal_data_train[signal_key] = encoded_df
        print(f"Signal {signal_key} → Shape: {encoded_df.shape}")
    except ValueError as e:
        skipped_signals_train += 1
        print(f"\033[91mSkipped Signal {signal_key}: {e}\033[0m")

print(f"Expanding of values of train set complete - Signals skipped: {skipped_signals_train}")

In [None]:
# Expand values of the test set: one frame per signal, expanded and one-hot encoded
signal_data_test = smnn.split_df_by_signal(test_df)
skipped_signals_test = 0

# Iterate over a snapshot of the keys; entries are replaced in place on success.
# NOTE(review): a signal that raises ValueError keeps its original (unexpanded)
# frame in the dict — confirm that downstream cells tolerate this.
for signal_key in list(signal_data_test):
    try:
        encoded_df = smnn.one_hot_encode_ground_truth(  # One-Hot Encoding
            smnn.expand_signal_df(signal_data_test[signal_key])
        )
        signal_data_test[signal_key] = encoded_df
        print(f"Signal {signal_key} → Shape: {encoded_df.shape}")
    except ValueError as e:
        skipped_signals_test += 1
        print(f"\033[91mSkipped Signal {signal_key}: {e}\033[0m")

print(f"Expanding of values of test set complete - Signals skipped: {skipped_signals_test}")

## Preprocess Data & Train NN per Signal

The epochs, batch_size and hidden layers config can be set and modified individually.
Activating the `check_for_ambiguous_signals` flag excludes signals that have a feature overlap (the same value occurring for different ground truth categories).

In [None]:
## NN hyperparameters
epochs = 100
batch_size = 64
hidden_layers_config = [16]
# When True, signals flagged by smnn.is_ambiguous_signal on their training data are skipped.
check_for_ambiguous_signals = True

results = {}  # signal_key -> metrics returned by smnn.train_signal_model
models = {}   # signal_key -> trained model returned by smnn.train_signal_model
for signal_key in signal_data_train:
    # A signal can only be evaluated if it also has test data
    if signal_key not in signal_data_test:
        print(f"\033[93mSkipping {signal_key} — not in test set\033[0m")
        continue

    train_signal_df = signal_data_train[signal_key]
    test_signal_df = signal_data_test[signal_key]

    # Check whether the signal has the same byte value for more than one ground_truth class
    if check_for_ambiguous_signals and smnn.is_ambiguous_signal(train_signal_df):
        print(f"\033[93mSkipping {signal_key} — feature overlap between classes\033[0m")
        continue

    try:
        # Preprocess data
        X_train, y_train = smnn.preprocess_signal_df(train_signal_df)
        X_test, y_test = smnn.preprocess_signal_df(test_signal_df)

        # Check for signals with no variance.
        # The check must look at this signal's own frames: passing the global
        # train_df/test_df would give the same verdict for every signal.
        # NOTE(review): assumes is_useless_signal accepts the per-signal frame,
        # like is_ambiguous_signal above — confirm against the smnn module.
        if smnn.is_useless_signal(train_signal_df) or smnn.is_useless_signal(test_signal_df):
            print(f"\033[93mSkipping {signal_key} — input features have no variation\033[0m")
            continue

        # Train model
        model, metrics = smnn.train_signal_model(
            X_train,
            y_train,
            X_test,
            y_test,
            hidden_layers_config=hidden_layers_config,
            epochs=epochs,
            batch_size=batch_size,
        )
        results[signal_key] = metrics
        models[signal_key] = model

        accuracy = metrics['accuracy']
        precision = metrics['precision']

        print(f"Trained {signal_key} → Accuracy: {accuracy:.3f}, Precision: {precision:.3f}")
    except Exception as e:
        # Deliberate per-signal boundary: one failing signal must not abort the whole run.
        print(f"\033[91mFailed {signal_key}: {e}\033[0m")


## Evaluation of Models

Displays the top performing DIDs.

In [None]:
# Number of best-scoring signals to list
top_n = 50
# Rank all trained signals by their smnn score, best first
sorted_results = sorted(results.items(), key=lambda item: smnn.score(item[1]), reverse=True)

# Header
print(f"{'Rank':<6} {'Signal':<20} {'Length':<8}  {'Score':<10} {'Accuracy':<10} {'Precision':<10} {'Recall':<10} {'F1-Score':<10}")
print("-" * 90)

for rank, (signal_key, metrics) in enumerate(sorted_results[:top_n], start=1):
    # Signal length in bytes = number of "Byte_*" columns in the training frame
    signal_length = sum(
        1 for col in signal_data_train[signal_key].columns if col.startswith("Byte_")
    )

    # Get and calculate metrics
    score = smnn.score(metrics)
    accuracy, precision = metrics['accuracy'], metrics['precision']
    recall, f1 = metrics['recall'], metrics['f1']

    row = f"{rank:<6} {str(signal_key):<20} {signal_length:<8} {score:<10.3f} {accuracy:<10.3f} {precision:<10.3f} {recall:<10.3f} {f1:<10.3f}"

    # Rows with a perfect accuracy are highlighted in green
    print(f"{Fore.GREEN}{row}{Style.RESET_ALL}" if accuracy == 1.0 else row)

Ignore the cell below. It deliberately stops execution at this point so that "Run all" can be used for the cells above.

In [None]:
# Abort execution here on purpose so that "Run all" does not continue past this cell.
raise SystemExit("STOP")

## Validation

Display raw train and test data for a specific signal.

In [None]:
# Signal to inspect, given as a (server id, DID) tuple — the same key format used by
# signal_data_train / signal_data_test. Uncomment one of the alternatives below to switch.
signal_key = (16512, 17738)
#signal_key = (16512,11156) # Chassis Level Paper
#signal_key = (16398, 11527) # Chassis Level - Top candidate
#signal_key = (16512, 801) # Chassis Level Time
#signal_key = (16443, 646)  # Brake Pedal Activation 2
#signal_key = (16403, 11066) # Brake Pedal Activation
#signal_key = (16502, 2180) # Gear: No distinction between D1 and D2
#signal_key = (16400, 4104) # Gear: Distinction between D1 and D2

print(f"Server: {signal_key[0]}, DID: {signal_key[1]}")
print("Training Set Data:")
# Last expression of the cell: the notebook renders the training frame of the selected signal.
signal_data_train[signal_key]

In [None]:
# Uses the signal_key selected in the previous cell.
print(f"Server: {signal_key[0]}, DID: {signal_key[1]}")
print("Test Set Data:")
# Last expression of the cell: the notebook renders the test frame of the selected signal.
signal_data_test[signal_key]

## Export Signal to CSV

In [None]:
import pandas as pd

def export_signal_from_experiment(experiment, server_id: int, did: int, output_path: str):
    """Export one measured signal together with its ground truth to a CSV file.

    The first entry of ``experiment.measurements`` whose ``serverid`` equals
    ``server_id`` and whose ``did.did`` equals ``did`` is exported. Every CSV
    row consists of:

    * ``Gear``  - the entry at the same index of the first external
      alphanumeric measurement,
    * ``Speed`` - the first element of the value at the same index of the
      first external measurement,
    * ``Byte_0`` ... ``Byte_n`` - the bytes of the signal value.

    A matching signal without any values produces a CSV that contains only the
    ``Gear`` and ``Speed`` header columns instead of raising an error.

    Args:
        experiment: Experiment object providing ``measurements``,
            ``external_alphanumeric_measurements`` and ``external_measurements``.
        server_id: Server id of the signal to export.
        did: DID of the signal to export.
        output_path: Path of the CSV file to write.

    Returns:
        None. A confirmation is printed on success; an error message is
        printed (and no file is written) when the signal is not found.

    Raises:
        IndexError: If a ground truth series is shorter than the signal's
            list of values.
    """
    for signal in experiment.measurements:
        # Find selected Signal
        if signal.serverid != server_id or signal.did.did != did:
            continue

        # One record per sample: ground truth first, then the raw value bytes.
        # Ground truth is looked up by sample index on purpose so that a
        # length mismatch surfaces as an IndexError instead of being truncated.
        records = [
            [
                experiment.external_alphanumeric_measurements[0].values[i],
                experiment.external_measurements[0].values[i].value[0],
                *sample.value,
            ]
            for i, sample in enumerate(signal.values)
        ]

        # Build DataFrame; the widest record defines the number of byte columns.
        # default=0 covers a signal without values (max() of nothing would raise).
        max_len = max((len(row) - 2 for row in records), default=0)
        columns = ["Gear", "Speed"] + [f"Byte_{i}" for i in range(max_len)]
        df = pd.DataFrame(records, columns=columns)

        # Export
        df.to_csv(output_path, index=False)
        print(f"Exported signal (Server: {server_id}, DID: {did}) to '{output_path}'")
        return

    print(f"ERROR: Signal with Server: {server_id}, DID: {did} not found in the experiment.")

# Signal to export, as (server id, DID)
ServerID, DID = (16502, 2204)
# The CSV is written next to the experiment file; the file name carries the server id and DID.
csv_file = os.path.join(experiment_folder, f"gear_signal_{ServerID}_{DID}.csv")
export_signal_from_experiment(experiment, server_id=ServerID, did=DID, output_path=csv_file)


## Filter signals to keep top candidates

Can be used for additional training on the top candidates.

In [None]:
score_threshold = 0.7 # TODO: Define threshold

# Keys of all trained signals whose score reaches the threshold
high_score_keys = [
    key
    for key, signal_metrics in results.items()
    if smnn.score(signal_metrics) >= score_threshold
]

# Measurements of the experiment that belong to one of those keys
signals_to_keep = [
    measurement
    for measurement in experiment.measurements
    if (measurement.serverid, measurement.did.did) in high_score_keys
]

# Replaces the measurements of the in-memory experiment with the filtered list
experiment.measurements = signals_to_keep

print(f"{len(signals_to_keep)} - signals with score ≥ {score_threshold}")

### Save top candidates in new experiment file

In [None]:
experiment_file = os.path.join(experiment_folder, f"experiment_top_candidates_nn.json")
experiment.save(f"{experiment_file}")
!python ../scripts_for_doip_new/display_experiment_metadata.py --experiment_file_path "{experiment_file}"