In [1]:
import os
import re
import pandas as pd
import shutil

import h5py
import numpy as np
import geopandas as gpd
import pandas as pd
from shapely.geometry import Polygon, LineString
import math

from datetime import datetime, timedelta

In [2]:
# ---- Constants from config file ----
str_boundary_to_edit = "Emitter1"

# average time lookback to determine cells wsel 'stability' in hours
int_time_rolling_avg = 4

# additional time to extend run (in hours)
int_buffer_time = 5

# low flow in cfs for stability runs
flt_base_flow = 500

# model hydrofabric geopackage
str_model_hydrofabric_gpkg = r'E:\sample_2d_output\BLE_LBSG_501_p02\model_hydrofabric.gpkg'

# directory containing the HEC-RAS files to spawn new runs/geometry
str_ras_path = r'E:\HECRAS_2D_12070205\base_model_20240414_copy'

int_geom_to_copy = 1

In [3]:
# ---------------
def fn_format_flow_as_string(flt_flow):
    # format a flow value to no more than 5 characters
    # if bigger than 100000 ... returns 1.0e5
    
    # works up to 990 million cfs
    if flt_flow >= 100000:
        formatted_flow = "{:.1e}".format(flt_flow)
        parts = formatted_flow.split('e')
        formatted_flow = "{}e{}".format(parts[0], int(parts[1]))  # Reconstructing the notation
        if len(formatted_flow) > 8:  # Ensuring the total length is 8 characters
            formatted_flow = "{:.1e}".format(flt_flow).replace("e", "e+")
    else:
        formatted_flow = str(flt_flow)
    
    # returns a string
    return(formatted_flow)
# ---------------

# --------------------
def fn_list_filename_from_ras_prj(str_ras_prj_path, str_line_header):
    # Open the file
    with open(list_proj_title_files[0], 'r') as file:
        # Read lines
        lines = file.readlines()

        # Initialize a list to store File values
        list_names = []

        # Iterate through each line
        for line in lines:
            # Check if the line contains "Unsteady File"
            if str_line_header in line:
                # Extract the value after "Unsteady File="
                value = line.split(str_line_header)[1].strip()
                # Add the value to the list
                list_names.append(value)
        return(list_names)
# --------------------

# ----------
def fn_extract_numbers_from_strings(lst):
    numbers = []
    for item in lst:
        number = re.search(r'\d+', item).group()
        numbers.append(int(number))
    return numbers
# ----------

# --------------
def fn_list_of_file_exists(list_filepaths):
    
    list_b_return = []
    for filepath in list_filepaths:
        if os.path.exists(filepath):
            list_b_return.append(True)
        else:
            list_b_return.append(False)
    return(list_b_return)
# --------------

In [4]:
# Read the geopackage
gdf_area = gpd.read_file(str_model_hydrofabric_gpkg, layer='00_area_2d')
gdf_streams = gpd.read_file(str_model_hydrofabric_gpkg, layer='01_stream_lines')
gdf_mainstems = gpd.read_file(str_model_hydrofabric_gpkg, layer='03_flowpaths_stabilize')

# create a simulation time col in gdf_mainstems
gdf_mainstems['time_sim_hr'] = gdf_mainstems['travel_time_hr'].astype(int) + 1 + int_time_rolling_avg + int_buffer_time

In [5]:
# --------------
def fn_build_plan_names(flt_lower_flow, flt_upper_flow, dict_mainstem):
    str_run_name, str_short_plan = None, None
    
    # systematic naming of plans (if only one flow, flt_upper_flow should be None)
    is_single_flow = False
    
    try:
        if flt_upper_flow == None:
            is_single_flow = True
            
        str_mainstem = str(int(dict_mainstem['mainstem']))
        str_start_node = dict_mainstem['id_start_node']
        int_firehose_time = int(dict_mainstem['time_sim_hr'])
        str_flow =  str(int(flt_lower_flow))

        str_run_name = str_mainstem + "_" + str_start_node + "_" + str(int_firehose_time) + "-" + "hr"
        str_run_name += "_" + str(int(flt_lower_flow)) + "-cfs"

        if not is_single_flow:  
            # add the upper range flow to the description
            str_run_name += "_to_" + str(int(flt_upper_flow)) + "-cfs"

        str_short_plan = str_start_node + '-' + fn_format_flow_as_string(flt_lower_flow)
    except:
        pass
    
    return(str_run_name, str_short_plan)
# --------------

In [6]:
for index, row in gdf_mainstems.iterrows():
    a,b = fn_build_plan_names(flt_base_flow, None, row.to_dict())
    #print(a)
    #print(b)
    #print('-----')

