In [None]:
import sys
# Put the sibling ../src directory on the import path; presumably this is where
# the project's `utils` module lives (path is relative to the notebook's cwd).
sys.path.append('../src')
from utils import ExportResults, Visualization

# Shared helper instances used by later cells: `exp_res` combines result tables
# (combine_gcs_csvs) and `viz` draws the edge-index plot (plot_edge_index).
exp_res = ExportResults()
viz = Visualization()

In [None]:
import ee
# Authenticate, then initialise the Earth Engine client against the
# 'dse-staff' project.
ee.Authenticate()
ee.Initialize(project='dse-staff')
# Smoke test: build a server-side string and print what getInfo() returns for it.
print(ee.String('Hello from the Earth Engine servers!').getInfo())

In [None]:
import pandas as pd
# Read the simulated transect table and show how many distinct WDPA_PI values it has.
# NOTE: the name `df` is rebound to the combined results table in a later cell,
# so this frame is only available until that cell runs.
df = pd.read_csv('../data/GLOBAL_sim_transects_landOnly.csv')
df['WDPA_PI'].nunique()

In [None]:
import geopandas as gpd
# Load the three vector layers used for cross-checking identifiers:
# the WDPA footprint, the WDPA layer with attribute info, and the GEE transects.
gdf = gpd.read_file('../data/global_wdpa_June2021/Global_wdpa_footprint_June2021.shp')
gdfwInfo = gpd.read_file('../data/global_wdpa_June2021/Global_wdpa_wInfo_June2021.shp')
transects = gpd.read_file('../data/gee_transects_ext/gee_transects_ext.shp')
# Distinct identifier count per layer. Each layer uses a different id column
# (FID, WDPA_PID, WDPA_PI).
print(gdf['FID'].nunique(), gdfwInfo['WDPA_PID'].nunique(), transects['WDPA_PI'].nunique())

In [None]:
import pywdpa
import geopandas as gpd
from shapely.ops import unary_union

def clean_wdpa(gdf, crs=None, snap_tol=1, geom_prec=1500, erase_overlaps=True):
    """Snap, repair and optionally dissolve a set of protected-area geometries.

    Parameters
    ----------
    gdf : geopandas.GeoDataFrame
        Input geometries. The caller's frame is never modified.
    crs : optional
        Target CRS handed to ``GeoDataFrame.to_crs``. A falsy value skips
        reprojection.
    snap_tol : float
        Tolerance passed to ``shapely.ops.snap``, in the units of the working CRS.
    geom_prec : float
        Scale factor applied before, and undone after, the ``buffer(0)`` repair.
    erase_overlaps : bool
        When true, all geometries are merged into a single geometry.

    Returns
    -------
    geopandas.GeoDataFrame
        With ``erase_overlaps=True``: a one-row frame containing only the unioned
        geometry (attribute columns are not carried over). Otherwise: the
        attribute columns of the input with the cleaned geometries.
    """
    # Local import: the notebook only imports `unary_union` from shapely.ops, so
    # `snap` would otherwise be an undefined name when this function runs.
    from shapely.ops import snap

    # to_crs already returns a new frame; copy explicitly in the other case so the
    # geometry assignments below never write into the caller's object.
    gdf = gdf.to_crs(crs) if crs else gdf.copy()

    # Snap each geometry's vertices to the union of all geometries.
    grid = unary_union(gdf.geometry)
    gdf["geometry"] = gdf.geometry.map(lambda g: snap(g, grid, snap_tol))

    # Scale up, repair invalid rings with buffer(0), scale back down.
    # NOTE(review): GeoSeries.scale defaults to origin="center" (each geometry's
    # own bounding-box centre), so this round trip does not round coordinates to
    # a grid. If precision reduction was the intent, confirm and revisit.
    scale = geom_prec
    gdf["geometry"] = gdf.geometry.scale(scale, scale).buffer(0).scale(1/scale, 1/scale)

    if erase_overlaps:
        # Dissolve everything into one geometry; only geometry and CRS survive.
        u = unary_union(gdf.geometry)
        gdf = gpd.GeoDataFrame(geometry=[u], crs=gdf.crs)
    return gdf

