# Soils Revealed precalculations with `Zarrs`
## Purpose
In this notebook we will perform zonal statistics on different vector and raster datasets.
## Setup
### Library import
**Setup software libraries** 

In [1]:
import json
import os
from typing import Dict

import geopandas as gpd
import numpy as np
import pandas as pd
import regionmask
import s3fs
import xarray as xr
from xhistogram.xarray import histogram

**Reads key-value pairs from a `.env` file**

In [2]:
# Load the key-value pairs of a `.env` file; read_dataset_from_zarr later
# looks up its S3 credentials with os.getenv.
from dotenv import load_dotenv
load_dotenv()

True

**Setup `Dask` client**

In [3]:
from dask.distributed import Client, LocalCluster
# Explicit cluster configuration kept for reference (currently disabled):
#cluster = LocalCluster(n_workers=1, threads_per_worker=36)
# With no arguments, Client() is left to choose its own cluster settings.
client = Client()  # start distributed scheduler locally.  Launch dashboard
# Display the client summary as the cell output.
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 4
Total threads: 16,Total memory: 15.00 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:45933,Workers: 4
Dashboard: http://127.0.0.1:8787/status,Total threads: 16
Started: Just now,Total memory: 15.00 GiB

0,1
Comm: tcp://127.0.0.1:34133,Total threads: 4
Dashboard: http://127.0.0.1:37321/status,Memory: 3.75 GiB
Nanny: tcp://127.0.0.1:45707,
Local directory: /tmp/dask-worker-space/worker-uisfxe5z,Local directory: /tmp/dask-worker-space/worker-uisfxe5z

0,1
Comm: tcp://127.0.0.1:43933,Total threads: 4
Dashboard: http://127.0.0.1:36929/status,Memory: 3.75 GiB
Nanny: tcp://127.0.0.1:42497,
Local directory: /tmp/dask-worker-space/worker-twgjhtjx,Local directory: /tmp/dask-worker-space/worker-twgjhtjx

0,1
Comm: tcp://127.0.0.1:44529,Total threads: 4
Dashboard: http://127.0.0.1:39905/status,Memory: 3.75 GiB
Nanny: tcp://127.0.0.1:40083,
Local directory: /tmp/dask-worker-space/worker-tgsmhkdl,Local directory: /tmp/dask-worker-space/worker-tgsmhkdl

0,1
Comm: tcp://127.0.0.1:45875,Total threads: 4
Dashboard: http://127.0.0.1:41325/status,Memory: 3.75 GiB
Nanny: tcp://127.0.0.1:38301,
Local directory: /tmp/dask-worker-space/worker-r9wgu7vt,Local directory: /tmp/dask-worker-space/worker-r9wgu7vt


### Utils

**read_dataset_from_zarr**

In [4]:
def read_dataset_from_zarr(s3_path: str, group: str) -> xr.Dataset:
    """Open one group of a consolidated Zarr store on S3 as an ``xarray.Dataset``.

    The S3 credentials are taken from the ``S3_ACCESS_KEY_ID`` and
    ``S3_SECRET_ACCESS_KEY`` environment variables.

    Parameters
    ----------
    s3_path : str
        Root of the Zarr store, e.g. ``'s3://bucket/dataset.zarr'``.
    group : str
        Name of the Zarr group to open.

    Returns
    -------
    xr.Dataset
        The opened group with ``x``/``y`` renamed to ``lon``/``lat`` and, for
        the ``'concentration'`` group, ``depht`` renamed to ``depth``.
    """
    # Initialize the S3 file system
    s3 = s3fs.S3FileSystem(key=os.getenv("S3_ACCESS_KEY_ID"), secret=os.getenv("S3_SECRET_ACCESS_KEY"))
    store = s3fs.S3Map(root=s3_path, s3=s3, check=False)
    # Read Zarr file
    ds = xr.open_zarr(store=store, group=group, consolidated=True)

    # Change coordinates names
    ds = ds.rename({'x': 'lon', 'y': 'lat'})

    # Presumably the 'concentration' group was written with the misspelled
    # name 'depht'; normalise it so every group exposes 'depth'.
    if group == 'concentration':
        ds = ds.rename({'depht': 'depth'})

    return ds

**read_dataframes_from_geojson_files**

