# EM, SDI, and FE Multi-Output Model

### Set Up Data Loaders
Set up DataLoader objects for the training, validation, and test datasets.

In [9]:
import pandas as pd
import torch
from torch.utils.data import Dataset
from PIL import Image
import os
from sklearn.preprocessing import MinMaxScaler, StandardScaler

class CustomImageDataset(Dataset):
    def __init__(self, csv_file, root_dir, transform=None, scaler=None):
        self.root_dir = root_dir
        self.transform = transform
        self.annotations = pd.read_csv(csv_file, header=None, names=['EM', 'SDI', 'FE'])
        
        # Filter out rows where the corresponding image does not exist
        def file_exists(idx):
            file_path = os.path.join(root_dir, f"{idx}LD_Data.png")
            exists = os.path.isfile(file_path)
            if not exists:
                print(f"File not found: {file_path}")
            return exists
        
        self.annotations = self.annotations[
            self.annotations.index.to_series().apply(file_exists)
        ]

        # Initialize the scaler and fit it on the combined target values (EM, SDI, FE)
        self.scaler = scaler
        if self.scaler is not None:
            combined_values = self.annotations[['EM', 'SDI', 'FE']].values
            self.scaler.fit(combined_values)

    def __len__(self):
        return len(self.annotations)

    def __getitem__(self, idx):
        img_name = os.path.join(self.root_dir, f"{self.annotations.index[idx]}LD_Data.png")
        image = Image.open(img_name).convert("RGB")
        em = self.annotations.iloc[idx, 0]
        sdi = self.annotations.iloc[idx, 1]
        fe = self.annotations.iloc[idx, 2]
        
        # Normalize/Standardize the combined target values (EM, SDI, FE)
        if self.scaler is not None:
            combined_values = self.scaler.transform([[em, sdi, fe]])[0]
            em, sdi, fe = combined_values
        
        if self.transform:
            image = self.transform(image)
        return image, (em, sdi, fe)

# Load images and FEA input parameters

In [None]:
from sklearn.preprocessing import MinMaxScaler
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, random_split
import os
import pandas as pd
import torch
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.patches as patches

# Define data transformations
data_transforms = {
    'train': transforms.Compose([
        transforms.Resize((224, 224)),  # Resize directly to 224x224
        transforms.RandomHorizontalFlip(),  # Randomly flip the image horizontally
        transforms.ToTensor(),  # Convert the image to a tensor
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])  # Normalize with mean and std
    ]),
    'val': transforms.Compose([
        transforms.Resize((224, 224)),  # Resize directly to 224x224
        transforms.ToTensor(),  # Convert the image to a tensor
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])  # Normalize with mean and std
    ]),
    'test': transforms.Compose([
        transforms.Resize((224, 224)),  # Resize directly to 224x224
        transforms.ToTensor(),  # Convert the image to a tensor
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])  # Normalize with mean and std
    ]),
}

# Define the data directory and CSV file
data_dir = 'SimulationImages'
csv_file = 'InputParameters.csv'

# Initialize the scaler
scaler = MinMaxScaler()

# Create the full dataset
full_dataset = CustomImageDataset(csv_file, data_dir, data_transforms['train'], scaler=scaler)

# Calculate the sizes for training, validation, and test sets
total_size = len(full_dataset)
train_size = int(0.80 * total_size)
val_size = int(0.18 * total_size)
test_size = total_size - train_size - val_size

# Split the dataset
train_dataset, val_dataset, test_dataset = random_split(full_dataset, [train_size, val_size, test_size])

# Apply the appropriate transformations to each subset
train_dataset.dataset.transform = data_transforms['train']
val_dataset.dataset.transform = data_transforms['val']
test_dataset.dataset.transform = data_transforms['test']

# Create dataloaders
dataloaders = {
    'train': DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=0),
    'val': DataLoader(val_dataset, batch_size=32, shuffle=True, num_workers=0),
    'test': DataLoader(test_dataset, batch_size=32, shuffle=False, num_workers=0)  # Set shuffle to False
}

# Get dataset sizes
dataset_sizes = {x: len(dataloaders[x].dataset) for x in ['train', 'val', 'test']}

# Print dataset sizes
print(f"Training set size: {dataset_sizes['train']}")
print(f"Validation set size: {dataset_sizes['val']}")
print(f"Test set size: {dataset_sizes['test']}")

