In [None]:
import matplotlib.pyplot as plt
# src/_tube_mesh.py  (or keep in your existing file)
import numpy as np
from ddgclib._misc import _set_boundary
from hyperct import Complex
from ddgclib.barycentric._duals import compute_vd

import numpy as np
import math


from ddgclib._plotting import plot_polyscope, plot_dual

from src._geometry import cube_to_tube, unit_cylinder


def unit_cylinder(r, refinements=1, height=1, up='z', distr_law='sinusoidal'):
    """
    Create a triangulated solid cylinder (tetrahedral mesh) of given radius and height.

    The mesh is constructed by:
    1. Starting with a uniform triangulation of the cube [-0.5, 0.5]^3
    2. Applying `refinements` global refinement steps
    3. Identifying boundary vertices on the four side faces (perpendicular to the cylinder axis)
    4. Projecting those side-boundary vertices onto a cylinder of radius `r`
    5. Distributing internal vertices radially according to `distr_law`
    6. Scaling the axial coordinate to achieve the desired height (centered at 0)

    The resulting mesh retains high-quality triangulation from the original cube.

    Parameters
    ----------
    r : float
        Cylinder radius.
    refinements : int, default=1
        Number of global refinement steps on the initial cube.
    height : float, default=1
        Cylinder height (axial length). The mesh is centered at origin.
    up : {'z', 'y'}, default='z'
        Axis along which the cylinder is oriented (cylinder axis direction).
    distr_law : {'sinusoidal', 'power_law', 'log_law'}, default='sinusoidal'
        Radial distribution law for interior vertices:
        - 'sinusoidal': smooth clustering near axis and surface (default)
        - 'power_law': r_eff = d**n * r (with n=0.5)
        - 'log_law': r_eff = r * log(d+1) / log(2)

    Returns
    -------
    HC : Complex
        A 3-dimensional simplicial complex (from ddgclib) representing the cylinder.
        Vertices are accessible via HC.V, with positions in .x_a (numpy array) and
        connectivity in .nn sets.
    """
    # Set up vector (used later to define orientation of the cylinder)
    if up == 'z':
        i_side = 1
        i_up = 2
        i_r = 0
    if up == 'y':
        i_side = 2
        i_up = 1
        i_r = 0

    # Construct the initial cube
    lb = -0.5
    ub = 0.5
    domain = [(lb, ub), ] * 3
    # symmetry = [0, 1, 1]
    HC = Complex(3, domain=domain, symmetry=None)
    HC.triangulate()
    for i in range(refinements):
        HC.refine_all()

    # Compute boundaries
    bV = set()
    for v in HC.V:
        if ((v.x_a[0] == lb or v.x_a[1] == lb or v.x_a[2] == lb) or
                (v.x_a[0] == ub or v.x_a[1] == ub or v.x_a[2] == ub)):
            bV.add(v)

    # Boundaries which exclude the interior vertices on the top/bottom of
    # the cylinder
    bV_sides = set()
    for v in HC.V:
        if ((v.x_a[0] == lb or v.x_a[0] == ub) or
                (v.x_a[i_side] == lb or v.x_a[i_side] == ub)):
            bV_sides.add(v)
            # Special side boundary property
            v.side_boundary = True
        else:
            v.side_boundary = False

    for bv in bV:
        _set_boundary(bv, True)
    for v in HC.V:
        if not (v in bV):
            _set_boundary(v, False)

    # Move the vertices to the tube radius
    for v in bV_sides:
        r_eff = r  # Trancated radius projection
        nv = np.zeros(3)
        theta = math.atan2(v.x_a[i_side], v.x_a[0])
        nv[0] = r_eff * np.cos(theta)
        nv[i_side] = r_eff * np.sin(theta)
        h_eff = height  # * ((1 - np.cos(np.pi * v.x[2]**0.5)) / 2)
        nv[i_up] = h_eff * (v.x_a[i_up])  # * 1e-3
        HC.V.move(v, tuple(nv))


    # Distribute the internal vertices
    for v in HC.V:
        if v.side_boundary:
            continue

        # Compute the distance from the rotational axis
        radial_idx = [i_r, i_side]          # [0, i_side]
        d = np.linalg.norm(v.x_a[radial_idx])

        # Distribute the internal vertices:
        # Sinusoidal scaling:
        if distr_law=='sinusoidal':
            r_eff = (r * ((1 - np.cos(np.pi * d ** 0.5)) / 2))
        # Power law scaling:
        if distr_law=='power_law':
            n = 0.5  # 0.6  # Aribtrarily chosen power law scaling, should be n<=r
            r_eff = d ** n * r  # Trancated radius projection
        # log law scaling:
        if distr_law=='log_law':
            r_eff = r * (np.log(d + 1) / np.log(2))

        # r_eff = r/d  # Trancated radius projection
        if up == 'z':
            nv = np.zeros(3)
            theta = math.atan2(v.x_a[i_side], v.x_a[0])
            nv[0] = r_eff * np.cos(theta)
            nv[i_side] = r_eff * np.sin(theta)

        elif up == 'y':
            nv = np.zeros(3)
            theta = math.atan2(v.x_a[i_side], v.x_a[0])
            nv[0] = r_eff * np.cos(theta)
            nv[i_side] = r_eff * np.sin(theta)

        # Height scaling
        h_eff = height  # * ((1 - np.cos(np.pi * v.x[2]**0.5)) / 2)
        nv[i_up] = h_eff * (v.x_a[i_up])  # * 1e-3

        HC.V.move(v, tuple(nv))

    return HC

