In [None]:
#import libraries
# Standard library
import os
import json
import string
from pathlib import Path

# Third-party (original relative order of the geospatial imports is preserved)
import rasterio as rio
import rasterio.mask as rio_mask
from rasterio.vrt import WarpedVRT
import fiona  # used by the clipping cell (fiona.open)
from matplotlib import pyplot as plt
from matplotlib import cm
import matplotlib.patches as mpatches
import seaborn as sns  # used by the confusion-matrix plots (sns.heatmap)
import numpy as np
import pandas as pd
from tqdm import tqdm
from osgeo import gdal
import joblib  # used to persist the trained model (joblib.dump)
from sklearn import metrics
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import confusion_matrix
from sklearn.metrics import cohen_kappa_score
from sklearn.metrics import matthews_corrcoef
from sklearn.metrics import plot_roc_curve
from sklearn.metrics import roc_auc_score
from sklearn.metrics import average_precision_score
from sklearn.metrics import log_loss

# Switch matplotlib's interactive mode off; the figures created below are
# written to disk with fig.savefig().
plt.ioff()

In [None]:
#set path
# --- Input data ---------------------------------------------------------------
src_path = Path("/path/S2A_T06VWP_HS.VRT") #this is path for Simulated image (VRT file generated using Simulation.ipynb)
ply_path = Path("/path/training_data.geojson") #set path to the training data
dem_path = Path("/path/DEMlayer.tif") #Tiff file generated using DEM_preprocessing.ipynb
# Passed as `resampling=` to WarpedVRT when the DEM is warped onto the image grid.
# NOTE(review): 1 presumably maps to rasterio.enums.Resampling.bilinear - confirm.
resampling_alg = 1

# --- Outputs ------------------------------------------------------------------
dst_tag = "Sample_Region"  # prefix of the per-polygon tile file names
dst_dir = Path(r"/path/predict_normalise") #set output path
tiles_dir = dst_dir / "Tiles"
results_dir = dst_dir / "Evaluations"
tiles_dir.mkdir(mode=0o755, parents=True, exist_ok=True)
results_dir.mkdir(mode=0o755, parents=True, exist_ok=True)
id_attr = 'class_id' # Attribute name where class_id is stored in training data
name_attr = 'veg_class'  # Attribute name where the class name is stored in training data
vrt_path = dst_dir / "Training_Samples.VRT"
data_path = dst_dir / "Data.npz"
model_path = dst_dir / "RandomForest.joblib"
# 'train:validation' proportions; split on ':' and normalised by their sum
train_validation_ratio = '70:30'
data_index_path = dst_dir / "Data_Index.npz"
shuffle_data = True
n_samples = None  # placeholder; set to the number of collated pixels later on

#parameters for NDVI
# Used as zero-based indexes into the clipped band stack (dst_img[nir_bid]).
# NOTE(review): if these were meant as 1-based band numbers they are off by one - confirm.
nir_bid = 96
red_bid = 56
# Pixels whose NDVI lies outside [ndvi_ll, ndvi_ul] are masked out of the training tiles
ndvi_ll = 0.3
ndvi_ul = 1.0

# class_id -> {'Name': ...}; filled while clipping and extended by later cells
class_ledger = dict()

# Fail early, with a readable message, if any input raster/vector is missing.
# The DEM is opened by the clipping cell just like the other two inputs.
assert src_path.is_file(), "Source image not found: {}".format(src_path)
assert ply_path.is_file(), "Training polygons not found: {}".format(ply_path)
assert dem_path.is_file(), "DEM layer not found: {}".format(dem_path)

In [None]:
# Debug aid: compare the geotransform of the source image with the one of the
# DEM after it is warped onto the image grid. The rasters are opened here so the
# cell also runs on a fresh kernel instead of relying on variables that only
# exist once the clipping cell further down has been executed.
with rio.open(src_path, 'r') as src, rio.open(dem_path, 'r') as dem:
  with WarpedVRT(
    dem,
    height=src.height,
    width=src.width,
    transform=src.transform,
    resampling=resampling_alg,
  ) as vrt_dem:
    print(src.transform)
    print("=====")
    print(vrt_dem.transform)
