### This notebook was created by Corey Krewson in 2023 to walk through all the steps needed to update the pipelines and EGIS map services when a new FIM version is released. These steps are:
1. Updating the FIM crosswalk table in the derived schema of the viz database.
2. Creating new AEP FIM schema/tables in the EGIS database.
3. Creating new FIM Catchments schema/tables in the EGIS database.
4. Updating the SRC Skill, FIM Performance, and CatFIM data (the full scope of this step is not well documented; Shawn Crawley may know more).
5. Clearing the HAND cache on the viz RDS database.

Unfortunately, these steps are still very manual, and this notebook is the main source of documentation for making these updates.

In [None]:
#from helper_functions.shared_functions import check_if_s3_key_exists, run_sql_in_db, get_db_engine  # no check_if_s3_key_exists function
from helper_functions.shared_functions import run_sql_in_db, get_db_engine
from helper_functions.viz_classes import database
import os
import codecs
import csv
import boto3
import json
import pandas as pd
from datetime import datetime
from io import StringIO
from sqlalchemy.exc import DataError
from sqlalchemy.types import Text

# Make the AWS_Keys module importable. The secret keys/tokens live in a folder outside the repo
# so they are never checked in. Note: added Oct 16, 2024 and untested.
# Path is relative to the current working directory (presumably /hydrovis/Core/Manual_Workflows/).
# `sys` is imported here because the import list above does not include it and sys.path needs it.
import sys

sys.path.append(os.path.join(os.path.abspath(''), '../../../../AWS_Secret_keys'))
import AWS_Keys

# NOTE(review): the crosswalk upload cell interpolates TI_ACCESS_KEY, TI_SECRET_KEY and TI_TOKEN,
# which no visible cell defines; presumably they should be read from AWS_Keys — confirm the names.


In [None]:
# FIM release being loaded; every S3 path below is derived from it.
FIM_VERSION = "4.4.0.0"
FIM_ROOT_DPATH = f"fim/fim_{FIM_VERSION.replace('.', '_')}"
HAND_DATASETS_DPATH = f"{FIM_ROOT_DPATH}/hand_datasets"
QA_DATASETS_DPATH = f"{FIM_ROOT_DPATH}/qa_datasets"
FIM_BUCKET = "hydrovis-ti-deployment-us-east-1"
# S3 keys always use '/', so build this like the other paths rather than with os.path.join
# (which would use '\\' on Windows).
FIM_CROSSWALK_FPATH = f"{HAND_DATASETS_DPATH}/crosswalk_table.csv"
# State machine ARN used by the pipeline-launch cells below; must be filled in before running them.
PIPELINE_ARN = ''

# Clients shared by every cell below. Secret keys/tokens are kept in a folder outside the repo
# (see the AWS_Keys import in the first code cell) so they are never checked in.
S3_CLIENT, STEPFUNCTION_CLIENT = (boto3.client(service) for service in ("s3", "stepfunctions"))

VIZ_DB_ENGINE = get_db_engine('viz')

<h2>1 - UPDATE VLAB PRIVATE REPO WITH NEW FIM VERSION</h2>

<h2>2 - UPLOAD FIM4 HYDRO ID/FEATURE ID CROSSWALK</h2>

February 2024 update from Tyler: This code will need to be updated to handle a new unique integer hand_id that the FIM team (Rob Hannah and Matt Luck) has added to the crosswalk, and which is now important to FIM runs. They also changed the field names/format to match our schema, so this chunk of code should be able to be simplified significantly.

In [None]:
# Rebuild derived.fim4_featureid_crosswalk in the viz database from the FIM crosswalk CSV on S3:
# read the CSV header to derive column types, drop/recreate the table, import the CSV server-side
# with aws_s3.table_import_from_s3, add a fim_version column, rename the id columns to snake_case,
# and index feature_id and hydro_id.
#
# NOTE(review): TI_ACCESS_KEY, TI_SECRET_KEY and TI_TOKEN are not defined in any visible cell;
# presumably they should come from the AWS_Keys module imported at the top — confirm before running.

# Postgres column types for the crosswalk headers we know about; any other header becomes TEXT.
CROSSWALK_COLUMN_TYPES = {'HydroID': 'integer', 'LakeID': 'integer', 'feature_id': 'integer', 'BranchID': 'bigint'}

print(f"Getting column name from {FIM_CROSSWALK_FPATH}")
crosswalk_body = S3_CLIENT.get_object(Bucket=FIM_BUCKET, Key=FIM_CROSSWALK_FPATH)["Body"]
try:
    headers = csv.DictReader(codecs.getreader("utf-8")(crosswalk_body)).fieldnames
finally:
    # Only the header row is needed; close the streaming response instead of leaving it open.
    crosswalk_body.close()
if not headers:
    raise ValueError(f"{FIM_CROSSWALK_FPATH} has no header row")

print(headers)

# Deliberate stop: review the printed headers before the destructive DROP/CREATE below runs.
# Per the February 2024 note above, the crosswalk columns have changed (new hand_id, renamed
# fields), so the typing and RENAME statements below may need updating first.
PROCEED_WITH_CROSSWALK_REBUILD = False
if not PROCEED_WITH_CROSSWALK_REBUILD:
    raise RuntimeError("Forced exit: review the headers above, then set PROCEED_WITH_CROSSWALK_REBUILD = True")

# Headers are unquoted, so Postgres folds them to lower case (e.g. HydroID -> hydroid);
# the RENAME statements below rely on the same folding.
header_str = "(" + ", ".join(f"{header} {CROSSWALK_COLUMN_TYPES.get(header, 'TEXT')}" for header in headers) + ")"

