In [2]:
%load_ext autoreload
%autoreload 2

In [3]:
import logging
from typing import Tuple, List
import sys
from pathlib import Path
import pandas as pd
import geopandas as gpd
import numpy as np
import asyncio
from tqdm.asyncio import tqdm
from itertools import product
from shapely.geometry import box


scripts_dir = Path("..").joinpath("src")
if scripts_dir not in sys.path:
    sys.path.insert(0, scripts_dir.resolve().as_posix())

from pipelines.utils import background
from pipelines.processors import calculate_area, get_matches, repair_geometry, arrange_dimensions, clean_geometries, simplify_async

In [4]:
logging.basicConfig(level=logging.DEBUG)
logging.getLogger("requests").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.getLogger("fiona").setLevel(logging.WARNING)
logger = logging.getLogger("notebook")

In [5]:
def split_by_year(
    gdf: gpd.GeoDataFrame, year_col: str = "STATUS_YR", year_val: int = 2010
) -> List[gpd.GeoDataFrame]:
    """Split data by year. relevant for MPA data.(coverage indicator)"""
    prior_2010 = (
        gdf[gdf[year_col] <= year_val][["iso_3", "STATUS_YR", "geometry"]]
        .dissolve(
            by=["iso_3"],
        )
        .assign(year=2010)
        .reset_index()
    )

    after_2010 = (
        gdf[gdf["STATUS_YR"] > 2010][["iso_3", "STATUS_YR", "geometry"]]
        .rename(columns={"STATUS_YR": "year"})
    )
    return [prior_2010, after_2010]

In [6]:
def create_grid(bounds: Tuple[float, float, float, float], cell_size: int = 1) -> gpd.GeoDataFrame:
    """Create a grid of cells for a given GeoDataFrame"""
    minx, miny, maxx, maxy = bounds
    x = np.arange(minx, maxx, cell_size)
    y = np.arange(miny, maxy, cell_size)
    polygons = [
        {
            "geometry": box(i, j, i + cell_size, j + cell_size),
            "cell_id": f"{i}_{j}",
        }
        for i, j in product(x, y)
    ]
    return gpd.GeoDataFrame(polygons)


def subdivide_grid(
    grid_gdf: gpd.GeoDataFrame, gdf: gpd.GeoDataFrame, max_cellsize: float, max_complexity: int
) -> List:
    subdivided_elements = []
    for grid_element in grid_gdf.geometry:
        candidates = get_matches(grid_element, gdf)
        density = len(candidates)
        if density > max_complexity:
            
            subdivision_cellsize = max_cellsize / 2
            # Subdivide the grid element recursively
            subgrid = create_grid(grid_element.bounds, subdivision_cellsize)
            subdivided_elements.extend(
                subdivide_grid(subgrid, gdf, subdivision_cellsize, max_complexity)
            )
        elif density > 0:
            subdivided_elements.append(grid_element)

    return subdivided_elements


def create_density_based_grid(
    gdf: gpd.GeoDataFrame, max_cellsize: int = 10, max_complexity: int = 10000
) -> gpd.GeoDataFrame:
    # Get the bounds of the GeoDataFrame
    minx, miny, maxx, maxy = gdf.total_bounds

    # Create an initial grid
    grid_gdf = create_grid((minx, miny, maxx, maxy), max_cellsize)

    # Subdivide grid elements based on density and complexity
    subdivided_elements = subdivide_grid(grid_gdf, gdf, max_cellsize, max_complexity)

    return gpd.GeoDataFrame(geometry=subdivided_elements)

In [69]:
#  TODO: refactor this so old function mantains functionality for marine areas

def split_gdf_by_grid(gdf: gpd.GeoDataFrame, grid_gdf: gpd.GeoDataFrame):
    result = []
    gdf["already_processed"] = False
    for geometry in grid_gdf.geometry:
        candidates = get_matches(geometry, gdf)
        subset = gdf.loc[candidates.index][~gdf["already_processed"]]
        gdf.loc[subset.index, "already_processed"] = True
        if not subset.empty:
            result.append(subset.drop(columns=["already_processed"]).reset_index(drop=True).copy())
    return result


@background
def spatial_join_chunk(df_large_chunk, df_small, pbar):
    try:
        bbox = df_large_chunk.total_bounds

        candidates = get_matches(box(*bbox), df_small.geometry)
        if len(candidates) > 0:
            subset = df_small.loc[candidates.index].clip(box(*bbox))

            result = (
                df_large_chunk.sjoin(subset, how="inner")
                .clip(subset.geometry)
                .reset_index(drop=True)
            )
            result.geometry = result.geometry.apply(repair_geometry)
        else:
            result = gpd.GeoDataFrame(columns=df_large_chunk.columns)
        return result
    except Exception as e:
        logging.error(e)
        return gpd.GeoDataFrame()
    finally:
        pbar.update(1)


