# Solutions to Elliptic and Parabolic Problems via Finite Difference Based Unsupervised Small Linear Convolutional Neural Networks

Code for the elliptic problems presented in [*Solutions to Elliptic and Parabolic Problems via Finite Difference Based Unsupervised Small Linear Convolutional Neural Networks*](https://arxiv.org/abs/2311.00259)

In [None]:
import random
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.keras.utils import Progbar

# Seed every random number generator used in this notebook with the same
# value so that repeated runs start from the same state.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
tf.random.set_seed(SEED)
tf.keras.utils.set_random_seed(SEED)


# # Uncomment to set memory growth for GPU
# def set_memory_growth():
#     # Get GPUs
#     gpus = tf.config.list_physical_devices('GPU')

#     # For tensorflow 2.x.x allow memory growth on GPU
#     for gpu in gpus:
#         tf.config.experimental.set_memory_growth(gpu, True)
        
        
# set_memory_growth()

## Get problem data

In [None]:
# Grid resolution: N nodes per axis on the unit square, uniform spacing h
N = 128
h = 1./(N - 1)

# Node coordinates along each axis and the resulting 2D mesh.
# With the default meshgrid indexing, X varies along axis 1 (columns) and
# Y varies along axis 0 (rows).
x = np.linspace(0, 1, N)
y = np.linspace(0, 1, N)
X, Y = np.meshgrid(x, y)

# Problem data: reference solution u and right-hand side f sampled on the mesh.
# Exactly one (u, f) pair below should be active at a time.

# Bubble function
u = X*(X - 1)*Y*(Y - 1)
f = 2*(X - 1)*X + 2*(Y - 1)*Y

# Peak function
# u = 0.0005*X**2*(X - 1)**2*Y**2*(Y - 1)**2*np.exp(10*X**2 + 10*Y)
# f = 0.0010 * np.exp(10 * X**2 + 10 * Y) * (
#     200 * X**6 * Y**4 - 400 * X**6 * Y**3 + 200 * X**6 * Y**2
#     - 400 * X**5 * Y**4 + 800 * X**5 * Y**3 - 400 * X**5 * Y**2
#     + 340 * X**4 * Y**4 - 640 * X**4 * Y**3 + 286 * X**4 * Y**2
#     + 14 * X**4 * Y + X**4 - 240 * X**3 * Y**4 + 400 * X**3 * Y**3
#     - 132 * X**3 * Y**2 - 28 * X**3 * Y - 2 * X**3
#     + 106 * X**2 * Y**4 - 172 * X**2 * Y**3 + 52 * X**2 * Y**2
#     + 14 * X**2 * Y + X**2 - 6 * X * Y**4 + 12 * X * Y**3
#     - 6 * X * Y**2 + Y**4 - 2 * Y**3 + Y**2)

# Exponential-Trigonometric function
# u = np.exp(-X**2 - Y**2) * np.sin(3 * np.pi * X) * np.sin(3 * np.pi * Y) + X
# f = -2 * np.exp(-X**2 - Y**2) * (
#         6 * np.pi * Y * np.cos(3 * np.pi * Y) * np.sin(3 * np.pi * X) +
#         (6 * np.pi * X * np.cos(3 * np.pi * X) +
#          (9 * np.pi**2 - 2 * (-1 + X**2 + Y**2)) * np.sin(3 * np.pi * X)) * np.sin(3 * np.pi * Y))

# Boundary data taken from the reference solution, ordered as
# [first column, last column, first row, last row] of the grid arrays
first_col, last_col = u[:, 0], u[:, -1]
first_row, last_row = u[0, :], u[-1, :]
g = [first_col, last_col, first_row, last_row]

## Define loss function for elliptic problems

In [None]:
# Define loss function
class FDLoss(tf.keras.losses.Loss):
    """Unsupervised finite-difference loss for a 2D elliptic problem.

    The loss is evaluated on a single ``N x N`` prediction and blends two
    mean-squared terms:

    * interior: the residual ``-L(y_pred) + f`` where ``L`` is the 5-point
      finite-difference Laplacian with spacing ``h = 1 / (N - 1)``;
    * boundary: the mismatch between the four edges of ``y_pred`` and the
      prescribed boundary values ``g``.

    The final value is ``alpha * interior + (1 - alpha) * boundary`` with
    ``alpha = (h / 2) ** 2``.

    Args:
        N: Number of grid points per side.
        f: Right-hand side sampled on the grid; must be reshapeable to
            ``[1, N, N, 1]``.
        g: Indexable with at least four entries, each holding ``N`` boundary
            values, ordered as first column, last column, first row, last row
            of the grid (labelled left, right, bottom, top below).
        **kwargs: Forwarded to ``tf.keras.losses.Loss``.
    """

    def __init__(self, N, f, g, **kwargs):
        super().__init__(**kwargs)
        self.N = N
        self.h = 1./(N - 1)
        self.alpha = np.square(0.5*self.h)

        # Right-hand side as a float32 tensor shaped like the network output.
        # Converted once; the previous code built a constant and then
        # discarded it by reshaping the raw input instead.
        self.f = tf.reshape(tf.constant(f, dtype=tf.float32), [1, N, N, 1])

        # 5-point Laplacian stencil scaled by 1/h^2, laid out as a
        # [height, width, in_channels, out_channels] convolution filter
        stencil = np.array([[0, 1, 0], [1, -4, 1], [0, 1, 0]]) / np.square(self.h)
        stencil = tf.constant(stencil, dtype=tf.float32)
        self.k_laplacian = tf.reshape(stencil, [3, 3, 1, 1])

        # Boundary values for left, right, bottom, and top edges
        self.g = [tf.cast(g[side], tf.float32) for side in range(4)]

    def call(self, y_true, y_pred):
        """Return the scalar loss for one prediction.

        Args:
            y_true: Ignored; present only to satisfy the Keras loss signature.
            y_pred: Network output. Each edge is reshaped to ``[N]``, so the
                tensor must hold exactly ``N`` values per edge (a single
                sample with a single channel).

        Returns:
            Scalar tensor ``alpha * interior + (1 - alpha) * boundary``.
        """
        # Negative discrete Laplacian. VALID padding means the result only
        # covers interior nodes, matching the f[:, 1:-1, 1:-1, :] slice.
        neg_laplacian = -tf.nn.convolution(y_pred, self.k_laplacian,
                                           strides=1, padding="VALID")
        interior = tf.reduce_mean(tf.square(neg_laplacian + self.f[:, 1:-1, 1:-1, :]))

        # Squared boundary mismatch on the left, right, bottom, and top edges
        edges = [y_pred[:, :, 0, :],
                 y_pred[:, :, -1, :],
                 y_pred[:, 0, :, :],
                 y_pred[:, -1, :, :]]
        mismatches = [tf.square(target - tf.reshape(edge, [self.N]))
                      for target, edge in zip(self.g, edges)]
        boundary = tf.reduce_mean(tf.concat(mismatches, axis=-1))

        # Blend interior and boundary terms
        return self.alpha*interior + (1. - self.alpha)*boundary

## Build U-Net

In [None]:
def get_norm(name):
    """Return the normalization callable selected by ``name``.

    ``name`` is matched by substring: anything containing ``"batch"`` yields
    a new ``BatchNormalization`` layer, anything containing ``"identity"``
    yields ``tf.identity``. ``"batch"`` is checked first.

    Raises:
        ValueError: If ``name`` contains neither keyword.
    """
    if "batch" in name:
        return tf.keras.layers.BatchNormalization(axis=-1, center=True, scale=True)
    if "identity" in name:
        return tf.identity
    raise ValueError("Invalid normalization layer")


def get_regularizer(name):
    """Return the kernel regularizer selected by ``name``.

    ``name`` is matched by substring: anything containing ``"l2"`` yields an
    L2 regularizer with factor ``1e-7``, anything containing ``"none"``
    yields ``None``. ``"l2"`` is checked first.

    Raises:
        ValueError: If ``name`` contains neither keyword.
    """
    if "l2" in name:
        return tf.keras.regularizers.L2(1e-7)
    if "none" in name:
        return None
    raise ValueError("Invalid regularization layer")


def get_activation(name, **kwargs):
    """Return the activation callable selected by the exact string ``name``.

    Supported names are ``"relu"``, ``"leaky"``, ``"prelu"`` and
    ``"identity"``. Only ``"leaky"`` reads from ``kwargs``: it requires an
    ``"alpha"`` entry, which is passed to ``LeakyReLU``.

    Raises:
        ValueError: If ``name`` is not one of the supported names.
    """
    if name == "relu":
        return tf.keras.layers.ReLU()
    if name == "leaky":
        return tf.keras.layers.LeakyReLU(alpha=kwargs["alpha"])
    if name == "prelu":
        return tf.keras.layers.PReLU(shared_axes=[1, 2])
    if name == "identity":
        return tf.identity
    raise ValueError("Invalid activation layer")


class ConvDownsample(tf.keras.layers.Layer):
    """Downsampling by a zero-padded, stride-2, 3x3 convolution.

    The convolution is followed by the configured normalization and
    activation.

    Keyword Args:
        filters: Number of convolution filters.
        regularizer: Name understood by ``get_regularizer``.
        norm: Name understood by ``get_norm``.
        activation: Name understood by ``get_activation`` (all keyword
            arguments are forwarded to it as well).
    """

    def __init__(self, **kwargs):
        super().__init__()
        # Explicit one-pixel zero padding on each side; the convolution
        # itself is left at its default padding.
        self.pad = tf.keras.layers.ZeroPadding2D(padding=(1, 1))
        self.conv = tf.keras.layers.Conv2D(
            filters=kwargs["filters"],
            kernel_size=3,
            strides=2,
            kernel_regularizer=get_regularizer(kwargs["regularizer"]),
        )
        self.norm = get_norm(kwargs["norm"])
        self.activation = get_activation(kwargs["activation"], **kwargs)

    def call(self, x):
        """Apply pad -> conv -> norm -> activation to ``x``."""
        padded = self.pad(x)
        return self.activation(self.norm(self.conv(padded)))


def get_downsample(name, **kwargs):
    """Return the downsampling layer selected by the exact string ``name``.

    ``"maxpool"`` and ``"avgpool"`` give 2x2 pooling with stride 2;
    ``"conv"`` gives a ``ConvDownsample`` built from ``kwargs``.

    Raises:
        ValueError: If ``name`` is not one of the supported names.
    """
    if name == 'maxpool':
        return tf.keras.layers.MaxPooling2D(pool_size=(2, 2), strides=(2, 2))
    if name == "avgpool":
        return tf.keras.layers.AveragePooling2D(pool_size=(2, 2), strides=(2, 2))
    if name == 'conv':
        return ConvDownsample(**kwargs)
    raise ValueError("Invalid downsampling operation")

class ConvLayer(tf.keras.layers.Layer):
    """A 5x5 "same"-padded convolution followed by norm and activation.

    Args:
        filters: Number of convolution filters.

    Keyword Args:
        regularizer: Name understood by ``get_regularizer``.
        norm: Name understood by ``get_norm``.
        activation: Name understood by ``get_activation`` (all keyword
            arguments are forwarded to it as well).
    """

    def __init__(self, filters, **kwargs):
        super().__init__()
        self.conv = tf.keras.layers.Conv2D(
            filters=filters,
            kernel_size=5,
            padding="same",
            kernel_regularizer=get_regularizer(kwargs["regularizer"]),
        )
        self.norm = get_norm(kwargs["norm"])
        self.activation = get_activation(kwargs["activation"], **kwargs)

    def call(self, x):
        """Apply conv -> norm -> activation to ``x``."""
        return self.activation(self.norm(self.conv(x)))


class EncoderBlock(tf.keras.layers.Layer):
    """One encoder stage: a feature block followed by downsampling.

    Args:
        filters: Number of filters handed to ``block`` and to the
            downsampling layer.
        block: Callable ``block(filters, **kwargs)`` that builds the
            feature block.
        **kwargs: Must contain ``"down_type"`` (see ``get_downsample``);
            forwarded to both ``block`` and ``get_downsample``.
    """

    def __init__(self, filters, block, **kwargs):
        super().__init__()
        self.block = block(filters, **kwargs)
        self.down = get_downsample(kwargs["down_type"], filters=filters, **kwargs)

    def call(self, x):
        """Return ``(skip, downsampled)`` where ``skip`` is the block output."""
        features = self.block(x)
        return features, self.down(features)


class Bottleneck(tf.keras.layers.Layer):
    """Wrapper that applies a single feature block.

    Args:
        filters: Number of filters handed to ``block``.
        block: Callable ``block(filters, **kwargs)`` that builds the
            feature block.
        **kwargs: Forwarded to ``block``.
    """

    def __init__(self, filters, block, **kwargs):
        super().__init__()
        self.block = block(filters, **kwargs)

    def call(self, x):
        """Return the block applied to ``x``."""
        return self.block(x)


class DecoderBlock(tf.keras.layers.Layer):
    """One decoder stage: upsample, concatenate with the skip, apply a block.

    Args:
        filters: Number of filters handed to ``block``.
        block: Callable ``block(filters, **kwargs)`` that builds the
            feature block.
        **kwargs: Forwarded to ``block``.
    """

    def __init__(self, filters, block, **kwargs):
        super().__init__()
        # 2x bilinear upsampling
        self.upsample = tf.keras.layers.UpSampling2D(size=(2, 2), interpolation="bilinear")
        self.block = block(filters, **kwargs)

    def call(self, skip, x):
        """Upsample ``x``, join it with ``skip`` (skip first), apply the block."""
        merged = tf.keras.layers.concatenate([skip, self.upsample(x)])
        return self.block(merged)


class BaseModel(tf.keras.Model):
    """Encoder / bottleneck / decoder network with skip connections.

    Args:
        block: Callable ``block(filters, **kwargs)`` used for every stage.
        n_classes: Number of output channels of the final 1x1 convolution.
        init_filters: Number of filters in the first encoder stage.
        depth: Number of encoder stages (and of decoder stages).
        pocket: If truthy, every stage uses ``init_filters`` filters;
            otherwise the filter count doubles with each encoder stage.
        **kwargs: Forwarded to every encoder, bottleneck and decoder block.
    """

    def __init__(self,
                 block,
                 n_classes,
                 init_filters,
                 depth,
                 pocket,
                 **kwargs):
        super().__init__()

        # User defined inputs
        self.n_classes = n_classes
        self.init_filters = init_filters
        self.depth = depth
        self.pocket = pocket

        # If pocket network, do not double feature maps after downsampling
        self.mul_on_downsample = 1 if self.pocket else 2

        # Encoder stage i uses init_filters * mul_on_downsample**i filters
        self.encoder = [
            EncoderBlock(self.init_filters * self.mul_on_downsample ** level,
                         block,
                         **kwargs)
            for level in range(self.depth)
        ]

        self.bottleneck = Bottleneck(
            self.init_filters * self.mul_on_downsample ** self.depth,
            block,
            **kwargs)

        # Decoder stages mirror the encoder, deepest level first
        self.decoder = [
            DecoderBlock(self.init_filters * self.mul_on_downsample ** level,
                         block,
                         **kwargs)
            for level in reversed(range(self.depth))
        ]

        self.out = tf.keras.layers.Conv2D(self.n_classes, 1, padding="same", dtype="float32")

    def call(self, x):
        """Run ``x`` through encoder, bottleneck, decoder and output conv."""
        skips = []
        for stage in self.encoder:
            skip, x = stage(x)
            skips.append(skip)

        x = self.bottleneck(x)

        # Pair each decoder stage with the skip from the matching encoder
        # level: the last skip produced is consumed first.
        for skip, stage in zip(reversed(skips), self.decoder):
            x = stage(skip, x)

        return self.out(x)

# Shared layer configuration passed to every block of the U-Net:
# regularizer / norm / activation / down_type are looked up by the get_*
# helpers; alpha is only read when the activation is "leaky".
conv_kwargs = dict(
    regularizer="l2",
    norm="identity",
    activation="identity",
    alpha=0.01,
    down_type="maxpool",
)


class UNetBlock(tf.keras.layers.Layer):
    """Two ``ConvLayer`` instances applied back to back.

    Args:
        filters: Number of filters for both convolution layers.
        **kwargs: Forwarded to both ``ConvLayer`` constructors.
    """

    def __init__(self, filters, **kwargs):
        super().__init__()
        self.conv1 = ConvLayer(filters, **kwargs)
        self.conv2 = ConvLayer(filters, **kwargs)

    def call(self, x):
        """Return ``conv2(conv1(x))``."""
        return self.conv2(self.conv1(x))



class UNet(tf.keras.Model):
    """U-Net: a ``BaseModel`` built from ``UNetBlock`` stages.

    The layer configuration comes from the module-level ``conv_kwargs``.

    Args:
        n_classes: Number of output channels.
        init_filters: Number of filters in the first encoder stage.
        depth: Number of encoder/decoder stages.
        pocket: If truthy, keep the filter count constant across stages.
    """

    def __init__(self,
                 n_classes,
                 init_filters,
                 depth,
                 pocket):
        super().__init__()

        self.base_model = BaseModel(
            UNetBlock, n_classes, init_filters, depth, pocket, **conv_kwargs
        )

    @tf.function
    def call(self, x, **kwargs):
        """Forward ``x`` (and any keyword arguments) to the wrapped model."""
        outputs = self.base_model(x, **kwargs)
        return outputs

## Put it all together

* Instantiate loss function and network
* Define training step
* Begin unsupervised training loop

In [None]:
loss_fn = FDLoss(N, f, g)

n_steps = 2000
initial_learning_rate = 0.001
lr_schedule = tf.keras.optimizers.schedules.CosineDecay(
    initial_learning_rate,
    decay_steps=n_steps
)
# Global-norm gradient clipping is configured through the constructor
# instead of mutating the optimizer after it has been built.
optimizer = tf.keras.optimizers.Adam(lr_schedule, global_clipnorm=0.001)

model = UNet(1, 32, 3, True)

# The right-hand side f, shaped [1, N, N, 1], is the fixed network input
inp = f.reshape(1, N, N, 1).astype(np.float32)

prog_bar = Progbar(n_steps, stateful_metrics=['loss'])


@tf.function
def train_step(inputs):
    """Run one optimizer step on ``inputs``; return ``(loss, prediction)``.

    The loss is unsupervised, so the prediction is passed as both arguments
    of ``loss_fn`` (the first one is ignored by ``FDLoss``).
    """
    with tf.GradientTape() as tape:
        p = model(inputs)
        loss = loss_fn(p, p)
    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))
    return loss, p


