# Pre-processing script

In [1]:
# Work from the layer directory so the relative file names used by the rest of
# the notebook (local parquet / geojson outputs) resolve against it.
import os
data_path = '../data/CBN-layers/'
os.chdir(data_path)

# s3: client returned by cng.utils.s3_client(); used further down for
# fput_object() uploads to the "public-ca30x30" bucket.
from cng.utils import ST_MakeValid, set_secrets, s3_client
s3 = s3_client()

# con: shared ibis DuckDB connection with the "spatial" extension loaded.
# No database path is passed here; Step 1 opens its own on-disk "tmp" connection.
import ibis
from ibis import _
con = ibis.duckdb.connect(extensions=["spatial"])

import geopandas as gpd
import duckdb

#### Helper functions

In [2]:
def get_url(folder, file, base_folder = 'CBN-data'):
    """Return the public MinIO URL of an object in the ``public-ca30x30`` bucket.

    Parameters
    ----------
    folder : str
        Folder (may contain several ``/``-separated levels) holding the file.
    file : str
        File name of the object.
    base_folder : str or None
        Top-level folder inside the bucket. Pass ``None`` for objects that
        live directly under ``<bucket>/<folder>/``.

    Returns
    -------
    str
        ``https://minio.carlboettiger.info/public-ca30x30/[base_folder/]folder/file``
    """
    minio = 'https://minio.carlboettiger.info/'
    bucket = 'public-ca30x30'
    if base_folder is None:
        segments = [bucket, folder, file]
    else:
        segments = [bucket, base_folder, folder, file]
    # URLs always use "/" as the separator. os.path.join would use the OS
    # separator and would drop the bucket/folder if a segment started with "/",
    # so join explicitly and ignore stray or empty segments.
    cleaned = (segment.strip('/') for segment in segments)
    path = '/'.join(segment for segment in cleaned if segment)
    return minio + path

#### Variables

In [3]:
# --- Inputs -----------------------------------------------------------------

# CA Nature data
ca_raw_parquet = "https://data.source.coop/cboettig/ca30x30/ca_areas.parquet"

# Boundary of CA, used to compute 'non-conserved' areas
ca_boundary_parquet = get_url('Preprocessing', 'ca_boundary.parquet', base_folder=None)

# Newly protected areas
newly_protected = get_url(
    'Progress_data_new_protection/Newly_counted_lands',
    'newly_counted_lands_2024.parquet',
)

# Ecoregions
ecoregions = get_url('Ecoregion', 'ACE_ecoregions.parquet')

# --- Intermediate files -----------------------------------------------------

# Non-conserved areas: costly to compute, so the result is saved locally and
# also referenced by its URL under the bucket's 'Preprocessing' folder.
ca_nonconserved_parquet_local = "ca_nonconserved_simplified500m.parquet"  # local copy
ca_nonconserved_parquet = get_url(
    'Preprocessing', ca_nonconserved_parquet_local, base_folder=None
)
ca_nonconserved_eco_parquet_local = "ca_nonconserved_simplified500m_ecoregions.parquet"  # local copy
ca_nonconserved_eco_parquet = get_url(
    'Preprocessing', ca_nonconserved_eco_parquet_local, base_folder=None
)

# Temp files holding only CA Nature data + non-conserved areas
ca_base_parquet = "ca-30x30-base.parquet"
ca_temp_parquet = "ca-30x30-temp.parquet"

# Temp file used to compute metrics w/ data layers
ca_temp_stats_parquet = "ca-30x30-stats-temp.parquet"

# --- Vector data layers -----------------------------------------------------

def _ace_url(layer):
    """URL of an ACE layer stored as ACE_biodiversity/<layer>/<layer>.parquet."""
    return get_url('ACE_biodiversity/' + layer, layer + '.parquet')

