# Homework 3: Modeling flood risk in Pennsylvania using a HAND model
### Notebook and report by Jamie Song for MUSA 6950

### Inundation risk modeling with the HAND model using `PySheds`

We have adapted this notebook from Professor Xiaojiang Li, which demonstrates flood risk assessment using the Height Above Nearest Drainage (HAND) model. By using Digital Elevation Models (DEMs) and calculating the vertical distance of terrain points from the drainage network, the HAND model helps classify areas based on their flood risk potential. 

The workflow is implemented with Python libraries such as `rasterio`, `numpy`, `pysheds`, and `matplotlib`. The notebook covers data preparation, HAND calculation, and flood risk classification.

This adaptation of the notebook was created to generate a HAND model across the Commonwealth of Pennsylvania, which will be used in a report (attached as a PDF).

First, we must install any packages that are not already in the environment.

In [None]:
# uncomment and run chunk to install packages
# !pip install pysheds
# !pip install seaborn
# !pip install rasterio
# !pip install fiona



In [None]:
# import necessary libraries
import numpy as np
import pysheds
from pysheds.grid import Grid
import requests
import os
import matplotlib.pyplot as plt
from matplotlib import colors
import seaborn as sns
import rasterio as rio
from rasterio.merge import merge
from rasterio.mask import mask
import fiona
from fiona.transform import transform_geom
import zipfile
import io


## Download DEM files

In [None]:
# define a function to standardize USGS tile names
def get_tile_name(lat, lon):
    """
    Returns the USGS tile name formatted correctly (e.g., 'n39w076').
    """
    lat_tile = f"n{int(abs(lat)):02d}" if lat >= 0 else f"s{int(abs(lat)):02d}"
    lon_tile = f"w{int(abs(lon)):03d}" if lon < 0 else f"e{int(abs(lon)):03d}"
    return f"{lat_tile}{lon_tile}"

# define a function to retrieve tile names covering a bounding box
def get_tiles_in_bbox(min_lat, max_lat, min_lon, max_lon):
    """
    Returns a list of tile names covering a bounding box.
    """
    tile_list = []
    for lat in range(int(min_lat), int(max_lat) + 1):
        for lon in range(int(min_lon), int(max_lon) + 1):
            tile_list.append(get_tile_name(lat, lon))
    return tile_list

# get tiles for the full extent of PA land area
tiles = get_tiles_in_bbox(39.6, 43.0, -81.0, -74.5)
print("Tiles to Download:", tiles)

Tiles to Download: ['n39w081', 'n39w080', 'n39w079', 'n39w078', 'n39w077', 'n39w076', 'n39w075', 'n39w074', 'n40w081', 'n40w080', 'n40w079', 'n40w078', 'n40w077', 'n40w076', 'n40w075', 'n40w074', 'n41w081', 'n41w080', 'n41w079', 'n41w078', 'n41w077', 'n41w076', 'n41w075', 'n41w074', 'n42w081', 'n42w080', 'n42w079', 'n42w078', 'n42w077', 'n42w076', 'n42w075', 'n42w074', 'n43w081', 'n43w080', 'n43w079', 'n43w078', 'n43w077', 'n43w076', 'n43w075', 'n43w074']


In [None]:
# define a function to download DEM tiles
def download_dem_1arc(tile_name, save_path="."):
    """
    Downloads a 1-arc-second (~30m) DEM tile from USGS TNM.
    
    Parameters:
        tile_name (str): Tile name (e.g., "n39w076").
        save_path (str): Directory to save the DEM file.
    """
    base_url = "https://prd-tnm.s3.amazonaws.com/StagedProducts/Elevation/1/TIFF/current"
    url = f"{base_url}/{tile_name}/USGS_1_{tile_name}.tif"
    
    output_file = os.path.join(save_path, f"{tile_name}.tif")
    
    # Check if the file already exists
    if os.path.exists(output_file):
        print(f"⚠️ File {output_file} already exists. Skipping download.")
        return    
    
    response = requests.get(url, stream=True)
    if response.status_code == 200:
        with open(output_file, "wb") as file:
            for chunk in response.iter_content(1024):
                file.write(chunk)
        print(f"✅ DEM Tile {tile_name} downloaded successfully!")
    else:
        print(f"❌ Error: Tile {tile_name} not found. Check availability.")
        print(url)
# download multiple tiles
save_directory = "DEM_Tiles"
os.makedirs(save_directory, exist_ok=True)

# run function
for tile in tiles:
    download_dem_1arc(tile, save_directory)

✅ DEM Tile n39w081 downloaded successfully!
⚠️ File DEM_Tiles/n39w080.tif already exists. Skipping download.
⚠️ File DEM_Tiles/n39w079.tif already exists. Skipping download.
⚠️ File DEM_Tiles/n39w078.tif already exists. Skipping download.
⚠️ File DEM_Tiles/n39w077.tif already exists. Skipping download.
⚠️ File DEM_Tiles/n39w076.tif already exists. Skipping download.
⚠️ File DEM_Tiles/n39w075.tif already exists. Skipping download.
❌ Error: Tile n39w074 not found. Check availability.
https://prd-tnm.s3.amazonaws.com/StagedProducts/Elevation/1/TIFF/current/n39w074/USGS_1_n39w074.tif
✅ DEM Tile n40w081 downloaded successfully!
⚠️ File DEM_Tiles/n40w080.tif already exists. Skipping download.
⚠️ File DEM_Tiles/n40w079.tif already exists. Skipping download.
⚠️ File DEM_Tiles/n40w078.tif already exists. Skipping download.
⚠️ File DEM_Tiles/n40w077.tif already exists. Skipping download.
⚠️ File DEM_Tiles/n40w076.tif already exists. Skipping download.
⚠️ File DEM_Tiles/n40w075.tif already exists

