<a href="https://colab.research.google.com/github/alecuc/POHMM_RHU_keystroke/blob/main/POHMM_RHU_keystroke.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os # Path related
import numpy as np # Mathematics
from sklearn.metrics import accuracy_score # ACC calc
from sklearn.metrics import roc_curve as _roc_curve # EER calc
from sklearn.metrics import auc # AUC calc
import pandas as pd # Dataset processing
import itertools
from typing import Optional, Union
try:
  import pohmm
except:
  !pip install pohmm
import warnings

Path related variables

In [None]:
ROOT_DIR = os.getcwd()
DATA_DIR = os.path.join(ROOT_DIR, "data")
HMM_DATA_DIR = os.path.join(DATA_DIR, "hmm")
POHMM_DATA_DIR = os.path.join(DATA_DIR, "pohmm")
RAW_DATA_DIR = os.path.join(DATA_DIR, "raw")
RAW_DATASET_FILE = os.path.join(RAW_DATA_DIR, "keystroke_51.xls")

# POHMM
POHMM_ADAPTED_DATASET_FILE = os.path.join(POHMM_DATA_DIR, "pohmm_keystroke_51_adapted.csv")
POHMM_EVENT_SCORES_DATASET_FILE = os.path.join(POHMM_DATA_DIR, "pohmm_keystroke_51_event_scores.csv")
POHMM_RAW_SESSION_SCORES_DATASET_FILE = os.path.join(POHMM_DATA_DIR, "pohmm_keystroke_51_raw_session_scores.csv")
POHMM_NORMALIZED_SESSION_SCORES_DATASET_FILE = os.path.join(POHMM_DATA_DIR,
                                                            "pohmm_keystroke_51_normalized_session_scores.csv")
POHMM_SUMMARY_DATASET_FILE = os.path.join(POHMM_DATA_DIR, "pohmm_keystroke_51_summary.csv")

# HMM
HMM_ADAPTED_DATASET_FILE = os.path.join(HMM_DATA_DIR, "hmm_keystroke_51_adapted.csv")
HMM_EVENT_SCORES_DATASET_FILE = os.path.join(HMM_DATA_DIR, "hmm_keystroke_51_event_scores.csv")
HMM_RAW_SESSION_SCORES_DATASET_FILE = os.path.join(HMM_DATA_DIR, "hmm_keystroke_51_raw_session_scores.csv")
HMM_NORMALIZED_SESSION_SCORES_DATASET_FILE = os.path.join(HMM_DATA_DIR, "hmm_keystroke_51_normalized_session_scores.csv")
HMM_SUMMARY_DATASET_FILE = os.path.join(HMM_DATA_DIR, "hmm_keystroke_51_summary.csv")

Dataset related fields

In [None]:
ID_FIELD = "id"
TRIALS_FIELD = "trials"
USERNAME_FIELD = "UserName"
PP_FIELD = "PP"
PR_FIELD = "PR"
RR_FIELD = "RR"
RP_FIELD = "RP"
DATA_FIELD = "date"

Processed dataset related fields

In [None]:
FOLD_FIELD = "fold"
QUERY_USER_FIELD = "query_user"
QUERY_SESSION_FIELD = "query_session"
REFERENCE_USER_FIELD = "reference_user"
SCORE_FIELD = "score"
NORMALIZED_SCORE_FIELD = "nscore"
RESULT_FIELD = "result"
EVENT_INDEX_FIELD = "event_idx"
EVENT_FIELD = "event"
STATE_FIELD = "state"
RANK_FIELD = "rank"

ROC fields

In [None]:
ROC_THRESHOLD = "threshold"
FALSE_ACCEPTANCE_RATE = "far"
FALSE_REJECTION_RATE = "frr"

Summary fields

In [None]:
SESSION_ACCURACY = "S-ACC"
SESSION_EQUAL_ERROR_RATE = "S-EER"
USER_ACCURACY = "U-ACC"
USER_EQUAL_ERROR_RATE = "U-EER"
AREA_UNDER_ROC_CURVE = "AUC"
CONTINUOUS_IDENTIFICATION_ACCURACY = "CIA"
AVERAGE_MAXIMUM_REJECTION_TIME = "AMRT"

