# Create a regional or country-wide crop type map

## Background

Once we are satisfied with the map results for the test areas, we can proceed to apply the model over a larger region or an entire country. To limit the memory use, we will break a large area into tiles, make predictions over tiles and merge the results.

## Description

This notebook can be used to generate a crop type map over a region defined in a shapefile.

## Getting started
To run this analysis, run all the cells in the notebook, starting with the "Load packages" cell.

### Load packages

In [1]:
import datacube
import xarray as xr
from joblib import load
import matplotlib.pyplot as plt
from datacube.utils.cog import write_cog
import pandas as pd
import geopandas as gpd
import numpy as np
import os
import json
import pickle

from deafrica_tools.datahandling import load_ard
from deafrica_tools.bandindices import calculate_indices
from deafrica_tools.dask import create_local_dask_cluster
from deafrica_tools.plotting import rgb, display_map
from deafrica_tools.classification import predict_xr
from deafrica_tools.spatial import xr_rasterize
    
from datacube.utils import geometry
from datacube.utils.cog import write_cog

from odc.io.cgroups import get_cpu_quota
from odc.algo import geomedian_with_mads, xr_geomedian

from feature_collection import feature_layers

  _numeric_index_types = (pd.Int64Index, pd.Float64Index, pd.UInt64Index)
  _numeric_index_types = (pd.Int64Index, pd.Float64Index, pd.UInt64Index)
  _numeric_index_types = (pd.Int64Index, pd.Float64Index, pd.UInt64Index)


## Create Dask cluster for running predictions

We use dask to parallel and speed up the processing.

In [2]:
ncpus = round(get_cpu_quota())
print("ncpus = " + str(ncpus))

# client = create_local_dask_cluster(return_client=True, n_workers=1, threads_per_worker=ncpus)
create_local_dask_cluster()

ncpus = 4


0,1
Client  Scheduler: tcp://127.0.0.1:37469  Dashboard: /user/fang.yuan@digitalearthafrica.org/proxy/8787/status,Cluster  Workers: 1  Cores: 4  Memory: 28.14 GB


## Load model and class labels

We use the model trained and saved in the [fit classifier notebook](3_Fit_classifier.ipynb). it is important that the list of features used match the model.

In [3]:
model_basepath = "Results/Model/"

# Choose model and load
model_path = os.path.join(model_basepath, "rf_removecorrfeaturesgt0p9_simplified_cv.joblib")
model = load(model_path).set_params(n_jobs=1)


# Get label dictionary
labels_path = os.path.join(model_basepath, "class_labels.json")
with open(labels_path, "r") as json_file:
    labels_dict = json.load(json_file)

# Get model features
feautres_path = os.path.join(model_basepath, "rf_removecorrfeaturesgt0p9_simplified_features.json")
with open(feautres_path, "r") as json_file:
    features_dict = json.load(json_file)
    
features = features_dict["features"]

In [4]:
len(features)

68

## Load area of interest (AOI)

Shapefiles for Mozambique and large test areas are provided in the `Data` folder. 

In [5]:
output_crs="EPSG:6933"

# Choose the test area shapefile
AOIs_file = "Data/Crop_type_test_areas_3_larger.shp"
AOIs_gdf = gpd.read_file(AOIs_file).to_crs(output_crs)

# Set results path
output_folder = "Results/Map"
if not os.path.exists(output_folder):
    os.makedirs(output_folder)

## Create the query for running the predictions

We use the query saved from the training data collection notebook to ensure data from the same periods are retrieved. However, only selected features will be used. 

> We add `dask_chunks` to the query parameter so the data will be lazy-loaded and only the features used by the model will be calculated.

In [6]:
# Load the query used for fitting
query_file = os.path.join(model_basepath, "query.pickle")

with open(query_file, "rb") as f:
    query = pickle.load(f)
    
# Specify any specific additions to the data query -- e.g. dask_chunks for enabling parallel computation
dask_chunks = {"x": 4000, "y": 4000}
query.update({"dask_chunks": dask_chunks})

query

{'annual_geomedian_times': {'annual_2021': '2021-01-01'},
 'semiannual_geomedian_times': {'semiannual_2021_07': '2021-07-01',
  'semiannual_2022_01': '2022-01-01'},
 'time_ranges': {'Q4_2021': slice('2021-10-01', '2021-12-31', None),
  'Q1_2022': slice('2022-01-01', '2022-03-31', None),
  'Q2_2022': slice('2022-04-01', '2022-06-30', None),
  'Q3_2022': slice('2022-07-01', '2022-09-30', None)},
 'resolution': (-10, 10),
 'output_crs': 'EPSG:6933',
 'dask_chunks': {'x': 4000, 'y': 4000}}

## Apply classification model to the AOI

The model will be applied over each tile, producing a prediction map and a probabilities map. The maps are saved as Cloud-Optimized Geotiffs (COGs).