# Keep one row per protected area (erase_overlaps=False). The dissolved output
# of clean_wdpa carries only a geometry column, so WDPA_PID could not be counted
# on it and the lookup below would raise KeyError.
cleaned = clean_wdpa(gdfwInfo, erase_overlaps=False)
cleaned['WDPA_PID'].nunique()

In [None]:
# Combine the result tables under the bucket folder, then in one chained step
# derive the edge index (boundary mean over buffer mean) and store the
# WDPA_PID / year columns as strings.
df = (
    exp_res
    .combine_gcs_csvs(bucket_name='dse-staff', folder_path='protected_areas/tables')
    .assign(
        edge_index=lambda t: t['boundary_x_mean'] / t['buffer_x_mean'],
        WDPA_PID=lambda t: t['WDPA_PID'].astype(str),
        year=lambda t: t['year'].astype(str),
    )
)

In [None]:
# Display the combined table (notebook rich repr of the whole frame).
df

In [None]:
#exp_res.save_df_to_gcs(df, bucket_name='dse-staff', wdpaid='combined', year="2001-2025")

In [None]:
# Plot the edge index from the combined table using the project's Visualization helper.
viz.plot_edge_index(df)

In [None]:
# Preview the first rows of the combined table, including the derived edge_index column.
df.head()

Edge trend calculation

In [None]:
import pandas as pd 
import statsmodels.api as sm

#linear regression for each protected area and each band type

def calc_trend(group):
    """Fit ``edge_index ~ year`` by OLS for one (WDPA_PID, band_name) group.

    Parameters
    ----------
    group : pandas.DataFrame
        Rows of one group. Must contain ``year`` and ``edge_index``. The
        ``WDPA_PID`` / ``band_name`` columns are read when present.

    Returns
    -------
    pandas.Series
        Keys ``WDPA_PID``, ``band_name``, ``estimate``, ``std_err``, ``t_value``
        and ``p_value``. The four statistics are ``None`` when fewer than two
        distinct years remain after discarding unusable rows.
    """
    key_cols = ["WDPA_PID", "band_name"]

    # groupby(...).apply attaches the group key to the sub-frame as `.name`.
    # Use it as a fallback for when the grouping columns are not part of the
    # sub-frame (e.g. include_groups=False). The tuple check avoids mistaking a
    # data column that happens to be called "name" for the key.
    group_key = getattr(group, "name", None)
    if isinstance(group_key, tuple) and len(group_key) == len(key_cols):
        fallback = dict(zip(key_cols, group_key))
    else:
        fallback = {}

    result = {}
    for col in key_cols:
        if col in group and not group.empty:
            result[col] = group[col].iloc[0]
        else:
            result[col] = fallback.get(col)

    # Drop rows the regression cannot use. edge_index is a ratio, so it can be
    # NaN or +/-inf when the denominator is missing or zero.
    years = group["year"].astype(float)
    values = group["edge_index"].astype(float)
    usable = years.notna() & values.notna() & (values.abs() != float("inf"))
    years, values = years[usable], values[usable]

    stat_keys = ("estimate", "std_err", "t_value", "p_value")
    if years.nunique() > 1:
        X = sm.add_constant(years)
        model = sm.OLS(values, X).fit()
        fitted = (model.params, model.bse, model.tvalues, model.pvalues)
        # The slope term is labelled after the `year` column.
        result.update({key: stat.get("year", None) for key, stat in zip(stat_keys, fitted)})
    else:
        # A slope is undefined with a single distinct year.
        result.update(dict.fromkeys(stat_keys))
    return pd.Series(result)

# Run calc_trend once per (WDPA_PID, band_name) group. The group keys are taken
# from the columns calc_trend returns, so the groupby index is discarded with
# reset_index(drop=True) rather than turned back into columns.
trends = (
    df.groupby(["WDPA_PID", "band_name"])
      .apply(calc_trend)
      .reset_index(drop=True)
)

# trends now contains trend stats and your chosen columns per group


In [None]:
# Preview the per-group trend statistics.
trends.head()

In [None]:
from scipy import stats

# Average slope across all (WDPA_PID, band_name) groups, ignoring groups with no estimate.
mean_estimate = trends["estimate"].mean(skipna=True)
print(f"Mean estimate: {mean_estimate:.7f}")

# One-sample t-test of the available slopes against a population mean of 0.
t_stat, p_val = stats.ttest_1samp(trends["estimate"].dropna(), 0)
print(f"T-test: t = {t_stat:.3f}, p = {p_val:.3e}")


