# Le but de ce notebook est d'identifier des caractéristiques très simples permettant d'analyser des battements de coeur de foetus

In [None]:
#pip install hyperopt

import os
import pandas as pd
import numpy as np
import copy

# permet de charger les fichiers matlab (*.mat)
from scipy.io import loadmat
from scipy.interpolate import make_interp_spline
from matplotlib.ticker import PercentFormatter


# le répertoire de travail
directory = os.path.abspath('')

# répertoire où se trouvent toutes les données associés à ce challenge
data_directory = os.path.join(directory, 'Data')

# fichier CSV contenant les targets (1 / 0)
targets_path = os.path.join(data_directory, 'CTG_Challenge_files_GroundTruth.csv')

# répertoire où se trouvent les fichiers de données matlab
matlab_directory = os.path.join(data_directory, 'ctg_workshop_database')

# dans l'électrocardiogramme, nous avons 4 mesures par seconde
# chaque mesure correspondent à au nombre de battements de coeurs par minute
elements_en_1s = 4
elements_en_1minute = int(elements_en_1s*60)
elements_en_5minutes = 5*elements_en_1minute
elements_en_30minutes = 30*elements_en_1minute
elements_en_1heure = 60*elements_en_1minute

# retourne tous les fichiers matlab présents dans le repertoire 'path'
def all_mat_files_in_directory(path: str):
    return [os.path.join(path,f) for f in os.listdir(path) if os.path.isfile(os.path.join(path, f)) and f.endswith('.mat')]

# calcule la moyenne de la séquence ''fhr' en ignorant les NaN
def moyenne(fhr):
    return fhr[~np.isnan(fhr)].mean()

# calcule la médiane de la séquence ''fhr' en ignorant les NaN
def mediane(fhr):
    return np.nanmedian(fhr)

# calcule la std dev de la séquence ''fhr' en ignorant les NaN
def ecart_type(fhr):
    return fhr[~np.isnan(fhr)].std()

# nombre d elements NaN dans la séquence 'fhr'
def nan_count(fhr):
    return np.count_nonzero(np.isnan(fhr))



# PRIVE: Chargement des données d'entraînement

In [None]:
# chemin vers tous les fichiers matlab de la base d'entraînement
id_to_path = dict()
for filename in all_mat_files_in_directory(matlab_directory):
    filename_without_extension = os.path.splitext(os.path.basename(filename))[0]    
    id = int(filename_without_extension.lstrip('0'))
    id_to_path[id] = filename

# on charge les targets associés à chaque fichier d'entraînement
targets_df = pd.read_csv(targets_path)
id_to_target = dict()
for _, row in targets_df.iterrows():
    id_to_target[row['ChallengeID']] = row['TrueOutcome']

# lecture des battements de coeurs des foetus
id_to_fhr = dict()
all_lengths = []
for id, path in id_to_path.items():
    matlab_file = loadmat(path)
    id_to_fhr[id] = matlab_file['fhr'].flatten()
    # on ne garde que la dernière heure #//?D
    id_to_fhr[id] = id_to_fhr[id][-60*4*60:]


# PRIVE: Calcul de statistiques sur ces données d'entraînement
## (pour faciliter la recherche des caractéristiques)

In [None]:
ids = []
targets = []
moyenne_full = []
mediane_full = []
ecart_type_full = []
count_full = []
nan_count_full = []
moyenne_1ere_heure = []
mediane_1ere_heure = []
ecart_type_1ere_heure = []
count_1ere_heure = []
nan_count_1ere_heure = []
moyenne_derniere_heure = []
mediane_derniere_heure = []
ecart_type_derniere_heure = []
count_derniere_heure = []
nan_count_derniere_heure = []

for id in list(id_to_path.keys()):
    ids.append(id)
    targets.append(id_to_target[id])
    
    fhr_full = id_to_fhr[id]
    
    moyenne_full.append(moyenne(fhr_full))
    mediane_full.append(mediane(fhr_full))
    ecart_type_full.append(ecart_type(fhr_full)) 
    count_full.append(fhr_full.size)
    nan_count_full.append(nan_count(fhr_full))

    fhr_1ere_heure = fhr_full[:elements_en_1heure]
    moyenne_1ere_heure.append(moyenne(fhr_1ere_heure))
    mediane_1ere_heure.append(mediane(fhr_1ere_heure))
    ecart_type_1ere_heure.append(ecart_type(fhr_1ere_heure)) 
    count_1ere_heure.append(fhr_1ere_heure.size)
    nan_count_1ere_heure.append(nan_count(fhr_1ere_heure))
    
    fhr_derniere_heure = fhr_full[-elements_en_1heure:]
    moyenne_derniere_heure.append(moyenne(fhr_derniere_heure))
    mediane_derniere_heure.append(mediane(fhr_derniere_heure))
    ecart_type_derniere_heure.append(ecart_type(fhr_derniere_heure)) 
    count_derniere_heure.append(fhr_derniere_heure.size)
    nan_count_derniere_heure.append(nan_count(fhr_derniere_heure))

    
# Sauvegarde de ces statistiques dans un DataFrame
fhr_stats = pd.DataFrame(
    {'ids': ids,
    'targets': targets,
    'moyenne_full' : moyenne_full,
    'mediane_full' : mediane_full,
    'ecart_type_full' : ecart_type_full,
    'count_full' : count_full,
    'nan_count_full' : nan_count_full,
    'moyenne_1ere_heure' : moyenne_1ere_heure,
    'mediane_1ere_heure' : mediane_1ere_heure,
    'ecart_type_1ere_heure' : ecart_type_1ere_heure,
    'count_1ere_heure' : count_1ere_heure,
    'nan_count_1ere_heure' : nan_count_1ere_heure,
    'moyenne_derniere_heure' : moyenne_derniere_heure,
    'mediane_derniere_heure' : mediane_derniere_heure,
    'ecart_type_derniere_heure' : ecart_type_derniere_heure,
    'count_derniere_heure' : count_derniere_heure,
    'nan_count_derniere_heure' : nan_count_derniere_heure,
    })

