In [1]:
'''from google.colab import drive
drive.mount('/content/drive')'''

Mounted at /content/drive


In [2]:
import torch
import matplotlib.pyplot as plt
import numpy as np
import matplotlib.pyplot as plt
import os
from scipy.stats import norm
import torch
import torch.nn as nn
from torch.autograd import Variable
import numpy as np
import torch.nn.functional as F
import seaborn as sns
import pandas as pd

device = 'cuda'#'cpu'#


## Loss functions

In [3]:
def pacejka(x, B, C, D, E):
    if len(x.size()) == 3:
        x = torch.squeeze(x, 1)
        # expand parameters from [bs] to [bs, 100]
        # so that batch can be handled without running loops
        B = torch.reshape(B, (-1, 1)).expand(-1, x.size(1))
        C = torch.reshape(C, (-1, 1)).expand(-1, x.size(1))
        D = torch.reshape(D, (-1, 1)).expand(-1, x.size(1))
        E = torch.reshape(E, (-1, 1)).expand(-1, x.size(1))
    # print(B.size())
    # print(x.size())
    y = D * torch.sin(C * torch.arctan(B * x * (1 - E) + E * torch.arctan(B * x)))
    return y

def magic_formula_loss(x, real_bcde, predicted_bcde):
    # print(real_bcde.shape)
    B_min, B_max = 4, 12
    C_min, C_max = 1, 2
    D_min, D_max = 0.1, 1.9
    E_min, E_max = -10, 1

    # real_bcde[:, :, 0] = real_bcde[:, :, 0]*(B_max - B_min) + B_min
    # real_bcde[:, :, 1] = real_bcde[:, :, 1]*(C_max - C_min) + C_min
    # real_bcde[:, :, 2] = real_bcde[:, :, 2]*(D_max - D_min) + D_min
    # real_bcde[:, :, 3] = real_bcde[:, :, 3]*(E_max - E_min) + E_min

    # predicted_bcde[:, 0] = predicted_bcde[:, 0]*(B_max - B_min) + B_min
    # predicted_bcde[:, 1] = predicted_bcde[:, 1]*(C_max - C_min) + C_min
    # predicted_bcde[:, 2] = predicted_bcde[:, 2]*(D_max - D_min) + D_min
    # predicted_bcde[:, 3] = predicted_bcde[:, 3]*(E_max - E_min) + E_min


    real_y = pacejka(x, real_bcde[:, :, 0], real_bcde[:, :, 1], real_bcde[:, :, 2], real_bcde[:, :, 3])
    predicted_y = pacejka(x, predicted_bcde[:, 0], predicted_bcde[:, 1], predicted_bcde[:, 2], predicted_bcde[:, 3])
    loss = nn.MSELoss()(real_y, predicted_y)
    return loss


def range_condition_loss(B, C, D, E):
    B_min, B_max = 4, 12
    C_min, C_max = 1, 2
    D_min, D_max = 0.1, 1.9
    E_min, E_max = -10, 1

    # B = B*(B_max - B_min) + B_min
    # C = C*(C_max - C_min) + C_min
    # D = D*(D_max - D_min) + D_min
    # E = E*(E_max - E_min) + E_min

    loss = F.relu(B_min - B) + F.relu(B - B_max)
    loss += F.relu(C_min - C) + F.relu(C - C_max)
    loss += F.relu(D_min - D) + F.relu(D - D_max)
    loss += F.relu(E_min - E) + F.relu(E - E_max)
    loss = loss.mean()
    return loss


def slope_loss(B, C, D, predB, predC, predD):
    B_min, B_max = 4, 12
    C_min, C_max = 1, 2
    D_min, D_max = 0.1, 1.9

    # B = B*(B_max - B_min) + B_min
    # C = C*(C_max - C_min) + C_min
    # D = D*(D_max - D_min) + D_min

    # predB = predB*(B_max - B_min) + B_min
    # predC = predC*(C_max - C_min) + C_min
    # predD = predD*(D_max - D_min) + D_min

    original_slope = B * C * D
    predicted_slope = predB * predC * predD
    loss = nn.MSELoss()(original_slope, predicted_slope)
    return loss


def peak_loss(x_real_peak, y_real_peak, x_pred_peak, y_pred_peak):
    loss = nn.MSELoss()(x_real_peak, x_pred_peak) + nn.MSELoss()(y_real_peak, y_pred_peak)
    return loss


