# Examples of how to work with the Global Coastal Transect System

Run the first few cells to load the required functions, then jump to the section you're interested in.

In [None]:
import sys

# sys.path.insert(0, "..\src")
sys.path.insert(0, "../src")

import dask
# NOTE: query planning is not implemented in dask_geopandas yet, so we have to set 
# it to False before we do any dask_geopandas import 
dask.config.set({"dataframe.query-planning": False})

from coastlines4shorelines.utils import transect_origins_to_coastline,retrieve_transects_by_roi

import logging
import os
import pathlib


from dask.dataframe.utils import make_meta

import dask_geopandas
import duckdb
import geopandas as gpd
import hvplot.pandas
import pandas as pd
import pystac
import shapely
from dotenv import load_dotenv
from ipyleaflet import Map, basemaps

from coastmonitor.geo.geometries import geo_bbox

# Load environment variables with python-dotenv; override=True lets the loaded
# values take precedence over variables already set in the process environment.
load_dotenv(override=True)

# Azure storage credentials are read from the environment; os.getenv returns
# None for a variable that is not set.
sas_token = os.getenv("AZURE_STORAGE_SAS_TOKEN")
account_name = os.getenv("AZURE_STORAGE_ACCOUNT_NAME")
# Passed as `storage_options` to the parquet readers further down.
storage_options = {"account_name": account_name, "credential": sas_token}

# Only let messages of level WARNING and above through from the "azure" logger.
logging.getLogger("azure").setLevel(logging.WARNING)

## Load from STAC catalog

Load the transects from our CoCliCo STAC catalog. 

In [None]:
coclico_catalog = pystac.Catalog.from_file(
    "https://coclico.blob.core.windows.net/stac/v1/catalog.json"
)

In [None]:
coclico_catalog

In [None]:
list(coclico_catalog.get_all_collections())

In [None]:
gcts = coclico_catalog.get_child("gcts")
gcts

### Use a dynamic map to extract data by region of interest

The ipyleaflet map below can be used to find the bbox coordinates of a region.
Zoom to the area where you want to extract data, then run the next cell. Wait about
one second before doing so: the map has to be rendered before its bounds can be
extracted.

In [None]:
m = Map(basemap=basemaps.Esri.WorldImagery, scroll_wheel_zoom=True)
m.center = 41.735966575868716, -70.10032653808595
m.zoom = 9
m.layout.height = "800px"
m

## IMPORTANT NOTE: Wait for the map to render before you run the next cell

Rendering the map takes about a second, so pause briefly before running the next cell; otherwise the north/west/east/south bounds cannot be read from the map.

In [None]:
# this makes a GeoPandas dataframe from the bounds of the ipyleaflet map that is rendered above
roi = geo_bbox(m.west, m.south, m.east, m.north)

In [None]:
# makes a list of all items (data partitions) in the GCTS STAC catalog
items = list(gcts.get_all_items())

## The dataset is partitioned into geospatial chunks

The dataset is divided into chunks that each span a different region of the world. In the next cell
we read the spatial extent of each chunk and combine them into a GeoDataFrame.

In [None]:
# One bounding-box frame per STAC item, stacked into a single frame whose index
# is renumbered from 0 so that row labels line up with positions in `items`.
bbox_frames = [geo_bbox(*item.to_dict()["bbox"]) for item in items]
bboxes = pd.concat(bbox_frames).reset_index(drop=True)
bboxes.explore()

## Now we can find the bboxes that cover our region of interest

In [None]:
# Spatially join the partition bboxes with the region of interest and keep only
# the original bbox columns.
joined_bboxes = gpd.sjoin(bboxes, roi)
bboxes_roi = joined_bboxes[bboxes.columns]
# The index labels of the matching rows are used as positions into `items`.
items_roi = [items[position] for position in bboxes_roi.index]

In [None]:
items_roi

In [None]:
items_roi[0]

## The STAC items contain references to where the data is stored

In [None]:
hrefs = [i.assets["data"].href for i in items_roi]

