# Workflow Management

Development and testing of the workflow management classes and models

In [89]:
import json
from pathlib import Path
from typing import List

from dataclasses import dataclass, field, asdict, replace
from dataclasses import fields as dcf

import pint
import click
import petl as etl
import marshmallow_dataclass
from marshmallow_dataclass import class_schema
from marshmallow import Schema, fields, EXCLUDE, INCLUDE, ValidationError

In [2]:
USE_ESRI = True

## Models

In [3]:
@dataclass
class RainfallRaster:
    """store a reference to a NOAA Rainfall raster
    """

    path: str = None
    freq: int = None
    ext: str = None

RainfallRasterSchema = class_schema(RainfallRaster)


class RainfallRasterConfig():
    """store rainfall download metadata with methods for portability
    """

    root: str = None
    rasters: List[RainfallRaster] = field(default_factory=[])

    # def __init__(self, path):
    #     self.path = Path(path)
    #     self.lookup_table = []

    # def as_dict(self):
    #     return {
    #         'path': str(self.path),
    #         'lookup_table': self.lookup_table
    #     }

    # def as_json(self, out_path):
    #     with open(out_path, 'w') as fp:
    #         json.dump(self.as_dict(), fp)

    # def as_csv(self, out_path):
    #     etl.tocsv(etl.fromdicts(self.lookup_table), out_path)

RainfallRasterConfigSchema = class_schema(RainfallRasterConfig)

@dataclass
class Precips:
    """Precipitation frequencies
    """
    f1: float = None
    f2: float = None
    f5: float = None
    f10: float = None
    f25: float = None
    f50: float = None
    f100: float = None
    f200: float = None
    f500: float = None
    f1000: float = None

PrecipsSchema = marshmallow_dataclass.class_schema(Precips)


@dataclass
class NaaccPoint:
    """NAACC model for a single culvert.
    
    NOTE: this is only a subset of available NAACC fields
    """

    Survey_Id: str = None # 'field_short': 'Survey_ID'
    Naacc_Culvert_Id: str = None # 'field_short': 'NAACC_ID'

    Number_Of_Culverts: int = 1 # 'field_short': 'Flags'

    Road: str = None # 'field_short': 'Rd_Name'
    Material: str = None # 'field_short': 'Culv_Mat'
    Inlet_Type: str = None # 'field_short': 'In_Type'
    Inlet_Structure_Type: str = None # 'field_short': 'In_Shape'

    Inlet_Width: float = None # 'field_short': 'In_A'
    Inlet_Height: float = None # 'field_short': 'In_B'
    Road_Fill_Height: float = None # 'field_short': 'HW'
    Slope_Percent: float = None # 'field_short': 'Slope'
    Crossing_Structure_Length: float = None # 'field_short': 'Length'
    Outlet_Structure_Type: float = None # 'field_short': 'Out_Shape'
    Outlet_Width: float = None # 'field_short': 'Out_A'
    Outlet_Height: float = None # 'field_short': 'Out_B'
    Crossing_Type: float = None # 'field_short': 'Crossing_Type'
    Crossing_Comment: float = None # 'field_short': 'Comments'

    GIS_Latitude: float = None # 'field_short': 'Lat'
    GIS_Longitude: float = None # 'field_short': 'Long'

NaaccPointSchema = marshmallow_dataclass.class_schema(NaaccPoint)


@dataclass
class Basin:
    """Base model for characteristics of a single basin, including
    characteristics of the outlet (point) and catchment (polygon) used for 
    analysis
    """
    # unique id field, derived from the basin outlet point; AKA the 
    # pour_point_field. For NAACC-based culvert modeling, this is the
    # NAACC Naacc_Culvert_Id field
    cid: str = None
    # group id field. non-unique ID field that indicates groups of related
    # outlets. Used primarily for NAACC-based culvert modeling, this is the
    # NAACC Survey_Id field
    gid: str = None

    # characteristics used for calculating peak flow
    area_sqkm: float = None# <area of inlet's catchment in square km>
    avg_slope_pct: float = None # <average slope of DEM in catchment>
    avg_cn: float = None # <average curve number in the catchment>
    max_fl: float = None # <maximum flow length in the catchment>
    precip_table: Precips = Precips # <basin-specific precipitation estimates>

    # geometries
    inlet_geom: str = None
    basin_geom: str = None
    
    # for recording the location of intermediate outputs
    basin_polygon_filepath: str = None
    basin_raster_filepath: str = None
    

BasinSchema = marshmallow_dataclass.class_schema(Basin)