# `imask` is created by the clipping cell; only report it when it exists.
if 'imask' in globals():
  print(imask.shape)

## `Clip` Raster using `Polygons` and save the images

In [None]:
# Clip the source image and the DEM with every training polygon, mask pixels
# whose NDVI falls outside [ndvi_ll, ndvi_ul], stack the DEM band(s) behind the
# image bands and write one GeoTIFF tile per polygon. The tiles are finally
# mosaicked into a VRT.
with fiona.open(ply_path, 'r') as ply:
  sample_tiles = list()
  # The rasters are the same for every polygon, so they are opened (and the DEM
  # is warped onto the image grid) once instead of once per feature.
  with rio.open(src_path, 'r') as src, rio.open(dem_path, 'r') as dem:
    assert src.crs == dem.crs, "CRS Mismatch!"
    with WarpedVRT(
      dem,
      height=src.height,
      width=src.width,
      transform=src.transform,
      resampling=resampling_alg,
    ) as vrt_dem:
      for feature_id, feature in enumerate(tqdm(list(ply))):
        polygon = feature['geometry']
        c_id = feature['properties'][id_attr]
        class_ledger[c_id] = {'Name': feature['properties'][name_attr]}
        # Fresh copy per tile: count/height/width/transform are overwritten below
        meta = src.meta.copy()
        dst_img, dst_transform = rio_mask.mask(
          dataset=src,
          shapes=(polygon,),
          invert=False,
          all_touched=False,
          crop=True,
          filled=False
        )
        dst_dem, dem_transform = rio_mask.mask(
          dataset=vrt_dem,
          shapes=(polygon,),
          invert=False,
          all_touched=False,
          crop=True,
          filled=False
        )
        # NDVI = (NIR - RED) / (NIR + RED); nodata and zero denominators become NaN
        nir = dst_img[nir_bid]
        nir_mask = nir == src.nodata
        nir = nir.astype(np.float32)
        nir[nir_mask] = np.nan
        red = dst_img[red_bid]
        red_mask = red == src.nodata
        red = red.astype(np.float32)
        red[red_mask] = np.nan
        denominator = nir + red
        denominator[denominator==0] = np.nan
        nominator = nir - red
        ndvi = nominator / denominator
        # NOTE(review): comparisons with NaN are False, so a pixel with an
        # undefined NDVI is not rejected by this test alone - confirm intended.
        ndvi_mask = np.logical_or(
          (ndvi < ndvi_ll),
          (ndvi > ndvi_ul)
        )
        # A pixel is dropped when it is masked in any image band, in any DEM
        # band, or fails the NDVI test; the same 2-D mask is applied to all bands.
        imask = np.any(dst_img.mask, axis=0)
        dmask = np.any(dst_dem.mask, axis=0)
        imask = np.logical_or(dmask, imask)
        imask = np.logical_or(ndvi_mask, imask)
        dst_img.mask = np.tile(imask, (dst_img.shape[0], 1, 1))
        dst_dem.mask = np.tile(imask, (dst_dem.shape[0], 1, 1))
        dst_dem = dst_dem.astype(dst_img.dtype)
        dst_dem.fill_value = dst_img.fill_value
        # DEM band(s) are appended after the image bands
        dst_img = np.concatenate((dst_img, dst_dem), axis=0)

        meta['count'], meta['height'], meta['width'] = dst_img.shape
        meta['driver'] = 'GTiff'
        meta['transform'] = dst_transform
        dst_path = tiles_dir / "{}_{}.{}".format(
          dst_tag, feature_id, 'tiff'
        )
        with rio.open(dst_path, 'w', **meta) as dst:
          dst.write(dst_img.filled())
          # The class id travels with the tile as a dataset tag
          dst.update_tags(class_id=c_id)
        sample_tiles.append(dst_path)
  # Build the VRT from inside dst_dir using relative tile paths. The previous
  # working directory is restored even when the build fails.
  wd = Path.cwd()
  os.chdir(dst_dir)
  try:
    tile_paths = [
      str(tile_path.relative_to(dst_dir)) for tile_path in sample_tiles
    ]
    vrt_options = gdal.BuildVRTOptions(resampleAlg='near', addAlpha=False)
    ds = gdal.BuildVRT(
      str(vrt_path.relative_to(dst_dir)), tile_paths, options=vrt_options
    )
    # Without GDAL exceptions enabled a failed build is reported as None
    if ds is None:
      raise RuntimeError(
        "gdal.BuildVRT could not create {} from {} tile(s)".format(
          vrt_path, len(tile_paths)
        )
      )
    ds.FlushCache()
  finally:
    os.chdir(wd)

