<a href="https://colab.research.google.com/github/LegendSeyi/AI-tutorial-practice/blob/main/Ecclesiastes_HA_middleware.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# middleware/app.py
import asyncio
import os
import re

import httpx
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

# Runtime configuration, read once at import time from environment variables.
# Each lookup falls back to the literal default shown when the variable is unset.
HA_URL = os.environ.get("HA_URL", "http://homeassistant.local:8123")
# Sent as the Bearer token on every Home Assistant request.
HA_TOKEN = os.environ.get("HA_TOKEN", "YOUR_LONG_LIVED_ACCESS_TOKEN")
# An empty value makes the Gemini path report itself as unavailable.
GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY", "")
# Endpoint that receives the reply text to be spoken (example default).
TTS_SERVICE_URL = os.environ.get("TTS_SERVICE_URL", "http://tts:5002/synthesize")

# Application object; the route decorators further down register endpoints on it.
app = FastAPI(title="Ecclesiastes Middleware")

# Simple models
class TextQuery(BaseModel):
    # Request body accepted by the POST /query endpoint.

    # Identifier of the device that sent the text; it is forwarded to the TTS
    # service together with the reply.
    device_id: str
    # The utterance to interpret (whitespace is stripped by the handler).
    text: str

async def ha_get_state(entity_id: str):
    """Fetch the state object of a single Home Assistant entity.

    Issues ``GET {HA_URL}/api/states/{entity_id}`` with the configured bearer
    token and a 10 second timeout.

    Args:
        entity_id: Entity identifier, inserted verbatim into the URL path.

    Returns:
        The decoded JSON body of the response.

    Raises:
        HTTPException: status 500 whenever Home Assistant answers with
            anything other than HTTP 200; the response text is included
            in the detail.
    """
    endpoint = f"{HA_URL}/api/states/{entity_id}"
    auth_headers = {
        "Authorization": f"Bearer {HA_TOKEN}",
        "Content-Type": "application/json",
    }
    async with httpx.AsyncClient() as session:
        response = await session.get(endpoint, headers=auth_headers, timeout=10.0)
        # Guard clause: every non-200 answer is reported as a 500 to our caller.
        if response.status_code != 200:
            raise HTTPException(status_code=500, detail=f"HA state error: {response.text}")
        return response.json()

async def ha_call_service(domain: str, service: str, data: dict):
    """Invoke a Home Assistant service and hand back the raw HTTP response.

    Issues ``POST {HA_URL}/api/services/{domain}/{service}`` with ``data`` as
    the JSON body, the configured bearer token and a 10 second timeout.

    Args:
        domain: Service domain, used as a URL path segment.
        service: Service name, used as a URL path segment.
        data: JSON-serialisable payload sent as the request body.

    Returns:
        The response object as received; the status code is NOT checked
        here, so callers must inspect it themselves.
    """
    auth_headers = {
        "Authorization": f"Bearer {HA_TOKEN}",
        "Content-Type": "application/json",
    }
    endpoint = f"{HA_URL}/api/services/{domain}/{service}"
    async with httpx.AsyncClient() as session:
        return await session.post(endpoint, json=data, headers=auth_headers, timeout=10.0)

async def gemini_query(prompt: str) -> str:
    """Answer a free-form prompt (currently a stub that echoes the prompt).

    No request is sent to Gemini yet; a real API call still has to be
    implemented here.

    Args:
        prompt: The text to answer.

    Returns:
        The prompt prefixed with ``"(Gemini simulated) "``.

    Raises:
        HTTPException: status 503 when ``GEMINI_API_KEY`` is empty.
    """
    if GEMINI_API_KEY:
        # TODO: replace the echo below with the actual Gemini request.
        return f"(Gemini simulated) {prompt}"
    raise HTTPException(status_code=503, detail="Gemini API key not configured")

async def tts_generate_and_notify(device_id: str, text: str):
    """Send reply text for a device to the configured TTS service.

    POSTs ``{"device_id": ..., "text": ...}`` as JSON to ``TTS_SERVICE_URL``
    with a 30 second timeout. What the service does with the request (audio
    generation, notifying the device) is not visible from this module.

    Args:
        device_id: Identifier of the device the reply is meant for.
        text: The sentence to be spoken.

    Returns:
        The decoded JSON body returned by the TTS service.

    Raises:
        HTTPException: status 500 when the service does not answer HTTP 200.
    """
    async with httpx.AsyncClient() as session:
        response = await session.post(
            TTS_SERVICE_URL,
            json={"device_id": device_id, "text": text},
            timeout=30.0,
        )
        if response.status_code == 200:
            return response.json()
        raise HTTPException(status_code=500, detail="TTS failed")

# Simple intent keywording
HOME_COMMAND_KEYWORDS = ["turn on", "turn off", "switch on", "switch off", "lock", "unlock"]
STATUS_KEYWORDS = ["is the", "are the", "how many", "what is", "what's the", "are there"]


def _keyword_pattern(keywords):
    """Compile a regex that matches any of *keywords* as whole words/phrases.

    Word boundaries keep "lock" from firing inside "clock" and "is the" from
    firing inside "this theory", which plain substring tests would accept.
    An empty keyword collection yields a pattern that never matches.
    """
    if not keywords:
        return re.compile(r"(?!)")
    alternatives = "|".join(re.escape(k) for k in keywords)
    return re.compile(rf"\b(?:{alternatives})\b")