db = database(db_type="viz")
with db.get_db_connection() as conn, conn.cursor() as cur:
    print(f"Deleting/Creating derived.fim4_featureid_crosswalk using columns {header_str}")
    cur.execute(f"DROP TABLE IF EXISTS derived.fim4_featureid_crosswalk; CREATE TABLE derived.fim4_featureid_crosswalk {header_str};")

    print(f"Importing {FIM_CROSSWALK_FPATH} to derived.fim4_featureid_crosswalk")
    cur.execute(f"""
        SELECT aws_s3.table_import_from_s3(
           'derived.fim4_featureid_crosswalk',
           '', 
           '(format csv, HEADER true)',
           (SELECT aws_commons.create_s3_uri(
               '{FIM_BUCKET}',
               '{FIM_CROSSWALK_FPATH}',
               'us-east-1'
                ) AS s3_uri
            ),
            aws_commons.create_aws_credentials('{TI_ACCESS_KEY}', '{TI_SECRET_KEY}', '{TI_TOKEN}')
           );
        """)

    print("Adding fim_version column to derived.fim4_featureid_crosswalk")
    cur.execute(f"ALTER TABLE derived.fim4_featureid_crosswalk ADD COLUMN fim_version text DEFAULT '{FIM_VERSION}';")

    print("Renaming columns in derived.fim4_featureid_crosswalk")
    cur.execute("""
        ALTER TABLE derived.fim4_featureid_crosswalk RENAME COLUMN HydroID TO hydro_id;
        ALTER TABLE derived.fim4_featureid_crosswalk RENAME COLUMN LakeID TO lake_id;
        ALTER TABLE derived.fim4_featureid_crosswalk RENAME COLUMN BranchID  TO branch_id;
    """)

    print("Adding feature id index to derived.fim4_featureid_crosswalk")
    cur.execute("CREATE INDEX fim4_crosswalk_feature_id ON derived.fim4_featureid_crosswalk USING btree (feature_id)")

    print("Adding hydro id index to derived.fim4_featureid_crosswalk")
    cur.execute("CREATE INDEX fim4_crosswalk_hydro_id ON derived.fim4_featureid_crosswalk USING btree (hydro_id)")

    # Commit once, after every step succeeds, so a failure part-way is not persisted as a dropped
    # or half-built table (assumes the connection is not in autocommit mode).
    conn.commit()

print("Successfully updated derived.fim4_featureid_crosswalk")

<h2>3 - UPDATE FIM HAND PROCESSING LAMBDA ENV VARIABLE WITH NEW FIM VERSION</h2>

<h2>4 - UPDATE FIM DATA PREP LAMBDA ENV VARIABLE WITH NEW FIM VERSION</h2>

<h2>5 - Run AEP FIM Pipelines.</h2>
Updated documentation from Tyler (early 2024): This can be done in a couple of different ways. One option is to use the pipeline_input code created below by Corey to start the AEP pipelines directly from this notebook. However, those pipeline_input dictionaries may very well be out of date, given more recent updates to the pipelines. The other option, which I prefer, is to set up a manual test event in the initialize_pipeline lambda function to trigger an AEP pipeline like this:

{
  "configuration": "reference",
  "products_to_run": "static_nwm_aep_inundation_extent_library",
  "invoke_step_function": False
}

Using this test event will produce the pipeline instructions and print any errors that come up. When you're ready to actually invoke a pipeline run, simply change the invoke_step_function flag to true (you can monitor/manage the run in the step function GUI). You will need to manually update the static_nwm_aep_inundation_extent_library.yml product config file to run only one AEP configuration at a time, and work through the configs as the pipelines finish (each takes about an hour). I've also found that the fim_data_prep lambda function's memory needs to be temporarily increased to ~4,500 MB to run these pipelines. It's also worth noting that these are very resource-intensive pipelines, as FIM is calculated for every reach in the nation. Running these pipelines can cost hundreds or even thousands of dollars in AWS charges, so use them responsibly.

A couple other important notes:
- These AEP configurations write data directly to the aep_fim schema in the egis RDS database, instead of the viz database.
- You'll need to dump the aep_fim schema after that is complete for backup / deployment into other environments.
- This process has not been tested with new NWM 3.0 Recurrence Flows, and a good thorough audit / QC check of output data is warranted, given those changes and the recent updates to the pipelines.

In [None]:
# AEP FIM run for the rf_2 recurrence flow: a single static_nwm_aep_inundation_extent_library
# product whose one fim_config writes aep_fim.rf_2_inundation.
aep_config_name = "rf_2_inundation"
aep_product_name = "static_nwm_aep_inundation_extent_library"

aep_product = {
  "product": aep_product_name,
  "configuration": "reference",
  "product_type": "fim",
  "run": True,
  "fim_configs": [
    {"name": aep_config_name, "target_table": f"aep_fim.{aep_config_name}", "fim_type": "hand", "sql_file": aep_config_name}
  ],
  "services": [f"{aep_product_name}_noaa"],
  "raster_outputs": {"output_bucket": "", "output_raster_workspaces": []},
  "postprocess_sql": [],
  "product_summaries": [],
  "lambda_max_flow_dependent": False,
}

pipeline_input = {
  "configuration": "reference",
  "job_type": "auto",
  "data_type": "channel",
  "keep_raw": False,
  "reference_time": datetime.now().strftime('%Y-%m-%d 00:00:00'),
  "configuration_data_flow": {"db_max_flows": [], "db_ingest_groups": [], "lambda_max_flows": []},
  "pipeline_products": [aep_product],
  "sql_rename_dict": {},
}

STEPFUNCTION_CLIENT.start_execution(
    stateMachineArn=PIPELINE_ARN,
    name="sagemaker_aep_2_" + datetime.now().strftime('%Y%m%dT%H%M'),
    input=json.dumps(pipeline_input),
)

In [None]:
# AEP FIM run for the rf_5 recurrence flow: a single static_nwm_aep_inundation_extent_library
# product whose one fim_config writes aep_fim.rf_5_inundation.
aep_config_name = "rf_5_inundation"
aep_product_name = "static_nwm_aep_inundation_extent_library"

aep_product = {
  "product": aep_product_name,
  "configuration": "reference",
  "product_type": "fim",
  "run": True,
  "fim_configs": [
    {"name": aep_config_name, "target_table": f"aep_fim.{aep_config_name}", "fim_type": "hand", "sql_file": aep_config_name}
  ],
  "services": [f"{aep_product_name}_noaa"],
  "raster_outputs": {"output_bucket": "", "output_raster_workspaces": []},
  "postprocess_sql": [],
  "product_summaries": [],
  "lambda_max_flow_dependent": False,
}

pipeline_input = {
  "configuration": "reference",
  "job_type": "auto",
  "data_type": "channel",
  "keep_raw": False,
  "reference_time": datetime.now().strftime('%Y-%m-%d 00:00:00'),
  "configuration_data_flow": {"db_max_flows": [], "db_ingest_groups": [], "lambda_max_flows": []},
  "pipeline_products": [aep_product],
  "sql_rename_dict": {},
}

STEPFUNCTION_CLIENT.start_execution(
    stateMachineArn=PIPELINE_ARN,
    name="sagemaker_aep_5_" + datetime.now().strftime('%Y%m%dT%H%M'),
    input=json.dumps(pipeline_input),
)

In [None]:
# AEP FIM run for the rf_10 recurrence flow: a single static_nwm_aep_inundation_extent_library
# product whose one fim_config writes aep_fim.rf_10_inundation.
aep_config_name = "rf_10_inundation"
aep_product_name = "static_nwm_aep_inundation_extent_library"

aep_product = {
  "product": aep_product_name,
  "configuration": "reference",
  "product_type": "fim",
  "run": True,
  "fim_configs": [
    {"name": aep_config_name, "target_table": f"aep_fim.{aep_config_name}", "fim_type": "hand", "sql_file": aep_config_name}
  ],
  "services": [f"{aep_product_name}_noaa"],
  "raster_outputs": {"output_bucket": "", "output_raster_workspaces": []},
  "postprocess_sql": [],
  "product_summaries": [],
  "lambda_max_flow_dependent": False,
}