## Prepare `acronyms` for class names
### Makes it easy to label plots

In [None]:
# Class ids in the order in which they were registered in the ledger
cls_i = [class_id for class_id in class_ledger]
n_classes = len(cls_i)
# One capital letter per class: at most 26 classes can be labelled this way
assert n_classes <= 26

# Needs to be changed if n_classes > 26
cls_a = [string.ascii_uppercase[position] for position in range(n_classes)]
cls_n = []
for class_id, acronym in zip(cls_i, cls_a):
  ledger_entry = class_ledger[class_id]
  ledger_entry['Acronym'] = acronym
  cls_n.append(ledger_entry['Name'])

## Prepare `Training Data`
### Collate training samples

In [None]:
# Turn every tile into a (n_valid_pixels, n_bands) sample matrix plus a label
# vector, then stack all tiles and persist the result as Data.npz.
sample_arrays = list()
target_arrays = list()
for tile_path in sample_tiles:
  with rio.open(tile_path, 'r') as tile:
    # The class id was stored as a tag when the tile was written. Parse it
    # explicitly and make sure it fits the uint8 label dtype, so that ids
    # above 255 cannot wrap around silently.
    class_id = int(tile.tags()['class_id'])
    if not 0 <= class_id <= np.iinfo(np.uint8).max:
      raise ValueError(
        "class_id {} of {} does not fit into uint8".format(class_id, tile_path)
      )
    tile_arr = tile.read(masked=True)
    n_bands = tile_arr.shape[0]
    # A pixel is discarded when it is masked in any band
    mask = np.any(a=tile_arr.mask, axis=0, keepdims=False).ravel(order='C')
    arr = (tile_arr.filled()).reshape((n_bands, -1), order='C')
    arr = arr[:, np.logical_not(mask)]
    # (bands, pixels) -> (pixels, bands): one row per sample
    arr = np.moveaxis(arr, 0, -1)
    sample_arrays.append(arr)
    target_arrays.append(
      np.full(
        shape=(arr.shape[0],),
        fill_value=class_id,
        dtype=np.uint8
      )
    )

if not sample_arrays:
  raise ValueError("No training tiles found - run the clipping cell first")

sample_array = np.concatenate(sample_arrays, axis=0)
target_array = np.concatenate(target_arrays, axis=0)
assert sample_array.shape[0] == target_array.shape[0]
n_samples = sample_array.shape[0]
if n_samples == 0:
  raise ValueError("All pixels of all training tiles are masked")

with open(data_path, 'wb') as dat:
  np.savez_compressed(
    file=dat,
    X=sample_array,
    Y=target_array
  )

## `Shuffle` and `Split` 

In [None]:
# Split the sample rows into a training and a validation set and store the row
# indexes. The split is always written, because the loading cell reads
# Data_Index.npz unconditionally; `shuffle_data` only controls whether the rows
# are permuted first.
row_indexes = np.arange(start=0, stop=n_samples, step=1)
if shuffle_data:
  # Uses NumPy's global random state, so the split differs between runs unless
  # that state is seeded beforehand.
  np.random.shuffle(row_indexes)
