# **Check environment**

Set `data_path` so that the notebook works both locally and on Google Colab (with the data stored in Google Drive).

In [None]:
import sys
import os
# Base folder of the raw CSV files; set below depending on where the notebook runs.
data_path = ''
# Check if the environment is Google Colab
if 'google.colab' in sys.modules:
    print("Running on Google Colab")
    # Install required libraries (versions are not pinned; -q keeps pip quiet)
    !pip install tensorflow -q
    !pip install keras -q
    !pip install scikit-learn -q
    !pip install pandas -q
    !pip install numpy -q
    !pip install matplotlib -q
    !pip install umap-learn -q
    

    # Mount Google Drive
    from google.colab import drive
    drive.mount('/content/drive')
    # Folder inside your own Google Drive where the CSV files are stored.
    data_path = '/content/drive/MyDrive/Heartbeat_Project/'
    
else:
    print("Running on local environment")

    current_path = os.getcwd()
    print("Current working directory:", current_path)
    # Relative path, resolved against the working directory printed above.
    data_path = '../data/raw/'

# Lookup table from dataset key to CSV location. The paths are built by plain
# string concatenation, so data_path has to end with a slash.
Path = dict({
    'ptbdb_normal': data_path +  'ptbdb_normal.csv',
    'ptbdb_abnormal':  data_path + 'ptbdb_abnormal.csv',
})

# **Import packages:**

In [None]:

# Verify installation and import libraries
import tensorflow as tf
import keras
import sklearn
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from matplotlib.patches import Patch
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from mpl_toolkits.mplot3d import Axes3D
import plotly.express as px
from matplotlib.colors import ListedColormap
from sklearn.manifold import TSNE
from sklearn.manifold import Isomap
import umap
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import MinMaxScaler
from sklearn.preprocessing import RobustScaler
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import f1_score


In [None]:
def addColumnsToDataframe(df):
    """
    Name the columns of a header-less dataframe in place.

    Every column except the last is named 'c_<position>', starting at 'c_0'.
    The last column, which holds the category value, is named 'target'.
    The same dataframe object is returned.
    """
    feature_count = df.shape[1] - 1
    new_names = [f'c_{position}' for position in range(feature_count)]
    new_names.append('target')
    df.columns = new_names
    return df

def convertColumnAsInt(df, column):
    """
    Cast one column of the dataframe to int and return the same dataframe.

    The category values are read as floats; the int form is used to identify
    the category. The column is replaced inside the dataframe that was passed in.
    """
    as_int = df[column].astype(int)
    df[column] = as_int
    return df

def getBarChartFromCategoryValueCounts(category_value_counts):
    """
    Draw and show a bar chart of category counts.

    The argument is a pandas Series of counts; its own plot method draws the
    bars, and every bar is annotated with its value.
    """
    plt.figure(figsize=(10, 6))
    axes = category_value_counts.plot(kind='bar')
    plt.grid(False)
    plt.xticks(rotation=360)
    plt.xlabel('Categories')
    plt.ylabel('Count')
    plt.legend()
    # Write the value at the edge of every bar.
    for bars in axes.containers:
        axes.bar_label(bars, label_type='edge')
    plt.show()


def showTop10DataInChart(df):
    """
    Plot the first ten rows of the dataframe as overlaid line charts.

    Each row becomes one line. The x axis is the column position and covers
    every column of df, so a trailing label column is drawn as the last point.
    Frames with fewer than ten rows are plotted completely.
    """
    plt.figure(figsize=(10, 6))
    column_positions = list(range(df.shape[1]))
    # Start at row 0: the former 1:10 slice skipped the first row and drew only nine.
    for row_values in df.values[:10]:
        plt.plot(column_positions, row_values)
    plt.show()

In [None]:
# Load the normal recordings; the CSV has no header row.
ptbdb_normal = pd.read_csv(Path.get('ptbdb_normal'), header=None)
# Name the columns, then cast the target column to int.
ptbdb_normal_with_columns = convertColumnAsInt(addColumnsToDataframe(ptbdb_normal), 'target')

In [None]:
# Display the normal recordings with their new column names.
ptbdb_normal_with_columns

In [None]:
# Load the abnormal recordings; the CSV has no header row.
ptbdb_abnormal = pd.read_csv(Path.get('ptbdb_abnormal'), header=None)
# Name the columns, then cast the target column to int.
ptbdb_abnormal_with_columns = convertColumnAsInt(addColumnsToDataframe(ptbdb_abnormal), 'target')

In [None]:
# Display the abnormal recordings with their new column names.
ptbdb_abnormal_with_columns

Merge the normal and abnormal data into one dataset and shuffle it

In [None]:
# Merge the datasets; ignore_index gives the combined frame a continuous index.
ptbdb = pd.concat([ptbdb_abnormal_with_columns, ptbdb_normal_with_columns], ignore_index=True)
# Shuffle the rows. The fixed seed makes the row order identical on every run;
# without it the seeded train/test split further down still changed between runs.
ptbdb = ptbdb.sample(frac=1, random_state=42).reset_index(drop=True)
ptbdb

