# import modules

In [73]:
import numpy as np
import pandas as pd
import rasterio, rasterio.mask
import fiona
import geopandas as gpd
from osgeo import gdal
import os
import math
import scipy
from scipy import ndimage
from rasterio.enums import Resampling
import subprocess
from numpy.lib.stride_tricks import as_strided
from collections import deque

# input files

In [74]:
# Input data
# --- Provide a folder for the working directory
wd = r'C:\temp\connaught_creek'
# --- Input DEM
DEM = r'C:\temp\pra_testdata\Connaught_dem.tif' # 200x200
# --- Do you have forest data?
forest_type = 'no_forest' # choose one of the follwoing: ['stems', 'pcc', 'no_forest']
# --- PRA threshold
pra_threshold = 0.15
# --- Windshelter inputs
radius = 8 # radius * DEM resolution should be approx. 60 meters
prob = 0.5

# Generate test data for forest
# --- Input forest
with rasterio.open(DEM) as src:
    profile = src.profile
    array = src.read()
    array[np.where(array >= 0)] = 0
# --- Save raster to path
with rasterio.open(os.path.join(wd, "forest_clip.tif"), "w", **profile) as dest:
    dest.write(array)

# slope and ruggedness

In [75]:
%%time

# --- Calculate slope angle
def calculate_slope(DEM):
    gdal.DEMProcessing('slope.tif', DEM, 'slope')
    with rasterio.open('slope.tif') as src:
        slope = src.read()
        profile = src.profile
    return slope, profile

slope, profile = calculate_slope(DEM)
slope = slope.astype('int16')

# Update metadata
profile.update({"driver": "GTiff", "nodata": -9999, 'dtype': 'int16'})

# Save raster to path
with rasterio.open(os.path.join(wd, "slope.tif"), "w", **profile) as dest:
    dest.write(slope)
    
# --- Calculate aspect
def calculate_aspect(DEM):
    gdal.DEMProcessing('aspect.tif', DEM, 'aspect')
    with rasterio.open('aspect.tif') as src:
        aspect = src.read()
        profile = src.profile
    return aspect, profile

aspect, profile = calculate_aspect(DEM)
aspect = aspect.astype('int16')

# Update metadata
profile.update({"driver": "GTiff", "nodata": -9999, 'dtype': 'int16'})

# Save raster to path
with rasterio.open(os.path.join(wd, "aspect.tif"), "w", **profile) as dest:
    dest.write(aspect)
    
slope = slope.reshape(slope.shape[1], slope.shape[2])
aspect = aspect.reshape(aspect.shape[1], aspect.shape[2])

# Convert to radians
slope_rad = slope*np.pi/180
aspect_rad = aspect*np.pi/180
#calculate xyz components
xy_raster = np.sin(slope_rad)
z_raster = np.cos(slope_rad)
x_raster = np.sin(aspect_rad) * xy_raster
y_raster = np.cos(aspect_rad) * xy_raster

view = np.lib.stride_tricks.as_strided(y_raster, shape=(3, 3, y_raster.shape[0] - 2, y_raster.shape[1] - 2), strides=y_raster.strides * 2)
ysum_raster = view.sum(axis=(0, 1))

view = np.lib.stride_tricks.as_strided(x_raster, shape=(3, 3, x_raster.shape[0] - 2, x_raster.shape[1] - 2), strides=x_raster.strides * 2)
xsum_raster = view.sum(axis=(0, 1))

view = np.lib.stride_tricks.as_strided(z_raster, shape=(3, 3, z_raster.shape[0] - 2, z_raster.shape[1] - 2), strides=z_raster.strides * 2)
zsum_raster = view.sum(axis=(0, 1))

result_raster = np.sqrt((xsum_raster)**2 + (ysum_raster)**2 + (zsum_raster)**2)
ruggedness_raster = (1- (result_raster/9))
ruggedness_raster = np.pad(ruggedness_raster, (1,1), "constant", constant_values=(0,0))

rugg = ruggedness_raster.reshape(1, ruggedness_raster.shape[0], ruggedness_raster.shape[1])

# Update metadata
profile.update({'dtype': 'float64'})

# Save raster to path using meta data from dem.tif (i.e. projection)
with rasterio.open(os.path.join(wd, "ruggedness.tif"), "w", **profile) as dest:
    dest.write(rugg)
    
slope = slope.reshape(1, slope.shape[0], slope.shape[1])

Wall time: 117 ms


# windshelter

## windshelter functions

