# Beta Decay Regression

This notebook is a simplified version of the beta decay regression notebook in `examples/Beta-Decay.ipynb`. Please refer to the original notebook for a detailed discussion.

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable

In [None]:
# GPU switch: set use_cuda = True to run on an available GPU.
# It is kept mainly for reference -- it does not really help in this
# example, because making plots in each training iteration requires
# copying tensors back to the CPU.
use_cuda = False

In [None]:
# Model switch: when True, train a model that is parameterized exactly
# like the analytic solution; when False, a small fully connected
# network is trained instead.
use_exact_model = True

In [None]:
# Analytic Solution & RHS

# initial conditions at "time" 0
y0_t0 = 1
y1_t0 = 0

# exponential decay constant
exp_A = 1


# helper shared by sol() and rhs(): build a constant (1, n) row vector
def _row_like(values, ref):
    """Return `values` as a (1, len(values)) tensor compatible with `ref`.

    The row is created on the same device as `ref`, so the analytic
    functions work for CPU and GPU inputs alike. It uses the dtype of
    `ref` when that is a floating point type and the default float dtype
    otherwise (so non-integer constants are never truncated).
    """
    if ref.is_floating_point():
        dtype = ref.dtype
    else:
        dtype = torch.get_default_dtype()
    return torch.tensor([values], dtype=dtype, device=ref.device)


# this is the analytic solution of the system as a function of x
def sol(_x):
    """Evaluate the analytic solution at the points `_x`.

    Computes exp(-exp_A * _x) * ya + yb, where ya = [y0_t0, -y0_t0] and
    yb = [0, y0_t0 + y1_t0] are row vectors that express the solution of
    the two-component system in matrix form.

    Args:
        _x: tensor of evaluation points; a column of shape (N, 1)
            broadcasts against the (1, 2) rows to give an (N, 2) result.

    Returns:
        Tensor holding both solution components for every point in `_x`.
    """
    ya = _row_like([y0_t0, -y0_t0], _x)
    yb = _row_like([0, y0_t0 + y1_t0], _x)

    return torch.exp(-exp_A * _x) * ya + yb


# this is the analytic derivative of the system w.r.t. x
def rhs(_y):
    """Evaluate the analytic right-hand side dy/dx at the state `_y`.

    Computes -exp_A * (_y - yb) with yb = [0, y0_t0 + y1_t0].

    Args:
        _y: tensor of solution values whose last dimension has size 2.

    Returns:
        Tensor of the same shape as `_y` holding the derivative of each
        component.
    """
    yb = _row_like([0, y0_t0 + y1_t0], _y)

    return -exp_A * (_y - yb)

# if we're using cuda, then put the tensors
# in our workspace on the GPU.
if use_cuda:
    x = x.cuda()
    x_test = x_test.cuda()

## Generate Training Data

In [None]:
# generate NumSamples for training
NumSamples = 128
NumTest = NumSamples//2

# let's look at a range in x from [0, 10]
xmin = 0
xmax = 10.0

# the range of the solution values is [0,1]
ymin = 0
ymax = 1

# set random seed
torch.manual_seed(42)

x = torch.unsqueeze(torch.linspace(xmin, xmax, NumSamples, requires_grad=True), dim=1)
x_test = torch.unsqueeze(torch.rand(NumTest, requires_grad=True), dim=1) * (xmax-xmin) + xmin

# get the analytic solution as a function of x
y = sol(x)

# get the analytic right-hand-side as a function of y(x)
# f(x) = dy(x)/dx
dydx = rhs(y)

# get the analytic solution at the test points x_test
y_test = sol(x_test)
    
# we will want to propagate gradients through y, dydx, and x
# so make them PyTorch Variables
x = Variable(x, requires_grad=True)
y = Variable(y, requires_grad=True)
dydx = Variable(dydx, requires_grad=True)

# we will need to evaluate gradients w.r.t. x multiple
# times so tell PyTorch to save the gradient variable in x.
x.retain_grad()

## Setting up the Models, Optimizers & Loss Function ...