In [None]:
# Split the data into training (80%) and testing (20%).
# stratify keeps the share of each target class the same in both parts, so an
# unlucky draw cannot shift the class balance between training and test data.
from sklearn.model_selection import train_test_split
ptbdb_train, ptbdb_test = train_test_split(
    ptbdb, test_size=0.2, random_state=42, stratify=ptbdb['target']
)


In [None]:
# The sentences are built from the actual shapes so they stay correct when the
# data or the split changes.

# train:
train_rows, train_columns = ptbdb_train.shape
print(ptbdb_train.shape)
print(f"The train data has {train_rows} rows and {train_columns} columns.")

# test:
test_rows, test_columns = ptbdb_test.shape
print(ptbdb_test.shape)
print(f"The test data has {test_rows} rows and {test_columns} columns.")

In [None]:
# Show the target column of each split, training first.
for split_frame in (ptbdb_train, ptbdb_test):
    print(split_frame["target"])

In [None]:
print(ptbdb_train.dtypes)
print(ptbdb_test.dtypes)
# Read the dtypes from the data: the integer width produced by astype(int)
# depends on the platform, so a fixed "int32" statement can be wrong.
feature_dtypes = sorted(str(dtype) for dtype in ptbdb_train.drop(columns=['target']).dtypes.unique())
target_dtype = ptbdb_train['target'].dtype
print(f"The features are numeric. Feature column dtype(s): {', '.join(feature_dtypes)}; the target column is {target_dtype}.")

In [None]:
# DataFrame.info() prints its report itself and returns None, so it is called
# directly; wrapping it in print() added a stray "None" line to the output.
ptbdb_test.info(show_counts=True)
print()
print("Size of the DataFrame", ptbdb_test.shape, end='\n\n')

ptbdb_train.info(show_counts=True)
print()
print("Size of the DataFrame", ptbdb_train.shape, end='\n\n')

Remove duplicates

In [None]:
# Count fully identical rows in each split (training first, then test).
for split_frame in (ptbdb_train, ptbdb_test):
    nb_rows_duplicated = split_frame.duplicated().sum()
    print("Number of rows duplicated :", nb_rows_duplicated)

In [None]:
# Drop fully identical rows from both splits.
ptbdb_train = ptbdb_train.drop_duplicates()
ptbdb_test = ptbdb_test.drop_duplicates()

# Count again to confirm that no duplicated rows are left.
nb_rows_duplicated_train = ptbdb_train.duplicated().sum()
nb_rows_duplicated_test = ptbdb_test.duplicated().sum()
print("Number of rows duplicated in train set after removal:", nb_rows_duplicated_train)
print("Number of rows duplicated in test set after removal:", nb_rows_duplicated_test)


In [None]:
# Total number of missing cells per split (summed over all columns).
missing_train = int(ptbdb_train.isnull().sum().sum())
missing_test = int(ptbdb_test.isnull().sum().sum())
print("Missing values in train:", missing_train)
print("Missing values in test:", missing_test)
# State the conclusion only when the counts support it.
if missing_train == 0 and missing_test == 0:
    print("The data has no missing values.")
else:
    print("The data contains missing values that have to be handled before modeling.")

In [None]:
# Preview the first rows of the training split.
ptbdb_train.head()

In [None]:
# Preview the first rows of the test split.
ptbdb_test.head()

Visualize Target 

In [None]:
# Define mapping dictionary (target code -> class name)
class_mapping = {
    1: 'Normal',
    0: 'Abnormal'
}

# Define custom colors for each target code. The codes follow class_mapping:
# 0 is Abnormal (red) and 1 is Normal (green). The previous version had the two
# colours swapped relative to class_mapping.
color_mapping = {
    0: 'red',      # Abnormal beat
    1: 'green',    # Normal beat
}

classes_to_plot = [0, 1]

# Calculate value counts based on mapped class names (training split)
value_counts_series_train = ptbdb_train['target'].map(class_mapping).value_counts()
# Calculate value counts based on mapped class names (test split)
value_counts_series_test = ptbdb_test['target'].map(class_mapping).value_counts()

In [None]:
# Print the class counts of both splits, each under its own heading.
for heading, class_counts in (
    ("Training Dataset Class Distribution:", value_counts_series_train),
    ("\nTesting Dataset Class Distribution:", value_counts_series_test),
):
    print(heading)
    print(class_counts)

Barplots and Piecharts

In [None]:
import matplotlib.pyplot as plt

# Colour per class name. value_counts() orders the classes by frequency, so the
# colour is looked up by name instead of being assigned by position.
class_colors = {'Normal': 'green', 'Abnormal': 'red'}


