# Extracting ESM data using regional model boundaries.
**Author:** Denisse Fierro Arcos  
**Last updated:** 2024-09-03  
  
This script needs to be run whenever there is an update of regional boundaries or ESM outputs. This script is best run in NCI's Gadi but file paths can be updated to work anywhere where GFDL outputs are stored.  

**Note:** Given that we are not able to share large files in this repository, we cannot share all files used in this notebook. However, we have included the FishMIP regional model shapefile with which you can create raster masks. You will need to run notebooks `03a_Regional_Models_2DMasks` and `03b_Regional_Models_3DMasks` to create the raster masks before you can continue.

## Loading libraries

In [7]:
import xarray as xr
import zarr
import os
from glob import glob
import pandas as pd
import numpy as np
from datetime import datetime
import geopandas as gpd
import rioxarray
from dask.distributed import Client

## Starting cluster

In [2]:
client = Client(threads_per_worker = 1)
client

Perhaps you already have a cluster running?
Hosting the HTTP server on port 37871 instead


0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: /proxy/37871/status,

0,1
Dashboard: /proxy/37871/status,Workers: 14
Total threads: 14,Total memory: 63.00 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:43549,Workers: 14
Dashboard: /proxy/37871/status,Total threads: 14
Started: Just now,Total memory: 63.00 GiB

0,1
Comm: tcp://127.0.0.1:43449,Total threads: 1
Dashboard: /proxy/42535/status,Memory: 4.50 GiB
Nanny: tcp://127.0.0.1:42761,
Local directory: /jobfs/137502389.gadi-pbs/dask-scratch-space/worker-yk4q9e_x,Local directory: /jobfs/137502389.gadi-pbs/dask-scratch-space/worker-yk4q9e_x

0,1
Comm: tcp://127.0.0.1:37621,Total threads: 1
Dashboard: /proxy/38915/status,Memory: 4.50 GiB
Nanny: tcp://127.0.0.1:45159,
Local directory: /jobfs/137502389.gadi-pbs/dask-scratch-space/worker-czg_llh9,Local directory: /jobfs/137502389.gadi-pbs/dask-scratch-space/worker-czg_llh9

0,1
Comm: tcp://127.0.0.1:44811,Total threads: 1
Dashboard: /proxy/45395/status,Memory: 4.50 GiB
Nanny: tcp://127.0.0.1:46873,
Local directory: /jobfs/137502389.gadi-pbs/dask-scratch-space/worker-2qrvvgdb,Local directory: /jobfs/137502389.gadi-pbs/dask-scratch-space/worker-2qrvvgdb

0,1
Comm: tcp://127.0.0.1:37589,Total threads: 1
Dashboard: /proxy/33409/status,Memory: 4.50 GiB
Nanny: tcp://127.0.0.1:40583,
Local directory: /jobfs/137502389.gadi-pbs/dask-scratch-space/worker-zm00lga0,Local directory: /jobfs/137502389.gadi-pbs/dask-scratch-space/worker-zm00lga0

0,1
Comm: tcp://127.0.0.1:39131,Total threads: 1
Dashboard: /proxy/40291/status,Memory: 4.50 GiB
Nanny: tcp://127.0.0.1:45719,
Local directory: /jobfs/137502389.gadi-pbs/dask-scratch-space/worker-i46w8b0a,Local directory: /jobfs/137502389.gadi-pbs/dask-scratch-space/worker-i46w8b0a

0,1
Comm: tcp://127.0.0.1:37633,Total threads: 1
Dashboard: /proxy/40227/status,Memory: 4.50 GiB
Nanny: tcp://127.0.0.1:36033,
Local directory: /jobfs/137502389.gadi-pbs/dask-scratch-space/worker-1yheo9j3,Local directory: /jobfs/137502389.gadi-pbs/dask-scratch-space/worker-1yheo9j3