pipeline_input = {
  "configuration": "reference",
  "job_type": "auto",
  "data_type": "channel",
  "keep_raw": False,
  "reference_time": datetime.now().strftime('%Y-%m-%d 00:00:00'),
  "configuration_data_flow": {"db_max_flows": [], "db_ingest_groups": [], "lambda_max_flows": []},
  "pipeline_products": [aep_product],
  "sql_rename_dict": {},
}

STEPFUNCTION_CLIENT.start_execution(
    stateMachineArn=PIPELINE_ARN,
    name="sagemaker_aep_10_" + datetime.now().strftime('%Y%m%dT%H%M'),
    input=json.dumps(pipeline_input),
)

In [None]:
# AEP FIM run for the rf_25 recurrence flow: a single static_nwm_aep_inundation_extent_library
# product whose one fim_config writes aep_fim.rf_25_inundation.
aep_config_name = "rf_25_inundation"
aep_product_name = "static_nwm_aep_inundation_extent_library"

aep_product = {
  "product": aep_product_name,
  "configuration": "reference",
  "product_type": "fim",
  "run": True,
  "fim_configs": [
    {"name": aep_config_name, "target_table": f"aep_fim.{aep_config_name}", "fim_type": "hand", "sql_file": aep_config_name}
  ],
  "services": [f"{aep_product_name}_noaa"],
  "raster_outputs": {"output_bucket": "", "output_raster_workspaces": []},
  "postprocess_sql": [],
  "product_summaries": [],
  "lambda_max_flow_dependent": False,
}

pipeline_input = {
  "configuration": "reference",
  "job_type": "auto",
  "data_type": "channel",
  "keep_raw": False,
  "reference_time": datetime.now().strftime('%Y-%m-%d 00:00:00'),
  "configuration_data_flow": {"db_max_flows": [], "db_ingest_groups": [], "lambda_max_flows": []},
  "pipeline_products": [aep_product],
  "sql_rename_dict": {},
}

STEPFUNCTION_CLIENT.start_execution(
    stateMachineArn=PIPELINE_ARN,
    name="sagemaker_aep_25_" + datetime.now().strftime('%Y%m%dT%H%M'),
    input=json.dumps(pipeline_input),
)

In [None]:
# AEP FIM run for the rf_50 recurrence flow: a single static_nwm_aep_inundation_extent_library
# product whose one fim_config writes aep_fim.rf_50_inundation.
aep_config_name = "rf_50_inundation"
aep_product_name = "static_nwm_aep_inundation_extent_library"

aep_product = {
  "product": aep_product_name,
  "configuration": "reference",
  "product_type": "fim",
  "run": True,
  "fim_configs": [
    {"name": aep_config_name, "target_table": f"aep_fim.{aep_config_name}", "fim_type": "hand", "sql_file": aep_config_name}
  ],
  "services": [f"{aep_product_name}_noaa"],
  "raster_outputs": {"output_bucket": "", "output_raster_workspaces": []},
  "postprocess_sql": [],
  "product_summaries": [],
  "lambda_max_flow_dependent": False,
}

pipeline_input = {
  "configuration": "reference",
  "job_type": "auto",
  "data_type": "channel",
  "keep_raw": False,
  "reference_time": datetime.now().strftime('%Y-%m-%d 00:00:00'),
  "configuration_data_flow": {"db_max_flows": [], "db_ingest_groups": [], "lambda_max_flows": []},
  "pipeline_products": [aep_product],
  "sql_rename_dict": {},
}

STEPFUNCTION_CLIENT.start_execution(
    stateMachineArn=PIPELINE_ARN,
    name="sagemaker_aep_50_" + datetime.now().strftime('%Y%m%dT%H%M'),
    input=json.dumps(pipeline_input),
)

In [None]:
# AEP FIM run for the high-water recurrence flow: a single static_nwm_aep_inundation_extent_library
# product whose one fim_config writes aep_fim.rf_high_water_inundation.
aep_config_name = "rf_high_water_inundation"
aep_product_name = "static_nwm_aep_inundation_extent_library"

aep_product = {
  "product": aep_product_name,
  "configuration": "reference",
  "product_type": "fim",
  "run": True,
  "fim_configs": [
    {"name": aep_config_name, "target_table": f"aep_fim.{aep_config_name}", "fim_type": "hand", "sql_file": aep_config_name}
  ],
  "services": [f"{aep_product_name}_noaa"],
  "raster_outputs": {"output_bucket": "", "output_raster_workspaces": []},
  "postprocess_sql": [],
  "product_summaries": [],
  "lambda_max_flow_dependent": False,
}

pipeline_input = {
  "configuration": "reference",
  "job_type": "auto",
  "data_type": "channel",
  "keep_raw": False,
  "reference_time": datetime.now().strftime('%Y-%m-%d 00:00:00'),
  "configuration_data_flow": {"db_max_flows": [], "db_ingest_groups": [], "lambda_max_flows": []},
  "pipeline_products": [aep_product],
  "sql_rename_dict": {},
}

STEPFUNCTION_CLIENT.start_execution(
    stateMachineArn=PIPELINE_ARN,
    name="sagemaker_aep_hw_" + datetime.now().strftime('%Y%m%dT%H%M'),
    input=json.dumps(pipeline_input),
)

<h2>6 - RUN CATCHMENT WORKFLOWS 2 CONFIGS AT A TIME. CHECK FOR STEP FUNCTION FINISHING BEFORE STARTING NEW ONE</h2>

In [None]:
# Static HAND catchments for branch 0: the base product plus its _hi and _prvi variants, each
# writing fim_catchments.branch_0_catchments<suffix> and publishing the matching _noaa service.
catchment_suffixes = ("", "_hi", "_prvi")

catchment_products = [
  {
    "product": f"static_hand_catchments_0_branches{suffix}",
    "configuration": "reference",
    "product_type": "fim",
    "run": True,
    "fim_configs": [
      {
        "name": f"branch_0_catchments{suffix}",
        "target_table": f"fim_catchments.branch_0_catchments{suffix}",
        "fim_type": "hand",
        "sql_file": f"branch_0_catchments{suffix}",
      }
    ],
    "services": [f"static_hand_catchments_0_branches{suffix}_noaa"],
    "raster_outputs": {"output_bucket": "", "output_raster_workspaces": []},
    "postprocess_sql": [],
    "product_summaries": [],
    "lambda_max_flow_dependent": False,
  }
  for suffix in catchment_suffixes
]