Continuous verification fields

In [None]:
PENALTY_FIELD = "penalty"
CV_THRESHOLD_FIELD = "threshold"
MAXIMUM_REJECTION_TIME_FIELD = "mrt"
AVERAGE_MAXIMUM_REJECTION_TIME_FIELD = "amrt"

Miscellaneous

In [None]:
HARDCODED_PASSWORD = "rhu.university"
MAX_ROWS_PER_USER = 15
REFERENCE_FOLD_THRESHOLD = 13

Setup `numpy` seed to predefined value in order to be able to replicate executions. <br>
Set pandas options for the sake of printing a complete `pd.DataFrame` (debug purposes).
Get rid of "invalid value encountered in log" `RuntimeWarning`(s)

In [None]:
def setup(seed=1234,
          precision=8,
          max_columns=None,
          max_rows=None,
          split_df_print=False) -> None:
    """
    :param seed: numpy seed
    :param precision: numpy print precision
    :param max_columns: max columns to display in a pd.DataFrame print
    :param max_rows: max rows to display in a pd.DataFrame print
    :param split_df_print: Split the pd.DataFrame on a new line when printing if the pd.DataFrame is huge (horizontally)?
    :return: None
    """
    np.random.seed(seed)
    np.set_printoptions(precision=precision)
    pd.set_option("max_columns", max_columns)
    pd.set_option("max_rows", max_rows)
    pd.set_option("display.expand_frame_repr", split_df_print)
    warnings.filterwarnings("ignore", message="invalid value encountered in log")

Creates a file specified by the `file_and_ext` param and saves `df` content in it

In [None]:
def save_data(df: pd.DataFrame,
              file_and_ext: str) -> None:
    """
    :param df: pandas.DataFrame to be saved
    :param file_and_ext: Full path and extension of the .csv file which will store the df current content
    :return: None
    """
    df.to_csv(file_and_ext)

Split a char-separated string into a list containing floating point values contained in the string

In [None]:
def str_to_float_list(s: str,
                      c=";") -> list:
    """
    :param s: the string to be split
    :param c: the character which separates the different values in the string
    :return: the list
    """
    return list(map(float, s.split(c)[:-1]))

Process rhu university dataset in order to pair every character from the password to its corresponding `PP` - `PR` - `RP` - `RR` values. If `is_pohmm` is `True`, process the dataset for `POHMM` training. Process it for `HMM` otherwise. 

<br>

`PP`, `RP` and `RR` number of values are one less than the total amount of characters in the password: `"rhu.university"`.
<br>
With this in mind, the standard deviation of the said values has been added to the corresponding lists.

<br>

Globals: <br>
`user_map`: map number to users<br>
`counter`: count the number of users<br>
`trials`: count inputs from a single user<br>

<br>

`process_row` processes a single row from the input dataset.
`preprocess_rhu_university` wraps `process_row` in order to process the whole raw input dataset. Results are saved in `ADAPTED_DATASET_FILE`


In [None]:
user_map = {}
counter = 0
trials = {}


