In [536]:
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
import plotly.express as px
import warnings
warnings.filterwarnings("ignore")

plt.rcParams["figure.figsize"] = (20, 9)

In [537]:
df = pd.read_csv("./data/anshal_EAR.csv")
# df = pd.read_csv("./Feature_Extraction/p5/p5_10_EAR.csv")

# standerdize the data
mean, std_dev = df["EAR"].describe()[1], df["EAR"].describe()[2]
# df["EAR"] = df["EAR"].apply(lambda x: (x - mean)/std_dev)


print(mean, std_dev)

0.3262191313851102 0.0524547242905411


In [538]:
df["EMA"] = df["EAR"].ewm(50).mean()
df["CMA"] = df["EAR"].expanding().mean()
df["SMA"] = df["EAR"].rolling(50, min_periods=1).mean()

df["EMA_MEAN"] = [df["EMA"].mean()] * df.shape[0]

df.dropna(inplace=True)

In [539]:
# Rate of change
def get_ROC(x1:int, y1:float, x2:int, y2:float) -> float:
    delta_change = (y2-y1) / (x2-x1)
    return delta_change

delta_change = []
for i in range(df.shape[0] - 1):
    change = get_ROC(i, df["EAR"].iloc[i], i+1, df["EAR"].iloc[i+1])
    delta_change.append(np.floor(change))

delta_change.insert(0, 0)
df["DC"] = delta_change

In [540]:
from plotly.subplots import make_subplots
import plotly.graph_objects as go


fig = make_subplots(rows=1, cols=1
)

fig.append_trace(
    go.Scatter(x = df["FN"], y = df["EAR"],  name = "EAR", mode = "lines"),
    row=1, col=1
)
fig.append_trace(
    go.Scatter(x = df["FN"], y = df["EMA"],  name = "EMA", mode = "lines"),
    row=1, col=1
)
fig.append_trace(
    go.Scatter(x = df["FN"], y = df["EMA_MEAN"],  name = "EMA_MEAN", mode = "lines"),
    row=1, col=1
)

fig.update_layout(height=400, width=1550, title_text="EAR with EMA and MEAN")
fig.show()

In [541]:
from itertools import groupby, count

def get_Binks(df:pd.DataFrame) -> list:
    num_blinks = []
    # threshold = (min(df["EAR"]) + max(df["EMA"])) / 2
    threshold = 0.25

    for i in range(df.shape[0]):
        # if df["EAR"].iloc[i] - df["EMA"].iloc[i] < threshold:
        if df["EAR"].iloc[i] < threshold:
            num_blinks.append(i)
    
    return num_blinks


def accumulate_Blinks(blinks:list) -> list:
    groups = groupby(blinks, key=lambda item, c = count():item-next(c))
    final_list = [list(g) for k, g in groups]

    return final_list

In [542]:
result_full = accumulate_Blinks(get_Binks(df))
num_blinks = len(result_full)
print(f"Total number of blinks detected are = {num_blinks}.")

result = [int(np.floor(np.mean(i))) for i in result_full]

Total number of blinks detected are = 10.


## Feature Extraction from the Blinks

> FREQUENCY

In [543]:
# NOTE: FREQUENCY

def get_Count(list1, l, r):
    # Traversing the list with on line for loop
    # check if number is in range of not
    return sum( l <= x <= r for x in list1)

def get_Frequency(df, result):
    freq_by_min = {}
    
    for i in df["MINUTE"].unique():
        df_tmp = df[df["MINUTE"] == i]
        freq_by_min[i] = get_Count(result, min(df_tmp["FN"]), max(df_tmp["FN"]))

    return freq_by_min

In [544]:
p1_0_freq = get_Frequency(df, result)
print(p1_0_freq)

{1: 10}


> AMPLITUDE

In [545]:
# NOTE: AMPLITUDE

def get_Amplitude(df, result):
    fn_2_amplitude = {}
    for i in result:
        fn_2_amplitude[i] = df["EAR"].iloc[i]
    
    return fn_2_amplitude

In [546]:
amplitudes = get_Amplitude(df, result)

In [547]:
np.mean(list(amplitudes.values()))

0.15365114795254414

> DURATION

In [548]:
duration = [(len(i) / 30) for i in result_full]
np.mean(duration)

0.11333333333333333

> Velocity Calculation and frame count mechanism and duration

In [549]:
# def get the frames of blink
def get_Blink_Frames(mid_frame:int, df:pd.DataFrame) -> tuple:
    start, end = mid_frame, mid_frame

    while abs(np.round(abs(df["EMA"].iloc[start]) - abs(df["EAR"].iloc[start]))) > 0:
        start -= 1

    while abs(np.round(abs(df["EMA"].iloc[end]) - abs(df["EAR"].iloc[end]))) > 0:
        end += 1

    return (start+1, end + 1)

# def get the time difference
def get_Blink_Duration(mid_frame:int, df:pd.DataFrame) -> tuple:
    start, end = get_Blink_Frames(mid_frame, df)

    duration = (df["TIMESTAMP"].iloc[mid_frame] - df["TIMESTAMP"].iloc[start]) / 1000

    return duration, start, mid_frame, end