In [10]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Transfer learning with ResNet18

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import models
import time
import copy
import pickle

# Load the pre-trained ResNet model if not already loaded
resnet = models.resnet18(pretrained=True)

# Modify the final fully connected layer for multi-output regression
num_ftrs = resnet.fc.in_features
resnet.fc = nn.Linear(num_ftrs, 3)  # Output three values: EM, SDI, and FE

# Move the model to the GPU if available
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
resnet = resnet.to(device)

# Define the loss function and optimizer
criterion = nn.MSELoss()    # Mean Squared Error loss for regression
optimizer = optim.Adam(resnet.parameters(), lr=0.001)   # Adam optimizer with a learning rate of 0.001

# Training and validation function
# The train_model function is used to train the model. It takes the model, dataloaders, criterion, optimizer, and number of epochs as input.
def train_model(model, dataloaders, criterion, optimizer, num_epochs=25):
    since = time.time() # Get the current time

    best_model_wts = copy.deepcopy(model.state_dict())  # Deep copy the model weights to keep track of the best model
    best_loss = float('inf')    # Set the best loss to infinity to ensure that the first model is saved

    epoch_vect = []
    train_vect = []
    val_vect = []
    
    for epoch in range(num_epochs):
        print(f'Epoch {epoch}/{num_epochs - 1}')
        print('-' * 10)

        epoch_vect.append(epoch)

        # Each epoch has a training and validation phase
        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()  # Set model to training mode
            else:
                model.eval()   # Set model to evaluate mode

            running_loss = 0.0  # Initialize the running loss to 0

            # Iterate over data.
            for inputs, values in dataloaders[phase]:
                inputs = inputs.to(device)  # Move the inputs to the GPU if available
                em = values[0][:].to(device).unsqueeze(1).float()   # Extract the EM value and move to GPU if available
                sdi = values[1][:].to(device).unsqueeze(1).float()   # Extract the SDI value and move to GPU if available
                fe = values[2][:].to(device).unsqueeze(1).float()   # Extract the FE value and move to GPU if available

                # Combine the targets into a single tensor
                targets = torch.cat((em, sdi, fe), dim=1)

                # Zero the parameter gradients
                optimizer.zero_grad()

                # Forward pass
                with torch.set_grad_enabled(phase == 'train'): # Set the gradients to be computed only during training
                    outputs = model(inputs) # Compute the output of the model
                    loss = criterion(outputs, targets)   # Compute the loss using the output of the model and the targets

                    # Backward + optimize only if in training phase
                    if phase == 'train':
                        loss.backward() # Compute the gradients of the loss with respect to the model parameters
                        optimizer.step()    # Update the model parameters using the gradients and the optimizer

                # Statistics
                running_loss += loss.item() * inputs.size(0)    # Multiply the loss by the batch size and add it to the running loss

            epoch_loss = running_loss / len(dataloaders[phase].dataset)   # Compute the epoch loss by dividing the running loss by the number of samples in the dataset

            print(f'{phase} Loss: {epoch_loss:.4f}')
            if phase == 'train':
                train_vect.append(epoch_loss)
            if phase == 'val':
                val_vect.append(epoch_loss)

            # Deep copy the model
            if phase == 'val' and epoch_loss < best_loss:   # If the phase is validation and the epoch loss is less than the best loss
                best_loss = epoch_loss  # Update the best loss
                best_model_wts = copy.deepcopy(model.state_dict())  # Save a deep copy of the model weights

        print() # Print an empty line

    time_elapsed = time.time() - since
    print(f'Training complete in {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s')   # Print the time taken for training
    print(f'Best val Loss: {best_loss:.4f}')    # Print the best validation loss
    
    with open("TrainLoss", "wb") as fp:   #Pickling
        pickle.dump(train_vect, fp)

    with open("ValLoss", "wb") as fp:   #Pickling
        pickle.dump(val_vect, fp)
    
    # Load best model weights
    model.load_state_dict(best_model_wts)   # Load the best model weights
    return model

# Train the model using the method defined above for the number of epochs specified below
num_epochs = 100
resnet = train_model(resnet, dataloaders, criterion, optimizer, num_epochs=num_epochs)

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import random
import torch
from sklearn.preprocessing import MinMaxScaler
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, random_split

