In [1]:
%config IPCompleter.greedy=True #si puo' cancellare, è per avere l'autocompletion su jupyter notebook ma non pare funzionare

import os
import re
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader
from PIL import Image, ImageOps
import numpy as np
import pandas as pd
import csv
from utils import *

class PointsDataset(Dataset):
    def __init__(self, root_dir, transform=None, padding=None):
        """
        Args:
            root_dir (str): Root directory containing subfolders, each representing a sample.
            transform (callable, optional): Optional transform to be applied on a sample.
        """
        self.root_dir = root_dir
        self.transform = transform
        self.data = []
        self.sim_info = []

        try:
            with open(os.path.join(self.root_dir, "Simulations_Info.csv"), newline='') as file:
                reader = csv.DictReader(file)
                for row in reader:
                    index = int(row["INDEX"])
                    completed = row["COMPLETED"].strip().lower() in ["true", "1", "yes"]
                    self.sim_info.append((index, completed))
        except:
            self.sim_info = [(i, True) for i in range(5001)]

        # Iterate over each subfolder in root_dir and load all data
        for subfolder in os.listdir(root_dir):
            subfolder_path = os.path.join(root_dir, subfolder)
            sample_idx = [int(s) for s in re.findall(r'\d+', subfolder)]  # Extract the first number in the subfolder name
            if sample_idx:
                sample_idx = sample_idx[0]
            else:
                continue
           
            completed = next((item[1] for item in self.sim_info if item[0] == sample_idx), None) #completed refers to abaqus simulation stsatus
            if os.path.isdir(subfolder_path) and sample_idx and completed:
                init_coords_circle_file = next((f for f in os.listdir(subfolder_path) if f.endswith('input_coordinates_circle_1.csv')), None)
                before_coords_file = next((f for f in os.listdir(subfolder_path) if f.endswith('input_coordinates_circle_2.csv')), None)
                gt_file = next((f for f in os.listdir(subfolder_path) if f.endswith('output_displacement_external.csv')), None)
                init_coord_plate_file = os.path.join(root_dir,'plate_initial_coordinates.csv')
                # Load ground truth data (displacement external)
                init_coord_plate_data = pd.read_csv(os.path.join(subfolder_path, init_coord_plate_file), skiprows=0, usecols=[1, 2, 3])
                init_coord_plate_data = init_coord_plate_data.to_numpy().flatten()
                init_coord_plate_data = torch.tensor(init_coord_plate_data).float()
                
                if init_coords_circle_file and before_coords_file and gt_file and init_coord_plate_file:

                    #load initial circle data
                    init_coords_circle_data = pd.read_csv(os.path.join(subfolder_path, init_coords_circle_file), skiprows=0, usecols=[1, 2, 3])
                    init_coords_circle_data = init_coords_circle_data.to_numpy()
                    init_coords_circle_data = torch.tensor(init_coords_circle_data).float()

                    #load before_impact circle data
                    before_coords_data = pd.read_csv(os.path.join(subfolder_path, before_coords_file), skiprows=0, usecols=[1, 2, 3])
                    before_coords_data = before_coords_data.to_numpy()
                    before_coords_data = torch.tensor(before_coords_data).float()
                    
                    total_data = torch.cat((init_coords_circle_data,before_coords_data),1)
                    if (padding is not None) and total_data.shape[0]<padding:
                        total_data = zero_pad_tensor(total_data,target_size=padding)
                    #import pdb;pdb.set_trace()
                    gt_data = pd.read_csv(os.path.join(subfolder_path, gt_file), skiprows=0, usecols=[1, 2, 3])
                    gt_data = gt_data.to_numpy().flatten()
                    gt_data = torch.tensor(gt_data).float()

                    # Append the final_image, gt_data, init_coord_plate_data tuple to the data list
                    self.data.append((total_data, gt_data, init_coord_plate_data))

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        # Retrieve preloaded data
        total_data, gt_data, init_coord_plate_data = self.data[idx]

        # Apply transformation if available
        if self.transform:
            total_data = self.transform(total_data)

        return total_data, gt_data, init_coord_plate_data


In [None]:
from torch.utils.data import random_split
from torch import nn
import torch.optim as optim
import torchvision.models as models
from torch.utils.data import Dataset, DataLoader, random_split
from PIL import Image
import numpy as np
from tqdm import tqdm
from torchvision.models import  ResNet18_Weights
import copy
from transformers import ConvNextConfig, ConvNextModel
import math 
from torch.utils.tensorboard import SummaryWriter
from torch.optim.lr_scheduler import OneCycleLR
from utils import *

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)  # Shape: (1, max_len, d_model)
        self.register_buffer('pe', pe)

    def forward(self, x):
        # x shape: (batch_size, seq_len, d_model)
        x = x + self.pe[:, :x.size(1), :]  # Add positional encoding
        return x