def preprocess_rhu_university(raw_dataset_input_file: str,
                              is_pohmm: bool) -> None:
    """
    :param raw_dataset_input_file: Full path and extension of the .xls raw dataset file
    :param is_pohmm: True -> craft POHMM df or False -> craft HMM df 
    :return: None
    """

    def process_row(idx_row: (int, pd.Series)) -> Optional[pd.DataFrame]:
        """
        :param idx_row: tuple containing row number and row data
        :return: none if the inputs from a single user reached the threshold or
                      if one of the fields is invalid
                pd.DataFrame containing the processed row if the input is valid
        """
        global counter
        _, row = idx_row
        user = row[USERNAME_FIELD].lower()

        # If the user isn't mapped yet, map it and increment the counter
        if user not in user_map:
            user_map[user] = counter
            trials[user] = 0
            counter += 1

        # If the number of inputs from a single user reached the threshold,
        # abort further processings
        if trials[user] == MAX_ROWS_PER_USER:
            return None

        # Try to process PP, PR, RP, RR fields.
        pp_field = str_to_float_list(row[PP_FIELD])
        pp_field.insert(0, np.std(pp_field))
        rp_field = str_to_float_list(row[RP_FIELD])
        rp_field.append(np.std(rp_field))
        rr_field = str_to_float_list(row[RR_FIELD])
        rr_field.append(np.std(rr_field))
        pr_field = str_to_float_list(row[PR_FIELD])

        # In case of errors, abort further processings
        # if None in pp_field or None in rp_field or None in rr_field:
        #    return None

        # If the user didn't input all the needed characters
        # abort further processings
        if len(pp_field) == len(pr_field) == len(HARDCODED_PASSWORD):
            trials[user] += 1
        else:
            return None

        # If we reach this, everything went as planned.
        # Create and return the processed row as a pd.DataFrame.
        return pd.DataFrame.from_dict({
            USERNAME_FIELD: user_map[user],
            TRIALS_FIELD: row[TRIALS_FIELD],
            EVENT_FIELD: list(HARDCODED_PASSWORD) if is_pohmm else np.ones(len(HARDCODED_PASSWORD)),
            PP_FIELD: pp_field,
            PR_FIELD: pr_field,
            RP_FIELD: rp_field,
            RR_FIELD: rr_field
        })

    user_map.clear()
    counter = 0
    trials.clear()

    # Concat all processed rows obtained from process_row
    df = pd.concat(map(process_row, pd.read_excel(raw_dataset_input_file).iterrows())).set_index(
        [USERNAME_FIELD, TRIALS_FIELD])

    # And save the so crafted pd.DataFrame.
    save_data(df, POHMM_ADAPTED_DATASET_FILE if is_pohmm else HMM_ADAPTED_DATASET_FILE)

Split the input dataset based on the numpy arrays in the arguments. Further processes it in order to apply a stratified k-fold cross validation.

<br>

The template is simply filtered. Genuine and impostor are instead processed in order to assign every other user to the current processed user.

In [None]:
def split_dataset(df: pd.DataFrame,
                  template_reps: np.ndarray,
                  genuine_reps: np.ndarray,
                  impostor_reps: np.ndarray) -> (pd.DataFrame, pd.DataFrame, pd.DataFrame):
    """
    :param df: pd.DataFrame to be processed
    :param template_reps: the numpy array containing which values of a user's single input are taken as a template
    :param genuine_reps: the numpy array containing which values of a user's single input are taken as genuine inputs
    :param impostor_reps: the numpy array containing which values of a user's single input are taken as impostor inputs
    :return: the split dataset in the three parts.
    """

    # Filter based on the input numpy arrays
    df_template = df[df.index.get_level_values(1).isin(template_reps)]
    df_genuine = df[df.index.get_level_values(1).isin(genuine_reps)]
    df_impostor = df[df.index.get_level_values(1).isin(impostor_reps)]

    # Setup genuine pd.DataFrame
    df_genuine.index.names = [USERNAME_FIELD, TRIALS_FIELD]
    df_genuine = df_genuine.reset_index()
    df_genuine[QUERY_USER_FIELD] = df_genuine[USERNAME_FIELD]
    df_genuine = df_genuine.set_index([USERNAME_FIELD, QUERY_USER_FIELD, TRIALS_FIELD])

    # Setup impostor pd.DataFrame
    df_impostor.index.names = [USERNAME_FIELD, TRIALS_FIELD]
    df_impostor = df_impostor.reset_index()
    df_impostor[QUERY_USER_FIELD] = df_impostor[USERNAME_FIELD]
    df_impostor = df_impostor.set_index([USERNAME_FIELD, QUERY_USER_FIELD, TRIALS_FIELD])

    # Create a comparison row with all other users, for each user
    dfs_impostor = []

    for user in df.index.get_level_values(0).unique():
        df_tmp = df_impostor.drop(user, level=0).reset_index().copy()
        df_tmp[USERNAME_FIELD] = user
        dfs_impostor.append(df_tmp)

    # Add the comparison rows to the impostor pd.DataFrame
    df_impostor = pd.concat(dfs_impostor).set_index(
        [USERNAME_FIELD, QUERY_USER_FIELD, TRIALS_FIELD])

    return df_template, df_genuine, df_impostor

