In [None]:
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.cm import get_cmap
import torch
import torch.nn as nn
import torch.optim as optim
from torch.autograd import grad
import time

# Set random seed for reproducibility
torch.manual_seed(42)
np.random.seed(42)

# Use GPU if available
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

# Parameters - keeping the same as numerical solution
nx = 400       # Number of grid points for visualization
L = 1.0        # Domain length [0,L]
gamma = 1.4    # Ratio of specific heats for ideal gas
tmax = 0.2     # Maximum simulation time (typical for Sod problem)

# Time points for visualization
time_points = [0.0, 0.04, 0.08, 0.12, 0.16, 0.2]

# Create spatiotemporal grid for training and visualization
x = np.linspace(0, L, nx)
t = np.linspace(0, tmax, 100)  # Fine time discretization for training

# Initial conditions - Sod shock tube
def initial_condition(x):
    rho = np.ones_like(x)
    u = np.zeros_like(x)
    p = np.ones_like(x)

    # Set right state
    right_idx = x >= 0.5
    rho[right_idx] = 0.125
    p[right_idx] = 0.1

    # Compute initial energy
    E = p / ((gamma - 1) * rho) + 0.5 * u**2

    return rho, u, p, E

# Generate initial and boundary conditions
rho_init, u_init, p_init, E_init = initial_condition(x)

# Print problem setup
print("Sod Shock Tube Problem - Physics-Informed Neural Network")
print(f"Domain: x ∈ [0, {L}]")
print(f"Time: t ∈ [0, {tmax}]")
print(f"Gamma: {gamma}")
print("Initial conditions:")
print("  Left state (x < 0.5): ρL = 1.0, uL = 0.0, pL = 1.0")
print("  Right state (x ≥ 0.5): ρR = 0.125, uR = 0.0, pR = 0.1")

# Define the neural network
class EulerPINN(nn.Module):
    def __init__(self):
        super(EulerPINN, self).__init__()
        # Network architecture
        # More layers and neurons for complex fluid dynamics
        neurons = 50

        self.net = nn.Sequential(
            nn.Linear(2, neurons),
            nn.Tanh(),
            nn.Linear(neurons, neurons),
            nn.Tanh(),
            nn.Linear(neurons, neurons),
            nn.Tanh(),
            nn.Linear(neurons, neurons),
            nn.Tanh(),
            nn.Linear(neurons, neurons),
            nn.Tanh(),
            nn.Linear(neurons, neurons),
            nn.Tanh(),
            nn.Linear(neurons, 3)  # Output: rho, u, p
        )

        # Initialize weights
        for m in self.net.modules():
            if isinstance(m, nn.Linear):
                nn.init.xavier_normal_(m.weight)
                nn.init.constant_(m.bias, 0)

    def forward(self, x, t):
        # Normalize inputs to [-1, 1] for better training
        x_norm = 2.0 * (x / L) - 1.0
        t_norm = 2.0 * (t / tmax) - 1.0

        # Stack inputs
        inputs = torch.cat([x_norm, t_norm], dim=1)
        outputs = self.net(inputs)

        # Apply constraints to ensure positive density and pressure
        rho = torch.exp(outputs[:, 0])  # Ensure positive density
        u = outputs[:, 1]  # Velocity can be positive or negative
        p = torch.exp(outputs[:, 2])  # Ensure positive pressure

        return rho, u, p

    def compute_energy(self, rho, u, p):
        # Compute specific total energy
        E = p / ((gamma - 1) * rho) + 0.5 * u**2
        return E

    def euler_residuals(self, x, t):
        # This function computes the residuals of the Euler equations

        # Clone inputs and set requires_grad
        x_in = x.clone().requires_grad_(True)
        t_in = t.clone().requires_grad_(True)

        # Forward pass
        rho, u, p = self(x_in, t_in)
        E = self.compute_energy(rho, u, p)

        # Compute gradients
        # Mass equation derivatives
        drho_dt = grad(rho, t_in, torch.ones_like(rho), create_graph=True)[0]
        drhou_dx = grad(rho * u, x_in, torch.ones_like(rho), create_graph=True)[0]

        # Momentum equation derivatives
        drhou_dt = grad(rho * u, t_in, torch.ones_like(rho), create_graph=True)[0]
        d_rhou2_p_dx = grad(rho * u**2 + p, x_in, torch.ones_like(rho), create_graph=True)[0]

        # Energy equation derivatives
        drhoE_dt = grad(rho * E, t_in, torch.ones_like(rho), create_graph=True)[0]
        d_rhoEu_pu_dx = grad(u * (rho * E + p), x_in, torch.ones_like(rho), create_graph=True)[0]

        # Euler equations residuals
        res_mass = drho_dt + drhou_dx
        res_momentum = drhou_dt + d_rhou2_p_dx
        res_energy = drhoE_dt + d_rhoEu_pu_dx

        return res_mass, res_momentum, res_energy