In [7]:
# ----------------
def fn_list_of_ras_projects(str_ras_path):

    list_proj_title_files = []

    # locate the valid HEC-RAS project
    try:
        list_prj_files = [os.path.join(str_ras_path, f) for f in os.listdir(str_ras_path) if f.endswith('.prj')]

        if len(list_prj_files) > 0:
            list_proj_title_files = []

            for file_path in list_prj_files:
                with open(file_path, 'r') as file:
                    contents = file.read()
                    if 'Proj Title' in contents:
                        list_proj_title_files.append(file_path)

            for file_path in list_proj_title_files:
                print(f"Valid HEC-RAS Project: {file_path}")

        else:
            print(f"No HEC-RAS PRJ found in {str_ras_path}")

    except Exception as e:
        print(f"An error occurred: {e}")
        
    return(list_proj_title_files)
# ----------------

In [8]:
# ....................
def fn_copy_geom_file(list_proj_title_files, int_geom_to_copy):
    
    str_copy_to_geom_full_path = None
    str_copy_to_geom_hdf_full_path = None

    # Splitting the path into folder and filename
    str_prj_folder, str_filename = os.path.split(list_proj_title_files[0])

    # Splitting the filename and its extension
    str_file_only, str_extension = os.path.splitext(str_filename)

    # determine the geometry files that are in the project
    list_geom_names = fn_list_filename_from_ras_prj(list_proj_title_files[0], "Geom File=")
    list_geom_int = fn_extract_numbers_from_strings(list_geom_names)

    list_geom_hdf_fullpath = []
    list_geom_fullpath = []

    for geom_item in list_geom_names:
        str_hdf_geom_file = str_file_only + "." + geom_item + ".hdf"
        str_geom_file = str_file_only + "." + geom_item

        # Combine the folder path and filename
        str_hdf_full_path = os.path.join(str_prj_folder, str_hdf_geom_file)
        str_geom_full_path = os.path.join(str_prj_folder, str_geom_file)

        list_geom_hdf_fullpath.append(str_hdf_full_path)
        list_geom_fullpath.append(str_geom_full_path)

    list_b_hdf_exists = []

    list_b_hdf_exists = fn_list_of_file_exists(list_geom_hdf_fullpath)
    list_b_geom_exists = fn_list_of_file_exists(list_geom_fullpath)

    # Assuming all lists have the same length
    data = {
        'geom_int': list_geom_int,
        'geom_name': list_geom_names,
        'hdf_path': list_geom_hdf_fullpath,
        'hdf_exists': list_b_hdf_exists,
        'geom_path': list_geom_fullpath,
        'geom_exists': list_b_geom_exists
    }

    df = pd.DataFrame(data)

    # Filter the DataFrame
    df_filtered = df[(df['geom_int'] == int_geom_to_copy) & (df['hdf_exists']) & (df['geom_exists'])]

    # Check if any rows match the condition
    if not df_filtered.empty:
        # If there's at least one row matching the condition
        print('Valid Geometry Match Found')

        # Determine the highest number in geom_int
        highest_geom_int = df['geom_int'].max()

        # Add one to the highest number
        next_geom_int = highest_geom_int + 1

        # Convert next_geom_int to string with leading zero padding if necessary
        next_geom_int_str = '{:02d}'.format(next_geom_int)

        # copy geom file
        str_copy_from = df_filtered.iloc[0]['geom_path']
        str_copy_to_geom = str_file_only + ".g" + next_geom_int_str
        str_copy_to_geom_full_path = os.path.join(str_prj_folder, str_copy_to_geom)

        shutil.copy(str_copy_from, str_copy_to_geom_full_path)
        print(f"Copied {str_copy_from} to {str_copy_to_geom_full_path}")

        # copy the hdf geom file
        str_copy_from_hdf = df_filtered.iloc[0]['hdf_path']
        str_copy_to_geom = str_file_only + ".g" + next_geom_int_str + ".hdf"
        str_copy_to_geom_hdf_full_path = os.path.join(str_prj_folder, str_copy_to_geom)

        shutil.copy(str_copy_from_hdf, str_copy_to_geom_hdf_full_path)
        print(f"Copied {str_copy_from_hdf} to {str_copy_to_geom_hdf_full_path}")

        # ~~~~~~~~~~~~~~~~~~~~~
        # add the geom to the project file

        # Read the contents of the file
        with open(list_proj_title_files[0], 'r') as file:
            lines = file.readlines()

        # Find the index of the last occurrence of a line starting with "Geom File="
        last_geom_index = -1
        for i in range(len(lines)):
            if lines[i].strip().startswith("Geom File="):
                last_geom_index = i

        # Insert a new line after the last occurrence of "Geom File="
        if last_geom_index != -1:
            lines.insert(last_geom_index + 1, "Geom File=g" + next_geom_int_str + "\n")

        # Write the modified content back to the file
        with open(list_proj_title_files[0], 'w') as file:
            file.writelines(lines)

    else:
        # Otherwise, print an error statement
        print("Error: Geometry HDF and gXX not available.")
        
    return(str_copy_to_geom_full_path, str_copy_to_geom_hdf_full_path)
