In [None]:
import numpy as np
import pandas as pd
import os
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score
from sklearn.model_selection import KFold
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import GridSearchCV
from sklearn.linear_model import LogisticRegression
from scipy import stats
import warnings
warnings.filterwarnings('ignore')

In [None]:
# Extracting the data from the csv file

file_path = 'dataset_project.csv'
data = pd.read_csv(file_path, sep = ",")
data.rename(columns={'Unnamed: 0': 'index'}, inplace=True)
data = data.set_index('index')

In [None]:
# Checking for nulls
data.isnull().sum(axis = 0)

In [None]:
data['psg_labels'].value_counts()

In [None]:
data.loc[data['psg_labels'] == 4, 'psg_labels'] = 3 # Combining N4 with N3
data.loc[data['psg_labels'] == 5, 'psg_labels'] = 4 # Change value of 5 (REM) to 4 for easier computation of transition matrix
data['psg_labels'] = data['psg_labels'].astype(int)
data

In [None]:
data['psg_labels'].value_counts()

In [None]:
id_list = data['id'].unique()
features = ['id', 'activity_count', 'hr', 'time']
label = 'psg_labels'

lengths = []
for id in id_list:
    lengths.append(data[data['id'] == id].shape[0])
    
    
print(lengths)
print(f'Minimum length: {np.min(lengths)}')
print(f'Maximum length: {np.max(lengths)}')
print(f'Average length: {np.mean(lengths)}')
print(f'Median length: {np.median(lengths)}')

In [None]:
# Removing the id with 188 recordings

data = data[data['id'] != id_list[np.argmin(lengths)]]
data

In [None]:
figure = plt.figure(figsize = (8, 5))
sns.lineplot(data = data[data['id'] == id_list[6]], 
             x = 'time', 
             y = 'psg_labels', 
             color='b',
             linewidth=2)

plt.xlabel('Time (hours)')
plt.ylabel('Sleeping stage')
plt.title('Transitions Between Sleep Stages')
plt.xlim(0, 5)
plt.yticks([0, 1, 2, 3, 4], labels = ['Wake', 'N1', 'N2', 'N3', 'REM'])
plt.yticks
plt.show()


In [None]:
class customLogisticRegression(LogisticRegression):
    def __init__(self, C=1.0, penalty='l2', solver='lbfgs', class_weight=None, max_iter=1000, num = 16, random_state=None, **kwargs):
        super().__init__(C=C, penalty=penalty, solver=solver, class_weight=class_weight, max_iter=max_iter, random_state=random_state, **kwargs)
        self.transition_matrix = None
        self.num = num # Number of time bins
          
        
    def fit(self, X, y):
        X_features = X.drop(columns=['id', 'time'])
        super().fit(X_features, y)
        
        grouped_X = X.sort_values(by=['id', 'time'], ascending=True)
        grouped_y = y.loc[grouped_X.index]

        self._calculate_transition_matrix(grouped_X, grouped_y)
        return self

                
    def _calculate_transition_matrix(self, X, y):
        '''
        function to compute the transition matrices of different time bins
        '''
        
        n_labels = y.nunique()
        self.transition_matrix = []
        for bins in range(self.num):
            self.transition_matrix.append(np.zeros((n_labels, n_labels), dtype=float))  # List of transition matrices for each bin
            
        unique_ids = X['id'].unique()

        for unique_id in unique_ids:  # Loop to go over each id, to compute transition matrices separately
            id_mask = X['id'] == unique_id    # mask to select only id values for x and y
            id_X = X[id_mask]
            id_y = y[id_mask]
            id_times = id_X['time']  
            
            time_bins = np.linspace(id_times.min(), id_times.max(), num=self.num)
            bin_indices = np.digitize(id_times, bins=time_bins, right=False) - 1
            
            for bin_idx in range(self.num):
                bin_mask = bin_indices == bin_idx
                bin_y = id_y[bin_mask].tolist()

                transition_matrix = np.zeros((n_labels, n_labels), dtype=int)
                
                for i in range(len(bin_y) - 1):
                    transition_matrix[bin_y[i], bin_y[i + 1]] += 1
                    
                row_sums = transition_matrix.sum(axis=1, keepdims=True)
                transition_matrix = transition_matrix / np.maximum(row_sums, 1)

                self.transition_matrix[bin_idx] += transition_matrix
        
        for bin_idx in range(self.num):
            row_sums = self.transition_matrix[bin_idx].sum(axis=1, keepdims=True)
            self.transition_matrix[bin_idx] = self.transition_matrix[bin_idx] / np.maximum(row_sums, 1)
            
        return self
    
    
    
    def predict(self, X):
        '''
        Predicts class for samples in X with transition matrix included
        
        Returns:
        y_pred of predicted classes, length matches rows of X
        
        
        '''
        original_indices = X.index

        X_reset = X.reset_index(drop=True)
        X_features = X_reset.drop(columns=['id', 'time'])
        
        transition_matrix = self.transition_matrix
        y_pred_probs = super().predict_proba(X_features)
        y_pred = np.zeros(X_features.shape[0])
        sorted_X = X_reset.sort_values(by='time', ascending=True)  # Sorting by time

        sorted_y_pred_probs = y_pred_probs[sorted_X.index]
        
        time_bins = np.linspace(sorted_X['time'].min(), sorted_X['time'].max(), num=self.num) 
        bin_indices = np.digitize(sorted_X['time'], bins=time_bins, right = False) - 1

        y_pred[0] = 0 # starting value = 0 = awake
        for i in range(1, len(y_pred)):
            current_bin = bin_indices[i]
                    
            current_transition_matrix = transition_matrix[current_bin]
     
            current_probs = sorted_y_pred_probs[i]
            current_class = y_pred[i-1].astype(int)
            adjusted_probs = current_probs * current_transition_matrix[current_class, :]          
            adjusted_probs /= np.sum(adjusted_probs)
            
            next_class = np.argmax(adjusted_probs)
            
            y_pred[i] = next_class
            
            
        
        y_pred_sorted = np.zeros_like(y_pred)
        for i, idx in enumerate(sorted_X.index):  

            y_pred_sorted[idx] = y_pred[i]
        
        return y_pred_sorted

