In [1]:
import json
import os
import re
import time
from dataclasses import dataclass
from pathlib import Path
from typing import List, Tuple

In [2]:
# Root of the RDProLLMagent checkout. The default is the location the notebook
# was originally written against; set the RDPRO_AGENT_ROOT environment variable
# to run it from a different machine or directory without editing this cell.
PROJECT_ROOT = Path(
    os.environ.get(
        "RDPRO_AGENT_ROOT",
        "/Users/clockorangezoe/Documents/phd_projects/code/geoAI/RDProLLMagent",
    )
).expanduser()

DOC_PATH = PROJECT_ROOT / "doc" / "RDPro.md"   # Markdown documentation that gets chunked
PY_PATH = PROJECT_ROOT / "python" / "ndvi.py"  # Python script that gets translated

# Fail fast with the offending path instead of a bare read error further down.
assert DOC_PATH.exists(), f"Missing: {DOC_PATH}"
assert PY_PATH.exists(), f"Missing: {PY_PATH}"

rdpro_text = DOC_PATH.read_text(encoding="utf-8")
py_code = PY_PATH.read_text(encoding="utf-8")

In [3]:
import re
from dataclasses import dataclass
from typing import List, Optional

@dataclass
class Chunk:
    """One heading-delimited section of a Markdown document."""

    # Sequential identifier of the form "c0000", "c0001", ...
    chunk_id: str
    # Heading title; a "###" title is prefixed with the enclosing "##" title
    # as "H2 / H3" when such a parent exists. "DOC" for the fallback chunk.
    heading_path: str
    # Number of leading "#" characters (2 or 3); 1 for the fallback chunk.
    level: int
    # Section text, heading line included, stripped and ending in one newline.
    text: str


# "## Title" or "### Title" at the very start of a line.
_HEADING_RE = re.compile(r"^(#{2,3})\s+(.*\S)\s*$")
# Opening/closing marker of a fenced code block: at most three spaces of
# indentation, then a run of three or more backticks or tildes.
_FENCE_RE = re.compile(r"^ {0,3}(`{3,}|~{3,})(.*)$")


def chunk_markdown_by_headings(md: str) -> List[Chunk]:
    """Split Markdown into one chunk per level-2 / level-3 heading.

    Each chunk runs from its heading line up to (not including) the next
    level-2/3 heading, or to the end of the document. Text that precedes the
    first heading is not part of any chunk.

    Lines inside fenced code blocks (``` or ~~~) are never treated as
    headings, so a "## ..." line in a code sample no longer cuts the
    surrounding section in two. A fence that is never closed extends to the
    end of the document.

    Args:
        md: Markdown source text.

    Returns:
        Chunks in document order. If no level-2/3 heading is found, a single
        chunk ("c0000", "DOC", level 1) holding the whole stripped document.
    """
    lines = md.splitlines()

    starts = []
    fence_char = ""  # "`" or "~" while inside a fenced block, "" otherwise
    fence_len = 0
    for i, line in enumerate(lines):
        fence = _FENCE_RE.match(line)
        if fence_char:
            # Only a bare marker of the same character, at least as long as
            # the opening one, closes the block.
            if fence:
                marker, rest = fence.group(1), fence.group(2)
                if marker[0] == fence_char and len(marker) >= fence_len and not rest.strip():
                    fence_char, fence_len = "", 0
            continue
        if fence:
            marker, rest = fence.group(1), fence.group(2)
            # A backtick run followed by more backticks on the same line is
            # inline code, not the start of a fenced block.
            if not (marker[0] == "`" and "`" in rest):
                fence_char, fence_len = marker[0], len(marker)
                continue
        m = _HEADING_RE.match(line)
        if m:
            starts.append((i, len(m.group(1)), m.group(2).strip()))

    if not starts:
        return [Chunk("c0000", "DOC", 1, md.strip() + "\n")]

    chunks: List[Chunk] = []
    current_h2: Optional[str] = None

    # Each section ends where the next one starts; the last runs to EOF.
    ends = [start[0] for start in starts[1:]] + [len(lines)]
    for idx, ((start_i, level, title), end_i) in enumerate(zip(starts, ends)):
        body = "\n".join(lines[start_i:end_i]).strip() + "\n"

        if level == 2:
            current_h2 = title
            heading_path = title
        else:
            # A level-3 heading with no preceding level-2 stands on its own.
            heading_path = f"{current_h2} / {title}" if current_h2 else title

        chunks.append(Chunk(f"c{idx:04d}", heading_path, level, body))

    return chunks

