In [None]:
import numpy as np
import matplotlib.pyplot as plt
import scipy.stats as sts

from collections import namedtuple
from scipy.ndimage import generic_filter

import richdem as rd

import os
import elevation

# tqdm
import tqdm

# PRIMER

Uses a DEM
DEM = Digital elevation model
    used to init model and params:
        grid init:
            1500m * 1500m 
            DEM for each thing, with raster 10m
                raster  = increments of 10
                width = resolution
                each cell = elevation info
        param init:
            Land Cover
                surface roughness, inflitration ~> rate of spread
                    rep: separate layers, or constant Manning coefficient for whole sim
                        0.011 for rocky (fast flow), **0.2** for vegetation (slow flow)   

4 grids
    1. water level
        **height of water column (float m)** in cell
            init: >0 if water else 0
    
    2. flooded cell grid
       FLOODED? **bool** flag on flooding conditions
            if flooded, can transfer water to neighboring cells
            flooding conditions:

    3. **direction** grid
        each cell [1,255]
            
        ![](./media/directions.jpg)
        preffered cell direction of water flow is one of 6 vals,
                NW, N, NE, W, E, SW, S, SE = 1,2,4,8,16,32,64,128
                *or everything lower?*
            determined by considering **height** in 8 neighbors
                lowest height = direction
                    for ex N = -10, [...EWS] = 0, N is preffered
                    N = 2 (while E = 4, W = 8 etc)
                if >1 cell have the same lowest height, 
                    use sum of directions
                    therefore, each cell [1,255] (255 if every cell has same elevation)
                
    4. slope grid
        generated from DEM, calculates **slope across neighborhood** for each cell

        tg(slope) = root(change_x^2 + change_y^2)
            df/dx = (h1,3 + h2,3 + h3,3 - h1,1 - h2,1 - h3,1) / 6
            df/dy = (h3,1 + h3,2 + h3,3 - h1,1 - h1,2 - h1,3) / 6
            h is contents of DEM in each cell (height)
                result: value of slope in degrees

for iter in iterations:
    temp_drain = np.zeros((N,N))
    temp_water = np.zeros((N,N))
    for i in N:
        for j in N:

            cell = grid[i,j]

            is cell[water] > 0:
                if water in cell.neighborhood:
                    check directions of flow in neighbors
                        if direction towards cell 
                            if cell is activated:
                                water_spread = calc_water_spread()
                                temp_water[i,j] = sw
                            if water in cell:
                                if cell with water == activated:
                                    water_drain = calc_water_drain()
                                    temp_drain[i,j] = water_drain

    water_grid += temp_water - temp_drain
        

In [None]:
### GRID CREATION ###

N = 5

def create_basin(N = 5):
    # Return a toy eleveation model

    # Initializations
    N = 5

    # [DEM, WaterCol, Flooded?, Direction, Slope]
    layers = 5
    grid = np.zeros((N,N,layers))

    for i in range(N):
        for j in range(N):
            # Use small scale as tally is easier for testing
            grid[i,j,0] -= 50**2*sts.norm.pdf(i, loc = N/2, scale = N) + \
                        50**2*sts.norm.pdf(j, loc = N/2, scale = N) - 500


    return grid

def create_pyramid(N = 5):
    pass

def init_grid(grid):

    grid = grid.copy()

    init_water(grid)
    init_flood(grid)
    init_slope(grid)
    init_directions(grid)
    
    return grid
    
basin = create_basin()
basin_grid = init_grid(basin)

np.all(basin_grid[...,1] == basin[...,1]), "Init Works Correctly"

In [None]:
### Plotting Utils ###

# Plotting Utils

LAYER_NAMES = {
    0 : "Digital Elevation Model",
    1 : "Water Column Height (m)",
    2 : "FLOODED? (bool)",
    3 : "Direction (int)",
    4 : "Slope (degrees)"
}

