# WOfS Validation Data Analysis - Window-based and Parallel  <img align="right" src="../Supplementary_data/DE_Africa_Logo_Stacked_RGB_small.jpg">

* **Products used:** 
[ga_ls8c_wofs_2](https://explorer.digitalearth.africa/ga_ls8c_wofs_2),
[ga_ls8c_wofs_2_summary](https://explorer.digitalearth.africa/ga_ls8c_wofs_2_summary)

## Background
[Water Observations from Space (WOfS)](https://www.ga.gov.au/scientific-topics/community-safety/flood/wofs/about-wofs) is a product derived from Landsat 8 satellite observations, built on the provisional Landsat 8 Collection 2 surface reflectance, that shows surface water detected in Africa.
Individual water-classified images are called Water Observation Feature Layers (WOFLs), and are created in a 1-to-1 relationship with the input satellite data.
Hence there is one WOFL for each satellite dataset processed for the occurrence of water.

The data in a WOFL is stored as a bit field. This is a binary number, where each digit of the number is independently set or not based on the presence (1) or absence (0) of a particular attribute (water, cloud, cloud shadow etc). In this way, the single decimal value associated with each pixel can provide information on a variety of features of that pixel.
For more information on the structure of WOFLs and how to interact with them, see the [Water Observations from Space](../Datasets/Water_Observations_from_Space.ipynb) and [Applying WOfS bitmasking](../Frequently_used_code/Applying_WOfS_bitmasking.ipynb) notebooks.
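
As a rough illustration of how a bit field packs several attributes into one integer, the sketch below decodes a pixel value into named flags. The bit positions in `FLAG_BITS` are assumptions made for this example and should be checked against the product's flag definition; `decode_flags` and `is_clear_and_wet` are hypothetical helpers, not part of any library.

```python
# Assumed bit positions, for illustration only; check the product's
# flag definition before relying on them.
FLAG_BITS = {"nodata": 0, "cloud_shadow": 5, "cloud": 6, "water_observed": 7}


def decode_flags(value):
    """Return a dict of flag name -> True/False for one pixel value."""
    return {name: bool((value >> bit) & 1) for name, bit in FLAG_BITS.items()}


def is_clear_and_wet(value):
    """True when water is flagged and no cloud, shadow or nodata bit is set."""
    flags = decode_flags(value)
    obscured = flags["cloud"] or flags["cloud_shadow"] or flags["nodata"]
    return flags["water_observed"] and not obscured


# 128 = only the assumed water bit; 192 = assumed water and cloud bits
print(decode_flags(128))
print(is_clear_and_wet(128), is_clear_and_wet(192))
```

Under these assumed positions, a pixel value of 192 has both the water and cloud bits set, so it would not count as a clear wet observation.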

## Description
This notebook explains how to perform a validation analysis of the WOfS derived product using a collected ground truth dataset and window-based sampling.

The notebook demonstrates how to:

1. Load the cleaned validation points for each partner institution from an ESRI shapefile
2. Query WOFL data at the validation points and record the WOfS observations available
3. Extract statistics for each WOfS observation at each validation point using a 3 by 3 window and multiprocessing
4. Build a look-up table (LUT) for each point that contains both the validation information and the WOfS result for each month
***

## Getting started

To run this analysis, run all the cells in the notebook, starting with the "Load packages" cell.

After finishing the analysis, you can modify some values in the "Analysis parameters" cell and re-run the analysis to load WOFLs for a different location or time period.

### Load packages
Import Python packages that are used for the analysis.

In [6]:
%matplotlib inline

import time 
import datacube
from datacube.utils import masking, geometry 
import sys
import os
import dask 
import rasterio, rasterio.features
import xarray
import glob
import numpy as np
import pandas as pd
import seaborn as sn
import geopandas as gpd
import subprocess as sp
import matplotlib.pyplot as plt
import scipy, scipy.ndimage
import warnings
warnings.filterwarnings("ignore") #this will suppress the warnings for multiple UTM zones in your AOI 

sys.path.append("../Scripts")
from rasterio.mask import mask
from geopandas import GeoSeries, GeoDataFrame
from shapely.geometry import Point
from deafrica_plotting import map_shapefile,display_map, rgb
from deafrica_spatialtools import xr_rasterize
from deafrica_datahandling import wofs_fuser, mostcommon_crs,load_ard,deepcopy
from deafrica_dask import create_local_dask_cluster

#for parallelisation 
from multiprocessing import Pool, Manager
import multiprocessing as mp
from tqdm import tqdm

### Connect to the datacube
Activate the datacube database, which provides functionality for loading and displaying stored Earth observation data.

In [7]:
dc = datacube.Datacube(app='WOfS_accuracy')

### Analysis parameters

To analyse validation points collected by each partner institution, we need to obtain WOfS surface water observation data that corresponds with the labelled input data locations. 

### Loading Dataset

1. Load the validation points for each partner institution as a list of observations, each with a location and a month
    * Load the cleaned validation file as an ESRI `shapefile`
    * Inspect the shapefile

In [8]:
#path = '../Supplementary_data/Validation/Refined/groundtruth_AFRIGIST.shp'
path = '../Supplementary_data/Validation/Refined/subsetAGRI.shp'

In [10]:
input_data = gpd.read_file(path).to_crs('epsg:6933') #reading the table and converting CRS to metric 
input_data

Unnamed: 0,F1,Unnamed__0,PLOT_ID,LON,LAT,FLAGGED,ANALYSES,SENTINEL2Y,STARTDATE,ENDDATE,WATER,NO_WATER,BAD_IMAGE,NOT_SURE,CLASS,COMMENT,MONTH,WATERFLAG,geometry
0,1217.0,91.0,137483266.0,5.270490,12.587901,0.0,1.0,2018.0,,,2020-12-01,0.0,0.0,0.0,Open water - freshwater,River,1.0,1.0,POINT (508530.000 1593330.000)
1,1218.0,91.0,137483266.0,5.270490,12.587901,0.0,1.0,2018.0,,,2020-12-01,0.0,0.0,0.0,Open water - freshwater,River,2.0,1.0,POINT (508530.000 1593330.000)
2,1219.0,91.0,137483266.0,5.270490,12.587901,0.0,1.0,2018.0,,,2020-12-01,0.0,0.0,0.0,Open water - freshwater,River,3.0,1.0,POINT (508530.000 1593330.000)
3,1220.0,91.0,137483266.0,5.270490,12.587901,0.0,1.0,2018.0,,,2020-12-01,0.0,0.0,0.0,Open water - freshwater,River,4.0,1.0,POINT (508530.000 1593330.000)
4,1221.0,91.0,137483266.0,5.270490,12.587901,0.0,1.0,2018.0,,,2020-12-01,0.0,0.0,0.0,Open water - freshwater,River,5.0,1.0,POINT (508530.000 1593330.000)
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
7241,2477.0,180.0,137482796.0,13.710861,-8.536884,0.0,1.0,2018.0,,,2020-12-01,0.0,44166.0,0.0,Wetlands - marine (e.g. mangroves),,8.0,2.0,POINT (1322910.000 -1085160.000)
7242,2478.0,180.0,137482796.0,13.710861,-8.536884,0.0,1.0,2018.0,,,2020-12-01,0.0,44166.0,0.0,Wetlands - marine (e.g. mangroves),,9.0,2.0,POINT (1322910.000 -1085160.000)
7243,2479.0,180.0,137482796.0,13.710861,-8.536884,0.0,1.0,2018.0,,,2020-12-01,0.0,44166.0,0.0,Wetlands - marine (e.g. mangroves),,10.0,2.0,POINT (1322910.000 -1085160.000)
7244,2480.0,180.0,137482796.0,13.710861,-8.536884,0.0,1.0,2018.0,,,2020-12-01,0.0,44166.0,0.0,Wetlands - marine (e.g. mangroves),,11.0,2.0,POINT (1322910.000 -1085160.000)


In [11]:
input_data= input_data.drop(['F1','Unnamed__0'], axis=1)

In [12]:
input_data[0:10]

Unnamed: 0,PLOT_ID,LON,LAT,FLAGGED,ANALYSES,SENTINEL2Y,STARTDATE,ENDDATE,WATER,NO_WATER,BAD_IMAGE,NOT_SURE,CLASS,COMMENT,MONTH,WATERFLAG,geometry
0,137483266.0,5.27049,12.587901,0.0,1.0,2018.0,,,2020-12-01,0.0,0.0,0.0,Open water - freshwater,River,1.0,1.0,POINT (508530.000 1593330.000)
1,137483266.0,5.27049,12.587901,0.0,1.0,2018.0,,,2020-12-01,0.0,0.0,0.0,Open water - freshwater,River,2.0,1.0,POINT (508530.000 1593330.000)
2,137483266.0,5.27049,12.587901,0.0,1.0,2018.0,,,2020-12-01,0.0,0.0,0.0,Open water - freshwater,River,3.0,1.0,POINT (508530.000 1593330.000)
3,137483266.0,5.27049,12.587901,0.0,1.0,2018.0,,,2020-12-01,0.0,0.0,0.0,Open water - freshwater,River,4.0,1.0,POINT (508530.000 1593330.000)
4,137483266.0,5.27049,12.587901,0.0,1.0,2018.0,,,2020-12-01,0.0,0.0,0.0,Open water - freshwater,River,5.0,1.0,POINT (508530.000 1593330.000)
5,137483266.0,5.27049,12.587901,0.0,1.0,2018.0,,,2020-12-01,0.0,0.0,0.0,Open water - freshwater,River,6.0,1.0,POINT (508530.000 1593330.000)
6,137483266.0,5.27049,12.587901,0.0,1.0,2018.0,,,2020-12-01,0.0,0.0,0.0,Open water - freshwater,River,7.0,1.0,POINT (508530.000 1593330.000)
7,137483266.0,5.27049,12.587901,0.0,1.0,2018.0,,,2020-12-01,0.0,0.0,0.0,Open water - freshwater,River,8.0,1.0,POINT (508530.000 1593330.000)
8,137483266.0,5.27049,12.587901,0.0,1.0,2018.0,,,2020-12-01,0.0,0.0,0.0,Open water - freshwater,River,9.0,1.0,POINT (508530.000 1593330.000)
9,137483266.0,5.27049,12.587901,0.0,1.0,2018.0,,,2020-12-01,0.0,0.0,0.0,Open water - freshwater,River,10.0,1.0,POINT (508530.000 1593330.000)


In [8]:
coords = [(x,y) for x, y in zip(input_data.geometry.x, input_data.geometry.y)]

### Sample WOfS at the ground truth coordinates in 3 by 3 window size
To load WOFL data, we can first create a re-usable query as below that will define the time period we are interested in, as well as other important parameters that are used to correctly load the data. 

As WOFLs are created scene-by-scene, and some scenes overlap, it's important when loading data to `group_by` solar day, and ensure that the data between scenes is combined correctly by using the WOfS `fuse_func`.
This will merge observations taken on the same day, and ensure that important data isn't lost when overlapping datasets are combined.
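
To make the idea of fusing overlapping scenes concrete, here is a minimal sketch of a fuse function written with NumPy. It assumes the lowest bit marks "no data" and is only an illustration of the approach; the actual `wofs_fuser` in `deafrica_datahandling` may differ, and `fuse_overlap` is a hypothetical name.

```python
import numpy as np

NODATA_BIT = 1  # assumption: the lowest bit marks "no data"


def fuse_overlap(dest, src):
    """Merge `src` into `dest` in place (illustrative, not the library code).

    Where `dest` has no data, take the value from `src`.
    Where both scenes have data, combine their flags with a bitwise OR.
    """
    dest_empty = (dest & NODATA_BIT).astype(bool)
    src_valid = ~(src & NODATA_BIT).astype(bool)
    both_valid = ~dest_empty & src_valid
    dest[dest_empty] = src[dest_empty]
    dest[both_valid] |= src[both_valid]
    return dest


# Two toy scenes from the same day covering the same four pixels
scene_a = np.array([1, 128, 0, 64], dtype=np.uint8)
scene_b = np.array([128, 1, 64, 128], dtype=np.uint8)
fused = fuse_overlap(scene_a.copy(), scene_b)
print(fused)
```

The first pixel is empty in `scene_a`, so it takes its value from `scene_b`; the last pixel has data in both scenes, so their flags are combined.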

We can convert the WOFL bit field into a binary array containing True and False values. This allows us to use the WOFL data as a mask that can be applied to other datasets.
The `make_mask` function allows us to create a mask using the flag labels (e.g. "wet" or "dry") rather than raw bit values. For more details on how to do masking on WOfS, see the [Applying_WOfS_bit_masking](../Frequently_used_code/Applying_WOfS_bitmasking.ipynb) notebook.

In [10]:
coords[0:10]

[(508530.0000273566, 1593330.0000055272),
 (508530.0000273566, 1593330.0000055272),
 (508530.0000273566, 1593330.0000055272),
 (508530.0000273566, 1593330.0000055272),
 (508530.0000273566, 1593330.0000055272),
 (508530.0000273566, 1593330.0000055272),
 (508530.0000273566, 1593330.0000055272),
 (508530.0000273566, 1593330.0000055272),
 (508530.0000273566, 1593330.0000055272),
 (508530.0000273566, 1593330.0000055272)]

In [11]:
#generate query object 
query ={'group_by':'solar_day',
        'resampling':'nearest'}

In [12]:
def get_wofs_for_point(index, row, input_data, query, results_wet, results_clear):
    dc = datacube.Datacube(app='WOfS_accuracy')
    #get the month value for each index
    month = input_data.loc[index]['MONTH'] 
    plot_id = input_data.loc[index]['PLOT_ID']
    #set the time for query of the WOfS database according to the month value in the validation table 
    time = '2018-' + f'{int(month):02d}' #MONTH is stored as a float (e.g. 1.0), so cast to int before zero-padding 
    #copy the query so that the original stays unchanged 
    dc_query = deepcopy(query) 
    geom = geometry.Geometry(input_data.geometry.values[index].__geo_interface__,  geometry.CRS('EPSG:6933'))
    q = {"geopolygon":geom}
    t = {"time":time} 
    dc_query.update(t)
    dc_query.update(q)
    wofls = dc.load(product ="ga_ls8c_wofs_2",
                    y = (input_data.geometry.y[index] - 30.5, input_data.geometry.y[index] + 30.5),
                    x =(input_data.geometry.x[index] - 30.5, input_data.geometry.x[index] + 30.5),
                    crs = 'EPSG:6933',
                    time=time,
                    output_crs = 'EPSG:6933',
                    resolution=(-30,30))

    #Define a mask for wet and clear pixels 
    wet_nocloud = {"water_observed":True, "cloud_shadow":False, "cloud":False,"nodata":False}
    #Define a mask for dry and clear pixels 
    dry_nocloud = {"water_observed":False, "cloud_shadow":False, "cloud":False, "nodata":False}
    wofl_wetnocloud = masking.make_mask(wofls, **wet_nocloud).astype(int) 
    wofl_drynocloud = masking.make_mask(wofls, **dry_nocloud).astype(int)
    clear = (wofl_wetnocloud | wofl_drynocloud).water.all(dim=['x','y']).values
    n_clear = clear.sum() #record this and use it to filter out month with no valid data 
    if n_clear > 0:
        wet = wofl_wetnocloud.isel(time=clear).water.max().values  #record this as WOfS has seen water in the 3*3 window
    else:
        wet = 0 

    results_wet.update({str(int(plot_id))+"_"+str(month) : int(wet)})
    results_clear.update({str(int(plot_id))+"_"+str(month) : int(n_clear)})
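
The window logic in the function above can be sketched on a toy NumPy stack, without a datacube connection. This is an illustration under stated assumptions: the arrays are made-up 3 by 3 masks with dimensions (time, y, x), and `summarise_window` is a hypothetical helper that mirrors the `clear`, `n_clear` and `wet` steps.

```python
import numpy as np


def summarise_window(wet, dry):
    """Return (saw_water, n_clear) for masks shaped (time, y, x).

    A time step counts as clear only if every pixel in the window is
    either wet-and-clear or dry-and-clear.
    """
    clear = (wet | dry).all(axis=(1, 2))
    n_clear = int(clear.sum())
    saw_water = int(wet[clear].max()) if n_clear > 0 else 0
    return saw_water, n_clear


# Three made-up time steps over a 3 by 3 window
wet = np.zeros((3, 3, 3), dtype=int)
dry = np.ones((3, 3, 3), dtype=int)
wet[1, 1, 1], dry[1, 1, 1] = 1, 0   # step 1: one wet pixel, all clear
dry[2, 0, 0] = 0                    # step 2: one obscured pixel
wet[2, 2, 2], dry[2, 2, 2] = 1, 0   # step 2: water, but window not clear

print(summarise_window(wet, dry))
```

Here the third time step is discarded because one pixel is neither wet nor dry, so only two clear observations remain and water is still recorded from the second time step.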

In [13]:
def _parallel_fun(input_data, query, ncpus):
    
    manager = mp.Manager()
    results_wet = manager.dict()
    results_clear = manager.dict()
   
    # progress bar
    pbar = tqdm(total=len(input_data))
        
    def update(*a):
        pbar.update()

    with mp.Pool(ncpus) as pool:
        for index, row in input_data.iterrows():
            pool.apply_async(get_wofs_for_point,
                                 [index,
                                 row,
                                 input_data,
                                 query,
                                 results_wet,
                                 results_clear], callback=update)
        pool.close()
        pool.join()
        pbar.close()
        
    return results_wet, results_clear
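
One point worth knowing about `apply_async`: an exception raised inside a worker is not re-raised in the parent unless the result is fetched, so failures can pass silently. The sketch below shows one way to collect them with the `error_callback` argument. It uses `multiprocessing.pool.ThreadPool`, which exposes the same `apply_async` interface as `mp.Pool`, purely so that the example runs anywhere; `toy_worker` and `run_all` are hypothetical stand-ins, not the notebook's functions.

```python
from multiprocessing.pool import ThreadPool


def toy_worker(index, results):
    """Hypothetical stand-in for a per-point query that may fail."""
    if index == 2:
        raise ValueError(f"no data for index {index}")
    results[index] = index * 10


def run_all(indices, nworkers=2):
    """Run toy_worker for every index and collect results and errors."""
    results = {}
    errors = []
    with ThreadPool(nworkers) as pool:
        for index in indices:
            pool.apply_async(
                toy_worker,
                (index, results),
                error_callback=errors.append,  # receives the exception
            )
        pool.close()
        pool.join()
    return results, errors


results, errors = run_all(range(4))
print(results)
print(errors)
```

With a process pool, the shared dictionaries would still need to come from a `Manager`, as in the function above; only the `error_callback` argument is the addition being illustrated.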

> **The following cell can take several minutes to run.** If you set `ncpus > 1`, then this function will be run in parallel across the specified number of processes.

In [None]:
%%time
wet, clear= _parallel_fun(input_data, query, ncpus=15)

  0%|          | 0/7246 [00:00<?, ?it/s]Process ForkPoolWorker-9:
Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 119, in worker
    result = (True, func(*args, **kwds))
Process ForkPoolWorker-16:
  File "<ipython-input-12-9a1738e88c9f>", line 2, in get_wofs_for_point
    dc = datacube.Datacube(app='WOfS_accuracy')
Process ForkPoolWorker-14:
Process ForkPoolWorker-15:
Process ForkPoolWorker-10:
Process ForkPoolWorker-8:
Process ForkPoolWorker-5:
Process ForkPoolWorker-11:
Process ForkPoolWorker-12:
Process ForkPoolWorker-13:
  File "/env/lib/python3.6/site-packages/datacube/api/core.py", line 84, in __init__
    validate_connection=validate_connection)
Process ForkPoolWorker-2:
Process ForkPoolWorker-4:
Process ForkPoolWorker-6:
Tra

In [11]:
input_data['PLOT_ID'] = input_data.PLOT_ID.astype(str)
input_data['MONTH']= input_data.MONTH.astype(str)

In [None]:
for key,value in wet.items(): #wet is a dictionary 
    ID = key.split('_')[0] +'.0' #plotid of observation 
    M = key.split('_')[1] #month of observation  
    for index, row in input_data.iterrows():
        if (input_data.at[index,'PLOT_ID'] == str(ID) and input_data.at[index,'MONTH'] == str(M)):
           
            input_data.at[index,'CLASS_WET'] = str(value)

In [13]:
for key,value in clear.items(): #clear observation is a dictionary 
    ID = key.split('_')[0] +'.0' #plotid of observation 
    M = key.split('_')[1] #month of observation 
    for index, row in input_data.iterrows():
        if (input_data.at[index,'PLOT_ID'] == str(ID) and input_data.at[index,'MONTH'] == str(M)):
          
            input_data.at[index,'CLEAR_OBS'] = str(value)
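
The two nested loops above compare every dictionary key with every table row. As a possible alternative, the sketch below builds the same `plotid_month` key for each row and looks it up with `Series.map`. The table and dictionaries are made-up sample values, and the key format is assumed to match the one written by `get_wofs_for_point` (integer plot ID, underscore, month as a float string such as `1.0`).

```python
import pandas as pd

# Made-up sample rows; the real table has many more columns
table = pd.DataFrame(
    {"PLOT_ID": [101.0, 101.0, 202.0], "MONTH": [1.0, 2.0, 1.0]}
)
# Made-up results keyed as "<plot id>_<month>"
wet = {"101_1.0": 1, "101_2.0": 0, "202_1.0": 1}
clear = {"101_1.0": 2, "101_2.0": 1, "202_1.0": 3}

# Build one key per row, then look each key up in the dictionaries
keys = table["PLOT_ID"].astype(int).astype(str) + "_" + table["MONTH"].astype(str)
table["CLASS_WET"] = keys.map(wet)
table["CLEAR_OBS"] = keys.map(clear)
print(table)
```

Rows whose key is missing from a dictionary would receive `NaN`, which makes unmatched points easy to spot.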

In [21]:
# input_data[0:10]

In [16]:
#convert the columns back to numeric types for further analysis 
input_data['PLOT_ID'] = pd.to_numeric(input_data['PLOT_ID'])
input_data['CLASS_WET'] = pd.to_numeric(input_data['CLASS_WET'])
input_data['CLEAR_OBS'] = pd.to_numeric(input_data['CLEAR_OBS'])

In [None]:
#input_data.to_csv(('../Supplementary_data/Validation/Refined/ground_truth_RCMRD.csv'))
input_data.to_csv(('../Supplementary_data/Validation/Refined/validationpoints_w305m.csv'))

In [None]:
# Drop rows where the water flag is greater than 1 or where there are no clear observations 
indexNames = input_data[(input_data['WATERFLAG'] > 1) | (input_data['CLEAR_OBS'] == 0)].index 
input_data.drop(indexNames, inplace=True)

In [None]:
#count the number of months for each PLOT_ID in the final table 
count = input_data.groupby(['PLOT_ID'])['MONTH'].count()

In [None]:
#save the check on the count for each plot id as a csv file 
count.to_csv('../Supplementary_data/Validation/Refined/final_RCMRD_count_w305P.csv')

In [None]:
input_data.to_csv(('../Supplementary_data/Validation/Refined/ground_truth_RCMRD_W305mfinal_parallel.csv'))

In [None]:
print(datacube.__version__)

***

## Additional information

**License:** The code in this notebook is licensed under the [Apache License, Version 2.0](https://www.apache.org/licenses/LICENSE-2.0). 
Digital Earth Africa data is licensed under the [Creative Commons by Attribution 4.0](https://creativecommons.org/licenses/by/4.0/) license.

**Contact:** If you need assistance, please post a question on the [Open Data Cube Slack channel](http://slack.opendatacube.org/) or on the [GIS Stack Exchange](https://gis.stackexchange.com/questions/ask?tags=open-data-cube) using the `open-data-cube` tag (you can view previously asked questions [here](https://gis.stackexchange.com/questions/tagged/open-data-cube)).
If you would like to report an issue with this notebook, you can file one on [GitHub](https://github.com/digitalearthafrica/deafrica-sandbox-notebooks).

**Last modified:** January 2020

**Compatible datacube version:** 

## Tags
Browse all available tags on the DE Africa User Guide's [Tags Index](https://) (placeholder as this does not exist yet)

### Review for further development 
* The following cells were tested on the RCMRD dataset after querying WOfS for both wet and clear observations, but they did not improve the run time compared with the previous code.

In [None]:
#wetDF = pd.DataFrame.from_dict(wet, orient = 'index')
#clearDF = pd.DataFrame.from_dict(clear,orient='index')

In [None]:
#mergedDF = wetDF.merge(clearDF, left_index=True, right_index=True)

In [None]:
#mergedDF = mergedDF.rename(columns={'0_x':'CLASS_WET','0_y':'CLEAR_OBS'})
#mergedDF

In [None]:
# for index, row in mergedDF.iterrows():
#     mergedDF.at[index,'ID'] = index.split('_')[0] +'.0'
#     mergedDF.at[index,'M'] = index.split('_')[1]

In [None]:
#mergedDF.reset_index(drop=True, inplace=True)
#mergedDF

In [None]:
# for index, row in input_data.iterrows():
#     for i, j in mergedDF.iterrows():
#         if (input_data.at[index,'PLOT_ID'] == mergedDF.loc[i]['ID'] and input_data.loc[index]['MONTH'] == mergedDF.loc[i]['M']):
#             input_data.at[index,'CLASS_WET'] = mergedDF.loc[i]['CLASS_WET']
#             input_data.at[index,'CLEAR_OBS'] = mergedDF.loc[i]['CLEAR_OBS']