pipeline_input = {
  "configuration": "reference",
  "job_type": "auto",
  "data_type": "channel",
  "keep_raw": False,
  "reference_time": datetime.now().strftime('%Y-%m-%d 00:00:00'),
  "configuration_data_flow": {"db_max_flows": [], "db_ingest_groups": [], "lambda_max_flows": []},
  "pipeline_products": catchment_products,
  "sql_rename_dict": {},
}

STEPFUNCTION_CLIENT.start_execution(
    stateMachineArn=PIPELINE_ARN,
    name="sagemaker_0_catchments_" + datetime.now().strftime('%Y%m%dT%H%M'),
    input=json.dumps(pipeline_input),
)

In [None]:
# Static HAND catchments for the GMS branches: the base product plus its _hi and _prvi variants,
# each writing fim_catchments.branch_gms_catchments<suffix> and publishing the matching _noaa service.
catchment_suffixes = ("", "_hi", "_prvi")

catchment_products = [
  {
    "product": f"static_hand_catchments_gms_branches{suffix}",
    "configuration": "reference",
    "product_type": "fim",
    "run": True,
    "fim_configs": [
      {
        "name": f"branch_gms_catchments{suffix}",
        "target_table": f"fim_catchments.branch_gms_catchments{suffix}",
        "fim_type": "hand",
        "sql_file": f"branch_gms_catchments{suffix}",
      }
    ],
    "services": [f"static_hand_catchments_gms_branches{suffix}_noaa"],
    "raster_outputs": {"output_bucket": "", "output_raster_workspaces": []},
    "postprocess_sql": [],
    "product_summaries": [],
    "lambda_max_flow_dependent": False,
  }
  for suffix in catchment_suffixes
]

pipeline_input = {
  "configuration": "reference",
  "job_type": "auto",
  "data_type": "channel",
  "keep_raw": False,
  "reference_time": datetime.now().strftime('%Y-%m-%d 00:00:00'),
  "configuration_data_flow": {"db_max_flows": [], "db_ingest_groups": [], "lambda_max_flows": []},
  "pipeline_products": catchment_products,
  "sql_rename_dict": {},
}

STEPFUNCTION_CLIENT.start_execution(
    stateMachineArn=PIPELINE_ARN,
    name="sagemaker_gms_catchments_" + datetime.now().strftime('%Y%m%dT%H%M'),
    input=json.dumps(pipeline_input),
)

<h2>6 - UPDATE RATING CURVES IN DB</h2>

In [None]:
# Reload the FIM hydroTable rating curves and USGS elevation tables for this FIM version into
# derived.hydrotable and derived.usgs_elev_table on the viz database. For each branch, only the
# hydroTable rows whose HydroID appears in that branch's usgs_elev_table are kept, and only the
# usgs_elev_table rows that have matching hydroTable rows are kept.

# Set to True to load every branch folder under <huc>/branches/ instead of only branch 0.
LOAD_ALL_BRANCHES = False

sql = '''
DROP TABLE IF EXISTS derived.hydrotable;
DROP TABLE IF EXISTS derived.usgs_elev_table;

SELECT distinct LPAD(huc8::text, 8, '0') as huc8 FROM derived.featureid_huc_crosswalk WHERE huc8 is not null;
'''
# Drops both target tables so the appends below start from scratch. (The huc8 list returned by
# the SELECT is not used by the S3 walk below.)
df = run_sql_in_db(sql)

# Full hydroTable header, used to name the CSV columns positionally; only `usecols` are loaded.
col_names = ['HydroID', 'feature_id', 'NextDownID', 'order_', 'Number of Cells',
       'SurfaceArea (m2)', 'BedArea (m2)', 'TopWidth (m)', 'LENGTHKM',
       'AREASQKM', 'WettedPerimeter (m)', 'HydraulicRadius (m)',
       'WetArea (m2)', 'Volume (m3)', 'SLOPE', 'ManningN', 'stage',
       'default_discharge_cms', 'default_Volume (m3)', 'default_WetArea (m2)',
       'default_HydraulicRadius (m)', 'default_ManningN',
       'precalb_discharge_cms', 'calb_coef_spatial', 'HUC', 'LakeID',
       'subdiv_applied', 'channel_n', 'overbank_n', 'subdiv_discharge_cms',
       'last_updated', 'submitter', 'calb_coef_usgs', 'obs_source',
       'calb_coef_final', 'calb_applied', 'discharge_cms']

usecols = ['HydroID', 'feature_id', 'stage', 'discharge_cms']


def _branch_prefixes(huc_prefix):
    """Return the S3 prefixes of the branch folders to load for one HUC folder prefix."""
    if not LOAD_ALL_BRANCHES:
        return [f'{huc_prefix}branches/0/']
    branches_result = S3_CLIENT.list_objects(Bucket=FIM_BUCKET, Prefix=f'{huc_prefix}branches/', Delimiter='/')
    return [branch_obj['Prefix'] for branch_obj in branches_result.get('CommonPrefixes', [])]


def _find_branch_tables(branch_prefix):
    """Return (hydro_table_key, usgs_elev_table_key) found in a branch folder; either may be None."""
    branch_files_result = S3_CLIENT.list_objects(Bucket=FIM_BUCKET, Prefix=branch_prefix, Delimiter='/')
    hydro_table_key = None
    usgs_elev_table_key = None
    # 'Contents' is absent from the response when the branch folder has no files.
    for content_obj in branch_files_result.get('Contents', []):
        key = content_obj['Key']
        if 'hydroTable' in key:
            hydro_table_key = key
        elif 'usgs_elev_table.csv' in key:
            usgs_elev_table_key = key
    return hydro_table_key, usgs_elev_table_key


def _load_branch(branch_prefix, hydro_table_key, usgs_elev_table_key):
    """Append one branch's matching hydroTable and usgs_elev_table rows to the derived tables.

    S3/CSV read failures propagate. Failures while matching or writing are printed and the
    branch is skipped so the remaining branches still load.
    """
    uet_df = pd.read_csv(S3_CLIENT.get_object(Bucket=FIM_BUCKET, Key=usgs_elev_table_key)['Body'])
    ht_df = pd.read_csv(S3_CLIENT.get_object(Bucket=FIM_BUCKET, Key=hydro_table_key)['Body'],
                        header=0, names=col_names, usecols=usecols)
    uet_df['fim_version'] = FIM_VERSION
    try:
        # hydroTable rows grouped by HydroID so each gage row is a dict lookup, not a full scan.
        ht_by_hydro_id = {hydro_id: group for hydro_id, group in ht_df.groupby('HydroID')}
        hydro_table_parts = []
        drop_indicies = []
        for index, row in uet_df.iterrows():
            matches = None
            if 'HydroID' in row and row['HydroID']:
                matches = ht_by_hydro_id.get(row['HydroID'])
            if matches is None:
                # No HydroID on this gage row, or no rating curve for it: drop the gage row.
                drop_indicies.append(index)
                continue
            hydro_table_parts.append(matches)
        if hydro_table_parts:
            # Single write per branch; .assign avoids mutating a slice (SettingWithCopyWarning).
            pd.concat(hydro_table_parts).assign(fim_version=FIM_VERSION).to_sql(
                con=VIZ_DB_ENGINE, schema='derived', name='hydrotable', index=False, if_exists='append')
        uet_df.drop(drop_indicies).to_sql(
            con=VIZ_DB_ENGINE, dtype={"location_id": Text(), "nws_data_huc": Text()},
            schema='derived', name='usgs_elev_table', index=False, if_exists='append')
    except Exception as e:
        print('******************************************')
        print(f'Error encountered on {branch_prefix}')
        print(e)
        print('******************************************')


