# Tutorial notebook for working Planetary Computer

In [1]:
import sys

# add the src directory to the path so that we can import generic functions
sys.path.insert(0, "src")

import logging
import os
import pathlib

# Sometimes you need libraries which are not included in the planetary computer image. That's
# generally not a problem because you can install them with pip.
os.system("pip install python-dotenv -q")

import dask_geopandas
import geopandas as gpd
import hvplot.pandas
import pandas as pd
import pystac
import shapely
from dotenv import load_dotenv
from ipyleaflet import Map, basemaps

# load environment variables
load_dotenv(override=True)

# tokens to access data in private containers
sas_token = os.getenv("AZURE_STORAGE_SAS_TOKEN")
coclico_storage_options = {"account_name": "coclico", "credential": sas_token}

# disable logging messages from azure
logging.getLogger("azure").setLevel(logging.WARNING)

## Load from STAC catalog

Load the transects from our CoCliCo STAC catalog. 

In [2]:
coclico_catalog = pystac.Catalog.from_file(
    "https://coclico.blob.core.windows.net/stac/v1/catalog.json"
)

In [3]:
coclico_catalog

In [4]:
list(coclico_catalog.get_all_collections())

[<Collection id=ssl>,
 <Collection id=wef>,
 <Collection id=eesl>,
 <Collection id=floodmaps>,
 <Collection id=sc>,
 <Collection id=cbca>,
 <Collection id=cfr>,
 <Collection id=smd>,
 <Collection id=cisi>,
 <Collection id=slp5>,
 <Collection id=slp6>,
 <Collection id=slp6_pilot>,
 <Collection id=coastal-mask>,
 <Collection id=shorelinemonitor-shorelines>,
 <Collection id=gcts-2000m>]

In [5]:
gcts = coclico_catalog.get_child("gcts-2000m")
gcts

### Use a dynamic map to extract data by region of interest

The IPyleaflet map below can be used to find the bbox coordinates of a certain region.
Zoom to the area where you want to extract data and run the next cell. Please keep in
mind to wait 1 second because the map has to be rendered before the coordinates can be
extracted. 

In [6]:
m = Map(basemap=basemaps.Esri.WorldImagery, scroll_wheel_zoom=True)
m.center = 43.406241, -2.976665
m.zoom = 9
m.layout.height = "800px"
m