def view_layer(grid, layer):
    fig = plt.figure(figsize=(6,5))

    ax = fig.add_subplot(1,1,1)
    lc = ax.contourf(range(N), range(N), grid[:,:,layer], 50, cmap = 'magma')
    fig.colorbar(lc, ax=ax, shrink = 0.8)

    ax.set_xticks(range(0,grid.shape[1]))
    ax.set_yticks(range(0,grid.shape[0]))
    ax.grid(alpha = 0.2)

def view_3d_layer(grid, layer):
    fig = plt.figure(figsize=(6,5))

    ax = fig.add_subplot(1,1,1, projection = '3d')
    lc = ax.contourf(range(N), range(N), grid[:,:,layer], 50, cmap = 'magma')
    fig.colorbar(lc, ax=ax, shrink = 0.8)

    ax.set_xticks(range(0,grid.shape[1]))
    ax.set_yticks(range(0,grid.shape[0]))
    ax.grid(alpha = 0.2)

def view_slice(grid, slice = None, ax = None):

    # Plot a bar chart of heights
    # Brown: elevation 
    # Blue: water
    if slice == None:
        slice = grid.shape[0]//2

    if ax == None:
        fig, ax = plt.subplots()

    N = grid.shape[0]
    ground    = grid[slice,:,0]
    water_col = grid[slice,:,1]

    # Plot Ground
    # plot a bar chart of heights with wave hatching
    

    ax.bar(
        x = range(1,N+1), 
        height = ground,
        hatch = '///',
        color = 'brown')

    # Plot water column
    ax.bar(
        x = range(1,N+1),
        height  = water_col,
        bottom  = ground,
        hatch = '~'
    )

    lower = 0.95*(min(ground) + min(water_col))
    upper = 1.05*(max(ground) + max(water_col))
    ax.set_ylim((lower,upper))

    # For anim
    return ax

def hist(
    grid, 
    layer = 3, 

    benchmark = None,
    ax = None,
    bins=20, 
    color='w', 
    edgecolor='k', 
    figsize=(5,3), 
    ):

    if ax == None:
        fig = plt.figure(figsize=figsize)
        ax = fig.add_subplot(1,1,1)

    ax.hist(
        grid[...,layer].flatten(),
        bins = 15,
        color = color, 
        hatch = '///',
        edgecolor = edgecolor)

    ax.set_title(
        LAYER_NAMES[layer])

    if benchmark:
        ax.axvline(
            benchmark,
            color = 'red',
            label = f''
        )

        ax.plot(
            0,0,',',
            label = f'')
        ax.legend(loc = "upper left")

    adjust_spines(ax, ['bottom'])
    ax.set_xlabel(f'')
    
    plt.tight_layout()
    return ax
        
def adjust_spines(ax, spines, offset = 0):
        for loc, spine in ax.spines.items():
            if loc in spines:
                spine.set_position(('outward', offset))  # outward by offset points
                #spine.set_smart_bounds(True)
            else:
                spine.set_color('none')
        # turn off ticks where there is no spine
        if 'left' in spines:
            ax.yaxis.set_ticks_position('left')
        else:
            # no yaxis ticks
            ax.yaxis.set_ticks([])

        if 'bottom' in spines:
            ax.xaxis.set_ticks_position('bottom')
        else:
            # no xaxis ticks
            ax.xaxis.set_ticks([])

view_3d_layer(basin_grid, 0)
view_layer(basin_grid, 0)
view_slice(basin_grid, slice = 2)
hist(basin_grid)

In [None]:
### LAYER INITS ###

def init_water(grid, fill = 1, test = False):
    # grid is a 1D slice
    if test:
        water_layer = grid.copy()
    # modify an existing grid
    else:
        # fill (m) water in cell perimenter
        water_layer = grid[...,1]

    water_layer[0,:], water_layer[-1,:] = fill, fill
    water_layer[:,0], water_layer[:, -1] = fill, fill

    return water_layer

