In [None]:
!pip install torch
!pip install numpy
!pip install matplotlib
!pip install pandas
!pip install scipy

In [None]:
!pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu121

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from scipy.spatial.distance import euclidean
from torch.utils.data import Dataset, DataLoader

# Import Custom Activation Layers
import sys
sys.path.insert(0, '..')
from activationLayers import *

# Import RMHD Equations
from RMHDEquations import *

In [2]:
# Set device
device = ("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

mps


In [3]:
# Custom Neural Network

class RMHDPINN(nn.Module):
    def __init__(self, input_size, output_size, n_hidden, hidden_width, activation=TrainableTanh(), output_projection=FinalActivation()):
        super(RMHDPINN, self).__init__()
        
        self.input_dim = input_size
        self.output_dim = output_size
        self.hidden_dim = hidden_width
        self.num_layers = n_hidden
        
        # Define U and V layers separately
        self.U_layer = nn.Linear(input_size, hidden_width)
        self.V_layer = nn.Linear(input_size, hidden_width)
        
        # Define hidden and output layers
        self.hidden_layers = nn.ModuleList([nn.Linear(hidden_width, hidden_width) for _ in range(n_hidden)])
        self.output_layer = nn.Linear(hidden_width, output_size)
        
        # Set activation function
        self.phi = activation
        self.output_projection = output_projection
        
    def forward(self, t, x):
        """
        Performs the forward pass of the neural network.

        Args:
            t (torch.Tensor): The time argument.
            x (torch.Tensor): The position argument.

        Returns:
            torch.Tensor: The primitives for the 1-dimensional RMHD equations.

        """
        t = t.unsqueeze(1) if len(t.shape) == 1 else t
        x = x.unsqueeze(1) if len(x.shape) == 1 else x
        X = torch.cat([x, t], dim=1)
        U_t = self.phi(self.U_layer(X))
        V_t = self.phi(self.V_layer(X))
        H = U_t  # initial value for H

        for layer in self.hidden_layers:
            Z = self.phi(layer(H))
            H = (1 - Z) * U_t + Z * V_t

        output = self.output_layer(H)
        # Print the shape of the tensor before it's passed to the FinalActivation

        return output


# Initialization
def init_weights(m):
    if type(m) == nn.Linear:
        torch.nn.init.xavier_uniform_(m.weight)
        m.bias.data.fill_(0.01)

In [4]:
# Hyperparameters
depth = 4
width = 16
activation = FinalActivation()
model = RMHDPINN(input_size=2, output_size=5, n_hidden=depth, hidden_width=width, activation=TrainableTanh(), output_projection=activation).to(device)
print(model)
model(torch.rand(10,device=device), torch.rand(10,device=device))

RMHDPINN(
  (U_layer): Linear(in_features=2, out_features=16, bias=True)
  (V_layer): Linear(in_features=2, out_features=16, bias=True)
  (hidden_layers): ModuleList(
    (0-3): 4 x Linear(in_features=16, out_features=16, bias=True)
  )
  (output_layer): Linear(in_features=16, out_features=5, bias=True)
  (phi): TrainableTanh()
  (output_projection): FinalActivation()
)


tensor([[ 0.1613, -0.5243,  0.7675, -0.2049, -0.1905],
        [-0.0306, -0.6939,  0.4499, -0.0265, -0.1600],
        [ 0.1456, -0.5776,  0.7330, -0.1305, -0.2178],
        [ 0.1148, -0.5707,  0.6902, -0.1809, -0.1788],
        [ 0.1440, -0.4446,  0.7735, -0.3308, -0.1402],
        [ 0.2125, -0.4414,  0.8597, -0.2617, -0.1880],
        [ 0.0673, -0.4824,  0.6494, -0.3697, -0.1208],
        [ 0.1209, -0.4426,  0.7435, -0.3585, -0.1300],
        [ 0.0573, -0.6288,  0.5925, -0.1287, -0.1710],
        [ 0.1251, -0.4189,  0.7605, -0.3810, -0.1277]], device='mps:0',
       grad_fn=<LinearBackward0>)

In [5]:
"""
This function generates the initial conditions for primitives from conserved quantities.

Parameters:
- x (torch.Tensor): The spatial coordinate.
- Gamma (float): The adiabatic index.
- v_z (float): The z-component of the velocity.
- B_x (float): The x-component of the magnetic field.
- B_z (float): The z-component of the magnetic field.

Returns:
- torch.Tensor: The initial conditions for primitives.

"""
def initial_condition(x):
    # Conserved: rho, v_x, v_y, By, p
    # Primitives: rho, mx, my, By, E
    condition1 = torch.tensor([1.0, 0.0, 0.0, 6.0, 30.0], device=device)
    condition2 = torch.tensor([1.0, 0.0, 0.0, 0.7, 1.0], device=device)
    newx = torch.flatten(x).to(device)
    return torch.outer(newx<0, condition1) + torch.outer(newx>=0, condition2)

In [6]:
def gradients(outputs, inputs, order = 1, device=device):
    if order == 1:
        return torch.autograd.grad(outputs, inputs, grad_outputs=torch.ones_like(outputs), create_graph=True)[0]
    elif order > 1:
        return gradients(gradients(outputs, inputs, 1), inputs, order - 1) # Recursively take gradients
    else:
        return outputs

def rmhd_residual(pred,t,x):
    u_t= gradients(conserved_alfredo(pred),t)
    u_x= gradients(currents_alfredo(pred),x)
    return u_t + u_x


def random_domain(num_samples, t_range, x_range):
    t_random = torch.zeros(size=(num_samples, 1), device=device).uniform_(*t_range)
    x_random = torch.zeros(size=(num_samples, 1), device=device).uniform_(*x_range)
    t_random.requires_grad = True
    x_random.requires_grad = True
    return t_random, x_random



def random_boundary(num_samples, t_range, x_range, initial_to_boundary_ratio = 0.5):
    num_initial = int(initial_to_boundary_ratio * num_samples)
    num_boundary = num_samples - num_initial
    t_min, t_max = t_range

    # Generate initial condition samples
    t_initial = torch.zeros(size=(num_initial, 1), device=device)
    x_initial = torch.zeros(size=(num_initial, 1), device=device).uniform_(*x_range)
    u_initial = initial_condition(x_initial)

    # Generate boundary condition samples
    t_boundary = torch.zeros(size=(num_boundary, 1), device=device).uniform_(*t_range)

    # We assume x_range = (-1, 1) here
    x_boundary = 2 * torch.randint(0, 2, size=(num_boundary, 1), device=device) - 1
    u_boundary = initial_condition(x_boundary)#np.zeros((num_boundary, 1))

    return torch.tensor(t_initial, dtype=torch.float32), torch.tensor(x_initial, dtype=torch.float32), torch.tensor(u_initial, dtype=torch.float32), \
           torch.tensor(t_boundary, dtype=torch.float32), torch.tensor(x_boundary, dtype=torch.float32), torch.tensor(u_boundary, dtype=torch.float32)


In [7]:
lr = 0.005
model = RMHDPINN(input_size=2, output_size=5, n_hidden=depth, hidden_width=width, activation=TrainableTanh(), output_projection=activation).to(device)
#model = PINN(input_size=2, output_size=5, n_hidden=depth, hidden_width=width, activation=activation)
model.to(device)
optimizer = optim.Adam(model.parameters(), lr=lr, eps=1e-4)
boundary_criterion = nn.MSELoss()
initial_criterion = nn.MSELoss()
domain_criterion = nn.MSELoss()
intermediate_criterion = nn.MSELoss()

In [8]:
num_epochs = 3000
model.train()

x_range, t_range = [-0.5, 0.5], [0, 1]
loss_history = []
domain_loss_history = []
initial_loss_history = []
boundary_loss_history = []
a_history = []
b_history = []
c_history = []
gradient_history = []

# Initialize lambda values
lambda_initial = 1.0
lambda_boundary = 1.0

max_schedule_steps = 1000
n = 100
for epoch in range(1, num_epochs + 1, max_schedule_steps):
    n += 7
    domain_samples = n**2
    for g in optimizer.param_groups:
        g['lr'] *= 3/4
    for step in range(0, max_schedule_steps):
        boundary_samples = 300  # You can change this value

        optimizer.zero_grad()
        
        domain_t, domain_x = random_domain(domain_samples, t_range, x_range)
        initial_t, initial_x, initial_u, boundary_t, boundary_x, boundary_u = random_boundary(boundary_samples, t_range, x_range, 0.5)
        domain_prediction = model(domain_t, domain_x)
        domain_residual = rmhd_residual(domain_prediction, domain_t, domain_x)
        initial_prediction = model(initial_t, initial_x)
        boundary_prediction = model(boundary_t, boundary_x)

        domain_loss = domain_criterion(domain_residual, torch.zeros_like(domain_residual))
        initial_loss = initial_criterion(initial_prediction, initial_u)
        boundary_loss = boundary_criterion(boundary_prediction, boundary_u)

        # # Step 3: Compute total loss and backpropagate
        loss = 10*domain_loss + lambda_initial * initial_loss + lambda_boundary * boundary_loss

        # Compute backward and update model parameters as usual
        loss.backward()
        optimizer.step()
        
        current_grads = [p.grad.clone() for p in model.parameters()]
        gradient_history.append(current_grads)
        
        loss_history.append(loss.item())
        domain_loss_history.append(domain_loss.item())
        initial_loss_history.append(initial_loss.item())
        boundary_loss_history.append(boundary_loss.item())

        
        if (epoch+step) % 100 == 0:
            print(f"Epoch: {epoch+step}, Loss: {loss}")

  return torch.tensor(t_initial, dtype=torch.float32), torch.tensor(x_initial, dtype=torch.float32), torch.tensor(u_initial, dtype=torch.float32), \
  torch.tensor(t_boundary, dtype=torch.float32), torch.tensor(x_boundary, dtype=torch.float32), torch.tensor(u_boundary, dtype=torch.float32)


Epoch: 100, Loss: 141.21878051757812
Epoch: 200, Loss: 126.5076904296875
Epoch: 300, Loss: 55.898582458496094
Epoch: 400, Loss: 30.223264694213867
Epoch: 500, Loss: 21.002304077148438
Epoch: 600, Loss: 11.69563102722168
Epoch: 700, Loss: 13.825124740600586


KeyboardInterrupt: 

In [None]:
model(domain_t, domain_x)

In [None]:
plt.plot(loss_history)

In [None]:
model.eval()

tplot = 0  # 0.147

x = np.linspace(-0.5, 0.5, 100)
t = np.linspace(tplot, tplot, 1)

t, x = np.meshgrid(t, x)
t, x = t.flatten(), x.flatten()
t, x = torch.Tensor(t).to(device), torch.Tensor(x).to(device)
# numeric = inter_condition(tplot, x)
# numeric = numeric.reshape(100, 7)
prediction = model(t, x).detach()
prediction = prediction.reshape(100, 5)

# plt.imshow(prediction)

for i in range(7):

    # plt.plot(x.cpu().numpy(), numeric[:, i].cpu().flatten().numpy())
    plt.plot(x.cpu().numpy(), prediction[:, i].cpu().flatten().numpy())
    plt.grid(True)
    plt.show()