# Generate CMIP STAC Items and Load them into a pgSTAC database

This notebook walks through generating STAC items from [NEX GDDP CMIP6 COGs on AWS](https://aws.amazon.com/marketplace/pp/prodview-k6adk576fiwmm#resources).

As written, it uses daily data from the `GISS-E2-1-G` model and the `tas` variable, and loads data for 1950 and 1951. The bucket also holds monthly aggregates, other models, other variables, and more years. The code below can be modified to STAC-ify other data in the `nex-gddp-cmip6-cog` bucket.


In [1]:
!pip install boto3 fsspec pystac rio-stac s3fs
import fsspec
import json
from pystac import Collection, Asset
from datetime import datetime
import rio_stac
from pprint import pprint
import concurrent.futures
import threading


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.0.1[0m[39;49m -> [0m[32;49m24.0[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [2]:
# Specify the CMIP model and variable to use
model = "GISS-E2-1-G"
variable = "tas"

## Discover the COG files on S3 using fsspec and `.glob`


In [3]:
anon = True
s3_path = f"s3://nex-gddp-cmip6-cog/daily/{model}/historical/r1i1p1f2/{variable}/"
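
To point the notebook at other data in the bucket, the prefix can be assembled from its parts. The following is a minimal sketch: `build_prefix` is a hypothetical helper, and only the default values shown here are confirmed by this notebook. Other frequencies, experiments, or ensemble members are assumptions that should be checked against the bucket listing before use.

```python
def build_prefix(
    model,
    variable,
    frequency="daily",         # assumption: other values follow the same layout
    experiment="historical",   # assumption: other experiments follow the same layout
    member="r1i1p1f2",         # ensemble member used for GISS-E2-1-G in this notebook
    bucket="nex-gddp-cmip6-cog",
):
    """Assemble an S3 prefix that follows the layout used in this notebook."""
    return f"s3://{bucket}/{frequency}/{model}/{experiment}/{member}/{variable}/"


print(build_prefix("GISS-E2-1-G", "tas"))
```

With the defaults, this reproduces the `s3_path` value used above.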

In [4]:
fs_read = fsspec.filesystem("s3", anon=anon)




In [5]:
file_paths = fs_read.glob(f"{s3_path}*")
print(f"{len(file_paths)} discovered from {s3_path}")

23725 discovered from s3://nex-gddp-cmip6-cog/daily/GISS-E2-1-G/historical/r1i1p1f2/tas/


## Subset the data so we don't process all historical data

But you can if you want!


In [6]:
# The globbed paths come back without a protocol, so prepend "s3://"
# and keep only the files for 1950 and 1951.
subset_files = sorted(
    ["s3://" + f for f in file_paths if "_1950_" in f or "_1951_" in f]
)
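
The substring test above works for two years but gets unwieldy for longer ranges. As a sketch of one alternative, the hypothetical helper `subset_by_years` below parses the year from the `_YYYY_MM_DD.tif` suffix seen in this bucket's daily filenames and keeps only the requested years. It assumes every file of interest follows that naming pattern.

```python
import re

# Matches the trailing _YYYY_MM_DD.tif of a daily COG filename
YEAR_PATTERN = re.compile(r"_(\d{4})_\d{2}_\d{2}\.tif$")


def subset_by_years(paths, years):
    """Return sorted s3:// URLs for the paths whose year is in `years`."""
    wanted = {int(year) for year in years}
    selected = []
    for path in paths:
        match = YEAR_PATTERN.search(path)
        if match and int(match.group(1)) in wanted:
            # Add the protocol only if it is missing
            selected.append(path if path.startswith("s3://") else "s3://" + path)
    return sorted(selected)
```

For example, `subset_by_years(file_paths, range(1950, 1952))` would select the same two years as the cell above.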

## Double check we discovered some files


In [7]:
if len(subset_files) == 0:
    raise Exception(f"No files to process. Do COGs for the {model} model exist?")
else:
    print(
        f"Subsetted data to files for 1950 and 1951. {len(subset_files)} files to process."
    )

Subsetted data to files for 1950 and 1951. 730 files to process.


## Setup the collection and items

The collection is statically defined in a JSON file, but can be modified as desired. Then, iterate through all the files in S3 and create STAC Item JSON using `rio_stac`. Write each item's JSON as one line of an `ndjson` file, ready for inserting into the database.
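
For readers unfamiliar with the format, `ndjson` (newline-delimited JSON) stores one complete JSON document per line. The sketch below shows a round trip using only the standard library. The helper names `write_ndjson` and `read_ndjson` are hypothetical and are not part of this notebook or of `rio_stac`.

```python
import json


def write_ndjson(path, records):
    """Write each record as a single JSON document on its own line."""
    with open(path, "w") as f:
        for record in records:
            f.write(json.dumps(record) + "\n")


def read_ndjson(path):
    """Read the file back into a list of dicts, skipping blank lines."""
    with open(path) as f:
        return [json.loads(line) for line in f if line.strip()]
```

Reading the file back with something like `read_ndjson` is a cheap way to confirm the item count before loading the database.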


In [8]:
file_prefix = f"CMIP6_daily_{model}_{variable}"
stac_items_file = f"{file_prefix}_stac_items.ndjson"
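# Load the statically defined collection, closing the file when done
with open(f"{file_prefix}_collection.json") as collection_file:
    collection_json = json.load(collection_file)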
collection = Collection.from_dict(collection_json)

In [9]:
# clear the ndjson items file
with open(stac_items_file, "w") as file:
    pass

In [10]:
def process_item(s3_file, file, lock):
    print(f"Processing {s3_file}")
    # Daily filenames end in _YYYY_MM_DD.tif, e.g. ..._gn_1950_01_01.tif
    filename = s3_file.split("/")[-1]
    year, month, day = filename.split("_")[-3:]
    day = day.replace(".tif", "")
    datetime_ = datetime.strptime(f"{year}{month}{day}", "%Y%m%d")
    # Create a new Item
    item = rio_stac.create_stac_item(
        id=filename,
        source=s3_file,
        collection=collection.id,
        input_datetime=datetime_,
        with_proj=True,
        with_raster=True,
        asset_name="data",
        asset_roles=["data"],
        asset_media_type="image/tiff; application=geotiff; profile=cloud-optimized",
        assets={
            "tiling": Asset(
                href=s3_file,
                roles=["virtual", "tiling"],
                title="tiling",
                description="Virtual asset for tiling",
                extra_fields={
                    "compose:rescale": [210, 330],
                    "compose:colormap_name": "hot",
                },
            )
        },
    )
    with lock:
        file.write(json.dumps(item.to_dict()) + "\n")
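
The date handling inside `process_item` can be checked in isolation, without touching S3. The sketch below pulls the same logic into a hypothetical standalone function, `parse_item_datetime`, under the assumption that every filename ends in `_YYYY_MM_DD.tif` as the daily files in this notebook do.

```python
from datetime import datetime


def parse_item_datetime(s3_file):
    """Parse the date from a path or filename ending in _YYYY_MM_DD.tif."""
    filename = s3_file.split("/")[-1]
    if filename.endswith(".tif"):
        filename = filename[: -len(".tif")]
    year, month, day = filename.split("_")[-3:]
    # strptime raises ValueError if the parts do not form a valid date
    return datetime.strptime(f"{year}-{month}-{day}", "%Y-%m-%d")


print(parse_item_datetime("tas_day_GISS-E2-1-G_historical_r1i1p1f2_gn_1950_01_02.tif"))
```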

NOTE: Processing all 730 files can take a while, so the cell below processes only the first 2 files for demonstration purposes.


In [11]:
lock = threading.Lock()
file = open(stac_items_file, "a")
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = [
        executor.submit(process_item, obj, file, lock) for obj in subset_files[0:2]
    ]
    # Wait for every task; result() re-raises any exception from a worker thread
    for future in concurrent.futures.as_completed(futures):
        future.result()
file.close()

Processing s3://nex-gddp-cmip6-cog/daily/GISS-E2-1-G/historical/r1i1p1f2/tas/tas_day_GISS-E2-1-G_historical_r1i1p1f2_gn_1950_01_01.tif
Processing s3://nex-gddp-cmip6-cog/daily/GISS-E2-1-G/historical/r1i1p1f2/tas/tas_day_GISS-E2-1-G_historical_r1i1p1f2_gn_1950_01_02.tif
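
The cell above shares one file handle between threads and guards it with a lock. One alternative, sketched below, is to have the workers return plain dicts and write them from the main thread, which needs no lock and keeps the output in input order. Both `build_record` and `write_items` are hypothetical names, and `build_record` is only a stand-in for the real per-file work done in `process_item`.

```python
import concurrent.futures
import json


def build_record(path):
    """Hypothetical stand-in for the expensive per-file STAC item creation."""
    return {"id": path.rsplit("/", 1)[-1], "href": path}


def write_items(paths, out_path, max_workers=4):
    """Build records in worker threads, then write them from the main thread."""
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # executor.map yields results in input order and re-raises worker errors
        records = list(executor.map(build_record, paths))
    with open(out_path, "w") as f:
        for record in records:
            f.write(json.dumps(record) + "\n")
    return len(records)
```

The trade-off is memory: all records are held until the pool finishes, which is fine for a few hundred items but may not suit much larger runs.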



## Final step: seed the database


In [16]:
!./seed-db.sh {model} {variable}

postgresql://postgres:password@localhost:5432/postgres
Inserting collection from CMIP6_daily_GISS-E2-1-G_tas_collection.json
error connecting in 'pool-1': connection failed: FATAL:  role "postgres" does not exist
error connecting in 'pool-1': connection failed: FATAL:  role "postgres" does not exist
error connecting in 'pool-1': connection failed: FATAL:  role "postgres" does not exist
error connecting in 'pool-1': connection failed: FATAL:  role "postgres" does not exist
^C
Traceback (most recent call last):
  File "/Users/alukach/Projects/devseed/eoapi/jupyterhub-auth/.venv/bin/pypgstac", line 8, in <module>
    sys.exit(cli())
             ^^^^^
  File "/Users/alukach/Projects/devseed/eoapi/jupyterhub-auth/.venv/lib/python3.11/site-packages/pypgstac/pypgstac.py", line 125, in cli
    fire.Fire(PgstacCLI)
  File "/Users/alukach/Projects/devseed/eoapi/jupyterhub-auth/.venv/lib/python3.11/site-packages/fire/core.py", line 141, in Fire
    component_trace = _Fire(component, args, parsed