In [1]:
from pathlib import Path
from dotenv import load_dotenv, find_dotenv
import pandas as pd
import geopandas as gpd
from datetime import date, datetime, timedelta
import json
from shapely.geometry import box, mapping
from geocube.api.core import make_geocube
from geocube.rasterize import rasterize_points_griddata
from functools import partial
import os
from sqlalchemy import create_engine
import pandas.io.sql as psql
import xarray as xr
%load_ext autoreload
%autoreload 2

# Load environment variables (database credentials etc.) from the .env file
dotenv_path = Path('.env')
load_dotenv(dotenv_path=dotenv_path)
print(f"Current environment located at: {find_dotenv()}")

# Project layout, anchored at the directory that contains the discovered .env file
path_to_root = Path(os.path.dirname(find_dotenv())) # Root of the project
path_to_wse = Path(path_to_root, "data", "wse") # Output directory for water surface elevation files
path_to_collateral = Path(path_to_root, "data", "collateral") # Directory for collateral data

# Create the data directories when missing.
# parents=True also creates the intermediate "data" directory on a fresh checkout
# (a bare mkdir() raises FileNotFoundError there); exist_ok=True makes re-running the cell safe.
for directory in (path_to_wse, path_to_collateral):
    directory.mkdir(parents=True, exist_ok=True)

# Sanity checks: everything the rest of the notebook writes to must exist
assert path_to_root.exists()
assert path_to_wse.exists()
assert path_to_collateral.exists()


Current environment located at: z:\Python\smartport\smartport-master\.env


In [2]:
# Reference dates used as defaults by the functions below.
# "current" is one day behind the calendar date and "yesterday" two days behind,
# which leaves room for the roughly 24 hour lag in data availability.
today = date.today()
current = today - timedelta(days=1)
yesterday = today - timedelta(days=2)

def wse_preprocess(
                min_date: str, # First day to process, parseable by pd.Timestamp (e.g. %Y-%m-%d)
                max_date: str, # Last day to process, parseable by pd.Timestamp (e.g. %Y-%m-%d)
                resolution: int, # Grid resolution; used here only to build file names and returned unchanged
                dbname: str = os.environ.get("POSTGRES_DATABASE"),  # Database name, read from the environment when the function is defined
                user_db: str = os.environ.get("POSTGRES_USER"), # Database user, read from the environment when the function is defined
                password: str = os.environ.get("POSTGRES_PASSWORD"), # Database password, read from the environment when the function is defined
                port: str = os.environ.get("POSTGRES_PORT"), # Database port, read from the environment when the function is defined
                host: str = os.environ.get("POSTGRES_HOST"), # Database host, read from the environment when the function is defined
                ) -> tuple:
    """Load water surface elevation records from PostgreSQL and plan the per-day outputs.

    Returns a 5-tuple ``(resolution, wse_df, wse_period_list, day_list, period_dict)``:

    * ``resolution`` -- the value passed in, unchanged.
    * ``wse_df`` -- every row of ``rg_wse_silver_3h_subset`` inside the padded query
      window, with column ``long_g`` renamed to ``lon_g``, sorted by ``time``.
    * ``wse_period_list`` -- the distinct time steps to interpolate, as
      ``'%Y-%m-%d %H:%M:%S'`` strings, from 3 hours before ``min_date`` through
      00:00 of the day after ``max_date`` (both inclusive).
    * ``day_list`` -- one ``'yYYYY_dJJJ'`` key per day from ``min_date`` to ``max_date``.
    * ``period_dict`` -- maps each key in ``day_list`` to the absolute paths of the
      3-hourly netCDF files (under ``path_to_wse``) that belong to that day.
    """

    # Pad the query window: one day before min_date and two days after max_date
    # (both bounds are exclusive in the SQL below).
    start = (pd.Timestamp(min_date) - timedelta(days = 1)).strftime('%Y-%m-%d')
    end = (pd.Timestamp(max_date) + timedelta(days = 2)).strftime('%Y-%m-%d')

    # Query the database; the engine is disposed afterwards so its pooled connections are released.
    # start/end are produced by strftime above, so only date-formatted text reaches the SQL string.
    engine = create_engine(f"postgresql://{user_db}:{password}@{host}:{port}/{dbname}")
    try:
        sql = f"SELECT * FROM rg_wse_silver_3h_subset WHERE time<'{end}'and time>'{start}'"
        wse_df = psql.read_sql(sql, engine)
    finally:
        engine.dispose()
    wse_df = wse_df.rename(columns={'long_g':'lon_g'}).sort_values('time').reset_index(drop=True)

    # Distinct time steps present in the data, in chronological order
    wse_period_range = wse_df['time'].drop_duplicates().values.tolist()

    # First and last day to process (start/end with the padding removed again)
    period_start = pd.Timestamp(start) + timedelta(days = 1)
    period_end = pd.Timestamp(end) - timedelta(days = 2)

    # Keep only the time steps that feed a daily file: from 3 hours before the first day
    # through 00:00 of the day after the last day. Filtering by time instead of by list
    # position gives the same result on a complete 3-hourly record and stays correct
    # when steps are missing at either end of the queried window.
    first_step = period_start - timedelta(hours = 3)
    last_step = period_end + timedelta(days = 1)
    wse_period_list = [
        stamp.strftime('%Y-%m-%d %H:%M:%S')
        for stamp in map(pd.to_datetime, wse_period_range)
        if first_step <= stamp <= last_step
    ]

    # Build the per-day plan: a key for each day and the files that make up that day
    day_list = []
    period_dict = {}
    day_range = pd.date_range(start=period_start, end=period_end, freq="D")

    for d in day_range:
        day_key = d.strftime('y%Y_d%j') # e.g. y2023_d060
        day_list.append(day_key)
        # Each day spans the last step of the previous day through 00:00 of the next day.
        # A timedelta frequency avoids the deprecated '3H' alias.
        day_steps = pd.date_range(start=d - timedelta(hours = 3), end=d + timedelta(days = 1), freq=timedelta(hours = 3))
        day_files = []
        for t in day_steps:
            stamp = t.strftime('%Y%m%dT%H%M')
            day = t.strftime('%j')
            ind_period = f"wse_{resolution}m_{stamp}_y{t.year}_d{day}.nc" # Same naming scheme wse_interp writes
            day_files.append(str(path_to_wse.joinpath(ind_period).absolute()))
        period_dict[day_key] = day_files

    return resolution, wse_df, wse_period_list, day_list, period_dict