def select_chunks_manual(chunks: List[Chunk], include_keywords: List[str]) -> List[Chunk]:
    """Return the chunks that mention at least one of the given keywords.

    Matching is a case-insensitive substring test against the chunk's heading
    path and text. Empty or whitespace-only keywords are ignored: as
    substrings they occur in every chunk and would otherwise select the whole
    document.

    Args:
        chunks: Candidate chunks, in document order.
        include_keywords: Keywords to look for.

    Returns:
        Matching chunks in their original order, at most one per chunk_id.
    """
    needles = [kw.lower() for kw in include_keywords if kw.strip()]
    if not needles:
        return []

    picked: List[Chunk] = []
    seen_ids = set()
    for ch in chunks:
        # Keep only the first matching chunk for any given id.
        if ch.chunk_id in seen_ids:
            continue
        haystack = (ch.heading_path + "\n" + ch.text).lower()
        if any(needle in haystack for needle in needles):
            picked.append(ch)
            seen_ids.add(ch.chunk_id)
    return picked

def select_chunks_manual_api_keys(chunks: List[Chunk], api_keys: List[str]) -> List[Chunk]:
    """Select only the chunks that mention at least one explicit API key.

    A chunk is kept when any key occurs, case-insensitively and as a plain
    substring, in its heading path or text. Chunks mentioning none of the keys
    (e.g. Flatten/Explode/Reshape sections) are left out unless a matching key
    is supplied. Empty or whitespace-only keys are ignored, since as
    substrings they would match every chunk and defeat the selection.

    Args:
        chunks: Candidate chunks, in document order.
        api_keys: API names / keywords to look for.

    Returns:
        Matching chunks in their original order, at most one per chunk_id.
    """
    keys = [key.lower() for key in api_keys if key.strip()]
    if not keys:
        return []

    kept: List[Chunk] = []
    kept_ids = set()
    for ch in chunks:
        # Keep only the first matching chunk for any given id.
        if ch.chunk_id in kept_ids:
            continue
        searchable = (ch.heading_path + "\n" + ch.text).lower()
        if any(key in searchable for key in keys):
            kept.append(ch)
            kept_ids.add(ch.chunk_id)
    return kept

def make_doc_pack(selected: List[Chunk]) -> str:
    """Render the selected chunks as one text block for the prompt.

    Every chunk becomes a "### DOC CHUNK <id>: <heading path>" header line
    followed by the chunk text. Sections are separated by blank lines, and the
    result is stripped and terminated by exactly one newline (so an empty
    selection yields just a newline).
    """
    sections = (
        f"### DOC CHUNK {chunk.chunk_id}: {chunk.heading_path}\n{chunk.text}\n"
        for chunk in selected
    )
    return "\n".join(sections).strip() + "\n"


In [4]:
# Model identifier passed as `model=` to the API request made in run_llm.
MODEL = "gpt-5"

# System message sent with every request in run_llm. It fixes the translation
# rules (only documented APIs, preserve semantics) and the expected output
# layout (Scala source first, then a NOTES section). The trailing .strip()
# drops the newlines that the triple-quoted layout adds at both ends.
SYSTEM_PROMPT = """
You are a geospatial data engineer and Spark systems expert.

Convert a given geospatial Python script into Scala code that runs on RDPro (Spark-based raster processing).

Hard rules:
1) Output MUST be valid Scala.
2) Use ONLY APIs that appear in the provided DOC CHUNKS.
3) Do NOT invent APIs or overloads. If a method signature is not shown in DOC CHUNKS, do not guess.
4) Preserve semantics (raster IO, pixel math, focal ops, projection/rescale if present).
5) Assume large-scale distributed Spark execution.
6) For lambdas passed to raster functions (e.g., mapPixels), add explicit parameter types when needed to compile.

Output format:
- First: Scala file content only (no markdown fences).
- After the Scala: a short NOTES section listing:
  (a) RDPro APIs used (names only)
  (b) Unsupported operations and why
  (c) Any assumptions about IO paths / bands / nodata
""".strip()