def plot_class_distribution(value_counts_series, dataset_name):
    """
    Show a bar chart and a pie chart of the class counts of one dataset.

    Parameters:
    value_counts_series (Series): counts indexed by class name ('Normal' / 'Abnormal')
    dataset_name (str): name used in the titles (e.g., 'Training', 'Testing')

    Returns:
    The axes object of the bar chart.
    """
    colors_in_order = [class_colors[name] for name in value_counts_series.index]

    # Bar chart
    plt.figure(figsize=(10, 6))
    bar_chart = value_counts_series.plot(kind='bar', color=colors_in_order)
    plt.xlabel('Categories')
    plt.ylabel('Count')
    plt.title(f'Distribution of Target Categories ({dataset_name} Dataset)')
    plt.xticks(rotation=45, ha='right')

    # Add the count as a label to every bar
    for container in bar_chart.containers:
        plt.bar_label(container, label_type='edge')

    plt.tight_layout()
    plt.show()

    # Pie chart
    plt.figure(figsize=(8, 8))
    value_counts_series.plot(kind='pie', autopct='%1.1f%%', colors=colors_in_order, startangle=90)
    plt.ylabel('')  # Remove the y-label
    plt.title(f'Target Categories ({dataset_name} Dataset) - Pie Chart')
    plt.show()

    return bar_chart


# Same pair of charts for both splits: training first, then testing.
bar_chart_train = plot_class_distribution(value_counts_series_train, 'Training')
bar_chart_test = plot_class_distribution(value_counts_series_test, 'Testing')


Plot of each class once

In [None]:
# Function to plot overlay of ECG signals from both datasets for a single class
def plot_overlay_ecg_signals(df, label, color, dataset_label):
    """
    Draw the first ECG signal of one class onto the current figure.

    Calling the function several times on the same figure overlays one example
    signal per class.

    Parameters:
    df (DataFrame): DataFrame containing ECG signals and target labels
    label (str or int): Class name (looked up in class_mapping) or class code
    color (str): Color for the plot
    dataset_label (str): Label for the dataset (e.g., 'Training', 'Testing')
    """
    # A class name is translated back to its numeric code; a code is used as given.
    if not isinstance(label, str):
        class_label = label
    else:
        matching_codes = [code for code, name in class_mapping.items() if name == label]
        class_label = matching_codes[0]

    # Nothing to draw when the class does not occur in df.
    rows_of_class = df[df['target'] == class_label]
    if rows_of_class.empty:
        print(f"No data found for class {class_mapping[class_label]}")
        return

    # First row of the class without its last entry (the 'target' value).
    signal = rows_of_class.iloc[0][:-1]
    legend_text = f'{dataset_label}: {class_mapping[class_label]}'
    plt.plot(signal, label=legend_text, color=color)

    plt.title(f"Overlay of ECG Signals - {dataset_label}")
    plt.ylabel('Amplitude')
    plt.grid(True)
    plt.xticks([])  # no x-axis ticks or tick labels
    plt.legend()

# List of classes to plot
classes_to_plot = ['Normal', 'Abnormal']

# Colors for each class
colors = ['green','red']

# One figure per dataset; each figure overlays one example signal per class.
for dataset_frame, dataset_label in ((ptbdb_train, 'Training'), (ptbdb_test, 'Testing')):
    plt.figure(figsize=(12, 8))

    for label, color in zip(classes_to_plot, colors):
        plot_overlay_ecg_signals(dataset_frame, label, color, dataset_label)

    plt.tight_layout()
    plt.show()

# Perform Histogram

In [None]:
def plot_hist(class_name, min_val=5, size=70, title=''):
    """
    Show a 2D histogram (column position vs. value) for one class of ptbdb_train.

    Parameters:
    class_name (str or int): class name (looked up in class_mapping) or class code
    min_val (int): position of the first column to include
    size (int): position one past the last column to include
    title (str): text appended to '2D Histogram - ' in the figure title
    """
    # A class name is translated back to its numeric code; a code is used as given.
    if not isinstance(class_name, str):
        class_label = class_name
    else:
        class_label = [code for code, name in class_mapping.items() if name == class_name][0]

    # Rows of the requested class, restricted to the column window.
    class_rows = ptbdb_train.loc[ptbdb_train['target'] == class_label]
    window = class_rows.iloc[:, min_val:size]

    # One long vector of values, and for every value the column position it came from.
    values = window.values.flatten()
    positions = np.tile(np.arange(min_val, size), window.shape[0])

    # The two vectors must pair up one to one.
    assert len(positions) == len(values), "Mismatch in lengths of x-values and flattened image values."

    plt.figure(figsize=(10, 6))
    plt.hist2d(positions, values, bins=(80, 80), cmap=plt.cm.jet)
    plt.colorbar()  # colour scale for the bin counts
    plt.title('2D Histogram - ' + title)
    plt.xlabel('Feature Index')
    plt.ylabel('Feature Values')
    plt.show()

# One 2D histogram per class: Normal first, then Abnormal.
for class_name, hist_title in (('Normal', 'Normal Heart Beat'), ('Abnormal', 'Abnormal Heart Beat')):
    plot_hist(class_name, title=hist_title)


# PCA Functions for training Dataset