# Set random seed for reproducibility
seed = 42
torch.manual_seed(seed)
np.random.seed(seed)
random.seed(seed)
if torch.cuda.is_available():
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)

# Function to display images with true and predicted values
def imshow(inp, ax, title=None, fontsize=12):
    """Imshow for Tensor."""
    inp = inp.numpy().transpose((1, 2, 0))
    mean = np.array([0.485, 0.456, 0.406])
    std = np.array([0.229, 0.224, 0.225])
    inp = std * inp + mean
    inp = np.clip(inp, 0, 1)
    ax.imshow(inp)
    if title is not None:
        ax.set_title(title, fontsize=fontsize)  # Set the font size of the title
    ax.axis('off')  # Remove axes

# Get a batch of test data
inputs, targets = next(iter(dataloaders['test']))
inputs = inputs.to(device)
targets = torch.cat([targets[i].unsqueeze(1) for i in range(3)], dim=1).to(device).float()  # Combine EM, SDI, and FE


# Get model predictions
resnet.eval()
with torch.no_grad():
    outputs = resnet(inputs)

# Convert to CPU for visualization
inputs = inputs.cpu()
targets = targets.cpu()
outputs = outputs.cpu()

# Denormalize/Destandardize the predictions and true values
targets_denorm = scaler.inverse_transform(targets.numpy())
outputs_denorm = scaler.inverse_transform(outputs.numpy())

with open("Target", "wb") as fp:   #Pickling
    pickle.dump(targets_denorm, fp)

with open("Output", "wb") as fp:   #Pickling
    pickle.dump(outputs_denorm, fp)

# Plot the images with true and predicted values for EM, SDI, and FE
fig, axes = plt.subplots(8, 4, figsize=(32, 64))  # Increase figsize to make individual plots bigger
axes = axes.flatten()
for i in range(32):
    imshow(inputs[i], ax=axes[i], title=f"EM P: {outputs_denorm[i][0]:,.0f}, T: {targets_denorm[i][0]:,.0f}, E: {(abs(outputs_denorm[i][0] - targets_denorm[i][0]) / targets_denorm[i][0]) *100:.2f}%\n"
                                        f"SDI P: {outputs_denorm[i][1]:.5f}, T: {targets_denorm[i][1]:.5f}, E: {(abs(outputs_denorm[i][1] - targets_denorm[i][1]) / targets_denorm[i][1]) *100:.2f}%\n"
                                        f"FE P: {outputs_denorm[i][2]:.4f}, T: {targets_denorm[i][2]:.4f}, E: {(abs(outputs_denorm[i][2] - targets_denorm[i][2]) / targets_denorm[i][2]) *100:.2f}%", fontsize=20)

plt.subplots_adjust(wspace=0.1, hspace=0.3)  # Adjust spacing between subplots
plt.show()

# Save the Model, Weights, etc.

In [None]:
import os
import torch
import joblib  # For saving the scaler


# Create the folder if it doesn't exist
os.makedirs('SavedModels', exist_ok=True)

# Save the entire model
torch.save(resnet, 'SavedModels/EM_SDI_FE_ResNet.pth')

# Save only the model state dictionary (weights)
torch.save(resnet.state_dict(), 'SavedModels/EM_SDI_FE_ResNet_weights.pth')

# Save the scaler
joblib.dump(scaler, 'SavedModels/EM_SDI_FE_ResNet_scaler.pkl')

# Save the optimizer state
torch.save(optimizer.state_dict(), 'SavedModels/EM_SDI_FE_ResNet_optimizer.pth')

# Save the training configuration
training_config = {
    'learning_rate': 0.001,
    'batch_size': 32,
    'num_epochs': 100,
    'seed': 42,
    'model_architecture': 'resnet18',
    'output_features': 3
}
joblib.dump(training_config, 'SavedModels/EM_SDI_FE_ResNet_config.pkl')

### Re-Create Model and Load Saved Weights
This is good if you go to another system that may have different versions of Pytorch or Python.

In [None]:
import torch
import torch.nn as nn
from torchvision import models
from sklearn.preprocessing import MinMaxScaler
import joblib  # For loading the scaler
import numpy as np
import random
import torch.optim as optim
import time
import copy
import pickle

# Set random seed for reproducibility
seed = 42
torch.manual_seed(seed)
np.random.seed(seed)
random.seed(seed)
if torch.cuda.is_available():
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)