Global analysis charts

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import statsmodels.api as sm
from statsmodels.formula.api import ols


In [None]:

# Collapse the detailed BIOME_NAME labels in `df` into broader biome classes.
# Labels that are not keys of this mapping pass through .replace() unchanged.
biome_map = {
    "Deserts & Xeric Shrublands": "Desert",
    "Tropical & Subtropical Coniferous Forests": "Tropical-Forests",
    "Tropical & Subtropical Moist Broadleaf Forests": "Tropical-Forests",
    "Tropical & Subtropical Dry Broadleaf Forests": "Tropical-Forests",
    "Mediterranean Forests, Woodlands & Scrub": "Temperate-Forests",
    "Temperate Conifer Forests": "Temperate-Forests",
    "Temperate Broadleaf & Mixed Forests": "Temperate-Forests",
    "Boreal Forests/Taiga": "Boreal-Forests",
    "Tropical & Subtropical Grasslands, Savannas & Shrublands": "Grassland & shrubland",
    "Temperate Grasslands, Savannas & Shrublands": "Grassland & shrubland",
    "Montane Grasslands & Shrublands": "Grassland & shrubland",
    "Flooded Grasslands & Savannas": "Grassland & shrubland",
    "N/A": "Rock & Ice"
}
# Both the literal "N/A" label and genuinely missing values end up as "Rock & Ice".
df["BIOME"] = df["BIOME_NAME"].replace(biome_map).fillna("Rock & Ice")


In [None]:
def classify_edge(row):
    """Label a trend row by the sign of its slope and its significance.

    Parameters
    ----------
    row : mapping
        Must provide ``estimate`` (slope) and ``p_value``. Either may be
        missing (``None`` or NaN).

    Returns
    -------
    str
        ``"Stable"`` for a zero slope. Otherwise ``"Significantly"`` or
        ``"Not significantly"`` followed by ``"Increase"`` or ``"Decrease"``,
        using p < 0.05 as the significance threshold. ``"unknown"`` when the
        slope is missing.
    """
    estimate, p_value = row["estimate"], row["p_value"]

    # None cannot be ordered against a number, so test for missing values before
    # any comparison. NaN takes the same route.
    if pd.isna(estimate):
        return "unknown"
    if estimate == 0:
        return "Stable"

    # A missing p-value counts as "not significant", which is what a NaN
    # p-value already produced through the `< 0.05` comparison.
    significant = (not pd.isna(p_value)) and p_value < 0.05
    prefix = "Significantly" if significant else "Not significantly"
    direction = "Increase" if estimate > 0 else "Decrease"
    return f"{prefix} {direction}"

# Label every trend row; adds the edge_change column to `trends` in place.
trends["edge_change"] = trends.apply(classify_edge, axis=1)


In [None]:
# Preview the trends table again, now with the edge_change label.
trends.head()

In [None]:
# Only keep one row per WDPA_PID for the static columns
# (drop_duplicates keeps the first row it meets for each WDPA_PID).
static_cols = ["BIOME", "GIS_AREA", "GOV_TYPE", "IUCN_CAT", "ORIG_NAME", "OWN_TYPE", "STATUS_YR", "WDPA_PID"]
df_static = df[static_cols].drop_duplicates(subset=["WDPA_PID"])

# Merge with trends (on WDPA_PID only); the left join keeps every trend row,
# so each band of a protected area receives the same static attributes.
trends2 = trends.merge(df_static, on="WDPA_PID", how="left")
trends2.head()

In [None]:

# Stack bar setup: count trend rows per (BIOME, edge_change), one column per class.
stack_count = (
    trends2
    .groupby(["BIOME", "edge_change"])
    .size()
    .unstack(fill_value=0)
    .reset_index()
)

# After reset_index every column except BIOME is an edge_change count.
class_cols = [c for c in stack_count.columns if c != "BIOME"]
stack_count["total"] = stack_count[class_cols].sum(axis=1)

# Turn counts into within-biome shares for whichever classes actually occur.
# A class missing from the data no longer raises KeyError, and classes such as
# "Stable" or "unknown" are on the same 0-1 scale as the rest instead of
# staying as raw counts.
stack_count[class_cols] = stack_count[class_cols].div(stack_count["total"], axis=0)