async def spatial_join(
    geodataframe_a: gpd.GeoDataFrame, geodataframe_b: gpd.GeoDataFrame
) -> gpd.GeoDataFrame:
    """Create spatial join between two GeoDataFrames."""
    # we build the spatial index for the larger GeoDataFrame
    smaller_dim, larger_dim = arrange_dimensions(geodataframe_a, geodataframe_b)

    logger.info(f"Processing {len(larger_dim)} elements")

    grid = create_density_based_grid(larger_dim, max_cellsize=10, max_complexity=5000)

    logger.info(f"grid created with {len(grid)} cells")

    list_of_chunks = split_gdf_by_grid(larger_dim, grid)

    logger.info(f"grid split into {len(list_of_chunks)} chunks")

    with tqdm(total=len(list_of_chunks)) as pbar:  # we create a progress bar
        new_df = await asyncio.gather(
            *(spatial_join_chunk(chunk, smaller_dim, pbar) for chunk in list_of_chunks)
        )

    return gpd.GeoDataFrame(pd.concat(new_df, ignore_index=True), crs=smaller_dim.crs)


@background
def spatial_dissolve_chunk(geometry, gdf, pbar):

    try:
        candidates = get_matches(
            geometry,
            gdf.geometry,
        )
        subset = gdf.loc[candidates.index]

        result = pd.concat(
            subset.clip(geometry).pipe(split_by_year, year_col="STATUS_YR"), ignore_index=True
        ).copy()

        data_chunk = [
            (
                result[result["year"] <= 2010]
                .reset_index()
                .pipe(calculate_area, "area", None)
                .drop(columns=["geometry"])
            )
        ]
        for year in range(2011, 2025):
            data_chunk.append(
                result[result["year"] <= year]
                .dissolve(
                    by=["iso_3"],
                )
                .assign(year=year)
                .reset_index()
                .pipe(calculate_area, "area", None)
                .drop(columns=["geometry"])
            )

        return pd.concat(data_chunk, ignore_index=True)
    except Exception as e:
        logging.error(e)
        return gpd.GeoDataFrame()
    finally:
        pbar.update(1)

async def process_grid(gdf):
    grid_gdf = create_density_based_grid(gdf, max_cellsize=10, max_complexity=5000)
    with tqdm(total=grid_gdf.shape[0]) as pbar:
        pbar = tqdm(total=len(grid_gdf), desc="Processing grid elements")
        result = await asyncio.gather(*[spatial_dissolve_chunk(geometry, gdf, pbar) for geometry in grid_gdf.geometry.values])
    return result

In [None]:
gadm = gpd.read_file("../data/gadm/processed/preprocess/gadm_preprocess.shp").pipe(clean_geometries)
wdpa = gpd.read_file(
    "../data/mpa-terrestrial/processed/preprocess/mpa-terrestrial_preprocess.shp"
).pipe(clean_geometries)
gadm.sindex
wdpa.sindex

In [None]:
wdpa_subset = wdpa[
    ~(
        (wdpa.bounds.minx < -181)
        | (wdpa.bounds.miny < -91)
        | (wdpa.bounds.maxx > 181)
        | (wdpa.bounds.maxy > 91)
    )
].reset_index(drop=True)

gadm_sync = await simplify_async(gadm)
sjoin_gdf = await spatial_join(wdpa_subset, gadm_sync)

In [None]:
# test that we have not produce duplicates
sjoin_gdf.loc[sjoin_gdf.duplicated(subset=["WDPA_PID", "GID_0"], keep=False)].sort_values(
    "WDPA_PID"
)

In [None]:
data = await process_grid(sjoin_gdf)

In [107]:
result_oecms = (
    sjoin_gdf.groupby(["iso_3", "PA_DEF"])
    .agg({"PA_DEF": "count"})
    .rename(columns={"PA_DEF": "count"})
    .reset_index()
    .pivot(index="iso_3", columns="PA_DEF", values="count")
    .fillna(0)
    .reset_index()
    .rename(columns={"0": "oecm", "1": "pa"})
)
# ).reset_index().pivot(index="iso_3", columns="PA_DEF", values="count").reset_index(names=["PA_DEF"], level=0, drop=True)

In [108]:
result_oecms["oecm_perc"] = result_oecms["oecm"] / (result_oecms["oecm"] + result_oecms["pa"])

In [112]:
result_oecms.sort_values("pa", ascending=False).head(10)

PA_DEF,iso_3,oecm,pa,oecm_perc
180,USA,0.0,50674.0,0.0
161,SWE,0.0,30813.0,0.0
44,DEU,0.0,23703.0,0.0
55,EST,0.0,20579.0,0.0
57,FIN,0.0,18427.0,0.0
29,CAN,2.0,12566.0,0.000159
61,GBR,0.0,11712.0,0.0
9,AUS,0.0,11154.0,0.0
30,CHE,0.0,10632.0,0.0
130,NZL,0.0,10205.0,0.0


In [113]:
result_area = pd.concat(data)[['iso_3', 'year', 'area']].groupby(['iso_3', 'year']).sum().reset_index()

In [114]:
result = result_area.merge(result_oecms, on="iso_3")

In [21]:
# Todo: this needs to be merged with the marine data and validated with the pandera model
result

NameError: name 'result' is not defined