# Classification 🤖

In [1]:
import pandas as pd
import numpy as np
from scipy.fft import fft2

## Load the Data 💽

In [2]:
signals = pd.read_csv("transformed_signal_data/labeled_signals.csv")
signals.head()

Unnamed: 0,Label,1 sec,2 sec,3 sec,4 sec,5 sec,6 sec,7 sec,8 sec,9 sec,...,65527 sec,65528 sec,65529 sec,65530 sec,65531 sec,65532 sec,65533 sec,65534 sec,65535 sec,65536 sec
0,ARR,-0.097857,-0.156881,-0.137816,-0.137177,-0.147684,-0.168522,-0.175891,-0.170236,-0.181131,...,-0.161495,-0.124765,-0.105196,-0.094394,-0.124967,-0.104613,-0.126682,-0.074905,-0.116527,-0.180421
1,ARR,0.119361,0.187559,0.169569,0.17859,0.211824,0.196607,0.072911,0.165885,0.083204,...,-0.141731,-0.121423,-0.113781,-0.087638,-0.097274,-0.089068,-0.105625,-0.097218,-0.123936,-0.106481
2,ARR,-0.024602,-0.036279,-0.035968,-0.030194,-0.045812,-0.093124,-0.091381,-0.128953,-0.11248,...,-0.327886,-0.308444,-0.282376,-0.269384,-0.248776,-0.247613,-0.191715,-0.186142,-0.129985,-0.122356
3,ARR,-0.508309,-0.808009,-0.721684,-0.78972,-0.767678,-0.774352,-0.772501,-0.794323,-0.794939,...,-0.953446,-0.943424,-0.971548,-0.988559,-0.994919,-0.996746,-1.015446,-1.042696,-1.040684,-1.048192
4,ARR,-0.312039,-0.49526,-0.442597,-0.465246,-0.428401,-0.444373,-0.417601,-0.453342,-0.429223,...,0.392087,0.434705,0.523518,0.59292,0.663007,0.743777,0.809435,0.822494,0.787239,0.684293


## Define Filters 🚬 and Feature Engineering ⚙️

In [3]:
def apply_wave_filter(signals):
    v = np.linspace(0.5 * np.pi, 1.5 * np.pi, 15)
    peak_filter = np.sin(v)
    return pd.DataFrame(np.correlate(signals, peak_filter, mode='same'))

def fft2_filter(signals):
    return pd.DataFrame(np.abs(fft2(signals.to_frame())))

def moving_median(signals, n=10000):
    return signals.rolling(n).median()

In [4]:
"""
Peaks Info:
"""
from scipy.signal import find_peaks

def get_signals_peak_info(sigs):
    distance = 30 
    p = 5000 
    pl = 950
    w = 5 
    peaks, peaks_props = find_peaks(sigs, height=0, distance=distance, prominence=(pl, p), width=w)
    diff = np.diff(peaks)
    return np.max(diff)

## Applying Transformation and Feature Extractions 

In [5]:
signals_cp = signals.copy()
signal_features = pd.DataFrame()
labels = signals_cp.pop("Label")

In [6]:
signals_scaled = signals_cp.copy()
signals_scaled = (signals_scaled - signals_scaled.mean(axis=0)) / signals_scaled.std(axis=0)

In [7]:
for i in range(0, 161):
    sig = signals_cp.loc[i]
    medians = moving_median(pd.Series(sig.values))
    signal_features.loc[i, "variation"] = len(medians.unique())
    
    sig_scaled = signals_scaled.loc[i]
    ff2_filtered = fft2_filter(sig_scaled)[0]
    wave_filtered = apply_wave_filter(ff2_filtered)
    signal_features.loc[i, "max_peak_distance"] = get_signals_peak_info(wave_filtered[0])
    signal_features.loc[i, "label"] = labels[i]
    

In [8]:
signal_features.head()

Unnamed: 0,variation,max_peak_distance,label
0,7616.0,20406.0,ARR
1,4560.0,23949.0,ARR
2,9824.0,25709.0,ARR
3,7791.0,34835.0,ARR
4,7768.0,16475.0,ARR


## Separate Training and Testing Dataset 🏋️ / 🧪

In [9]:
signal_features_cp = signal_features.copy()

X_nsr = signal_features_cp[signal_features_cp["label"] == "NSR"].copy()
X_arr = signal_features_cp[signal_features_cp["label"] == "ARR"].copy()
X_chf = signal_features_cp[signal_features_cp["label"] == "CHF"].copy()

y_nsr = X_nsr.pop("label")
y_arr = X_arr.pop("label")
y_chf = X_chf.pop("label")

X_nsr.head()

Unnamed: 0,variation,max_peak_distance
126,8.0,39018.0
127,6.0,37208.0
128,6.0,37355.0
129,10.0,36506.0
130,14.0,35166.0