Function that takes all the samples from a single user and returns a fitted model

In [None]:
def train_model(df: pd.DataFrame) -> pohmm.Pohmm:
    """
    :param df: pandas.DataFrame containing samples from a single user
    :return: the fitted model
    """
    emissions = []
    for col in df.columns.difference([EVENT_FIELD]):
        if col in [PR_FIELD, PP_FIELD, RP_FIELD, RR_FIELD]:
            emissions.append((col, 'normal'))
        else:
            emissions.append((col, 'normal'))

    hmm = pohmm.Pohmm(n_hidden_states=2, init_spread=2, thresh=1e-6, max_iter=1000,
                      emissions=emissions, smoothing='freq')

    hmm.fit_df(list(zip(*df.groupby(level=[0, 1])))[1])
    return hmm

Obtain identification and verification results using stratified k-fold cross validation and a model that scores a sample.

<br>

Creates a scores dataframe with cols: `FOLD_FIELD`, `REFERENCE_USER_FIELD`, `QUERY_USER_FIELD`, `QUERY_SESSION_FIELD`, `EVENT_INDEX_FIELD` 

In [None]:
def cv_event_scores(fold: (pd.DataFrame, pd.DataFrame, pd.DataFrame)) -> pd.DataFrame:
    """
    :param fold: the dataset split in reference for the model, genuine users and impostors.
    :return scores: the pd.DataFrame containing processed input fold, and a score for each one of the users' session.
    """
    scores = []
    reference, genuine, impostor = fold
    models = {}

    # Train a model for each user
    for reference_user, reference_data in reference.groupby(level=[0]):
        models[reference_user] = train_model(reference_data)

    for (reference_user, query_user, query_session), query_data in itertools.chain(genuine.groupby(level=[0, 1, 2]),
                                                                                   impostor.groupby(
                                                                                       level=[0, 1, 2])):
        # Evaluate scores and states for each users (see pohmm source code)
        score = models[reference_user].score_events_df(query_data.reset_index(drop=True))
        state = models[reference_user].predict_states_df(query_data.reset_index(drop=True))

        # Create a pd.DataFrame which contains the processed data
        df = pd.DataFrame({FOLD_FIELD: 0,
                           REFERENCE_USER_FIELD: reference_user,
                           QUERY_USER_FIELD: query_user,
                           QUERY_SESSION_FIELD: query_session,
                           EVENT_INDEX_FIELD: np.arange(len(query_data)),
                           EVENT_FIELD: query_data[EVENT_FIELD].values,
                           SCORE_FIELD: score[SCORE_FIELD],
                           STATE_FIELD: state[STATE_FIELD],
                           },
                          columns=[FOLD_FIELD, REFERENCE_USER_FIELD, QUERY_USER_FIELD,
                                   QUERY_SESSION_FIELD, EVENT_INDEX_FIELD,
                                   EVENT_FIELD, SCORE_FIELD, STATE_FIELD])
        scores.append(df)

    # Obtain a pd.DataFrame from the scores list and reset its index
    scores = pd.concat(scores).reset_index(drop=True)

    # Compute the rank field and add it as a column in the scores pd.DataFrame
    scores[RANK_FIELD] = scores.groupby([FOLD_FIELD, QUERY_USER_FIELD,
                                                QUERY_SESSION_FIELD, EVENT_INDEX_FIELD])[
                                    SCORE_FIELD].rank(ascending=False) - 1
    return scores

Normalize session scores

