In [None]:
# %% [1] Imports, env, backend
import os, io, json, time, platform, threading, traceback
from typing import List, Dict, Any, Optional

from dotenv import load_dotenv
from PIL import Image, ImageDraw, ImageOps
import ipywidgets as W
from IPython.display import display, HTML

# Load your Moondream Cloud key (api.env must contain MOONDREAM_API_KEY=...)
load_dotenv("api.env")

import moondream as md

class VisionBackend:
    """Interface for the four image operations the UI can invoke.

    Every method here is a placeholder that returns ``None``; concrete
    backends are expected to override them and return a result dict.
    """

    def caption(self, image: Image.Image, length: str = "normal") -> Dict[str, Any]:
        """Describe ``image``; ``length`` selects how verbose the caption is."""

    def query(self, image: Image.Image, question: Optional[str], reasoning: int = 0) -> Dict[str, Any]:
        """Answer ``question`` about ``image``."""

    def detect(self, image: Image.Image, label: str) -> Dict[str, Any]:
        """Find boxes for ``label`` in ``image``."""

    def point(self, image: Image.Image, label: str) -> Dict[str, Any]:
        """Find points for ``label`` in ``image``."""

class MoondreamBackend(VisionBackend):
    """Vision backend that delegates to a Moondream model handle.

    When ``MOONDREAM_API_KEY`` is set (non-blank) the handle is created with
    that key and ``source`` is ``"moondream-cloud"``; otherwise the handle is
    created against ``MOONDREAM_ENDPOINT`` (default
    ``http://localhost:2020/v1``) and ``source`` is ``"moondream-local"``.
    """

    def __init__(self):
        key = os.getenv("MOONDREAM_API_KEY", "").strip()
        url = os.getenv("MOONDREAM_ENDPOINT", "http://localhost:2020/v1").strip()
        if key:
            self.model = md.vl(api_key=key)
            self.source = "moondream-cloud"
        else:
            self.model = md.vl(endpoint=url)
            self.source = "moondream-local"

    def caption(self, image, length="normal"):
        """Caption ``image``; for "normal"/"long" ask the model to expand it.

        The expansion is best-effort: if the follow-up query fails, the
        base caption is returned unchanged.
        """
        started = time.time()
        base = self.model.caption(image)["caption"]

        if length == "normal":
            target = "two to three"
        elif length == "long":
            target = "four to six"
        else:
            target = None

        text = base
        if target is not None:
            try:
                prompt = f"Rewrite the caption into {target} sentences. Caption: {base}"
                text = self.model.query(image, prompt)["answer"]
            except Exception:
                # Fall back to the un-expanded caption rather than failing.
                text = base

        return {
            "mode": "caption",
            "text": text,
            "meta": {
                "ms": int((time.time() - started) * 1000),
                "source": self.source,
                "length": length,
            },
        }

    def query(self, image, question, reasoning=0):
        """Answer ``question``; a blank question yields canned suggestions."""
        started = time.time()
        prompt = (question or "").strip()

        if not prompt:
            # Nothing to ask: return starter questions without calling the model.
            starter_questions = [
                "What objects are most prominent?",
                "What is the likely setting or environment?",
                "Are any brand names or labels visible?",
            ]
            return {
                "mode": "query",
                "text": "",
                "suggestions": starter_questions,
                "meta": {"ms": 0, "source": self.source},
            }

        if reasoning >= 1:
            prompt += " Think carefully and justify briefly."

        answer = self.model.query(image, prompt)["answer"]
        return {
            "mode": "query",
            "text": answer,
            "meta": {
                "ms": int((time.time() - started) * 1000),
                "source": self.source,
                "reasoning": reasoning,
            },
        }

    def detect(self, image, label):
        """Return boxes for ``label`` as ``[x_min, y_min, x_max, y_max]`` lists."""
        started = time.time()
        found = self.model.detect(image, label)["objects"]

        detections = []
        for obj in found:
            corners = [obj["x_min"], obj["y_min"], obj["x_max"], obj["y_max"]]
            detections.append({"label": label, "box": corners})

        return {
            "mode": "detect",
            "detections": detections,
            "meta": {"ms": int((time.time() - started) * 1000), "source": self.source},
        }

    def point(self, image, label):
        """Return points for ``label`` as ``[x, y]`` lists."""
        started = time.time()
        found = self.model.point(image, label)["points"]

        located = []
        for entry in found:
            located.append({"label": label, "xy": [entry["x"], entry["y"]]})

        return {
            "mode": "point",
            "points": located,
            "meta": {"ms": int((time.time() - started) * 1000), "source": self.source},
        }