0,1
Comm: tcp://127.0.0.1:46155,Total threads: 1
Dashboard: /proxy/45811/status,Memory: 4.50 GiB
Nanny: tcp://127.0.0.1:44523,
Local directory: /jobfs/137502389.gadi-pbs/dask-scratch-space/worker-qidf_sf7,Local directory: /jobfs/137502389.gadi-pbs/dask-scratch-space/worker-qidf_sf7

0,1
Comm: tcp://127.0.0.1:44239,Total threads: 1
Dashboard: /proxy/44857/status,Memory: 4.50 GiB
Nanny: tcp://127.0.0.1:36453,
Local directory: /jobfs/137502389.gadi-pbs/dask-scratch-space/worker-emnu9ez3,Local directory: /jobfs/137502389.gadi-pbs/dask-scratch-space/worker-emnu9ez3

0,1
Comm: tcp://127.0.0.1:43729,Total threads: 1
Dashboard: /proxy/43977/status,Memory: 4.50 GiB
Nanny: tcp://127.0.0.1:35405,
Local directory: /jobfs/137502389.gadi-pbs/dask-scratch-space/worker-44r09jrg,Local directory: /jobfs/137502389.gadi-pbs/dask-scratch-space/worker-44r09jrg

0,1
Comm: tcp://127.0.0.1:39499,Total threads: 1
Dashboard: /proxy/33909/status,Memory: 4.50 GiB
Nanny: tcp://127.0.0.1:41531,
Local directory: /jobfs/137502389.gadi-pbs/dask-scratch-space/worker-zlaqt2a_,Local directory: /jobfs/137502389.gadi-pbs/dask-scratch-space/worker-zlaqt2a_

0,1
Comm: tcp://127.0.0.1:46493,Total threads: 1
Dashboard: /proxy/35433/status,Memory: 4.50 GiB
Nanny: tcp://127.0.0.1:46575,
Local directory: /jobfs/137502389.gadi-pbs/dask-scratch-space/worker-_avo7efx,Local directory: /jobfs/137502389.gadi-pbs/dask-scratch-space/worker-_avo7efx

0,1
Comm: tcp://127.0.0.1:40563,Total threads: 1
Dashboard: /proxy/39371/status,Memory: 4.50 GiB
Nanny: tcp://127.0.0.1:36277,
Local directory: /jobfs/137502389.gadi-pbs/dask-scratch-space/worker-6uk0pspy,Local directory: /jobfs/137502389.gadi-pbs/dask-scratch-space/worker-6uk0pspy

0,1
Comm: tcp://127.0.0.1:36697,Total threads: 1
Dashboard: /proxy/41673/status,Memory: 4.50 GiB
Nanny: tcp://127.0.0.1:40609,
Local directory: /jobfs/137502389.gadi-pbs/dask-scratch-space/worker-y67fpjyu,Local directory: /jobfs/137502389.gadi-pbs/dask-scratch-space/worker-y67fpjyu

0,1
Comm: tcp://127.0.0.1:38265,Total threads: 1
Dashboard: /proxy/39071/status,Memory: 4.50 GiB
Nanny: tcp://127.0.0.1:33923,
Local directory: /jobfs/137502389.gadi-pbs/dask-scratch-space/worker-anmtci1z,Local directory: /jobfs/137502389.gadi-pbs/dask-scratch-space/worker-anmtci1z


## Loading gridded mask and regions shapefiles
We will use these files to extract data for each FishMIP regional model. Note that the gridded mask **MUST** exactly match the ESM where we will extract data.

In [23]:
#Loading FishMIP regional models shapefile
rmes = gpd.read_file(os.path.join('/g/data/vf71/shared_resources/FishMIP_regional_models',
                                  'FishMIP_regional_models.shp'))

#Loading FishMIP regional models gridded mask
ras_file = os.path.join('/g/data/vf71/shared_resources/FishMIPMasks',
                        'merged_regional_fishmip',
                        'gfdl-mom6-cobalt2_areacello_15arcmin_fishMIP_regional_merged.nc')
