In [1]:
import torch
import torch.nn as nn
import numpy as np

# ---------------------------
# 1) Hyperparameters
# ---------------------------
LR = 1e-3       # learning rate for Adam
N_EPOCHS = 30000  # number of training iterations (may need more!)
N_COLLO = 2000  # number of interior collocation points
N_BC = 200      # number of boundary points
EPSILON = 1e-4  # diffusion coefficient
B_X, B_Y = 2.0, 3.0  # convection coefficients
SEED = 0        # seed for every random number generator used below

# The collocation samplers draw from numpy's global RNG and the network
# weights are initialised from torch's RNG; seeding both makes a
# "Restart Kernel -> Run All" reproduce the same training run.
np.random.seed(SEED)
torch.manual_seed(SEED)

DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", DEVICE)

# ---------------------------
# 2) Define the Forcing Function f(x,y)
# ---------------------------
def forcing_function(x, y, epsilon):
    """
    Evaluate the right-hand side f(x, y) of the PDE element-wise.

    With E2 = exp(2 (x - 1) / epsilon) and E3 = exp(3 (y - 1) / epsilon)
    the value returned is

        2 * exp(-x + E2) + x y^2 + 6 x y - x * E3 - y^2 * E2 + 2 y^2

    Args:
        x, y: torch tensors of coordinates (combined element-wise).
        epsilon: diffusion coefficient used inside the exponentials.

    Returns:
        Tensor holding f evaluated at every (x, y) pair.

    NOTE(review): the leading term is an exponential of an exponential;
    it may have been meant as 2 * epsilon * (-x + E2). Confirm against
    the problem statement before changing it.
    """
    exp_x = torch.exp(2.0 * (x - 1.0) / epsilon)
    exp_y = torch.exp(3.0 * (y - 1.0) / epsilon)
    y_sq = y ** 2

    # Terms are summed in the same order as the formula above.
    total = 2.0 * torch.exp(-x + exp_x)
    total = total + x * y_sq
    total = total + 6.0 * x * y
    total = total - x * exp_y
    total = total - y_sq * exp_x
    total = total + 2.0 * y_sq
    return total


# ---------------------------
# 3) Neural Network Definition
# ---------------------------
class PINN(nn.Module):
    """
    Fully connected network with tanh activations between linear layers.

    Args:
        layers: sequence of layer widths, input size first and output
            size last (e.g. ``[2, 64, 64, 64, 64, 1]``). At least two
            entries are required so that one linear layer exists.

    Raises:
        ValueError: if ``layers`` has fewer than two entries.
    """

    # The default is an immutable tuple: a list default would be a single
    # object shared by every call to the constructor.
    def __init__(self, layers=(2, 64, 64, 64, 64, 1)):
        super().__init__()
        layers = list(layers)
        if len(layers) < 2:
            raise ValueError(
                "layers must contain at least an input and an output size"
            )

        self.activation = nn.Tanh()
        self.linears = nn.ModuleList(
            [nn.Linear(n_in, n_out)
             for n_in, n_out in zip(layers[:-1], layers[1:])]
        )

        # Glorot initialization
        for m in self.linears:
            nn.init.xavier_normal_(m.weight)
            nn.init.zeros_(m.bias)

    def forward(self, x):
        """Map inputs of shape [batch_size, layers[0]] to [batch_size, layers[-1]]."""
        # Every layer except the last is followed by the activation; the
        # output layer stays linear.
        for hidden in self.linears[:-1]:
            x = self.activation(hidden(x))
        return self.linears[-1](x)


# ---------------------------
# 4) PINN Setup
# ---------------------------
# Build the network (2 inputs -> four hidden layers of 64 -> 1 output),
# then move it to the selected device; Module.to returns the module itself.
pinn = PINN(layers=[2, 64, 64, 64, 64, 1])
pinn = pinn.to(DEVICE)
# Adam over all network parameters with the configured learning rate.
optimizer = torch.optim.Adam(params=pinn.parameters(), lr=LR)