class RainfallRasterConfig():
    """store rainfall download metadata with methods for portability
    """

    def __init__(self, path):
        self.path = Path(path)
        self.lookup_table = []

    def as_dict(self):
        return {
            'path': str(self.path),
            'lookup_table': self.lookup_table
        }

    def as_json(self, out_path):
        with open(out_path, 'w') as fp:
            json.dump(self.as_dict(), fp)

    def as_csv(self, out_path):
        etl.tocsv(etl.fromdicts(self.lookup_table), out_path)


@dataclass
class WorkflowConfig:
    """Store all parameters required for any of our model runs.
    """

    # directories
    work_dir: str = None

    # -----------------------------
    # input points (culverts or catch-basins)

    points_filepath: str = None
    points_id_fieldname: str = None
    is_naacc: bool = False
    
    # -----------------------------
    # input landscape rasters

    raster_dem_filepath: str = None
    raster_flowdir_filepath: str = None
    raster_slope_filepath: str = None
    raster_curvenumber_filepath: str = None
    raster_watershed_filepath: str = None

    # --------------------------
    # input rainfall

    precip_src_config_filepath: str = None
    precip_noaa_csv_filepath: str = None

    # --------------------------
    # outputs
    output_points_filepath: str = None
    output_basins_filepath: str = None

    # --------------------------
    # models for intermediate data

    culverts: List[NaaccPoint] = field(default_factory=list)
    basins: List[Basin] = field(default_factory=list)

    # --------------------------
    # analysis parameters
    
    area_conv_factor: float = 0.00000009290304
    leng_conv_factor: float = 1
    basins_simplify: bool = False

WorkflowConfigSchema = marshmallow_dataclass.class_schema(WorkflowConfig)

In [30]:
class PeakFlow01Schema(Schema):

    points_filepath = fields.Str(required=True) # inlets
    points_id_fieldname = fields.Str(required=True) # pour_point_field
    raster_curvenumber_filepath = fields.Str(required=True) # cn_raster
    precip_src_config_filepath = fields.Str(required=True) # precip_data
    raster_dem_filepath = fields.Str(required=True)
    basins_in_series = fields.Bool(default=True)

    output_points_filepath = fields.Str(required=True) # output
    output_basins_filepath = fields.Str() # output_catchments
    
    class Meta:
        unknown = EXCLUDE    

## Managers

In [31]:
class WorkflowManager():
    """Base class for all workflows. Provides methods for storing and 
    persisting results from various workflow components.
    """

    def __init__(
        self, 
        use_esri=USE_ESRI, 
        config_json_filepath=None, 
        rainfall_raster_json_filepath=None, 
        **kwargs
    ):

        # initialize an empty WorkflowConfig object with default values
        self.config = WorkflowConfig()
        #click.echo(self.config)

        # if any confile files provided, load here. This will replace the
        # config object entirely
        self.config_json_filepath = config_json_filepath
        self.rainfall_raster_json_filepath = rainfall_raster_json_filepath
        self.load()
        #click.echo(self.config)
        
        # regardless of what happens above, we have a config object. Now we
        # use the provided keyword arguments to update it, overriding any that
        # were provided in the JSON file.
        self.config = replace(self.config, **kwargs)
        # (individual workflows that subclass WorkflowManager handle whether or 
        # not the needed kwargs are actually present)
        #click.echo(self.config)

        # self.schema = WorkflowConfigSchema().load(**kwargs)

        self.rainfall_config = None
        
        self.using_esri = use_esri
        self.using_wbt = not use_esri

        self.units = pint.UnitRegistry()

    
    def save(self, config_json_filepath):
        """Save workflow config to JSON.

        Note that validation via WorkflowConfigSchema will only fail
        if our code is doing something wrong.
        """

        self.config_json_filepath = Path(config_json_filepath)

        c = WorkflowConfigSchema().dump(asdict(self.config))
        with open(config_json_filepath, 'w') as fp:
            json.dump(c, fp)


    def load(self, config_json_filepath=None, rainfall_raster_json_filepath=None):
        """load a workflow from a JSON file
        
        Note that validation via WorkflowConfigSchema will fail if the JSON has 
        been manually changed outside in a way that doesn't follow the schema
        (i.e., it has to serialize correctly to load)
        """
        click.echo("loading...")

        # ----------------------------------------------------------------------
        # Workflow Config

        # select the file path ref to load. defaults to arg, fallsback to 
        # instance variable

        cjf=None
        if config_json_filepath:
            cjf = config_json_filepath
            self.config_json_filepath = cjf
        elif self.config_json_filepath:
            cjf = self.config_json_filepath

        # reads from disk, validates, and stores
        if cjf:
            click.echo("Reading config from JSON file")
            with open(cjf) as fp:
                config_as_dict = json.load(fp)
            self.config = WorkflowConfigSchema().load(config_as_dict)

        # ----------------------------------------------------------------------
        # Rainfall Raster Config

        # select the file path ref to load. defaults to arg, fallsback to 
        # instance variable

        rrj = None
        if rainfall_raster_json_filepath:
            rrj = rainfall_raster_json_filepath
            self.rainfall_raster_json_filepath = rrj
        elif self.rainfall_raster_json_filepath:
            rrj = self.rainfall_raster_json_filepath

        # reads from disk, validates, and stores

        if rrj:
            click.echo("Reading rainfall config from JSON file")
            with open(rrj) as fp:
                rainfall_config_as_dict = json.load(fp) 
                self.rainfall_config = RainfallRasterConfigSchema().load(rainfall_config_as_dict)