# Define the model architecture
resnet = models.resnet18(pretrained=False)
num_ftrs = resnet.fc.in_features
resnet.fc = nn.Linear(num_ftrs, 3)  # Output three values: EM, SDI, and FE

# Load the model state dictionary (weights)
resnet.load_state_dict(torch.load('SavedModels/EM_SDI_FE_ResNet_weights.pth'))
resnet.eval()  # Set the model to evaluation mode

# Move the model to the GPU if available
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
resnet = resnet.to(device)

# Load the scaler used during training
scaler = joblib.load('SavedModels/EM_SDI_FE_ResNet_scaler.pkl')  # Assuming you saved the scaler using joblib

# Load the optimizer state
optimizer = optim.Adam(resnet.parameters(), lr=0.001)  # Define the optimizer
optimizer.load_state_dict(torch.load('SavedModels/EM_SDI_FE_ResNet_optimizer.pth'))

# Load the training configuration
training_config = joblib.load('SavedModels/EM_SDI_FE_ResNet_config.pkl')
print(training_config)

# Experimental Results

In [None]:
# Load the image Exp01.png from the Images/Experiments folder
# and make a prediction using the model
from PIL import Image
import torch
from torchvision import transforms
import numpy as np
import csv

EMvector = []
SDIvector = []
FEvector = []


with open('FEAInputData.csv', 'w', newline='') as csvfile:
    spamwriter = csv.writer(csvfile, quoting=csv.QUOTE_ALL)

    for ki in range(0, 12):
        # Load the image
        #if ki < 10:
            #img = Image.open('Images/Experiments/0'+str(ki)+' (Unload).png').convert("RGB")
        #else:
        img = Image.open('Experiments/'+str(ki)+'Exp.png').convert("RGB")
        # Define the transformations
        data_transforms = transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
        ])
        
        # Apply the transformations
        img = data_transforms(img).unsqueeze(0)
        
        # Move the image to the GPU if available
        device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
        img = img.to(device)
        
        # Make a prediction
        resnet.eval()
        with torch.no_grad():
            output = resnet(img)
        
        # Convert the output to CPU for further processing
        output = output.cpu().numpy()
        
        # Denormalize/Destandardize the predictions
        output_denorm = scaler.inverse_transform(output)
        
        EMvector.append(output_denorm[0][0])
        SDIvector.append(output_denorm[0][1])
        FEvector.append(output_denorm[0][2])

        spamwriter.writerow([output_denorm[0][0], output_denorm[0][1], output_denorm[0][2]])
    
    print(EMvector)
    print(SDIvector)
    print(FEvector)
    
    print(np.multiply(EMvector, SDIvector))
    print(np.mean(np.multiply(EMvector, SDIvector)))
    
    # Print the predictions
    print(f"EM: {output_denorm[0][0]:,.0f} MPa")
    print(f"SDI: {output_denorm[0][1]:.5f} mm/mm")
    print(f"FE: {output_denorm[0][2]:.4f} kJ/m^2")





# Model Interpretation Using Captum

