<a href="https://colab.research.google.com/github/afonso-tiago/thesis-notebooks/blob/main/thesis_playground.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Intoduction & Table of contents

This is supposed to be a complementory notebook for the bachelor thesis "ABC"
Here the reader can play with PINN & the DRM. Try out different model or training parameters or change the domain or RHS of the poisson eq.

Imports

In [None]:
import math

import numpy as np

import tensorflow as tf
from tensorflow import keras
from keras import layers

import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D

import plotly.graph_objects as go
import plotly.express as px
import plotly.figure_factory as ff
from plotly.subplots import make_subplots

from scipy.spatial import Delaunay

from IPython.display import clear_output as cls

Domain & Boundary Points
 explain what both methods do and where they are used
 explain what boundary_0, boundary_1 and boundary_2 are

In [None]:
# parameters defining the domain and boundary
gamma = 3/2*math.pi # max angle
R = 1 # max radius

num_inner = 500
num_boundary_0 = 10
num_boundary_1 = 100
num_boundary_2 = 10
num_boundary = num_boundary_0 + num_boundary_1 + num_boundary_2

# generate inner points
def generateInnerDataRandomly(num_inner):
  # uniformly generate random polar coordinates
  phi = tf.random.uniform([num_inner], 0, gamma)
  r = tf.sqrt(tf.random.uniform([num_inner], 0, R)) # sqrt is necessary to have the same density near 0 as near R
  
  # calculate exact solution
  u_exact = tf.pow(r, math.pi/gamma)*tf.sin(phi*math.pi/gamma)

  # convert polar coordinates into cartesian coordinates
  X_inner = r*tf.cos(phi)
  Y_inner = r*tf.sin(phi)

  # return generated inner points and exact solution
  return X_inner, Y_inner, u_exact

# generate boundary points and values
def generateBoundaryDataUniformly(num_boundary_0, num_boundary_1, num_boundary_2):
  # generate boundary points for the line where phi=0
  # r = tf.random.uniform([num_boundary_0], 0, R)
  # r = tf.sort(r) # sorting is only needed for correct visualization
  r = tf.linspace(0.0, R, num_boundary_0)
  X_boundary_0 = r*math.cos(0)
  Y_boundary_0 = r*math.sin(0)
  # calculate boundary values for the line where phi=0
  g_0 = tf.zeros([num_boundary_0])

  # generate boundary points for the arc where phi in [0, gamma]
  # phi = tf.random.uniform([num_boundary_1], 0, gamma)
  # phi = tf.sort(phi) # sorting is only needed for correct visualization
  phi = tf.linspace(0.0, gamma, num_boundary_1)
  X_boundary_1 = R*tf.math.cos(phi)
  Y_boundary_1 = R*tf.math.sin(phi)
  # calculate boundary values for the arc where phi in [0, gamma]
  g_1 = tf.math.sin(phi*math.pi/gamma)

  # generate boundary points for the line where phi=gamma
  # r = tf.random.uniform([num_boundary_2], 0, R)
  # r = tf.sort(r, direction='DESCENDING') # sorting is only needed for correct visualization
  r = tf.linspace(0.0, R, num_boundary_2)
  X_boundary_2 = r*math.cos(gamma)
  Y_boundary_2 = r*math.sin(gamma)
  # calculate boundary values for the line where phi=gamma
  g_2 = tf.zeros([num_boundary_2])

  # combine all generated boundary points
  X_boundary = tf.concat([X_boundary_0, X_boundary_1, X_boundary_2], 0)
  Y_boundary = tf.concat([Y_boundary_0, Y_boundary_1, Y_boundary_2], 0)
  # combine all computed boundary values
  g = tf.concat([g_0, g_1, g_2], 0)
  
  # return generated boundary points and values
  return X_boundary, Y_boundary, g

X_inner, Y_inner, u_exact = generateInnerDataRandomly(num_inner)
X_boundary, Y_boundary, g = generateBoundaryDataUniformly(num_boundary_0, num_boundary_1, num_boundary_2)