In [32]:
class PeakFlowCore(WorkflowManager):

    def __init__(
        self, 
        rainfall_raster_json_filepath,
        save_config_json_filepath=None,
        **kwargs
    ):
        """Core Peak Flow workflow.

        :param save_config_json_filepath: save workflow config to a file, defaults to None
        :type save_config_json_filepath: str, optional
        :param kwargs: relevant properties in the WorkflowConfig object
        :type kwargs: kwargs, optional 
        """

        super().__init__(**kwargs)
        self.save_config_json_filepath = save_config_json_filepath
        self.load(rainfall_raster_json_filepath=rainfall_raster_json_filepath)
    
    def run_core_workflow(self):
        print("running Peak Flow analysis")
        
        as_d = WorkflowConfigSchema().dump(asdict(self.config))
        # print(as_d)

        # initialize the GP object with the config variables
        print("initialize the GP object with the config variables")

        # read in precipitation data
        print("read in precipitation data")
        
        # delineate watersheds
        print("delineate watersheds (as needed)")

        # derive data from catchments
        print("derive parameters from basins")

        # calculate peak flow (t of c and flow per return period)
        print("calculate peak flow")

        # save the config
        if self.save_config_json_filepath:
            self.save(self.save_config_json_filepath)
        
        return

In [49]:
class PeakFlow01(PeakFlowCore):
    """Peak flow calculator; derives needed rasters from the DEM.
    """
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.run_core_workflow()

## Test config i/o with the peak flow workflow manager

Some test kwargs. In practice these would come through from an ArcToolbox tool via a `sys.arg`.

In [50]:
test_kwargs={
    'rainfall_raster_json_filepath': r"C:\Users\chris\dev\drainage\drainit\tests\rainfall_config_example.json",
    'raster_watershed_filepath': 'raster_watershed_filepath', 
    'culverts': [], 
    'precip_noaa_csv_filepath': 'precip_noaa_csv_filepath', 
    'raster_curvenumber_filepath': 'raster_curvenumber_filepath', 
    'is_naacc': True, 
    'points_id_fieldname': 'points_id_fieldname', 
    'work_dir': 'work_dir', 
    'output_basins_filepath': 'output_basins_filepath', 
    'raster_flowdir_filepath': 'raster_flowdir_filepath', 
    'precip_src_config_filepath': 'precip_src_config_filepath', 
    'output_points_filepath': 'output_points_filepath', 
    'points_filepath': 'points_filepath', 
    'basins': [],
    'area_conv_factor': 9.290304e-08, 
    'leng_conv_factor': 100.0, 
    'basins_simplify': True, 
    'raster_dem_filepath': 'raster_dem_filepath', 
    'raster_slope_filepath': 'raster_slope_filepath'    
}

In practice we'd take the positional args that come through from the ArcToolbox tool python script a put them in the right kwargs to instantiate the class here:

In [51]:
p = PeakFlow01(**test_kwargs)

loading...
loading...
Reading rainfall config from JSON file
running Peak Flow analysis
initialize the GP object with the config variables
read in precipitation data
delineate watersheds (as needed)
derive parameters from basins
calculate peak flow


In [36]:
p.config

WorkflowConfig(work_dir='work_dir', points_filepath='points_filepath', points_id_fieldname='points_id_fieldname', is_naacc=True, raster_dem_filepath='raster_dem_filepath', raster_flowdir_filepath='raster_flowdir_filepath', raster_slope_filepath='raster_slope_filepath', raster_curvenumber_filepath='raster_curvenumber_filepath', raster_watershed_filepath='raster_watershed_filepath', precip_src_config_filepath='precip_src_config_filepath', precip_noaa_csv_filepath='precip_noaa_csv_filepath', output_points_filepath='output_points_filepath', output_basins_filepath='output_basins_filepath', culverts=[], basins=[], area_conv_factor=9.290304e-08, leng_conv_factor=100.0, basins_simplify=True)