In [None]:
class HiddenNet(nn.Module):
    """Fully connected network with a configurable stack of hidden layers.

    The network consists of an input layer (n_independent -> n_hidden),
    `hidden_depth` hidden layers (n_hidden -> n_hidden) and an output
    layer (n_hidden -> n_dependent). `activation` is applied after the
    input layer and after every hidden layer; the output layer is linear.
    """

    def __init__(self, n_independent, n_dependent,
                 n_hidden, hidden_depth, activation):
        super().__init__()

        self.activation = activation

        # layers are created in input -> hidden -> output order
        self.input_layer = nn.Linear(n_independent, n_hidden)
        self.hidden_layers = nn.ModuleList(
            [nn.Linear(n_hidden, n_hidden) for _ in range(hidden_depth)]
        )
        self.output_layer = nn.Linear(n_hidden, n_dependent)

    def forward(self, x):
        # activated input layer, then each activated hidden layer in turn
        out = self.activation(self.input_layer(x))
        for layer in self.hidden_layers:
            out = self.activation(layer(out))
        # no activation on the output layer
        return self.output_layer(out)

In [None]:
class ExactModel(nn.Module):
    """Model parameterized exactly like the analytic solution.

    The three learnable scalar parameters `y0_0`, `y1_0` and `a` (all
    initialized to 1.0) enter the prediction as

        exp(-a * x) * [y0_0, -y0_0] + [0, y0_0 + y1_0]
    """

    def __init__(self):
        super().__init__()

        self.y0_0 = torch.nn.Parameter(torch.tensor(1.0))
        self.y1_0 = torch.nn.Parameter(torch.tensor(1.0))
        self.a = torch.nn.Parameter(torch.tensor(1.0))

    def forward(self, x):
        """Evaluate the model at the points `x`.

        Args:
            x: tensor of evaluation points; a column of shape (N, 1)
                broadcasts against the (1, 2) rows to give (N, 2).

        Returns:
            Tensor with the two predicted components for every point.
        """
        # ya & yb are (1, 2) row vectors that let us express the analytic
        # solution of the system in matrix form. They are assembled from
        # the parameters with torch.stack, so they live on the same
        # device and have the same dtype as the parameters (no fresh CPU
        # tensors, no in-place writes).
        ya = torch.stack([self.y0_0, -self.y0_0]).unsqueeze(0)
        yb = torch.stack([torch.zeros_like(self.y0_0),
                          self.y0_0 + self.y1_0]).unsqueeze(0)

        return torch.exp(-self.a * x) * ya + yb

In [None]:
# Define model: either the analytically parameterized model or a minimal
# fully connected network (1 input, 2 hidden units, no additional hidden
# layers, 2 outputs, CELU activation).
net = (
    ExactModel()
    if use_exact_model
    else HiddenNet(n_independent=1, n_dependent=2,
                   n_hidden=2, hidden_depth=0, activation=F.celu)
)

# move the model parameters to the GPU when requested
if use_cuda:
    net.cuda()

print(net)

In [None]:
# Define optimizers: both act on the same network parameters with the
# same learning rate -- SGD with momentum, and Adam.
optimizer_sgd = torch.optim.SGD(
    params=net.parameters(),
    lr=0.01,
    momentum=0.9,
)
optimizer_adam = torch.optim.Adam(
    params=net.parameters(),
    lr=0.01,
)

In [None]:
# Define loss function: mean squared error between prediction and target
loss_func = nn.MSELoss()

# Training Loop