# create a figure with two subplots (one on the left and one on the right) 
fig = make_subplots(rows=1, cols=2, specs=[[{'type': 'xy'}, {'type': 'scene'}]])
fig.update_layout(height=500, width=1000)
# add subplot of random points generated inside the domain to the to left half of the figure
fig.add_trace(
    go.Scatter(x=X_inner, y=Y_inner, mode='markers'),
    row=1, col=1,
)
# add subplot of boundary values and exact solution to the to right half of the figure
fig.add_traces(
    data=[go.Scatter3d(x=X_boundary, y=Y_boundary, z=g),
          # go.Scatter3d(x=X_inner, y=Y_inner, z=u_exact, mode='markers'),
          ],
    rows=1, cols=2
)
# display figure
fig.show()

Exact solution

If an exact solution is know one can pass it here otherwise one can pass None to sol

In [None]:
# prepare inputs for surface plot of model

# parameters controlling number of inputs
rows = 50
columns = 50
# generate inputs
X = tf.linspace(-1,1, columns)
Y = tf.linspace(-1,1, rows) 
[X,Y] = tf.meshgrid(X,Y)
# reshape inputs into a "line" for evaluation
X = tf.reshape(X, (rows*columns,))
Y = tf.reshape(Y, (rows*columns,))

# convert cartesian coordinates to polar coordinates 
r = tf.sqrt(X**2+Y**2)
phi = tf.atan2(Y, X)
phi = tf.map_fn(lambda a: a if a >= 0 else a+2*math.pi, phi)

# compute exact solution using polar coordinates
u_exact = tf.pow(r, math.pi/gamma)*tf.sin(phi*math.pi/gamma)

# reshape inputs into a grid
X = tf.reshape(X, (rows, columns))
Y = tf.reshape(Y, (rows, columns))
u_exact = tf.reshape(u_exact, (rows, columns))

# display figure
fig = go.Figure(data=[go.Surface(x=X, y=Y, z=u_exact)])
fig.add_trace(go.Scatter3d(x=X_boundary, y=Y_boundary, z=g))
fig.update_layout(scene = dict(zaxis = dict(range=[-1,1],),),)
fig.show()

Model

In [None]:
# model parameters
num_blocks = 4
nodes_per_layer = 10
batch_size = None # None means batch size is determined dynamically
activation = keras.activations.swish # lambda x: tf.math.pow(tf.math.maximum(x,0), 3)

# building the model
inputs = keras.Input(shape=(2,), batch_size=batch_size)
x = tf.pad(inputs, [[0,0], [0,nodes_per_layer-2]])
for i in range(num_blocks):
  tmp = layers.Dense(nodes_per_layer, activation=activation)(x)
  tmp = layers.Dense(nodes_per_layer, activation=activation)(tmp)
  x = layers.add([tmp, x])
outputs = layers.Dense(1)(x)

model = keras.Model(inputs=inputs, outputs=outputs)

# print model summary
model.summary()

Model: "model_1"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_2 (InputLayer)           [(None, 2)]          0           []                               
                                                                                                  
 tf.compat.v1.pad_1 (TFOpLambda  (None, 10)          0           ['input_2[0][0]']                
 )                                                                                                
                                                                                                  
 dense_9 (Dense)                (None, 10)           110         ['tf.compat.v1.pad_1[0][0]']     
                                                                                                  
 dense_10 (Dense)               (None, 10)           110         ['dense_9[0][0]']          

PINN & DRM
By either executing the first or the second code block you can choose which Method to use. First = PINN, Second = DRM.

In [None]:
# train_step using PINN
method_used = 'PINN'

