Computing Project - Monte carlo Simulation of fluids

In [345]:
import numpy as np
import math
import matplotlib.pyplot as plt
import json
import re

In [346]:
%matplotlib qt

In [347]:
#load config parameters
def load_jsonc(path):
    with open(path, "r", encoding="utf-8") as f:
        text = f.read()

    # remove // comments
    text = re.sub(r"//.*", "", text)
    # remove /* block comments */
    text = re.sub(r"/\\*.*?\\*/", "", text, flags=re.DOTALL)

    return json.loads(text)

config = load_jsonc("config.jsonc")
N   = config["N"]
L   = config["L"]
eta = config["eta"]


print(config)


{'N': 100, 'eta': 0.1, 'L': 100, 'dim': 2, 'equil_steps': 50000, 'prod_steps': 500000, 'sample_every': 1000, 'tune_every': 1000, 'r_max': 5.0, 'gr_bins': 200, 'lattice': 'square', 'seed': 12345}


In [348]:
#Computing box size from N, density, and particle diameter. Maximum particle diameter of 1? Why not define box as sqrt(N)?
def sigma_from_eta_2d(N: int, L: float, eta: float) -> float:
    """
    From 2D packing fraction:
      eta = N * (pi * sigma^2 / 4) / L^2
      -> sigma = 2 L sqrt(eta / (N pi))
    """
    return 2.0 * L * math.sqrt(eta / (N * math.pi))

In [349]:
def balanced_grid_shape_2d(N: int) -> tuple[int, int]:
    """
    Choose an integer grid (nx, ny) with nx*ny >= N and as close to square as possible.
    """
    n = math.ceil(math.sqrt(N))
    return n, n  # simple and near-optimal for most N

In [350]:
def square_lattice_positions_2d(N: int, L: float) -> tuple[np.ndarray, tuple[float, float]]:
    """
    Return (N, 2) positions on a square grid, cell-centered so all disks lie fully inside [0, L]^2.
    Also return the spacings (ax, ay) between neighboring centers.
    """
    nx, ny = balanced_grid_shape_2d(N)
    ax, ay = L / nx, L / ny
    coords = []
    for j in range(ny):
        y = (j + 0.5) * ay
        for i in range(nx):
            x = (i + 0.5) * ax
            coords.append((x, y))
    return np.array(coords[:N], dtype=float), (ax, ay)

In [351]:
def initialize_cubic_lattice_2d(N: int, L: float, eta: float):
    """
    Initialize N particles on a 2D cubic (square) lattice in a fixed box of side L.
    Particle diameter sigma is determined by the packing fraction eta.
    Ensures no overlaps (spacing >= sigma). Returns (positions, sigma, eta_actual, (ax, ay)).
    """
    # 1) derive particle size from (N, L, eta)
    sigma = sigma_from_eta_2d(N, L, eta)

    # 2) place on a cell-centered square grid
    positions, (ax, ay) = square_lattice_positions_2d(N, L)

    # 3) feasibility: minimal center spacing must be >= sigma
    a_min = min(ax, ay)
    if a_min < sigma - 1e-12:
        # Maximum feasible eta with this grid (touching, sigma=a_min):
        eta_max_touching = (N * math.pi * (a_min**2) / 4.0) / (L**2)
        raise ValueError(
            f"Infeasible: target η={eta:.6g} ⇒ σ={sigma:.6g}, but grid spacing a_min={a_min:.6g} < σ (overlap).\n"
            f"Lower η to ≤ ~{eta_max_touching:.6g}, increase L, or reduce N."
        )

    # 4) actual eta (equals requested eta by construction)
    eta_actual = (N * math.pi * sigma**2 / 4.0) / (L**2)
    return positions, sigma, eta_actual, (ax, ay)

In [352]:
positions, sigma, eta_actual, spacings = initialize_cubic_lattice_2d(N, L, eta)

In [353]:
print("Loaded from config:")
print(f"N   = {N}")
print(f"L   = {L}")
print(f"eta = {eta}")

print("\nComputed:")
print(f"sigma = {sigma}")
print(f"actual packing = {eta_actual}")
print(f"grid spacing = {spacings}")

print("\nFirst 5 particles:\n", positions[:5])

Loaded from config:
N   = 100
L   = 100
eta = 0.1

Computed:
sigma = 3.5682482323055424
actual packing = 0.1
grid spacing = (10.0, 10.0)

First 5 particles:
 [[ 5.  5.]
 [15.  5.]
 [25.  5.]
 [35.  5.]
 [45.  5.]]


