In [1]:
import pandas as pd
import torch
from torch.autograd import Variable

# Obtains the file and cleans it
FILE_PATH = 'data/Advertising.csv' 
df = pd.read_csv(FILE_PATH).drop('Unnamed: 0', axis=1)

# Obtains our x and y vectors
x = df.TV.to_numpy().reshape(-1, 1)
y = df.Sales.to_numpy().reshape(-1, 1)

x_tensor = Variable(torch.from_numpy(x).type(torch.FloatTensor)) # type: ignore
y_tensor = Variable(torch.from_numpy(y).type(torch.FloatTensor)) # type: ignore


In [2]:
type(x_tensor)

torch.Tensor

In [4]:
import torch.nn as nn
import torch.optim as optim

In [5]:
class StraightLine_v1(nn.Module):
    def __init__(self):
        super().__init__()
        self.m = nn.Parameter(torch.tensor(1.0))
        self.b = nn.Parameter(torch.tensor(1.0))
        
    def forward(self, x):
        return self.m * x + self.b
    

In [6]:
class nLogLikelyhood_v1(nn.Module):
    def __init__(self):
        super().__init__()
        self.f = StraightLine_v1()
        self.sigmaN = nn.Parameter(torch.tensor(1.0))
        
    def forward(self, x, y):
        pred = self.f(x)
        return 0.5 * (torch.log(self.sigmaN ** 2) + 0.5 * ((y - pred) ** 2) / (self.sigmaN ** 2)).sum() 

In [7]:
from torch.distributions.normal import Normal

class nLogLikelyhood_v2(nn.Module):
    def __init__(self):
        super().__init__()
        self.f = StraightLine_v1()
        self.sigmaN = nn.Parameter(torch.tensor(1.0))
        
    def forward(self, x, y):
        pred = self.f(x)
        return (-1.) * Normal(pred, self.sigmaN).log_prob(y).sum()

In [8]:
from torch.distributions.exponential import Exponential

class maxPosterior_v1(nn.Module):
    def __init__(self):
        super().__init__()
        self.f = StraightLine_v1()
        self.sigmaN = nn.Parameter(torch.tensor(1.0))
        
    def forward(self, x, y):
        pred = self.f(x)
        nLogLik = (-1.) * Normal(pred, self.sigmaN).log_prob(y).sum()
        nLogPriorM = (-1.) * Normal(0, 1).log_prob(self.f.m)
        #nLogPriorB = (-1.) * Exponential(2.0).log_prob(self.f.b)
        nLogPriorB = (-1.) * Normal(0, 10).log_prob(self.f.b)
        
        return nLogLik + nLogPriorM + nLogPriorB

In [9]:
class StraightLine_v2(nn.Module):
    def __init__(self):
        super().__init__()
        self.muM = nn.Parameter(torch.tensor(1.0))
        self.sigmaM = nn.Parameter(torch.tensor(1.0))
        self.muB = nn.Parameter(torch.tensor(1.0))
        self.sigmaB = nn.Parameter(torch.tensor(1.0))
        
        self.samples = 100
        
        #self.epsilonM = self.muM.data.new(self.muM.size()).normal_()
        #self.epsilonB = self.muB.data.new(self.muB.size()).normal_()
        
        #self.m = self.muM + torch.exp(self.sigmaM) * self.epsilonM
        #self.b = self.muB + torch.exp(self.sigmaB) * self.epsilonB
        self.m = self.muM + torch.exp(self.sigmaM) * Normal(0, 1).sample(torch.Size([1, self.samples]))
        self.b = self.muB + torch.exp(self.sigmaB) * Normal(0, 1).sample(torch.Size([1, self.samples]))
        #self.m = self.muM + torch.exp(self.sigmaM) * Normal(0, 1).sample()
        #self.b = self.muB + torch.exp(self.sigmaB) * Normal(0, 1).sample()
    
    def forward(self, x):
        #return x * self.m + self.b
        #return self.m * x + self.b
        
        self.m = self.muM + torch.exp(self.sigmaM) * Normal(0, 1).sample(torch.Size([1, self.samples]))
        self.b = self.muB + torch.exp(self.sigmaB) * Normal(0, 1).sample(torch.Size([1, self.samples]))

        return torch.matmul(x, self.m) + self.b.repeat(x.shape[0],1)

In [11]:
class variationalBayes_v1(nn.Module):
    def __init__(self):
        super().__init__()
        self.f = StraightLine_v2()
        self.sigmaN = nn.Parameter(torch.tensor(1.0))
        self.prior_M_std = torch.tensor([1.])
        self.prior_B_std = torch.tensor([10.])
        
    def forward(self, x, y):
        pred = self.f(x)
        
        y_truth = y.reshape(y.shape[0], -1).repeat(1, self.f.samples)
        nLogLik = (-1.) * Normal(pred, self.sigmaN).log_prob(y_truth).sum() / self.f.samples

        nLogPriorM = torch.log(self.prior_M_std) + 0.5 * (self.f.muM / self.prior_M_std) ** 2 + 0.5 * (self.f.sigmaM / self.prior_M_std) ** 2
        nLogPriorB = torch.log(self.prior_B_std) + 0.5 * (self.f.muB / self.prior_B_std) ** 2 + 0.5 * (self.f.sigmaB / self.prior_B_std) ** 2
        
        LogVarPostM = (-1.) * torch.log(self.f.sigmaM) 
        LogVarPostB = (-1.) * torch.log(self.f.sigmaB)
        
        return LogVarPostM + LogVarPostB + nLogLik + nLogPriorM + nLogPriorB

