In [3]:
# import rasterio
# import dempy
# import numpy as np
# import pandas as pd
# import geopandas as gpd
import rasterio
import numpy as np
import os
print("import modules")

# Load DEM and Wind Direction Raster
dem_path = r"D:\ASOML\Rockies\features\ASO_CON_dem_albn83_60m.tif"
wind_dir_path = r"D:\ASOML\Rockies\ASO_CON_windDir_60m_albn83.tif"

with rasterio.open(dem_path) as dem_src, rasterio.open(wind_dir_path) as wind_src:
    dem = dem_src.read(1)  # Elevation values
    wind_dir = wind_src.read(1)  # Wind direction (in degrees)
    profile = dem_src.profile
    transform = dem_src.transform
    res = transform[0]  # Cell resolution (assuming square pixels)
    print("rasterio opened")

# Convert Wind Direction to Radians
wind_rad = np.deg2rad(wind_dir)

# Initialize output raster
rows, cols = dem.shape
sx = np.full_like(dem, np.nan, dtype=np.float32)  # Wind shelter index raster
print("rows and cols")

# Define maximum search distance (200m)
max_distance = 200  # meters
max_pixels = int(max_distance / res)
print("distance")

# Compute Wind Shelter Index (Sx) for each cell
for i in range(rows):
    for j in range(cols):
        if np.isnan(dem[i, j]) or np.isnan(wind_rad[i, j]):
            continue  # Skip no-data values
        
        max_slope = -np.inf  # Initialize the maximum slope found

        # Search in the direction specified by the wind direction raster
        for d in range(1, max_pixels + 1):  # Search up to 200m
            dx = int(np.round(d * np.cos(wind_rad[i, j])))  # X-offset
            dy = int(np.round(d * np.sin(wind_rad[i, j])))  # Y-offset
            ni, nj = i + dy, j + dx  # Neighboring pixel

            if 0 <= ni < rows and 0 <= nj < cols:
                slope = (dem[ni, nj] - dem[i, j]) / (d * res)  # Compute slope
                max_slope = max(max_slope, slope)  # Keep max upwind slope

        sx[i, j] = max_slope if max_slope != -np.inf else np.nan  # Assign value

# Save Wind Shelter Index (Sx) Raster
output_path = r"D:\ASOML\Rockies\features\ASOML_CON_windScour_60_albn83.tif"
profile.update(dtype=rasterio.float32, nodata=np.nan)
with rasterio.open(output_path, 'w', **profile) as dst:
    dst.write(sx, 1)

print(f"Saved wind shelter index raster following Winstral et al. (2002) to {output_path}")


import modules
rasterio opened
rows and cols
distance


  slope = (dem[ni, nj] - dem[i, j]) / (d * res)  # Compute slope


Saved wind shelter index raster following Winstral et al. (2002) to D:\ASOML\Rockies\features\ASOML_CON_windScour_60_albn83.tif


In [4]:
import numpy as np
import rasterio

def compute_winstral_sx_raster(dem, wind_dir_deg, cellsize, dmax=200, sector_width=30, angle_step=5):
    """
    Compute the Winstral Sx index with a per-pixel wind direction raster.

    Parameters:
        dem (np.ndarray): 2D array of elevation values
        wind_dir_deg (np.ndarray): 2D array of wind direction (degrees)
        cellsize (float): Pixel resolution (meters)
        dmax (float): Max distance to search (meters)
        sector_width (float): Width of the wind direction sector (degrees)
        angle_step (float): Angle increment within sector (degrees)

    Returns:
        sx (np.ndarray): 2D array of Sx index values
    """
    rows, cols = dem.shape
    max_pixels = int(dmax / cellsize)
    sx = np.full_like(dem, np.nan, dtype=np.float32)

    sector_angles = np.arange(-sector_width / 2, sector_width / 2 + angle_step, angle_step)

    for i in range(rows):
        for j in range(cols):
            if np.isnan(dem[i, j]) or np.isnan(wind_dir_deg[i, j]):
                continue

            local_wind_deg = wind_dir_deg[i, j]
            local_slopes = []

            for delta_deg in sector_angles:
                angle_deg = (local_wind_deg + delta_deg) % 360
                angle_rad = np.deg2rad(angle_deg)

                for d in range(1, max_pixels + 1):
                    dx = int(np.round(d * np.cos(angle_rad)))
                    dy = int(np.round(d * np.sin(angle_rad)))
                    ni, nj = i + dy, j + dx

                    if 0 <= ni < rows and 0 <= nj < cols and not np.isnan(dem[ni, nj]):
                        elev_diff = dem[ni, nj] - dem[i, j]
                        slope = elev_diff / (d * cellsize)
                        local_slopes.append(slope)

            if local_slopes:
                local_slopes = np.array(local_slopes)
                max_slope = np.nanmax(local_slopes)
                min_slope = np.nanmin(local_slopes)
                sx_val = np.arctan(min_slope) if -min_slope > max_slope else np.arctan(max_slope)
                sx[i, j] = sx_val

    return sx


In [None]:
# Load DEM and Wind Direction Raster
dem_path = r"D:\ASOML\Rockies\features\ASO_CON_dem_albn83_60m.tif"
wind_path = r"D:\ASOML\Rockies\ASO_CON_windDir_60m_albn83.tif"

with rasterio.open(dem_path) as dem_src, rasterio.open(wind_path) as wind_src:
    dem = dem_src.read(1)
    wind_dir = wind_src.read(1)
    profile = dem_src.profile
    transform = dem_src.transform
    cellsize = transform[0]

# Run function
sx_result = compute_winstral_sx_raster(dem, wind_dir, cellsize)

# Save output raster
output_path = r"D:\ASOML\Rockies\features\ASOML_CON_windScour_60_albn83_adjusts.tif"
profile.update(dtype=rasterio.float32, nodata=np.nan)
with rasterio.open(output_path, 'w', **profile) as dst:
    dst.write(sx_result, 1)

print("Saved enhanced Winstral Sx raster with per-pixel wind direction.")


  elev_diff = dem[ni, nj] - dem[i, j]
