In [1]:
import torch
from torch import nn
torch.manual_seed(2022)

<torch._C.Generator at 0x7fdc75966c10>

In [2]:
if torch.cuda.is_available():
   device = torch.device("cuda")
   print("Running on the GPU")
else:
    device = torch.device("cpu")
    print("Running on the CPU")

Running on the CPU


  return torch._C._cuda_getDeviceCount() > 0


**BCEWithLogitsLoss**

This loss function combines a Sigmoid layer and the Binary Cross-Entropy (BCE) loss in one single class, making it suitable for binary classification tasks where the output logits need to be converted to probabilities. It computes the binary cross-entropy between the target and the output logits.

**Formula**: 
$$BCE(x, y) = -\frac{1}{N} \sum_{i=1}^{N} \left[y_i \cdot \log(\sigma(x_i)) + (1 - y_i) \cdot \log(1 - \sigma(x_i))\right] $$

Where:
- \( $x_i$ \) is the output logit for the \(i\)-th sample.
- \( $y_i$ \) is the target label (0 or 1) for the \(i\)-th sample.
- \( $\sigma(x_i)$ = $\frac{1}{1 + e^{-x_i}}$ \) is the Sigmoid function applied to the output logit.

**L1Loss**

This loss function computes the Mean Absolute Error (MAE) between the predicted output and the target. It is commonly used in regression tasks, where minimizing the absolute difference between predictions and actual values is important.

**Formula**: 
$$L_1Loss(x, y) = \frac{1}{N} \sum_{i=1}^{N} |x_i - y_i|$$

Where:
- \( $x_i$ \) is the predicted value for the \(i\)-th sample.
- \( $y_i$ \) is the target value for the \(i\)-th sample.
- \( N \) is the number of samples.


In [3]:
adv_criterion = nn.BCEWithLogitsLoss()
recon_criterion = nn.L1Loss()

In [4]:
import numpy as np
import scipy.io as sio
import os 

num_channels = 10
prefix = '../DCRM/Data_BC_250'
u3val = np.array([sio.loadmat(os.path.join(prefix, "BCMultiPoissonCalc_" + str(i) + ".mat"))['u'] for i in range(1,num_channels+1)]) # solution obtained with DF
poisson_f = np.array([sio.loadmat(os.path.join(prefix, "BCMultiPoissonCalc_" + str(i) + ".mat"))['gf'] for i in range(1,num_channels+1)]) # source term

inputs = torch.tensor(np.expand_dims(poisson_f, axis=1)) # from (250, 128, 128) to (250, 1, 128, 128)
true_sol = torch.tensor(np.expand_dims(u3val, axis=1))
BCval = torch.zeros_like(true_sol) # Bcs are zero

In [5]:
num_channelsTest = 100
prefixTest = '../DCRM/Data_BC_1000'
u3valTest = np.array([sio.loadmat(os.path.join(prefixTest, "BCMultiPoissonCalc_" + str(i) + ".mat"))['u'] for i in range(1,num_channelsTest+1)])
poisson_fTest = np.array([sio.loadmat(os.path.join(prefixTest, "BCMultiPoissonCalc_" + str(i) + ".mat"))['gf'] for i in range(1,num_channelsTest+1)])

inputsTest = torch.tensor(np.expand_dims(poisson_fTest , axis=1))
true_solTest = torch.tensor(np.expand_dims(u3valTest , axis=1))
BCvalTest = torch.zeros_like(true_solTest)

for i in range(true_solTest .shape[0]):
    BCvalTest[i,0,:,:] = true_solTest[i,0,:,:]
    BCvalTest[i, 0, 1:127, 1:127] = torch.zeros((126,126))


In [6]:
x = torch.linspace(0, 1, 64 )
y = torch.linspace(0, 1, 64 )
rx, ry = torch.meshgrid(x, y) # rx is the x component of the meshgrid, ry is the y component of the meshgrid
rx = rx.to(device)
ry = ry.to(device)

# If the tensors are on the GPU, they have to be moved to the CPU to be converted to numpy arrays
rxd = rx.cpu().detach().numpy()
ryd = ry.cpu().detach().numpy()

  return _VF.meshgrid(tensors, **kwargs)  # type: ignore[attr-defined]


#### 1 - Adam Optimizer

The Adam optimizer uses two moments for adjusting learning rates:

1. **First Moment (Gradient Mean)**:
   - **Effect**: A value close to 1 gives more weight to past gradients, allowing the optimizer to retain a longer-term memory of gradients.

2. **Second Moment (Gradient Variance)**:
   - **Effect**: Squaring the gradients is used to measure the variance, which helps in adjusting the learning rate adaptively based on the magnitude of gradients.

Parameters:
- **$\beta_1$**: Controls the momentum (gradient mean).
- **$\beta_2$**: Controls the variance (squared gradient mean).


