In [None]:
import wfdb
import numpy as np
import os

# Path to the folder containing the records. Defaults to the original local
# location; set the ECG_DATASET_DIR environment variable to point elsewhere
# without editing the notebook.
folder_path = os.environ.get(
    'ECG_DATASET_DIR',
    'C:\\Users\\jacob\\Desktop\\GitHub\\CS593-ProjectFiles\\Project 3\\ECG Anomoly Detection\\Dataset',
)

# List of record names: every '.dat' file with its extension stripped.
# Sorted because os.listdir returns entries in arbitrary order, which would
# otherwise make the window order (and the seeded split later on) vary
# between machines.
records = sorted(f[:-4] for f in os.listdir(folder_path) if f.endswith('.dat'))

# Window size (samples per window) and the channel count every record must have
window_size = 3600
n_channels = 2

# Per-record results, concatenated once after the loop
windows_per_record = []
labels_per_record = []

# Loop over all records
for record_name in records:
    record_path = os.path.join(folder_path, record_name)
    try:
        # Load the record and the annotations
        record = wfdb.rdrecord(record_path)
        annotation = wfdb.rdann(record_path, 'atr')

        signal = record.p_signal
        if signal.ndim != 2 or signal.shape[1] != n_channels:
            # Reshaping a signal with a different channel count would either
            # fail or silently interleave samples across channels.
            raise ValueError(
                f"expected a (samples, {n_channels}) signal, got shape {signal.shape}"
            )

        # Pad the signal data with zeros until its length is a multiple of the
        # window size (integer ceiling division avoids float round-trips).
        n_samples = signal.shape[0]
        n_windows = -(-n_samples // window_size)
        padded_length = n_windows * window_size
        padded_signal = np.pad(signal, ((0, padded_length - n_samples), (0, 0)))

        # Reshape the padded signal data into windows: (window, time, channel)
        X = np.reshape(padded_signal, (n_windows, window_size, n_channels))

        # Create labels for each window based on the annotations: a window is
        # labelled 1 if it contains at least one annotation whose symbol is
        # not 'N', otherwise 0.
        # NOTE(review): every non-'N' symbol counts here, which presumably
        # includes non-beat annotations too - confirm that is intended.
        Y = np.zeros(n_windows)
        for sample, symbol in zip(annotation.sample, annotation.symbol):
            if symbol != 'N':
                Y[sample // window_size] = 1

        # Append the data and the labels to the lists
        windows_per_record.append(X)
        labels_per_record.append(Y)
    except Exception as exc:
        # Best-effort loading: skip a record that cannot be read or windowed,
        # but report the reason. Catching Exception (not a bare except) keeps
        # KeyboardInterrupt/SystemExit working.
        print(f"Error loading record {record_name}: {exc}")

if not windows_per_record:
    raise ValueError(f"No records could be loaded from {folder_path}")

# Concatenate all the data and labels
data = np.concatenate(windows_per_record)
labels = np.concatenate(labels_per_record)

# Data Normalization

In [None]:
# Per-channel statistics: reduce over axes 0 and 1 (windows and time steps),
# leaving one mean and one standard deviation per entry of the last axis.
# NOTE(review): these statistics are computed over all of `data`, i.e. before
# the train/test split and including any zero padding - confirm that is
# acceptable for the evaluation.
mean = np.mean(data, axis=(0, 1))
std = np.std(data, axis=(0, 1))

# A constant channel has std == 0; dividing by it would fill that channel
# with NaN/inf. Use 1 instead so such a channel is only mean-centred.
safe_std = np.where(std == 0, 1.0, std)

# Standardize the data
data = (data - mean) / safe_std

# Split the data

In [None]:
from sklearn.model_selection import train_test_split

# Hold out 20% of all windows as the test set; the fixed seed keeps the
# partition repeatable for a given ordering of `data`.
X_trainval, X_test, y_trainval, y_test = train_test_split(
    data,
    labels,
    test_size=0.2,
    random_state=1,
)

# Carve the validation set out of the remaining windows: 25% of that
# remainder, which is roughly 20% of the full dataset.
X_train, X_val, y_train, y_val = train_test_split(
    X_trainval,
    y_trainval,
    test_size=0.25,
    random_state=1,
)


# Set Up The Model

# Detection using Forecasting:
Forecasting-based detection fits a time-series model to the signal, predicts the next value, and flags an anomaly when the observed value differs from the prediction by more than a chosen threshold. Many forecasting models can be used for this; one common choice is ARIMA (AutoRegressive Integrated Moving Average), which is available in the statsmodels library. Here is a basic example:

In [None]:
from statsmodels.tsa.arima.model import ARIMA

# Select the first channel of the first window (data[0] is one window, not a
# whole record).
first_channel = data[0, :, 0]

# Hold out the final sample so the forecast can be compared with a value the
# model has not seen. Fitting on the whole series and comparing against its
# last element would test the forecast for step T+1 against the value at T.
history = first_channel[:-1]
actual_next = first_channel[-1]

# Maximum absolute forecast error tolerated before flagging an anomaly
anomaly_threshold = 0.5

# Create and fit the model with order=(5, 1, 0).
# statsmodels.tsa.arima.model.ARIMA.fit() does not accept the `disp` keyword
# used by the older arima_model API; passing it raises a TypeError.
model = ARIMA(history, order=(5, 1, 0))
model_fit = model.fit()

# Forecast the next data point (one step past the end of `history`)
forecast = model_fit.forecast()[0]

# If the difference between the forecast and the actual next data point is
# greater than the threshold, consider it an anomaly
if abs(forecast - actual_next) > anomaly_threshold:
    print("Anomaly detected")

NameError: name 'data' is not defined

# Generate class predictions

In [None]:

# Score the test set, reduce each sample's output to its maximum along axis 1,
# and flatten the result.
# NOTE(review): `model` must be a fitted estimator whose predict() accepts
# X_test. The only `model` assigned in the visible cells is the ARIMA
# specification from the forecasting example - confirm which model this cell
# is meant to use.
y_pred = model.predict(X_test).max(axis=1).flatten()

# The scores are continuous, but the metrics need binary predictions:
# anything strictly above the 0.5 threshold is classified as an anomaly (1),
# everything else as normal (0).
y_pred_bin = (y_pred > 0.5).astype(int)