
# SmartCommunity-AI — Kaggle Notebook (ADK + Gemini Integration)
**Multi-agent Intelligent Concierge System** — single-file Kaggle-ready notebook configured to use **`gemini-1.5-flash-latest`** and `GOOGLE_API_KEY` as the credential.

This notebook includes:
- Project pitch & architecture
- ADK/Gemini integration via the `google.generativeai` client, used for intent classification and agent reasoning
- Multi-agent orchestrator (Orchestrator, ErrandAgent, MaintenanceAgent, InfoSearchAgent)
- Tool integration examples (Vendor API mock, Google Search placeholder)
- Demo cells that run inside a Kaggle notebook (requires the `GOOGLE_API_KEY` secret and Internet access enabled)

---
**Important:** Do not commit API keys. Use Kaggle Secrets (Add-ons → Secrets in the notebook editor) to add `GOOGLE_API_KEY`. Replace the mock tools with real APIs where the comments indicate.


In [None]:

# Install dependencies (uncomment if needed on Kaggle)
# !pip install --quiet google-generativeai fastapi uvicorn python-dotenv requests

print('If you need to install packages, uncomment the pip line above.')



## Configuration & Authentication

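Add the Kaggle secret `GOOGLE_API_KEY` (notebook editor → Add-ons → Secrets) and attach it to this notebook. Kaggle does not export secrets as environment variables, so inside Kaggle read it with `kaggle_secrets.UserSecretsClient().get_secret('GOOGLE_API_KEY')`; outside Kaggle, set the `GOOGLE_API_KEY` environment variable instead.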

The notebook uses `google.generativeai` (the Google Generative AI Python client) to call Gemini Flash:
- Model used: **`gemini-1.5-flash-latest`**

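Image understanding is stubbed out by `VisionTool`, which returns simulated text. Gemini 1.5 models (for example `gemini-1.5-flash-latest` or `gemini-1.5-pro-latest`) accept image input directly, so no separate vision model is needed when you replace the stub.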
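A minimal sketch of what a real image call could look like, assuming you already have the photo as bytes (for example downloaded from the session's photo URL). The `google.generativeai` client accepts a list that mixes text with inline-data dicts of the form `{'mime_type': ..., 'data': ...}`; `build_image_parts` and `analyze_image_with_gemini` are hypothetical helper names, not part of this notebook.

```python
import mimetypes


def build_image_parts(prompt: str, image_bytes: bytes, filename: str) -> list:
    """Build a multimodal content list: the text prompt plus one inline image part."""
    mime_type, _ = mimetypes.guess_type(filename)
    if mime_type is None or not mime_type.startswith('image/'):
        raise ValueError(f'not an image file: {filename}')
    # Inline-data part: raw bytes plus their MIME type
    return [prompt, {'mime_type': mime_type, 'data': image_bytes}]


def analyze_image_with_gemini(image_bytes: bytes, filename: str,
                              model_name: str = 'gemini-1.5-flash-latest') -> str:
    """Ask Gemini to assess damage in a photo (needs network access and a configured API key)."""
    import google.generativeai as genai  # imported lazily so build_image_parts works offline

    model = genai.GenerativeModel(model_name)
    parts = build_image_parts(
        'Assess the damage in this photo and classify severity as low, medium, or high.',
        image_bytes,
        filename,
    )
    return model.generate_content(parts).text
```

`VisionTool.analyze_image` could then download the bytes and delegate to `analyze_image_with_gemini`; keeping the part-building step separate makes it testable without network access.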


In [None]:

import os, sys, asyncio, logging, uuid, json
from collections import defaultdict

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('SmartCommunity-ADK')

# Read the API key: environment variable first, then Kaggle Secrets
API_KEY = os.environ.get('GOOGLE_API_KEY')
if not API_KEY:
    try:
        from kaggle_secrets import UserSecretsClient  # only available inside Kaggle kernels
        API_KEY = UserSecretsClient().get_secret('GOOGLE_API_KEY')
    except Exception:
        API_KEY = None

if not API_KEY:
    logger.warning('GOOGLE_API_KEY not found. Add it as a Kaggle secret (or environment variable) before running Gemini calls.')
else:
    logger.info('GOOGLE_API_KEY found (not printed for security).')

# Import and configure the official Google Generative AI client (Gemini).
try:
    import google.generativeai as genai
    genai.configure(api_key=API_KEY)
    logger.info('google.generativeai imported and configured.')
except Exception as e:
    genai = None  # call_gemini_chat will log a warning and return a placeholder string
    logger.warning('google.generativeai could not be imported or configured; Gemini calls will fail. %s', e)

# NOTE: Gemini calls need outbound network access. On Kaggle, enable Internet in the
# notebook settings; otherwise run this code locally or on Cloud Run.



## Notebook Overview (what each section does)

1. Utility classes (`MemoryBank`, `InMemorySessionService`): small in-memory stores for the demo.
2. Tool wrappers: `VendorAPI` (mock), `GoogleSearchTool` (placeholder), `VisionTool` (optional, simulated).
3. Agents: `ErrandAgent` (sequential steps with a parallel price lookup), `MaintenanceAgent` (pauses until the resident uploads a photo), `InfoSearchAgent` (search, then summarize).
4. Orchestrator: routes each message using a Gemini Flash intent classifier.
5. Demo: runs sample interactions.
6. Guidance: how to replace the mocks with real tools and deploy to Cloud Run / Vertex AI Agent Engine.


In [None]:

# -------------------- Utilities --------------------
class MemoryBank:
    """Per-resident long-term store: preferences plus a capped interaction log."""
    def __init__(self):
        self._prefs = defaultdict(dict)
        self._interactions = defaultdict(list)

    def get_preference(self, resident_id: str, key: str):
        return self._prefs[resident_id].get(key)

    def set_preference(self, resident_id: str, key: str, value):
        self._prefs[resident_id][key] = value

    def save_interaction(self, resident_id: str, summary: str):
        self._interactions[resident_id].append(summary)
        # Keep only the 50 most recent interactions per resident
        self._interactions[resident_id] = self._interactions[resident_id][-50:]

    def get_recent(self, resident_id: str, n: int = 5):
        return self._interactions[resident_id][-n:]


class InMemorySessionService:
    """Per-resident short-term session state (e.g. pending photo requests, last message)."""
    def __init__(self):
        self._sessions = defaultdict(dict)

    def get_session(self, resident_id: str):
        return self._sessions[resident_id]

    def update_session(self, resident_id: str, **kwargs):
        self._sessions[resident_id].update(kwargs)

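`MemoryBank` caps each resident's history by re-slicing the list on every save. An alternative sketch uses `collections.deque(maxlen=...)`, which drops the oldest entry automatically when the deque is full. `BoundedMemoryBank` is a hypothetical drop-in name with the same methods; the notebook itself does not use it.

```python
from collections import defaultdict, deque


class BoundedMemoryBank:
    """Variant of MemoryBank whose interaction history is capped by a deque."""

    def __init__(self, max_interactions: int = 50):
        self._prefs = defaultdict(dict)
        # Each resident gets a deque that discards its oldest entry once full
        self._interactions = defaultdict(lambda: deque(maxlen=max_interactions))

    def get_preference(self, resident_id: str, key: str):
        return self._prefs[resident_id].get(key)

    def set_preference(self, resident_id: str, key: str, value):
        self._prefs[resident_id][key] = value

    def save_interaction(self, resident_id: str, summary: str):
        self._interactions[resident_id].append(summary)

    def get_recent(self, resident_id: str, n: int = 5):
        # Deques do not support slicing, so slice a list copy
        return list(self._interactions[resident_id])[-n:]
```

The design choice is mostly about clarity: the cap lives in one place (the constructor) instead of being repeated in `save_interaction`.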

In [None]:

# -------------------- Tools --------------------
# VendorAPI: mock implementation. Replace with vendor OpenAPI client or real API calls.
import requests

class VendorAPI:
    def __init__(self, base_url=None, api_key=None):
        self.base = base_url or os.environ.get('VENDOR_API_URL', 'https://mock.vendor.api')
        self.api_key = api_key or os.environ.get('VENDOR_API_KEY')

    def list_vendors(self, category: str = 'grocery'):
        # Replace with real HTTP call to vendors registry
        return ['FreshMart', 'DailyNeeds', 'QuickShop']

    def create_order(self, vendor: str, resident_id: str, items: list):
        # Replace with real vendor order API call
        return {'order_id': str(uuid.uuid4()), 'vendor': vendor, 'items': items}

# GoogleSearchTool: placeholder that *could* use Programmable Search Engine (PSE) or SERP API.
class GoogleSearchTool:
    async def search(self, query: str) -> str:
        # Demo: returns a mock snippet only. In production, call the PSE JSON API (or another
        # search API), then summarize the results with Gemini.
        await asyncio.sleep(0.1)
        return f"(simulated search snippet) Top results for: {query}"

# Vision tool placeholder (swap in a multimodal Gemini call later)
class VisionTool:
    async def analyze_image(self, image_url: str) -> str:
        # In production: send the image bytes to a multimodal Gemini model and return its analysis
        await asyncio.sleep(0.1)
        return f"(simulated) analyzed image at {image_url}"

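`GoogleSearchTool` only returns a simulated snippet. A minimal sketch of the Programmable Search Engine path, assuming the Custom Search JSON API (`https://www.googleapis.com/customsearch/v1` with `key`, `cx`, and `q` parameters, returning an `items` list with `title`, `snippet`, and `link`). The helper names are hypothetical, and the HTTP call itself is left to you.

```python
from urllib.parse import urlencode

PSE_ENDPOINT = 'https://www.googleapis.com/customsearch/v1'


def build_pse_url(query: str, api_key: str, engine_id: str, num: int = 5) -> str:
    """Build a Custom Search JSON API request URL (num is capped at 10 by the API)."""
    params = {'key': api_key, 'cx': engine_id, 'q': query, 'num': num}
    return f'{PSE_ENDPOINT}?{urlencode(params)}'


def format_pse_results(payload: dict, limit: int = 3) -> str:
    """Turn a Custom Search JSON response into a compact snippet string to feed to Gemini."""
    items = payload.get('items', [])[:limit]
    if not items:
        return 'No search results.'
    lines = [
        f"- {item.get('title', '')}: {item.get('snippet', '')} ({item.get('link', '')})"
        for item in items
    ]
    return '\n'.join(lines)
```

Inside `GoogleSearchTool.search`, you might fetch `build_pse_url(...)` with `requests.get(url, timeout=10).json()` and return `format_pse_results(...)`; the engine ID and key would come from Kaggle Secrets like `GOOGLE_API_KEY`.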

In [None]:

# -------------------- Agents (with Gemini integration) --------------------

# Helper: call Gemini for intent classification or reasoning
SYSTEM_PROMPT = "You are an assistant for routing and decision-making in a residential concierge system."

def call_gemini_chat(prompt: str, model: str = "gemini-1.5-flash-latest", temperature: float = 0.0, max_output_tokens: int = 512) -> str:
    """Send a single-turn prompt to Gemini and return the reply text.

    Requires google.generativeai installed and GOOGLE_API_KEY configured. On any failure,
    returns a string starting with '(gemini_call_failed)' so callers can keep going.
    """
    try:
        gemini = genai.GenerativeModel(model_name=model, system_instruction=SYSTEM_PROMPT)
        response = gemini.generate_content(
            prompt,
            generation_config={"temperature": temperature, "max_output_tokens": max_output_tokens},
        )
        # response.text raises ValueError if the reply was blocked or empty (caught below)
        return response.text
    except Exception as e:
        logger.warning('Gemini call failed: %s', e)
        return f"(gemini_call_failed) {e}"


class ErrandAgent:
    def __init__(self, vendor_api: VendorAPI, search_tool: GoogleSearchTool):
        self.vendor_api = vendor_api
        self.search = search_tool

    async def run(self, resident_id: str, message: str, session: dict, memory: MemoryBank) -> str:
        logger.info('ErrandAgent received: %s', message)
        # 1) Use Gemini to extract structured order info (items, time, constraints)
        prompt = (
            f"Extract structured order details from the user's request: '{message}'. "
            "Return only JSON with keys: intent, items (list of {\"item\": str, \"qty\": number}), time, notes."
        )
        structured = call_gemini_chat(prompt)
        items = None
        try:
            # Gemini often wraps JSON in markdown code fences; strip them before parsing
            cleaned = structured.strip().strip('`').removeprefix('json').strip()
            items = json.loads(cleaned).get('items')
        except (ValueError, AttributeError):
            pass  # unparsable reply, or JSON that is not an object
        if not items:
            items = [{"item": "milk", "qty": 1}]  # demo fallback when extraction fails

        # 2) Find vendors
        vendors = self.vendor_api.list_vendors(category='grocery')

        # 3) Price lookup (parallel)
        async def price_lookup(vendor):
            return await self.search.search(f"{vendor} grocery prices near me")

        prices = await asyncio.gather(*(price_lookup(v) for v in vendors[:3]))

        # 4) Ask Gemini to pick a vendor; keep a vendor its answer mentions, else fall back to the first
        decision_prompt = f"You are a decision agent. Given vendors: {vendors} and price snippets: {prices}, which vendor would you choose and why?"
        vendor_choice = call_gemini_chat(decision_prompt)
        chosen = next((v for v in vendors if v in vendor_choice), vendors[0])
        order = self.vendor_api.create_order(vendor=chosen, resident_id=resident_id, items=items)

        snippets = "\n".join(prices)
        reply = (
            f"Order placed with {chosen}. Order ref: {order['order_id']}\n"
            f"Items: {items}\n"
            f"Vendor decision reasoning (Gemini): {vendor_choice}\n"
            f"Price snippets:\n{snippets}"
        )
        return reply


class MaintenanceAgent:
    def __init__(self, vision_tool: VisionTool = None):
        self.vision = vision_tool

    async def run(self, resident_id: str, message: str, session: dict, memory: MemoryBank) -> str:
        logger.info('MaintenanceAgent start for %s', resident_id)
        if not session.get('awaiting_photo'):
            session['awaiting_photo'] = True
            return 'Please upload a photo of the damage so we can assess and assign a contractor.'

        photo = session.get('damage_photo_url')
        if not photo:
            # In production, resume via a webhook; here we poll the session briefly (about 6 s)
            for _ in range(6):
                await asyncio.sleep(1)
                photo = session.get('damage_photo_url')
                if photo:
                    break

        if not photo:
            return 'No photo received. We can escalate to a guard inspection if you prefer.'

        # Use the vision tool if available (simulated in this notebook)
        if self.vision:
            analysis = await self.vision.analyze_image(photo)
        else:
            # Fallback: the text model only sees the URL string, not the image itself,
            # so this is a rough placeholder rather than a real visual assessment
            analysis_prompt = f"A resident uploaded a photo at URL: {photo}. Provide a short assessment and categorize severity (low/medium/high)."
            analysis = call_gemini_chat(analysis_prompt)

        ticket_id = f"TKT-{resident_id[:4]}-{str(uuid.uuid4())[:6]}"
        session['last_ticket'] = ticket_id
        # Reset the photo workflow so the next maintenance request starts fresh
        session['awaiting_photo'] = False
        session.pop('damage_photo_url', None)
        preferred = memory.get_preference(resident_id, 'contractor') or 'on-call contractor'
        return f"Ticket {ticket_id} created. Analysis:\n{analysis}\nAssigned to {preferred}."


class InfoSearchAgent:
    def __init__(self, search_tool: GoogleSearchTool):
        self.search = search_tool

    async def run(self, resident_id: str, message: str, session: dict) -> str:
        # Use search tool then optionally summarize with Gemini
        results = await self.search.search(message)
        # Summarize with Gemini
        summary_prompt = f"Summarize these search results concisely for a resident: {results}"
        summary = call_gemini_chat(summary_prompt)
        return summary

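`MaintenanceAgent` polls the session for up to six seconds. If uploads arrive through a webhook handled in the same process, an `asyncio.Event` lets the agent sleep until the photo arrives instead of polling. A minimal sketch, assuming such a webhook exists; `PhotoInbox`, `deliver`, and `wait_for_photo` are hypothetical names.

```python
import asyncio


class PhotoInbox:
    """Hypothetical per-resident inbox: an upload handler calls deliver(), the agent awaits wait_for_photo()."""

    def __init__(self):
        self._events = {}
        self._urls = {}

    def _event(self, resident_id: str) -> asyncio.Event:
        # One Event per resident, created on first use
        return self._events.setdefault(resident_id, asyncio.Event())

    def deliver(self, resident_id: str, url: str) -> None:
        # Called by the upload webhook when a photo arrives
        self._urls[resident_id] = url
        self._event(resident_id).set()

    async def wait_for_photo(self, resident_id: str, timeout: float):
        """Return the photo URL, or None if nothing arrives within `timeout` seconds."""
        try:
            await asyncio.wait_for(self._event(resident_id).wait(), timeout)
        except asyncio.TimeoutError:
            return None
        self._events.pop(resident_id, None)  # reset so the next request waits again
        return self._urls.pop(resident_id)
```

Because `deliver` may run before the agent starts waiting, the event simply stays set and `wait_for_photo` returns immediately.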

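`ErrandAgent` runs its price lookups with a bare `asyncio.gather`, so one hanging search stalls the whole errand and one exception propagates out of it. A sketch of a per-call timeout that turns slow or failing lookups into `None` while preserving result order; `gather_with_timeout` is a hypothetical helper.

```python
import asyncio


async def gather_with_timeout(coros, timeout: float):
    """Run coroutines concurrently; any that fail or exceed `timeout` yield None."""

    async def guarded(coro):
        try:
            return await asyncio.wait_for(coro, timeout)
        except Exception:  # includes asyncio.TimeoutError; CancelledError still propagates
            return None

    # gather preserves input order, so results line up with the vendors list
    return await asyncio.gather(*(guarded(c) for c in coros))
```

In `ErrandAgent.run`, step 3 could become `prices = await gather_with_timeout([price_lookup(v) for v in vendors[:3]], timeout=5)`, followed by dropping the `None` entries before building the reply.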
In [None]:

# -------------------- Orchestrator --------------------
class Orchestrator:
    def __init__(self):
        self.vendor_api = VendorAPI()
        self.search_tool = GoogleSearchTool()
        self.vision_tool = VisionTool()
        self.errand_agent = ErrandAgent(self.vendor_api, self.search_tool)
        self.maintenance_agent = MaintenanceAgent(self.vision_tool)
        self.info_search_agent = InfoSearchAgent(self.search_tool)
        self.sessions = InMemorySessionService()
        self.memory = MemoryBank()

    def classify_intent(self, message: str) -> str:
        """Use Gemini to classify intent into ERRAND, MAINTENANCE, or INFO (default: INFO)."""
        prompt = f"Classify the following user message into one of: ERRAND, MAINTENANCE, INFO. Respond with a single token exactly. Message: '''{message}'''"
        reply = call_gemini_chat(prompt, max_output_tokens=20, temperature=0.0).upper()
        # Accept any known label in the reply (tolerates "ERRAND." or "**ERRAND**"), checked in this order
        for label in ('ERRAND', 'MAINTENANCE', 'INFO'):
            if label in reply:
                return label
        return 'INFO'

    async def handle_message(self, resident_id: str, message: str) -> str:
        session = self.sessions.get_session(resident_id)
        # A pending photo request keeps follow-up messages in the maintenance flow
        intent = 'MAINTENANCE' if session.get('awaiting_photo') else self.classify_intent(message)
        logger.info('Intent classified: %s', intent)

        if intent == 'ERRAND':
            result = await self.errand_agent.run(resident_id, message, session=session, memory=self.memory)
        elif intent == 'MAINTENANCE':
            result = await self.maintenance_agent.run(resident_id, message, session=session, memory=self.memory)
        else:
            info = await self.info_search_agent.run(resident_id, message, session=session)
            quick_reply = "I can help with errands, maintenance, and info. Which would you like?"
            result = f"{quick_reply}\nAdditional info:\n{info}"

        # Store a compact summary of the exchange and remember the last message
        summary = f"Q:{message[:200]} | A:{str(result)[:300]}"
        self.memory.save_interaction(resident_id, summary)
        self.sessions.update_session(resident_id, last_message=message)
        return result

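When `GOOGLE_API_KEY` is missing or the network is off, every Gemini call fails and `classify_intent` has nothing to go on, so messages end up on the INFO path. A minimal rule-based fallback sketch; `keyword_intent` and `INTENT_KEYWORDS` are hypothetical, and the keyword lists are illustrative only.

```python
import re

# Illustrative keyword lists; tune them for your community's vocabulary
INTENT_KEYWORDS = {
    'MAINTENANCE': ('leak', 'broken', 'repair', 'damage', 'plumber', 'photo'),
    'ERRAND': ('buy', 'order', 'deliver', 'pick up', 'groceries', 'milk'),
}


def keyword_intent(message: str, default: str = 'INFO') -> str:
    """Guess the intent from whole-word keyword hits; return `default` when nothing matches."""
    text = message.lower()
    scores = {
        label: sum(1 for kw in keywords if re.search(rf'\b{re.escape(kw)}\b', text))
        for label, keywords in INTENT_KEYWORDS.items()
    }
    best = max(scores, key=scores.get)
    return best if scores[best] > 0 else default
```

One way to wire it in is to call `keyword_intent(message)` from `classify_intent` whenever `call_gemini_chat` returns its `(gemini_call_failed)` marker.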

In [None]:

# -------------------- Demo --------------------
import asyncio

async def demo_flow():
    orch = Orchestrator()
    print('--- Errand example ---')
    r1 = await orch.handle_message('res_001', 'Please buy 2 liters of milk and a loaf of bread tomorrow morning')
    print(r1)

    print('\n--- Maintenance example (initial request) ---')
    r2 = await orch.handle_message('res_001', 'There is a water leak in my bathroom, please check')
    print(r2)

    print('\nSimulating photo upload to session...')
    orch.sessions.update_session('res_001', damage_photo_url='https://example.com/damage.jpg')
    r3 = await orch.handle_message('res_001', 'I have uploaded the photo')
    print(r3)

    print('\n--- Info example ---')
    r4 = await orch.handle_message('res_001', 'What time is the pharmacy open nearby?')
    print(r4)

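# Run the demo. Without Gemini access, each call logs a warning and returns a '(gemini_call_failed)' placeholder.
# Jupyter/Kaggle kernels already run an event loop (run_until_complete would fail), so use top-level await;
# in a plain Python script, use asyncio.run(demo_flow()) instead.
try:
    await demo_flow()
except Exception as e:
    print('Demo error:', e)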



---
## Next steps & deployment tips

- Add the Kaggle secret `GOOGLE_API_KEY`. Run the demo section to see real Gemini-driven behavior.
- Replace `VendorAPI` with your vendor OpenAPI client and secure API keys as Kaggle secrets.
- For image analysis, update `VisionTool.analyze_image` to send the image bytes to a multimodal Gemini model (for example `gemini-1.5-flash-latest`) instead of returning simulated text.
- To deploy, containerize the notebook code (or a modularized repo) and deploy it to Cloud Run. Keep `GOOGLE_API_KEY` in Secret Manager rather than in the image, and prefer the service's attached service account or Workload Identity over downloaded service-account keys.
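To replace the mock `VendorAPI`, here is a minimal standard-library sketch that keeps the mock's `list_vendors` / `create_order` method names, so it could be passed to `ErrandAgent` unchanged. The endpoint paths (`/vendors`, `/orders`), the JSON shapes, and the bearer-token header are all assumptions; match them to your vendor's OpenAPI spec. `HttpVendorAPI` is a hypothetical name.

```python
import json
import urllib.request
from urllib.parse import urlencode


class HttpVendorAPI:
    """Sketch of a real vendor client. Paths, payloads, and auth header are hypothetical."""

    def __init__(self, base_url: str, api_key: str, opener=urllib.request.urlopen, timeout: float = 10.0):
        self.base = base_url.rstrip('/')
        self.api_key = api_key
        self._open = opener  # injectable so the client can be tested offline
        self.timeout = timeout

    def _request(self, method: str, path: str, body: dict = None) -> dict:
        data = json.dumps(body).encode('utf-8') if body is not None else None
        req = urllib.request.Request(
            f'{self.base}{path}',
            data=data,
            method=method,
            headers={'Authorization': f'Bearer {self.api_key}', 'Content-Type': 'application/json'},
        )
        with self._open(req, timeout=self.timeout) as resp:
            return json.loads(resp.read().decode('utf-8'))

    def list_vendors(self, category: str = 'grocery') -> list:
        # Assumed response shape: {"vendors": [...]}
        return self._request('GET', f"/vendors?{urlencode({'category': category})}")['vendors']

    def create_order(self, vendor: str, resident_id: str, items: list) -> dict:
        # Assumed response: the created order, including an 'order_id' field
        payload = {'vendor': vendor, 'resident_id': resident_id, 'items': items}
        return self._request('POST', '/orders', payload)
```

The base URL and key would come from the existing `VENDOR_API_URL` and `VENDOR_API_KEY` settings, stored as Kaggle secrets rather than in code.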

---
