In [None]:
import os
import sys
# src_path = os.path.realpath(os.path.join(os.path.dirname(__file__), '../../src'))
# sys.path.append(src_path)
src_path = os.path.realpath(os.path.join(os.getcwd(), './src'))
sys.path.append(src_path)

import numpy as np
import matplotlib.pyplot as plt
import torch
from torch.utils.data import DataLoader, TensorDataset, random_split
import random
from utilFuncs import *
import matplotlib.patches as mpatches


In [None]:
p = 7
unitCellName = 'C12'
numberOfDataFiles = 2 # seed 0-1 (2), just seed0 (1)
# make sure analysis is done on the same geometry and compression ratio 
heightLattice = 12.7 # mm
compression = 40

In [None]:
SaveLoc = unitCellName + 'NN' + str(p) + 'var/'
filenames = [] # Initialize an empty list to store the generated filenames

for seed_num in range(numberOfDataFiles): # range(2) will generate 0, 1
    filename = (SaveLoc + 'abaqusComp' + str(compression) +
                unitCellName + 'LHS_' + str(p) + 'var_seedNum' + str(seed_num))
    filenames.append(filename)

if p==2: # uniform data replace filenames
    filenames = [SaveLoc+'abaqusComp'+str(compression)+unitCellName+'uniform_'+str(p)+'var_seedNum0']


compressionRatio = compression*0.01
maxDispUz = heightLattice*compressionRatio # mm

ctr = ".txt"
R = np.array([])
U = np.array([])
F = np.array([])
analysisState = np.array([])

for filename in filenames:
    filename = filename+ctr
    
    DesVarTemp, UTemp, FTemp, analysisStateTemp = extract_ABQ_values(filename,torch.linspace(0,1,11)*maxDispUz)
    if R.size == 0:
        R = DesVarTemp
        U = UTemp
        F = FTemp
        analysisState = analysisStateTemp
    else:
        R = np.concatenate((R, DesVarTemp))
        U = np.concatenate((U, UTemp))
        F = np.concatenate((F, FTemp))
        analysisState = np.concatenate((analysisState, analysisStateTemp))

U = U/np.max(U)

for i in range(len(analysisState)):
    if analysisState[i] == 1:
        t = U[i,:]
        break;
mask = ((np.abs(U - t[np.newaxis,:]))<=1e-6)*1.0


# take out the failed or keep solutions
includeFailed = True
maskinputDiv = mask[analysisState == 0,:]

if includeFailed:
    maskinput = mask[analysisState <= 1,:]
    Uinput = U[analysisState <= 1,:]
    NNinput = R[analysisState <= 1,:]
    NNoutput= F[analysisState <= 1,:]
else:
    maskinput = mask[analysisState == 1,:]
    Uinput = U[analysisState == 1,:]
    NNinput = R[analysisState == 1,:]
    NNoutput= F[analysisState == 1,:]
    
NNinputDiv = R[analysisState == 0,:]
NNoutputDiv = F[analysisState == 0,:]

    
inputSize = NNinput.shape[1]
outputSize = NNoutput.shape[1]

dataInput = np.concatenate((NNinput,maskinput),axis=1)
dataOutput =  NNoutput.copy()

# make sure the partial curve has at least half of the data ()
sumMask = np.sum(maskinput,axis=1)
Num = (sumMask>=0.5*(outputSize-1)) # skip the first 0th size
analysisState = sumMask/np.max(sumMask) # change the analysisState such that it shows > half success

## remove the less than half success 
NNinput = np.concatenate((NNinput,maskinput),axis=1)[Num,:]
NNoutput = NNoutput[Num,:]
NNinputDiv = np.concatenate((NNinputDiv,maskinputDiv),axis=1)

success = np.sum(analysisState == 1)
partial = np.sum((analysisState >= 0.5) & (analysisState < 1))
failure = np.sum(analysisState < 0.5) 
print(f"Data shapes: Input (with mask) {NNinput.shape}, Output {NNoutput.shape}")
print(f"Classification counts: Success={success}, Partial={partial}, Failure={failure}")


