# Poisson problem (1D/2D)

In [2]:
from cuqi.problem import BayesianProblem
from cuqi.distribution import Gaussian
from cuqipy_fenics.utilities import ExpressionFromCallable
from cuqipy_fenics.testproblem import FEniCSPoisson2D, FEniCSDiffusion1D
import numpy as np
import cuqi
import cuqipy_fenics
import dolfin as dl
import matplotlib.pyplot as plt

Print CUQIpy and CUQIpy-FEniCS versions:

In [None]:
print(cuqi.__version__)
print(cuqipy_fenics.__version__)
print(cuqipy_fenics.__file__)

In [4]:
physical_dim = 1 # choice of physical dimension
prior_type = 'TV'#'Gaussian' #'TV' #'Gaussian' # 'TV'
enable_FD = True
sampler_choice = "MH" #"NUTS"


In [None]:
endpoint = 10 # domain length
# f is sum of 3 exponentials at 1/4*endpoint and 1/2*endpoint and 3/4*endpoint

def f(x):
    return np.exp(-100*(x-1/4*endpoint)**2) + np.exp(-100*(x-1/2*endpoint)**2) + np.exp(-100*(x-3/4*endpoint)**2)

#plot f
x = np.linspace(0, endpoint, 100)
plt.plot(x, f(x))


f_expr = ExpressionFromCallable(f)

In [6]:
if physical_dim == 2:
    A = FEniCSPoisson2D(dim=(16,16), field_type=None, mapping='exponential', bc_types=['Dirichlet', 'Neumann', 'Dirichlet', 'Neumann']).model
elif physical_dim == 1:

    A = FEniCSDiffusion1D(dim=20, right_bc=0, f=f_expr, endpoint=endpoint).model # note source term is zero for the 1D case

In [7]:
G_domain = A.domain_geometry
G_range = A.range_geometry

In [None]:
A


In [9]:
# SMPrior class
class SMPrior:
    def __init__(self, ginv, corrlength, var, mean, covariancetype=None):
        self.corrlength = corrlength
        self.mean = mean
        self.c = 1e-9  # default value
        if covariancetype is not None:
            self.covariancetype = covariancetype
        else:
            self.covariancetype = 'Squared Distance'  # default
        self.compute_L(ginv, corrlength, var)

    def compute_L(self, g, corrlength, var):
        ng = g.shape[0]
        a = var - self.c
        b = np.sqrt(-corrlength**2 / (2 * np.log(0.01)))
        Gamma_pr = np.zeros((ng, ng))

        for ii in range(ng):
            for jj in range(ii, ng):
                dist_ij = np.linalg.norm(g[ii, :] - g[jj, :])
                if self.covariancetype == 'Squared Distance':
                    gamma_ij = a * np.exp(-dist_ij**2 / (2 * b**2))
                elif self.covariancetype == 'Ornstein-Uhlenbeck':
                    gamma_ij = a * np.exp(-dist_ij / corrlength)
                else:
                    raise ValueError('Unrecognized prior covariance type')
                if ii == jj:
                    gamma_ij = gamma_ij + self.c
                Gamma_pr[ii, jj] = gamma_ij
                Gamma_pr[jj, ii] = gamma_ij
        
        self.cov = Gamma_pr
        self.L = np.linalg.cholesky(np.linalg.inv(Gamma_pr)).T

    def draw_samples(self, nsamples):
        samples = self.mean + np.linalg.solve(self.L, np.random.randn(self.L.shape[0], nsamples))
        return samples

    def eval_fun(self, args):
        sigma = args[0]
        res = 0.5 * np.linalg.norm(self.L @ (sigma - self.mean))**2
        return res
    
    def evaluate_target_external(self, x, compute_grad=False):
        x = x.reshape((-1,1))
        # print("x.shape: ", x.shape)
        # print("self.mean.shape: ", self.mean.shape)
        if compute_grad:
            grad = self.L.T @ self.L @ (x - self.mean)
        else:
            grad = None
        
        return self.eval_fun(x), grad
        

    def compute_hess_and_grad(self, args, nparam):
        sigma = args[0]
        Hess = self.L.T @ self.L
        grad = Hess @ (sigma - self.mean)

        if nparam > len(sigma):
            Hess = np.block([[Hess, np.zeros((len(sigma), nparam - len(sigma)))],
                             [np.zeros((nparam - len(sigma), len(sigma))), np.zeros((nparam - len(sigma), nparam - len(sigma)))]])
            grad = np.concatenate([grad, np.zeros(nparam - len(sigma))])


        return Hess, grad

