# Distance function in 2D (Linear difussion 2D)

In [1]:
import turtle
import random
import timeit

import tensorflow as tf
import tensorflow as tf
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense
from tensorflow.keras import Input
from keras.models import load_model

import numpy as np
from shapely.geometry import Point, Polygon
import matplotlib.pyplot as plt

In [None]:
from google.colab import drive
drive.mount('/content/drive')

# Constant and function definitions

In [None]:
#Seeds
np.random.seed(0)
tf.random.set_seed(0)

#Constants
Nd = 101 #Collocation points in the domain
Nb = 1   #Collocation points in the boundary

#Computation of non-smooth distance function d:
def d(xs, xbs):
  #xs: List of collocation points
  #xbs: List of boundary conditions
  xs, xbs = np.array(xs), np.array(xbs)
  ds = [min([np.linalg.norm(x - xb) for xb in xbs]) for x in xs]
  return ds

def custom_loss(y_actual,y_pred):
    custom_loss=(0.5) * 1 / (Nb + Nd) * (y_actual-y_pred)**2
    return custom_loss
    
def get_optimizer():
  return tf.keras.optimizers.Adam(lr_schedule)

start = timeit.default_timer()

## Define a star and use polygon to check points inside

In [None]:
star=[]
r = np.sqrt(0.5**2 + 0.25**2)
R = 1

num_vert = 12
start = np.pi / 6
for n in range(0,12):
  if n % 2:
    rad = r
  else:
    rad = R
  x = rad * np.cos(start + 2 * np.pi * n / num_vert)
  y = rad * np.sin(start + 2 * np.pi * n / num_vert)
  star.append([x,y])

star_poly = Polygon(star)

## Generate M random points in the boundary

In [None]:
def get_random_boundary_point(polygon):
  x,y = star_poly.exterior.xy
  points = [*zip(x,y)]
  random_index = random.randint(0, len(points)-1)
  a, b = np.array(points[random_index]), np.array(points[(random_index + 1) % len(points)])
  new_random_point = a + random.random() * (b - a)
  return list(new_random_point)

## Generate N random points inside

In [None]:
def get_random_inside_polygon(polygon):
  minx, miny, maxx, maxy = polygon.bounds
  while True:
    p = Point(random.uniform(minx, maxx), random.uniform(miny, maxy))
    if polygon.contains(p):
      x, y = p.x, p.y
      return [x, y]

In [None]:
M = 50
N = 500

random_boundary_points = [get_random_boundary_point(star_poly) for _ in range(M)]
random_inside_points = [get_random_inside_polygon(star_poly) for _ in range(N)]

In [None]:
x,y = star_poly.exterior.xy
fig, ax = plt.subplots(figsize=(8,8))

ax.plot(x,y)
ax.scatter(*list(zip(*random_boundary_points)), color="blue", marker='x')
ax.scatter(*list(zip(*random_inside_points)), color="red", marker='x')
plt.show()

## Calculate fine grid

In [None]:
N = 5000
M = 500

random_boundary_points = [get_random_boundary_point(star_poly) for _ in range(M)]
random_inside_points = [get_random_inside_polygon(star_poly) for _ in range(N)]

x,y = star_poly.exterior.xy
fig, ax = plt.subplots(figsize=(8,8))

ax.plot(x,y)
ax.scatter(*list(zip(*random_boundary_points)), color="blue", marker='x')
ax.scatter(*list(zip(*random_inside_points)), color="red", marker='x', s=10)
plt.show()

# Model for distance function (Linear difussion 2D) 

In [None]:
#Define the model

inputs = Input(shape=(2,))
x = Dense(20, 'sigmoid', dtype='float64')(inputs)
x = Dense(20, 'sigmoid')(x)
y = Dense(1, dtype='float64')(x)
model_distance_difussion = Model(inputs=inputs, outputs=y)

optimizer = Adam(learning_rate=0.01, beta_1=0.9, beta_2=0.99)
model_distance_difussion.compile(loss=custom_loss,
                optimizer=optimizer,
                metrics=['mae', 'mse'])

tf.keras.utils.plot_model(model_distance_difussion,show_shapes=True)