## Cloud based data

The hrefs that you see below are URLs to a cloud bucket with the transects for the area of interest. The prefix "az://" is the protocol for Azure cloud storage.

In [None]:
hrefs

## Reading the transect partitions that span our region of interest 

We will read the data from cloud storage - but only the data that spans our region of interest (the map above).

## Dask dataframes are lazy

These dataframes are not in memory yet. We still have to trigger the compute (see cell below)

In [None]:
# Display the lazy dask-geopandas dataframe itself (the original ended in a
# dangling `.sjoin`, which only showed a bound method). The partition data is
# not loaded into memory until `.compute()` is called.
dask_geopandas.read_parquet(hrefs, storage_options=storage_options)

## Compute the transects that span our region of interest

The transects are not in memory yet. In the next cell we will trigger the retrieval from cloud storage to local client by doing a `ddf.compute()` call. 

In [None]:
# Lazily open the partitions that cover the region of interest.
transects = dask_geopandas.read_parquet(hrefs, storage_options=storage_options)
# Bring the region of interest into the CRS of the transects before joining.
roi_in_transects_crs = roi.to_crs(transects.crs)
joined_transects = transects.sjoin(roi_in_transects_crs)
# Drop the join bookkeeping column and trigger the actual read.
transects_roi = joined_transects.drop(columns=["index_right"]).compute()

In [None]:
%%time

transects = dask_geopandas.read_parquet(hrefs, storage_options=storage_options)
roi_in_transects_crs = roi.to_crs(transects.crs)
transects_roi = (
    transects.sjoin(roi_in_transects_crs).drop(columns=["index_right"]).compute()
)

# Names of all coastlines (the "cl<digits>s<digits>" part of the transect id)
# that have at least one transect in the region of interest.
coastline_names_in_roi = transects_roi.transect_id.str.extract(r"(cl\d+s\d+)")[0]
unique_coastline_names = [str(name) for name in coastline_names_in_roi.unique()]

def add_coastline_name(df):
    """Return a copy of ``df`` with a ``coastline_name`` column added.

    The coastline name is the part of ``transect_id`` that matches
    ``cl<digits>s<digits>``; rows whose id does not match get a missing value.

    The input frame is left unmodified, so the function can be handed to
    ``map_partitions`` without altering the partition it receives.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame with a string column ``transect_id``.

    Returns
    -------
    pandas.DataFrame
        New frame of the same type with the extra ``coastline_name`` column.
    """
    # expand=False returns a Series for the single capture group, which is the
    # natural shape for one new column.
    coastline_name = df["transect_id"].str.extract(r"(cl\d+s\d+)", expand=False)
    return df.assign(coastline_name=coastline_name)


# Describe the output schema of `add_coastline_name` for dask: the metadata of
# `transects` extended with a string column `coastline_name`.
meta = make_meta(transects)
new_col_meta = pd.DataFrame({"coastline_name": pd.Series([], dtype=str)})
meta = pd.concat([meta, new_col_meta])

# Add the coastline name to every partition (lazily), then load every transect
# whose coastline name occurs in the region of interest - not only the
# transects that intersect the region themselves.
transects = transects.map_partitions(add_coastline_name, meta=meta)
transects_roi = transects.loc[
    transects["coastline_name"].isin(unique_coastline_names)
].compute()

transects_roi = transects_roi.sort_values("transect_id")
# Split the transect id into the three numeric parts captured by the regex
# (the digits after "cl", "s" and "tr").
transects_roi[["coastline_id", "segment_id", "transect_dist"]] = (
    transects_roi.transect_id.str.extract(r"cl(\d+)s(\d+)tr(\d+)")
)
# str.extract yields strings, so cast the new columns to integers.
transects_roi = transects_roi.astype(
    {"coastline_id": int, "segment_id": int, "transect_dist": int}
)
transects_roi.head()

## Put everything together in one function that retrieves the transects per area of interest

The function below contains everything we have discussed so far. For a given area of interest, it loads the transects that have matching coastlines from a STAC catalog into Python memory.

