In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.animation as animaton
import scipy.io

In [6]:
def coriolis_force(lat):
    omega = 2 * np.pi / (24 * 60 * 3600)
    f = 2 * omega * np.sin(lat)
    return f

In [7]:
nu = 10**(-3)

In [8]:
N_train = 5000

data = scipy.io.loadmat('navier_Stokes.mat')
U_star = data['U_star']
X_star = data['X_star']
t_star = data['t']
p_star = data['p_star']

N = X_star.shape[0]
M = t_star.shape[0]

U = U_star[:, 0, :]
V = U_star[:, 1, :]

X = np.tile(X_star[:, 0:1], (1, t_star.shape[0]))  # (N, M)
Y = np.tile(X_star[:, 1:2], (1, t_star.shape[0])) # (N, M)
T = np.tile(t_star, (1, X_star.shape[0])).T # (N, M)

XX = X.flatten()[:, None]
YY = Y.flatten()[:, None]  
TT = T.flatten()[:, None]  
UU = U.flatten()[:, None]
VV = V.flatten()[:, None]
PP = p_star.flatten()[:, None]

indices = np.random.choice(UU.shape[0], N_train, replace=False)
x_train = torch.tensor(XX[indices], dtype=torch.float32, requires_grad=True)
y_train = torch.tensor(YY[indices], dtype=torch.float32, requires_grad=True)
t_train = torch.tensor(TT[indices], dtype=torch.float32, requires_grad=True)
u_train = torch.tensor(UU[indices], dtype=torch.float32)
v_train = torch.tensor(VV[indices], dtype=torch.float32)
p_train = torch.tensor(PP[indices], dtype=torch.float32)

x_test = X_star[:, 0:1]
y_test = X_star[:, 1:2]
t_test = np.ones((x_test.shape[0], x_test.shape[1])) # All ones so that when I animate, we go through all T, also appears smoother
u_test = U_star[:, 0, :]
v_test = U_star[:, 1, :]
p_test = p_star

x_test = torch.tensor(x_test, dtype=torch.float32, requires_grad=True)
y_test = torch.tensor(y_test, dtype=torch.float32, requires_grad=True)
t_test = torch.tensor(t_test, dtype=torch.float32, requires_grad=True)
u_test = torch.tensor(u_test, dtype=torch.float32) # Used to create the residual
v_test = torch.tensor(v_test, dtype=torch.float32)
p_test = torch.tensor(p_test, dtype=torch.float32)

X_plot = X_star[:, 0].reshape(50, 100)
Y_plot = X_star[:, 1].reshape(50, 100)

In [9]:
fig, ax = plt.subplots(1, 3, figsize=(18, 6))

def update(frame):
    ax[0].clear()
    ax[1].clear()
    ax[2].clear()
    
    U_plot = U_star[:, 0, frame].reshape(50, 100)
    V_plot = U_star[:, 1, frame].reshape(50, 100)
    P_plot = p_star[:, frame].reshape(50, 100)
    
    ax[0].contourf(X_plot, Y_plot, U_plot, levels = 20, cmap='jet') # Levels = 20 is necessary cuz gives error that contour lines not increasing
    ax[1].contourf(X_plot, Y_plot, V_plot, levels = 20, cmap='jet')
    ax[2].contourf(X_plot, Y_plot, P_plot, levels = 20, cmap='jet')
     
    ax[0].set_title(f'U velocity at t={t_star[frame][0]:.2f}')
    ax[1].set_title(f'V velocity at t={t_st\ar[frame][0]:.2f}')
    ax[2].set_title(f'Pressure at t={t_star[frame][0]:.2f}')
    ax[0].set_xlabel('x')
    ax[0].set_ylabel('y')
    ax[1].set_xlabel('x')
    ax[1].set_ylabel('y')
    ax[2].set_xlabel('x')
    ax[2].set_ylabel('y')

ani = animation.FuncAnimation(fig, update, frames=t_star.shape[0], interval=1)
#ani.save('navier_stokes_animation_true.gif') # Same graph when plot the test data
plt.show()