In [None]:
# Define the function to apply PCA
def apply_pca_to_dataset(df, n_components=2):
    """
    Project the feature columns of df onto their principal components.

    Parameters:
    df (DataFrame): feature columns plus a 'target' column
    n_components (int): number of principal components to keep

    Returns:
    (DataFrame, PCA): a frame with the columns 'Principal Component 1' ...
    'Principal Component n' and 'target' (rows in the same order as df, with a
    fresh 0..n-1 index), and the fitted PCA object.
    """
    X = df.drop('target', axis=1)
    pca = PCA(n_components=n_components)
    components = pca.fit_transform(X)
    df_pca = pd.DataFrame(data=components, columns=[f'Principal Component {i+1}' for i in range(n_components)])
    # Copy the targets by position. Assigning the Series itself aligns on the
    # index; df usually has a shuffled, gapped index after the split, which
    # paired rows with the wrong target or with NaN.
    df_pca['target'] = df['target'].to_numpy()
    return df_pca, pca

# Define the function to plot PCA results (2D)
def plot_pca_results(df_pca, title='PCA of Dataset'):
    """
    Show an interactive 2D scatter plot of the first two principal components.

    The columns 'color' and 'label' are added to df_pca itself; both are
    derived from its 'target' column (0 = Abnormal / red, 1 = Normal / green).

    Parameters:
    df_pca (DataFrame): needs 'Principal Component 1', 'Principal Component 2' and 'target'
    title (str): title of the figure
    """
    # Colour and class name per target code
    color_by_target = {0: 'red', 1: 'green'}
    name_by_target = {0: 'Abnormal', 1: 'Normal'}
    df_pca['color'] = df_pca['target'].map(color_by_target)
    df_pca['label'] = df_pca['target'].map(name_by_target)

    # Points are coloured by class name; the legend is titled 'Class'.
    palette = {'Normal': 'green', 'Abnormal': 'red'}
    scatter_fig = px.scatter(
        df_pca,
        x='Principal Component 1',
        y='Principal Component 2',
        color='label',
        color_discrete_map=palette,
        title=title,
        labels={'label': 'Class'},
        opacity=0.5,
    )
    scatter_fig.update_traces(marker=dict(size=5))
    scatter_fig.show()

# Define the function to create a Scree plot
def create_scree_plot(pca, title='Scree Plot'):
    """
    Plot the share of variance explained by each principal component.

    Parameters:
    pca: fitted PCA object; its explained_variance_ratio_ attribute is read
    title (str): title of the figure
    """
    # The axis is labelled in percent, so the ratios are scaled by 100 before
    # plotting; unscaled, the plotted values did not match the axis label.
    explained_var_percent = np.asarray(pca.explained_variance_ratio_) * 100
    components = np.arange(len(explained_var_percent)) + 1

    plt.figure(figsize=(8, 5))
    plt.plot(components, explained_var_percent, 'o-', linewidth=2, color='blue')
    plt.title(title)
    plt.xlabel('Principal Component')
    plt.ylabel('Variance Explained (%)')
    plt.xticks(components)
    plt.show()

def _show_pca_3d(df_pca, title):
    """Add the 'color' and 'label' columns to df_pca, show its first three components as a 3D scatter and return the figure."""
    df_pca['color'] = df_pca['target'].map({0: 'red', 1: 'green'})
    df_pca['label'] = df_pca['target'].map({0: 'Abnormal', 1: 'Normal'})

    figure = px.scatter_3d(
        df_pca,
        x='Principal Component 1',
        y='Principal Component 2',
        z='Principal Component 3',
        color='label',
        color_discrete_map={'Normal': 'green', 'Abnormal': 'red'},
        title=title,
        labels={'label': 'Class'},
        opacity=0.5,
    )
    figure.update_traces(marker=dict(size=5))
    figure.show()
    return figure


# ---- Training data ----
# 10 components for the Scree plot
df_pca_train, pca_train = apply_pca_to_dataset(ptbdb_train, n_components=10)
create_scree_plot(pca_train, title='Scree Plot for Training Data')

# 2 components for the 2D scatter
df_pca_train, pca_train = apply_pca_to_dataset(ptbdb_train, n_components=2)
plot_pca_results(df_pca_train, title='PCA of Heartbeat Dataset (2D) - Training Data')

# 3 components for the 3D scatter
df_pca_train, pca_train = apply_pca_to_dataset(ptbdb_train, n_components=3)
fig = _show_pca_3d(df_pca_train, '3D PCA of Heartbeat Dataset - Training Data')

# ---- Test data ----
# 10 components for the Scree plot
df_pca_test, pca_test = apply_pca_to_dataset(ptbdb_test, n_components=10)
create_scree_plot(pca_test, title='Scree Plot for Test Data')

# 2 components for the 2D scatter
df_pca_test, pca_test = apply_pca_to_dataset(ptbdb_test, n_components=2)
plot_pca_results(df_pca_test, title='PCA of Heartbeat Dataset (2D) - Test Data')

# 3 components for the 3D scatter
df_pca_test, pca_test = apply_pca_to_dataset(ptbdb_test, n_components=3)
fig = _show_pca_3d(df_pca_test, '3D PCA of Heartbeat Dataset - Test Data')



