# E-Reader Project

## Load Packages

In [126]:
from os import listdir
from scipy.io import wavfile
import matplotlib.pyplot as plt
import numpy as np
from sklearn.svm import SVC
import seaborn as sns
import pandas as pd
import os
from decimal import Decimal
from sklearn.model_selection import train_test_split
from sklearn.neighbors import KNeighborsClassifier
from sklearn.model_selection import train_test_split
from tsfresh.feature_extraction import extract_features, MinimalFCParameters, EfficientFCParameters
from tsfresh.utilities.dataframe_functions import make_forecasting_frame
from tsfresh.utilities.dataframe_functions import impute
from scipy import stats
from sklearn.ensemble import RandomForestClassifier
from statsmodels.tsa.stattools import acf, pacf
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
from scipy.signal import argrelextrema
from sklearn.metrics import accuracy_score, confusion_matrix
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import cross_val_score
import pyautogui
from sklearn.metrics import plot_confusion_matrix
from sklearn.model_selection import cross_validate
import pickle


# Train the Data

In this section, we are aiming to read all the data and get a feature matrix for training

## Label Training Data

Firstly, read the data and get all of the events selected and labeled.

In [127]:
def eye_movement_ZC(Y, time, windowSize=0.5, thresholdEvents=20, downSampleRate=50):
    Y = Y - 500
    ind = np.arange(0, np.where(time == np.round(time[-1] - windowSize, 4))[0][0] + 1, downSampleRate)
    Y = Y - np.average(Y)
    timeMiddle = time[ind] + windowSize / 2
    testStat = np.empty(len(ind), dtype=int)
    for i in range(len(ind)):
        Y_subset = Y[(time >= time[ind[i]]) & (time < time[ind[i]] + windowSize)]
        if np.ndim(Y_subset) > 1:
            Y_subset = Y_subset[:, 1]
        testStat[i] = sum(Decimal(int(Y_subset[i])) * Decimal(int(Y_subset[i + 1])) <= 0 for i in range(len(Y_subset) - 1))
    predictedEvent = np.where(testStat < thresholdEvents)[0]
    eventTimes = timeMiddle[predictedEvent]
    gaps = np.where(np.diff(eventTimes) > windowSize)[0]
    if len(eventTimes) == 0:
        return None
    event_time_interval = [np.min(eventTimes)]
    for i in range(len(gaps)):
        event_time_interval.extend([eventTimes[gaps[i]], eventTimes[gaps[i] + 1]])
    event_time_interval.append(np.max(eventTimes))
    event_time_interval = np.reshape(event_time_interval, (-1, 2))

    predictedEventTimes = np.full(len(Y), False)
    for i in range(event_time_interval.shape[0]):
        predictedEventTimes[(event_time_interval[i, 0] <= time) & (event_time_interval[i, 1] >= time)] = True

    num_event = len(gaps) + 1
    movement_list = []
    y_values = []
    for i in range(event_time_interval.shape[0]):
        interval_start, interval_end = event_time_interval[i]
        interval_indices = (time >= interval_start) & (time <= interval_end)
        interval_Y = Y[interval_indices]
        y_values.append(interval_Y)
        movement = LR_detection(interval_Y)
        movement_list.append(movement)
    return {
        "num_event": num_event,
        "predictedEventTimes": predictedEventTimes,
        "predictedInterval": event_time_interval,
        "labels": movement_list,
        "signals": y_values
    }

def LR_detection(seq):
    maxval = np.max(seq)
    minval = np.min(seq)
    movement = "L" if maxval < -minval else "R"
    return movement
def record_all_training(files,method):
    ls_signals = []
    ls_labels = []
    ls_intervals = []
    for i in range(len(files)):
        wave = files[i]
        print(wave)
        if wave[-3:] != "wav":
            continue
        window_size, Y = wavfile.read(wave)
        timeSeq = []
        for i in range(len(Y)):
            timeSeq.append(i / window_size)
        timeSeq = np.array(timeSeq)
        if np.ndim(Y) > 1:
            Y = Y[:, 1]
            Y = Y-500
        Y = np.array(Y)
        if method == "zc":
            result = eye_movement_ZC(Y=Y, time=timeSeq)
        elif method == "max":
            result = eye_movement_max(Y=Y,time = timeSeq)
        if result == None:
            continue
        ls_signals.append(result["signals"])
        ls_labels.append(result["labels"])
        ls_intervals.append(result["predictedInterval"])
    ls_labels = [item for sublist in ls_labels for item in sublist]
    ls_signals = [item for sublist in ls_signals for item in sublist]
    ls_intervals = [item for sublist in ls_intervals for item in sublist]
    return {
        "ls_signals":ls_signals,
        "ls_labels":ls_labels,
        "ls_intervals": ls_intervals
    }

