# Similar to the code in "HydroShed_MeshGenerator.ipynb" to generate a mesh

In [323]:
# Libraries

import numpy as np
from skimage.transform import resize
import pymartini
import rasterio
import pyvista as pv
from scipy.ndimage import map_coordinates
import settings

# matplotlib
import matplotlib.pyplot as plt
from matplotlib import cm
# for interactions
from mpl_toolkits.mplot3d import Axes3D

# remove this (and possibly restart the kernel) if you don't want interactive plots
%matplotlib widget

In [None]:

dem_path = settings.WASHINGTON_SMALL
output_vtp = settings.DATA_DIR / "hyd_na_dem_30s_mesh_3d_corrected3.vtp"

# Load DEM and metadata
with rasterio.open(dem_path) as src:
    dem = src.read(1)
    transform = src.transform
    crs = src.crs
    nodata_val = src.nodata

# Handle NoData values: mask and fill with NaN
dem_masked = dem.astype(np.float32)
if nodata_val is not None:
    dem_masked[dem_masked == nodata_val] = np.nan

# Optionally clip extreme elevation values but ignore NaNs
valid_vals = dem_masked[~np.isnan(dem_masked)]
lower_clip = np.percentile(valid_vals, 1)
upper_clip = np.percentile(valid_vals, 99)
dem_clipped = np.clip(dem_masked, lower_clip, upper_clip)

# --- Resize DEM safely

# we need a 2^k + 1 sized grid for the Martini algorithm to work
max_size = 1025
dem_size = dem_clipped.shape[0]
target_size = min(max_size, 2 ** int(np.floor(np.log2(dem_size - 1))) + 1)

# resize
dem_resized = resize(dem_clipped, (target_size, target_size), preserve_range=True, anti_aliasing=True)
dem_resized = np.ascontiguousarray(dem_resized.astype(np.float32))

# Build Martini mesh
grid_size = dem_resized.shape[0]
martini = pymartini.Martini(grid_size)
tile = martini.create_tile(dem_resized)

# Extract mesh at desired level
level = 0
vertices, triangles = tile.get_mesh(level)
vertices = np.array(vertices, dtype=np.float32).reshape(-1, 2) # put into 2 columns
triangles = np.array(triangles, dtype=np.int32).reshape(-1, 3) # put into 3 columns

rows = vertices[:, 0]
cols = vertices[:, 1]

# Map grid indices to real-world coordinates (x, y)
rows_int = np.clip(np.round(rows).astype(int), 0, dem_resized.shape[0] - 1)
cols_int = np.clip(np.round(cols).astype(int), 0, dem_resized.shape[1] - 1)
xs, ys = rasterio.transform.xy(transform, rows_int, cols_int)
xs = np.array(xs, dtype=np.float32)
ys = np.array(ys, dtype=np.float32)

# Bilinear interpolate elevation for fractional vertices (rows, cols)
coords = np.vstack([rows, cols])
zs = map_coordinates(dem_resized, coords, order=1, mode='nearest')

# Fix NaN values by replacing with nearby valid elevations (note: this does not seem to be needed)
nan_mask = np.isnan(zs)
if np.any(nan_mask):
    # Replace NaNs by nearest valid value (simple approach)
    zs[nan_mask] = np.nanmean(zs[~nan_mask])

# Normalize elevation (between 0 and 1) ignoring NaNs
zs_min = np.nanmin(zs)
zs_max = np.nanmax(zs)
zs = (zs - zs_min) / (zs_max - zs_min)

# Fix vertical exaggeration
z_scale = 0.01
zs_scaled = zs * z_scale

# Stack into final vertices (X, Y, Z)
vertices_3d = np.column_stack([xs, ys, zs_scaled])

# Optional: center horizontally for better visualization
vertices_3d[:, 0] -= vertices_3d[:, 0].mean()
vertices_3d[:, 1] -= vertices_3d[:, 1].mean()

