In [None]:
import scipy.io
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import os
import scipy.stats

import plotly.graph_objects as go
from plotly.subplots import make_subplots

In [None]:
# Start by loading the data
#
# Reads the reference labels (record name -> class) from the CSV, then walks
# the training directory and attaches the "val" array of every .mat recording
# to the matching row. The combined frame is cached as a pickle at the end.

training_path = "CinC2017Data/training2017/training2017/"
answers_path = "CinC2017Data/REFERENCE-v3.csv"

dataset = pd.read_csv(answers_path, header=None, names=["class"], index_col=0)
dataset["data"] = None

print(dataset.head())

for root, dirs, files in os.walk(training_path):
    for filename in files:
        # splitext copes with names that contain no dot or several dots,
        # which a plain split(".") cannot unpack into exactly two parts.
        name, ext = os.path.splitext(filename)
        if ext != ".mat":
            continue
        if name not in dataset.index:
            # Without this guard the assignment below would silently append
            # a new, unlabeled row to the frame.
            print(f"no reference label for {name}, skipping file")
            continue
        mat_data = scipy.io.loadmat(os.path.join(root, filename))
        # Single-step label assignment. The chained form
        # dataset.loc[name]["data"] = ... writes into a temporary row object
        # that may be a copy, so the value can be lost (and always is under
        # pandas Copy-on-Write).
        dataset.at[name, "data"] = mat_data["val"]
        print(f"Adding {name}\r", end="")

print(dataset.head())
pk_path = "CinC2017Data/database.pk"
dataset.to_pickle(pk_path)

In [None]:
# Reload the cached dataset from the pickle file so the cells below can run
# without repeating the .mat loading step.
# NOTE: unpickling can execute arbitrary code; only load pickle files that
# were produced locally by this notebook.
dataset = pd.read_pickle("CinC2017Data/database.pk")

In [None]:
from sklearn.model_selection import train_test_split
from sklearn.metrics import ConfusionMatrixDisplay, confusion_matrix

In [None]:
def get_power_ratios(data, fs=300.0, low_cut=5.0, high_cut=40.0):
    """Return the relative mean spectral power of three frequency bands.

    Only the first row of ``data`` (``data[0]``) is analysed. Its FFT bins are
    split by absolute frequency into a low band (``|f| < low_cut``), a medium
    band (``low_cut <= |f| < high_cut``) and a high band (``|f| >= high_cut``).
    For each band the mean power per bin is divided by the mean power per bin
    of the whole spectrum, so a value above 1 means the band is stronger than
    the spectrum average. The three values do not sum to 1.

    Parameters
    ----------
    data : numpy.ndarray
        2-D array; ``data[0]`` is the signal and ``data.shape[-1]`` its length.
    fs : float, optional
        Sampling frequency in Hz used to build the frequency axis.
    low_cut, high_cut : float, optional
        Band edges in Hz.

    Returns
    -------
    numpy.ndarray
        Array of shape ``(3,)``: ``[low, medium, high]`` ratios. A band that
        contains no FFT bins yields ``nan``.
    """
    N = data.shape[-1]
    power = np.abs(np.fft.fft(data[0])) ** 2

    # fftfreq reports the upper half of the spectrum as negative frequencies.
    # Banding on the signed value would put every negative bin (including the
    # mirror images of high-frequency content) into the low band, so compare
    # magnitudes instead.
    freq = np.abs(np.fft.fftfreq(N, 1 / fs))

    # Half-open intervals: every bin belongs to exactly one band, including
    # bins that sit exactly on a cut-off frequency.
    band_masks = (
        freq < low_cut,
        (freq >= low_cut) & (freq < high_cut),
        freq >= high_cut,
    )

    total_power = np.sum(power) / N

    def _band_ratio(mask):
        # Mean power per bin inside the band, relative to the whole spectrum.
        n_bins = np.count_nonzero(mask)
        if n_bins == 0:
            return np.nan
        return (np.sum(power[mask]) / n_bins) / total_power

    return np.array([_band_ratio(mask) for mask in band_masks])