In [None]:
%%time

transects_roi = retrieve_transects_by_roi(roi, storage_options=storage_options)

## Sorting the transects

Currently the transects are stored by QuadKey to optimize fast read access by filter pushdown. If we want them sorted by the coastline we can do that as follows. 

## Compose the transect origins into coastlines

In [None]:
# Apply `transect_origins_to_coastline` to the transects of each coastline id.
coastline_per_id = transects_roi.groupby("coastline_id").apply(
    transect_origins_to_coastline
)
# Drop missing results, turn the group key back into a column and name the
# result column (labelled 0) "geometry".
coastline = (
    coastline_per_id.dropna().reset_index().rename(columns={0: "geometry"})
)
coastline = gpd.GeoDataFrame(coastline, crs=4326)

In [None]:
# Inspect the transects of a single coastline (fixes the misspelled variable
# name, which raised a NameError).
transects_roi.groupby("coastline_id").get_group(13508)

In [None]:
coastline.explore(column="coastline_id")

## Plus we can handle regions of interest that do not span all coastlines

In [None]:
import fiona

# Register the KML driver in fiona's table of supported drivers for both
# reading and writing ("rw") so that the KML file can be opened below.
fiona.drvsupport.supported_drivers["KML"] = "rw"
# NOTE: local Windows path - point this at your own KML file.
kml_fp = pathlib.Path(r"d:\FHICS\ShorelineS\ROIs\North_Carolina_Virginia.kml")
# Replaces the map-based region of interest with the one read from the file.
roi = gpd.read_file(kml_fp, driver="KML")

In [None]:
transects_roi = retrieve_transects_by_roi(roi, storage_options=storage_options)

In [None]:
# Build the coastline geometries per coastline id, then explode so that every
# row holds a single geometry.
coastline_per_id = transects_roi.groupby("coastline_id").apply(
    transect_origins_to_coastline
)
coastline = (
    coastline_per_id.explode().reset_index(name="geometry").drop(columns=["level_1"])
)
coastline = gpd.GeoDataFrame(coastline, crs=4326)
# Overlay the coastlines with the region-of-interest geometry and split any
# multi-part results into separate rows.
coastline_in_roi = gpd.overlay(coastline, roi[["geometry"]])
coastline = coastline_in_roi.explode(index_parts=False)

In [None]:
m = roi.explore()
gpd.GeoDataFrame(coastline, crs=4326).explore(color="red", m=m)

## Load ShorelineMonitor SDS series 

In [None]:
# Read the shoreline series from cloud storage and load it into memory.
sdss_lazy = dask_geopandas.read_parquet(
    "az://shorelinemonitor-raw-series/release/2024-04-15/sp_NC.parquet",
    storage_options=storage_options,
)
sdss = sdss_lazy.compute()
# Store the observation time as a "YYYY-MM-DD" string.
time_as_text = pd.to_datetime(sdss.time).dt.strftime("%Y-%m-%d")
sdss = sdss.assign(time=time_as_text)

In [None]:
sdss.head()

In [None]:
# Build point geometries from the lon/lat columns. The GeoSeries is passed via
# the `geometry` keyword so the frame has an active geometry column, which
# `.explore()` needs; passing a plain list of points as data does not set one.
# NOTE(review): crs=4326 assumes lon/lat are WGS84 degrees, in line with the
# other GeoDataFrames in this notebook - confirm.
coords = gpd.GeoSeries.from_xy(sdss["lon"], sdss["lat"], crs=4326)
cl = gpd.GeoDataFrame(geometry=coords)
cl.explore()

In [None]:
## Sample to explore data

In [None]:
import numpy as np

transect_sample = np.random.choice(sdss.transect_id.unique())

In [None]:
sdss.loc[sdss["transect_id"] == transect_sample][["geometry", "time"]].explore(
    column="time"
)