In [None]:
# make some force range, and use only that data
F_aggregated_Value = np.mean(NNoutput,axis=1)

F1,F2 = -np.max(F_aggregated_Value),(np.max(F_aggregated_Value)+1.0)

if F2 > 0:
    NNinput = NNinput[ (F_aggregated_Value >= F1) & (F_aggregated_Value <= F2) ,:]
    NNoutput= NNoutput[(F_aggregated_Value >= F1) & (F_aggregated_Value <= F2),:]



In [None]:
# split the data into train and test sets
train_ratio = 0.8
n = int(NNinput.shape[0]/1)  # Get the number of samples (rows)
num_train = int(n * train_ratio)  # Calculate number of training samples

random.seed(0) 
# Generate random indices for shuffling
indices = list(range(n))
random.shuffle(indices)

# Split indices into training and testing
train_indices = indices[:num_train]
test_indices = indices[num_train:]

# Use the shuffled indices to select data
Rinput_train = NNinput[train_indices,:]
Uoutput_train = Uinput[train_indices,:]
Foutput_train = NNoutput[train_indices,:]

Rinput_test = NNinput[test_indices,:]
Uoutput_test = Uinput[test_indices,:]
Foutput_test = NNoutput[test_indices,:]

inputs_train = Rinput_train # Training Inputs
targets_train = Foutput_train # Training Targets

inputs_test = Rinput_test # Test Inputs
targets_test =  Foutput_test # Test Targets
print(f"Training set shapes: Inputs {inputs_train.shape}, Targets {targets_train.shape}")
print(f"Testing set shapes: Inputs {inputs_test.shape}, Targets {targets_test.shape}")

In [None]:
def plotAllFDcurves(inputs_train,targets_train,inputs_test,targets_test,lbls=['',''],strAdd= ''):
    n = inputs_train.shape[0] + inputs_test.shape[0]
    # lbls = ["Training_set","Validation_set"]
    # lbls = ["Success","Partial Success"]
    # lbls = ["Data_1","Data_200"]
    plt.figure(figsize=(8, 6))
    for i in range(n+1):
        if i < targets_train.shape[0]:
            actu = targets_train[i,:].reshape(-1)
            uu = np.linspace(0,1,len(actu))
            nn = int(inputs_train[i,2::].sum())
            if i==0:
                plt.plot(uu[0:nn],actu[0:nn],'m-s',label=f'{lbls[0]}')
            else:
                plt.plot(uu[0:nn],actu[0:nn],'m-s')
                
        else:
            i = i - n
            actu = targets_test[i,:].reshape(-1)
            uu = np.linspace(0,1,len(actu))
            nn = int(inputs_test[i,2::].sum())
            if i == -1:
                plt.plot(uu[0:nn],actu[0:nn],'b-o',label=f'{lbls[1]}')
            else:
                plt.plot(uu[0:nn],actu[0:nn],'b-o')        

    # # Labels
    plt.ylabel('Force in N',fontsize=16)
    # plt.ylabel('Normalized Force',fontsize=16)

    plt.xlabel('Normalized displacement',fontsize=16)
    target = np.concatenate((targets_train,targets_test),axis=0)
    # plt.ylim(None,np.max(target)*1.3)
    
    # Title
    titleStr = f'Force Displacement curve data'
    titleStr = f'{titleStr} {strAdd}'
    plt.title(titleStr,fontsize=18)
    plt.legend(fontsize=16,loc='upper left')
    plt.show()

