In [1]:
import logging
import os
from importlib import import_module

import click
import datacube
import fsspec
import geopandas as gpd
from rasterio.errors import RasterioIOError

from deafrica_conflux.cli.logs import logging_setup
from deafrica_conflux.db import get_engine_waterbodies
from deafrica_conflux.id_field import guess_id_field
from deafrica_conflux.io import (
    check_file_exists,
    check_if_s3_uri,
    table_exists,
    write_table_to_parquet,
)
from deafrica_conflux.plugins.utils import run_plugin, validate_plugin
from deafrica_conflux.stack import stack_waterbodies_parquet_to_db

In [2]:
# These are the default AWS configurations for the Analysis Sandbox.
# that are set in the environmnet variables. 
aws_default_config = {
    #'AWS_NO_SIGN_REQUEST': 'YES', 
    'AWS_SECRET_ACCESS_KEY': 'fake',
    'AWS_ACCESS_KEY_ID': 'fake',
}

# To access public bucket, need to remove the AWS credentials in 
# the environment variables or the following error will occur.
# PermissionError: The AWS Access Key Id you provided does not exist in our records.

for key in aws_default_config.keys():
    if key in os.environ:
        del os.environ[key]

In [3]:
dataset_ids_file = "data/dataset_ids.txt"
verbose = 1
plugin_name = "waterbodies_timeseries_v2"
# This is the directory containing the parquet files for water body polygons split by wofs_ls regions.
polygons_directory = "s3://deafrica-waterbodies-dev/0-2-0/test_directory/polygons_split_by_region/"
# Unique key id in polygons vector files.
use_id = "UID"
# Directory to write the timeseries parquet files.
output_directory = "s3://deafrica-waterbodies-dev/0-2-0/test_directory/pq_files"
# Rerun scenes that have already been processed.
overwrite = True
# Write to the Waterbodies database.
db = False
# Include polygons that only partially intersect the scene.
partial = True
# Include data from over the scene boundary.
overedge = True
# Not matter DataFrame is empty or not, always as it as Parquet file.
dump_empty_dataframe = True

In [4]:
# "Support" pathlib Paths.
dataset_ids_file = str(dataset_ids_file)
polygons_directory = str(polygons_directory)
output_directory = str(output_directory)

In [5]:
# Set up logger.
logging_setup(verbose)
_log = logging.getLogger(__name__)

In [6]:
# Read the plugin as a Python module.
module = import_module(f"deafrica_conflux.plugins.{plugin_name}")
plugin_file = module.__file__
plugin = run_plugin(plugin_file)
_log.info(f"Using plugin {plugin_file}")
validate_plugin(plugin)

[2023-10-26 18:13:36,754] {1894110324.py:5} INFO - Using plugin /home/jovyan/dev/deafrica-conflux/deafrica_conflux/plugins/waterbodies_timeseries_v2.py


In [7]:
%%time
# Check if the text file containing dataset ids file exists.
if not check_file_exists(dataset_ids_file):
    _log.error(f"Could not find text file {dataset_ids_file}!")
    raise FileNotFoundError(f"Could not find text file {dataset_ids_file}!")
    
# Read ID/s from the S3 URI or File URI.
if check_if_s3_uri(dataset_ids_file):
    fs = fsspec.filesystem("s3")
else:
    fs = fsspec.filesystem("file")
    
with fs.open(dataset_ids_file, "r") as file:
    dataset_ids = [line.strip() for line in file]
_log.info(f"Read {dataset_ids} from file.")

[2023-10-26 18:13:36,761] {<timed exec>:14} INFO - Read ['936f161e-e5f3-5627-b8c7-834bfb5e8bcc', '56633394-d128-5b82-b366-f2ac6c7d391e', 'aaa1f697-ce35-557b-b406-86c6ab3120b7', 'c367b59e-faaf-5408-9214-89d5b6384032', 'f0b94027-593f-5e5c-8629-1fc13a6bcfdb', '75f93f21-a0e5-5d13-8683-822ed4fe6cbc'] from file.
CPU times: user 1.87 ms, sys: 0 ns, total: 1.87 ms
Wall time: 1.62 ms