mask_ras = xr.open_dataset(ras_file).region
#Renaming coordinate dimensions
mask_ras = mask_ras.rename({'latitude': 'lat', 'longitude': 'lon'})
#Rechunking data to make it more manageable
mask_ras = mask_ras.chunk({'lat': 144, 'lon': 288})

## Setting up directories and files

In [4]:
#Base folder containing Earth System Model (ESM) data
base_dir = '/g/data/vf71/fishmip_inputs/ISIMIP3a/global_inputs/obsclim/025deg'
#Get a list of all files containing monthly ESM outputs (depth is excluded)
list_files = glob(os.path.join(base_dir, '*.nc'))

#Define (or create) folder for outputs
base_out = os.path.join(base_dir.replace('global', 'regional'), 'download_data')
os.makedirs(base_out, exist_ok = True)

## Defining useful functions for data extraction

In [12]:
def open_ard_data(file_path, boolean_mask_ras):
    '''
    Open netCDF files in analysis ready data (ARD) format. That is apply chunks
    that make data analysis easier.
    
    Inputs:
    file_path (character): Full file path where netCDF file is stored.
    boolean_mask_ras (boolean data array): Data array to be used as initial mask
    to decrease the size of the original dataset. This mask makes no distinction
    between regional models, it simply identifies grid cells within regional 
    model boundaries with the value of 1.
    
    Outputs:
    da (data array): ARD data array containing data only for grid cells within
    regional model boundaries.
    '''

    #Getting chunks from gridded mask to apply it to model data array
    [lat_chunk] = np.unique(boolean_mask_ras.chunksizes['lat'])
    [lon_chunk] = np.unique(boolean_mask_ras.chunksizes['lon'])
    
    #Open data array
    da = xr.open_dataarray(file_path)
    
    #Checking if there is a fourth dimension in dataset, it is depth, but it is not
    #always called the same
    if len(da.dims) > 3:
        [depth_name] = [d for d in da.dims if d not in ['time', 'lat', 'lon']]
        #Changing depth dimension name
        da = da.rename({depth_name: 'depth_bin_m'})
        da = da.chunk({'time': 600, 'depth_bin_m': 5, 'lat': lat_chunk, 'lon': lon_chunk})
    elif len(da.dims) == 3:
        da = da.chunk({'time': 600, 'lat': lat_chunk, 'lon': lon_chunk})
    else:
        da = da.chunk({'lat': lat_chunk, 'lon': lon_chunk})
    
    #Apply mask for all regions to decrease dataset size
    da = da.where(boolean_mask_ras == 1)
    
    #Add spatial information to dataset
    da.rio.set_spatial_dims(x_dim = 'lon', y_dim = 'lat', inplace = True)
    da.rio.write_crs('epsg:4326', inplace = True)
    
    #Change format of time dimension
    if 'time' in da.dims:
        new_time = da.indexes['time'].to_datetimeindex()
        #Changing time in data array
        da['time'] = new_time

    return da

