<a href="https://colab.research.google.com/github/Log-Yair/Endymion/blob/src/data_handler.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#Lola

**Imports**

In [8]:
from __future__ import annotations

import os
import re
import json
import math
import hashlib
import urllib.request
from dataclasses import dataclass, asdict
from pathlib import Path
from typing import Dict, Optional, Tuple, List, Any

import numpy as np


##Data Handling

In [14]:

'''
-------------------
Data structures
-------------------
'''

@dataclass(frozen=True)
class LOLATileSpec:
    """
    Identifies a LOLA polar float_img tile by its base filename (without extension).
    Example: 'ldem_85s_20m_float'
    """
    tile_id: str
    img_url: str
    lbl_url: str

    def filenames(self) -> Tuple[str, str]:
        return f"{self.tile_id}.img", f"{self.tile_id}.lbl"


@dataclass
class PDSImageMeta:
    """
    Minimal metadata parsed from a PDS3 .LBL label file.
    Keep it flexible; add fields as you discover them in your labels.

    NOTE:
    This parser/meta is specialised for LOLA polar float_img PDS3 labels.
    It is NOT a general-purpose PDS3 parser.
    """
    record_bytes: Optional[int] = None
    file_records: Optional[int] = None

    lines: Optional[int] = None
    line_samples: Optional[int] = None
    sample_type: Optional[str] = None
    sample_bits: Optional[int] = None

    unit: Optional[str] = None
    offset: Optional[float] = None
    scaling_factor: Optional[float] = None
    missing_constant: Optional[float] = None

    # keep raw label for debugging / provenance
    raw: Optional[Dict[str, Any]] = None


@dataclass
class RasterTile:
    """
    A tile of raster data plus meta.
    For production we use numpy.memmap so we don't load the full 15168x15168 tile into RAM.
    """
    tile_id: str
    data: np.ndarray  # can be np.memmap
    meta: PDSImageMeta


'''
----------------------------
Data handler
----------------------------
'''