# Without shuffling the rows keep the order in which the tiles were collated,
# i.e. the validation set is simply the tail of the sample array.
ratio_parts = [float(p) for p in train_validation_ratio.split(':')]
n_train = int(
  np.round(((n_samples * ratio_parts[0]) / sum(ratio_parts)), 0)
)
train_indexes = row_indexes[:n_train]
validation_indexes = row_indexes[n_train:]
with open(data_index_path, 'wb') as ip:
  np.savez_compressed(
    file=ip,
    train=train_indexes,
    validation=validation_indexes
  )

## Set `parameters` for `Model`
### Only change the parameters you need
### Default values are provided in the comments on the right hand side

In [None]:
# Keyword arguments handed to RandomForestClassifier(**model_conf).
# The value in the trailing comment is the library default.
model_conf = {
  'n_estimators': 500,  # 100
  'criterion': 'gini',  # 'gini'
  'max_depth': None,  # None
  'min_samples_split': 2,  # 2
  'min_samples_leaf': 1,  # 1
  'min_weight_fraction_leaf': 0.0,  # 0.0
  # 'sqrt' is what 'auto' meant for classifiers; 'auto' itself is rejected by
  # recent scikit-learn releases.
  'max_features': 'sqrt',  # 'sqrt' (formerly 'auto')
  'max_leaf_nodes': None,  # None
  'min_impurity_decrease': 0.0,  # 0.0
  # 'min_impurity_split' (default None) is intentionally not listed: the
  # argument no longer exists in scikit-learn >= 1.0 and passing it raises a
  # TypeError there; omitting it keeps the default on older releases.
  'bootstrap': True,  # True
  'oob_score': True,  # False
  'n_jobs': -1,  # None
  'random_state': None,  # None
  'verbose': 0,  # 0
  'warm_start': False,  # False
  'class_weight': None,  # None
  'ccp_alpha': 0.0,  # 0.0
  'max_samples': None  # None
}

## Initialize `Model` for training

In [None]:
# Untrained random forest configured from the model_conf dictionary.
clf = RandomForestClassifier(**model_conf)

## Load Data

In [None]:
# Read the collated samples (X) and labels (Y) back from disk
with np.load(data_path, allow_pickle=False) as data_archive:
  data_x, data_y = data_archive['X'], data_archive['Y']

# Read the row indexes of the train / validation split
with np.load(data_index_path, allow_pickle=False) as index_archive:
  t_idx, v_idx = index_archive['train'], index_archive['validation']

# Materialise both subsets by row selection
train_x = data_x[t_idx, :]
train_y = data_y[t_idx]
validation_x = data_x[v_idx, :]
validation_y = data_y[v_idx]

## `Train` the `Classifier`

In [None]:
# Report the size of the training set, then fit the forest on it
# (no per-sample weights).
print(train_x.shape, train_y.shape)
clf.fit(train_x, train_y)

## `Save` the `Model`

In [None]:
# Serialise the fitted classifier to model_path through a binary file handle
with model_path.open('wb') as model_file:
  joblib.dump(clf, model_file)

## Define a `function` to calculate various `metrics` from a `confusion matrix`
### Takes `normalized confusion matrix` as input

In [None]:
def _safe_ratio(numerator, denominator):
  """Element-wise ``numerator / denominator``; NaN wherever the denominator is 0."""
  ratio = np.full(np.shape(numerator), np.nan, dtype=np.float64)
  np.divide(numerator, denominator, out=ratio, where=(denominator != 0))
  return ratio