# Optional: we could later use LBFGS for fine-tuning

# ---------------------------
# 5) Sampling Collocation Points
# ---------------------------
# Interior points in (0,1)x(0,1)
def sample_interior(n_points):
    """
    Draw ``n_points`` uniform random points in the unit square.

    Returns:
        Tuple ``(x, y)`` of numpy arrays, each of shape (n_points, 1),
        drawn from numpy's global random generator (x first, then y).
    """
    column_shape = (n_points, 1)
    xs = np.random.rand(*column_shape)
    ys = np.random.rand(*column_shape)
    return xs, ys

# Boundary points on x=0, x=1, y=0, y=1
def sample_boundary(n_points):
    """
    Draw random points on the four edges of the unit square.

    The points are split as evenly as possible between the edges
    x=0, x=1, y=0 and y=1 (in that order); when ``n_points`` is not a
    multiple of 4 the leftover points go to the first edges, so exactly
    ``n_points`` points are always returned.

    Args:
        n_points: total number of boundary points (non-negative).

    Returns:
        Tuple ``(Xb, Yb)`` of numpy arrays, each of shape (n_points, 1).

    Raises:
        ValueError: if ``n_points`` is negative.
    """
    if n_points < 0:
        raise ValueError("n_points must be non-negative")

    # Plain floor division would silently drop the remainder (and return
    # nothing at all for n_points < 4), so spread it over the edges.
    base, extra = divmod(n_points, 4)
    n_left, n_right, n_bottom, n_top = (
        base + (1 if k < extra else 0) for k in range(4)
    )

    # x=0, y in [0,1]
    x0 = np.zeros((n_left, 1))
    y0 = np.random.rand(n_left, 1)

    # x=1, y in [0,1]
    x1 = np.ones((n_right, 1))
    y1 = np.random.rand(n_right, 1)

    # y=0, x in [0,1]
    y2 = np.zeros((n_bottom, 1))
    x2 = np.random.rand(n_bottom, 1)

    # y=1, x in [0,1]
    y3 = np.ones((n_top, 1))
    x3 = np.random.rand(n_top, 1)

    # Combine, keeping the edge order x=0, x=1, y=0, y=1
    Xb = np.vstack([x0, x1, x2, x3])
    Yb = np.vstack([y0, y1, y2, y3])
    return Xb, Yb

# ---------------------------
# 6) Physics-Informed Loss Functions
# ---------------------------
def pde_residual(pinn, x, y, epsilon, bx, by):
    """
    PDE residual:
    - epsilon (u_xx + u_yy) + bx u_x + by u_y - f(x,y) = 0

    Args:
        pinn: callable mapping the concatenated coordinates to u.
        x, y: coordinate tensors; they are concatenated along dim=1
            before being fed to the network.
        epsilon: diffusion coefficient.
        bx, by: convection coefficients multiplying u_x and u_y.

    Returns:
        Tensor with the residual at every point, still attached to the
        autograd graph so it can be used in a loss.
    """
    # Differentiation needs grad-enabled inputs. Rather than flipping the
    # flag on the caller's tensors, take detached aliases (same storage,
    # no copy) when the inputs do not track gradients yet. Inputs that
    # already require grad are used unchanged.
    if not x.requires_grad:
        x = x.detach().requires_grad_(True)
    if not y.requires_grad:
        y = y.detach().requires_grad_(True)

    u = pinn(torch.cat((x, y), dim=1))

    # First derivatives; create_graph=True keeps them differentiable so
    # second derivatives and the final backward pass can go through them.
    u_x, u_y = torch.autograd.grad(u, (x, y),
                                   grad_outputs=torch.ones_like(u),
                                   create_graph=True)

    # Second derivatives
    u_xx = torch.autograd.grad(u_x, x,
                               grad_outputs=torch.ones_like(u_x),
                               create_graph=True)[0]
    u_yy = torch.autograd.grad(u_y, y,
                               grad_outputs=torch.ones_like(u_y),
                               create_graph=True)[0]

    # Forcing term
    fxy = forcing_function(x, y, epsilon)

    # PDE residual
    res = -epsilon*(u_xx + u_yy) + bx*u_x + by*u_y - fxy
    return res

