This script was used to transform the CMS CO₂ flux dataset from its delivered netCDF files into Cloud Optimized GeoTIFF (COG) format for display in the Greenhouse Gas (GHG) Center.

In [1]:
import xarray
import re
import pandas as pd
import boto3
import glob
import s3fs
import tempfile
from datetime import datetime
import os
import hashlib
import json

In [2]:
# Run settings for the transformation. In the cells visible in this notebook
# only the bucket/prefix entries, "date_fmt" and the variable "exclude" list
# are read; the remaining entries are carried along unchanged.
config = {
    "data_acquisition_method": "s3",
    # Where the raw files are listed from.
    "raw_data_bucket": "ghgc-data-store-develop",
    "raw_data_prefix": "delivery/cms-co2-flux-monthgrid-v1",
    # Where the generated COGs are uploaded.
    "cog_data_bucket": "ghgc-data-store-develop",
    "cog_data_prefix": "transformed_cogs",
    # strftime pattern used for the date element of each COG file name.
    "date_fmt": "%Y%m",
    "transformation": {
        "reverse_lat": True,
        "assign_coords": True,
        "filter_variable": {
            "include": [],
            # Data variables with these names are skipped when writing COGs.
            "exclude": ["lat", "lon", "latitude", "longitude", "area"],
            "add": {
                "name": "",
                "from_variable": "",
                "custom_logic": "",
            },
        },
    },
}

In [3]:
# Pull the settings the later cells refer to out of the config dict.
raw_data_bucket, raw_data_prefix = config["raw_data_bucket"], config["raw_data_prefix"]
cog_data_bucket, cog_data_prefix = config["cog_data_bucket"], config["cog_data_prefix"]
date_fmt = config["date_fmt"]

# S3 handles: the boto3 client is used for listing and uploading objects,
# the s3fs filesystem for opening objects as file-like streams.
session = boto3.session.Session()
s3_client = session.client("s3")
fs = s3fs.S3FileSystem()

# Empty bookkeeping table; the cells shown in this notebook do not fill it.
files_processed = pd.DataFrame(columns=["file_name", "COGs_created"])

In [4]:
def get_all_s3_keys(bucket, model_name, ext):
    """List the keys under a prefix of an S3 bucket that end with ``ext``.

    Uses the module-level ``s3_client`` and follows continuation tokens until
    the listing is exhausted.

    Args:
        bucket: Name of the S3 bucket to list.
        model_name: Key prefix to list under; a trailing "/" is appended.
        ext: Suffix a key must end with to be returned (e.g. ".nc").

    Returns:
        list: Matching keys in listing order, skipping every key that contains
        "historical". Empty when the prefix holds no matching objects.
    """
    keys = []

    kwargs = {"Bucket": bucket, "Prefix": f"{model_name}/"}
    while True:
        resp = s3_client.list_objects_v2(**kwargs)
        # S3 leaves "Contents" out of the response when a page has no objects,
        # so fall back to an empty page instead of raising KeyError.
        for obj in resp.get("Contents", []):
            if obj["Key"].endswith(ext) and "historical" not in obj["Key"]:
                keys.append(obj["Key"])

        # A missing continuation token means this was the last page.
        next_token = resp.get("NextContinuationToken")
        if next_token is None:
            break
        kwargs["ContinuationToken"] = next_token

    return keys

# Every raw ".nc" object under the delivery prefix.
keys = get_all_s3_keys(bucket=raw_data_bucket, model_name=raw_data_prefix, ext=".nc")

In [5]:
# The first key, minus its final ".ext", ends in "<YYYYMM>_<YYYYMM>_v3"
# (e.g. "...CMSFluxNBE201001_202212_v3"); characters -16..-10 are the first
# YYYYMM. The same base date is then used for every file.
# NOTE(review): the fixed offsets assume a two-character version suffix — confirm.
first_key_stem = keys[0].split(".")[-2]
base_date = datetime.strptime(first_key_stem[-16:-10], "%Y%m")
def convert_months_to_date(months, base_date):
    """Return ``base_date`` shifted forward by ``months`` calendar months."""
    month_shift = pd.DateOffset(months=months)
    return base_date + month_shift

