In [19]:
import os
import fsspec
import xarray as xr
from fsspec.implementations.sftp import SFTPFileSystem

Create SFTPFileSystem object to explore SFTP structure

In [33]:
sftp_host = os.environ["GLEAM_FTP"][7:-1] # host should not have sftp prefix
gleam_creds_sftp = dict(
    username = os.environ["GLEAM_USER"],
    password = os.environ["GLEAM_PASSWORD"],
    port = int(os.environ["GLEAM_PORT"])
    )


In [None]:
print(sftp_host)

In [34]:
# create link
fs_sftp = SFTPFileSystem(host=sftp_host, **gleam_creds_sftp)

Explore file structure

In [35]:
fs_sftp.ls("./")

['./data']

In [36]:
fs_sftp.ls("./data")

['./data/CHANGELOG',
 './data/README_GLEAM_v3.7.pdf',
 './data/v3.7b',
 './data/v3.8a']

In [None]:
fs_sftp.ls("./data/v3.7b")

In [None]:
fs_sftp.ls("./data/v3.7b/daily")

In [37]:
fs_sftp.ls("./data/v3.7b/daily/2003")

['./data/v3.7b/daily/2003/E_2003_GLEAM_v3.7b.nc',
 './data/v3.7b/daily/2003/Eb_2003_GLEAM_v3.7b.nc',
 './data/v3.7b/daily/2003/Ei_2003_GLEAM_v3.7b.nc',
 './data/v3.7b/daily/2003/Ep_2003_GLEAM_v3.7b.nc',
 './data/v3.7b/daily/2003/Es_2003_GLEAM_v3.7b.nc',
 './data/v3.7b/daily/2003/Et_2003_GLEAM_v3.7b.nc',
 './data/v3.7b/daily/2003/Ew_2003_GLEAM_v3.7b.nc',
 './data/v3.7b/daily/2003/SMroot_2003_GLEAM_v3.7b.nc',
 './data/v3.7b/daily/2003/SMsurf_2003_GLEAM_v3.7b.nc',
 './data/v3.7b/daily/2003/S_2003_GLEAM_v3.7b.nc']

In [None]:
# this will download the file
# fs_sftp.get('data/v3.7b/daily/2003/E_2003_GLEAM_v3.7b.nc', "./download")

Create dictionary of environmental variables for GLEAM

In [20]:
gleam_creds = dict(
    username = os.environ["GLEAM_USER"],
    password = os.environ["GLEAM_PASSWORD"],
    port = int(os.environ["GLEAM_PORT"])
    )

Check that open file objects will work

In [38]:
years = range(2003, 2023)

for year in years:
    # format file path on SFTP
    filepath = f"data/v3.7b/daily/{year}/Et_{year}_GLEAM_v3.7b.nc"
    # create full URL
    urlpath = os.environ["GLEAM_FTP"] + filepath

    # add to credentials to send to fsspec.open
    gleam_creds["urlpath"] = urlpath

    # create OpenFile object
    file = fsspec.open(**gleam_creds)
    print(file)

<OpenFile '/data/v3.7b/daily/2003/Et_2003_GLEAM_v3.7b.nc'>
<OpenFile '/data/v3.7b/daily/2004/Et_2004_GLEAM_v3.7b.nc'>
<OpenFile '/data/v3.7b/daily/2005/Et_2005_GLEAM_v3.7b.nc'>
<OpenFile '/data/v3.7b/daily/2006/Et_2006_GLEAM_v3.7b.nc'>
<OpenFile '/data/v3.7b/daily/2007/Et_2007_GLEAM_v3.7b.nc'>
<OpenFile '/data/v3.7b/daily/2008/Et_2008_GLEAM_v3.7b.nc'>
<OpenFile '/data/v3.7b/daily/2009/Et_2009_GLEAM_v3.7b.nc'>
<OpenFile '/data/v3.7b/daily/2010/Et_2010_GLEAM_v3.7b.nc'>
<OpenFile '/data/v3.7b/daily/2011/Et_2011_GLEAM_v3.7b.nc'>
<OpenFile '/data/v3.7b/daily/2012/Et_2012_GLEAM_v3.7b.nc'>
<OpenFile '/data/v3.7b/daily/2013/Et_2013_GLEAM_v3.7b.nc'>
<OpenFile '/data/v3.7b/daily/2014/Et_2014_GLEAM_v3.7b.nc'>
<OpenFile '/data/v3.7b/daily/2015/Et_2015_GLEAM_v3.7b.nc'>
<OpenFile '/data/v3.7b/daily/2016/Et_2016_GLEAM_v3.7b.nc'>
<OpenFile '/data/v3.7b/daily/2017/Et_2017_GLEAM_v3.7b.nc'>
<OpenFile '/data/v3.7b/daily/2018/Et_2018_GLEAM_v3.7b.nc'>
<OpenFile '/data/v3.7b/daily/2019/Et_2019_GLEAM_v3.7b.nc

