In [None]:
import matplotlib.pyplot as plt
from matplotlib.colors import LinearSegmentedColormap, BoundaryNorm
import time
import random
colors_8_custom = [
        (0.8, 0, 0),  # Dark red
        (1, 0.3, 0),  # Darker orange
        (1, 1, 0),    # Light yellow
        (1, 1, 0.8),  # Cream
        (0.8, 0.9, 1),  # Very light blue
        (0.5, 0.7, 1),  # Light blue
        (0.2, 0.4, 1),  # Medium blue
        (0, 0, 1)     # Deep blue
    ]
custom_cmap = LinearSegmentedColormap.from_list("custom_cmap", colors_8_custom, N=8)
    
    # Define custom boundaries and ticks
boundaries = [-900, -800, -700, -600, -500, -400, -300, -200, -100]
ticks = [-800, -600, -400, -200]
norm = BoundaryNorm(boundaries, custom_cmap.N)

In [None]:
import numpy as np
import torch
import torch.nn as nn
import matplotlib.pyplot as plt
import time

def flux_anode(u):
    """ anode flux = -((u - bA)/aA) """
    aA = 0.100184
    bA = -0.844434
    return -((u - bA) / aA)

def flux_cathode(u):
    """ cathode flux = (u - bK)/aK """
    aK = -0.0423633
    bK = -0.197805
    return (u - bK)/aK

class PINN(nn.Module):
    def __init__(self, layers=[2, 32, 32, 32, 32, 1]):
        super(PINN, self).__init__()
        self.activ = nn.Tanh()

        module_list = []
        for i in range(len(layers)-1):
            in_features  = layers[i]
            out_features = layers[i+1]
            linear = nn.Linear(in_features, out_features)
            nn.init.xavier_uniform_(linear.weight)
            nn.init.zeros_(linear.bias)

            module_list.append(linear)
            if i < len(layers)-2:
                module_list.append(self.activ)
        self.net = nn.Sequential(*module_list)

    def forward(self, x):
        return self.net(x)

