In [1]:
import json
import numpy as np
import pandas as pd
import dask.dataframe as dd
import geopandas as gpd
from itertools import chain
root_dir = "/pfs/work7/workspace/scratch/tu_zxobe27-master_thesis/"

In [2]:
# import data
rivers = gpd.read_feather(f"{root_dir}data/river_network/shapefile.feather")
drainage_polygons = gpd.read_feather(f"{root_dir}data/drainage/extracted_drainage_polygons.feather")
land_cover = dd.read_parquet(f"{root_dir}data/land_cover/temp_extracted_land_cover/", columns=["year", "deforestation", "deforestation_p", "deforestation_a", "deforestation_u", "deforestation_m", "forest", "pasture", "agriculture", "urban", "mining", "total"]).astype(np.uint32)
cloud_cover = dd.read_parquet(f"{root_dir}data/cloud_cover/cloud_cover.parquet").astype({"grid_id": "uint32", "index": "uint32", "year": "uint32", "cloud_cover": "float32"})

In [4]:
# prepare deforestation data in dask dataframe
t_deforestation = land_cover.groupby(["grid_id", "index", "year"]).sum().reset_index().astype(np.uint32).persist()
del land_cover

In [97]:
## aggregation within adm2 regions

# prepare a table from adm2 to grid_id
adm2_table = pd.merge(
    rivers[["adm2", "estuary", "river", "segment", "subsegment"]], 
    drainage_polygons[["estuary", "river", "segment", "subsegment"]].reset_index(names = ["grid_id", "index"]), 
    on=["estuary", "river", "segment", "subsegment"], how="right",
    ).dropna()[["grid_id", "index", "adm2"]].astype(np.uint32)

In [103]:
# merge deforestation data with adm2
t_deforestation_adm = dd.merge(t_deforestation, dd.from_pandas(adm2_table, npartitions=1), on=["grid_id", "index"], how="left")
# calculate deforestation data for each adm2
c_final_df_deforestation = t_deforestation_adm.drop(columns=["grid_id", "index"]).groupby(["adm2", "year"]).sum().compute()

In [105]:
# merge cloud cover data with adm2
t_cloud_cover_adm = dd.merge(cloud_cover, dd.from_pandas(adm2_table, npartitions=1), on=["grid_id", "index"], how="left")
# calculate cloud cover data for each adm2
c_final_df_cloud_cover = t_cloud_cover_adm.drop(columns=["grid_id", "index"]).groupby(["adm2", "year"]).mean().compute()

In [111]:
# merge deforestation and cloud cover data
out_df = pd.merge(c_final_df_deforestation, c_final_df_cloud_cover, on=["adm2", "year"], how="left")

In [113]:
# write GID
out_df = out_df.reset_index().astype({"year": np.int16})
out_df["municipality"] = out_df.adm2.map(id_gid_lookup)
out_df.drop(columns=["adm2"], inplace=True)

# save to parquet
out_df.to_parquet(f"{root_dir}data/deforestation/deforestation.parquet", index=False)

In [115]:
## aggregation in upstream nodes
# compute lookup from node to list of polygons
drainage_polygons_tmp = drainage_polygons[((~ drainage_polygons.is_empty) & (~ drainage_polygons.geometry.isna()))].drop(columns="geometry").reset_index(names = ["grid", "index"])
drainage_polygons_tmp = pd.merge(drainage_polygons_tmp, rivers.drop(columns = ["NORIOCOMP", "CORIO", "geometry"]), on = ["estuary", "river", "segment", "subsegment"])
drainage_polygons_tmp["identifier"] = drainage_polygons_tmp.apply(lambda x: [x["grid"], x["index"]], axis = 1)
node_polygon_lookup = drainage_polygons_tmp.set_index("upstream_node_id").groupby(level=0).apply(lambda x: x["identifier"].tolist()).to_dict()

: 

In [6]:
# import reachability data
reachability_municipalities = json.load(open("/pfs/work7/workspace/scratch/tu_zxobe27-master_thesis/data/river_network/reachability_municipalities.json", "r"))

## convert to integer ID
# load municipalities
municipalities = gpd.read_file("/pfs/work7/workspace/scratch/tu_zxobe27-master_thesis/data/misc/raw/gadm/gadm41_BRA_2.json", engine="pyogrio")
# create mapping from GID_2 to integer ID
gid_id_lookup = municipalities["GID_2"].reset_index().set_index("GID_2")["index"].to_dict()
id_gid_lookup = municipalities["GID_2"].to_dict()
# convert reachability_municipalities keys to integer ID
reachability_municipalities = {gid_id_lookup.get(k, k): v for k, v in reachability_municipalities.items()}

In [7]:
## chunk all municipalities into chunks if 1M nodes
# assign chunks of 1M nodes
t_chunks = np.cumsum([len(x) if x is not None else 0 for i, x in reachability_municipalities.items()]) // 5e5
# get indices of chunks
t_chunks = [(int(np.argmax(t_chunks == i)), int(len(t_chunks) - np.argmax(t_chunks[::-1] == i) - 1)) for i in np.unique(t_chunks)]
# get nodes split into chunks
c_node_ids = [{y: reachability_municipalities[y] for y in list(reachability_municipalities.keys())[x[0]:x[1]]} for x in t_chunks]

In [74]:
# iterate over chunks
out_df = [None] * len(c_node_ids)
for i in [0]:#range(len(c_node_ids)):
    ## prepare data frame for final estimation
    # get polygon ids
    t_index_prep = {(key, int(value)): node_polygon_lookup.get(value, [None, None]) for key, values in c_node_ids[i].items() if values is not None for value in values}
    # combine in tuple for index
    t_index_prep = [(key[0], key[1], int(value[0]), int(value[1])) for key, values in t_index_prep.items() for value in values if value is not None]         
    # create dataframe with indices
    c_final_df = dd.from_pandas(pd.DataFrame().from_records(t_index_prep, columns = ["municipality", "node", "grid_id", "index"])).astype(np.uint32)
    # merge with deforestation data and summarize
    c_final_df_deforestation = dd.merge(c_final_df, t_deforestation, on = ["grid_id", "index"], how = "left")
    c_final_df_deforestation = c_final_df_deforestation.drop(columns=["grid_id", "index", "node"]).groupby(["municipality", "year"]).sum().compute().astype(np.float32)
    # merge with cloud cover data and summarize
    c_final_df_cloud_cover = dd.merge(c_final_df, cloud_cover, on = ["grid_id", "index"], how = "left")
    c_final_df_cloud_cover = c_final_df_cloud_cover.groupby(["municipality", "year"]).agg({"cloud_cover": "mean"}).compute().astype(np.float32)
    # Group by the bins and sum the value column
    #agg_dict = {"deforestation": "sum", "deforestation_p": "sum", "deforestation_a": "sum", "deforestation_u": "sum", "deforestation_m": "sum", "forest": "sum", "pasture": "sum", "agriculture": "sum", "urban": "sum", "mining": "sum", "total": "sum"}
    out_df[i] = pd.merge(c_final_df_deforestation, c_final_df_cloud_cover, on = ["municipality", "year"], how = "outer")

In [75]:
# combine all chunks
out_df = pd.concat(out_df).reset_index().astype({"year": np.int16})
# get GID_2
out_df["municipality"] = out_df.municipality.map(id_gid_lookup)


In [None]:
# save to parquet
out_df.to_parquet(f"{root_dir}data/land_cover/land_cover_municipalities_agg.parquet")