## Generating data




### Functions

In [4]:
def generate_data(x_grid_size=100, resampled_len=10, batch_size=100, noise_sd=0.04):
    x = torch.linspace(0, 1, x_grid_size)

    B_list = torch.linspace(4, 12, 9)
    C_list = torch.linspace(1, 2, 11)
    D_list = torch.linspace(0.1, 1.9, 21)
    E_list = torch.linspace(-10, 1, 11)

    resampled_mu_noisy = torch.zeros(size=(batch_size, resampled_len))
    resampled_x = torch.zeros(size=(batch_size, resampled_len))
    parameters = torch.zeros(size=(batch_size, 4))
    clean_mu = torch.zeros(size=(batch_size, x_grid_size))

    for i in range(batch_size):
        B_idx = torch.randint(high=B_list.size(0), size=(1,))
        C_idx = torch.randint(high=C_list.size(0), size=(1,))
        D_idx = torch.randint(high=D_list.size(0), size=(1,))
        E_idx = torch.randint(high=E_list.size(0), size=(1,))

        B = B_list[B_idx]
        C = C_list[C_idx]
        D = D_list[D_idx]
        E = E_list[E_idx]

        mu_clean = pacejka(x, B, C, D, E)

        mu_noisy = noise_maker(mu_clean, noise_sd)
        idx_resample = torch.randint(high=x_grid_size, size=(resampled_len,))
        resampled_mu_noisy[i, :] = mu_noisy[idx_resample]
        resampled_x[i, :] = x[idx_resample]
        parameters[i, 0] = B
        parameters[i, 1] = C
        parameters[i, 2] = D
        parameters[i, 3] = E
        clean_mu[i, :] = mu_clean

    return resampled_mu_noisy, resampled_x, parameters, clean_mu

In [5]:
def noise_maker(data, sd):
    low = -sd
    high = +sd
    noise = (high - low) * torch.rand(size=data.size())
    noisy_data = data + noise
    return noisy_data

### Generated data

In [None]:
resampled_mu_noisy, resampled_x, parameters, clean_mu = generate_data(x_grid_size=100, resampled_len=10, batch_size=25000, noise_sd=0.04)
resampled_mu_noisy = resampled_mu_noisy.unsqueeze(1)
resampled_x = resampled_x.unsqueeze(1)
parameters = parameters.unsqueeze(1)
clean_mu = clean_mu.unsqueeze(1)
resampled_mu_noisy = resampled_mu_noisy.to(device)
resampled_x = resampled_x.to(device)
scaled_parameters = torch.zeros_like(parameters)
scaled_parameters[:,:,0] = (parameters[:,:,0]-parameters[:,:,0].min())/(parameters[:,:,0].max()-parameters[:,:,0].min())
scaled_parameters[:,:,1] = (parameters[:,:,1]-parameters[:,:,1].min())/(parameters[:,:,1].max()-parameters[:,:,1].min())
scaled_parameters[:,:,2] = (parameters[:,:,2]-parameters[:,:,2].min())/(parameters[:,:,2].max()-parameters[:,:,2].min())
scaled_parameters[:,:,3] = (parameters[:,:,3]-parameters[:,:,3].min())/(parameters[:,:,3].max()-parameters[:,:,3].min())
parameters = parameters.to(device)
scaled_parameters = scaled_parameters.to(device)
clean_mu = clean_mu.to(device)
print("resampled_mu_noisy: ", resampled_mu_noisy.shape)
print("resampled_x: ", resampled_x.shape)
print("parameters: ", parameters.shape)
print("clean_mu: ", clean_mu.shape)

#### Data Shuffling

In [None]:
idx_train, idx_val = torch.utils.data.random_split(torch.arange(0, 25000), [20000, 5000])
idx_train = idx_train.indices
idx_val = idx_val.indices

## CWGAN