class CorrosionPINN:
    def __init__(self):
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        print("Using device:", self.device)

        # Domain
        self.lebar  = 0.8
        self.tinggi = 0.06
        self.hx = 0.0025
        self.hy = 0.0025

        # PDE interior
        x_vals = np.arange(0, self.lebar + self.hx, self.hx)
        y_vals = np.arange(0, self.tinggi + self.hy, self.hy)
        X, Y = np.meshgrid(x_vals, y_vals)
        XY = np.column_stack((X.flatten(), Y.flatten()))
        self.X_interior = torch.tensor(
            XY, dtype=torch.float32, requires_grad=True
        ).to(self.device)

        # Conductivity
        self.kpp = 0.0001

        # Model
        self.model = PINN().to(self.device)
        self.optimizer = torch.optim.Adam(self.model.parameters(), lr=1e-4)
        self.criterion = nn.MSELoss()

        # Weights
        self.w_pde   = 1e6
        self.w_ins   = 100.0
        self.w_elec  = 1e-7

        # Boundaries
        self.define_boundaries()

        # For tracking training history:
        self.history = {
            'epoch': [],
            'loss': [],
            'time': []
        }

        # Print total trainable parameters
        total_params = sum(p.numel() for p in self.model.parameters() if p.requires_grad)
        print("Trainable Parameters:", total_params)

        # Print GPU memory usage if available
        if torch.cuda.is_available():
            allocated = torch.cuda.memory_allocated(self.device)/1024**2
            reserved = torch.cuda.memory_reserved(self.device)/1024**2
            print(f"GPU Memory Allocated: {allocated:.2f} MB")
            print(f"GPU Memory Reserved : {reserved:.2f} MB")

    def define_boundaries(self):
        # Bottom boundary => electrode from x=0.01..0.99
        x_start = 0.01*self.lebar
        x_end   = 0.99*self.lebar
        Nx = 80
        x_electrode = np.linspace(x_start, x_end, Nx)
        y_electrode = np.zeros_like(x_electrode)
        electrode_points = np.column_stack((x_electrode, y_electrode))
        self.electrode_points = torch.tensor(
            electrode_points, dtype=torch.float32, requires_grad=True
        ).to(self.device)

        # Mark anode of length 0.18 (or 0.19) in the middle
        an_len = 0.19
        x_an_start = (self.lebar - an_len)/2.0
        x_an_end   = x_an_start + an_len
        seg_types = np.zeros(Nx, dtype=int)
        inside_anode = (x_electrode >= x_an_start) & (x_electrode <= x_an_end)
        seg_types[inside_anode] = 1
        self.segment_types = seg_types

        # Sides + top + bottom corners => insulation
        y_left = np.arange(0, self.tinggi + self.hy, self.hy)
        left_pts = np.column_stack((np.zeros_like(y_left), y_left))

        x_right = np.full_like(y_left, self.lebar)
        right_pts = np.column_stack((x_right, y_left))

        # top boundary => insulation
        x_top = np.arange(0, self.lebar + self.hx, self.hx)
        y_top = np.full_like(x_top, self.tinggi)
        top_pts = np.column_stack((x_top, y_top))

        # bottom corners
        x_bottom1 = np.arange(0, x_start, self.hx)
        b1_pts = np.column_stack((x_bottom1, np.zeros_like(x_bottom1)))

        x_bottom2 = np.arange(x_end, self.lebar + self.hx, self.hx)
        x_bottom2 = x_bottom2[x_bottom2 <= self.lebar]
        b2_pts = np.column_stack((x_bottom2, np.zeros_like(x_bottom2)))

        boundary_insul = np.vstack((left_pts, right_pts, top_pts, b1_pts, b2_pts))
        self.boundary_insul = torch.tensor(
            boundary_insul, dtype=torch.float32, requires_grad=True
        ).to(self.device)

    def _compute_pde_loss(self):
        x = self.X_interior
        u = self.model(x)

        grads = torch.autograd.grad(
            u, x, grad_outputs=torch.ones_like(u), create_graph=True
        )[0]
        du_dx = grads[:, 0].unsqueeze(-1)
        du_dy = grads[:, 1].unsqueeze(-1)

        d2u_dx2 = torch.autograd.grad(
            du_dx, x, grad_outputs=torch.ones_like(du_dx), create_graph=True
        )[0][:, 0].unsqueeze(-1)

        d2u_dy2 = torch.autograd.grad(
            du_dy, x, grad_outputs=torch.ones_like(du_dy), create_graph=True
        )[0][:, 1].unsqueeze(-1)

        lap_u = d2u_dx2 + d2u_dy2
        return self.criterion(-1.0*self.kpp * lap_u, torch.zeros_like(lap_u))

    def _compute_insulation_loss(self):
        b_pts = self.boundary_insul
        u_b = self.model(b_pts)
        grads_b = torch.autograd.grad(
            u_b, b_pts, grad_outputs=torch.ones_like(u_b), create_graph=True
        )[0]

        xy_arr = b_pts.detach().cpu().numpy()
        flux_vec = torch.zeros_like(grads_b[:,0:1])
        eps = 1e-7

        for i in range(len(xy_arr)):
            x_, y_ = xy_arr[i]
            if (abs(x_) < eps and abs(y_) < eps) or \
               (abs(x_-self.lebar) < eps and abs(y_) < eps) or \
               (abs(y_-self.tinggi) < eps and abs(x_) < eps) or \
               (abs(y_-self.tinggi) < eps and abs(x_-self.lebar) < eps):
                flux_vec[i] = 0.0
                continue

            if abs(x_) < eps:
                nvec = torch.tensor([-1., 0.], device=self.device)
            elif abs(x_ - self.lebar) < eps:
                nvec = torch.tensor([1., 0.], device=self.device)
            elif abs(y_ - self.tinggi) < eps:
                nvec = torch.tensor([0., 1.], device=self.device)
            else:
                nvec = torch.tensor([0., -1.], device=self.device)

            flux_val = grads_b[i, :].dot(nvec)
            flux_vec[i] = flux_val

        return self.criterion(flux_vec, torch.zeros_like(flux_vec))

    def _compute_electrode_loss(self):
        e_pts = self.electrode_points
        u_e   = self.model(e_pts)
        grads_e = torch.autograd.grad(
            u_e, e_pts, grad_outputs=torch.ones_like(u_e), create_graph=True
        )[0]

        normal_e = torch.tensor([[0., -1.]]*len(e_pts), dtype=torch.float32).to(self.device)
        dudn = torch.sum(grads_e*normal_e, dim=1, keepdim=True)

        fluxes = []
        for i in range(len(self.segment_types)):
            if self.segment_types[i] == 1:
                fluxes.append(flux_anode(u_e[i]))
            else:
                fluxes.append(flux_cathode(u_e[i]))

        flux_tensor = torch.stack(fluxes).unsqueeze(dim=1)

        residual = dudn - (1.0/self.kpp)*flux_tensor
        return self.criterion(residual, torch.zeros_like(residual))

    def _compute_total_loss(self):
        l_pde   = self._compute_pde_loss()
        l_ins   = self._compute_insulation_loss()
        l_elec  = self._compute_electrode_loss()

        total = ( self.w_pde*l_pde
                + self.w_ins*l_ins
                + self.w_elec*l_elec )
        return total, l_pde, l_ins, l_elec

    def train(self, num_epochs=3000, use_lr_schedule=False, step_size=2000, gamma=0.5):
        """
        Train the PINN with optional learning rate scheduling.
        :param num_epochs: (int) number of epochs
        :param use_lr_schedule: (bool) whether to enable a StepLR schedule
        :param step_size: (int) how often to step the LR schedule
        :param gamma: (float) factor by which to multiply LR at each step
        """
        print("Training 1")
        if use_lr_schedule:
            from torch.optim.lr_scheduler import StepLR
            scheduler = StepLR(self.optimizer, step_size=step_size, gamma=gamma)

        start_time = time.time()

        for epoch in range(num_epochs):
            self.optimizer.zero_grad()
            total_loss, lpde, lins, lelec = self._compute_total_loss()
            total_loss.backward()
            self.optimizer.step()

            if use_lr_schedule:
                scheduler.step()

            # Record history
            current_time = time.time() - start_time
            self.history['epoch'].append(epoch)
            self.history['loss'].append(total_loss.item())
            self.history['time'].append(current_time)

            # Weighted components
            PDE_component  = self.w_pde * lpde
            INS_component  = self.w_ins * lins
            ELEC_component = self.w_elec * lelec

            # Convert to float
            lpde_val      = lpde.item()
            lins_val      = lins.item()
            lelec_val     = lelec.item()
            PDE_w_val     = PDE_component.item()
            INS_w_val     = INS_component.item()
            ELEC_w_val    = ELEC_component.item()
            TOT_val       = total_loss.item()

            PDE_perc  = 100.0 * PDE_w_val   / TOT_val
            INS_perc  = 100.0 * INS_w_val   / TOT_val
            ELEC_perc = 100.0 * ELEC_w_val  / TOT_val

            # Print progress every 500 epochs
            if epoch % 500 == 0:
                print((
                    f"Epoch {epoch} | Total={TOT_val:.4e}\n"
                    f"  PDE   : {lpde_val:.4e} (unweighted), "
                    f"{PDE_w_val:.4e} (weighted), ({PDE_perc:.1f}%)\n"
                    f"  BCins : {lins_val:.4e} (unweighted), "
                    f"{INS_w_val:.4e} (weighted), ({INS_perc:.1f}%)\n"
                    f"  BCelec: {lelec_val:.4e} (unweighted), "
                    f"{ELEC_w_val:.4e} (weighted), ({ELEC_perc:.1f}%)"
                ))


    def evaluate_solution(self):
        self.model.eval()
        with torch.no_grad():
            upred = self.model(self.X_interior).cpu().numpy().flatten()
        nx = int(self.lebar/self.hx + 1)
        ny = int(self.tinggi/self.hy + 1)
        return upred.reshape(ny, nx)

    def plot_potential_mV(self, U2D):
        # Example usage; you must define the color mapping, boundaries, ticks, etc.
        U_mV = 1000.*U2D
        x_vals = np.arange(0, self.lebar + self.hx, self.hx)
        y_vals = np.arange(0, self.tinggi + self.hy, self.hy)
        X, Y = np.meshgrid(x_vals, y_vals)

        # Example custom boundaries for contour
        boundaries = np.linspace(np.min(U_mV), np.max(U_mV), 11)
        custom_cmap = plt.cm.jet
        norm = None
        ticks = np.round(boundaries,2)

        plt.figure(figsize=(9,3))
        c = plt.contourf(X, Y, U_mV, levels=boundaries, cmap=custom_cmap, norm=norm)
        plt.contour(X, Y, U_mV, levels=boundaries, colors="black", linewidths=0.5)
        cbar = plt.colorbar(c, label="Potential (mV)", ticks=ticks)
        cbar.ax.set_yticklabels(ticks)
        plt.title("Potential (mV) - PINN solution (All Insulated + Bottom Electrode)")
        plt.xlabel("X (m)")
        plt.ylabel("Y (m)")
        plt.gca().set_aspect('equal')
        plt.show()

    def plot_potential_at_y(self, U2D, y_val=0.06):
        U_mV = 1000.*U2D
        x_vals = np.arange(0, self.lebar + self.hx, self.hx)
        row_idx= int(round(y_val/self.hy))
        if row_idx<0 or row_idx>=U_mV.shape[0]:
            print(f"Requested y={y_val:.3f} out of domain range [0,{self.tinggi}]")
            return
        pot_line = U_mV[row_idx, :]

        plt.figure(figsize=(8,4))
        plt.plot(x_vals, pot_line, 'b-', label=f"y={y_val:.3f}")
        plt.title(f"Potential at y={y_val:.3f} m")
        plt.xlabel("X (m)")
        plt.ylabel("Potential (mV)")
        plt.legend()
        plt.grid(True)
        plt.show()

    def plot_flux_field(self, stride=5):
        x_vals = np.arange(0, self.lebar + self.hx, self.hx)
        y_vals = np.arange(0, self.tinggi + self.hy, self.hy)
        X, Y = np.meshgrid(x_vals, y_vals)

        Xsub = X[::stride, ::stride]
        Ysub = Y[::stride, ::stride]
        XYsub = np.column_stack((Xsub.flatten(), Ysub.flatten()))

        XYsub_t = torch.tensor(XYsub, dtype=torch.float32, requires_grad=True).to(self.device)

        self.model.eval()
        u_sub = self.model(XYsub_t)

        grads_sub = torch.autograd.grad(
            u_sub, XYsub_t,
            grad_outputs=torch.ones_like(u_sub),
            create_graph=False
        )[0]

        dudx = grads_sub[:,0].detach().cpu().numpy()
        dudy = grads_sub[:,1].detach().cpu().numpy()

        dudx_2d = dudx.reshape(Xsub.shape)
        dudy_2d = dudy.reshape(Xsub.shape)

        # Evaluate solution for contour
        U2D = self.evaluate_solution()
        U_mV = 1000.*U2D

        # Example custom boundaries for contour
        boundaries = np.linspace(np.min(U_mV), np.max(U_mV), 11)
        custom_cmap = plt.cm.jet
        norm = None
        ticks = np.round(boundaries,2)

        plt.figure(figsize=(9,3))
        c = plt.contourf(X, Y, U_mV, levels=boundaries, cmap=custom_cmap, norm=norm)
        plt.contour(X, Y, U_mV, levels=boundaries, colors="black", linewidths=0.5)
        cbar = plt.colorbar(c, label="Potential (mV)", ticks=ticks)
        cbar.ax.set_yticklabels(ticks)
        plt.quiver(Xsub, Ysub, dudx_2d, dudy_2d, color='k', scale=50)
        plt.title("Flux / Gradient Field (PINN)")
        plt.xlabel("x (m)")
        plt.ylabel("y (m)")
        plt.gca().set_aspect('equal')
        plt.show()

    def evaluate_solution_gpu(self):
        """
        Evaluate the solution using GPU and measure the computation time.
        """
        self.model.eval()  # Set model to evaluation mode
        start_time = time.time()  # Start timing

        with torch.no_grad():  # Disable gradient computation
            upred = self.model(self.X_interior).flatten()  # Keep the output on GPU

        end_time = time.time()  # End timing
        computation_time = end_time - start_time  # Calculate elapsed time

        # Reshape the output tensor to 2D
        nx = int(self.lebar / self.hx + 1)
        ny = int(self.tinggi / self.hy + 1)
        upred_2d = upred.view(ny, nx)  # Return as GPU tensor

        print(f"GPU Evaluation Time: {computation_time:.6f} seconds")
        return upred_2d

    def plot_training_history(self):
        """
        Plots the training loss versus epoch and versus time with log-scaled x-axes.
        """
        if len(self.history['epoch']) == 0:
            print("No training history found. Train the model first.")
            return

        fig, axes = plt.subplots(1, 2, figsize=(12, 5))

        # Plot Loss vs Epoch (log-scale x-axis)
        axes[0].plot(self.history['epoch'], self.history['loss'], 'b-o', markersize=3)
        axes[0].set_title("Training Loss vs. Epoch")
        axes[0].set_xlabel("Epoch")
        axes[0].set_ylabel("Loss")
        axes[0].set_xscale("log")  # Log scale for x-axis
        axes[0].grid(True, which="both", linestyle="--", linewidth=0.5)

        # Plot Loss vs Time (log-scale x-axis)
        axes[1].plot(self.history['time'], self.history['loss'], 'r-o', markersize=3)
        axes[1].set_title("Training Loss vs. Elapsed Time")
        axes[1].set_xlabel("Time (s)")
        axes[1].set_ylabel("Loss")
        axes[1].set_xscale("log")  # Log scale for x-axis
        axes[1].grid(True, which="both", linestyle="--", linewidth=0.5)

        plt.tight_layout()
        plt.show()