In [7]:
from network_w import *

def weights_init(m):
    if isinstance(m, nn.Conv2d) or isinstance(m, nn.ConvTranspose2d):
        torch.nn.init.normal_(m.weight, 0.0, 0.02) # initialize the weights with a normal distribution
    if isinstance(m, nn.BatchNorm2d):
        torch.nn.init.normal_(m.weight, 0.0, 0.02) 
        torch.nn.init.constant_(m.bias, 0) # initialize the bias of the batch normalization to zero

input_dim = 2
real_dim = 1

lr = 0.0001
beta_1 = 0.5
beta_2 = 0.999

gen = UNet(input_dim, real_dim).to(device)
gen = gen.apply(weights_init)
gen_opt = torch.optim.Adam(gen.parameters(), lr=lr, betas=(beta_1, beta_2))

#### 2 - Cosine Annealing

1. **Why we are using it**:
    - Used to dynamically reduce the learning rate of the optimizer during training, **following a cosine curve**. By decreasing the learning rate, it helps avoid saddle points or shallow local minima.
2. **Saddle points**:
    - Point where the function has different curvatures in different directions, being both convex and concave in various directions
    - Result in weak or zero gradients, making it hard to determine the optimal direction for optimization and potentially slowing convergence


In [8]:
gen_scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(gen_opt, 300 * 2500)

#### 3 - Noyaux de convolution

1. **Output dimension of an image of size WxH after performing a convolution with a kernel of size $k_h$ x $k_w$**: 
    - Padding : P and Stride : S
    
    - H' = $\frac{H + 2P - k_h}{S} + 1$ 

    - W' = $\frac{W + 2P - k_w}{S} + 1$ 

2. **Paper explanations (for DCPINN)**:
    - Laplacian operator discretized spacially by centrale difference schemes :
$$
\begin{align*}
\Delta u(x, y) & = u_{xx}(x, y) + u_{yy}(x, y) \\
& \approx \frac{u(x -h, y) + u(x + h, y) - 4u(x, y) + u(x, y - h) + u(x,y + h)}{h^2} \\
& := \frac{1}{h^2} \begin{bmatrix}
                    0 & 1 & 0 \\
                    1 & -4 & 1 \\
                    0 & 1 & 0
                    \end{bmatrix}
\end{align*}
$$

3. **Paper explanations (for DCRM)**:
    - We want to minimise : $E(u) = \int_0^1 \int_0^1 (\frac{1}{2}) \|\nabla\hat{U}\| - \hat{U}F - \int_{|\partial \Omega_N}\hat{U}|_{\partial \Omega_N}g_N$
    
    - We have to approximate : $\frac{\partial u}{\partial x}$ and $\frac{\partial u}{\partial y}$

In [9]:
stencildudx =np.array(((-3.0,4. , -1.0))) # forward stencil for the x derivative
stencildudy =np.array(((-3.0),(4.) ,(-1.0))) # forward stencil for the y derivative

stencildudx_2 =np.array(((-1.0,4. , -3.0))) # backward stencil for the x derivative
stencildudy_2 =np.array(((-1.0),(4.) ,(-3.0))) # backward stencil for the y derivative

def du_dx(index, real_dim):
    m = torch.nn.Conv2d(1, 1, (3,1), stride=(1,1), groups=real_dim).to(device) # create a convolutional layer with 1 input channel, 1 output channel, a kernel size of 3x1, and a stride of 1x1

    if index == 1: #  initialize the weights of the convolutional layer with the stencil
        for i in range(3):
            for k in range(1):
                m.weight.data[k, 0, i, 0] = stencildudx[i]
    else:
        for i in range(3):
            for k in range(1):
                m.weight.data[k, 0, i, 0] = stencildudx_2[i]
    m.bias.data = torch.zeros((1))

    with torch.autograd.no_grad():
        m.weight.requires_grad_(False)
        m.bias.requires_grad_(False)

    m= m.to(device)
    return m

def du_dy(index, real_dim):
    m = torch.nn.Conv2d(1, 1, (1,3), stride=1, groups=real_dim).to(device)

    if index == 1:
        for j in range(3):
            for k in range(1):
                m.weight.data[k, 0, 0, j] = stencildudy[j]
    else:
        for j in range(3):
            for k in range(1):
                m.weight.data[k, 0, 0, j] = stencildudy_2[j]

    m.bias.data = torch.zeros((1))
    with torch.autograd.no_grad():

        m.weight.requires_grad_(False)
        m.bias.requires_grad_(False)

    m= m.to(device)
    return m

# 1 - Train process

In [10]:
condition = inputs.to(device).float() # source term
bc = BCval.to(device).float() # boundary conditions
inpComb = torch.cat((condition, bc), 1) # concatenate the source term and the boundary conditions