In [5]:
def build_user_prompt(doc_pack: str, py_code: str) -> str:
    """Assemble the user message: documentation pack, Python source, task.

    Args:
        doc_pack: Rendered documentation chunks to ground the translation.
        py_code: Source of the Python script to translate.

    Returns:
        The prompt text with the surrounding blank lines stripped.
    """
    template = (
        "\n"
        "RDPro documentation (relevant DOC CHUNKS only):\n"
        "{doc_pack}\n"
        "\n"
        "Python script:\n"
        "{py_code}\n"
        "\n"
        "Task:\n"
        "Translate the Python script into Scala targeting RDPro on Spark.\n"
        "Use ONLY APIs described in the DOC CHUNKS.\n"
    )
    # Only the template is parsed for placeholders, so braces inside the
    # substituted values are inserted verbatim.
    return template.format(doc_pack=doc_pack, py_code=py_code).strip()


In [6]:
# Cell 7 — LLM call stub (you plug in your model call here)
def call_llm(prompt: str) -> str:
    """Placeholder for the model call; always raises until it is replaced.

    A real implementation is expected to send `prompt` to the model and
    return the generated Scala code as a string.

    Raises:
        NotImplementedError: Always, in this stub.
    """
    reminder = "Plug in your LLM API call here."
    raise NotImplementedError(reminder)

# Example usage:
# scala_code_manual = call_llm(prompt_manual)
# scala_code_auto   = call_llm(prompt_auto)


In [21]:
# ---- MANUAL KEYWORDS (edit this list) ----
# Hand-picked keywords for the NDVI translation. They are handed to
# select_chunks_manual_api_keys, which lower-cases them and keeps every
# documentation chunk whose heading path or text contains one of them as a
# substring, so capitalisation here is only for readability.
NDVI_KEYS = [
    "setup",
    "geoTiff",
    "rastermetadata",
    "overlay",          # stack red + nir
    "mapPixels",        # compute NDVI
    "saveAsGeoTiff",    # write output
    "GeoTiffWriter",    # compression + write options
    "Compression",      # (optional) forces pulling compression option lines
]


In [22]:
# Build the prompt end to end: split the documentation into heading-level
# chunks, keep the ones that mention an NDVI keyword, render them as one text
# pack, and combine that pack with the Python script into the user message.
chunks = chunk_markdown_by_headings(rdpro_text)
manual_selected = select_chunks_manual_api_keys(chunks, NDVI_KEYS)
doc_pack = make_doc_pack(manual_selected)
user_prompt = build_user_prompt(doc_pack, py_code)

In [23]:
# Persist the exact prompt and the list of selected chunks so the run can be
# inspected and reproduced later.
OUT_DIR = Path("./runs/manual_oracle")
OUT_DIR.mkdir(parents=True, exist_ok=True)

prompt_file = OUT_DIR / "prompt_manual.txt"
prompt_file.write_text(user_prompt, encoding="utf-8")

# One {"id", "heading"} record per selected chunk, in selection order.
selection_records = [
    {"id": chunk.chunk_id, "heading": chunk.heading_path}
    for chunk in manual_selected
]
selection_file = OUT_DIR / "doc_selection.json"
selection_file.write_text(json.dumps(selection_records, indent=2), encoding="utf-8")

print("Manual chunks:", len(manual_selected))
print("Prompt chars:", len(user_prompt))
print("Saved:", prompt_file)


Manual chunks: 14
Prompt chars: 18941
Saved: runs/manual_oracle/prompt_manual.txt


