In [79]:
import math
import itertools
import os
import io
import warnings

import numpy as np
import hoomd
import gsd.hoomd
import fresnel
import freud
import h5py
from scipy.stats import linregress

import matplotlib
import matplotlib_inline
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation

matplotlib.style.use("ggplot")
matplotlib_inline.backend_inline.set_matplotlib_formats("svg")
matplotlib.rcParams['animation.embed_limit'] = 50

from IPython.display import HTML, display, set_matplotlib_formats
from PIL import Image as PILImage
import IPython

In [None]:
directory_path = "Data_B"
os.makedirs(directory_path, exist_ok=True)

In [80]:
device = fresnel.Device()
tracer = fresnel.tracer.Path(device=device, w=300, h=300)

def render(snapshot, traj=None, l=6, q_threshold=0.7, solid_threshold=6):
    if traj is not None:
        traj_path = gsd.hoomd.open(traj)
        last_frame = traj_path[-1]
        solid = freud.order.SolidLiquid(l=l, q_threshold=q_threshold, solid_threshold=solid_threshold)
        solid.compute(
            system=(last_frame.configuration.box, last_frame.particles.position),
            neighbors=dict(mode="nearest", num_neighbors=8),
        )
        solid_mask = solid.num_connections > solid.solid_threshold

    A_color = fresnel.color.linear([252/255, 209/255, 1/255])
    B_color = fresnel.color.linear([0.01, 0.74, 0.26])

    L = snapshot.configuration.box[0]
    scene = fresnel.Scene(device)
    geometry = fresnel.geometry.Sphere(
        scene, N=len(snapshot.particles.position), radius=0.5
    )
    
    geometry.material = fresnel.material.Material(
        color=(1,1,1),
        primitive_color_mix=1.0,
        roughness=0.5
    )

    geometry.position[:] = snapshot.particles.position[:]
    geometry.color[snapshot.particles.typeid[:] == 0] = B_color
    geometry.radius[snapshot.particles.typeid[:] == 0] = 0.5 * 1.1

    if traj is not None:
        red = fresnel.color.linear([1.0, 0.0, 0.0])
        geometry.color[solid_mask] = red

    #N = len(snapshot.particles.position)
    #base = np.array([252/255, 209/255, 1/255])
    #colors = np.tile(base, (N,1))
    #if solid_mask is not None:
    #    colors[solid_mask] = np.array([1.0, 0.0, 0.0])

    #for i in range(N):
    #    geometry.color[i] = fresnel.color.linear(colors[i])

    geometry.outline_width = 0.04
    fresnel.geometry.Box(scene, [L, L, L, 0, 0, 0], box_radius=0.02)

    scene.lights = [
        fresnel.light.Light(direction=(0, 0, 1), color=(0.8, 0.8, 0.8), theta=math.pi),
        fresnel.light.Light(
            direction=(1, 1, 1), color=(1.1, 1.1, 1.1), theta=math.pi / 3
        ),
    ]
    scene.camera = fresnel.camera.Orthographic(
        position=(L * 2, L, L * 2), look_at=(0, 0, 0), up=(0, 1, 0), height=L * 1.4 + 1
    )
    scene.background_alpha = 1
    scene.background_color = (1, 1, 1)
    samples = 100
    if "CI" in os.environ:
        samples = 100
    return IPython.display.Image(tracer.sample(scene, samples=samples)._repr_png_())