def eval_cm(confusion_matrix, class_refs=None):
  """Derive one-vs-rest metrics for every class from a confusion matrix.

  Parameters
  ----------
  confusion_matrix : array-like, shape (n_classes, n_classes)
    Rows are summed as false negatives and columns as false positives, i.e.
    rows are true classes and columns are predicted classes. Raw counts and
    normalised matrices are both accepted; the values are converted to float
    so that integer input no longer breaks the NaN-filled divisions.
  class_refs : sequence, optional
    Row labels (one per class) for the returned table.

  Returns
  -------
  pandas.DataFrame
    One row per class plus a final 'Mean' row (column-wise mean, NaN skipped),
    with the columns TPR, TNR, PPV, NPV, FPR, FNR, FDR, IoU, F1, BACC, ACC.
    A metric whose denominator is zero is reported as NaN.

  Raises
  ------
  ValueError
    If the input is not a square 2-D matrix.
  """
  cm_values = np.asarray(confusion_matrix, dtype=np.float64)
  if cm_values.ndim != 2 or cm_values.shape[0] != cm_values.shape[1]:
    raise ValueError(
      "confusion_matrix must be square, got shape {}".format(cm_values.shape)
    )

  tp = np.diag(cm_values)
  fp = cm_values.sum(axis=0) - tp
  fn = cm_values.sum(axis=1) - tp
  tn = cm_values.sum() - (fp + fn + tp)

  # Sensitivity, hit rate, recall, or true positive rate
  tpr = _safe_ratio(tp, tp + fn)
  # False negative rate
  fnr = _safe_ratio(fn, tp + fn)
  # Specificity or true negative rate
  tnr = _safe_ratio(tn, tn + fp)
  # Fall out or false positive rate
  fpr = _safe_ratio(fp, tn + fp)
  # Precision or positive predictive value
  ppv = _safe_ratio(tp, tp + fp)
  # False discovery rate
  fdr = _safe_ratio(fp, tp + fp)
  # Negative predictive value
  npv = _safe_ratio(tn, tn + fn)
  # Intersection over Union
  iou = _safe_ratio(tp, tp + fn + fp)
  # F1 score (NaN when precision or recall is undefined)
  f1 = _safe_ratio(2 * (ppv * tpr), ppv + tpr)
  # Overall (one-vs-rest) accuracy
  acc = _safe_ratio(tp + tn, tp + fp + fn + tn)
  # Balanced Accuracy
  bacc = 0.5 * (tpr + tnr)

  metrics_df = pd.DataFrame(
    data=np.stack(
      (tpr, tnr, ppv, npv, fpr, fnr, fdr, iou, f1, bacc, acc),
      axis=-1
    ),
    columns=(
      "TPR", "TNR", "PPV", "NPV", "FPR", "FNR",
      "FDR", "IoU", "F1", "BACC", "ACC"
    ),
    index=class_refs
  )
  metrics_df.loc['Mean'] = metrics_df.mean()
  return metrics_df

## Evaluate `Trained Model` using various `metrics`

In [None]:
# Forest-level diagnostics plus hard predictions (yy) and class probabilities
# (yp) for both splits.
feature_importance = clf.feature_importances_
oob_score = clf.oob_score_
train_yy = clf.predict(X=train_x)
validation_yy = clf.predict(X=validation_x)
train_yp = clf.predict_proba(train_x)
validation_yp = clf.predict_proba(validation_x)

# Confusion matrices as raw counts, with classes ordered as in cls_i
train_cm = confusion_matrix(
  y_true=train_y,
  y_pred=train_yy,
  labels=cls_i,
  sample_weight=None,
  normalize=None
)
validation_cm = confusion_matrix(
  y_true=validation_y,
  y_pred=validation_yy,
  labels=cls_i,
  sample_weight=None,
  normalize=None
)

# Same matrices normalised over all entries (every matrix sums to 1)
train_cm_norm = confusion_matrix(
  y_true=train_y,
  y_pred=train_yy,
  labels=cls_i,
  sample_weight=None,
  normalize="all"
)
validation_cm_norm = confusion_matrix(
  y_true=validation_y,
  y_pred=validation_yy,
  labels=cls_i,
  sample_weight=None,
  normalize="all"
)