def plotFDcurves(ax, inputdata, targetdata, lbl, strAdd, colorshape,addLabel=False):
    """
    Plots force-displacement curves for a single dataset on a given figure.

    Args:
        fig (matplotlib.figure.Figure): The figure object to plot on.
        inputdata (NumPy array): The input data array.
        targetdata (NumPy array): The target data array.
        lbl (str): The label for this dataset.
        strAdd (str): A string to add to the title of the plot.
        colorshape (dict): A dictionary specifying the color and marker.
                           Expected keys: 'color' and 'marker'.
                           Example: {'color': 'm', 'marker': 's'}
    """
    n = inputdata.shape[0]
    for i in range(n):
        actu = targetdata[i,:].reshape(-1)
        uu = np.linspace(0,1,len(actu))
        nn = int(inputdata[i,2::].sum())
        Color = colorshape['color']
        Marker = colorshape['marker']
        ax.plot(uu[0:nn], actu[0:nn], marker=Marker, linestyle='-', color=Color, label=lbl if i == 0 else "",
                linewidth=3,markersize=10)
    
    if addLabel:
        # Labels
        ax.set_ylabel('Force in N', fontsize=16)
        ax.set_xlabel('Normalized displacement', fontsize=16)
    
        ax.set_ylim(None, np.max(targetdata) * 1.4 if targetdata.size > 0 else None)
    
        # Title
        titleStr = f'Force Displacement curve data'
        titleStr = f'{titleStr} {strAdd}'
        ax.set_title(titleStr, fontsize=18)
        # ax.legend(fontsize=16, loc='upper left')
    # else:
    #     ax.set_xticklabels([])
    #     ax.set_yticklabels([])
        
    ax.legend(fontsize=16, loc='upper left')

    return ax
    
def plotObjSpace(x,y,z,colorCode,savePlt=False):
    # Create a 2D scatter plot
    fig, ax = plt.subplots(figsize=(10, 8))    
    
    # Create a colormap that maps 1 to green and 0 to red
    colors = np.select(
    [z == 1, (z >= 0.5) & (z < 1)],
    [colorCode[0], colorCode[1]],
    default= colorCode[2]
    )

    # Create the scatter plot
    plt.scatter(x, y, c=colors, edgecolors="w", marker='s', linewidth=1)
    
    fntSize = 28
    box_legend_loc = (0.5,-0.5)
    legend_loc = 'lower center'
    # Labels
    plt.xlabel('Radii 1',fontsize=fntSize)
    plt.ylabel(' ',fontsize=fntSize)
    if compressionRatio == 0.1:
        plt.ylabel('Radii 2',fontsize=fntSize)
    
    # Calculate rates
    total = len(z)
    success_rate = np.sum(z == 1) * 100 / total
    partial_rate = np.sum((z >= 0.5) & (z < 1)) * 100 / total
    failure_rate = np.sum(z < 0.5) * 100 / total
    # Format rates to 2 significant digits
    formatted_success = "{:.2f}%".format(success_rate)
    formatted_partial = "{:.2f}%".format(partial_rate)
    formatted_failure = "{:.2f}%".format(failure_rate)
    
    title_str = (
    f"Compression {int(compressionRatio*100)}%\n"
    f"Success: {formatted_success}\n"
    f"Partial success: {formatted_partial}\n"
    f"Failure: {formatted_failure}"
    )
    plt.title(title_str, fontsize=fntSize)
    ax.set_box_aspect(1)
    ax.xaxis.set_tick_params(labelsize=fntSize)
    ax.yaxis.set_tick_params(labelsize=fntSize)
    # Create legend patches without the rates
    success_patch = mpatches.Patch(color=colorCode[0], label='Success')
    partial_patch = mpatches.Patch(color=colorCode[1], label='Partial success')
    failed_patch = mpatches.Patch(color=colorCode[2], label='Failure')
    
   
    if compressionRatio == 0.2:
        success_patch = mpatches.Patch(color=colorCode[0], label='Success')
        partial_patch = mpatches.Patch(color=colorCode[1], label='Partial success')
        failed_patch = mpatches.Patch(color=colorCode[2], label='Failure')
    else:
        success_patch = mpatches.Patch(color='w', label='')
        partial_patch = mpatches.Patch(color='w', label='')
        failed_patch = mpatches.Patch(color='w', label='')
        
    # Add the legend to the plot    
    plt.legend(handles=[success_patch,partial_patch,failed_patch], loc=legend_loc, 
    bbox_to_anchor=box_legend_loc, borderaxespad=0., fontsize=fntSize+4,frameon=False)  
        
    if savePlt:
        savePath = os.path.realpath(os.path.join(sys.path[0],'../ObjSpacePlotC'+str(int(compressionRatio*100))+'.png'))
        # Save the figure
        plt.savefig(savePath, dpi=600, bbox_inches='tight')
    # Show plot
    plt.show()
 