def init_flood(grid, t = 1):
    # t (s) : timestep
    flood_layer  = ...
    # calc Flood flag over grid
    # vels[vels > t]
    # update own flag

    return grid

directions = {
    k:v for k,v in zip(
        range(0,9), 
        [2**p for p in range(0,9)])}

def find_direction(x):
    # alias: D8 in literature. Direct flow to lowest neighbor(s)
    # If multiple, split evenly
    
    # Reject is extends beyond border
    # idx is direction
    
    center = x[4]
    x = np.where(x < center, x, float('inf'))

    idxs = np.where(x.flatten() == x.flatten().min())
    idxs = list(*idxs)

    return np.sum([directions[i] for i in idxs])

def init_directions(grid):
    # idx for elevation values
    DEM = 0
    DIRECTION = 2

    raster_kernel = np.ones((3,3))

    # TODO: better cval for constant mode
    # maybe repeat? 
    directions = generic_filter(
                    grid[...,DEM],
                    find_direction,
                    footprint = raster_kernel,  
                    mode = 'constant',
                    cval = np.nan)
    
    grid[...,DIRECTION] = directions

    return 


def calculate_slope(window, d = 1):
    
    df_dx = (np.sum(window[[2, 4, 4, 7]])  - np.sum(window[[0, 3, 3, 5]]))/8*d
    df_dy = (np.sum(window[[5, 6, 6, 7]])  - np.sum(window[[0, 1, 1, 2]]))/8*d

    rise_run = np.sqrt(df_dx**2 + df_dy**2)

    # rise/run -> degrees 
    # 57.29578 ~ 180/pi (acceptable precision)
    degree_slope = np.arctan(rise_run) * 57.29578

    return degree_slope

def init_slope(grid):
    # idx for elevation values
    DEM = 0
    SLOPE = 3

    moore_kernel = np.ones((3,3))
    moore_kernel[1,1] = 0   
    
    slopes = generic_filter(
                grid[...,DEM],
                calculate_slope,
                footprint = moore_kernel,  
                mode = 'nearest',
                cval = 0)
    
    grid[...,SLOPE] = slopes

    return 
    

In [None]:
### Testing ###

# angles match package? 
# Compare existing slope calc with calc from lib
path = './media/beauford.npz'
with np.load(path) as data:
    dem = data['beauford']

dem = rd.rdarray(dem[400:800,400:800], no_data=-9999)
slope = rd.TerrainAttribute(dem, attrib='slope_degrees')

moore_kernel = np.ones((3,3))
moore_kernel[1,1] = 0   

slopes = generic_filter(
            dem,
            calculate_slope,
            footprint = moore_kernel,  
            mode = 'nearest',
            cval = 0)

slopes_test = slopes

#plt.imshow(slopes_test, cmap = 'magma')
#plt.colorbar()
#plt.show()

#rd.rdShow(slope, axes=False, cmap='magma', figsize=(8, 5.5))
#plt.show()

# Is total water conserved?
    # How much infiltrates? 

# Is distribution of slope angles/rise_run normalish? 

slopes[200,200], slope[200,200]


# From main paper: see if gradient matches with paper
juraj_test = np.array(
    [
        [203.83, 201.09, 201.59, 204.31, 200.21, 201.71],
        [204.35, 203.89, 204.13, 200.29, 202.02, 200.20],
        [202.18, 201.27, 203.95, 201.20, 200.46, 201.72],
        [200.74, 202.66, 202.71, 204.34, 200.52, 201.95],
        [201.14, 204.04, 200.64, 203.18, 204.03, 204.35],
        [201.57, 203.51, 200.79, 200.01, 202.00, 202.02]
    ])

juraj_result = np.array([
       [  0.,   0.,   0.,   0.,   0.,   0.],
       [  0., 103., 251.,   4., 254.,   0.],
       [  0.,  32., 125., 146.,   5.,   0.],
       [  0., 171.,  77., 255.,   2.,   0.],
       [  0., 255., 128., 237., 238.,   0.],
       [  0.,   0.,   0.,   0.,   0.,   0.]
])