# Long format: one row per (BIOME, class) with its share in `value`.
stack_perc = stack_count.drop(columns="total").melt(id_vars="BIOME", var_name="variable", value_name="value")
# Fixed plotting order for the biomes; labels outside this list become NaN.
stack_perc["BIOME"] = pd.Categorical(
    stack_perc["BIOME"],
    categories=["Mangroves", "Rock & Ice", "Tundra", "Desert", "Grassland & shrubland",
                "Boreal-Forests", "Temperate-Forests", "Tropical-Forests"],
    ordered=True
)

stack_perc

In [None]:
# Per-band OLS regressions of edge_index on year, fitted globally and per biome
from statsmodels.formula.api import ols

def _fit_year_trend(frame, band, group):
    """Fit ``edge_index ~ year`` on `frame`; return the model and its summary row."""
    model = ols("edge_index ~ year", data=frame).fit()
    summary = {
        "band_name": band,
        "group": group,
        "slope": model.params.get("year", None),
        "p_value": model.pvalues.get("year", None),
        "r_squared": model.rsquared,
        "n": len(frame)
    }
    return model, summary


# `year` is stored as strings in `df`. The formula interface treats a string
# column as categorical, so no single "year" slope term would exist and every
# slope / p_value lookup would come back as None. Regress on a numeric copy.
reg_df = df.assign(year=df["year"].astype(float))

models = {}   # (band_name, group) -> fitted results object
results = []  # one summary dict per fitted model

for band in reg_df["band_name"].unique():
    df_band = reg_df[reg_df["band_name"] == band]
    # Global model for this band
    models[(band, "Global")], summary = _fit_year_trend(df_band, band, "Global")
    results.append(summary)
    # Per-biome models for this band
    for biome in df_band["BIOME"].unique():
        df_biome = df_band[df_band["BIOME"] == biome]
        # Guard kept from the original: a NaN biome label matches no rows.
        if not df_biome.empty:
            models[(band, biome)], summary = _fit_year_trend(df_biome, band, biome)
            results.append(summary)

# Convert results to DataFrame and save to CSV (written to the notebook's cwd)
results_df = pd.DataFrame(results)
results_df.to_csv("regression_results.csv", index=False)
results_df


In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

# Filter out groups with NaN slopes if needed
# (rows without a slope cannot be drawn as a horizontal line).
plot_df = results_df.dropna(subset=["slope"])

# Set up FacetGrid by band_name: one panel per band, wrapped three per row,
# with a shared y-axis so slopes are comparable across panels.
g = sns.FacetGrid(plot_df, col="band_name", col_wrap=3, sharex=False, sharey=True, height=4)

def facet_plot(data, color, **kwargs):
    """Draw one horizontal slope line per group on the current facet.

    Parameters
    ----------
    data : pandas.DataFrame
        Rows of ``results_df`` for one facet; reads ``slope``, ``group`` and ``n``.
    color :
        Facet colour supplied by ``FacetGrid.map_dataframe``. Accepted for
        signature compatibility but not used, because a single colour for all
        groups makes the lines and legend entries indistinguishable.
    **kwargs :
        Ignored.
    """
    ax = plt.gca()
    # One distinct colour per group from the active seaborn palette.
    palette = sns.color_palette(n_colors=len(data))
    for line_color, (_, row) in zip(palette, data.iterrows()):
        ax.axhline(y=row["slope"], color=line_color, lw=2, label=row["group"])
        # Annotate n above the line. The y-axis transform means x is in axes
        # fractions (0.5 = centre) while y stays in data units.
        ax.text(
            x=0.5, y=row["slope"], s=f'n={int(row["n"])}',
            va='bottom', ha='center', color=line_color, fontsize=10,
            transform=ax.get_yaxis_transform()
        )
    # One legend per facet, and only when something was drawn.
    handles, labels = ax.get_legend_handles_labels()
    if handles:
        ax.legend(handles, labels, loc='best')

# Call facet_plot once per band panel with that panel's rows.
g.map_dataframe(facet_plot)

# NOTE(review): the lines are horizontal (axhline), so the x-axis carries no
# group information despite the "Group" label. Confirm the intended label.
g.set_axis_labels("Group", "Slope (trend)")
g.set_titles(col_template="{col_name}")
plt.tight_layout()
plt.show()