def plot_boundary_conditions(pinn):
    import matplotlib.pyplot as plt
    electrode_pts = pinn.electrode_points.detach().cpu().numpy()
    insul_pts = pinn.boundary_insul.detach().cpu().numpy()

    plt.figure(figsize=(8, 5))
    plt.scatter(electrode_pts[:, 0], electrode_pts[:, 1],
                c='red', label="Electrode Points")
    plt.scatter(insul_pts[:, 0], insul_pts[:, 1],
                c='blue', label="Insulation Points")

    seg_types = pinn.segment_types
    anode_pts = electrode_pts[seg_types == 1]
    cathode_pts = electrode_pts[seg_types == 0]
    plt.scatter(anode_pts[:, 0], anode_pts[:, 1],
                c='orange', label="Anode")
    plt.scatter(cathode_pts[:, 0], cathode_pts[:, 1],
                c='green', label="Cathode")

    plt.title("Boundary Conditions Visualization")
    plt.xlabel("X (m)")
    plt.ylabel("Y (m)")
    plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left', borderaxespad=0.)
    plt.grid(True)
    plt.gca().set_aspect('equal')
    plt.tight_layout()
    plt.show()


In [None]:
pinn = CorrosionPINN()
pinn.train(num_epochs=50000, use_lr_schedule=False)