In [10]:
#class MyTV:
#    def __init__(self, q0fun, mesh, delta,**kwargs):
#        #self.qfun = project(qFunction(phi,q1,q2),V1)
#
#        self.V1 = FunctionSpace(mesh,'CG',1)
#        self.V02 = VectorFunctionSpace(mesh,'DG',0)
#
#        self.q0fun = q0fun
#        self.q0grad = project(grad(self.q0fun),self.V02)
#        self.q0_denom = Denom(self.q0grad,delta)
#
#        # operator
#        self.p_trial = TrialFunction(self.V1)
#        self.p_test = TestFunction(self.V1)
#
#        #self.L_op = assemble(ufl.inner(self.p_trial, self.p_test)*dx)
#        #self.TV_op = assemble(self.q_denom*ufl.inner(grad(self.p_trial),grad(self.p_test))*dx)
#        self.TV_op = assemble((self.q0_denom*inner(grad(self.p_trial),grad(self.p_test)))*dx)
#
#        self.delta = delta
#
#    def eval_TV(self,qfun):
#        self.update_op(qfun)
#        return np.dot(self.TV_op * qfun.vector(),qfun.vector())
#
#    def eval_grad(self,qfun):
#        self.update_op(qfun)
#        return 2*(self.TV_op * qfun.vector())#[idx2]
#    
#    def update_op(self,q0fun):
#        self.q0fun = q0fun
#        self.q0grad = project(grad(self.q0fun),self.V02)
#        self.q0_denom = Denom(self.q0grad,self.delta)
#        self.TV_op = assemble((self.q0_denom*inner(grad(self.p_trial),grad(self.p_test)))*dx) 
#

class TV_reg:
    def __init__(self, V, beta, weight):
        self.beta    = dl.Constant(beta)
        self.m_tilde  = dl.TestFunction(V)
        self.m_hat = dl.TrialFunction(V)
        self.weight = weight
        
    def cost_reg(self, m):
        return self.weight*dl.assemble(dl.sqrt( dl.inner(dl.grad(m), dl.grad(m)) + self.beta)*dl.dx)

    
    def grad_reg(self, m):  
        print("grad", dl.grad)      
        TVm = dl.sqrt( dl.inner(dl.grad(m), dl.grad(m)) + self.beta)
        grad_val = dl.assemble(dl.Constant(1.)/TVm*dl.inner(dl.grad(m), dl.grad(self.m_tilde))*dl.dx)
        return self.weight*grad_val

In [11]:
H = G_domain.function_space

In [45]:
tv_reg = TV_reg(H, 1e-3, weight=1)

def eval_density(x):
    # convert from numpy array to dolfin function
    x_fun = dl.Function(H)
    x_fun.vector()[:] = x
    return tv_reg.cost_reg(x_fun)

def eval_grad_density(x):
    # convert from numpy array to dolfin function
    x_fun = dl.Function(H)
    x_fun.vector()[:] = x
    grad_val = tv_reg.grad_reg(x_fun).get_local()
    return grad_val


In [None]:

# Create correlation matrix for prior x



v2d = dl.vertex_to_dof_map(H)
d2v = dl.dof_to_vertex_map(H)

mesh = G_domain.mesh 


mean_sigma = np.zeros((H.dim(), 1)) #linearization point
corrlength =  0.2* endpoint
var_sigma = 0.05 ** 2   #prior variance

smprior = SMPrior(mesh.coordinates()[d2v], corrlength, var_sigma, mean_sigma)#, covariancetype='Ornstein-Uhlenbeck')


sample = smprior.draw_samples(1)
fun = dl.Function(H)
fun.vector().set_local(sample)

im = dl.plot(fun)
if physical_dim == 2:
    plt.colorbar(im)






In [None]:
# visualize the covariance matrix 
plt.figure()
im = plt.imshow(smprior.cov)
plt.colorbar(im)