In [13]:
def mask_ard_data(ard_da, shp_mask, file_out):
    '''
    Open netCDF files in analysis ready data (ARD) format. That is apply chunks
    that make data analysis easier.
    
    Inputs:
    ard_da (data array): Analysis ready data (ARD) data array produced by the 
    function "open_ard_data"
    shp_mask (shapefile): Shapefile containing the boundaries of regional models
    file_out (character): Full file path where masked data should be stored.
    
    Outputs:
    No data is returned, but masked file will be stored in specified file path.
    '''

    #Clip data using regional shapefile
    da_mask = ard_da.rio.clip(shp_mask.geometry, shp_mask.crs, drop = True, 
                              all_touched = True)
    #Remove spatial information
    da_mask = da_mask.drop_vars('spatial_ref')
    da_mask.encoding = {}

    #Check file extension included in path to save data
    if file_out.endswith('zarr'):
        for i, c in enumerate(da_mask.chunks):
            if len(c) > 1 and len(set(c)) > 1:
                print(f'Rechunking {file_out}.')
                print(f'Dimension "{da_mask.dims[i]}" has unequal chunks.')
                da_mask = da_mask.chunk({da_mask.dims[i]: '200MB'})
        da_mask.to_zarr(file_out, consolidated = True, mode = 'w')
    if file_out.endswith('parquet'):
        #Keep data array attributes to be recorded in final data frame
        da_attrs = ard_da.attrs
        da_attrs = pd.DataFrame([da_attrs])
        if 'time' in ard_da.coords:
            ind_wider = ['lat', 'lon', 'time', 'vals']
        else:
            ind_wider = ['lat', 'lon', 'vals']
        #Turn extracted data into data frame and remove rows with NA values
        df = da_mask.to_series().to_frame().reset_index().dropna()
        #Changing column name to standardise across variables
        df = df.rename(columns = {ard_da.name: 'vals'}).reset_index(drop = True)
        #Reorganise data
        df = df[ind_wider]
        #Include original dataset attributes
        df = pd.concat([df, da_attrs], axis = 1)
        #Saving data frame
        df.to_parquet(file_out)

## Extracting data from all model outputs for all FishMIP regional models

In [41]:
#Keep area of grid cell as gridded data
[area] = [f for f in list_files if 'areacello' in f]
area_da = open_ard_data(area, mask_ras)
#Extract data for each region included in the regional mask
for i in rmes.region:
    mask = rmes[rmes.region == i]
    #Get name of region and clean it for use in output file
    reg_name = i.lower().replace(" ", "-").replace("'", "")
    #File name out - Replacing "global" for region name
    file_out = os.path.join(base_out, 
                            os.path.basename(area).replace('.nc', '.zarr').replace('global', reg_name))
    #Extract data and save masked data - but only if file does not already exist
    if os.path.isdir(file_out) | os.path.isfile(file_out):
        continue
    mask_ard_data(area_da, mask, file_out)

Process all variables.

In [None]:
for f in list_files:
    #Open data array as ARD
    da = open_ard_data(f, mask_ras)

    #Create file name based on presence of depth dimension
    if 'depth_bin_m' in da.dims:
        base_file_out = os.path.basename(f).replace('.nc', '.zarr')
    else:
        base_file_out = os.path.basename(f).replace('.nc', '.parquet')
    #Adding output folder to create full file path
    base_file_out = os.path.join(base_out, base_file_out)

    #Extract data for each region included in the regional mask
    for i in rmes.region:
        #Get polygon for each region
        mask = rmes[rmes.region == i]
        #Get name of region and clean it for use in output file
        reg_name = i.lower().replace(" ", "-").replace("'", "")
        #File name out - Replacing "global" for region name
        file_out = base_file_out.replace('global', reg_name)
        #Extract data and save masked data - but only if file does not already exist
        if os.path.isdir(file_out) | os.path.isfile(file_out):
            continue
        mask_ard_data(da, mask, file_out)

  new_time = da.indexes['time'].to_datetimeindex()
  new_time = da.indexes['time'].to_datetimeindex()
  new_time = da.indexes['time'].to_datetimeindex()
  new_time = da.indexes['time'].to_datetimeindex()
  new_time = da.indexes['time'].to_datetimeindex()
  new_time = da.indexes['time'].to_datetimeindex()
  new_time = da.indexes['time'].to_datetimeindex()