In [8]:
if db:
    engine = get_engine_waterbodies()

In [9]:
# Connect to the datacube.
dc = datacube.Datacube(app="deafrica-conflux-drill")

In [10]:
# Get the product name from the plugin.
product_name = plugin.product_name

In [11]:
# As part of the drill function
import collections
import datetime
import logging
import multiprocessing
import warnings
import functools
from types import ModuleType

import datacube
import geopandas as gpd
import numpy as np
import pandas as pd
import shapely.geometry
import tqdm
import xarray as xr
from datacube.model import Dataset
from datacube.utils.geometry import Geometry
from deafrica_tools.spatial import xr_rasterize

from skimage.measure import regionprops
from deafrica_conflux.drill import get_intersections

In [12]:
def get_polygons_within_ds_extent(polygons_gdf: gpd.GeoDataFrame, ds: Dataset) -> gpd.GeoDataFrame:
    """
    Filter a set of polygons to include only polygons within (contained in) the extent of a dataset.
    """
    # Get the extent of the dataset.
    ds_extent = ds.extent
    ds_extent_crs = ds_extent.crs
    ds_extent_geom = ds_extent.geom
    ds_extent_gdf = gpd.GeoDataFrame(geometry=[ds_extent_geom], crs=ds_extent_crs).to_crs(polygons_gdf.crs)
    
    # Get all polygons that are contained withn the extent of the dataset.
    polygon_ids_within_ds_extent = ds_extent_gdf.sjoin(polygons_gdf, how="inner", predicate="contains")["index_right"].to_list()
    polygons_within_ds_extent = polygons_gdf.loc[polygon_ids_within_ds_extent]
    
    return polygons_within_ds_extent

In [13]:
def get_polygons_intersecting_ds_extent(polygons_gdf: gpd.GeoDataFrame, ds: Dataset) -> gpd.GeoDataFrame:
    """
    Filter a set of polygons to only include polygons that intersect with
    the extent of a dataset.

    Parameters
    ---------
    polygons_gdf : gpd.GeoDataFrame
    ds : Dataset

    Returns
    -------
    gpd.GeoDataFrame
    """
    # Get the extent of the dataset.
    ds_extent = ds.extent
    ds_extent_crs = ds_extent.crs
    ds_extent_geom = ds_extent.geom
    ds_extent_gdf = gpd.GeoDataFrame(geometry=[ds_extent_geom], crs=ds_extent_crs).to_crs(polygons_gdf.crs)

    # Get all polygons that intersect with the extent of the dataset.
    polygon_ids_intersecting_ds_extent = ds_extent_gdf.sjoin(polygons_gdf, how="inner", predicate="intersects")["index_right"].to_list()
    polygons_intersecting_ds_extent = polygons_gdf.loc[polygon_ids_intersecting_ds_extent]

    return polygons_intersecting_ds_extent

In [14]:
def filter_large_polygons(
    polygons_gdf: gpd.GeoDataFrame, ds: Dataset
) -> gpd.GeoDataFrame:
    """
    Filter out large polygons from the set of polygons.
    Large polygons are defined as polygons which are large than 3 scenes
    in width and in height.
    
    Arguments
    ---------
    polygons_gdf : gpd.GeoDataFrame
    ds : datacube.model.Dataset

    Returns
    -------
    gpd.GeoDataFrame
    """
    # Get the extent of the dataset.
    ds_extent = ds.extent

    # Reproject the extent of the dataset to match the set of polygons.
    ds_extent = ds_extent.to_crs(polygons_gdf.crs)

    # Get the bounding box of the extent of the dataset.
    bbox = ds_extent.boundingbox
    left, bottom, right, top = bbox

    # Create a polygon 3 dataset extents in width and height.
    width = right - left
    height = top - bottom

    testbox = shapely.geometry.Polygon(
        [
            (left - width, bottom - height),
            (left - width, top + height),
            (right + width, top + height),
            (right + width, bottom - height),
        ]
    )

    filtered_polygons_gdf = polygons_gdf[~polygons_gdf.geometry.intersects(testbox.boundary)]

    return filtered_polygons_gdf

