# Deploying a model at scale

In this notebook we take a pre-trained model and deploy it using the DL Tasks and Scenes APIs. The model takes an interpolated, cloud-free, spatially aggregated time series of Sentinel-2 L2A NDVI for a field and predicts whether or not that field is corn. It was trained using 2019 Cropland Data Layer (CDL) data and should be treated only as a reasonable stand-in for the kinds of models you may want to deploy on the DL platform.

We will use the following DL APIs in this exercise:
- [Scenes](https://docs.descarteslabs.com/descarteslabs/scenes/readme.html) - Query for and access imagery over our AOI
- [Storage](https://docs.descarteslabs.com/descarteslabs/client/services/storage/readme.html) - Store our model on the DL backend/cloud data store
- [Tasks](https://docs.descarteslabs.com/descarteslabs/client/services/tasks/readme.html) - Deploy data pipeline and model code on the distributed DL backend

We will use the following external Python packages:
- [geopandas](https://geopandas.org/en/stable/docs.html) - Import, transform, and query our reference dataset for Iowa agricultural fields
- [scipy.interpolate](https://docs.scipy.org/doc/scipy/reference/interpolate.html) - Interpolate our NDVI time series onto a regular temporal grid
- [matplotlib.pyplot](https://matplotlib.org/3.5.0/api/_as_gen/matplotlib.pyplot.html) - Plot the NDVI time series
- [numpy](https://numpy.org/doc/stable/index.html) - Array/imagery operations and manipulations
- [datetime](https://docs.python.org/3/library/datetime.html) - Create date ranges and timestamps for interpolation
- [joblib](https://joblib.readthedocs.io/en/latest/) - Save and load the model
- [tqdm](https://tqdm.github.io/) - Fancy progress bars

In [None]:
import descarteslabs as dl
from descarteslabs.client.services.tasks import as_completed
import geopandas as gpd
import shapely.geometry as sg  # used for sg.mapping() further down
from scipy.interpolate import interp1d
import matplotlib.pyplot as plt
import numpy as np
from datetime import datetime, timedelta
from joblib import dump, load
from tqdm.notebook import tqdm

We start by writing a function that combines a series of data access steps (as in the previous notebook) with some data cleaning. `dl.scenes.search` finds Sentinel-2 L2A imagery over an input geometry and date range. We then pull the red, nir, and cloud mask bands from that imagery into a single stack with one layer per day (scenes captured on the same day are mosaicked into one layer), giving an array of shape *(days, bands, y, x)*. As in the last notebook, we mask out cloudy pixels and pixels outside the input geometry, then compute NDVI, which has shape *(days, y, x)*. NDVI is then aggregated spatially over the input geometry by taking the median of the unmasked pixels, leaving a single time series with one NDVI value per day, of shape *(days,)*. Because acquisition dates vary from field to field, this time series needs to be interpolated onto a standard temporal grid so that it matches the input the model expects. To do this we convert the date of each image into a timestamp, drop days with no valid pixels, and use `scipy.interpolate.interp1d` to interpolate (and, past the first and last observations, linearly extrapolate) onto a grid of timestamps spaced 6 days apart between the start and end dates. The function returns the interpolated time series and the corresponding dates.
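The masking and aggregation steps can be tried on a small synthetic array without any DL access. This sketch fakes a three-day stack (all values are made up) shaped *(days, bands, y, x)* with the same red/nir/cloud_mask band order. It uses NumPy broadcasting in place of the `np.repeat` call in `get_ndvi_tseries`; the resulting mask is the same.

```python
import numpy as np

# Toy stack shaped (days, bands, y, x) with the band order used above: red, nir, cloud_mask.
# All values are made up for illustration.
days, ny, nx = 3, 4, 4
red = np.full((days, ny, nx), 0.05)
nir = np.full((days, ny, nx), 0.45)
cloud = np.zeros((days, ny, nx))
cloud[1, :2, :] = 1  # day 1: top half of the field is cloudy
cloud[2] = 1         # day 2: the whole field is cloudy
data = np.stack([red, nir, cloud], axis=1)
stack = np.ma.masked_array(data, mask=np.zeros(data.shape, dtype=bool))

# Broadcast the per-pixel cloud flag (last band) across all bands and OR it into the mask.
cmask = (stack[:, -1].data == 1)[:, np.newaxis]  # shape (days, 1, y, x)
stack.mask = stack.mask | cmask                  # broadcasts to (days, bands, y, x)

# NDVI per pixel, shape (days, y, x), then the median of unmasked pixels per day, shape (days,).
ndvi = (stack[:, 1] - stack[:, 0]) / (stack[:, 1] + stack[:, 0])
ndvi_ts = np.ma.median(ndvi, axis=(1, 2))

# Days with no valid pixel come back masked; keep only the others for interpolation.
valid = ~np.ma.getmaskarray(ndvi_ts)
print(ndvi_ts, valid)
```

Day 1 still yields a value because half of its pixels are clear, while the fully cloudy day 2 is masked and would be dropped before interpolation.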

In [None]:
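def get_ndvi_tseries(
    geom,
    start_date="2019-04-01",
    end_date="2019-10-01"
):
    # Find every Sentinel-2 L2A scene over the geometry in the date range
    scenes, ctx = dl.scenes.search(
        geom,
        products="esa:sentinel-2:l2a:v1",
        start_datetime=start_date,
        end_datetime=end_date,
        limit=None
    )
    print(f"Found {len(scenes)} scenes for specified geometry")

    # Stack red, nir and cloud_mask into shape (days, bands, y, x);
    # scenes from the same day are mosaicked into a single layer
    print("Pulling raster data from DL Catalog")
    stack = scenes.stack(
        ["red", "nir", "cloud_mask"],
        ctx,
        flatten=lambda x: x.properties.date.strftime("%Y-%m-%d"),
        scaling="physical"
    )

    # Extend the per-pixel cloud flag (last band) to every band and add it to the mask
    print("Masking out clouds")
    cmask = np.repeat(
        (stack[:, -1].data == 1)[:, np.newaxis],
        stack.shape[1],
        axis=1
    )
    stack.mask = stack.mask | cmask

    # NDVI = (nir - red) / (nir + red), shape (days, y, x)
    print("Computing NDVI")
    ndvi = (stack[:, 1] - stack[:, 0]) / (stack[:, 1] + stack[:, 0])

    # Median NDVI over the unmasked pixels of each day, shape (days,)
    ndvi_ts = np.ma.median(ndvi, axis=(1, 2))

    # Dates of the stack layers, grouped the same way as `flatten` above
    dates = [
        key for key, scene in scenes.groupby(
            lambda x: x.properties.date.strftime("%Y-%m-%d")
        )
    ]
    dates_ts = [
        datetime.strptime(date, "%Y-%m-%d").timestamp() for date in dates
    ]

    # Regular 6-day grid from start_date (inclusive) to end_date (exclusive)
    new_dates = np.arange(
        datetime.strptime(start_date, "%Y-%m-%d"),
        datetime.strptime(end_date, "%Y-%m-%d"),
        timedelta(days=6)
    ).astype(datetime)
    new_dates_ts = [t.timestamp() for t in new_dates]

    # Keep only days with at least one valid pixel; getmaskarray also covers
    # the case where nothing is masked (mask is np.ma.nomask)
    valid = ~np.ma.getmaskarray(ndvi_ts)
    tseries_masked = ndvi_ts.data[valid]
    dates_masked = np.array(dates_ts)[valid]

    print(f"Interpolating time series from dates: {dates} to new dates: {new_dates.tolist()}")

    # Linear interpolation, extrapolated past the first/last valid observation
    f_interp = interp1d(
        dates_masked,
        tseries_masked,
        bounds_error=False,
        copy=False,
        fill_value="extrapolate",
    )

    # Drop the first grid point (start_date) and return the values with their dates
    return f_interp(new_dates_ts)[1:], new_dates[1:]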
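It helps to know how many values `get_ndvi_tseries` returns, since that is the feature count the classifier sees. The sketch below rebuilds the default 6-day grid with the standard library and drops the first point, as the function does. It also shows what `fill_value="extrapolate"` means for linear `interp1d`: values outside the observed dates continue the first or last segment. To keep it dependency-light, the interpolation here is a hand-written stand-in built on `np.interp`, not SciPy itself; `six_day_grid` and `linear_interp_extrapolate` are hypothetical helper names.

```python
from datetime import datetime, timedelta

import numpy as np


def six_day_grid(start_date="2019-04-01", end_date="2019-10-01", step_days=6):
    """Dates from start_date (inclusive) to end_date (exclusive), like np.arange above."""
    start = datetime.strptime(start_date, "%Y-%m-%d")
    end = datetime.strptime(end_date, "%Y-%m-%d")
    grid = []
    t = start
    while t < end:
        grid.append(t)
        t += timedelta(days=step_days)
    return grid


def linear_interp_extrapolate(x, y, x_new):
    """Linear interpolation that extends the end segments outside [x[0], x[-1]]."""
    x = np.asarray(x, dtype=float)
    y = np.asarray(y, dtype=float)
    x_new = np.asarray(x_new, dtype=float)
    out = np.interp(x_new, x, y)  # np.interp clamps outside the data range...
    below, above = x_new < x[0], x_new > x[-1]
    # ...so overwrite those points with the first/last segment's straight line.
    out[below] = y[0] + (y[1] - y[0]) / (x[1] - x[0]) * (x_new[below] - x[0])
    out[above] = y[-1] + (y[-1] - y[-2]) / (x[-1] - x[-2]) * (x_new[above] - x[-1])
    return out


grid = six_day_grid()[1:]  # drop the first point, as get_ndvi_tseries does
print(len(grid), grid[0].date(), grid[-1].date())
print(linear_interp_extrapolate([0, 10, 20], [0, 1, 2], [-5, 15, 25]))
```

With the default dates this gives 30 grid points from 2019-04-07 to 2019-09-28, so a model trained on this same grid would expect 30 features per field. Extrapolation can produce values outside the observed NDVI range when the first or last observations are far from the grid ends, which is worth keeping in mind for fields with sparse clear imagery.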

## Loading reference data

We load the field geometries in the same way as in the previous notebook. These are the geometries we'll feed to the `get_ndvi_tseries` function above. Because the Scenes API works with longitude/latitude geometries, we convert the reference fields to EPSG:4326 (WGS 84).
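A quick way to catch a forgotten reprojection is to check that every vertex of a GeoJSON-like geometry falls inside the longitude/latitude ranges. The sketch below is a hypothetical helper (`coords_look_like_lonlat` is not part of any library), and the two example polygons are made up: one in Iowa-like longitude/latitude, one with projected, metre-scale coordinates of the kind a UTM shapefile would contain.

```python
def coords_look_like_lonlat(geojson_geom):
    """Hypothetical helper: True if every vertex fits in lon [-180, 180] and lat [-90, 90]."""
    def walk(coords):
        # Coordinates nest to different depths (Polygon, MultiPolygon); a vertex is a pair of numbers.
        if coords and isinstance(coords[0], (int, float)):
            yield coords
        else:
            for c in coords:
                yield from walk(c)

    return all(
        -180 <= v[0] <= 180 and -90 <= v[1] <= 90
        for v in walk(geojson_geom["coordinates"])
    )


# Made-up example polygons
field_wgs84 = {"type": "Polygon", "coordinates": [[(-93.60, 42.00), (-93.59, 42.00), (-93.59, 42.01), (-93.60, 42.00)]]}
field_utm = {"type": "Polygon", "coordinates": [[(450000.0, 4650000.0), (450100.0, 4650000.0), (450100.0, 4650100.0), (450000.0, 4650000.0)]]}

print(coords_look_like_lonlat(field_wgs84), coords_look_like_lonlat(field_utm))
```

Passing the check does not prove the CRS is EPSG:4326, but failing it is a strong sign the geometries are still projected.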

In [None]:
ia_fields = gpd.read_file("../data/IowaFieldBoundaries2019.shp")

In [None]:
ia_fields = ia_fields.to_crs("EPSG:4326")

## Testing our `get_ndvi_tseries` function

Let's grab a test geometry from our fields dataset and use it to check exactly what our function returns. `sg.mapping` converts the shapely geometry into the GeoJSON-like dictionary that `get_ndvi_tseries` passes to `dl.scenes.search`.

In [None]:
test_geom = sg.mapping(ia_fields.iloc[2000].geometry)

In [None]:
ndvi_ts, ndvi_dates = get_ndvi_tseries(test_geom)

We plot the returned NDVI time series against its dates. The curve looks plausible, with higher NDVI during the growing season.

In [None]:
plt.plot(ndvi_dates, ndvi_ts)

## Creating a `CloudFunction`

Now that we have a function that returns a clean, interpolated time series, we need a second function that calls it, loads our model, and returns a prediction. First we upload our model to DL Storage so that each task running on the DL backend can load it from the cloud.

In [None]:
dl.storage.set_file("classifier.joblib", "../models/classifier.joblib")

Now let's look at the function below, `classify_ndvi`. We've moved the `get_ndvi_tseries` function into a `utils.py` file. `classify_ndvi` imports `get_ndvi_tseries`, `joblib.load`, and the DL client inside the function body, so the imports are resolved wherever the function runs, including on the remote task workers. It generates a clean time series, downloads the model from Storage, loads it, and returns a prediction. The function takes both a geometry and a unique ID as input. The `field_id` argument isn't needed for the prediction itself, but returning it alongside the result makes it simple to write each result back into the reference `GeoDataFrame`, since tasks do not necessarily finish in the order they were submitted.

In [None]:
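def classify_ndvi(geom, field_id):
    # Imports live inside the function so they are available on the task workers
    from utils import get_ndvi_tseries
    from joblib import load
    import descarteslabs as dl

    ndvi_ts, ndvi_dates = get_ndvi_tseries(geom)

    # Download the model from DL Storage to a local file, then load it
    dl.storage.get_file("classifier.joblib", "classifier.joblib")
    clf = load("classifier.joblib")

    # predict returns a 1-element array; .item() turns its entry into a plain Python scalar
    return clf.predict(ndvi_ts.reshape(1, -1))[0].item(), field_id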
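Why return a plain scalar rather than the array that `clf.predict` produces? Assuming a scikit-learn-style classifier, `predict` returns a one-element NumPy array, and if the Tasks service serializes return values as JSON (an assumption worth confirming in the Tasks documentation), an `ndarray` cannot be encoded. A plain scalar is also what `.loc[field_id, "class"] = ...` expects when writing results back. This standalone sketch uses a stand-in array in place of a real model.

```python
import json

import numpy as np

# Stand-in for clf.predict(...) on one sample: a one-element NumPy array.
prediction = np.array([1])

try:
    json.dumps([prediction, "field-123"])
    serializable = True
except TypeError:  # "Object of type ndarray is not JSON serializable"
    serializable = False

# .item() converts the NumPy scalar to a built-in Python type, which JSON can encode.
payload = json.dumps([prediction[0].item(), "field-123"])
print(serializable, payload)
```

`.item()` is used rather than `int(...)` so the same code also works if the class labels happen to be strings.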

We test our `classify_ndvi` function locally on the test geometry and display the result.

In [None]:
test_predict, test_fid = classify_ndvi(test_geom, ia_fields.iloc[2000].FBndID)

In [None]:
test_predict

Next we turn the `classify_ndvi` function into a `CloudFunction` with `dl.tasks.create_function`. This wraps our function and sends it to the DL backend to be deployed. You can view the function in the [task monitor UI](https://monitor.descarteslabs.com/), which displays your active task groups. A task group is the environment (Docker image, code, and compute resources) that runs the tasks you submit through your `CloudFunction`.

When we make our `CloudFunction` we also specify the name, a [DL-provided Docker image](https://docs.descarteslabs.com/guides/tasks.html#choosing-your-environment), the maximum number of concurrent workers, modules to include, Python package requirements, the number of CPUs, and the amount of memory. There are other parameters you can specify for your task function as well; please consult the docs [here](https://docs.descarteslabs.com/descarteslabs/client/services/tasks/readme.html#descarteslabs.client.services.tasks.Tasks.create_function) for more info.

In [None]:
async_func = dl.tasks.create_function(
    classify_ndvi,
    name="NDVI classifier prediction",
    image="us.gcr.io/dl-ci-cd/images/tasks/public/py3.7:v2022.01.20-7-gc73f23f4",
    maximum_concurrency=150,
    include_modules=["utils"],
    requirements=[],
    cpus=1,
    memory="2Gi"
)

We can submit a single job the same way we call `classify_ndvi`: pass the arguments to the returned `CloudFunction` object as you would to the original function. The call returns a task handle right away rather than the prediction itself. We do this below for our test geometry.

In [None]:
async_func(test_geom, ia_fields.iloc[2000].FBndID)

## Scaling up our deployment

Now that we have our `CloudFunction`, let's submit a batch of 500 jobs to our task group. We start by taking a random sample of our reference fields dataset.

In [None]:
# replace=False so that no field is drawn twice (duplicates would break the FBndID index)
predict_idx = np.random.choice(len(ia_fields), size=500, replace=False)
ia_fields_predict_sample = ia_fields.iloc[predict_idx]
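`np.random.choice` samples with replacement by default, so a sample can contain the same field more than once; after `set_index("FBndID")` that would leave duplicate labels in the index. This small sketch, with made-up sizes, uses NumPy's `Generator.choice` to show the difference `replace=False` makes.

```python
import numpy as np

n_fields, n_sample = 1000, 500  # made-up sizes for illustration
rng = np.random.default_rng(42)

with_replacement = rng.choice(n_fields, size=n_sample)  # default: replace=True
without_replacement = rng.choice(n_fields, size=n_sample, replace=False)

# Number of distinct indices in each sample
print(len(np.unique(with_replacement)), len(np.unique(without_replacement)))
```

Drawing 500 of 1000 with replacement typically yields only around 400 distinct indices, whereas sampling without replacement always yields 500.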

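Using that sample, we now create a list of geometries to submit. `buffer(0)` is a common trick that can repair invalid polygons (such as self-intersecting rings), and `sg.mapping` converts each one to a GeoJSON-like dictionary. We also create a list of the unique field IDs found in the `FBndID` column of our reference data.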

In [None]:
geoms_predict = [sg.mapping(geom.buffer(0)) for geom in ia_fields_predict_sample.geometry]

In [None]:
field_ids_predict = list(ia_fields_predict_sample.FBndID)

With these two lists we can then use the `.map()` method of our `CloudFunction` to submit all 500 jobs at once. As with Python's built-in `map`, the i-th geometry is paired with the i-th field ID.

In [None]:
predict_tasks = async_func.map(geoms_predict, field_ids_predict)

You should now see a number of jobs in the task monitor UI under the task group you submitted earlier. For more information on the task monitor and what it shows you about your running tasks please see this article [here](https://docs.descarteslabs.com/ui/monitor.html?highlight=monitor).

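We then prepare our sample reference dataset so that we can write the results of the model deployment back into it: we index it by field ID and give every field a default class of 0. Note that a field whose task fails keeps this default, so it is indistinguishable from a field predicted as class 0 unless you track the failures.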

In [None]:
ia_fields_predict_sample = ia_fields_predict_sample.set_index("FBndID")

In [None]:
ia_fields_predict_sample["class"] = 0

Finally, we wait for the tasks to complete. The `as_completed` function from the Tasks API yields each task as it finishes; we write each successful result into our sample reference dataset and keep a list of the tasks that failed.

In [None]:
# write each prediction into the sample as its task finishes; collect failed tasks
print("starting to wait for task completions")
failed_tasks = []
for task in tqdm(as_completed(predict_tasks, show_progress=False), total=len(predict_tasks)):
    if task.is_success:
        cid, field_id = task.result
        ia_fields_predict_sample.loc[field_id, "class"] = cid
    else:
        failed_tasks.append(task)
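The collection pattern above (pairwise submission, results arriving in completion order, writing back by the returned ID, and keeping failures aside) can be rehearsed locally. This sketch is an analogy built on the standard library's `concurrent.futures`, not the DL Tasks API; `fake_classify` is a hypothetical stand-in for `classify_ndvi`, and its geometries and the failure of field `F3` are made up.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def fake_classify(geom, field_id):
    """Hypothetical stand-in for classify_ndvi; fails for one field to exercise the error path."""
    if field_id == "F3":
        raise RuntimeError("no clear scenes")  # made-up failure
    return int(geom["area"] > 10), field_id


geoms = [{"area": 5}, {"area": 20}, {"area": 15}, {"area": 1}]
field_ids = ["F1", "F2", "F3", "F4"]

classes = {fid: 0 for fid in field_ids}  # default class, like ia_fields_predict_sample["class"] = 0
failed = []

with ThreadPoolExecutor(max_workers=2) as pool:
    # Like CloudFunction.map, pair the i-th geometry with the i-th field ID.
    futures = {pool.submit(fake_classify, g, fid): fid for g, fid in zip(geoms, field_ids)}
    for fut in as_completed(futures):  # yields in completion order, not submission order
        if fut.exception() is None:
            cid, fid = fut.result()
            classes[fid] = cid  # the returned ID, not the loop position, says where the result goes
        else:
            failed.append(futures[fut])

print(classes, failed)
```

`F3` ends up with the default class 0 even though it was never classified, which is why keeping the list of failures matters.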

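Should the need arise, we can resubmit any failed jobs in a task group with the `dl.tasks.rerun_failed_tasks()` function. The loop above has already finished, so the results of rerun tasks are not written into the sample automatically; they have to be collected separately.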

In [None]:
group_id = async_func.group_id

In [None]:
dl.tasks.rerun_failed_tasks(group_id)

In [None]:
ia_fields_predict_sample