# Fitting a HMM to the AdVitam Dataset


## Notebook Overview:


Fitting a [Hidden Markov Model (HMM)](https://en.wikipedia.org/wiki/Hidden_Markov_model) to the [AdVitam](https://doi.org/10.1016/j.dib.2023.109027) dataset.

- HMM was implemented using the [hmmlearn](https://hmmlearn.readthedocs.io/en/latest/index.html) python library.

<br></br>

**Defining Takeover Quality Quantitatively**

This is typically done using one or more of these 3 metrics:

- Takeover Time (TOT)
- Sudden Vehicle Deviation
- Response Budget

<br></br>

**References:**

- Lawrence R. Rabiner “A tutorial on hidden Markov models and selected applications in speech recognition”, Proceedings of the IEEE 77.2, pp. 257-286, 1989.
- Jeff A. Bilmes, “A gentle tutorial of the EM algorithm and its application to parameter estimation for Gaussian mixture and hidden Markov models.”, 1998.
- Mark Stamp. “[A revealing introduction to hidden Markov models](<(http://www.cs.sjsu.edu/~stamp/RUA/HMM.pdf)>)”. Tech. rep. Department of Computer Science, San Jose State University, 2018.


<br></br>


<hr style="border-top: 1px solid white;">


## Preamble


Python Libraries


In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn import preprocessing
import hmmlearn.hmm as hmm
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.model_selection import (
    cross_validate,
    RandomizedSearchCV,
    train_test_split,
    GridSearchCV,
)

<br></br>


Custom Functions


In [2]:
from useful_functions.driving_data.dd_dictionary import create_dd_dictionary
from useful_functions.driving_data.process_driving_data import processing_driving_data

from useful_functions.physio_data.pd_dictionary import create_pd_dictionary
from useful_functions.physio_data.process_physio_timestamps import process_physio_timestamps
from useful_functions.physio_data.process_physio_data import process_physio_data

from useful_functions.demographic_data.process_driver_demographic_data import (
    process_driver_demographic_data,
)

from useful_functions.construct_observations import construct_observations

from useful_functions.takeover_dataframe import create_takeover_timestamps
from useful_functions.check_for_missing_data import check_for_missing_data

<br></br>
<br></br>


# Importing Data + Preprocessing


---


Storing the folder paths to raw data


In [3]:
driving_data_folder = "../AdVitam/Exp2/Raw/Driving"
physio_data_folder = "../AdVitam/Exp2/Raw/Physio/Txt"

<br></br>


Storing a list of participants to exclude


In [4]:
participants_to_exclude = check_for_missing_data(driving_data_folder, physio_data_folder)
participants_to_exclude.append("NST77")

<br></br>


**Participants Excluded:**


| Participant       | Reason                                                                                          |
| ----------------- | ----------------------------------------------------------------------------------------------- |
| NST77             | Driving file contains obstacles = "TriggeredObs2TriggeredObs3" and "TriggeredObs3TriggeredObs4" |
| NST91, ST84, ST60 | Does not contain a physio file                                                                  |


<br></br>
<br></br>


## Driving Data


<hr style="border-top: 1px dashed white; border-bottom: 0px">


### Data Description


**Driver Data:**
| Feature | Description | Notes |
| --- | --- | --- |
| Time | Time elapsed since the software was launched (in seconds) | |
| EngineSpeed | Engine speed (in rpm) | Removed |
| GearPosActual | Current gear | Removed |
| GearPosTarget | Next planned gear | Removed |
| AcceleratorPedalPos | Position of gas pedal. | Recording problem, Removed |
| DeceleratorPedalPos | Position of brake pedal. | Recording problem, Removed |
| SteeringWheelAngle | Steering wheel angle (in degrees) | |
| VehicleSpeed | Vehicle speed (in mph) | |
| Position X | Vehicle position along the x-axis in the simulated driving environment | |
| Position Y | Vehicle position along the y-axis in the simulated driving environment | |
| Position Z | Vehicle position along the z-axis in the simulated driving environment | |
| Autonomous Mode (T/F) | Autonomous pilot status. | True = Activated, False = Deactivated (driver in control) |
| Obstacles | Events that occurred during the driving simulation. | See Below |


<br></br>


**Obstacles:**

| Event         | Description                                                                                                                                                                                             |
| ------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| TriggeredObsX | Time at which each takeover request was triggered by the experimenter                                                                                                                                   |
| Obs1          | Deer                                                                                                                                                                                                    |
| Obs2          | Traffic cone                                                                                                                                                                                            |
| Obs3          | Frog                                                                                                                                                                                                    |
| Obs4          | Traffic cone                                                                                                                                                                                            |
| Obs5          | False alarm (x2)                                                                                                                                                                                        |
| Detected      | Time at which the driver pressed the steering wheel button to notify he/she understood the situation. The driver is in control of the car when the value of the column "Autonomous Mode (T/F)" is False |


<br></br>


### Driving Data Dictionary


Creates a dictionary of the raw driving data files.


In [5]:
driving_data_dictionary = create_dd_dictionary(driving_data_folder, participants_to_exclude)

<br></br>


### Processing driving data


Steps Taken

1. Label encode the `Obstacles` column
2. Fill `Obstacles` column with `Nothing`
3. Convert `Time` column to pandas timedelta
4. Resample driver data to 10ms (100Hz)


In [6]:
# Fitting a Label Encoder to the Obstacles
driver_data = driving_data_dictionary["NST01"]
driver_data = driver_data.fillna("Nothing")
enc = preprocessing.LabelEncoder()
enc.fit(driver_data["Obstacles"])

# Processing the Driving Data
driving_data_dictionary = processing_driving_data(driving_data_dictionary, enc)

<br></br>


### Creating driving data takeover timestamps


Created a dataframe to store the trigger time, takeover time, release time, and TOT for each obstacle, for every participant.


In [7]:
driving_timestamps = create_takeover_timestamps(driving_data_dictionary, enc)

<br></br>
<br></br>


## Physiological Signals & Markers


<hr style="border-top: 1px dashed white; border-bottom: 0px">


### Data Description


**Signals:**

| Feature | Description            | Notes  |
| ------- | ---------------------- | ------ |
| min     | Time Elapsed           |        |
| ECG     | Electrocardiogram      | 1000Hz |
| EDA     | Electrodermal Activity | 1000Hz |
| RESP    | Resperatory            | 1000Hz |


<br></br>


**Markers:**

Contains the timestamps for each period of the experiment.

- Training1 = Baseline phase
- Training2 = Practice phase in the driving simulator
- Driving = Main driving session in conditionally automated driving.

Be careful, the timestamps are here in seconds while they are in minutes in the raw data.


<br></br>


**Timestamps:**

Time elapsed (in seconds) between the start of the main driving session and the appearance of the obstacles.

- TrigObsX: the time when the driver pressed the button to report having understood the situation
- DetObsX: and the time when the driver actually took over control
- RepObsX: X corresponds to one of obstacle or the false alarm.


<br></br>


### Physio data dictionary


Creates a dictionary of the raw physiological data and their markers


In [8]:
phsyiological_data_dictionary = create_pd_dictionary(physio_data_folder, participants_to_exclude)

<br></br>


### Physio timestamps


A dataframe to store the trigger time, takeover time, release time, and TOT for each obstacle, for every participant. Similar to `driver_timestamps`.


In [9]:
physio_timestamps = pd.read_csv(
    "../AdVitam/Exp2/Preprocessed/Physio and Driving/timestamps_obstacles.csv"
)

<br></br>


### Processing the Physio Timestamps

Steps:

1. Change column names to match driving timestamps
1. Remove preselected participants
1. Reformat subject id to match
1. Transfrom timestamps into timedelta objects


In [10]:
physio_timestamps = process_physio_timestamps(physio_timestamps, participants_to_exclude)

<br></br>


### Processing the Physiological data


Steps:

1. Resampled data to 10ms (100Hz)
2. Trimmed the data down to each experimental phase (Baseline, Training, Driving)


In [11]:
phsyiological_data_dictionary = process_physio_data(phsyiological_data_dictionary)

<br></br>


## Driver Demographic Data


<hr style="border-top: 1px dashed white; border-bottom: 0px">


### Data Description


**Driver Demographic Data Description:**
| Feature | Description | Note |
| --- | --- | --- |
| code | Code of participant Secondary Task (ST) vs No ST (NST) + unique id (1,2,...) | In the form (ST/NST)# |
| date | Day of data collection | Removed |
| time | Hour of data collection | Removed |
| condition | Experimental condition for mental workload | Removed (contained in participant code |
| sex | Participant sex | |
| age | Age of participants in years | |
| mothertongue | Participants first language | |
| education | Highest education degree | |
| driving_license | Year of obtenstion of driving license | |
| km_year | Number of kilometers covered per year in average | |
| accidents | Number of accidents during the last 3 years | |
| nasa_tlx_N | Answer to the NASA TLX for question N | Removed |
| danger_O | Subjective ranking of the danger of obstacle O | Removed |
| realism_O | Subjective ranking of the realism of obstacle O | Removed |
| sart_N_O | Subjective answer to the sart for question N related to obstacle O | Removed |
| demand_O | Demands on attentional resources (complexity, variability, and instability of the situation) | Removed |
| supply_O | Supply of attentional resources (division of attention, arousal, concentration, and spare mental capacity) | Removed |
| understanding_O | Understanding of the situation (information quantity, information quality and familiarity). | Removed |


<br></br>


### Driver Demographic Data


Grabbing the driver demographic data


In [12]:
driver_demographic_data = pd.read_csv(
    "../AdVitam/Exp2/Preprocessed/Questionnaires/Exp2_Database.csv",
    usecols=[
        "code",
        "sex",
        "age",
        "mothertongue",
        "education",
        "driving_license",
        "km_year",
        "accidents",
    ],
)

<br></br>


### Processing driver demographic data


Steps:

1. Remove preselected participants
2. Reformat code to match data
3. Coverting driving licence from year obtained to of years obtained
4. Normalize km/y


In [13]:
driver_demographic_data = process_driver_demographic_data(
    driver_demographic_data, participants_to_exclude
)

<br></br>


# Constructing Sequence of Observations


---


The idea is to train 2 HMM trained to observations assosiated with a 'slow' takeover, and a 'fast' takeover.


In [14]:
slow_observations, fast_observations = construct_observations(
    driving_data_dictionary,
    phsyiological_data_dictionary,
    driving_timestamps,
    physio_timestamps,
    driver_demographic_data,
)

<br>
<br>


# Training the HMMs


---


**Steps:**

1. Split the data into training, testing, and validation
2. Format the data so it can be read by HMM


In [15]:
# train test split
slow_observations_train, slow_observations_test = train_test_split(
    slow_observations, test_size=0.1, random_state=42
)

fast_observations_train, fast_observations_test = train_test_split(
    fast_observations, test_size=0.1, random_state=42
)

# create a length array
slow_lengths_train = np.array([len(obs) for obs in slow_observations_train])
slow_lengths_test = np.array([len(obs) for obs in slow_observations_test])

fast_lengths_train = np.array([len(obs) for obs in fast_observations_train])
fast_lengths_test = np.array([len(obs) for obs in fast_observations_test])

# create a single array for training
s = np.vstack(slow_observations_train)
f = np.vstack(fast_observations_train)

# create a single array for testing
s_test = np.vstack(slow_observations_test)
f_test = np.vstack(fast_observations_test)

<br>
<br>


# Hyperparameters


---


In [16]:
# initializing the hyperparameters
n_components = np.arange(1, 11)
covariance_type = ["spherical", "diag", "full"]
min_covar = np.arange(0.1, 1.1, 0.1)
algorithm = ["viterbi", "map"]
random_state = np.arange(1, 11)
n_iter = np.arange(1, 11)
tol = np.arange(0.001, 0.011, 0.001)
verbose = [True, False]
hyperparametes = {
    "n_components": n_components,
    "covariance_type": covariance_type,
    "min_covar": min_covar,
    "algorithm": algorithm,
    "random_state": random_state,
    "n_iter": n_iter,
    "tol": tol,
    "verbose": verbose,
}

In [17]:
# HMM Classifier
class HMMClassifier(BaseEstimator, ClassifierMixin):
    # Initializing the model
    def __init__(self, n_components=1, covariance_type="diag", min_covar=0.1, algorithm="viterbi", random_state=1, n_iter=1, tol=0.001, verbose=False):
        self.n_components = n_components
        self.covariance_type = covariance_type
        self.min_covar = min_covar
        self.algorithm = algorithm
        self.random_state = random_state
        self.n_iter = n_iter
        self.tol = tol
        self.verbose = verbose
        self.model = None

    def fit(self, X):
        # Assuming X is a list where each element is a sequence of observations
        lengths = np.array([len(x) for x in X])
        X = np.vstack(X)
        self.model = hmm.GaussianHMM(
            n_components=self.n_components,
            covariance_type=self.covariance_type,
            min_covar=self.min_covar,
            algorithm=self.algorithm,
            random_state=self.random_state,
            n_iter=self.n_iter,
            tol = self.tol,
            verbose=self.verbose,
        )
        self.model.fit(X, lengths)
        return self

    def predict(self, X):
        # Assuming X is a list where each element is a sequence of observations
        lengths = np.array([len(x) for x in X])
        X = np.vstack(X)
        return self.model.predict(X, lengths)

    def score(self, X):
        # Implement a scoring function, e.g., log likelihood of the data
        scores = [self.model.score(x.reshape(-1, 1)) for x in X]
        return np.mean(scores)
    
    def get_params(self, deep=True):
        # Return parameters
        return {
            "n_components": self.n_components,
            "covariance_type": self.covariance_type,
            "min_covar": self.min_covar,
            "algorithm": self.algorithm,
            "random_state": self.random_state,
            "n_iter": self.n_iter,
            "tol": self.tol,
            "verbose": self.verbose,
        }

In [18]:
# initialize the model
slow_model = HMMClassifier()
fast_model = HMMClassifier()

In [19]:
# initialize the grid search cv
slow_grid = GridSearchCV(slow_model, hyperparametes, cv=5, n_jobs=-1, verbose=1)
fast_grid = GridSearchCV(fast_model, hyperparametes, cv=5, n_jobs=-1, verbose=1)

# fit the model
slow_grid.fit(s)
fast_grid.fit(f)

Fitting 5 folds for each of 1200000 candidates, totalling 6000000 fits




<br>
<br>


In [None]:
# # train the HMMs
# slow_hmm = hmm.GaussianHMM(
#     n_components=1,
#     random_state=3,
# )
# slow_hmm.fit(s, slow_lengths_train)

# fast_hmm = hmm.GaussianHMM(
#     n_components=1,
#     random_state=3,
# )
# fast_hmm.fit(f, fast_lengths_train)

# # test the HMM
# accuracy = 0

# for obs in slow_observations_test:
#     if slow_hmm.score(obs) > fast_hmm.score(obs):
#         accuracy += 1

# for obs in fast_observations_test:
#     if fast_hmm.score(obs) > slow_hmm.score(obs):
#         accuracy += 1

# accuracy = accuracy / (len(slow_observations_test) + len(fast_observations_test))
# print("Accuracy: ", accuracy)