In [13]:
# NOTE(review): this cell is a verbatim copy of the save cell directly above
# it. Its execution count is lower and its recorded output reports different
# chunk/prompt sizes, so that output appears to come from an earlier run with
# a different selection. Running it again simply rewrites the same two files;
# consider deleting it so the notebook reads top to bottom.
OUT_DIR = Path("./runs/manual_oracle")
OUT_DIR.mkdir(parents=True, exist_ok=True)

(OUT_DIR / "prompt_manual.txt").write_text(user_prompt, encoding="utf-8")
(OUT_DIR / "doc_selection.json").write_text(
    json.dumps([{"id": c.chunk_id, "heading": c.heading_path} for c in manual_selected], indent=2),
    encoding="utf-8"
)

print("Manual chunks:", len(manual_selected))
print("Prompt chars:", len(user_prompt))
print("Saved:", OUT_DIR / "prompt_manual.txt")


Manual chunks: 11
Prompt chars: 15804
Saved: runs/manual_oracle/prompt_manual.txt


In [19]:
import os

# Report whether the API key variable is defined, without printing its value.
# Only presence is checked, so an empty value still counts as "set".
key_present = "OPENAI_API_KEY" in os.environ
print("OPENAI_API_KEY is set" if key_present else "OPENAI_API_KEY is NOT set")

OPENAI_API_KEY is set


In [20]:
from openai import OpenAI
# Shared API client used by run_llm. No credentials are passed explicitly.
# NOTE(review): presumably the client reads OPENAI_API_KEY from the
# environment (the variable checked in the previous cell) — confirm against
# the openai package documentation.
client = OpenAI()

def run_llm(prompt: str) -> Tuple[str, float]:
    """Send `prompt` to MODEL under SYSTEM_PROMPT and time the request.

    Args:
        prompt: User message (documentation pack, Python script and task).

    Returns:
        A pair (text, seconds): the response's output text with surrounding
        whitespace stripped, and the elapsed time of the request in seconds.
    """
    # perf_counter is monotonic, so the measured latency cannot be skewed by
    # system clock adjustments during a multi-minute request the way a
    # time.time() difference can.
    started = time.perf_counter()
    resp = client.responses.create(
        model=MODEL,
        input=[
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": prompt},
        ],
    )
    elapsed = time.perf_counter() - started
    return resp.output_text.strip(), elapsed

# Run the translation once, store the generated Scala next to the prompt
# artifacts, and show the latency plus the first 900 characters as a preview.
scala_out, dt = run_llm(user_prompt)

scala_file = OUT_DIR / "Job.manual.scala"
scala_file.write_text(scala_out, encoding="utf-8")

print("Wrote:", scala_file)
print("LLM latency:", round(dt, 2), "s")
print("\n--- Preview ---\n", scala_out[:900])

Wrote: runs/manual_oracle/Job.manual.scala
LLM latency: 145.12 s

--- Preview ---
 import org.apache.spark.{SparkConf, SparkContext}

object ComputeNDVI {
  def main(args: Array[String]): Unit = {
    val B4_PATH: String =
      if (args.length >= 1) args(0)
      else "/content/B4/LC09_L2SP_040036_20250803_20250804_02_T1_SR_B4.TIF"
    val B5_PATH: String =
      if (args.length >= 2) args(1)
      else "/content/B5/LC09_L2SP_040036_20250803_20250804_02_T1_SR_B5.TIF"
    val OUT_NDVI: String =
      if (args.length >= 3) args(2)
      else "/content/ndvi.tif"

    val conf = new SparkConf().setAppName("ComputeNDVI")
    val sc = new SparkContext(conf)

    // Load single-band rasters as Float
    val red: RasterRDD[Float] = sc.geoTiff[Float](B4_PATH)
    val nir: RasterRDD[Float] = sc.geoTiff[Float](B5_PATH)

    // Check grid alignment by comparing RasterMetadata
    val redMD: RasterMetadata = red.flatten.first._3
    val nirMD: RasterMetadata = nir.flatten.first._3