In [76]:
def sliding_window_view(arr, window_shape, steps):
    """ 
    Produce a view from a sliding, striding window over `arr`.
    The window is only placed in 'valid' positions - no overlapping
    over the boundary
    """
    
    in_shape = np.array(arr.shape[-len(steps):])  # [x, (...), z]
    window_shape = np.array(window_shape)  # [Wx, (...), Wz]
    steps = np.array(steps)  # [Sx, (...), Sz]
    nbytes = arr.strides[-1]  # size (bytes) of an element in `arr`

    # number of per-byte steps to take to fill window
    window_strides = tuple(np.cumprod(arr.shape[:0:-1])[::-1]) + (1,)
    # number of per-byte steps to take to place window
    step_strides = tuple(window_strides[-len(steps):] * steps)
    # number of bytes to step to populate sliding window view
    strides = tuple(int(i) * nbytes for i in step_strides + window_strides)

    outshape = tuple((in_shape - window_shape) // steps + 1)
    # outshape: ([X, (...), Z], ..., [Wx, (...), Wz])
    outshape = outshape + arr.shape[:-len(steps)] + tuple(window_shape)
    return as_strided(arr, shape=outshape, strides=strides, writeable=False)

def sector_mask(shape,centre,radius,angle_range): # used in windshelter_prep
    """
    Return a boolean mask for a circular sector. The start/stop angles in  
    `angle_range` should be given in clockwise order.
    """

    x,y = np.ogrid[:shape[0],:shape[1]]
    cx,cy = centre
    tmin,tmax = np.deg2rad(angle_range)

    # ensure stop angle > start angle
    if tmax < tmin:
            tmax += 2*np.pi

    # convert cartesian --> polar coordinates
    r2 = (x-cx)*(x-cx) + (y-cy)*(y-cy)
    theta = np.arctan2(x-cx,y-cy) - tmin

    # wrap angles between 0 and 2*pi
    theta %= (2*np.pi)

    # circular mask
    circmask = r2 <= radius*radius

    # angular mask
    anglemask = theta <= (tmax-tmin)
    
    a = circmask*anglemask

    return a

def windshelter_prep(radius, direction, tolerance, cellsize):
    x_size = y_size = 2*radius+1
    x_arr, y_arr = np.mgrid[0:x_size, 0:y_size]
    cell_center = (radius, radius)
    dist = (np.sqrt((x_arr - cell_center[0])**2 + (y_arr - cell_center[1])**2))*cellsize
    # dist = np.round(dist, 5)
    
    mask = sector_mask(dist.shape, (radius, radius), radius, (direction, tolerance))
    
    return dist, mask

def windshelter(x, prob, dist, mask, radius): # applying the windshelter function
    data = x*mask
    data[data==profile['nodata']]=np.nan
    data[data==0]=np.nan
    center = data[radius, radius]
    data[radius, radius]=np.nan
    data = np.arctan((data-center)/dist)
    data = np.nanquantile(data, prob)
    return data

def windshelter_window(radius, prob):

    dist, mask = windshelter_prep(radius, 0, 360, cell_size)
    window = sliding_window_view(array[-1], ((radius*2)+1,(radius*2)+1), (1, 1))

    nc = window.shape[0]
    nr = window.shape[1]
    ws = deque()

    for i in range(nc):
        for j in range(nr):
            data = window[i, j]
            data = windshelter(data, prob, dist, mask, radius).tolist()
            ws.append(data)

    data = np.array(ws)
    data = data.reshape(nc, nr)
    data = np.pad(data, pad_width=radius, mode='constant', constant_values=0)
    data = data.reshape(1, data.shape[0], data.shape[1])
    
    profile.update({"dtype": "float64"})
    
    # Save raster to path using meta data from dem.tif (i.e. projection)
    with rasterio.open(os.path.join(wd, 'windshelter.tif'), "w", **profile) as dest:
        dest.write(data)
        
    print(os.path.join(wd, 'windshelter.tif'))

## calculate windshelter

In [77]:
%%time

with rasterio.open(DEM) as src:
    array = src.read()
    array = array.astype('float')
    profile = src.profile
    cell_size = profile['transform'][0]

data = windshelter_window(radius, prob)

C:\temp\connaught_creek\windshelter.tif
Wall time: 6.86 s


# fuzzy operator

In [80]:
# define bell curve parameters for ruggedness
a = 0.01
b = 5
c = -0.007

ruggC = 1/(1+((rugg-c)/a)**(2*b))
ruggC[np.where(rugg > 0.01)] = 0

# define bell curve parameters for slope
a = 7
b = 2
c = 38

slopeC = 1/(1+((slope-c)/a)**(2*b))
slopeC[np.where(slope < 25)] = 0
slopeC[np.where(slope > 55)] = 0

# define bell curve parameters for windshelter
a = 2
b = 3
c = 2

with rasterio.open(os.path.join(wd, "windshelter.tif")) as src:
    windshelter = src.read()
    
windshelterC = 1/(1+((windshelter-c)/a)**(2*b))


# define bell curve parameters for forest stem density
if forest_type in ['stems']:
    a = 350
    b = 2.5
    c = -150

# define bell curve parameters for percent canopy cover
if forest_type in ['pcc', 'no_forest']:
    a = 350
    b = 2.5
    c = -150

with rasterio.open(os.path.join(wd, 'forest_clip.tif')) as src:
    forest = src.read()

forestC = 1/(1+((forest-c)/a)**(2*b))

#### Fuzzy logic operator
minvar = np.minimum(slopeC, ruggC)
minvar = np.minimum(minvar, windshelterC)
minvar = np.minimum(minvar, forestC)

PRA = (1-minvar)*minvar+minvar*(slopeC+ruggC+windshelterC+forestC)/4

with rasterio.open(os.path.join(wd, "slope.tif")) as src:
    profile = src.profile

PRA = PRA.astype('float32')
PRA[np.where(PRA <= 0)] = -9999

# Update metadata
profile.update({'dtype': 'float32'})

# Save raster to path using meta data from dem.tif (i.e. projection)
with rasterio.open(os.path.join(wd, 'pra.tif'), "w", **profile) as dest:
    dest.write(PRA)
    
# --- Make PRA layer binary (0 and 1)
with rasterio.open(os.path.join(wd, "pra.tif")) as src:
    profile = src.profile
    array = src.read()
    
    array[np.where(array < pra_threshold)] = 0
    array[np.where(array >= pra_threshold)] = 1
    array[np.where(array == -9999)] = 255
    array = array.astype('uint8')
    profile.update({'driver': 'GTiff', 'dtype': 'uint8', 'nodata': 255})

# --- Save raster to path
with rasterio.open(os.path.join(wd, "pra_binary.tif"), "w", **profile) as dest:
    dest.write(array)
    
print(os.path.join(wd, "pra_binary.tif"))

C:\temp\connaught_creek\pra_binary.tif