# Look at the loadings to determine which variables are most significant in each principal component in training data set

In [None]:
# train
df_pca, pca = apply_pca_to_dataset(ptbdb_train, n_components=5)

# Each row of components_ holds the loadings (eigenvector) of one principal component.
loadings = pca.components_

# One bar chart per principal component: one bar per feature position.
for i, component in enumerate(loadings):
    feature_positions = range(len(component))

    plt.figure(figsize=(10, 5))
    plt.bar(feature_positions, component)
    plt.title(f'Loadings for Principal Component {i+1}')
    plt.xlabel('Feature')
    plt.ylabel('Loading')
    plt.show()

In [None]:
# test
df_pca, pca = apply_pca_to_dataset(ptbdb_test, n_components=5)

# Each row of components_ holds the loadings (eigenvector) of one principal component.
loadings = pca.components_

# One bar chart per principal component: one bar per feature position.
for i, component in enumerate(loadings):
    feature_positions = range(len(component))

    plt.figure(figsize=(10, 5))
    plt.bar(feature_positions, component)
    plt.title(f'Loadings for Principal Component {i+1}')
    plt.xlabel('Feature')
    plt.ylabel('Loading')
    plt.show()

# Identify significant features

In [None]:
from scipy.stats import f_oneway, kruskal
# Separate features and labels
features = ptbdb_train.iloc[:, :-1]
labels = ptbdb_train['target']

p_values = []

for i in range(features.shape[1]):
    feature_values = features.iloc[:, i]

    # A feature with a single distinct value cannot separate the classes, and
    # the rank test is not defined for it: record "no evidence" and move on.
    if feature_values.nunique() <= 1:
        p_values.append(1.0)
        continue

    # Group the values by the numeric target codes (the keys of class_mapping).
    # The target column holds these codes; comparing it with the class names
    # (the dictionary values) matched no row and left every group empty.
    class_groups = [feature_values[labels == code] for code in class_mapping.keys()]

    # Kruskal-Wallis H-test: rank based, does not require normally distributed data
    h_stat, p_val = kruskal(*class_groups)
    p_values.append(p_val)

# Bonferroni correction for testing many features, capped at 1 so that the
# adjusted value is still a probability
adjusted_p_values = [min(p * len(p_values), 1.0) for p in p_values]

# Select features with p-value below the significance level
significant_features = [i for i, p_val in enumerate(adjusted_p_values) if p_val < 0.05]

print(f'Number of significant features: {len(significant_features)}')
# print(f'Significant features: {significant_features}')

# Draw the conclusion only when the test really found nothing.
if not significant_features:
    print('The absence of significant features suggests that the classes may not be well-separated in the original feature space,\n which could make it challenging for PCA and t-SNE to clearly distinguish between them.')

# Perform t_SNE in training set

In [None]:
# Separate features and labels
features = ptbdb_train.iloc[:, :-1]
labels = ptbdb_train.iloc[:, -1]

# Reduce dimensionality with PCA (seeded so the embedding can be reproduced)
pca = PCA(n_components=50, random_state=42)
features_pca = pca.fit_transform(features)

# Initialize t-SNE
tsne = TSNE(n_components=2, perplexity=30, learning_rate=200, random_state=42)

# Perform t-SNE
tsne_results = tsne.fit_transform(features_pca)

# Fixed colour and name per target code. Deriving them from the order in which
# the codes first appear made the colours depend on the row order.
target_colors = {0: 'red', 1: 'green'}
target_names = {0: 'Abnormal', 1: 'Normal'}
colors = labels.map(target_colors).tolist()

# Plot the results
plt.figure(figsize=(16,10))
scatter = plt.scatter(tsne_results[:,0], tsne_results[:,1], c=colors, alpha=0.5)
plt.title('t-SNE visualization of PTBDB-train dataset')
plt.xlabel('t-SNE axis 1')
plt.ylabel('t-SNE axis 2')

# Legend built from the same dictionaries, so names and colours cannot diverge
handles = [Patch(color=target_colors[code], label=target_names[code]) for code in sorted(target_names)]
plt.legend(handles=handles, title='Class')

plt.show()


# Perform t_SNE in test set

In [None]:
# Separate features and labels
features = ptbdb_test.iloc[:, :-1]
labels = ptbdb_test.iloc[:, -1]

# Reduce dimensionality with PCA (seeded so the embedding can be reproduced)
pca = PCA(n_components=50, random_state=42)
features_pca = pca.fit_transform(features)

# Initialize t-SNE
tsne = TSNE(n_components=2, perplexity=30, learning_rate=200, random_state=42)

# Perform t-SNE
tsne_results = tsne.fit_transform(features_pca)

# Fixed colour and name per target code. The former legend used the fixed list
# ['Normal', 'Abnormal'] while the colours followed the order in which the codes
# first appear, so names and colours could be swapped.
target_colors = {0: 'red', 1: 'green'}
target_names = {0: 'Abnormal', 1: 'Normal'}
colors = labels.map(target_colors).tolist()