# on sauvegarde ces stats sur le disque
fhr_stats.to_csv(os.path.join(directory, 'fhr_stats.csv'), index=False)    

# PRIVE: méthodes permettant l'affichage d'électrocardiogrammes

In [None]:
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import numpy as np
import copy

def interpolate_nans(array):
    not_nan = ~np.isnan(array)
    indices = np.arange(len(array))
    interpolated_array = np.copy(array)
    interpolated_array[np.isnan(array)] = np.interp(indices[np.isnan(array)], indices[not_nan], array[not_nan])
    return interpolated_array

def display_electrocardiogram(id: int, start_minut:float, duration_in_minuts: float, display_mean: bool = True, interpolate_missing_values:bool = True, min_y_value: int = None, max_y_value: int = None):

    # nous ne gardons que les 28 dernières minutes de l'enregistrement:
    # (pour être aligné avec les fichiers pdf du répertoire 'ctg_data')
    # Il y a 4 mesures par seconde: nous devons donc garder les 4*60*'last_minuts_to_keep' dernières valeurs
    
    fhr_full = loadmat(id_to_path[id])['fhr'].ravel()

    start_idx = int(start_minut*4*60)
    if start_idx<0: 
        start_idx+=len(fhr_full)
    start_idx = max(0, start_idx-1)
    end_idx = min( start_idx+1+int(duration_in_minuts*4*60) , len(fhr_full) )
    
    fhr = fhr_full[start_idx:end_idx]
    if interpolate_missing_values:
        fhr = interpolate_nans(fhr)
    
    # nombre d'éléments dans l'électrocardiogramme
    n = len(fhr)
    
    # Create an array of time points (assuming each heart rate measurement is taken at regular intervals)
    time_in_minuts = np.arange(n)/(60*4)
    # Plot the heart rate data
    plt.figure(figsize=(20, 10))
    plt.plot(time_in_minuts, fhr, linestyle='-', color='black', linewidth=1)
    # Formater les ticks pour afficher le temps au format hh:mm
    def format_func(time_in_minuts, tick_number):
        #time_in_minuts = int(time_in_minuts/(60*4))
        hours = int(time_in_minuts) // 60
        minutes = int(time_in_minuts) % 60
        return f'{hours:02d}:{minutes:02d}'
    plt.gca().xaxis.set_major_formatter(ticker.FuncFormatter(format_func))
    comment = 'sujet sain' if id_to_target[id] == 0 else 'sujet malade'
    
    if display_mean:
        observed_mean = int(moyenne(fhr_full))
        plt.axhline(y=observed_mean, color='red', linewidth=1, linestyle='-', label='Moyenne: '+str(observed_mean))
        # Annotate the y-value on the vertical axis
        plt.text(x=0, y=observed_mean, s=f'{observed_mean:.0f}', color='red', va='center', ha='right')

    # Add labels and title
    plt.xlabel('Temps (minutes)', fontsize=15)
    plt.ylabel('Battements de coeur par minutes', fontsize=15)
    plt.title(f"Electrocardiogramme pour un {comment} ({id})", fontsize=15)
    plt.xlim(0, max(time_in_minuts))
    plt.ylim(min_y_value or 60, max_y_value or 180)
    # Display grid
    plt.grid(True)

    if display_mean:
        plt.legend()
    # Show the plot
    plt.show()

def display_electrocardiograms(count:int, start_minut:int, duration_in_minuts: int, label: int, start_id:int = 1, display_mean: bool = True, interpolate_missing_values:bool = True, min_y_value: int = None, max_y_value: int = None):
    displayed_count = 0
    for idx in range(start_id, start_id+300,1):
        id = idx if idx <=300 else idx-300
        if id_to_target[id] != label:
            continue
        display_electrocardiogram(id, start_minut, duration_in_minuts, display_mean=display_mean, interpolate_missing_values=interpolate_missing_values, min_y_value=min_y_value, max_y_value=max_y_value)
        displayed_count+= 1
        if displayed_count>=count:
            break


# Affichage d'exemples d'électrocardiogrammes pour des sujets sains

In [None]:
nombre_a_afficher = 100
start_minut = 0
duration_in_minuts = 120
label = 0  #0 pour les sujets sains , 1 pour les sujets malades
start_id = 50 # id du 1er élément à afficher

display_electrocardiograms(nombre_a_afficher, start_minut, duration_in_minuts, label, start_id, display_mean=True)

# Affichage d'exemples d'électrocardiogrammes pour des sujets malades

In [None]:
nombre_a_afficher = 100
start_minut = -120
duration_in_minuts = 90
label = 1  #0 pour les sujets sains , 1 pour les sujets malades
start_id = 50 # id du 1er élément à afficher


display_electrocardiograms(nombre_a_afficher, start_minut, duration_in_minuts, label, start_id, display_mean=True)

# PRIVE: Liste des caractéristiques

In [None]:
import functools

id_to_data = id_to_fhr

@functools.lru_cache(maxsize=None)
def proportion_in_interval(id, min_value: float, max_value: float) -> float:
    fhr = id_to_data[id]
    total_points = np.count_nonzero(~np.isnan(fhr))
    if total_points<=0:
        return 0
    total_points_in_interval = np.count_nonzero((fhr > min_value) & (fhr < max_value) )
    return total_points_in_interval/total_points

# calcul de la écart-type
@functools.lru_cache(maxsize=None)
def compute_id_std_dev(id: int):
    return ecart_type(id_to_data[id])
    
# calcul de l'etendue de la séquence associée à l'id 'id' en ignorant les NaN
@functools.lru_cache(maxsize=None)
def compute_id_range(id: int):
    data = id_to_data[id]
    return max(data)-min(data)
   
# calcul de la moyenne de la séquence associée à l'id 'id' en ignorant les NaN
@functools.lru_cache(maxsize=None)
def compute_id_mean(id: int):
    return moyenne(id_to_data[id])