In [15]:
def drill(
    plugin: ModuleType,
    polygons_gdf: gpd.GeoDataFrame,
    reference_dataset: Dataset,
    partial=True,
    overedge=True,
    dc: datacube.Datacube | None = None,
    time_buffer=datetime.timedelta(hours=1),
) -> pd.DataFrame:
    """
    Perform a polygon drill.

    Arguments
    ---------
    plugin : module
        A validated plugin to drill with.

    polygons_gdf : GeoDataFrame
        A GeoDataFrame with the ID (column containing the polygons ids) as the index.

    reference_dataset : Dataset
        Refernce dataset to process.

    partial : bool
        Optional (defaults to True). Whether to include polygons that partially
        overlap with the scene. If partial is True, polygons that partially
        overlap with the scene are included. If partial is False, polygons that
        partially overlap with the scene are excluded from the drill, and going
        off the edge of the scene will exclude the entire polygon. Describes
        what happens to the polygon, not what happens to the data. Interacts
        with overedge, which describes what happens to the overedge data.

    overedge : bool
        Optional (defaults to False). Whether to include data from other scenes
        in partially overedge polygons. If overedge is False, data from other
        scenes is not included in results. If overedge is True, data from other
        scenes is included in results. Interacts with partial.

    dc : datacube.Datacube
        Optional existing Datacube.

    time_buffer : datetime.timedelta
        Optional (default 1 hour). Only consider datasets within
        this time range for overedge.

    Returns
    -------
    Drill table : pd.DataFrame
        Index = polygon ID
        Columns = output bands
    """
    
    # Validate partial and overedge parameters.
    if not partial:
        if overedge:
            _log.error("overedge=True expects partial=True")
            raise ValueError("overedge=True expects partial=True")

    # TODO: Generalize to work with multiple products and 
    # products with multiple measurements measurements.

    # Check the plugin does not have multiple products to load.
    # Using multiple products is not is not implemented.
    if len(plugin.input_products.items()) > 1:
        raise NotImplementedError("Expected one product in plugin")
    else:
        reference_product = reference_dataset.type.name
        assert reference_product == list(plugin.input_products.keys())[0]
        measurements = plugin.input_products[reference_product]
        if len(measurements) > 1:
            raise NotImplementedError("Expected 1 measurement in plugin")
        else:
            measurement = measurements[0]

    # Get a datacube if we don't have one already.
    if dc is None:
        dc = datacube.Datacube(app="deafrica-conflux-drill")

    # Get the output crs and resolution from the plugin.
    output_crs = plugin.output_crs
    resolution = plugin.resolution
    if hasattr(plugin, "resampling"):
        resampling = plugin.resampling
    else:
        resampling = "nearest"

    # Reproject the polygons to the required CRS.
    polygons_gdf = polygons_gdf.to_crs(output_crs)

    # Filter the polygons based on the partial and overedge parameter.
    if partial:
        # Include polygons that partially overlap with the scene.
        # i.e. only  include polygons that intersect with the dataset extent.
        filtered_polygons_gdf = get_polygons_intersecting_ds_extent(polygons_gdf, reference_dataset)
        _log.info(f"Filtered out {len(polygons_gdf) - len(filtered_polygons_gdf)} polygons out of {len(polygons_gdf)} polygons.")
        if overedge:
            # If overedge, remove anything which intersects with a 3-scene
            # width box.
            n = len(filtered_polygons_gdf)
            filtered_polygons_gdf = filter_large_polygons(filtered_polygons_gdf, reference_dataset)
            _log.info(f"Overedge filter removed {n - len(filtered_polygons_gdf)} polygons larger than 3 scenes in width and height out of {n} polygons.")
    else:
        # Do not include polygons that partially overlap with the scene.
        # i.e. only include polygons within the dataset extent.
        filtered_polygons_gdf = get_polygons_within_ds_extent(polygons_gdf, reference_dataset)
        _log.info(f"Filtered out {len(polygons_gdf)- len(filtered_polygons_gdf)} polygons out of {len(polygons_gdf)} polygons.")

    if len(filtered_polygons_gdf) == 0:
        scene_uuid = str(reference_dataset.id)
        _log.warning(f"No polygons found in scene {scene_uuid}")
        return pd.DataFrame({})

    # Load the reference dataset.
    if overedge:
        # Search for all the datasets neighbouring our reference dataset that we need to cover 
        # the area of the polygons.

        # Get the bounding box for the polygons.
        geopolygon = Geometry(geom=shapely.geometry.box(*filtered_polygons_gdf.total_bounds), crs=filtered_polygons_gdf.crs)

        # TODO: Test using a 1 day
        # Get the time range to use for searching for datasets neighbouring our reference dataset.
        time_span = (reference_dataset.center_time - time_buffer, reference_dataset.center_time + time_buffer)

        req_datasets = dc.find_datasets(product=reference_product, geopolygon=geopolygon, time=time_span, ensure_location=True)

        reference_scene = dc.load(datasets=req_datasets,
                                  measurements=measurements,
                                  geopolygon=geopolygon,
                                  time=time_span,
                                  output_crs=output_crs,
                                  resolution=resolution,
                                  group_by="solar_day",
                                  resampling=resampling
                                 )

        _log.info(f"Loaded the {len(req_datasets)} required datasets to cover all the polygons: {', '.join([str(dataset.id) for dataset in req_datasets])} .")

    else:    
        # Load the reference scene.
        reference_scene = dc.load(datasets=[reference_dataset], measurements=measurements, output_crs=output_crs, resolution=resolution, resampling=resampling)

        _log.info(f"Loaded the reference dataset {str(reference_dataset.id)} ." )


    _log.info(f"Reference scene is {reference_scene.sizes}")

    # Transform the loaded data.
    # Force warnings to raise exceptions.
    # This means users have to explicitly ignore warnings.
    ds = reference_scene.isel(time=0)
    with warnings.catch_warnings():
        warnings.filterwarnings("error")
        ds_transformed = plugin.transform(ds)[measurement]


    # Assign a one-indexed numeric column for the polygons.
    # This will allow us to build a polygon enumerated raster.
    attr_col = "_conflux_one_index"
    # This mutates the (in-memory) polygons_gdf, but that's OK.
    filtered_polygons_gdf[attr_col] = range(1, len(filtered_polygons_gdf.index) + 1)
    conflux_one_index_to_id = {v: k for k, v in filtered_polygons_gdf[attr_col].to_dict().items()}

    # Build the enumerated polygon raster.
    polygon_raster = xr_rasterize(filtered_polygons_gdf, reference_scene, attr_col)

    # Get the summaries.
    # Convert the xarrat.DataArrays to a numpy array for image processing.
    np_polygon_raster = polygon_raster.to_numpy().astype(int)

    np_ds_transformed = ds_transformed.to_numpy().astype(float)

    # For each polygon, perform the summary.
    props = regionprops(
            label_image=np_polygon_raster, intensity_image=np_ds_transformed, extra_properties=(plugin.summarise,)
        )
    summary_df_list = []
    for region_prop in props:
        polygon_summary_df = region_prop.summarise
        polygon_index = conflux_one_index_to_id[region_prop.label]
        polygon_summary_df.index = [polygon_index]
        summary_df_list.append(polygon_summary_df)

    summary_df = pd.concat(summary_df_list, ignore_index=False)

    # Detect intersections.
    # We only have to do this if partial and not overedge.
    # If not partial, then there can't be any intersections.
    # If overedge, then there are no partly observed polygons.
    if partial and not overedge:
        intersection_features = get_intersections(filtered_polygons_gdf, reference_scene.extent)
        intersection_features.rename(
            inplace=True,
            columns={
                "North": "conflux_n",
                "South": "conflux_s",
                "East": "conflux_e",
                "West": "conflux_w",
            },
        )
        # Merge in the edge information.
        summary_df = summary_df.join(
            # left join only includes objects with some
            # representation in the scene
            intersection_features,
            how="left",
        )

    return summary_df