In [37]:
p.rainfall_config

RainfallRasterConfig(root='D:\\Dropbox (CivicMapper)\\Projects\\202004-02 Cornell Modeling\\3 - Production\\tool outputs\\c19 tests\\noaa_rainfall_2', rasters=[RainfallRaster(path='orb1yr24ha.asc', freq=1, ext='.asc'), RainfallRaster(path='orb1yr24ha.prj', freq=1, ext='.prj'), RainfallRaster(path='orb1yr24ha.xml', freq=1, ext='.xml'), RainfallRaster(path='orb2yr24ha.asc', freq=2, ext='.asc'), RainfallRaster(path='orb2yr24ha.prj', freq=2, ext='.prj'), RainfallRaster(path='orb2yr24ha.xml', freq=2, ext='.xml'), RainfallRaster(path='orb5yr24ha.asc', freq=5, ext='.asc'), RainfallRaster(path='orb5yr24ha.prj', freq=5, ext='.prj'), RainfallRaster(path='orb5yr24ha.xml', freq=5, ext='.xml'), RainfallRaster(path='orb10yr24ha.asc', freq=10, ext='.asc'), RainfallRaster(path='orb10yr24ha.prj', freq=10, ext='.prj'), RainfallRaster(path='orb10yr24ha.xml', freq=10, ext='.xml'), RainfallRaster(path='orb25yr24ha.asc', freq=25, ext='.asc'), RainfallRaster(path='orb25yr24ha.prj', freq=25, ext='.prj'), Rain

In [38]:
p.save(r"C:\Users\chris\dev\drainage\drainit\tests\config_example.json")

### Workflow-specific validation

Passing all properties from the Workflow Config, validating only what is specified for the workflow-specific schema:

In [41]:
params = PeakFlow01Schema().load(asdict(p.config))
params

{'raster_curvenumber_filepath': 'raster_curvenumber_filepath',
 'points_id_fieldname': 'points_id_fieldname',
 'output_basins_filepath': 'output_basins_filepath',
 'precip_src_config_filepath': 'precip_src_config_filepath',
 'output_points_filepath': 'output_points_filepath',
 'points_filepath': 'points_filepath',
 'raster_dem_filepath': 'raster_dem_filepath'}

When passing not enough kwargs passed it, throws error:

In [46]:
try:
    params = PeakFlow01Schema().load({
        'raster_curvenumber_filepath': 'raster_curvenumber_filepath',
        'points_id_fieldname': 'points_id_fieldname',
        'output_basins_filepath': 'output_basins_filepath',
        'precip_src_config_filepath': 'precip_src_config_filepath',
        'output_points_filepath': 'output_points_filepath',
        'points_filepath': 'points_filepath'
        #'raster_dem_filepath': 'raster_dem_filepath'
    })
    params
except ValidationError as e:
    print("Missing parameters!\n", e)

Missing parameters!
 {'raster_dem_filepath': ['Missing data for required field.']}


Validation without deserialization

In [48]:
errors = PeakFlow01Schema().validate(asdict(p.config))
if errors:
    print("Invalide schema", errors)
else:
    print("Valid schema!")

Valid schema!


### Workflow methods

In [20]:
p.run_core_workflow()

running Peak Flow analysis
initialize the GP object with the config variables
read in precipitation data
delineate watersheds (as needed)
derive parameters from basins
calculate peak flow


In [71]:
@dataclass
class Tester:
    
    attr1: float
    attr2: float
        
    def adder(self):
        if all([
            self.attr1 is not None,
            self.attr2 is not None
        ]):
            return self.attr1 + self.attr2
        else:
            print("set the attrs first")
            
TesterSch = marshmallow_dataclass.class_schema(Tester)

In [75]:
x = TesterSch().load(dict(attr1=1, attr2=2))

3.0

In [66]:
x.adder()

set the attrs first


In [77]:
TesterSch().validate(dict(attr1=1))

{'attr2': ['Missing data for required field.']}

In [91]:
[f.name for f in dcf(NaaccPoint)]

['Survey_Id',
 'Naacc_Culvert_Id',
 'Number_Of_Culverts',
 'Road',
 'Material',
 'Inlet_Type',
 'Inlet_Structure_Type',
 'Inlet_Width',
 'Inlet_Height',
 'Road_Fill_Height',
 'Slope_Percent',
 'Crossing_Structure_Length',
 'Outlet_Structure_Type',
 'Outlet_Width',
 'Outlet_Height',
 'Crossing_Type',
 'Crossing_Comment',
 'GIS_Latitude',
 'GIS_Longitude']