# SPH Simulation of Phase Change: The Stefan Problem
**Author:** Bastien Bodin  
**Context:** This notebook implements a Smoothed Particle Hydrodynamics (SPH) solver to simulate the solidification process, specifically focusing on the Monaghan formulation for phase change.

### Physical Problem
The 1D/2D Stefan problem describes the temperature distribution in a medium undergoing a phase change. 
The main challenge is tracking the interface location $s(t)$ where latent heat is released/absorbed.

## Environment Setup and Project Paths
Since this notebook is located in a sub-directory (`./benchmarks/heat_transfer/`), we need to add the project root to the Python path to access the custom `modules` (Geometries, HeatTransfer, etc.) located at the root of the repository.

In [None]:
import sys
import os

# --- Path Configuration ---
# Navigate up two levels to reach the project root
# ./benchmarks/heat_transfer/ -> ./benchmarks/ -> ./
root_path = os.path.abspath(os.path.join('..', '..'))

if root_path not in sys.path:
    sys.path.append(root_path)

# Imports

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from math import exp, erf, sqrt, pi

# PySPH imports
from pysph.sph.equation import Group
from pysph.base.utils import get_particle_array_wcsph
from pysph.solver.application import Application
from pysph.solver.solver import Solver
from pysph.base.kernels import CubicSpline
from pysph.base.nnps import DomainManager

# Custom modules
from modules.Geometries import MyBlock2D
from modules.HeatTransfer import Conduction, HeatTransfer
from modules.TimeStep import AdaptiveTSThermo
from modules.Integrators import VerletIntegrator, VerletStepThermalSolidificationPure

# SPH Application

