Here we use tandem neural networks to solve the problem.
The architectures are similar to those used in the VAE, but are adopted here in a deterministic way. In the first step, a forward network is trained to predict the density associated with a structure. In the second step, the forward network is frozen and a backward network is trained to deterministically generate a microstructure associated with a given density value.

The resulting mapping is one-to-one: each density value yields a single microstructure. This is a pragmatic approach to solving the problem.

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
import os
import re
import torch
from torch.utils.data import Dataset, random_split, DataLoader
import numpy as np
import os



In [None]:
# Make the project folder the current working directory.
# NOTE(review): this is an absolute, machine-specific path and has to be
# adapted on any other machine; presumably it is what lets the later `Lib.*`
# imports and the relative data folder resolve — confirm.
path = 'c:/Users/Pietro/Desktop/Porosities/Porosities/'
os.chdir(path)

In [None]:
from Lib.Data import PorosityDistribution, extract_microstructures
from Lib.Datasets import  MicrostructureDataset

In [None]:
# Prefer the GPU when CUDA is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(f'Using device: {device}')

In [None]:
# Data folder, resolved relative to the current working directory.
sample_path = os.getcwd()+'/Job_Assignment_Data/'

In [None]:
# Load the microstructures found under `sample_path`.
# NOTE(review): `keep_density_doubles=False` presumably drops samples that
# share a density value — confirm in Lib.Data. The datasets built later in this
# file are created with `keep_doubles=True`, so the two may not hold the same
# set of samples.
extracted_data = extract_microstructures(sample_path,keep_density_doubles=False)

In [None]:
# Visual sanity check: plot the sample stored under key 77.
extracted_data[77].plot_porosity_distribution()

In [None]:
# Number of entries returned by `extract_microstructures`.
len(extracted_data.keys())

In [None]:
# Build the three data splits; they differ only in which split flag is set.
shared_options = dict(device=device, keep_doubles=True)
train_dataset = MicrostructureDataset(
    sample_path, train=True, val=False, test=False, **shared_options
)
val_dataset = MicrostructureDataset(
    sample_path, train=False, val=True, test=False, **shared_options
)
test_dataset = MicrostructureDataset(
    sample_path, train=False, val=False, test=True, **shared_options
)

# Wrap every split in a DataLoader; only the training split is shuffled.
train_dataloader = DataLoader(dataset=train_dataset, batch_size=64, shuffle=True)
val_dataloader = DataLoader(dataset=val_dataset, batch_size=32, shuffle=False)
test_dataloader = DataLoader(dataset=test_dataset, batch_size=32, shuffle=False)

In [None]:
import torch.nn as nn

class Forward(nn.Module):
    """3D-CNN regressor mapping a single-channel volume to one scalar per sample.

    Four strided ``Conv3d`` + ``BatchNorm3d`` + SiLU stages shrink the volume;
    for a 30x30x30 input the spatial size goes 30 -> 14 -> 6 -> 2 -> 1. The
    flattened features are then passed through a small MLP (with dropout)
    ending in a single output unit.

    Note: in training mode the last ``BatchNorm3d`` sees a 1x1x1 feature map,
    so PyTorch cannot normalise a batch that contains a single sample.

    Args:
        scale: Integer width multiplier applied to every channel/feature count.
    """

    def __init__(self,scale=1):
        super().__init__()
        self.scale = scale

        # 3D convolutional layers, each followed by batch normalization.
        # Shapes in the comments assume a B x 1 x 30 x 30 x 30 input.
        self.conv1 = nn.Conv3d(1, 4*scale, kernel_size=3, stride=2)  # B x 4*scale x 14x14x14
        self.bn1 = nn.BatchNorm3d(4*scale)  # Batch normalization after conv1
        self.conv2 = nn.Conv3d(4*scale, 8*scale, kernel_size=3, stride=2)  # B x 8*scale x 6x6x6
        self.bn2 = nn.BatchNorm3d(8*scale)  # Batch normalization after conv2
        self.conv3 = nn.Conv3d(8*scale, 16*scale, kernel_size=3, stride=2)  # B x 16*scale x 2x2x2
        self.bn3 = nn.BatchNorm3d(16*scale)  # Batch normalization after conv3
        self.conv4 = nn.Conv3d(16*scale, 32*scale, kernel_size=2, stride=2)  # B x 32*scale x 1x1x1
        self.bn4 = nn.BatchNorm3d(32*scale)  # Batch normalization after conv4

        # Linear layers with dropout
        self.fc1 = nn.Linear(32*scale, 16*scale)
        self.dropout1 = nn.Dropout(0.1)  # Dropout after fc1
        self.fc2 = nn.Linear(16*scale, 8*scale)
        self.dropout2 = nn.Dropout(0.1)  # Dropout after fc2
        self.fc3 = nn.Linear(8*scale, 4*scale)
        self.dropout3 = nn.Dropout(0.1)  # Dropout after fc3
        self.regressor = nn.Linear(4*scale,1)

        # Activation function
        self.Silu = nn.SiLU()

    def convolution(self,x):
        """Apply the four conv -> batch-norm -> SiLU stages to ``x``."""
        x = self.Silu(self.bn1(self.conv1(x)))
        x = self.Silu(self.bn2(self.conv2(x)))
        x = self.Silu(self.bn3(self.conv3(x)))
        x = self.Silu(self.bn4(self.conv4(x)))

        return x

    def nl_projection(self,x):
        """Map flattened features of width ``32*scale`` to one value per row."""
        x = self.Silu(self.dropout1(self.fc1(x)))
        x = self.Silu(self.dropout2(self.fc2(x)))
        # Each layer uses its own dropout module (fc3 previously reused dropout2).
        x = self.Silu(self.dropout3(self.fc3(x)))

        return self.regressor(x)

    def forward(self, x):
        """Predict one scalar per sample.

        Args:
            x: Tensor of shape ``(B, 1, D, H, W)``; the linear head expects the
                convolutions to reduce it to a 1x1x1 map (true for 30x30x30).

        Returns:
            Tensor of shape ``(B,)``, also when ``B == 1``.
        """
        x = self.convolution(x)
        # Flatten per sample, so a wrong input size fails in fc1 instead of
        # silently folding spatial positions into the batch axis.
        x = x.flatten(start_dim=1)
        x = self.nl_projection(x)

        # Drop only the trailing feature axis; a bare squeeze() would also
        # remove the batch axis for a single-sample batch.
        return x.squeeze(-1)
    
    