In [None]:
def train_error(NumEpochs):
    """Train the global `net` for at most `NumEpochs` epochs.

    In every epoch the Adam optimizer minimizes the sum of two RMS errors
    evaluated at the training points `x`:

      * loss0 -- prediction vs. the analytic solution `y`
      * loss1 -- d(prediction)/dx vs. the analytic derivative `dydx`

    After each update the RMS error on the test set (`x_test`, `y_test`)
    is evaluated with gradients disabled. The training error is printed
    every 100 epochs and the last test error is printed at the end (if at
    least one epoch ran).

    Uses the module-level names `net`, `x`, `y`, `dydx`, `x_test`,
    `y_test`, `loss_func` and `optimizer_adam`. `x` must require grad.

    Args:
        NumEpochs: maximum number of training epochs.
    """
    # local import: at module level numpy is only imported further down
    import numpy as np

    # use the Adam optimizer
    optimizer = optimizer_adam

    # per-epoch history of the training error, used by the early-stop check
    epochs = []
    losses = []

    # stays None if no epoch is executed
    test_loss = None

    for t in range(NumEpochs):
        # calculate prediction given the current net state
        prediction = net(x)

        # calculate error between prediction and analytic truth y
        loss0 = torch.sqrt(loss_func(prediction, y))

        # calculate gradients d(prediction)/d(x) for each component.
        # create_graph=True keeps the derivative itself in the autograd
        # graph, so that the derivative error (loss1) contributes
        # gradients w.r.t. the network parameters. torch.autograd.grad
        # does not accumulate into any .grad attribute, so no gradients
        # have to be zeroed or cloned here.
        columns = []
        for j in range(prediction.shape[1]):
            component = prediction[:, j]
            (dpjdx,) = torch.autograd.grad(
                component, x,
                grad_outputs=torch.ones_like(component),
                create_graph=True,
            )
            columns.append(dpjdx.reshape(-1, 1))
        dpdx = torch.cat(columns, dim=1)

        # define the error of the prediction derivative using the
        # analytic derivative.
        # (The original author noted that comparing against
        # rhs(prediction) instead of dydx doesn't work well.)
        loss1 = torch.sqrt(loss_func(dpdx, dydx))

        # total error combines the error of the prediction (loss0) with
        # the error of the prediction derivative (loss1)
        loss = loss0 + loss1

        # clear gradients left over from the previous training iteration
        optimizer.zero_grad()

        # compute backpropagation gradients
        loss.backward()

        # apply gradients to update the weights
        optimizer.step()

        # loss.backward() also accumulates into x.grad; drop it so it
        # does not grow from epoch to epoch
        x.grad = None

        loss_value = loss.item()
        epochs.append(t)
        losses.append(loss_value)

        # get error with testing samples
        # first, turn off training
        net.eval()

        with torch.no_grad():
            prediction_test = net(x_test)
            test_loss = torch.sqrt(loss_func(prediction_test, y_test)).item()

        # turn back on training
        net.train()

        # Print epoch/error notifications
        if t%100 == 0:
            print("epoch ", t, " with error: ", loss_value)

        # Stop early if our errors are plateauing
        if t > 1000:
            # do a quadratic polynomial fit and see if we will
            # need more than NumEpochs for the error e to vanish:
            # e / (d(e)/d(epoch)) > NumEpochs ?
            # if so, then break out of the training loop ...
            # NOTE(review): np.polyfit returns the highest power first,
            # so coef[2]/coef[1] is the fitted constant term over the
            # linear coefficient -- confirm this is the intended
            # criterion; it is kept unchanged here.
            xfit = epochs[-4:]
            efit = losses[-4:]
            coef = np.polyfit(xfit, efit, 2)

            if coef[2]/coef[1] > NumEpochs:
                break

    if test_loss is not None:
        print("final testing error: ", test_loss)

In [None]:
# run the training loop for 1000 epochs
train_error(NumEpochs=1000)

# Converting to Torch Script

In [None]:
# compile the trained network to TorchScript and serialize it to disk
script_module = torch.jit.script(net)
torch.jit.save(script_module, "betadecay_model.pt")

In [None]:
# show the first ten test points
print(x_test[0:10])

In [None]:
import numpy as np

# Convert x_test to a numpy array and write it to file: the first line
# holds the number of samples, then every row is written with np.savetxt.
xnp = x_test.detach().cpu().numpy()

# the context manager closes the file even if writing fails part-way
with open("test_data.txt", "w") as file:
    file.write(str(len(xnp)) + "\n")
    for row in xnp:
        np.savetxt(file, row)

In [None]:
# Write y_test to file: the first line holds the number of samples, then
# one line per sample with both solution components.
ynp = y_test.detach().cpu().numpy()

# the context manager closes the file even if writing fails part-way
with open("test_output.txt", "w") as file:
    file.write(str(len(ynp)) + "\n")
    for row in ynp:
        file.write("{0:.16e}   {1:.16e}\n".format(row[0], row[1]))