In [354]:
def plot_disks_LxL(positions: np.ndarray, L: float, sigma: float, title: str = ""):
    fig, ax = plt.subplots(figsize=(6, 6))
    ax.set_aspect("equal")
    ax.set_xlim(0, L)
    ax.set_ylim(0, L)
    ax.set_xticks([]); ax.set_yticks([])
    ax.set_position([0, 0, 1, 1])  # remove all margins
    if title:
        ax.set_title(title, pad=8)

    # draw disks fully inside box
    r = sigma / 2.0
    for (x, y) in positions:
        ax.add_patch(plt.Circle((x, y), r, fc="navy", ec="black", lw=0.6))

    # box outline
    ax.plot([0, L, L, 0, 0], [0, 0, L, L, 0], "k-", lw=1)
    plt.show()

In [355]:
plot_disks_LxL(
        positions,
        L,
        sigma,
        title=f"Cubic (square) lattice — N={N}, η={eta}, L={L}"
    )

In [356]:
# --- PBC Visualization Function ---
def plot_disks_LxL_pbc(positions: np.ndarray, L: float, sigma: float, title: str = ""):
    """
    Plot disks in the box [0,L]^2 plus their periodic 'ghosts'
    to visualize continuity across edges.
    """
    import matplotlib.pyplot as plt

    r = 0.5 * sigma
    fig, ax = plt.subplots(figsize=(6, 6))
    ax.set_aspect("equal")
    ax.set_xlim(0, L)
    ax.set_ylim(0, L)
    ax.set_xticks([])
    ax.set_yticks([])
    if title:
        ax.set_title(title, pad=8)

    # Draw simulation box outline
    ax.plot([0, L, L, 0, 0], [0, 0, L, L, 0], "k-", lw=1)

    # Draw each disk and its periodic copies
    offsets = (-L, 0.0, L)
    for (x, y) in positions:
        for dx in offsets:
            X = x + dx
            if X < -r or X > L + r:
                continue
            for dy in offsets:
                Y = y + dy
                if Y < -r or Y > L + r:
                    continue
                circ = plt.Circle((X, Y), r, fc="navy", ec="black", lw=0.6, clip_on=True)
                ax.add_patch(circ)

    plt.show()


In [357]:
#wrap boundary conditions
def wrap_pbc(r: np.ndarray, L: float) -> np.ndarray:
    """Wrap coordinates into [0, L)."""
    r_wrapped = r.copy()
    r_wrapped -= L * np.floor(r_wrapped / L)
    return r_wrapped

def min_image(dr: np.ndarray, L: float) -> np.ndarray:
    """Apply minimum image convention to a displacement vector."""
    return dr - L * np.round(dr / L)


In [358]:
def draw_disks(ax, positions, L, sigma, pbc: bool):
    """Draw disks (and optionally their periodic ghosts) on the given axes."""
    r = 0.5 * sigma
    ax.set_aspect("equal")
    ax.set_xlim(0, L); ax.set_ylim(0, L)
    ax.set_xticks([]); ax.set_yticks([])
    # box outline
    ax.plot([0, L, L, 0, 0], [0, 0, L, L, 0], "k-", lw=1)

    if not pbc:
        for (x, y) in positions:
            ax.add_patch(plt.Circle((x, y), r, fc="navy", ec="black", lw=0.6))
        return

    # draw ghosts too
    offsets = (-L, 0.0, L)
    for (x, y) in positions:
        for dx in offsets:
            X = x + dx
            if X < -r or X > L + r:
                continue
            for dy in offsets:
                Y = y + dy
                if Y < -r or Y > L + r:
                    continue
                ax.add_patch(plt.Circle((X, Y), r, fc="navy", ec="black", lw=0.6))


In [359]:
#fast overlap test
def has_overlap(i: int, trial_pos: np.ndarray, positions: np.ndarray, sigma: float, L: float) -> bool:
    sig2 = sigma * sigma
    for j in range(len(positions)):
        if j == i:
            continue
        dr = min_image(trial_pos - positions[j], L)
        if dr[0]*dr[0] + dr[1]*dr[1] < sig2:
            return True
    return False

In [360]:
def mc_step(positions: np.ndarray, sigma: float, L: float, d: float, rng: np.random.Generator) -> bool:
    N = len(positions)
    i = rng.integers(N)
    disp = d * (rng.random(2) - 0.5)
    trial = wrap_pbc(positions[i] + disp, L)
    if not has_overlap(i, trial, positions, sigma, L):
        positions[i] = trial     # accept
        return True
    return False 