# For every raw netCDF key: open it from S3, normalise longitude and time,
# then write one COG per (time step, data variable) and upload it.
# NOTE(review): the `.rio` accessor used below is provided by rioxarray, which
# the imports cell does not import explicitly — confirm it is registered
# before this cell runs.
for key in keys:
    url = f"s3://{raw_data_bucket}/{key}"
    filename = key.split("/")[-1]  # e.g. CMSFluxNBE201001_202212_v3.nc
    # Everything before the last "_"-separated element ("v3.nc" in the example)
    # is kept as the leading part of each COG name.
    name_prefix = filename.split("_")[:-1]  # e.g. ["CMSFluxNBE201001", "202212"]
    excluded_vars = config["transformation"]["filter_variable"]["exclude"]

    # Opening the dataset as a context manager closes it once the file has been
    # processed instead of leaving it open for the rest of the session.
    with fs.open(url) as file_obj, xarray.open_dataset(file_obj, decode_times=False) as raw_ds:
        # Map longitudes through ((lon + 180) % 360) - 180 and sort ascending.
        xds = raw_ds.assign_coords(
            longitude=(((raw_ds.longitude + 180) % 360) - 180)
        ).sortby("longitude")

        # Time is read undecoded; each value is used as a month offset from base_date.
        new_time_values = [convert_months_to_date(month, base_date) for month in xds["time"].values]
        xds["time"] = xarray.DataArray(new_time_values, dims="time")

        variables = [var for var in xds.data_vars if var not in excluded_vars]

        for time_increment in range(len(xds.time)):
            for var in variables:
                # Index by name: attribute access would return a Dataset
                # attribute/method if a variable name happened to shadow one.
                data = xds.isel(time=time_increment)[var]
                # Replace -999 with the -9999 nodata value (as per the data
                # documentation; the original author noted that no NaN or -999
                # values were found when checked).
                data = data.where(data != -999, -9999)

                data.rio.set_spatial_dims("longitude", "latitude", inplace=True)
                data.rio.write_crs("epsg:4326", inplace=True)
                data.rio.write_nodata(-9999, inplace=True)

                # <name prefix>_<variable>_<date>.tif
                date = data.time.dt.strftime(date_fmt).item(0)
                cog_filename = "_".join([*name_prefix, var, date])
                cog_filename = f"{cog_filename}.tif"

                # Write the COG to a temporary file, then upload it; the
                # temporary file is removed when the context exits.
                with tempfile.NamedTemporaryFile() as temp_file:
                    data.rio.to_raster(temp_file.name, driver="COG", compress="DEFLATE")
                    s3_client.upload_file(
                        Filename=temp_file.name,
                        Bucket=cog_data_bucket,
                        Key=f"{cog_data_prefix}/cms-co2-flux-monthgrid-v1/{cog_filename}",
                    )
                del data

    print("COG creation done for", key)

delivery/cms-co2-flux-monthgrid-v1/CMSFluxFossilFuelPrior201001_202212_v3.nc ['fossil']
COG creation done for delivery/cms-co2-flux-monthgrid-v1/CMSFluxFossilFuelPrior201001_202212_v3.nc
delivery/cms-co2-flux-monthgrid-v1/CMSFluxLandPrior201001_202212_v3.nc ['NBE_prior']
COG creation done for delivery/cms-co2-flux-monthgrid-v1/CMSFluxLandPrior201001_202212_v3.nc
delivery/cms-co2-flux-monthgrid-v1/CMSFluxNBE201001_202212_v3.nc ['NBE_post']
COG creation done for delivery/cms-co2-flux-monthgrid-v1/CMSFluxNBE201001_202212_v3.nc
delivery/cms-co2-flux-monthgrid-v1/CMSFluxOcean201001_202212_v3.nc ['ocean_post']
COG creation done for delivery/cms-co2-flux-monthgrid-v1/CMSFluxOcean201001_202212_v3.nc
delivery/cms-co2-flux-monthgrid-v1/CMSFluxOceanFluxPrior201001_202212_v3.nc ['Ocean-Prior']
COG creation done for delivery/cms-co2-flux-monthgrid-v1/CMSFluxOceanFluxPrior201001_202212_v3.nc
delivery/cms-co2-flux-monthgrid-v1/CMSFluxTotal201001_202212_v3.nc ['total_post']
COG creation done for deliv

In [7]:


# List the generated COGs. Bucket and prefix come from the config values set
# earlier (the same location the COGs were uploaded to) rather than from
# hard-coded copies of them.
keys = get_all_s3_keys(cog_data_bucket, f"{cog_data_prefix}/cms-co2-flux-monthgrid-v1", ".tif")
def compute_sha256(url):
    """Return the hex SHA-256 digest of the object at ``url``.

    The object is opened through the module-level ``fs`` and hashed in
    4096-byte reads, so it is never held in memory as a whole.
    """
    digest = hashlib.sha256()
    with fs.open(url) as stream:
        while True:
            chunk = stream.read(4096)
            if chunk == b"":
                # An empty read marks the end of the stream.
                break
            digest.update(chunk)
    return digest.hexdigest()

# Map each COG's file name (last path element of its key) to its SHA-256
# checksum. The objects are read from the configured COG bucket instead of a
# hard-coded copy of its name.
sha_mapping = {
    key.split("/")[-1]: compute_sha256(f"s3://{cog_data_bucket}/{key}")
    for key in keys
    if key.endswith(".tif")
}

# Save the checksums as JSON in the current working directory.
with open('cms-co2-flux-monthgrid-v1.json', 'w') as json_file:
    json.dump(sha_mapping, json_file, indent=4)

In [8]:
# Check for checksum collisions: distinct files that share the same SHA-256.
# sha_mapping is a dict keyed by file name, so every file occurs exactly once
# and grouping by file could never report more than one code; grouping by the
# checksum is what reveals duplicates. An empty result means no collisions.
data = list(sha_mapping.items())
df = pd.DataFrame(data, columns=['file', 'code'])
df.groupby("code").agg(num_files=("file", "nunique")).query("num_files>1")

Unnamed: 0_level_0,num_codes
file,Unnamed: 1_level_1