In [12]:
class StraightLine_v3(nn.Module):
    def __init__(self):
        super().__init__()
        
        self.muM = nn.Parameter(torch.tensor(1.0))
        self.sigmaM = nn.Parameter(torch.tensor(1.0))
        self.muB = nn.Parameter(torch.tensor(1.0))
        self.sigmaB = nn.Parameter(torch.tensor(1.0))

        #self.m = m
        #self.b = b
        
    def forward(self, x):
        
        m_epsilons = Variable(self.muM.data.new(self.muM.size()).normal_())
        b_epsilons = Variable(self.muB.data.new(self.muB.size()).normal_())
        
        m_stds = torch.exp(self.sigmaM)
        b_stds = torch.exp(self.sigmaB)
        
        m_sample = self.muM + m_stds * m_epsilons
        b_sample = self.muB + b_stds * b_epsilons
        
        return m_sample * x + b_sample

In [13]:
class variationalBayes_v2(nn.Module):
    def __init__(self, num_samples = 40):
        super().__init__()
        
        self.num_samples = num_samples
        
        self.f = StraightLine_v3()
        
        self.sigmaN = nn.Parameter(torch.tensor(1.0))
        self.prior_M_std = torch.tensor([1.])
        self.prior_B_std = torch.tensor([10.])
        
    def forward(self, x, y):
        
        nLogLik = 0.0
        
        for i in range(self.num_samples):
            
            pred = self.f(x)
            #y_truth = y.reshape(y.shape[0], -1).repeat(1, self.f.samples)
            nLogLik = nLogLik + (-1.) * Normal(pred, self.sigmaN).log_prob(y).sum() 
            
        nLogLik = nLogLik / self.num_samples
 
        nLogPriorM = torch.log(self.prior_M_std) + 0.5 * (self.f.muM / self.prior_M_std) ** 2 + 0.5 * (self.f.sigmaM / self.prior_M_std) ** 2
        nLogPriorB = torch.log(self.prior_B_std) + 0.5 * (self.f.muB / self.prior_B_std) ** 2 + 0.5 * (self.f.sigmaB / self.prior_B_std) ** 2
        
        LogVarPostM = (-1.) * torch.log(self.f.sigmaM) 
        LogVarPostB = (-1.) * torch.log(self.f.sigmaB)
        
        return LogVarPostM + LogVarPostB + nLogLik + nLogPriorM + nLogPriorB

In [None]:
class variationalBayes_v3(nn.Module):
    def __init__(self, num_samples = 40):
        super().__init__()
        
        self.num_samples = num_samples
        
        self.f = StraightLine_v2()
        
        self.sigmaN = nn.Parameter(torch.tensor(1.0))
        self.prior_M_std = torch.tensor([1.])
        self.prior_B_std = torch.tensor([10.])
        
    def forward(self, x, y):
        
        nLogLik = 0.0
        
        for i in range(self.num_samples):
            
            pred = self.f(x)
            #y_truth = y.reshape(y.shape[0], -1).repeat(1, self.f.samples)
            nLogLik = nLogLik + (-1.) * Normal(pred, self.sigmaN).log_prob(y).sum() 
            
        nLogLik = nLogLik / self.num_samples
 
        nLogPriorM = torch.log(self.prior_M_std) + 0.5 * (self.f.muM / self.prior_M_std) ** 2 + 0.5 * (self.f.sigmaM / self.prior_M_std) ** 2
        nLogPriorB = torch.log(self.prior_B_std) + 0.5 * (self.f.muB / self.prior_B_std) ** 2 + 0.5 * (self.f.sigmaB / self.prior_B_std) ** 2
        
        LogVarPostM = (-1.) * torch.log(self.f.sigmaM) 
        LogVarPostB = (-1.) * torch.log(self.f.sigmaB)
        
        return LogVarPostM + LogVarPostB + nLogLik + nLogPriorM + nLogPriorB

In [14]:
from tqdm import tqdm

epochs = 10000

#model = nLogLikelyhood_v1()

#model = nLogLikelyhood_v2()

#model = maxPosterior_v1()

learning_rate = 0.02

model = variationalBayes_v3()
optimizer = optim.Adam(model.parameters(), lr = learning_rate)

for epoch in tqdm(range(epochs), desc="Training..."):
    
    optimizer.zero_grad()
    
    #nLogLik = model(x_tensor, y_tensor)
    #e = torch.mean(nLogLik)
    
    nLogLik = model(x_tensor, y_tensor)
    nLogLik.backward(retain_graph=True)
    #nLogLik.backward()
    
    #e.backward()
    optimizer.step()

Training...:   0%|          | 0/10000 [00:00<?, ?it/s]

Training...: 100%|██████████| 10000/10000 [04:31<00:00, 36.80it/s]


In [15]:
model.f.muM

Parameter containing:
tensor(0.0472, requires_grad=True)

In [16]:
model.f.muB

Parameter containing:
tensor(7.0060, requires_grad=True)

In [17]:
model.sigmaN

Parameter containing:
tensor(3.3132, requires_grad=True)

In [18]:
torch.exp(model.f.sigmaM)

tensor(0.0252, grad_fn=<ExpBackward0>)

In [19]:
torch.exp(model.f.sigmaB)

tensor(1.0498, grad_fn=<ExpBackward0>)