paginator = S3_CLIENT.get_paginator('list_objects')
page_iterator = paginator.paginate(Bucket=FIM_BUCKET, Prefix=f'{HAND_DATASETS_DPATH}/', Delimiter='/')
for page_count, page in enumerate(page_iterator, start=1):
    # Each CommonPrefix is one HUC folder; the key is absent on a page with no folders.
    prefix_objects = page.get('CommonPrefixes', [])
    for i, prefix_obj in enumerate(prefix_objects):
        print(f"Processing {i+1} of {len(prefix_objects)} on page {page_count}")
        for branch_prefix in _branch_prefixes(prefix_obj.get("Prefix")):
            hydro_table_key, usgs_elev_table_key = _find_branch_tables(branch_prefix)
            if hydro_table_key and usgs_elev_table_key:
                _load_branch(branch_prefix, hydro_table_key, usgs_elev_table_key)


In [None]:
# Build derived.hydrotable_staggered: FIM hydroTable rows joined to their USGS gage location via
# HydroID, with stage + dem_adj_elevation converted to feet (x 3.28084) and discharge from cms to
# cfs (x 35.3147). LEAD adds the next row's elevation/discharge so consecutive rating-curve points
# can be paired. "HydroID" is quoted because pandas.to_sql created the column with its CSV casing.
# NOTE(review): the window is PARTITION BY ht.feature_id only; if one feature_id maps to several
# HydroIDs or gage locations, LEAD will interleave their curves — confirm whether the partition
# should also include et.location_id / ht."HydroID".
sql = '''
DROP TABLE IF EXISTS derived.hydrotable_staggered;
SELECT
    et.location_id,
    ht.feature_id,
    (stage + et.dem_adj_elevation) * 3.28084 as elevation_ft,
    LEAD((stage + et.dem_adj_elevation) * 3.28084) OVER (PARTITION BY ht.feature_id ORDER BY ht.feature_id, stage) as next_elevation_ft,
    discharge_cms * 35.3147 as discharge_cfs,
    LEAD(discharge_cms * 35.3147) OVER (PARTITION BY ht.feature_id ORDER BY ht.feature_id, stage) as next_discharge_cfs
INTO derived.hydrotable_staggered
FROM derived.hydrotable AS ht
JOIN derived.usgs_elev_table AS et ON ht."HydroID" = et."HydroID" AND et.location_id IS NOT NULL;
'''

# Drops and recreates the table in the viz database.
run_sql_in_db(sql)

In [None]:
# Replace derived.usgs_rating_curves with the USGS rating curves published in this FIM version's QA datasets.
usgs_curves_key = '/'.join([QA_DATASETS_DPATH, 'usgs_rating_curves.csv'])
obj_body = S3_CLIENT.get_object(Bucket=FIM_BUCKET, Key=usgs_curves_key)['Body']
df = pd.read_csv(obj_body)
usgs_curves_write_options = dict(
    con=VIZ_DB_ENGINE,
    schema='derived',
    name='usgs_rating_curves',
    index=False,
    if_exists='replace',
    chunksize=150000,
    method='multi',
)
df.to_sql(**usgs_curves_write_options)

In [None]:
# Build derived.usgs_rating_curves_staggered: each USGS rating-curve point (flow, stage, NAVD88
# elevation) paired, via LEAD, with the next point by stage at the same location_id, so
# consecutive points can be treated as curve segments. The last point per location has NULL
# next_* values.
sql = '''
DROP TABLE IF EXISTS derived.usgs_rating_curves_staggered;
SELECT 
    location_id,
    flow as discharge_cfs, 
    LEAD(flow) OVER (PARTITION BY location_id ORDER BY location_id, stage) as next_discharge_cfs,
    stage,
    navd88_datum,
    elevation_navd88 as elevation_ft,
    LEAD(elevation_navd88) OVER (PARTITION BY location_id ORDER BY location_id, stage) as next_elevation_ft
INTO derived.usgs_rating_curves_staggered
FROM derived.usgs_rating_curves;
'''

# Drops and recreates the table in the viz database.
run_sql_in_db(sql)

<h2>7 - UPDATE SRC SKILL METRICS IN DB</h2>

In [None]:
# Read the SRC skill statistics CSV (agg_nwm_recurr_flow_elev_stats_location_id.csv) published in
# this FIM version's QA datasets, and optionally replace derived.src_skill with it.
# The database write is off by default; set WRITE_SRC_SKILL = True to tag rows with FIM_VERSION
# and replace derived.src_skill.
WRITE_SRC_SKILL = False

src_stats_key = f'{QA_DATASETS_DPATH}/agg_nwm_recurr_flow_elev_stats_location_id.csv'
obj_body = S3_CLIENT.get_object(Bucket=FIM_BUCKET, Key=src_stats_key)['Body']
df = pd.read_csv(obj_body)

if WRITE_SRC_SKILL:
    df['fim_version'] = FIM_VERSION
    df.to_sql(con=VIZ_DB_ENGINE, schema='derived', name='src_skill', index=False, if_exists='replace')

# Show a preview rather than dumping every record into the notebook output.
print(f"{len(df)} rows, columns: {list(df.columns)}")
df.head()

In [None]:
from helper_functions.shared_funcs import execute_sql

<h2>8 - UPDATE FIM PERFORMANCE METRICS IN DB</h2>

In [None]:
import os
from helper_functions.shared_functions import *
import boto3
import geopandas as gpd
import pandas as pd
import sqlalchemy
from geoalchemy2 import Geometry
from geopandas import GeoDataFrame

# os.environ['EGIS_DB_HOST'] =''  #TI DB

# Load the FIM performance QC layers into the egis database's `reference` schema, one table per
# CSV (each table is replaced):
#   fim_performance_points.csv               -> reference.fim_performance_points (POINT)
#   fim_performance_polys.csv                -> reference.fim_performance_polys (POLYGON)
#   fim_performance_catchments_dissolved.csv -> reference.fim_performance_catchments (MULTIPOLYGON)
# CSVs are read from brad_data/qc_fim_data/; a file missing there is first downloaded from
# {FIM_BUCKET}/{QA_DATASETS_DPATH}/. Existing local copies are used as-is.

