## Imports

In [None]:
import matplotlib.pyplot as plt
%matplotlib inline

import numpy as np
import random

import torch
import torch.nn as nn
import torch.nn.functional as F

import math
import time
import pandas as pd

from AFSParser import build_dataset

  from .autonotebook import tqdm as notebook_tqdm


# Check for GPU

In [None]:
# Pick the compute device once; later cells place the network and tensors on it.
CUDA = torch.cuda.is_available()
device = torch.device('cuda' if CUDA else 'cpu')

# Tensor-type alias for the `.type(dtype)` convention that later cells rely on.
# Both `CUDA` and `dtype` are read further down, so they must be defined here
# for a fresh-kernel "Run All" to succeed.
dtype = torch.cuda.FloatTensor if CUDA else torch.FloatTensor

print(device)

# Hyperparameters

In [None]:
SEED = 0 # seed shared by every random number generator used in this notebook

LR = 1e-4 # learning rate
MOM = 0.9 # momentum
NUM_ITER = 10001 # number iterations
WD = 1e-4 # weight decay for l2-regularization
TV = 1e-2 # weight of the total variation regularisation term

Z_NUM = 32 # input seed dimension
NGF = 64 # number of filters per layer
NC = 1 # number of channels

GIVEN_MEASUREMENTS = 5 # inverse of proportion of given measurements

# Seed all RNGs up front so that the random input seed and the initial network
# weights are identical on every Restart & Run All.
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

# Get the data

In [None]:
# Load the data from the file
X, y0 = build_dataset(True, "/scratch1/04703/sravula/UTAFSDataNew")

# Choose an s4p example from the dataset (arbitrarily choose the first for now)
X, y0 = torch.from_numpy(X[0]).float(), y0[0]
LENGTH = len(X) # number of frequency samples for this s4p chip
NUM_MEASUREMENTS = math.ceil(LENGTH / GIVEN_MEASUREMENTS)

# Keep every GIVEN_MEASUREMENTS-th sample. The stride has to be
# GIVEN_MEASUREMENTS (not NUM_MEASUREMENTS): that yields exactly
# ceil(LENGTH / GIVEN_MEASUREMENTS) == NUM_MEASUREMENTS kept indices, which is
# the number of rows the measurement layer is built with.
kept_samples = range(0, LENGTH, GIVEN_MEASUREMENTS)
assert len(kept_samples) == NUM_MEASUREMENTS, "kept sample count must match NUM_MEASUREMENTS"

# Every index that is not measured has to be imputed by the network.
imputed_samples = [idx for idx in range(LENGTH) if idx not in kept_samples]


# Normalize then graph the data

In [None]:
# This is global min-max scaling: the whole array is mapped onto [0, 1] using a
# single minimum and a single maximum.
# TODO: decide whether per-curve min-max scaling is the right normalisation.
def normalise(x):
    """Min-max scale ``x`` onto the interval [0, 1].

    The minimum and maximum are taken over every element of ``x`` (no axis),
    so the output has the same shape as the input.

    Parameters
    ----------
    x : array-like
        Values to rescale. Must be non-empty (``np.min`` raises on empty input).

    Returns
    -------
    numpy.ndarray
        ``(x - min) / (max - min)``. When all values are equal the range is
        zero; in that case an all-zero array is returned instead of NaNs.
    """
    lowest = np.min(x)
    highest = np.max(x)
    span = highest - lowest

    shifted = x - lowest
    if span == 0:
        # Constant input: avoid 0/0 and map everything to the lower bound.
        return np.zeros_like(shifted, dtype=np.result_type(shifted, np.float32))

    return shifted / span

# y0 is indexed as [frequency sample, s-parameter, real/imag part]
num_sparams = y0.shape[1]

y_normalized = np.zeros_like(y0)
# Normalize all of the curves separately (real and imaginary parts on their own)
for i in range(num_sparams):
    for part in (0, 1):
        y_normalized[:, i, part] = normalise(y0[:, i, part])