# Build faces for PyVista
faces = np.hstack([np.full((triangles.shape[0], 1), 3, dtype=np.int32), triangles]).flatten()

mesh = pv.PolyData(vertices_3d, faces)

In [None]:
# Plot the mesh
pv.plot(mesh)

In [None]:
np.arange(-5, 5, 0.25)

In [None]:
# plot the verticies

# only plotting a subset here
num = 10000
X = xs[0:num] 
Y = ys[0:num]
Z = zs[0:num]

fig = plt.figure()
ax = fig.add_subplot(111, projection='3d')
ax.scatter(X, Y, Z, s=0.5, c=Z, marker='o', cmap='viridis')

plt.show()

In [None]:
#mask = (xs < -116.5) & (xs > -116.52) & (ys < 49.34) & (ys > 49.32)
mask = (xs < -116.505) & (xs > -116.510) & (ys > 49.3275) & (ys < 49.3325)

verticies_subset = np.where(mask)
m = np.isin(triangles, verticies_subset)
triangles_subset = triangles[np.all(m, axis=1)]

In [None]:
# testing
test = np.array([[1, 2, 3], [4, 5, 6], [1, 2, 6], [1, 2, 3], [1, 2, 3]])
vert = np.array([1, 2, 3])

m = np.isin(test, vert)

test[np.all(m, axis=1)] # all the verticies have to match for each triangle

In [None]:
rng = np.random.default_rng()

In [None]:
start_point = [-116.507, 49.33]
#start_point = [-116.5072, 49.3308]
#start_point = [-116.5077, 49.3285]

In [None]:
# see https://stackoverflow.com/questions/2049582/how-to-determine-if-a-point-is-in-a-2d-triangle
def sign(p1, p2, p3):
    return (p1[0] - p3[0]) * (p2[1] - p3[1]) - (p2[0] - p3[0]) * (p1[1] - p3[1])

# returns True if the point is inside the given triangle, otherwise False
def point_in_triangle(point, triangle):
    d1 = sign(point, triangle[0], triangle[1])
    d2 = sign(point, triangle[1], triangle[2])
    d3 = sign(point, triangle[2], triangle[0])
    
    has_neg = (d1 < 0) or (d2 < 0) or (d3 < 0)
    has_pos = (d1 > 0) or (d2 > 0) or (d3 > 0)

    return not (has_neg and has_pos)

In [None]:
# AI wrote this code:
def line_segment_line_intersection(p1, p2, q1, v):
    """
    Finds the intersection point of a line segment and a line in 2D.

    Args:
        p1 (np.array): Start point of the line segment (e.g., np.array([x1, y1])).
        p2 (np.array): End point of the line segment (e.g., np.array([x2, y2])).
        q1 (np.array): A point on the line (e.g., np.array([qx, qy])).
        v (np.array): Direction vector of the line (e.g., np.array([vx, vy])).

    Returns:
        np.array or None: The intersection point if it exists on the segment, otherwise None.
    """
    
    A = np.array([p2 - p1, -v]).T  # Matrix for coefficients of t and s
    b = q1 - p1                   # Right-hand side vector

    # Check for parallel lines (determinant of A close to zero)
    if np.isclose(np.linalg.det(A), 0):
        # Lines are parallel, no unique intersection or infinite intersections
        return None 

    try:
        ts = np.linalg.solve(A, b)
        t, s = ts[0], ts[1]

        if 0 <= t <= 1 and s >= 0:  # Check if intersection is on the line segment
            intersection_point = p1 + t * (p2 - p1)
            return intersection_point
        else:
            return None  # Intersection is outside the line segment
    except np.linalg.LinAlgError:
        return None # No solution (e.g., singular matrix if lines are parallel)

In [None]:
def get_real_vertex_3D(vertex):
    return np.array([xs[vertex], ys[vertex], zs[vertex]])