In [None]:
def normalize_session_scores(session_scores: pd.DataFrame,
                             pivot=[FOLD_FIELD, QUERY_USER_FIELD, QUERY_SESSION_FIELD],
                             h=2) -> pd.DataFrame:
    """
    :param session_scores: pd.DataFrame containing session scores
    :param pivot: Values which the function will group the dataset by 
    :param h: stddev stuff
    :return: normalized pd.DataFrame
    """
    def _norm(df: pd.DataFrame) -> pd.DataFrame:
        """
        :param df: pd.DataFrame which will be normalized
        :return: normalized pd.DataFrame
        """
        lower = df[SCORE_FIELD].min()
        upper = df[SCORE_FIELD].max()

        df[NORMALIZED_SCORE_FIELD] = np.minimum(
            np.maximum((df[SCORE_FIELD] - lower) / (upper - lower), 0), 1)
        return df

    session_scores = session_scores.groupby(pivot).apply(_norm)
    return session_scores

Prepare session scores in order to calculate its accuracy in the ACC function

In [None]:
def session_identification(session_scores: pd.DataFrame) -> pd.DataFrame:
    """
    :param session_scores: pd.DataFrame
    :return: the ACC-processable pd.DataFrame
    """
    ide = session_scores.groupby([FOLD_FIELD, QUERY_USER_FIELD, QUERY_SESSION_FIELD]).apply(
        lambda x: x.iloc[np.argmax(x[SCORE_FIELD].values)][[REFERENCE_USER_FIELD]])
    ide.columns = [RESULT_FIELD]
    ide = ide.reset_index()
    return ide

See `sklearn.metrics.roc_curve`


In [None]:
def roc_curve(y_true: np.ndarray,
              y_score: np.ndarray) -> (np.ndarray, np.ndarray, np.ndarray):
    """
    :params and return types: np.ndarray
    :return: roc values
    """
    fpr, tpr, thresholds = _roc_curve(y_true, y_score, drop_intermediate=True)
    return fpr, 1 - tpr, thresholds

Generate Receiver Operating Characteristic (ROC) curve for each fold and interpolate it to get the same threshold values in each fold.

In [None]:
def session_roc(session_scores: pd.DataFrame,
                pivot=FOLD_FIELD) -> pd.DataFrame:
    """
    :param session_scores: pd.DataFrame containing session scores
    :param pivot: Values which the function will group the dataset by
    :return: Receiver Operating Characteristic (ROC) pd.DataFrame
    """
    # Generate an ROC curve for each fold, ordered by increasing threshold
    roc = session_scores.groupby(pivot).apply(
        lambda x: pd.DataFrame(np.c_[roc_curve((x[QUERY_USER_FIELD] == x[REFERENCE_USER_FIELD]).values.astype(np.int32),
                                               x[NORMALIZED_SCORE_FIELD].values.astype(np.float32))][::-1],
                               columns=[FALSE_ACCEPTANCE_RATE, FALSE_REJECTION_RATE, ROC_THRESHOLD]))

    # interpolate to get the same threshold values in each fold
    thresholds = np.sort(roc[ROC_THRESHOLD].unique())
    roc = roc.groupby(level=pivot).apply(lambda x: pd.DataFrame(np.c_[thresholds,
                                                                      np.interp(thresholds, x[ROC_THRESHOLD],
                                                                            x[FALSE_ACCEPTANCE_RATE]),
                                                                      np.interp(thresholds, x[ROC_THRESHOLD],
                                                                            x[FALSE_REJECTION_RATE])],
                                                                columns=[ROC_THRESHOLD, FALSE_ACCEPTANCE_RATE,
                                                                         FALSE_REJECTION_RATE]))
    roc = roc.reset_index(level=1, drop=True).reset_index()
    return roc

Obtain rank-n classification accuracy for each fold

In [None]:
def ACC(ide: pd.DataFrame) -> float:
    """
    :param ide: the identification pd.DataFrame on which classification accuracy needs to be obtained
    :return: accuracy score calculated by scikit learn module.
    """
    return accuracy_score(ide[QUERY_USER_FIELD].values, ide[RESULT_FIELD].values)

