In [None]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
import pickle
from SimDataScraper import *
import matplotlib.pyplot as plt
import hashlib

In [None]:
# Regression NN
class FFNN(nn.Module):
    def __init__(self, inputSize, hiddenSize, outputSize, layerSize, activation_function):
        super().__init__()

        # Defining the input, hidden, and output layers. Here the hidden layer depth is made variable
        self.lin0 = nn.Linear(inputSize, hiddenSize)
        self.layers = nn.ModuleList([nn.Linear(hiddenSize, hiddenSize) for i in range(layerSize - 2)]) #layer size includes input & output hence the avgus 2
        self.lin5 = nn.Linear(hiddenSize, outputSize)
        
        match(activation_function):
            case "sigmoid":
                self.actFunct = nn.Sigmoid()
            case "tanh":
                self.actFunct = nn.Tanh()
            case "relu":
                self.actFunct = nn.ReLU()
            case "lrelu":
                self.actFunct = nn.LeakyReLU()

    # The forward pass basically just runs each layer and activation function until the output is calculated
    def forward(self, x):
        out = self.lin0(x)
        out = self.actFunct(out)
        
        for l in self.layers:
            out = l(out)
            out = self.actFunct(out)

        out = self.lin5(out)
        return out

# Classification NN
class NNCLASS(nn.Module):
    def __init__(self, inputSize, hiddenSize, outputSize, layerSize):
        super().__init__()

        self.lin0 = nn.Linear(inputSize, hiddenSize)
        self.layers = nn.ModuleList([nn.Linear(hiddenSize, hiddenSize) for i in range(layerSize - 2)])
        self.lin5 = nn.Linear(hiddenSize, outputSize)
        self.relu = nn.ReLU()
        self.sigmoid = nn.Sigmoid() # Sigmoid use for the output layer to clamp between 0 and 1
        
        
    def forward(self, x):
        out = self.lin0(x)
        out = self.relu(out)
        
        for l in self.layers:
            out = l(out)
            out = self.relu(out)

        out = self.lin5(out)
        out = self.sigmoid(out)
        return out

In [None]:
# Opening the dataset as the serialized file with the pickle lib

with open(".\\OverMax_WithPhase", 'rb+') as datasetFile:
    dataset = pickle.load(datasetFile)

In [None]:
def HSV2RGB(H, S, V):
    C = V * S
    divH = (H / 60)
    X = C * (1 - abs((divH % 2) - 1))
    m = V - C
    
    RGBp = tuple()
    match (int)(divH):
        case 0:
            RGBp = (C, X, 0)
        case 1:
            RGBp = (X, C, 0)
        case 2:
            RGBp = (0, C, X)
        case 3:
            RGBp = (0, X, C)
        case 4:
            RGBp = (X, 0, C)
        case 5:
            RGBp = (C, 0, X)

    R = (int)((RGBp[0] + m )* 255)
    G = (int)((RGBp[1] + m )* 255)
    B = (int)((RGBp[2] + m )* 255)

    return (R, G, B)
        
hist, bins, patches = plt.hist(dataset.SimDict["Tj_Final_B"], 150)

'''
bigColor = (60, 0.90, 0.75)
smallColor = (240, 0.90, 0.35)

maxCount = max(hist)
avgCount = avg(hist)
for i in range(len(patches)):
    normVal = (hist[i] - avgCount) / (maxCount - minCount)

    colr = [0, 0, 0]
    for j in range(3):
        colorRange = (bigColor[j] - smallColor[j])
        coloravg = avg(bigColor[j], smallColor[j])
        if colorRange < 0:
            coloravg = max(bigColor[j], smallColor[j])
        colr[j] = (normVal * colorRange + coloravg)

    colr = HSV2RGB(*colr)
    patches[i].set_facecolor("#" + ('{:02X}{:02X}{:02X}').format(*colr))
    #patches[i].set_edgecolor("#FFFFFF")
    #patches[i].set_linewidth(0.25)           
'''