ACE_rarerank_statewide = _ace_url('ACE_rarerank_statewide')
ACE_rarerank_ecoregion = _ace_url('ACE_rarerank_ecoregion')
ACE_biorank_statewide = _ace_url('ACE_biorank_statewide')
ACE_biorank_ecoregion = _ace_url('ACE_biorank_ecoregion')

ACE_amph_richness = _ace_url('ACE_amphibian_richness')
ACE_reptile_richness = _ace_url('ACE_reptile_richness')
ACE_bird_richness = _ace_url('ACE_bird_richness')
ACE_mammal_richness = _ace_url('ACE_mammal_richness')
ACE_rare_amphibian_richness = _ace_url('ACE_rare_amphibian_richness')
ACE_rare_reptile_richness = _ace_url('ACE_rare_reptile_richness')
ACE_rare_bird_richness = _ace_url('ACE_rare_bird_richness')
ACE_rare_mammal_richness = _ace_url('ACE_rare_mammal_richness')
ACE_endemic_amphibian_richness = _ace_url('ACE_endemic_amphibian_richness')
ACE_endemic_reptile_richness = _ace_url('ACE_endemic_reptile_richness')
ACE_endemic_bird_richness = _ace_url('ACE_endemic_bird_richness')
ACE_endemic_mammal_richness = _ace_url('ACE_endemic_mammal_richness')

wetlands = get_url('Freshwater_resources/Wetlands', 'CA_wetlands.parquet')
fire = get_url('Climate_risks/Historical_fire_perimeters', 'calfire_2023.parquet')
farmland = get_url('NBS_agriculture/Lands_suitable_grazing', 'Farmland_2018.parquet')
grazing = get_url('NBS_agriculture/Lands_suitable_grazing', 'Grazing_land_2018.parquet')
DAC = get_url('Progress_data_new_protection/DAC', 'DAC_2022.parquet')
low_income = get_url(
    'Progress_data_new_protection/Low_income_communities',
    'low_income_CalEnviroScreen4.parquet',
)

# --- Raster data layers -----------------------------------------------------

climate_zones = get_url('Climate_zones', 'climate_zones_10_processed.tif')
habitat = get_url('Habitat', 'CWHR13_2022_processed.tif')
plant_richness = get_url('Biodiversity_unique/Plant_richness', 'species_D_processed.tif')
endemic_plant_richness = get_url(
    'Biodiversity_unique/Rarityweighted_endemic_plant_richness',
    'endemicspecies_E_processed.tif',
)
resilient_conn_network = get_url(
    'Connectivity_resilience/Resilient_connected_network_allcategories',
    'rcn_wIntactBioCat_caOnly_2020-10-27_processed.tif',
)

# --- Final files: conserved + non-conserved areas + data layers --------------

ca_parquet = "ca-30x30-cbn.parquet"
ca_pmtiles = "ca-30x30-cbn.pmtiles"  # excludes non-conserved geometries


# Step 1: Computing all "non-conserved" areas

#### Computing difference: Non-conserved areas = CA Boundary - Conserved Areas

In [None]:
# This chunk will take ~2 hours to run 
# Uses its own DuckDB connection backed by the "tmp" database file, so the
# intermediate tables are kept on disk instead of in the shared `con`.
conn = ibis.duckdb.connect("tmp", extensions=["spatial"]) #save to disk

# CA Boundary 
ca_all_tbl = (
    conn.read_parquet(ca_boundary_parquet)
    .rename(geom = "geometry")
    .cast({"geom": "geometry"})
)

# CA-Nature data / protected areas 
tbl = (
    conn.read_parquet(ca_raw_parquet)
    .cast({"SHAPE": "geometry"})
    .rename(geom = "SHAPE")
)

# t1 = state boundary, t2 = conserved areas of the 2024 release
conn.create_table("t1", ca_all_tbl, overwrite = True)
conn.create_table("t2", tbl.filter(_.Release_Year == 2024), overwrite = True)