def render_mov(snapshot, solid_mask=None):
    A_color = fresnel.color.linear([252/255, 209/255, 1/255])
    B_color = fresnel.color.linear([0.01, 0.74, 0.26])

    L = snapshot.configuration.box[0]
    scene = fresnel.Scene(device)
    geometry = fresnel.geometry.Sphere(
        scene, N=len(snapshot.particles.position), radius=0.5
    )
    
    geometry.material = fresnel.material.Material(
        color=(1,1,1),
        primitive_color_mix=1.0,
        roughness=0.5
    )

    geometry.position[:] = snapshot.particles.position[:]
    geometry.color[snapshot.particles.typeid[:] == 0] = B_color
    geometry.radius[snapshot.particles.typeid[:] == 0] = 0.5 * 1.1

    if solid_mask is not None:
        red = fresnel.color.linear([1.0, 0.0, 0.0])
        geometry.color[solid_mask] = red

    geometry.outline_width = 0.04
    fresnel.geometry.Box(scene, [L, L, L, 0, 0, 0], box_radius=0.02)

    scene.lights = [
        fresnel.light.Light(direction=(0, 0, 1), color=(0.8, 0.8, 0.8), theta=math.pi),
        fresnel.light.Light(
            direction=(1, 1, 1), color=(1.1, 1.1, 1.1), theta=math.pi / 3
        ),
    ]
    scene.camera = fresnel.camera.Orthographic(
        position=(L * 2, L, L * 2), look_at=(0, 0, 0), up=(0, 1, 0), height=L * 1.4 + 1
    )
    scene.background_alpha = 1
    scene.background_color = (1, 1, 1)
    samples = 100
    if "CI" in os.environ:
        samples = 100
    return IPython.display.Image(tracer.sample(scene, samples=samples)._repr_png_())

def movie(traj, frames, figsize=(4,4), backend="jshtml", l=6, q_threshold=0.7, solid_threshold=6):
    traj_path = gsd.hoomd.open(traj)
    solid = freud.order.SolidLiquid(l=l, q_threshold=q_threshold, solid_threshold=solid_threshold)
    is_solid = []
    for frame in traj_path:
        solid.compute(
            system=(frame.configuration.box, frame.particles.position),
            neighbors=dict(mode="nearest", num_neighbors=8),
        )
        is_solid.append(solid.num_connections > solid.solid_threshold)

    fig, ax = plt.subplots(figsize=figsize)
    ax.axis("off")
    im = ax.imshow(np.zeros((300,300,3), dtype=np.uint8))

    def update(i):
        img = render_mov(traj_path[i], solid_mask=is_solid[i])
        arr = np.frombuffer(img.data, np.uint8)
        frame = PILImage.open(io.BytesIO(arr))
        im.set_data(np.array(frame))
        ax.set_title(f"Frame {i}")
        return (im,)

    idxs = frames
    anim = FuncAnimation(fig, update, frames=idxs, interval=500, blit=True)

    if backend == "jshtml":
        return HTML(anim.to_jshtml())
    elif backend in ("notebook","nbagg"):
        plt.close(fig) 
        return anim      
    else:
        raise ValueError("backend must be 'jshtml' or 'notebook'")

In [81]:
def snap_molecule_indices(snap):
    system = freud.AABBQuery.from_system(snap)
    num_query_points = num_points = snap.particles.N
    query_point_indices = snap.bonds.group[:, 0]
    point_indices = snap.bonds.group[:, 1]
    vectors = system.box.wrap(
        system.points[query_point_indices] - system.points[point_indices]
    )
    nlist = freud.NeighborList.from_arrays(
        num_query_points, num_points, query_point_indices, point_indices, vectors
    )
    cluster = freud.cluster.Cluster()
    cluster.compute(system=system, neighbors=nlist)
    return cluster.cluster_idx


