In [32]:
import numpy as np
import joblib
import torch
import torch.nn as nn
import torch.nn.functional as F
from tensorflow.keras.models import load_model
import librosa
import torch.optim as optim
import pandas as pd
from sklearn.preprocessing import StandardScaler
from PIL import Image
import torchvision.transforms as transforms
import os



In [33]:


# **Updated CNN Model**
class ECG_CNN(nn.Module):
    def __init__(self, num_classes):
        super(ECG_CNN, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, 3, padding=1)  
        self.conv2 = nn.Conv2d(32, 64, 3, padding=1)
        self.conv3 = nn.Conv2d(64, 128, 3, padding=1)
        self.pool = nn.MaxPool2d(2, 2)
        self.global_avg_pool = nn.AdaptiveAvgPool2d(1)  
        self.fc1 = nn.Linear(128, 64)  
        self.fc2 = nn.Linear(64, num_classes)

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = self.pool(F.relu(self.conv3(x)))
        x = self.global_avg_pool(x)
        x = torch.flatten(x, 1)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x


def predict_ecg_image(image_path):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # ✅ Ensure model architecture matches the saved one
    num_classes = 4
    model = ECG_CNN(num_classes=num_classes).to(device)

    try:
        state_dict = torch.load("best_ecg_cnn.pth", map_location=device)
        model.load_state_dict(state_dict)
    except RuntimeError as e:
        print(f"Error loading ECG model: {e}")
        return None

    model.eval()
    return model  # Ensure the model is returned properly

state_dict = torch.load("best_ecg_cnn.pth", map_location="cpu")
print(state_dict["fc1.weight"].shape)  # Check the expected shape

def extract_ecg_features(image_path):
    """ Extracts ECG features from an image using a trained CNN model """
    
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    
    # Load the trained ECG model
    model = ECG_CNN(num_classes=4).to(device)  # Ensure num_classes matches training
    model.load_state_dict(torch.load("best_ecg_cnn.pth", map_location=device))
    model.eval()
    
   

    # Load and preprocess the image
    transform = transforms.Compose([
        transforms.Grayscale(),   # Convert to grayscale
        transforms.Resize((128, 128)),  # Resize to match training size
        transforms.ToTensor()
    ])

    img = Image.open(image_path).convert("L")
    img = transform(img).unsqueeze(0).to(device)  # Add batch dimension

    # Get features from the model
    with torch.no_grad():
        features = model(img)

    return features.cpu().numpy().flatten()  # Convert tensor to numpy array

  state_dict = torch.load("best_ecg_cnn.pth", map_location="cpu")


torch.Size([64, 128])


In [34]:

# ✅ Load PPG Model
def extract_ppg_features(csv_path):
    model = joblib.load("best_ppg_model.pkl")
    data = pd.read_csv(csv_path).drop(columns=['Label'])
    scaler = StandardScaler()
    scaled_features = scaler.fit_transform(data)
    return np.mean(scaled_features, axis=0)  # Aggregate features


def extract_pcg_features(audio_path):
    if not os.path.exists("SOUND_LSTM_model.h5"):
        raise FileNotFoundError("Trained PCG model file 'SOUND_LSTM_model.h5' not found!")

    # ✅ Ensure model input shape matches the one used during training
    model = load_model("SOUND_LSTM_model.h5", compile=False)

    # Load and preprocess PCG (Phonocardiogram) audio
    y, sr = librosa.load(audio_path, sr=None)
    mfccs = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=13)
    mfccs_scaled = np.mean(mfccs.T, axis=0).reshape(1, -1)

    return mfccs_scaled

In [35]:

# ✅ Final Fusion Model
class FusionNN(nn.Module):
    def __init__(self, input_size, num_classes=3):  # 3 Classes: CAD, Arrhythmia, Normal
        super(FusionNN, self).__init__()
        self.fc1 = nn.Linear(input_size, 64)
        self.fc2 = nn.Linear(64, num_classes)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x

def predict_fusion(ecg_path, ppg_path, pcg_path):
    # Extract features
    ecg_features = extract_ecg_features(ecg_path)
    ppg_features = extract_ppg_features(ppg_path)
    pcg_features = extract_pcg_features(pcg_path)

    # Concatenate features
    fusion_input = np.hstack((ecg_features, ppg_features, pcg_features))
    
    # Load Fusion Model
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = FusionNN(input_size=fusion_input.shape[0], num_classes=3).to(device)
    model.load_state_dict(torch.load("fusion_model.pth", map_location=device))
    model.eval()

    # Predict
    fusion_tensor = torch.tensor(fusion_input, dtype=torch.float32).unsqueeze(0).to(device)
    with torch.no_grad():
        output = model(fusion_tensor)
        _, predicted_class = torch.max(output, 1)

    class_labels = {0: "Coronary Artery Disease (CAD)", 1: "Arrhythmia", 2: "Normal"}
    result = class_labels.get(predicted_class.item(), "Unknown")

    print(f"Final Diagnosis: {result}")
    return result

# ✅ Main Execution
if __name__ == "__main__":
    ecg_path = input("Enter ECG image path: ").strip()
    ppg_path = input("Enter PPG CSV path: ").strip()
    pcg_path = input("Enter PCG audio path: ").strip()

    if all([os.path.exists(ecg_path), os.path.exists(ppg_path), os.path.exists(pcg_path)]):
        final_result = predict_fusion(ecg_path, ppg_path, pcg_path)
    else:
        print("Error: One or more input file paths do not exist!")


Enter ECG image path:  ecg/test/ECG Images of Myocardial Infarction Patients (240x12=2880)/MI(1).jpg
Enter PPG CSV path:  PPG_Dataset.csv
Enter PCG audio path:  heart_sound/val/unhealthy/a0001.wav


  model.load_state_dict(torch.load("best_ecg_cnn.pth", map_location=device))


✅ ECG Model Loaded Successfully!


ValueError: Unrecognized keyword arguments: ['batch_shape']