diff --git a/data/wbd/clip_vectors_to_wbd.py b/data/wbd/clip_vectors_to_wbd.py index ede7a6526..5d4b1ebbe 100755 --- a/data/wbd/clip_vectors_to_wbd.py +++ b/data/wbd/clip_vectors_to_wbd.py @@ -1,17 +1,83 @@ #!/usr/bin/env python3 import argparse +import logging import sys import geopandas as gpd import pandas as pd import rasterio as rio -from shapely.geometry import MultiPolygon, Polygon +from shapely.geometry import LineString, MultiPolygon, Point, Polygon +from shapely.ops import nearest_points from utils.shared_functions import getDriver -# from utils.shared_variables import DEFAULT_FIM_PROJECTION_CRS +gpd.options.io_engine = "pyogrio" + + +def extend_outlet_streams(streams, wbd_buffered, wbd): + """ + Extend outlet streams to nearest buffered WBD boundary + """ + + wbd['geometry'] = wbd.geometry.boundary + wbd = gpd.GeoDataFrame(data=wbd, geometry='geometry') + + wbd_buffered["linegeom"] = wbd_buffered.geometry + + # Select only the streams that are outlets + levelpath_outlets = streams[streams['to'] == 0] + + # Select only the streams that don't intersect the WBD boundary line + levelpath_outlets = levelpath_outlets[~levelpath_outlets.intersects(wbd['geometry'].iloc[0])] + + levelpath_outlets['nearest_point'] = None + levelpath_outlets['last'] = None + + # print(list(levelpath_outlets.columns)) + + levelpath_outlets = levelpath_outlets.explode(index_parts=False) + + for index, row in levelpath_outlets.iterrows(): + coords = [(coords) for coords in list(row['geometry'].coords)] + last_coord = coords[-1] + levelpath_outlets.at[index, 'last'] = Point(last_coord) + + wbd_buffered['geometry'] = wbd_buffered.geometry.boundary + wbd_buffered = gpd.GeoDataFrame(data=wbd_buffered, geometry='geometry') + + for index, row in levelpath_outlets.iterrows(): + levelpath_geom = row['last'] + nearest_point = nearest_points(levelpath_geom, wbd_buffered) + + levelpath_outlets.at[index, 'nearest_point'] = nearest_point[1]['geometry'].iloc[0] + + levelpath_outlets_nearest_points = levelpath_outlets.at[index, 'nearest_point'] + if isinstance(levelpath_outlets_nearest_points, pd.Series): + levelpath_outlets_nearest_points = levelpath_outlets_nearest_points.iloc[-1] + + # tlist = list([levelpath_outlets.at[index, 'nearest_point'].coords[0]]) + # tlist = list([levelpath_outlets.at[index, 'nearest_point'].row.geometry.get_coordinates().oords[0]]) + + levelpath_outlets.at[index, 'geometry'] = LineString( + list(row['geometry'].coords) + list([levelpath_outlets_nearest_points.coords[0]]) + ) + + # levelpath_outlets.at[index, 'geometry'] = LineString( + # list(row.geometry.get_coordinates()) + list([levelpath_outlets.at[index, 'nearest_point'].coords[0]]) + # ) + + # geometry.get_coordinates() + + levelpath_outlets = gpd.GeoDataFrame(data=levelpath_outlets, geometry='geometry') + levelpath_outlets = levelpath_outlets.drop(columns=['last', 'nearest_point']) + + # Replace the streams in the original file with the extended streams + streams = streams[~streams['ID'].isin(levelpath_outlets['ID'])] + streams = pd.concat([streams, levelpath_outlets], ignore_index=True) + + return streams def subset_vector_layers( @@ -40,7 +106,8 @@ def subset_vector_layers( subset_levee_protected_areas, huc_CRS, ): - print(f"Getting Cell Size for {hucCode}", flush=True) + + # print(f"Getting Cell Size for {hucCode}", flush=True) with rio.open(dem_filename) as dem_raster: dem_cellsize = max(dem_raster.res) @@ -48,44 +115,71 @@ def subset_vector_layers( dem_domain = gpd.read_file(dem_domain, engine="pyogrio", use_arrow=True) # Get wbd buffer - print(f"Create wbd buffer for {hucCode}", flush=True) + logging.info(f"Create wbd buffer for {hucCode}") wbd_buffer = wbd.copy() wbd_buffer.geometry = wbd_buffer.geometry.buffer(wbd_buffer_distance, resolution=32) wbd_buffer = gpd.clip(wbd_buffer, dem_domain) + # Clip ocean water polygon for future masking ocean areas (where applicable) + logging.info(f"Clip ocean water polygon for {hucCode}") + landsea = gpd.read_file(landsea, mask=wbd_buffer, engine="fiona") + if not landsea.empty: + # print(f"Create landsea gpkg for {hucCode}", flush=True) + landsea.to_file( + subset_landsea, driver=getDriver(subset_landsea), index=False, crs=huc_CRS, engine="fiona" + ) + + # Exclude landsea area from WBD and wbd_buffer + wbd = wbd.overlay(landsea, how='difference') + wbd.to_file( + wbd_filename, + layer='WBDHU8', + driver=getDriver(wbd_filename), + index=False, + crs=huc_CRS, + engine="fiona", + ) + + wbd_buffer = wbd_buffer.overlay(landsea, how='difference') + + del landsea + # Make the streams buffer smaller than the wbd_buffer so streams don't reach the edge of the DEM + logging.info(f"Create stream buffer for {hucCode}") wbd_streams_buffer = wbd_buffer.copy() wbd_streams_buffer.geometry = wbd_streams_buffer.geometry.buffer(-8 * dem_cellsize, resolution=32) wbd_buffer = wbd_buffer[['geometry']] wbd_streams_buffer = wbd_streams_buffer[['geometry']] - wbd_buffer.to_file(wbd_buffer_filename, driver=getDriver(wbd_buffer_filename), index=False, crs=huc_CRS) + wbd_buffer.to_file( + wbd_buffer_filename, driver=getDriver(wbd_buffer_filename), index=False, crs=huc_CRS, engine="fiona" + ) wbd_streams_buffer.to_file( - wbd_streams_buffer_filename, driver=getDriver(wbd_streams_buffer_filename), index=False, crs=huc_CRS + wbd_streams_buffer_filename, + driver=getDriver(wbd_streams_buffer_filename), + index=False, + crs=huc_CRS, + engine="fiona", ) - # Clip ocean water polygon for future masking ocean areas (where applicable) - landsea = gpd.read_file(landsea, mask=wbd_buffer) - if not landsea.empty: - print(f"Create landsea gpkg for {hucCode}", flush=True) - landsea.to_file(subset_landsea, driver=getDriver(subset_landsea), index=False, crs=huc_CRS) - del landsea - # Clip levee-protected areas polygons for future masking ocean areas (where applicable) - print(f"Subsetting Levee Protected Areas for {hucCode}", flush=True) - levee_protected_areas = gpd.read_file(levee_protected_areas, mask=wbd_buffer) + # print(f"Subsetting Levee Protected Areas for {hucCode}", flush=True) + logging.info(f"Clip levee-protected areas for {hucCode}") + levee_protected_areas = gpd.read_file(levee_protected_areas, mask=wbd_buffer, engine="fiona") if not levee_protected_areas.empty: levee_protected_areas.to_file( subset_levee_protected_areas, driver=getDriver(subset_levee_protected_areas), index=False, crs=huc_CRS, + engine="fiona", ) del levee_protected_areas # Find intersecting lakes and writeout - print(f"Subsetting NWM Lakes for {hucCode}", flush=True) - nwm_lakes = gpd.read_file(nwm_lakes, mask=wbd_buffer) + # print(f"Subsetting NWM Lakes for {hucCode}", flush=True) + logging.info(f"Subsetting NWM Lakes for {hucCode}") + nwm_lakes = gpd.read_file(nwm_lakes, mask=wbd_buffer, engine="fiona") nwm_lakes = nwm_lakes.loc[nwm_lakes.Shape_Area < 18990454000.0] if not nwm_lakes.empty: @@ -97,63 +191,79 @@ def subset_vector_layers( # Loop through the filled polygons and insert the new geometry for i in range(len(nwm_lakes_fill_holes.geoms)): nwm_lakes.loc[i, 'geometry'] = nwm_lakes_fill_holes.geoms[i] - nwm_lakes.to_file(subset_nwm_lakes, driver=getDriver(subset_nwm_lakes), index=False, crs=huc_CRS) + nwm_lakes.to_file( + subset_nwm_lakes, driver=getDriver(subset_nwm_lakes), index=False, crs=huc_CRS, engine="fiona" + ) del nwm_lakes # Find intersecting levee lines - print(f"Subsetting NLD levee lines for {hucCode}", flush=True) - nld_lines = gpd.read_file(nld_lines, mask=wbd_buffer) + logging.info(f"Subsetting NLD levee lines for {hucCode}") + nld_lines = gpd.read_file(nld_lines, mask=wbd_buffer, engine="fiona") if not nld_lines.empty: - nld_lines.to_file(subset_nld_lines, driver=getDriver(subset_nld_lines), index=False, crs=huc_CRS) + nld_lines.to_file( + subset_nld_lines, driver=getDriver(subset_nld_lines), index=False, crs=huc_CRS, engine="fiona" + ) del nld_lines # Preprocessed levee lines for burning - nld_lines_preprocessed = gpd.read_file(nld_lines_preprocessed, mask=wbd_buffer) + nld_lines_preprocessed = gpd.read_file(nld_lines_preprocessed, mask=wbd_buffer, engine="fiona") if not nld_lines_preprocessed.empty: nld_lines_preprocessed.to_file( subset_nld_lines_preprocessed, driver=getDriver(subset_nld_lines_preprocessed), index=False, crs=huc_CRS, + engine="fiona", ) del nld_lines_preprocessed # Subset NWM headwaters - print(f"Subsetting NWM Headwater Points for {hucCode}", flush=True) - nwm_headwaters = gpd.read_file(nwm_headwaters, mask=wbd_streams_buffer) + logging.info(f"Subsetting NWM Headwater Points for {hucCode}") + nwm_headwaters = gpd.read_file(nwm_headwaters, mask=wbd_streams_buffer, engine="fiona") if len(nwm_headwaters) > 0: nwm_headwaters.to_file( - subset_nwm_headwaters, driver=getDriver(subset_nwm_headwaters), index=False, crs=huc_CRS + subset_nwm_headwaters, + driver=getDriver(subset_nwm_headwaters), + index=False, + crs=huc_CRS, + engine="fiona", ) else: print("No headwater point(s) within HUC " + str(hucCode) + " boundaries.") + logging.info("No headwater point(s) within HUC " + str(hucCode) + " boundaries.") sys.exit(0) del nwm_headwaters # Find intersecting nwm_catchments - print(f"Subsetting NWM Catchments for {hucCode}", flush=True) - nwm_catchments = gpd.read_file(nwm_catchments, mask=wbd_buffer) + # print(f"Subsetting NWM Catchments for {hucCode}", flush=True) + nwm_catchments = gpd.read_file(nwm_catchments, mask=wbd_buffer, engine="fiona") if len(nwm_catchments) > 0: nwm_catchments.to_file( - subset_nwm_catchments, driver=getDriver(subset_nwm_catchments), index=False, crs=huc_CRS + subset_nwm_catchments, + driver=getDriver(subset_nwm_catchments), + index=False, + crs=huc_CRS, + engine="fiona", ) else: print("No NWM catchments within HUC " + str(hucCode) + " boundaries.") + logging.info("No NWM catchments within HUC " + str(hucCode) + " boundaries.") sys.exit(0) del nwm_catchments # Subset nwm streams - print(f"Subsetting NWM Streams for {hucCode}", flush=True) - - nwm_streams = gpd.read_file(nwm_streams, mask=wbd_buffer) + logging.info(f"Subsetting NWM Streams for {hucCode}") + nwm_streams = gpd.read_file(nwm_streams, mask=wbd_buffer, engine="fiona") # NWM can have duplicate records, but appear to always be identical duplicates nwm_streams.drop_duplicates(subset="ID", keep="first", inplace=True) + nwm_streams = extend_outlet_streams(nwm_streams, wbd_buffer, wbd) + nwm_streams_outlets = nwm_streams[~nwm_streams['to'].isin(nwm_streams['ID'])] nwm_streams_nonoutlets = nwm_streams[nwm_streams['to'].isin(nwm_streams['ID'])] @@ -177,10 +287,11 @@ def subset_vector_layers( nwm_streams = pd.concat([nwm_streams_nonoutlets, nwm_streams_outlets]) nwm_streams.to_file( - subset_nwm_streams, driver=getDriver(subset_nwm_streams), index=False, crs=huc_CRS + subset_nwm_streams, driver=getDriver(subset_nwm_streams), index=False, crs=huc_CRS, engine="fiona" ) else: print("No NWM stream segments within HUC " + str(hucCode) + " boundaries.") + logging.info("No NWM stream segments within HUC " + str(hucCode) + " boundaries.") sys.exit(0) del nwm_streams @@ -230,7 +341,7 @@ def subset_vector_layers( '-lps', '--subset-levee-protected-areas', help='Levee-protected areas subset', required=True ) - parser.add_argument('-crs', '--huc-crs', help='HUC crs', required=True) + parser.add_argument('-crs', '--huc-CRS', help='HUC crs', required=True) args = vars(parser.parse_args()) diff --git a/data/wbd/generate_pre_clip_fim_huc8.py b/data/wbd/generate_pre_clip_fim_huc8.py index d1391aaf7..6bf2ea581 100755 --- a/data/wbd/generate_pre_clip_fim_huc8.py +++ b/data/wbd/generate_pre_clip_fim_huc8.py @@ -6,11 +6,15 @@ import os import shutil import subprocess -from multiprocessing import Pool +import traceback +from concurrent.futures import ProcessPoolExecutor, as_completed +from pathlib import Path from clip_vectors_to_wbd import subset_vector_layers from dotenv import load_dotenv +from utils.shared_functions import FIM_Helpers as fh + ''' Overview: @@ -27,8 +31,8 @@ Usage: generate_pre_clip_fim_huc8.py - -n /data/inputs/pre_clip_huc8/24_3_20 - -u /data/inputs/huc_lists/included_huc8.lst + -n /data/inputs/pre_clip_huc8/24_04_23 + -u /data/inputs/huc_lists/included_huc8_withAlaska.lst -j 6 -o @@ -36,7 +40,7 @@ If running this script to generate new data, modify the pre_clip_huc_dir variable in src/bash_variables.env to the corresponding outputs_dir argument after running and testing this script. The newly generated data should be created in a new folder using the format - (i.e. September 26, 2023 would be 23_9_26) + (i.e. September 26, 2023 would be 23_09_26) ''' srcDir = os.getenv('srcDir') @@ -84,7 +88,7 @@ wbd_buffer_int = int(wbd_buffer) -def __setup_logger(outputs_dir): +def __setup_logger(outputs_dir, huc=None): ''' Set up logging to file. Since log file includes the date, it will be overwritten if this script is run more than once on the same day. @@ -92,7 +96,10 @@ def __setup_logger(outputs_dir): datetime_now = dt.datetime.now(dt.timezone.utc) curr_date = datetime_now.strftime("%m_%d_%Y") - log_file_name = f"generate_pre_clip_fim_huc8_{curr_date}.log" + if huc is None: + log_file_name = f"generate_pre_clip_fim_huc8_{curr_date}.log" + else: + log_file_name = f"mp_{huc}_generate_pre_clip_fim_huc8_{curr_date}.log" log_file_path = os.path.join(outputs_dir, log_file_name) @@ -118,6 +125,22 @@ def __setup_logger(outputs_dir): logging.info(f"\n \t Started: {start_time_string} \n") +def __merge_mp_logs(outputs_dir): + log_file_list = list(Path(outputs_dir).rglob("mp_*")) + if len(log_file_list) > 0: + log_file_list.sort() + + log_mp_rollup_file = os.path.join(outputs_dir, "mp_merged_logs.log") + + with open(log_mp_rollup_file, 'a') as main_log: + # Iterate through list + for temp_log_file in log_file_list: + # Open each file in read mode + with open(temp_log_file) as infile: + main_log.write(infile.read()) + os.remove(temp_log_file) + + def pre_clip_hucs_from_wbd(outputs_dir, huc_list, number_of_jobs, overwrite): ''' The function is the main driver of the program to iterate and parallelize writing @@ -185,8 +208,6 @@ def pre_clip_hucs_from_wbd(outputs_dir, huc_list, number_of_jobs, overwrite): elif not os.path.isdir(os.path.join(outputs_dir, huc)): os.mkdir(os.path.join(outputs_dir, huc)) - logging.info(f"Created directory: {outputs_dir}/{huc}, huc level files will be written there.") - print(f"Created directory: {outputs_dir}/{huc}, huc level files will be written there.") # Build arguments (procs_list) for each process to execute (huc_level_clip_vectors_to_wbd) procs_list = [] @@ -199,8 +220,23 @@ def pre_clip_hucs_from_wbd(outputs_dir, huc_list, number_of_jobs, overwrite): # Parallelize each huc in hucs_to_parquet_list logging.info('Parallelizing HUC level wbd pre-clip vector creation. ') print('Parallelizing HUC level wbd pre-clip vector creation. ') - with Pool(processes=number_of_jobs) as pool: - pool.map(huc_level_clip_vectors_to_wbd, procs_list) + # with Pool(processes=number_of_jobs) as pool: + # pool.map(huc_level_clip_vectors_to_wbd, procs_list) + + with ProcessPoolExecutor(max_workers=number_of_jobs) as executor: + futures = {} + for huc in hucs_to_pre_clip_list: + args = {"huc": huc, "outputs_dir": outputs_dir} + future = executor.submit(huc_level_clip_vectors_to_wbd, **args) + futures[future] = future + + for future in as_completed(futures): + if future is not None: + if future.exception(): + raise future.exception() + + print("Merging MP log files") + __merge_mp_logs(outputs_dir) # Get time metrics end_time = dt.datetime.now(dt.timezone.utc) @@ -220,7 +256,7 @@ def pre_clip_hucs_from_wbd(outputs_dir, huc_list, number_of_jobs, overwrite): print(f"\t \t TOTAL RUN TIME: {str(time_duration).split('.')[0]}") -def huc_level_clip_vectors_to_wbd(args): +def huc_level_clip_vectors_to_wbd(huc, outputs_dir): ''' Create pre-clipped vectors at the huc level. Necessary to have this as an additional function for multiprocessing. This is mostly a wrapper for the subset_vector_layers() method in @@ -242,182 +278,197 @@ def huc_level_clip_vectors_to_wbd(args): - .gpkg files* dependant on HUC's WBD (*differing amount based on individual huc) ''' - # We have to explicitly unpack the args from pool.map() - huc = args[0] - outputs_dir = args[1] - - huc_directory = os.path.join(outputs_dir, huc) + huc_processing_start = dt.datetime.now(dt.timezone.utc) + # with this in Multi-proc, it needs it's own logger and unique logging file. + __setup_logger(outputs_dir, huc) + logging.info(f"Processing {huc}") + + try: + + huc_directory = os.path.join(outputs_dir, huc) + + # SET VARIABLES AND FILE INPUTS # + hucUnitLength = len(huc) + huc2Identifier = huc[:2] + + # Check whether the HUC is in Alaska or not and assign the CRS and filenames accordingly + if huc2Identifier == '19': + huc_CRS = ALASKA_CRS + input_NHD_WBHD_layer = 'WBD_National_South_Alaska' + input_WBD_filename = input_WBD_gdb_Alaska + wbd_gpkg_path = f'{inputsDir}/wbd/WBD_National_South_Alaska.gpkg' + else: + huc_CRS = DEFAULT_FIM_PROJECTION_CRS + input_NHD_WBHD_layer = f"WBDHU{hucUnitLength}" + input_WBD_filename = input_WBD_gdb + wbd_gpkg_path = f'{inputsDir}/wbd/WBD_National.gpkg' + + # Define the landsea water body mask using either Great Lakes or Ocean polygon input # + if huc2Identifier == "04": + input_LANDSEA = f"{input_GL_boundaries}" + # print(f'Using {input_LANDSEA} for water body mask (Great Lakes)') + elif huc2Identifier == "19": + input_LANDSEA = f"{inputsDir}/landsea/water_polygons_alaska.gpkg" + # print(f'Using {input_LANDSEA} for water body mask (Alaska)') + else: + input_LANDSEA = f"{inputsDir}/landsea/water_polygons_us.gpkg" + + print(f"\n Get WBD {huc}") + + # TODO: Use Python API (osgeo.ogr) instead of using ogr2ogr executable + get_wbd_subprocess = subprocess.run( + [ + 'ogr2ogr', + '-f', + 'GPKG', + '-t_srs', + huc_CRS, + f'{huc_directory}/wbd.gpkg', + input_WBD_filename, + input_NHD_WBHD_layer, + '-where', + f"HUC{hucUnitLength}='{huc}'", + ], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + check=True, + universal_newlines=True, + ) - # SET VARIABLES AND FILE INPUTS # - hucUnitLength = len(huc) - huc2Identifier = huc[:2] + logging.info(f"{huc} : {get_wbd_subprocess.stdout}") + + if get_wbd_subprocess.stderr != "": + if "ERROR" in get_wbd_subprocess.stderr.upper(): + msg = ( + f" - Creating -- {huc_directory}/wbd.gpkg" + f" ERROR -- details: ({get_wbd_subprocess.stderr})" + ) + print(msg) + logging.info(msg) + else: + msg = f" - Creating -- {huc_directory}/wbd.gpkg - Complete \n" + print(msg) + logging.info(msg) - # Check whether the HUC is in Alaska or not and assign the CRS and filenames accordingly - if huc2Identifier == '19': - huc_CRS = ALASKA_CRS - input_NHD_WBHD_layer = 'WBD_National_South_Alaska' - input_WBD_filename = input_WBD_gdb_Alaska - wbd_gpkg_path = f'{inputsDir}/wbd/WBD_National_South_Alaska.gpkg' - else: - huc_CRS = DEFAULT_FIM_PROJECTION_CRS - input_NHD_WBHD_layer = f"WBDHU{hucUnitLength}" - input_WBD_filename = input_WBD_gdb - wbd_gpkg_path = f'{inputsDir}/wbd/WBD_National.gpkg' - - # Define the landsea water body mask using either Great Lakes or Ocean polygon input # - if huc2Identifier == "04": - input_LANDSEA = f"{input_GL_boundaries}" - print(f'Using {input_LANDSEA} for water body mask (Great Lakes)') - elif huc2Identifier == "19": - input_LANDSEA = f"{inputsDir}/landsea/water_polygons_alaska.gpkg" - print(f'Using {input_LANDSEA} for water body mask (Alaska)') - else: - input_LANDSEA = f"{inputsDir}/landsea/water_polygons_us.gpkg" - - print(f"\n Get WBD {huc}") - - # TODO: Use Python API (osgeo.ogr) instead of using ogr2ogr executable - get_wbd_subprocess = subprocess.run( - [ - 'ogr2ogr', - '-f', - 'GPKG', - '-t_srs', - huc_CRS, - f'{huc_directory}/wbd.gpkg', - input_WBD_filename, - input_NHD_WBHD_layer, - '-where', - f"HUC{hucUnitLength}='{huc}'", - ], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - check=True, - universal_newlines=True, - ) + msg = f"Get Vector Layers and Subset {huc}" + # print(msg) + logging.info(msg) - msg = get_wbd_subprocess.stdout - print(msg) - logging.info(msg) + # Subset Vector Layers (after determining whether it's alaska or not) + if huc2Identifier == '19': + # Yes Alaska + subset_vector_layers( + subset_nwm_lakes=f"{huc_directory}/nwm_lakes_proj_subset.gpkg", + subset_nwm_streams=f"{huc_directory}/nwm_subset_streams.gpkg", + hucCode=huc, + subset_nwm_headwaters=f"{huc_directory}/nwm_headwater_points_subset.gpkg", + wbd_buffer_filename=f"{huc_directory}/wbd_buffered.gpkg", + wbd_streams_buffer_filename=f"{huc_directory}/wbd_buffered_streams.gpkg", + wbd_filename=f"{huc_directory}/wbd.gpkg", + dem_filename=input_DEM_Alaska, + dem_domain=input_DEM_domain_Alaska, + nwm_lakes=input_nwm_lakes, + nwm_catchments=input_nwm_catchments_Alaska, + subset_nwm_catchments=f"{huc_directory}/nwm_catchments_proj_subset.gpkg", + nld_lines=input_NLD_Alaska, + nld_lines_preprocessed=input_levees_preprocessed_Alaska, + landsea=input_LANDSEA, + nwm_streams=input_nwm_flows_Alaska, + subset_landsea=f"{huc_directory}/LandSea_subset.gpkg", + nwm_headwaters=input_nwm_headwaters_Alaska, + subset_nld_lines=f"{huc_directory}/nld_subset_levees.gpkg", + subset_nld_lines_preprocessed=f"{huc_directory}/3d_nld_subset_levees_burned.gpkg", + wbd_buffer_distance=wbd_buffer_int, + levee_protected_areas=input_nld_levee_protected_areas_Alaska, + subset_levee_protected_areas=f"{huc_directory}/LeveeProtectedAreas_subset.gpkg", + huc_CRS=huc_CRS, # TODO: simplify + ) - if get_wbd_subprocess.stderr != "": - if "ERROR" in get_wbd_subprocess.stderr.upper(): - msg = ( - f" - Creating -- {huc_directory}/wbd.gpkg" - f" ERROR -- details: ({get_wbd_subprocess.stderr})" + else: + # Not Alaska + subset_vector_layers( + subset_nwm_lakes=f"{huc_directory}/nwm_lakes_proj_subset.gpkg", + subset_nwm_streams=f"{huc_directory}/nwm_subset_streams.gpkg", + hucCode=huc, + subset_nwm_headwaters=f"{huc_directory}/nwm_headwater_points_subset.gpkg", + wbd_buffer_filename=f"{huc_directory}/wbd_buffered.gpkg", + wbd_streams_buffer_filename=f"{huc_directory}/wbd_buffered_streams.gpkg", + wbd_filename=f"{huc_directory}/wbd.gpkg", + dem_filename=input_DEM, + dem_domain=input_DEM_domain, + nwm_lakes=input_nwm_lakes, + nwm_catchments=input_nwm_catchments, + subset_nwm_catchments=f"{huc_directory}/nwm_catchments_proj_subset.gpkg", + nld_lines=input_NLD, + nld_lines_preprocessed=input_levees_preprocessed, + landsea=input_LANDSEA, + nwm_streams=input_nwm_flows, + subset_landsea=f"{huc_directory}/LandSea_subset.gpkg", + nwm_headwaters=input_nwm_headwaters, + subset_nld_lines=f"{huc_directory}/nld_subset_levees.gpkg", + subset_nld_lines_preprocessed=f"{huc_directory}/3d_nld_subset_levees_burned.gpkg", + wbd_buffer_distance=wbd_buffer_int, + levee_protected_areas=input_nld_levee_protected_areas, + subset_levee_protected_areas=f"{huc_directory}/LeveeProtectedAreas_subset.gpkg", + huc_CRS=huc_CRS, # TODO: simplify ) - print(msg) - logging.error(msg) - else: - msg = f" - Creating -- {huc_directory}/wbd.gpkg - Complete \n" + + msg = f" Completing Get Vector Layers and Subset: {huc} \n" print(msg) logging.info(msg) - msg = f"Get Vector Layers and Subset {huc}" - print(msg) - logging.info(msg) - - # Subset Vector Layers (after determining whether it's alaska or not) - if huc2Identifier == '19': - # Yes Alaska - subset_vector_layers( - subset_nwm_lakes=f"{huc_directory}/nwm_lakes_proj_subset.gpkg", - subset_nwm_streams=f"{huc_directory}/nwm_subset_streams.gpkg", - hucCode=huc, - subset_nwm_headwaters=f"{huc_directory}/nwm_headwater_points_subset.gpkg", - wbd_buffer_filename=f"{huc_directory}/wbd_buffered.gpkg", - wbd_streams_buffer_filename=f"{huc_directory}/wbd_buffered_streams.gpkg", - wbd_filename=f"{huc_directory}/wbd.gpkg", - dem_filename=input_DEM_Alaska, - dem_domain=input_DEM_domain_Alaska, - nwm_lakes=input_nwm_lakes, - nwm_catchments=input_nwm_catchments_Alaska, - subset_nwm_catchments=f"{huc_directory}/nwm_catchments_proj_subset.gpkg", - nld_lines=input_NLD_Alaska, - nld_lines_preprocessed=input_levees_preprocessed_Alaska, - landsea=input_LANDSEA, - nwm_streams=input_nwm_flows_Alaska, - subset_landsea=f"{huc_directory}/LandSea_subset.gpkg", - nwm_headwaters=input_nwm_headwaters_Alaska, - subset_nld_lines=f"{huc_directory}/nld_subset_levees.gpkg", - subset_nld_lines_preprocessed=f"{huc_directory}/3d_nld_subset_levees_burned.gpkg", - wbd_buffer_distance=wbd_buffer_int, - levee_protected_areas=input_nld_levee_protected_areas_Alaska, - subset_levee_protected_areas=f"{huc_directory}/LeveeProtectedAreas_subset.gpkg", - huc_CRS=huc_CRS, # TODO: simplify - ) - - else: - # Not Alaska - subset_vector_layers( - subset_nwm_lakes=f"{huc_directory}/nwm_lakes_proj_subset.gpkg", - subset_nwm_streams=f"{huc_directory}/nwm_subset_streams.gpkg", - hucCode=huc, - subset_nwm_headwaters=f"{huc_directory}/nwm_headwater_points_subset.gpkg", - wbd_buffer_filename=f"{huc_directory}/wbd_buffered.gpkg", - wbd_streams_buffer_filename=f"{huc_directory}/wbd_buffered_streams.gpkg", - wbd_filename=f"{huc_directory}/wbd.gpkg", - dem_filename=input_DEM, - dem_domain=input_DEM_domain, - nwm_lakes=input_nwm_lakes, - nwm_catchments=input_nwm_catchments, - subset_nwm_catchments=f"{huc_directory}/nwm_catchments_proj_subset.gpkg", - nld_lines=input_NLD, - nld_lines_preprocessed=input_levees_preprocessed, - landsea=input_LANDSEA, - nwm_streams=input_nwm_flows, - subset_landsea=f"{huc_directory}/LandSea_subset.gpkg", - nwm_headwaters=input_nwm_headwaters, - subset_nld_lines=f"{huc_directory}/nld_subset_levees.gpkg", - subset_nld_lines_preprocessed=f"{huc_directory}/3d_nld_subset_levees_burned.gpkg", - wbd_buffer_distance=wbd_buffer_int, - levee_protected_areas=input_nld_levee_protected_areas, - subset_levee_protected_areas=f"{huc_directory}/LeveeProtectedAreas_subset.gpkg", - huc_CRS=huc_CRS, # TODO: simplify + ## Clip WBD8 ## + print(f" Creating WBD buffer and clip version {huc}") + + clip_wbd8_subprocess = subprocess.run( + [ + 'ogr2ogr', + '-f', + 'GPKG', + '-t_srs', + huc_CRS, + '-clipsrc', + f'{huc_directory}/wbd_buffered.gpkg', + f'{huc_directory}/wbd8_clp.gpkg', + wbd_gpkg_path, + input_NHD_WBHD_layer, + ], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + check=True, + universal_newlines=True, ) - msg = f"\n\t Completing Get Vector Layers and Subset: {huc} \n" - print(msg) - logging.info(msg) - - ## Clip WBD8 ## - print(f" Clip WBD {huc}") - - clip_wbd8_subprocess = subprocess.run( - [ - 'ogr2ogr', - '-f', - 'GPKG', - '-t_srs', - huc_CRS, - '-clipsrc', - f'{huc_directory}/wbd_buffered.gpkg', - f'{huc_directory}/wbd8_clp.gpkg', - wbd_gpkg_path, - input_NHD_WBHD_layer, - ], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - check=True, - universal_newlines=True, - ) + # msg = clip_wbd8_subprocess.stdout + # print(f"{huc} : {msg}") + # logging.info(f"{huc} : {msg}") + + if clip_wbd8_subprocess.stderr != "": + if "ERROR" in clip_wbd8_subprocess.stderr.upper(): + msg = ( + f" - Creating -- {huc_directory}/wbd.gpkg" + f" ERROR -- details: ({clip_wbd8_subprocess.stderr})" + ) + print(msg) + logging.info(msg) + else: + msg = f" - Creating -- {huc_directory}/wbd.gpkg - Complete" + print(msg) + logging.info(msg) - msg = clip_wbd8_subprocess.stdout - print(msg) - logging.info(msg) + except Exception: + print(f"*** An error occurred while processing {huc}") + print(traceback.format_exc()) + logging.info(f"*** An error occurred while processing {huc}") + logging.info(traceback.format_exc()) + print() - if clip_wbd8_subprocess.stderr != "": - if "ERROR" in clip_wbd8_subprocess.stderr.upper(): - msg = ( - f" - Creating -- {huc_directory}/wbd.gpkg" - f" ERROR -- details: ({clip_wbd8_subprocess.stderr})" - ) - print(msg) - logging.error(msg) - else: - msg = f" - Creating -- {huc_directory}/wbd.gpkg - Complete" - print(msg) - logging.info(msg) + huc_processing_end = dt.datetime.now(dt.timezone.utc) + time_duration = huc_processing_end - huc_processing_start + duraton_msg = f"\t \t run time for huc {huc}: is {str(time_duration).split('.')[0]}" + print(duraton_msg) + logging.info(duraton_msg) + return if __name__ == '__main__': @@ -428,7 +479,7 @@ def huc_level_clip_vectors_to_wbd(args): usage=''' ./generate_pre_clip_fim_huc8.py -n /data/inputs/pre_clip_huc8/24_3_20 - -u /data/inputs/huc_lists/included_huc8.lst + -u /data/inputs/huc_lists/included_huc8_withAlaska.lst -j 6 -o ''', @@ -438,7 +489,7 @@ def huc_level_clip_vectors_to_wbd(args): '-n', '--outputs_dir', help='Directory to output all of the HUC level .gpkg files. Use the format: ' - ' (i.e. September 26, 2023 would be 2023_9_26)', + ' (i.e. September 26, 2023 would be 23_09_26)', ) parser.add_argument('-u', '--huc_list', help='List of HUCs to genereate pre-clipped vectors for.') parser.add_argument( diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index dc1d3e722..64be87feb 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -1,6 +1,18 @@ All notable changes to this project will be documented in this file. We follow the [Semantic Versioning 2.0.0](http://semver.org/) format. +## v4.4.16.0 - 2024-05-06 - [PR#1121](https://github.com/NOAA-OWP/inundation-mapping/pull/1121) + +Some NWM streams, particularly in coastal areas, fail to reach the edge of the DEM resulting in reverse flow. This issue was resolved by clipping the ocean mask from the buffered WBD and DEM, and any remaining streams that didn't have outlets reaching the edge of the buffered WBD boundary were extended by snapping the end to the nearest point on the buffered WBD. + +### Changes + +- `data/wbd/clip_vectors_to_wbd.py`: Clips `landsea` ocean mask from the buffered WBD and adds a function to extend outlet streams to the buffered WBD +- `data/wbd/clip_vectors_to_wbd.py`: Updated multi-processing and added more logging. + +

+ + ## v4.4.15.4 - 2024-05-06 - [PR#1115](https://github.com/NOAA-OWP/inundation-mapping/pull/1115) This PR addresses issue #1040 and includes the following updates: @@ -142,6 +154,7 @@ The "black" packages is also be upgraded from 23.7.0 to 24.3.

+ ## v4.4.13.1 - 2024-03-11 - [PR#1086](https://github.com/NOAA-OWP/inundation-mapping/pull/1086) Fixes bug where levee-protected areas were not being masked from branch 0 DEMs.