# Create the neural network
model = EulerPINN().to(device)
print(model)

# Create training data points
# Collocation points for PDE residuals
n_collocation = 10000
x_collocation = torch.rand(n_collocation, 1, device=device) * L
t_collocation = torch.rand(n_collocation, 1, device=device) * tmax

# Initial condition points
n_ic = 1000
x_ic = torch.linspace(0, L, n_ic, device=device).unsqueeze(1)
t_ic = torch.zeros(n_ic, 1, device=device)

# Convert initial conditions to tensors
rho_ic = torch.tensor(rho_init, dtype=torch.float32, device=device).unsqueeze(1)
u_ic = torch.tensor(u_init, dtype=torch.float32, device=device).unsqueeze(1)
p_ic = torch.tensor(p_init, dtype=torch.float32, device=device).unsqueeze(1)

# Define optimizer
optimizer = optim.Adam(model.parameters(), lr=0.001)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, patience=500, factor=0.5, verbose=True)

# Loss weights
lambda_ic = 10.0
lambda_pde = 1.0

# Training loop
n_epochs = 20000
print_every = 500
losses = []

start_time = time.time()
print("Starting training...")

for epoch in range(1, n_epochs + 1):
    optimizer.zero_grad()

    # Compute PDE residuals
    res_mass, res_momentum, res_energy = model.euler_residuals(x_collocation, t_collocation)
    loss_pde = (res_mass**2).mean() + (res_momentum**2).mean() + (res_energy**2).mean()

    # Compute initial condition loss
    rho_pred, u_pred, p_pred = model(x_ic, t_ic)
    loss_ic = ((rho_pred - rho_ic)**2).mean() + ((u_pred - u_ic)**2).mean() + ((p_pred - p_ic)**2).mean()

    # Total loss
    loss = lambda_ic * loss_ic + lambda_pde * loss_pde

    # Backpropagation
    loss.backward()
    optimizer.step()
    scheduler.step(loss)

    losses.append(loss.item())

    # Print progress
    if epoch % print_every == 0:
        elapsed = time.time() - start_time
        print(f"Epoch {epoch}/{n_epochs}, Loss: {loss.item():.6f}, IC Loss: {loss_ic.item():.6f}, "
              f"PDE Loss: {loss_pde.item():.6f}, Time: {elapsed:.2f}s")

print("Training completed!")
print(f"Total training time: {time.time() - start_time:.2f} seconds")

# Save the trained model
torch.save(model.state_dict(), "euler_pinn_model.pt")

# Evaluate the model
model.eval()
results = {}

with torch.no_grad():
    for time_point in time_points:
        # Create evaluation grid
        x_eval = torch.linspace(0, L, nx, device=device).unsqueeze(1)
        t_eval = torch.ones_like(x_eval, device=device) * time_point

        # Predict
        rho_pred, u_pred, p_pred = model(x_eval, t_eval)

        # Store results
        results[time_point] = {
            "rho": rho_pred.cpu().numpy().flatten(),
            "u": u_pred.cpu().numpy().flatten(),
            "p": p_pred.cpu().numpy().flatten()
        }

# Plot results
cmap = get_cmap('viridis')
colors = [cmap(i/len(time_points)) for i in range(len(time_points))]

plt.figure(figsize=(15, 12))

# Plot density
plt.subplot(3, 1, 1)
for i, time in enumerate(time_points):
    if time in results:
        plt.plot(x, results[time]["rho"], label=f"t = {time}", color=colors[i], linewidth=2)
plt.ylabel("Density ρ", fontsize=12)
plt.title("Sod Shock Tube (PINN): Density Evolution", fontsize=14)
plt.grid(True, alpha=0.3)
plt.legend(fontsize=10)

