# Point-based and Parallel Processing of the Water Observations from Space (WOfS) Product in Africa


## Description 
The [Water Observations from Space (WOfS)](https://www.ga.gov.au/scientific-topics/community-safety/flood/wofs/about-wofs) product is derived from Landsat 8 satellite observations, as part of the provisional Landsat 8 Collection 2 surface reflectance, and shows surface water detected in Africa.
Individual water classified images are called Water Observation Feature Layers (WOFLs), and are created in a 1-to-1 relationship with the input satellite data. 
Hence there is one WOFL for each satellite dataset processed for the occurrence of water.

The data in a WOFL is stored as a bit field. This is a binary number, where each digit of the number is independently set or not based on the presence (1) or absence (0) of a particular attribute (water, cloud, cloud shadow etc). In this way, the single decimal value associated with each pixel can provide information on a variety of features of that pixel. 
For more information on the structure of WOFLs and how to interact with them, see the [Water Observations from Space](../Datasets/Water_Observations_from_Space.ipynb) and [Applying WOfS bitmasking](../Frequently_used_code/Applying_WOfS_bitmasking.ipynb) notebooks.
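
To make the bit field idea concrete, the following minimal sketch decodes a single pixel value into the names of the flags that are set. The bit layout in `WOFL_FLAGS` and the helper `decode_wofl` are illustrative assumptions and are not taken from this notebook; the authoritative flag definitions are in the product metadata and the notebooks linked above.

```python
# Assumed bit layout for a WOFL pixel (bit position -> flag name).
# This table is an illustrative assumption; check the product metadata
# for the authoritative definition.
WOFL_FLAGS = {
    0: "nodata",
    1: "noncontiguous",
    2: "low_solar_angle",
    3: "terrain_shadow",
    4: "high_slope",
    5: "cloud_shadow",
    6: "cloud",
    7: "water",
}


def decode_wofl(value):
    """Return the names of all flags set in a single WOFL pixel value."""
    return [name for bit, name in WOFL_FLAGS.items() if value & (1 << bit)]


print(decode_wofl(128))  # only the water bit is set
print(decode_wofl(192))  # cloud (64) and water (128) are both set
print(decode_wofl(0))    # no bits set
```

Under this assumed layout, a value of 0 would correspond to a clear, dry observation and a value of 128 to a clear, wet one.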

This notebook explains how to query the WOfS product at each collected validation point in Africa using a point-based sampling approach. 

The notebook demonstrates how to:

1. Load the validation points for each partner institution
2. Query WOFL data at the validation points and capture the available WOfS class using point-based sampling and multiprocessing
3. Build a lookup table (LUT) that holds, for each point, the validation information, the WOfS class, and the number of clear observations in each month 
***


**Input data** : `<INSTITUTION>_ValidationPoints.csv`

**Output data** : `<INSTITUTION>_wofs_ls_valid.csv`

Last modified: 04/02/2022

### Load packages
Import Python packages that are used for the analysis.

In [1]:
%matplotlib inline

import datacube
import numpy as np
import pandas as pd
import geopandas as gpd
import matplotlib.pyplot as plt
import multiprocessing as mp
from geopandas import GeoDataFrame
from shapely.geometry import Point
from datacube.utils import masking, geometry 
from deafrica_tools.spatial import xr_rasterize
from copy import deepcopy
from deafrica_tools.datahandling import wofs_fuser
from tqdm import tqdm

import warnings
warnings.filterwarnings("ignore")

## Analysis parameters

In [2]:
institutions = ['OSS', 'RCMRD', 'AGRHYMET', 'AFRIGIST']
ncpus = 15

## Open data, convert to geodataframe

In [3]:
gdfs = []
for i in institutions:
    path2csv = (
        "../02_Validation_data/Processed/" + i + "/" + i + "_ValidationPoints.csv"
    )
    df = pd.read_csv(path2csv, delimiter=",")

    geometries = [Point(xy) for xy in zip(df.LON, df.LAT)]
    gdf = GeoDataFrame(df, crs="epsg:4326", geometry=geometries)

    gdf.to_file(
        filename="../02_Validation_data/Processed/"
        +i+"/"+i+"_ValidationPoints.geojson"
    )

    # reproject to EPSG:6933, a projected CRS with units in metres
    gdf = gdf.to_crs("epsg:6933")

    # Checking the size of the input data
    print(i, gdf.shape)

    gdfs.append(gdf)

OSS (8673, 19)
RCMRD (8899, 17)
AGRHYMET (8724, 18)
AFRIGIST (13835, 19)


### Sample WOfS at the ground truth coordinates

**Assumptions for ODC data query:**
- WOfS data is loaded for the nearest pixel to the validation sample lat/lon
- WOfS data is loaded in a 10-day window around the beginning of each month
- If WOfS has a 'wet' and 'clear' value at any time during the 10-day window, the pixel is said to be 'wet'
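
The 'wet' rule in the last assumption can be sketched on plain boolean arrays, with one entry per observation in the window. This is a simplified illustration, not the notebook's function: `classify_window` and its inputs are hypothetical, and an observation is treated as clear when it is either wet or dry.

```python
import numpy as np


def classify_window(wet, dry):
    """Return (class_wet, n_clear) for one point over one time window.

    wet, dry: one boolean per observation, True where the pixel was
    classified as clear-and-wet or clear-and-dry respectively.
    """
    wet = np.asarray(wet, dtype=bool)
    dry = np.asarray(dry, dtype=bool)

    # an observation counts as clear if it is either wet or dry
    clear = wet | dry
    n_clear = int(clear.sum())

    # the point is wet if any clear observation in the window is wet
    class_wet = int(wet[clear].any()) if n_clear > 0 else 0
    return class_wet, n_clear


# three observations: clear dry, clear wet, and one that is neither
print(classify_window([False, True, False], [True, False, False]))
# no clear observations at all
print(classify_window([False, False], [False, False]))
```

A window with no clear observations returns a class of 0 together with a clear count of 0, so such months can be filtered out later using the count.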

In [4]:
# generate query object
query = {'group_by': 'solar_day',
         'resampling': 'nearest'}

## Defining functions to query WOfS database

- Each query covers the five days before and the five days after the first day of the calendar month.
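
As a small worked example of this window, the sketch below computes the start and end dates for a given month. The helper name `month_window` and its parameters are hypothetical; the year defaults to 2018 to match the value hard-coded in the function that follows.

```python
import numpy as np


def month_window(month, year=2018, half_width_days=5):
    """Return (start, end) date strings centred on the first day of the month."""
    first_day = np.datetime64(f"{year}-{month:02d}-01")
    delta = np.timedelta64(half_width_days, "D")
    return str(first_day - delta), str(first_day + delta)


print(month_window(3))  # window around 1 March 2018
print(month_window(1))  # window around 1 January 2018
```

Note that the January window starts in late December of the previous year, so observations from 2017 can contribute to the January result.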

In [5]:
def get_wofs_for_point(index, row, input_data, query, results_wet, results_clear):
    dc = datacube.Datacube(app="WOfS_accuracy")

    # get the month value for each index
    month = input_data.loc[index]["MONTH"]

    # build the 10-day time window centred on the first day of the month (year fixed at 2018)
    timeYM = "2018-" + f"{month:02d}"
    start_date = np.datetime64(timeYM) - np.timedelta64(5, "D")
    end_date = np.datetime64(timeYM) + np.timedelta64(5, "D")
    time = (str(start_date), str(end_date))
    plot_id = input_data.loc[index]["PLOT_ID"]

    # copy the base query so the original dictionary is not modified
    dc_query = deepcopy(query)
    geom = geometry.Geometry(
        input_data.geometry.values[index].__geo_interface__, geometry.CRS("EPSG:6933")
    )
    q = {"geopolygon": geom}
    t = {"time": time}

    # updating the query
    dc_query.update(t)
    dc_query.update(q)

    # load the Landsat 8 WOfS product (wofs_ls) at the point location:
    # x and y are passed as zero-width ranges, and time is the 10-day window
    wofls = dc.load(
        product="wofs_ls",
        skip_broken_datasets=True,
        y=(input_data.geometry.y[index], input_data.geometry.y[index]),
        x=(input_data.geometry.x[index], input_data.geometry.x[index]),
        crs="EPSG:6933",
        time=time,
        output_crs="EPSG:6933",
        resolution=(-30, 30),
    )

    # skip points where the load returned no data (no "water" variable)
    if "water" not in wofls:
        pass

    else:
        # Define a mask for wet and clear pixels
        wet_nocloud = {"wet": True}
        # Define a mask for dry and clear pixels
        dry_nocloud = {"dry": True}
        wofl_wetnocloud = masking.make_mask(wofls, **wet_nocloud).astype(int)
        wofl_drynocloud = masking.make_mask(wofls, **dry_nocloud).astype(int)
        clear = (wofl_wetnocloud | wofl_drynocloud).water.all(dim=["x", "y"]).values
        # record the total number of clear observations for each point in each month
        # and use it to filter out months with no valid data
        n_clear = clear.sum()
        # check whether WOfS saw water at this location in any clear observation of the window
        if n_clear > 0:
            wet = wofl_wetnocloud.isel(time=clear).water.max().values
        else:
            wet = 0
        # updating results for both wet and clear observations
        results_wet.update({str(int(plot_id)) + "_" + str(month): int(wet)})
        results_clear.update({str(int(plot_id)) + "_" + str(month): int(n_clear)})

        return time


# run get_wofs_for_point for every row of input_data in parallel
def _parallel_fun(input_data, query, ncpus):

    manager = mp.Manager()
    results_wet = manager.dict()
    results_clear = manager.dict()

    # progress bar
    pbar = tqdm(total=len(input_data))

    def update(*a):
        pbar.update()

    with mp.Pool(ncpus) as pool:
        for index, row in input_data.iterrows():
            pool.apply_async(
                get_wofs_for_point,
                [index, row, input_data, query, results_wet, results_clear],
                callback=update,
            )
        pool.close()
        pool.join()
        pbar.close()

    return results_wet, results_clear
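
One property of `apply_async` is worth illustrating: an exception raised inside a worker is not re-raised in the parent unless the result is fetched or an `error_callback` is supplied, so failed tasks can go unnoticed. The sketch below shows the same submit, close, and join pattern with an `error_callback` that collects failures. It is an illustration only: it uses `multiprocessing.pool.ThreadPool`, which exposes the same `apply_async` interface as `mp.Pool` but runs threads, so a plain dictionary can stand in for the manager dictionary. `toy_worker` and `run_all` are hypothetical names.

```python
from multiprocessing.pool import ThreadPool


def toy_worker(key, value, results):
    """Hypothetical stand-in for get_wofs_for_point: fails on negative input."""
    if value < 0:
        raise ValueError(f"bad value for {key}: {value}")
    results[key] = value * 2


def run_all(items, nworkers=4):
    results = {}  # shared between threads; a manager dict would be needed across processes
    errors = []   # exceptions raised inside workers end up here

    with ThreadPool(nworkers) as pool:
        for key, value in items.items():
            pool.apply_async(
                toy_worker,
                (key, value, results),
                error_callback=errors.append,
            )
        pool.close()
        pool.join()

    return results, errors


results, errors = run_all({"a": 1, "b": -1, "c": 3})
print(results)
print(errors)
```

Collecting the exceptions this way makes it possible to see which points failed and why, without having to fall back to a serial run.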

## Loop through dataframes and extract WOfS data in parallel

Results are saved as .csv in the `../02_Validation_results/WOfS_Assessment/wofs_ls/Institutions/` folder

The cell directly below is for testing only: it runs the function serially on a few points, because a parallel workflow is hard to debug.

In [6]:
# results_wet_test = dict()
# results_clear_test = dict()

# for index, row in gdfs[0][0:14].iterrows():
#     time = get_wofs_for_point(index, row, gdfs[0], query, results_wet_test, results_clear_test)
# #     print(time)

In [7]:
for input_data, i in zip(gdfs, institutions):
    print("Running", i)

    # run the parallel function
    wet, clear = _parallel_fun(input_data, query, ncpus=ncpus)

    # extracting the final table with both CEO labels
    # and WOfS class Wet and clear observations
    wetdf = pd.DataFrame.from_dict(wet, orient="index")
    cleardf = pd.DataFrame.from_dict(clear, orient="index")
    df2 = wetdf.merge(cleardf, left_index=True, right_index=True)
    df2 = df2.rename(columns={"0_x": "CLASS_WET", "0_y": "CLEAR_OBS"})

    # split the index (which is plotid + month) into separate columns
    for index, row in df2.iterrows():
        df2.at[index, "PLOT_ID"] = index.split("_")[0] + ".0"
        df2.at[index, "MONTH"] = index.split("_")[1]

    # reset the index
    df2 = df2.reset_index(drop=True)

    # convert plot id and month to str to help with matching
    input_data["PLOT_ID"] = input_data.PLOT_ID.astype(str)
    input_data["MONTH"] = input_data.MONTH.astype(str)

    # merge both dataframe at locations where plotid and month match
    final_df = pd.merge(input_data, df2, on=["PLOT_ID", "MONTH"], how="outer")
    
    # only keep columns we need
    final_df = final_df[
        [
            "PLOT_ID",
            "LON",
            "LAT",
            "MONTH",
            "WATERFLAG",
            "CLASS_WET",
            "CLEAR_OBS",
            "geometry"
        ]
    ]

    # check number of nan rows
    nans = final_df["CLASS_WET"].isna().sum()
    print("No of NaNs", nans)

    # export results
    final_df.to_csv(
        (
            "../02_Validation_results/WOfS_Assessment/wofs_ls/Institutions/"
            + i
            + "_wofs_ls_valid_test.csv"
        )
    )

Running OSS


100%|██████████| 8673/8673 [03:01<00:00, 47.68it/s]


No of NaNs 44
Running RCMRD


100%|██████████| 8899/8899 [02:57<00:00, 50.13it/s]


No of NaNs 214
Running AGRHYMET


100%|██████████| 8724/8724 [03:07<00:00, 46.52it/s]


No of NaNs 216
Running AFRIGIST


100%|█████████▉| 13832/13835 [05:37<00:00, 41.01it/s]


No of NaNs 307
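
The NaN counts above arise where a validation point has no matching entry in the WOfS results, so the merge leaves `CLASS_WET` empty. The sketch below reproduces this on a tiny, made-up table and uses the `indicator` option of `merge` to list the unmatched points. All values here are hypothetical, and the key splitting is done with vectorised string methods rather than the row loop used above.

```python
import pandas as pd

# hypothetical validation points (PLOT_ID and MONTH already converted to str)
points = pd.DataFrame(
    {
        "PLOT_ID": ["101.0", "102.0", "103.0"],
        "MONTH": ["1", "2", "3"],
        "WATERFLAG": [1, 0, 1],
    }
)

# hypothetical results keyed by "<plot id>_<month>"; point 102 has no result
results = {"101_1": 1, "103_3": 0}

res = pd.Series(results, name="CLASS_WET").rename_axis("key").reset_index()
parts = res["key"].str.split("_", expand=True)
res["PLOT_ID"] = parts[0] + ".0"
res["MONTH"] = parts[1]
res = res.drop(columns="key")

# indicator=True adds a _merge column recording where each row came from
merged = points.merge(res, on=["PLOT_ID", "MONTH"], how="outer", indicator=True)

missing = merged.loc[merged["_merge"] == "left_only", "PLOT_ID"].tolist()
print("No of NaNs", merged["CLASS_WET"].isna().sum())
print("Points without a WOfS result:", missing)
```

Listing the unmatched points rather than only counting them makes it easier to check whether the gaps come from missing data or from failed queries.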


***

## Additional information

**License:** The code in this notebook is licensed under the [Apache License, Version 2.0](https://www.apache.org/licenses/LICENSE-2.0). 
Digital Earth Africa data is licensed under the [Creative Commons by Attribution 4.0](https://creativecommons.org/licenses/by/4.0/) license.

**Contact:** If you need assistance, please post a question on the [Open Data Cube Slack channel](http://slack.opendatacube.org/) or on the [GIS Stack Exchange](https://gis.stackexchange.com/questions/ask?tags=open-data-cube) using the `open-data-cube` tag (you can view previously asked questions [here](https://gis.stackexchange.com/questions/tagged/open-data-cube)).
If you would like to report an issue with this notebook, you can file one on [GitHub](https://github.com/digitalearthafrica/deafrica-sandbox-notebooks).

**Last modified:** September 2020

**Compatible datacube version:** 

In [8]:
print(datacube.__version__)

1.8.6
