# Create Zarr Stores with Different Chunk Shapes

In this notebook, we create Zarr stores for the CMIP6 TAS daily data available in NetCDF on S3. This method of creating Zarr stores uses [pangeo-forge](https://pangeo-forge.readthedocs.io/) and it's [recipes](https://pangeo-forge.readthedocs.io/en/latest/pangeo_forge_recipes/recipe_user_guide/index.html) pattern.

## 1.1 Install and import libraries

In [2]:
%%capture
!pip uninstall apache-beam -y
!pip install 'apache-beam[interactive, dataframe]==2.48.0' git+https://github.com/carbonplan/cmip6-downscaling.git git+https://github.com/pangeo-forge/pangeo-forge-recipes.git@beam-refactor

In [5]:
import apache_beam as beam
import boto3
from botocore.exceptions import ClientError
import fsspec
import os
from pangeo_forge_recipes.patterns import FilePattern, ConcatDim, MergeDim
from pangeo_forge_recipes.transforms import OpenURLWithFSSpec, OpenWithXarray, StoreToZarr
from pangeo_forge_recipes.storage import FSSpecTarget
import re
import rioxarray
import s3fs
import xarray as xr
import eodc_hub_role

In [6]:
credentials = eodc_hub_role.fetch_and_set_credentials()
bucket = 'nasa-eodc-data-store'

Note: This is adapted from https://github.com/carbonplan/benchmark-maps/blob/datasets/stores/01b_cmip6_netcdf_to_zarr.ipynb.

## 1.2 Set parameters

In [7]:
#parameters
temporal_resolution = "daily"
model = "GISS-E2-1-G"
variable = "tas"
anon=True

In [8]:
# Initiate fsspec filesystems for reading and writing
s3_path = f"s3://nex-gddp-cmip6/NEX-GDDP-CMIP6/{model}/historical/r1i1p1*/{variable}/*"
fs_read = fsspec.filesystem("s3", anon=anon, skip_instance_cache=False)
fs_write = fsspec.filesystem("")

In [9]:
# Retrieve list of available months
files_paths = fs_read.glob(s3_path)
print(f"{len(files_paths)} discovered from {s3_path}")

65 discovered from s3://nex-gddp-cmip6/NEX-GDDP-CMIP6/GISS-E2-1-G/historical/r1i1p1*/tas/*


In [10]:
files_paths[0]

'nex-gddp-cmip6/NEX-GDDP-CMIP6/GISS-E2-1-G/historical/r1i1p1f2/tas/tas_day_GISS-E2-1-G_historical_r1i1p1f2_gn_1950.nc'

## 1.3 Test we can open the files

In [11]:
fs_s3 = s3fs.S3FileSystem(anon=True)
filepath = f's3://{files_paths[0]}'
f = fs_s3.open(filepath, mode='rb')
ds = xr.open_dataset(f)
ds

# 2: Setup the destination

In [12]:
def format_function(time):
    pattern = r"\b\d{4}\b"
    return re.sub(pattern, str(time), filepath)

years = list(range(1950, 1952))
time_dim = ConcatDim("time", keys=years, nitems_per_file=365)

pattern = FilePattern(format_function, time_dim, file_type="netcdf4")
pattern = FilePattern.prune(pattern, nkeep=2)

In [13]:
fs = s3fs.S3FileSystem(
    key=credentials['AccessKeyId'],
    secret=credentials['SecretAccessKey'],
    token=credentials['SessionToken'], 
    anon=False
)
target_root = FSSpecTarget(fs=fs, root_path=bucket)
print(f"Using {pattern.items()}")

Using <generator object FilePattern.items at 0x7f0954525150>


# 3: Set different target chunks

For different sets of chunks, generate a zarr store.

In [14]:
chunk_sets = []
# Optimized for analysis
temporal_target_chunks = { 'lat': ds.lat.shape[0], 'lon': ds.lon.shape[0], 'time': 29 }
chunk_sets.append(temporal_target_chunks)

In [15]:
# Optimized for visualization at a single time step
global_target_chunks = { 'lat': ds.lat.shape[0], 'lon': ds.lon.shape[0], 'time': 1 }
global_target_chunks
chunk_sets.append(global_target_chunks)

In [17]:
# Optimized for time series
#spatial_target_chunks = calc_auspicious_chunks_dict(ds[variable], chunk_dims=('lat','lon',))
spatial_target_chunks = {'time': 365, 'lat': 262, 'lon': 262}
chunk_sets.append(spatial_target_chunks)

In [18]:
chunk_sets

[{'lat': 600, 'lon': 1440, 'time': 29},
 {'lat': 600, 'lon': 1440, 'time': 1},
 {'time': 365, 'lat': 262, 'lon': 262}]

In [22]:
s3 = boto3.client(
    's3',
    aws_access_key_id=credentials['AccessKeyId'],
    aws_secret_access_key=credentials['SecretAccessKey'],
    aws_session_token=credentials['SessionToken']    
)

for chunk_set in chunk_sets:
    dir_path = str(("_").join(map(str, chunk_set.values())))
    store_name = f"{dir_path}/CMIP6_{temporal_resolution}_{model}_{variable}.zarr"
    try:
        key = f"{store_name}/.zmetadata"
        response = s3.head_object(Bucket=bucket, Key=key)
        print(f"File '{store_name}' exists in bucket '{bucket}'.")
        continue
    except ClientError as e:
        if e.response['Error']['Code'] == '404':
            print(f"File '{key}' does not exist in bucket '{bucket}'.")
        else:
            print(f"Error occurred: {e}")
            raise e
    print(f"Writing to {target_root}/{store_name}")
    transforms = (
        beam.Create(pattern.items())
        | OpenURLWithFSSpec(open_kwargs={'anon': True})
        | OpenWithXarray(file_type=pattern.file_type)
        | StoreToZarr(
            store_name=store_name,
            target_root=target_root,
            combine_dims=pattern.combine_dim_keys,
            target_chunks=chunk_set,
        )
    )
    # Commented out so we don't re-run if we don't intend to
    # with beam.Pipeline() as p:
    #     p | transforms

File '600_1440_29/CMIP6_daily_GISS-E2-1-G_tas.zarr' exists in bucket 'nasa-eodc-data-store'.
File '600_1440_1/CMIP6_daily_GISS-E2-1-G_tas.zarr' exists in bucket 'nasa-eodc-data-store'.
File '365_262_262/CMIP6_daily_GISS-E2-1-G_tas.zarr' exists in bucket 'nasa-eodc-data-store'.


In [37]:
import zarr
import s3fs
s3fs.core.setup_logging("DEBUG")

In [39]:
dir_path = str(("_").join(map(str, chunk_sets[1].values())))
store_name = f"{dir_path}/CMIP6_{temporal_resolution}_{model}_{variable}.zarr"

In [40]:
store = s3fs.S3Map(root=f"{bucket}/{store_name}", s3=fs, check=True)
#zarr.consolidate_metadata(store)

2023-07-27 23:06:16,753 - s3fs - DEBUG - _call_s3 -- CALL: head_object - ({},) - {'Bucket': 'nasa-eodc-data-store', 'Key': '600_1440_1/CMIP6_daily_GISS-E2-1-G_tas.zarr'}
2023-07-27 23:06:16,791 - s3fs - DEBUG - _error_wrapper -- Client error (maybe retryable): An error occurred (404) when calling the HeadObject operation: Not Found
2023-07-27 23:06:16,792 - s3fs - DEBUG - _call_s3 -- CALL: list_objects_v2 - ({},) - {'Bucket': 'nasa-eodc-data-store', 'Prefix': '600_1440_1/CMIP6_daily_GISS-E2-1-G_tas.zarr/', 'Delimiter': '/', 'MaxKeys': 1}
2023-07-27 23:06:16,825 - s3fs - DEBUG - _call_s3 -- CALL: put_object - () - {'Bucket': 'nasa-eodc-data-store', 'Key': '600_1440_1/CMIP6_daily_GISS-E2-1-G_tas.zarr/a'}
2023-07-27 23:06:16,856 - s3fs - DEBUG - _call_s3 -- CALL: delete_objects - ({},) - {'Bucket': 'nasa-eodc-data-store', 'Delete': {'Objects': [{'Key': '600_1440_1/CMIP6_daily_GISS-E2-1-G_tas.zarr/a'}], 'Quiet': True}}


In [41]:
xr.open_zarr(store, consolidated=True)

2023-07-27 23:06:21,437 - s3fs - DEBUG - _call_s3 -- CALL: head_object - ({},) - {'Bucket': 'nasa-eodc-data-store', 'Key': '600_1440_1/CMIP6_daily_GISS-E2-1-G_tas.zarr'}
2023-07-27 23:06:21,469 - s3fs - DEBUG - _error_wrapper -- Client error (maybe retryable): An error occurred (404) when calling the HeadObject operation: Not Found
2023-07-27 23:06:21,469 - s3fs - DEBUG - _call_s3 -- CALL: list_objects_v2 - ({},) - {'Bucket': 'nasa-eodc-data-store', 'Prefix': '600_1440_1/CMIP6_daily_GISS-E2-1-G_tas.zarr/', 'Delimiter': '/', 'MaxKeys': 1}
2023-07-27 23:06:21,505 - s3fs - DEBUG - _call_s3 -- CALL: put_object - () - {'Bucket': 'nasa-eodc-data-store', 'Key': '600_1440_1/CMIP6_daily_GISS-E2-1-G_tas.zarr/a'}
2023-07-27 23:06:21,531 - s3fs - DEBUG - _call_s3 -- CALL: delete_objects - ({},) - {'Bucket': 'nasa-eodc-data-store', 'Delete': {'Objects': [{'Key': '600_1440_1/CMIP6_daily_GISS-E2-1-G_tas.zarr/a'}], 'Quiet': True}}
2023-07-27 23:06:21,558 - s3fs - DEBUG - _call_s3 -- CALL: get_object -

Unnamed: 0,Array,Chunk
Bytes,2.35 GiB,3.30 MiB
Shape,"(730, 600, 1440)","(1, 600, 1440)"
Dask graph,730 chunks in 2 graph layers,730 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 2.35 GiB 3.30 MiB Shape (730, 600, 1440) (1, 600, 1440) Dask graph 730 chunks in 2 graph layers Data type float32 numpy.ndarray",1440  600  730,

Unnamed: 0,Array,Chunk
Bytes,2.35 GiB,3.30 MiB
Shape,"(730, 600, 1440)","(1, 600, 1440)"
Dask graph,730 chunks in 2 graph layers,730 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray


In [33]:
store_name

'600_1440_1/CMIP6_daily_GISS-E2-1-G_tas.zarr'

# 4: Check it worked

In [23]:
for chunk_set in chunk_sets:
    dir_path = str(("_").join(map(str, chunk_set.values())))
    store_name = f"{dir_path}/CMIP6_{temporal_resolution}_{model}_{variable}.zarr"
    key = f"{store_name}/.zmetadata"
    response = s3.head_object(Bucket=bucket, Key=key)
    object_size = response['ContentLength']
    object_size_MB = object_size / (1024)    
    print(f"Size of metadata {object_size_MB} KB")
    
    store = s3fs.S3Map(root=f"{bucket}/{store_name}", s3=fs, check=True)
    ds = xr.open_zarr(store, consolidated=True)
    print(ds)

Size of metadata 5.2890625 KB
<xarray.Dataset>
Dimensions:  (lat: 600, lon: 1440, time: 730)
Coordinates:
  * lat      (lat) float64 -59.88 -59.62 -59.38 -59.12 ... 89.38 89.62 89.88
  * lon      (lon) float64 0.125 0.375 0.625 0.875 ... 359.1 359.4 359.6 359.9
  * time     (time) object 1950-01-01 12:00:00 ... 1950-12-31 12:00:00
Data variables:
    tas      (time, lat, lon) float32 dask.array<chunksize=(29, 600, 1440), meta=np.ndarray>
Attributes: (12/23)
    Conventions:           CF-1.7
    activity:              NEX-GDDP-CMIP6
    cmip6_institution_id:  NASA-GISS
    cmip6_license:         CC-BY-SA 4.0
    cmip6_source_id:       GISS-E2-1-G
    contact:               Dr. Rama Nemani: rama.nemani@nasa.gov, Dr. Bridget...
    ...                    ...
    scenario:              historical
    source:                BCSD
    title:                 GISS-E2-1-G, r1i1p1f2, historical, global downscal...
    tracking_id:           25d6baa3-0404-4eba-a3f1-afddbf69d4cc
    variant_label: 