# Shared backend instance used by the UI handlers and by diagnostics().
# Constructing it here runs MoondreamBackend.__init__, which reads the
# MOONDREAM_* environment variables and creates the model handle.
BACKEND: VisionBackend = MoondreamBackend()


In [None]:
# %% [2] Styles + helpers
# Inject the CSS used by the "card" / "small" classes on the widgets.
# A bare HTML(...) expression is only rendered when it is the LAST expression
# of a cell; helper definitions follow in this cell, so the object would be
# created and silently discarded. Display it explicitly instead.
display(HTML("""
<style>
.card { background:#fff; border-radius:12px; box-shadow:0 10px 24px rgba(2,6,23,.06); padding:12px; }
.small { color:#6B7280; font-size:12px; }
img.widget-image { border:1px solid #e5e7eb; }
</style>
"""))

def img_to_png_bytes(img: Image.Image) -> bytes:
    """Encode ``img`` as PNG in memory and return the encoded bytes."""
    buffer = io.BytesIO()
    img.save(buffer, format="PNG")
    return buffer.getvalue()

def normbox_to_px(box, w, h):
    """Scale the first four entries of ``box`` to integer pixel coordinates.

    Entries 0 and 2 are multiplied by ``w``, entries 1 and 3 by ``h``; each
    product is truncated with ``int``. Returns a 4-tuple.
    """
    scales = (w, h, w, h)
    return tuple(int(box[i] * scales[i]) for i in range(4))

def normpt_to_px(pt, w, h):
    """Scale ``pt[0]`` by ``w`` and ``pt[1]`` by ``h``, truncating to ints."""
    x_px = int(pt[0] * w)
    y_px = int(pt[1] * h)
    return x_px, y_px

def draw_overlay(base: Image.Image, detections=None, points=None) -> Image.Image:
    """Return a copy of ``base`` annotated with boxes and point markers.

    ``detections`` items supply a normalized ``"box"``; each is outlined in
    red. ``points`` items supply a normalized ``"xy"``; each is drawn as a
    filled blue circle of radius 5 px. ``base`` itself is not modified.
    """
    boxes = detections or []
    dots = points or []

    canvas = base.copy()
    pen = ImageDraw.Draw(canvas)
    width, height = canvas.size

    for det in boxes:
        left, top, right, bottom = normbox_to_px(det["box"], width, height)
        pen.rectangle([left, top, right, bottom], outline="red", width=3)

    for pt in dots:
        cx, cy = normpt_to_px(pt["xy"], width, height)
        radius = 5
        pen.ellipse([cx - radius, cy - radius, cx + radius, cy + radius], fill="blue")

    return canvas

def maybe_downscale(img: Image.Image, long_edge: int = 2000) -> Image.Image:
    """Return ``img`` shrunk so its longest side fits within ``long_edge``.

    Images already within the limit are returned as-is (same object);
    otherwise a thumbnail-resized copy is returned and ``img`` is untouched.
    """
    longest_side = max(img.size)
    if longest_side <= long_edge:
        return img

    resized = img.copy()
    resized.thumbnail((long_edge, long_edge))
    return resized

def pretty_json(obj):
    """Serialize ``obj`` as 2-space-indented JSON, keeping non-ASCII text as-is."""
    dump_options = {"indent": 2, "ensure_ascii": False}
    return json.dumps(obj, **dump_options)

