Packages:

In [None]:
import glob
import os
import warnings
 
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pydicom
from pydicom.data import get_testdata_files
from sklearn.model_selection import train_test_split

from IPython.display import Markdown, display

 # Import functions from the module
import importlib
import help_files._0_definitions 
# import  help_files._1_visuals_script
# import  help_files._01_load_data
 # Reload the module to apply the changes to the script
importlib.reload(help_files._0_definitions)
 
# importlib.reload(help_files._01_load_data)
 
# Group by 'condition', 'level', and 'severity' and count occurrences
from help_files._0_definitions import count_severity_by_condition_level 
# Define the path
from pathlib import Path

pd.set_option("display.width", 1000)  # Set a large width to prevent line wrapping
 

In [None]:
# Execute the shared helper script inside the notebook namespace, so every
# function and global it defines becomes available to the cells below.
# NOTE(review): exec() runs the file verbatim — only use it on trusted project files.
with open("help_files/_0_definitions.py") as file:
    exec(file.read())
# Execute the run-time definitions script the same way.
# NOTE(review): presumably this is where globals used later (data_path_vor,
# whole_data_set, RSEED) come from — they are not defined in this notebook; confirm.
with open("help_files/_0_run_definitions.py") as file:
    exec(file.read())

In [None]:
# Read the train/test CSV files from `data_path_vor` into DataFrames.
file_names = ["train_df_4_loc.csv", "test_df_4_loc.csv"]
# One DataFrame per file, in the same order as `file_names`.
dataframes = list(map(pd.read_csv, (data_path_vor / name for name in file_names)))
# First entry is the training frame, second the test frame.
train_df, test_df = dataframes

print("DataFrames have been loaded successfully.")



In [None]:
# Full data set vs. small random sample.
# The original test was inverted (`whole_data_set == False` announced the whole
# data set, and the full-data case was the one that got down-sampled).
if whole_data_set:
    print("Using the whole data set")
else:
    # Cap n at the frame length so small CSV files do not make sample() raise.
    train_df = train_df.sample(n=min(50, len(train_df)), random_state=RSEED)
    test_df = test_df.sample(n=min(100, len(test_df)), random_state=RSEED)
    # NOTE(review): the number in this message does not match the sample sizes above.
    display(Markdown('<span style="color:red"> this is a small sample : 48692</span>'))
# Encode the condition names as integer category codes.
train_df['condition'] = train_df['condition'].astype('category').cat.codes
# `train_data` is an alias of `train_df` (same object, not a copy).
train_data = train_df

In [None]:
import pydicom

# Normalise path separators / redundant segments for the current OS.
train_df['image_path'] = train_df['image_path'].apply(os.path.normpath)

# Smoke test: try to read the first DICOM file to verify the paths resolve.
test_path = train_df['image_path'].iloc[0]
print("Testing file:", test_path)

try:
    dicom_image = pydicom.dcmread(test_path)
except Exception as err:
    # Report instead of raising, so the notebook keeps running.
    print(f"Failed to load file: {err}")
else:
    print("File loaded successfully!")

In [None]:
# Show the distinct severity labels present in the training frame.
print(train_df['severity'].unique())

In [None]:
# 'condition' is already converted to integer category codes in the sampling
# cell above. Re-encoding integer codes is not idempotent when values were
# missing: the -1 "missing" sentinel would become a real category and shift
# every other code by one. So only encode when the column is not integer yet.
if not pd.api.types.is_integer_dtype(train_df['condition']):
    train_df['condition'] = train_df['condition'].astype('category').cat.codes
# `train_data` is an alias of `train_df` (same object, not a copy).
train_data = train_df

In [None]:
import os
import pydicom
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, models
from PIL import Image
import pandas as pd
import cv2