if p==2:
    colorCode = ['#2ca02c', '#ff7f0e', '#d62728']
    lbls = ["Success","Partial success","Failure"] # use abaqus 50% data for best curve to show diff between success and partial success
    
    plotObjSpace(R[:,0].reshape(-1),R[:,1].reshape(-1),analysisState.reshape(-1),
                colorCode,savePlt=False) # green, orange, red

    TotalDataSum = dataInput[:,2::].sum(axis=1)
    
    n1 = np.where(TotalDataSum == 11)[0][400]
    n2 = np.where(TotalDataSum == 8)[0][30]
    n3 = np.where(TotalDataSum == 4)[0][6]
    print(n1,n2,n3)
    
    # n1,n2,n3 = 10,15,20
    fig = plt.figure(figsize=(8, 6))
    ax = fig.add_subplot(111)  # Add a subplot to the figure
    ax = plotFDcurves(ax, dataInput[n1:n1+1,:], dataOutput[n1:n1+1,:], lbls[0], strAdd='', colorshape={'color': colorCode[0], 'marker': 's'},addLabel=True)
    ax = plotFDcurves(ax, dataInput[n2:n2+1,:], dataOutput[n2:n2+1,:], lbls[1], strAdd='', colorshape={'color': colorCode[1], 'marker': 'o'},addLabel=False)
    ax = plotFDcurves(ax, dataInput[n3:n3+1,:], dataOutput[n3:n3+1,:], lbls[2], strAdd='', colorshape={'color': colorCode[2], 'marker': '^'},addLabel=False)


lbls = ["Training_set","Validation_set"]
n1,n2 = 0,-1
plotAllFDcurves(inputs_train[n1:n2,:],targets_train[n1:n2,:],inputs_test[n1:n2,:],targets_test[n1:n2,:],lbls,strAdd= '')



In [None]:

inputs_train_tensor  = torch.tensor(inputs_train, dtype=torch.float32)
targets_train_tensor = torch.tensor(targets_train, dtype=torch.float32)
inputs_test_tensor   = torch.tensor(inputs_test, dtype=torch.float32)
targets_test_tensor  = torch.tensor(targets_test, dtype=torch.float32)

# Create TensorDatasets (separately for train and test)
train_dataset    = TensorDataset(inputs_train_tensor, targets_train_tensor)
test_dataset     = TensorDataset(inputs_test_tensor, targets_test_tensor)

# Create DataLoaders
batch_size_train = int(len(targets_train_tensor)/1) # You can adjust this for training
batch_size_test  = int(len(targets_test_tensor)) # For full test set evaluation at the end
train_loader     = DataLoader(train_dataset, batch_size=batch_size_train, shuffle=False)
test_loader      = DataLoader(test_dataset, batch_size=batch_size_test, shuffle=False)  # No need to shuffle test data


In [None]:
nnSettings = {'inputDim': inputSize,
        'outputDim' : 6, #7 for F
        'numLayers': 2, 
        'numNeuronsPerLyr': 80,
        'curve':'spline',
        'dropout':0.5, 
        'bottleneck_factor':1.0}

print(f"Input Var is R and of size {inputSize}")
NNmodel = NeuralNet(nnSettings,useCPU = False)
print(f"Total number of weights in the model: {NNmodel.total_weights}")
print(f"Total number of biases in the model: {NNmodel.total_biases}")
NNmodel.train_model(train_loader,test_loader,num_epochs=5000,tol=1e-12,prntNum=500)


In [None]:

# Saving the model:
# torch.save(NNmodel, SaveLoc+'SurrogateModel.pth')  # Saves the entire model object


