In [1]:
import os
import io
import uuid
import logging
from dotenv import load_dotenv
from datetime import datetime
import re

import gradio as gr
from PIL import Image

# --- external model lib ---
import google.generativeai as genai

# --- setup logging ---
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("pawsence")

# --- load config ---
load_dotenv()
API_KEY = os.getenv("GOOGLE_API_KEY")
if not API_KEY:
    raise RuntimeError("Set GOOGLE_API_KEY in env or directly in notebook")

genai.configure(api_key=API_KEY)

MODEL_NAME = "gemini-1.5-flash"
model = genai.GenerativeModel(MODEL_NAME)

# --- in-memory session store (for demo only) ---
SESSIONS = {}

DISCLAIMER = (
    "‚ö† Disclaimer: I am an AI assistant for preliminary animal health guidance. "
    "This is NOT a substitute for a veterinarian. Always consult a vet for diagnosis or treatment."
)

# --- helpers ---
def safe_text_from_response(response):
    # defensive: different SDK shapes
    if response is None:
        return ""
    if hasattr(response, "text"):
        return response.text or ""
    if getattr(response, "candidates", None):
        try:
            return response.candidates[0].content
        except Exception:
            return str(response)
    return str(response)

def image_to_jpeg_bytes(pil_img: Image.Image, max_size=(1600,1600)):
    # validate & resize to reasonable size, return jpeg bytes
    if pil_img.mode not in ("RGB", "RGBA"):
        pil_img = pil_img.convert("RGB")
    pil_img.thumbnail(max_size, Image.LANCZOS)
    buf = io.BytesIO()
    pil_img.save(buf, format="JPEG", quality=85)
    buf.seek(0)
    return buf.read()

# improved urgent detection: whole-word patterns (case-insensitive)
URGENT_PATTERNS = [
    r"\bnot breathing\b",
    r"\bno breath\b",
    r"\bseizure\b",
    r"\bconvulsion\b",
    r"\bcollapse(ed|ing)?\b",
    r"\bbleeding (heavily|a lot|profusely)\b",
    r"\bcannot stand\b",
    r"\bwon't stand\b",
    r"\bsudden death\b",
]

def is_urgent(text: str) -> bool:
    text = (text or "").lower()
    return any(re.search(p, text) for p in URGENT_PATTERNS)

# API call with defensive handling
def call_gemini(conversation_messages, image_bytes=None):
    # Build inputs for Gemini: structured list, last user prompt is the focus
    inputs = []
    # convert structured history to single-string items (Gemini flexible)
    for m in conversation_messages:
        role = m.get("role", "user")
        content = m.get("content", "")
        inputs.append(f"[{role.upper()}] {content}")

    if image_bytes:
        inputs.append({"mime_type": "image/jpeg", "data": image_bytes})

    try:
        response = model.generate_content(
            inputs,
            generation_config={
                "temperature": 0.2,
                "top_p": 0.95,
                "max_output_tokens": 512,
            }
        )
        return safe_text_from_response(response)
    except Exception as e:
        logger.exception("Gemini call failed")
        return f"‚ö† Error: failed to call model: {e}"