# Plot the results
plt.figure(figsize=(16,10))
scatter = plt.scatter(tsne_results[:,0], tsne_results[:,1], c=colors, alpha=0.5)
plt.title('t-SNE visualization of PTBDB-test dataset')
plt.xlabel('t-SNE axis 1')
plt.ylabel('t-SNE axis 2')

# Legend built from the same dictionaries, so names and colours cannot diverge
handles = [Patch(color=target_colors[code], label=target_names[code]) for code in sorted(target_names)]
plt.legend(handles=handles, title='Class')
plt.show()


# Perform UMAP in training set (UMAP does not work for me, HELP!)

In [None]:
# from umap import UMAP
# # Drop target column from the training data
# features = ptbdb_train.drop(columns=['target'])

# # Apply UMAP
# umap_model = UMAP(n_components=2, random_state=42)  # Instantiate UMAP model
# df_umap = umap_model.fit_transform(features)  # Fit and transform the data

# # Define color mapping for target labels
# label_to_color = {0: 'red', 1: 'green'}  # Assuming 0 is Abnormal, 1 is Normal
# colors = ptbdb_train['target'].map(label_to_color)

# # Create the plot
# plt.figure(figsize=(16, 10))

# # Scatter plot of UMAP results
# scatter = plt.scatter(df_umap[:, 0], df_umap[:, 1], c=colors, alpha=0.5)

# # Create a custom legend
# handles = [
#     plt.Line2D([0], [0], marker='o', color='w', markerfacecolor='red', markersize=10, label='Abnormal'),
#     plt.Line2D([0], [0], marker='o', color='w', markerfacecolor='green', markersize=10, label='Normal')
# ]

# plt.title('UMAP Visualization of PTBDB Train Dataset')
# plt.xlabel('UMAP Axis 1')
# plt.ylabel('UMAP Axis 2')
# plt.legend(handles=handles, title='Class')
# plt.show()

# Perform UMAP in test set

In [None]:
# from umap import UMAP
# # Drop target column from test data
# features = ptbdb_test.drop(columns=['target'])

# # Apply UMAP
# umap_model = umap.UMAP(n_components=2, random_state=42)
# df_umap = umap_model.fit_transform(features)

# # Get unique labels and create a mapping
# unique_labels = ptbdb_test['target'].unique()
# label_to_color = {0: 'red', 1: 'green'}
# colors = ptbdb_test['target'].map(label_to_color)

# # Create the plot
# plt.figure(figsize=(16, 10))

# # Scatter plot of UMAP results
# scatter = plt.scatter(df_umap[:, 0], df_umap[:, 1], c=colors, cmap='viridis', alpha=0.5)

# # Create a custom legend
# handles = [plt.Line2D([0], [0], marker='o', color='w', markerfacecolor='red', markersize=10, label='Abnormal'),
#            plt.Line2D([0], [0], marker='o', color='w', markerfacecolor='green', markersize=10, label='Normal')]

# plt.title('UMAP Visualization of PTBDB Test Dataset')
# plt.xlabel('UMAP Axis 1')
# plt.ylabel('UMAP Axis 2')
# plt.legend(handles=handles, title='Class')
# plt.show()

# Prepare Data for modeling and Preprocessing

In [None]:
# Class sizes of the training split, looked up by target code.
train_target_counts = ptbdb_train['target'].value_counts()

num_cases_target_1 = train_target_counts[1]  # 1 = Normal
print("1 = Normal:", num_cases_target_1)

num_cases_target_0 = train_target_counts[0]  # 0 = Abnormal
print("0 = Abnormal:",num_cases_target_0)
# Attention: this target coding is exactly the opposite of the MIT-BIH dataset!

# load and split cleaned data

In [None]:
# File paths
train_file_path = 'ptbdb_train_clean.csv'
test_file_path = 'ptbdb_test_clean.csv'

# If the cleaned data files do not exist, save them
if not os.path.isfile(train_file_path) or not os.path.isfile(test_file_path):
    # Read the data
    # Encode the labels
    ptbdb_train['target'] = ptbdb_train['target'].replace({'Normal': 1, 'Abnormal': 0})
    ptbdb_test['target'] = ptbdb_test['target'].replace({'Normal': 1, 'Abnormal': 0})

    # Save cleaned data
    ptbdb_train.to_csv(train_file_path, index=False)
    ptbdb_test.to_csv(test_file_path, index=False)
else:
    print("Cleaned data files already exist.")

# Read cleaned data
try:
    ptbdb_train = pd.read_csv(train_file_path, header=0)
    ptbdb_test = pd.read_csv(test_file_path, header=0)
except FileNotFoundError:
    print(f"Error: The file {train_file_path} or {test_file_path} does not exist.")
    # Optionally, add code to handle this case, e.g., exit or retry

# Show the distribution of the two classes
print('Class distribution in training\n', ptbdb_train['target'].value_counts(normalize=True))
print('\nClass distribution in test\n', ptbdb_test['target'].value_counts(normalize=True))
print('\n')