# Indexed row-by-row
directions = {
    0: 1,
    1: 2,
    2: 4,
    3: 8,
    5: 16,
    6: 32, 
    7: 64, 
    8: 128}

def find_direction(x):
    # alias: D8 in literature. Direct flow to lowest neighbor(s)
    # If multiple, split evenly
    
    # Reject is extends beyond border
    # idx is direction

    # filter uses flat arrays
    center = x[4]
    narrow = np.where(x < center, x, float('inf'))

    #idxs = np.where(x == narrow.min())
    #idxs = list(*idxs)
    
    idxs = np.where(narrow < float('inf'))
    idxs = list(*idxs)
    
    return np.sum([directions[i] for i in idxs])

def init_direction(grid):
    # idx for elevation values
    moore_kernel = np.ones((3,3)) 
    
    # TODO: better cval for constant mode
    # maybe repeat? 
    directions = generic_filter(
                    grid,
                    find_direction,
                    footprint = moore_kernel,  
                    mode = 'constant',
                    cval = np.nan)

    # Set border of directions to 0
    directions[0,:] = 0
    directions[:,0] = 0
    directions[-1,:] = 0
    directions[:,-1] = 0
    
    return directions

assert init_direction(juraj_test).all() == juraj_result.all()


# convert direction back into vector of indices to pour into
def get_pos(num):
    binary = np.binary_repr(num, width=8)
    # invert binary
    binary = binary[::-1]
    return binary


# Erosion
# props[0] = 0: produces flow, -1: no flow, -2:NoData

"""
234
105
876
"""

test = rd.rdarray(grid[...,0], no_data = 999)
rd.FlowProportions(test, method= "D8")[...,0]

#Different Flow Methods
# D8
# Quinn: to all cells, but as a function of slope to neighbor
# Rho : weight probability by slope
methods = ['D8','D4','Rho8', 'Rho4', 'Quinn', 'Freeman', 'Dinf']
flow = rd.FlowAccumulation(test, method = 'D8')

In [None]:
#### Update Rules ### 

# liquid loss = inflitration
depth = np.linspace(0,10,100)
s = np.linspace(0,90,100)
n = 0.2

w = 10

# Flow rate 
def fluid_velocity(depth = 4, s=np.pi, n = 0.2):
    """Params
        cell  (array[4]): a sim cell
        depth[m]   (int): depth of water column at cell
        s[deg]   (float): slope of cell
        n        (float): Manning roughness coefficient."""

    #depth = cell[0] 
    #s     = cell[2] 

    return np.cbrt(depth**2)*np.sqrt(s) / n


def time_through_cell(cell, width = 10):
    """Params
        v    (float): calculated fluid_velocity through cell
        width  (int): width of cell/raster resolution"""

    v = fluid_velocity(cell)

    return width/v

# What is this referring to
def volume_entering_cell(depth, width, v, t = 1):
    return depth * width * v * t

# Meet flooding conditions
def is_flooded(cell, t = 1):
    """check if cell is flooded
    Params
    Returns
        flooded? (bool)"""
    return time_through_cell(cell) > t


def infiltration_velocity(Ks = 2, Hf = 1, F = 2, Md = 3):
    """
    Params
        Ks(mmh^-1) : hydraulic conductivity
        Hf(mm)     : suction lift
        Md(mm)     : soil moisture deficit
        F          : total depth of infiltration
    """
    # How fast water seeps into the soil
    # infiltration_velocity << fluid_velocity
    return Ks*(1 + Hf*Md/F)

    

In [None]:
# Maximum theoretical flow
def compute_max_flow(h):
    # Energy is conserved
    g = 9.81
    v = np.sqrt(2*g*h)
    return v

# for different heights, plot max flow
h  = np.linspace(0,10,100)
plt.plot(h, compute_max_flow(h))