# Function to load DICOM images and convert them to RGB
def load_dicom_image(image_path):
    """Load a DICOM file and return its pixel data as an 8-bit PIL image.

    The pixel array is scaled so that its maximum maps to 255. Two edge cases
    that previously produced garbage are handled explicitly:

    * an all-zero (or otherwise non-positive / NaN maximum) image no longer
      triggers a division by zero; it yields an all-black image;
    * negative values are clipped to 0 instead of wrapping around when cast
      to ``uint8``.

    Args:
        image_path: Path of the DICOM file, passed to ``pydicom.dcmread``.

    Returns:
        PIL.Image.Image: The image built by ``Image.fromarray``; 2-D
        (grayscale) pixel data is expanded to three channels first.
    """
    dicom_data = pydicom.dcmread(image_path)
    pixels = dicom_data.pixel_array.astype(float)

    # Scale to 0..255 relative to the brightest pixel.
    peak = pixels.max()
    if peak > 0:
        scaled = np.clip(pixels / peak * 255, 0, 255)
    else:
        # Nothing to scale against (blank frame): avoid 0/0 -> NaN.
        scaled = np.zeros_like(pixels)
    image_array = scaled.astype('uint8')

    # Convert to RGB if grayscale
    if image_array.ndim == 2:
        image = cv2.cvtColor(image_array, cv2.COLOR_GRAY2RGB)
    else:  # Already multi-channel
        image = image_array

    # Convert numpy array to PIL Image
    return Image.fromarray(image)


# Custom Dataset Class
class MRILocalizationDataset(Dataset):
    """Dataset yielding ``(image, (x, y), severity)`` for each DataFrame row."""

    def __init__(self, dataframe, transform=None):
        """Store the source frame and the optional image transform.

        Args:
            dataframe (pd.DataFrame): Rows providing the ``image_path``, ``x``,
                ``y`` and ``severity`` columns read in ``__getitem__``.
            transform (callable, optional): Applied to the loaded image when truthy.
        """
        self.dataframe = dataframe
        self.transform = transform

    def __len__(self):
        """Number of rows in the underlying DataFrame."""
        return len(self.dataframe)

    def __getitem__(self, index):
        """Return the image, its ``(x, y)`` target and severity for row ``index``."""
        record = self.dataframe.iloc[index]
        # Pull every field up front, in the same order as before.
        image_path, x, y, severity = (
            record[key] for key in ('image_path', 'x', 'y', 'severity')
        )

        # Read the DICOM file, then run the optional transform pipeline.
        image = load_dicom_image(image_path)
        if self.transform:
            image = self.transform(image)

        return image, (x, y), severity


# Localization Model
class LocalizationModel(nn.Module):
    """ResNet-50 whose final layer is replaced by a 2-unit linear head for (x, y)."""

    def __init__(self):
        super().__init__()
        # NOTE(review): `pretrained=` is deprecated in recent torchvision releases
        # in favour of `weights=`; kept as-is to stay compatible with the installed version.
        backbone = models.resnet50(pretrained=True)
        # Swap the ImageNet classifier for a two-value regression head.
        backbone.fc = nn.Linear(backbone.fc.in_features, 2)  # Output (x, y)
        self.resnet = backbone

    def forward(self, x):
        """Run the batch through the ResNet and return its 2-value output per sample."""
        return self.resnet(x)


# Classification Model
class ClassificationModel(nn.Module):
    """ResNet-50 whose final layer is replaced by a ``num_classes``-unit linear head."""

    def __init__(self, num_classes):
        """
        Args:
            num_classes (int): Number of output units (severity classes).
        """
        super().__init__()
        # NOTE(review): `pretrained=` is deprecated in recent torchvision releases
        # in favour of `weights=`; kept as-is to stay compatible with the installed version.
        backbone = models.resnet50(pretrained=True)
        # Swap the ImageNet classifier for the severity head.
        backbone.fc = nn.Linear(backbone.fc.in_features, num_classes)  # Output severity
        self.resnet = backbone

    def forward(self, x):
        """Run the batch through the ResNet and return one score per class."""
        return self.resnet(x)