class Backward(nn.Module):
    """MLP generator mapping one scalar per sample to a 30x30x30 volume.

    Four widening linear layers (each followed by dropout and SiLU) feed a
    final linear layer with 30*30*30 outputs; a sigmoid keeps every voxel
    value in (0, 1).

    Args:
        scale: Integer width multiplier applied to every hidden layer size.
    """

    def __init__(self,scale=1):
        super().__init__()
        self.scale = scale

        # Linear layers with dropout
        self.fc1 = nn.Linear(1,4*scale)
        self.dropout1 = nn.Dropout(0.1)  # Dropout after fc1
        self.fc2 = nn.Linear(4*scale, 8*scale)
        self.dropout2 = nn.Dropout(0.1)  # Dropout after fc2
        self.fc3 = nn.Linear(8*scale, 16*scale)
        self.dropout3 = nn.Dropout(0.1)  # Dropout after fc3
        self.fc4 = nn.Linear(16*scale,32*scale)
        self.dropout4 = nn.Dropout(0.1)  # Dropout after fc4
        self.regressor = nn.Linear(32*scale,30*30*30)

        # Activation function
        self.Silu = nn.SiLU()


    def forward(self,x):
        """Generate one volume per input scalar.

        Args:
            x: Tensor whose last dimension has size 1, e.g. ``(B, 1)``.

        Returns:
            Tensor of shape ``(-1, 30, 30, 30)`` with values in (0, 1).
        """
        x = self.Silu(self.dropout1(self.fc1(x)))
        x = self.Silu(self.dropout2(self.fc2(x)))
        # Each layer uses its own dropout module (fc3/fc4 previously reused dropout2).
        x = self.Silu(self.dropout3(self.fc3(x)))
        x = self.Silu(self.dropout4(self.fc4(x)))
        x = self.regressor(x)

        # Squash the voxel values into (0, 1).
        x = torch.sigmoid(x)

        return x.view(-1,30,30,30)


In [None]:
# Smoke test: build the forward network and run it once on one training batch.
X, y = next(iter(train_dataloader))
forward = Forward(scale=4).to(device)
# Index 3 along dim 1 selects the volume fed to the network; unsqueeze(1)
# re-adds the channel axis expected by Conv3d.
forward(X[:, 3].unsqueeze(1))


In [None]:
import torch.optim as optim

# Define the optimizer

# Adam over all parameters of the forward network, learning rate 1e-4.
optimizer = optim.Adam(forward.parameters(), lr=1e-4)

In [None]:
# Define the loss function
# Mean-squared error between the forward network's output and the targets.
criterion = nn.MSELoss()

In [None]:
# Training loop for the forward network
num_epochs = 350
scale = 0.05  # every loss value is divided by this constant before it is used

train_losses = []
val_losses = []