In [None]:
N = 500
M = 250

random_inside_points = [get_random_inside_polygon(star_poly) for _ in range(N)]
random_boundary_points = [get_random_boundary_point(star_poly) for _ in range(M)]

#Random set to train of train_size random points in the domain (excluding extremal points)
x_train = np.array(random_inside_points).reshape(-1, 2)
ds = d(x_train, random_boundary_points)
y_train = np.array(ds).reshape(-1, 1)

In [None]:
x,y = star_poly.exterior.xy
fig, ax = plt.subplots(figsize=(8,8))

ax.plot(x,y)
ax.scatter(*list(zip(*random_boundary_points)), color="blue", marker='x')
ax.scatter(*list(zip(*random_inside_points)), color="red", marker='x', s=10)
plt.show()

In [None]:
model_distance_difussion.fit(x_train, y_train, epochs=1000, verbose=False)

## Plotting distance model

In [None]:
@np.vectorize
def distance_inside_difussion(x, y, polygon: Polygon):
  distance = model_distance_difussion.predict([[x, y]])[0][0] if polygon.contains(Point([x, y])) else np.nan
  return distance

In [None]:
# generate 2 2d grids for the x & y bounds
y, x = np.meshgrid(np.linspace(-1, 1, 100), np.linspace(-1, 1, 100))
z = distance_inside_difussion(x, y, star_poly)

In [None]:
z_min, z_max = np.nanmin(z), np.nanmax(z)

fig, ax = plt.subplots(figsize=(13,8))

c = ax.pcolormesh(x, y, z, cmap='RdBu', vmin=z_min, vmax=z_max)
ax.set_title('Smoothed distance function')
# set the limits of the plot to the limits of the data
ax.axis([x.min(), x.max(), y.min(), y.max()])
fig.colorbar(c, ax=ax)

plt.show()

## Save model distance

In [None]:
model_distance_difussion.save('/content/drive/MyDrive/models_TFM/model_distance_difussion.h5')

## Load model distance

In [None]:
model_distance_difussion = load_model(
    '/content/drive/MyDrive/models_TFM/model_distance_difussion.h5', 
    custom_objects={'custom_loss':custom_loss}
    )

# Model for the smooth extension of the boundary data ($g(x,y)$) (Linear difussion 2D)

In [None]:
def u(x, y):
  return np.exp(-(2*x**2 + 4*y**2)) + 1/2

def g(x, y):
  return u(x, y)

In [None]:
#Define the model

inputs = Input(shape=(2,))
x = Dense(10, 'sigmoid', dtype='float64')(inputs)
x = Dense(10, 'sigmoid')(x)
y = Dense(1, dtype='float64')(x)
model_boundary_difussion = Model(inputs=inputs, outputs=y)

optimizer = Adam(learning_rate=0.01, beta_1=0.9, beta_2=0.99)
model_boundary_difussion.compile(loss=custom_loss,
                optimizer=optimizer,
                metrics=['mae', 'mse'])

tf.keras.utils.plot_model(model_boundary_difussion,show_shapes=True)

In [None]:
x_train = np.array(random_boundary_points).reshape(-1, 2)
y_train = np.array([g(x, y) for x, y in x_train])

model_boundary_difussion.fit(x_train, y_train, epochs=1000, verbose=False)

## Save model boundary

In [None]:
model_boundary_difussion.save('/content/drive/MyDrive/models_TFM/model_boundary_difussion.h5')

## Load model boundary

In [None]:
model_boundary_difussion = load_model(
    '/content/drive/MyDrive/models_TFM/model_boundary_difussion.h5', 
    custom_objects={'custom_loss':custom_loss}
    )















# Linear diffusion in 2D

The equation is given by:
\begin{align*}
Lu&=\frac{\partial^2 u}{\partial x^2} + \frac{\partial^2 u}{\partial y^2}=f \;\;;\; x \in \Omega\\
u&=g \;\;;\; x \in \Gamma\\
\end{align*}

The set $\Gamma \subset \partial \Omega$ is all the boundary. In such way we have that:

$$\Gamma = \{x\in\partial\Omega\}.$$

