<a href="https://colab.research.google.com/github/AbhiramKalluri/Flow_through_elbow_PINN/blob/main/Bent_Pipe.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
!pip install deepxde



In [None]:
!nvidia-smi

/bin/bash: nvidia-smi: command not found


In [3]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import deepxde as dde
!export DDE_BACKEND=tensorflow
from deepxde.icbc.boundary_conditions import DirichletBC
from deepxde.geometry.geometry_2d import Polygon
from deepxde.callbacks import EarlyStopping
from importlib import import_module
from deepxde.nn import FNN
from deepxde.data.pde import TimePDE
from deepxde.model import Model
import matplotlib.pyplot as plt
from tensorflow.keras.backend import set_floatx
from deepxde.callbacks import EarlyStopping
from deepxde.data.pde import PDE
from deepxde.model import Model
from deepxde.backend import tf
from deepxde.geometry import timedomain
import numpy as np
import os
import math as m



set_floatx("float64")

In [4]:
beta = 20.0


def func_u(x):
  return (2* 9.81*0.707*x[:,0:1])**0.5

def func_zero_velocity(x):
  return np.zeros((len(x),1))

def func_one_velocity(x):
  return np.ones((len(x),1))*beta

def top_horizontal_boundary(x, on_boundary):
  for x[0] in np.arange(-0.5, 0, 0.05):
    return on_boundary and geom.on_boundary(x[:2]) and np.isclose(x[1],0.5)
  for x[0] in np.arange(0,0.5,0.05):
    return on_boundary and geom.on_boundary(x[:2]) and (np.isclose(x[1], m.sqrt((0.5**2) - ((x[0])**2) )))


def bottom_horizontal_boundary(x, on_boundary):
  ''' Checks the bottom boundary'''
  for x[0] in np.arange(-0.5, 0, 0.05):
    return on_boundary and geom.on_boundary(x[:2]) and (np.isclose(x[1],0.4) and x[0] < 0)
  for x[0] in np.arange(0, 0.4, 0.04):
    return on_boundary and geom.on_boundary(x[:2]) and (np.isclose(x[1], m.sqrt((0.4**2) - ((x[0])**2))))

def left_vertical_boundary(x,on_boundary):
  return on_boundary and geom.on_boundary(x[:2]) and (np.isclose(x[0],0.4) and x[1] < 0)

def right_vertical_boundary(x, on_boundary):
  return on_boundary and geom.on_boundary(x[:2]) and (np.isclose(x[0], 0.5) and x[1] < 0)


def inlet_boundary(x, on_boundary):
  '''Checks the inlet boundary'''
  return  on_boundary and geom.on_boundary(x[:2]) and (np.isclose(x[0], -0.5) and x[1] >= 0.4 and x[1] <= 0.5)

def outlet_boundary(x,on_boundary):
  '''Checks the outlet boundary'''
  return on_boundary and geom.on_boundary(x[:2]) and (np.isclose(x[1], -0.5) and x[0] >= 0.4 and x[0] <= 0.5)

def pde(x, y):

  u, v, p = y[:, 0:1], y[:, 1:2], y[:, 2:3]

  du = tf.gradients(u,x)[0]
  dv = tf.gradients(v,x)[0]
  dp = tf.gradients(p,x)[0]

  p_x, p_y = dp[:, 0:1], dp[:, 1:2]
  u_x, u_y, u_t = du[:, 0:1], du[:, 1:2], du[:, 2:3]

  v_x, v_y, v_t = dv[:, 0:1], dv[:, 1:2], dv[:, 2:3]


  u_xx = tf.gradients(u_x, x)[0][:, 0:1]
  u_yy = tf.gradients(u_y, x)[0][:, 1:2]


  v_xx = tf.gradients(v_x, x)[0][:, 0:1]
  v_yy = tf.gradients(v_y, x)[0][:, 1:2]

  continuity = u_x + v_y
  rho = 1.0;
  mu = 0.1;
  x_momentum = rho*(u * u_x + v * u_y) + p_x - mu * u_xx - mu * u_yy
  y_momentum = rho*(u * v_x + v * v_y) + p_y - mu * v_xx - mu * v_yy



  return [continuity,x_momentum,y_momentum]

def boundary(_, on_boundary):
    return on_boundary

