# Example of recipe computation, model fit, predict* and conversion to raster using [distributed](http://distributed.dask.org/en/latest/), API of antares3 and [kale](https://github.com/kubeflow-kale/kale) functionality

*Prediction is pixel-wise.

**This notebook uses Landsat 8 data that has already been ingested and processed via antares3.**

## Some imports

In [1]:
import sys

import os
import json
from datetime import datetime

import matplotlib
from matplotlib.patches import Patch
from matplotlib import pyplot as plt
import numpy as np
import xarray as xr
from shapely.geometry import Point
import rasterio
import dill
import geopandas as gpd
import fiona
from affine import Affine
from dask.distributed import Client
from rasterio.features import rasterize
import datacube
from datacube.api import GridWorkflow
from datacube.storage import masking
from datacube.drivers.netcdf import write_dataset_to_netcdf

from madmex.util.db import get_cmap_from_scheme
from madmex.models import Tag
from madmex.overlay.extractions import zonal_stats_xarray
from madmex.io.vector_db import VectorDb
from madmex.wrappers import gwf_query
from madmex.modeling.supervised.xgb import Model
from madmex.models import Tag
from madmex.overlay.extractions import zonal_stats_xarray
from madmex.util import randomword, mid_date, join_dicts
from madmex.util.xarray import to_float, to_int
from django.contrib.gis.geos.geometry import GEOSGeometry
from madmex.models import PredictObject

## Recipe computation

In [None]:
def recipe_computation(tile):
    """Build the feature stack of one tile from its reflectance time series.

    The tile's datasets are loaded with dask chunks of 1200 x 1200 and
    combined by coordinates. Only pixels whose ``pixel_qa`` flags are not
    cloud, not cloud shadow and not snow are kept. NDVI and NDMI are derived
    (scaled by 10000) and everything is reduced over ``time``: the mean of
    every variable plus the min and max of both indices. Terrain data from
    the ``srtm_cgiar_mexico`` product, loaded ``like`` the reflectance
    dataset, is merged into the result.

    Args:
        tile: Pair whose first item is the tile identifier (returned as is)
            and whose second item is a sequence of objects accepted by
            ``GridWorkflow.load`` that expose a ``geobox`` attribute.

    Returns:
        tuple: ``(tile[0], combined)`` where ``combined`` is the merged
        dataset, with the CRS of the first tile stored in
        ``combined.attrs['crs']``.
    """
    crs = tile[1][0].geobox.crs

    ds = xr.combine_by_coords([GridWorkflow.load(x, dask_chunks={'x': 1200, 'y': 1200})
                               for x in tile[1]], data_vars='minimal', coords='minimal')
    ds.attrs['geobox'] = tile[1][0].geobox

    # Keep only pixels that are not cloud, cloud shadow or snow, then drop
    # the qa layer
    clear = masking.make_mask(ds.pixel_qa, cloud=False, cloud_shadow=False,
                              snow=False)
    ds_1 = ds.where(clear)
    ds_1 = ds_1.drop('pixel_qa')
    ds_1 = ds_1.apply(func=to_float, keep_attrs=True)

    # Compute vegetation indices
    ds_1['ndvi'] = ((ds_1.nir - ds_1.red) / (ds_1.nir + ds_1.red)) * 10000
    ds_1['ndvi'].attrs['nodata'] = -9999
    ds_1['ndmi'] = ((ds_1.nir - ds_1.swir1) / (ds_1.nir + ds_1.swir1)) * 10000
    ds_1['ndmi'].attrs['nodata'] = -9999

    # Run temporal reductions and rename DataArrays
    ds_mean = ds_1.mean('time', keep_attrs=True, skipna=True)
    ds_mean = ds_mean.rename({'blue': 'blue_mean',
                              'green': 'green_mean',
                              'red': 'red_mean',
                              'nir': 'nir_mean',
                              'swir1': 'swir1_mean',
                              'swir2': 'swir2_mean',
                              'ndmi': 'ndmi_mean',
                              'ndvi': 'ndvi_mean'})

    # Min/max over time are computed only for the vegetation indices; the
    # resulting order is ndvi_max, ndvi_min, ndmi_max, ndmi_min
    index_extremes = []
    for index_name in ('ndvi', 'ndmi'):
        for reducer in ('max', 'min'):
            reduce_over = getattr(ds_1[index_name], reducer)
            extreme = reduce_over('time', keep_attrs=True, skipna=True)
            extreme = extreme.rename('%s_%s' % (index_name, reducer))
            extreme.attrs['nodata'] = -9999
            index_extremes.append(extreme)

    # Load terrain metrics using the same spatial parameters as the
    # reflectance data; always release the datacube connection, even when
    # the load fails
    dc = datacube.Datacube(app='landsat_madmex_003_%s' % randomword(5))
    try:
        terrain = dc.load(product='srtm_cgiar_mexico', like=ds,
                          time=(datetime(1970, 1, 1), datetime(2018, 1, 1)),
                          dask_chunks={'x': 1200, 'y': 1200})
    finally:
        dc.close()

    # Merge dataarrays
    combined = xr.merge([ds_mean.apply(to_int)]
                        + [to_int(extreme) for extreme in index_extremes]
                        + [terrain])
    combined.attrs['crs'] = crs
    return (tile[0], combined)