_REMOTE_GEOJSON_CACHE = {} # (url, crs) -> GeoDataFrame, filled the first time each file is read


def _read_remote_geojson(url: str, crs: str):
    """Read a GeoJSON file with geopandas, fetching each distinct (url, crs) pair only once per session."""
    key = (url, crs)
    if key not in _REMOTE_GEOJSON_CACHE:
        _REMOTE_GEOJSON_CACHE[key] = gpd.read_file(url, crs=crs)
    # Hand out a copy so callers can never alter the cached frame
    return _REMOTE_GEOJSON_CACHE[key].copy()


def wse_interp(
            time_query: datetime, 
            resolution: int,
            wse_df: pd.DataFrame,
                ) -> None:
    """Interpolate the gage readings of one time step onto a regular grid and write it as netCDF.

    Parameters
    ----------
    time_query : time step to process; compared against ``wse_df['time']`` after ``str()``
        and also used to build the output file name.
    resolution : grid cell size passed to ``make_geocube`` (in units of EPSG:26915) and
        embedded in the output file name.
    wse_df : gage records with at least the columns ``time``, ``sid``, ``mile``, ``z``,
        ``lon``, ``lat``, ``lat_g`` and ``lon_g``.

    The result is written to ``path_to_wse`` as
    ``wse_{resolution}m_{YYYYmmddTHHMM}_y{YYYY}_d{JJJ}.nc``; nothing is returned.
    """

    # Records for the requested time step only
    period_select = str(time_query)
    wse_slice_df = wse_df.loc[wse_df['time'] == period_select]

    # Remove gage at Cape Giradeau to fix conflict with river mile join
    bad_gage = 'rg_CE401278'
    wse_slice_df = wse_slice_df.loc[wse_slice_df['sid'] != bad_gage]

    # River mile points; columns that would collide with the gage columns are dropped before the join
    url = 'https://raw.githubusercontent.com/hbienn/smartport_wse/main'
    rm_gdf = _read_remote_geojson(f'{url}/mr_rm_banks.geojson', 'epsg:4326')
    rm_gdf = rm_gdf.drop(columns=['OBJECTID', 'ord', 'sid', 'wse', 'time', 'lat_g', 'lon_g', 'bank'])

    # Round river mile to 1 decimal place so floating point noise cannot break the join on 'mile'
    rm_gdf = rm_gdf.round({'mile':1}).sort_values('mile')

    # Outer join: every river mile point is kept, gage readings are attached where the mile matches
    wse_gdf = rm_gdf.merge(wse_slice_df, how='outer', on='mile')
    cols = ['mile', 'sid', 'z', 'time','lon', 'lat', 'lat_g', 'lon_g', 'geometry']
    wse_gdf = wse_gdf[cols].sort_values('mile')

    # Time of this step as integer nanoseconds since the epoch, stored on every point
    period = pd.to_datetime(time_query); year = period.strftime("%Y"); day = period.strftime("%j")
    period_ns = int(round(period.timestamp())*1000000000)
    wse_gdf.insert(4,'period', period_ns)

    # Limit the spatial domain to river mile <= 1000 (vicinity of Cape Giradeau, MO and south)
    wse_gdf = wse_gdf.loc[wse_gdf['mile'] <= 1000].sort_values('mile')
    # Fill river mile points without a reading from the neighbouring readings.
    # NOTE(review): method='linear' treats consecutive rows as equally spaced; it does not
    # weight by the 'mile' values themselves -- confirm this matches the intended behaviour.
    wse_gdf['z'] = wse_gdf['z'].interpolate(method='linear', limit_direction = 'both')

    # User modified variables: bounding box for the Mississippi River and PCS NAD83 UTM Zone 15N
    bounding_box = json.dumps(mapping(box(-91.7,28.9,-89,38.8))); projection = 'EPSG:26915'

    # Rasterize the points to a regular grid with linear interpolation between them
    wse_xr = make_geocube(
                        vector_data = wse_gdf,
                        measurements = ['z',],
                        output_crs = projection,
                        resolution = (resolution, resolution),
                        geom = bounding_box,
                        rasterize_function=partial(rasterize_points_griddata, method='linear', filter_nan = True)
                          )

    # Add a length-1 time dimension carrying this step's timestamp.
    # The exact integer computed above is used directly: averaging the 'period' column would
    # route an ~1.7e18 integer through float64 and can shift the stamp by a few hundred nanoseconds.
    wse_xr = wse_xr.expand_dims('time')
    arr = wse_xr['time'].to_numpy(); arr[0,] = period_ns
    wse_xr['time'] = arr; wse_xr['time'] = pd.to_datetime(wse_xr['time'],utc=True)

    # Clip surface to extent of Mississippi River
    mr = _read_remote_geojson(f'{url}/generalized_nhdarea_stlouistogulf_utm.geojson', projection)
    wse_xr = wse_xr.rio.clip(mr.geometry, mr.crs, drop=True, invert=False)

    # Export as .netcdf; the name must match the one wse_preprocess lists in period_dict
    period_label = period.strftime('%Y%m%dT%H%M')
    ind_period = f"wse_{resolution}m_{period_label}_y{year}_d{day}.nc"
    filename = path_to_wse.joinpath(ind_period)
    wse_xr.to_netcdf(filename)

    return None