'''
#PDEs defined
def pde(x,y):

  u, v, p = y[:, 0:1], y[:, 1:2], y[:, 2:3]


  du = tf.gradients(u,x)[0]
  dv = tf.gradients(v,x)[0]
  dp = tf.gradients(p,x)[0]

  u_x, u_y = tf.gradients(u,x)[0][:, 0:1], tf.gradients(u,x)[0][:, 1:2]
  v_x, v_y = tf.gradients(v,x)[0][:, 0:1], tf.gradients(v,x)[0][:, 1:2]
  p_x, p_y = tf.gradients(p,x)[0][:, 0:1], tf.gradients(p,x)[0][:, 1:2]

  u_xx = tf.gradients(u_x, x)[0][:, 0:1]
  u_yy = tf.gradients(u_y, x)[0][:,1:2]
  v_xx, v_yy = tf.gradients(v_x, x)[0][:,0:1], tf.gradients(v_y, x)[0][:, 1:2]

  continuity = u_x + v_y # continuity equation
  rho = 1.0
  nu = 0.1

  ns_x = p_x - nu*(u_xx + u_yy) + rho*(u*u_x + v*u_y) # x-momentum equation
  ns_y = p_y - nu*(v_xx + v_yy) + rho*(u*v_x + v*v_y) # y-momentum equation

  return [continuity, ns_x, ns_y]

def boundary(_, on_boundary):
  return on_boundary
'''

'\n#PDEs defined\ndef pde(x,y):\n\n  u, v, p = y[:, 0:1], y[:, 1:2], y[:, 2:3]\n\n\n  du = tf.gradients(u,x)[0]\n  dv = tf.gradients(v,x)[0]\n  dp = tf.gradients(p,x)[0]\n\n  u_x, u_y = tf.gradients(u,x)[0][:, 0:1], tf.gradients(u,x)[0][:, 1:2]\n  v_x, v_y = tf.gradients(v,x)[0][:, 0:1], tf.gradients(v,x)[0][:, 1:2]\n  p_x, p_y = tf.gradients(p,x)[0][:, 0:1], tf.gradients(p,x)[0][:, 1:2]\n\n  u_xx = tf.gradients(u_x, x)[0][:, 0:1]\n  u_yy = tf.gradients(u_y, x)[0][:,1:2]\n  v_xx, v_yy = tf.gradients(v_x, x)[0][:,0:1], tf.gradients(v_y, x)[0][:, 1:2]\n\n  continuity = u_x + v_y # continuity equation\n  rho = 1.0\n  nu = 0.1\n\n  ns_x = p_x - nu*(u_xx + u_yy) + rho*(u*u_x + v*u_y) # x-momentum equation\n  ns_y = p_y - nu*(v_xx + v_yy) + rho*(u*v_x + v*v_y) # y-momentum equation\n\n  return [continuity, ns_x, ns_y]\n\ndef boundary(_, on_boundary):\n  return on_boundary\n'

In [5]:
def bent_pipe():
  # my geometry definition
  roc = 0.4 # inner elbow radius of curvature
  thickness = 0.1 #inner diameter of the pipe
  out_roc = roc+thickness #radius of curvature for the outer elbow


  farfield = dde.geometry.geometry_2d.Rectangle(xmin= [-out_roc,-out_roc],xmax= [out_roc,out_roc])
  obstacle_1 = dde.geometry.geometry_2d.Rectangle(xmin = [-out_roc,-out_roc], xmax = [0,roc])
  obstacle_2 = dde.geometry.geometry_2d.Rectangle(xmin = [0,-out_roc], xmax =[roc, 0])
  geom1 = dde.geometry.CSGDifference(farfield, obstacle_1)
  geom  = dde.geometry.CSGDifference(geom1, obstacle_2)


  del_roc = roc/10

  # removing the curved region 1 (Marked as CR1)
  for i in np.arange(del_roc, roc, del_roc):

    obstacle_i = dde.geometry.geometry_2d.Rectangle(xmin = [i - del_roc, 0], xmax = [i + del_roc, (roc**2 - ((i + del_roc)**2))**0.5])
    geom = dde.geometry.CSGDifference(geom, obstacle_i)

  #removing the curved region 2 (Marked as CR2)

  del_out_roc = (out_roc)/10
  for j in np.arange(del_out_roc, out_roc, del_out_roc):

    obstacle_j = dde.geometry.geometry_2d.Rectangle(xmin = [j, (out_roc**2 - (j)**2)**0.5 ] , xmax = [j+del_out_roc, out_roc])
    geom = dde.geometry.CSGDifference(geom, obstacle_j)

  return geom

In [None]:
import deepxde as dde
import math as m