# calcul de la médiane de la séquence associée à l'id 'id' en ignorant les NaN
@functools.lru_cache(maxsize=None)
def compute_id_median(id: int):
    return mediane(id_to_data[id])

# calcul de la moyenne sur l'ensemble du dataset
@functools.lru_cache(maxsize=None)
def global_mean() -> float :
    moyennes = []
    for id,data in id_to_data.items():
        moyennes.append(compute_id_mean(id)) 
    return np.mean(moyennes)



# calcul de l'écart type autour de la médiane de la séquence associée à l'id 'id' en ignorant les NaN
@functools.lru_cache(maxsize=None)
def compute_id_std_median(id: int) -> float:
    data = id_to_data[id]
    median = compute_id_median(id)
    deviations = data - median
    squared_deviations = deviations ** 2
    variance = np.nanmean(squared_deviations)
    std_deviation = np.sqrt(variance)
    return std_deviation

# calcul de la médiane sur l'ensemble du dataset
@functools.lru_cache(maxsize=None)
def global_median() -> float :
    medianes = []
    for id,data in id_to_data.items():
        medianes.append(compute_id_median(id)) 
    return np.mean(medianes)

# calcul du % de fois où une valeur a été passée
@functools.lru_cache(maxsize=None)
def percentage_crossings(id: int, target_value: int) -> float:
    series = id_to_data[id]
    # Remove NaN values
    valid_indices = ~np.isnan(series)
    series = series[valid_indices]
    if len(series) == 0:
        return 0.0
    # Create a boolean array where True indicates the value is greater than the specified value
    above_value = series > target_value
    # Calculate the difference of the boolean array
    # A change from False to True or True to False indicates a crossing
    crossings = np.diff(above_value.astype(int))
    # Count the number of crossings
    num_crossings = np.sum(np.abs(crossings))
    # Return the % of crossing
    return num_crossings/len(series)


# calcul du nombre de passage par une valeur en 5 minutes
@functools.lru_cache(maxsize=None)
def count_crossing_in_5minutes(id: int, target_value: int) -> float:
    return percentage_crossings(id, target_value)*elements_en_5minutes


# 1ere caracteristique version 1A : sujet sain si le pourcentage de points dans un intervalle donné est supérieur à un seuil (avec ajustement à la moyenne).
def calcul_caracteristique1A(id: int, hyperparameters: dict, suffix: str) -> int:
    min_value = hyperparameters['min_value_'+suffix]
    min_value += compute_id_mean(id)-global_mean()
    max_value = min_value +hyperparameters['range_'+suffix]
    return 0 if proportion_in_interval(id, min_value, max_value) > hyperparameters['seuil_'+suffix] else 1

# 1ere caracteristique version 1B : sujet sain si le pourcentage de points dans un intervalle donné est supérieur à un seuil (sans ajustement à la moyenne).
def calcul_caracteristique1B(id: int, hyperparameters: dict, suffix: str) -> int:
    min_value = hyperparameters['min_value_'+suffix]
    max_value = min_value  +hyperparameters['range_'+suffix]
    return 0 if proportion_in_interval(id, min_value, max_value) > hyperparameters['seuil_'+suffix] else 1

# 3eme caracteristique: sujet sain si l'étendue de l'électrocardiogramme est inférieur à un seuil.
def calcul_caracteristique3(id: int, hyperparameters: dict, suffix: str) -> int:
    return 0 if compute_id_range(id)< hyperparameters['seuil_'+suffix] else 1

# 4eme caracteristique: sujet sain si le pourcentage de points autour de la moyenne est supérieur à un seuil.
def calcul_caracteristique4A(id: int, hyperparameters: dict, suffix: str) -> int:
    range = hyperparameters['range_'+suffix]
    moyenne = compute_id_mean(id)
    return 0 if proportion_in_interval(id, moyenne-range, moyenne+range) > hyperparameters['seuil_'+suffix] else 1

# caracteristique 4B: sujet sain si le pourcentage de points autour de la médiane est supérieur à un seuil.
def calcul_caracteristique4B(id: int, hyperparameters: dict, suffix: str) -> int:
    range = hyperparameters['range_'+suffix]
    median = compute_id_median(id)
    return 0 if proportion_in_interval(id, median-range, median+range) > hyperparameters['seuil_'+suffix] else 1

# caracteristique 5A: sujet sain si la moyenne de l'électrocardiogramme est dans un intervalle donné
def calcul_caracteristique5A(id: int, hyperparameters: dict, suffix: str) -> int:
    min_value = hyperparameters['min_value_'+suffix]
    max_value = min_value + hyperparameters['range_'+suffix]
    moyenne = compute_id_mean(id)
    return 0 if ((moyenne>=min_value) and (moyenne<=max_value)) else 1
    
# caracteristique 5B: sujet sain si la médiane de l'électrocardiogramme est dans un intervalle donné
def calcul_caracteristique5B(id: int, hyperparameters: dict, suffix: str) -> int:
    min_value = hyperparameters['min_value_'+suffix]
    max_value = min_value + hyperparameters['range_'+suffix]
    median = compute_id_median(id)
    return 0 if ((median>=min_value) and (median<=max_value)) else 1

# caracteristique 6: sujet sain si le % de points au dessus de la moyenne de l'électrocardiogramme est inférieur à un seuil.
def calcul_caracteristique6(id: int, hyperparameters: dict, suffix: str) -> int:
    moyenne = compute_id_mean(id)
    return 0 if proportion_in_interval(id, moyenne, 1e9) < hyperparameters['seuil_'+suffix] else 1

# 7eme caracteristique: sujet sain si la écart-type de l'électrocardiogramme est inférieur à un seuil.
def calcul_caracteristique7A(id: int, hyperparameters: dict, suffix: str) -> int:
    return 0 if compute_id_std_dev(id) < hyperparameters['seuil_'+suffix] else 1

# caracteristique 7B: sujet sain si l'écart type autour de la médiane de l'électrocardiogramme est inférieur à un seuil.
def calcul_caracteristique7B(id: int, hyperparameters: dict, suffix: str) -> int:
    return 0 if compute_id_std_median(id) < hyperparameters['seuil_'+suffix] else 1