def get_real_vertex_2D(vertex):
    return np.array([xs[vertex], ys[vertex]])

In [None]:
# translates the given triangle that contains only the indicies of each vertex into a triangle that
# contains the actual values for each vertex (ignores z)
def get_full_2D_triangle(triangle):
    return np.array([[xs[triangle[0]], ys[triangle[0]]], [xs[triangle[1]], ys[triangle[1]]], [xs[triangle[2]], ys[triangle[2]]]])

def get_full_3D_triangle(triangle):
    return np.array([[xs[triangle[0]], ys[triangle[0]], zs[triangle[0]]],
            [xs[triangle[1]], ys[triangle[1]], zs[triangle[1]]],
            [xs[triangle[2]], ys[triangle[2]], zs[triangle[2]]]])

def get_triangle_at(point):
    for triangle in triangles_subset:
        full = get_full_2D_triangle(triangle)
        if point_in_triangle(point, full):
            return triangle

    print("Failed to get triangle for point")
    return False

def get_triangles_with_edge(p1, p2):
    triangle_list = [] # there should only ever be two triangles that share the same edge

    for triangle in triangles_subset:
        # this is not the best way to do it, but it was quick and easy
        if triangle[0] == p1 and triangle[1] == p2:
            triangle_list.append(triangle)
        elif triangle[1] == p1 and triangle[2] == p2:
            triangle_list.append(triangle)
        elif triangle[2] == p1 and triangle[0] == p2:
            triangle_list.append(triangle)
        elif triangle[0] == p2 and triangle[1] == p1:
            triangle_list.append(triangle)
        elif triangle[1] == p2 and triangle[2] == p1:
            triangle_list.append(triangle)
        elif triangle[2] == p2 and triangle[0] == p1:
            triangle_list.append(triangle)

    if len(triangle_list) != 2:
        print("Failed getting 2 triangles with the same specified edge: ", p1, ", ", p2)
    
    return triangle_list

# from Jones et al. (p. 1237)
def calculate_steepest_descent(triangle):
    x = (triangle[0][0], triangle[1][0], triangle[2][0])
    y = (triangle[0][1], triangle[1][1], triangle[2][1])
    z = (triangle[0][2], triangle[1][2], triangle[2][2])

    A = y[0] * (z[1] - z[2]) + y[1] * (z[2] - z[0]) + y[2] * (z[0] - z[1])
    B = z[0] * (x[1] - x[2]) + z[1] * (x[2] - x[0]) + z[2] * (x[0] - x[1])
    C = x[0] * (y[1] - y[2]) + x[1] * (y[2] - y[0]) + x[2] * (y[0] - y[1])
    #D = -(A * x[0]) - (B * y[0]) - (C * z[1])

    descent = [(A / C), (B / C)]
    return descent

def get_point_from_descent(triangle, start, descent):
    full = get_full_2D_triangle(triangle)

    intersection = line_segment_line_intersection(full[0], full[1], start, descent)
    if intersection is not None:
        return (intersection, triangle[0], triangle[1])
    intersection = line_segment_line_intersection(full[1], full[2], start, descent)
    if intersection is not None:
        return (intersection, triangle[1], triangle[2])
        
    intersection = line_segment_line_intersection(full[2], full[0], start, descent)

    if intersection is None:
        print("ERROR: next_point should never be None")

    return (intersection, triangle[2], triangle[0])

# really long name, but does the same as "get_point_from_descent" except it
# also gets the other, adjacent triangle that touches the intersection point
def get_point_and_adj_triangle_from_descent(triangle, start, descent):
    intersection, v1, v2 = get_point_from_descent(triangle, start, descent)

    adj_triangles = get_triangles_with_edge(v1, v2)
    if len(adj_triangles) != 2:
        print("Error: not enough (or too many) triangles")
    
    current_tri = None
    adj_tri = None
    if np.array_equal(adj_triangles[0], np.array(triangle)):
        current_tri = adj_triangles[0]
        adj_tri = adj_triangles[1]
    elif np.array_equal(adj_triangles[1], np.array(triangle)):
        current_tri = adj_triangles[1]
        adj_tri = adj_triangles[0]
    else:
        print("Error: this should not happen, one of the triangles must be equal")
    
    return (intersection, adj_tri, v1, v2)