# Main function for automation
def main(train_df, num_epochs=10, batch_size=16, learning_rate=0.001, num_classes=3):
    """Train the localization and severity-classification models on ``train_df``.

    Builds a ``MRILocalizationDataset`` / ``DataLoader`` from the frame, trains a
    ``LocalizationModel`` (MSE loss against the ``x``/``y`` columns) and a
    ``ClassificationModel`` (cross-entropy against ``severity``) side by side,
    saves both state dicts to ``localization_model.pth`` and
    ``classification_model.pth`` in the working directory, and finally prints
    the raw outputs of both models for every batch of the training loader.

    Args:
        train_df (pd.DataFrame): Training rows; must contain every column
            listed in ``required_columns`` below.
        num_epochs (int): Number of passes over the data (default 10).
        batch_size (int): DataLoader batch size (default 16).
        learning_rate (float): Adam learning rate for both models (default 0.001).
        num_classes (int): Number of severity classes (default 3).

    Raises:
        ValueError: If ``train_df`` lacks any required column.
    """
    # Check if the required columns exist
    required_columns = ['study_id', 'severity', 'condition', 'level', 'series_id', 'x', 'y', 'image_path', 'missing_image', 'study_id_count']

    if not all(col in train_df.columns for col in required_columns):
        raise ValueError("DataFrame is missing required columns.")

    # Define transformations
    transform = transforms.Compose([
        transforms.Resize((224, 224)),  # Resize to ResNet input size
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # Normalize
    ])

    # Create Dataset and DataLoader
    train_dataset = MRILocalizationDataset(dataframe=train_df, transform=transform)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    # Guard the per-epoch averages against an empty loader (division by zero).
    num_batches = max(len(train_loader), 1)

    # Instantiate models
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    localization_model = LocalizationModel().to(device)
    classification_model = ClassificationModel(num_classes=num_classes).to(device)

    # Loss functions and optimizers
    criterion_localization = nn.MSELoss()
    criterion_classification = nn.CrossEntropyLoss()
    optimizer_localization = torch.optim.Adam(localization_model.parameters(), lr=learning_rate)
    optimizer_classification = torch.optim.Adam(classification_model.parameters(), lr=learning_rate)

    # Training loop
    for epoch in range(num_epochs):
        localization_model.train()
        classification_model.train()

        running_loss_localization = 0.0
        running_loss_classification = 0.0

        for images, (x_coords, y_coords), severities in train_loader:
            images = images.to(device)
            # The default collate function already delivers tensors; as_tensor
            # converts dtype/device without the copy-construct warning that
            # torch.tensor(<tensor>) emits.
            x_coords = torch.as_tensor(x_coords, dtype=torch.float32, device=device)
            y_coords = torch.as_tensor(y_coords, dtype=torch.float32, device=device)
            severities = torch.as_tensor(severities, dtype=torch.long, device=device)

            # Zero gradients
            optimizer_localization.zero_grad()
            optimizer_classification.zero_grad()

            # Localization model forward/backward pass; targets have shape (batch, 2)
            predicted_coords = localization_model(images)
            target_coords = torch.stack((x_coords, y_coords), dim=1)
            loss_localization = criterion_localization(predicted_coords, target_coords)
            loss_localization.backward()

            # Classification model forward/backward pass
            predicted_severity = classification_model(images)
            loss_classification = criterion_classification(predicted_severity, severities)
            loss_classification.backward()

            # Update weights
            optimizer_localization.step()
            optimizer_classification.step()

            # Track losses
            running_loss_localization += loss_localization.item()
            running_loss_classification += loss_classification.item()

        # Print epoch loss (mean over batches)
        print(f"Epoch [{epoch+1}/{num_epochs}], "
              f"Localization Loss: {running_loss_localization/num_batches:.4f}, "
              f"Classification Loss: {running_loss_classification/num_batches:.4f}")

    print("Training Complete")

    # Save the models
    torch.save(localization_model.state_dict(), 'localization_model.pth')
    torch.save(classification_model.state_dict(), 'classification_model.pth')

    # Inference Example (runs on the training loader; targets are not needed here)
    localization_model.eval()
    classification_model.eval()
    with torch.no_grad():
        for images, _coords, _severities in train_loader:
            images = images.to(device)
            predicted_coords = localization_model(images)
            predicted_severity = classification_model(images)

            # Output results
            print(f"Predicted Coordinates: {predicted_coords}")
            print(f"Predicted Severity: {predicted_severity}")


# Kick off training when this cell/script is executed directly (inside a
# notebook kernel `__name__` is "__main__", so this runs on cell execution).
# `train_df` is the DataFrame prepared by the earlier cells.
if __name__ == "__main__":
    main(train_df)


In [None]:
ss  # NOTE(review): undefined name -> NameError; presumably a deliberate "stop here" for Run All — confirm or remove

Predictions

In [None]:
import torch
import torchvision.models as models

# Step 1: Define the model architecture (same as training)
model_for_prediciton = models.resnet50()
model_for_prediciton.fc = torch.nn.Linear(model_for_prediciton.fc.in_features, 3)  # Update for your classes