# 8eme caracteristique: sujet sain si le pourcentage de points dans l'intervalle [ k1 * moyenne, (1+k2) * moyenne] est supérieur à un seuil.
def calcul_caracteristique8(id: int, hyperparameters: dict, suffix: str) -> int:
    k1 = hyperparameters['k1_'+suffix]
    k2 = hyperparameters['k2_'+suffix]
    moyenne = compute_id_mean(id)
    return 0 if proportion_in_interval(id, k1*moyenne, (1+k2)*moyenne) > hyperparameters['seuil_'+suffix] else 1

# caracteristique 9: sujet sain si le pourcentage de points dans l'intervalle [ 0, k] est inférieur à un seuil.
def calcul_caracteristique9(id: int, hyperparameters: dict, suffix: str) -> int:
    k = hyperparameters['k_'+suffix]
    return 0 if proportion_in_interval(id, 0, k) < hyperparameters['seuil_'+suffix] else 1
   
def calcul_caracteristique13(id: int, hyperparameters: dict, suffix: str) -> int:
    k = hyperparameters['k_'+suffix]
    return 0 if count_crossing_in_5minutes(id, k) > hyperparameters['seuil_'+suffix] else 1

def calcul_caracteristique14A(id: int, hyperparameters: dict, suffix: str) -> int:
    return 0 if count_crossing_in_5minutes(id, compute_id_mean(id)) > hyperparameters['seuil_'+suffix] else 1

def calcul_caracteristique14B(id: int, hyperparameters: dict, suffix: str) -> int:
    return 0 if count_crossing_in_5minutes(id, compute_id_median(id)) > hyperparameters['seuil_'+suffix] else 1


# PRIVE: Hyperparameters Management

In [None]:
import numpy as np
from sklearn.metrics import f1_score
from typing import List
import math
from hyperopt import fmin, tpe, space_eval, hp, Trials, rand as hyperopt_rand
import hashlib
import random
import time
import sys

current_lowest_error = None
stats_hpo = dict()
last_time_display_stats_hpo = time.time()

def compute_error(TP: int, TN: int, FP: int, FN: int):
    return (compute_error_target0(TP,TN,FP,FN)+compute_error_target1(TP,TN,FP,FN))/2

def compute_f1_score(TP: int, TN: int, FP: int, FN: int):
    return (2*TP)/(2*TP+FP+FN)
        
def compute_error_target0(TP: int, TN: int, FP: int, FN: int):
    return 1.0-TN/(1*TN+FP)

def compute_error_target1(TP: int, TN: int, FP: int, FN: int):
    return 1.0-TP/(1*TP+FN)
        
def compute_matrice_de_confusion(id_to_predictions, id_to_target) -> (int, int, int,int):
        TP = 0
        TN = 0
        FP = 0
        FN = 0
        for id, data in id_to_predictions.items():
            target = id_to_target[id]
            prediction = id_to_predictions[id]
            if prediction == target: # bonne prediction
                if target == 1:
                    TP += 1
                else:
                    TN += 1
            else: # erreur dans la prediciton
                if prediction == 1:
                    FP += 1
                else:
                    FN += 1
        return (TP,TN,FP,FN)

def compute_single_prediction_single_caracteristique(id: str, hyperparameters: dict, caracteristique: str) -> int:  
    if caracteristique[-1:] == 'i':
        # we compute the complement of caracteristique 'caracteristique[:-1]'
        # ex: caracteristique '10i' will compute the complement of caracteristique '10'
        return 1-globals()['calcul_caracteristique'+caracteristique[:-1]](id, hyperparameters, caracteristique)
    return globals()['calcul_caracteristique'+caracteristique](id, hyperparameters, caracteristique)

def compute_single_prediction(id: str, hyperparameters: dict) -> int:  
    valeurs_caracteristiques_a_utiliser = []
    for c in hyperparameters['caracteristiques_a_utiliser'].split('+'):
        valeurs_caracteristiques_a_utiliser.append(compute_single_prediction_single_caracteristique(id, hyperparameters, c))
    if not all_elements_same(valeurs_caracteristiques_a_utiliser):
        return hyperparameters['label_si_resultats_differents']
    return valeurs_caracteristiques_a_utiliser[0]
    
def all_elements_same(data):
    first_element = data[0]
    return all(element == first_element for element in data)

def compute_toutes_les_predictions(hyperparameters: dict) -> dict:
    id_to_predictions = dict()
    for id in id_to_data.keys():
        id_to_predictions[id] = compute_single_prediction(id, hyperparameters)
    return id_to_predictions
    
def train(hyperparameters: dict, verbose: bool) -> dict:
    id_to_predictions = compute_toutes_les_predictions(hyperparameters)
    (TP,TN,FP,FN) = compute_matrice_de_confusion(id_to_predictions, id_to_target)
    metrics = dict()
    metrics['TP'] = TP
    metrics['TN'] = TN
    metrics['FP'] = FP
    metrics['FN'] = FN
    metrics['error_target0'] = compute_error_target0(TP,TN,FP,FN)
    metrics['error_target1'] = compute_error_target1(TP,TN,FP,FN)
    current_error = compute_error(TP,TN,FP,FN)
    metrics['erreur'] = current_error
    update_stats_hpo(current_error, hyperparameters)
    global last_time_display_stats_hpo
    if (time.time()-last_time_display_stats_hpo)>600:
        save_stats_hpo(False)
        last_time_display_stats_hpo = time.time()
    global current_lowest_error
    if not current_lowest_error or current_error<current_lowest_error:
        current_lowest_error = current_error
        if verbose:
            print(f"new lowest error {round(current_error,4)} for hyperparameters {[c for c in hyperparameters.items() if c[1] is not None]}")
        save_model(hyperparameters, metrics)
        save_stats_hpo(False)
    return metrics

    