SyntaxError: f-string expression part cannot include a backslash (1530806721.py, line 17)

In [2]:
class PINN(nn.Module):
    def __init__(self):
        super(PINN, self).__init__()
        self.fc1 = nn.Linear(3, 50) 
        self.fc2 = nn.Linear(50, 50) 
        self.fc3 = nn.Linear(50, 50) 
        self.fc4 = nn.Linear(50, 50) 
        self.fc5 = nn.Linear(50, 50) 
        self.fc6 = nn.Linear(50, 50) 
        self.fc7 = nn.Linear(50, 50) 
        self.fc8 = nn.Linear(50, 50) 
        self.fc9 = nn.Linear(50, 50) 
        self.fc10 = nn.Linear(50, 3)

    def forward(self, x):
        x = torch.tanh(self.fc1(x))
        x = torch.tanh(self.fc2(x))
        x = torch.tanh(self.fc3(x))
        x = torch.tanh(self.fc4(x))
        x = torch.tanh(self.fc5(x))
        x = torch.tanh(self.fc6(x))
        x = torch.tanh(self.fc7(x))
        x = torch.tanh(self.fc8(x))
        x = torch.tanh(self.fc9(x))
        x = self.fc10(x)
        return x

In [18]:
def loss_phys(f_x, f_y, c):
    return torch.mean((f_x - 0)**2 + (f_y - 0)**2 + (c - 0)**2)

In [19]:
def loss_data(u, v, ssh): 
    return torch.mean((u - u_train)**2 + (v - v_train)**2 + (ssh - ssh_train)**2)
    # return torch.mean((u - u_train)**2 + (v - v_train)**2 + (ssh - p_train)**2)

In [20]:
def function(x, y, t):
    predict = model(torch.hstack((x, y, t))) # Allows x, y, and t to be in the computational graph instead of creating training_data
    psi = predict[:, 0:1]
    ssh = predict[:, 1:2]
    
    u = torch.autograd.grad(psi, y, grad_outputs=torch.ones_like(psi), create_graph=True)[0]
    v = -1 * torch.autograd.grad(psi, x, grad_outputs=torch.ones_like(psi), create_graph=True)[0]

    u_x = torch.autograd.grad(u, x, grad_outputs=torch.ones_like(u), create_graph=True)[0]
    u_xx = torch.autograd.grad(u_x, x, grad_outputs=torch.ones_like(u_x), create_graph=True)[0]
    u_y = torch.autograd.grad(u, y, grad_outputs=torch.ones_like(u), create_graph=True)[0]
    u_yy = torch.autograd.grad(u_y, y, grad_outputs=torch.ones_like(u_y), create_graph=True)[0]

    v_x = torch.autograd.grad(v, x, grad_outputs=torch.ones_like(v), create_graph=True)[0]
    v_xx = torch.autograd.grad(v_x, x, grad_outputs=torch.ones_like(v_x), create_graph=True)[0]
    v_y = torch.autograd.grad(v, y, grad_outputs=torch.ones_like(v), create_graph=True)[0]
    v_yy = torch.autograd.grad(v_y, y, grad_outputs=torch.ones_like(v_y), create_graph=True)[0]

    ssh_x = torch.autograd.grad(ssh, x, grad_outputs=torch.ones_like(ssh), create_graph=True)[0]
    ssh_y = torch.autograd.grad(ssh, y, grad_outputs=torch.ones_like(ssh), create_graph=True)[0]

    u_t = torch.autograd.grad(u, t, grad_outputs=torch.ones_like(u), create_graph=True)[0]
    v_t = torch.autograd.grad(v, t, grad_outputs=torch.ones_like(v), create_graph=True)[0]

    # f_x = u_t + u * u_x + v * u_y + ssh_x - nu * (u_xx + u_yy)
    # f_y = v_t + u * v_x + v * v_y + ssh_y - nu * (v_xx + v_yy)

    c = u_x + v_y

    f_x = u_t + u * u_x + v * u_y + g * ssh_x - nu * (u_xx + u_yy)
    f_y = v_t + u * v_x + v * v_y + g * ssh_y - nu * (v_xx + v_yy)

    return u, v, ssh, f_x, f_y, c