# Plot a pie chart for training data class distribution
label_counts = ptbdb_train['target'].value_counts(normalize=True)
colors = ['green', 'red']
labels = ['Normal', 'Abnormal']

plt.figure(figsize=(10, 6))
plt.pie(label_counts, labels=labels, colors=colors, autopct='%1.1f%%')
plt.title('Percentage of Each Label in Training Data')
plt.show()


In [None]:
# Split the data into features (X) and target (y)
X_train = ptbdb_train.drop(columns=['target'])
y_train = ptbdb_train['target']

X_test = ptbdb_test.drop(columns=['target'])
y_test = ptbdb_test['target']

In [None]:
# to find the distribution of our variables, follow the normal distribution or not
from scipy.stats import kstest
normal_vars = []
non_normal_vars = []

for i in range(187):  # 0 to 186
    stat, p = kstest(X_train.iloc[:, i], 'norm')
    if p > 0.05:
        normal_vars.append(f'Variable {i}')
    else:
        non_normal_vars.append(f'Variable {i}')

print("Variables following normal distribution:")
print(normal_vars)
if len(non_normal_vars) == 187:
    print("Since our variables do not follow a normal distribution, it is advisable to use scaling methods such as MinMaxScaler or RobustScaler, which are better suited for data that does not conform to a normal distribution.")

# Finding the best Rescaling

In [None]:
# Rescaling ensures that all features contribute equally to the model, preventing features with larger ranges from dominating the learning process

# Define recommended scalers
scalers = {
    "StandardScaler": StandardScaler(),
    "MinMaxScaler": MinMaxScaler(),
    "RobustScaler": RobustScaler()
}

# Define models to evaluate.
# The tree ensembles draw random samples while training; the fixed seed makes
# their scores repeatable, so score differences come from the scaler alone.
models = {
    "LogisticRegression": LogisticRegression(class_weight='balanced', max_iter=1000),
    "RandomForest": RandomForestClassifier(class_weight='balanced', random_state=42),
    "SVM": SVC(class_weight='balanced'),
    "KNN": KNeighborsClassifier(),
    "GradientBoosting": GradientBoostingClassifier(random_state=42)
}

def evaluate_scalers(X, y, scalers, models):
    """
    Compare scalers by the cross-validated F1-score of several models.

    For every scaler and every model a 5-fold stratified cross-validation is
    run. Inside each fold the scaler is fitted on the training part only and
    then applied to the held-out part, so the held-out rows never influence
    the scaling.

    Parameters:
    X (DataFrame or array): feature matrix
    y (Series or array): target values, same length as X
    scalers (dict): name -> scaler object with fit_transform / transform
    models (dict): name -> classifier object with fit / predict

    Returns:
    dict: results[scaler_name][model_name] = mean F1-score over the folds
    """
    # Plain arrays: the fold indices are positions, not index labels.
    X_values = np.asarray(X)
    y_values = np.asarray(y)

    skf = StratifiedKFold(n_splits=5)
    results = {}

    for scaler_name, scaler in scalers.items():
        print(f"Scaler: {scaler_name}", end="\n\n")

        # Scale every fold once per scaler; all models reuse the same folds.
        folds = []
        for train_index, test_index in skf.split(X_values, y_values):
            X_train_ = scaler.fit_transform(X_values[train_index])
            X_test_ = scaler.transform(X_values[test_index])
            folds.append((X_train_, y_values[train_index], X_test_, y_values[test_index]))

        for model_name, model in models.items():
            f_score = []
            print(f"Model: {model_name}", end="\n\n")

            for X_train_, y_train_, X_test_, y_test_ in folds:
                model.fit(X_train_, y_train_)
                y_pred_ = model.predict(X_test_)
                f_score.append(f1_score(y_test_, y_pred_))

            mean_f1_score = np.mean(f_score)
            print("The scores: ", end="\n\n")
            print([round(f, 2) for f in f_score], end="\n\n")
            print('F1-Score mean=%.5f' % (mean_f1_score), end="\n\n")

            results.setdefault(scaler_name, {})[model_name] = mean_f1_score
    return results

# Apply evaluation
results = evaluate_scalers(X_train, y_train, scalers, models)
# Summarize the results
for scaler_name, model_scores in results.items():
    print(f"Scaler: {scaler_name}")
    for model_name, score in model_scores.items():
        print(f"  Model: {model_name}, F1-Score mean: {score:.5f}")
    print("\n")


# Apply the best rescaling method

In [None]:
# Determine the best scaler based on the highest average F1-score
best_scaler_name = max(results, key=lambda k: np.mean(list(results[k].values())))
best_scaler = scalers[best_scaler_name]
print(f"Best Scaler: {best_scaler_name}")

print("In general, the models perform best with StandardScaler. However, the differences in performance are very small. Random Forest was the best performing model, followed by Gradient Boosting. Logistic Regression and SVM performed the worst.")

In [None]:
# Fit the best scaler on the training data and transform it
scaler = StandardScaler()
# Fit the scaler on the training data and transform it
X_train_scaled = scaler.fit_transform(X_train)
# Transform the test data using the same scaler
X_test_scaled = scaler.transform(X_test)