# make a feature matrix for the following classifier
def make_matrix(signals, labels):
    mean_ls = []
    sd_ls = []
    zero_crossing = []
    # entropy = []
    # lumpiness = []
    # flat_spots = []
    for i in range(len(signals)):
        mean = np.mean(signals[i])
        zero_crossing.append(len(np.where(np.diff(np.sign(signals[i])))[0]))
        sd = np.std(signals[i])
        mean_ls.append(mean)
        sd_ls.append(sd)
        #entropy.append(ts_entropy(signals))
        # lumpiness.append(ts_lumpiness(signals))
        # flat_spots.append(ts_flat_spots(signals))
    # , 'Signals': signals}
    dependent_vars = pd.DataFrame({'Mean': mean_ls, 'SD': sd_ls})
    feature_matrix = pd.concat([pd.Series(labels, name='Label'), dependent_vars], axis=1)
    feature_matrix.to_csv("matrix.csv", index=False)
    return feature_matrix


## Get features using Tsfresh

Get some useful features for each event using the Python package (Tsfresh)

In [128]:
def make_matrix_tsfresh(signals, labels):
    df_list = []
    for i, signal in enumerate(signals):
        temp_df = pd.DataFrame({'id': i, 'time': np.arange(len(signal)), 'value': signal.astype(float)})
        df_list.append(temp_df)

    df = pd.concat(df_list, ignore_index=True)
    extracted_features = extract_features(df, column_id='id', column_sort='time',default_fc_parameters=MinimalFCParameters())
    impute(extracted_features)
    extracted_features['Label'] = labels
    feature_matrix.to_csv("matrix.csv", index=False)
    return extracted_features

#matrix = make_matrix_tsfresh(ls_signals,ls_labels)


## Read matrix csv file

In [129]:
def save_feature_matrix_to_csv(feature_matrix, file_path):
    feature_matrix.to_csv(file_path, index=False)

def load_feature_matrix_from_csv(file_path):
    return pd.read_csv(file_path)

# Different Classifiers

The next stage is to build up several classifiers using different algorithms. Now, Knn, Random Forest, Svm, Logistic Regression, Decision Tree and Gradient Boosting is included.

## Knn

In [130]:
def classifier_knn(feature_matrix):
    X = feature_matrix.drop('Label', axis=1)
    y = feature_matrix['Label']
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    knn = KNeighborsClassifier(n_neighbors=3)
    knn.fit(X_train, y_train)
    y_pred = knn.predict(5)
    accuracy = accuracy_score(y_test, y_pred)
    cm = confusion_matrix(y_test, y_pred)

    return accuracy, cm, knn, X_test, y_test

## Random Forest

In [131]:
def classifier_rf(feature_matrix):
    X = feature_matrix.drop('Label', axis=1)
    y = feature_matrix['Label']
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    rf = RandomForestClassifier(n_estimators=100, random_state=42)
    rf.fit(X_train, y_train)
    y_pred = rf.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    cm = confusion_matrix(y_test, y_pred)

    return accuracy, cm, rf, X_test, y_test

## SVM

In [132]:
def classifier_svm(feature_matrix):
    X = feature_matrix.drop('Label', axis=1)
    y = feature_matrix['Label']
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    svm = SVC(kernel='linear', C=1, random_state=42)
    svm.fit(X_train, y_train)
    y_pred = svm.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    cm = confusion_matrix(y_test, y_pred)

    return accuracy, cm, svm, X_test, y_test

## Decision Tree