# One row per s-parameter: real part on the left, imaginary part on the right.
# squeeze=False keeps `axis` two-dimensional even when there is a single row.
figure, axis = plt.subplots(num_sparams, 2, figsize=(100, 100), squeeze=False)
for i in range(num_sparams):
    axis[i, 0].plot(X, y_normalized[:, i, 0]) # The reals for the i'th s-parameter
    axis[i, 0].set_title("s-param " + str(i) + " reals")
    axis[i, 1].plot(X, y_normalized[:, i, 1]) # The imaginaries for the i'th s-parameter
    axis[i, 1].set_title("s-param " + str(i) + " imag")
plt.show()

# For now, just focus on the REAL values of ONE of the s-params for this chip
y_normalized = y_normalized[:, 0, 0]

# Known measurements: numpy copy for logging, float32 tensor on the compute
# device for the loss (so it matches the network output's dtype and device).
meas = y_normalized[kept_samples]
y = torch.from_numpy(meas).float().to(device)

# Step 1: Train a model to learn on one curve

Create the network: (currently using default model from 1D-DIP example)

In [None]:
class DCGAN(nn.Module):
    """1-D transposed-convolution generator followed by a linear measurement layer.

    A seed ``z`` of shape ``(N, nz, 1)`` is upsampled by nine transposed
    convolutions to length 1024, mapped by a linear layer to the requested
    curve length and squashed into (0, 1) with a sigmoid. ``self.fc`` plays
    the role of the measurement matrix ``A`` applied to the generated curve.

    Parameters
    ----------
    nz : int
        Number of channels of the input seed.
    ngf : int
        Number of filters (channels) in every hidden layer.
    output_size : int
        Length of the generated curve.
    nc : int
        Number of output channels. NOTE(review): ``measurements`` flattens the
        whole output before ``self.fc``, which only lines up for ``nc == 1``.
    num_measurements : int
        Number of rows of the measurement matrix ``self.fc``.
    """

    # Length produced by conv1..conv9 for a length-1 seed (4 -> 8 -> ... -> 1024)
    _DECONV_LENGTH = 1024

    def __init__(self, nz, ngf=64, output_size=1024, nc=1, num_measurements=64):
        super(DCGAN, self).__init__()
        self.nc = nc
        self.output_size = output_size
        self.num_measurements = num_measurements

        # Deconv Layers: (in_channels, out_channels, kernel_size, stride, padding, bias = false)
        # Inputs: R^(N x Cin x Lin), Outputs: R^(N, Cout, Lout)
        # s.t. Lout = (Lin - 1)*stride - 2*padding + kernel_size

        # LAYER 1: (nz x 1) -> (ngf x 4)
        self.conv1 = nn.ConvTranspose1d(nz, ngf, 4, 1, 0, bias=False)
        self.bn1 = nn.BatchNorm1d(ngf)

        # LAYER 2: (ngf x 4) -> (ngf x 8)
        self.conv2 = nn.ConvTranspose1d(ngf, ngf, 6, 2, 2, bias=False)
        self.bn2 = nn.BatchNorm1d(ngf)

        # LAYER 3: (ngf x 8) -> (ngf x 16)
        self.conv3 = nn.ConvTranspose1d(ngf, ngf, 6, 2, 2, bias=False)
        self.bn3 = nn.BatchNorm1d(ngf)

        # LAYER 4: (ngf x 16) -> (ngf x 32)
        self.conv4 = nn.ConvTranspose1d(ngf, ngf, 6, 2, 2, bias=False)
        self.bn4 = nn.BatchNorm1d(ngf)

        # LAYER 5: (ngf x 32) -> (ngf x 64)
        self.conv5 = nn.ConvTranspose1d(ngf, ngf, 6, 2, 2, bias=False)
        self.bn5 = nn.BatchNorm1d(ngf)

        # LAYER 6: (ngf x 64) -> (ngf x 128)
        self.conv6 = nn.ConvTranspose1d(ngf, ngf, 6, 2, 2, bias=False)
        self.bn6 = nn.BatchNorm1d(ngf)

        # LAYER 7: (ngf x 128) -> (ngf x 256)
        self.conv7 = nn.ConvTranspose1d(ngf, ngf, 6, 2, 2, bias=False)
        self.bn7 = nn.BatchNorm1d(ngf)

        # LAYER 8: (ngf x 256) -> (ngf x 512)
        self.conv8 = nn.ConvTranspose1d(ngf, ngf, 6, 2, 2, bias=False)
        self.bn8 = nn.BatchNorm1d(ngf)

        # LAYER 9: (ngf x 512) -> (nc x 1024), no batch norm on the last deconv
        self.conv9 = nn.ConvTranspose1d(ngf, nc, 4, 2, 1, bias=False)

        # TODO: LAYER 10: not too sure about this, currently taking the output
        # of layer 9 and using a linear layer to get the correct curve length
        self.output = nn.Linear(self._DECONV_LENGTH, output_size * nc, bias=False)

        # Measurement matrix A (output is A * curve).
        # each entry should be drawn from a Gaussian (random noisy measurements)
        # don't compute gradient of self.fc! memory issues
        self.fc = nn.Linear(output_size * nc, num_measurements, bias=False)

    def forward(self, x):
        """Generate the curve G(z, w) for the seed ``x`` of shape ``(N, nz, 1)``.

        Returns a tensor whose last dimension has ``output_size * nc`` entries,
        each in (0, 1) because of the final sigmoid.
        """
        x = F.relu(self.bn1(self.conv1(x)))
        x = F.relu(self.bn2(self.conv2(x)))
        x = F.relu(self.bn3(self.conv3(x)))
        x = F.relu(self.bn4(self.conv4(x)))
        x = F.relu(self.bn5(self.conv5(x)))
        x = F.relu(self.bn6(self.conv6(x)))
        x = F.relu(self.bn7(self.conv7(x)))
        x = F.relu(self.bn8(self.conv8(x)))
        x = F.relu(self.conv9(x))
        # Get the output size to match the curve length we expect
        x = torch.sigmoid(self.output(x))

        return x

    def measurements(self, x):
        """Return ``A * G(z, w)`` as a column vector of shape ``(num_measurements, 1)``.

        The result lives on the same device as the module's parameters, so no
        explicit device transfer (and no global CUDA flag) is needed.
        """
        # this gives the image - make it a single row vector of appropriate length
        flat_curve = self.forward(x).view(1, -1)

        # pass thru FC layer - returns A*image
        return self.fc(flat_curve).view(-1, 1)