In [5]:
def read_dataframes_from_geojson_files(folder_path: str, suffix: str = ".geojson", prefix: str = "") -> Dict[str, gpd.GeoDataFrame]:
    """Read every matching vector file of a folder into a GeoDataFrame.

    Parameters
    ----------
    folder_path : str
        Folder whose direct entries are scanned.
    suffix : str
        Only file names ending with this string are read.
    prefix : str
        Only file names starting with this string are read.

    Returns
    -------
    Dict[str, gpd.GeoDataFrame]
        One entry per file read, keyed by the file name without ``suffix``.
    """
    dataframes: Dict[str, gpd.GeoDataFrame] = {}
    # Sort the listing so files are read (and printed) in a reproducible order.
    for file_name in sorted(os.listdir(folder_path)):
        if file_name.endswith(suffix) and file_name.startswith(prefix):
            file_path = os.path.join(folder_path, file_name)
            print(file_path)
            # Strip exactly the matched suffix, whatever its length; the
            # explicit end index also keeps the full name when suffix is "".
            key = file_name[:len(file_name) - len(suffix)]
            dataframes[key] = gpd.read_file(file_path)
    return dataframes

**rasterize_vector_data**

In [12]:
def rasterize_vector_data(ds: xr.Dataset,
                          df: gpd.GeoDataFrame,
                          mask_name: str,
                          index_column_name: str = 'index',
                          x_coor_name: str = 'lon',
                          y_coor_name: str = 'lat') -> xr.Dataset:
    """Rasterize a GeoDataFrame on the grid of a Dataset and store it as a new variable.

    Parameters
    ----------
    ds : xr.Dataset
        Reference dataset; its ``x_coor_name`` and ``y_coor_name`` entries
        are passed to ``regionmask.mask_geopandas``.
    df : gpd.GeoDataFrame
        Geometries to rasterize.
    mask_name : str
        Name under which the mask is stored in ``ds``.
    index_column_name : str
        Column of ``df`` passed to ``regionmask.mask_geopandas`` as ``numbers``.
    x_coor_name, y_coor_name : str
        Names of the longitude / latitude entries of ``ds``.

    Returns
    -------
    xr.Dataset
        The same ``ds`` object, now holding the mask under ``mask_name``.
    """
    mask = regionmask.mask_geopandas(
        df,
        ds[x_coor_name],
        ds[y_coor_name],
        numbers=index_column_name
    )

    # NOTE: this assigns into the caller's dataset in place, so the returned
    # object and the `ds` argument are one and the same.
    ds[mask_name] = mask

    return ds

## Zonal statistics
### Read raster data
**Read rasters as `xarray.Dataset` from `Zarr` in Amazon S3 bucket**

In [6]:
# Choose the Zarr store and the group to open; the trailing comments keep the
# alternative values the author switched between.
s3_path = 's3://soils-revealed/global-dataset.zarr' #'s3://soils-revealed/experimental-dataset.zarr' 
group = 'historic'#'concentration' #stocks
ds = read_dataset_from_zarr(s3_path, group)
# Display the dataset as the cell output.
ds

Unnamed: 0,Array,Chunk
Bytes,427.15 MiB,1.11 MiB
Shape,"(3, 2, 2160, 4320)","(1, 1, 270, 540)"
Dask graph,384 chunks in 2 graph layers,384 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 427.15 MiB 1.11 MiB Shape (3, 2, 2160, 4320) (1, 1, 270, 540) Dask graph 384 chunks in 2 graph layers Data type float64 numpy.ndarray",3  1  4320  2160  2,

Unnamed: 0,Array,Chunk
Bytes,427.15 MiB,1.11 MiB
Shape,"(3, 2, 2160, 4320)","(1, 1, 270, 540)"
Dask graph,384 chunks in 2 graph layers,384 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray


### Read vector data

In [8]:
# Read only the files whose name starts with "political"; the resulting dict
# is keyed by file name without the ".geojson" suffix.
vector_data = read_dataframes_from_geojson_files(folder_path='../../data/processed/vector_data', prefix="political")

../../data/processed/vector_data/political_boundaries_1.geojson
../../data/processed/vector_data/political_boundaries_0.geojson


### Zonal statistics
**Create the data mask by rasterizing the vector data**

In [11]:
# The mask variable is named after the vector layer it is built from.
mask_name = 'political_boundaries_0'
df = vector_data['political_boundaries_0']

# rasterize_vector_data assigns the mask into `ds` itself and returns it, so
# `ds_new` and `ds` refer to the same object after this call.
ds_new = rasterize_vector_data(ds, df, mask_name)