# Convert the numpy arrays to pandas DataFrames
X_train_scaled = pd.DataFrame(X_train_scaled, columns=X_train.columns)
X_test_scaled = pd.DataFrame(X_test_scaled, columns=X_test.columns)

# find the best resampling method

In [None]:
# When dealing with imbalanced datasets, especially those that reflect real-world scenarios,
# it’s important to use both resampling methods and appropriate loss functions to improve model performance.
from imblearn.over_sampling import SMOTE
from imblearn.over_sampling import RandomOverSampler
from imblearn.under_sampling import RandomUnderSampler
from imblearn.ensemble import BalancedRandomForestClassifier



def crossvalidation(X, y, models):
    """
    Compare resampling methods by the cross-validated F1-score.

    Every sampler (SMOTE, random over- and undersampling) is combined with each
    model in `models`: inside each of the 5 stratified folds the training part
    is resampled, the model is fitted on it and scored on the untouched
    held-out part. The BalancedRandomForest entry is a classifier that balances
    the classes itself; it is evaluated once, as its own model, on the
    unresampled training part.

    Parameters:
    X (DataFrame): feature matrix
    y (Series): target values, same length as X
    models (dict): name -> classifier object with fit / predict

    Returns:
    (dict, dict): results[method_name][model_name] = mean F1-score, and the
    dictionary of the method objects that were evaluated. For the
    BalancedRandomForest entry the inner key is 'BalancedRandomForest'.
    """
    # Seeds make the resampling, and therefore the scores, repeatable.
    resampling_methods = {
        "SMOTE": SMOTE(random_state=42),
        "Oversampling": RandomOverSampler(sampling_strategy='not majority', random_state=42),
        "Undersampling": RandomUnderSampler(sampling_strategy='majority', random_state=42),
        "BalancedRandomForest": BalancedRandomForestClassifier(random_state=42)
    }

    skf = StratifiedKFold(n_splits=5)
    results = {}

    for name, resample in resampling_methods.items():
        print(name, end="\n\n")
        results[name] = {}

        # Samplers are paired with every candidate model. An entry without
        # fit_resample is a classifier and is evaluated itself; previously the
        # candidate models were fitted without any balancing under its name.
        is_sampler = hasattr(resample, 'fit_resample')
        candidates = models if is_sampler else {name: resample}

        for model_name, model in candidates.items():
            f_score = []
            print(f"Model: {model_name}", end="\n\n")

            for train_index, test_index in skf.split(X, y):
                # The fold indices are positions, hence iloc instead of loc.
                X_train_, y_train_ = X.iloc[train_index], y.iloc[train_index]
                X_test_, y_test_ = X.iloc[test_index], y.iloc[test_index]

                if is_sampler:
                    X_fit, y_fit = resample.fit_resample(X_train_, y_train_)
                else:
                    X_fit, y_fit = X_train_, y_train_
                model.fit(X_fit, y_fit)

                y_pred_ = model.predict(X_test_)

                f_score.append(f1_score(y_test_, y_pred_))

            results[name][model_name] = np.mean(f_score)
            print("The scores: ", end="\n\n")
            print([round(f, 2) for f in f_score], end="\n\n")
            print('F1-Score mean=%.5f' % (np.mean(f_score)), end="\n\n")

    return results, resampling_methods

# Models combined with each resampling method. Both draw random samples while
# training; the fixed seed makes the comparison repeatable.
models = {
    "RandomForest": RandomForestClassifier(class_weight='balanced', random_state=42),
    "GradientBoostingClassifier": GradientBoostingClassifier(random_state=42),
}

# Apply cross-validation with resampling
results_resample, resampling_methods = crossvalidation(X_train_scaled, y_train, models)


# Find and Apply the best Resampling method

In [None]:
# Compare results
print("Results with Resampling:")
print(results_resample)

# Determine the best resampling method
best_method = max(results_resample, key=lambda k: np.mean(list(results_resample[k].values())))
print(f"\nBest Resampling Method: {best_method}")

# Apply the best resampling method to the entire dataset
best_resampler = resampling_methods[best_method]




In [None]:
# # Fit the resampler on the training data and transform it
# X_train_resampled, y_train_resampled = best_resampler.fit_resample(X_train_scaled, y_train)

# # Convert the numpy arrays to pandas DataFrames
# X_train_resampled = pd.DataFrame(X_train_resampled, columns=X_train.columns)
# y_train_resampled = pd.Series(y_train_resampled)

# # Fit the model on the resampled training data
# model = RandomForestClassifier(random_state=42)
# model.fit(X_train_resampled, y_train_resampled)

# # Predict the target values
# y_pred = model.predict(X_test_scaled)

# # Calculate the F1-score
# f1 = f1_score(y_test, y_pred)
# print(f"F1-Score: {f1:.5f}")

# # Calculate the confusion matrix
# from sklearn.metrics import confusion_matrix
# conf_matrix = confusion_matrix(y_test, y_pred)
# print("Confusion Matrix:")
# print(conf_matrix)