fontSize = 18
plt.title("Bottom Junction Temperature Data", fontsize=fontSize * 0.75)
plt.yscale('log')
#plt.ylim((0,1))
plt.xlabel("Normalized Value", fontsize=fontSize)
plt.ylabel("Count", fontsize=fontSize)
plt.minorticks_on()
plt.grid(which="major")
plt.grid(which="minor", axis='y', linewidth = 0.3)
plt.tick_params(labelsize=fontSize*0.75)
plt.show()

In [None]:
# SETUP FOR TRAINING

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') #If a gpu is available, use it 
torch.manual_seed(1234) # Seed the random number generator for consisten results when training 

'''Set Up Variables'''
hiddenLayerSize = 60
layerSize = 6
learningRate = 0.01
epochs = 6000
batchSize = 2**12
train_test_ratio = 0.70
activationFunction = 'lrelu'
checkpointsDir = "CheckpointsPhaseSet"


# Set the inputs desired through the dataset.inputs member variable. Inputs are a list of names 
#dataset.inputs = ["i_ser_rms", "i_ser_max", "i_out_rms", "i_out_max", "i_in_rms", "i_in_max", "v_out_rms", "v_out_max", "Vin", "F"]
dataset.inputs = ["i_ser_rms", "i_ser_max", "i_out_rms", "i_out_max", "i_in_rms", "i_in_max", "v_out_rms", "v_out_max", "Vin", "F", "phase_diff_avg"]
outputString = "Lp"

#dataset.inputs = ["v_ds_max_b", "v_ds_b_slew_min", "v_ds_b_slew_rms", "v_ds_b_slew_avg", "v_ds_b_slew_max", "i_ser_max", "q_gate_max_b"]
#outputString = "zvs_b_true"

#dataset.inputs = ["v_ds_on_max_t", "v_ds_on_min_t", "v_ds_on_max_b", "v_ds_on_min_b","i_d_max_t", "i_d_min_t", "i_out_max", "i_in_max", "v_out_max", "Vin", "F"]
#outputString = "tank_p_avg_loss"

#dataset.inputs = ["v_ds_on_max_t", "v_ds_on_min_t", "i_d_max_t", "i_d_min_t", "i_out_max", "i_in_max", "v_out_max", "Vin", "F"]
#outputString = "Tj_Final_T"

#dataset.inputs = ["v_ds_on_max_t", "v_ds_on_min_t", "i_d_max_t", "i_d_min_t", "i_d_avg_t", "F"]
#outputString = "Rdson_T"

#dataset.inputs = ["v_gs_b_slew_max", "v_gs_b_slew_min", "q_gate_max_b", "q_gate_rms_b", "v_gs_max_b", "v_rg_max_b", "v_rg_rms_b", "v_ds_on_min_b"]
#outputString = "Cgs_B"

#dataset.inputs = ["v_ds_t_slew_max", "v_ds_t_slew_avg", "v_ds_t_slew_rms", "v_ds_t_slew_min", "q_gate_max_t", "v_gs_max_t", "v_ds_on_max_t", "i_ser_rms", "F"]
#outputString = "Cds_T"

dataset.outputs = [outputString] #Same as the inputs member variable but only one output for our use case. These can be extended to more outputs
dataset.PrePackSims() # Make sure to prepack the data. Without this the data cannot be accessed 
inputLayerSize = len(dataset.inputs) #auto set input and output layer sizes
outputLayerSize = len(dataset.outputs)

trainSet, testSet = dataset.Split(train_test_ratio, generator = None) #Split data into test and train sets
trainLoader = DataLoader(trainSet, batch_size=batchSize, shuffle=True) #Pytorch data loader is used for batching and shuffling. One or train and test
testLoader = DataLoader(testSet, batch_size=batchSize, shuffle=True)

# Initialize the model here
model = FFNN(inputLayerSize, hiddenLayerSize, outputLayerSize, layerSize, activationFunction)
#model = NNCLASS(inputLayerSize, hiddenLayerSize, outputLayerSize, layerSize)
model.cuda() #Make sure to put the model on the device (hopefully a gpu) for speed

# Initialize the loss function
criterion = nn.MSELoss()
#criterion = nn.BCELoss()

# Initialize the optimizer and learning rate
optimizer = torch.optim.Adam(model.parameters(), lr=learningRate)

In [None]:
# If the training is interrupted, you can save the last checkpoint of your model manually here