In [None]:
class MonaghanPhaseChange(Application):
    """
    SPH Application for the Stefan problem with phase change.
    Designed for cryolava flow modeling with solidified crust tracking.
    """

    def initialize(self):
        # --- Physical properties for the Stefan problem ---
        self.rho   : float = 1.0       # Density [kg/m^3]
        self.dH    : float = 1.0       # Latent heat [J/kg]
        self.cp    : float = 1.0       # Specific heat capacity [J/kg/K]
        self.kappa : float = 1.0       # Thermal conductivity [W/m/K]
        
        # Temperatures
        self.T0    : float = 1.2       # Initial temperature
        self.Tm    : float = 1.0       # Melting/Solidification temperature
        self.Tb    : float = 0.1906    # Boundary (Ground) temperature
        
        # Analytical parameter
        self.lambd : float = 0.5       
        
        # --- SPH Simulation Setup ---
        self.dim    : int   = 2
        self.length : float = 1.0      # Domain length
        self.dp     : float = 0.025    # Particle spacing
        self.hdp    : float = 1.3      # Smoothing length factor
        self.w_gr   : float = 5.0 * self.dp # Ground thickness
        
        # Derived numerical properties
        self.mass   : float = self.rho * self.dp**float(self.dim)
        self.h      : float = self.dp * self.hdp
        
        # Time-stepping & Output
        self.tf     : float = 0.0568   # Total simulation time
        # Thermal stability formula
        self.dt_th  : float = 0.1 * self.rho * self.cp * (self.dp * self.hdp)**2.0 / self.kappa
        
        N_out       : int   = 400
        self.out_t  : list  = [i * self.tf / N_out for i in range(N_out + 1)]

    def create_particles(self):
        """
        Creates 'fluid' and 'ground' particle arrays.
        Strictly preserves internal property names (pa.kappa, pa.cp, pa.Tsol, etc.)
        """
        # 1. Define Geometry
        # Fluid block (subject to phase change)
        Xf, Yf = MyBlock2D(
            x0=0.0, y0=self.dp/2.0,
            Lx=self.length, Ly=self.length, dp=self.dp
        )
        
        # Ground block (constant boundary)
        Xg, Yg = MyBlock2D(
            x0=0.0, y0=-self.w_gr + self.dp/2.0,
            Lx=self.length, Ly=self.w_gr, dp=self.dp
        )

        # 2. Initialize Particle Arrays
        fluid = get_particle_array_wcsph(
            name='fluid', x=Xf, y=Yf,
            m=self.mass, h=self.h, rho=self.rho
        )
        ground = get_particle_array_wcsph(
            name='ground', x=Xg, y=Yg,
            m=self.mass, h=self.h, rho=self.rho
        )

        # 3. Define and Assign Required Properties
        Props = [
            'Temp', 'Temp0', 'aTemp', 'kappa', 'cp',
            'Tsol', 'qstate', 'Lh',
            'Cond', 'Conv', 'Rad',
            'dttherm', 'dt_adapt'
        ]

        particles = [fluid, ground]

        for pa in particles:
            for prop in Props:
                pa.add_property(prop, default=0.0)
            
            # Populate properties with constants from initialize
            pa.kappa[:] = self.kappa
            pa.cp[:]    = self.cp
            pa.Tsol[:]  = self.Tm
            pa.Lh[:]    = self.dH
            
            # Ensure properties are saved in .hdf5 files
            pa.add_output_arrays(Props)

        # 4. Set Initial Conditions
        particles[0].Temp[:] = self.T0
        particles[0].dt_adapt[:] = 1.0e-7 

        particles[1].Temp[:] = self.Tb
        particles[1].qstate[:] = 1.0     # Ground starts as solid
        particles[1].dt_adapt[:] = 1.0e7

        return particles

    def create_equations(self):
        """Defines the interaction logic between particles."""
        return [
            Group(equations=[
                # Thermal interaction between arrays
                Conduction(dest='fluid', sources=['ground', 'fluid'])
            ]),
            Group(equations=[
                # Energy balance and phase change logic
                HeatTransfer(dest='fluid', sources=None, dp=self.dp)
            ])
        ]

    def create_domain(self):
        """DomainManager for periodicity and boundary handling."""
        return DomainManager(
            xmin=0.0, xmax=self.length,
            ymin=-self.w_gr + self.dp/2.0, ymax=self.length,
            periodic_in_x=True, periodic_in_y=False
        )

    def create_solver(self):
        """Configures the solver with your fixed thermal dt."""
        kernel = CubicSpline(dim=self.dim)
        
        integrator = VerletIntegrator(
            fluid=VerletStepThermalSolidificationPure()
        )
        
        return Solver(
            dim=self.dim, 
            integrator=integrator, 
            kernel=kernel,
            tf=self.tf, 
            dt=self.dt_th,
            fixed_h=True,
            adaptive_timestep=False,
            output_at_times=self.out_t, 
            pfreq=1
        )

# Instantiating and Executing the code

In [None]:
if __name__ == "__main__":
    # Configure the output directory
    output_dir = "MonaghanPhaseChange"
    # Create the application instance
    app = MonaghanPhaseChange(fname=output_dir)
    
    # Run the simulation
    # PySPH handles the solver loop and data saving (.hdf5)
    app.run()

## Analytical Solution: The Stefan Problem
To validate our SPH model, we compare the results with the analytical solution of the semi-infinite Stefan problem. 
The interface position $s(t)$ and the temperature profile $T(y, t)$ are given by:
- Interface: $s(t) = 2\lambda\sqrt{\alpha t}$
- Temperature ($y < s(t)$): $T(y,t) = T_b + \frac{T_m - T_b}{erf(\lambda)} erf\left(\frac{y}{2\sqrt{\alpha t}}\right)$

where $\alpha = \frac{\kappa}{\rho c_p}$ is the thermal diffusivity.

In [None]:
from scipy.special import erf, erfc
import numpy as np