def intermolecular_rdf(
    gsdfile,
    A_name,
    B_name,
    start=0,
    stop=None,
    r_max=None,
    r_min=0,
    bins=100,
    exclude_bonded=True,
):
    with gsd.hoomd.open(gsdfile) as traj:
            min_dim = min(
                np.min(frame.configuration.box[:3]) for frame in traj
            )

    if r_max is None:
        # Use a value just less than half the maximum box length.
        r_max = np.nextafter(min_dim * 0.5, 0, dtype=np.float32)

    with gsd.hoomd.open(gsdfile) as trajectory:
        snap = trajectory[0]

        rdf = freud.density.RDF(bins=bins, r_max=r_max, r_min=r_min)

        type_A = snap.particles.typeid == snap.particles.types.index(A_name)
        type_B = snap.particles.typeid == snap.particles.types.index(B_name)

        if exclude_bonded:
            molecules = snap_molecule_indices(snap)
            molecules_A = molecules[type_A]
            molecules_B = molecules[type_B]

        for snap in trajectory[start:stop]:
            A_pos = snap.particles.position[type_A]
            if A_name == B_name:
                B_pos = A_pos
                exclude_ii = True
            else:
                B_pos = snap.particles.position[type_B]
                exclude_ii = False

            box = snap.configuration.box
            system = (box, A_pos)
            aq = freud.locality.AABBQuery.from_system(system)
            nlist = aq.query(
                B_pos, {"r_max": r_max, "exclude_ii": exclude_ii}
            ).toNeighborList()

            if exclude_bonded:
                pre_filter = len(nlist)
                indices_A = molecules_A[nlist.point_indices]
                indices_B = molecules_B[nlist.query_point_indices]
                nlist.filter(indices_A != indices_B)
                post_filter = len(nlist)

            rdf.compute(aq, neighbors=nlist, reset=False)
        normalization = post_filter / pre_filter if exclude_bonded else 1
        return rdf, normalization
    
def plot_solids(
    traj_path,
    l=6,
    q_threshold=0.7,
    solid_threshold=6,
    num_neighbors=8,
    figsize=(8, 6),
):
    
    traj = gsd.hoomd.open(traj_path)
    solid = freud.order.SolidLiquid(
        l=l, q_threshold=q_threshold, solid_threshold=solid_threshold
    )
    num_solid = []

    for frame in traj:
        solid.compute(
            system=(frame.configuration.box, frame.particles.position),
            neighbors=dict(mode="nearest", num_neighbors=num_neighbors),
        )
        mask = solid.num_connections > solid.solid_threshold
        num_solid.append(np.sum(mask))

    fig = matplotlib.figure.Figure(figsize=figsize)
    ax = fig.add_subplot()
    ax.plot(num_solid)
    ax.set_xlabel("Frame")
    ax.set_ylabel("Number of particles in a solid environment")
    ax.set_title("Solid Particle Count")
    ax.grid(True)

    return fig


def MSD(
    gsd_filename: str,
    dt: float = 0.005,
    write_every: float = 100.0,
    mode: str = "window",
    marker: str = "o",
    **plot_kwargs
):
    
    traj = gsd.hoomd.open(gsd_filename)
    pos_list, img_list = [], []
    for f in traj:
        pos_list.append(f.particles.position.copy())
        img_list.append(f.particles.image.copy())
    positions = np.stack(pos_list)  # (N_frames, N_particles, 3)
    images    = np.stack(img_list)  # (N_frames, N_particles, 3)
    box       = traj[0].configuration.box[:3]

    msd_calc = freud.msd.MSD(box=box, mode=mode)
    msd_calc.compute(positions=positions, images=images, reset=True)
    msd = msd_calc.msd

    dt_frame = dt * write_every
    times = np.arange(len(msd)) * dt_frame

    plt.scatter(times, msd, marker=marker, **plot_kwargs)
    plt.xlabel("Time")
    plt.ylabel("Mean Squared Displacement")
    plt.title(f"MSD vs Time ({mode} mode)")
    plt.grid(True)
    plt.tight_layout()
    plt.show()

    return times, msd


**INITIALIZING LATTICE**

In [None]:
m = 128
N_particles = m * 2
spacing = 1.3
K = math.ceil(N_particles ** (1 / 3))
L = K * spacing
x = np.linspace(-L / 2, L / 2, K, endpoint=False)
position = list(itertools.product(x, repeat=3))
frame = gsd.hoomd.Frame()
frame.particles.N = N_particles
frame.particles.position = position[0:N_particles]
frame.particles.typeid = [0] * N_particles
frame.configuration.box = [L, L, L, 0, 0, 0]
frame.particles.types = ['A']

fn = os.path.join(os.getcwd(), "Data_B/lattice.gsd")
![ -e "$fn" ] && rm "$fn"