In [6]:
class Generator(nn.Module):
  def __init__(self):
    super(Generator, self).__init__()
    self.conv1 = torch.nn.Conv1d(1, 4, kernel_size=3, stride=1, padding=1)
    self.pool = torch.nn.MaxPool1d(kernel_size=2, stride=2, padding=0)
    #self.conv2 = torch.nn.Conv1d(8, 16, kernel_size=3, stride=1, padding=1)
    self.fc1 = torch.nn.Linear(48, 24)
    self.fc2 = torch.nn.Linear(24, 12)
    self.fc3 = torch.nn.Linear(12, 10)
    self.fc4 = torch.nn.Linear(10, 12)
    self.fc5 = torch.nn.Linear(12, 24)
    self.fc6 = torch.nn.Linear(24, 4)
  def forward(self, y_sample, x_sample, z):
    # print(x_sample.shape)
    # print(y_sample.shape)
    # print(z.shape)
    conditional_tensor = torch.cat((y_sample, x_sample,  z), dim=2)
    # print(conditional_tensor.shape)
    x = torch.relu(self.conv1(conditional_tensor))
    x = self.pool(x)
    #print(x.shape)
    x = x.view(-1, x.size(-2) * x.size(-1))
    #print(x.shape)
    x = torch.relu(self.fc1(x))
    x = torch.relu(self.fc2(x))
    x = torch.relu(self.fc3(x))
    x = torch.relu(self.fc4(x))
    x = torch.relu(self.fc5(x))
    x = self.fc6(x)
    # x[:,0:3] = torch.abs(x[:,0:3])
    # x[:,1] = torch.exp(x[:,1])
    # x[:,2] = torch.exp(x[:,2])
    # print(x[:,0:3].shape)
    return x

In [None]:
# G = Generator()

In [None]:
# x = torch.linspace(0, 1, 100)
# G(resampled_mu_noisy[idx_train, :, :], resampled_x[idx_train, :, :], torch.randn_like(clean_mu)[idx_train, :, :])

In [None]:
class Discriminator(nn.Module):
  def __init__(self):
    super(Discriminator, self).__init__()
    self.fc1 = nn.Linear(24, 32)  # Assuming mu and sigma are 2-dimensional
    self.fc2 = nn.Linear(32, 16)
    self.fc3 = nn.Linear(16, 8)
    self.fc4 = nn.Linear(8, 4)
    self.fc5 = nn.Linear(4, 1)

  def forward(self, y_sample, x_sample, y_hat):
    y = torch.cat((y_sample, x_sample, y_hat), dim = 2)
    # print("Disc>>>>>")
    # print(y.shape)
    critic = torch.relu(self.fc1(y))
    critic = torch.relu(self.fc2(critic))
    critic = torch.relu(self.fc3(critic))
    critic = torch.relu(self.fc4(critic))
    return self.fc5(critic)

In [None]:
# D = Discriminator()

In [None]:
# D(resampled_mu_noisy[idx_train, :, :], resampled_x[idx_train, :, :], torch.randn_like(clean_mu)[idx_train, :, :])

In [None]:
def calculate_gradient_penalty(y_sample_true, x_sample_true, y_true,
        y_sample_fake, x_sample_fake, y_hat_fake,
                               D):
  # print("gp>>>>>")
  # print(y_sample_true.shape)
  # print(x_sample_true.shape)
  # print(y_true.shape)
  # print(y_sample_fake.shape)
  # print(x_sample_fake.shape)
  # print(y_hat_fake.shape)


  real = torch.cat((y_sample_true, x_sample_true, y_true), dim=2)
  fake = torch.cat((y_sample_fake, x_sample_fake, y_hat_fake), dim=2)
  # print(fake.shape)
  grad_penalty2 = 0
  for delta in np.random.uniform(0, 1, 30):
        # Linearly interpolate between real and fake samples
    interpolated = delta * real + (1 - delta) * fake
        #print(interpolated[:,0:-2].shape)
        # Calculate gradients of probabilities with respect to examples
    interpolated.requires_grad_(True)
        #print("interpolated: ", interpolated.size())
        #print("mu: ", interpolated[:,:,-2].size())
        #print("sigma: ", interpolated[:,:,-1].size())
    # print(interpolated.shape)
    # print(interpolated[:,:,0:10].shape)
    # print(interpolated[:,:,10:20].shape)
    # print(interpolated[:,:,20::].shape)
    prob_interpolated = D(interpolated[:,:,0:10], interpolated[:,:,10:20], interpolated[:,:,20::])
    gradients = torch.autograd.grad(outputs=prob_interpolated, inputs=interpolated,
                                         grad_outputs=torch.ones_like(prob_interpolated),
                                         create_graph=True, retain_graph=True)[0]

        # Calculate gradient penalty
    grad_penalty = ((gradients.norm(2, dim=1) - 1) ** 2).mean()
    grad_penalty2 += grad_penalty

    # Return the average gradient penalty
  return grad_penalty2 / 30