# simplified all geometries 500m so the kernel doesn't crash
# (the tolerance is in CRS units -- presumably metres; TODO confirm)
# computing difference: boundary minus the union of all conserved polygons.
# `.con` is the raw DuckDB connection of the ibis backend (same access as the
# other cells use on `con`); CREATE OR REPLACE keeps the cell re-runnable
# against the on-disk database, matching overwrite=True above.
conn.con.execute('''
CREATE OR REPLACE TABLE not_in_pad AS
WITH t2_simplified AS (
    SELECT ST_Simplify(geom, 500) AS geom
    FROM t2
),
t2_union AS (
    SELECT ST_Union_Agg(geom) AS geom
    FROM t2_simplified
)
SELECT 
    ST_Difference(t1.geom, t2_union.geom) AS geom
FROM 
    t1, t2_union;
''')


# save to parquet file so we don't have to run this again
nonconserved = conn.table("not_in_pad")
nonconserved.execute().to_parquet(ca_nonconserved_parquet_local)

# upload to minio
s3.fput_object("public-ca30x30", 'Preprocessing/'+ca_nonconserved_parquet_local, ca_nonconserved_parquet_local) 


#### Compute ecoregion for non-conserved areas

In [None]:
# Load the ecoregion polygons and the non-conserved area computed in Step 1
eco = con.read_parquet(ecoregions)
non = con.read_parquet(ca_nonconserved_parquet)

con.create_table("eco", eco.select("ECOREGION_","geom"), overwrite = True)

con.create_table("non", non, overwrite = True)
            
#split up the non-conserved areas by ecoregions
# Only polygonal intersections are kept. The result has two geometry columns:
# the original one from `non.*` and the clipped one, which the code below
# expects to be exposed as "geom_1".
# CREATE OR REPLACE keeps the cell re-runnable, matching overwrite=True above.
con.con.execute('''
CREATE OR REPLACE TABLE non_conserved_eco AS
SELECT 
    non.*, 
    eco.ECOREGION_ AS ecoregion,
    ST_Intersection(non.geom, eco.geom) AS geom  -- Split non into ecoregions
FROM non
JOIN eco 
ON ST_Intersects(non.geom, eco.geom)
WHERE ST_GeometryType(ST_Intersection(non.geom, eco.geom)) IN ('POLYGON', 'MULTIPOLYGON');
''')

# save to parquet file so we don't have to run this again
# Drop the un-clipped geometry, keep the clipped one as "geom", repair it,
# and give every piece a row-number id.
non_eco = (con.table("non_conserved_eco")
           .drop('geom')
           .rename(geom = "geom_1")
           .mutate(geom = ST_MakeValid(_.geom))
           .mutate(id=ibis.row_number().over())
          )

non_conserved_eco = non_eco.execute()
non_conserved_eco.to_parquet(ca_nonconserved_eco_parquet_local)

# upload to minio
s3.fput_object("public-ca30x30", 'Preprocessing/' + ca_nonconserved_eco_parquet_local, 
               ca_nonconserved_eco_parquet_local)

#### Non-conserved areas need to match CA Nature schema when merging

In [None]:
# Give the non-conserved polygons the same columns and dtypes as the CA Nature
# table so that the two can be unioned in Step 3.

# Target dtypes (match schema to CA Nature)
ca_nature_dtypes = {
    "established": "string",
    "gap_code": "int16",
    "status": "string",
    "name": "string",
    "access_type": "string",
    "manager": "string",
    "manager_type": "string",
    "ecoregion": "string",
    "easement": "string",
    "id": "int64",
    "type": "string",
    "acres": "float32",
}

nonconserved_raw = (
    con.read_parquet(ca_nonconserved_eco_parquet)
    .cast({"geom": "geometry"})
)