Following [landsat_madmex_003.py](https://github.com/CONABIO/antares3/blob/develop/madmex/recipes/landsat_madmex_003.py)

The following notebooks could also be helpful:

[1c_clusterization_for_agriculture_inecol](https://github.com/CONABIO/antares3-sandbox/blob/master/notebooks/agriculture_madmex_app/1c_clusterization_for_agriculture_inecol.ipynb)


[1d_clusterization_for_agriculture_inecol](https://github.com/CONABIO/antares3-sandbox/blob/master/notebooks/agriculture_madmex_app/1d_clusterization_for_agriculture_inecol.ipynb)

[2_clusterization_for_agriculture_inecol_intersect_with_area_of_interest.](https://github.com/CONABIO/antares3-sandbox/blob/master/notebooks/agriculture_madmex_app/2_clusterization_for_agriculture_inecol_intersect_with_area_of_interest.ipynb)

In [4]:
# Set before any Django ORM access; see Django's async-safety documentation
# for the effect of this variable.
os.environ.setdefault("DJANGO_ALLOW_ASYNC_UNSAFE", "true")

region = 'Chiapas'
products = ['ls8_espa_mexico']
begin = '2017-01-01'
end = '2017-12-31'
gwf_kwargs = {'region': region,
              'begin': begin,
              'end': end}

# Query the tiles of every product
dict_list = []
for prod in products:
    gwf_kwargs.update(product=prod)
    try:
        dict_list.append(gwf_query(**gwf_kwargs, view=False))
    except Exception as e:
        # Best effort: a product may not be registered in the datacube.
        # Report the failure instead of hiding it so that an unexpectedly
        # empty tile list can be diagnosed.
        print('Skipping product %r: %s' % (prod, e), file=sys.stderr)
iterable = join_dicts(*dict_list, join='full').items()

list_iter = list(iterable)

# Sort by the two components of the tile index for a deterministic order
list_iter_sorted = sorted(list_iter, key=lambda x: (x[0][0], x[0][1]))

# Only the first three tiles are processed in this example
list_iter_sorted = list_iter_sorted[0:3]

In [5]:
# Show the tiles selected for processing (at most three after the slice
# applied to list_iter_sorted in the previous cell).
print(list_iter_sorted)

In [None]:
os.environ.setdefault("DJANGO_ALLOW_ASYNC_UNSAFE", "true")
# Run the recipe on every selected tile. The context manager closes the
# client once all results have been gathered, so the workers started for
# this cell do not linger.
with Client(n_workers=2, memory_limit='15GB', threads_per_worker=1) as client:
    futures_recipe = client.map(recipe_computation, list_iter_sorted, pure=False)
    results_recipe = [future.result() for future in futures_recipe]

In [None]:
def model_fit(tup, training_data, path_result):
    """Fit a ``Model`` (from ``madmex.modeling.supervised.xgb``) for one tile.

    Args:
        tup: Pair ``(tile, combined)`` holding the tile identifier and the
            dataset of features computed for that tile.
        training_data: Name of the training set handed to
            ``VectorDb.load_training_from_dataset``.
        path_result: Currently unused; writing the fitted model to disk is
            disabled (see the commented-out lines at the end of the body).

    Returns:
        tuple: ``(tile, xgb_model, combined)`` with the fitted model.
    """
    tile, combined = tup

    # Training features associated with this dataset
    training_features = list(
        VectorDb().load_training_from_dataset(dataset=combined,
                                              training_set=training_data,
                                              sample=1))

    # Translate the tag id stored in each feature into the numeric code of
    # the scheme below
    scheme = "madmex"
    code_by_tag_id = {tag.id: tag.numeric_code
                      for tag in Tag.objects.filter(scheme=scheme)}

    fc_train = []
    for feature in training_features:
        tag_id = feature['properties']['class']
        fc_train.append({'geometry': feature['geometry'],
                         'properties': {'code': code_by_tag_id[tag_id]},
                         'type': 'feature'})

    X_train, y_train = zonal_stats_xarray(combined, fc_train, 'code')

    xgb_model = Model()
    xgb_model.fit(X_train, y_train)
    #filename_model = 'model_landsat8_chiapas_2017_madmex_31_clases_via_kale' + '_%d_%d' %(tile[0],tile[1]) + '.pkl'
    #filepath_model = os.path.join(path_result, filename_model)
    #with open(filepath_model, 'wb') as dst:
    #    dill.dump(xgb_model, dst)
    return (tile, xgb_model, combined)

## Model Fit

In [None]:
os.environ.setdefault("DJANGO_ALLOW_ASYNC_UNSAFE", "true")
training_data = "train_chiapas_dummy"
# path_result must be defined here: it is passed to model_fit below, and no
# earlier cell assigns it. model_fit does not write to it at the moment, so
# the directory is not created in this cell.
path_result = "/shared_volume/land_cover_results_parallel"
# The context manager closes the client once all models have been gathered.
with Client(n_workers=2, memory_limit='15GB', threads_per_worker=1) as client:
    futures_model_fit = client.map(model_fit, results_recipe,
                                   training_data=training_data,
                                   path_result=path_result)
    results_model_fit = [future.result() for future in futures_model_fit]


## Predict and write raster to FS

In [None]:
def predict_and_write_raster(tup, path_result):
    """Predict a class for every pixel of one tile and write it as a GeoTIFF.

    Args:
        tup: Triple ``(tile, xgb_model, combined)``: the tile index (two
            integers), a fitted model exposing ``predict`` and the dataset
            of features of that tile.
        path_result: Directory in which the raster is written.

    Returns:
        str: Path of the written GeoTIFF.
    """
    tile, xgb_model, combined = tup
    # squeeze drops every length-1 dimension; it is used here to remove the
    # time dimension, which has length 1
    arr_3d = combined.to_array().squeeze().values
    # Move the variable axis last: (variable, y, x) -> (y, x, variable)
    arr_3d = np.moveaxis(arr_3d, 0, 2)

    # One row per pixel, one column per feature
    n_rows, n_cols, n_features = arr_3d.shape
    arr_2d = arr_3d.reshape((n_rows * n_cols, n_features))

    predicted_array = xgb_model.predict(arr_2d)

    # Back to the 2D layout of the tile
    predicted_array = predicted_array.reshape((n_rows, n_cols))
    predicted_array = predicted_array.astype('uint8')

    # Georeferencing comes from the dataset received in ``tup``; no other
    # dataset is in scope inside this function. The CRS is the one stored in
    # ``combined.attrs['crs']`` by the recipe step.
    # NOTE(review): ``affine`` is expected to be provided by datacube's
    # xarray extensions - confirm for the installed datacube version.
    rasterio_meta = {'width': predicted_array.shape[1],
                     'height': predicted_array.shape[0],
                     'transform': combined.affine,
                     'crs': combined.attrs['crs'].crs_str,
                     'count': 1,
                     'dtype': 'uint8',
                     'compress': 'lzw',
                     'driver': 'GTiff',
                     'nodata': 0}

    # Both components of the tile index go into the name so that tiles
    # sharing the first component do not overwrite each other
    filename_raster = 'raster_landsat8_chiapas_madmex_31_clases_pixel_wise_via_kale' + '_%d_%d' % (tile[0], tile[1]) + '.tif'
    filename_raster = os.path.join(path_result, filename_raster)

    with rasterio.open(filename_raster, 'w', **rasterio_meta) as dst:
        dst.write(predicted_array, indexes=1)
    return filename_raster

In [None]:
os.environ.setdefault("DJANGO_ALLOW_ASYNC_UNSAFE", "true")
training_data = "train_chiapas_dummy"
path_result = "/shared_volume/land_cover_results_parallel"
# exist_ok avoids the check-then-create race of testing for the directory
# before creating it
os.makedirs(path_result, exist_ok=True)
# The context manager closes the client once every raster has been written.
with Client(n_workers=2, memory_limit='15GB', threads_per_worker=1) as client:
    futures_predict_and_write_raster = client.map(predict_and_write_raster,
                                                  results_model_fit,
                                                  path_result=path_result)
    results_predict_and_write_raster = [future.result() for future in futures_predict_and_write_raster]
print(results_predict_and_write_raster)