And taking the analytic solution as follows:
$$u = \exp(−(2x^2 + 4y^2)) + 1/2$$

Therefore, we have: 
\begin{align*}
f(x) &= 4\exp(-2(x^2 + 2y^2))(-1 + 4x^2) + 8\exp(-2(x^2 + 2 y^2))(-1 + 8y^2)
\end{align*} 

So, the ansatz, $\hat{u}=\hat{u}(x;w,b)$, take the form:

$$u(x) = G(x) + D(x)y^L (x; w, b) $$

In [None]:
import tensorflow as tf
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense
from tensorflow.keras import Input
import numpy as np
import matplotlib.pylab as plt

In [None]:
#Define the model

inputs = Input(shape=(2,))
x = Dense(10, 'sigmoid', dtype='float64')(inputs)
x = Dense(10, 'sigmoid')(x)
x = Dense(10, 'sigmoid')(x)
x = Dense(10, 'sigmoid')(x)
x = Dense(10, 'sigmoid')(x)
y = Dense(1, dtype='float64')(x)
model = Model(inputs=inputs, outputs=y)

tf.keras.utils.plot_model(model,show_shapes=True)

In [None]:
def get_random_inside_polygon(polygon):
  minx, miny, maxx, maxy = polygon.bounds
  while True:
    p = Point(random.uniform(minx, maxx), random.uniform(miny, maxy))
    if polygon.contains(p):
      x, y = p.x, p.y
      return [x, y]

def u(x, y):
  return np.exp(-(2*x**2 + 4*y**2)) + 1/2

def f(x, y):
  d2f_dx2 = 4*np.exp(-2*(x**2 + 2*y**2))*(-1 + 4*x**2)
  d2f_dy2 = 8*np.exp(-2*(x**2 + 2*y**2))*(-1 + 8*y**2)
  return d2f_dx2 + d2f_dy2

def D(x, y):
  points = tf.transpose([x, y])
  return model_distance_difussion(points)

def G(x, y):
  points = tf.transpose([x, y])
  return model_boundary_difussion(points)

In [None]:
num_samples = 1000

#Random set to train of train_size random points
inside_points = [get_random_inside_polygon(star_poly) for _ in range(num_samples)]

x_train = tf.constant([point[0] for point in inside_points], dtype='float64')
y_train = tf.constant([point[1] for point in inside_points], dtype='float64')

z_train = f(x_train, y_train)
# using the high level tf.data API for data handling
dataset = tf.data.Dataset.from_tensor_slices((x_train,y_train)).batch(1)

In [None]:
opt = Adam(learning_rate=0.01, beta_1=0.9, beta_2=0.99)

x_variable = tf.Variable(x_train)
y_variable = tf.Variable(y_train) 

with tf.GradientTape() as gt_x2, tf.GradientTape() as gt_y2:
  gt_x2.watch(x_variable)
  gt_y2.watch(y_variable)
  with tf.GradientTape() as gt_x, tf.GradientTape() as gt_y:
    gt_x.watch(x_variable)
    gt_y.watch(y_variable)
    G_xy = G(x_variable, y_variable)
  dG_dx = gt_x.gradient(G_xy, x_variable)   
  dG_dy = gt_y.gradient(G_xy, y_variable)
d2G_dx2 = gt_x2.gradient(dG_dx, x_variable)   
d2G_dy2 = gt_y2.gradient(dG_dy, y_variable)  

G_pred = G(x_variable, y_variable)

with tf.GradientTape() as dt_x2, tf.GradientTape() as dt_y2:
  dt_x2.watch(x_variable)
  dt_y2.watch(y_variable)
  with tf.GradientTape() as dt_x, tf.GradientTape() as dt_y:
    dt_x.watch(x_variable)
    dt_y.watch(y_variable)
    D_xy = D(x_variable, y_variable)
  dD_dx = dt_x.gradient(D_xy, x_variable)
  dD_dy = dt_y.gradient(D_xy, y_variable)    
d2D_dx2 = dt_x2.gradient(dD_dx, x_variable)  
d2D_dy2 = dt_y2.gradient(dD_dy, y_variable)      

D_pred = D(x_variable, y_variable)

ff = tf.constant(f(x_variable, y_variable))