# Compiled once at import time from the lists above; changing the lists at
# runtime has no effect unless these patterns are rebuilt.
_HOME_COMMAND_RE = _keyword_pattern(HOME_COMMAND_KEYWORDS)
_STATUS_RE = _keyword_pattern(STATUS_KEYWORDS)


def is_home_command(text: str) -> bool:
    """Return True when *text* contains a device-control keyword.

    Matching is case-insensitive and restricted to whole words, so every
    match is also a plain substring of the lower-cased text.
    """
    return _HOME_COMMAND_RE.search(text.lower()) is not None


def is_status_query(text: str) -> bool:
    """Return True when *text* contains a status-question phrase.

    Matching is case-insensitive and restricted to whole words.
    """
    return _STATUS_RE.search(text.lower()) is not None

# Spoken device names mapped to Home Assistant entity ids.
# Placeholder table: extend it (or load it from configuration) for a real home.
_ENTITY_ALIASES = {
    "living room light": "light.living_room",
    "bedroom light": "light.bedroom",
}

# Services accepted for each entity domain. Used to refuse requests such as
# "lock the bedroom light" before anything is sent to Home Assistant.
# Domains missing from this table are not checked here.
_DOMAIN_SERVICES = {
    "light": {"turn_on", "turn_off"},
    "switch": {"turn_on", "turn_off"},
    "lock": {"lock", "unlock"},
}

# Lights polled to answer "how many lights are on" (static placeholder list).
_LIGHT_ENTITIES = ("light.living_room", "light.bedroom", "light.kitchen")


def _detect_service(txt: str) -> str:
    """Map lower-cased command text to a Home Assistant service name.

    Expects that the caller already verified the text contains one of
    HOME_COMMAND_KEYWORDS, so the final fallback can only mean "lock".
    """
    if "turn on" in txt or "switch on" in txt:
        return "turn_on"
    if "turn off" in txt or "switch off" in txt:
        return "turn_off"
    # "unlock" contains "lock", so it has to be tested first.
    if "unlock" in txt:
        return "unlock"
    return "lock"


async def _reply(device_id: str, status: str, text: str) -> dict:
    """Send *text* to the TTS service for the device, then build the response."""
    await tts_generate_and_notify(device_id, text)
    return {"status": status, "text": text}


@app.post("/query")
async def handle_query(q: TextQuery):
    """Route a text query to Home Assistant or to Gemini and speak the reply.

    The text is classified with simple keyword matching:

    1. Device commands are translated into a Home Assistant service call on
       the entity whose alias appears in the text.
    2. "How many lights" questions are answered by counting the configured
       lights whose state is "on".
    3. Everything else is forwarded to Gemini when an API key is configured.

    Every reply is sent to the TTS service before it is returned.

    Returns:
        A dict with "status" ("ok", "need_device", "unsupported", "gemini"
        or "offline") and "text" (the reply sentence).

    Raises:
        HTTPException: status 500 when the Home Assistant service call does
            not answer 200/201; errors raised by the helper calls propagate
            unchanged.
    """
    text = q.text.strip()
    device_id = q.device_id
    txt = text.lower()

    # 1) Home command -> Home Assistant service call.
    if is_home_command(text):
        service = _detect_service(txt)
        target = next(
            (entity for alias, entity in _ENTITY_ALIASES.items() if alias in txt),
            None,
        )
        if target is None:
            return await _reply(device_id, "need_device", "Which device do you want to control?")

        # Take the domain from the entity id instead of assuming "light", and
        # refuse services that domain does not offer (e.g. locking a light).
        domain = target.split(".", 1)[0]
        allowed = _DOMAIN_SERVICES.get(domain)
        if allowed is not None and service not in allowed:
            return await _reply(
                device_id,
                "unsupported",
                f"Sorry, I can't {service.replace('_', ' ')} that device.",
            )

        r = await ha_call_service(domain, service, {"entity_id": target})
        if r.status_code not in (200, 201):
            raise HTTPException(status_code=500, detail=f"HA service failed: {r.text}")
        return await _reply(device_id, "ok", f"Done. {target} {service.replace('_', ' ')}.")

    # 2) Status query. Only the light count is implemented; any other status
    #    question falls through to the general branch below.
    if is_status_query(text) and "how many lights" in txt:
        # The state lookups are independent, so run them concurrently.
        states = await asyncio.gather(*(ha_get_state(e) for e in _LIGHT_ENTITIES))
        on_count = sum(1 for s in states if s.get("state") == "on")
        noun = "light is" if on_count == 1 else "lights are"
        return await _reply(device_id, "ok", f"{on_count} {noun} currently on.")

    # 3) General query -> Gemini, when a key is configured.
    if GEMINI_API_KEY:
        gemini_answer = await gemini_query(text)
        return await _reply(device_id, "gemini", gemini_answer)
    return await _reply(
        device_id,
        "offline",
        "Sorry, I cannot access Gemini right now. I can only control local devices.",
    )

@app.get("/health")
def health():
    # Liveness probe: always answers "ok" and does not contact any backend.
    return dict(status="ok")