def stefan_theoretical_profile(y, t, config):
    """
    Computes the theoretical temperature profile and interface position.
    Fixed: Always returns (TemperatureArray, InterfacePosition)
    """
    alpha = config['kappa'] / (config['rho'] * config['cp'])
    lambd = config['lambd']
    
    # Interface position calculation
    y_int = 2.0 * lambd * np.sqrt(alpha * t) if t > 0 else 0.0
    
    # If y is None or t is 0, we don't calculate the full profile
    if y is None:
        return None, y_int
    
    if t <= 0:
        # Return initial temperature and interface at 0
        return np.full_like(y, config['T0']), y_int
    
    # Normalized coordinate
    eta = y / (2.0 * np.sqrt(alpha * t))
    T_theory = np.zeros_like(y)
    
    # Solid phase (y <= y_interface)
    mask_solid = (y <= y_int)
    T_theory[mask_solid] = config['Tb'] + (config['Tm'] - config['Tb']) / erf(lambd) * erf(eta[mask_solid])
    
    # Liquid phase (y > y_interface)
    mask_liquid = ~mask_solid
    T_theory[mask_liquid] = config['T0'] + (config['Tm'] - config['T0']) / erfc(lambd) * erfc(eta[mask_liquid])
    
    return T_theory, y_int

## Data Extraction
We retrieve the SPH simulation results from the `.hdf5` files. 
We focus on a 1D slice along the Y-axis to compare with the theory.

In [None]:
import glob
from natsort import natsorted
from pysph.solver.utils import load

def get_simulation_data(output_dir):
    """Retrieves all hdf5 files and sorts them by time."""
    files = natsorted(glob.glob(f"{output_dir}/*.hdf5"))
    return files

def extract_snapshot(file_path):
    """Extracts relevant arrays from a single SPH file."""
    data = load(file_path)
    t = data['solver_data']['t']
    fluid = data['arrays']['fluid']
    
    # We take a thin slice in the middle of the X-axis (x ~ 0.5)
    # to avoid boundary effects and get a clean 1D-like profile
    mask = (fluid.x > 0.48) & (fluid.x < 0.52)
    
    return {
        't': t,
        'y': fluid.y[mask],
        'T': fluid.Temp[mask],
        'q': fluid.qstate[mask]
    }

In [None]:
import matplotlib.pyplot as plt

def plot_results(output_dir, config, n_steps=3):
    files = get_simulation_data(output_dir)
    if not files:
        print(f"Directory {output_dir} is empty or not found.")
        return
        
    indices = np.linspace(0, len(files)-1, n_steps, dtype=int)
    fig, axes = plt.subplots(1, n_steps, figsize=(16, 5), sharey=True)
    
    # Using a professional style for thesis/publication
    plt.style.use(['ggplot']) # 'science' is better if installed

    for i, idx in enumerate(indices):
        snap = extract_snapshot(files[idx])
        
        # SPH data sorting (essential for line plots)
        sort_idx = np.argsort(snap['y'])
        y_sph = snap['y'][sort_idx]
        T_sph = snap['T'][sort_idx]
        
        # Theory (The fixed function now returns 2 values correctly)
        T_th, y_int_th = stefan_theoretical_profile(y_sph, snap['t'], config)
        
        ax = axes[i]
        ax.plot(y_sph, T_sph, 'o', ms=2, label='SPH (Liquid/Solid)', alpha=0.5)
        ax.plot(y_sph, T_th, 'k--', lw=1.5, label='Stefan Theory')
        
        # Mark the theoretical front
        ax.axvline(y_int_th, color='tab:red', linestyle=':', label='Theoretical Front')
        
        ax.set_title(f"Time: {snap['t']*1000:.2f} ms")
        ax.set_xlabel("Depth y [m]")
        if i == 0: ax.set_ylabel("Temperature [adim]")
        ax.legend(fontsize='small')

    plt.suptitle(f"Solidification Front Evolution (kappa={config['kappa']})", fontsize=14)
    plt.tight_layout()
    plt.show()