In [361]:
def tune_stepsize(d: float, accept_rate: float, up: float = 1.05, down: float = 0.95,
                  d_min: float = 1e-4, d_max: float = np.inf) -> float:
    if accept_rate > 0.50:
        d *= up
    elif accept_rate < 0.25:
        d *= down
    return float(np.clip(d, d_min, d_max))

In [362]:
# --- Visualization during Monte Carlo ---

def plot_snapshot(positions, L, sigma, step, ax=None):
    """Plot disks for a given MC step (reusing axes for speed)."""
    if ax is None:
        fig, ax = plt.subplots(figsize=(6, 6))
        ax.set_aspect("equal")
        ax.set_xlim(0, L)
        ax.set_ylim(0, L)
        ax.set_xticks([]); ax.set_yticks([])
    ax.clear()
    ax.set_xlim(0, L)
    ax.set_ylim(0, L)
    ax.set_title(f"Monte Carlo step {step}")
    
    r = sigma / 2.0
    for (x, y) in positions:
        ax.add_patch(plt.Circle((x, y), r, fc="navy", ec="black", lw=0.6))
    ax.plot([0, L, L, 0, 0], [0, 0, L, L, 0], "k-", lw=1)
    plt.pause(0.001)
    return ax


In [363]:
def run_mc(positions: np.ndarray,
           sigma: float,
           L: float,
           d_init: float,
           equil_steps: int,
           prod_steps: int,
           tune_every: int,
           rng: np.random.Generator,
           plot_every: int | None = None,      # e.g., 100 for live snapshots; None to disable
           save_frames: bool = False,          # save PNGs during the run
           frames_dir: str = "frames_mc",      # where PNGs go
           cap_sigma_frac: float = 0.5,        # cap step size: d <= cap_sigma_frac * sigma
           stitch_gif: bool = False,           # stitch PNGs into a GIF at the end
           gif_out: str = "mc_evolution.gif",
           fps: int = 15):
    """
    Single-particle Metropolis MC with step-size auto-tuning.

    Returns:
      positions        : final positions (in-place updated)
      d_final          : final step size
      acc_rates        : np.array of acceptance rates measured every `tune_every` steps
      d_history        : np.array of d after each tuning
      totals           : dict with accepted, rejected, attempted, acceptance_rate
    """
    import os
    import matplotlib.pyplot as plt

    # ---- helper: capped tuner to avoid runaway step sizes
    def tune_stepsize_capped(d: float, accept_rate: float,
                             up: float = 1.05, down: float = 0.95,
                             d_min: float = 1e-4) -> float:
        if accept_rate > 0.50:
            d *= up
        elif accept_rate < 0.25:
            d *= down
        d_max = cap_sigma_frac * sigma if cap_sigma_frac is not None else None
        if d_max is not None:
            d = min(d, d_max)
        return max(d, d_min)

    # ---- prep
    if save_frames:
        os.makedirs(frames_dir, exist_ok=True)

    total_steps = int(equil_steps + prod_steps)
    d = float(d_init)

    # counters
    window_accepted = 0
    window_attempted = 0
    total_accepted = 0
    total_attempted = 0

    acc_rates: list[float] = []
    d_history: list[float] = []

    # live plot (reuse axes for speed)
    live_fig = None
    live_ax = None

    # ---- main loop
    for step in range(1, total_steps + 1):
        # one MC move
        accepted = mc_step(positions, sigma, L, d, rng)  # bool
        window_accepted += int(accepted)
        window_attempted += 1
        total_accepted  += int(accepted)
        total_attempted += 1

        # tune step size
        if step % tune_every == 0:
            acc_rate = (window_accepted / window_attempted) if window_attempted else 0.0
            d = tune_stepsize_capped(d, acc_rate)
            acc_rates.append(acc_rate)
            d_history.append(d)
            window_accepted = 0
            window_attempted = 0

        # live snapshot + optional frame save
        if plot_every and (step % plot_every == 0):
            # live display (ghosted view)
            if live_ax is None:
                live_fig, live_ax = plt.subplots(figsize=(6, 6))
            live_ax.clear()
            live_ax.set_title(f"Monte Carlo step {step}")
            draw_disks(live_ax, positions, L, sigma, pbc=True)
            plt.pause(0.001)

            # save PNG frame (ghosted view)
            if save_frames:
                fig2, ax2 = plt.subplots(figsize=(5, 5))
                ax2.set_title(f"Step {step}")
                draw_disks(ax2, positions, L, sigma, pbc=True)
                out_path = os.path.join(frames_dir, f"frame_{step:07d}.png")
                fig2.savefig(out_path, dpi=150, bbox_inches="tight")
                plt.close(fig2)

    if live_fig is not None:
        plt.show()

    totals = {
        "accepted": int(total_accepted),
        "attempted": int(total_attempted),
        "rejected": int(total_attempted - total_accepted),
        "acceptance_rate": float(total_accepted / total_attempted) if total_attempted else 0.0,
    }

    # optional: stitch frames into a GIF at the end
    if stitch_gif and save_frames:
        try:
            import imageio.v2 as imageio
            files = sorted(
                os.path.join(frames_dir, f) for f in os.listdir(frames_dir)
                if f.lower().endswith(".png")
            )
            if files:
                imgs = [imageio.imread(f) for f in files]
                imageio.mimsave(gif_out, imgs, duration=1.0 / fps, loop=0)
                print(f"[run_mc] Saved GIF: {gif_out}")
            else:
                print(f"[run_mc] No PNG frames in {frames_dir}; GIF skipped.")
        except Exception as e:
            print(f"[run_mc] GIF stitching skipped ({e}). Install `imageio` if needed.")

    return positions, d, np.array(acc_rates), np.array(d_history), totals


