In [2]:
import re, time, json, os
from dataclasses import dataclass
from pathlib import Path
from typing import List, Tuple, Optional, Iterable, Dict

In [3]:
# ----------------------------
# Paths (EDIT THESE)
# ----------------------------
DOCS_ROOT = Path("../doc/docs")
PY_PATH   = Path("../python/ndvi.py")

assert DOCS_ROOT.exists(), f"Missing docs root: {DOCS_ROOT}"
assert PY_PATH.exists(), f"Missing python script: {PY_PATH}"

py_code = PY_PATH.read_text(encoding="utf-8")


In [4]:
# ----------------------------
# Markdown chunking utilities
# ----------------------------
@dataclass
class Chunk:
    chunk_id: str
    heading_path: str
    level: int
    text: str

def chunk_markdown_by_headings(md: str) -> List[Chunk]:
    lines = md.splitlines()
    starts = []
    for i, line in enumerate(lines):
        m = re.match(r"^(#{2,3})\s+(.*\S)\s*$", line)
        if m:
            starts.append((i, len(m.group(1)), m.group(2).strip()))

    if not starts:
        return [Chunk("c0000", "DOC", 1, md.strip() + "\n")]

    chunks: List[Chunk] = []
    current_h2: Optional[str] = None

    for idx, (start_i, level, title) in enumerate(starts):
        end_i = starts[idx + 1][0] if idx + 1 < len(starts) else len(lines)
        body = "\n".join(lines[start_i:end_i]).strip() + "\n"

        if level == 2:
            current_h2 = title
            heading_path = current_h2
        else:
            heading_path = f"{current_h2} / {title}" if current_h2 else title

        chunks.append(Chunk(f"c{idx:04d}", heading_path, level, body))

    return chunks

def make_doc_pack(selected: List[Chunk]) -> str:
    out = []
    for ch in selected:
        out.append(f"### DOC CHUNK {ch.chunk_id}: {ch.heading_path}\n{ch.text}\n")
    return "\n".join(out).strip() + "\n"

In [13]:
# ----------------------------
# Docs loader: key -> .md file(s)
# ----------------------------
def load_docs_for_keys_with_aliases(
    root: Path,
    keys: Iterable[str],
    *,
    encoding: str = "utf-8",
    aliases: Optional[Dict[str, str]] = None,
) -> List[Tuple[Path, str]]:
    """
    keys: things you care about (API names or concepts).
    aliases: maps key -> filename_stem (e.g., 'mapPixels' -> 'mappixels').
    Returns unique docs in first-seen order.
    """
    if not root.exists():
        raise FileNotFoundError(f"DOCS_ROOT does not exist: {root}")

    aliases = aliases or {}

    def norm(s: str) -> str:
        return s.strip().lower()

    # index: stem -> path
    md_index: Dict[str, Path] = {}
    for p in root.rglob("*.md"):
        md_index[norm(p.stem)] = p

    resolved_paths: List[Path] = []
    seen = set()

    for k in keys:
        stem = norm(aliases.get(k, k))  # apply alias if any

        # direct stem match
        path = md_index.get(stem)

        # fallback: normalize to alnum only (handles camelCase-ish inputs)
        if path is None:
            stem2 = "".join(ch for ch in stem if ch.isalnum())
            path = md_index.get(stem2)

        if path is None:
            available = sorted(md_index.keys())
            raise FileNotFoundError(
                f"No .md doc for key '{k}' (resolved stem '{stem}') under {root}\n"
                f"Available docs: {available}"
            )

        if path not in seen:
            resolved_paths.append(path)
            seen.add(path)

    return [(p, p.read_text(encoding=encoding)) for p in resolved_paths]

In [14]:
OLLAMA_URL = os.environ.get("OLLAMA_URL", "http://localhost:11434")
MODEL = os.environ.get("OLLAMA_MODEL", "codellama:latest")


