In [17]:
from helpers import *

import gt4py.next as gtx

In [119]:
def diffusion_step_numpy(
    e2c2v: np.ndarray,
    v2e: np.ndarray,
    TE: np.ndarray,
    TE_t: np.ndarray,
    inv_primal_edge_length: np.ndarray,
    inv_vert_vert_length: np.ndarray,
    nnbhV: np.ndarray,
    boundary_edge: np.ndarray,
    kappa: float,
    dt: float,
) -> tuple[np.ndarray, np.ndarray]:
    """NumPy reference implementation of one diffusion step on edge values.

    The step is: predict ``TE`` half a time step ahead with the incoming
    tendency, average the predicted edge values onto the vertices, build a
    Laplacian-like term from the vertex values around each edge, and use it
    as the new tendency for a full-step update of the initial ``TE``.

    Args:
        e2c2v: Integer index table with one row per edge; each row lists the
            vertex indices whose values are summed for that edge.
        v2e: Integer index table with one row per vertex; each row lists the
            edge indices whose values are summed for that vertex.
        TE: 1-D array with one value per edge (state at the start of the step).
        TE_t: 1-D array with one value per edge (tendency used by the predictor).
        inv_primal_edge_length: 1-D array with one value per edge.
        inv_vert_vert_length: 1-D array with one value per edge.
        nnbhV: 1-D array with one value per vertex; the edge-to-vertex sum is
            divided by it.
        boundary_edge: 1-D mask with one entry per edge; where it is truthy the
            new tendency is set to zero.
        kappa: Scalar factor applied to the Laplacian-like term.
        dt: Time step.

    Returns:
        ``(TE_t, TE)``: the new tendency and the updated edge values, both
        1-D with one value per edge.
    """
    # initialize
    TEinit = TE

    # Squared inverse lengths are kept 1-D (one value per edge). Reshaping them
    # to columns once and reusing those columns everywhere would broadcast the
    # per-edge products below to an (n_edges, n_edges) matrix.
    inv_primal_sq = inv_primal_edge_length ** 2
    inv_vert_vert_sq = inv_vert_vert_length ** 2

    # predict
    TE = TEinit + 0.5*dt*TE_t

    # interpolate temperature from edges to vertices
    TV = np.sum(TE[v2e], axis=1) / nnbhV

    # compute nabla2 using the finite differences; the neighbour axis is added
    # only here, where the weights multiply the (n_edges, n_neighbours) table
    TEnabla2 = np.sum(
        TV[e2c2v] * inv_primal_sq[:, np.newaxis]
        + TV[e2c2v] * inv_vert_vert_sq[:, np.newaxis],
        axis=1,
    )

    # per-edge contribution: every operand is 1-D, so the result stays 1-D
    TEnabla2 = TEnabla2 - (
        (2.0 * TE * inv_primal_sq)
        + (2.0 * TE * inv_vert_vert_sq)
    )

    # build ODEs
    TE_t = np.where(
        boundary_edge,
        0.,
        kappa*TEnabla2,
    )

    # correct
    TE = TEinit + dt*TE_t
    return TE_t, TE

In [120]:
@gtx.field_operator
def diffusion_step(
    TE: gtx.Field[Dims[E], float],
    TE_t: gtx.Field[Dims[E], float],
    inv_primal_edge_length: gtx.Field[Dims[E], float],
    inv_vert_vert_length: gtx.Field[Dims[E], float],
    nnbhV: gtx.Field[Dims[V], float],
    boundary_edge: gtx.Field[Dims[E], bool],
    kappa: float,
    dt: float,
) -> gtx.tuple[
    gtx.Field[Dims[E], float],
    gtx.Field[Dims[E], float],
]:
    # One diffusion step on the edge field TE: half-step predictor with the
    # incoming tendency, edge -> vertex averaging, Laplacian-like term from the
    # E2C2V vertices of each edge, then a full-step corrector from the initial
    # TE. Returns (new tendency, updated TE), both on edges.

    # squared inverse lengths, reused by both parts of the nabla2 term
    inv_primal_sq = inv_primal_edge_length ** 2
    inv_vert_vert_sq = inv_vert_vert_length ** 2

    # predictor: advance half a time step with the incoming tendency
    TE_half = TE + 0.5*dt*TE_t

    # edge -> vertex: sum over the V2E neighbours, divided by nnbhV
    TV = neighbor_sum(TE_half(V2E), axis=V2EDim) / nnbhV

    # vertex part of nabla2: E2C2V neighbours weighted by both squared lengths
    vertex_part = neighbor_sum(
        TV(E2C2V) * inv_primal_sq + TV(E2C2V) * inv_vert_vert_sq,
        axis=E2C2VDim,
    )

    # edge part of nabla2: the predicted edge value with the same weights
    edge_part = (2.0 * TE_half * inv_primal_sq) + (2.0 * TE_half * inv_vert_vert_sq)

    TEnabla2 = vertex_part - edge_part

    # new tendency: zero wherever boundary_edge is set
    tendency = where(boundary_edge, 0., kappa*TEnabla2)

    # corrector: full step from the initial TE with the new tendency
    TE_next = TE + dt*tendency

    return tendency, TE_next