def diagnostics(last_exc: Exception|None=None) -> str:
    """Build an HTML ``<pre>`` block summarising the runtime environment.

    Args:
        last_exc: Optional exception to append as a "Last error" line.

    Returns:
        An HTML string. All dynamic text is HTML-escaped so that messages
        containing markup-like text (for example
        ``cannot identify image file <_io.BytesIO ...>``) are shown verbatim
        instead of being parsed as tags.
    """
    import html  # function-scope import keeps this block self-contained

    try:
        md_ver = getattr(md, "__version__", "unknown")
    except Exception:
        # Best-effort only: never let a version probe break the report.
        md_ver = "unknown"

    lines = [
        f"Backend: {getattr(BACKEND, 'source', 'unknown')}",
        f"Python: {platform.python_version()}",
        f"ipywidgets: {W.__version__}",
        f"Pillow: {getattr(Image, '__version__', 'unknown')}",
        f"Moondream pkg: {md_ver}",
        f"Env KEY set? {'yes' if os.getenv('MOONDREAM_API_KEY') else 'no'}",
    ]
    if last_exc is not None:
        # Only the "ExcType: message" line, not the full stack trace.
        tb = ''.join(traceback.format_exception_only(type(last_exc), last_exc)).strip()
        lines.append(f"Last error: {tb}")

    body = html.escape("\n".join(lines), quote=False)
    return "<pre style='font-family:ui-monospace,monospace;font-size:12.5px'>" + body + "</pre>"


In [None]:
# %% [3] Widgets + layout
# Uploader (built-in widget): accepts a single image file at a time.
upload = W.FileUpload(accept="image/*", multiple=False)

# Left card: the uploader plus a preview of the uploaded image.
img_preview = W.Image(format="png", layout=W.Layout(max_width="520px"))
left_card = W.VBox([
    W.HTML("<b>Upload image</b>"),
    upload,
    W.HTML("<div class='small'>Large images can be downsized on the server before inference.</div>"),
    W.HTML("<b>Preview</b>"),
    img_preview
], _dom_classes=["card"], layout=W.Layout(width="34%"))

# Right controls: one input widget per mode, plus shared preprocessing options.
# Only the widgets relevant to the selected mode are shown at any time; see
# show_controls_for_mode, which fills controls_box.
mode = W.ToggleButtons(options=["Query","Caption","Point","Detect"], value="Query", description="Mode")
q_text = W.Textarea(placeholder="Ask about the image… (leave blank for suggestions)", rows=3, layout=W.Layout(width="100%"))
q_reason = W.Checkbox(value=False, description="Reasoning")
cap_len = W.RadioButtons(options=[("Short","short"),("Normal","normal"),("Long","long")], value="short")
pt_label = W.Text(value="grapes", description="Label")
det_labels = W.Text(value="grapes", description="Labels (comma-separated)")
downscale = W.Checkbox(value=True, description="Downscale before inference")
max_edge  = W.BoundedIntText(value=2000, min=512, max=10000, step=100, description="Max edge (px)")

# Action buttons and the two HTML areas they write to.
run_btn = W.Button(description="Run", button_style="primary")
clear_btn = W.Button(description="Clear")
troubleshoot_btn = W.Button(description="Troubleshoot")
status_html = W.HTML()
troubleshoot_out = W.HTML()

# controls_box starts empty; its children are swapped per mode.
controls_box = W.VBox([])
right_controls = W.VBox([
    mode,
    controls_box,
    W.HBox([run_btn, clear_btn, troubleshoot_btn]),
    status_html
], _dom_classes=["card"], layout=W.Layout(width="66%"))

# Result area: annotated image, rendered text, and the raw JSON result.
overlay_preview = W.Image(format="png", layout=W.Layout(max_width="100%"))
text_out = W.HTML("<div class='small'>Result will appear here.</div>")
json_out = W.HTML()
result_card = W.VBox([
    W.HTML("<b>Result</b>"),
    overlay_preview,
    text_out,
    json_out
], _dom_classes=["card"])

# Top-level UI: title, the two cards side by side, results, diagnostics.
ui = W.VBox([
    W.HTML("<h2>ImageTagger (Moondream)</h2>"),
    W.HBox([left_card, right_controls]),
    result_card,
    troubleshoot_out
])

def show_controls_for_mode(*_):
    """Swap ``controls_box`` children to match the currently selected mode.

    Accepts and ignores any positional arguments so it can be used both as a
    traitlets observer callback and as a plain call.
    """
    selected = mode.value
    hint = None

    if selected == "Query":
        heading, inputs = "<b>Question</b>", [q_text, q_reason]
    elif selected == "Caption":
        heading, inputs = "<b>Caption length</b>", [cap_len]
    elif selected == "Point":
        heading, inputs = "<b>Object label</b>", [pt_label]
    else:
        # Any other mode value is treated as Detect.
        heading, inputs = "<b>Labels</b>", [det_labels]
        hint = "<span class='small'>Comma-separated; leave blank for a generic 'object'.</span>"

    panel = [W.HTML(heading), *inputs]
    if hint is not None:
        panel.append(W.HTML(hint))

    # Preprocessing options are shared by every mode.
    panel.append(W.HTML("<b>Preprocessing</b>"))
    panel.append(W.HBox([downscale, max_edge]))

    controls_box.children = panel