# Attributes that have no meaning for non-conserved land are filled with nulls;
# the rest get fixed placeholder values.
nonconserved_clean = nonconserved_raw.mutate(
    established = ibis.null(),
    gap_code = 0,
    name = ibis.literal("Non-Conserved Areas"),
    access_type = ibis.null(),
    manager = ibis.null(),
    manager_type = ibis.null(),
    easement = ibis.null(),
    type = ibis.literal("Land"),
    status = ibis.literal("non-conserved"),
    acres = _.geom.area() / 4046.8564224,  # convert sq meters to acres
).cast(ca_nature_dtypes)

# Step 2: Isolate the "newly protected" polygons

In [None]:
# # negative buffer to account for overlapping boundaries. 
# buffer = -30 #30m buffer 

# tbl = (
#     con.read_parquet(ca_raw_parquet)
#     .cast({"SHAPE": "geometry"})
#     .rename(geom = "SHAPE")
#     .filter(_.reGAP < 3) # only gap 1 and 2 count towards 30x30
# )

# # polygons with release_year 2024 are a superset of release_year 2023. 
# # use anti_join to isolate the objects that are in release_year 2024 but not release_year 2023 (aka newly established). 
# tbl_2023 = tbl.filter(_.Release_Year == 2023).mutate(geom=_.geom.buffer(buffer)) 
# tbl_2024 = tbl.filter(_.Release_Year == 2024)
# intersects = tbl_2024.anti_join(tbl_2023, _.geom.intersects(tbl_2023.geom))

In [None]:
buffer = 160  # 0.1mile buffer (presumably metres, i.e. a metre-based CRS -- TODO confirm)

# CA Nature polygons with the geometry column exposed as "geom"
ca_nature = (
    con.read_parquet(ca_raw_parquet)
    .cast({"SHAPE": "geometry"})
    .rename(geom = "SHAPE")
)

# 2024 release only, with every polygon grown by `buffer`
tbl_2024 = (
    ca_nature
    .filter(_.Release_Year == 2024)
    .mutate(geom=_.geom.buffer(buffer))
)

# Newly counted lands
tbl_new = con.read_parquet(newly_protected)

# anti_join keeps the rows of tbl_new that intersect NO buffered 2024 polygon.
# NOTE(review): the result is named `intersects` and its OBJECTIDs are later
# joined against the CA Nature OBJECTIDs -- confirm an anti-join (rather than a
# semi-join) is what is intended here.
intersects = tbl_new.anti_join(tbl_2024, _.geom.intersects(tbl_2024.geom))

# Step 3: Join all protected lands + non-conserved areas 

In [None]:
# %%time
# IDs flagged as newly established, tagged with the year so they can be joined
# onto the CA Nature table below.
new2024 = intersects.select("OBJECTID").mutate(established = ibis.literal("2024")) # saving IDs to join on

# Build the merged table: 2024 CA Nature polygons (renamed / recoded to the
# final schema) unioned with the non-conserved polygons.
ca_merged = (con
      .read_parquet(ca_raw_parquet)
      .cast({"SHAPE": "geometry"})
      .mutate(area = _.SHAPE.area())
      .filter(_.Release_Year == 2024) # having both 2023 and 2024 is redundant since 2024 is the superset.
      .left_join(new2024, "OBJECTID") # newly established 2024 polygons 
      .mutate(established=_.established.fill_null("pre-2024")) # everything not flagged above
      .rename(name = "cpad_PARK_NAME", access_type = "cpad_ACCESS_TYP", manager = "cpad_MNG_AGENCY",
              manager_type = "cpad_MNG_AG_LEV", id = "OBJECTID", type = "TYPE", 
              ecoregion = "CA_Ecoregion_Name", acres = "Acres", gap_code = "reGAP", geom = "SHAPE")
      .cast({"gap_code": "int16"})
      .cast({"id": "int64"})
      # replace empty strings with explicit "Unknown" labels
      .mutate(manager = _.manager.substitute({"": "Unknown"})) 
      .mutate(manager_type = _.manager_type.substitute({"": "Unknown"}))
      .mutate(access_type = _.access_type.substitute({"": "Unknown Access"}))
      .mutate(name = _.name.substitute({"": "Unknown"}))
      .mutate(manager_type = _.manager_type.substitute({"Home Owners Association": "HOA"}))
      .mutate(easement=_.Easement.cast("string").substitute({"0": "False", "1": "True"}))
      # gap codes 1 and 2 count towards 30x30
      .mutate(status=_.gap_code.cast("string")
              .substitute({"1": "30x30-conserved", "2": "30x30-conserved", "3": "other-conserved", 
                           "4": "unknown"}))
      .select(_.established, _.gap_code, _.status, _.name, _.access_type, _.manager, _.manager_type,
              _.ecoregion, _.easement, _.acres, _.id, _.type, _.geom)
      .union(nonconserved_clean)
      .mutate(acres = _.acres.round(4))
      .mutate(geom = ST_MakeValid(_.geom))
      .drop_null(['geom'],how = "any")
     )

