# Converting affine transformed stored in GeoTransform to CF subsampled coordinates

In [1]:
import numpy as np
import xarray as xr

## Read data

In [53]:
source = "https://noaadata.apps.nsidc.org/NOAA/G02135/south/daily/geotiff/2024/01_Jan/S_20240101_concentration_v4.0.tif"

da = xr.open_dataarray(
    source, engine="rasterio", backend_kwargs={"parse_coordinates": True}
)


geotransform = da.spatial_ref.attrs.pop("GeoTransform")
geotransform

'-3950000.0 25000.0 0.0 4350000.0 0.0 -25000.0'

## Encode to CF-style subsampled coordinates

In [61]:
x_origin, pixel_width, x_rotation, y_origin, y_rotation, pixel_height = (
    float(x) for x in geotransform.split()
)

bands, ysize, xsize = da.shape

x_coords = x_origin + (np.arange(xsize) + 0.5) * pixel_width
y_coords = y_origin + (np.arange(ysize) + 0.5) * pixel_height

xstart_center = x_origin + pixel_width / 2
xstop_center = xstart_center + (pixel_width * (xsize - 1))
ystart_center = y_origin + pixel_height / 2
ystop_center = ystart_center + (pixel_height * (ysize - 1))

In [64]:
varname = "band_data"
newds = da.to_dataset().copy(deep=True)
newds[varname].attrs["coordinate_interpolation"] = "x: y: interp"
newds.coords["interp"] = (
    (),
    0,
    {
        "interpolation_name": "linear",
        "tie_point_mapping": "x: x_indices xtp y: y_indices ytp",
        "computational_precision": 64,
    },
)
newds.coords["x_indices"] = ("tp", [0, xsize])
newds.coords["y_indices"] = ("tp", [0, ysize])

newds.coords["xtp"] = (
    "tp",
    [xstart_center, xstop_center],
    {"standard_name": "projection_x_coordinate", "units": "m"},
)
newds.coords["ytp"] = (
    "tp",
    [ystart_center, ystop_center],
    {"standard_name": "projection_y_coordinate", "units": "m"},
)

newds

### Decode from CF to Affine

In [65]:
def parse_mapping_string(input_str):
    """
    Converts mapping strings to dictionaries. Supports mixed formats:
    1. "x: y: linear" -> shared value for multiple keys
    2. "z: bilinear" -> individual key-value pair
    3. "x: y: linear z: bilinear" -> combination of both
    4. "x: x_indices xtp y: y_indices ytp" -> multi-word values as tuples

    Args:
        input_str (str): String with space-separated mapping expressions

    Returns:
        dict: Dictionary mapping keys to values (strings or tuples)

    Examples:
        >>> parse_mapping_string("x: y: linear z: bilinear")
        {'x': 'linear', 'y': 'linear', 'z': 'bilinear'}

        >>> parse_mapping_string("x: y: z: shared w: individual")
        {'x': 'shared', 'y': 'shared', 'z': 'shared', 'w': 'individual'}

        >>> parse_mapping_string("a: b: first c: d: e: second f: third")
        {'a': 'first', 'b': 'first', 'c': 'second', 'd': 'second', 'e': 'second', 'f': 'third'}

        >>> parse_mapping_string("mode: fast quality: high")
        {'mode': 'fast', 'quality': 'high'}

        >>> parse_mapping_string("x: x_indices xtp y: y_indices ytp")
        {'x': ('x_indices', 'xtp'), 'y': ('y_indices', 'ytp')}

        >>> parse_mapping_string("a: single b: multi word value")
        {'a': 'single', 'b': ('multi', 'word', 'value')}
    """
    expressions = input_str.split()
    result = {}
    i = 0

    while i < len(expressions):
        # Find the start of a key (word ending with ':')
        if not expressions[i].endswith(":"):
            i += 1
            continue

        # Collect all consecutive keys (words ending with ':')
        keys = []
        while i < len(expressions) and expressions[i].endswith(":"):
            keys.append(expressions[i].rstrip(":"))
            i += 1

        # Collect value words until we hit the next key or end
        value_parts = []
        while i < len(expressions) and not expressions[i].endswith(":"):
            value_parts.append(expressions[i])
            i += 1

        # Create the mapping
        if keys and value_parts:
            # Convert to tuple if multiple words, otherwise keep as string
            if len(value_parts) == 1:
                value = value_parts[0]
            else:
                value = tuple(value_parts)

            for key in keys:
                result[key] = value

    return result


def compute_explicit_coords(ds: xr.Dataset, variable: str):
    coords = {}
    for coord, methodvarname in parse_mapping_string(
        ds[variable].attrs["coordinate_interpolation"]
    ).items():
        methodvar = ds[methodvarname]
        method = methodvar.attrs["interpolation_name"]
        indices, tiepoints = parse_mapping_string(methodvar.attrs["tie_point_mapping"])[
            coord
        ]
        precision = methodvar.attrs["computational_precision"]

        if method == "linear":
            coords[coord] = np.interp(
                np.arange(ds.sizes[coord]), ds[indices].data, ds[tiepoints].data
            )
        else:
            raise NotImplementedError
    return coords

In [83]:
from dataclasses import dataclass
from typing import Hashable

from affine import Affine


@dataclass
class AffineAxis:
    offset: float
    shear: float
    spacing: float
    size: int
    dimname: Hashable


def compute_affine(ds: xr.Dataset, variable: str) -> tuple[Affine, int, int]:
    # TODO: get rid of varname
    coords = {}
    current_dim = None
    axes = {"X": None, "Y": None}

    for coord, methodvarname in parse_mapping_string(
        ds[variable].attrs["coordinate_interpolation"]
    ).items():
        methodvar = ds[methodvarname]
        method = methodvar.attrs["interpolation_name"]
        indices, tiepoints = parse_mapping_string(methodvar.attrs["tie_point_mapping"])[
            coord
        ]
        precision = methodvar.attrs["computational_precision"]

        # FIXME: support latitude, longitude also for geographic CRS
        if (stdname := ds[tiepoints].attrs.get("standard_name")) in [
            "projection_x_coordinate",
            "projection_y_coordinate",
        ]:
            if stdname == "projection_x_coordinate":
                currentdim = "X"
            else:
                currentdim = "Y"
            (dim,) = ds[tiepoints].dims
        else:
            raise ValueError(
                "Expected a standard_name attribute with either projection_x_coordinate or projection_y_coordinate"
            )

        not_possible = method != "linear" or (ds[tiepoints].data.size != 2)
        if not_possible:
            raise ValueError("Cannot be represented by an affine")

        if currentdim is None:
            raise ValueError("could not detect whether axis is 'X' or 'Y'")

        tp = ds[tiepoints].data
        axes[currentdim] = AffineAxis(
            offset=tp[0].item(),
            shear=0.0,
            spacing=((tp[1] - tp[0]) / (ds.sizes[coord] - 1)).item(),
            size=ds.sizes[dim],
            dimname=dim,
        )

        current_dim = None

    X, Y = axes["X"], axes["Y"]
    return (
        Affine(
            X.spacing,
            X.shear,
            X.offset - X.spacing / 2,
            Y.shear,
            Y.spacing,
            Y.offset - Y.spacing / 2,
        ),
        X.size,
        Y.size,
    )

In [84]:
aff, xsize, ysize = compute_affine(newds, "band_data")
aff

Affine(25000.0, 0.0, -3950000.0,
       0.0, -25000.0, 4350000.0)

In [85]:
Affine.from_gdal(*map(float, geotransform.split(" ")))

Affine(25000.0, 0.0, -3950000.0,
       0.0, -25000.0, 4350000.0)