In [None]:
tr1=pinn.plot_training_history()
U2D = pinn.evaluate_solution()
pinn.plot_potential_at_y(U2D, y_val=0.06)

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
from matplotlib.colors import LinearSegmentedColormap, BoundaryNorm

# Define the custom colormap
colors_8_custom = [
    (0.8, 0, 0),  # Dark red
    (1, 0.3, 0),  # Darker orange
    (1, 1, 0),    # Light yellow
    (1, 1, 0.8),  # Cream
    (0.8, 0.9, 1),  # Very light blue
    (0.5, 0.7, 1),  # Light blue
    (0.2, 0.4, 1),  # Medium blue
    (0, 0, 1)     # Deep blue
]
custom_cmap = LinearSegmentedColormap.from_list("custom_cmap", colors_8_custom, N=8)

# Define custom boundaries and ticks
boundaries = [-900, -800, -700, -600, -500, -400, -300, -200, -100]
ticks = [-800, -600, -400, -200]
norm = BoundaryNorm(boundaries, custom_cmap.N)

# Load the Mathematica-exported data from Excel
file_path = "FEM_DATA/PotentialField.xlsx"  # Update with the correct file path if necessary
data = pd.read_excel(file_path)

# Extract the columns (assuming columns: X (m), Y (m), Potential (mV))
x = data['X (m)'].values
y = data['Y (m)'].values
potential = data['Potential (mV)'].values