# Plot velocity
plt.subplot(3, 1, 2)
for i, time in enumerate(time_points):
    if time in results:
        plt.plot(x, results[time]["u"], label=f"t = {time}", color=colors[i], linewidth=2)
plt.ylabel("Velocity u", fontsize=12)
plt.title("Sod Shock Tube (PINN): Velocity Evolution", fontsize=14)
plt.grid(True, alpha=0.3)
plt.legend(fontsize=10)

# Plot pressure
plt.subplot(3, 1, 3)
for i, time in enumerate(time_points):
    if time in results:
        plt.plot(x, results[time]["p"], label=f"t = {time}", color=colors[i], linewidth=2)
plt.ylabel("Pressure p", fontsize=12)
plt.xlabel("x", fontsize=12)
plt.title("Sod Shock Tube (PINN): Pressure Evolution", fontsize=14)
plt.grid(True, alpha=0.3)
plt.legend(fontsize=10)

plt.tight_layout()
plt.savefig("sod_shock_tube_pinn_all_timesteps.png", dpi=300, bbox_inches="tight")
plt.show()

# Also create individual plots for clearer visualization
fig, axs = plt.subplots(3, len(time_points), figsize=(18, 10))
fig.tight_layout(pad=3.0)

for i, time in enumerate(sorted(results.keys())):
    data = results[time]

    axs[0, i].plot(x, data["rho"], 'b-', linewidth=2)
    axs[0, i].set_title(f"t = {time:.2f}")
    axs[0, i].set_ylim(0, 1.1)
    axs[0, i].grid(True, alpha=0.3)

    axs[1, i].plot(x, data["u"], 'r-', linewidth=2)
    axs[1, i].set_ylim(-0.1, 1.0)
    axs[1, i].grid(True, alpha=0.3)

    axs[2, i].plot(x, data["p"], 'g-', linewidth=2)
    axs[2, i].set_ylim(0, 1.1)
    axs[2, i].grid(True, alpha=0.3)

    if i == 0:
        axs[0, i].set_ylabel("Density ρ")
        axs[1, i].set_ylabel("Velocity u")
        axs[2, i].set_ylabel("Pressure p")

    axs[2, i].set_xlabel("x")

plt.savefig("sod_shock_tube_pinn_grid.png", dpi=300, bbox_inches="tight")
plt.show()

# Plot loss history
plt.figure(figsize=(10, 6))
plt.semilogy(losses)
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training Loss History')
plt.grid(True)
plt.savefig("pinn_training_loss.png", dpi=300)
plt.show()

# Optional: Compare with numerical solution at final time
try:
    # Load numerical solution from previous code (if available)
    numerical_data = np.load('numerical_solution.npz')
    t_final = tmax

    plt.figure(figsize=(15, 12))

    # Plot density comparison
    plt.subplot(3, 1, 1)
    plt.plot(x, results[t_final]["rho"], 'b-', label='PINN', linewidth=2)
    plt.plot(x, numerical_data['rho'], 'r--', label='Numerical', linewidth=2)
    plt.ylabel("Density ρ", fontsize=12)
    plt.title(f"Comparison at t = {t_final}", fontsize=14)
    plt.grid(True, alpha=0.3)
    plt.legend(fontsize=10)

    # Plot velocity comparison
    plt.subplot(3, 1, 2)
    plt.plot(x, results[t_final]["u"], 'b-', label='PINN', linewidth=2)
    plt.plot(x, numerical_data['u'], 'r--', label='Numerical', linewidth=2)
    plt.ylabel("Velocity u", fontsize=12)
    plt.grid(True, alpha=0.3)
    plt.legend(fontsize=10)

    # Plot pressure comparison
    plt.subplot(3, 1, 3)
    plt.plot(x, results[t_final]["p"], 'b-', label='PINN', linewidth=2)
    plt.plot(x, numerical_data['p'], 'r--', label='Numerical', linewidth=2)
    plt.ylabel("Pressure p", fontsize=12)
    plt.xlabel("x", fontsize=12)
    plt.grid(True, alpha=0.3)
    plt.legend(fontsize=10)

    plt.tight_layout()
    plt.savefig("pinn_vs_numerical.png", dpi=300)
    plt.show()
except:
    print("Comparison with numerical solution skipped.")

print("Analysis complete!")