In [None]:
config_dict = {'rho': 1.0, 'cp': 1.0, 'kappa': 1.0, 'Tm': 1.0, 'Tb': 0.1906, 'T0': 1.2, 'lambd': 0.5}
output_path = "MonaghanPhaseChange_output"
plot_results(output_path, config_dict)

# Error Analysis
To quantify the accuracy of the Monaghan phase change formulation, we calculate the relative error for the temperature field:
$$\epsilon_{rel}(y, t) = \frac{|T_{SPH}(y, t) - T_{theory}(y, t)|}{T_{theory}(y, t)} \times 100$$

We also track the **interface migration error**. In the Stefan problem, the interface position $s(t)$ is the most critical physical quantity to capture correctly for geophysical applications like cryolava cooling.

In [None]:
def plot_spatial_errors(output_dir, config, n_steps=3):
    """
    Plots the relative error distribution across the domain for different timestamps.
    """
    files = get_simulation_data(output_dir)
    indices = np.linspace(0, len(files)-1, n_steps, dtype=int)
    
    fig, axes = plt.subplots(1, n_steps, figsize=(16, 5), sharey=True)
    plt.style.use(['ggplot'])

    for i, idx in enumerate(indices):
        snap = extract_snapshot(files[idx])
        sort_idx = np.argsort(snap['y'])
        y_sph = snap['y'][sort_idx]
        T_sph = snap['T'][sort_idx]
        
        # Get theory
        T_th, y_int_th = stefan_theoretical_profile(y_sph, snap['t'], config)
        
        # Calculate relative error (%)
        # Small epsilon added to denominator to avoid division by zero
        error_rel = np.abs(T_sph - T_th) / (np.abs(T_th) + 1e-10) * 100
        
        ax = axes[i]
        ax.plot(y_sph, error_rel, '-', color='tab:orange', lw=1.5, label='Relative Error')
        ax.axvline(y_int_th, color='k', linestyle='--', alpha=0.3, label='Interface')
        
        ax.set_title(f"Time: {snap['t']*1000:.2f} ms")
        ax.set_xlabel("Position y [m]")
        if i == 0: ax.set_ylabel("Rel. Error [%]")
        
        # Focus on the relevant error range
        ax.set_ylim(0, min(20, np.max(error_rel) * 1.1)) 
        ax.legend(fontsize='small')

    plt.suptitle("Spatial Distribution of Temperature Relative Error", fontsize=14)
    plt.tight_layout()
    plt.show()

In [None]:
def plot_interface_tracking(output_dir, config):
    """
    Compares the SPH interface position vs Theoretical interface position over time.
    The SPH interface is identified where qstate > 0.
    """
    files = get_simulation_data(output_dir)
    times = []
    pos_sph = []
    pos_th = []
    
    for f in files:
        snap = extract_snapshot(f)
        times.append(snap['t'])
        
        # Identify SPH interface: max Y where particle is solidifying (qstate > 0)
        # Using a small threshold for qstate
        solid_particles = snap['y'][snap['q'] > 0.01]
        if len(solid_particles) > 0:
            pos_sph.append(np.max(solid_particles))
        else:
            pos_sph.append(0.0)
            
        # Theory interface
        _, y_int_th = stefan_theoretical_profile(None, snap['t'], config)
        pos_th.append(y_int_th)
    
    plt.figure(figsize=(8, 6))
    plt.plot(times, pos_th, 'k-', label='Theory: $2\lambda\sqrt{\\alpha t}$')
    plt.plot(times, pos_sph, 'r--', label='SPH Front ($q_{state} > 0$)')
    
    plt.xlabel("Time [s]")
    plt.ylabel("Interface Position $s(t)$ [m]")
    plt.title("Solidification Front Migration")
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.show()

In [None]:
print("Generating Spatial Error Distribution...")
plot_interface_tracking(output_path, config_dict)

In [None]:
print("Generating Spatial Error Distribution...")
plot_spatial_errors(output_path, config_dict)