Map(center=[43.406241, -2.976665], controls=(ZoomControl(options=['position', 'zoom_in_text', 'zoom_in_title',…

## IMPORTANT NOTE: Wait for the map to render before you run the next cell

rendering the map takes a second, so you need to pause 1 second before running the next cell otherwise you cannot parse the north/west/east/south bounds

## Import functions from project directory 

In [7]:
from bilbao.utils import geo_bbox

# this makes a GeoPandas dataframe from the DynamicMap that is rendered above
roi = geo_bbox(m.west, m.south, m.east, m.north)
roi.explore()

In [8]:
# makes a list of all items (data partitions) in the GCTS STAC catalog
items = list(gcts.get_all_items())

## The dataset is partitioned into geospatial chunks

The dataset is divided into different chunks, that each span a different region of the world. In the next cell
we read the spatial extends of each chunk and compose that into a GeoDataFrame

In [9]:
bboxes = pd.concat([geo_bbox(*i.to_dict()["bbox"]) for i in items])
bboxes = bboxes.reset_index(drop=True)
bboxes.explore()

## Now we can find the bboxes that cover our region of interest

In [10]:
bboxes_roi = gpd.sjoin(bboxes, roi)[bboxes.columns]
items_roi = [items[i] for i in bboxes_roi.index]

In [11]:
items_roi

[<Item id=minx_-90.02_miny_-0.01_part_0>]

In [12]:
items_roi[0]

## The STAC items contain references to where the data is stored

In [13]:
hrefs = [i.assets["data"].href for i in items_roi]

## Cloud based data

The href that you see below is a url to a cloud bucket with the transects for the area of interest. The prefix "az://" is the protocol for Azure cloud storage. So if you don't have a STAC catalog write out the href's yourself. 

In [14]:
hrefs

['az://transects/gcts-2000m.parquet/minx_-90.02_miny_-0.01_part_0.parquet']

## Reading the transect partitions that span our region of interest 

We will read the data from cloud storage - but only the data that spans our region of interest (the DynamicMap above). 

## Dask dataframes are lazy

These dataframes are not in memory yet. We still have to trigger the compute (see cell below)

In [15]:
dask_geopandas.read_parquet(hrefs, storage_options=coclico_storage_options)

Unnamed: 0_level_0,tr_name,lon,lat,bearing,utm_crs,coastline_name,geometry,bbox,quadkey,isoCountryCodeAlpha2,admin_level_1_name,isoSubCountryCode,admin_level_2_name,bounding_quadkey
npartitions=1,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1
,object,float32,float32,float32,int32,int32,geometry,object,object,object,object,object,object,object
,...,...,...,...,...,...,...,...,...,...,...,...,...,...


## Compute the transects that span our region of interest

The transects are not in memory yet. In the next cell we will trigger the retrieval from cloud storage to local client by doing a `ddf.compute()` call. Note that we can also mix in regular Pandas operations, like sorting. Currently the transects are sorted by QuadKey to optimize fast read access by filter pushdown. If we want them sorted along the coastline we can do that by sorting the tr_name. 

In [None]:
%%time
transects = dask_geopandas.read_parquet(hrefs, storage_options=coclico_storage_options)
transects = (
    transects.sjoin(
        dask_geopandas.from_geopandas(roi.to_crs(transects.crs), npartitions=1)
    )
    .drop(columns=["index_right"])
    .sort_values("tr_name")
    .compute()
)

## Show the first lines of the table

In [None]:
transects.head()

## Holoviews interactive visualization

In [None]:
import colorcet as cc
import hvplot.pandas

transects_plot = (
    transects[["geometry", "bearing"]]
    .sample(500)
    .hvplot(
        geo=True,
        tiles="ESRI",
        color="bearing",
        cmap=cc.CET_C10,
        width=700,
        height=500,
        clabel="North bearing [deg]",
        xlabel="Longitude [deg]",
        ylabel="Latitude [deg]",
        title="Cross-shore transects (100m alongshore), Euskadi.",
        colorbar=True,
        tools=["wheel_zoom"],
    )
)
transects_plot

## Dask Cluster

In [None]:
import dask_gateway

cluster = dask_gateway.GatewayCluster()
client = cluster.get_client()
cluster.adapt(minimum=2, maximum=50)
client

## Accessing ERA 5 for our region of interest

In [None]:
import pystac_client

catalog = pystac_client.Client.open(
    "https://planetarycomputer.microsoft.com/api/stac/v1/"
)

time_range = "2020-01-01/2023-12-31"
bbox = [m.west, m.south, m.east, m.north]

search = catalog.search(
    collections=["era5-pds"],
    bbox=bbox,
    datetime=time_range,
    query={"era5:kind": {"eq": "an"}},
)
items = search.get_all_items()
len(items)

In [None]:
item = items[0]
item

In [None]:
import hvplot.xarray
import numpy as np
import planetary_computer as pc
import rioxarray
import xarray as xr

item = items[0]
signed_item = pc.sign(item)
datasets = [
    xr.open_dataset(asset.href, **asset.extra_fields["xarray:open_kwargs"])
    for asset in signed_item.assets.values()
]

ds = xr.combine_by_coords(datasets, join="exact")
ds

## Datacube

In [None]:
ds.air_pressure_at_mean_sea_level

In [None]:
da = ds.air_pressure_at_mean_sea_level.mean("time").compute()

In [None]:
da = da.rio.write_crs(4326).rio.write_nodata(np.nan)
da.hvplot(
    x="lon",
    y="lat",
    geo=True,
    tiles="EsriImagery",
    rasterize=True,
    width=700,
    height=500,
    title="Mean Air Pressure (2020 - 2023)",
    colorbar=True,
    clabel="Air Pressure [Pa]",
)

## Compute feature maps

In [None]:
import warnings

from astropy.convolution import convolve


def standard_deviation(
    image: np.ndarray, radius: int, nodata: float | int = np.nan
) -> np.ndarray:
    """
    Calculates the standard deviation of an image using a moving window of
    specified radius with astropy's convolution library.

    Args:
        image (np.ndarray): 2D array containing the pixel intensities of a single-band image.
        radius (int): Radius defining the moving window used to calculate the standard deviation.
                    For example, radius = 1 will produce a 3x3 moving window.
        nodata (float, optional): Value to replace NaN results with. Defaults to np.nan.

    Returns:
        np.ndarray: 2D array containing the standard deviation of the image.

    Example:
        >>> img = np.random.random((100, 100))
        >>> std_img = standard_deviation(img, radius=1)
    """

    # Create kernel once
    win_rows, win_cols = radius * 2 + 1, radius * 2 + 1
    kernel = np.ones((win_rows, win_cols))

    # Pre-calculate square of image
    image_sq = image**2

    # First pad the image and its square
    image_padded = np.pad(image, radius, "reflect")
    image_sq_padded = np.pad(image_sq, radius, "reflect")

    # Calculate std with uniform filters
    win_mean = convolve(
        image_padded,
        kernel,
        boundary="extend",
        normalize_kernel=True,
        nan_treatment="interpolate",
        preserve_nan=True,
    )
    win_sqr_mean = convolve(
        image_sq_padded,
        kernel,
        boundary="extend",
        normalize_kernel=True,
        nan_treatment="interpolate",
        preserve_nan=True,
    )
    win_var = win_sqr_mean - win_mean**2

    # Ignore RuntimeWarnings in the sqrt calculation
    with warnings.catch_warnings():
        warnings.filterwarnings("ignore", message="invalid value encountered in sqrt")
        win_std = np.sqrt(win_var)

    # Remove padding
    win_std = win_std[radius:-radius, radius:-radius]

    # After computing standard deviation, replace NaN values with nodata
    win_std[np.isnan(win_std)] = nodata

    return win_std


da_stdev = xr.apply_ufunc(
    standard_deviation,
    da,
    input_core_dims=[["lon", "lat"]],
    output_core_dims=[["lon", "lat"]],
    vectorize=True,
    dask="parallelized",
    kwargs={"radius": 2, "nodata": np.nan},
    output_dtypes=["f4"],
)

In [None]:
da_stdev.rio.write_nodata(np.nan).rio.write_crs(4326).hvplot(
    x="lon",
    y="lat",
    geo=True,
    rasterize=True,
    title="St. Dev in Air Pressure",
    clabel="St. dev in Air pressure (Pa)",
    tiles="ESRI",
)

## Mapping raster data onto vectors

In [None]:
lons, lats = transects.lon.values, transects.lat.values
lons = lons + 180

In [None]:
transects_st_dev_air_pressure = [
    da_stdev.sel(lon=lon, lat=lat, method="nearest").item()
    for lon, lat in zip(lons, lats)
]
transects["st_dev_air_pressure"] = transects_st_dev_air_pressure

In [None]:
import colorcet as cc
import hvplot.pandas

transects_plot = (
    transects[["geometry", "st_dev_air_pressure"]]
    .sample(500)
    .hvplot(
        geo=True,
        tiles="ESRI",
        color="st_dev_air_pressure",
        cmap=cc.CET_D8[::-1],
        width=700,
        height=500,
        clabel="St. Dev. Air Pressure (Pa)",
        xlabel="Longitude [deg]",
        ylabel="Latitude [deg]",
        title="St. Dev. Air. Pressure per transect, Euskadi.",
        colorbar=True,
        tools=["wheel_zoom"],
    )
)
transects_plot

## Clipping raster to region of interest

In [None]:
bbox_translate = [bbox[0] + 180, bbox[1], bbox[2] + 180, bbox[3]]
bbox_as_geojson = shapely.geometry.mapping(shapely.box(*bbox_translate))
da.rio.clip([bbox_as_geojson])