In [1]:
from google.colab import drive

# Montez votre Google Drive
drive.mount('/content/drive')

# Accédez au répertoire ProjetE4
%cd /content/drive/Othercomputers/Mon\ ordinateur\ portable/Documents/GitHub/ProjetE4


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
/content/drive/Othercomputers/Mon ordinateur portable/Documents/GitHub/ProjetE4


In [2]:
!pip install wfdb


Collecting wfdb
  Downloading wfdb-4.1.2-py3-none-any.whl (159 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/160.0 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m[90m━[0m [32m153.6/160.0 kB[0m [31m5.3 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m160.0/160.0 kB[0m [31m4.2 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: wfdb
Successfully installed wfdb-4.1.2


In [3]:
# Importation des modules nécessaires
import wfdb  # Pour la manipulation de données ECG
import csv  # Pour la manipulation de fichiers CSV
import pandas as pd  # Pour la manipulation de données tabulaires
import numpy as np  # Pour la manipulation de tableaux
import os  # Pour les opérations liées au système d'exploitation

# Création du répertoire de sortie s'il n'existe pas déjà
output_dir = "data_creation"
os.makedirs(output_dir, exist_ok=True)

# Numéros des patients
patient_numbers = [
    "100", "101", "102", "103", "104", "105", "106", "107", "108", "109",
    "111", "112", "113", "114", "115", "116", "117", "118", "119", "121",
    "122", "123", "124", "200", "201", "202", "203", "205", "207", "208",
    "209", "210", "212", "213", "214", "215", "217", "219", "220", "221",
    "222", "223", "228", "230", "231", "232", "233", "234"
]


# N = normal
# S = supra-ventricular premature
# V = ventricular escape
# F = fusion of ventricular and normal
# Q = unclassified heartbeats

# Correspondance des symboles aux catégories de battements cardiaques
symbol_to_category = {
    'N': 'N', '.': 'N', 'L': 'N', 'R': 'N', 'e': 'N', 'j': 'N',
    'a': 'S', 'A': 'S', 'J': 'S', 'S': 'S',
    'V': 'V', 'E': 'V',
    'F': 'F',
    '/': 'Q', 'f': 'Q', 'Q': 'Q'
}

# Parcours de chaque numéro de patient
for patient_number in patient_numbers:
    try:
        # Données ECG
        path_to_record = f"mit-database/{patient_number}"
        patient_record = wfdb.rdrecord(path_to_record)
        leads = patient_record.sig_name  # Noms des dérivations ECG
        ecg_data = patient_record.p_signal  # Données ECG

        # Fichier CSV des données ECG
        ecg_filename = f"{output_dir}/{patient_number}_ECG.csv"
        with open(ecg_filename, "w", newline='') as outfile:
            out_csv = csv.writer(outfile)
            out_csv.writerow(leads)  # Écriture des noms des dérivations
            for row in ecg_data:
                out_csv.writerow(row)  # Écriture des données ECG

        # Données des annotations
        annotation = wfdb.rdann(path_to_record, 'atr')
        symbols = annotation.symbol  # Symboles des annotations
        annotations = annotation.sample  # Échantillons correspondants

        # Filtrer les symboles qui ne sont pas dans symbol_to_category
        filtered_symbols_annotations = [(sym, ann) for sym, ann in zip(symbols, annotations) if sym in symbol_to_category]
        categories = [symbol_to_category[sym] for sym, ann in filtered_symbols_annotations]  # Catégories correspondantes
        annotations_filtered = [ann for sym, ann in filtered_symbols_annotations]  # Annotations filtrées

        # Création d'un DataFrame pandas pour les annotations
        df_annotations = pd.DataFrame({'Category': categories, 'Annotation': annotations_filtered})

        # Fichier CSV des annotations
        annotations_filename = f"{output_dir}/{patient_number}_Annotations.csv"
        df_annotations.to_csv(annotations_filename, index=False)  # Écriture des annotations dans le fichier CSV

    except Exception as e:
        print(f"Échec de traitement : {patient_number}: {e}")

print("Terminé")

# Fonction pour traiter les données d'un patient
def process_patient_data(patient_number, data_creation_dir="data_creation"):
    ecg_file_path = os.path.join(data_creation_dir, f"{patient_number}_ECG.csv")
    annotations_file_path = os.path.join(data_creation_dir, f"{patient_number}_Annotations.csv")

    patient_X = []  # Liste pour stocker les données ECG
    patient_Y = []  # Liste pour stocker les catégories correspondantes

    try:
        ecg_df = pd.read_csv(ecg_file_path)
        annotations_df = pd.read_csv(annotations_file_path)
    except FileNotFoundError:
        print(f"Fichiers pour le patient {patient_number} non trouvés. Passage au suivant...")
        return [], []

    first_column_name = ecg_df.columns[0]  # Nom de la première colonne (nom de la dérivation)

    sampling_rate = 360  # Fréquence d'échantillonnage en Hz
    window_size_seconds = 3  # Nombre de secondes avant et après l'annotation
    window_size_samples = window_size_seconds * sampling_rate  # Nombre d'échantillons dans la fenêtre

    for _, row in annotations_df.iterrows():
        annotation_point = row['Annotation']  # Point d'annotation
        category = row['Category']  # Catégorie de l'annotation

        start_point = max(0, annotation_point - window_size_samples)
        end_point = min(len(ecg_df), annotation_point + window_size_samples)

        window_data = ecg_df.iloc[start_point:end_point][first_column_name].to_numpy()  # Données dans la fenêtre
        if len(window_data) < window_size_samples * 2:
            window_data = np.pad(window_data, (0, window_size_samples * 2 - len(window_data)), 'constant')  # Remplissage des données si la fenêtre est incomplète

        patient_X.append(window_data)
        patient_Y.append(category)

    return patient_X, patient_Y

# Initialisation des listes pour contenir l'ensemble des données
all_X = []
all_Y = []

data_creation_dir = "data_creation"  # Répertoire où se trouvent les fichiers créés

# Traitement de chaque patient
for patient_number in patient_numbers:
    patient_X, patient_Y = process_patient_data(patient_number, data_creation_dir)
    all_X.extend(patient_X)
    all_Y.extend(patient_Y)

X = np.array(all_X)  # Données ECG sous forme de tableau numpy
Y = np.array(all_Y)  # Catégories correspondantes sous forme de tableau numpy


Terminé


In [27]:
print(np.shape(all_X))
print(all_X[2][1])


(109494, 2160)
-0.145


In [None]:
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Flatten, Conv1D, MaxPooling1D
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.callbacks import ModelCheckpoint
from sklearn.metrics import accuracy_score



# Séparation des données en ensembles d'entraînement et de test
X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=0.2, random_state=42)

# Normalisation des données
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)
from sklearn.preprocessing import LabelEncoder

# Mapper les étiquettes de chaînes de caractères à des entiers
label_encoder = LabelEncoder()
Y_train_encoded = label_encoder.fit_transform(Y_train)
Y_test_encoded = label_encoder.transform(Y_test)

# Conversion des étiquettes en catégories
Y_train = to_categorical(Y_train_encoded)
Y_test = to_categorical(Y_test_encoded)


# Définition du modèle
model = Sequential([
    Conv1D(32, kernel_size=3, activation='relu', input_shape=(X_train.shape[1], 1)),
    MaxPooling1D(pool_size=2),
    Conv1D(64, kernel_size=3, activation='relu'),
    MaxPooling1D(pool_size=2),
    Flatten(),
    Dense(128, activation='relu'),
    Dense(Y_train.shape[1], activation='softmax')
])

# Compiler le modèle
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

# Entraîner le modèle
checkpoint = ModelCheckpoint('best_model.h5', monitor='val_accuracy', verbose=1, save_best_only=True, mode='max')
callbacks_list = [checkpoint]
model.fit(X_train, Y_train, validation_data=(X_test, Y_test), epochs=10, batch_size=32, callbacks=callbacks_list)

# Charger le meilleur modèle
best_model = load_model('best_model.h5')

# Évaluer le modèle
Y_pred = best_model.predict(X_test)
accuracy = accuracy_score(np.argmax(Y_test, axis=1), np.argmax(Y_pred, axis=1))
print("Accuracy:", accuracy)


Epoch 1/10
Epoch 1: val_accuracy improved from -inf to 0.98004, saving model to best_model.h5
Epoch 2/10


  saving_api.save_model(


Epoch 2: val_accuracy improved from 0.98004 to 0.98324, saving model to best_model.h5
Epoch 3/10
Epoch 3: val_accuracy improved from 0.98324 to 0.98397, saving model to best_model.h5
Epoch 4/10
Epoch 4: val_accuracy did not improve from 0.98397
Epoch 5/10
Epoch 5: val_accuracy improved from 0.98397 to 0.98489, saving model to best_model.h5
Epoch 6/10
Epoch 6: val_accuracy did not improve from 0.98489
Epoch 7/10
Epoch 7: val_accuracy improved from 0.98489 to 0.98557, saving model to best_model.h5
Epoch 8/10
Epoch 8: val_accuracy did not improve from 0.98557
Epoch 9/10
Epoch 9: val_accuracy did not improve from 0.98557
Epoch 10/10
Epoch 10: val_accuracy improved from 0.98557 to 0.98575, saving model to best_model.h5


NameError: name 'load_model' is not defined

In [None]:


# Charger le meilleur modèle
best_model = load_model('best_model.h5')

# Évaluer le modèle
Y_pred = best_model.predict(X_test)
accuracy = accuracy_score(np.argmax(Y_test, axis=1), np.argmax(Y_pred, axis=1))
print("Accuracy:", accuracy)


Accuracy: 0.985752774099274