# ....................

In [9]:
# ++++++++++++++++++++++++++++
def fn_get_gdf_of_cells_from_list(hdf_file_path, list_unique_indices_sorted, str_2darea_name):

    # Specify the HDF5 file path and group path
    area_2D_path = '/Geometry/2D Flow Areas/'

    str_hdf_folder_2darea = area_2D_path + str_2darea_name + '/'

    # Location of Face Point Coordinates in HDF5
    str_facepoint_coords = str_hdf_folder_2darea + 'FacePoints Coordinate'

    # Open the HDF5 file
    with h5py.File(hdf_file_path, 'r') as hdf_file:
        # Extract X and Y coordinates
        x_coordinates = hdf_file[str_facepoint_coords][:, 0]
        y_coordinates = hdf_file[str_facepoint_coords][:, 1]

    # Create a pandas DataFrame
    df_facepoints = pd.DataFrame({'X': x_coordinates, 'Y': y_coordinates})

    # Location of Indices of face points making up the cells
    str_cells_facepoint_indexes = str_hdf_folder_2darea + 'Cells FacePoint Indexes'

    # Open the HDF5 file
    with h5py.File(hdf_file_path, 'r') as hdf_file:
        # Extract face points coordinate data
        facepoints_data = hdf_file[str_cells_facepoint_indexes][:]

        # Extract the projection
        projection_wkt = hdf_file.attrs['Projection'].decode('utf-8')

    # Create a GeoDataFrame to store the polygons
    geometry = []
    indices = []

    for row_idx, row in enumerate(facepoints_data):
        polygon_coords = []

        if row_idx in list_unique_indices_sorted:
            for idx in row:
                if idx != -1:
                    x = df_facepoints.loc[idx, 'X']
                    y = df_facepoints.loc[idx, 'Y']
                    polygon_coords.append((x, y))
            # Check if the polygon has at least 3 points (needed to create a polygon)
            if len(polygon_coords) >= 3:
                # Connect to the first point to close the polygon
                polygon_coords.append(polygon_coords[0])
                geometry.append(Polygon(polygon_coords))
                indices.append(row_idx)  # Append the row index as the cell index

    # Create a GeoDataFrame
    gdf_cells = gpd.GeoDataFrame(geometry=geometry, index=indices, columns=['geometry'], crs=projection_wkt)
    
    return(gdf_cells)
# ++++++++++++++++++++++++++++

In [10]:
# ---------------------------
def fn_create_line_inside_polygon(shp_polygon):
    
    # Define your original polygon
    original_polygon = gdf_firehose_cell.iloc[0]['geometry']

    # Calculate the length of the shortest side
    shortest_side_length = min(original_polygon.length for side in original_polygon.exterior.coords[:-1])

    # Offset the polygon internally by 1% of the length of the shortest side
    offset_distance = 0.01 * shortest_side_length
    offset_polygon = original_polygon.buffer(-offset_distance)

    # Extract the exterior boundary of the offset polygon
    offset_exterior = offset_polygon.exterior

    # Find the shortest side of the offset polygon
    shortest_side_length = float('inf')
    shortest_side = None
    for i in range(len(offset_exterior.coords) - 1):
        p1 = offset_exterior.coords[i]
        p2 = offset_exterior.coords[i + 1]
        length = LineString([p1, p2]).length
        if length < shortest_side_length:
            shortest_side_length = length
            shortest_side = (p1, p2)

    # Create a LineString representing the shortest side
    shortest_side_line = LineString(shortest_side)
    
    return(shortest_side_line)
# ---------------------------

In [18]:
# --------------
def fn_number_round_digits(number, int_requested_digits):
    integer_digits = int(math.log10(abs(number))) + 1 if number != 0 else 1
    decimal_places = max(0, int_requested_digits - integer_digits - 1)  # Ensure decimal_places is non-negative
    formatted_number = '{:.{}f}'.format(number, decimal_places)
    return formatted_number
# --------------


# --------------
def fn_format_coords(coords, int_requested_digits):
    formatted_coords = []
    for pair in coords:
        formatted_pair = []
        for num in pair:
            formatted_num = fn_number_round_digits(num, int_requested_digits)
            formatted_pair.append(formatted_num)
        formatted_coords.append(formatted_pair)
    return formatted_coords
# --------------