ds_new


Unnamed: 0,Array,Chunk
Bytes,427.15 MiB,1.11 MiB
Shape,"(3, 2, 2160, 4320)","(1, 1, 270, 540)"
Dask graph,384 chunks in 2 graph layers,384 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 427.15 MiB 1.11 MiB Shape (3, 2, 2160, 4320) (1, 1, 270, 540) Dask graph 384 chunks in 2 graph layers Data type float64 numpy.ndarray",3  1  4320  2160  2,

Unnamed: 0,Array,Chunk
Bytes,427.15 MiB,1.11 MiB
Shape,"(3, 2, 2160, 4320)","(1, 1, 270, 540)"
Dask graph,384 chunks in 2 graph layers,384 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray


***
## The whole computational process at once
### Experimental, Historic, and Recent datasets
**Input variables**

In [42]:
# Run-wide constants. NOTE(review): their exact use is not visible in this part
# of the notebook — confirm against the cells that consume them.
TOLERANCE = 0.075
LEVEL = 1

# Per-dataset configuration, all keyed by the names listed in `datasets`.
datasets = ['global', 'experimental']
# Zarr groups available in each dataset.
groups = {'global': ['historic', 'recent'], 'experimental': ['stocks', 'concentration']}
# Variable associated with each group.
variables = {'historic': 'stocks', 'recent': 'stocks', 'stocks': 'stocks', 'concentration': 'concentration'}
# Optional ISO code per dataset (None for the global dataset).
isos = {'global': None, 'experimental': 'ARG'}
# Presumably histogram bin counts and value ranges per dataset/group, one entry
# per histogram to compute — TODO confirm ("binds" looks like a typo for "bins").
n_binds = {'global': {'historic': [40, 40, 60], 'recent': [10]}, 'experimental': {'stocks': [80], 'concentration': [20]}}
bind_ranges = {'global': {'historic': [[-20,20], [-40,40], [-60,60]], 'recent': [[-50,50]]}, 'experimental': {'stocks': [[-50, 50]], 'concentration': [[-10, 10]]}}

In [46]:
# Walk every dataset/group combination and echo the settings it resolves to.
for dataset in datasets:
    iso = isos[dataset]
    print(f'Dataset:  {dataset}')
    print(f'iso:  {iso}')
    for group in groups[dataset]:
        # Look up the per-group settings before reporting them.
        variable = variables[group]
        n_bind = n_binds[dataset][group]
        bind_range = bind_ranges[dataset][group]
        print(f'Group:  {group}')
        print(f'Variable:  {variable}')
        print(f'n_bind:  {n_bind}')
        print(f'bind_range:  {bind_range}')
        

Dataset:  global
iso:  None
Group:  historic
Variable:  stocks
n_bind:  [40, 40, 60]
bind_range:  [[-20, 20], [-40, 40], [-60, 60]]
Group:  recent
Variable:  stocks
n_bind:  [10]
bind_range:  [[-50, 50]]
Dataset:  experimental
iso:  ARG
Group:  stocks
Variable:  stocks
n_bind:  [80]
bind_range:  [[-50, 50]]
Group:  concentration
Variable:  concentration
n_bind:  [20]
bind_range:  [[-10, 10]]


In [None]:
# Parameters for a single manual run; the trailing comments keep the values
# used for the other dataset/group combinations.
# NOTE(review): `tolerance` and `level` repeat the values of TOLERANCE and LEVEL
# defined in the input-variables cell — confirm which ones later cells read.
path = '../data/global-dataset.zarr' #'../data/experimental-dataset.zarr'#
group = 'recent'  #'stocks' #'concentration' #'historic'
variable = 'stocks' #'concentration'  #'stocks'  
iso = None #'ARG'
tolerance = 0.075
group_type = 'recent' #'experimental_dataset'  # 'historic' 
nBinds = [10] #[20] #[80] #[40, 40, 60]   
bindsRange = [[-50,50]]  #[[-10, 10]] #[[-50, 50]] # [[-20,20], [-40,40], [-60,60]] 
level=1