class Transformer2DPointsModel(nn.Module):
    def __init__(self, input_dim,input_seq_len, d_model, nhead, num_encoder_layers, dim_feedforward, output_dim, dropout=0.1):
        super(Transformer2DPointsModel, self).__init__()
        self.d_model = d_model
        self.linear_in = nn.Linear(input_dim, d_model)  # Project input to d_model
        self.positional_encoding = PositionalEncoding(d_model)
        self.transformer_encoder = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(d_model, nhead, dim_feedforward, dropout, batch_first=True),
            num_encoder_layers
        )
        self.attention = AttentionLayer(d_model)  # Add attention layer
        #self.linear_out = nn.Linear(d_model, output_dim)  # Map to the desired output size
        self.linear_out = nn.Linear(d_model*input_seq_len, output_dim)  # Map to the desired output size

    def forward(self, src, src_mask=None):
        
        # src shape: (batch_size, seq_len, input_dim)
        src = self.linear_in(src)  # Project input to d_model
        src = self.positional_encoding(src)  # Add positional encoding
        #memory = self.transformer_encoder(src, src_key_padding_mask=src_mask)  # Pass through transformer encoder
                # memory shape: (batch_size, seq_len, d_model)
        # Zero out padded tokens using src_mask
        if src_mask is not None:
            memory = memory * src_mask.unsqueeze(-1)
            src_key_padding_mask = (src_mask == 0) # Convert 0/1 mask to boolean
            memory = self.transformer_encoder(src, src_key_padding_mask=src_key_padding_mask)
        else:
            memory = self.transformer_encoder(src)
            
        # Apply attention mechanism
        #aggregated = self.attention(memory, src_mask)  # Shape: (batch_size, d_model)

        # Map to the desired output size
        #output = self.linear_out(aggregated)  # Shape: (batch_size, output_dim)

        memory = memory.view(memory.shape[0], -1)  # Shape: (batch_size, d_model, seq_len)
        # Map to the desired output size
        output = self.linear_out(memory)  # Shape: (batch_size, output_dim)
        return output

class MLP(nn.Module):
    def __init__(self):
        super(MLP, self).__init__()
        self.fc1 = nn.Linear(in_features=288, out_features=2493, bias=True)
        self.fc2 = nn.Linear(in_features=2493, out_features=1386, bias=True)
        self.fc3 = nn.Linear(in_features=1386, out_features=280, bias=True)
        self.relu = nn.ReLU()
    def forward(self, src, src_mask=None):
        src = src.view(src.shape[0],-1)
        x = self.fc1(self.relu(src))
        x = self.fc2(self.relu(x))
        x = self.fc3(self.relu(x))
        return x

        
# Training Setup with Best Model Saving
def train_model(model, epochs=3, learning_rate=0.0001, optimizer= None, scheduler = None, train_loader=None, test_loader=None, norm_values=None):
    #smoothness_loss_fn = SmoothnessLoss(weight=0.0)  # starting weight
    criterion = nn.MSELoss()
    
    # Initialize variables for tracking best model
    best_model_weights = None
    best_test_loss = float('inf')
    best_epoch=0
    for epoch in range(epochs):
        model.train()
        running_loss = 0.0
        #smoothness_loss_fn.weight = time_varying_weight(epoch, 10, 20, max_weight=0.1)
        for images, targets, init_data in train_loader:
            images = images.to(device)
            attention_mask = create_attention_mask(images).to(device)
            targets = normalize_targets(targets,norm_values).to(device)              
            optimizer.zero_grad()
            outputs = model(images,None)
            loss = criterion(outputs, targets)

            #loss2 = smoothness_loss_fn(outputs)
            loss = loss #+ loss2
            loss.backward()
            optimizer.step()
            if scheduler is not None: scheduler.step()  # Update learning rate
            
            with torch.no_grad():
                denorm_outputs = denormalize_targets(outputs, norm_values)
                denorm_targets = denormalize_targets(targets,norm_values)
                denorm_loss = criterion(denorm_outputs, denorm_targets)
                running_loss += denorm_loss.item() * images.size(0)
        
        train_loss = running_loss/len(train_loader.dataset)
        print(f"Epoch [{epoch+1}/{epochs}], Train Loss: {train_loss:.5f}",end='')
        writer.add_scalar('Loss/train', train_loss*1000, epoch+1)
        # Evaluate on test set
        model.eval()
        test_loss = 0.0
        with torch.no_grad():
            for images, targets, init_data in test_loader:
                images = images.to(device)
                targets = targets.to(device) 
                attention_mask = create_attention_mask(images).to(device)
                outputs = model(images,None)
                loss = criterion(denormalize_targets(outputs,norm_values), targets)
                test_loss += loss.item()*images.size(0)
        
        mean_test_loss = test_loss/len(test_loader.dataset)
        print(f", Test Loss: {mean_test_loss:.5f}")
        writer.add_scalar('Loss/test', mean_test_loss*1000, epoch+1)
        # Save the model weights if this epoch has the lowest test loss so far
        if mean_test_loss < best_test_loss:
            torch.save(model, "./full_model.pth")
            torch.save(model.state_dict(), "model_state_dict.pth")
            best_test_loss = mean_test_loss
            best_model_weights = copy.deepcopy(model.state_dict())
            print(f"New best model saved with test loss: {best_test_loss:.5f}")
            best_epoch = epoch
            writer.add_scalar('Loss/best_test', best_test_loss*1000, best_epoch+1)
        writer.flush()
    # Load the best model weights at the end of training
    if best_model_weights:
        model.load_state_dict(best_model_weights)
        print(f"Loaded best model weights with test loss: {best_test_loss:.5f}, epoch: {best_epoch+1:3d}")
    else:
        print("Warning: No best model weights found.")
    
    print("Training Complete")
    return model