In [48]:
if prior_type == 'Gaussian':
    cov_const = 1 #100
    x = Gaussian(np.zeros(G_domain.par_dim), cov=cov_const*smprior.cov, geometry=G_domain)
elif prior_type == 'TV':
    # CUQIpy user defined distribution
    x = cuqi.distribution.UserDefinedDistribution(logpdf_func=eval_density,
                                              gradient_func=eval_grad_density,
                                              dim=H.dim())

In [49]:
# Set the random seed
np.random.seed(3) 

In [None]:
if prior_type == 'Gaussian':
    x_true = x.sample()
elif prior_type == 'TV':
    fun_x_true_expr = dl.Expression('x[0] > 5 ? 0 : -0.5', degree=1)
    x_true_fun = dl.interpolate(fun_x_true_expr, H)
    x_true = cuqi.array.CUQIarray(x_true_fun.vector().get_local(), geometry=G_domain)
    
else:
    raise ValueError('Unrecognized prior type')
im = x_true.plot()
if physical_dim == 2:
    plt.colorbar(im[0])



This cell to compute s_noise

In [51]:
noise_level = 0.01
y_true = A(x_true)
s_noise = 1.0/np.sqrt(G_domain.par_dim)* noise_level*np.linalg.norm(y_true)

In [None]:
y_true.plot()

In [53]:
y = Gaussian(A(x), s_noise**2, geometry=G_range)

In [None]:
y_obs = y(x=x_true).sample()
y_obs.plot()
y_true.plot()

In [55]:
BP = BayesianProblem(y, x).set_data(y=y_obs)

In [None]:
# map point
if enable_FD:
    BP.posterior.enable_FD() 
else:
    BP.posterior.disable_FD()
map_x = BP.MAP()


In [None]:
# plot map predicted data

y_obs.plot(label='y_obs')
A(map_x).plot(label='A(x_map)')
plt.legend()


In [None]:
map_x.plot()
dl.plot(x_true.funvals)#.plot('--')
plt.legend(['MAP', 'True'])
#plt.ylim(-1.5, 0.7)

In [None]:
map_x

In [None]:
#posterior_samples = BP.UQ(Ns=3000, Nb=2, percent=97)
posterior = BP.posterior()
#sampler = cuqi.sampler.NUTS(posterior, max_depth=4, x0=np.zeros(G_domain.par_dim)+.001)
#sampler = cuqi.sampler.ULA(posterior, x0=np.zeros(G_domain.par_dim)+.001, scale=0.00000005)
# np.zeros(G_domain.par_dim)+.001

initial_point = np.zeros(G_domain.par_dim) # x_true.to_numpy()
#initial_point = 0.5*map_x.to_numpy()

#sampler = cuqi.experimental.mcmc.PCN(posterior, initial_point=initial_point, scale=0.05)#, max_depth=10, scale=0.005)

if sampler_choice == "NUTS":
    sampler = cuqi.experimental.mcmc.NUTS(posterior, initial_point=initial_point, max_depth=5, step_size=1e-3)
    Ns = 400
    Nb = 20 #20
elif sampler_choice == "MALA":
    sampler = cuqi.experimental.mcmc.MALA(posterior, initial_point=initial_point, scale=2.1e-7)
    Ns = 1000
    Nb = 20

elif sampler_choice == "ULA":
    sampler = cuqi.experimental.mcmc.ULA(posterior, initial_point=initial_point, scale=2.1e-7)
    Ns = 100
    Nb = 20

elif sampler_choice == "MH":
    sampler = cuqi.experimental.mcmc.MH(posterior, initial_point=initial_point, scale=2.1e-5)
    Ns = 1000
    Nb = 20


_ = sampler.warmup(Nb)
_ = sampler.sample(Ns)


#posterior_samples = sampler.sample_adapt(100, Nb=10)

In [28]:
Nb_extra = 80
posterior_samples = sampler.get_samples().burnthin(Nb+Nb_extra)

In [None]:
try:
    print(sampler.scale)
except:
    pass

In [30]:
try:
    plt.plot(sampler.num_tree_node_list)
    plt.figure()
    plt.semilogy(sampler.epsilon_list)
    plt.semilogy(sampler.epsilon_bar_list)
    print(np.max(sampler.epsilon_list))
except:
    pass

In [None]:
2**7

In [32]:
# sampler.scale