In [22]:
def closure():
    optimizer.zero_grad()
    u_pred, v_pred, ssh_pred, f_x, f_y, c = function(x_train, y_train, t_train)
    loss = loss_data(u_pred, v_pred, ssh_pred) + loss_phys(f_x, f_y, c) 
    loss.backward()
    return loss

In [37]:
# Training function
def train(model, optimizer, num_epochs = 2000):
    prev_loss = float('inf')  
    for epoch in range(1, num_epochs + 1):
        loss = optimizer.step(closure) # This is for LBFGS
        print(f'Epoch [{epoch}/{num_epochs}], Loss: {loss.item()}')
        if loss.item() >= prev_loss:
            break # Stop iteration if loss isn't decreasing
        prev_loss = loss.item()

In [38]:
# Initialize model and optimizer
model = PINN()

# LBFGS optimizer best for handling second derivative information and small datasets
# requires a closure() to perform the loss function multiple times, and be consistent
# optimizer = optim.LBFGS(model.parameters(), lr = 1)
optimizer = optim.Adam(model.parameters(), lr = 0.001)

# Train the model
train(model, optimizer)

Epoch [1/1000], Loss: 22.1621036529541
Epoch [2/1000], Loss: 21.363168716430664
Epoch [3/1000], Loss: 19.46809196472168
Epoch [4/1000], Loss: 16.031036376953125
Epoch [5/1000], Loss: 13.639433860778809
Epoch [6/1000], Loss: 11.999612808227539
Epoch [7/1000], Loss: 10.751105308532715
Epoch [8/1000], Loss: 9.715580940246582
Epoch [9/1000], Loss: 8.658815383911133
Epoch [10/1000], Loss: 7.36850643157959
Epoch [11/1000], Loss: 6.331462383270264
Epoch [12/1000], Loss: 5.6038031578063965
Epoch [13/1000], Loss: 5.05152702331543
Epoch [14/1000], Loss: 4.556077003479004
Epoch [15/1000], Loss: 4.072875022888184
Epoch [16/1000], Loss: 3.6936779022216797
Epoch [17/1000], Loss: 3.411155939102173
Epoch [18/1000], Loss: 3.1782002449035645
Epoch [19/1000], Loss: 2.9285409450531006
Epoch [20/1000], Loss: 2.7094054222106934
Epoch [21/1000], Loss: 2.5221800804138184
Epoch [22/1000], Loss: 2.3220877647399902
Epoch [23/1000], Loss: 2.170361042022705
Epoch [24/1000], Loss: 2.0673441886901855
Epoch [25/1000]

In [None]:
fig, ax = plt.subplots(1, 3, figsize=(18, 6))

u_pred, v_pred, p_pred, _ , _, _ = function(x_test, y_test, t_test * t_star[0][0])

U_plot = u_pred.detach().numpy().reshape(50, 100)
V_plot = v_pred.detach().numpy().reshape(50, 100)
P_plot = p_pred.detach().numpy().reshape(50, 100)

ax[0].contourf(X_plot, Y_plot, U_plot, cmap='jet')
ax[1].contourf(X_plot, Y_plot, V_plot, cmap='jet')
ax[2].contourf(X_plot, Y_plot, P_plot, cmap='jet')

c1 = ax[0].contourf(X_plot, Y_plot, U_plot, cmap='jet')
c2 = ax[1].contourf(X_plot, Y_plot, V_plot, cmap='jet')
c3 = ax[2].contourf(X_plot, Y_plot, P_plot, cmap='jet')

cb1 = fig.colorbar(c1, ax=ax[0])
cb2 = fig.colorbar(c2, ax=ax[1])
cb3 = fig.colorbar(c3, ax=ax[2])

