# Downloading and preprocessing CHIRPS

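Download daily global CHIRPS and concatenate the yearly files into a single daily zarr; separately download the monthly CHIRPS product and save it as a monthly zarr (converted from mm/month to mm/day).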

The monthly data is 0.05 degrees, the daily is 0.25 degrees. Possibly worth coarsening monthly data to 0.25 degrees as well.

In [1]:
import os
import re
import tarfile
import tempfile
import requests
import xarray as xr
import xagg as xa
from datetime import datetime
from tqdm import tqdm
from funcs_support import get_params, utility_save, get_filepaths
# Project settings lookup; later cells read dir_list['raw'] as the root
# folder under which the CHIRPS files are written.
dir_list = get_params()

# Table of already-existing data files; later cells query it on the
# `model`, `freq` and `suffix` columns and read its `path` column to decide
# whether the CHIRPS outputs still need to be built.
df = get_filepaths()

In [2]:
from distributed import Client
# Start dask client
# Called without arguments, so no existing scheduler address is given and a
# local cluster is started on this machine.
client = Client()
# Render the client summary (dashboard link, workers, threads, memory)
display(client)

Perhaps you already have a cluster running?
Hosting the HTTP server on port 36920 instead


0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:36920/status,

0,1
Dashboard: http://127.0.0.1:36920/status,Workers: 8
Total threads: 48,Total memory: 503.37 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:37979,Workers: 0
Dashboard: http://127.0.0.1:36920/status,Total threads: 0
Started: Just now,Total memory: 0 B

0,1
Comm: tcp://127.0.0.1:36390,Total threads: 6
Dashboard: http://127.0.0.1:37344/status,Memory: 62.92 GiB
Nanny: tcp://127.0.0.1:43489,
Local directory: /tmp/dask-scratch-space/worker-521z7p16,Local directory: /tmp/dask-scratch-space/worker-521z7p16

0,1
Comm: tcp://127.0.0.1:41410,Total threads: 6
Dashboard: http://127.0.0.1:39936/status,Memory: 62.92 GiB
Nanny: tcp://127.0.0.1:44704,
Local directory: /tmp/dask-scratch-space/worker-64q661ax,Local directory: /tmp/dask-scratch-space/worker-64q661ax

0,1
Comm: tcp://127.0.0.1:46249,Total threads: 6
Dashboard: http://127.0.0.1:39414/status,Memory: 62.92 GiB
Nanny: tcp://127.0.0.1:41961,
Local directory: /tmp/dask-scratch-space/worker-mwppkqrt,Local directory: /tmp/dask-scratch-space/worker-mwppkqrt

0,1
Comm: tcp://127.0.0.1:43253,Total threads: 6
Dashboard: http://127.0.0.1:32941/status,Memory: 62.92 GiB
Nanny: tcp://127.0.0.1:34653,
Local directory: /tmp/dask-scratch-space/worker-hlpamoa4,Local directory: /tmp/dask-scratch-space/worker-hlpamoa4

0,1
Comm: tcp://127.0.0.1:35330,Total threads: 6
Dashboard: http://127.0.0.1:38036/status,Memory: 62.92 GiB
Nanny: tcp://127.0.0.1:37571,
Local directory: /tmp/dask-scratch-space/worker-_2aell4_,Local directory: /tmp/dask-scratch-space/worker-_2aell4_

0,1
Comm: tcp://127.0.0.1:37884,Total threads: 6
Dashboard: http://127.0.0.1:36234/status,Memory: 62.92 GiB
Nanny: tcp://127.0.0.1:34084,
Local directory: /tmp/dask-scratch-space/worker-4gcprc1w,Local directory: /tmp/dask-scratch-space/worker-4gcprc1w

0,1
Comm: tcp://127.0.0.1:42769,Total threads: 6
Dashboard: http://127.0.0.1:37507/status,Memory: 62.92 GiB
Nanny: tcp://127.0.0.1:39994,
Local directory: /tmp/dask-scratch-space/worker-613sj87o,Local directory: /tmp/dask-scratch-space/worker-613sj87o

0,1
Comm: tcp://127.0.0.1:36023,Total threads: 6
Dashboard: http://127.0.0.1:36568/status,Memory: 62.92 GiB
Nanny: tcp://127.0.0.1:46583,
Local directory: /tmp/dask-scratch-space/worker-lm7hzwhl,Local directory: /tmp/dask-scratch-space/worker-lm7hzwhl


In [3]:
# ------------------------------------------------------------------------------
# Configuration
# ------------------------------------------------------------------------------
# Remote folder holding the daily files; one netCDF per year is requested
# from it as `chirps-v2.0.<year>.days_p25.nc`
source_url = 'https://data.chc.ucsb.edu/products/CHIRPS-2.0/global_daily/netcdf/p25/'
# Years to download; range() excludes its stop value, so this is 1981 through 2025
years = range(1981, 2026)
# Local folder for the temporary downloads and the zarr stores written from them
output_dir = dir_list['raw']+'CHIRPS/'

# ------------------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------------------

def download_file(url, dest, timeout=60):
    """Download `url` to `dest` unless `dest` already exists.

    The response is streamed into a temporary ``<dest>.part`` file which is
    renamed to `dest` only after the whole transfer has succeeded. An
    interrupted or failed download therefore never leaves a truncated file
    at `dest` that a later call would mistake for a finished download.

    Parameters
    ----------
    url : str
        Address of the file to fetch.
    dest : str or path-like
        Local path to write to. Nothing is downloaded if it already exists.
    timeout : float or tuple, optional
        Forwarded to `requests.get`: seconds to wait for the server before
        giving up, instead of hanging forever. Default 60.

    Returns
    -------
    Same object as `dest`, whether the file was just downloaded or was
    already present.

    Raises
    ------
    requests.HTTPError
        If the server answers with an error status (via `raise_for_status`).
    Any exception raised while connecting, streaming or writing is
    re-raised after the partial file has been removed.
    """
    if os.path.exists(dest):
        return dest

    partial = os.fspath(dest) + ".part"
    try:
        with requests.get(url, stream=True, timeout=timeout) as r:
            r.raise_for_status()
            with open(partial, "wb") as f:
                for chunk in r.iter_content(chunk_size=8192):
                    f.write(chunk)
        # Expose the file under its final name only once it is complete
        os.replace(partial, dest)
    except BaseException:
        # Also covers KeyboardInterrupt (a stopped notebook cell): drop the
        # incomplete download, then let the original error propagate
        if os.path.exists(partial):
            os.remove(partial)
        raise
    return dest

# Build the combined daily store only if the file index has no CHIRPS daily
# entry other than the temporary per-year downloads (suffix "tmp").
if len(df.query('model == "CHIRPS" and freq == "day" and suffix != "tmp"')) == 0:
    os.makedirs(output_dir, exist_ok=True)

    # ------------------------------------------------------------------------------
    # Download netcdfs for all years
    # ------------------------------------------------------------------------------
    file_paths = []

    for year in tqdm(years):
        url = f"{source_url}chirps-v2.0.{year}.days_p25.nc"
        fname = 'pr_day_CHIRPS_historical_obs_'+str(year)+'0101-'+str(year)+'1231_tmp.nc'
        dest = os.path.join(output_dir, fname)
        # download_file skips years whose file is already on disk
        file_paths.append(download_file(url, dest))

    # ------------------------------------------------------------------------------
    # Open, save as single file
    # ------------------------------------------------------------------------------
    dss = xr.open_mfdataset(file_paths,chunks='auto')
    dss = xa.fix_ds(dss)
    dss = dss.rename({'precip':'pr'})
    dss.attrs['DESCRIPTION'] = 'CHIRPS 2.0 (global) Daily Precipitation (p25)'
    dss.attrs['SOURCE'] = 'preprocess_CHIRPS.ipynb'

    # Chunk to a manageable size, with time a single chunk,
    # since expected analysis is temporal
    dss = dss.chunk({'lat':10,'lon':10,'time':-1})

    # Filename time range as YYYYMMDD-YYYYMMDD: first day of the first month
    # to last day of the last month. The timestamp string starts with
    # 'YYYY-MM', so the first seven characters minus the hyphen give YYYYMM.
    # (Previously the start was YYYYMM + '0101', a ten-digit date.)
    ym_start = str(dss.time.min().values)[:7].replace('-', '')
    ym_end = str(dss.time.max().values)[:7].replace('-', '')
    last_day = int(dss.time.max().dt.daysinmonth.values)
    timestr = f'{ym_start}01-{ym_end}{last_day:02d}'

    output_fn = output_dir+'pr_day_CHIRPS_historical_obs_'+timestr+'.zarr'

    utility_save(dss,output_fn)

    # ------------------------------------------------------------------------------
    # Remove individual files
    # ------------------------------------------------------------------------------
    # Plain file removal instead of a shell `rm -rf` built by string
    # concatenation, which breaks on paths containing spaces
    for fn in file_paths:
        if os.path.exists(fn):
            os.remove(fn)
else:
    print('CHIRPS daily data exists!')
    # `suffix != suffix` is only true where suffix is NaN, so this picks the
    # first entry without any suffix.
    # NOTE(review): the guard above also counts entries with a non-"tmp",
    # non-NaN suffix; if only such entries exist, .iloc[0] raises IndexError.
    output_fn = df.query('model == "CHIRPS" and freq == "day" and suffix != suffix').iloc[0]['path']

CHIRPS daily data exists!


In [4]:
# ------------------------------------------------------------------------------
# Save monthly data
# ------------------------------------------------------------------------------

# Build the monthly store only if the file index has no CHIRPS monthly entry
# other than a temporary download (suffix "tmp").
if len(df.query('model == "CHIRPS" and freq == "Amon" and suffix != "tmp"')) == 0:
    url = 'https://data.chc.ucsb.edu/products/CHIRPS-2.0/global_monthly/netcdf/chirps-v2.0.monthly.nc'

    fn_dl = dir_list['raw']+'CHIRPS/chirps_monthly_tmp.nc'

    # The folder may not exist yet if the daily download step was skipped
    os.makedirs(os.path.dirname(fn_dl), exist_ok=True)

    # Download monthly (download_file is a no-op if fn_dl already exists)
    download_file(url, fn_dl)

    # Load monthly
    ds = xr.open_dataset(fn_dl,chunks='auto')

    ds = xa.fix_ds(ds)
    ds = ds.rename({'precip':'pr'})

    # Go from mm/month to mm/day
    ds['pr'] = ds['pr'] / ds.time.dt.daysinmonth
    ds['pr'].attrs['units'] = 'mm/day'

    ds.attrs['DESCRIPTION'] = 'CHIRPS 2.0 (global) Monthly Precipitation (p05)'
    ds.attrs['SOURCE'] = 'preprocess_CHIRPS.ipynb'

    # Filename time range as YYYYMMDD-YYYYMMDD: first day of the first month
    # to last day of the last month. The timestamp string starts with
    # 'YYYY-MM', so the first seven characters minus the hyphen give YYYYMM.
    # (Previously the start was YYYYMM + '0101', a ten-digit date.)
    ym_start = str(ds.time.min().values)[:7].replace('-', '')
    ym_end = str(ds.time.max().values)[:7].replace('-', '')
    last_day = int(ds.time.max().dt.daysinmonth.values)
    timestr = f'{ym_start}01-{ym_end}{last_day:02d}'

    output_fn_monthly = output_dir+'pr_Amon_CHIRPS_historical_obs_'+timestr+'.zarr'

    utility_save(ds,output_fn_monthly)

    # Plain file removal instead of a shell `rm -rf` built by string
    # concatenation, which breaks on paths containing spaces
    if os.path.exists(fn_dl):
        os.remove(fn_dl)
else:
    print('CHIRPS monthly data exists!')


CHIRPS monthly data exists!
