In [1]:
import torch
from torchvision import datasets
from torchvision.transforms import ToTensor, Normalize

In [2]:
import numpy as np
from tqdm.notebook import tqdm
import matplotlib.pyplot as plt
from IPython.core.debugger import set_trace

In [3]:
# Notebook-wide tensor settings: 32-bit floats, placed on the GPU whenever CUDA is available.
dtype = torch.float32
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

In [4]:
import torch.nn as nn

In [None]:
'''
Input channels will be:
    - x, x coordinate position.
    - y, y coordinate position.
    - u, velocity in the x direction.
    - v, velocity in the y direction.
    - p, pressure field.

Output channels (there will be three):
    - u, velocity in the x direction.
    - v, velocity in the y direction.
    - p, pressure field.

Can I, in some way, add equivariance constraints, i.e. enforce that the model is equivariant?
Add padding to allow for the boundary conditions to exist. This also allows for data augmentation,
which should prevent overfitting on learning the boundary conditions.

Model trained as an autoencoder, with a PDE loss. We expect that the PDE loss contradicts the AE loss.
Maybe we could start with the AE loss alone until it is nearly converged, and then add the PDE loss.
The PDE loss alone might fail at performing the auto-encoding overall.
'''

In [5]:
class Interpolate(nn.Module):
    """Module wrapper around ``torch.nn.functional.interpolate`` with a fixed target size.

    Wrapping the functional call in an ``nn.Module`` lets a resize step be placed
    inside an ``nn.Sequential`` next to convolutions and activations.

    Args:
        size: Target output size, forwarded unchanged to ``interpolate``.
        mode: Interpolation algorithm name, forwarded unchanged to ``interpolate``.
    """

    # Modes for which ``interpolate`` accepts ``align_corners``. For any other
    # mode (e.g. 'nearest', 'area') passing the argument raises a ValueError.
    _ALIGN_CORNERS_MODES = ('linear', 'bilinear', 'bicubic', 'trilinear')

    def __init__(self, size, mode):
        super(Interpolate, self).__init__()
        self.interp = nn.functional.interpolate
        self.size = size
        self.mode = mode

    def forward(self, x):
        """Resize ``x`` to ``self.size`` using ``self.mode`` and return the result."""
        if self.mode in self._ALIGN_CORNERS_MODES:
            # Same call as before for interpolating modes: corners are not aligned.
            return self.interp(x, size=self.size, mode=self.mode, align_corners=False)
        # Non-interpolating modes must not receive ``align_corners`` at all.
        return self.interp(x, size=self.size, mode=self.mode)