writer = SummaryWriter('runs/experiment_name',flush_secs=5)
device = "cuda" if torch.cuda.is_available() else "cpu"
input_seq_len = 98
input_dim = 6  # Each input is a sequence of 3Dx2 points (x, y, z)
d_model = 512  # Embedding dimension
nhead = 4  # Number of attention heads
num_encoder_layers = 4  # Number of transformer encoder layers
dim_feedforward = 512  # Feedforward network dimension
output_dim = 9078  # Each output is a 3D point (x, y, z) and we have 3026 points.
dropout = 0.0

model = Transformer2DPointsModel(input_dim, input_seq_len, d_model, nhead, num_encoder_layers, dim_feedforward, output_dim, dropout).to(device)
#model = MLP().to(device)
total_params = sum(p.numel() for p in model.parameters())
print(f"Number of parameters: {total_params}")

#dataset = ImagePairDataset("/mnt/hdd1/deformazione-2d-parametrica/", transform=transform)
dataset = PointsDataset("/home/fabiana/libraries/my_work/simulazioni_3d/", transform=None,padding=input_seq_len)
test_split = 0.2 
batch_size = 128
# Split dataset into train and test sets
test_size = int(test_split * len(dataset))
train_size = len(dataset) - test_size
train_dataset, test_dataset = random_split(dataset, [train_size, test_size])
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
norm_values=calculate_target_normalization(train_loader)
import pdb; pdb.set_trace()
learning_rate = 0.0001
n_epochs = 1000
warmup_steps = 100  # Number of warm-up steps
total_steps = n_epochs * len(train_loader)  # Total training steps
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
scheduler = OneCycleLR(optimizer, max_lr=learning_rate, total_steps=total_steps, pct_start=warmup_steps/total_steps)

trained_model = train_model(model=model, epochs=n_epochs,learning_rate=learning_rate, optimizer=optimizer, scheduler=scheduler, train_loader=train_loader,test_loader=test_loader,norm_values=norm_values)
writer.close()