# triage + memory
def triage_and_generate(session_id, user_text, pil_img, species):
    session = SESSIONS.setdefault(session_id, {"history": []})
    timestamp = datetime.utcnow().isoformat()
    user_entry = {"role": "user", "content": f"Species: {species}. {user_text}", "time": timestamp}
    session["history"].append(user_entry)

    urgent_flag = is_urgent(user_text)

    system_prompt = (
        "You are an assistant that provides safe, evidence-based veterinary triage. "
        "Keep replies concise. If key facts are missing (species, age, vaccination, trauma), ask one follow-up question. "
        "If the condition appears medically urgent, respond with 'ESCALATE' and recommend immediate veterinary care."
    )

    # Build conversation: system prompt + disclaimer + recent history (limit to last N entries)
    recent = session["history"][-12:]  # keep last 12 turns to limit size
    conv = [{"role": "system", "content": system_prompt},
            {"role": "system", "content": DISCLAIMER}]
    conv.extend(recent)

    img_bytes = None
    if pil_img:
        try:
            img_bytes = image_to_jpeg_bytes(pil_img)
            conv.append({"role": "user", "content": "Image attached: please describe visible signs and urgency."})
        except Exception as e:
            logger.exception("Image processing failed")
            session["history"].append({"role": "assistant", "content": f"‚ö† Error processing image: {e}"})
            return format_chat(session["history"]), ""

    reply_text = call_gemini(conv, image_bytes=img_bytes)
    if urgent_flag:
        reply_text = "‚ö† ESCALATE: This appears urgent. " + reply_text

    session["history"].append({"role": "assistant", "content": reply_text, "time": datetime.utcnow().isoformat()})
    return format_chat(session["history"]), reply_text

def format_chat(history):
    # Convert stored history into list of (user, assistant) pairs for gr.Chatbot
    # We'll iterate and collect messages in order; whenever 'user' add a tuple; when 'assistant' follow.
    chat_display = []
    for entry in history:
        role = entry.get("role", "user")
        content = entry.get("content", "")
        if role == "user":
            chat_display.append(("You", content))
        elif role == "assistant":
            chat_display.append(("PawSence", content))
        else:
            # system or others ‚Äî show as assistant note
            chat_display.append(("System", content))
    return chat_display

# --- Gradio UI ---
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    with gr.Row():
        gr.Markdown(
            """
            <div style="text-align: center;">
                <h1>üêæ PawSence</h1>
                <h3 style="color: #555;">Your AI Vet Companion ‚Äî Caring for Pets, Powered by AI</h3>
            </div>
            """
        )

    with gr.Row():
        with gr.Column(scale=2):
            chat = gr.Chatbot(label="üí¨ Conversation", height=420)
            with gr.Row():
                txt = gr.Textbox(placeholder="‚úç Describe symptoms or ask a question...", lines=2)
                send = gr.Button("üöÄ Send", variant="primary")
        with gr.Column(scale=1):
            gr.Markdown("### ‚öô Case Details")
            species = gr.Dropdown(choices=["Dog","Cat","Horse","Cow","Goat","Sheep","Pig","Other"],
                                  label="Species", value="Dog")
            # return PIL Image to process safely
            img = gr.Image(type="pil", label="üì∑ Upload Image (optional)", height=200)
            # session id: default new uuid per user; for demo we expose a button to new session
            session_id_state = gr.State(str(uuid.uuid4()))
            clear_btn = gr.Button("üÜï Start New Session", variant="secondary")

    # events
    def user_submit(user_text, pil_img, sess_state, species_val):
        # create new session id if state is empty
        if not sess_state:
            sess_state = str(uuid.uuid4())

        messages, raw_reply = triage_and_generate(sess_state, user_text, pil_img, species_val)
        # Clear input textbox and image after send (return values for outputs)
        return messages, "", None, sess_state

    def reset_session():
        new_id = str(uuid.uuid4())
        SESSIONS[new_id] = {"history": []}
        return [], "", None, new_id

    send.click(user_submit, inputs=[txt, img, session_id_state, species],
               outputs=[chat, txt, img, session_id_state])
    clear_btn.click(reset_session, outputs=[chat, txt, img, session_id_state])

# For notebooks or simple local run:
if __name__ == "__main__":
    demo.launch(share=False, inline=True)

  chat = gr.Chatbot(label="üí¨ Conversation", height=420)
INFO:httpx:HTTP Request: GET http://127.0.0.1:7860/gradio_api/startup-events "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: HEAD http://127.0.0.1:7860/ "HTTP/1.1 200 OK"


* Running on local URL:  http://127.0.0.1:7860
* To create a public link, set `share=True` in `launch()`.