In [None]:
# see: https://math.stackexchange.com/questions/1324179/how-to-tell-if-3-connected-points-are-connected-clockwise-or-counter-clockwise
# triangle should be an array of three points (e.g. [[x1, y1, z1], [x2, y2, z2], [x3, y3, z3]])
def is_clockwise(triangle):
    matrix = np.array([[triangle[0][0], triangle[0][1], 1],
                       [triangle[1][0], triangle[1][1], 1],
                       [triangle[2][0], triangle[2][1], 1]])
    
    det = np.linalg.det(matrix)

    if det < 0:
        return True # clockwise
    else:
        return False # counterclockwise

def make_triangle_counterclockwise(triangle):
    if is_clockwise(triangle):
        return triangle[::-1] # reverse the array
    
    return triangle # already counterclockwise

In [None]:
def find_row_index(array, row):
    index = 0
    for r in array:
        if np.array_equal(r, row):
            return index
        
        index += 1
    
    print("Failed to find row")
    return None

def cross_2D(v1, v2):
    return v1[0] * v2[1] - v1[1] * v2[0]

In [None]:
def draw_triangle(ax, triangle, color):
    ax.plot([xs[triangle[0]], xs[triangle[1]]], [ys[triangle[0]], ys[triangle[1]]], "-", color=color, linewidth=1)
    ax.plot([xs[triangle[1]], xs[triangle[2]]], [ys[triangle[1]], ys[triangle[2]]], "-", color=color, linewidth=1)
    ax.plot([xs[triangle[2]], xs[triangle[0]]], [ys[triangle[2]], ys[triangle[0]]], "-", color=color, linewidth=1)

In [None]:
# plot some verticies in 2D
X = xs[mask]
Y = ys[mask]
Z = zs[mask]

fig = plt.figure()
ax = fig.add_subplot(111)

# draw triangles
for triangle in triangles_subset:
    #random_color = rng.choice(["k", "b"])
    random_color = "#00000055"
    draw_triangle(ax, triangle, random_color)

ax.plot(start_point[0], start_point[1], 'o')

tri = get_triangle_at(start_point)
draw_triangle(ax, tri, "#FF0000FF")

full_tri = get_full_3D_triangle(tri)

descent = calculate_steepest_descent(full_tri)
descent = descent / np.linalg.norm(descent)
descent *= 0.001
ax.plot([start_point[0], start_point[0] + descent[0]], [start_point[1], start_point[1] + descent[1]], "-", color="red", linewidth=1)

next_point, adj_tri, v1, v2 = get_point_and_adj_triangle_from_descent(tri, start_point, descent)
ax.plot(next_point[0], next_point[1], 'o')
draw_triangle(ax, adj_tri, "#00FF00FF")

full_adj_tri = get_full_3D_triangle(adj_tri)
descent_adj = calculate_steepest_descent(full_adj_tri) # gh from p. 1239, Jones et al.

full_adj_tri = make_triangle_counterclockwise(full_adj_tri)

v1 = get_real_vertex_3D(v1)
v2 = get_real_vertex_3D(v2)

if find_row_index(full_adj_tri, v1) < find_row_index(full_adj_tri, v2): # if v1 comes before v2
    ij = v2 - v1
else: # v2 comes before v1
    ij = v1 - v2

# direction of the adj. triangle: if positive then adj. triangle slopes toward the current edge
direction = cross_2D(np.array(descent_adj), ij[0:2])
print(direction)

ax.scatter(X, Y, s=10, c=Z, marker='o', cmap='viridis')

plt.show()