In [10]:
from sklearn.model_selection import train_test_split
def get_datasets():
    X_nsr_train, X_nsr_test, y_nsr_train, y_nsr_test = train_test_split(X_nsr, y_nsr, test_size = 0.2, random_state = 0)
    X_arr_train, X_arr_test, y_arr_train, y_arr_test = train_test_split(X_arr, y_arr, test_size = 0.2, random_state = 0)
    X_chf_train, X_chf_test, y_chf_train, y_chf_test = train_test_split(X_chf, y_chf, test_size = 0.2, random_state = 0)

    X_train = pd.concat([X_nsr_train, X_arr_train, X_chf_train])
    X_test = pd.concat([X_nsr_test, X_arr_test, X_chf_test])
    y_train = pd.concat([y_nsr_train, y_arr_train, y_chf_train])
    y_test = pd.concat([y_nsr_test, y_arr_test, y_chf_test])
    return X_train, X_test, y_train, y_test


## Scoring the Model 💯

In [11]:
class Classifier:
    def __init__(self, features: pd.DataFrame, targets: pd.Series, k: int = 5):
        self._features = features
        self._targets = targets
        self._k = k
    def classify(self, row):
        var_neigh = self._get_variation_neighbors(row)
        most_voted = self._get_most_voted(var_neigh)
        if most_voted == "NSR":
            return most_voted
        
        var_max_peak = self._get_max_peak_distance_neighbors(row)
        most_voted = self._get_most_voted(var_max_peak)
        return most_voted 
    
    def _get_variation_neighbors(self, row) -> list:
        return self._get_neighbors(row, "variation")
    
    def _get_max_peak_distance_neighbors(self, row) -> list:
        return self._get_neighbors(row, "max_peak_distance", exclude="NSR")
   
    def _get_neighbors(self, row, feature: str, exclude: str = None) -> list:
        feature_value = row[feature]
        f_cp = self._features.copy()
        f_cp["distance"] = abs(f_cp[feature] - feature_value)
        if exclude:
            t_cp = self._targets.copy()
            idx_list = t_cp[t_cp == exclude].index.to_list()
            f_cp[f_cp.index.isin(idx_list)]["distance"] = np.inf
            
        f_cp = f_cp.sort_values(by=["distance"])
        return f_cp.iloc[0:self._k]["distance"]
    
    def _get_most_voted(self, neighbors) -> str:
        idx_list = neighbors.index.to_list()
        return self._targets[self._targets.index.isin(idx_list)].value_counts().sort_values(ascending=False).index[0]

In [12]:
classifier = Classifier(X_train, y_train)

wrong_counter = 0
for i in range(0, len(X_test)):
    row = X_test.iloc[i]
    y = y_test.iloc[i]
    c = classifier.classify(row)
    if c != y:
        wrong_counter += 1
        print(f"Expected: {y} - Got: {c}")
        
print(f"Wrong: {wrong_counter} - Total: {len(X_test)} - Accuracy: {1 - (wrong_counter / len(X_test))}")

NameError: name 'X_train' is not defined

### Runs the test 100 times 💯 🧪

In [15]:
results = []
for i in range(100):
    X_train, X_test, y_train, y_test = get_datasets()
    classifier = Classifier(X_train, y_train)
    wrong_counter = 0
    for i in range(0, len(X_test)):
        row = X_test.iloc[i]
        y = y_test.iloc[i]
        c = classifier.classify(row)
        if c != y:
            wrong_counter += 1
            print(f"Expected: {y} - Got: {c}")
    result = {}
    result["number"] = i
    result["total"] = len(X_test)
    result["accuracy"] = 1 - (wrong_counter / len(X_test))
    result["wrong"] = wrong_counter
    results.append(result)
    #print(f"Wrong: {wrong_counter} - Total: {len(X_test)} - Accuracy: {1 - (wrong_counter / len(X_test))}")

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  f_cp[f_cp.index.isin(idx_list)]["distance"] = np.inf
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  f_cp[f_cp.index.isin(idx_list)]["distance"] = np.inf
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  f_cp[f_cp.index.isin(idx_list)]["distance"] = np.inf
A value is trying to be set on a copy of a sli

In [24]:
total_tests = len(results)
total_accuracy = np.sum([r["accuracy"] for r in results]) / total_tests
total_wrong = np.sum([r["wrong"] for r in results])
total_predictions = np.sum([r["total"] for r in results])

results_summary_df = pd.DataFrame({
    "Total Test Rounds": [total_tests],
    "Total predictions": [total_predictions],
    "Total accuracy": [total_accuracy],
    "Total wrong": [total_wrong],
})

results_summary_df.head()

Unnamed: 0,Total Test Rounds,Total predictions,Total accuracy,Total wrong
0,100,3300,1.0,0
