# MODIS Monthly Snow Preprocessing Pipeline & ARD Analytics Using Dask

The following documentation leverages GEOAnalytics Canada's JupyterHub, Container Registry, Pipeline, and Dask Cluster systems to collect, preprocess, and then analyse data.

Gathering and preprocessing data can be easily automated using GEOAnalytics' Pipeline system. 
This particular Workflow is tailored toward [MODIS Snow Cover Daily (modis-10A1-061) product, a Level-3 product produced by NASA](https://nsidc.org/sites/default/files/mod10a1-v061-userguide_1.pdf) data. 
Each Task in the Pipeline system uses a custom Docker Image from [GEOAnalytics Container Registry](https://docs.geoanalytics.ca/1_getting_started/08-container-registry.html) (CR). 
> **Note**
> GEOAnalytics CR portal for members of Earth Obervation for Public Health is found at: https://registry.eo4ph.geoanalytics.ca

Quoting the User Guide:
>Snow-covered land typically has very high reflectance in visible bands and very low reflectance in
shortwave infrared bands. The Normalized Difference Snow Index (NDSI) reveals the magnitude of
this difference. The snow cover algorithm calculates NDSI for all land and inland water pixels in
daylight using Terra MODIS band 4 (visible green) and band 6 (shortwave near-infrared).

We will use AOI's defined by [Canada's Open Data Provincial Administrative Boudnaries](https://open.canada.ca/data/en/dataset/306e5004-534b-4110-9feb-58e3a5c3fd97/resource/15ea678b-8667-43ce-b80c-6545e039ee00) to drill down on a Province's snow cover. 
Using this AOI to determine data intersections, we preprocess accordingly to a minimal ARD output, preserving all original data values, saving out to Cloud Storage to be used in subsequent analytics. 

GEOAnalytics Canada Pipelines leverages Argo Workflows as the underlying workflow manager and we recommend using Python Hera framework to declare the components, as shown below.

The Pipeline portion of this code relies on the following core package versions:
```bash
hera==5.5.1
xarray==2023.7.0
rioxarray==0.15.0
pystac_client==0.7.2
planetary-computer==1.0.0
```

In [1]:
# Global Configuration
# --------------------
registry_url = 'registry.eo4ph.geoanalytics.ca'
repository = 'tutorial'
image_version_tag = '0.1.1'
docker_image_tag = f'{registry_url}/{repository}/snow-cover:{image_version_tag}'

## Create Custom Container Image From Dockerfile

To be able to execute the methods we will be implementing below, we will create a simple and custom minimal environment necessary for our functions to run in.
In this documentation, we use Python's Docker framework to build our Image, log in to [GEOAnalytics Container Registry](https://docs.geoanalytics.ca/1_getting_started/08-container-registry.html) (CR), and finally push our Image to be used in our Pipeline Workflow. 
The Image is built using the Docker in Docker (DinD) sidecar, accessible over localhost and preconfigured for the JupyterHub session to use. 

The Pipeline system is capable of pulling Images from GEOAnalytics CR.

The same effect can be accomplished via an external Dockerfile and using the Docker CLI commands.

In [None]:
! pip install --user docker==6.1.3

In [None]:
# Section Imports
# ---------------
import os
import io
import getpass
import docker

In [None]:
# DinD (Docker in Docker) sidecar is accessible over localhost
docker_client = docker.from_env()

The following code shows a String representation of a Dockerfile which can be turned into a Pythonic file-like object, which the Python Docker framework `build()` method accepts. 
Alternatively, if keeping such things in memory is not necessary, a file path on the filesystem can be used to point at a Dockerfile. 

In [None]:
dockerfile = """
FROM python:3.11-slim
RUN apt update --no-install-recommends \
    >>/dev/null
    
RUN pip install \
        pandas==2.0.3 \
        geopandas==0.13.2 \
        numpy==1.24.4 \
        pystac==1.8.3 \
        pystac_client==0.7.2 \
        planetary-computer==1.0.0 \
        odc-stac==0.3.6 \
        azure-storage-blob==12.17.0 \
        xarray==2023.7.0 \
        rioxarray==0.15.0 \
        pyproj==3.6.0 \
        shapely==2.0.1 \
        Pillow==10.0.0
"""
dockerfile_file = io.BytesIO(dockerfile.encode())

In [None]:
dockerfile_file.name = 'dockerfile'

In [None]:
docker_client.images.build(fileobj=dockerfile_file, tag=docker_image_tag)

Log in to GEOAnalytics CR using your username (inferred from environment variable) and password. 
You will need to log in in order to 1. verify that you are a authenticated User and 2. be able to push to your public and private repositories. 

In [None]:
docker_client.login(
    username=os.getenv('JUPYTERHUB_USER'),
    password=getpass.getpass(),
    registry=registry_url
)

Finally, push your new custom Image to the CR. 

In [None]:
docker_client.images.push(docker_image_tag)

## Pipeline Workflow

This section decalares the components used in the preprocessing workflow

In [2]:
# Import the required libraries
import os
import yaml

from typing import List, Dict, Tuple, Any

from hera.shared import global_config
from hera.workflows import (
                        Artifact, 
                        Workflow, 
                        script, 
                        DAG, 
                        Resources, 
                        Parameter, 
                        Env,
                        RetryStrategy,
                        models as m,
)

Set the Global configuration for Argo Workflows - the underlying system for GEOAnalytics Canada Pipelines. 

In [3]:
global_config.api_version = "argoproj.io/v1"
global_config.host = os.getenv("WORKFLOW_HOST")

GEOAnalytics Canada offers a variety of Node types to provision workloads on to. To determine what is available, contact the Admin group. 

In [4]:
user_config = None
with open(f'/home/jovyan/geoanalytics_{os.getenv("JUPYTERHUB_USER")}/userConfig.yaml', 'r') as f:
    user_config = yaml.safe_load(f)

In [5]:
pipeline_node_config = user_config['pipeline']['node-config']

The first tasks we need to implement are those to gather the prerequisite AOI artifact and to prepare the provided temporal range that will be parallelised in the next steps. 

The AOI file is hosted as a zipped up Shapefile. Since Canada's Data portal is Open, we can use Python's `requests` library to `GET` the data. 
However, Shapefiles are archaic and clumsy when not on a local filesystem - so we will convert this into GeoJSON file that is more Cloud-friendly. 
The GeoJSON file will be saved as an Artifact, 

In [14]:
@script(
    image=docker_image_tag,
    resources=Resources(cpu_request='100m', memory_request='512Mi'),
    node_selector=pipeline_node_config['small']['nodeSelector'],
    tolerations=pipeline_node_config['small']['tolerations'],
    outputs=[Artifact(name='aoi', path='/tmp/aoi.geojson')]
)
def generate_polygon(
        url: str,
        target_province: str
    ):
    import os
    import io
    import zipfile
    import requests

    os.environ['USE_PYGEOS'] = '0'
    import geopandas as gpd
    
    from glob import glob
    from shapely.geometry import Polygon, MultiPolygon
    
    try:
        os.makedirs('/tmp/canvec')
    except FileExistsError:
        pass
    
    res = requests.get(url)
    with zipfile.ZipFile(io.BytesIO(res.content), 'r') as zf:
        zf.extractall(path='/tmp/canvec')

    shp_file = glob('/tmp/canvec/**/*geo_pol*_2.shp')[0]
    gdf = gpd.read_file(shp_file)

    # Pull out Adminitrative Boundary to usable Shape
    tgt_col = 'juri_en'
    bc_all = gdf[gdf[tgt_col] == target_province]
    mp = MultiPolygon([poly for poly in bc_all.geometry.values.tolist() if isinstance(poly, Polygon)])
    bc = gpd.GeoDataFrame(data={'geom': [mp]})
    bc = bc.set_geometry('geom')

    # Populate an Artifact, defined in the wrapper, with the GeoJSON file
    bc.to_file('/tmp/aoi.geojson', driver='GeoJSON')
    
    
    

Using the configuration set by the User at the main workflow's entrypoint, we generate a "payload" for subsequent Tasks which will allow the desired set of data to be gathered and processed. 

As a bonus - this Task can also be configured in a [CRON Workflow](https://argoproj.github.io/argo-workflows/cron-workflows/) (a Workflow that executes on a specific schedule) ([Hera implementation](https://hera-workflows.readthedocs.io/en/latest/examples/workflows/upstream/cron_workflow/?h=cron)) where the `years` and `months` arguments may be `None` and therefore default to the current year and month, as defined by the functions code.
Then a CRON schedule for the beginning of the month (given a few days for L3 data to be processed and uploaded) can be set to ingest data of interest automatically. 

In [7]:
@script(
    image=docker_image_tag,
    resources=Resources(cpu_request='100m', memory_request='128Mi'),
    node_selector=pipeline_node_config['small']['nodeSelector'],
    tolerations=pipeline_node_config['small']['tolerations']
)
def fan_out_temporal_range(
        catalog_url: str,
        collection: str,
        assets: List[str],
        years: List[int] | None, # Will default to current year
        months: List[int] | None, # Will default to current month 
        epsg: int,
        resolution: int,
        target_province: str,
        output_prefix: str = '',
    ):
    import sys
    import json
    from datetime import datetime
    
    if years is not None:
        for year in years:
            if year < 2000:
                raise ValueError(f'MODIS collection starts at 02/24/2000: Error {start_year}')
            elif year > int(datetime.now().strftime('%Y')):
                raise ValueError(f'Cannot query future: Error {end_year}')
            else:
                pass
    else:
        years = [int(datetime.now().strftime('%Y'))]

    if months is not None:
        for month in months:
            if month < 1 or month > 12:
                raise ValueError(f'Month range is from 1 to 12, inclusive: Error: {start_month}')
    else:
        months = [int(datetime.now().strftime('%m'))]
        
    temporal_range = []
    for year in years:
        for month in months:
            if month < 10:
                month = f'0{month}'

            payload = {
                'catalog_url': catalog_url,
                'collection': collection,
                'assets': assets,
                'year': year,
                'month': month,
                'epsg': epsg,
                'resolution': resolution,
                'prov': target_province,
                'output_prefix': output_prefix,
            }
            temporal_range.append(payload)
    json.dump(temporal_range, sys.stdout)
    

Using the temporal range list and the AOI Artifact, we can parallelise querying for available data from [Planetary Computer's STAC server](https://planetarycomputer.microsoft.com/dataset/modis-10A1-061#overview).
MODIS Snow Cover Daily, as the name suggests, is a daily product. 
However, as can be seen in the following Task, the dates gathered by the initial query are converted into a new "payload" for the next Task. 
This implements some resiliency to missing dates or when targetting a non-daily product in order to successfully gather data.

In [8]:
@script(
    image=docker_image_tag,
    resources=Resources(cpu_request='500m', memory_request="3Gi"),
    node_selector=pipeline_node_config['small']['nodeSelector'],
    tolerations=pipeline_node_config['small']['tolerations'],
    inputs=[Artifact(name='aoi', path='/tmp/aoi.geojson')]
)
def query_for_data(month_payload: Dict[str, Any]):
    import os
    import sys
    import json
    import pystac_client
    import planetary_computer

    os.environ['USE_PYGEOS'] = '0'
    import geopandas as gpd
    
    from pathlib import Path
    from datetime import datetime

    try:
        os.makedirs('/tmp/payloads')
    except:
        pass

    payload = month_payload
    
    geom = gpd.read_file('/tmp/aoi.geojson', driver='GeoJSON').iloc[0].geometry

    catalog = pystac_client.Client.open(
        payload['catalog_url']
    )

    query_time_range = f"{payload['year']}-{payload['month']}"
    query = catalog.search(
        collections=payload['collection'],
        intersects=geom.envelope,
        datetime=query_time_range
    )

    date_track_list = []
    item_payloads = []
    for item in query.items_as_dicts():
        target_date = datetime.strftime(
            datetime.strptime(
                item['properties']['start_datetime'], 
                '%Y-%m-%dT%H:%M:%SZ'
            ), '%Y-%m-%d')
        if target_date not in date_track_list:
            date_track_list.append(target_date)
            item_payload = {}
            item_payload['query-date'] = target_date
            item_payload.update(payload)
            item_payloads.append(item_payload)
    
    json.dump(item_payloads, sys.stdout)



The following Task is the engine of the preprocessing - the data is pulled and operated on. 
As networks can be susceptible to down-time, a `RetryStrategy` is implemented in order to attempt again in case of a failed communication. 
If the request fails up to the limit, then this and subsequent Tasks will also fail. 
The AOI Artifact retrieved at the beginning of the Workflow is now used not just to query but also to clip the retrieved data. 
This minimises unwanted data being stored and further processed - avoiding wasting compute on what would be discarded otherwise. 
Each Task will receive a list of payloads targetting the temporal window defined in the previous step (a month in this specific case). 
Once preprocessed, the data is saved to Cloud storage and the path to that data is delivered to the next Task. 

In [9]:
@script(
    image=docker_image_tag,
    resources=Resources(cpu_request='2000m', memory_request="16Gi"),
    node_selector=pipeline_node_config['medium']['nodeSelector'],
    tolerations=pipeline_node_config['medium']['tolerations'],
    retry_strategy=RetryStrategy(
            limit=10,
            backoff=m.Backoff(
                duration="1",
                factor="2",
                max_duration="1m",
            )
    ),
    inputs=[
        Artifact(name='aoi', path='/tmp/aoi.geojson')
    ],
)
def process_daily_granules(payloads: List[Any]):
    import io
    import os
    import sys
    import json
    import xarray
    import pyproj
    import odc.stac
    import rioxarray
    import pystac_client
    import planetary_computer
    
    import numpy as np
    os.environ['USE_PYGEOS'] = '0'
    import geopandas as gpd
    
    from pathlib import Path
    from shapely.ops import transform
    from azure.storage.blob import ContainerClient

    container_client = ContainerClient(
            os.getenv('AZURE_STORAGE_CONTAINER_URL'),
            container_name=os.getenv('AZURE_STORAGE_CONTAINER_NAME'),
            credential=os.getenv('AZURE_STORAGE_CONTAINER_TOK'),
        )
    
    output_paths = []    
    for payload in payloads:

        geom = gpd.read_file('/tmp/aoi.geojson', driver='GeoJSON').iloc[0].geometry
        
        query_time_range = payload['query-date']
    
        catalog = pystac_client.Client.open(
            payload['catalog_url'],
            modifier=planetary_computer.sign_inplace
        )
    
        query = catalog.search(
            collections=payload['collection'],
            intersects=geom.envelope,
            datetime=query_time_range
        )
        
        data = odc.stac.load(
            query.items(),
            bands=payload['assets'],
        )
        _data = data['NDSI_Snow_Cover']
        _data.rio.set_nodata = 255
        uint8_data = _data.astype(np.uint8)
        modis_crs = pyproj.CRS.from_proj4('+proj=sinu +lon_0=0 +x_0=0 +y_0=0 +ellps=WGS84 +datum=WGS84 +units=m +no_defs')
        uint8_data = uint8_data.rio.write_crs(modis_crs)
        reprojected_data = uint8_data.rio.reproject(
            dst_crs=f"EPSG:{payload['epsg']}",
            resolution=payload['resolution'],
            nodata=255
        )
            
        if isinstance(reprojected_data, xarray.Dataset):
            reprojected_data = reprojected_data.to_array(dim='band')
        reprojected_data.name = query_time_range
        reprojected_data.attrs['long_name'] = query_time_range

        wgs84_poly = geom
        wgs84 = pyproj.CRS('EPSG:4326')
        utm = reprojected_data.rio.crs
        projection_op = pyproj.Transformer.from_crs(wgs84, utm, always_xy=True).transform
        utm_poly = transform(projection_op, wgs84_poly)
        
        # Clip data to reduce size and isolate to AOI
        clipped_data = reprojected_data.rio.clip(
                                        [utm_poly],
                                        all_touched=True,
                                        drop=True
                                    )
        
        # Write Raster
        filename = Path(query_time_range).stem
        if not isinstance(filename, str):
            filename = filename.as_posix()
        filename += '.tif'
        
        output_path = (Path(payload['output_prefix']) / 
                       Path(''.join(payload['prov'].split(' '))) / 
                       Path(filename)
                       ).as_posix()
        
        
        blob_client = container_client.get_blob_client(output_path)
    
        with io.BytesIO() as buf:
            clipped_data.rio.to_raster(buf, driver='GTiff')
            buf.seek(0)
            blob_client.upload_blob(buf, overwrite=True)
        output_paths.append(output_path)
    json.dump(output_paths, sys.stdout)
    

As a convenience, subsequent products can be derived from the preprocessed data. The following Task implements the creation of a preview PNG that can be used in reporting, documentation, or for STAC servers, to name only a few. 

In [10]:
@script(
    image=docker_image_tag,
    resources=Resources(cpu_request='500m', memory_request='4Gi'),
    node_selector=pipeline_node_config['small']['nodeSelector'],
    tolerations=pipeline_node_config['small']['tolerations'],
)
def create_preview_png(raster_paths: List[Any]):
    import os
    import io
    import sys
    import json
    import xarray
    import rioxarray

    import numpy as np
    
    from PIL import Image
    from pathlib import Path
    from azure.storage.blob import ContainerClient

    container_client = ContainerClient(
            os.getenv('AZURE_STORAGE_CONTAINER_URL'),
            container_name=os.getenv('AZURE_STORAGE_CONTAINER_NAME'),
            credential=os.getenv('AZURE_STORAGE_CONTAINER_TOK'),
        )
    
    for raster_path in raster_paths:
        output_path = raster_path.replace('.tif', '.png')
        raster_blob_client = container_client.get_blob_client(raster_path)
        buf = io.BytesIO()
        raster_blob_client.download_blob().readinto(buf)
        buf.seek(0)
        src = rioxarray.open_rasterio(buf)
        data = src.data
        if not isinstance(data, np.ndarray):
            data = data.compute()
        if data.shape[0] == 1:
            data = data[0]
        else:
            data = data.transpose(1,2,0)
        png_buf = io.BytesIO()
        img = Image.fromarray(data)
        blob_client = container_client.get_blob_client(output_path)
        img.save(png_buf, format='PNG')
        png_buf.seek(0)
        blob_client.upload_blob(png_buf, overwrite=True)
        buf.close()
        src.close()
        png_buf.close()
    
    json.dump(raster_paths, sys.stdout)
            
            

## Main Workflow Entrypoint



In [15]:
# Workflow Configuration Section
# -------------------------------

# URL containing the Canadian Provincial Administrative boundary shapes.
canada_boundaries_endpoint = 'https://ftp.maps.canada.ca/pub/nrcan_rncan/vector/canvec/shp/Admin/canvec_15M_CA_Admin_shp.zip'

# List of Canadian Provinces in a left-to-right then up and back order
target_provinces = [
    'British Columbia',               # 0
    'Alberta',                        # 1
    'Saskatchewan',                   # 2
    'Manitoba',                       # 3
    'Ontario',                        # 4
    'Quebec',                         # 5
    'New Brunswick',                  # 6
    'Nova Scotia',                    # 7
    'Prince Edward Island',           # 8
    'Newfoundland and Labrador',      # 9
    'Nunavut',                        # 10
    'Northwest Territories',          # 11
    'Yukon'                           # 12
]
target_province = target_provinces[4]

# Planetary Computer STAC Configuration
catalog_url = 'https://planetarycomputer.microsoft.com/api/stac/v1'
collection = 'modis-10A1-061'
assets = ['NDSI_Snow_Cover']
years = [2018,2019,2020,2021,2022]
months = [10, 11, 12, 1, 2, 3, 4]

# Data Configuration
epsg = 3005
resolution = 500

In [16]:
with Workflow(
    name=f"ingest-pc-stac-{collection.lower()}-{target_province.replace(' ', '').strip().lower()}",
    namespace=os.getenv("WORKFLOW_NS"),
    entrypoint="data-pipe",
    parallelism=20
) as wf1:
    
    with DAG(
        name='data-pipe', 
        parallelism=min(10, len(years)*len(months)),
    ) as _d:
        a0 = generate_polygon(
                arguments={
                    'url': canada_boundaries_endpoint,
                    'target_province': target_province
                },
        )
        b0 = fan_out_temporal_range(
                arguments={
                    'catalog_url': catalog_url,
                    'collection': collection,
                    'assets': assets,                               
                    'years': years,
                    'months': months,
                    'epsg': epsg,
                    'resolution': resolution,
                    'target_province': target_province,
                    'output_prefix': 'modis_daily_snow'
                }
            )

        a1 = query_for_data(
            arguments=[a0.get_artifact('aoi').as_name('aoi')],
            with_param=b0.result,
        )

        a2 = process_daily_granules(
            arguments=[
                a0.get_artifact('aoi').as_name('aoi'),
            ],
            with_param=a1.result
        )
        b2 = create_preview_png(
            with_param=a2.result
        )

        """
        Finally, we declare our DAG.
        We can run the fisrt two preparation Tasks in parallel and
        then subsequent Tasks operate sequentially.
        """
        [a0, b0] >> a1 >> a2 >> b2
        

In [None]:
wf1.create()

## ARD Workflow


### NDSI_Snow_Cover Value Map

- 0–100: NDSI snow cover
- 200: missing data
- 201: no decision
- 211: night
- 237: inland water
- 239: ocean
- 250: cloud
- 254: detector saturated
- 255: fill

In [None]:
import os
import io
import rioxarray

import numpy as np

from pathlib import Path
from pystac_client import Client
from dask_gateway import Gateway
from collections import defaultdict
from azure.storage.blob import ContainerClient

In [None]:
def add_or_update_stac_item(raster_path: str):
    import io
    import os
    import sys
    import json
    import pystac
    import requests
    
    headers={'cookie': auth_token}
    data = json.dumps(collection.to_dict())
    r = requests.post('', data=data, headers=headers)

In [None]:
def create_or_update_stac_collection():
    import json
    import pystac
    import requests

    collection = pystac.Collection(
                                    id='',
                                    href='https://stac.geoanalytics.ca/collections',
                                    title='',
                                    description='',
                                    extent='',
                                    keywords=[''],
                                    license='',
                                  )
    collection.validate()

In [None]:
container_client = ContainerClient(
    os.getenv('AZURE_STORAGE_CONTAINER_URL'),
    container_name=os.getenv('AZURE_STORAGE_CONTAINER_NAME'),
    credential=os.getenv('AZURE_STORAGE_CONTAINER_TOK'),
)

Isolate in own cell so it can be run only once.  
The gateway object can be used to connect to and create other Dask Clusters.  
See [our documentation](https://docs.geoanalytics.ca/1_getting_started/10-dask.html) for more information about Dask in GEOAnalytics Canada.

In [None]:
gateway = Gateway()

In [None]:
# Configuration
cluster_name = 'daskhub.f6c26b31cafd48edb54593017ced7703'

geoanalytics_stac_url = 'https://stac.eo4ph.geoanalytics.ca'
stac_url = 'https://stac.geoanalytics.ca/collections/'

In [None]:

cluster = gateway.connect(cluster_name)
dask_client = cluster.get_client()

In [None]:
cluster

In [None]:
blobs = container_client.list_blobs(name_starts_with='modis_daily_snow/BritishColumbia/')
raster_map = defaultdict(list)
for blob in blobs:
    if '.ipynb_checkpoints' in blob.name:
        container_client.delete_blob(blob)
        continue
    if blob.name.endswith('.png'):
        continue
    pth = Path(blob.name)
    map_key = '-'.join(pth.stem.split('-')[:2])
    raster_map[map_key].append(blob.name)
raster_map

In [None]:
raster_month_map = {}
attr_copy = None
for k in raster_map.keys():
    _monthly_data = raster_map[k]
    # Open rasters into the dask cluster
    open_rasters = [
        rioxarray.open_rasterio(
            container_client.get_blob_client(_blob).url, chunks='auto')
        for _blob in _monthly_data]
    if attr_copy is None:
        attr_copy = open_rasters[0].attrs
        attr_copy['long_name'] = 'MODIS Snow Cover'
    month_data = xarray.concat(open_rasters, dim='time')
    month_data = month_data.chunk('auto') # rechunk at dimension changes
    filtered_month_data = xarray.where((month_data >100), np.nan, month_data, keep_attrs=True)
    median_month_data = filtered_month_data.median(dim='time', skipna=True, keep_attrs=True)
    median_month_data = median_month_data.chunk('auto')
    final_data = xarray.where(np.isnan(median_month_data), 255, median_month_data).astype(np.uint8)
    raster_month_map[k] = final_data
ds = xarray.Dataset(data_vars=raster_month_map, attrs=attr_copy)
ds

In [None]:
final_data[0].plot.imshow(figsize=(20,10))

In [None]:
dask_client.restart()