def update_stats_hpo(error:float, hyperparameters: dict) -> None:
    for hpo_key, hpo_value in hyperparameters.items():
        if hpo_value is None:
            continue
        if hpo_key not in stats_hpo:
            stats_hpo[hpo_key] = dict()
        if hpo_value not in stats_hpo[hpo_key]:
            stats_hpo[hpo_key][hpo_value] = [0,0]
        stats_hpo[hpo_key][hpo_value][0] += 1
        stats_hpo[hpo_key][hpo_value][1] += error

    
# the objective used for Hyperparameters Optimization (HPO)
# it is the function to minimize
def objective(sample_from_search_space):
    hyperparameters = fix_hyperparameters(sample_from_search_space)
    model_name = get_model_name(hyperparameters)
    if model_name in already_processed_model_names_with_hpo:
        metrics = already_processed_model_names_with_hpo[model_name]
    else:
        metrics = train(hyperparameters, True)
        already_processed_model_names_with_hpo[model_name] = metrics
    # we want to minimize this mettric ('erreur')
    return metrics['erreur']

hpo_session_id = str(int(100*time.time()))

def save_stats_hpo(display: bool):
    res = stats_hpo_to_str()
    try:
        path = os.path.join(directory, 'hpo_'+hpo_session_id+".txt")
        with open(path, 'w') as f:
            f.write(res)
    except Exception as e:
        print(f'failed to save hp')
    if display:
        print(res)

# path to the last model saved
last_path_for_save_model = None
        
def save_model(hyperparameters: dict, metrics: dict) -> None:
    global last_path_for_save_model
    try:
        erreur = metrics['erreur']
        path = os.path.join(directory, 'hpo_'+hpo_session_id+'_'+hyperparameters['caracteristiques_a_utiliser']+'_'+str(erreur)+".txt")
        with open(path, 'w') as f:
            text = hyperparameters_to_str(hyperparameters)
            for m, v in metrics.items():
                text += f'\n#{m}={v}'
            f.write(text)
        try:
            if last_path_for_save_model and os.path.isfile(last_path_for_save_model):
                os.remove(last_path_for_save_model)
        except Exception as e:
            print(f'failed to delete file {last_path_for_save_model}: {e}')
        last_path_for_save_model = path
    except Exception as e:
        print(f'failed to save hp: {e}')
    
def stats_hpo_to_str() -> str:
    max_intervals = 10
    result = ""
    for hpo_key, hpo_key_stats in stats_hpo.items():
        result += f"stats for key '{hpo_key}':\n"
        all_values = list(hpo_key_stats.keys())
        old_value_to_new_value = dict()
        if len(all_values)>max_intervals and (isinstance(all_values[0], int) or isinstance(all_values[0], float)):
            min_value = float(min(all_values))
            max_value = float(max(all_values))
            for v in all_values:
                index_interval = int (max_intervals * (float(v-min_value)/(max_value-min_value)))
                index_interval = min(index_interval, max_intervals-1)
                min_value_interval = min_value+index_interval* (max_value-min_value)/max_intervals
                max_value_interval = min_value_interval+ (max_value-min_value)/max_intervals
                new_key = f'[{round(min_value_interval,2)}, {round(max_value_interval,2)}]'
                if new_key not in old_value_to_new_value:
                    old_value_to_new_value[new_key] = [0,0]
                old_value_to_new_value[new_key][0] += hpo_key_stats[v][0]
                old_value_to_new_value[new_key][1] += hpo_key_stats[v][1]
        else:
            for v in all_values:
                old_value_to_new_value[v] = hpo_key_stats[v]     
        
        for value, value_stats in sorted(old_value_to_new_value.items(), key=lambda item: item[1][1]/item[1][0], reverse=True):
            count = value_stats[0]
            avg_error = value_stats[1]/count
            result += f'\t{value} : {avg_error} ({count} samples)\n'
    return result
        
def extract_caracteristique_id(key: str):
    if key == 'caracteristiques_a_utiliser' or key == 'label_si_resultats_differents':
        return None
    splitted = key.split('_')
    if len(splitted)>=2 and len(splitted[-1]) >=1 and splitted[-1][0].isdigit():
        return splitted[-1]
    return None

# When conducting an HPO search, some hyperparameters may exhibit inconsistent values.
# This method aims to address those inconsistencies.
def fix_hyperparameters(hyperparameters: dict) -> dict :
    res = dict(hyperparameters)
    for key in list(res.keys()):
        caracteristique_id = extract_caracteristique_id(key)
        if caracteristique_id and caracteristique_id not in res['caracteristiques_a_utiliser'].split('+'):
            del res[key]
    if '+' not in res['caracteristiques_a_utiliser']:
        res['label_si_resultats_differents'] = None
    return res


# Transform the dictionary of hyperparameters 'hyperparameters' into string.
def hyperparameters_to_str(hyperparameters: dict) -> str:
    sorted_hyperparameters = sorted (hyperparameters.items())
    return "\n".join([hyperparameter_name+" = "+str(hyperparameter_value) for (hyperparameter_name,hyperparameter_value) in sorted_hyperparameters if hyperparameter_value is not None])

def get_model_name(hyperparameters: dict) -> str:
    file_content = hyperparameters_to_str(hyperparameters)
    return compute_hash(file_content, 10)

def compute_hash(input_string, max_length):
    # Calculate MD5 hash of the input string
    md5_hash = hashlib.md5(input_string.encode('ascii')).hexdigest().upper()
    # Return the hash truncated to the max_length
    return md5_hash[:max_length]

def launch_hpo_for_transformer_model(max_evals: int):
    seed = random.randint(0, 100000)
    print(f'using seed: {seed}')
    rstate = np.random.default_rng(seed)
    best_indexes = fmin(
        fn=objective,  # "Loss" function to minimize
        space=search_space,  # Hyperparameter space
        algo=hyperopt_rand.suggest, #Random Search
        #algo=tpe.suggest,  # Tree-structured Parzen Estimator (TPE)
        max_evals=max_evals,  # Perform 'max_evals' trials
        max_queue_len = 10,
        rstate =rstate,
    )

    # Get the best parameters
    best  = space_eval(search_space, best_indexes)
    print(f"Found minimum after {max_evals} trials:")
    print([c for c in best.items() if c[1] is not None])
    return best