# Keep the prediction with the lowest loss seen during training
best = np.inf
best_p = np.zeros((1, N, N, 1))
for step in range(n_steps):
    # Pass the actual network input (previously the builtin `input` function
    # was passed here and silently ignored by train_step).
    loss, p = train_step(inp)
    prog_bar.add(1, values=[('loss', loss)])

    if loss < best:
        best = loss
        best_p = p

# np.asarray handles both a tensor and the initial NumPy placeholder, so this
# no longer fails if the loss never improved (e.g. it was NaN throughout).
pred = np.asarray(best_p).reshape(N, N)

## Compute error and plot solution

In [None]:
# Pointwise error between the reference solution and the prediction,
# reported as a grid-scaled L2 norm and as the maximum absolute error
err = u - pred
l2_error = h*np.sqrt(np.sum(np.square(err)))
linf_error = np.max(np.abs(err))
print("L2 error: {}".format(np.format_float_scientific(l2_error, 4)))
print("L-inf error: {}".format(np.format_float_scientific(linf_error, 4)))

In [None]:
# Create figure and subplots
fig, axs = plt.subplots(ncols=3, figsize=(12, 4))

# origin="lower" puts grid row 0 (the y = 0 edge) at the bottom of each
# image, so the plots are oriented like the physical domain.

# Plot true solution
contour1 = axs[0].imshow(u, origin="lower")
axs[0].set_title("True Solution")
cbar1 = fig.colorbar(contour1, ax=axs[0])

# Plot prediction
contour2 = axs[1].imshow(pred, origin="lower")
axs[1].set_title("Prediction")
cbar2 = fig.colorbar(contour2, ax=axs[1])

# Plot difference
contour3 = axs[2].imshow(np.abs(u - pred), origin="lower")
axs[2].set_title("Difference")
cbar3 = fig.colorbar(contour3, ax=axs[2])

# Adjust layout
plt.tight_layout()

# Show plot
plt.show()