r=1
#HC_unit = unit_cylinder(r, refinements=1, height=1, up='z', distr_law='sinusoidal')
HC_unit = unit_cylinder(r, refinements=1, height=1, up='y', distr_law='sinusoidal')
plot_polyscope(HC_unit)

In [91]:
HC_unit

<ddgclib.hyperct._complex.Complex at 0x7f1f75f3f9d0>

## Translate

In [106]:
import copy

def translate_complex(HC, axis=0, d=0.0, copy_complex=True, jitter=0.0):
    """
    Translate all vertices of a Complex along one coordinate axis.

    IMPORTANT WARNING
    -----------------
    The vertex cache uses exact tuple coordinates as keys.
    If a translated vertex lands exactly on an existing vertex (common when
    translating by integer or 0.5 multiples on a cube-derived mesh), the cache
    will merge the two vertices, collapsing parts of the mesh.

    Recommended practices:
      - Use small incremental translations (e.g. d=0.1 repeated 10 times)
      - OR set jitter > 0 (e.g. jitter=1e-12) to break exact equality
      - Avoid exact multiples of the original grid spacing (0.5, 1.0, etc.)

    Parameters
    ----------
    HC : Complex
        The simplicial complex to translate.
    axis : int, default=0
        Axis to translate along (0=x, 1=y, 2=z).
    d : float, default=0.0
        Translation distance.
    copy : bool, default=True
        If True, return a deep copy; if False, modify in-place.
    jitter : float, default=0.0
        If > 0, add uniform random noise ∈ [-jitter/2, jitter/2] to each
        coordinate after translation. Recommended value: 1e-12 when large
        translations are needed.

    Returns
    -------
    Complex
        The translated complex.
    """
    if axis not in (0, 1, 2):
        raise ValueError("axis must be 0, 1, or 2")

    if copy_complex:
        HC = copy.deepcopy(HC)

    rng = np.random.default_rng()  # reproducible if seeded later if needed

    for v in list(HC.V):
        new_pos = v.x_a.copy()
        new_pos[axis] += d

        # Optional jitter to prevent exact cache collisions
        if jitter > 0:
            new_pos += rng.uniform(-jitter/2, jitter/2, size=3)

        HC.V.move(v, tuple(new_pos))

    return HC


HC_unit = unit_cylinder(r, refinements=1, height=1, up='z', distr_law='sinusoidal')
for i in range(5):
    HC_unit_trans = translate_complex(HC_unit, axis=2, d=0.1, copy_complex=False)

plot_polyscope(HC_unit_trans)

In [None]:
plot_polyscope(HC_unit)

In [66]:
# Extrusion vector direction
L = 3.3  #
N = math.floor(3.3)

for i in range(1, N-1):
    for v in HC_unit.V:
        vnn = v.nn
        v_i_t = v.x_a + i
        print(f'v.x_a = {v.x_a}')
        print(f'v_i_t = {v_i_t}')

        # if v.boundary:

v.x_a = [ 0.70710678 -0.70710678 -1.        ]
v_i_t = [1.70710678 0.29289322 0.        ]
v.x_a = [ 0.70710678 -0.70710678  0.        ]
v_i_t = [1.70710678 0.29289322 1.        ]
v.x_a = [ 0.70710678 -0.70710678 -0.5       ]
v_i_t = [1.70710678 0.29289322 0.5       ]
v.x_a = [-1.0000000e+00  1.2246468e-16 -5.0000000e-01]
v_i_t = [0.  1.  0.5]
v.x_a = [-1.0000000e+00  1.2246468e-16 -1.0000000e+00]
v_i_t = [0. 1. 0.]
v.x_a = [ 6.123234e-17  1.000000e+00 -1.000000e+00]
v_i_t = [1. 2. 0.]
v.x_a = [ 6.123234e-17  1.000000e+00 -5.000000e-01]
v_i_t = [1.  2.  0.5]
v.x_a = [-1.0000000e+00  1.2246468e-16  0.0000000e+00]
v_i_t = [0. 1. 1.]
v.x_a = [6.123234e-17 1.000000e+00 0.000000e+00]
v_i_t = [1. 2. 1.]
v.x_a = [ 0.70710678  0.70710678 -1.        ]
v_i_t = [1.70710678 1.70710678 0.        ]
v.x_a = [ 0.70710678  0.70710678 -0.5       ]
v_i_t = [1.70710678 1.70710678 0.5       ]
v.x_a = [0.70710678 0.70710678 0.        ]
v_i_t = [1.70710678 1.70710678 1.        ]
v.x_a = [-0.70710678 -0.7071067

In [109]:
import numpy as np

def extrude(HC_unit, L, axis=2, cdist=1e-10):
    """
    Extrude a unit-length simplicial complex (e.g. unit_cylinder) to total length L
    along the specified axis.

    - Uses ceil(L) segments → minimum one unit per integer length
    - Each segment is scaled to length L/n_segments
    - Manual vertex replication + topology copy (no deepcopy)
    - Final merge_all glues adjacent segments together

    Parameters
    ----------
    HC_unit : Complex
        Unit-length mesh (height ≈1 along the extrusion axis, centered at 0)
    L : float > 0
        Total extrusion length
    axis : int, default=2
        Extrusion axis (0=x, 1=y, 2=z)
    cdist : float, default=1e-10
        Tolerance for merge_all at segment interfaces

    Returns
    -------
    Complex
        Extruded mesh of exact length L
    """
    if L <= 0:
        raise ValueError("L must be positive")
    if axis not in (0, 1, 2):
        raise ValueError("axis must be 0, 1, or 2")

    n_segments = max(1, int(np.ceil(L)))
    seg_len = L / n_segments

    extruded = Complex(3, domain=None)   # fresh target complex

    for k in range(n_segments):
        offset = k * seg_len
        vertex_map = {}  # old_v → new_v for this segment

        # 1. Create transformed vertices
        for old_v in HC_unit.V:
            pos = np.array(old_v.x_a, dtype=float)
            pos[axis] = pos[axis] * seg_len + offset   # scale + shift
            new_v = extruded.V[tuple(pos)]             # insert into cache
            vertex_map[old_v] = new_v

        # 2. Copy connectivity
        for old_v, new_v in vertex_map.items():
            for old_nb in old_v.nn:
                new_v.connect(vertex_map[old_nb])

    # 3. Glue segment interfaces
    extruded.V.merge_all(cdist=cdist)

    return extruded

In [121]:
cyl_unit = unit_cylinder(r=0.5, height=1.0, up='z', refinements=1)

long_cyl = extrude(cyl_unit, L=3.3, axis=2)          # 27 segments
long_cyl_y = extrude(cyl_unit, L=10.0, axis=1)

In [123]:
plot_polyscope(long_cyl)

In [124]:
import numpy as np

class MovingCylinder:
    """
    Moving cylindrical mesh with open outlet and continuous periodic inlet.

    - main  : current physical domain [inlet_pos, outlet_pos]
    - ghost : one full unit cylinder living upstream (periodic inlet source)
    - Every time step we advance all vertices by dx = velocity * dt
    - Delete vertices that leave the outlet
    - Copy vertices + local connections from ghost when they cross the inlet
    - When the entire ghost has entered, shift it upstream by exactly 1 unit
    """
    def __init__(self, unit_cyl, L, velocity=1.0, axis=2, inlet_pos=0.0, cdist=1e-10):
        self.axis = axis
        self.velocity = velocity
        self.inlet_pos = inlet_pos
        self.outlet_pos = inlet_pos + L
        self.cdist = cdist
        self.time = 0.0

        # Main physical mesh (starts full length)
        self.main = extrude(unit_cyl, L, axis=axis, cdist=cdist)

        # Ghost segment (fresh unit cylinder, placed just upstream)
        self.ghost = self._make_clean_unit_copy(unit_cyl)
        self._shift_ghost_upstream()   # initial position just before inlet

    def _make_clean_unit_copy(self, unit_cyl):
        """Create a fresh unit cylinder without deepcopy problems"""
        # Safest: just call the constructor again
        return unit_cylinder(
            r=0.5,                    # you can make this configurable
            refinements=unit_cyl.V.refinements if hasattr(unit_cyl.V, 'refinements') else 2,
            height=1.0,
            up='z' if self.axis == 2 else 'y',
            distr_law='sinusoidal'
        )

    def _shift_ghost_upstream(self):
        """Place ghost so its leading face is just upstream of inlet"""
        shift = self.inlet_pos - 1.0
        for v in list(self.ghost.V):
            pos = v.x_a.copy()
            pos[self.axis] += shift
            self.ghost.V.move(v, tuple(pos))

    def advance(self, dt):
        """Advance simulation by dt (constant velocity)"""
        dx = self.velocity * dt
        self.time += dt

        # 1. Move main mesh
        for v in list(self.main.V):
            pos = v.x_a.copy()
            pos[self.axis] += dx
            self.main.V.move(v, tuple(pos))

        # 2. Move ghost mesh
        for v in list(self.ghost.V):
            pos = v.x_a.copy()
            pos[self.axis] += dx
            self.ghost.V.move(v, tuple(pos))

        # 3. Delete vertices that left the outlet
        to_delete = [v for v in self.main.V if v.x_a[self.axis] >= self.outlet_pos]
        for v in to_delete:
            self.main.V.remove(v)          # assumes VertexCache has .remove()

        # 4. Add new vertices that crossed the inlet from ghost
        entered = []
        for gv in list(self.ghost.V):
            if gv.x_a[self.axis] >= self.inlet_pos:
                # Create/copy vertex in main at same position
                new_v = self.main.V[tuple(gv.x_a)]
                entered.append((gv, new_v))

        # 5. Copy connections only for vertices that also entered
        for gv, new_v in entered:
            for gnb in gv.nn:
                if gnb.x_a[self.axis] >= self.inlet_pos:   # neighbor also entered
                    new_nb = self.main.V[tuple(gnb.x_a)]
                    new_v.connect(new_nb)

        # 6. If whole ghost has entered → regenerate upstream (periodic inlet)
        if all(gv.x_a[self.axis] >= self.inlet_pos for gv in self.ghost.V):
            self._shift_ghost_upstream()

        # 7. Optional safety merge (helps with floating-point drift)
        self.main.V.merge_all(cdist=self.cdist)

        return len(to_delete)   # useful for monitoring outflow

    def current_length(self):
        zs = [v.x_a[self.axis] for v in self.main.V]
        return max(zs) - min(zs) if zs else 0.0

In [129]:
unit = unit_cylinder(r=0.5, height=1.0, up='z', refinements=1)

pipe = MovingCylinder(unit, L=5, velocity=1.0, axis=2)

# Advance simulation
for step in range(100):
    deleted = pipe.advance(dt=0.01)
    if step % 20 == 0:
        print(f"t={pipe.time:.2f}, length={pipe.current_length():.3f}, outflow={deleted}")

t=0.01, length=5.000, outflow=0
t=0.21, length=5.000, outflow=0
t=0.41, length=5.000, outflow=0
t=0.61, length=4.750, outflow=0
t=0.81, length=4.750, outflow=0


In [130]:

plot_polyscope(pipe.main)

# Object based boundary condition

In [131]:
from abc import ABC, abstractmethod
import numpy as np

class BoundaryCondition(ABC):
    """Base class for boundary conditions on a moving mesh."""
    def __init__(self, axis=2):
        self.axis = axis

    @abstractmethod
    def apply(self, mesh, dt):
        """Apply the BC to the mesh for this time step."""
        pass

## Periodic ghost inlet

In [132]:
class PeriodicInletBC(BoundaryCondition):
    """
    Periodic inlet: continuously injects copies of a unit mesh from upstream.
    Ghost mesh lives in a periodic box just before the inlet.
    """
    def __init__(self, unit_mesh, velocity, axis=2, inlet_pos=0.0, cdist=1e-10):
        super().__init__(axis)
        self.unit_mesh = unit_mesh          # source geometry (cylinder, rectangle, ...)
        self.velocity = velocity
        self.inlet_pos = inlet_pos
        self.cdist = cdist

        # Ghost copy (initially placed just upstream)
        self.ghost = self._clone_unit(unit_mesh)
        self._reset_ghost()

    def _clone_unit(self, unit_mesh):
        """Safe clone without deepcopy issues"""
        # Re-create via constructor (assumes unit_cylinder or similar factory)
        # For generality, you can pass a factory function if needed
        return unit_mesh   # placeholder – replace with proper clone if needed

    def _reset_ghost(self):
        """Shift ghost so its leading face is exactly 1 unit upstream"""
        shift = self.inlet_pos - 1.0
        for v in list(self.ghost.V):
            pos = v.x_a.copy()
            pos[self.axis] += shift
            self.ghost.V.move(v, tuple(pos))

    def apply(self, mesh, dt):
        dx = self.velocity * dt

        # Move ghost forward
        for v in list(self.ghost.V):
            pos = v.x_a.copy()
            pos[self.axis] += dx
            self.ghost.V.move(v, tuple(pos))

        # Inject any vertices that crossed the inlet
        entered = []
        for gv in list(self.ghost.V):
            if gv.x_a[self.axis] >= self.inlet_pos:
                new_v = mesh.V[tuple(gv.x_a)]          # insert or reuse
                entered.append((gv, new_v))

        # Copy connections for entered vertices
        for gv, new_v in entered:
            for gnb in gv.nn:
                if gnb.x_a[self.axis] >= self.inlet_pos:   # neighbor also entered
                    new_nb = mesh.V[tuple(gnb.x_a)]
                    new_v.connect(new_nb)

        # Periodic reset
        if all(gv.x_a[self.axis] >= self.inlet_pos for gv in self.ghost.V):
            self._reset_ghost()

        mesh.V.merge_all(cdist=self.cdist)

## Outlet

In [133]:
class OutletDeleteBC(BoundaryCondition):
    """Simple open outlet: delete vertices that leave the domain"""
    def __init__(self, outlet_pos, axis=2):
        super().__init__(axis)
        self.outlet_pos = outlet_pos

    def apply(self, mesh, dt):
        to_delete = [v for v in mesh.V if v.x_a[self.axis] >= self.outlet_pos]
        for v in to_delete:
            mesh.V.remove(v)          # assumes VertexCache supports .remove()
        return len(to_delete)

## TimeStepper / Advancer

In [136]:
class MeshAdvancer:
    def __init__(self, mesh, inlet_bc, outlet_bc, velocity):
        self.mesh = mesh
        self.inlet = inlet_bc
        self.outlet = outlet_bc
        self.velocity = velocity
        self.time = 0.0
        self.axis = inlet_bc.axis          # ← FIX: store axis from inlet (or outlet)

    def step(self, dt):
        dx = self.velocity * dt
        self.time += dt

        # 1. Advect main mesh
        for v in list(self.mesh.V):
            pos = v.x_a.copy()
            pos[self.axis] += dx
            self.mesh.V.move(v, tuple(pos))

        # 2. Apply boundary conditions
        outflow_count = self.outlet.apply(self.mesh, dt)
        self.inlet.apply(self.mesh, dt)

        return outflow_count

In [144]:
# Create unit geometry
unit_cyl = unit_cylinder(r=0.5, height=1.0, up='z', refinements=1)

# Domain length 10.0, inlet at x=0
main_mesh = extrude(unit_cyl, L=4, axis=2)

# Boundary conditions
inlet_bc = PeriodicInletBC(
    unit_mesh=unit_cyl,
    velocity=1.0,
    axis=2,
    inlet_pos=0.0
)

outlet_bc = OutletDeleteBC(outlet_pos=10.0, axis=2)

# Time integration
advancer = MeshAdvancer(main_mesh, inlet_bc, outlet_bc, velocity=1.0)

for step in range(500):
    deleted = advancer.step(dt=0.02)
    if step % 50 == 0:
        pass#print(f"t={advancer.time:.2f}, vertices={len(main_mesh.V)}, outflow={deleted}")

In [145]:
main_mesh.V

<ddgclib.hyperct._vertex.VertexCacheIndex at 0x7f1f744da0d0>

In [146]:
plot_polyscope(main_mesh)

In [143]:
main_mesh = extrude(unit_cyl, L=4, axis=2)
plot_polyscope(main_mesh)