In [None]:
import fsspec
import re
import pandas as pd
from osgeo import gdal, osr

gdal.UseExceptions()

## Get the data from NSIDC

In [None]:
SNODAS_DIR = "/nvm9/data/snodas/data"

In [None]:
SNODAS_HTTPS = "https://noaadata.apps.nsidc.org/NOAA/G02158/masked"
SNODAS_FILE_PREFIX = "us_ssmv11034tS__T0001TTNATS"
SNODAS_FILE_SUFFIX = "05HP001"
DATE_PATTERN = r"SNODAS_(\d{8})\.tar"

In [None]:
ENVI_HEADER = """ENVI
samples = 6935
lines = 3351
bands = 1
header offset = 0
file type = ENVI Standard
data type = 2
interleave = bsq
byte order = 1
"""
HEADER_BYTES = ENVI_HEADER.encode("utf-8")

In [None]:
TRANSLATE_OPTIONS = [
    "-of", "NetCDF",
    "-a_srs", "+proj=longlat +ellps=WGS84 +datum=WGS84 +no_defs",
    "-a_nodata", "-9999",
    "-a_ullr", "-124.73333333333333", "52.8750", "-66.94166666666667", "24.950"
]

In [None]:
fs = fsspec.filesystem("https")

In [None]:
for year in [2021, 2022, 2023, 2024, 2025]:
# for year in [2021]:
    for year_folder in fs.glob(f"{SNODAS_HTTPS}/{year}/*"):
        # if not any(month in year_folder for month in ["Oct", "Nov", "Dec"]):
        #     continue

        url = f"{year_folder}*.tar"
        print(year_folder)
        for file_url in fs.glob(url):
            date_match = re.search(DATE_PATTERN, file_url)
            date = date_match.group(1)
            if date != "20220207":
                continue
            
            base_name = f"{SNODAS_FILE_PREFIX}{date}{SNODAS_FILE_SUFFIX}"

            # File header
            header_name = f"/vsimem/{base_name}.hdr"
            header_file = gdal.VSIFOpenL(header_name, "wb")
            try:
                gdal.VSIFWriteL(
                    HEADER_BYTES,
                    1, len(HEADER_BYTES),
                    header_file
                )
            finally:
                gdal.VSIFCloseL(header_file)

            # File data
            file_name = f"/vsimem/{base_name}.dat"
            dat_file = f"tar://{base_name}.dat.gz::{file_url}"

            with fsspec.open(dat_file, "rb", compression="gzip") as remote_file:
                file_data = remote_file.read()

                # Create the file in memory
                data_file = gdal.VSIFOpenL(file_name, "wb")
                try:
                    gdal.VSIFWriteL(
                        file_data,
                        1, len(file_data),
                        data_file
                    )
                finally:
                    gdal.VSIFCloseL(data_file)

                # Translate to a NetCDF
                src_ds = gdal.Open(file_name, gdal.GA_ReadOnly)

                # Add time dimension
                # src_ds.SetMetadataItem("NETCDF_DIM_EXTRA", "{time}")
                # src_ds.SetMetadataItem("NETCDF_DIM_time_DEF", f"{{{1},10}}")
                # src_ds.SetMetadataItem("NETCDF_DIM_time_VALUES", f"{0}")
                # src_ds.SetMetadataItem("time#axis", "T")
                # src_ds.SetMetadataItem("time#calendar", "proleptic_gregorian")
                # src_ds.SetMetadataItem(
                #     "time#units", f"days since {pd.to_datetime(date)}"
                # )

                gdal.Translate(
                    destName=f"{SNODAS_DIR}/SWE_{date}.nc",
                    srcDS=src_ds,
                    options=TRANSLATE_OPTIONS
                )

                # Close file and clear the memory files
                src_ds = None
                gdal.Unlink(file_name)
                gdal.Unlink(header_name)

                print("*", end="")
        print("")

## Zarr archive

In [None]:
import xarray as xr
import dask
import pandas as pd
from pyproj import CRS

import zarr
from zarr.codecs import BloscCodec

from itertools import chain
from pathlib import Path

from swed_17.nb_helpers import start_cluster

xr.set_options(use_new_combine_kwarg_defaults=True);

In [None]:
ZARR_ARCHIVE = Path("/nvm9/data/snodas/zarr_archive/")
SNODAS_INPUT = Path("/nvm9/data/snodas/data")

In [None]:
CHUNKS = {"time": 12, "lat": 335, "lon": 693}

In [None]:
cluster = start_cluster(n_workers=12, memory_limit="12GB", local=False)

### Preprocess the files
When created, there was no time dimension set and the CRS WKT had no
complete defition of WGS 84 although it is the set one.

In [None]:
@dask.delayed
def prepare_file(file):
    date = pd.to_datetime(
        Path(file).stem.split("_")[1]
    )
    with xr.open_dataset(file) as orig:
        ds = orig.load()

    if "SWE" in ds:
        return

    if "time" not in ds.dims:
        ds = ds.expand_dims({"time": 1})
        ds = ds.assign_coords({"time": ("time", [date])})
    
    ds["crs"].attrs["spatial_ref"] = CRS.from_epsg(4326).to_wkt()
    ds = ds.rename_vars({
        "Band1": "SWE",
    })
    ds["SWE"].attrs["long_name"] = "Snow Water Equivalent"
    ds.to_netcdf(file)


In [None]:
files = chain(
    SNODAS_INPUT.glob(f"SWE_*.nc"),
)

dask.compute(
    [prepare_file(file) for file in files]
)

### Create the Zarr archive

In [None]:
water_year = 2025

In [None]:
files = chain(
    SNODAS_INPUT.glob(f"SWE_{water_year - 1}1*.nc"),
    SNODAS_INPUT.glob(f"SWE_{water_year}0*.nc"),
)

ds = xr.open_mfdataset(
    files,
    chunks=None,
    parallel=True,
    engine="h5netcdf",
)
ds = ds.chunk(CHUNKS)

In [None]:
compressor = BloscCodec(
    cname="zlib", clevel=4, shuffle=zarr.codecs.BloscShuffle.bitshuffle
)
encoding = {
    "SWE": {
        "chunks": tuple(CHUNKS.values()),
        "compressors": compressor,
    },
}
for coord_name in ds.coords:
    encoding[coord_name] = {'compressors': compressor}

In [None]:
ds.to_zarr(
    store=ZARR_ARCHIVE / f"wy{water_year}_snodas_swe.zarr",
    mode="w",
    zarr_format=3,
    encoding=encoding,
)

In [None]:
cluster.shutdown()

### Add a CBRFC Zone mask

In [None]:
ua_mask = xr.open_dataset("/nvm9/data/snodas/cbrfc_zone_raster_snodas_swe.nc").Band1

In [None]:
ua_mask.name = 'cbrfc_zone_gid'
ua_mask.attrs["long_name"] = "CBRFC zone database gid"

In [None]:
ua_mask = ua_mask.to_dataset()
ua_mask

In [None]:
for archive in ZARR_ARCHIVE.glob("*.zarr"):
    ua_mask.to_zarr(archive, mode="a")
    zarr.consolidate_metadata(archive)