In [8]:
from torch.nn.modules.activation import ReLU
class ConvAE(nn.Module):
    """Convolutional autoencoder whose down/up-sampling is done by bilinear resizing.

    Every stage except the last is ``Conv2d(3x3, stride 1, padding 1) -> ReLU ->
    Interpolate(bilinear)``. The convolutions keep the spatial size, so the
    resolution changes only in the resize step:

        encoder: input_size // 2  ->  input_size // 8  ->  input_size // 16
        decoder: input_size // 8  ->  input_size // 2  ->  input_size

    A final 3x3 convolution without activation maps to ``out_channels``.

    Args:
        input_size: Spatial size of the input, either a single int (applied to
            both spatial dimensions by the resize step) or a ``(height, width)``
            tuple/list. Each dimension should be at least 16 so that the
            bottleneck size ``input_size // 16`` is non-zero.
        in_channels: Number of channels of the input tensor. Defaults to 5.
        out_channels: Number of channels of the reconstruction. Defaults to 3.
    """

    def __init__(self, input_size, in_channels=5, out_channels=3):
        super(ConvAE, self).__init__()

        # encoder
        self.enc1 = self._conv_block(in_channels, 16, self._scaled_size(input_size, 2))
        self.enc2 = self._conv_block(16, 32, self._scaled_size(input_size, 8))
        self.enc3 = self._conv_block(32, 64, self._scaled_size(input_size, 16))

        # decoder (mirrors the encoder resolutions, then returns to full size)
        self.dec1 = self._conv_block(64, 32, self._scaled_size(input_size, 8))
        self.dec2 = self._conv_block(32, 16, self._scaled_size(input_size, 2))
        self.dec3 = self._conv_block(16, in_channels, input_size)

        # output head: plain convolution, no activation, so outputs can be negative
        self.dec4 = nn.Sequential(
            nn.Conv2d(in_channels=in_channels, out_channels=out_channels,
                      kernel_size=3, stride=1, padding=1),
        )

    @staticmethod
    def _scaled_size(input_size, factor):
        """Floor-divide ``input_size`` by ``factor`` (per dimension for tuples/lists)."""
        if isinstance(input_size, (tuple, list)):
            return tuple(dim // factor for dim in input_size)
        return input_size // factor

    @staticmethod
    def _conv_block(in_channels, out_channels, size):
        """Build one size-preserving 3x3 conv -> ReLU -> bilinear resize to ``size`` stage."""
        return nn.Sequential(
            nn.Conv2d(in_channels=in_channels, out_channels=out_channels,
                      kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            Interpolate(size=size, mode='bilinear'),
        )

    def compute_next_size(self, dimension, kernel, padding=0, stride=1):
        """Return the output length of a convolution along one dimension.

        Evaluates ``(dimension + 2*padding - kernel) / stride + 1`` and truncates
        the result to an int. Not called by the class itself.
        """
        return int((dimension + 2*padding - kernel) / stride + 1)

    def forward(self, x):
        """Encode then decode ``x`` and return the reconstruction.

        Args:
            x: Input batch; the bilinear resize steps require a 4-D tensor
                ``(N, in_channels, H, W)``.

        Returns:
            Tensor with ``out_channels`` channels at spatial size ``input_size``.
        """
        # encoding
        x = self.enc1(x)
        x = self.enc2(x)
        x = self.enc3(x)

        # decoding
        x = self.dec1(x)
        x = self.dec2(x)
        x = self.dec3(x)
        reconstruction = self.dec4(x)
        return reconstruction

In [9]:
# loss function? will have to think about it :)

In [92]:
# Five independent 5x5 standard-normal leaf tensors, drawn in the order x, y, u, v, p.
# All of them track gradients so they can be differentiated against further down.
x, y, u, v, p = (torch.randn((5, 5), requires_grad=True) for _ in range(5))

In [93]:
# Hand-written element-wise expressions standing in for model outputs; they are
# built from the gradient-tracking tensors so autograd can differentiate them.
upred = u * (y * x.pow(2) + y.pow(2))
vpred = v * (x * y).pow(2)
ppred = v * x + u * y

In [94]:
# First-order spatial derivatives of every predicted field with respect to x and y.
# The predictions are element-wise in x and y, so a vector-Jacobian product with a
# tensor of ones returns the per-element partial derivatives. `ones_like` keeps the
# seed on the same device/dtype as the output it is paired with.
# create_graph=True keeps the derivatives differentiable (needed for second
# derivatives and for back-propagating a PDE loss); retain_graph=True keeps the
# forward graphs alive so this cell can be re-run.
dudx, dudy = torch.autograd.grad(upred, [x, y], grad_outputs=torch.ones_like(upred), retain_graph=True, create_graph=True)
dvdx, dvdy = torch.autograd.grad(vpred, [x, y], grad_outputs=torch.ones_like(vpred), retain_graph=True, create_graph=True)
# Pressure gradient: differentiate ppred. Differentiating upred here would only
# reproduce dudx/dudy under the names dpdx/dpdy.
dpdx, dpdy = torch.autograd.grad(ppred, [x, y], grad_outputs=torch.ones_like(ppred), retain_graph=True, create_graph=True)

In [95]:
# Second-order (unmixed) spatial derivatives of the velocity predictions.
# Each seed is `ones_like` the tensor being differentiated so device/dtype match.
# create_graph=True keeps the results differentiable, so they can be used inside
# a loss that is back-propagated, and (because retain_graph defaults to the value
# of create_graph) leaves the first-derivative graphs intact, so this cell can be
# re-run without recomputing the cell that produced dudx/dudy/dvdx/dvdy.
d2udx2 = torch.autograd.grad(dudx, x, grad_outputs=torch.ones_like(dudx), create_graph=True)[0]
d2udy2 = torch.autograd.grad(dudy, y, grad_outputs=torch.ones_like(dudy), create_graph=True)[0]
d2vdx2 = torch.autograd.grad(dvdx, x, grad_outputs=torch.ones_like(dvdx), create_graph=True)[0]
d2vdy2 = torch.autograd.grad(dvdy, y, grad_outputs=torch.ones_like(dvdy), create_graph=True)[0]

In [None]:
# PDE loss: the squared difference (residual) of each Navier-Stokes term!