def float_range(x1: float, x2: float, count:int = 100) -> List[float]:
    epsilon = (x2-x1)/count
    # Generate the range of float values with the specified step
    return list(np.arange(x1, x2, epsilon))

def int_range(x:int, y:int, count:int = 100) -> List[int]:
    epsilon = int ((y+1-x)/count )
    epsilon = max(epsilon, 1)
    return list(range(x, y+1,epsilon))



search_space = {

    # 1ere caracteristique version 1A : sujet sain si le pourcentage de points dans un intervalle donné est supérieur à un seuil (avec ajustement à la moyenne).
    'min_value_1A': hp.choice('min_value_1A', int_range(80,120)), # 95 104 93
    'range_1A': hp.choice('range_1A', int_range(50,150)), # 125 , 115 , 85  89
    'seuil_1A': hp.choice('seuil_1A', float_range(0.9, 1.0)), # 0.95 0.96 0.97

    # complément de la caractéristique 1A: sujet sain si le pourcentage de points dans un intervalle donné est inférieur à un seuil (avec ajustement à la moyenne).
    'min_value_1Ai': hp.choice('min_value_1Ai', int_range(15,50)),  # 35 , 20, 22 20
    'range_1Ai': hp.choice('range_1Ai', int_range(50,100)), # 65 ,64 87 72
    'seuil_1Ai': hp.choice('seuil_1Ai', float_range(0.0, 0.1)), # 0.04 , 0.03 0.01  0.053 0.03

    # 1ere caracteristique version 1B : sujet sain si le pourcentage de points dans un intervalle donné est supérieur à un seuil (sans ajustement à la moyenne).
    'min_value_1B': hp.choice('min_value_1B', int_range(80,120)), # 93 97
    'range_1B': hp.choice('range_1B', int_range(20,100)), # 72 61
    'seuil_1B': hp.choice('seuil_1B', float_range(0.8, 1.0, 200)), # 0.906 0.882

    # complément de la caractéristique 1B: sujet sain si le pourcentage de points dans un intervalle donné est inférieur à un seuil (sans ajustement à la moyenne).
    'min_value_1Bi': hp.choice('min_value_1Bi', int_range(10,100)), # 14  60
    'range_1Bi': hp.choice('range_1Bi', int_range(20,100)), # 70 30
    'seuil_1Bi': hp.choice('seuil_1Bi', float_range(0.0, 0.1)), # 0.03 0.024

    # 3eme caracteristique: sujet sain si l'étendue de l'électrocardiogramme est inférieur à un seuil.
    'seuil_3': hp.choice('seuil_3', float_range(50,250, 1000)), # 124

    # caracteristique 4A: sujet sain si le pourcentage de points autour de la moyenne est supérieur à un seuil.
    'range_4A': hp.choice('range_4A', float_range(10,50)), # 43 44
    'seuil_4A': hp.choice('seuil_4A', float_range(0.90, 0.99,1000)), # 0.96 0.97

    # caracteristique 4B: sujet sain si le pourcentage de points autour de la médiane est supérieur à un seuil.
    'range_4B': hp.choice('range_4B', float_range(10,50)), # 37  25 43
    'seuil_4B': hp.choice('seuil_4B', float_range(0.90, 0.99,1000)), # 0.96

    # caracteristique 5A: sujet sain si la moyenne de l'électrocardiogramme est dans l'intervalle [k1, k2]
    #'min_value_5A': hp.choice('min_value_5A', list(range(122-40,122+40+1,1))),
    #'seuil_5A': hp.choice('seuil_5A', list(range(9-6,9+6+1,1))),
    'min_value_5A': hp.choice('min_value_5A', int_range(100,170)), # 124 142
    'range_5A': hp.choice('range_5A', int_range(0,30)), # 11 14

    # complément de la caractéristique 5A
    'min_value_5Ai': hp.choice('min_value_5Ai', int_range(100,170)), # 131 133
    'range_5Ai': hp.choice('range_5Ai', int_range(0,30)), #9 11

    # caracteristique 5B: sujet sain si la moyenne de l'électrocardiogramme est dans l'intervalle [k1, k2]
    'min_value_5B': hp.choice('min_value_5B', int_range(50,170)), # 110
    'range_5B': hp.choice('range_5B', int_range(0,50)), # 28
   
    # caracteristique 6: sujet sain si le % de points au dessus de la moyenne de l'électrocardiogramme est inférieur à un seuil.
    #'seuil_6': hp.choice('seuil_6', [(i * 0.01) for i in range(61-30, 61+30+1,1)]),
    'seuil_6': hp.choice('seuil_6', float_range(0.0, 1.0, 1000)), # 0.629

    # caracteristique 7A: sujet sain si l'écart type de l'électrocardiogramme est inférieur à un seuil.
    'seuil_7A': hp.choice('seuil_7A', float_range(18.0, 22.0, 1000)), # 19.25 19.43
    
    # caracteristique 7B: sujet sain si l'écart type autour de la médiane de l'électrocardiogramme est inférieur à un seuil.
    'seuil_7B': hp.choice('seuil_7B', float_range(18.0, 22.0, 1000)), # 19.636  19.615
    
    # 8eme caracteristique: sujet sain si le pourcentage de points dans l'intervalle [ k1 * moyenne, (1+k2) * moyenne] est supérieur à un seuil.
    'k1_8': hp.choice('k1_8', float_range(0.5, 0.9)), # 0.66  0.72  0.71 
    'k2_8': hp.choice('k2_8', float_range(0.08, 0.2)),  # 0.13 0.14 0.1
    'seuil_8': hp.choice('seuil_8', float_range(0.8, 1.0)), #0.87 0.96 0.84

    # caracteristique 9: sujet sain si le pourcentage de points dans l'intervalle [ 0, k] est inférieur à un seuil.
    'k_9': hp.choice('k_9', float_range(50, 150,1000)), # 86
    'seuil_9': hp.choice('seuil_9', float_range(0.0, 0.1)  ), # 0.03 0.017

    # complément de la caractéristique 9
    'k_9i': hp.choice('k_9i', float_range(150,200)), # 156  151
    'seuil_9i': hp.choice('seuil_9i', float_range(0.8, 1.0)), # 0.94 0.885

    # caracteristique 13: sujet sain si le nombre de passage par une valeur 'k' en 5 minutes est > à un seuil.
    'k_13': hp.choice('k_13', int_range(10, 180)),  # 139 140
    'seuil_13': hp.choice('seuil_13', float_range(0, 50, 100)),

    # complément de la caractéristique 13
    'k_13i': hp.choice('k_13i', int_range(50, 90)),  #67 73 72
    'seuil_13i': hp.choice('seuil_13i', float_range(0, 50, 500)),

    # caracteristique 14A: sujet sain si le nombre de passage par la moyenne en 5minutes est > à un seuil.
    'seuil_14A': hp.choice('seuil_14A', float_range(15, 40, 1000)),
    
    # caracteristique 14B: sujet sain si le nombre de passage par la médiane en 5 minutes  est > à un seuil.
    'seuil_14B': hp.choice('seuil_14B', float_range(15, 40, 1000)),
    
    'caracteristiques_a_utiliser': hp.choice('caracteristiques_a_utiliser', ['14A+7A']),
    'label_si_resultats_differents': hp.choice('label_si_resultats_differents', [0, 1]),
}