if __name__ == '__main__':
  set_floatx("float64")

  geom = bent_pipe()

  # my boundary conditions

  bc_top_x = dde.DirichletBC(geom, func_zero_velocity, top_horizontal_boundary, component = 0)
  bc_top_y = dde.DirichletBC(geom, func_zero_velocity, top_horizontal_boundary, component = 1)

  bc_bottom_x = dde.DirichletBC(geom, func_zero_velocity, bottom_horizontal_boundary, component = 0)
  bc_bottom_y = dde.DirichletBC(geom, func_zero_velocity, bottom_horizontal_boundary, component = 1)

  bc_right_x = dde.DirichletBC(geom, func_zero_velocity, right_vertical_boundary, component = 0)
  bc_right_y = dde.DirichletBC(geom, func_zero_velocity, right_vertical_boundary, component = 1)

  bc_left_x = dde.DirichletBC(geom, func_zero_velocity, left_vertical_boundary, component = 0)
  bc_left_y = dde.DirichletBC(geom, func_zero_velocity, left_vertical_boundary, component = 1)


  bc_inlet_x = dde.NeumannBC(geom, lambda x:0, inlet_boundary,component=0)
  bc_inlet_y = dde.DirichletBC(geom, func_zero_velocity, inlet_boundary, component=1)

  bc_outlet_x = dde.NeumannBC(geom, lambda x:0, outlet_boundary, component=0)
  bc_outlet_y = dde.DirichletBC(geom, func_zero_velocity, outlet_boundary, component=1)


  #pressure boundary conditions

  bc_p_inlet = dde.DirichletBC(geom, func_one_velocity, inlet_boundary, component = 2)
  bc_p_outlet = dde.DirichletBC(geom, func_zero_velocity, outlet_boundary, component=2)

  bc_p_horizontal_top = dde.NeumannBC(geom, lambda x: 0, top_horizontal_boundary, component = 2)
  bc_p_horizontal_bottom = dde.NeumannBC(geom, lambda x: 0, bottom_horizontal_boundary, component = 2)

  bc_p_right_vertical = dde.NeumannBC(geom, lambda x: 0, right_vertical_boundary, component = 2)
  bc_p_left_vertical = dde.NeumannBC(geom, lambda x: 0, left_vertical_boundary, component = 2)



  xe = np.array([[-0.5,0.5],[0.5, 0.5],[-0.5, 0.5], [-0.5, 0.4], [0.4, -0.5], [0.5,-0.5]])

  def output_transformer(inputs, outputs):
    """Apply output transforms to strictly enforce boundary conditions"""
    x, y = inputs[:,0], inputs[:,1]

    u_multiplier = y*(y-0.5)
    v_multiplier = x*(x-0.5)*(y)*(y-0.5)
    p_multiplier = 1.0

    return tf.transpose(
        tf.stack((
            u_multiplier*(outputs[:, 0]),
            v_multiplier*(outputs[:, 1]),
            p_multiplier*(outputs[:, 2])
        ))
    )

  bc_s = [bc_top_x, bc_top_y, bc_bottom_x, bc_bottom_y, bc_p_horizontal_top, bc_p_horizontal_bottom]
  data = dde.data.PDE(
      geom,
      pde,
      bc_s,
      num_domain = 200,
      num_boundary = 50,
      num_test = 50

  )

  layer_size = [2] + [50] * 9 + [3]
  #net = dde.nn.FNN(layer_size, "tanh", "Glorot uniform")
  net = dde.maps.FNN([2] + [50] * 9 + [3], "tanh", "Glorot uniform")
  #net.apply_output_transform(output_transformer)

  model = dde.Model(data, net)

  model.compile("adam", lr=0.01, loss_weights=[100000,1,100000,1,1000,1,100000,1,1])
  #model.compile("adam", lr=0.2, loss_weights=[1,10,1,100,10])
  model.train(iterations=5000)
  model.compile("L-BFGS")


  losshistory, train_state = model.train(batch_size = 1000, display_every = 1000)

  dde.saveplot(train_state, losshistory, issave=True, isplot=True)








'''
  bc_p_bottom = dde.NeumannBC(geom, lambda x:0, bottom_boundary, component=2)
  bc_p_left = dde.DirichletBC(geom, func_one_velocity, inlet_boundary, component=2)
  bc_p_right = dde.DirichletBC(geom, func_zero_velocity, outlet_boundary, component=2)
  bc_p_top = dde.NeumannBC(geom, lambda x:0, top_boundary, component=2)
'''

In [None]:
def elbow_flow():

  import deepxde.geometry as geom
  import matplotlib.pyplot as plt
  from matplotlib.patches import Rectangle, Arc

  # pipe and elbow dimensions
  pipe_radius = 0.05
  pipe_length = 1
  elbow_radius = 0.4
  elbow_angle = 90

  # pipe geometry
  pipe = geom.geometry_2d.Rectangle(xmin=[0.0, -pipe_radius], xmax=[pipe_length, pipe_radius])

  # elbow bend as a series of short line segments
  num_segments = 100
  delta_angle = elbow_angle / num_segments
  elbow_points = []
  current_angle = 0.0
  for _ in range(num_segments):
      x = elbow_radius * (1 - abs(current_angle) / 90.0)
      y = elbow_radius * current_angle / 90.0
      elbow_points.append([pipe_length + x, y])
      current_angle += delta_angle
  elbow_points.append([pipe_length, 0.0])
  elbow_points = [[pipe_length, 0.0]] + elbow_points[::-1]

  # union of pipe and elbow geometry
  pipe_with_elbow = geom.CSGUnion(pipe, geom.geometry_2d.Polygon(elbow_points))

