In [None]:
import xarray as xr
import fsspec
import rioxarray

In [None]:
# Open a single sample month so the output chunking can be sized from it.
# anon=True matches the anonymous access the pipeline below uses for this same
# bucket (OpenURLWithFSSpec(open_kwargs={'anon': True})), so this cell works
# without AWS credentials.
# NOTE(review): open_rasterio is lazy and the file handle closes when the
# `with` block exits; only metadata (shape/dtype) should be relied on from
# `ds` afterwards -- presumably all calc_chunk_dict needs; confirm.
with fsspec.open(
    "s3://nex-gddp-cmip6-cog/monthly/CMIP6_ensemble_median/tasmax/tasmax_month_ensemble-median_historical_195001.tif",
    anon=True,
) as f:
    ds = rioxarray.open_rasterio(f)

In [None]:
from utils import calc_chunk_dict

# Size target handed to calc_chunk_dict (presumably MB per output chunk, per
# the name -- confirm against utils.calc_chunk_dict).
TARGET_MB = 100
# Chunk layout for the output Zarr store, derived from the sample dataset.
target_chunks = calc_chunk_dict(ds, TARGET_MB)

In [None]:
import pandas as pd

In [None]:
# Template for one monthly input file; `{yyyymm}` is filled per month.
input_url_pattern = (
    "s3://nex-gddp-cmip6-cog/monthly/CMIP6_ensemble_median/tasmax/"
    "tasmax_month_ensemble-median_historical_{yyyymm}.tif"
)

In [None]:
# One timestamp per month, Jan 1950 - Dec 2014. "MS" (month start) is used
# instead of "M", which pandas >= 2.2 deprecates (in favour of "ME", which
# older pandas lacks). Only year/month are used below, so the anchor day is
# irrelevant.
dates = pd.date_range("1950-01-01", "2014-12-31", freq="MS", inclusive="both")
input_urls = [input_url_pattern.format(yyyymm=month.strftime("%Y%m")) for month in dates]
# Sanity check: exactly one input file per month in the range.
assert len(input_urls) == (2014 - 1950 + 1) * 12, "expected one URL per month"

In [None]:
from pangeo_forge_recipes.patterns import pattern_from_file_sequence

# Concatenate the monthly files along "time", each file contributing one step.
pattern = pattern_from_file_sequence(input_urls, concat_dim="time", nitems_per_file=1)
pattern

In [None]:
import apache_beam as beam
from pangeo_forge_recipes.transforms import OpenURLWithFSSpec, OpenWithXarray, StoreToZarr

In [None]:
import os

# Output location for the Zarr store. The URL is joined with an explicit "/"
# rather than os.path.join, which would insert "\" on Windows and corrupt
# the S3 URL.
target_root = (
    "s3://carbonplan-scratch/maps-data/c0/nex-gddp-cmip6/monthly/CMIP6_ensemble_median/tasmax/"
)
store_name = "tasmax_month_ensemble-median_historical"
# NOTE(review): target_store is not passed to StoreToZarr in this notebook
# (it receives target_root and store_name separately); kept for reference.
target_store = f"{target_root.rstrip('/')}/{store_name}"

In [None]:
# Restrict the run to the first few input files for quick testing.
# Set PRUNE_NKEEP = None to process every month.
PRUNE_NKEEP = 15
if PRUNE_NKEEP is not None:
    pattern = pattern.prune(nkeep=PRUNE_NKEEP)
pattern

In [None]:
from pangeo_forge_recipes.transforms import Indexed, T


class SetTimeAsCoord(beam.PTransform):
    """Beam preprocessing transform that tags each dataset with its month.

    The month is parsed from the ``YYYYMM`` token at the end of the source
    file name (e.g. ``..._historical_195001.tif`` -> 1950-01) and prepended as
    a length-1 ``time`` dimension. The dataset is then tidied: the ``band``
    dimension is squeezed out, ``spatial_ref`` is dropped, ``band_data``/``x``/
    ``y`` are renamed to ``tasmax``/``lon``/``lat``, and it is sorted by ``lat``.
    """

    @staticmethod
    def _set_time_as_coord(item: Indexed[T]) -> Indexed[T]:
        """Return ``(index, ds)`` with ``ds`` time-tagged and tidied.

        Raises:
            KeyError: if the dataset carries no ``encoding["source"]`` to
                parse the month from.
        """
        # Function-scope import so the callable does not depend on a global
        # `np` (none is imported in this notebook) when Beam pickles it.
        import numpy as np

        index, ds = item
        # NOTE(review): relies on xarray recording the file path in
        # encoding["source"]; when opened from an fsspec file object it may be
        # absent -- confirm, or derive the time from `index` instead.
        source = ds.encoding.get("source")
        if source is None:
            raise KeyError(
                "dataset has no encoding['source']; cannot infer time from file name"
            )
        yyyymm = source.split("_")[-1].split(".")[0]
        # Explicit ns precision avoids xarray's non-nanosecond conversion.
        time = np.datetime64(f"{yyyymm[0:4]}-{yyyymm[4:6]}", "ns")
        ds = (
            ds.expand_dims(time=[time])
            .squeeze(dim=["band"], drop=True)
            .drop_vars("spatial_ref")  # Dataset.drop is deprecated
            .rename({"band_data": "tasmax", "x": "lon", "y": "lat"})
            .sortby("lat")
        )
        return index, ds

    def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
        """Apply the time-tagging step to every ``(index, ds)`` element."""
        return pcoll | beam.Map(self._set_time_as_coord)

In [None]:
# Assemble the recipe step by step: enumerate inputs -> open via fsspec
# (anonymous S3) -> read with xarray's rasterio engine -> time-tag/tidy ->
# write the combined Zarr store. Each `|` extends the same chained transform.
# NOTE(review): pattern.file_type defaults to netcdf4 for
# pattern_from_file_sequence; confirm OpenWithXarray accepts engine='rasterio'
# alongside it rather than rejecting the mismatch.
transforms = beam.Create(pattern.items())
transforms = transforms | OpenURLWithFSSpec(open_kwargs={'anon': True})
transforms = transforms | OpenWithXarray(
    file_type=pattern.file_type,
    xarray_open_kwargs={'engine': 'rasterio'},
)
transforms = transforms | SetTimeAsCoord()
transforms = transforms | StoreToZarr(
    store_name=store_name,
    target_root=target_root,
    combine_dims=pattern.combine_dim_keys,
    target_chunks=target_chunks,
)
transforms

In [None]:
# Build a pipeline with no explicit runner/options and apply the recipe to it.
with beam.Pipeline() as pipeline:
    pipeline | transforms