In [1]:
import os
import s3fs
import xarray as xr
import apache_beam as beam
from pangeo_forge_recipes.storage import FSSpecTarget
from pangeo_forge_recipes.patterns import ConcatDim, FilePattern
from pangeo_forge_recipes.transforms import OpenWithXarray, StoreToZarr

Example notebook for converting and rechunking gridded netcdf data ready for object storage, using pangeo-forge-recipes. Please note that his notebook is intended to serve as an example only, and be adapted for your own datasets.

The files are organised as one file per RCM/ensemble member (12 in total).

In [2]:
!ls ../data/G2G/preproc

G2G_DailyRiverFlow_NATURAL_RCM01_19801201_20801130.nc
G2G_DailyRiverFlow_NATURAL_RCM04_19801201_20801130.nc
G2G_DailyRiverFlow_NATURAL_RCM05_19801201_20801130.nc
G2G_DailyRiverFlow_NATURAL_RCM06_19801201_20801130.nc
G2G_DailyRiverFlow_NATURAL_RCM07_19801201_20801130.nc
G2G_DailyRiverFlow_NATURAL_RCM08_19801201_20801130.nc
G2G_DailyRiverFlow_NATURAL_RCM09_19801201_20801130.nc
G2G_DailyRiverFlow_NATURAL_RCM10_19801201_20801130.nc
G2G_DailyRiverFlow_NATURAL_RCM11_19801201_20801130.nc
G2G_DailyRiverFlow_NATURAL_RCM12_19801201_20801130.nc
G2G_DailyRiverFlow_NATURAL_RCM13_19801201_20801130.nc
G2G_DailyRiverFlow_NATURAL_RCM15_19801201_20801130.nc


One of the files looks like:

In [3]:
!ncdump -h ../data/G2G/preproc/G2G_DailyRiverFlow_NATURAL_RCM01_19801201_20801130.nc

netcdf G2G_DailyRiverFlow_NATURAL_RCM01_19801201_20801130 {
dimensions:
	Time = UNLIMITED ; // (36000 currently)
	RCM = 1 ;
	Northing = 1000 ;
	Easting = 700 ;
variables:
	string RCM(RCM) ;
	float Northing(Northing) ;
		Northing:_FillValue = NaNf ;
		Northing:standard_name = "Northing" ;
		Northing:axis = "Y" ;
		Northing:units = "GB National Grid" ;
	float Easting(Easting) ;
		Easting:_FillValue = NaNf ;
		Easting:standard_name = "Easting" ;
		Easting:axis = "X" ;
		Easting:units = "GB National Grid" ;
	float Time(Time) ;
		Time:_FillValue = NaNf ;
		Time:standard_name = "Time" ;
		Time:axis = "T" ;
		Time:units = "days since 1961-01-01" ;
		Time:calendar = "360_day" ;
	float dmflow(RCM, Time, Northing, Easting) ;
		dmflow:_FillValue = -999.f ;
		dmflow:units = "m3 s-1" ;
		dmflow:standard_name = "dmflow" ;
		dmflow:long_name = "Daily mean river flow" ;
		dmflow:missing_value = -999.f ;
}


--- 

First step is to define a 'ConcatDim' object/variable which contains the name of the dimension along which we want to concatenate the files, the values of the dimensions in the files (in the order that we'd like them contacatenate?) and the number of dimension elements within each file, if it is constant (e.g. for monthly files on a 360 calendar this would be 30). 

In [2]:
RCMs = ["01", "04", "05", "06", "07", "08", "09", "10", "11", "12", "13", "15"]
RCM_concat_dim = ConcatDim("RCM", RCMs, nitems_per_file=1)

Next, we define the function that translates a given RCM into a file path. The function must have the same number of arguments as the number of Combine Dimensions and the name of the argument must match the name of the the Combine Dimension.

In [3]:
indir = "/home/users/mattjbr/object_storage/data/G2G/preproc"
pre = "G2G_DailyRiverFlow_NATURAL_RCM"
suf = "_19801201_20801130.nc"


def make_path(RCM):
    return os.path.join(indir, pre + RCM + suf)

Then these are put into a FilePattern object

In [4]:
pattern = FilePattern(make_path, RCM_concat_dim)

Next, we prune the FilePattern for testing

In [5]:
pattern_pruned = pattern.prune()

And create the test recipe, outputting to local disk, and specifying the chunks to rechunk the dataset to

In [None]:
target_root = (
    "/users/sgsys/matbro/object_storage/object_storage/data/output"  ## output folder
)
tn = "test.zarr"  ## output filename

target_chunks = {
    "RCM": 1,
    "Time": 360,
    "Northing": 100,
    "Easting": 100,
}  ## length of each dimension of the desired chunks

transforms = (
    beam.Create(pattern_pruned.items())
    | OpenWithXarray(file_type=pattern_pruned.file_type)
    | StoreToZarr(
        target_root=target_root,
        store_name=tn,
        combine_dims=pattern.combine_dim_keys,
        target_chunks=target_chunks,
    )
)

Run the recipe in parallel

In [None]:
from apache_beam.options.pipeline_options import PipelineOptions

beam_options = PipelineOptions(
    direct_num_workers=8, direct_running_mode="multi_processing"
)
with beam.Pipeline(options=beam_options) as p:
    p | transforms

Alternatively, the converted dataset can be output direct to object storage, however this is quite buggy with the beam's 'Direct' runner, and tends to stall out. Future work will look at using beam's other runners.

In [None]:
fs = s3fs.S3FileSystem(
    anon=False,
    key="xxxxxxxxxxxxxxxxxxxxxxxxxxxx",
    secret="yyyyyyyyyyyyyyyyyyyyyyyyyyy",
    client_kwargs={"endpoint_url": "https://chess-scape-o.s3-ext.jc.rl.ac.uk"},
)

target_root = FSSpecTarget(fs=fs, root_path="s3://g2g-test")
tn = "test.zarr"

target_chunks = {
    "RCM": 1,
    "Time": 360,
    "Northing": 100,
    "Easting": 100,
}  ## length of each dimension of the desired chunks

transforms = (
    beam.Create(pattern_pruned.items())
    | OpenWithXarray(file_type=pattern_pruned.file_type)
    | StoreToZarr(
        target_root=target_root,
        store_name=tn,
        combine_dims=pattern.combine_dim_keys,
        target_chunks=target_chunks,
    )
)

-------------

Check the output dataset:

In [11]:
xr.open_dataset("/work/scratch-pw2/mattjbr/testoutput.zarr")

In [15]:
import zarr

tzar = zarr.open("/work/scratch-pw2/mattjbr/testoutput.zarr/dmflow")

In [16]:
tzar.info

0,1
Type,zarr.core.Array
Data type,float32
Shape,"(2, 36000, 1000, 700)"
Chunk shape,"(1, 360, 100, 100)"
Order,C
Read-only,False
Compressor,"Blosc(cname='lz4', clevel=5, shuffle=SHUFFLE, blocksize=0)"
Store type,zarr.storage.DirectoryStore
No. bytes,201600000000 (187.8G)
No. bytes stored,31345654783 (29.2G)