Code cell for generating Figure 1

In [None]:


import os
from matplotlib import ticker
import matplotlib.pyplot as plt

# Set up matplotlib
SMALL_SIZE = 7
MEDIUM_SIZE = 8
BIGGER_SIZE = 9
plt.rc('font', size=MEDIUM_SIZE)          # controls default text sizes
plt.rc('axes', titlesize=BIGGER_SIZE)     # fontsize of the axes title
plt.rc('axes', labelsize=MEDIUM_SIZE)    # fontsize of the x and y labels
plt.rc('xtick', labelsize=SMALL_SIZE)    # fontsize of the tick labels
plt.rc('ytick', labelsize=SMALL_SIZE)    # fontsize of the tick labels
plt.rc('legend', fontsize=SMALL_SIZE)    # legend fontsize
plt.rc('figure', titlesize=BIGGER_SIZE)  # fontsize of the figure title

# Use latex package
plt.rc('text', usetex=True)
plt.rc('text.latex', preamble=r'\usepackage{bm}')

# Data directory
fig_dir = './figs/'

# Figure file
fig_dir = fig_dir 

# Check if the directory exists
if not os.path.exists(fig_dir):
    os.makedirs(fig_dir)

# Figure version
version = 'v8'

# Figure file
fig_file = fig_dir + 'paper_figure1_'+version+'.pdf'

# Create the figure
cm_to_in = 1/2.54
fig, axs = plt.subplots(nrows=2, ncols=3,
                        figsize=(17.8*cm_to_in, 9.8*cm_to_in),
                        layout="constrained")

# Define the colors to be used in the plots
colors = ['C0', 'green', 'purple', 'k', 'gray']

# (a)
plt.sca(axs[0,0])
im = x_true.plot(subplots=False)#, vmin=-0.2, vmax=0.5, mode='color')

if physical_dim == 2: 
    inset_axes = plt.gca().inset_axes([1.04, 0.2, 0.05, 0.6])
    fig.colorbar(im[0], ax=plt.gca(), cax=inset_axes)
    plt.gca().set_ylim(0, 1)

#plt.gca().set_xlim(0, 1)
plt.gca().set_title('(a) Exact solution')
plt.ylabel('$\\xi^2$')
plt.gca().yaxis.labelpad = -5
plt.xlabel('$\\xi^1$')
plt.gca().xaxis.labelpad = -5

# (b)
plt.sca(axs[0,1])
im = y_true.plot(subplots=False)

if physical_dim == 2:
    inset_axes = plt.gca().inset_axes([1.04, 0.2, 0.05, 0.6])
    fig.colorbar(im[0], ax=plt.gca(), cax=inset_axes)
    plt.ylabel('$\\xi^2$')
    plt.gca().yaxis.labelpad = -5
    plt.gca().set_title('(b) Exact data')
else:
    samples_mean = cuqi.array.CUQIarray(posterior_samples.mean(), geometry=G_domain)
    A(samples_mean).plot(subplots=False)
    plt.gca().set_title('(b) Exact and predicted data')
plt.xlabel('$\\xi^1$')
plt.gca().xaxis.labelpad = -5


# (c)
plt.sca(axs[0,2])
im = y_obs.plot(subplots=False)
if physical_dim == 2:
    inset_axes = plt.gca().inset_axes([1.04, 0.2, 0.05, 0.6])
    fig.colorbar(im[0], ax=plt.gca(), cax=inset_axes)
    plt.ylabel('$\\xi^2$')
    plt.gca().yaxis.labelpad = -5
plt.xlabel('$\\xi^1$')
plt.gca().xaxis.labelpad = -5
plt.gca().set_title('(c) Noisy data')

# (d)
plt.sca(axs[1,0])
im = posterior_samples.plot_mean(
    subplots=False)#, vmin=-0.2, vmax=0.5, mode='color')
if physical_dim == 2:
    inset_axes = plt.gca().inset_axes([1.04, 0.2, 0.05, 0.6])
    fig.colorbar(im[0], ax=plt.gca(), cax=inset_axes)
    plt.gca().set_ylim(0, 1)
    plt.ylabel('$\\xi^2$')
    plt.gca().yaxis.labelpad = -5