# Rebuild the mode-specific controls whenever the Mode toggle changes, and
# populate them once now for the initial mode.
mode.observe(show_controls_for_mode, names="value")
show_controls_for_mode()

# Render the assembled UI as this cell's output.
display(ui)


In [None]:
# %% [4] Handlers + wiring
def set_error(msg: str):
    """Show ``msg`` in the status area using the red error banner.

    ``msg`` is treated as plain text and HTML-escaped: callers typically pass
    ``str(exception)``, which may contain ``<``/``&`` (e.g. object reprs) that
    would otherwise be parsed as markup and vanish from the banner.
    """
    import html  # function-scope import keeps this block self-contained

    safe_msg = html.escape(str(msg), quote=False)
    status_html.value = f"<div style='background:#ffecec;color:#b00020;border-left:4px solid #b00020;padding:10px;border-radius:6px'>{safe_msg}</div>"

def set_ok(msg: str):
    """Show ``msg`` in the status area using the green success banner."""
    banner_css = "background:#eefbf1;border-left:4px solid #16a34a;padding:10px;border-radius:6px"
    status_html.value = f"<div style='{banner_css}'>{msg}</div>"

def render_text(html_text: str):
    """Place ``html_text`` (already HTML) inside the grey result panel."""
    panel_css = "padding:10px;border-radius:10px;background:#F3F4F6"
    text_out.value = f"<div style='{panel_css}'>{html_text}</div>"

def render_json(obj: Any):
    """Show ``obj`` as pretty-printed JSON in the result card.

    The JSON text is HTML-escaped before being placed in the ``<pre>`` block
    so string values containing ``<``, ``>`` or ``&`` (e.g. model answers)
    are displayed literally rather than interpreted as markup.
    """
    import html  # function-scope import keeps this block self-contained

    payload = html.escape(pretty_json(obj), quote=False)
    json_out.value = f"<pre style='font-family:ui-monospace,monospace;font-size:12.5px'>{payload}</pre>"

def render_overlay(img: Image.Image, dets, pts):
    """Draw ``dets`` and ``pts`` over ``img`` and show the result as PNG."""
    annotated = draw_overlay(img, dets, pts)
    overlay_preview.value = img_to_png_bytes(annotated)

def _extract_upload_item():
    """Return the first uploaded file record, or ``None`` if nothing is uploaded.

    Handles both shapes of ``FileUpload.value``: a dict keyed by filename
    (ipywidgets 7.x) and a sequence of file records (ipywidgets 8.x).
    """
    current = upload.value
    if not current:
        return None
    if isinstance(current, dict):
        # Mapping form: take the first stored record.
        return list(current.values())[0]
    # Sequence form: records are stored positionally.
    return current[0]

def load_current_image() -> Image.Image:
    """Decode the currently uploaded file into an upright RGB image.

    Returns:
        The uploaded image with its EXIF orientation applied, in RGB mode.

    Raises:
        ValueError: If no file has been uploaded.
        Exception: Whatever Pillow raises when the bytes cannot be decoded.
    """
    item = _extract_upload_item()
    if not item:
        raise ValueError("Please upload an image first.")

    img = Image.open(io.BytesIO(item["content"]))
    img.load()
    # Apply the EXIF orientation on the image as opened, *then* convert.
    # Converting first yields a new object that no longer carries file-level
    # metadata (e.g. TIFF tags), so the orientation could be lost.
    img = ImageOps.exif_transpose(img)
    return img.convert("RGB")

def _on_upload_change(_):
    """Observer for the uploader: refresh the preview image and status banner.

    Decoding is delegated to ``load_current_image`` so the preview always
    shows exactly the image that inference will receive, instead of keeping
    a second copy of the decode pipeline here.
    """
    try:
        if not _extract_upload_item():
            # Upload was cleared: blank the preview and leave the status alone.
            img_preview.value = b""
            return
        img_preview.value = img_to_png_bytes(load_current_image())
        set_ok("Upload received.")
    except Exception as e:
        # UI boundary: surface any decode problem instead of raising into
        # the widget event loop.
        set_error(f"Preview error: {e}")