In [133]:
def classifier_dt(feature_matrix):
    X = feature_matrix.drop('Label', axis=1)
    y = feature_matrix['Label']
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    dt = DecisionTreeClassifier(random_state=42)
    dt.fit(X_train, y_train)
    y_pred = dt.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    cm = confusion_matrix(y_test, y_pred)

    return accuracy, cm, dt, X_test, y_test


## Logistic Regression

In [134]:
def classifier_lr(feature_matrix):
    X = feature_matrix.drop('Label', axis=1)
    y = feature_matrix['Label']
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    lr = LogisticRegression(random_state=42)
    lr.fit(X_train, y_train)
    y_pred = lr.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    cm = confusion_matrix(y_test, y_pred)

    return accuracy, cm, lr, X_test, y_test


## Gradient Boosting

In [135]:
def classifier_gb(feature_matrix):
    X = feature_matrix.drop('Label', axis=1)
    y = feature_matrix['Label']
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    gb = GradientBoostingClassifier(random_state=42)
    gb.fit(X_train, y_train)
    y_pred = gb.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    cm = confusion_matrix(y_test, y_pred)

    return accuracy, cm, gb, X_test, y_test


In [136]:
def predict(signals,classifier):
    for i, signal in enumerate(signals):
        temp_df = pd.DataFrame({'id': i, 'time': np.arange(len(signal)), 'value': signal.astype(float)})
        df_list.append(temp_df)

    df = pd.concat(df_list, ignore_index=True)
    extracted_features = extract_features(df, column_id='id', column_sort='time',default_fc_parameters=MinimalFCParameters())
    impute(extracted_features)    
    predicted_class = classifier.predict(extracted_features)
    return predicted_class

## Write classifiers into files

In [137]:
def save_classifier(classifier, filename):
    with open(filename, 'wb') as file:
        pickle.dump(classifier, file)
def train_and_save_classifiers(classifiers, feature_matrix):
    for name, classifier_func in classifiers.items():
        accuracy, _, classifier, _, _ = classifier_func(feature_matrix)
        save_classifier(classifier, f"{name}_classifier.pkl")
        print(f"{name} classifier trained and saved.")

def load_all_classifiers(filenames):
    classifiers = {}
    for filename in filenames:
        classifier_name = filename[:-4].replace("_", " ")
        classifiers[classifier_name] = load_classifier(filename)
        print(f"{classifier_name} classifier loaded.")
    return classifiers

    

## Comparison of Different Classifiers

## Simple Visualization of Accuracy

In [138]:
def compare_classifiers(classifiers, feature_matrix):
    names, accuracies = [], []
    for name, classifier_func in classifiers.items():
        accuracy, _, _, _, _ = classifier_func(feature_matrix)
        names.append(name)
        accuracies.append(accuracy)
    plt.figure(figsize=(12, 6))
    sns.barplot(x=names, y=accuracies)
    plt.xlabel("Classifier")
    plt.ylabel("Accuracy")
    plt.title("Comparison of Classifier Accuracies")
    plt.show()

classifiers = {
    "KNN": classifier_knn,
    "Random Forest": classifier_rf,
    "SVM": classifier_svm,
    "Logistic Regression": classifier_lr,
    "Decision Tree": classifier_dt,
    "Gradient Boosting": classifier_gb,
}
classifier_filenames = [
    "KNN_classifier.pkl",
    "Random_Forest_classifier.pkl",
    "SVM_classifier.pkl",
    "Logistic_Regression_classifier.pkl",
    "Decision_Tree_classifier.pkl",
    "Gradient_Boosting_classifier.pkl",
]
# compare_classifiers(classifiers, feature_matrix)

## Visualization of Confusion Matrix

In [139]:
def visualize_confusion_matrix(classifier, X_test, y_test):
    fig, ax = plt.subplots(figsize=(8, 8))
    plot_confusion_matrix(classifier, X_test, y_test, ax=ax, cmap=plt.cm.Blues)
    plt.title("Confusion Matrix")
    plt.show()

## Multiple classifiers

