In [1]:
import os

import xarray as xr
import pandas as pd

from utils import DATA_ROOT
from datetime import datetime
from dask.distributed import Client, LocalCluster

# Choose your extent
# ⚠️ Change your output file as well ⚠️
#
from utils.download import west_us_extent as my_extent
# from utils.download import ga_extent as my_extent

In [2]:
def my_preprocess(ds):
    filepath = ds.encoding['source']

    # Add date coordinate
    time = filepath.split('/')[-1].split('_')[-2]
    date = datetime.strptime(time, '%Y%m%d')

    variable = filepath.split('/')[-1].split('_')[1]
    ds = ds.rename(
        {'band_data': variable, 'band': 'date'}
    ).assign_coords(date=[date])
    
    return ds

In [3]:
cluster = LocalCluster(
    n_workers=120,
    threads_per_worker=1,
    memory_limit='2G',
    dashboard_address=':41228',
)

client = Client(cluster)

In [4]:
# dt_range = pd.date_range('1981-01-01', '2024-09-30', freq='d')
# dt_range = pd.date_range('2019-10-01', '2024-09-30', freq='d')
dt_range = pd.date_range('2019-01-01', '2024-12-31', freq='d')

root_folder = (
    '/expanse/lustre/scratch/weiming/temp_project/data/'
    'input/PRISM/daily_stable'
)

bil_files = [
    os.path.join(
        root_folder,
        f"PRISM_ppt_stable_4kmD2_{i.strftime(format='%Y%m%d')}_bil",
        f"PRISM_ppt_stable_4kmD2_{i.strftime(format='%Y%m%d')}_bil.bil")
    for i in dt_range
] + [
    os.path.join(
        root_folder,
        f"PRISM_tmean_stable_4kmD2_{i.strftime(format='%Y%m%d')}_bil",
        f"PRISM_tmean_stable_4kmD2_{i.strftime(format='%Y%m%d')}_bil.bil")
    for i in dt_range
]

bil_files = [i for i in bil_files if os.path.exists(i)]

In [5]:
%%time

# Bil -> Zarr

out_file_zarr = ''.join([
    DATA_ROOT,
    'derived/PRISM/PRISM_daily_stable_westus_',
    dt_range[0].strftime(format='%Y%m%d'),
    '_',
    dt_range[-1].strftime(format='%Y%m%d'),
    '.zarr',
])

if not os.path.exists(out_file_zarr):
    ds = xr.open_mfdataset(
        bil_files, engine="rasterio", preprocess=my_preprocess,
        parallel=True, chunks={'x': -1, 'y': -1}
    )
    
    ds = ds.drop_vars(['spatial_ref'])

    if 'my_extent' in globals():
        ds = ds.sel(
            x=(my_extent['xmin'] <= ds.x) & (ds.x <= my_extent['xmax']),
            y=(my_extent['ymin'] <= ds.y) & (ds.y <= my_extent['ymax']),
        )
    else:
        print("No spatial subset because my_extent is not defined.")
    
    ds.attrs['creator'] = 'Weiming Hu, huwx@jmu.edu'
    ds.attrs['date_of_creation'] = datetime.today().strftime(format='%Y/%m/%d %H:%M:%S')
    ds.attrs['doc'] = 'https://prism.oregonstate.edu/documents/PRISM_datasets.pdf'

    ds.to_zarr(out_file_zarr, zarr_format=2)

CPU times: user 1min 27s, sys: 42 s, total: 2min 8s
Wall time: 2min 45s


In [17]:
client.close()
cluster.close()

After trying this conversion, I found out that:

1. NetCDF is about 40 MB smaller than Zarr (198 MB vs 231 MB)
2. It took almost 8 hours to complete on a single thread.

So I guess for now, I'm happy with Zarr.