db_type = "egis"
db_engine = get_db_engine(db_type)

s3 = boto3.client('s3')

# Define bucket and parent directories.
bucket = "hydrovis-ti-deployment-us-east-1"
parent_directory = "qc_fim_data"
local_download_parent_directory = 'brad_data/qc_fim_data'

file_handles = ['fim_performance_points.csv', 'fim_performance_polys.csv', 'fim_performance_catchments_dissolved.csv']

# Rows per INSERT batch when writing to the database.
DB_CHUNK_ROWS = 10000


def _geometry_type(layer_name):
    """Return the PostGIS geometry type for a fim_performance layer name."""
    if 'points' in layer_name:
        return 'POINT'
    if 'polys' in layer_name:
        return 'POLYGON'
    if 'catchments' in layer_name:
        return 'MULTIPOLYGON'
    raise ValueError(f"No geometry type known for layer {layer_name!r}")


def _int_id_to_str(frame, column):
    """Cast an id column to int (dropping any '.0') and then to str; no-op if the column is absent."""
    if column not in frame.columns:
        return frame
    return frame.astype({column: 'int'}).astype({column: 'str'})


for file_handle in file_handles:
    filename = f"{QA_DATASETS_DPATH}/{file_handle}"
    local_download_path = os.path.join(local_download_parent_directory, file_handle)
    if os.path.exists(local_download_path):
        print(f"--> Using existing local copy {local_download_path}")
    else:
        print(f"--> Downloading {FIM_BUCKET}/{filename} to {local_download_path}")
        os.makedirs(local_download_parent_directory, exist_ok=True)
        s3.download_file(FIM_BUCKET, filename, local_download_path)

    # The dissolved catchments file is loaded under the fim_performance_catchments table name.
    if file_handle == 'fim_performance_catchments_dissolved.csv':
        file_handle = 'fim_performance_catchments.csv'
    is_points = file_handle == 'fim_performance_points.csv'
    # The points layer keeps its 'huc' column; the other layers rename 'huc' to 'huc8'.
    huc_column = 'huc' if is_points else 'huc8'

    #  -- Open file and reformat -- #
    print("Reading file...")
    df = pd.read_csv(local_download_path)
    print("File read.")
    renames = {'Unnamed: 0': 'oid', 'geometry': 'geom'}
    if not is_points:
        renames['huc'] = 'huc8'
    df = df.rename(columns=renames)
    # Convert all field names to lowercase (needed for ArcGIS Pro).
    df.columns = df.columns.str.lower()

    # Enforce data types before loading into the DB (TODO: special cases for each layer).
    df = df.astype({huc_column: 'str'})
    df = df.fillna(0)
    # Each id column is cast independently, so a missing one no longer skips the others.
    for id_column in ('feature_id', 'nwm_seg', 'usgs_gage'):
        df = _int_id_to_str(df, id_column)
    if 'oid' in df.columns:
        df = df.astype({'oid': 'int'})

    # zfill the HUC8 codes to 8 characters.
    df[huc_column] = df[huc_column].str.zfill(8)

    stripped_layer_name = file_handle.replace(".csv", "")
    if stripped_layer_name == 'fim_performance_catchments':
        df['version'] = FIM_VERSION
    geometry = _geometry_type(stripped_layer_name)

    # to_sql's chunksize replaces the table once and then inserts in batches; this also works
    # for an empty frame, which creates an empty table.
    print(f"Loading {len(df)} rows into reference.{stripped_layer_name} as {geometry}...")
    df.to_sql(
        name=stripped_layer_name,
        con=db_engine,
        schema='reference',
        if_exists='replace',
        index=False,
        chunksize=DB_CHUNK_ROWS,
        dtype={'oid': sqlalchemy.types.Integer(),
               'version': sqlalchemy.types.String(),
               'geom': Geometry(geometry, srid=3857)
              }
    )

<h2>9 - UPDATE STAGE-BASED CATFIM DATA IN DB</h2>

In [None]:
import os 
from helper_functions.shared_functions import * 
import boto3 
import geopandas as gpd 
import pandas as pd
import sqlalchemy
from geoalchemy2 import Geometry
from sqlalchemy import text

# Load the stage-based CatFIM QA dataset(s) into reference.* tables in the EGIS database.
db_type = "egis" 
db_engine = get_db_engine(db_type)

s3 = boto3.client('s3')

# Define bucket and parent directories.
bucket = "hydrovis-ti-deployment-us-east-1"
parent_directory = "qc_fim_data"
local_download_parent_directory = 'brad_data/qc_fim_data'

# Files to load. 'stage_based_catfim_sites.csv' (the point layer) may also be listed here.
file_handles = ['catfim_library_dissolved.csv']

# Sites to exclude from mapping. The restricted list is identical for every file, so query it once.
restricted_sites_df = get_db_values("derived.ahps_restricted_sites", ["*"])
restricted_dict = restricted_sites_df.to_dict('records')
# Lower-cased nws_lid -> restricted_reason (a later duplicate lid wins, as with sequential .loc writes).
restricted_reason_by_lid = {site['nws_lid'].lower(): site['restricted_reason'] for site in restricted_dict}
restricted_lids = list(restricted_reason_by_lid)
sea_level_sites = ['qutg1', 'augg1', 'baxg1', 'lamf1', 'adlg1', 'hrag1', 'stng1']
sea_level_reason = 'Stage thresholds seem to be based on sea level and not channel thalweg'
chunk_size = 1000  # rows per to_sql call when chunk loading the polygon layer

