# Combining Overland Flow and Simple Crop

## Running Overland Flow on its Own

The overland flow model routes water over a landscape and determines how much water infiltrates particular cells in response to rain events and elevation data. Rainfall in this model is assumed to occur instantaneously and rainfall events are assumed to be independent (so a large rainfall event on a previous day has no bearing on the present day).

In order to run the model we first need to import some source and sink types as well as classes for calling off to CLI models and building requests.

In [1]:
import itertools

import os.path
import netCDF4
from meillionen import FunctionModelCLI, \
    file_source, feather_source, feather_sink, \
    netcdf_sink, file_sink, parquet_sink, parquet_source
from meillionen.io import PandasLoader, PandasSaver
from meillionen.meillionen import FuncRequest
from prefect import task, Flow


BASE_DIR = '../../examples/crop-pipeline'

INPUT_DIR = os.path.join(BASE_DIR, 'workflows/inputs')
OUTPUT_DIR = os.path.join(BASE_DIR, 'workflows/outputs')

 We also need to create sources and sinks to describe our data

In [2]:
elevation = file_source(os.path.join(INPUT_DIR, "hugo_site.asc"))

weather = feather_source(os.path.join(INPUT_DIR, "rainfall.feather"))

swid = netcdf_sink({
    "type": "NetCDFResource",
    "path": os.path.join(OUTPUT_DIR, "swid.nc"),
    "variable": "soil_water_infiltration__depth",
    "data_type": "Float32",
    "slices": {}
})

and build a request to call our model with.

In [3]:
overlandflow_req = FuncRequest()
overlandflow_req.set_source('elevation', elevation)
overlandflow_req.set_source('weather', weather)
overlandflow_req.set_sink('soil_water_infiltration__depth', swid)

overlandflow = FunctionModelCLI.from_path(os.path.join(BASE_DIR, 'overlandflow/model.py'))

Then call the overland flow model with our request (which will create files on the file system)

In [4]:
overlandflow(overlandflow_req)

## Running Simple Crop on its Own

`simplecrop`  is a model of yearly crop growth that operates on the command line. When called in the current directory with no arguments it expects that there is an data folder to be present. It expects that the data folder contains five files

- `irrig.inp` - daily irrigation
- `plant.inp` - plant growth parameters for the simulation
- `simctrl.inp` - simulation reporting parameters
- `soil.inp` - soil characteristic parameters for the simulation
- `weather.inp` - daily weather data (with variables like maximum temperature, solar energy flux)

`simplecrop` also expects there to be an output folder. After the model has run it will populate the output folder with three files

- `plant.out`- daily plant characteristics
- `soil.out` - daily soil characteristics
- `wbal.out` - summary soil and plant statistics about simulation

In order to wrap this model in an interface that allows you to run the model without manually building those input files and manually converting the output files into a format conducive to analysis we need to have a construct the input files and parse the output files. Fortunately we have such a model wrapper already.

In [None]:
from meillionen import FunctionModelCLI, feather_sink, feather_source, file_sink
from meillionen.meillionen import FuncRequest
import pandas as pd

run_simple_crop = FunctionModelCLI.from_path('simplecrop')

In [None]:
daily = feather_source(os.path.join(BASE_DIR, 'simplecrop/data/daily.feather'))
yearly = feather_source(os.path.join(BASE_DIR, 'simplecrop/data/yearly.feather'))
plant = feather_sink(os.path.join(OUTPUT_DIR, 'plant.feather'))
soil = feather_sink(os.path.join(OUTPUT_DIR, 'soil.feather'))
tempdir = file_sink('tmp')

fr = FuncRequest()
fr.set_source('daily', daily)
fr.set_source('yearly', yearly)
fr.set_sink('plant', plant)
fr.set_sink('soil', soil)
fr.set_sink('tempdir', tempdir)

In [None]:
run_simple_crop(fr)

In [None]:
soil_df = pd.read_feather(os.path.join(OUTPUT_DIR, 'soil.feather'))
soil_df

## Using Prefect to Feed Overland Flow Results into Simple Crop

In [None]:
overlandflow = FunctionModelCLI.from_path(os.path.join(BASE_DIR, 'overlandflow/model.py'))
simplecrop = FunctionModelCLI.from_path('simplecrop')


@task()
def run_overland_flow():
    elevation = file_source(os.path.join(INPUT_DIR, "hugo_site.asc"))
    weather = feather_source(os.path.join(BASE_DIR, "overlandflow/rainfall.feather"))
    swid = netcdf_sink({
        "type": "NetCDFResource",
        "path": f"{OUTPUT_DIR}/swid.nc",
        "variable": "soil_water_infiltration__depth",
        "data_type": "Float32",
        "slices": {}
    })

    overlandflow_req = FuncRequest()
    overlandflow_req.set_source('elevation', elevation)
    overlandflow_req.set_source('weather', weather)
    overlandflow_req.set_sink('soil_water_infiltration__depth', swid)

    overlandflow(overlandflow_req)

    return swid


@task()
def chunkify_soil_water_infiltration_depth(swid):
    swid = swid.to_dict()
    ds = netCDF4.Dataset(swid['path'], 'r')
    variable = ds[swid['variable']]
    x_d, y_d, time_d = variable.get_dims()
    return [{'soil_water_infiltration__depth': variable[x, y, :], 'x': x, 'y': y} for (x,y) in itertools.product(range(10,12), range(21,23))]


@task()
def convert_swid_chunk_to_simplecrop(c):
    return {'daily': daily, 'yearly': yearly, **c}


@task()
def simplecrop_process_chunk(data):
    yearly = data['yearly']
    daily = data['daily']
    soil_water_infiltration__depth = data['soil_water_infiltration__depth']
    x = data['x']
    y = data['y']
    d = PandasLoader.load(daily)
    d['rainfall'] = soil_water_infiltration__depth
    d_resource = parquet_source(os.path.join(OUTPUT_DIR, f'outputs/daily/{x}/{y}.feather'))
    PandasSaver.save(d_resource, d)

    plant = parquet_sink(os.path.join(OUTPUT_DIR, f'plant/{x}/{y}/data.parquet'))
    soil = parquet_sink(os.path.join(OUTPUT_DIR, f'soil/{x}/{y}/data.parquet'))
    tempdir = file_sink(os.path.join(OUTPUT_DIR, f'temp/{x}/{y}'))
    sc_req = FuncRequest()
    sc_req.set_source('daily', d_resource)
    sc_req.set_source('yearly', yearly)
    sc_req.set_sink('plant', plant)
    sc_req.set_sink('soil', soil)
    sc_req.set_sink('tempdir', tempdir)

    simplecrop(sc_req)


daily = feather_source(os.path.join(BASE_DIR, 'simplecrop/data/daily.feather'))
yearly = feather_source(os.path.join(BASE_DIR, 'simplecrop/data/yearly.feather'))


with Flow('crop_pipeline') as flow:
    overland_flow = run_overland_flow()
    surface_water_depth_chunks = chunkify_soil_water_infiltration_depth(overland_flow)
    simplecrop_chunks = convert_swid_chunk_to_simplecrop.map(surface_water_depth_chunks)
    yield_chunks = simplecrop_process_chunk.map(simplecrop_chunks)

flow.run()