# Step 2: Load the best model weights
# Adjust this path if necessary
# NOTE(review): absolute, machine-specific path — consider moving it to a config cell.
best_weights_path = "C:/Users/HP1/Desktop/Spiced/capstone-project/mlruns/711328181724227740/deaa903e488043d19aad572a6c429479/artifacts/best_model_weights_epoch_1.pt"
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# map_location lets a checkpoint saved on a GPU load on a CPU-only machine;
# without it torch.load tries to restore tensors onto the original device.
model_for_prediciton.load_state_dict(torch.load(best_weights_path, map_location=device))

# Step 3: Set the model to evaluation mode
model_for_prediciton.eval()
model_for_prediciton.to(device)

In [None]:
# Imported here because the notebook's mlflow imports only appear in a later
# cell; without this, running the cell on a fresh kernel raises NameError.
import mlflow.pytorch

# Replace <run_id> with your actual run ID and <path_to_model> with the artifact path used during logging
# NOTE(review): absolute, machine-specific path — consider moving it to a config cell.
# This overrides the `model_for_prediciton` built from the state dict in the previous cell.
model_path = r"C:\Users\HP1\Desktop\Spiced\capstone-project\mlruns\711328181724227740\deaa903e488043d19aad572a6c429479\artifacts\final_model"
model_for_prediciton = mlflow.pytorch.load_model(model_path)

model_for_prediciton.eval()  # Set the model to evaluation mode

Restore parameters and metrics from MLflow

I should take the predicted probabilities rather than the predicted classes: the output scores (logits) are converted into probabilities using the softmax function.

In [None]:
# class definition
import mlflow
import mlflow.pytorch
import torch
from torch import nn, optim
from torch.utils.data import Dataset, DataLoader, random_split
import torchvision.models as models
from torchvision import transforms
import pydicom
import cv2
import pandas as pd
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import matplotlib.pyplot as plt
import numpy as np



# Load the model
model = model_for_prediciton
model.to(torch.device('cuda' if torch.cuda.is_available() else 'cpu'))  # Move model to appropriate device
model.eval()  # Set the model to evaluation mode

# Prepare data
# Inference-time preprocessing must be deterministic: the random ColorJitter
# augmentation was removed, because it made the predicted probabilities change
# from run to run (and between the two inference passes further below).
transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

class MRIDataset(Dataset):
    """Dataset yielding ``(image_tensor, severity_label)`` pairs from DICOM files."""

    def __init__(self, data, transform=None):
        """
        Args:
            data (pd.DataFrame): Rows with ``image_path`` and ``severity`` columns.
                NOTE: ``severity`` is cast to int in place, so the frame passed
                in by the caller is modified as well.
            transform (callable, optional): Applied to the HxWx3 uint8 array;
                when falsy the array is converted to a CxHxW tensor instead.
        """
        self.data = data
        self.transform = transform
        self.data['severity'] = self.data['severity'].astype(int)

    def __getitem__(self, index):
        """Load row ``index``: read the DICOM, scale to 8-bit RGB, return (image, label)."""
        row = self.data.iloc[index]
        image_path = row['image_path']
        label = row['severity']
        dicom_image = pydicom.dcmread(image_path)
        pixels = dicom_image.pixel_array.astype(float)

        # Scale so the brightest pixel maps to 255. A blank (all-zero) frame
        # would otherwise divide by zero, and negative values would wrap around
        # on the uint8 cast, so both cases are handled explicitly.
        peak = pixels.max()
        if peak > 0:
            pixels = np.clip(pixels / peak * 255, 0, 255)
        else:
            pixels = np.zeros_like(pixels)
        image = pixels.astype('uint8')

        if len(image.shape) == 2:  # Grayscale
            image = cv2.cvtColor(image, cv2.COLOR_GRAY2RGB)
        image_tensor = self.transform(image) if self.transform else torch.from_numpy(image).permute(2, 0, 1)
        return image_tensor, torch.tensor(label).long()

    def __len__(self):
        """Number of rows in the underlying DataFrame."""
        return len(self.data)

predicting probabilities for test data

In [None]:
# Alias only (no copy): `test_data` and `test_df` refer to the same DataFrame,
# so the in-place `severity` cast done by MRIDataset also changes `test_df`.
test_data = test_df

In [None]:
# Run the model over the test set, one image at a time, collecting the
# predicted class and the softmax probabilities for every image.
test_dataset = MRIDataset(data=test_data, transform=transform)
test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False)

device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print(f"Using device: {device}")

# results: one predicted class index per image
# probabilities_list: one (1, n_classes) probability array per image
results, probabilities_list = [], []