In [None]:
def model_evaluation(data, T, class_weight):

    params_lr = [100, 10, 1]
    penalties = ['l2']

    accuracy_lr = 0
    accuracy_lrcustom = 0


    #Folds
    n = 5
    k = 4
    outer_CV = KFold(n_splits=n, random_state=None, shuffle=False) # outer CV splits

    for index_train, index_test in outer_CV.split(id_list):
        ids_train = id_list[index_train]
        ids_test = id_list[index_test]

        x_train = data[data['id'].isin(ids_train)][features]
        y_train = data[data['id'].isin(ids_train)][label]
        

        x_test = data[data['id'].isin(ids_test)][features]
        y_test = data[data['id'].isin(ids_test)][label]

        
        inner_CV = KFold(n_splits = k, random_state = None, shuffle = False)
        
        lr_accuracy = np.zeros(len(params_lr))
        lrcustom_accuracy = np.zeros(len(params_lr))
        for index_train_inner, index_val in inner_CV.split(ids_train):
            ids_train_inner = ids_train[index_train_inner]
            ids_val = ids_train[index_val]
            
            x_train_inner = data[data['id'].isin(ids_train_inner)][features]
            y_train_inner = data[data['id'].isin(ids_train_inner)][label]
        

            x_val = data[data['id'].isin(ids_val)][features]
            y_val = data[data['id'].isin(ids_val)][label]
            
            
            for param_idx, param in enumerate(params_lr):
                
                lr_model = LogisticRegression(penalty='l2', C=param, 
                                              solver='lbfgs', class_weight=class_weight)
                lr_model.fit(x_train_inner, y_train_inner)
                y_pred_lr = lr_model.predict(x_val)
                lr_accuracy[param_idx] += accuracy_score(y_val, y_pred_lr)
                
                
                lrcustom_model = customLogisticRegression(penalty='l2', C=param, 
                                              solver='lbfgs', class_weight=class_weight, num = T)

                lrcustom_model.fit(x_train_inner, y_train_inner)
                y_pred_lrcustom = lrcustom_model.predict(x_val)
                lrcustom_accuracy[param_idx] += accuracy_score(y_val, y_pred_lrcustom)
        
        lr_accuracy /= k
        lrcustom_accuracy /= k

        optimal_lr = LogisticRegression(penalty='l2', C=params_lr[np.argmax(lr_accuracy)], 
                                                   solver='lbfgs', class_weight=class_weight)
            
        optimal_lr.fit(x_train, y_train)
        y_pred_lr = optimal_lr.predict(x_test)
        
        accuracy_lr += accuracy_score(y_test, y_pred_lr)
        
        
        
        
        optimal_lrcustom = customLogisticRegression(penalty='l2', C=params_lr[np.argmax(lrcustom_accuracy)], 
                                                   solver='lbfgs', class_weight=class_weight, num = T)
        
        optimal_lrcustom.fit(x_train, y_train)
        y_pred_lrcustom = optimal_lrcustom.predict(x_test)
        
        accuracy_lrcustom += accuracy_score(y_test, y_pred_lrcustom)
        
    
    accuracy_lr /= n
    accuracy_lrcustom /= n
    
    return accuracy_lr, accuracy_lrcustom