class DataHandler:
    """
    Endymion - DataHandler

    Responsibility:
      - Download/caching of PDS .IMG/.LBL
      - Parse labels (PDS3-ish) to interpret .IMG binary
      - Load raster data (prefer memmap)
      - Provide ROI extraction hooks for FeatureExtractor / Hazard pipeline

    Notes:
      - Keep this module "dumb and reliable":
        no slope/roughness here (that belongs in FeatureExtractor).
      - Unit conversion (km->m) and offset usage belong in standardisation.
    """

    # Basic KEY = VALUE matcher for common PDS3 label lines
    _kv_re = re.compile(r"^\s*([A-Z0-9_\-^]+)\s*=\s*(.+?)\s*$")

    def __init__(
        self,
        cache_dir: str | Path = "/content/endymion_cache/lola",
        user_agent: str = "Endymion-DataHandler/0.1",
        timeout_sec: int = 60,
    ) -> None:
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)

        self.user_agent = user_agent
        self.timeout_sec = timeout_sec

        # Simple in-memory registry (you can persist this later)
        self.tiles: Dict[str, LOLATileSpec] = {}

    '''
    ----------------------------
     Registration
    ----------------------------
    '''

    def register_tile(self, spec: LOLATileSpec) -> None:
        self.tiles[spec.tile_id] = spec

    def register_tiles(self, specs: List[LOLATileSpec]) -> None:
        for s in specs:
            self.register_tile(s)

    '''
    ----------------------------
     Download & caching
    ----------------------------
    '''

    def _local_paths(self, tile_id: str) -> Tuple[Path, Path]:
        img_path = self.cache_dir / f"{tile_id}.img"
        lbl_path = self.cache_dir / f"{tile_id}.lbl"
        return img_path, lbl_path

    def _download_file(self, url: str, out_path: Path) -> None:
        out_path.parent.mkdir(parents=True, exist_ok=True)

        req = urllib.request.Request(
            url,
            headers={"User-Agent": self.user_agent},
            method="GET",
        )
        with urllib.request.urlopen(req, timeout=self.timeout_sec) as resp:
            data = resp.read()

        # atomic write
        tmp = out_path.with_suffix(out_path.suffix + ".part")
        tmp.write_bytes(data)
        tmp.replace(out_path)

    def ensure_downloaded(self, tile_id: str, force: bool = False) -> Tuple[Path, Path]:
        if tile_id not in self.tiles:
            raise KeyError(f"Tile '{tile_id}' is not registered.")

        spec = self.tiles[tile_id]
        img_path, lbl_path = self._local_paths(tile_id)

        if force or not lbl_path.exists():
            self._download_file(spec.lbl_url, lbl_path)

        if force or not img_path.exists():
            self._download_file(spec.img_url, img_path)

        return img_path, lbl_path

    '''
    ----------------------------
     Label parsing (PDS3-ish)
    ----------------------------
    '''

    def parse_lbl(self, lbl_path: str | Path) -> PDSImageMeta:
        """
        Parses a basic PDS3 label into a dict and extracts common keys.
        Handles nested object keys like: UNCOMPRESSED_FILE.IMAGE.LINES
        """
        lbl_path = Path(lbl_path)
        label_lines = lbl_path.read_text(encoding="utf-8", errors="ignore").splitlines()

        raw: Dict[str, Any] = {}
        current_object_stack: List[str] = []

        def set_key(key: str, value: str) -> None:
            # Store "OBJECT.SUBKEY" namespace when inside OBJECT blocks
            if current_object_stack:
                raw[".".join(current_object_stack + [key])] = value
            else:
                raw[key] = value

        for line in label_lines:
            line = line.strip()
            if not line or line.startswith("/*"):
                continue

            # OBJECT / END_OBJECT handling
            if line.startswith("OBJECT"):
                m = self._kv_re.match(line)
                if m:
                    obj_name = m.group(2).strip().strip('"')
                    current_object_stack.append(obj_name)
                continue

            if line.startswith("END_OBJECT"):
                if current_object_stack:
                    current_object_stack.pop()
                continue

            m = self._kv_re.match(line)
            if m:
                k, v = m.group(1), m.group(2)
                set_key(k, v)

        def pick_any(*keys: str) -> Optional[str]:
            """
            Try exact matches first, then suffix matches.
            Suffix matching solves nested forms like UNCOMPRESSED_FILE.IMAGE.LINES.
            """
            for k in keys:
                if k in raw:
                    return str(raw[k])

            suffixes = [f".{k.split('.')[-1]}" for k in keys]
            for suf in suffixes:
                # prefer keys containing ".IMAGE."
                for kk, vv in raw.items():
                    if kk.endswith(suf) and ".IMAGE." in f".{kk}.":
                        return str(vv)
                # otherwise accept any suffix match
                for kk, vv in raw.items():
                    if kk.endswith(suf):
                        return str(vv)
            return None

        def to_int(x: Optional[str]) -> Optional[int]:
            if x is None:
                return None
            try:
                return int(x.strip().strip('"'))
            except ValueError:
                return None

        def to_float(x: Optional[str]) -> Optional[float]:
            if x is None:
                return None
            try:
                return float(x.strip().strip('"'))
            except ValueError:
                return None

        def to_str(x: Optional[str]) -> Optional[str]:
            if x is None:
                return None
            return x.strip().strip('"')

        meta = PDSImageMeta(
            record_bytes=to_int(pick_any("RECORD_BYTES")),
            file_records=to_int(pick_any("FILE_RECORDS")),

            lines=to_int(pick_any("IMAGE.LINES", "LINES")),
            line_samples=to_int(pick_any("IMAGE.LINE_SAMPLES", "LINE_SAMPLES")),
            sample_type=to_str(pick_any("IMAGE.SAMPLE_TYPE", "SAMPLE_TYPE")),
            sample_bits=to_int(pick_any("IMAGE.SAMPLE_BITS", "SAMPLE_BITS")),

            unit=to_str(pick_any("IMAGE.UNIT", "UNIT")),
            scaling_factor=to_float(pick_any("IMAGE.SCALING_FACTOR", "SCALING_FACTOR")),
            offset=to_float(pick_any("IMAGE.OFFSET", "OFFSET")),
            missing_constant=to_float(pick_any("IMAGE.MISSING_CONSTANT", "MISSING_CONSTANT")),

            raw=raw,
        )
        return meta

    '''
    ----------------------------
    IMG loading (float)
    ----------------------------
    '''

    def _numpy_dtype_from_sample_type(self, sample_type: str, sample_bits: int) -> np.dtype:
        """
        Maps PDS SAMPLE_TYPE/SAMPLE_BITS to numpy dtype.
        For LOLA float_img tiles: SAMPLE_TYPE=PC_REAL, SAMPLE_BITS=32 (little-endian float32).
        """
        st = (sample_type or "").upper()

        if "REAL" in st or "FLOAT" in st:
            if sample_bits == 32:
                if "PC" in st or "LSB" in st:
                    return np.dtype("<f4")
                if "MSB" in st:
                    return np.dtype(">f4")
                return np.dtype("f4")
            if sample_bits == 64:
                if "PC" in st or "LSB" in st:
                    return np.dtype("<f8")
                if "MSB" in st:
                    return np.dtype(">f8")
                return np.dtype("f8")

        raise ValueError(f"Unsupported SAMPLE_TYPE/SAMPLE_BITS: {sample_type}/{sample_bits}")

    def load_tile(self, tile_id: str, force_download: bool = False, verbose: bool = False) -> RasterTile:
        """
        Ensures files are present, parses label, and returns a memmap-backed RasterTile.
        """
        img_path, lbl_path = self.ensure_downloaded(tile_id, force=force_download)
        meta = self.parse_lbl(lbl_path)

        # Validate required fields
        if meta.lines is None or meta.line_samples is None:
            raise ValueError(f"LBL missing LINES/LINE_SAMPLES for tile '{tile_id}'.")
        if meta.sample_type is None or meta.sample_bits is None:
            raise ValueError(f"LBL missing SAMPLE_TYPE/SAMPLE_BITS for tile '{tile_id}'.")

        dtype = self._numpy_dtype_from_sample_type(meta.sample_type, meta.sample_bits)

        # File-size validation (more meaningful than memmap.size checks)
        expected_bytes = meta.lines * meta.line_samples * (meta.sample_bits // 8)
        actual_bytes = img_path.stat().st_size
        if actual_bytes < expected_bytes:
            raise IOError(
                f"IMG too small for expected raster: {actual_bytes} bytes < {expected_bytes} bytes "
                f"for tile '{tile_id}'."
            )

        if verbose:
            print(meta.lines, meta.line_samples, meta.sample_type, meta.sample_bits)
            if meta.raw:
                print([k for k in meta.raw.keys() if k.endswith(".LINES") or k.endswith(".LINE_SAMPLES")])

        # Use memmap so we don't load the full tile into RAM
        arr = np.memmap(img_path, dtype=dtype, mode="r", shape=(meta.lines, meta.line_samples))

        # Apply missing constant if present (turn into NaN)
        # NOTE: This creates a float32 view/copy; do this only if needed.
        if meta.missing_constant is not None:
            arr = np.array(arr, dtype=np.float32, copy=True)
            arr[arr == meta.missing_constant] = np.nan

        # Apply scaling factor if present (DN -> HEIGHT in label units)
        if meta.scaling_factor is not None:
            arr = arr * float(meta.scaling_factor)

        return RasterTile(tile_id=tile_id, data=arr, meta=meta)

    '''
    --------------------------
     ROI hooks
    -----------------------------
    '''

    def read_roi(self, tile: RasterTile, row0: int, row1: int, col0: int, col1: int) -> np.ndarray:
        """
        Returns a concrete numpy array (in RAM) for a ROI slice.
        Keep ROIs reasonably sized (e.g., 512-2048 px).
        """
        return np.array(tile.data[row0:row1, col0:col1], dtype=np.float32, copy=True)

    # Backwards-compatible name (your earlier function name)
    def extract_roi_pixels(self, tile: RasterTile, row_min: int, row_max: int, col_min: int, col_max: int) -> np.ndarray:
        """
        Simple pixel ROI extraction (alias for read_roi).
        """
        return self.read_roi(tile, row_min, row_max, col_min, col_max)

    '''
    --------------
    Debug / provenance helpers
    --------------
    '''

    def tile_provenance(self, tile_id: str) -> Dict[str, Any]:
        """
        Returns URLs + local paths so you can log them for reproducibility.
        """
        if tile_id not in self.tiles:
            raise KeyError(f"Tile '{tile_id}' is not registered.")
        img_path, lbl_path = self._local_paths(tile_id)
        spec = self.tiles[tile_id]
        return {
            "tile_id": tile_id,
            "img_url": spec.img_url,
            "lbl_url": spec.lbl_url,
            "local_img": str(img_path),
            "local_lbl": str(lbl_path),
        }


In [4]:
'''def read_roi(self, tile: RasterTile, row0: int, row1: int, col0: int, col1: int) -> np.ndarray:
    """
    Returns a concrete numpy array (in RAM) for a ROI slice.
    Keep ROIs reasonably sized (e.g., 512-2048 px).
    """
    roi = np.array(tile.data[row0:row1, col0:col1], dtype=np.float32, copy=True)
    return roi
'''

In [15]:
BASE = "https://pds-geosciences.wustl.edu/lro/lro-l-lola-3-rdr-v1/lrolol_1xxx/data/lola_gdr/polar/float_img"

tiles = [
    LOLATileSpec(
        tile_id="ldem_85n_20m_float",
        img_url=f"{BASE}/ldem_85n_20m_float.img",
        lbl_url=f"{BASE}/ldem_85n_20m_float.lbl",
    ),
    LOLATileSpec(
        tile_id="ldem_85s_20m_float",
        img_url=f"{BASE}/ldem_85s_20m_float.img",
        lbl_url=f"{BASE}/ldem_85s_20m_float.lbl",
    ),
    LOLATileSpec(
        tile_id="ldem_875n_20m_float",
        img_url=f"{BASE}/ldem_875n_20m_float.img",
        lbl_url=f"{BASE}/ldem_875n_20m_float.lbl",
    ),
    LOLATileSpec(
        tile_id="ldem_875s_20m_float",
        img_url=f"{BASE}/ldem_875s_20m_float.img",
        lbl_url=f"{BASE}/ldem_875s_20m_float.lbl",
    ),
]

dh = DataHandler(cache_dir="/content/endymion_cache/lola")
dh.register_tiles(tiles)


tile = dh.load_tile("ldem_85s_20m_float")
print(tile.tile_id, tile.data.shape, tile.meta.sample_type, tile.meta.sample_bits)


ldem_85s_20m_float (15168, 15168) PC_REAL 32


In [16]:
tile = dh.load_tile("ldem_85s_20m_float")
roi_km = dh.read_roi(tile, 7000, 8024, 7000, 8024)  # 1024x1024 ROI


##standarise


In [17]:
import numpy as np
from dataclasses import dataclass
from typing import Literal

@dataclass
class TerrainTile:
    """
    Standardised terrain tile for Endymion.
    All values are in meters.
    """
    data_m: np.ndarray
    representation: Literal["height", "radius"]
    source_tile_id: str

def standardise_lola(
    tile: RasterTile,
    *,
    roi: np.ndarray | None = None,
    representation: Literal["height", "radius"] = "height",
) -> TerrainTile:
    """
    Standardise LOLA data to meters.

    - tile.data is HEIGHT relative to reference sphere in km
    - meta.offset is in km
    - If roi is provided, standardise only that ROI
    """

    height_km = roi if roi is not None else tile.data
    height_km = np.array(height_km, dtype=np.float32, copy=False)

    if representation == "height":
        data_m = height_km * 1000.0

    elif representation == "radius":
        if tile.meta.offset is None:
            raise ValueError("Cannot compute planetary radius: OFFSET missing in label.")
        data_m = (height_km + tile.meta.offset) * 1000.0

    else:
        raise ValueError(f"Unknown representation: {representation}")

    return TerrainTile(
        data_m=data_m,
        representation=representation,
        source_tile_id=tile.tile_id,
    )



## ROI logic


In [19]:

#Load tile as memmap (cheap)
tile = dh.load_tile("ldem_85s_20m_float")

# Extract a small ROI (example: 1024x1024 window)
roi_km = dh.read_roi(tile, 7000, 8024, 7000, 8024)

print("ROI shape:", roi_km.shape)
print("ROI raw min/max (km):", np.nanmin(roi_km), np.nanmax(roi_km))

# Standardise the ROI only
terrain = standardise_lola(tile, roi=roi_km, representation="height")

print("ROI height min/max (m):",
      np.nanmin(terrain.data_m),
      np.nanmax(terrain.data_m))


ROI shape: (1024, 1024)
ROI raw min/max (km): -2.8468773 1.7386577
ROI height min/max (m): -2846.8774 1738.6577


##debugging

In [None]:
#code below was suggested by Gemini AI

# --- Debugging block to inspect the problematic label file ---
tile_id_to_debug = "ldem_85s_20m_float"
img_path, lbl_path = dh.ensure_downloaded(tile_id_to_debug, force=True)
print(f"\n--- Content of {lbl_path} ---")
print(lbl_path.read_text(encoding="utf-8", errors="ignore"))
print(f"--- End of {lbl_path} content ---\n")
# --- End Debugging block ---

In [None]:
print(DataHandler)
print("Has load_tile:", hasattr(DataHandler, "load_tile"))
print("Methods:", [m for m in dir(DataHandler) if "tile" in m.lower()])


In [None]:
tile = dh.load_tile("ldem_85s_20m_float")

terrain_h = standardise_lola(tile.data, tile.meta, representation="height")
print(np.nanmin(terrain_h.data_m), np.nanmax(terrain_h.data_m))

terrain_r = standardise_lola(tile.data, tile.meta, representation="radius")
print(np.nanmin(terrain_r.data_m), np.nanmax(terrain_r.data_m))


15168 15168 PC_REAL 32
['UNCOMPRESSED_FILE.IMAGE.LINES', 'UNCOMPRESSED_FILE.IMAGE.LINE_SAMPLES']
-5501.761 7027.1665
1731898.4 1744427.2


1731898400.0 1744427300.0 is way too big for “height”. Check for sanity

In [None]:
tile = dh.load_tile("ldem_85s_20m_float", force_download=False)

print("RAW dtype:", tile.data.dtype)
print("RAW min/max:", np.nanmin(tile.data), np.nanmax(tile.data))
print("OFFSET (km):", tile.meta.offset)
print("UNIT:", tile.meta.raw.get("UNCOMPRESSED_FILE.IMAGE.UNIT"))


15168 15168 PC_REAL 32
['UNCOMPRESSED_FILE.IMAGE.LINES', 'UNCOMPRESSED_FILE.IMAGE.LINE_SAMPLES']
RAW dtype: float32
RAW min/max: -5.5017614 7.0271664
OFFSET (km): 1737.4
UNIT: KILOMETER


Okay, height checked. Now, verify the offset as is too high


In [None]:
print("OFFSET raw:", tile.meta.offset)


OFFSET raw: 1737.4


done