# def get blink velocity in frames/sec
def get_Blink_Velocity(start:int, mid:int, duration:float, df:pd.DataFrame) -> float:

    EAR_S = abs(df["EAR"].iloc[start])
    EAR_M = abs(df["EAR"].iloc[mid])

    velocity = (EAR_M - EAR_S) / duration

    return velocity


In [550]:
duration_results = []
for i in result:
    duration_results.append(get_Blink_Duration(i, df))

> PERCLOS INDEX

In [551]:
TIME_INTERVAL = 900 # 30 seconds
PERCLOS = {}
window = []

for i in duration_results:
    if i[2] < TIME_INTERVAL:
        window.append((i[3] - i[1]))
    else:
        PERCLOS[TIME_INTERVAL] = window
        window = []
        TIME_INTERVAL += 900

In [552]:
def get_PERCLOS(duration_results:list, time_window:int = 900):
    TIME_INTERVAL = time_window
    PERCLOS = {}
    window = []

    for i in duration_results:
        if i[2] < TIME_INTERVAL:
            window.append((i[3] - i[1]))
        else:
            PERCLOS[TIME_INTERVAL] = window
            window.clear()
            TIME_INTERVAL += time_window

    return PERCLOS

In [553]:
PERCLOS_INDEX = []

for i in PERCLOS.keys():
    PERCLOS_INDEX.append((np.sum(PERCLOS[i])) / 450)

## Dataset for SVM from person 5

### Blinks

In [554]:
df_svm = df.copy()
df_svm["class"] = [0] * df_svm.shape[0]

In [555]:
def annotate_blink_frames(df:pd.DataFrame, result):
    for idx in result:
        df["class"].iloc[idx] = 1
        for i in range(1, 7):
            df["class"].iloc[idx - i] = 1
            df["class"].iloc[idx + i] = 1
    print("Done indexing")

    return df

In [556]:
df_svm = annotate_blink_frames(df_svm, result)

Done indexing


In [557]:
test_df = df_svm[df_svm["class"] == 1]

In [558]:
blink_ear_frames = []

start, end = 0,13

while end < test_df.shape[0]:
    blink_ear_frames.append(np.array(test_df["EAR"][start:end]))

    start += 13
    end += 13 

In [559]:
svm_data = pd.DataFrame()

for ear_vec in blink_ear_frames:
    svm_data = svm_data.append(pd.Series(ear_vec), ignore_index=True)

In [560]:
svm_data["class"] = [1] * svm_data.shape[0]
svm_data.shape
# svm_data.to_csv("./train_data/blink2.csv", index = None)

(9, 14)

### Non-Blinks

In [561]:
to_drop = list(test_df["FN"])

df_nb = df
for i in to_drop:
    df_nb = df_nb.drop(df.index[i-1])

In [562]:
df_nb.shape

(729, 11)

In [563]:
nb_ear_frames = []

start, end = 0,13

while len(nb_ear_frames) < 160:
    nb_ear_frames.append(np.array(df_nb["EAR"][start:end]))

    start += 13
    end += 13

In [564]:
len(nb_ear_frames)

160

In [565]:
nb_dataframe = pd.DataFrame()

for ear_vec in nb_ear_frames:
    nb_dataframe = nb_dataframe.append(pd.Series(ear_vec), ignore_index=True)

In [566]:
nb_dataframe["class"] = [0] * nb_dataframe.shape[0]
nb_dataframe.shape
# nb_dataframe.to_csv("./train_data/nonblink2.csv", index = None)

(160, 14)

## Test the SVM model

In [567]:
import pickle
from sklearn.preprocessing import StandardScaler
sc = StandardScaler()

sc.mean_ = [0.50741131, 0.50657204, 0.50085083, 0.47528443, 0.3986786,  0.31181558,
 0.30208963, 0.30327507, 0.3504944,  0.41590232, 0.4526851,  0.47268597,
 0.48495746]

sc.scale_ = [0.03347708, 0.0345071 , 0.04984739, 0.0954779 , 0.17012681,
       0.21660294, 0.22195069, 0.22107812, 0.18719406, 0.12510521,
       0.08377453, 0.06175941, 0.04869713]

blink_ear_frames = sc.transform(blink_ear_frames)
nb_ear_frames = sc.transform(nb_ear_frames[:50])

model = pickle.load(open("./models/svm_ear_blinks.pkl", "rb"))

In [568]:
blink_preds = []
for arr in blink_ear_frames:
    blink_preds.append(model.predict(arr.reshape(1, -1)))

non_blink_preds = []
for arr in nb_ear_frames:
    non_blink_preds.append(model.predict(arr.reshape(1, -1)))

In [572]:
pd.DataFrame(blink_preds).value_counts()

1    8
0    1
dtype: int64

In [573]:
pd.DataFrame(non_blink_preds).value_counts()

0    27
1    23
dtype: int64