# Scatter plot with the customized colormap and normalization
plt.figure(figsize=(8, 6))
plt.scatter(x, y, c=potential, cmap=custom_cmap, s=10, norm=norm)  # Use the custom colormap and normalization
plt.colorbar(label="Potential (mV)", ticks=ticks)
plt.title("Scatter Plot of Data Points (X vs Y)")
plt.xlabel("X (m)")
plt.ylabel("Y (m)")
plt.gca().set_aspect('equal')
plt.grid(True)
plt.show()


In [None]:
def compare_potential_fields(pinn, math_data):
    """
    Compare PINN potential field with Mathematica data.

    :param pinn: Trained CorrosionPINN object.
    :param math_data: Pandas DataFrame with Mathematica data (X, Y, Potential).
    """
    from scipy.interpolate import griddata
    # Extract Mathematica data
    x_math = math_data['X (m)'].values
    y_math = math_data['Y (m)'].values
    potential_math = math_data['Potential (mV)'].values

    # Evaluate PINN solution
    U2D_pinn = pinn.evaluate_solution()
    U_mV_pinn = 1000. * U2D_pinn  # Convert to mV

    # Create grid for PINN
    x_vals_pinn = np.arange(0, pinn.lebar + pinn.hx, pinn.hx)
    y_vals_pinn = np.arange(0, pinn.tinggi + pinn.hy, pinn.hy)
    X_pinn, Y_pinn = np.meshgrid(x_vals_pinn, y_vals_pinn)

    # Interpolate Mathematica data onto PINN grid
    points_math = np.column_stack((x_math, y_math))
    points_pinn = np.column_stack((X_pinn.flatten(), Y_pinn.flatten()))
    interpolated_math_data = griddata(points_math, potential_math, points_pinn, method='linear')
    interpolated_math_data = interpolated_math_data.reshape(X_pinn.shape)

    # Debugging: Check data shapes
    print("X_pinn shape:", X_pinn.shape)
    print("Y_pinn shape:", Y_pinn.shape)
    print("U_mV_pinn shape:", U_mV_pinn.shape)
    print("Interpolated Mathematica data shape:", interpolated_math_data.shape)

    # Ensure data validity
    if np.isnan(interpolated_math_data).any():
        print("Warning: Interpolated Mathematica data contains NaNs!")

    # Plotting
    plt.figure(figsize=(24, 8))

    # Plot PINN
    plt.subplot(1, 3, 1)
    c1 = plt.contourf(X_pinn, Y_pinn, U_mV_pinn, levels=boundaries, cmap=custom_cmap, norm=norm)
    plt.contour(X_pinn, Y_pinn, U_mV_pinn, levels=boundaries, colors="black", linewidths=0.5)
    plt.colorbar(c1, label="Potential (mV)", ticks=ticks, shrink=0.5)
    plt.title("PINN Solution")
    plt.xlabel("X (m)")
    plt.ylabel("Y (m)")
    plt.gca().set_aspect('equal')

    # Plot Mathematica (interpolated)
    plt.subplot(1, 3, 2)
    c2 = plt.contourf(X_pinn, Y_pinn, interpolated_math_data, levels=boundaries, cmap=custom_cmap, norm=norm)
    plt.contour(X_pinn, Y_pinn, interpolated_math_data, levels=boundaries, colors="black", linewidths=0.5)
    plt.colorbar(c2, label="Potential (mV)", ticks=ticks, shrink=0.5)
    plt.title("Mathematica (Interpolated)")
    plt.xlabel("X (m)")
    plt.ylabel("Y (m)")
    plt.gca().set_aspect('equal')

    # Plot difference
    plt.subplot(1, 3, 3)
    difference = (abs(U_mV_pinn**2 - interpolated_math_data**2))**0.5
    c3 = plt.contourf(X_pinn, Y_pinn, difference, levels=50, cmap="coolwarm")
    plt.colorbar(c3, label="Difference (mV)", shrink=0.5)
    plt.title("Difference (PINN - Mathematica)")
    plt.xlabel("X (m)")
    plt.ylabel("Y (m)")
    plt.gca().set_aspect('equal')

    plt.tight_layout()
    plt.show()


In [None]:
compare_potential_fields(pinn, data)