Number of parameters: 461822839
--Return--
None
> [32m/tmp/ipykernel_11565/4200543878.py[39m([92m186[39m)[36m<module>[39m[34m()[39m
[32m    184[39m test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=[38;5;28;01mFalse[39;00m)
[32m    185[39m norm_values=calculate_target_normalization(train_loader)
[32m--> 186[39m [38;5;28;01mimport[39;00m pdb; pdb.set_trace()
[32m    187[39m learning_rate = [32m0.0001[39m
[32m    188[39m n_epochs = [32m1000[39m



ipdb>  norm_values.shape


*** AttributeError: 'dict' object has no attribute 'shape'


ipdb>  norm_values


{'mean': array([0., 0., 0., ..., 0., 0., 0.]), 'std': array([0.0001, 0.0001, 0.0001, ..., 0.0001, 0.0001, 0.0001])}


ipdb>  norm_values['mean']


array([0., 0., 0., ..., 0., 0., 0.])


ipdb>  norm_values['mean'].shape


(9078,)


In [None]:
%matplotlib ipympl
import matplotlib.pyplot as plt
import numpy as np
from mpl_toolkits.mplot3d import Axes3D

import matplotlib.pyplot as plt
import numpy as np
from mpl_toolkits.mplot3d import Axes3D

def plot_points_sequence_3d_separate(init_coord_data, gt_data, predicted_displacements, connect_points=True):
    """
    Plots two separate 3D figures: one for ground truth points and one for predicted points.

    Args:
        init_coord_data (numpy.ndarray): Initial positions of shape (N, 3).
        gt_data (numpy.ndarray): Ground truth displacements of shape (N, 3).
        predicted_displacements (numpy.ndarray): Predicted displacements of shape (N, 3).
        connect_points (bool): Whether to draw lines connecting successive points.
    """
    # Compute final positions
    gt_points = init_coord_data + gt_data
    predicted_points = init_coord_data + predicted_displacements

    # ---------- Ground Truth Figure ----------
    fig_gt = plt.figure(figsize=(10, 8))
    ax_gt = fig_gt.add_subplot(111, projection='3d')
    ax_gt.scatter(gt_points[:, 0], gt_points[:, 1], gt_points[:, 2],
                  color='green', label='Ground Truth Points', marker='x', s=10)

    if connect_points:
        for i in range(len(gt_points) - 1):
            ax_gt.plot([gt_points[i, 0], gt_points[i + 1, 0]],
                       [gt_points[i, 1], gt_points[i + 1, 1]],
                       [gt_points[i, 2], gt_points[i + 1, 2]], 'g-', alpha=0.5)

    ax_gt.set_title('Ground Truth 3D Points')
    ax_gt.set_xlabel('X')
    ax_gt.set_ylabel('Y')
    ax_gt.set_zlabel('Z')
    ax_gt.legend()
    plt.grid(True)

    # ---------- Predicted Points Figure ----------
    fig_pred = plt.figure(figsize=(10, 8))
    ax_pred = fig_pred.add_subplot(111, projection='3d')
    ax_pred.scatter(predicted_points[:, 0], predicted_points[:, 1], predicted_points[:, 2],
                    color='red', label='Predicted Points', marker='o', s=10)

    if connect_points:
        for i in range(len(predicted_points) - 1):
            ax_pred.plot([predicted_points[i, 0], predicted_points[i + 1, 0]],
                         [predicted_points[i, 1], predicted_points[i + 1, 1]],
                         [predicted_points[i, 2], predicted_points[i + 1, 2]], 'r-', alpha=0.5)

    ax_pred.set_title('Predicted 3D Points')
    ax_pred.set_xlabel('X')
    ax_pred.set_ylabel('Y')
    ax_pred.set_zlabel('Z')
    ax_pred.legend()
    plt.grid(True)

    # Show both figures
    plt.show()



# Example usage for a given sample index
# Assuming `init_coord_data`, `gt_data`, and `predicted_displacements` are NumPy arrays of shape (N, 2)
# For example:
images, targets, init_data = test_loader.dataset[78]
images = images.to(device)
images = images[None, :, :]
attention_mask = create_attention_mask(images).to(device)
init_data = init_data.numpy().reshape(-1,3)
targets = targets.numpy().reshape(-1,3)
predicted_displacements = trained_model(images,None)
predicted_displacements = denormalize_targets(predicted_displacements,norm_values)
predicted_displacements = predicted_displacements.cpu().detach().numpy()
predicted_displacements = predicted_displacements.reshape(-1,3)
#import pdb; pdb.set_trace()
# Call the function to plot the points for a sample
plot_points_sequence_3d_separate(init_data, targets, predicted_displacements,connect_points=False)



In [None]:
def plot_image_tensor(image_tensor):
    """
    Takes a PyTorch tensor representing image data and plots the image.

    Args:
        image_tensor (torch.Tensor): A tensor representing image data with shape 
                                     (batch_size, channels, height, width).
    """
    # Check if the tensor is a single image or a batch
    if image_tensor.ndimension() == 4:
        # If batch size is greater than 1, let's plot the first image
        image_tensor = image_tensor[0]  # Take the first image in the batch
    
    # Convert the tensor to a NumPy array and move channels to last dimension (for plotting)
    image_array = image_tensor.numpy().transpose((1, 2, 0))  # Convert from (C, H, W) to (H, W, C)
    #image_array[:, :, 1] = 1
    # If the image is grayscale (single channel), it will have only one channel
    if image_array.shape[2] == 1:
        image_array = image_array[:, :, 0]  # Convert to 2D for grayscale images
    
    # Plot the image
    plt.imshow(image_array, cmap='gray' if image_array.ndim == 2 else None)
    plt.axis('off')  # Hide axes
    plt.show()

plot_image_tensor(images[:,:,:,:].cpu())

In [None]:
images.shape