In [None]:
import numpy as np
import scipy.stats as st
import matplotlib.pyplot as plt
%matplotlib inline
import seaborn as sns
sns.set_context('paper')
sns.set_style('white')
# A helper function for downloading files
import requests
import os
def download(url, local_filename=None):
    """
    Downloads the file in the ``url`` and saves it in the current working directory.
    """
    data = requests.get(url)
    if local_filename is None:
        local_filename = os.path.basename(url)
    with open(local_filename, 'wb') as fd:
        fd.write(data.content)

# Hands-on Activity 26.3 (Physics-informed regularization: Solving uncertainty propagation problems) 

## Objectives

+ Learn how to solve stochastic PDEs with neural networks.

## References

+ https://doi.org/10.1016/j.jcp.2019.109120
+ https://github.com/PredictiveScienceLab/variational-elliptic-SPDE

## Notes

In this hands-on activity we implement the method of [our paper](https://doi.org/10.1016/j.jcp.2019.109120) to solve high-dimensional elliptic PDEs in PyTorch.
The original implementation is in TensorFlow and can be found in [this](https://github.com/PredictiveScienceLab/variational-elliptic-SPDE) github repository.
However, we solve slightly different problems from the paper.
This is because we do not want to go through the trouble of introducing the details of the random fields of the paper.
Also, note that it is not our intention to provide code that is 100% functional.
This is highly nontrivial.
The code below takes a lot of time to run (~5 hours) and it does not always produce a sufficiently converged network that solves the stochastic PDE.
To fix this, one would have to experiment with the network structure and the details of the optimization algorithm.
So, consider what follows as an example of how you would begin your journey into physics-informed NN for UQ - not as the end.
If you are interested in a complete example that works, visit our github repository and run our code there.

In [None]:
import numpy as np
import torch
import torch.nn as nn

# This is useful for taking derivatives:
def grad(outputs, inputs):
    return torch.autograd.grad(outputs, inputs, grad_outputs=torch.ones_like(outputs), create_graph=True)[0]

## Solving stochastic PDEs

Consider the boundary value problem we introduced back in [Hands-on Activity 10.4](http://localhost:8888/notebooks/me597/data-analytics-se/activities/hands-on-10.4.ipynb). We have a heterogeneous rod with no heat sources.
The temperature of the rod is governed by the steady state heat equation on:
$$
\frac{d}{dx}\left(c(x,\xi)\frac{d}{dx}T(x,\xi)\right) = 0,
$$
for $x$ between $0$ and $1~\;\mbox{m}$ and boundary values:
$$
T(0) = 400^\circ\mbox{C}\;\mbox{and}\;T(1~\mbox{m}) = 4^\circ\mbox{C}.
$$
We are interested in cases in which we are uncertain about the conductivity, $c(x,\xi)$.
Here $\xi$ captures all the information that you need to specify the conductivity.
Let's recall the model we used for the conductivity back in 10.4:

Change units:
$$
\hat{T} = \frac{T}{T_0}.
$$
Then:
$$
\frac{d\hat{T}}{dx} = \frac{dT}{dx}T_0^{-1}
$$

In [None]:
class Rod(object):
    
    """
    A class representing a rod made out of different materials.
    
    Arguments:
    
    segment_coords   -   Coordinates of the segments (both left and right). Total N + 1
                         if segments are N.
    mat_id           -   The material ids on each segment. Total N. Values must start
                         at 0 and go sequential to the maximum number of materials
                         we have on the rod.
    mat_cond         -   The conductivity associaed with each unique material id.
    """
    
    def __init__(self, segment_coords, mat_id, mat_cond):
        # Save segments for later
        self.segment_coords = np.sort(segment_coords)
        # The total number of segments
        self.num_segments = self.segment_coords.shape[0] - 1
        # Save the material id on each segment for later
        self.mat_id = mat_id
        # The number of unique materials
        self.num_mat = len(np.unique(self.mat_id))
        # The conductivity on each segment
        self.segment_cond = np.array([mat_cond[m] for m in mat_id])
        
    def _get_conductivity(self, x):
        """
        Evaluate the conductivity at location x, assuming x is a scalar.
        """
        # Find the segment that contains x
        for i in range(self.num_segments):
            if self.segment_coords[i] <= x and x <= self.segment_coords[i + 1]:
                return self.segment_cond[i]
            
    def get_conductivity(self, x):
        """
        Evaluate the conductivity at location x. This works when x is a scalar or
        a numpy array.
        """
        if isinstance(x, float):
            return self._get_conductivity(x)
        # This checks if x is a numpy array. The function will fail other wise.
        assert isinstance(x, np.ndarray)
        # And it will only work with 1D arrays
        assert x.ndim == 1
        # This evaluates the conductivity at all the elements of x and returns
        # a numpy array
        return np.array([self._get_conductivity(xx) for xx in x])
    
    def __repr__(self):
        """
        Get a string representation of the rod.
        """
        s = 'SEGID\tLeft\tRight\tMat.\tCond.\n'
        s += '-' * 37 + '\n'
        for i in range(self.num_segments):
            s += ' {0:d}\t{1:1.2f}\t{2:1.2f}\t{3:d}\t{4:1.2f}\n'.format(i, segment_coords[i],
                                                                 segment_coords[i+1],
                                                                 material_on_each_segment[i],
                                                                 segment_cond[i])
        return s
    
    def plot(self, ax=None):
        """
        Plots the bar. Returns the axes object on which the rod is plot.
        
        Arguments:
        
        ax    -    An axes object to plot on. If not given, a new one will be created.
        """
        from matplotlib.patches import Rectangle
        if ax is None:
            fig, ax = plt.subplots()
        for i in range(self.num_segments):
            mat_segment = Rectangle((self.segment_coords[i], -0.1), 
                                    width=self.segment_coords[i+1] - self.segment_coords[i],
                                    height=0.1, color=sns.color_palette()[self.mat_id[i]])
            ax.add_patch(mat_segment)
        ax.set_ylim(-0.1, 1.1)
        return ax
    

class RandomRod(object):
    
    """
    A class modeling a random rod.
    
    Arguments:
    
    av_num_segments    -    The ``rate`` of the Poisson distribution giving
                            the number of segments.
    mat_probs          -    The probabilities of the categorical specifying
                            each material goes on each segment.
    mat_cond         -   The conductivity associaed with each unique material id.
    """
    
    def __init__(self, av_num_segments, mat_probs, mat_cond):
        # Save some info internally for later usage
        self.av_num_segments = av_num_segments
        self.mat_probs = mat_probs
        self.mat_cond = mat_cond
        # Make the Poisson variable corresponding to the number of segments
        self.D = st.poisson(av_num_segments)
        # Make the Uniform giving the location of the segment coordinates
        self.U = st.uniform()
        # Make the Categorical random variable that allows us to pick the material
        # id on each segment
        self.M = st.rv_discrete(name='M', values=(np.arange(len(mat_probs)), mat_probs))
        
    def _rvs(self):
        """
        Generate a single random rod.
        """
        # Draw the number of segments
        d = self.D.rvs()
        # Draw the coordinates
        segment_coords = np.hstack([[0.0], np.sort(self.U.rvs(size=d - 1)), [1.0]])
        # Draw the material type on each segment
        mat_ids = self.M.rvs(size=d)
        # Generate the rod
        return Rod(segment_coords, mat_ids, self.mat_cond)
    
    def rvs(self, size=1):
        """
        Generate many random rods.
        """
        if size == 1:
            return self._rvs()
        return [self._rvs() for _ in range(size)]

In [None]:
# Create the random rod object
R = RandomRod(20,         # The number of segments
              [0.3, 0.7],  # The concentration of each material
              [0.045, 38.0])  # The thermal conductivity of each material
# Let's sample a few of them and plot them
for n in range(5):
    fig, ax = plt.subplots(dpi=50)
    R.rvs().plot(ax=ax)

We also had a solver for the equation for random rods.
Let's bring it back.

In [None]:
# Fipy is used to solve the boundary value problem using a finite volume scheme
# You may have to install it if you don't have it using:
# RUN THIS BLOCK IF YOU HAVEN'T INSTALLED fipy yet
!pip install fipy

In [None]:
import fipy

class SteadyStateHeat1DSolver(object):
    
    """
    Solves the 1D steady state heat equation with dirichlet boundary conditions.
    It uses the stochastic model we developed above to define the random conductivity.
    
    Arguments:
    nx          -    Number of grid points
    value_left  -    The value at the left side of the boundary.
    value_right -    The value at the right side of the boundary.
    """
    
    def __init__(self, nx=500, value_left=1.0, value_right=0):
        self.nx = nx
        self.dx = 1. / nx
        # A computational mesh with nx elements
        self.mesh = fipy.Grid1D(nx=self.nx, dx=self.dx)
        # A variable that represents that temperature on the mesh
        self.phi = fipy.CellVariable(name='$T(x)$', mesh=self.mesh, value=0.)
        # A variable that represents the thermal conductivity
        self.C = fipy.FaceVariable(name='$C(x)$', mesh=self.mesh, value=1.)
        # The constrain on the left boundary
        self.phi.constrain(value_left, self.mesh.facesLeft)
        # The constrain on the right boundary
        self.phi.constrain(value_right, self.mesh.facesRight)
        # The The diffusion term
        self.eq = fipy.DiffusionTerm(coeff=self.C)
        
    def __call__(self, rod):
        """
        Evaluates the code at a specific xi.
        """
        # Find the values of the mesh points
        x = self.mesh.faceCenters.value.flatten()
        # Evaluate the conductivity on these points using the rod model
        cond_val = rod.get_conductivity(x)
        # Update conductivity values in the equation
        self.C.setValue(cond_val)
        # Solve the equation
        self.eq.solve(var=self.phi)
        # Return the solution
        return x, self.phi.faceValue()

And recall here, that the solver is used as follows:

In [None]:
solver = SteadyStateHeat1DSolver(nx=500)
for i in range(2):
    fig, ax = plt.subplots(dpi=50)
    rod = R.rvs()
    x, y = solver(rod)
    # plot the rod
    rod.plot(ax=ax)
    # Get rid of the y ticks for the rod
    ax.set_yticks([])
    # Get another axis to plot the temperature
    ax1 = ax.twinx()
    # Make sure the ticks for this axis are on the left
    ax1.yaxis.tick_left()
    # Same for the label
    ax1.yaxis.set_label_position("left")
    # Plot the temperature
    ax1.plot(x, y, 'r', lw=1)
    ax.set_xlabel('$x$ (m)')
    ax1.set_ylabel(r'$T(x)\;(^\circ C)$');

Alright. Now that we have recalled where we were, let's get back to physics informed neural network.
We will use one to learn the temperature $T(x,\xi)$ as a function of any thermal conductivity captured by $\xi$.
What exactly do we mean by $\xi$ here? We mean the rod conductivity. But we would have to pick a fixed dimensional parameterization of the rod conductivity.
The way we will achieve this is by assuming that the conductivity is always sampled on the same spatial positions.
Here is how:

In [None]:
# Here are the fixed spatial positions on which we will be sampling the conductivity
xs = np.linspace(0, 1, 100)
# Here is a random rod
rod = R.rvs()
# And here is the discretization as 500-dimensional vector
xi = np.log(rod.get_conductivity(xs))
# Here it is
print(xi)

Notice that I took the log of the conductivity because the neural network will like it more...

Now that we have $\xi$, let's introduce our parameteterization of the temperature using a neural network.
We need to make sure that the left and right boundary conditions are satisfied.
We will take it to be:
$$
T(x,\xi) = A(x) + x(1-x)N(x,\xi;\theta),
$$
where $A(x)$ must be picked so that the boundary conditions are satisfied, and $N(x,\xi;\theta)$ is a neural network with parameters $\theta$.

Here is the simplest such choice for the boundary function:
$$
A(x) = T_0 + (T_1-T_0)x,
$$
where $T_0 = T(0)$ and $T_1 = T(1)$.
That was easy...

Finally, we need to specify what is the loss function we should be minimizing.
We are going to use this the expectation of the Dirichlet principle (energy):
$$
L(\theta) = \frac{1}{2}\mathbb{E}_{\xi}\left[\int_0^1 \parallel c(x,\xi)\nabla_x T(x,\xi)\parallel^2dx\right].
$$
Okay, let's do it.

In [None]:
import torch.nn.functional as F

# left boundary
T0 = 1.0
# right boundary
T1 = 0.0

# The number of xi's we are going to have
num_xi = xs.shape[0]

# The part for satisfying the boundary
A = lambda x: T0 + (T1 - T0) * x

# And the neural network
class NNModel(nn.Module):
    
    def __init__(self, num_xi=100, num_resnet_blocks=3,
                 num_layers_per_block=2, num_neurons=200):
        super(NNModel, self).__init__()
        self.num_resnet_blocks = num_resnet_blocks
        self.num_layers_per_block = num_layers_per_block
        self.num_neurons = num_neurons
        self.first = nn.Linear(1 + num_xi, num_neurons)
        self.resblocks = [
            [nn.Linear(num_neurons, num_neurons)
             for __ in range(num_layers_per_block)]
            for _ in range(num_resnet_blocks)
        ]
        self.last = nn.Linear(num_neurons, 1)
    
    def forward(self, x, xi):
        """
        Assume that xi and x have the same first dimension.
        """
        #expanded_xi = xi.expand((x.shape[0], -1))
        x_xi = torch.cat((x, xi), dim=1)
        o = F.relu(self.first(x_xi))
        for i in range(self.num_resnet_blocks):
            z = F.relu(self.resblocks[i][0](o))
            for j in range(1, self.num_layers_per_block):
                z = F.relu(self.resblocks[i][j](z))
            o = z + o
        out = self.last(o)
        return out
    
# And here is an instance of the net
N = NNModel(num_xi)

# Here is our model for T
T = lambda x, xi: A(x) + x * (1 - x) * N(x, xi)

# The loss function
def loss(x, Xi, c_vals): 
    x.requires_grad = True
    return torch.mean((c_vals * grad(T(x, Xi), x)) ** 2)

Here is just a sanity check that we start from a candidate solution that indeed satisfies the boundary conditions:

In [None]:
fig, ax = plt.subplots(dpi=100)
for _ in range(5):
    rod = R.rvs()
    xi = torch.Tensor(np.log(rod.get_conductivity(xs)))[None, :]
    xst = torch.Tensor(xs)[:, None]
    Xi = xi.expand((xst.shape[0], -1))
    ax.plot(xs, T(xst, Xi).detach().numpy());

We are ready to start training. This will take about an hour. So, feel free to skip it. We are not going to have a problem like this in the homework.

In [None]:
# Let's see now if a stochastic optimizer makes a difference
adam = torch.optim.Adam(N.parameters(), lr=0.01)

# The batch size you want to use (how many points to use per iteration)
n_batch_x = 100

n_batch_xi = 128

# The maximum number of iterations to do
max_it = 200000

losses = []
for i in range(max_it):
    # Randomly pick n_batch random x's:
    x_vals = np.random.rand(n_batch_x)
    xst = torch.Tensor(x_vals[:, None])
    # Repeat the locations - Xst must become (n_batch_x*n_batch_xi) x 1
    Xst = xst.repeat(n_batch_xi, 1)
    # Randomly pick conductivities
    Xis = []
    C_vals = []
    for j in range(n_batch_xi):
        rod = R.rvs()
        xi = torch.Tensor(np.log(rod.get_conductivity(xs))).repeat(n_batch_x, 1)
        c_val = torch.Tensor(rod.get_conductivity(x_vals))
        Xis.append(xi)
        C_vals.append(c_val)
    Xis = torch.cat(Xis, 0)
    C_vals = torch.cat(C_vals, 0)[:, None]
    # Zero-out the gradient buffers
    adam.zero_grad()
    # Evaluate the loss
    l = loss(Xst, Xis, C_vals)
    # Calculate the gradients
    l.backward()
    # Update the network
    adam.step()
    losses.append(l.item())
    # Print the iteration number
    if i % 10 == 9:
        print('{0:5d}: {1:1.2e}'.format(i, l.item()))

Here is a plot of the loss evolution:

In [None]:
plt.plot(losses);

And here is how you can use the resulting model to make predictions:

In [None]:
for _ in range(5):
    fig, ax = plt.subplots(dpi=50)
    rod = R.rvs()
    x, y = solver(rod)
    # plot the rod
    rod.plot(ax=ax)
    # Get rid of the y ticks for the rod
    ax.set_yticks([])
    # Get another axis to plot the temperature
    ax1 = ax.twinx()
    # Make sure the ticks for this axis are on the left
    ax1.yaxis.tick_left()
    # Same for the label
    ax1.yaxis.set_label_position("left")
    # Plot the temperature
    ax1.plot(x, y, 'r', lw=1, label='True solution')
    ax.set_xlabel('$x$ (m)')
    ax1.set_ylabel(r'$T(x)\;(^\circ C)$');
    xi = torch.Tensor(np.log(rod.get_conductivity(xs))).repeat(xs.shape[0], 1)
    xst = torch.Tensor(xs)[:, None]    
    ax.plot(xs, T(xst, xi).detach().numpy(), label='NN solution')
    plt.legend(loc='best')