@tf.function
def train_step(X_inner, Y_inner, X_boundary, Y_boundary, g, num_inner, num_boundary):
  with tf.GradientTape() as tape3:
    with tf.GradientTape(persistent=True) as tape2:
      tape2.watch(X_inner)
      tape2.watch(Y_inner)
      tape2.watch(X_boundary)
      tape2.watch(Y_boundary)
      with tf.GradientTape(persistent=True) as tape1:
        tape1.watch(X_inner)
        tape1.watch(Y_inner)
        tape1.watch(X_boundary)
        tape1.watch(Y_boundary)
        
        batch = tf.stack([X_inner,Y_inner], axis=1)
        u = model(batch)
        u = tf.reshape(u, [num_inner])

        batch_boundary = tf.stack([X_boundary, Y_boundary], axis=1)
        u_boundary = model(batch_boundary)
        u_boundary = tf.reshape(u_boundary, [num_boundary])
      udx = tape1.gradient(u, X_inner)
      udy = tape1.gradient(u, Y_inner)
      tape1.stop_recording()
    udxx = tape2.gradient(udx, X_inner)
    udyy = tape2.gradient(udy, Y_inner)
    tape2.stop_recording()

    loss_inner = tf.reduce_sum((udxx + udyy)**2)/num_inner
    loss_boundary = tf.reduce_sum((u_boundary - g)**2)/num_boundary
    loss = loss_inner + beta*loss_boundary
  grad = tape3.gradient(loss, model.trainable_weights)
  opt.apply_gradients(zip(grad, model.trainable_weights))

  return u, loss

In [None]:
# train_step using DRM
method_used = 'DRM'

@tf.function
def train_step(X_inner, Y_inner, X_boundary, Y_boundary, g, num_inner, num_boundary):
  with tf.GradientTape() as tape2:
    with tf.GradientTape(persistent=True) as tape1:
      tape1.watch(X_inner)
      tape1.watch(Y_inner)
      tape1.watch(X_boundary)
      tape1.watch(Y_boundary)
      
      batch = tf.stack([X_inner,Y_inner], axis=1)
      u = model(batch)
      u = tf.reshape(u, [num_inner])

      batch_boundary = tf.stack([X_boundary, Y_boundary], axis=1)
      u_boundary = model(batch_boundary)
      u_boundary = tf.reshape(u_boundary, [num_boundary])
    udx = tape1.gradient(u, X_inner)
    udy = tape1.gradient(u, Y_inner)
    tape1.stop_recording()

    loss_inner = tf.reduce_sum(0.5*(udx**2 + udy**2))/num_inner
    loss_boundary = tf.reduce_sum((u_boundary - g)**2)/num_boundary
    loss = loss_inner + beta*loss_boundary
  grad = tape2.gradient(loss, model.trainable_weights)
  opt.apply_gradients(zip(grad, model.trainable_weights))

  return u, loss

Training loop

In [None]:
# training loop

beta = 500 # weight of boundary condition penalty term

learning_rate = 0.001
opt = keras.optimizers.Adam(learning_rate)

train_metric = keras.metrics.RootMeanSquaredError()

epochs = 5000

X_boundary, Y_boundary, g = generateBoundaryDataUniformly(num_boundary_0, num_boundary_1, num_boundary_2)
for epoch in range(epochs+1):
  X_inner, Y_inner, u_exact = generateInnerDataRandomly(num_inner)
  u, loss = train_step(X_inner, Y_inner, X_boundary, Y_boundary, g, num_inner, num_boundary)
  if epoch%1000 == 0:
    train_metric.update_state(u_exact, u)
    print(f"Training metric at epoch {epoch}/{epochs}={math.floor(epoch/epochs*100)}%: {float(train_metric.result())}")
    train_metric.reset_states()
    # display trained NN
    tri = Delaunay(tf.stack([X_inner,Y_inner], axis=1))
    simplices = tri.simplices
    fig = ff.create_trisurf(x=X_inner, y=Y_inner, z=u, simplices=simplices, title=f"{method_used} after {epochs} epochs")
    fig.add_trace(go.Scatter3d(x=X_boundary, y=Y_boundary, z=g))
    fig.show()


Training metric at epoch 0/5000=0%: 0.04026195406913757


Training metric at epoch 1000/5000=20%: 0.01647510938346386


Training metric at epoch 2000/5000=40%: 0.009484578855335712


Training metric at epoch 3000/5000=60%: 0.007201374042779207


Training metric at epoch 4000/5000=80%: 0.0066103157587349415