with gsd.hoomd.open(name="Data_B/lattice.gsd", mode="x") as f:
    f.append(frame)

render(frame)

**RANDOMIZING SYSTEM**

In [None]:
simulation = hoomd.Simulation(device=hoomd.device.CPU(), seed=4)
simulation.create_state_from_gsd(filename="Data_B/lattice.gsd")

integrator = hoomd.md.Integrator(dt=0.005)
cell = hoomd.md.nlist.Cell(buffer=0.4)
lj = hoomd.md.pair.LJ(nlist=cell)
lj.params[("A", "A")] = dict(epsilon=1.1, sigma=1.1)
lj.r_cut[("A", "A")] = 2.5 * 1.1
integrator.forces.append(lj)
nvt = hoomd.md.methods.ConstantVolume(
    filter=hoomd.filter.All(), thermostat=hoomd.md.methods.thermostats.Bussi(kT=1.0)
)
integrator.methods.append(nvt)
simulation.operations.integrator = integrator

simulation.state.thermalize_particle_momenta(filter=hoomd.filter.All(), kT=1.0)
thermodynamic_properties = hoomd.md.compute.ThermodynamicQuantities(
    filter=hoomd.filter.All()
)
simulation.operations.computes.append(thermodynamic_properties)

fn = os.path.join(os.getcwd(), "Data_B/randomize_trajectory.gsd")
![ -e "$fn" ] && rm "$fn"

gsd_writer = hoomd.write.GSD(
    filename="Data_B/randomize_trajectory.gsd", trigger=hoomd.trigger.Periodic(10), mode="xb"
)
simulation.operations.writers.append(gsd_writer)

simulation.run(10000)

gsd_writer.flush()

render(simulation.state.get_snapshot(), "Data_B/randomize_trajectory.gsd")

In [None]:
file = "Data_B/randomize_trajectory.gsd"
rdf, normalization = intermolecular_rdf(file, "A", "A")

plt.plot(rdf.bin_centers, rdf.rdf * normalization)
plt.xlabel("r")
plt.ylabel("g(r)")
plt.show()

In [None]:
traj = gsd.hoomd.open("Data_B/randomize_trajectory.gsd")
movie("Data_B/randomize_trajectory.gsd", range(0, len(traj), 10))

**COMPRESSING SYSTEM (NVT SIMULATION)**

In [None]:
simulation = hoomd.Simulation(device=hoomd.device.CPU(), seed=42)
simulation.create_state_from_gsd(filename="Data_B/randomize_trajectory.gsd")

integrator = hoomd.md.Integrator(dt=0.005)
cell = hoomd.md.nlist.Cell(buffer=0.4)
lj = hoomd.md.pair.LJ(nlist=cell)
lj.params[("A", "A")] = dict(epsilon=1.1, sigma=1.1)
lj.r_cut[("A", "A")] = 2.5 * 1.1
integrator.forces.append(lj)

temp_schedule = hoomd.variant.Ramp(
    A=1.0,           
    B=0.001,          
    t_start=simulation.timestep,       
    t_ramp=400_000  
)
nvt = hoomd.md.methods.ConstantVolume(
    filter=hoomd.filter.All(), thermostat=hoomd.md.methods.thermostats.Bussi(kT=temp_schedule)
)
integrator.methods.append(nvt)
simulation.operations.integrator = integrator

thermodynamic_properties = hoomd.md.compute.ThermodynamicQuantities(
    filter=hoomd.filter.All()
)
simulation.operations.computes.append(thermodynamic_properties)
logger = hoomd.logging.Logger(categories=["scalar"])
logger.add(simulation, quantities=["timestep"])
logger.add(thermodynamic_properties, quantities=["kinetic_temperature", "pressure", "volume"])

fn = os.path.join(os.getcwd(), "Data_B/trajectory.gsd")
![ -e "$fn" ] && rm "$fn"
gsd_writer = hoomd.write.GSD(
    filename="Data_B/trajectory.gsd", trigger=hoomd.trigger.Periodic(100), mode="xb")
