In [1]:
import math
import torch
import numpy as np
import pandas as pd
import tensorflow as tf
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
from torch.utils.data import DataLoader
from sklearn import preprocessing
from Training import model, utils, dataset, train
from sklearn.model_selection import train_test_split


import keras.backend as K

KeyboardInterrupt: 

In [None]:
def check_acc(y_hat,y,margin=0.05):
    a_err = (np.abs(y_hat - y)) # get normalized error 
    err = np.divide(a_err, y, out=a_err, where=y!=0)
    assert(err.shape == y.shape)
    num_correct = 0
    for row in err:
        num_in_row = len(np.where(row < margin)[0]) # margin * 100 because 
        if num_in_row == len(row):
            num_correct += 1

    num_samples = y.shape[0]
    correct_idx = np.where(err < margin)
    num_part_correct = len(correct_idx[0])
    num_part_samples = y.shape[0] * y.shape[1]
    print(f"Correct = {num_correct} / {num_samples}")
    return (num_correct/num_samples)
from matplotlib.patches import Ellipse
def plot_cov_ellipse(cov, pos, nstd=2, ax=None, **kwargs):
    """
    Plots an `nstd` sigma error ellipse based on the specified covariance
    matrix (`cov`). Additional keyword arguments are passed on to the 
    ellipse patch artist.
    Parameters
    ----------
        cov : The 2x2 covariance matrix to base the ellipse on
        pos : The location of the center of the ellipse. Expects a 2-element
            sequence of [x0, y0].
        nstd : The radius of the ellipse in numbers of standard deviations.
            Defaults to 2 standard deviations.
        ax : The axis that the ellipse will be plotted on. Defaults to the 
            current axis.
        Additional keyword arguments are pass on to the ellipse patch.
    Returns
    -------
        A matplotlib ellipse artist
    """
    def eigsorted(cov):
        vals, vecs = np.linalg.eigh(cov)
        order = vals.argsort()[::-1]
        return vals[order], vecs[:,order]

    if ax is None:
        ax = plt.gca()

    vals, vecs = eigsorted(cov)
    theta = np.degrees(np.arctan2(*vecs[:,0][::-1]))

    # Width and height are "full" widths, not radius
    width, height = 2 * nstd * np.sqrt(vals)
    ellip = Ellipse(xy=pos, width=width, height=height, angle=theta, **kwargs)

    ax.add_artist(ellip)
    return ellip
def multivariate_gaussian_nll(ypreds, ytrue, var):
    
    diag = torch.exp(var[:,:2]) # convert log-scale var to
    n = ypreds.shape[1] #number of parameters ie number of means (2 gain and bandwidth)
    B = ypreds.shape[0] #Batch size
    
    z = torch.zeros(B)
    o = torch.ones(B)
    D = torch.stack((diag[:,0],z,z,diag[:,1]),dim=1).reshape(B,2,2) # form Diagnol matrix D for LDLT
    L = torch.stack((o,z,var[:,2],o),dim=1).reshape(B,2,2) # form L matrix 
    LT = torch.stack((o,var[:,2],z,o),dim=1).reshape(B,2,2) # form LT matrix (transpose of L)

    sigma = L @ D @ LT   # form sigma inv from LDLT decomp
    ximu =(ytrue-ypreds).reshape(B,2,1)  #true- minus 
    ximuT =(ytrue-ypreds).reshape(B,1,2) # true- minus  transpose

    loss = 0.5*torch.mean(ximuT@sigma@ximu + ((n/2)*(-torch.sum(var[:,:2],axis=1).reshape(B,1))))
    return loss
def formCovMatrix(var):
    diag = np.exp(var[:2])
    z = np.zeros(2)
    o = np.ones(2)
    D = np.array([diag[0],0,0,diag[1]]).reshape(2,2)
    L = np.array([1,0,var[2],1]).reshape(2,2)
    LT = np.array([1,var[2],0,1]).reshape(2,2)
    sigma = L @ D @ LT
    return np.linalg.pinv(sigma)

In [None]:
data = utils.parseGainAndBWCsv2("Data/BW-3000.csv").astype(float)