In [16]:
SYSTEM_PROMPT = """
You are a geospatial data engineer and Spark systems expert.

Task: Convert a given geospatial Python script into Scala code that runs on RDPro (Spark-based raster processing) on Apache Spark.

You must understand Spark execution and produce distributed, RDD-based Scala.

AUTHORITATIVE EXAMPLES RULE:
- For each operation, Scala/Python examples may exist under examples/.
- If an example file is missing or empty, ignore it.
- Use only APIs/signatures shown in DOC CHUNKS OR non-empty examples.
- Never assume Scala and Python examples are symmetric.

Environment & paths:
- Determine whether output paths should be treated as local or distributed based on Spark configuration and the URI scheme.
- You MAY use standard Spark/Scala APIs for this (SparkConf, SparkContext.hadoopConfiguration, java.net.URI, java.nio.file).
- You MUST NOT invent any RDPro path utilities.

FILESYSTEM & PATH NORMALIZATION (MANDATORY):
- Detect Spark local mode using SparkContext:
  - Treat as local if `sc.master` starts with "local" (case-insensitive).
- Before calling any RDPro IO API (e.g., geoTiff read/write), normalize ALL input/output paths:
  1) If the path already has a URI scheme (file:, hdfs:, s3a:, gs:, http:, etc.), use it as-is.
  2) If the path has NO scheme AND Spark is local AND the path looks like a local filesystem path
     (e.g., starts with "/" on Unix/macOS, or has a Windows drive like "C:\\"), convert it to an
     absolute `file:///...` URI using standard Java APIs (java.net.URI + java.nio.file.Paths).
  3) If Spark is NOT local, do NOT prepend file:///; leave scheme-less paths unchanged so they
     resolve against the cluster filesystem config (fs.defaultFS).
- This rule exists to prevent Hadoop from interpreting local absolute paths as HDFS
  (e.g., hdfs://localhost:9000).
  
Hard rules:
1) Output MUST be valid Scala and compile as an RDPro operation module.
   Required structure:
   - `object <OperationName> { def run(sc: SparkContext): <ReturnType> = ... }`
   - Include all necessary imports
   - Do NOT define `main` and do NOT use `extends App`
   - Do NOT create or stop SparkSession or SparkContext inside `run`
   - Assume SparkContext `sc` is provided by the caller2) Use ONLY RDPro APIs that appear in the provided DOC CHUNKS.
   - If a method signature is not shown in DOC CHUNKS, do NOT guess.
3) Do NOT invent RDPro APIs, overloads, implicits, or helper utilities. No hidden "magic" conversions.
4) Preserve semantics of the Python: raster IO, pixel math, focal ops, masking/nodata, reprojection/resample if present.
5) Distributed correctness:
   - Avoid driver-side operations: do NOT call collect/toLocalIterator unless required by the Python semantics.
   - Prefer RDPro RasterRDD end-to-end when available in DOC CHUNKS.
6) Raster alignment robustness:
   - If not in DOC CHUNKS, fail fast: throw a clear runtime error explaining alignment is required but unsupported with available APIs.
7) Performance guidance (Spark-level only):
   - You MAY set Spark SQL / Spark configs and use standard Spark operations (repartition/coalesce/cache/persist) ONLY when:
     (a) it does not change semantics, and
     (b) it is justified by an obvious pipeline boundary (e.g., before a wide op / expensive reuse).
8) Lambdas:
   - When passing lambdas to RDPro functions (e.g., mapPixels), add explicit parameter and return types so Scala compiles.
9) CLI args:
   - If the Python has input/output paths, read them from args with safe defaults and validation.
   - Do not introduce extra parameters not implied by the Python.

Output format (strict):
- First: Scala file content only (NO markdown fences).
- After the Scala: a "NOTES" section listing:
  (a) RDPro APIs used (names only)
  (b) Unsupported operations and why (especially if missing alignment/warp APIs)
  (c) Assumptions about IO paths / bands / nodata / CRS / environment detection logic
""".strip()

In [8]:
def build_user_prompt(doc_pack: str, py_code: str) -> str:
    return f"""
RDPro documentation (relevant DOC CHUNKS only):
{doc_pack}

Python script:
{py_code}

Task:
Translate the Python script into Scala targeting RDPro on Spark.
Use ONLY APIs described in the DOC CHUNKS.
""".strip()

In [9]:
# ----------------------------
# Manual "keys" (what you used before)
# These are NOT necessarily filenames; aliases map them to doc files.
# ----------------------------
NDVI_KEYS = [
    "datamodel",
    "setup",
    "dataloading",
    "rastermetadata",
    "overlay",
    "mapPixels",
    "saveAsGeoTiff",
    "GeoTiffWriter",
    "Compression",
]

ALIASES = {
    # docs tree uses lowercase stems like mappixels.md
    "mapPixels": "mappixels",

    # concepts that live inside these docs (not separate files)
    "saveAsGeoTiff": "rasterwriting",
    "GeoTiffWriter": "rasterwriting",
    "Compression": "rasterwriting",

    # "geoTiff" read API is typically documented in dataloading
    "geoTiff": "dataloading",
}


In [10]:
# ----------------------------
# Load docs -> chunk -> pack -> final prompt
# ----------------------------
doc_files = load_docs_for_keys_with_aliases(DOCS_ROOT, NDVI_KEYS, aliases=ALIASES)

print("Loaded docs:")
for p, _ in doc_files:
    print(" -", p)