In [None]:
accuracies_lr = []
T_accuracies = []
T = np.linspace(2, 16, 15).astype(int)
weight = None
for bins in T:
    print(f'Bins: {bins}')
    accuracies_customlr = []
    for i in range(0, 4):
        if i == 0:
            print("Starting 5-stage classification")
            accuracies1, accuracies2 = model_evaluation(data, bins, class_weight=weight)
            accuracies_customlr.append(accuracies2)
            if bins == T[0]: # accuracies_lr stays constant across T
                accuracies_lr.append(accuracies1)
                

        if i == 1: #N3 + N2 combined
            print("Starting 4-stage classification")
            data_copy = data.copy()
            data_copy.loc[data_copy['psg_labels'] == 3, 'psg_labels'] = 2 # N3 and N2 combined
            data_copy.loc[data_copy['psg_labels'] == 4, 'psg_labels'] = 3 # Adjusting REM for transition matrix computation

            accuracies1, accuracies2 = model_evaluation(data_copy, bins,class_weight=weight)
            accuracies_customlr.append(accuracies2)
            if bins == T[0]: # accuracies_lr stays constant across T
                accuracies_lr.append(accuracies1)
                
        if i == 2: #N3+N2+N1
            print("Starting 3-stage classification")
            data_copy = data.copy()
            data_copy.loc[data_copy['psg_labels'] == 3, 'psg_labels'] = 2 #N3 -> N2
            data_copy.loc[data_copy['psg_labels'] == 2, 'psg_labels'] = 1 ## N2-> N1 (combine all)
            data_copy.loc[data_copy['psg_labels'] == 4, 'psg_labels'] = 2 # Adjusting REM for transition matrix computation

            accuracies1, accuracies2 = model_evaluation(data_copy, bins, class_weight=weight)
            accuracies_customlr.append(accuracies2)
            if bins == T[0]: # accuracies_lr stays constant across T
                accuracies_lr.append(accuracies1)

        if i == 3: #asleep-wake
            print("Starting wake/asleep classification")
            data_copy = data.copy()
            data_copy.loc[data_copy['psg_labels'] == 3, 'psg_labels'] = 2 # N3 -> N2
            data_copy.loc[data_copy['psg_labels'] == 2, 'psg_labels'] = 1 # N2-> N1 (combine all)
            data_copy.loc[data_copy['psg_labels'] == 4, 'psg_labels'] = 1 # only wake/asleep

            accuracies1, accuracies2 = model_evaluation(data_copy, bins, class_weight=weight)
            accuracies_customlr.append(accuracies2)
            if bins == T[0]: # accuracies_lr stays constant across T
                accuracies_lr.append(accuracies1)

            
    T_accuracies.append(accuracies_customlr)
        
print("Finish")


In [None]:
model_labels = ['5 Stages', 'N2+N3', 'Wake-NREM-REM', 'Sleep-Wake']

plt.figure(figsize=(10, 6))
sns.set(style="whitegrid", palette="muted")
for i in range(4):  
    model_accuracies = []  
    for bins in range(len(T)):  
        model_accuracies.append(T_accuracies[bins][i])
    
    
    sns.lineplot(x=T, y=model_accuracies, label=model_labels[i], marker='o')

plt.title("Prediction accuracy across varying numbers of stages with respect to the number of bins")
plt.xlabel("Bins")
plt.ylabel("Accuracy")
plt.ylim(0.2, 0.95)
plt.legend(title='Model')
plt.show()

In [None]:
def sleep_score(data):
    optimal_amounts = [5, 45, 25, 25]
    
    sleep_recordings = data[data['psg_labels'] != 0]['psg_labels']
    total_sleep_recordings = len(sleep_recordings)
    
    label_frequencies = sleep_recordings.value_counts(normalize=True).sort_index().tolist()
    sleep_score = 0
    for i, state_freq in enumerate(label_frequencies):
        sleep_score += min(state_freq*100, optimal_amounts[i])
        
    return sleep_score


In [None]:
# all lists ordered by ids
average_hr = [] 
average_activity = [] 
sleep_scores = []
features = ['activity_count', 'hr']
for id in id_list:
    data_id = data[data['id'] == id]
    
    sleep_scores.append(sleep_score(data_id))
    
    avg_activity = data_id['activity_count'].mean()
    avg_hr = data_id['hr'].mean()
    
    average_activity.append(avg_activity)
    average_hr.append(avg_hr)
    

In [None]:
correlation_data = np.array([average_hr, average_activity, sleep_scores])
labels = ['HR', 'Motion', 'Sleep Score']

correlation_matrix = np.corrcoef(correlation_data)

fig, ax = plt.subplots(figsize=(10, 4))
sns.heatmap(
    correlation_matrix, 
    annot=True, 
    fmt=".2f", 
    cmap="YlGn", 
    xticklabels=labels, 
    yticklabels=labels, 
    linewidths=.5, 
    ax=ax
)

plt.yticks(rotation=45)
ax.set_title("Correlation Heatmap of sleep score, heart rate variation and motion", fontsize=12)
plt.show()