if len(sys.argv) >=2 and str.isdigit(sys.argv[1][0]):
    caracteristiques_a_utiliser = sys.argv[1].split(',')
    print(f'caracteristiques_a_utiliser will be set to {caracteristiques_a_utiliser}')
    search_space['caracteristiques_a_utiliser'] =  hp.choice('caracteristiques_a_utiliser', caracteristiques_a_utiliser)


max_evals = 1000
if len(sys.argv) >=3 and str.isdigit(sys.argv[2]):
    print(f'max_evals will be set to {sys.argv[2]}')
    max_evals = int(sys.argv[2])
print(f'max_evals value is {max_evals}')

already_processed_model_names_with_hpo = dict()
start_time = time.time()
# Uncomment following line to enable HPO
#best = launch_hpo_for_transformer_model(max_evals)
print(f'hpo took {round(time.time()-start_time,2)}s')
save_stats_hpo(True)


<hr style="height:2px; border-width:0; color:black; background-color:black">
<span style="font-size: 48px;">1ère partie</span>
<hr style="height:2px; border-width:0; color:black; background-color:black">

# Approche par le nombre de passages par la moyenne
## le nombre de passage par la moyenne est plus faible pour les sujets malades

## On demande à l'étudiant de compter le nombre de passages par la moyenne pour des sujets sains et pour des sujets malades en 5 minutes

## PRIVE: TODO: Exemple d'électrocardiogrammes de sujets sains pour lesquels on peut demander de compter le nombre de passages par la moyenne 

In [None]:
display_electrocardiogram(53, 105+3, 5, min_y_value=125, max_y_value=165)
display_electrocardiogram(53, 156, 5, min_y_value=110, max_y_value=160)
display_electrocardiogram(54, 0, 5, min_y_value=120, max_y_value=150)
display_electrocardiogram(56, 60+30, 5, min_y_value=125, max_y_value=155)
display_electrocardiogram(56, 56, 5, min_y_value=115, max_y_value=155)
display_electrocardiogram(75, 11, 5, min_y_value=100, max_y_value=135)
display_electrocardiogram(75, 27, 5, min_y_value=85, max_y_value=130)
display_electrocardiogram(75, 55, 5, min_y_value=90, max_y_value=125)
display_electrocardiogram(76, 43+39, 5, min_y_value=115, max_y_value=155)
display_electrocardiogram(84, 115, 5, min_y_value=120, max_y_value=175)
display_electrocardiogram(90, 70, 5, min_y_value=130, max_y_value=150)

## PRIVE: TODO: Exemple d'électrocardiogrammes de sujets malades pour lesquels on peut demander de compter le nombre de passages par la moyenne 

In [None]:
display_electrocardiogram(46, 10, 5)
display_electrocardiogram(46, 87.9, 5)
display_electrocardiogram(46, 92.9, 5)
display_electrocardiogram(46, 110.5 , 5)
display_electrocardiogram(46,235.5 , 5)
display_electrocardiogram(70,-120+95.6 , 5)
display_electrocardiogram(87, 42, 5)
display_electrocardiogram(103, 55.5, 5)
display_electrocardiogram(103,-120+44.1 , 5)
display_electrocardiogram(123,-120+65 , 5)
display_electrocardiogram(123,-120+70 , 5)
display_electrocardiogram(150, 17, 5)
display_electrocardiogram(224, 12.8, 5)

# TODO: 1B
## Faire constater par l'étudiant que ce nombre de passages par la moyenne est plus important pour les sujets sains que pour les sujets malades

In [None]:
def display_stats_nombre_de_passages_par_la_moyenne_en_5_minutes():
    count_crossing_target_0 =[]
    count_crossing_target_1 =[]
    for id in id_to_data.keys():
        count_crossing = count_crossing_in_5minutes(id, compute_id_mean(id))
        if 0 == id_to_target[id]:
            count_crossing_target_0.append(count_crossing)
        else:
            count_crossing_target_1.append(count_crossing)
    print(f'Nombre de passages par la moyenne en 5 minutes:')
    print(f'\tMoyenne pour les sujets sains = {round(np.mean(count_crossing_target_0),2)}')
    print(f'\tMoyenne pour les sujets malades = {round(np.mean(count_crossing_target_1),2)}')
display_stats_nombre_de_passages_par_la_moyenne_en_5_minutes()


# Affichage de la qualité de  l'outil en fonction de la valeur de la caractéristique: nombre de passages par la moyenne 5 minutes.
## Au dessus de ce seuil, on considère que le sujet est sain