Rechunking /g/data/vf71/fishmip_inputs/ISIMIP3a/regional_inputs/obsclim/025deg/download_data/gfdl-mom6-cobalt2_obsclim_phypico_15arcmin_aust-east-tuna-billfish_monthly_1961_2010.zarr.
Dimension "lat" has unequal chunks.
Rechunking /g/data/vf71/fishmip_inputs/ISIMIP3a/regional_inputs/obsclim/025deg/download_data/gfdl-mom6-cobalt2_obsclim_phypico_15arcmin_christmas-island_monthly_1961_2010.zarr.
Dimension "lon" has unequal chunks.
Rechunking /g/data/vf71/fishmip_inputs/ISIMIP3a/regional_inputs/obsclim/025deg/download_data/gfdl-mom6-cobalt2_obsclim_phypico_15arcmin_coral-sea-nemo_monthly_1961_2010.zarr.
Dimension "lat" has unequal chunks.
Rechunking /g/data/vf71/fishmip_inputs/ISIMIP3a/regional_inputs/obsclim/025deg/download_data/gfdl-mom6-cobalt2_obsclim_phypico_15arcmin_gbr_monthly_1961_2010.zarr.
Dimension "lat" has unequal chunks.
Rechunking /g/data/vf71/fishmip_inputs/ISIMIP3a/regional_inputs/obsclim/025deg/download_data/gfdl-mom6-cobalt2_obsclim_phypico_15arcmin_kimberley_monthly_19

  new_time = da.indexes['time'].to_datetimeindex()
  new_time = da.indexes['time'].to_datetimeindex()


Rechunking /g/data/vf71/fishmip_inputs/ISIMIP3a/regional_inputs/obsclim/025deg/download_data/gfdl-mom6-cobalt2_obsclim_phydiaz_15arcmin_aust-east-tuna-billfish_monthly_1961_2010.zarr.
Dimension "lat" has unequal chunks.
Rechunking /g/data/vf71/fishmip_inputs/ISIMIP3a/regional_inputs/obsclim/025deg/download_data/gfdl-mom6-cobalt2_obsclim_phydiaz_15arcmin_christmas-island_monthly_1961_2010.zarr.
Dimension "lon" has unequal chunks.
Rechunking /g/data/vf71/fishmip_inputs/ISIMIP3a/regional_inputs/obsclim/025deg/download_data/gfdl-mom6-cobalt2_obsclim_phydiaz_15arcmin_coral-sea-nemo_monthly_1961_2010.zarr.
Dimension "lat" has unequal chunks.
Rechunking /g/data/vf71/fishmip_inputs/ISIMIP3a/regional_inputs/obsclim/025deg/download_data/gfdl-mom6-cobalt2_obsclim_phydiaz_15arcmin_gbr_monthly_1961_2010.zarr.
Dimension "lat" has unequal chunks.
Rechunking /g/data/vf71/fishmip_inputs/ISIMIP3a/regional_inputs/obsclim/025deg/download_data/gfdl-mom6-cobalt2_obsclim_phydiaz_15arcmin_kimberley_monthly_19

  new_time = da.indexes['time'].to_datetimeindex()


Rechunking /g/data/vf71/fishmip_inputs/ISIMIP3a/regional_inputs/obsclim/025deg/download_data/gfdl-mom6-cobalt2_obsclim_vo_15arcmin_aust-east-tuna-billfish_monthly_1961_2010.zarr.
Dimension "lat" has unequal chunks.
Rechunking /g/data/vf71/fishmip_inputs/ISIMIP3a/regional_inputs/obsclim/025deg/download_data/gfdl-mom6-cobalt2_obsclim_vo_15arcmin_christmas-island_monthly_1961_2010.zarr.
Dimension "lon" has unequal chunks.
Rechunking /g/data/vf71/fishmip_inputs/ISIMIP3a/regional_inputs/obsclim/025deg/download_data/gfdl-mom6-cobalt2_obsclim_vo_15arcmin_coral-sea-nemo_monthly_1961_2010.zarr.
Dimension "lat" has unequal chunks.
Rechunking /g/data/vf71/fishmip_inputs/ISIMIP3a/regional_inputs/obsclim/025deg/download_data/gfdl-mom6-cobalt2_obsclim_vo_15arcmin_gbr_monthly_1961_2010.zarr.
Dimension "lat" has unequal chunks.