In [None]:
D = Discriminator().to(device)
G = Generator().to(device)
D_optimizer = torch.optim.AdamW(D.parameters(), betas=(0.5, 0.9), lr=.0001)# use `adam`
G_optimizer = torch.optim.AdamW(G.parameters(), betas=(0.5, 0.9), lr=.0001)# use `adam` #betas=(0.5, 0.9)

In [None]:
D_losses = []
G_losses = []
G_losses_test = []
Wasserstein_DS = []
gradient_penaltys = []
num_epochs = 600000
batch_size = 20000
min_val_loss = float("inf")
for epoch in range(num_epochs):
  #print(epoch)
  G.zero_grad()
  D.zero_grad() # change the zero grad and move it before you call the D for the first time
    #===========================================================================
  z = torch.randn_like(parameters[idx_train, :,:])
  D_loss = []
  for i in range(2):
    y_sample_true = resampled_mu_noisy[idx_train, :, :]
    x_sample_true = resampled_x[idx_train, :, :]
    y_true = parameters[idx_train,:,:]

    D_real_decision = D(y_sample_true,
                        x_sample_true,
                        y_true)
    D_real_loss = (D_real_decision.mean())

    y_sample_fake = resampled_mu_noisy[idx_train, :, :]#torch.randn_like(resampled_mu_noisy[idx_train, :, :])
    x_sample_fake = resampled_x[idx_train, :, :]#torch.randn_like(resampled_x[idx_train, :, :])
    y_hat_fake = G(y_sample_fake,
                            x_sample_fake,
                            torch.randn_like(z))
    D_fake_decision = D(y_sample_fake, x_sample_fake, y_hat_fake.unsqueeze(1))
    #print("done")
    D_fake_loss = (D_fake_decision.mean())

    gradient_penalty = calculate_gradient_penalty(
        y_sample_true, x_sample_true, y_true,
        y_sample_fake, x_sample_fake, y_hat_fake.unsqueeze(1),
        D
        )

    Wasserstein_D = D_real_loss - D_fake_loss
    lamb = 100
    D_loss = -Wasserstein_D + (lamb*gradient_penalty)
    Wasserstein_DS.append(Wasserstein_D.item())
    gradient_penaltys.append(lamb*gradient_penalty.item())
    # Back propagation
    D_loss.backward()
    D_optimizer.step()

  y_hat = G(y_sample_true, x_sample_true, z)
  #print("input_data>>>D")
  D_fake_decision = D(y_sample_true, x_sample_true, y_hat.unsqueeze(1))
  D_fake_loss = (D_fake_decision.mean())
  lamb_G = -1
  mf_loss = 100*magic_formula_loss(x_sample_true, parameters[idx_train, :, :], y_hat)
  rc_loss = 100*range_condition_loss(y_hat[:, 0], y_hat[:, 1], y_hat[:, 2], y_hat[:, 3])
  s_loss = 100*slope_loss(parameters[idx_train, 0, 0], parameters[idx_train, 0, 1], parameters[idx_train, 0, 2],
                                              y_hat[:, 0], y_hat[:, 1], y_hat[:, 2])
  MSELOSS = 100*nn.MSELoss()(y_hat, parameters[idx_train, 0 ,:])
  D_fake_loss = lamb_G*D_fake_loss
  G_loss = (D_fake_loss) + mf_loss + rc_loss + s_loss + MSELOSS# add loss

  G_loss.backward()
  G_optimizer.step()

  D_losses.append(D_loss.item())
  #print(D_losses)
  G_losses.append(G_loss.item())

  # Validation
  x_sample_test = resampled_x[idx_val, :, :]
  y_resample_test = resampled_mu_noisy[idx_val, :, :]
  y_hat_test = G(y_resample_test,
                 x_sample_test,
                 .5*torch.ones_like(parameters[idx_val,:, :]))
  D_fake_decision_test = D(y_resample_test, x_sample_test, y_hat_test.unsqueeze(1))
  D_fake_loss_test = (D_fake_decision_test.mean())

  mf_loss_test = 100*magic_formula_loss(x_sample_test, parameters[idx_val, :, :], y_hat_test)
  rc_loss_test = 100*range_condition_loss(y_hat_test[:, 0], y_hat_test[:, 1], y_hat_test[:, 2], y_hat_test[:, 3])
  s_loss_test = 100*slope_loss(parameters[idx_val, 0, 0], parameters[idx_val, 0, 1], parameters[idx_val, 0, 2],
                                              y_hat_test[:, 0], y_hat_test[:, 1], y_hat_test[:, 2])
  MSELOSS_test = 100*nn.MSELoss()(y_hat_test, parameters[idx_val, 0 ,:])
  D_fake_loss_test = lamb_G*D_fake_loss_test
  G_loss_test = (D_fake_loss_test) + mf_loss_test + rc_loss_test + s_loss_test + MSELOSS_test# add loss
  G_losses_test.append(G_loss_test.item())
  if (G_loss_test.item() < min_val_loss):
    min_val_loss = G_loss_test.item()
    torch.save({
            'epoch': epoch,
            'G_state_dict': G.state_dict(),
            'D_state_dict': D.state_dict(),
            'G_optimizer_state_dict': G_optimizer.state_dict(),
            'D_optimizer_state_dict': D_optimizer.state_dict(),
            'G_loss': G_losses,
            'D_loss': D_loss
            }, "/content/drive/MyDrive/ML_spring2024/best_model_conv4_2.pth")

  epoch_test = 3000
  if (((epoch%epoch_test == 0))| (epoch == num_epochs)):



    plt.figure(figsize=(18,13))
    plt.subplot(331)
    # Plot D_losses on the first y-axis (left) and G_losses on the second y-axis (right)
    plt.plot(D_losses, color='blue')
    plt.ylabel('D_losses')

    plt.subplot(332)
    plt.plot(G_losses, color='red')
    plt.ylabel('G_losses')

    plt.subplot(333)
    # Plot KDE plots of input data and predicted data
    plt.bar(["W", "MF", "RC", "SL", "MSE"], (D_fake_loss.item(), mf_loss.item(), rc_loss.item(), s_loss.item(), MSELOSS.item()), color=['blue', 'orange', 'green', 'maroon', 'gold'])

    plt.subplot(334)
    x_values = y_hat_test[:,0].cpu().detach().numpy()
    y_values = parameters[idx_val,0,0].cpu().detach().numpy()
    plt.scatter(x_values, y_values, color = 'k')
    plt.axline([4, 4], [12, 12], color = 'r')
    # plt.axline([0, 0], [1, 1], color = 'r')
    plt.xlabel("Predicted B")
    plt.ylabel("Actual B")


    plt.subplot(335)
    plt.bar(["W", "GP"], (Wasserstein_DS[-1], gradient_penaltys[-1]), color=['blue', 'orange'])

    plt.subplot(336)
    plt.plot(G_losses_test, color='orange')
    plt.ylabel('G_losses_val')


    plt.subplot(337)
    x_values = y_hat_test[:,1].cpu().detach().numpy()
    y_values = parameters[idx_val,0,1].cpu().detach().numpy()
    plt.scatter(x_values, y_values, color = 'k')
    plt.axline([1, 1], [2, 2], color = 'r')
    # plt.axline([0, 0], [1, 1], color = 'r')
    plt.xlabel("Predicted C")
    plt.ylabel("Actual C")

    plt.subplot(338)
    x_values = y_hat_test[:,2].cpu().detach().numpy()
    y_values = parameters[idx_val,0,2].cpu().detach().numpy()
    plt.scatter(x_values, y_values, color = 'k')
    plt.axline([.1, .1], [1.9, 1.9], color = 'r')
    # plt.axline([0, 0], [1, 1], color = 'r')
    plt.xlabel("Predicted D")
    plt.ylabel("Actual D")

    plt.subplot(339)
    x_values = y_hat_test[:,3].cpu().detach().numpy()
    y_values = parameters[idx_val,0,3].cpu().detach().numpy()
    plt.scatter(x_values, y_values, color = 'k')
    plt.axline([-10, -10], [1, 1], color = 'r')
    # plt.axline([0, 0], [1, 1], color = 'r')
    plt.xlabel("Predicted E")
    plt.ylabel("Actual E")

    plt.tight_layout()
    plt.show()