Obtain the Equal Error Rate for one fold

In [None]:
def EER(roc: pd.DataFrame) -> float:
    """
    :param roc: the Receiver Operating Characteristic pd.DataFrame
    :return: Equal Error Rate float
    """
    far, frr = roc[FALSE_ACCEPTANCE_RATE].values, roc[FALSE_REJECTION_RATE].values

    def perp(a: np.ndarray) -> np.ndarray:
        b = np.empty_like(a)
        b[0] = -a[1]
        b[1] = a[0]
        return b

    def seg_intersect(a1: np.array,
                      a2: np.array,
                      b1: np.array,
                      b2: np.array) -> np.ndarray:
        """
        :param a1: line segment endpoint for a segment
        :param a2: line segment endpoint for a segment
        :param b1: line segment endpoint for b segment
        :param b2: line segment endpoint for b segment
        :return: segment intersection between a and b
        """
        da = a2 - a1
        db = b2 - b1
        dp = a1 - b1
        dap = perp(da)
        denom = np.dot(dap, db)
        num = np.dot(dap, dp)
        return (num / denom) * db + b1

    d = far <= frr
    idx = np.diff(d).nonzero()[0][0]
    return seg_intersect(np.array([idx, far[idx]]),
                         np.array([idx + 1, far[idx + 1]]),
                         np.array([idx, frr[idx]]),
                         np.array([idx + 1, frr[idx + 1]]))[1]

Prepare session scores in order to identificate each event

In [None]:
def continuous_identification(scores: pd.DataFrame) -> pd.DataFrame:
    """
    :param scores: pd.DataFrame containing scores
    :return: the ACC-processable pd.DataFrame 
    """
    ide = scores.groupby([FOLD_FIELD, QUERY_USER_FIELD, QUERY_SESSION_FIELD, EVENT_INDEX_FIELD]).apply(
        lambda x: x.iloc[np.argmax(x[SCORE_FIELD].values)][[REFERENCE_USER_FIELD]])
    ide.columns = [RESULT_FIELD]
    ide = ide.reset_index()
    return ide

Continuous verification is enforced through a penalty function in which each new keystroke incurs a non-negative penalty within a sliding window. The penalty at any given time can be thought of as the inverse of trust. As behavior becomes more consistent with the model, the cumulative penalty within the window can decrease, and as it becomes more dissimilar, the penalty increases. The user is rejected if the cumulative penalty within the sliding window exceeds a threshold. The threshold is chosen for each sample such that the genuine user is never rejected, analogous to a 0% FRR in static verification.

In [None]:
def scores_penalty(scores: pd.DataFrame,
                   penalty_fun='sum',
                   window=25) -> pd.DataFrame:
    """
    :param scores: pd.DataFrame containing scores
    :param penalty_fun: the function which will be used to calculate penalty
    :param window: sliding window threshold 
    :return: pd.DataFrame containing penalties
    """

    def _penalty(df: pd.DataFrame) -> pd.DataFrame:
        """
        :param df: pd.DataFrame containing a row with a score
        :return: pd.DataFrame containing penalty (for one score)
        """
        if penalty_fun == 'sum':
            p = df[RANK_FIELD].rolling(window=window, center=False).sum()
            p[:window] = df[RANK_FIELD].values[:window].cumsum()
        elif penalty_fun == 'sumexp':
            p = (np.exp(df[RANK_FIELD]) - 1).rolling(window=window, center=False).sum()
            p[:window] = (np.exp(df[RANK_FIELD]) - 1)[:window].cumsum()

        df[PENALTY_FIELD] = p
        return df

    penalty = scores.copy().groupby([FOLD_FIELD, REFERENCE_USER_FIELD, QUERY_USER_FIELD, QUERY_SESSION_FIELD]).apply(_penalty)
    return penalty