for file_handle in file_handles: 
    
    # Define path to file to download and its local download path.
    filename = f"{QA_DATASETS_DPATH}/{file_handle}" 
    print(filename) 
    local_download_path = os.path.join(local_download_parent_directory, f'{file_handle}') 
    print(f"--> Downloading {FIM_BUCKET}/{filename} to {local_download_path}") 
    # The download is currently disabled: the file must already exist at local_download_path.
    #s3.download_file(FIM_BUCKET, filename, local_download_path)

    #  -- Open file and reformat -- #
    print("Reading file...")
    df = pd.read_csv(local_download_path)
    print("File read.")

    # The dissolved library is loaded into the reference.stage_based_catfim table.
    if file_handle == 'catfim_library_dissolved.csv':
        file_handle = 'stage_based_catfim.csv'
    is_sites_layer = "sites" in file_handle

    # Rename headers.
    df = df.rename(columns={'Unnamed: 0': 'oid', 'geometry': 'geom', 'huc':'huc8'})

    # Convert all field names to lowercase (needed for ArcGIS Pro).
    df.columns = df.columns.str.lower()

    restricted_mask = df['ahps_lid'].isin(restricted_lids)
    sea_level_mask = df['ahps_lid'].isin(sea_level_sites)
    if is_sites_layer:
        # Keep the site points but mark them unmapped with a reason; sea-level status is applied last,
        # so it overrides the restricted reason for sites present in both lists.
        df.loc[restricted_mask, 'mapped'] = 'no'
        df.loc[restricted_mask, 'status'] = df.loc[restricted_mask, 'ahps_lid'].map(restricted_reason_by_lid)
        df.loc[sea_level_mask, 'mapped'] = 'no'
        df.loc[sea_level_mask, 'status'] = sea_level_reason
    else:
        # Subset df to only sites desired for mapping, filtering once rather than per excluded site.
        df = df[(df['viz'] == 'yes') & ~restricted_mask & ~sea_level_mask]
            
    # Enforce data types on df before loading in DB (TODO: need to create special cases for each layer).
    df = df.astype({'huc8': 'str'})
    df = df.fillna(0)

    if file_handle == 'stage_based_catfim.csv':
        df['fim_version'] = FIM_VERSION

    # Cast ID columns through int (dropping any '.0' from float parsing) to str, when the column exists.
    for id_column in ('feature_id', 'nwm_seg', 'usgs_gage'):
        if id_column in df.columns:
            df = df.astype({id_column: 'int'}).astype({id_column: 'str'})

    # zfill HUC8 field.
    df['huc8'] = df['huc8'].apply(lambda x: x.zfill(8))

    if file_handle in ['stage_based_catfim_sites.csv']:
        df = df.astype({
            'nws_data_rfc_forecast_point': 'str',
            'nws_data_rfc_defined_fcst_point': 'str',
            'nws_data_riverpoint': 'str',
        })

    # Upload df to database.
    stripped_layer_name = file_handle.replace(".csv", "")
    table_name = "reference." + stripped_layer_name
    print("Loading data into DB...")

    print("Dataframe shape")
    print(df.shape[0])

    if file_handle in ['stage_based_catfim.csv']:
        print("Chunk loading...")
        # The first chunk replaces the table and later chunks append to it. max(..., 1) still writes a
        # single (empty) chunk when df has no rows, so the table is recreated instead of raising IndexError.
        for start in range(0, max(df.shape[0], 1), chunk_size):
            chunk_df = df[start:start + chunk_size]
            print(chunk_df.shape[0])
            chunk_df.to_sql(
                name=stripped_layer_name,
                con=db_engine,
                schema='reference',
                if_exists='replace' if start == 0 else 'append',
                index=False,
                dtype={'oid': sqlalchemy.types.Integer(),
                       'geom': Geometry('MULTIPOLYGON', srid=3857)
                      }
            )
    else:
        geometry = 'POINT'
        df.to_sql(
            name=stripped_layer_name, 
            con=db_engine, 
            schema='reference',
            if_exists='replace', 
            index=False,
            dtype={'oid': sqlalchemy.types.Integer(),
                   'geom': Geometry(geometry, srid=3857)
                  }
        )

    print("Creating index...")
    # engine.begin() commits when the block exits. A bare connect() block is rolled back on exit under
    # SQLAlchemy 2.x, which silently discards the CREATE INDEX.
    with db_engine.begin() as conn:
        conn.execute(text(f'CREATE INDEX ON reference.{stripped_layer_name} USING GIST (geom);'))
            

In [None]:
# Re-create the EGIS database engine on its own and print it.
db_type = "egis"
db_engine = get_db_engine(db_type)
print(f"{db_engine}")


In [None]:
# Download the exploded CatFIM library GeoPackage into the local qc_fim_data folder.
s3 = boto3.client('s3')
local_download_parent_directory = 'brad_data/qc_fim_data'
# NOTE(review): S3 keys rarely begin with '/'; confirm the object is really stored under this exact key.
filename = "/temp/catfim_library_exploded.gpkg"
# Name the local copy after the GeoPackage itself. Do not build it from `file_handle`: that is a leftover
# loop variable from the CatFIM load cells. It would save the .gpkg under a .csv name, or raise NameError
# on a fresh kernel.
local_download_path = os.path.join(local_download_parent_directory, os.path.basename(filename))
os.makedirs(local_download_parent_directory, exist_ok=True)
s3.download_file(FIM_BUCKET, filename, local_download_path)

<h2>10 - UPDATE FLOW-BASED CATFIM DATA IN DB</h2>

In [None]:
import os
from helper_functions.shared_functions import *
import boto3
import geopandas as gpd
import pandas as pd
# Imported here too so this cell does not depend on the stage-based cell having been run first.
import sqlalchemy
from geoalchemy2 import Geometry

# os.environ['EGIS_DB_HOST'] = ''  #TI DB

# Load the flow-based CatFIM QA datasets into reference.* tables in the EGIS database.
db_type = "egis"
db_engine = get_db_engine(db_type)
print(db_engine)

s3 = boto3.client('s3')

# Define bucket and parent directories.
bucket = "hydrovis-ti-deployment-us-east-1"
parent_directory = "qc_fim_data"
local_download_parent_directory = 'brad_data/qc_fim_data'

file_handles = ['flow_based_catfim_sites.csv', 'catfim_library_dissolved_flow_based.csv']

# Sites in derived.ahps_restricted_sites are excluded from mapping. The list is identical for every file,
# so query it once.
restricted_sites_df = get_db_values("derived.ahps_restricted_sites", ["*"])
restricted_dict = restricted_sites_df.to_dict('records')
# Lower-cased nws_lid -> restricted_reason (a later duplicate lid wins, as with sequential .loc writes).
restricted_reason_by_lid = {site['nws_lid'].lower(): site['restricted_reason'] for site in restricted_dict}
restricted_lids = list(restricted_reason_by_lid)
chunk_size = 1000  # rows per to_sql call when chunk loading the polygon layer