# Per-recording summary features derived from the raw "data" arrays.
dataset["mean"] = dataset["data"].map(np.mean)
dataset["std_dev"] = dataset["data"].map(np.std)
dataset["skewness"] = dataset["data"].map(lambda x: scipy.stats.skew(x[0], axis=-1))
dataset["kurtosis"] = dataset["data"].map(lambda x: scipy.stats.kurtosis(x[0], axis=-1))

# Run the FFT once per recording and split the three band ratios afterwards,
# instead of recomputing the whole spectrum separately for every column.
power_ratios = dataset["data"].map(get_power_ratios)
dataset["low_freq_power"] = power_ratios.map(lambda ratios: ratios[0])
dataset["med_freq_power"] = power_ratios.map(lambda ratios: ratios[1])
dataset["high_freq_power"] = power_ratios.map(lambda ratios: ratios[2])

In [None]:
# Average of every numeric feature per class. numeric_only=True keeps the
# object-typed "data" column (raw arrays) out of the aggregation, which
# otherwise raises a TypeError on recent pandas versions.
dataset.groupby("class").mean(numeric_only=True)

In [None]:
# Select class "N" recordings with skewness >= 1, high-frequency power ratio
# <= 0.001 and standard deviation <= 200, then report how many matched.
clean_mask = (
    dataset["class"].eq("N")
    & dataset["skewness"].ge(1)
    & dataset["high_freq_power"].le(0.001)
    & dataset["std_dev"].le(200)
)
very_clean = dataset.loc[clean_mask]
print(very_clean.shape[0])

In [None]:
# Plot the first row of the "data" array of one selected clean recording
# (positional index 192 within very_clean).
clean_example = very_clean.iloc[192]["data"]
plt.plot(clean_example[0])
plt.show()

In [None]:
# Select class "~" recordings with skewness <= 0.1 and standard deviation
# >= 400, then report how many matched.
noisy_mask = (
    dataset["class"].eq("~")
    & dataset["skewness"].le(0.1)
    & dataset["std_dev"].ge(400)
)
very_noisy = dataset.loc[noisy_mask]
print(very_noisy.shape[0])

In [None]:
# Plot the first row of the "data" array of one selected noisy recording
# (positional index 3 within very_noisy).
noisy_example = very_noisy.iloc[3]["data"]
plt.plot(noisy_example[0])
plt.show()

In [None]:
# Draw a num_rows x num_cols grid of randomly sampled recordings of class `c`
# taken from the very_clean subset, one recording per subplot.
c = "N"

num_rows = 1
num_cols = 1

num_class_samples = num_cols * num_rows
fig = make_subplots(rows=num_rows, cols=num_cols)

class_samples = very_clean[very_clean["class"] == c].sample(num_class_samples)
for i, (_, sample) in enumerate(class_samples.iterrows()):
    # Fill the grid row by row. divmod by the column count keeps the row in
    # 1..num_rows and the column in 1..num_cols for any grid shape; the earlier
    # i % num_cols / i // num_rows mapping left the grid when rows != cols.
    row, col = divmod(i, num_cols)
    fig.add_trace(go.Scatter(y=sample["data"][0]), row=row + 1, col=col + 1)

fig.update_layout(height=1000)
fig.update_xaxes(title="sample number")
fig.update_yaxes(title="amplitude")
fig.show()

In [None]:
def generate_index(c):
    """Map a class label to its integer code.

    "N" -> 0, "O" -> 1, "A" -> 2, "~" -> 3. Any other value returns None.
    """
    # The position of a label in this tuple is its integer code.
    for index, label in enumerate(("N", "O", "A", "~")):
        if c == label:
            return index
    return None

# Add an integer class code column by passing every "class" label through
# generate_index. The one-hot variant below is disabled.
# dataset["onehot"] = dataset["class"].map(generate_onehot)
dataset["class_index"] = dataset["class"].map(generate_index)

In [None]:
# Compare the power ranges: low-frequency power ratio on the x axis against
# the natural log of the high-frequency power ratio on the y axis, with each
# point coloured by its integer class code.

log_high_power = np.log(dataset["high_freq_power"])
scatter = plt.scatter(
    dataset["low_freq_power"],
    log_high_power,
    c=dataset["class_index"],
    cmap="viridis",
)
plt.colorbar(scatter)
plt.show()