def boundary_loss(pinn, x, y):
    """
    Evaluate the network on boundary points.

    For the homogeneous Dirichlet condition (u = 0 on the boundary) the
    returned prediction is itself the boundary residual; the caller is
    expected to square and average it.
    """
    boundary_xy = torch.cat((x, y), dim=1)
    prediction = pinn(boundary_xy)
    return prediction

# ---------------------------
# 7) Training Loop
# ---------------------------
for epoch in range(N_EPOCHS):
    pinn.train()
    optimizer.zero_grad()

    # Interior: fresh random collocation points every iteration.
    x_int_np, y_int_np = sample_interior(N_COLLO)
    x_int, y_int = (
        torch.tensor(coords, dtype=torch.float32).to(DEVICE)
        for coords in (x_int_np, y_int_np)
    )

    # Mean squared PDE residual on the interior points.
    res_int = pde_residual(pinn, x_int, y_int, EPSILON, B_X, B_Y)
    loss_pde = res_int.pow(2).mean()

    # Boundary: fresh random points on the four edges.
    x_bc_np, y_bc_np = sample_boundary(N_BC)
    x_bc, y_bc = (
        torch.tensor(coords, dtype=torch.float32).to(DEVICE)
        for coords in (x_bc_np, y_bc_np)
    )

    # Mean squared violation of u = 0 on the boundary.
    u_bc = boundary_loss(pinn, x_bc, y_bc)
    loss_bc = u_bc.pow(2).mean()

    # Weighted sum; the boundary weight (10.0) is a tunable factor.
    loss = loss_pde + 10.0 * loss_bc

    loss.backward()
    optimizer.step()

    # Report progress only every 2000 iterations.
    if epoch % 2000 != 0:
        continue
    print(f"Epoch {epoch}/{N_EPOCHS}, Loss PDE: {loss_pde.item():.6e}, Loss BC: {loss_bc.item():.6e}, Total: {loss.item():.6e}")