# Initialize Network

In [None]:
print(Z_NUM, NGF, LENGTH, NC, NUM_MEASUREMENTS)
net = DCGAN(Z_NUM, NGF, LENGTH, NC, NUM_MEASUREMENTS)

# Replace the random measurement matrix by a fixed sub-sampling operator: one
# identity row per kept sample. It is registered with requires_grad=False so
# autograd never builds its (large) gradient; setting `requires_grad` on the
# Linear module itself would not freeze anything.
net.fc.weight = nn.Parameter(torch.eye(LENGTH)[list(kept_samples)], requires_grad=False)

# move network to GPU if available
net = net.to(device)

# Input seed z: standard-normal leaf tensor, optimised together with the weights
z = torch.randn(1, Z_NUM, 1, device=device, requires_grad=True)

# Parameters to optimise: every trainable network weight (the frozen fc
# measurement matrix is excluded) plus the seed itself. `z` must be appended
# as a single tensor; extending the list with it would add non-leaf slices.
allparams = [p for p in net.parameters() if p.requires_grad]
allparams.append(z)

optim = torch.optim.RMSprop(allparams, lr=LR, momentum=MOM, weight_decay=WD)

In [None]:
def MSE_TV_LOSS(pred, net_meas, y, alpha_TV, dtype=None):
    """Sum-of-squares data term plus a total-variation penalty.

    Parameters
    ----------
    pred : torch.Tensor
        Generated curve, indexed as ``[sample, channel]`` (2-D); the TV term is
        the summed absolute difference between consecutive rows.
    net_meas : torch.Tensor
        Measurements of the generated curve (``A * G(z, w)``).
    y : torch.Tensor or array-like
        Known measurements. Cast to the dtype and device of ``net_meas`` so a
        float64 or CPU target cannot break the loss or its backward pass.
    alpha_TV : float
        Weight of the total-variation term.
    dtype : optional
        Accepted for backward compatibility only. The loss has no parameters
        to cast, so the value is not used.

    Returns
    -------
    torch.Tensor
        Scalar ``sum((net_meas - y)^2) + alpha_TV * TV(pred)``.
    """
    target = torch.as_tensor(y).to(device=net_meas.device, dtype=net_meas.dtype)

    total_variation = torch.sum(torch.abs(pred[:-1, :] - pred[1:, :]))
    data_term = torch.nn.functional.mse_loss(net_meas, target, reduction='sum')

    return data_term + alpha_TV * total_variation