# Refresh the preview whenever the uploader's value changes.
upload.observe(_on_upload_change, names="value")

# Used by on_run (non-blocking acquire) to reject a run while one is active.
_run_lock = threading.Lock()

def on_run(_):
    """Run-button handler: execute the selected mode on the uploaded image.

    Clears previous results, loads (and optionally downsizes) the image,
    dispatches to the backend for the current mode, and renders text, overlay
    and JSON. Any failure is reported in the status banner and the
    troubleshooting panel rather than raised.

    Model output and user-entered labels are HTML-escaped before being
    embedded in the result markup.
    """
    import html  # function-scope import keeps this block self-contained

    def _esc(value) -> str:
        # Escape dynamic text for safe embedding in HTML element content.
        return html.escape(str(value), quote=False)

    if not _run_lock.acquire(blocking=False):
        set_error("Already running…")
        return
    try:
        overlay_preview.value = b""
        text_out.value = ""
        json_out.value = ""

        try:
            base_img = load_current_image()
        except Exception as e:
            set_error(f"Input issue: {e}")
            troubleshoot_out.value = diagnostics(e)
            return

        if downscale.value:
            base_img = maybe_downscale(base_img, int(max_edge.value))

        if mode.value == "Query":
            res = BACKEND.query(base_img, q_text.value.strip() or None, reasoning=int(q_reason.value))
            if not res.get("text"):
                # Blank question (or empty answer): show the suggestions instead.
                suggestions = " • ".join(_esc(s) for s in res.get("suggestions", []))
                render_text("<i>Suggestions</i>: " + suggestions)
                render_json(res)
                set_ok("Suggestions ready.")
                return
            render_text(_esc(res["text"]))
            render_json(res)
            set_ok("Done.")

        elif mode.value == "Caption":
            res = BACKEND.caption(base_img, length=cap_len.value)
            render_text(_esc(res["text"]))
            render_json(res)
            set_ok("Done.")

        elif mode.value == "Point":
            # Strip first, then fall back: a whitespace-only label must become
            # "object", not an empty string sent to the backend.
            label = (pt_label.value or "").strip() or "object"
            res = BACKEND.point(base_img, label)
            render_text(f"Points for <b>{_esc(label)}</b>")
            render_overlay(base_img, dets=[], pts=res.get("points", []))
            render_json(res)
            set_ok("Done.")

        else:  # Detect
            labels = [t.strip() for t in det_labels.value.split(",") if t.strip()] or ["object"]
            all_dets = []
            for lb in labels:
                all_dets.extend(BACKEND.detect(base_img, lb).get("detections", []))
            res = {"mode":"detect","detections":all_dets,"meta":{"source":getattr(BACKEND,'source','unknown')}}
            render_text(f"Detections for: <b>{_esc(', '.join(labels))}</b>")
            render_overlay(base_img, dets=all_dets, pts=[])
            render_json(res)
            set_ok("Done.")
    except Exception as e:
        # UI boundary: report instead of raising into the widget event loop.
        set_error(f"Runtime error: {e}")
        troubleshoot_out.value = diagnostics(e)
    finally:
        _run_lock.release()

def on_clear(_):
    """Clear-button handler: reset the uploader and blank every output area."""
    try:
        # Reset the uploader so the same filename can be uploaded again.
        current = upload.value
        if isinstance(current, dict):
            current.clear()
        else:
            upload.value = ()
        if hasattr(upload, "_counter"):
            upload._counter = 0
    except Exception:
        # Best-effort reset: the outputs below are still cleared regardless.
        pass

    for image_widget in (img_preview, overlay_preview):
        image_widget.value = b""
    for html_widget in (text_out, json_out, status_html):
        html_widget.value = ""

def on_troubleshoot(_):
    """Troubleshoot-button handler: show an environment report (no error line)."""
    report = diagnostics()
    troubleshoot_out.value = report

# Connect each button to its click handler.
run_btn.on_click(on_run)
clear_btn.on_click(on_clear)
troubleshoot_btn.on_click(on_troubleshoot)