In [None]:
avg_mae, avg_rmse,r2,maxV_indexT,minV_indexT = NNmodel.evaluate_NN(train_loader)   # Return both MAE and RMSE
print("Result on the training set")
print(f"MAE {avg_mae:.4f} , RMSE {avg_rmse:.4f}, and R_squared {r2:.4f}")
print(f"Max abs sum error value {maxV_indexT.values.cpu()} ,\n and Index {maxV_indexT.indices.cpu()}\n")
avg_mae, avg_rmse,r2,maxV_index,minV_index = NNmodel.evaluate_NN(test_loader)   # Return both MAE and RMSE
print("Result on the test set")
print(f"MAE {avg_mae:.4f} , RMSE {avg_rmse:.4f}, and R_squared {r2:.4f}")
print(f"Max abs sum error value {maxV_index.values.cpu()} ,\n and Index {maxV_index.indices.cpu()}\n")


In [None]:
predTrain,_ = NNmodel.predict_out(inputs_train)
predTest,_ = NNmodel.predict_out(inputs_test)
inputs_train2, inputs_test2 = inputs_train.copy(),inputs_test.copy()
inputs_train2[:,2::] = 1
inputs_test2[:,2::] = 1
plotAllFDcurves(inputs_train2,predTrain,inputs_test2,predTest,strAdd= 'NN Prediction')


In [None]:
def plotNNFDcurve(inputs,target,arrayN,titleStrAdd,pltCP = False,savePath = None):
    print(inputs.shape)
    for num1 in arrayN:
        num2 = num1+1
        xyU = inputs[num1:num2,:]
        print(xyU,inputs[num1,:])
        u = np.linspace(0,1,2*len(xyU[0,2::]))
        pred,CP = NNmodel.predict_out(xyU,u)#.reshape(-1)
        actu = target[num1:num2,:].reshape(-1)
        print(f"Rad values: {xyU},\n")
        # print(f"Prediction: {Fpred},\n Actual: {Factu}")
    
        plt.figure(figsize=(8, 6))
        uu = np.linspace(0,1,len(actu))
        nn = int(xyU[0,-len(actu)::].sum())
        plt.plot(uu[0:nn],actu[0:nn],'r--^',label='Abaqus data',markersize=15,linewidth=3)
        plt.plot(u,pred.reshape(-1),'b-',label=' Surrogate model prediction',linewidth=3)
        
        if pltCP:
            CP = CP.reshape(-1)
            n = len(CP)
            p = 2 # hard coded to degree 2
            e = np.linspace(0, 1, n - p + 1)
            u = np.linspace(0, 1, n - p + 2)
            u[1:-1] = e[:-1] + np.diff(e) / 2
            plt.plot(u,CP,'m--s',label='NN B-Spline control points',markersize=8)

        
        # # Labels
        plt.ylabel('Force in N',fontsize=16)
        plt.xlabel('Normalized displacement',fontsize=16)
        # plt.ylim(None,1.0)
        
       
        maxF =  np.max(np.concatenate((actu.reshape(-1), pred.reshape(-1)))) * 1.5
        plt.ylim([None, maxF])
        
        # Title
        # abs_force_diff = np.abs(pred*xyU[0,-len(actu)::] - actu*xyU[0,-len(actu)::]).mean()
        # titleStr = f'Force Displacement curve on {titleStrAdd} data {num1}\n abs force diff = {abs_force_diff:.2f}'
        # plt.title(titleStr,fontsize=18)
        plt.legend(fontsize=16,loc='upper left')
        if savePath is not None:
           plt.savefig(savePath, dpi=600, bbox_inches='tight')

        plt.show()


all_inputs = []
all_targets = []

for inputs, targets in train_loader:
    all_inputs.append(inputs)
    all_targets.append(targets)
    
inputs_train = torch.cat(all_inputs, dim=0).detach().cpu().numpy()
targets_train = torch.cat(all_targets, dim=0).detach().cpu().numpy()
    
print("Number of train data", inputs_train.shape)
num1 = maxV_indexT.indices.cpu().numpy()

# plotNNFDcurve(inputs_train,targets_train,maxV_indexT.indices.cpu().numpy(),'trained',pltCP = True)