In [None]:
def compute_level_0_change(geo_name, group_type, variable, vector_data_0):
    """Aggregate the level-1 "change" precalculations by ``id_0``.

    Reads ``../data/precalculations/{geo_name}_1_{group_type}_{variable}_change.csv``,
    drops the rows whose ``depth`` is ``'stocks'`` or whose ``id`` is missing and,
    for every remaining depth, sums ``counts``, ``sum_diff`` and ``count_diff``
    over the rows sharing an ``id_0``.

    Parameters
    ----------
    geo_name : str
        Name of the vector layer; used in the CSV file name and to pick
        ``vector_data_0[f'{geo_name}_0']``.
    group_type : str
        Group-type part of the CSV file name.
    variable : str
        Variable part of the CSV file name.
    vector_data_0 : dict
        Mapping that holds the ``f'{geo_name}_0'`` frame; that frame needs the
        ``geometry``, ``id_0``, ``id`` and ``index`` columns.

    Returns
    -------
    pd.DataFrame
        The aggregated rows (sorted by ``id``) followed by the filtered input
        rows (sorted by ``id``), with a fresh 0..n-1 ``index`` column.

    Raises
    ------
    ValueError
        If no depth has any row left to aggregate after filtering.
    """
    df = pd.read_csv(f'../data/precalculations/{geo_name}_1_{group_type}_{variable}_change.csv')
    df = df.drop(columns='Unnamed: 0')
    df = df[df['depth'] != 'stocks']
    df = df[df['id'].notna()]
    df = df.astype({'id': int, 'id_0': int, 'sum_diff':'float64', 'count_diff':'float64', 'mean_diff':'float64'})

    # Collect one aggregated frame per depth and concatenate once at the end.
    changes_by_depth = []
    for depth in df['depth'].unique():
        df_tmp = df[df['depth'] == depth].copy()

        # A missing depth never compares equal to itself, so its selection is
        # empty; skip it wherever it appears in the list of unique values.
        if df_tmp.empty:
            continue

        # `counts` is stored as a JSON list; decoding to arrays makes the
        # group-wise sum add the histograms element by element.
        df_tmp['counts'] = df_tmp['counts'].apply(lambda x: np.array(json.loads(x)))
        df_counts = df_tmp[['id_0', 'counts']].groupby('id_0').sum().reset_index()
        df_counts['bins'] = df_tmp['bins'].iloc[0]

        df_diff = df_tmp[['id_0', 'sum_diff', 'count_diff']].groupby('id_0').sum().reset_index()
        # Recompute the mean from the summed totals rather than averaging means.
        df_diff['mean_diff'] = df_diff['sum_diff']/df_diff['count_diff']

        df_change_depth = pd.merge(df_counts, df_diff, on='id_0', how='left')

        df_change_depth['depth'] = depth
        # These columns are copied from the first row of the depth subset.
        for column in ['years', 'variable', 'group_type']:
            df_change_depth[column] = df_tmp[column].iloc[0]

        changes_by_depth.append(df_change_depth)

    if not changes_by_depth:
        raise ValueError(
            f"No rows left to aggregate for {geo_name!r}, {group_type!r}, {variable!r}"
        )
    df_change = pd.concat(changes_by_depth)

    df_change = pd.merge(vector_data_0[f'{geo_name}_0'].drop(columns='geometry').astype({'id_0': int}), df_change.astype({'id_0': int}), on='id_0', how='left')

    # Aggregated rows first, then the detailed rows; the stale 'index' column
    # is replaced by a fresh running index.
    return (
        pd.concat([df_change.sort_values('id'), df.sort_values('id')])
        .drop(columns='index')
        .reset_index(drop=True)
        .reset_index()
    )

In [None]:
def load(x, fill_length: int = 5):
    """Decode a JSON-encoded list into a NumPy array, tolerating bad cells.

    Parameters
    ----------
    x : str
        JSON text to decode. Any other input (for example a NaN read from an
        empty CSV cell) is treated as undecodable.
    fill_length : int
        Length of the all-NaN array returned when ``x`` cannot be decoded.
        NOTE(review): the default of 5 presumably matches the number of
        entries in the stored lists — confirm against the data.

    Returns
    -------
    np.ndarray
        The decoded values, or ``fill_length`` NaNs when decoding fails.
    """
    try:
        return np.array(json.loads(x))
    except (TypeError, ValueError):
        # TypeError: x is not str/bytes; ValueError: malformed JSON or a
        # ragged list NumPy refuses to convert.
        return np.full(fill_length, np.nan)
    