In [16]:
%%time
failed_dataset_ids = []
for i, dataset_id in enumerate(dataset_ids):
    _log.info(f"Processing {dataset_id} ({i + 1}/{len(dataset_ids)})")
    
    # Load the dataset using the dataset id.
    reference_dataset = dc.index.datasets.get(dataset_id)
    
    # Get the region code for the dataset.
    region_code = reference_dataset.metadata.region_code
    
    # Get the center time for the dataset
    centre_time = reference_dataset.center_time
  
    # Use the center time to check if a parquet file for the dataset already
    # exists in the output directory.
    if not overwrite:
        _log.info(f"Checking existence of {dataset_id}")
        exists = table_exists(product_name, dataset_id, centre_time, output_directory)
    
    if overwrite or not exists:
        try:
            # Load the water body polygons for the region
            polygons_vector_file = os.path.join(polygons_directory, f"{region_code}.parquet")
            try:
                polygons_gdf = gpd.read_parquet(polygons_vector_file)
            except Exception as error:
                _log.exception(f"Could not read file {polygons_vector_file}")
                raise error
            else:
                # Guess the ID field.
                id_field = guess_id_field(polygons_gdf, use_id)
                _log.debug(f"Guessed ID field: {id_field}")

                # Set the ID field as the index.
                polygons_gdf.set_index(id_field, inplace=True)
                
            # Perform the polygon drill on the dataset
            table = drill(plugin,
                          polygons_gdf,
                          reference_dataset,
                          partial,
                          overedge, 
                          dc)
            # Write the table to a parquet file.
            if (dump_empty_dataframe) or (not table.empty):
                pq_filename = write_table_to_parquet(product_name, dataset_id, centre_time, table, output_directory)
                if db:
                    log.info(f"Writing {pq_filename} to database")
                    stack_waterbodies_parquet_to_db(parquet_file_paths=[pq_filename], verbose=verbose, engine=engine, drop=False,)
        except KeyError as keyerr:
            log.exception(f"Found {dataset_id} has KeyError: {str(keyerr)}")
            failed_dataset_ids.append(dataset_id)
        except TypeError as typeerr:
            _log.exception(f"Found {dataset_id} has TypeError: {str(typeerr)}")
            failed_dataset_ids.append(dataset_id)
        except RasterioIOError as ioerror:
            _log.exception(f"Found {dataset_id} has RasterioIOError: {str(ioerror)}")
            failed_dataset_ids.append(dataset_id)
        else:
            _log.info(f"{dataset_id} successful")
    else:
        _log.info(f"{dataset_id} already exists, skipping")

    if failed_dataset_ids:
        # Write the failed dataset ids to a text file.
        parent_folder, file_name = os.path.split(dataset_ids_file)
        file, file_extension = os.path.splitext(file_name)
        failed_datasets_text_file = os.path.join(
            parent_folder, file + "_failed_dataset_ids" + file_extension
        )

        with fs.open(failed_datasets_text_file, "a") as file:
            for dataset_id in failed_dataset_ids:
                file.write(f"{dataset_id}\n")

        _log.info(f"Failed dataset IDs {failed_dataset_ids} written to: {failed_datasets_text_file}.")