# plotNNFDcurve(inputs_train,targets_train,minV_indexT.indices.cpu().numpy(),'trained')



In [None]:
all_inputs = []
all_targets = []

for inputs, targets in test_loader:
    all_inputs.append(inputs)
    all_targets.append(targets)
    
inputs_test = torch.cat(all_inputs, dim=0).detach().cpu().numpy()
targets_test = torch.cat(all_targets, dim=0).detach().cpu().numpy()

print("Number of test data", inputs_test.shape)
# plotNNFDcurve(inputs_test,targets_test,maxV_index.indices.cpu().numpy(),'validation')
# plotNNFDcurve(inputs_test,targets_test,minV_index.indices.cpu().numpy(),'validation')
    

In [None]:

num = 100
np.random.seed(32)
random_array = np.sort(np.random.randint(0,NNinputDiv.shape[0],size=5))
print(random_array,NNinputDiv.shape)

In [None]:

# plotNNFDcurve(NNinputDiv,NNoutputDiv,np.concatenate(([num], random_array)),'Failed')


In [None]:
random_array = np.sort(np.random.randint(0,len(inputs_test[:,0]),size=5))

In [None]:

# plotNNFDcurve(inputs_train,targets_train,random_array,'trained random',pltCP = True)
# plotNNFDcurve(inputs_test,targets_test,random_array,'validation random',pltCP = True)


In [None]:
#This is to show the CP and surrogate model and data
if p==2:
    max_in_target_data = np.max(targets_train,axis=1)
     
    inputs_train_new = inputs_train[(max_in_target_data > 2.0) , :]
    targets_train_new = targets_train[(max_in_target_data > 2.0) , :]
    
    TotalDataSum = inputs_train_new[:,2::].sum(axis=1)
    successN = np.where(TotalDataSum == 11)[0][12] #12 is close used for part1 
    print(successN.shape)
    plotNNFDcurve(inputs_train_new[successN,:].reshape(1,-1),targets_train_new[successN,:].reshape(1,-1),
                    np.array([0]),'trained',pltCP=True,savePath='SurrNdata.png')
                


In [None]:
#This is to show surrogate model and data on test for full and partial success
if p==2:
    max_in_target_data = np.max(targets_test,axis=1)
    
    inputs_test_new = inputs_test[(max_in_target_data > 1.5) , :]
    targets_test_new = targets_test[(max_in_target_data > 1.5) , :]
    
    TotalDataSum = inputs_test_new[:,2::].sum(axis=1)
    successN = np.where(TotalDataSum == 11)[0][5] #5
    print("--",successN.shape)
    plotNNFDcurve(inputs_test_new[successN,:].reshape(1,-1),targets_test_new[successN,:].reshape(1,-1),
                    np.array([0]),'trained',pltCP=False,savePath='SurrValidatePredFull.png')
                    
    # find the half success 
    successN = np.where((TotalDataSum == 7) & (targets_test_new[:, 6] >= 5.0) & (targets_test_new[:, 6] <= 6.0))[0][1] #1
    print("--",successN.shape)
    plotNNFDcurve(inputs_test_new[successN,:].reshape(1,-1),targets_test_new[successN,:].reshape(1,-1),
                    np.array([0]),'trained',pltCP=False,savePath='SurrValidatePredPartial.png')
    
    ## special for failure
    max_in_target_data = np.max(dataOutput,axis=1)
    
    inputs_test_new = dataInput[(max_in_target_data > 1.5) , :]
    targets_test_new = dataOutput[(max_in_target_data > 1.5) , :]
    
    TotalDataSum = inputs_test_new[:,2::].sum(axis=1)
    
    # find the failure  
    successN = np.where((TotalDataSum == 5))[0][10] #10
    print("--",successN.shape)
    plotNNFDcurve(inputs_test_new[successN,:].reshape(1,-1),targets_test_new[successN,:].reshape(1,-1),
    np.array([0]),'trained',pltCP=False,savePath='SurrValidatePredFailure.png')