In [None]:
from scipy.interpolate import make_interp_spline
from matplotlib.ticker import PercentFormatter

valeurs_caracteristique = []
erreur_caracteristique = []
for seuil_nombre_de_passages_par_la_moyenne_en_5_minutes in list(np.arange(0, 50, 0.1)):
    config = {'caracteristiques_a_utiliser': '14A', 'seuil_14A': seuil_nombre_de_passages_par_la_moyenne_en_5_minutes}
    erreur = train(config, False)['erreur']
    valeurs_caracteristique.append(seuil_nombre_de_passages_par_la_moyenne_en_5_minutes)
    erreur_caracteristique.append(erreur)

x_dense = np.linspace(min(valeurs_caracteristique), max(valeurs_caracteristique), 500)  # 500 points pour une courbe lisse
spline = make_interp_spline(valeurs_caracteristique, erreur_caracteristique)
y_dense = spline(x_dense)

plt.figure(figsize=(20, 10))
plt.plot(x_dense, y_dense, label='Erreur', color='b')
plt.scatter(valeurs_caracteristique, erreur_caracteristique, color='r')
plt.gca().tick_params(axis='y', which='major', labelsize=20) 
#plt.gca().xaxis.set_major_formatter(PercentFormatter(1))
plt.gca().yaxis.set_major_formatter(PercentFormatter(1))
plt.xticks(fontsize=20)
plt.xlabel("Nombre de passages par la moyenne en 5 minutes", fontsize=20)
plt.ylabel('Erreur', fontsize=20)
plt.title("Erreur (%) en fonction du nombre de passages en 5 minutes", fontsize=20)
plt.legend()


In [None]:
# l'étudiant devra remplacer le 10 ci dessous

seuil_nombre_de_passages_par_la_moyenne_en_5_minutes = 20

config = {'caracteristiques_a_utiliser': '14A', 'seuil_14A': seuil_nombre_de_passages_par_la_moyenne_en_5_minutes, 'label_si_resultats_differents': 1}
metrics = train(config, False)
print(f"Erreur obtenue: {round(100*metrics['erreur'],1)}%")


<hr style="height:2px; border-width:0; color:black; background-color:black">
<span style="font-size: 48px;">2ème partie</span>
<hr style="height:2px; border-width:0; color:black; background-color:black">

# TODO: 
# Faire constater par l'étudiant que l'écart type de l'électrocardiogramme des sujets malades est en moyenne plus élevé que l'écart type observé pour les sujets sains

In [None]:
def display_stats_std_dev():
    std_dev_target_0 =[]
    srd_dev_target_1 =[]
    for id in id_to_data.keys():
        std_dev = compute_id_std_dev(id)
        if 0 == id_to_target[id]:
            std_dev_target_0.append(std_dev)
        else:
            srd_dev_target_1.append(std_dev)
    print(f'Ecart type:')
    print(f'\tMoyenne pour les sujets sains = {round(np.mean(std_dev_target_0),2)}')
    print(f'\tMoyenne pour les sujets malades = {round(np.mean(srd_dev_target_1),2)}')
display_stats_std_dev()

# Affichage de la qualité de  l'outil en fonction du seuil d'écart type choisi pour identifier les sujets malades

In [None]:
valeurs_caracteristique = []
erreur_caracteristique = []
for threshold_ecart_type in range(0,30+1,1):
    config = {'caracteristiques_a_utiliser': '7A', 'seuil_7A': threshold_ecart_type}
    erreur = train(config, False)['erreur']
    valeurs_caracteristique.append(threshold_ecart_type)
    erreur_caracteristique.append(erreur)
    #print(f"threshold_ecart_type={threshold_ecart_type} => Erreur={round(100*erreur,1)}%")

x_dense = np.linspace(min(valeurs_caracteristique), max(valeurs_caracteristique), 500)  # 500 points pour une courbe lisse
spline = make_interp_spline(valeurs_caracteristique, erreur_caracteristique)
y_dense = spline(x_dense)

plt.figure(figsize=(20, 10))
plt.plot(x_dense, y_dense, label='Erreur', color='b')
plt.scatter(valeurs_caracteristique, erreur_caracteristique, color='r')
plt.gca().tick_params(axis='y', which='major', labelsize=20) 
plt.gca().yaxis.set_major_formatter(PercentFormatter(1))
plt.xticks(fontsize=20)
plt.xlabel("Ecart type de l'électrocardiogramme", fontsize=20)
plt.ylabel('Erreur', fontsize=20)
plt.title("Erreur en fonction du seuil de l'écart type de l'électrocardiogramme", fontsize=20)
plt.legend()


# On demande à l'étudiant de choisir le seuil pour l'écart type 

In [None]:
# l'étudiant devra remplacer le 10 ci dessous
seuil_ecart_type = 10

config = {'caracteristiques_a_utiliser': '7A', 'seuil_7A': seuil_ecart_type}
metrics = train(config, False)
print(f"Erreur obtenue: {round(100*metrics['erreur'],1)}%")


<hr style="height:2px; border-width:0; color:black; background-color:black">
<span style="font-size: 48px;">3ème partie</span>
<hr style="height:2px; border-width:0; color:black; background-color:black">

# On combine les 2 caractéristiques précédentes

# TODO: affichage d'un graphique montrant l'évolution de l'erreur en fonction de ces 2 caractéristiques

# On demande à l'étudiant de choisir la valeur de ces 2 caractéristiques

In [None]:
# l'étudiant devra remplacer les 2 valeurs ci dessous

seuil_ecart_type = 10
seuil_nombre_de_passages_par_la_moyenne_en_5_minutes = 10

config = {'caracteristiques_a_utiliser': '7A+14A', 'seuil_7A': seuil_ecart_type, 'seuil_14A': seuil_nombre_de_passages_par_la_moyenne_en_5_minutes,'label_si_resultats_differents': 1}
metrics = train(config, False)
print(f"Erreur obtenue: {round(100*metrics['erreur'],1)}%")
