In [1]:
import concurrent.futures
import json
import xarray as xr

import helpers
from create_stac_item import create_stac_item

In [13]:
# --- Configuration -------------------------------------------------------
# S3 bucket holding the NLDAS NetCDF files and receiving the intake catalog.
bucket = 'nasa-eodc-scratch'
# Time-chunk label; selects the input directory and names the output file.
timechunks = '06'
directory = 'NLDAS/netcdf/.timechunk' + timechunks
# Thread-pool size used when generating STAC items concurrently.
num_workers = 16

In [None]:
# Build the S3 filesystem object from the project's credential helper.
# This single object is reused for the glob, by the worker threads that open
# each file, and for the final upload.
credentials = helpers.get_credentials()
s3fsfs = helpers.create_s3filesystem(
    credentials,
)
s3fsfs  # show the filesystem repr as the cell output

In [5]:
# List every NetCDF object in the configured directory. The later cells
# prepend "s3://" to each entry, which implies glob returns bare bucket/key
# paths. NOTE(review): confirm against the s3fs version in use.
files = s3fsfs.glob('s3://{}/{}/*.nc'.format(bucket, directory))

In [6]:
# Fail fast if the glob matched nothing. Otherwise the slow STAC step below
# would "succeed" and an empty FeatureCollection would be written and uploaded.
assert files, f'no .nc files found under s3://{bucket}/{directory}'
len(files)

31

In [7]:
%%time
def open_and_generate_stac(file: str):
    """Open one NetCDF object from S3 and build its STAC item.

    Parameters
    ----------
    file : str
        Path as returned by ``s3fsfs.glob``, without the ``s3://`` scheme.
        The scheme is re-added here for both opening and the item's href.

    Returns
    -------
    The object produced by ``create_stac_item`` for this dataset. Later cells
    call ``.to_dict()`` on it.
    """
    url = f's3://{file}'
    # Close the Dataset and then the underlying S3 file handle once the item
    # is built. Previously neither was ever closed, leaking one open handle
    # (and its buffers) per file across all worker threads.
    # NOTE(review): assumes create_stac_item has finished reading `ds` when it
    # returns, i.e. the item keeps no lazy references into the file. Confirm.
    with s3fsfs.open(url) as fobj, xr.open_dataset(fobj, chunks={}) as ds:
        return create_stac_item(ds, url)


# Generate items concurrently. executor.map preserves input order, so
# stac_items[i] corresponds to files[i].
with concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) as executor:
    stac_items = list(executor.map(open_and_generate_stac, files))

CPU times: user 1min 58s, sys: 56.9 s, total: 2min 55s
Wall time: 19min 40s


In [8]:
# Invariant relied on by the cells below: one STAC item per input file.
assert len(stac_items) == len(files), (len(stac_items), len(files))
len(stac_items)

31

In [9]:
# Serialise each STAC item to a plain dict. Only its 'properties' mapping is
# passed through helpers.convert_numpy before JSON encoding; the other fields
# are kept as to_dict() produced them.
stac_items_as_dicts = [
    {**raw, 'properties': helpers.convert_numpy(raw['properties'])}
    for raw in (item.to_dict() for item in stac_items)
]

In [10]:
# Wrap every serialised item in a GeoJSON-style FeatureCollection; this is
# the document written to the intake JSON file below.
stac_collection = dict(type="FeatureCollection", features=stac_items_as_dicts)

In [14]:
# Write the collection to a local JSON file named after the time chunk.
# Serialise BEFORE opening: open(..., 'w') truncates immediately. With the
# old order, any value json cannot encode (only 'properties' went through
# helpers.convert_numpy) left an empty intake file behind, replacing a
# previous good one.
intake_filename = f'intake_{timechunks}.json'
intake_json = json.dumps(stac_collection, indent=2)
with open(intake_filename, 'w', encoding='utf-8') as f:
    f.write(intake_json)

In [15]:
# Upload the intake file next to the NetCDF files it catalogues.
s3fsfs.put(intake_filename, 's3://' + '/'.join([bucket, directory, intake_filename]))

[None]