ca = ca_merged.execute()
# Write the executed result (`ca`; no `gdf` exists yet at this point) to the
# base file, which is what the county-join cell reads next.
ca.set_crs("epsg:3310").to_parquet(ca_base_parquet)

#### Join with county data

In [None]:
# Split every polygon of the base table by county and record the county name.
counties = con.read_parquet('../CA_counties.parquet')
ca = con.read_parquet(ca_base_parquet)

con.create_table("counties", counties.select("COUNTY_NAM","geom"), overwrite = True)
con.create_table("ca", ca, overwrite = True)

# Keep only non-empty polygonal intersections. The result has two geometry
# columns: the original one from `ca.*` and the clipped one, which the code
# below expects to be exposed as "geom_1".
# CREATE OR REPLACE keeps the cell re-runnable, matching overwrite=True above.
con.con.execute('''
CREATE OR REPLACE TABLE counties_data AS
SELECT 
    ca.*, 
    counties.COUNTY_NAM AS county,
    ST_Intersection(ca.geom, counties.geom) AS geom
FROM ca
JOIN counties 
  ON ST_Intersects(ca.geom, counties.geom)
WHERE NOT ST_IsEmpty(ST_Intersection(ca.geom, counties.geom))
  AND ST_GeometryType(ST_Intersection(ca.geom, counties.geom)) IN ('POLYGON', 'MULTIPOLYGON');
''')

# Drop the un-clipped geometry, keep the clipped one, repair it, and
# recompute the acreage of each clipped piece.
county_data = (con.table("counties_data")
           .drop('geom')
           .rename(geom = "geom_1")
           .mutate(geom = ST_MakeValid(_.geom))
           .mutate(acres = _.geom.area() / 4046.8564224) #convert sq meters to acres
          )
gdf = county_data.execute()

import string


def _letter_suffix(i):
    """Return the i-th alphabetic suffix: 0 -> 'a', 25 -> 'z', 26 -> 'aa', ...

    Plain indexing into string.ascii_lowercase would raise IndexError for an
    id that is split into more than 26 pieces.
    """
    suffix = ""
    i += 1
    while i > 0:
        i, remainder = divmod(i - 1, 26)
        suffix = string.ascii_lowercase[remainder] + suffix
    return suffix


duplicated = gdf["id"].duplicated(keep=False)

# modify the ids for areas that span multiple counties 
gdf["suffix"] = ""
gdf.loc[duplicated, "suffix"] = (
    gdf[duplicated]
    .groupby("id")
    .cumcount()
    .map(_letter_suffix)
)

# if id = 11 has 2 rows, make each row 11a and 11b
# if id = 11 only has 1 row, keep it 11. 
gdf["id"] = gdf["id"].astype(str) + gdf["suffix"]
gdf = gdf.drop(columns="suffix")

gdf.set_crs("epsg:3310").to_parquet(ca_base_parquet)

# Step 4: Compute metrics w/ data layers

#### Raster data