# The name of the file is automated to include the output parameter, activation function, network size, number of epochs, and a hash of the input vector to differentiate inputs
PATH_CHECKPOINT = ".\\"+checkpointsDir+"__%s_%s_(%d, %d)_%d_%s" % (outputString, activationFunction, hiddenLayerSize, layerSize, epoch,  hashlib.md5(("+".join(dataset.inputs)).encode('utf-8')).hexdigest())
# Pytorch allows you to save snapshots of your model as they train and reload them later for use/testing. This means you don't have to retrain at all, or you can load a model and continue training for that epoch
# This takes a standard dictonary with these parameters
torch.save({
    'epoch': epoch,
    'model_state_dict' : model.state_dict(),
    'optimizer_state_dict' : optimizer.state_dict(),
    'loss' : loss,
}, 
PATH_CHECKPOINT)

In [None]:
losses = [] #comment this out if you want to continue training 
#epochs = 1000
startingEpoch = 0000 #If you are training a snapshot, enter the last epoch here so the progress update is correct

# Training Loop 
for epoch in range(startingEpoch, startingEpoch + epochs):
    for i, (inputs, outputs) in enumerate(trainLoader): #Loop over batches

        # Reset the gradients and put inputs/outputs on the gpu
        optimizer.zero_grad()
        inputs = inputs.to(device)
        outputs = outputs.to(device)

        #Forward Pass
        prediction = model(inputs)
        loss = criterion(prediction, outputs)
        losses.append(loss)

        #Backward Pass
        loss.backward()
        optimizer.step()

        # Save snapshot every 250
        if (epoch + 1) % 250 == 0 and epoch != startingEpoch:
            PATH_CHECKPOINT = ".\\"+checkpointsDir+"\\%s_%s_(%d, %d)_%d_%s" % (outputString, activationFunction, hiddenLayerSize, layerSize, epoch + 1,  hashlib.md5(("+".join(dataset.inputs)).encode('utf-8')).hexdigest() )
            torch.save({
                'epoch': epoch + 1,
                'model_state_dict' : model.state_dict(),
                'optimizer_state_dict' : optimizer.state_dict(),
                'loss' : loss,
            }, 
            PATH_CHECKPOINT)
        
    print("EPOCH(%d/%d)  Loss: %lf" % (epoch+1, startingEpoch + epochs, loss.item()))

In [None]:
# Graph the loss over epochs
plt.plot(torch.Tensor(losses).cpu())
plt.grid()
plt.xlabel("Samples")
plt.ylabel("Loss")
plt.ylim((0.00,0.05))

In [None]:
# Loading a snapshot is done like this
print("Check inputs are correct with: %s" % hashlib.md5(("+".join(dataset.inputs)).encode('utf-8')).hexdigest())
checkpointLoad = torch.load(".\\"+checkpointsDir+"\\Lp_lrelu_(30, 6)_4000_4775d7d5a91afd083d46af026fc93343")

model.load_state_dict(checkpointLoad['model_state_dict'])
optimizer.load_state_dict(checkpointLoad['optimizer_state_dict'])
epoch = checkpointLoad['epoch']
loss = checkpointLoad['epoch']

In [None]:
#Inference Benchmark to get time/sample
import time

start = time.time_ns()
with torch.no_grad():
    for j, (inputs, outputs) in enumerate(testLoader):
        inputs = inputs.to(device)
        outputs = outputs.to(device)
        
        #Forward Pass
        prediction = model(inputs)

end = time.time_ns()
delta = end - start
print(" Delta/Sample [us] %0.3f" % ((delta/1e3)/len(testLoader.dataset)))

In [None]:
# Generate dummy test sets to compare scores too
outputList = torch.tensor(dataset.SimDict[outputString], requires_grad=False)
numSamples = len(testLoader.dataset)
outputVar = outputList.var()
DummySet_ConstMid = outputList.mean()
DummySet_Mode,_ = outputList.mode()
del outputList

predList = torch.empty((0,1))
truthList = torch.empty((0,1))