n_train_steps = 10000

v_loss  = np.empty((n_train_steps,0))
v_error = np.empty((n_train_steps,0))
z_pred_old = np.zeros_like(x_variable)

################################################################################

for step in range(n_train_steps):
    with tf.GradientTape() as model_tape:
        with tf.GradientTape() as z_tape_x2, tf.GradientTape() as z_tape_y2:
          z_tape_x2.watch(x_variable)
          z_tape_y2.watch(y_variable)
          with tf.GradientTape() as z_tape_x, tf.GradientTape() as z_tape_y:
            z_tape_x.watch(x_variable)
            z_tape_y.watch(y_variable)
            z_pred = model(tf.transpose([x_variable, y_variable]))
          dz_dx = z_tape_x.gradient(z_pred, x_variable)
          dz_dy = z_tape_y.gradient(z_pred, y_variable)
        d2z_dx2 = z_tape_x2.gradient(dz_dx, x_variable)
        d2z_dy2 = z_tape_y2.gradient(dz_dy, y_variable)
        z_pred_old = z_pred.numpy()
        lu = (
            d2G_dx2.numpy() + d2D_dx2.numpy()*z_pred.numpy().reshape(-1) + 2*dD_dx.numpy()*dz_dx + D_pred.numpy()*d2z_dx2 +
            d2G_dy2.numpy() + d2D_dy2.numpy()*z_pred.numpy().reshape(-1) + 2*dD_dy.numpy()*dz_dy + D_pred.numpy()*d2z_dy2
            )
        loss = tf.reduce_sum(tf.math.squared_difference(lu, ff))
        v_loss = np.append(v_loss,loss)
        
    grads = model_tape.gradient(loss, model.trainable_variables)
    opt.apply_gradients(
        (grad, var) 
        for (grad, var) in zip(grads, model.trainable_variables) 
        if grad is not None
)


    Yl = model(tf.transpose([x_variable, y_variable])).numpy().reshape(-1)
    Ds = D(x_variable, y_variable).numpy().reshape(-1)
    Gs = G(x_variable, y_variable).numpy().reshape(-1)
    u_hat = Gs+Ds*Yl
    error = np.max(np.abs(u_hat-u(x_variable, y_variable)))
    v_error = np.append(v_error, error)

## Plotting diffusion model

In [None]:
@np.vectorize
def model_inside_difussion(x, y, polygon: Polygon):
  distance = model.predict([[x, y]])[0][0] if polygon.contains(Point([x, y])) else np.nan
  return distance

@np.vectorize
def model_distances_difussion(x, y, polygon: Polygon):
  distance_value = model_distance_difussion.predict([[x, y]])[0][0] if polygon.contains(Point([x, y])) else np.nan
  return distance_value

@np.vectorize
def model_boundaries_difussion(x, y, polygon: Polygon):
  boundary_value = model_boundary_difussion.predict([[x, y]])[0][0] if polygon.contains(Point([x, y])) else np.nan
  return boundary_value

In [None]:
# generate 2 2d grids for the x & y bounds
y, x = np.meshgrid(np.linspace(-1, 1, 100), np.linspace(-1, 1, 100))
z = model_inside_difussion(x, y, star_poly) * model_distances_difussion(x, y, star_poly) + model_boundaries_difussion(x, y, star_poly)

In [None]:
z_min, z_max = -2*np.nanmedian(z), 2*np.nanmedian(z)#np.nanmax(z)

fig, ax = plt.subplots(figsize=(13,8))

c = ax.pcolormesh(x, y, z, cmap='RdBu', vmin=z_min, vmax=z_max)
ax.set_title('Smoothed function')
# set the limits of the plot to the limits of the data
ax.axis([x.min(), x.max(), y.min(), y.max()])
fig.colorbar(c, ax=ax)

plt.show()

## Save difussion model 2D 

In [None]:
model.save('/content/drive/MyDrive/models_TFM/model_difussion.h5')

## Load difussion model 2D 

In [None]:
model_difussion = load_model(
    '/content/drive/MyDrive/models_TFM/model_difussion.h5', 
    custom_objects={'custom_loss':custom_loss}
    )