# Training Loop

In [None]:
PLOT_EVERY = 100 # plot the current prediction every PLOT_EVERY iterations

mse_log_train = np.zeros(NUM_ITER) # The network prediction vs the known measurements
mse_log_test = np.zeros(NUM_ITER) # The network prediction vs true signal ONLY at unknown values

curve = np.zeros((LENGTH, 1)) # latest network output, kept as a column for plotting

# Loop invariants: integer index arrays for logging, and the measurement target
# on the same device / dtype as the seed (and therefore as the network output).
kept_idx = np.asarray(kept_samples, dtype=int)
imputed_idx = np.asarray(imputed_samples, dtype=int)
target = torch.as_tensor(y).to(device=z.device, dtype=z.dtype)

start = time.time()

for i in range(NUM_ITER):

    optim.zero_grad() # clears gradients of all optimized variables
    out = net(z) # produces curve (in form of data tensor) i.e. G(z,w)

    # A * G(z,w): apply the measurement layer to the curve we already have
    # instead of running a second full forward pass through the generator.
    net_meas = net.fc(out.view(1, -1)).view(-1)

    # calculate loss between AG(z,w) and Ay
    loss = MSE_TV_LOSS(out.view(-1, 1), net_meas, target, TV, out.dtype)
    # TODO: optionally add an L1 penalty on the FFT of `out` (sparsity in frequency)

    # transfer network output back to cpu to visualize and compare performance
    prediction = out.detach()[0, 0].cpu().numpy()
    curve[:, 0] = prediction

    # Use the 1-D prediction here: subtracting a 1-D array from the (LENGTH, 1)
    # column would broadcast to a 2-D matrix and log a meaningless mean.
    mse_log_train[i] = np.mean((prediction[kept_idx] - meas) ** 2)
    mse_log_test[i] = np.mean((prediction[imputed_idx] - y_normalized[imputed_idx]) ** 2)

    if i % PLOT_EVERY == 0:
        print(i)

        sample_axis = np.arange(LENGTH)
        plt.plot(sample_axis, curve, label="Network Output")
        plt.plot(sample_axis, y_normalized, color='r', label="True Signal")
        plt.xlabel("Sample")
        plt.ylabel("Value")
        plt.title("Network Prediction vs True Signal")
        plt.legend()
        plt.show()

    loss.backward()
    optim.step()

# Output recorded during the final iteration (before that iteration's update)
net_output = curve[:, 0].copy()

end = time.time()
print("Execution Time: ", round(end-start, 2), "s")