[2023-10-26 18:13:37,473] {<timed exec>:3} INFO - Processing 936f161e-e5f3-5627-b8c7-834bfb5e8bcc (1/6)
[2023-10-26 18:13:37,791] {id_field.py:64} INFO - Values in the column UID are unique.
[2023-10-26 18:13:38,002] {340435804.py:95} INFO - Filtered out 1463 polygons out of 1737 polygons.
[2023-10-26 18:13:41,302] {340435804.py:101} INFO - Overedge filter removed 1 polygons larger than 3 scenes in width and height out of 274 polygons.
[2023-10-26 18:13:44,020] {340435804.py:137} INFO - Loaded the 3 required datasets to cover all the polygons: 936f161e-e5f3-5627-b8c7-834bfb5e8bcc, 56633394-d128-5b82-b366-f2ac6c7d391e, 5f9dc6b0-bd63-5382-915f-6e1074bb6975 .
[2023-10-26 18:13:44,021] {340435804.py:146} INFO - Reference scene is Frozen({'time': 1, 'y': 6553, 'x': 5972})
[2023-10-26 18:13:45,277] {credentials.py:620} INFO - Found credentials in shared credentials file: ~/.aws/credentials
[2023-10-26 18:13:45,635] {io.py:258} INFO - Table written to s3://deafrica-waterbodies-dev/0-2-0/test_