simulation.operations.writers.append(gsd_writer)

fn = os.path.join(os.getcwd(), "Data_B/log.h5")
![ -e "$fn" ] && rm "$fn"
hdf5_writer = hoomd.write.HDF5Log(
    trigger=hoomd.trigger.Periodic(100), filename="Data_B/log.h5", mode="x", logger=logger)
simulation.operations.writers.append(hdf5_writer)



simulation.run(50001)
simulation.operations.writers.remove(hdf5_writer)
#simulation.operations.writers.remove(gsd_writer)

#gsd_writer = hoomd.write.GSD(
#    filename="trajectory_A.gsd", trigger=hoomd.trigger.Periodic(1000), mode="ab"
#)
#simulation.operations.writers.append(gsd_writer)

#v = simulation.state.box.volume
#if round(v, 2) != round(final_volume, 2):
    #raise ValueError(
    #    f"Final volume {simulation.state.box.volume:.2f} does not match expected volume {final_volume:.2f}"
    #)

#simulation.operations.updaters.remove(box_resize)
simulation.run(10e5)

gsd_writer.flush()

render(simulation.state.get_snapshot(), "Data_B/trajectory.gsd")

In [None]:
plot_solids("Data_B/trajectory.gsd", l=6, q_threshold=0.65, solid_threshold=5, num_neighbors=8)

In [None]:
traj = gsd.hoomd.open("Data_B/trajectory.gsd")
movie("Data_B/trajectory.gsd", range(0, len(traj), 10))

**NPT SIMULATION**

In [None]:
# GET INITIAL PRESSURE
simulation = hoomd.Simulation(device=hoomd.device.CPU(), seed=4)
simulation.create_state_from_gsd(filename="Data_B/randomize_trajectory.gsd")

integrator = hoomd.md.Integrator(dt=0.005)
cell = hoomd.md.nlist.Cell(buffer=0.4)
lj = hoomd.md.pair.LJ(nlist=cell)
lj.params[("A", "A")] = dict(epsilon=1.1, sigma=1.1)
lj.r_cut[("A", "A")] = 2.5 * 1.1
integrator.forces.append(lj)
nvt = hoomd.md.methods.ConstantVolume(
    filter=hoomd.filter.All(), thermostat=hoomd.md.methods.thermostats.Bussi(kT=1.0)
)
integrator.methods.append(nvt)
simulation.operations.integrator = integrator

simulation.state.thermalize_particle_momenta(filter=hoomd.filter.All(), kT=1.0)
thermodynamic_properties = hoomd.md.compute.ThermodynamicQuantities(
    filter=hoomd.filter.All()
)
simulation.operations.computes.append(thermodynamic_properties)

logger = hoomd.logging.Logger(categories=["scalar"])
logger.add(simulation, quantities=["timestep"])
logger.add(thermodynamic_properties, quantities=["kinetic_temperature", "pressure", "volume"])

fn = os.path.join(os.getcwd(), "Data_B/pressure_check.h5")
![ -e "$fn" ] && rm "$fn"
hdf5_writer = hoomd.write.HDF5Log(
        trigger=hoomd.trigger.Periodic(1), filename="Data_B/pressure_check.h5", mode="a", logger=logger
    )
simulation.operations.writers.append(hdf5_writer)

simulation.run(1)

with h5py.File("Data_B/pressure_check.h5", "r") as f:
    P = f['/hoomd-data/md/compute/ThermodynamicQuantities/pressure'][:]  
P_init = P[0]
print("Pressure =", P_init)

# Initialize system from random.gsd file. 
simulation = hoomd.Simulation(device=hoomd.device.CPU(), seed=4)
simulation.create_state_from_gsd(filename="Data_B/randomize_trajectory.gsd")

# Set up integrator and LJ forces.
integrator = hoomd.md.Integrator(dt=0.005)
cell = hoomd.md.nlist.Cell(buffer=0.4)
lj = hoomd.md.pair.LJ(nlist=cell)
lj.params[("A", "A")] = dict(epsilon=1, sigma=1)
lj.r_cut[("A", "A")] = 2.5
integrator.forces.append(lj)