In [None]:
%%time
import rasterio
from exactextract import exact_extract

rasters = [climate_zones, habitat, plant_richness, endemic_plant_richness, resilient_conn_network]
names = ['climate_zone','habitat_type','plant_richness','rarityweighted_endemic_plant_richness', 'resilient_connected_network']

# Layers summarised with the "majority" statistic; every other layer uses "count".
majority_layers = ('climate_zone','habitat_type','resilient_connected_network')

# Reuse gdf_stats when it already exists in this session, otherwise load the base file.
if 'gdf_stats' not in locals():
    gdf_stats = gpd.read_parquet(ca_base_parquet)

# exact_extract() is handed a file path, and the 'id' column conflicts with a
# raster field, so rename it and write the renamed table to the temp file.
gdf_stats = gdf_stats.rename(columns ={'id':'ca_id'})
gdf_stats.to_parquet(ca_temp_parquet)

for file,name in zip(rasters,names):
    print(name)
    metric = "majority" if name in majority_layers else "count"

    # zonal statistic of this raster for every polygon, keyed by ca_id
    raster_stats = exact_extract(file, ca_temp_parquet, [metric], include_cols=["ca_id"], output = 'pandas')
    raster_stats = raster_stats[['ca_id',metric]].rename(columns ={metric:name})
    raster_stats[name] = raster_stats[name].round(3) # rounding stats

    # DataFrame.join() matches on the index, so index both sides by ca_id for
    # the join and turn ca_id back into a column right afterwards.
    gdf_stats = (
        gdf_stats.set_index("ca_id")
        .join(raster_stats.set_index("ca_id"))
        .reset_index()
    )
    # gdf_stats.to_parquet(name+'_overlap.parquet')

# finished with exact_extract(): restore the original "id" column name
gdf_stats = gdf_stats.rename(columns ={'ca_id':'id'})
gdf_stats.to_file('raster_stats.geojson') # can't save to parquet for some reason 
# reproject to epsg:4326 since that's what pmtiles requires and we want to match that 
# NOTE(review): no reprojection happens in this cell, and the vector cell reads
# 'raster_stats.parquet', which only the commented-out lines below would create.

# Wall time: 8min 43s
# gdf_stats = gpd.read_file('raster_stats.geojson')
# gdf_stats.to_parquet('raster_stats.parquet')

#### Function to compute overlap for vector data 

In [None]:
def vector_vector_stats(base, data_layer):
    """Compute, for every polygon in `base`, the fraction covered by `data_layer`.

    Parameters
    ----------
    base : str
        Path/URL of a parquet file with `id` and `geom` columns.
    data_layer : str
        Path/URL of a parquet file with a `geom` column.

    Returns
    -------
    DataFrame with columns `id` and `overlap_fraction`. The fraction is the sum
    of intersection-area / base-area over all intersecting `data_layer`
    features, rounded to 3 decimals; ids with no intersecting feature get 0.
    """
    t1 = con.read_parquet(base).select(_.id, _.geom)
    t2 = con.read_parquet(data_layer).select(_.geom)

    # left join so base polygons without any overlap are kept; their sum is
    # null and is turned into 0 by coalesce().
    expr = (t1
     .left_join(t2, t1.geom.intersects(t2.geom))
     .group_by(t1.id, t1.geom)
     .agg(overlap_fraction = (t1.geom.intersection(t2.geom).area() / t1.geom.area()) 
          .sum().coalesce(0).round(3) ) # overlap 
    )
    stats = expr.execute()
    return stats[['id','overlap_fraction']]

#### Vector data

In [None]:
%%time

## this takes ~26 hours