def compute_level_0_time_series(geo_name, group_type, variable, vector_data_0):
    """Aggregate the level-1 "time series" precalculations by ``id_0``.

    Reads ``../data/precalculations/{geo_name}_1_{group_type}_{variable}_time_series.csv``,
    drops the rows whose ``depth`` is ``'stocks'`` or whose ``id`` is missing and,
    for every remaining depth, sums ``sum_values`` and ``count_values`` over the
    rows sharing an ``id_0``.

    Parameters
    ----------
    geo_name : str
        Name of the vector layer; used in the CSV file name and to pick
        ``vector_data_0[f'{geo_name}_0']``.
    group_type : str
        Group-type part of the CSV file name.
    variable : str
        Variable part of the CSV file name.
    vector_data_0 : dict
        Mapping that holds the ``f'{geo_name}_0'`` frame; that frame needs the
        ``geometry``, ``id_0``, ``id`` and ``index`` columns.

    Returns
    -------
    pd.DataFrame
        The aggregated rows (sorted by ``id``) followed by the filtered input
        rows (sorted by ``id``), with a fresh 0..n-1 ``index`` column.

    Raises
    ------
    ValueError
        If no depth has any row left to aggregate after filtering.
    """
    df = pd.read_csv(f'../data/precalculations/{geo_name}_1_{group_type}_{variable}_time_series.csv')
    df = df.drop(columns='Unnamed: 0')
    df = df[df['depth'] != 'stocks']
    df = df[df['id'].notna()]
    df = df.astype({'id': int, 'id_0': int})

    # Collect one aggregated frame per depth and concatenate once at the end.
    series_by_depth = []
    for depth in df['depth'].unique():
        df_tmp = df[df['depth'] == depth].copy()

        # A missing depth never compares equal to itself, so its selection is
        # empty; skip it wherever it appears in the list of unique values.
        if df_tmp.empty:
            continue

        # Decode the JSON lists into arrays so the group-wise sum adds the
        # series element by element.
        df_tmp['sum_values'] = df_tmp['sum_values'].apply(load)
        df_tmp['count_values'] = df_tmp['count_values'].apply(load)

        df_time_series_depth = df_tmp[['id_0', 'sum_values', 'count_values']].groupby('id_0').sum().reset_index()
        # Recompute the mean from the summed totals rather than averaging means.
        df_time_series_depth['mean_values'] = df_time_series_depth['sum_values']/df_time_series_depth['count_values']

        df_time_series_depth['depth'] = depth
        # These columns are copied from the first row of the depth subset.
        for column in ['years', 'variable', 'group_type']:
            df_time_series_depth[column] = df_tmp[column].iloc[0]

        series_by_depth.append(df_time_series_depth)

    if not series_by_depth:
        raise ValueError(
            f"No rows left to aggregate for {geo_name!r}, {group_type!r}, {variable!r}"
        )
    df_time_series = pd.concat(series_by_depth)

    df_time_series = pd.merge(vector_data_0[f'{geo_name}_0'].drop(columns='geometry').astype({'id_0': int}), df_time_series.astype({'id_0': int}), on='id_0', how='left')

    # Aggregated rows first, then the detailed rows; the stale 'index' column
    # is replaced by a fresh running index.
    return (
        pd.concat([df_time_series.sort_values('id'), df.sort_values('id')])
        .drop(columns='index')
        .reset_index(drop=True)
        .reset_index()
    )

In [None]:
def save_precalculations_level1(vector_data, variable = 'stocks', group_type='experimental_dataset', output_type = 'change', root_path = '../data/precalculations/'):
    """Write one CSV per level-1 vector layer, tagged with variable and group type.

    Parameters
    ----------
    vector_data : dict
        Mapping that must hold a frame for every name listed in ``names`` below.
    variable : str
        Value written to the ``variable`` column and used in the file name.
    group_type : str
        Value written to the ``group_type`` column and used in the file name.
    output_type : str
        Last part of the file name.
    root_path : str
        Prefix joined to the file name by plain string concatenation, so it
        needs its own trailing separator.
    """
    # Layers exported by this function; each one must be a key of vector_data.
    names = ['political_boundaries_1', 'biomes_1', 'hydrological_basins_1', 'landforms_1']

    for name in names:
        # Work on a copy so the caller's frame keeps its 'index' column.
        df = vector_data[name].copy()
        df.drop(columns='index', inplace=True)  
        df['variable'] = variable
        df['group_type'] = group_type
        
        # File name pattern: <name>_<group_type>_<variable>_<output_type>.csv
        df.to_csv(root_path+name+'_'+group_type+'_'+variable+'_'+output_type+'.csv')