In [364]:
import os

def stitch_frames(frames_dir: str,
                  gif_out: str = "mc_evolution2.gif",
                  mp4_out: str | None = "mc_evolution2.mp4",
                  fps: int = 15):
    import imageio.v2 as imageio
    files = sorted(
        os.path.join(frames_dir, f) for f in os.listdir(frames_dir)
        if f.lower().endswith(".png")
    )
    if not files:
        print(f"No PNG frames in {frames_dir}.")
        return
    imgs = [imageio.imread(f) for f in files]
    imageio.mimsave(gif_out, imgs, duration=1.0/fps, loop=0)
    print("Saved GIF:", gif_out)
    if mp4_out:
        try:
            import imageio_ffmpeg  # noqa
            w = imageio.get_writer(mp4_out, fps=fps, codec="libx264", quality=8)
            for im in imgs: w.append_data(im)
            w.close()
            print("Saved MP4:", mp4_out)
        except Exception as e:
            print("MP4 skipped:", e)


In [365]:
def tune_stepsize(d, accept_rate, up=1.05, down=0.95,
                  d_min=1e-4, d_max=None, sigma=None):
    if accept_rate > 0.50:
        d *= up
    elif accept_rate < 0.25:
        d *= down
    if sigma is not None:
        d_max = 0.5 * sigma
    if d_max is not None:
        d = min(d, d_max)
    return max(d, d_min)


In [366]:
rng = np.random.default_rng(config["seed"])
d0 = 0.2 * sigma

positions_run, d_final, acc_hist, d_hist, totals = run_mc(
    positions=positions.copy(),
    sigma=sigma,
    L=L,
    d_init=d0,
    equil_steps=10000,
    prod_steps=0,
    tune_every=config["tune_every"],
    rng=rng,
    plot_every=100,
    save_frames=True,
    frames_dir="frames_mc",
    cap_sigma_frac=0.5,
    stitch_gif=True,                 # auto-make GIF at the end (requires imageio)
    gif_out="mc_evolution.gif",
    fps=15
)

print("Final tuned d:", d_final)
print("Moves — accepted:", totals["accepted"],
      "rejected:", totals["rejected"],
      "attempted:", totals["attempted"],
      f"acceptance: {100*totals['acceptance_rate']:.1f}%")


[run_mc] Saved GIF: mc_evolution.gif
Final tuned d: 1.162460074522121
Moves — accepted: 9938 rejected: 62 attempted: 10000 acceptance: 99.4%


In [367]:
# 1) crossing the right boundary wraps to the left
x = L - 0.1
trial = wrap_pbc(np.array([x, 0.5*L]), L)
assert 0 <= trial[0] < L and 0 <= trial[1] < L

# 2) minimum-image distance across the boundary is small (not ~L)
a = np.array([L - 0.05, 0.5*L])
b = np.array([0.02,     0.5*L])
dr = min_image(a - b, L)
print("min-image |dr| =", np.linalg.norm(dr))  # should be ~0.07, not ~L


min-image |dr| = 0.06999999999999318