for epoch in range(num_epochs):
    # ---- Training pass: one optimizer step per batch ----
    forward.train()
    running_train_loss = 0.0
    for inputs, targets in train_dataloader:
        # Index 3 along dim 1 is the volume; restore the channel axis for Conv3d.
        volumes = inputs[:, 3].unsqueeze(1)
        optimizer.zero_grad()
        loss = criterion(forward(volumes), targets) / scale
        loss.backward()
        optimizer.step()
        running_train_loss += loss.item()
    # Mean of the per-batch losses over the epoch.
    epoch_train_loss = running_train_loss / len(train_dataloader)
    train_losses.append(epoch_train_loss)

    # ---- Validation pass: evaluation mode, no gradients ----
    forward.eval()
    running_val_loss = 0.0
    with torch.no_grad():
        for inputs, targets in val_dataloader:
            volumes = inputs[:, 3].unsqueeze(1)
            running_val_loss += (criterion(forward(volumes), targets) / scale).item()
    epoch_val_loss = running_val_loss / len(val_dataloader)
    val_losses.append(epoch_val_loss)

    print(f"Epoch [{epoch + 1}/{num_epochs}] Train Loss: {epoch_train_loss:.4f} Val Loss: {epoch_val_loss:.4f}")

print("Finished Training")

In [None]:
import matplotlib.pyplot as plt

# Draw both loss histories on one set of axes (training first, then validation).
for history, curve_label in (
    (train_losses, 'Training Loss'),
    (val_losses, 'Validation Loss'),
):
    plt.plot(history, label=curve_label)