# Cohen's kappa
train_k = cohen_kappa_score(
  y1=train_y,
  y2=train_yy,
  labels=None,
  weights=None,
  sample_weight=None
)
validation_k = cohen_kappa_score(
  y1=validation_y,
  y2=validation_yy,
  labels=None,
  weights=None,
  sample_weight=None
)

# Matthews correlation coefficient
train_mcc = matthews_corrcoef(
  y_true=train_y,
  y_pred=train_yy,
  sample_weight=None
)
validation_mcc = matthews_corrcoef(
  y_true=validation_y,
  y_pred=validation_yy,
  sample_weight=None
)

# Macro-averaged one-vs-rest ROC AUC from the predicted probabilities
train_roc_auc = roc_auc_score(
  y_true=train_y, y_score=train_yp, average='macro', multi_class='ovr'
)
# Must use the validation labels/probabilities: this was previously computed
# from the training data and therefore duplicated train_roc_auc.
validation_roc_auc = roc_auc_score(
  y_true=validation_y, y_score=validation_yp, average='macro',
  multi_class='ovr'
)

#train_pr_auc = average_precision_score(train_y, train_yp)
#validation_pr_auc = average_precision_score(validation_y, validation_yp)

# Mean cross-entropy (log loss) of the predicted probabilities
train_ll = log_loss(y_true=train_y, y_pred=train_yp, normalize=True)
validation_ll = log_loss(
  y_true=validation_y, y_pred=validation_yp, normalize=True
)

## Save some `metrics`

In [None]:
# Per-class metric tables derived from the normalised confusion matrices
train_df = eval_cm(train_cm_norm, cls_a)
validation_df = eval_cm(validation_cm_norm, cls_a)

# Scalar agreement scores, one row per split
cor_df = pd.DataFrame(columns=['MCC', 'Kappa', 'CE_Loss'])
for split_name, split_scores in (
  ('Train', [train_mcc, train_k, train_ll]),
  ('Validation', [validation_mcc, validation_k, validation_ll]),
):
  cor_df.loc[split_name] = split_scores

# Raw-count confusion matrices labelled with the full class names
cmt_df = pd.DataFrame(train_cm, index=cls_n, columns=cls_n)
cmv_df = pd.DataFrame(validation_cm, index=cls_n, columns=cls_n)
oob_df = pd.DataFrame([oob_score], columns=['OOB_Score'])

# (frame, sheet name, index label) for every sheet that keeps its index,
# listed in the order in which the sheets are written
indexed_sheets = (
  (cmt_df, 'CM_Train', "True / Prediction"),
  (cmv_df, 'CM_Validation', "True / Prediction"),
  (train_df, 'Training', "Class_Indicator"),
  (validation_df, 'Validation', "Class_Indicator"),
  (cor_df, 'Evaluation Coeeficients', 'Type'),
)
with pd.ExcelWriter(results_dir / 'Metrics.xlsx') as writer:
  for frame, sheet, label in indexed_sheets:
    frame.to_excel(writer, sheet_name=sheet, index=True, index_label=label)
  # The out-of-bag score is a single value; its index carries no information
  oob_df.to_excel(writer, sheet_name='OOB', index=False)

## Plot `Confusion Matrix`

In [None]:
#Plot Confusion Matrix
def _plot_confusion_heatmap(counts, title, pdf_name):
  """Draw an annotated confusion-matrix heat map and save it as a PDF.

  Parameters
  ----------
  counts : 2-D array
    Confusion matrix to draw; cells are annotated with format '.0f'.
  title : str
    Axes title.
  pdf_name : str
    File name of the PDF written into `results_dir`.

  Returns
  -------
  (fig, ax)
    The matplotlib figure and axes that were created.
  """
  fig = plt.figure(figsize=(15,18),dpi=500)
  ax = fig.add_subplot(111)
  sns.heatmap(
    data=counts,
    #vmin=0.0,
    #vmax=1.0,
    center=None,
    # pyplot.get_cmap is available in old and new matplotlib releases, whereas
    # matplotlib.cm.get_cmap has been removed from recent ones.
    cmap=plt.get_cmap('magma_r'),
    robust=False,
    annot=True,
    fmt='.0f',
    cbar=True,
    xticklabels=cls_a,
    yticklabels=cls_a,
    square=True,
    ax=ax
  )
  ax.set_xlabel("Predicted Labels")
  ax.set_ylabel("True Labels")
  ax.set_title(label=title)
  fig.savefig(fname=(results_dir / pdf_name), dpi=600)
  return fig, ax