def wse(
        min_date: str = yesterday.strftime('%Y-%m-%d'), # First day to process; defaults to two days before the date on which this cell was run
        max_date: str = current.strftime('%Y-%m-%d'), # Last day to process; defaults to one day before the date on which this cell was run
        resolution: int = 500, # Grid resolution passed through to wse_preprocess and wse_interp
            ) -> None:
    """Produce the 3-hourly and the combined daily water surface elevation netCDF files.

    Steps:
      1. ``wse_preprocess`` loads the gage records and plans the time steps and daily file groups.
      2. ``wse_interp`` writes one netCDF file per time step.
      3. For every day, that day's files are opened together and written to
         ``wse_{resolution}m_{YYYYmmdd}_combine.nc`` in ``path_to_wse``.

    The date defaults are evaluated once, when the function is defined; pass explicit
    dates in a long-lived kernel. Nothing is returned.
    """

    # Load the data and plan the work
    resolution, wse_df, wse_period_list, day_list, period_dict = wse_preprocess(min_date = min_date, max_date = max_date, resolution = resolution)

    # One interpolated grid per time step
    for t in wse_period_list:
        wse_interp(time_query = t, resolution = resolution, wse_df = wse_df)

    # One combined file per day
    for d in day_list:
        # The context manager closes the underlying files once the day has been written,
        # so file handles do not pile up across the loop.
        with xr.open_mfdataset(period_dict[d], combine = 'by_coords') as ds:
            # The first file of a day belongs to the previous evening (d - 3 hours),
            # so index 2 is a step that falls on day d itself.
            day_stamp = pd.to_datetime(ds['time'].values[2]).strftime('%Y%m%d')
            combined_output_name = f'wse_{resolution}m_{day_stamp}_combine.nc'
            combined_filename = path_to_wse.joinpath(combined_output_name)
            ds.to_netcdf(combined_filename)

    return None

In [3]:
# Generate the 3-hourly and daily files for the first week of March 2023 on a 500 grid
wse(
    min_date='2023-03-01',
    max_date='2023-03-07',
    resolution=500,
)