In [140]:
def plot_all_confusion_matrices(classifiers, feature_matrix):
    X = feature_matrix.drop('Label', axis=1)
    y = feature_matrix['Label']
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    
    n_classifiers = len(classifiers)
    nrows = 2
    ncols = int(np.ceil(n_classifiers / nrows))
    fig, axes = plt.subplots(nrows=nrows, ncols=ncols, figsize=(5 * ncols, 5 * nrows))
    
    axes = axes.flatten()
    
    for ax, (name, classifier_func) in zip(axes, classifiers.items()):
        _, _, classifier, _, _ = classifier_func(feature_matrix)
        plot_confusion_matrix(classifier, X_test, y_test, ax=ax, cmap=plt.cm.Blues)
        ax.set_title(f"{name} Confusion Matrix")
    
    for i in range(n_classifiers, nrows * ncols):
        axes[i].axis('off')
    
    plt.tight_layout()
    plt.show()


## K-fold Cross Validation

In [141]:
def all_classifiers_cross_validation(classifiers, feature_matrix, n_splits=5):
    X = feature_matrix.drop('Label', axis=1)
    y = feature_matrix['Label']
    cv_scores_dict = {}
    cv_times_dict = {}
    
    for name, classifier_func in classifiers.items():
        accuracy, _, classifier, _, _ = classifier_func(feature_matrix)
        cv_results = cross_validate(classifier, X, y, cv=n_splits, return_train_score=False)
        cv_scores_dict[name] = cv_results['test_score']
        cv_times_dict[name] = cv_results['score_time']
        
    cv_scores_df = pd.DataFrame(cv_scores_dict)
    cv_times_df = pd.DataFrame(cv_times_dict)
    
    # Plot cross-validation scores
    plt.figure(figsize=(12, 6))
    sns.boxplot(data=cv_scores_df)
    plt.title("Classifier Cross-Validation Scores")
    plt.xlabel("Classifier")
    plt.ylabel("Cross-Validation Score")
    plt.show()
    
    # Plot prediction times
    plt.figure(figsize=(12, 6))
    sns.boxplot(data=cv_times_df)
    plt.title("Classifier Prediction Times")
    plt.xlabel("Classifier")
    plt.ylabel("Prediction Time (seconds)")
    plt.show()

    return cv_scores_dict, cv_times_dict

# Streaming Condition

In [142]:
def streaming_condition(knn_classifier, sample_rate, window_size=None, increment=None):
    if window_size is None:
        window_size = sample_rate
    if increment is None:
        increment = window_size // 3
    input_buffer = []
    lower_interval = 0
    return_str = ""

    while True:
        new_data = read_data_from_spikerbox()
        input_buffer.extend(new_data)
        if len(input_buffer) >= lower_interval + window_size:
            upper_interval = lower_interval + window_size
            interval = np.array(input_buffer[lower_interval:upper_interval])
            input_buffer = input_buffer[upper_interval:]
            lower_interval = 0

            mean = np.mean(interval)
            sd = np.std(interval)
            zero_crossing_point = len(np.where(np.diff(np.sign(interval)))[0])

            if zero_crossing_point < 40:
                temp_df = pd.DataFrame({'id': 0, 'time': np.arange(len(interval)), 'value': interval.astype(float)})
                extracted_features = extract_features(temp_df, column_id='id', column_sort='time',
                                                      default_fc_parameters=EfficientFCParameters())
                impute(extracted_features)
                features = extracted_features

                predicted = knn_classifier.predict(features)
                if predicted[0] == "L":
                    goLeft()
                elif predicted[0] == "R":
                    goRight()
                elif predicted[0] == "B":
                    status = not status
            else:
                lower_interval = lower_interval + increment
    return None


# Connect to Keyboard

In [143]:
def goLeft():
    pyautogui.press('left')

def goRight():
    pyautogui.press('right')


# Visualization of Different Classifiers

In [144]:
if __name__ == '__main__':
    # Load the classifier
    path = "../datasets/zoe_spiker/Length3"
    file_ls = []
    wave_file_ls = os.listdir(path)
    for i in range(len(wave_file_ls)):
        file_path = path + "/" + wave_file_ls[i]
        file_ls.append(file_path)
    results = record_all_training(files = file_ls,method="zc")

    if results != None:
        matrix = make_matrix_tsfresh(results["ls_signals"], results["ls_labels"])
        print(matrix)
        train_and_save_classifiers(classifiers, matrix)