# Output column name -> URL of the vector layer to overlay
vector_layers = {
    'ACE_rarerank_statewide': ACE_rarerank_statewide,
    'ACE_rarerank_ecoregion': ACE_rarerank_ecoregion,
    'ACE_biorank_statewide': ACE_biorank_statewide,
    'ACE_biorank_ecoregion': ACE_biorank_ecoregion,
    'ACE_amphibian_richness': ACE_amph_richness,
    'ACE_reptile_richness': ACE_reptile_richness,
    'ACE_bird_richness': ACE_bird_richness,
    'ACE_mammal_richness': ACE_mammal_richness,
    'ACE_rare_amphibian_richness': ACE_rare_amphibian_richness,
    'ACE_rare_reptile_richness': ACE_rare_reptile_richness,
    'ACE_rare_bird_richness': ACE_rare_bird_richness,
    'ACE_rare_mammal_richness': ACE_rare_mammal_richness,
    'ACE_endemic_amphibian_richness': ACE_endemic_amphibian_richness,
    'ACE_endemic_reptile_richness': ACE_endemic_reptile_richness,
    'ACE_endemic_bird_richness': ACE_endemic_bird_richness,
    'ACE_endemic_mammal_richness': ACE_endemic_mammal_richness,
    'wetlands': wetlands,
    'fire': fire,
    'farmland': farmland,
    'grazing': grazing,
    'DAC': DAC,
    'low_income': low_income,
}
names = list(vector_layers)
vectors = list(vector_layers.values())

# Start from the raster results.
# NOTE(review): this parquet is only produced by the commented-out conversion
# at the end of the raster cell -- confirm it exists before running.
gdf_stats = gpd.read_parquet('raster_stats.parquet') 

# DataFrame.join() matches on the index, so index by the join column
gdf_stats = gdf_stats.set_index('id')

for file,name in zip(vectors,names):
    print(name)
    # overlap fraction of this layer with every base polygon, as a column named after the layer
    vector_stats = (
        vector_vector_stats(ca_base_parquet, file)
        .rename(columns ={'overlap_fraction':name})
    )

    # joining new zonal stats column with CA Nature data. 
    gdf_stats = gdf_stats.join(vector_stats.set_index('id'))
    # checkpoint after every layer: CA Nature + zonal stats computed so far
    gdf_stats.to_parquet(name+'.parquet')

gdf_stats = gdf_stats.reset_index()
gdf_stats.to_parquet('vector_raster_stats.parquet') #save CA Nature + zonal stats 
gdf_stats

# gdf_stats = gdf_stats.to_crs("epsg:4326")
# gdf_stats.to_parquet(ca_temp_parquet) # save results 
# gdf_stats.to_parquet(ca_parquet) # save results 


# Step 5: Upload file + Generate PMTiles

In [None]:
from cng.utils import hf_upload, s3_cp,set_secrets, to_pmtiles

# NOTE(review): the only lines that write `ca_parquet` are commented out at the
# end of the vector-data cell -- confirm the file exists before uploading.

# upload parquet to minio and HF
hf_upload('ca-30x30-cbn.parquet', ca_parquet)
s3_cp(ca_parquet, "s3://public-ca30x30/ca-30x30-cbn.parquet", "minio")

#to use PMTiles, need to convert to geojson
ca_geojson = (con
            .read_parquet(ca_parquet)
            # .filter(_.status != 'non-conserved') #omitting the non-conserved to only for pmtiles  
            )

#can't go directly from parquet -> pmtiles, need to go parquet -> geojson -> pmtiles 
# The geojson is written to the working directory, so the very same file name
# is handed to to_pmtiles (there is no module-level `path` to prefix it with).
ca_geojson_file = 'ca-30x30-cbn.geojson'
ca_geojson.execute().to_file(ca_geojson_file) 
pmtiles = to_pmtiles(ca_geojson_file, ca_pmtiles, options = ['--extend-zooms-if-still-dropping'])

# upload pmtiles  to minio and HF
hf_upload('ca-30x30-cbn.pmtiles', ca_pmtiles)
s3_cp(ca_pmtiles, "s3://public-ca30x30/ca-30x30-cbn.pmtiles", "minio")