## Start creating pangeo-forge-recipe

In [21]:
import pandas as pd
from pangeo_forge_recipes.patterns import ConcatDim, FilePattern
import apache_beam as beam
from pangeo_forge_recipes.transforms import OpenURLWithFSSpec, OpenWithXarray, StoreToZarr
from tempfile import TemporaryDirectory

Create time range

In [32]:
dates = pd.date_range("2003", "2005", freq="A")

In [23]:
time_concat_dim = ConcatDim("time", dates, nitems_per_file=1)
time_concat_dim

ConcatDim(name='time', nitems_per_file=1)

In [24]:
base_url = "sftp://hydras.ugent.be/data/v3.7b/daily/{time:%Y}/Et_{time:%Y}_GLEAM_v3.7b.nc"

In [25]:
def make_url(time):
    return base_url.format(time=time)

In [6]:
make_url(dates[-1])

'sftp://hydras.ugent.be/data/v3.7b/daily/2003/Et_2003_GLEAM_v3.7b.nc'

In [26]:
pattern = FilePattern(make_url, time_concat_dim, fsspec_open_kwargs=gleam_creds)
pattern

<FilePattern {'time': 2}>

In [27]:
for index, url in pattern.items():
    print(index)
    print(url)
    # Stop after the 3rd filepath (September 3rd, 1981)
    if '19810903' in url:
        break

{Dimension(name='time', operation=<CombineOp.CONCAT: 2>): Position(value=0, indexed=False)}
sftp://hydras.ugent.be/data/v3.7b/daily/2002/Et_2002_GLEAM_v3.7b.nc
{Dimension(name='time', operation=<CombineOp.CONCAT: 2>): Position(value=1, indexed=False)}
sftp://hydras.ugent.be/data/v3.7b/daily/2003/Et_2003_GLEAM_v3.7b.nc


In [29]:
td = TemporaryDirectory()
target_path = td.name
target_name = "output.zarr"
target_path

'/tmp/tmplepmcor8'

In [30]:
transforms = (
    beam.Create(pattern.items())
    | OpenURLWithFSSpec()
    | OpenWithXarray(file_type=pattern.file_type)
    | StoreToZarr(
        target_root=target_path,
        store_name=target_name,
        combine_dims=pattern.combine_dim_keys,
    )
)
transforms

<_ChainedPTransform(PTransform) label=[Create|OpenURLWithFSSpec|OpenWithXarray|StoreToZarr] at 0x7f1bff49ae00>

Run!

In [31]:
with beam.Pipeline() as p:
    p | transforms

Custom TB Handler failed, unregistering


Unexpected exception formatting exception. Falling back to standard exception


Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1418, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 838, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 984, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/apache_beam/transforms/core.py", line -1, in <lambda>
  File "/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/pangeo_forge_recipes/transforms.py", line -1, in wrapper
  File "/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/pangeo_forge_recipes/openers.py", line 35, in open_url
    open_file = _get_opener(url, secrets, **kw)
  File "/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/pangeo_forge_recipes/storage.py", line 212, in _get_opener
    return fsspec.open(fname, mode="rb", **open_kwargs)
  File "