#plt.gca().set_xlim(0, 1)
plt.xlabel('$\\xi^1$')
plt.gca().xaxis.labelpad = -5
plt.gca().set_title('(d) Posterior mean')

# (e)
plt.sca(axs[1,1])
im = posterior_samples.funvals.vector.plot_variance(subplots=False)

if physical_dim == 2:
    inset_axes = plt.gca().inset_axes([1.04, 0.2, 0.05, 0.6])
    cb = fig.colorbar(im[0], ax=plt.gca(), cax=inset_axes)
    cb.locator = ticker.MaxNLocator(nbins=4)
    plt.ylabel('$\\xi^2$')
    plt.gca().yaxis.labelpad = -5
plt.xlabel('$\\xi^1$')
plt.gca().xaxis.labelpad = -5
plt.gca().set_title('(e) Posterior variance')

# (f)
plt.sca(axs[1,2])
lci = posterior_samples.plot_ci(
    97, exact=x_true, plot_par=True, markersize=SMALL_SIZE-3)
lci[0].set_label("Mean")
lci[1].set_label("Exact")
lci[2].set_label("$97\\%$ CI")
#plt.ylim(-5, 3)
plt.legend(ncols=2) 
plt.ylabel(r'$\bm{x}_i$')
plt.gca().yaxis.labelpad = -5
plt.gca().yaxis.set_label_coords( -0.06, 0.5)
plt.xlabel('$i$')
plt.gca().xaxis.labelpad = -5
plt.gca().set_title('(f) Posterior CI')
n_ticks = 8
num_var = posterior_samples.geometry.par_dim
tick_ids = np.linspace(0, num_var-1, n_ticks, dtype=int)
plt.xticks(tick_ids, tick_ids)
# switch legend off
plt.gca().legend().set_visible(False)

# Save the figure
plt.savefig(fig_file, bbox_inches='tight', pad_inches=0.01, dpi=1200)

In [None]:
ess_list = posterior_samples.compute_ess()
print(ess_list)
print(np.min(ess_list))
print(np.max(ess_list))
print(np.mean(ess_list))

In [None]:
posterior_samples.plot_trace()

In [None]:
posterior_samples.plot()

In [36]:
# - [x] 1D case 
# - [ ] DG0
# - [ ] TV with non-MY gradient and with NUTS 
# - [x] verify the gradient of the posterior
# - [x] PCN seems to work (for both 1D and 2D cases)
# - [x] Start from x0=the true parameter to debug

## Verify gradient  (model)

In [None]:
# import check_grad
from scipy.optimize import check_grad
from cuqi.utilities import approx_gradient

np.random.seed(3)
# random direction y
y0 = np.ones(G_range.par_dim) # np.random.randn(G_range.par_dim)
y0[0] = 0
y0[-1] = 0
V = G_range.function_space
# corresponding fenics function
y0_fun = dl.Function(V)
y0_fun.vector()[:] = y0

# objective function
def f(x):
    return A(x).T@ y0.reshape(-1,1)
    #return A(x).vector().get_local().T@ y0.reshape(-1,1)

# objective function gradient
def fprime(x):
    return A.gradient(y0, x)

# random input x (the point which gradient is calculated with respect to)
x0 = np.random.randn(G_domain.par_dim)

# assert that the gradient is correct
print(check_grad(f, fprime, x0))

plt.plot(fprime(x_true))
# plot finite difference gradient 

plt.plot(approx_gradient(f, x_true, 1e-6), '--')
plt.legend(['adjoint based', 'finite difference'])


In [None]:
## Verify gradient  (posterior)

plt.plot(posterior.gradient(x_true))
# plot finite difference gradient 

plt.plot(approx_gradient(posterior.logd, x_true, 1e-8), '--')
plt.legend(['adjoint based', 'finite difference'])

In [None]:
# check the gradient of the prior

plt.plot(x.gradient(x_true))
# plot finite difference gradient 

plt.plot(approx_gradient(x.logd, x_true, 1e-8), '--')
plt.legend(['adjoint based', 'finite difference'])

In [None]:
# check the gradient of the likelihood

plt.plot(posterior.likelihood.gradient(x_true))
# plot finite difference gradient 

plt.plot(approx_gradient(posterior.likelihood.logd, x_true, 1e-10), '--')
plt.legend(['adjoint based', 'finite difference'])