conditionTest = inputsTest.to(device).float()
bcTest = BCvalTest.to(device).float()
inpCombTest = torch.cat((conditionTest, bcTest), 1)

#### 1.1 - Interpolation on a grid 

1. **Why we shloud do that** :

    - The high-fidelity model is based on computational methods such as FEM (Finite Element Method) or FVM (Finite Volume Method).

    - In these numerical simulations, data is often obtained on non-uniform meshes that are adapted to the complex geometry of the domain or are finer in certain areas (local refinement). 

    - CNNs are designed to process data on a regular grid.

2. **Why we choose to not interpolate boundary conditions** :

    - Imposed boundary conditions are often specific values or physical behaviors that we want to be strictly adhered to.

    - If the boundary conditions are defined on complex geometries (such as curves or non-planar surfaces), interpolation might not faithfully respect the geometric shape of the boundaries.

3. **What we interpolate** :

    - source term withou bc, solution for training and solution for test

In [11]:
from scipy.interpolate import griddata

def interpo(condition_st):
    n = 128
    linx128 = np.linspace(0, 1, n)

    linx130 = np.linspace(0, 1, n + 2)
    z1_out = np.zeros(((condition_st.shape[0], 1, 130, 130)))
    for mm in range(condition_st.shape[0]):
        val = condition_st[mm, 0, :, :].detach().cpu().numpy()
        xv_128, yv_128 = np.meshgrid(linx128, linx128, indexing='ij')
        xv_130, yv_130 = np.meshgrid(linx130, linx130, indexing='ij')

        points = np.zeros(( 128 * 128, 2))
        values = np.zeros(( 128 * 128, 1))
        iter = 0
        for i in range(128):
            for j in range(128):
                points[iter, 0] = xv_128[i, j]
                points[iter, 1] = yv_128[i, j]
                values[iter, 0] = val[i, j]
                iter = iter + 1

        grid_z1 = griddata(points, values[:, 0], (xv_130, yv_130), method='linear') # We can change the method : see doc
        z1_out[mm,0,:,:] = grid_z1.reshape((1, 1, 130, 130))

    return torch.as_tensor(z1_out)

In [12]:
# Source term is interpolated to 130x130
inter_condition = interpo(condition)
inter_condition = inter_condition.to(device).float()

In [13]:
# Solution term for training is interpolated to 130x130
inter_true = interpo(true_sol)
inter_true = inter_true.to(device).float()
inter_true_np = inter_true.cpu().detach().numpy()

In [14]:
# Solution term for testing is interpolated to 130x130
inter_trueTest = interpo(true_solTest)
inter_trueTest = inter_trueTest.to(device).float()

#### 1.2 - Grid creation


In [15]:
n = condition.shape[2]
m = condition.shape[3]
x = torch.linspace(0, 1, n + 2)
y = torch.linspace(0, 1, m + 2)
rx, ry = torch.meshgrid(x, y)
rx = rx.to(device)
ry = ry.to(device)
rxd = rx.cpu().detach().numpy()
ryd = ry.cpu().detach().numpy()

W = torch.zeros((1, 1, rx.shape[0], rx.shape[1]), device=device)

x130 = np.linspace(0, 1, 130)
deltax130 = np.abs(x130[1] - x130[0])
x = np.linspace(0,1,128)
deltax = np.abs(x[1] - x[0])

convGraddudx = du_dx(1, real_dim)
convGraddudy = du_dy(1, real_dim)
convGraddudx2 = du_dx(2, real_dim)
convGraddudy2 = du_dy(2, real_dim)

#### 1.3 - Dataset creation and training

1. **Various steps of the training process** :
    - Padding the output of the CNN to impose the boundary conditions

    - Compute the approximation of $\frac{\partial u}{\partial x}$ and $\frac{\partial u}{\partial y}$

    - Compute the loss E[u]

In [16]:
def impose_BC(x):
    output_CNN = gen(x) # output of the CNN
    n = x.shape[2]
    m = x.shape[3]
    xx = torch.zeros((x.shape[0], 1, n + 2, m + 2)).to(device)

    xx[:, :, 1:n + 1, 1:m + 1] = output_CNN # the output of the CNN is placed in the center of the tensor

    # Rules defined in the paper for the padding on the sides
    xx[:, 0, 0, 1:m + 1] = x[:,1,0, 0:m] 
    xx[:, 0, n + 1, 1:m + 1] = x[:,1,n-1, 0:m] 
    xx[:, 0, 1:n + 1, 0] = x[:,1,0:n, 0]
    xx[:, 0, 1:n + 1, m + 1] = x[:,1,0:n, m-1]

    # Corners : mean of the neighbors
    xx[:, 0, 0, m + 1] = 0.5 * (xx[:, 0, 0, m] + xx[:, 0, 1, m + 1])
    xx[:, 0, n + 1, 0] = 0.5 * (xx[:, 0, n, 0] + xx[:, 0, n + 1, 1])
    xx[:, 0, 0, 0] = 0.5 * (xx[:, 0, 0, 1] + xx[:, 0, 1, 0])
    xx[:, 0, n + 1, m + 1] = 0.5 * (xx[:, 0, n + 1, m] + xx[:, 0, n, m + 1])

    return xx