with torch.no_grad():
    for images, labels in test_loader:
        # Move the batch (and its labels) to the model's device.
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)
        # Logits -> class probabilities; the predicted class is the arg-max logit.
        probabilities = outputs.softmax(dim=1)
        predicted_classes = outputs.max(dim=1).indices

        results.append(predicted_classes.item())
        probabilities_list.append(probabilities.cpu().numpy())



In [None]:
# Print predicted classes and their probabilities
for i, (pred, probs) in enumerate(zip(results, probabilities_list)):
    print(f"Test image {i}: Predicted class {pred}, Probabilities: {probs}")

# Labels for the confusion matrix.
# The previous version pushed every test image through the model a second
# time. That doubled the run time and, with any randomness in the transform,
# could give predictions that disagree with `results`. The loader is
# unshuffled with batch size 1, so `results` is already in dataset row order
# and the true labels can be read straight from the dataset's frame.
true_labels = np.asarray(test_dataset.data['severity'])
predicted_labels = np.array(results)
assert len(true_labels) == len(predicted_labels), "predictions do not cover the whole test set"

print("True Labels Unique Values:", np.unique(true_labels))
print("Predicted Labels Unique Values:", np.unique(predicted_labels))


In [None]:
# Stack the per-image (1, n_classes) arrays into one (n_images, n_classes)
# array and trim it to the number of rows in test_df.
probabilities_array = np.vstack(probabilities_list)[:len(test_df)]

# One column per class: store the raw probability, then round it to 4 decimals.
for class_index in range(3):
    column = f'Probability_Class_{class_index}'
    test_df[column] = probabilities_array[:, class_index]
    test_df[column] = test_df[column].round(4)

# Predicted class index for every test row.
test_df['Predicted_Class'] = results

# Preview the augmented frame.
test_df.head()

In [None]:
# Mean predicted probability of each class, per true severity level.
# (Despite its name, this table holds means, not sums.)
probability_columns = ['Probability_Class_0', 'Probability_Class_1', 'Probability_Class_2']
severity_prob_sum_all_classes = test_df.groupby('severity')[probability_columns].mean()
# Treated below as a "soft" confusion matrix.
conf_matrix_prob_df = severity_prob_sum_all_classes

# Hard confusion matrix: true severity vs. predicted class.
conf_matrix_severity_pred = confusion_matrix(test_df['severity'], test_df['Predicted_Class'])
n_actual, n_predicted = conf_matrix_severity_pred.shape
conf_matrix_severity_pred_df = pd.DataFrame(
    conf_matrix_severity_pred,
    index=[f"Actual {i}" for i in range(n_actual)],
    columns=[f"Predicted {i}" for i in range(n_predicted)],
)

# Plain-text dumps of both tables.
print(conf_matrix_severity_pred_df)
print(conf_matrix_prob_df)

# Rich display: only this last expression is rendered by the notebook.
conf_matrix_prob_df.style.background_gradient(cmap='Blues')


In [None]:
# Render the severity-vs-predicted-class confusion matrix with a blue gradient.
conf_matrix_severity_pred_df.style.background_gradient(cmap='Blues')

In [None]:
from sklearn.metrics import confusion_matrix
import pandas as pd
import numpy as np

# Trim both label arrays to their common length before comparing them.
min_length = min(len(true_labels), len(predicted_labels))
true_labels, predicted_labels = true_labels[:min_length], predicted_labels[:min_length]

# Confusion matrix of true vs. predicted labels, wrapped in a labelled DataFrame.
conf_matrix = confusion_matrix(true_labels, predicted_labels)
row_labels = [f"Actual {i}" for i in range(conf_matrix.shape[0])]
column_labels = [f"Predicted {i}" for i in range(conf_matrix.shape[1])]
conf_matrix_df = pd.DataFrame(conf_matrix, index=row_labels, columns=column_labels)

# Plain-text dump ...
print(conf_matrix_df)

# ... followed by the styled table as the cell's rich output.
conf_matrix_df.style.background_gradient(cmap='Blues')


In [None]:
from sklearn.metrics import precision_score, recall_score, accuracy_score
# Support-weighted precision and recall, plus overall accuracy, computed from
# the true and predicted label arrays.
precision, recall = (
    scorer(true_labels, predicted_labels, average='weighted')
    for scorer in (precision_score, recall_score)
)
accuracy = accuracy_score(true_labels, predicted_labels)

print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"Accuracy: {accuracy:.4f}")