Calculate area under the ROC curve (See `scikitlearn.metrics.auc`)


In [None]:
def AUC(roc: pd.DataFrame) -> float:
    """
    :param roc: the Receiver Operating Characteristic pd.DataFrame
    """
    return auc(roc[FALSE_REJECTION_RATE].values, roc[FALSE_ACCEPTANCE_RATE].values)

Determine the maximum lockout time for each impostor/query sample

In [None]:
def continuous_verification(penalty: pd.DataFrame) -> pd.DataFrame:
    """
    :param penalty: pd.DataFrame containing penalties
    :return: continuous verification results as pd.DataFrame
    """
    genuine_idx = penalty[REFERENCE_USER_FIELD] == penalty[QUERY_USER_FIELD]
    genuine = penalty[genuine_idx]
    lockout = genuine.groupby([QUERY_USER_FIELD, QUERY_SESSION_FIELD]).max()[[PENALTY_FIELD]]
    lockout = pd.DataFrame(lockout)
    lockout.columns = [CV_THRESHOLD_FIELD]

    impostor = penalty[~genuine_idx]

    def _mrt(df: pd.DataFrame) -> Union[np.ndarray, int]:
        """
        :param df: pd.DataFrame containing a row with a penalty
        :return: np.ndarray containing maximum rejection time
        """
        thresh = lockout.loc[tuple(df.iloc[0][[QUERY_USER_FIELD, QUERY_SESSION_FIELD]].values)].squeeze()
        #thresh = 645
        reject = (df[PENALTY_FIELD] > thresh)
        return np.where(reject)[0].min() if reject.any() else len(reject)

    mrt = impostor.groupby([REFERENCE_USER_FIELD, QUERY_USER_FIELD, QUERY_SESSION_FIELD]).apply(_mrt).reset_index()
    mrt.columns = [REFERENCE_USER_FIELD, QUERY_USER_FIELD, QUERY_SESSION_FIELD, MAXIMUM_REJECTION_TIME_FIELD]

    amrt = mrt.groupby([QUERY_USER_FIELD, QUERY_SESSION_FIELD])[MAXIMUM_REJECTION_TIME_FIELD].mean()
    amrt.columns = [AVERAGE_MAXIMUM_REJECTION_TIME_FIELD]

    results = pd.concat([amrt, lockout], axis=1).reset_index().rename(columns={MAXIMUM_REJECTION_TIME_FIELD: AVERAGE_MAXIMUM_REJECTION_TIME_FIELD})
    return results

Obtain results for a given dataset and features conditioned on the event column.