with torch.no_grad():
    sqrErr = 0
    absErr = 0
    errVar = 0
    dummyAbsErr = torch.tensor([0, 0], dtype=float).to(device)
    dummySqrErr = torch.tensor([0, 0], dtype=float).to(device)

    for j, (inputs, outputs) in enumerate(testLoader):
        inputs = inputs.to(device)
        outputs = outputs.to(device)
        
        #Forward Pass
        prediction = model(inputs)

        # Calculate Stats
        absErr += torch.dist(prediction, outputs, 1)
        sqrErr += torch.dist(prediction, outputs, 2) ** 2

        dummyAbsErr[0] += torch.dist(DummySet_ConstMid, outputs, 1)
        dummyAbsErr[1] += torch.dist(DummySet_Mode, outputs, 1)
        dummySqrErr[0] += torch.dist(DummySet_ConstMid, outputs, 2) ** 2
        dummySqrErr[1] += torch.dist(DummySet_Mode, outputs, 2) ** 2

        predList = torch.concat((predList, prediction.to('cpu')))
        truthList = torch.concat((truthList, outputs.to('cpu')))

    sqrErr /= numSamples
    absErr /= numSamples
    dummyAbsErr /= numSamples
    dummySqrErr /= numSamples
    
    print("MSE: %f" % sqrErr)
    print("RMS: %f" % np.sqrt(sqrErr.to('cpu')))
    print("MAE: %f" % absErr)    
    import sklearn.metrics 
    print("R^2: %f" % sklearn.metrics.r2_score(np.array(truthList), np.array(predList)))    

    
    # Compare to scenario where a hypothetical NN outputs only the mean and mode of the test set. Serves as a baseline metric
    print("\nDummy Set Stats:")
    print("MEAN MSE: %f  MODE MSE: %f" % (dummySqrErr[0], dummySqrErr[1]))
    #print("MEAN RMS: %f  MODE RMS: %f" % (torch.sqrt(dummySqrErr[0]), torch.sqrt(dummySqrErr[1])))
    print("MEAN MAE: %f  MODE MAE: %f" % (dummyAbsErr[0], dummyAbsErr[1]))
    

In [None]:
# Plotting the prediction/test histogram

plotName = "Top On-State Resistance (1500 epochs)"

plt.hist(np.array(truthList), 31)
plt.hist(np.array(predList), 31)
plt.grid()
plt.xlabel("Value")
plt.ylabel("Count")
plt.legend(["Ground Truth", "Predictions"])
plt.title(plotName)
# plt.xlim((0.7,1.0))
plt.show()

In [None]:
#plt.hist(predList[np.array(truthList) == 0], 100)

#plt.boxplot(predList[np.array(truthList) == 0], whis=[0,100])

plt.plot([0,1], linestyle='dotted', color='white')
plt.hist2d(np.array(truthList).flatten(), np.array(predList).flatten(), [50,32])

#plt.violinplot(np.array(predList), showmedians=True)

plt.xlabel("Value", fontsize=15)
plt.ylabel("Count", fontsize=15)
plt.title(plotName, fontsize=15)

In [None]:
# Plotting the truth vs prediction scatter plot
plt.scatter(np.array(truthList), np.array(predList), marker='.')
plt.scatter(np.array(truthList), np.array(predList), marker='.', color='r', alpha=0.008)
plt.grid()
plt.xlabel("Ground Truth")
plt.ylabel("Predictions")
plt.title(plotName) # Title set in histogram

plt.plot([0,1], linestyle='dashed', color='black')
plt.show()

In [None]:
# Box and whisker plot testing

def GetBinData(lower, upper, trueData, data):
    lowerMask = trueData >= lower
    upperMask = trueData < upper
    mask = np.logical_and(lowerMask, upperMask)
    return data[mask]

#dataList = np.array(predList).flatten()

#dat = GetBinData(0.0, 0.3125, np.array(truthList).flatten(), dataList)
#plt.boxplot(dat, whis=[0,100])


#counts, bins = np.histogram(truthList, 32)


dataTruthList = np.array(truthList).flatten()
dataPredList = np.array(predList).flatten()
bins = np.unique(dataTruthList)

fig, ax = plt.subplots(1,1)
fontSize = 18

boxWidth = bins[1] - bins[0]