for file_handle in file_handles:
    # Define path to file to download and its local download path.
    filename = f"{FIM_ROOT_DPATH}/qa_datasets/{file_handle}"
    print(filename)
    local_download_path = os.path.join(local_download_parent_directory, f'{file_handle}')
    print(f"--> Downloading {FIM_BUCKET}/{filename} to {local_download_path}")
    # The download is currently disabled: the file must already exist at local_download_path.
    #s3.download_file(FIM_BUCKET, filename, local_download_path)
    
    #  -- Open file and reformat -- #
    print("Reading file...")
    df = pd.read_csv(local_download_path)
    print("File read.")
    # Rename headers.
    df = df.rename(columns={'Unnamed: 0': 'oid', 'geometry': 'geom', 'huc':'huc8'})
        
    # The dissolved flow-based library is loaded into the reference.flow_based_catfim table.
    if file_handle == 'catfim_library_dissolved_flow_based.csv':
        file_handle = 'flow_based_catfim.csv'
        
    # Convert all field names to lowercase (needed for ArcGIS Pro).
    df.columns = df.columns.str.lower()

    restricted_mask = df['ahps_lid'].isin(restricted_lids)
    if "sites" in file_handle:
        # Keep the site points but mark restricted ones as unmapped, with the restriction reason.
        df.loc[restricted_mask, 'mapped'] = 'no'
        df.loc[restricted_mask, 'status'] = df.loc[restricted_mask, 'ahps_lid'].map(restricted_reason_by_lid)
    else:
        # Subset df to only sites desired for mapping. This is applied even when the restricted list is
        # empty; previously the viz == 'yes' filter ran only inside the per-site loop.
        df = df[(df['viz'] == 'yes') & ~restricted_mask]
    
    # Enforce data types on df before loading in DB (TODO: need to create special cases for each layer).
    df = df.astype({'huc8': 'str'})
    df = df.fillna(0)

    # Cast ID columns through int (dropping any '.0' from float parsing) to str, when the column exists.
    for id_column in ('feature_id', 'nwm_seg', 'usgs_gage'):
        if id_column in df.columns:
            df = df.astype({id_column: 'int'}).astype({id_column: 'str'})
        
    # zfill HUC8 field.
    df['huc8'] = df['huc8'].apply(lambda x: x.zfill(8))
    
    if file_handle in ['flow_based_catfim_sites.csv']:
        df = df.astype({
            'nws_data_rfc_forecast_point': 'str',
            'nws_data_rfc_defined_fcst_point': 'str',
            'nws_data_riverpoint': 'str',
        })
        
    # Upload df to database.
    stripped_layer_name = file_handle.replace(".csv", "")
    table_name = "reference." + stripped_layer_name
    print("Loading data into DB...")
    
    print("Dataframe shape")
    print(df.shape[0])
    
    if file_handle in ['flow_based_catfim.csv']:
        print("Chunk loading...")
        # Only the FIRST chunk replaces the table and later chunks append. Previously the whole df was
        # written with 'replace' and then the remaining chunks were appended again, duplicating every row
        # after the first chunk. max(..., 1) still writes one (empty) chunk when df has no rows.
        for start in range(0, max(df.shape[0], 1), chunk_size):
            chunk_df = df[start:start + chunk_size]
            print(chunk_df.shape[0])
            chunk_df.to_sql(
                name=stripped_layer_name,
                con=db_engine,
                schema='reference',
                if_exists='replace' if start == 0 else 'append',
                index=False,
                dtype={'oid': sqlalchemy.types.Integer(),
                       'geom': Geometry('MULTIPOLYGON', srid=3857)
                      }
            )
    else:
        geometry = 'POINT'
        df.to_sql(
            name=stripped_layer_name, 
            con=db_engine, 
            schema='reference',
            if_exists='replace', 
            index=False,
            dtype={'oid': sqlalchemy.types.Integer(),
                   'geom': Geometry(geometry, srid=3857)
                  }
        )

<h2>11 - UPDATE RAS2FIM DATA IN DB</h2>

Update from Tyler (early 2024): This process will need to be revisited, because Rob Hannah was updating the Ras2FIM data model to sync it with our database. Brad and Corey were on point for this work, so careful attention and planning will be needed to mitigate the loss of knowledge transfer and to properly test any new updates.

In [None]:
# Ras2FIM post-processing SQL, meant to be run in the order below against the database holding the
# ras2fim schema.
# NOTE(review): these statements are only defined here, not executed. Run each one (e.g. with
# run_sql_in_db) once the process above has been revisited, and confirm the target database first.

# 1. Add columns holding, per geocurve row, the previous (next-lower) stage/discharge within its feature_id.
add_previous_columns_sql = """
ALTER TABLE ras2fim.geocurves ADD COLUMN previous_stage_ft double precision;
ALTER TABLE ras2fim.geocurves ADD COLUMN previous_stage_m double precision;
ALTER TABLE ras2fim.geocurves ADD COLUMN previous_discharge_cfs double precision;
ALTER TABLE ras2fim.geocurves ADD COLUMN previous_discharge_cms double precision;
"""

# 2. Populate those columns with lag() per feature_id. The CTE must also select stage_ft, because the
#    UPDATE joins back to ras2fim.geocurves on (feature_id, stage_ft); without it the statement fails.
populate_previous_values_sql = """
WITH lagged as (SELECT 
    feature_id,
    stage_ft,
    (lag(stage_m, 1) OVER (PARTITION BY feature_id ORDER by stage_m)) as previous_stage_m,
    (lag(stage_ft, 1) OVER (PARTITION BY feature_id ORDER by stage_ft)) as previous_stage_ft,
    (lag(discharge_cfs, 1) OVER (PARTITION BY feature_id ORDER by discharge_cfs)) as previous_discharge_cfs,
    (lag(discharge_cms, 1) OVER (PARTITION BY feature_id ORDER by discharge_cms)) as previous_discharge_cms
FROM ras2fim.geocurves)

UPDATE ras2fim.geocurves gc
SET previous_stage_ft = lagged.previous_stage_ft,
    previous_stage_m = lagged.previous_stage_m,
    previous_discharge_cfs = lagged.previous_discharge_cfs,
    previous_discharge_cms = lagged.previous_discharge_cms
FROM lagged
WHERE gc.feature_id = lagged.feature_id and gc.stage_ft = lagged.stage_ft;
"""

# 3. Create ras2fim.max_geocurves with the maximum discharge/stage per feature_id. GROUP BY is required
#    because feature_id is selected alongside aggregate functions.
create_max_geocurves_sql = """
SELECT
    feature_id,
    max(discharge_cfs) as max_rc_discharge_cfs,
    max(stage_ft) as max_rc_stage_ft,
    max(discharge_cms) as max_rc_discharge_cms,
    max(stage_m) as max_rc_stage_m
INTO ras2fim.max_geocurves
FROM ras2fim.geocurves
GROUP BY feature_id
"""

<h2>12 - Clear the HAND Cache</h2>

In [None]:
# Empty every fim_cache HAND hydrotable cache table, one TRUNCATE statement per table, in a single SQL batch.
hand_cache_tables = (
    "fim_cache.hand_hydrotable_cached",
    "fim_cache.hand_hydrotable_cached_max",
    "fim_cache.hand_hydrotable_cached_geo",
    "fim_cache.hand_hydrotable_cached_zero_stage",
)
sql = "\n" + "".join(f"TRUNCATE TABLE {table_name};\n" for table_name in hand_cache_tables)
run_sql_in_db(sql)