pressures = [0.5, 1.0, 2.0, 3.0, 5.0, 7.0, 10.0, 15.0, 20.0, 25.0, 30.0]

thermodynamic_properties = hoomd.md.compute.ThermodynamicQuantities(
        filter=hoomd.filter.All()
    )
simulation.operations.computes.append(thermodynamic_properties)

equil_steps = 20_000
prod_steps = 50_000

logger = hoomd.logging.Logger(categories=["scalar"])
logger.add(simulation, quantities=["timestep"])
logger.add(thermodynamic_properties, quantities=["kinetic_temperature", "pressure", "volume"])

fn = os.path.join(os.getcwd(), "Data_B/npt_log.h5")
![ -e "$fn" ] && rm "$fn"
fn = os.path.join(os.getcwd(), "Data_B/npt_trajectory.gsd")
![ -e "$fn" ] && rm "$fn"

gsd_writer = hoomd.write.GSD(
        filename="Data_B/npt_trajectory.gsd", trigger=hoomd.trigger.Periodic(100), mode="ab"
    )
simulation.operations.writers.append(gsd_writer)

hdf5_writer = hoomd.write.HDF5Log(
        trigger=hoomd.trigger.Periodic(100), filename="Data_B/npt_log.h5", mode="a", logger=logger
    )
simulation.operations.writers.append(hdf5_writer)

for P in pressures:
    print(f"=== Sampling at P = {P} ===")
    integrator.methods.clear()
    pressure_schedule = hoomd.variant.Ramp(
        A=P_init,           
        B=P,          
        t_start=simulation.timestep,       
        t_ramp=20_000  
    )
    npt = hoomd.md.methods.ConstantPressure(
        filter=hoomd.filter.All(),
        tauS=1.0,
        S=pressure_schedule,
        couple="xyz",
        thermostat=hoomd.md.methods.thermostats.Bussi(kT=1.0),
    )
    integrator.methods.append(npt)
    simulation.operations.integrator = integrator

    simulation.run(equil_steps)

    simulation.run(prod_steps)
    P_init = P
    integrator.methods.clear()

gsd_writer.flush()

In [None]:
render(simulation.state.get_snapshot(), "Data_B/npt_trajectory.gsd")

In [None]:
with h5py.File('Data_B/npt_log.h5','r') as f:
    timesteps = f['/hoomd-data/Simulation/timestep'][:]  
    pressure  = f['/hoomd-data/md/compute/ThermodynamicQuantities/pressure'][:]  
    volume    = f['/hoomd-data/md/compute/ThermodynamicQuantities/volume'][:]

density = simulation.state.N_particles / volume

plt.figure()
plt.plot(timesteps, pressure, lw=1)
plt.xlabel("Timestep")
plt.ylabel("Pressure")
plt.title("Pressure vs. time")
plt.show()

In [None]:
plot_solids("Data_B/npt_trajectory.gsd")

In [None]:
file = "Data_B/npt_trajectory.gsd"
rdf, normalization = intermolecular_rdf(file, "A", "A")

plt.plot(rdf.bin_centers, rdf.rdf * normalization)
plt.xlabel("r")
plt.ylabel("g(r)")
plt.show()

In [None]:
traj = gsd.hoomd.open("Data_B/npt_trajectory.gsd")
movie("Data_B/npt_trajectory.gsd", range(0, len(traj), 50))

In [None]:
density = simulation.state.N_particles / volume

plt.figure(figsize=(6,4))
plt.scatter(density, pressure, marker='o')
plt.xlabel('Density ρ (N/V)')
plt.ylabel('Pressure P')
plt.title('Equation of State: P vs. ρ')
plt.grid(True)
plt.tight_layout()
plt.axvline(1.03, color='b', ls='--', label='Phase Transition (ρ = 1.03)')
plt.legend()
plt.show()