for i in range(0, len(bins), 1):
    #print("%f -- %f" % (bins[i- 1], bins[i]))

    #dat = GetBinData(bins[i - 1], bins[i], dataTruthList, dataPredList)

    mask = (dataTruthList == bins[i])
    dat = dataPredList[mask]
    boxDict = ax.boxplot(dat, whis=[0,100], positions=[bins[i]], widths=[boxWidth])

    color = (1,0,0)
    boxDict['medians'][0].set_color(color) 
    for x in boxDict['whiskers']:
        x.set_color(color)
    

ax.plot([0,1], linestyle='dashed', color='black', linewidth='1.5')

tickList = [x/10.0 for x in range(0, 12, 2)]
ax.set_xticks(tickList, [str(x) for x in tickList])
ax.set_xlim((-0.1,1.1))
ax.grid()

ax.set_title(plotName, size=fontSize)
ax.set_xlabel("Normalized Value", size=fontSize)
ax.set_ylabel("Count", size=fontSize)
ax.tick_params(labelsize=fontSize)
#plt.scatter(np.array(truthList), np.array(predList), marker='.')




In [None]:
# Fill Between Plot

def GetBinData(lower, upper, trueData, data):
    lowerMask = trueData >= lower
    upperMask = trueData < upper
    mask = np.logical_and(lowerMask, upperMask)
    return data[mask]


dataTruthList = np.array(truthList).flatten()
dataPredList = np.array(predList).flatten()
bins = np.unique(dataTruthList)

fig, ax = plt.subplots(1,1)
fontSize = 20

boxWidth = bins[1] - bins[0]

maxList = []
minList = []
medList = []
avgList = []
medList = []
firstQuartList = []
thirdQuartList = []

for i in range(0, len(bins), 1):
    mask = (dataTruthList == bins[i])
    dat = dataPredList[mask]
    maxList.append(np.max(dat))
    minList.append(np.min(dat))
    avgList.append(np.average(dat))
    medList.append(np.median(dat))
    firstQuartList.append(np.quantile(dat, 0.25))
    thirdQuartList.append(np.quantile(dat, 0.75))

ax.plot([0,1], linestyle='solid', color='r', linewidth='5', zorder=1)

ax.fill_between(bins, maxList, minList, alpha=0.5, linewidth=0, color='dimgray', zorder=0)
ax.fill_between(bins, thirdQuartList, firstQuartList, alpha=1, linewidth=0, color='turquoise', zorder=2)
#ax.plot(bins, avgList, linewidth='1.5')
#ax.plot(bins, medList, linewidth='1.5')
#ax.scatter(bins, avgList)
ax.scatter(bins, medList, color='k', zorder=2)


tickList = [x/10.0 for x in range(0, 12, 2)]
ax.set_xticks(tickList, [str(x) for x in tickList])
ax.set_xlim((-0.025,1.025))
ax.grid(color='k', linewidth=0.5)

ax.set_title(plotName, size=fontSize*0.75)
ax.set_xlabel("Normalized Value", size=fontSize)
ax.set_ylabel("Count", size=fontSize)
ax.tick_params(labelsize=fontSize*0.75)
plt.scatter(np.array(truthList), np.array(predList), marker='.', alpha=0.3,zorder=-1, color='tab:blue')


In [None]:
# Used for testing the classification

with torch.no_grad():
    predList = torch.empty((0,1))
    truthList = torch.empty((0,1))

    for j, (inputs, outputs) in enumerate(testLoader):
        inputs = inputs.to(device)
        outputs = outputs.to(device)
        
        #Forward Pass
        prediction = model(inputs)

        predList = torch.concat((predList, prediction.to('cpu')))
        truthList = torch.concat((truthList, outputs.to('cpu')))

TP = torch.bitwise_and((truthList == 1), (predList > 0.5)).count_nonzero()
FP = torch.bitwise_and((truthList == 1), (predList < 0.5)).count_nonzero()
TN = torch.bitwise_and((truthList != 1), (predList < 0.5)).count_nonzero()
FN =  torch.bitwise_and((truthList != 1), (predList > 0.5)).count_nonzero()

print("Acc: %0.4f" % ((TP+TN)/len(truthList)))
print("Precision: %0.4f" % (TP / (TP + FP)))
print("Recall: %0.4f" % (TP / (TP + FN)))