def update(frame):
    ax[0].clear()
    ax[1].clear()
    ax[2].clear()

    u_pred, v_pred, p_pred, _ , _, _ = function(x_test, y_test, t_test * t_star[frame][0])
    
    U_plot = u_pred.detach().numpy().reshape(50, 100)
    V_plot = v_pred.detach().numpy().reshape(50, 100)
    P_plot = p_pred.detach().numpy().reshape(50, 100)

    ax[0].contourf(X_plot, Y_plot, U_plot, cmap='jet')
    ax[1].contourf(X_plot, Y_plot, V_plot, cmap='jet')
    ax[2].contourf(X_plot, Y_plot, P_plot, cmap='jet')
    
    ax[0].set_title(f'U velocity at t={t_star[frame][0]:.2f}')
    ax[1].set_title(f'V velocity at t={t_star[frame][0]:.2f}')
    ax[2].set_title(f'Pressure at t={t_star[frame][0]:.2f}')
    ax[0].set_xlabel('y')
    ax[0].set_ylabel('x')
    ax[1].set_xlabel('y')
    ax[1].set_ylabel('x')
    ax[2].set_xlabel('y')
    ax[2].set_ylabel('x')

ani = animation.FuncAnimation(fig, update, frames=t_star.shape[0], interval=1)
#ani.save('navier_stokes_animation_pred.gif')
plt.show()

In [None]:
fig, ax = plt.subplots(1, 3, figsize=(18, 6))

u_pred, v_pred, p_pred, _ , _ , _ = function(x_test, y_test, t_test * t_star[0][0])

U_plot = ((u_test[:, 0:1] - u_pred.detach().numpy())**2).reshape(50, 100) # [:, frame:frame + 1] to get extra dim. **2 to see differences
V_plot = ((v_test[:, 0:1] - v_pred.detach().numpy())**2).reshape(50, 100)
P_plot = ((p_test[:, 0:1] - p_pred.detach().numpy())**2).reshape(50, 100)

ax[0].contourf(X_plot, Y_plot, U_plot, cmap='jet')
ax[1].contourf(X_plot, Y_plot, V_plot, cmap='jet')
ax[2].contourf(X_plot, Y_plot, P_plot, cmap='jet')

c1 = ax[0].contourf(X_plot, Y_plot, U_plot, cmap='jet')
c2 = ax[1].contourf(X_plot, Y_plot, V_plot, cmap='jet')
c3 = ax[2].contourf(X_plot, Y_plot, P_plot, cmap='jet')

cb1 = fig.colorbar(c1, ax=ax[0])
cb2 = fig.colorbar(c2, ax=ax[1])
cb3 = fig.colorbar(c3, ax=ax[2])

def update(frame):
    ax[0].clear()
    ax[1].clear()
    ax[2].clear()

    u_pred, v_pred, p_pred, _ , _ , _ = function(x_test, y_test, t_test * t_star[frame][0])

    U_plot = ((u_test[:, frame:frame + 1] - u_pred.detach().numpy())**2).reshape(50, 100) # [:, frame:frame + 1] to get extra dim. **2 to see differences
    V_plot = ((v_test[:, frame:frame + 1] - v_pred.detach().numpy())**2).reshape(50, 100)
    P_plot = ((p_test[:, frame:frame + 1] - p_pred.detach().numpy())**2).reshape(50, 100)
    
    ax[0].contourf(X_plot, Y_plot, U_plot, cmap='jet')
    ax[1].contourf(X_plot, Y_plot, V_plot, cmap='jet')
    ax[2].contourf(X_plot, Y_plot, P_plot, cmap='jet')
     
    ax[0].set_title(f'U velocity at t={t_star[frame][0]:.2f}')
    ax[1].set_title(f'V velocity at t={t_star[frame][0]:.2f}')
    ax[2].set_title(f'Pressure at t={t_star[frame][0]:.2f}')
    ax[0].set_xlabel('y')
    ax[0].set_ylabel('x')
    ax[1].set_xlabel('y')
    ax[1].set_ylabel('x')
    ax[2].set_xlabel('y')
    ax[2].set_ylabel('x')

ani = animation.FuncAnimation(fig, update, frames=t_star.shape[0], interval=1)
#ani.save('navier_stokes_animation_residual.gif')
plt.show()