In [127]:
def test_diffusion_step():
    """Compare the gt4py ``diffusion_step`` against ``diffusion_step_numpy``.

    Random input fields are fed to both implementations and the two returned
    arrays are compared with ``np.allclose``.
    """
    # None selects the default allocator/backend; gtfn_cpu / gtfn_gpu are the
    # alternatives that can be swapped in here.
    backend = None

    cell_domain = gtx.domain({C: n_cells})  # not used below
    edge_domain = gtx.domain({E: n_edges})
    vertex_domain = gtx.domain({V: n_vertices})

    # Inputs, named after the diffusion_step argument each one is passed as.
    TE = random_field(edge_domain, allocator=backend)
    TE_t = random_field(edge_domain, allocator=backend)
    inv_primal_edge_length = random_field(edge_domain, allocator=backend)
    inv_vert_vert_length = random_field(edge_domain, allocator=backend)
    # NOTE(review): this is a random_field, not a boolean mask, although
    # diffusion_step declares boundary_edge as a bool field. Presumably every
    # value is nonzero, in which case where() takes the `0.` branch for all
    # edges and the comparison is vacuous. TODO confirm and switch to a real
    # boolean mask.
    boundary_edge = random_field(edge_domain, allocator=backend)
    nnbhV = random_field(vertex_domain, allocator=backend)

    # Output buffers for the field operator.
    TE_t_gt4py = gtx.zeros(edge_domain, allocator=backend)
    TE_gt4py = gtx.zeros(edge_domain, allocator=backend)

    kappa = 1.0
    dt = 1.0

    # NumPy reference on the same data.
    TE_t_ref, TE_ref = diffusion_step_numpy(
        e2c2v_table,
        v2e_table,
        TE.asnumpy(),
        TE_t.asnumpy(),
        inv_primal_edge_length.asnumpy(),
        inv_vert_vert_length.asnumpy(),
        nnbhV.asnumpy(),
        boundary_edge.asnumpy(),
        kappa,
        dt,
    )

    # Connectivities used by the V2E and E2C2V offsets inside diffusion_step.
    e2c2v_connectivity = gtx.NeighborTableOffsetProvider(
        e2c2v_table, E, V, 4, has_skip_values=False
    )
    v2e_connectivity = gtx.NeighborTableOffsetProvider(
        v2e_table, V, E, 6, has_skip_values=False
    )
    offset_provider = {
        E2C2V.value: e2c2v_connectivity,
        V2E.value: v2e_connectivity,
    }

    diffusion_step(
        TE,
        TE_t,
        inv_primal_edge_length,
        inv_vert_vert_length,
        nnbhV,
        boundary_edge,
        kappa,
        dt,
        out=(TE_t_gt4py, TE_gt4py),
        offset_provider=offset_provider,
    )

    assert np.allclose(TE_t_gt4py.asnumpy(), TE_t_ref)
    assert np.allclose(TE_gt4py.asnumpy(), TE_ref)

In [128]:
# Run the gt4py-vs-NumPy comparison. A failing assert raises inside the call,
# so the message below is printed only when both comparisons pass.
test_diffusion_step()
print("Test successful")

Test successful