In [None]:
print(data[0])

In [None]:
data = preprocessing.MinMaxScaler((0,1)).fit_transform(data)
X = data[:,:2]
Y = data[:,2:]

In [None]:
test_model = model.DistModelBatchNorm(2,2)

In [None]:
optimizer = optim.Adagrad(test_model.parameters(),lr=3e-4)
loss_fn = nn.L1Loss()

dataset1 = dataset.CircuitSynthesisGainAndBandwidthManually(Y, X)
train_dataset, val_dataset = utils.splitDataset(dataset1, 0.9)
    
train_data = DataLoader(train_dataset,batch_size = 500)
validation_data = DataLoader(val_dataset, batch_size = 500)

In [None]:
epochs = 1000
loss_list, val_loss_list = train.trainAbsoluteModel(test_model, train_data, loss_fn, optimizer, num_epochs=epochs, print_every=10, validation_data=validation_data)

In [None]:
test_y = []
test_x = []
for i in val_dataset:
    test_y.append(list(i[0]))
    test_x.append(list(i[1]))
test_y = np.array(test_y)
test_x = np.array(test_x)

In [None]:
test_model.eval()
x_preds = test_model(torch.Tensor(test_y))
#print(x_preds)
mock_simulator = tf.keras.models.load_model('mock_simulator2.0')
#mock_simulator = mockSimulator

final_preds = []
for i in range(x_preds.shape[0]):
    means = x_preds.detach().numpy()[i,:2].T
    w,r = means
    y_hat = mock_simulator(means.reshape(1,2)) # feed back through simulator to get evaluated performance
    final_preds.append(y_hat)
final_preds = np.array(final_preds).reshape(-1,2)
    


In [None]:
check_acc(test_y,final_preds,margin=.05)

In [None]:
fig, axes = plt.subplots(figsize=(36, 12))
axes.get_xaxis().set_visible(False) # remove erroreas graph axis
axes.get_yaxis().set_visible(False)


xlim = [test_y[:,0].min(),test_y[:,0].max()]
ylim = [test_y[:,1].min(),test_y[:,1].max()]


ax = fig.add_subplot(121)
plt.plot(test_y[:,0],test_y[:,1], ".")
ax = fig.add_subplot(122)
ax.set_xlim(xlim)
ax.set_ylim(ylim)
plt.plot(final_preds[:,0],final_preds[:,1], ".")

plt.show()

In [None]:
plt.plot(loss_list)
plt.plot(val_loss_list)

In [None]:
final_preds = []
for i in range(x_preds.shape[0]):
    means = test_x[i,:2].T
    w,r = means
    y_hat = mock_simulator(means.reshape(1,2)) # feed back through simulator to get evaluated performance
    final_preds.append(y_hat)
final_preds = np.array(final_preds).reshape(-1,2)
check_acc(test_y,final_preds,margin=.05)

In [None]:
test_gaussian_model = model.DistModelBatchNorm(2,5)

In [None]:
optimizer_gaussian = optim.Adagrad(test_gaussian_model.parameters(),lr=0.001)
loss_gaussian_fn = multivariate_gaussian_nll

In [None]:
epochs = 1000
loss_list, val_loss_list = train.trainProbModel(test_gaussian_model, train_data, loss_gaussian_fn, optimizer_gaussian, num_epochs=epochs, print_every=10, validation_data=validation_data)

In [None]:
test_gaussian_model.eval()
x_preds = test_gaussian_model(torch.Tensor(test_y))

mock_simulator = tf.keras.models.load_model('mock_simulator2.0')

final_g_preds = []
for i in range(x_preds.shape[0]):
    means = x_preds.detach().numpy()[i,:2].T
    w,r = means
    y_hat = mock_simulator(means.reshape(1,2)) # feed back through simulator to get evaluated performance
    final_g_preds.append(y_hat)
final_g_preds = np.array(final_g_preds).reshape(-1,2)
    

# for i,d in enumerate(final_preds):
#     print(Y[i],d)
check_acc(test_y,final_g_preds,margin=.05)

In [None]:
plt.plot(loss_list)
plt.plot(val_loss_list)