# Creating an Encryption ETL - CPC

# Introduction

Welcome to the [dClimate](https://www.dclimate.net/) Encryption with [py-hamt](https://github.com/dClimate/py-hamt) - CPC Jupyter notebook. This notebook describes how to create an [ETL](https://en.wikipedia.org/wiki/Extract,_transform,_load) to ingest GIS data into IPFS from a remote source and encrypt it. If you have not read the [Getting Started](./Getting%20Started.ipynb) notebook we highly suggest you first go through that notebook in order to understand everything below. 

In [1]:
!uv pip install xarray zarr multiformats git+https://github.com/dClimate/py-hamt.git --force-reinstall

[2mUsing Python 3.12.7 environment at: /opt/venv[0m
[2mResolved [1m37 packages[0m [2min 616ms[0m[0m                                            [0m
[2K[2mPrepared [1m37 packages[0m [2min 143ms[0m[0m                                            
[2mUninstalled [1m37 packages[0m [2min 298ms[0m[0m
         If the cache and target directories are on different filesystems, hardlinking may not be supported.
[2K[2mInstalled [1m37 packages[0m [2min 350ms[0m[0m                              [0m
 [33m~[39m [1mbases[0m[2m==0.3.0[0m
 [33m~[39m [1mblake3[0m[2m==1.0.4[0m
 [33m~[39m [1mcertifi[0m[2m==2025.1.31[0m
 [33m~[39m [1mcharset-normalizer[0m[2m==3.4.1[0m
 [33m~[39m [1mcrc32c[0m[2m==2.7.1[0m
 [33m~[39m [1mdag-cbor[0m[2m==0.3.3[0m
 [33m~[39m [1mdeprecated[0m[2m==1.2.18[0m
 [33m~[39m [1mdonfig[0m[2m==0.8.1.post1[0m
 [33m~[39m [1midna[0m[2m==3.10[0m
 [33m~[39m [1mmarkdown-it-py[0m[2m==3.0.0[0m
 [33m~[39m [1mmd

In [2]:
import os
import pathlib
import xarray as xr
import requests
import subprocess
import numpy as np
import zarr.codecs
from datetime import datetime
from py_hamt import HAMT, IPFSStore, create_zarr_encryption_transformers, IPFSZarr3
from multiformats import CID
import numcodecs  # For compression if needed


# Note: The encryption Key should be random, the below encryption key is just an exmaple bytes object of length 32
encryption_key = os.urandom(32)  # Generates a secure 32-byte encryption key
print("Encryption Key used:", encryption_key)

def prefetch_conus_data(
    base_url: str = "https://psl.noaa.gov/thredds/fileServer/Datasets/cpc_us_precip/RT/precip.V1.0.",
    start_year: int = 2007,
    end_year: int = None,
) -> list[str]:
    """
    Generate a list of netCDF download URLs for CPC CONUS daily precipitation from start_year to end_year.

    :param base_url: Prefix of the data location.
    :param start_year: Earliest year of data to fetch.
    :param end_year: Latest year of data to fetch; defaults to current year if None.
    :return: A list of full download URLs, one per year.
    """
    if end_year is None:
        end_year = datetime.utcnow().year

    download_links = []
    for yr in range(start_year, end_year + 1):
        download_url = f"{base_url}{yr}.nc"
        download_links.append(download_url)

    return download_links


def fetch_data(
    download_links: list[str],
    local_dir: str = "cpc_conus_netcdf",
    overwrite: bool = False,
):
    """
    Download .nc files for each link in download_links.  
    Uses `requests` (though `wget` via `subprocess` is another approach).

    :param download_links: List of full .nc file URLs.
    :param local_dir: Local directory in which to save .nc files.
    :param overwrite: If False, skip download if file exists.
    """
    os.makedirs(local_dir, exist_ok=True)

    for url in download_links:
        filename = url.split("/")[-1]  # e.g. "2007.nc"
        local_path = os.path.join(local_dir, filename)

        if not overwrite and os.path.exists(local_path):
            print(f"File {local_path} exists; skipping download.")
            continue

        print(f"Downloading {url}")
        resp = requests.get(url, stream=True)
        resp.raise_for_status()

        with open(local_path, "wb") as f:
            for chunk in resp.iter_content(chunk_size=8192):
                f.write(chunk)

        print(f"Saved to {local_path}")


def transform_to_zarr(
    local_dir: str,
    output_zarr: str,
    varname: str = None,
    start_date: str = None,
    end_date: str = None,
):
    """
    Combine netCDF files from local_dir into a single xarray Dataset, optionally subsetting by date,
    fix missing/fill values, and save to Zarr.

    :param local_dir: Directory containing .nc files, each named like YEAR.nc
    :param output_zarr: Path to the final Zarr store (e.g. "cpc_conus.zarr").
    :param varname: Optional name of the variable to keep. If None, keep all.
    :param start_date: Optional start date (YYYY-MM-DD). If provided, we do xarray’s .sel(time=slice(...)).
    :param end_date: Optional end date (YYYY-MM-DD).
    """
    nc_files = list(pathlib.Path(local_dir).glob("*.nc"))
    if not nc_files:
        raise RuntimeError(f"No netCDF files found in {local_dir}")

    print(f"Reading {len(nc_files)} .nc files from {local_dir}")

    # Within a Jupyter Notebook Context setting parallel=False is to prevent read errors. Parallel reads via Dask can lead to file caching hiccups.
    ds = xr.open_mfdataset([str(f) for f in nc_files], combine="by_coords", parallel=False)

    # If a variable name is provided, keep only that variable.
    if varname is not None and varname in ds.data_vars:
        ds = ds[[varname]]  # subset to that single variable

    # Subset by time if start_date/end_date are provided
    if start_date is not None or end_date is not None:
        time_slice = slice(start_date, end_date)
        ds = ds.sel(time=time_slice)

    # Fix fill values or missing values if they exist
    # ds = _fix_fill_values(ds)
    ds = standardize_and_fix_fill_values(ds)

    # Optionally, you can specify chunk sizes or compression
    # below is a trivial example of chunking by time. Adjust as needed:
    ds = ds.chunk({"time": 30})

    # For demonstration, apply Blosc compression
    for var in ds.data_vars:
        ds[var].encoding["compressors"] = zarr.codecs.BloscCodec()

    print(f"Writing dataset to Zarr => {output_zarr}")
    ds.to_zarr(output_zarr, mode="w", consolidated=True)
    print("Zarr creation complete.")


def load_zarr_to_ipfs(zarr_path: str, cid_out_path: str = None) -> str:
    """
    Put the Zarr store onto IPFS using py-hamt. Returns the root CID as a string.
    If cid_out_path is provided, also write the CID to a file.

    :param zarr_path: Path to the local Zarr directory.
    :param cid_out_path: Optional path to write the CID (e.g. "cpc_conus.cid").
    :return: The root CID string
    """
    print(f"Loading {zarr_path} onto IPFS via py-hamt...")
    
    # Encrypt only precip
    # You can exclude data from being encrypted with exclude_vars=["precip"] for example
    encrypt, decrypt = create_zarr_encryption_transformers(
        encryption_key, header="sample-header".encode() 
    )
    
    hamt_store = HAMT(store=IPFSStore(), transformer_encode=encrypt, transformer_decode=decrypt)  # By default uses local IPFS daemon at 127.0.0.1:5001
    ipfszarr3_store = IPFSZarr3(hamt_store)
    ds = xr.open_zarr(zarr_path)
    ds.to_zarr(store=ipfszarr3_store, mode="w")

    root_cid_str = str(hamt_store.root_node_id)
    print(f"Successfully wrote encrypted data to IPFS. Root CID = {root_cid_str}")

    if cid_out_path:
        with open(cid_out_path, "w") as f:
            f.write(root_cid_str + "\n")

    return root_cid_str

def standardize_and_fix_fill_values(ds: xr.Dataset) -> xr.Dataset:
    """
    Renames lat/lon, sorts coordinates, chunks dataset, fixes fill values, and applies compression.
    """
    # 1. Rename lat/lon to standard naming
    ds = ds.rename({"lat": "latitude", "lon": "longitude"})
    
    # 2. Sort coordinates to ascending
    ds = ds.sortby("latitude", ascending=True)
    ds = ds.sortby("longitude", ascending=True)
    
    # 3. Chunk the dataset (choose chunk sizes appropriate to your use case)
    ds = ds.chunk({"time": 1769, "latitude": 24, "longitude": 24})
    
    for var in ds.data_vars:
            da = ds[var]

            # Apply compression
            # clevel=9 means highest compression level (0-9 scale), we are optimizing for read speed
            da.encoding["compressors"] = zarr.codecs.BloscCodec(clevel=9)

            # Prefer Fill Value over missing_value
            da.encoding["_FillValue"] = np.nan
            if "missing_value" in da.attrs:
                del da.attrs["missing_value"]
            if "missing_value" in da.encoding:
                del da.encoding["missing_value"]

    return ds

Encryption Key used: b"\x89\x037d\xf6\xbf2'\xaf\x92\x1b&\t\x02\x9d\xc3\x06\x95\xfbn\xd3\x07\xc8\xa1\xe3JK'\x8a\xa2Wv"


After defining the various ETL steps above, it is simply a matter of calling each step in a sequential fashion. In the logs you should see the files being fetched for the dates provided. 

As you can see, 2007 is given as a start year and 2008 as an end year which is inclusive, and therefore will download the NetCDF files for both years(2007 and 2008). The data is then fetched, however, if the data already already exists locally it will not be downloaded again if the `overwrite` flag is set to false. The next step is to transform the netcdf files to zarr where the `end_date` only includes the first day of 2008 for example sake. Additionally if a `varname` is provided only that variable will be kept on the zarr. 

Lastly `load_zarr_to_ipfs` will return a CID (read the [Getting Started](./Getting%20Started.ipynb) to learn more) and write it out once the pipeline has completed.

In [3]:
# Example usage of the modular ETL pipeline

# 1. Generate the list of CPC CONUS precipitation .nc URLs (only for years 2007 and 2008, end year is inclusive)
download_links = prefetch_conus_data(
    start_year=2007,
    end_year=2008,  # If you want multiple years, pass a larger end_year
)

# 2. Download the actual .nc files
fetch_data(
    download_links=download_links,
    local_dir="downloaded_data/gis/cpc_conus_netcdf_demo",
    overwrite=False,  # set True if you want to re-download
)

# 3. Transform the .nc files to Zarr, but only keep the first year 
transform_to_zarr(
    local_dir="downloaded_data/gis/cpc_conus_netcdf_demo",
    output_zarr="output_data/gis/cpc_conus_demo.zarr",
    varname="precip",         # If you don't know the exact var name, you can omit varname
    start_date="2007-01-01",  # Only keep data from Jan 1
    end_date="2008-01-01",    # through Jan 1 of 2008 (only includes day one from the second file)
)

# 4. Put the new "cpc_conus_demo.zarr" on IPFS
root_cid = load_zarr_to_ipfs(
    zarr_path="output_data/gis/cpc_conus_demo.zarr",
    cid_out_path="output_data/gis/cpc_conus_demo.cid"
)

print(f"Pipeline complete! The root CID is {root_cid}")

File downloaded_data/gis/cpc_conus_netcdf_demo/precip.V1.0.2007.nc exists; skipping download.
File downloaded_data/gis/cpc_conus_netcdf_demo/precip.V1.0.2008.nc exists; skipping download.
Reading 2 .nc files from downloaded_data/gis/cpc_conus_netcdf_demo
Writing dataset to Zarr => output_data/gis/cpc_conus_demo.zarr




Zarr creation complete.
Loading output_data/gis/cpc_conus_demo.zarr onto IPFS via py-hamt...




Successfully wrote encrypted data to IPFS. Root CID = bafyr4ichvciaq5utzhpjtsfzcct3cktp3ceh7uindwcuylixnxrorcwlfa
Pipeline complete! The root CID is bafyr4ichvciaq5utzhpjtsfzcct3cktp3ceh7uindwcuylixnxrorcwlfa


# Opening and Reading the Dataset

As you can see below, in a similar fashion to the Getting Started Notebook, we go ahead and provide the root hash(CID) of the newly ingested dataset into the HAMT store which is then loaded by xarray. We will show that attempting to read the data without the decrypt or proper decryption key will not work.

In [4]:
import xarray as xr
from py_hamt import HAMT, IPFSZarr3, IPFSStore
from multiformats import CID

# Generate a new encryption key which does not match the originally generated one
wrong_encryption_key = os.urandom(32)
print("New encryption key generated, which does not match the previous one:", wrong_encryption_key)

decoded_root_cid = CID.decode(root_cid)
print("Decoded CID:\n", decoded_root_cid, "\n")


# Create HAMT instance using the IPFSStore connecting to your locally running IPFS Gateway from your local running IPFS Node
hamt = HAMT(store=IPFSStore(gateway_uri_stem="http://0.0.0.0:8080"), root_node_id=decoded_root_cid)
ipfszarr3_store = IPFSZarr3(hamt)
ds = xr.open_zarr(store=ipfszarr3_store)

print("Dataset(note the metadata is still accessible and unencrypted):\n", ds, "\n")
print("Try printing out values without trying to decrypt at all:")
try:

    print("Dataset values:", ds.precip.values)
except:
    print("Error: Could not read encrypted values as expected\n")

# Use bad encryption key
encrypt, decrypt = create_zarr_encryption_transformers(
    wrong_encryption_key, header="sample-header".encode() 
)

hamt = HAMT(store=IPFSStore(gateway_uri_stem="http://0.0.0.0:8080"), root_node_id=decoded_root_cid, transformer_encode=encrypt, transformer_decode=decrypt)
ipfszarr3_store = IPFSZarr3(hamt)
ds = xr.open_zarr(store=ipfszarr3_store)

print("Try printing out values using the wrong key:")
try:

    print(ds.precip.values)
except:
    print("Error: Could not read encrypted values as expected due to wrong key\n")


# Generate the transformers again with the same encryption_key used prior
encrypt, decrypt = create_zarr_encryption_transformers(
    encryption_key, header="sample-header".encode() 
)

hamt = HAMT(store=IPFSStore(gateway_uri_stem="http://0.0.0.0:8080"), root_node_id=decoded_root_cid, transformer_encode=encrypt, transformer_decode=decrypt)
ipfszarr3_store = IPFSZarr3(hamt)
ds = xr.open_zarr(store=ipfszarr3_store)

print("Successfully retrieved and decrypted the encrypted dataset using the original key\n")
try:
    # Let's get data for New York
    lon_range = (360 - 74.25, 360 - 73.7)  # Converted to 285.75 - 286.3
    lat_range = (40.5, 45.0)  # Latitude remains the same
    
    ny_precip = ds.sel(latitude=slice(*lat_range), longitude=slice(*lon_range))
    print(ny_precip.precip.values)
except:
    print("Could not read encrypted values as expected due to wrong key\n")

New encryption key generated, which does not match the previous one: b'\x9b\xf2\x83w\xbb\t\xa0\x12\x0c\xb0\xf3&f\t^\xcf\x07\x97b\xa0H\xebz\xfc\xd5dsj"k\xacB'
Decoded CID:
 bafyr4ichvciaq5utzhpjtsfzcct3cktp3ceh7uindwcuylixnxrorcwlfa 



CBORDecodingError: Failed to decode variable 'time': String bytes are not valid utf-8 bytes.
At byte #0: 66a6d8b650d2cf
            ^^ string of length 6
At byte #1:   a6
              ^^ invalid start byte

# Plotting the Data

To illustrate the data we go ahead and plot some of the precipitation information for New York using encrypted data :)

In [None]:
import matplotlib.pyplot as plt

lon_range = (360 - 74.25, 360 - 73.7)  # Converted to 285.75 - 286.3
lat_range = (40.5, 45.0)  # Latitude remains the same

ny_precip = ds.sel(latitude=slice(*lat_range), longitude=slice(*lon_range))
print(ny_precip)

if ny_precip.precip.count() > 0:
    print("Precipitation data exists for this range.")
else:
    print("No precipitation data for the specified range.")

# Compute the average precipitation over the region
ny_precip_mean = ny_precip.precip.mean(dim=["latitude", "longitude"])

# Plot the data
plt.figure(figsize=(10, 6))
ny_precip_mean.plot(marker='o', color='blue')
plt.title("Precipitation in New York (Daily Mean)")
plt.xlabel("Date")
plt.ylabel("Precipitation (mm)")
plt.grid(True)
plt.show()