# ---------------------------
# 8) After Training: Predict on Test Points
# ---------------------------
def predict_u(pinn, x_test, y_test):
    """
    Evaluate the trained PINN at given points (x_test, y_test).

    Args:
        pinn: the network; it is switched to eval mode and left there.
        x_test, y_test: coordinates as array-likes or tensors of matching
            length, either flat (N,) or column-shaped (N, 1).

    Returns:
        numpy array of shape (N, 1) with the predicted u values.
    """
    pinn.eval()

    # Put the inputs where the model's weights actually live instead of
    # relying on a global device setting that may not match the model.
    first_param = next(pinn.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    with torch.no_grad():
        # as_tensor accepts numpy arrays and existing tensors alike;
        # reshape(-1, 1) lets flat coordinate arrays be passed directly
        # and is a no-op for inputs that are already columns.
        X = torch.as_tensor(x_test, dtype=torch.float32, device=device).reshape(-1, 1)
        Y = torch.as_tensor(y_test, dtype=torch.float32, device=device).reshape(-1, 1)
        U = pinn(torch.cat((X, Y), dim=1))
    return U.cpu().numpy()

# Example usage after training:
# Suppose you have test points in test.csv with columns (ID, x, y).
# You can do something like:
#
#   test_data = np.loadtxt("test.csv", delimiter=",", skiprows=1)  # example
#   # or use pandas
#   # test_data = pd.read_csv("test.csv").values
#
#   # assume the order is ID, x, y
#   # test_data[:,0] => ID, test_data[:,1] => x, test_data[:,2] => y
#   x_test = test_data[:,1].reshape(-1,1)
#   y_test = test_data[:,2].reshape(-1,1)
#
#   u_pred = predict_u(pinn, x_test, y_test)
#
#   # Then write to submission.csv:
#   # ID, x, y, u
#   submission = np.column_stack((test_data[:,0], x_test, y_test, u_pred))
#   np.savetxt("submission.csv", submission, delimiter=",", 
#              header="ID,x,y,u", comments="", fmt=["%d","%.6f","%.6f","%.6e"])


Using device: cpu
Epoch 0/30000, Loss PDE: 1.951390e+01, Loss BC: 5.536057e-02, Total: 2.006751e+01
Epoch 2000/30000, Loss PDE: 1.466609e-01, Loss BC: 1.369515e-01, Total: 1.516176e+00
Epoch 4000/30000, Loss PDE: 1.672026e-01, Loss BC: 1.314029e-01, Total: 1.481232e+00
Epoch 6000/30000, Loss PDE: 2.112493e-01, Loss BC: 1.327448e-01, Total: 1.538697e+00
Epoch 8000/30000, Loss PDE: 1.717952e-01, Loss BC: 1.313083e-01, Total: 1.484879e+00
Epoch 10000/30000, Loss PDE: 1.807090e-01, Loss BC: 1.139398e-01, Total: 1.320107e+00
Epoch 12000/30000, Loss PDE: 1.488011e-01, Loss BC: 1.542621e-01, Total: 1.691422e+00
Epoch 14000/30000, Loss PDE: 1.836690e-01, Loss BC: 1.239351e-01, Total: 1.423020e+00
Epoch 16000/30000, Loss PDE: 1.794232e-01, Loss BC: 1.343316e-01, Total: 1.522739e+00
Epoch 18000/30000, Loss PDE: 1.897828e-01, Loss BC: 1.298301e-01, Total: 1.488084e+00
Epoch 20000/30000, Loss PDE: 2.001412e-01, Loss BC: 1.194160e-01, Total: 1.394301e+00
Epoch 22000/30000, Loss PDE: 1.220577e-01, L

In [3]:
import pandas as pd
# Load the test points from "test.csv" into a DataFrame for a quick look.
# NOTE(review): here `x` is a DataFrame, while elsewhere in this notebook
# `x` names a coordinate argument; a more descriptive name would be safer.
x=pd.read_csv("test.csv")
# Bare last expression so the notebook renders the frame as the cell output.
x

Unnamed: 0,ID,x,y
0,1,0.000000,0.0
1,2,0.002506,0.0
2,3,0.005013,0.0
3,4,0.007519,0.0
4,5,0.010025,0.0
...,...,...,...
159995,159996,0.989975,1.0
159996,159997,0.992481,1.0
159997,159998,0.994987,1.0
159998,159999,0.997494,1.0


In [4]:
import pandas as pd
def generate_submission(model, test_csv_path="test.csv", submission_path="submission.csv"):
    """
    Predict u on the test points and write the submission file.

    Args:
        model: trained network handed to ``predict_u``.
        test_csv_path: CSV file that provides the columns "ID", "x", "y".
        submission_path: destination CSV, written with the columns
            "ID", "x", "y", "u" and without the index.
    """
    df_test = pd.read_csv(test_csv_path)

    # Column vectors in file order, so predictions line up with the IDs.
    coords = {
        axis: df_test[axis].to_numpy().reshape(-1, 1)
        for axis in ("x", "y")
    }
    u_pred = predict_u(model, coords["x"], coords["y"])

    # Keep the identifying columns and append the flattened prediction.
    df_submit = df_test[["ID", "x", "y"]].assign(u=u_pred.flatten())

    df_submit.to_csv(submission_path, index=False)
    print(f"Submission saved to {submission_path}.")

# -----------------------------------------------------
# 9) Example Usage: Generate Submission
# -----------------------------------------------------
if __name__ == "__main__":
    # Runs once training has finished: read test.csv, predict with the
    # trained network and write submission.csv. Other paths can be
    # supplied through the second and third arguments.
    generate_submission(pinn, "test.csv", "submission.csv")

Submission saved to submission.csv.