[Captum](https://captum.ai/) (“comprehension” in Latin) is an open source, extensible library for model interpretability built on PyTorch.

With the increase in model complexity and the resulting lack of transparency, model interpretability methods have become increasingly important. Model understanding is both an active area of research as well as an area of focus for practical applications across industries using machine learning. Captum provides state-of-the-art algorithms, including Integrated Gradients, to provide researchers and developers with an easy way to understand which features are contributing to a model’s output.

Full documentation, an API reference, and a suite of tutorials on specific topics are available at the [captum.ai](https://captum.ai/) website.

In [None]:
import captum
from captum.attr import Occlusion
from captum.attr import visualization as viz
from matplotlib.colors import LinearSegmentedColormap
from PIL import Image
import torch
from torchvision import transforms
import numpy as np
import matplotlib.pyplot as plt



test_image_number = 1
for test_image_number in range(0,9):
    print(test_image_number)
    # Load the image
    img_path = 'Images/Experiments_Update2/'+str(test_image_number)+'Exp.png'
    input_img = Image.open(img_path).convert("RGB")
    # Define the transformations
    data_transforms = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])
    
    # Apply the transformations
    input_img = data_transforms(input_img).unsqueeze(0)
    
    # Move the image to the GPU if available
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    input_img = input_img.to(device)
    
    # Make a prediction
    resnet.eval()
    with torch.no_grad():
        output = resnet(input_img)
    
    # Convert the output to CPU for further processing
    output = output.cpu().numpy()
    
    # Denormalize/Destandardize the predictions
    output_denorm = scaler.inverse_transform(output)
    pred_em = outputs_denorm[test_image_number][0]
    pred_sdi = outputs_denorm[test_image_number][1]
    pred_fe = outputs_denorm[test_image_number][2]
    
    # Initialize Occlusion with the regression model
    occlusion = Occlusion(resnet)
    
    # Compute the occlusion attributions for each output
    attributions_occ_em = occlusion.attribute(input_img,
                                              target=0,
                                              strides=(3, 8, 8),
                                              sliding_window_shapes=(3, 15, 15),
                                              baselines=0)
    
    attributions_occ_sdi = occlusion.attribute(input_img,
                                               target=1,
                                               strides=(3, 8, 8),
                                               sliding_window_shapes=(3, 15, 15),
                                               baselines=0)
    
    attributions_occ_fe = occlusion.attribute(input_img,
                                              target=2,
                                              strides=(3, 8, 8),
                                              sliding_window_shapes=(3, 15, 15),
                                              baselines=0)
    

    # Transpose the attributions and input image to match the expected shape for visualization
    attr_em = np.transpose(attributions_occ_em.squeeze().cpu().detach().numpy(), (1, 2, 0))
    attr_sdi = np.transpose(attributions_occ_sdi.squeeze().cpu().detach().numpy(), (1, 2, 0))
    attr_fe = np.transpose(attributions_occ_fe.squeeze().cpu().detach().numpy(), (1, 2, 0))
    input_img_np = np.transpose(input_img.squeeze().cpu().detach().numpy(), (1, 2, 0))
    
    # Create a figure and axes
    fig, ax = plt.subplots(1, 1, figsize=(9, 4.5))
    
    # Plot the original image
    ax.imshow(input_img_np)
    ax.axis('off')
    
    # Overlay the positive attributions for Elastic Modulus (EM) in blue
    viz.visualize_image_attr(
        attr_em,
        input_img_np,
        method="blended_heat_map",
        sign="positive",
        show_colorbar=True,
        fig_size=(9, 4.5),
        use_pyplot=False,
        plt_fig_axis=(fig, ax),
        cmap='Blues'
    )
    plt.savefig('Captum_Emod'+str(test_image_number)+'.png', bbox_inches='tight')
    plt.show()

    # Create a figure and axes
    fig, ax = plt.subplots(1, 1, figsize=(9, 4.5))
    # Plot the original image
    ax.imshow(input_img_np)
    ax.axis('off')
    # Overlay the positive attributions for Strain at Damage Initiation (SDI) in green
    viz.visualize_image_attr(
        attr_sdi,
        input_img_np,
        method="blended_heat_map",
        sign="positive",
        show_colorbar=True,
        fig_size=(9, 4.5),
        use_pyplot=False,
        plt_fig_axis=(fig, ax),
        cmap='Greens'
    )
    plt.savefig('Captum_SDI'+str(test_image_number)+'.png', bbox_inches='tight')
    plt.show()

    # Create a figure and axes
    fig, ax = plt.subplots(1, 1, figsize=(9, 4.5))
    # Plot the original image
    ax.imshow(input_img_np)
    ax.axis('off')
    # Overlay the positive attributions for Fracture Energy (FE) in red
    viz.visualize_image_attr(
        attr_fe,
        input_img_np,
        method="blended_heat_map",
        sign="positive",
        show_colorbar=True,
        fig_size=(9, 4.5),
        use_pyplot=False,
        plt_fig_axis=(fig, ax),
        cmap='Reds'
    )
    plt.savefig('Captum_FE'+str(test_image_number)+'.png', bbox_inches='tight')
    plt.show()
    # Add a single colorbar for the combined plot
    #sm = plt.cm.ScalarMappable(cmap='coolwarm')
    #sm.set_array([])
    #sm.set_clim(0.9999999, 1.0) 
    #fig.colorbar(sm, ax=ax, orientation='vertical')

    print(np.min(attr_fe))

    plt.tight_layout()
    plt.show()