In [23]:
from deafrica_conflux.stack import StackMode, stack_parquet

# Stack the waterbodies files
"""
Stack outputs of deafrica-conflux into other formats.
"""
# Path to the Parquet directory
parquet_path = "s3://deafrica-waterbodies-dev/0-2-0/test_directory/pq_files"
pattern = ".*"
# Output directory for waterbodies-style stack
stack_output_directory = 's3://deafrica-waterbodies-dev/0-2-0/test_directory'
mode = "waterbodies"
# Remove timeseries duplicated data if applicable. 
remove_duplicated_data = True
# Convert mode to StackMode
mode_map = {
    "waterbodies": StackMode.WATERBODIES,
    "waterbodies_db": StackMode.WATERBODIES_DB,
}


kwargs = {}
if mode == "waterbodies":
    kwargs["output_directory"] = stack_output_directory
    kwargs["remove_duplicated_data"] = remove_duplicated_data
elif mode == "waterbodies_db":
    kwargs["drop"] = drop

stack_parquet(
    path=parquet_path, pattern=pattern, mode=mode_map[mode], verbose=verbose, **kwargs
)


[2023-10-26 18:17:27,015] {stack.py:543} INFO - Begin to query s3://deafrica-waterbodies-dev/0-2-0/test_directory/pq_files with pattern .*
[2023-10-26 18:17:27,382] {stack.py:113} INFO - Found 6 parquet files.
[2023-10-26 18:17:27,383] {stack.py:252} INFO - Reading...


  0%|          | 0/6 [00:00<?, ?it/s]

[2023-10-26 18:17:28,042] {stack.py:265} INFO - Writing...
[2023-10-26 18:17:28,046] {stack.py:273} INFO - Writing s3://deafrica-waterbodies-dev/0-2-0/test_directory/kxte/kxte6fxgx.csv
[2023-10-26 18:17:28,075] {stack.py:273} INFO - Writing s3://deafrica-waterbodies-dev/0-2-0/test_directory/kxte/kxte3zzgp.csv
[2023-10-26 18:17:28,101] {stack.py:273} INFO - Writing s3://deafrica-waterbodies-dev/0-2-0/test_directory/kxte/kxte82w1t.csv
[2023-10-26 18:17:28,126] {stack.py:273} INFO - Writing s3://deafrica-waterbodies-dev/0-2-0/test_directory/kxte/kxte810ze.csv
[2023-10-26 18:17:28,155] {stack.py:273} INFO - Writing s3://deafrica-waterbodies-dev/0-2-0/test_directory/kxte/kxte83xg3.csv
[2023-10-26 18:17:28,180] {stack.py:273} INFO - Writing s3://deafrica-waterbodies-dev/0-2-0/test_directory/kxte/kxte8d42k.csv
[2023-10-26 18:17:28,204] {stack.py:273} INFO - Writing s3://deafrica-waterbodies-dev/0-2-0/test_directory/kxte/kxte8dhkr.csv
[2023-10-26 18:17:28,227] {stack.py:273} INFO - Writing s3:

In [24]:
# Read a single csv file
timeseries =  pd.read_csv("s3://deafrica-waterbodies-dev/0-2-0/test_directory/kxyy/kxyy2mycv.csv")

In [25]:
timeseries

Unnamed: 0,date,pc_wet,px_wet,area_wet_m2,pc_dry,px_dry,area_dry_m2,pc_invalid,px_invalid,area_invalid_m2
0,2003-01-17T07:49:46Z,,,,,,,100.0,30.0,27000.0