In [None]:
def filter_sp(sp_raw):
    """
    Keep only the shoreline positions that pass every quality indicator.

    A row of `sp_raw` is kept when all of the following hold:
    `sh_sinuosity` < 10, not `obs_on_shoal`, `obs_is_primary`, `tr_is_qa`,
    `mdn_offset` < 3 * `tr_stdev`, `obs_count` >= 5 and not `obs_is_outlier`.

    Returns a new table with the columns `time`, `transect_id`,
    `shoreline_position` (renamed from `shoreline_position_trans`) and
    `geometry`, with the index renumbered from 0.
    """
    # Row mask: True where every indicator is satisfied.
    passes_indicators = (
        (sp_raw["sh_sinuosity"] < 10)
        & (~sp_raw["obs_on_shoal"])
        & (sp_raw["obs_is_primary"])
        & (sp_raw["tr_is_qa"])
        & (sp_raw["mdn_offset"] < 3 * sp_raw["tr_stdev"])
        & (sp_raw["obs_count"] >= 5)
        & (~sp_raw["obs_is_outlier"])
    )
    # Columns carried over into the clean table.
    kept_columns = ["time", "transect_id", "shoreline_position_trans", "geometry"]

    accepted_rows = sp_raw[passes_indicators]
    trimmed = accepted_rows[kept_columns]
    renamed = trimmed.rename(
        columns={"shoreline_position_trans": "shoreline_position"}
    )
    return renamed.reset_index(drop=True)


# Apply the filtering function to the raw time series (`sdss`)
sdss_clean = filter_sp(sdss)

## Function to construct shoreline from SDS series

In [None]:
def shoreline_intersections_to_coastline(df):
    """Connect shoreline points into one LineString per contiguous run.

    Rows are sorted by ``transect_id``, ``segment_id`` and ``transect_dist``.
    A new run ("partition") starts wherever ``transect_dist`` does not increase
    by exactly 100 relative to the previous row. Every run with at least two
    rows becomes a ``LineString`` through its ``lon``/``lat`` points; runs that
    consist of a single row are dropped.

    Parameters
    ----------
    df : pandas.DataFrame
        Needs the columns ``transect_id``, ``segment_id``, ``transect_dist``,
        ``lon``, ``lat`` and ``osm_coastline_is_closed``. The frame passed in
        is not modified.

    Returns
    -------
    pandas.Series
        The LineStrings in partition order; empty when no run has two or more
        points.
    """
    # Imported locally: the file only has `import shapely`, so the bare name
    # `LineString` used below was undefined and raised a NameError.
    from shapely.geometry import LineString

    # sort_values returns a new frame, so the helper column added below does
    # not leak into the caller's frame.
    # NOTE(review): sorting on the string `transect_id` first makes the other
    # two keys tie-breakers only; presumably the ids are zero-padded so that
    # lexical order equals along-coast order - confirm.
    df = df.sort_values(by=["transect_id", "segment_id", "transect_dist"])

    # Identify partitions by checking where the difference in transect_dist is
    # not 100. diff() is NaN for the first row, so fillna(0) gives it a value
    # that differs from 100 and thereby opens the first partition.
    df["partition"] = (df["transect_dist"].diff().fillna(0) != 100).cumsum()

    # Loop-invariant: a closed coastline is only closed when it was not split.
    is_single_partition = df["partition"].nunique() == 1

    lines = []
    for _, partition_df in df.groupby("partition"):
        # A line needs at least two points; single-point partitions are skipped.
        if len(partition_df) < 2:
            continue

        coords = gpd.GeoSeries.from_xy(
            partition_df["lon"], partition_df["lat"]
        ).to_list()

        if partition_df.osm_coastline_is_closed.iloc[0] and is_single_partition:
            # Repeat the first point at the end to close the loop.
            coords.append(coords[0])

        lines.append(LineString(coords))

    return pd.Series(lines)


In [None]:
sdss.head()

In [None]:
import numpy as np

transect_sample = np.random.choice(sdss.transect_id.unique())

In [None]:
sdss.loc[sdss["transect_id"] == transect_sample][["geometry", "time"]].explore(
    column="time"
)