fig, ax1 = _plot_confusion_heatmap(
  train_cm, "Confusion Matrix: Training Data", "Confusion_Matrix_Train.pdf"
)
fig, ax2 = _plot_confusion_heatmap(
  validation_cm, "Confusion Matrix: Validation Data",
  "Confusion_Matrix_Validation.pdf"
)

## Plot `Feature Importance`

In [None]:
# No. features to plot.
# Plotting all bands would be difficult to fit in a single plot
n_features = 20
# Never request more bars than there are features, otherwise the bar positions
# and the bar heights would have different lengths.
n_plot = min(n_features, feature_importance.shape[0])
sorted_index = np.argsort(a=feature_importance)  # ascending importance
worst_index = sorted_index[:n_plot]
best_index = sorted_index[-n_plot:]
bad_vals = feature_importance[worst_index]
good_vals = feature_importance[best_index]
# 1-based numbers for the tick labels. New arrays are built on purpose: the two
# slices are views of sorted_index, and an in-place `+= 1` on both would add 2
# to every feature that appears in both of them.
bad_bidx = worst_index + 1
good_bidx = best_index + 1
x_ticks = np.arange(n_plot)

fig = plt.figure(figsize=(16, 3), dpi=500)
ax1 = fig.add_subplot(121)
ax2 = fig.add_subplot(122)
ax1.bar(
  x=x_ticks,
  height=good_vals,
  tick_label=good_bidx,
  width=0.25,
  bottom=0,
  align='center',
  color='C0'
)
ax1.tick_params(axis='x', rotation=45)
ax1.set_xlabel("Bands")
ax1.set_ylabel("Importance")
ax1.set_title(label="{} Most Important Bands".format(n_plot))

ax2.bar(
  x=x_ticks,
  height=bad_vals,
  tick_label=bad_bidx,
  width=0.25,
  bottom=0,
  align='center',
  color='C1'
)
ax2.tick_params(axis='x', rotation=45)
ax2.set_xlabel("Bands")
ax2.set_ylabel("Importance")
ax2.set_title(label="{} Least Important Bands".format(n_plot))
fig.savefig(fname=(results_dir / "Feature_Importance.pdf"), dpi=500)

## `Color` generator

In [None]:
def color_generator(n, name='tab20'):
  """Return the matplotlib colormap `name` resampled to `n` entries.

  Parameters
  ----------
  n : int
    Number of entries requested from the colormap.
  name : str, optional
    Name of a registered matplotlib colormap (default 'tab20').

  Returns
  -------
  matplotlib colormap
    Callable; calling it with an integer index yields an (r, g, b, a) tuple.
  """
  # pyplot.get_cmap takes the same (name, lut) arguments as the former
  # matplotlib.cm.get_cmap, which has been removed from recent matplotlib.
  return plt.get_cmap(name, n)

## Generate `color palette` for classes 

In [None]:
# One colormap entry per class, stored in the ledger as separate RGBA channels
cg = color_generator(n_classes)
channel_keys = ('RED', 'GREEN', 'BLUE', 'ALPHA')
for position in range(n_classes):
  rgba = cg(position)
  class_ledger[cls_i[position]].update(zip(channel_keys, rgba))

## Save `meta` to file

In [None]:
# Persist the class ledger (names, acronyms, colours) as UTF-8 encoded JSON
meta_info_path = results_dir / 'Meta_Info.json'
with meta_info_path.open('w', encoding="utf-8") as fp:
  fp.write(json.dumps(class_ledger))