In [None]:
#Setup Fenicsx in real mode to solve PDEs.
try:
    import dolfinx
except ImportError:
    !wget "https://fem-on-colab.github.io/releases/fenicsx-install-real.sh" -O "/tmp/fenicsx-install.sh" && bash "/tmp/fenicsx-install.sh"
    import dolfinx

In [None]:
# Import nescessarry python modules.
import numpy as np
import pandas as pd
import ufl
import functools
from mpi4py import MPI
from petsc4py import PETSc
from dolfinx import mesh, fem, io, nls, log
from dolfinx.fem import petsc as femPetsc
from dolfinx.nls import petsc as nlsPetsc
from scipy.io import savemat
from scipy.io import loadmat
from datetime import datetime

# Mount google drive storage to notebook.
from google.colab import drive
drive.mount('/content/gdrive/')

mainProjectDirGDrive = "/content/gdrive/MyDrive/Github/HJDQN/"

pde_solutions_folder = mainProjectDirGDrive + "pde_outputs/pde_solutions/"
pde_fem_matrices_folder = mainProjectDirGDrive + "pde_outputs/fem_matrices/"
pde_ricatti_solution_matrices = mainProjectDirGDrive + "pde_outputs/ricatti_solution_matrices/"

In [None]:
# One dimensional PDE.

# Define initial and end time value.
t = 0
T = 4.0

# Define space interval.

#1d
Omega = [-1,1]

# Time step configuraton.
num_steps = 400
dt = T / num_steps

# Define mesh.

#One dimensional mesh.
nx = 200
domain = mesh.create_interval(MPI.COMM_WORLD, nx, Omega)

# Define test function space.
V = fem.FunctionSpace(domain, ("P", 1))

# Create initial condition for the state function and interpolate it in V.

#class Initial_condition():
#
#  def __init__(self):
#      pass
#
#  def __call__(self,x):
#      values = np.zeros(x.shape,dtype=PETSc.ScalarType)
#      values = (x[0]-1)*(x[0]+1)+5
#      return values

#initial_condition = Initial_condition()

def initial_condition(x):
  return (x[0]-1)*(x[0]+1)+5

y_0 = fem.Function(V)
y_0.name = "y_0"
y_0.interpolate(initial_condition)

# PDE coefficients
nu = 0.1

# Indicator function for 1d PDE also called acctuator functions.
w1 = np.array([-0.7,-0.4])
w2 = np.array([-0.2,0.2])
w3 = np.array([0.4,0.7])
tol = 1e-14

acctuatorDomains = [w1, w2, w3]

I_w = [fem.Function(V) for i in range(0,len(acctuatorDomains))]

for i, acct in enumerate(acctuatorDomains):
  I_w[i].interpolate(lambda x: np.where(np.logical_and(x[0] - acct[0] + tol >= 0, acct[1] + tol - x[0] >= 0), 1, 0))

In [None]:
# Assembling of matrices for reward function.

# Trial and test function.
y_trial, phi = ufl.TrialFunction(V), ufl.TestFunction(V)

# Mass matrix
mass_form = y_trial*phi*ufl.dx
M_dx = femPetsc.assemble_matrix(fem.form(mass_form))
M_dx.assemble()
M_dx.convert("dense")
M = M_dx.getDenseArray()

# State dimension
n = M.shape[1]

# Remove boundary
M_wb = M[1:n-1,1:n-1]

# State dimension without boundary
n_wb = n-2

# Action dimesion
m = len(acctuatorDomains)

# Matrices and coefficients for the reward function r = (alpha/2)*y'*Q*y + (beta/2)*u*R*u
alpha = 1.
beta = 0.01
Q_wb = M_wb
Q = M
R = beta*np.eye(m)

In [None]:
# Time stamp
exec_time = datetime.now().strftime("_%Y-%m-%dT%H%M%S")

# Create output file.
pde_solution = "HJDQN{}_uncontrolled_nonlinear_PDE_1D_state.xdmf".format(exec_time)
xdmf = io.XDMFFile(domain.comm, f"{pde_solutions_folder}{pde_solution}", "w", encoding=io.XDMFFile.Encoding.HDF5)
xdmf.write_mesh(domain)

# Define solution variable, and interpolate initial solution for visualization in Paraview.
y_n = fem.Function(V)
y_n.name = "y_n"
y_n.interpolate(initial_condition)
xdmf.write_function(y_n, t)

# Control vector components.
us = [fem.Constant(domain, 0.0) for i in range(0,len(I_w))]

# Variational Problem (RHS).
L = us[0]*I_w[0]
for i in range(1,len(us)):
  L = L + us[i]*I_w[i]
L = (y_0 + dt*L)*phi*ufl.dx

# Variational Problem.
a_h = y_n*phi*ufl.dx + nu*dt*ufl.dot(ufl.grad(y_n),ufl.grad(phi))*ufl.dx - dt*y_n*(1-y_n**2)*phi*ufl.dx - L

# Create solver.
problem = femPetsc.NonlinearProblem(a_h, y_n)
solver = nlsPetsc.NewtonSolver(MPI.COMM_WORLD, problem)
solver.convergence_criterion = "incremental"
solver.rtol = 1e-6
solver.report = True

ksp = solver.krylov_solver
opts = PETSc.Options()
option_prefix = ksp.getOptionsPrefix()
opts[f"{option_prefix}ksp_type"] = "cg"
opts[f"{option_prefix}pc_type"] = "gamg"
opts[f"{option_prefix}pc_factor_mat_solver_type"] = "mumps"
ksp.setFromOptions()

# Whole time intervall.
for i in range(num_steps):
    t += dt

    # Control vector.
    #y_0_arr = y_0.x.array
    #u = -1*np.matmul(K_mat,y_0_arr[1:len(y_0_arr)-1])
    #uls = u.tolist()

    # Update coefficients of control vector.
    #for i in range(0,len(us)):
    #  us[i].value = uls[i]

    # Solve linear problem
    _, converged = solver.solve(y_n)
    y_n.x.scatter_forward()
    assert(converged)

    # Update solution at previous time step
    y_0.x.array[:] = y_n.x.array

    # Write solution to file
    xdmf.write_function(y_n, t)
xdmf.close()

In [None]:
#Install control package to calulate the solution of the riccati equations

#X, L, K = control.care(Ad, B, Q_wb, R, np.zeros((n_wb, m)), M_wb)
#K = np.asarray(K)