plt.title('Training and Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.show()

In [None]:
# Collect the forward network's predictions and the targets over the test set
forward.eval()  # Evaluation mode: dropout off, batch-norm uses running statistics
all_predictions = []
all_targets = []
with torch.no_grad():  # No gradients are needed for inference
    for inputs, targets in test_dataloader:
        # Index 3 along dim 1 is the volume; unsqueeze(1) restores the channel axis.
        predictions = forward(inputs[:,3,:,:,:].unsqueeze(1))
        # reshape(-1) guarantees 1-D tensors: a single-sample batch can come back
        # as a 0-d tensor, whose tolist() is a bare float that list.extend rejects.
        all_predictions.extend(predictions.reshape(-1).tolist())
        all_targets.extend(targets.reshape(-1).tolist())

In [None]:
# Scatter the predicted against the actual values, with an OLS trendline.
fig = px.scatter(
    x=all_targets,
    y=all_predictions,
    opacity=0.5,
    trendline="ols",
    title="Regression Plot: Actual vs. Predicted Density",
)

# Reference diagonal (y = x) spanning the range of the actual values.
lowest, highest = min(all_targets), max(all_targets)
x_range = np.linspace(lowest, highest, 100)
fig.add_scatter(
    x=x_range,
    y=x_range,
    mode='lines',
    line={'color': 'red', 'dash': 'dash'},
    name='Ideal Line (y = x)',
)

fig.update_layout(xaxis_title="Actual Density", yaxis_title="Predicted Density")

fig.show()

In [None]:
import matplotlib.pyplot as plt

# Collect the forward network's predictions and the targets over the TRAINING
# set (the loop below iterates train_dataloader, not the test loader).
forward.eval()  # Evaluation mode: dropout off, batch-norm uses running statistics
all_predictions = []
all_targets = []
with torch.no_grad():  # No gradients are needed for inference
    for inputs, targets in train_dataloader:
        # Index 3 along dim 1 is the volume; unsqueeze(1) restores the channel axis.
        predictions = forward(inputs[:,3,:,:,:].unsqueeze(1))
        # reshape(-1) guarantees 1-D tensors: a single-sample batch can come back
        # as a 0-d tensor, whose tolist() is a bare float that list.extend rejects.
        all_predictions.extend(predictions.reshape(-1).tolist())
        all_targets.extend(targets.reshape(-1).tolist())

In [None]:
# Scatter the predicted against the actual values currently held in
# all_predictions / all_targets, with an OLS trendline.
fig = px.scatter(
    x=all_targets,
    y=all_predictions,
    opacity=0.5,
    trendline="ols",
    title="Regression Plot: Actual vs. Predicted Density",
)

# Reference diagonal (y = x) spanning the range of the actual values.
lowest, highest = min(all_targets), max(all_targets)
x_range = np.linspace(lowest, highest, 100)
fig.add_scatter(
    x=x_range,
    y=x_range,
    mode='lines',
    line={'color': 'red', 'dash': 'dash'},
    name='Ideal Line (y = x)',
)

fig.update_layout(xaxis_title="Actual Density", yaxis_title="Predicted Density")

fig.show()

In [None]:
# Second stage: create the backward network on the chosen device.
backward = Backward(scale=4)
backward.to(device)

# Freeze the forward network: none of its parameters receive gradients.
forward.requires_grad_(False)

# Keep the frozen network in evaluation mode.
forward.eval()

In [None]:
# Binary cross-entropy; the training cell applies it between the generated
# structure and the true one.
reg_criterion = torch.nn.BCELoss()
# Mean-squared error; the training cell applies it between the forward
# network's output for the generated structure and the targets.
# Note: this rebinds `criterion` (and `optimizer` below) from the first stage.
criterion = torch.nn.MSELoss()
# Only the backward network's parameters are optimised.
optimizer = torch.optim.Adam(backward.parameters(),lr=1e-3)

In [None]:
# Train the backward network against the frozen forward network.
num_epochs = 1000
alpha = 0.5      # weight of the forward-consistency term; (1 - alpha) weights the reconstruction term
stable_reg = 1   # divisor applied to the reconstruction loss
stable_loss = 1  # divisor applied to the forward-consistency loss


def _tandem_losses(inputs, targets):
    """Return (reconstruction, forward-consistency, weighted total) losses for one batch."""
    # Index 3 along dim 1 is the true volume; unsqueeze(1) restores the channel axis.
    structures = inputs[:, 3, :, :, :].unsqueeze(1)
    rec_structure = backward(targets.unsqueeze(-1)).unsqueeze(1)
    reg_loss = reg_criterion(rec_structure, structures)
    loss = criterion(forward(rec_structure), targets)
    tot_loss = (1 - alpha) * reg_loss / stable_reg + alpha * loss / stable_loss
    return reg_loss, loss, tot_loss


def _run_epoch(dataloader, training):
    """Run one pass over ``dataloader``; return the per-batch means of the three losses."""
    sums = [0.0, 0.0, 0.0]
    n_batches = 0
    for inputs, targets in dataloader:
        losses = _tandem_losses(inputs, targets)
        if training:
            optimizer.zero_grad()
            losses[2].backward()
            optimizer.step()
        for k, value in enumerate(losses):
            sums[k] += value.item()
        n_batches += 1
    # max(..., 1) avoids a division by zero for an empty loader.
    return [total / max(n_batches, 1) for total in sums]


for epoch in range(num_epochs):
    backward.train()
    train_reg, train_loss, train_tot = _run_epoch(train_dataloader, training=True)

    if (epoch+1) % 10 == 0:
        # Report epoch means instead of the values of the last batch only.
        print(f'Train: Epoch [{epoch+1}/{num_epochs}], Reg: {train_reg:.4f}, Loss: {train_loss:.4f}, Tot_Loss: {train_tot:.4f}')

        # Evaluate on the validation split (the test split stays untouched for
        # the final assessment), in eval mode and without building a graph.
        backward.eval()
        with torch.no_grad():
            val_reg, val_loss, val_tot = _run_epoch(val_dataloader, training=False)

        print(f'Validation: Epoch [{epoch+1}/{num_epochs}], Reg: {val_reg:.4f}, Loss: {val_loss:.4f}, Tot_Loss: {val_tot:.4f}')

In [None]:
# Take the sample stored under key 0 as the reference microstructure and plot it.
original_data = extracted_data[0]
original_data.plot_porosity_distribution()

In [None]:
# Regenerate the structure channel of `original_data` from its target value
# with the trained backward network.
device = next(backward.parameters()).device
micro = original_data.as_tensor().to(device)
# Move the last axis to the front so that index 3 / index 4 select whole volumes.
# NOTE(review): assumes as_tensor() returns a (30, 30, 30, C) tensor — confirm.
micro = micro.permute(3,0,1,2)
# Work on a copy: a plain alias would let the write below overwrite `micro`
# (and `X`, which is a view of it) with the reconstruction.
micro_rec = micro.clone()
X,y = micro[3,:,:,:].unsqueeze(0),micro[4,0,0,0].unsqueeze(dim=0)
print(micro.shape)

# Inference only: force evaluation mode (no dropout) and skip autograd.
backward.eval()
with torch.no_grad():
    X_rec = backward(y)

micro_rec[3,:,:,:] = X_rec.squeeze()
# Back to one row per voxel, one column per channel.
micro_rec = micro_rec.permute(1,2,3,0).reshape(30*30*30,-1)
micro_rec = micro_rec.detach().cpu().numpy()
print(micro_rec.shape)

# All columns but the last are passed as data, together with the target value.
rec_data = PorosityDistribution(micro_rec[:,:-1],y.item())

In [None]:
# Plot the reconstructed sample.
# NOTE(review): `porosity=0.3` is presumably a display threshold — confirm in
# PorosityDistribution.plot_porosity_distribution.
rec_data.plot_porosity_distribution(porosity=0.3)

In [None]:
# Display element [4, 0, 0, 0] of the permuted tensor as a 1-element tensor
# (the same expression that was used as the backward network's input `y`).
micro[4,0,0,0].unsqueeze(dim=0)