../datasets/zoe_spiker/Length3/LLR_z.wav
../datasets/zoe_spiker/Length3/RLL_z.wav
../datasets/zoe_spiker/Length3/.DS_Store
../datasets/zoe_spiker/Length3/RRR_z.wav
../datasets/zoe_spiker/Length3/LRL_z.wav
../datasets/zoe_spiker/Length3/RRL_z.wav
../datasets/zoe_spiker/Length3/LRR_z.wav
../datasets/zoe_spiker/Length3/RLR_z.wav
../datasets/zoe_spiker/Length3/RRL_z2.wav
../datasets/zoe_spiker/Length3/RRL_z3.wav
../datasets/zoe_spiker/Length3/归档.zip
../datasets/zoe_spiker/Length3/LLR_z3.wav
../datasets/zoe_spiker/Length3/LLR_z2.wav
../datasets/zoe_spiker/Length3/RLR_z2.wav
../datasets/zoe_spiker/Length3/RLR_z3.wav
../datasets/zoe_spiker/Length3/LRL_z3.wav
../datasets/zoe_spiker/Length3/LRL_z2.wav
../datasets/zoe_spiker/Length3/LLL_z3.wav
../datasets/zoe_spiker/Length3/LLL_z2.wav
../datasets/zoe_spiker/Length3/RRR_z2.wav
../datasets/zoe_spiker/Length3/LRRz_2.wav
../datasets/zoe_spiker/Length3/RRR_z3.wav
../datasets/zoe_spiker/Length3/LRR_z3.wav
../datasets/zoe_spiker/Length3/RLL_z2.wav
../d

Feature Extraction: 100%|███████████████████| 20/20 [00:01<00:00, 10.44it/s]


    value__sum_values  value__median  value__mean  value__length  \
0       -6.574972e+05     -50.868754   -57.924168        11351.0   
1       -2.488980e+06    -306.868754  -257.898699         9651.0   
2        6.044780e+05     -97.868754    60.745455         9951.0   
3        1.476205e+06      95.000286    79.790567        18501.0   
4       -1.473831e+06     -64.999714  -131.580287        11201.0   
..                ...            ...          ...            ...   
72      -1.483565e+06     -48.779874  -131.860756        11251.0   
73      -1.195668e+06      17.220126   -91.265410        13101.0   
74       2.528954e+06     293.730889   284.120171         8901.0   
75      -1.880082e+06    -372.269111  -216.076490         8701.0   
76      -2.047131e+06    -657.269111  -301.004444         6801.0   

    value__standard_deviation  value__variance  value__root_mean_square  \
0                  776.634379     6.031610e+05               778.791479   
1                  835.200502    

ValueError: Expected 2D array, got scalar array instead:
array=5.
Reshape your data either using array.reshape(-1, 1) if your data has a single feature or array.reshape(1, -1) if it contains a single sample.

## Load classifiers

In [None]:
signals = results["ls_signals"]
classifiers = load_all_classifiers(classifier_filenames)
rf = classifiers['Random Forest classifier']
# rf.predict(signals)
predict(signals,rf)

In [None]:
def predict(signals,classifier):
    for i, signal in enumerate(signals):
        df_list = []
        temp_df = pd.DataFrame({'id': i, 'time': np.arange(len(signal)), 'value': signal.astype(float)})
        df_list.append(temp_df)

    df = pd.concat(df_list, ignore_index=True)
    extracted_features = extract_features(df, column_id='id', column_sort='time',default_fc_parameters=EfficientFCParameters())
    impute(extracted_features)
    predicted_class = classifier.predict(extracted_features)
    return predicted_class

In [None]:
compare_classifiers(classifiers, matrix)

In [None]:
accuracy, cm, svm_classifier, X_test, y_test = classifier_svm(matrix)
visualize_confusion_matrix(svm_classifier, X_test, y_test)

In [None]:
plot_all_confusion_matrices(classifiers, matrix)

In [None]:
cv_scores_dict = all_classifiers_cross_validation(classifiers, matrix, n_splits=5)

In [None]:
cv_scores, cv_times = all_classifiers_cross_validation(classifiers, matrix, n_splits=5)


In [None]:
matrix.to_csv("test_feature.csv", index=False)