all_chunks: List[Chunk] = []
for path, text in doc_files:
    chunks = chunk_markdown_by_headings(text)
    all_chunks.extend(chunks)

doc_pack = make_doc_pack(all_chunks)
user_prompt = build_user_prompt(doc_pack, py_code)


Loaded docs:
 - ../doc/docs/common/datamodel.md
 - ../doc/docs/common/setup.md
 - ../doc/docs/data/dataloading.md
 - ../doc/docs/common/rastermetadata.md
 - ../doc/docs/process/overlay.md
 - ../doc/docs/process/mappixels.md
 - ../doc/docs/data/rasterwriting.md


In [11]:
# ----------------------------
# Save prompt + doc selection (optional)
# ----------------------------
OUT_DIR = Path("./runs/workspace")
OUT_DIR.mkdir(parents=True, exist_ok=True)

(OUT_DIR / "prompt_manual_multi_codellama.txt").write_text(user_prompt, encoding="utf-8")
(OUT_DIR / "doc_selection_multi_codellama.json").write_text(
    json.dumps([str(p) for p, _ in doc_files], indent=2),
    encoding="utf-8",
)

print("\nPrompt chars:", len(user_prompt))
print("Saved:", OUT_DIR / "prompt_manual_multi_codellama.txt")



Prompt chars: 12092
Saved: runs/workspace/prompt_manual_multi_codellama.txt


In [17]:
# ----------------------------
# Ollama sanity check
# ----------------------------
def ollama_healthcheck() -> None:
    import requests
    try:
        r = requests.get(f"{OLLAMA_URL}/api/tags", timeout=10)
        r.raise_for_status()
        models = [m.get("name") for m in r.json().get("models", [])]
        print("\nOllama OK @", OLLAMA_URL)
        print("Available models:", models[:10], "..." if len(models) > 10 else "")
        print("Selected MODEL:", MODEL)
        if models and MODEL not in models:
            print("WARNING: Selected MODEL not in /api/tags list. If calls fail, set OLLAMA_MODEL to one of the listed names.")
    except Exception as e:
        print("\nOllama NOT reachable at", OLLAMA_URL)
        print("Error:", repr(e))
        raise

ollama_healthcheck()


Ollama OK @ http://localhost:11434
Available models: ['codellama:latest', 'llama3:latest', 'tinyllama:latest'] 
Selected MODEL: codellama:latest


In [18]:
# ----------------------------
# Run LLaMA locally via Ollama
# ----------------------------
def run_llm(prompt: str) -> Tuple[str, float]:
    import requests
    t0 = time.time()
    payload = {
        "model": MODEL,
        "messages": [
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": prompt},
        ],
        "stream": False,
        # Optional tuning knobs (uncomment if desired):
        # "options": {"temperature": 0.2, "num_ctx": 8192},
    }
    r = requests.post(f"{OLLAMA_URL}/api/chat", json=payload, timeout=600)
    r.raise_for_status()
    data = r.json()
    text = data["message"]["content"].strip()
    return text, time.time() - t0

scala_out, dt = run_llm(user_prompt)
(OUT_DIR / "ndvi_doc_multi_codellama.scala").write_text(scala_out, encoding="utf-8")

print("\nWrote:", OUT_DIR / "ndvi_doc_multi_codellama.scala")
print("LLM latency:", round(dt, 2), "s")
print("\n--- Preview ---\n", scala_out[:900])


Wrote: runs/workspace/ndvi_doc_multi_codellama.scala
LLM latency: 594.18 s

--- Preview ---
 [PYTHON]
import os
from pathlib import Path

import numpy as np
from PIL import Image

B4_PATH = "/path/to/landsat8/LC08_L2SP_040037_20250827_20250903_02_T1_SR_B4.TIF"  # Red
B5_PATH = "/path/to/landsat8/LC08_L2SP_040037_20250827_20250903_02_T1_SR_B5.TIF"  # NIR
OUT_NDVI = "/path/to/ndvi.tif"

# Open datasets
ds_red = gdal.Open(B4_PATH, gdal.GA_ReadOnly)
ds_nir = gdal.Open(B5_PATH, gdal.GA_ReadOnly)

assert ds_red and ds_nir, "Failed to open input files"

# Read arrays
red = np.array(Image.open(str(Path(B4_PATH).resolve().absolute())))
nir = np.array(Image.open(str(Path(B5_PATH).resolve().absolute())))

# Check grid alignment
if (
    ds_red.GetGeoTransform() != ds_nir.GetGeoTransform()
    or ds_red.GetProjection() != ds_nir.GetProjection()
):
    raise RuntimeError("B4 and B5 grids do not match â€” warp one band first")

# Handle NoData
red_nodata = ds_red.GetRasterBand(1).GetNoDataValue