In [None]:
# build a pipeline to calculate HAND model for each tile
def process_dem_tiles(input_dir, output_dir):
    # Create output directory if it doesn't exist
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    # Loop through all files in the input directory
    for file_name in os.listdir(input_dir):
        if file_name.endswith('.tif'):
            # Construct full file path
            tif_file = os.path.join(input_dir, file_name)
            
            # Extract the base name of the file (without directory and extension)
            base_name = os.path.splitext(file_name)[0]

            # Instantiate grid from raster
            grid = Grid.from_raster(tif_file)
            dem = grid.read_raster(tif_file)

            # fill pits in DEM
            pit_filled_dem = grid.fill_pits(dem)

            # fill depressions in DEM
            flooded_dem = grid.fill_depressions(pit_filled_dem)

            # resolve flats in DEM
            inflated_dem = grid.resolve_flats(flooded_dem)

            # specify directional mapping
            dirmap = (64, 128, 1, 2, 4, 8, 16, 32)

            # compute flow directions
            fdir = grid.flowdir(inflated_dem, dirmap=dirmap)

            # calculate flow accumulation
            acc = grid.accumulation(fdir, dirmap=dirmap)

            # compute height above nearest drainage
            hand = grid.compute_hand(fdir, dem, acc > 200)

            # save HAND raster to new file
            output_file = os.path.join(output_dir, f'{base_name}_HAND.tif')
            grid.to_raster(hand, output_file)

# run the process
process_dem_tiles('DEM_Tiles', 'HAND_Tiles')

In [None]:
# define HAND tiles folder and output TIFF file
inputfolder= 'HAND_Tiles'
outfile = 'mosaiced_HAND.tif'

# initialize list to mosaic the raster tiles
tiflist = []

for file in os.listdir(inputfolder):    
    if file.endswith('.tif'):
        tiffile = os.path.join(inputfolder, file)
        tiflist.append(tiffile)

src_files_to_mosaic = []

for fp in tiflist:
    try:
        tile_src = rio.open(fp)
        tile_bounds = tile_src.bounds
        
        src_files_to_mosaic.append(tile_src)
    except:
        continue

print('The number of mosaiced tiles is:', len(src_files_to_mosaic))

# the method can be set as min, max
mosaic, out_trans = merge(src_files_to_mosaic) #, method='max'
print('You have mosaiced the results.')

# prepare the schema of the output mosaicied image
out_meta = tile_src.meta.copy()
out_meta.update({"driver": "GTiff",
                  "height": mosaic.shape[1],
                  "width": mosaic.shape[2],
                  "transform": out_trans, 
                  "compress": 'lzw',
                  'BIGTIFF': 'YES'
                  }
               )

# write the mosaicied result to the output file
with rio.open(outfile, "w", **out_meta) as dest:
     dest.write(mosaic)


The number of mosaiced tiles is: 38
You have mosaiced the results.


## Mask to PA state boundary

Here, we manually download the PA state boundary from the [PennDOT Open Data site](https://data-pennshare.opendata.arcgis.com/maps/28547b42b7c640ef83c9e515c7c1a6a7) and load it into our project. We will also check its CRS after retrieving it.

In [97]:
import geopandas as gpd
PA_boundary = 'Pennsylvania_State_Boundary/Pennsylvania_State_Boundary.shp'
gdf = gpd.read_file(PA_boundary)
print(gdf.crs)

EPSG:3857


Now, we can proceed to mask the raster using the boundary we loaded.

In [None]:
input_value_raster = 'mosaiced_HAND.tif'
#PA_boundary = 'Pennsylvania_State_Boundary/Pennsylvania_State_Boundary.shp'
out_raster = 'maskedPA_HAND.tif'

# define the source and target CRS
source_crs = 'EPSG:3857'
target_crs = 'EPSG:4269'

# use fiona to open the PA boundary 
with fiona.open(PA_boundary, "r") as shp:
    shapes = transform_geom(source_crs, target_crs, [feature["geometry"] for feature in shp])

# use rasterio to mask the raster
with rio.open(input_value_raster) as src:
    out_image, out_transform = mask(src, shapes, crop=True)
    out_meta = src.meta

# configure and write output
out_meta.update({"driver": "GTiff",
                 "height": out_image.shape[1],
                 "width": out_image.shape[2],
                 "compress": 'lzw',
                 "transform": out_transform})

with rio.open(out_raster, "w", **out_meta) as dest:
    dest.write(out_image)    

Please proceed to view the PDF of the report detailing the proceeding steps after the output file is generated.