In [22]:
import IntegrationLoss
import pickle
from torch.utils.data import DataLoader, TensorDataset
from tqdm.auto import tqdm

torch_dataset = TensorDataset(inpComb, inter_true,inter_condition)
dataloader = DataLoader(dataset=torch_dataset, batch_size=2, shuffle=False)

torch_datasetTest = TensorDataset(inpCombTest, inter_trueTest)
dataloaderTest = DataLoader(dataset=torch_datasetTest, batch_size=2, shuffle=False)

# Training parameters

cur_step = 0
display_step = 100
plot_step = 50
losses_list = []

# Loss function
intLoss = IntegrationLoss.IntegrationLoss('trapezoidal', 2)

# Plots
idx_plot = [0, 5, 9]

while cur_step < 500:
    mean_loss_compare = 0.0
    mean_loss = 0.0

    for inpt, outpt, inter_out in tqdm(dataloader) :
        out = impose_BC(inpt)
        out_dudx = (1 / (2 * deltax130)) * convGraddudx(out)
        out_dudy = (1 / (2 * deltax130)) * convGraddudy(out)
        out_dudx2 = (1 / (2 * deltax130)) * convGraddudx2(out)
        out_dudy2 = (1 / (2 * deltax130)) * convGraddudy2(out)

        # Loss calculation
        I_in1 = torch.pow(out_dudx, 2)
        I_in12 = torch.pow(out_dudx2, 2)
        I_in2 = torch.pow(out_dudy, 2)
        I_in22 = torch.pow(out_dudy2, 2)

        internal1 = intLoss.lossInternalEnergy(0.5 * (I_in1 + I_in12), dx=deltax, dy=deltax130, shape=I_in1.shape)
        internal2 = intLoss.lossInternalEnergy(0.5 * (I_in2 + I_in22), dx=deltax130, dy=deltax, shape=I_in2.shape)

        fu = out * inter_out
        internal_f = intLoss.lossInternalEnergy(fu, dx=deltax, dy=deltax, shape=fu.shape)

        loss = 0.5*(internal1 + internal2) - internal_f
        loss.backward()
        gen_opt.step()

        mean_loss += loss.item()
        mean_loss_compare += torch.sum(torch.abs(out - outpt)).item()

        if cur_step % display_step == 0 :
            with torch.no_grad():
                loss_test = torch.tensor(0.0, requires_grad=False).to(device)
                for inpt_test, outpt_test in tqdm(dataloaderTest):
                    out_test = impose_BC(inpt_test)
                    loss_test += torch.sum(torch.abs(out_test - outpt_test))

            mean_loss /= display_step
            mean_loss_compare /= display_step
            losses_list.append([cur_step, mean_loss, mean_loss_compare, loss_test.item()])
            with open('../DCRM/Images/energy_loss.pickle', 'wb') as f:
                pickle.dump(losses_list, f)

            print(f"Step {cur_step}: Generator loss: {mean_loss}, Compare loss: {mean_loss_compare}, Test loss: {loss_test}")

        if cur_step % plot_step == 0 :
            with torch.no_grad():
                out_cnn = impose_BC(inpComb[idx_plot, :, :, :])
            out_cnn_np = out_cnn.cpu().detach().numpy()

            for xi in range(len(idx_plot)):
                fig, axs = plt.subplots(1, 3, figsize=(15, 5))
                im0 = axs[0].imshow(inter_true_np[idx_plot[xi], 0, :, :], cmap='jet')
                axs[0].set_title('True solution')
                fig.colorbar(im0, ax=axs[0])
                im1 = axs[1].imshow(out_cnn_np[xi, 0, :, :], cmap='jet')
                axs[1].set_title('Predicted solution')
                fig.colorbar(im1, ax=axs[1])
                im2 = axs[2].imshow(np.abs(inter_true_np[idx_plot[xi], 0, :, :] - out_cnn_np[xi, 0, :, :]), cmap='jet')
                axs[2].set_title('Difference')
                fig.colorbar(im2, ax=axs[2])
                plt.savefig(f'../DCRM/Images/energy_{cur_step}_{idx_plot[xi]}.png')
                plt.close()

        cur_step += 1
            

                




Constructor: IntegrationLoss  trapezoidal  in  2  dimension 


  0%|          | 0/5 [00:00<?, ?it/s]

tensor(0.1236, grad_fn=<AddBackward0>)


  0%|          | 0/50 [00:00<?, ?it/s]

KeyboardInterrupt: 