In [None]:
def dataset_classification_results(is_pohmm: bool) -> None:
    """
    :param is_pohmm: True -> train POHMM or False -> train HMM 
    :return: None
    """
    # Load the preprocessed dataset
    df = pd.read_csv(POHMM_ADAPTED_DATASET_FILE if is_pohmm else HMM_ADAPTED_DATASET_FILE, index_col=[0, 1])

    # Create the validation folds
    reference_section = np.arange(1, REFERENCE_FOLD_THRESHOLD)
    genuine_section = np.arange(REFERENCE_FOLD_THRESHOLD, MAX_ROWS_PER_USER + 1)
    impostor_section = np.arange(REFERENCE_FOLD_THRESHOLD, MAX_ROWS_PER_USER + 1)
    fold = split_dataset(df, reference_section, genuine_section, impostor_section)

    # Calculate scores
    scores = cv_event_scores(fold)
    save_data(scores, POHMM_EVENT_SCORES_DATASET_FILE if is_pohmm else HMM_EVENT_SCORES_DATASET_FILE)

    # Aggregate the event scores within each session
    session_scores = scores.groupby([FOLD_FIELD, REFERENCE_USER_FIELD,
                                     QUERY_USER_FIELD, QUERY_SESSION_FIELD])[
        SCORE_FIELD].sum().reset_index()
    save_data(session_scores,
              POHMM_RAW_SESSION_SCORES_DATASET_FILE if is_pohmm else HMM_RAW_SESSION_SCORES_DATASET_FILE)

    # Normalize the session scores
    session_scores = normalize_session_scores(session_scores)
    save_data(session_scores,
              POHMM_NORMALIZED_SESSION_SCORES_DATASET_FILE if is_pohmm else HMM_NORMALIZED_SESSION_SCORES_DATASET_FILE)

    # Session and continuous identification, verification results
    session_ide = session_identification(session_scores)
    session_ver = session_roc(session_scores)

    continuous_ide = continuous_identification(scores)  # Identification of each event
    penalty = scores_penalty(scores)
    continuous_ver = continuous_verification(penalty)  # Minimum rejection time

    # Summarize
    session_acc = session_ide.groupby(FOLD_FIELD).apply(ACC).describe()
    session_eer = session_ver.groupby(FOLD_FIELD).apply(EER).describe()
    session_auc = session_ver.groupby(FOLD_FIELD).apply(AUC).describe()

    # User-dependent EER is obtained by deriving an ROC curve for each user
    user_eer = session_roc(session_scores, pivot=REFERENCE_USER_FIELD).groupby(REFERENCE_USER_FIELD).apply(
        EER).describe()
    user_acc = session_ide.groupby(QUERY_USER_FIELD).apply(ACC).describe()

    # Summarize continuous results, CI by session
    continuous_acc = continuous_ide.groupby([QUERY_USER_FIELD, QUERY_SESSION_FIELD]).apply(ACC).describe()

    # Maximum lockout time, averaged for each session (against all reference users), CI by session
    continuous_amrt = continuous_ver[AVERAGE_MAXIMUM_REJECTION_TIME_FIELD].describe()

    summary = pd.concat([session_acc, user_acc, session_eer, user_eer, session_auc, continuous_acc, continuous_amrt],
                        axis=1)

    summary.columns = [SESSION_ACCURACY,
                       USER_ACCURACY,
                       SESSION_EQUAL_ERROR_RATE,
                       USER_EQUAL_ERROR_RATE,
                       AREA_UNDER_ROC_CURVE,
                       CONTINUOUS_IDENTIFICATION_ACCURACY,
                       AVERAGE_MAXIMUM_REJECTION_TIME]
    print(summary)
    save_data(summary, POHMM_SUMMARY_DATASET_FILE if is_pohmm else HMM_SUMMARY_DATASET_FILE)

main

In [None]:
# Setup
setup()

##### POHMM #####
# Preprocess raw dataset
preprocess_rhu_university(RAW_DATASET_FILE, is_pohmm=True)

# Classify dataset and save results
print('-'*37 + " POHMM " + '-'*37)
dataset_classification_results(is_pohmm=True)

##### HMM #####
# Preprocess raw dataset
preprocess_rhu_university(RAW_DATASET_FILE, is_pohmm=False)

# Classify dataset and save results
print('\n' + '-'*38 + " HMM " + '-'*38)
dataset_classification_results(is_pohmm=False)

------------------------------------- POHMM -------------------------------------
          S-ACC      U-ACC     S-EER      U-EER       AUC         CIA        AMRT
count  1.000000  51.000000  1.000000  51.000000  1.000000  151.000000  151.000000
mean   0.708609   0.712418  0.053377   0.031142  0.015207    0.216651    3.880000
std         NaN   0.333464       NaN   0.065473       NaN    0.204942    2.411958
min    0.708609   0.000000  0.053377   0.000000  0.015207    0.000000    0.040000
25%    0.708609   0.666667  0.053377   0.000000  0.015207    0.071429    2.100000
50%    0.708609   0.666667  0.053377   0.006757  0.015207    0.142857    3.840000
75%    0.708609   1.000000  0.053377   0.027027  0.015207    0.285714    5.090000
max    0.708609   1.000000  0.053377   0.333333  0.015207    0.928571   12.520000

-------------------------------------- HMM --------------------------------------
          S-ACC      U-ACC     S-EER      U-EER       AUC         CIA        AMRT
count  1.000000