> Tiles are processed in sequence. For each tile, the processing needs to fit into the compute resources available in the sandbox. Make the tile size smaller if you run out of memory. For production of a map over a large region or country, consider applying for [a large sandbox (with more CPUs and momery)](
https://helpdesk.digitalearthafrica.org/portal/en/community/topic/call-for-application-for-access-to-large-sandboxes-15-processing-cores-and-120-gb-of-memory)

If output files for a tile already exist, processing for the tile can be skipped. This is useful if the process fails partway through, or if you are logged out of the sandbox before all tiles are completed.

In [7]:
skip_exisiting = False

In [9]:
%%time
dc = datacube.Datacube(app="crop_type_ml")

# for index, aoi in AOIs_gdf.iterrows():
for index in range(0,len(AOIs_gdf)):
    aoi=AOIs_gdf.iloc[index]
    print(f"Processing Polygon {index}")
    
    # Check if polygon has already been processed. If so, skip
    output_filename = os.path.join(output_folder, f"Test_area_{index}_croptype_prediction.tif")
    probabilities_filename = os.path.join(output_folder, f"Test_area_{index}_croptype_probabilities.tif")
    if skip_exisiting and os.path.exists(output_filename) and os.path.exists(probabilities_filename):
         print("Completed; Skipping")
         continue

    # set up query based on aoi polygon
    geom = geometry.Geometry(geom=aoi.geometry, crs=AOIs_gdf.crs)
    query.update({"geopolygon": geom})

    # Load the feature data
    print("    Loading feature data")
    data = feature_layers(query)
#     data = feature_layers(query).persist()
    
    
    # Only keep features that are used by the model
    data = data[features]

#     data=data.fillna(0)
#     #predict using the imported model
#     predicted = predict_xr(model,
#                            data.unify_chunks(),
#                            proba=True,
#                            persist=True,
#                            clean=True,
#                            return_input=False
#                           ).astype(np.uint8).persist()
    predicted = predict_xr(model,
                           data,
                           proba=True,
                           persist=False,
                           clean=True,
                           return_input=False
                          ).compute().astype(np.uint8)
    # Load masks and clip
#     crop_mask_query = dict((k,query[k]) for k in ('resolution','output_crs','dask_chunks','geopolygon'))
    crop_mask_query = dict((k,query[k]) for k in ('resolution','output_crs','geopolygon'))
    crop_mask_query.update({"time": "2019"})

    # Load the crop mask
    print("    Loading crop_mask")
    crop_mask = dc.load(product="crop_mask", **crop_mask_query)
    
    # Create a mask for the aoi
    print("    Getting AOI mask")
    aoi_mask = xr_rasterize(
        gdf=gpd.GeoDataFrame({"Polygon": [index], "geometry": [aoi.geometry]}, crs=AOIs_gdf.crs),
        da=predicted,
        transform=predicted.geobox.transform,
        crs=output_crs,
    )

    # set the no data value
    NODATA = 255

    # Mask the predictions to
    print("    Preparing predictions")
    predicted_masked = (
#         predicted.Predictions.where((crop_mask.filtered == 1) & (aoi_mask==1), NODATA)
        predicted.Predictions.where((crop_mask.mask == 1) & (aoi_mask==1), NODATA) # revised: using unfiltered crop mask instead
    )
    
    predicted_masked.attrs["nodata"] = NODATA
    
    # Write to cog
    print(f"    Writing predictions to {output_filename}")
    write_cog(
        predicted_masked,
        fname=output_filename,
        overwrite=True,
        nodata=255,
    )
    
    del predicted_masked
    
    probability_masked = (
#         predicted.Probabilities.where((crop_mask.filtered == 1) & (aoi_mask==1), NODATA)
        predicted.Probabilities.where((crop_mask.mask == 1) & (aoi_mask==1), NODATA) # revised: using unfiltered crop mask instead
    )
    
    probability_masked.attrs["nodata"] = NODATA
    
    print(f"    Writing probabilities to {probabilities_filename}")
    write_cog(
        probability_masked,
        fname=probabilities_filename,
        overwrite=True,
        nodata=255,
    )
    
    del probability_masked
    
    del crop_mask
    del aoi_mask

    

Processing Polygon 0
    Loading feature data
predicting...
   probabilities...


  x = np.divide(x1, x2, out)
  x = np.divide(x1, x2, out)
  x = np.divide(x1, x2, out)
  x = np.divide(x1, x2, out)
  x = np.divide(x1, x2, out)
  x = np.divide(x1, x2, out)
  x = np.divide(x1, x2, out)
  x = np.divide(x1, x2, out)
  x = np.divide(x1, x2, out)
  x = np.divide(x1, x2, out)
  _reproject(


KeyboardInterrupt: 

Process Dask Worker process (from Nanny):
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/distributed/nanny.py", line 789, in _run
    loop.run_sync(run)
  File "/usr/local/lib/python3.8/dist-packages/tornado/ioloop.py", line 523, in run_sync
    self.start()
  File "/usr/local/lib/python3.8/dist-packages/tornado/platform/asyncio.py", line 215, in start
    self.asyncio_loop.run_forever()
  File "/usr/lib/python3.8/asyncio/base_events.py", line 570, in run_forever
    self._run_once()
  File "/usr/lib/python3.8/asyncio/base_events.py", line 1823, in _run_once
    event_list = self._selector.select(timeout)
  File "/usr/lib/python3.8/selectors.py", line 468, in select
    fd_event_list = self._selector.poll(timeout, max_ev)
KeyboardInterrupt

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/usr/li

## Mosaic maps

The tiled maps merged.

In [None]:
! gdal_merge.py -o results/Test_areas_mosaic_croptype_merged_prediction.tif -co COMPRESS=Deflate -ot Byte results/Test_area_*_croptype_prediction.tif -init 255 -a_nodata 255
! gdal_merge.py -o results/Test_areas_mosaic_croptype_merged_probabilities.tif -co COMPRESS=Deflate -ot Byte results/Test_area_*_croptype_probabilities.tif -init 255 -a_nodata 255