# --------------
def fn_midpoint(coords):
    # this assumes only two points
    # Extracting coordinates
    x1, y1 = coords[0]
    x2, y2 = coords[1]
    
    # Calculating midpoint
    mid_x = (x1 + x2) / 2
    mid_y = (y1 + y2) / 2
    
    return (mid_x, mid_y)
# --------------

In [11]:
list_proj_title_files = fn_list_of_ras_projects(str_ras_path)
str_geom_path, hdf_file_path = fn_copy_geom_file(list_proj_title_files, int_geom_to_copy)

Valid HEC-RAS Project: E:\HECRAS_2D_12070205\base_model_20240414_copy\BLE_LBSG_501.prj
Valid Geometry Match Found
Copied E:\HECRAS_2D_12070205\base_model_20240414_copy\BLE_LBSG_501.g01 to E:\HECRAS_2D_12070205\base_model_20240414_copy\BLE_LBSG_501.g08
Copied E:\HECRAS_2D_12070205\base_model_20240414_copy\BLE_LBSG_501.g01.hdf to E:\HECRAS_2D_12070205\base_model_20240414_copy\BLE_LBSG_501.g08.hdf


In [13]:
# Edit the Internal boundary condition
target_line = "BC Line Name=" + str_boundary_to_edit

# Open the file and read its content
with open(str_geom_path, 'r') as file:
    list_lines = file.readlines()

# Find the index of the line starting with the target string
index = None
for i, line in enumerate(list_lines):
    if line.startswith(target_line):
        index = i
        break


# Extract the following six lines into a list
list_boundary_lines = list_lines[index+2:index+7]

In [14]:
list_boundary_lines

['BC Line Start Position= 2979518.67298034 , 10258382.7677894 \n',
 'BC Line Middle Position= 2979059.69353878 , 10257459.6518339 \n',
 'BC Line End Position= 2978600.71409721 , 10256536.5358783 \n',
 'BC Line Arc= 2 \n',
 '2979518.6729803410258382.76778942978600.7140972110256536.5358783\n']

In [19]:
dict_mainstem = gdf_mainstems.iloc[17].to_dict()
int_idx_start_cell = int(dict_mainstem['idx_start_cell'])
list_unique_indices_sorted = [int_idx_start_cell]

dict_area_2d = gdf_area.iloc[0].to_dict()
str_2d_area_name = dict_area_2d['area_2d_name']

In [25]:
# *****************************
def fn_build_internal_boundary_text(hdf_file_path, list_unique_indices_sorted, str_2d_area_name):
    gdf_firehose_cell = fn_get_gdf_of_cells_from_list(hdf_file_path, list_unique_indices_sorted, str_2d_area_name )

    # Create a shapley line inside the 'source' emmiter cell
    shp_line = fn_create_line_inside_polygon(gdf_firehose_cell.iloc[0]['geometry'])

    # Create a numpy array of the the shapel line coordinates
    coords = np.array(shp_line.coords)

    tup_mid_coords = fn_midpoint(coords)

    # Converting tuple to list
    list_mid_coords = list(tup_mid_coords)

    list_mid_coords_formatted = []
    for item in list_mid_coords:
        str_format = fn_number_round_digits(item, 16)
        list_mid_coords_formatted.append(str_format)

    # Format the boundarline coords as strings
    formatted_coords = fn_format_coords(coords, 16)

    first_pair = formatted_coords[0]
    last_pair = formatted_coords[-1]
    int_point_len = len(formatted_coords)

    # Join the formatted coordinates into a string
    str_start_point = 'BC Line Start Position= {} , {} \n'.format(*first_pair)
    str_mid_point = 'BC Line Middle Position= {} , {} \n'.format(*list_mid_coords_formatted)
    str_last_point = 'BC Line End Position= {} , {} \n'.format(*last_pair)
    str_line_arc = f'BC Line Arc= {int_point_len} \n'

    # Flatten the list of boundary condition points
    str_point_list = [item for sublist in formatted_coords for item in sublist]
    str_point_list += ' \n'

    # Join the elements into one continuous string
    str_line_points = ''.join(str_point_list)

    list_new_boundary_lines = [str_start_point,str_mid_point,str_last_point,str_line_arc,str_line_points]
    
    return(list_new_boundary_lines)
# *****************************

In [26]:
list_new_boundary_lines = fn_build_internal_boundary_text(hdf_file_path, list_unique_indices_sorted, str_2d_area_name)

In [27]:
list_new_boundary_lines

['BC Line Start Position= 3028536.82550507 , 10258145.7856357 \n',
 'BC Line Middle Position= 3028496.27205166 , 10258138.0573055 \n',
 'BC Line End Position= 3028455.71859825 , 10258130.3289753 \n',
 'BC Line Arc= 2 \n',
 '3028536.8255050710258145.78563573028455.7185982510258130.3289753 \n']