In [None]:
# Notebook cell: installs (restart kernel if required)
!pip install ultralytics opencv-python-headless==4.7.0.72 matplotlib numpy pillow
# MCP libraries (optional for MCP wrapper)
!pip install fastmcp mcp  # installs simple MCP helpers; if one fails, use the official MCP python-sdk from modelcontextprotocol.io


In [None]:
import base64
import logging
import os
import smtplib
import threading
import time
from email.message import EmailMessage
from io import BytesIO
from typing import Optional

import cv2
import numpy as np
from PIL import Image
from ultralytics import YOLO

logging.basicConfig(level=logging.INFO)

# ---------------- CONFIG ----------------
VIDEO_SOURCE = 0  # webcam

YOLO_MODEL = "yolov8n.pt"  # YOLOv8 nano: the smallest, fastest variant

# Email config
# Each SMTP setting is read from an environment variable of the same name and
# falls back to the placeholder below, so real credentials never have to be
# saved inside the notebook.
SMTP_HOST = os.environ.get("SMTP_HOST", "smtp.gmail.com")
SMTP_PORT = int(os.environ.get("SMTP_PORT", "465"))
SMTP_USER = os.environ.get("SMTP_USER", "your.email@gmail.com")          # your email
SMTP_PASSWORD = os.environ.get("SMTP_PASSWORD", "your_app_password")     # Gmail App Password
ALERT_TO = os.environ.get("ALERT_TO", "recipient@domain.com")

# Alert threshold and cooldown
ALERT_THRESHOLD = 2       # an alert fires only when the count is strictly above this
ALERT_COOLDOWN_SEC = 60   # minimum number of seconds between two alert attempts
last_alert_time = 0       # time.time() of the latest alert attempt (0 = none yet)



In [None]:
def pil_from_bgr(frame_bgr):
    """Build a PIL image from a BGR frame by swapping it to RGB channel order."""
    rgb_pixels = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)
    return Image.fromarray(rgb_pixels)

def image_to_base64_pil(pil_img, fmt="JPEG"):
    """Serialise an image to base64 text.

    The image is encoded in memory with ``pil_img.save(..., format=fmt)`` and
    the encoded bytes are returned as an ASCII base64 string.
    """
    with BytesIO() as stream:
        pil_img.save(stream, format=fmt)
        encoded_bytes = stream.getvalue()
    return str(base64.b64encode(encoded_bytes), "ascii")

def base64_to_pil(b64str):
    """Decode base64 text and open the resulting bytes as a PIL image."""
    raw_bytes = base64.b64decode(b64str)
    stream = BytesIO(raw_bytes)
    return Image.open(stream)

def send_alert_email(subject, body, image_pil=None, timeout=30):
    """Send an alert email over SMTP-over-SSL, optionally with a JPEG snapshot.

    Args:
        subject: Subject line of the message.
        body: Plain-text body of the message.
        image_pil: Optional PIL image; when given it is JPEG-encoded and
            attached as "snapshot.jpg".
        timeout: Seconds to wait on the SMTP connection before giving up.

    Raises:
        smtplib.SMTPException or OSError: if connecting, logging in or
            sending fails (including a timeout).
    """
    msg = EmailMessage()
    msg["Subject"] = subject
    msg["From"] = SMTP_USER
    msg["To"] = ALERT_TO
    msg.set_content(body)

    if image_pil is not None:
        buf = BytesIO()
        image_pil.save(buf, format="JPEG")
        msg.add_attachment(
            buf.getvalue(),
            maintype="image",
            subtype="jpeg",
            filename="snapshot.jpg",
        )

    # Without an explicit timeout smtplib falls back to the global socket
    # default, which is normally "wait forever"; an unreachable mail server
    # would then hang whoever triggered the alert.
    with smtplib.SMTP_SSL(SMTP_HOST, SMTP_PORT, timeout=timeout) as smtp:
        smtp.login(SMTP_USER, SMTP_PASSWORD)
        smtp.send_message(msg)
        logging.info("Alert email sent.")


In [None]:
# Detector instance shared by the whole notebook: detect_people_and_annotate()
# reads this module-level `model` on every call.
model = YOLO(YOLO_MODEL)

def detect_people_and_annotate(frame_bgr, conf_thresh=0.25):
    """Count the people in a BGR frame and draw their bounding boxes.

    Args:
        frame_bgr: Image as an OpenCV-style BGR array. It is not modified.
        conf_thresh: Minimum detection confidence passed to the model.

    Returns:
        Tuple ``(count, annotated)``: ``count`` is the number of detections
        whose class id is 0 (person), ``annotated`` is a copy of the frame
        with a green box and label per person plus a red "People: N" banner.
    """
    # Ultralytics interprets numpy inputs as OpenCV-style BGR images and does
    # its own channel handling, so the frame is passed through unchanged.
    # Converting to RGB first would hand the model swapped red/blue channels.
    results = model(frame_bgr, imgsz=640, conf=conf_thresh)[0]

    person_class_id = 0  # person
    green = (0, 255, 0)
    annotated = frame_bgr.copy()
    count = 0

    # `boxes` can be missing or None when the result carries no detections
    # container, so check it before calling len().
    boxes = getattr(results, "boxes", None)
    if boxes is not None and len(boxes) > 0:
        for box, cls in zip(boxes.xyxy, boxes.cls):
            # Tensors must be moved to the CPU before conversion; plain
            # numbers and sequences are used directly.
            cls_id = int(cls.cpu().numpy()) if hasattr(cls, "cpu") else int(cls)
            if cls_id != person_class_id:
                continue
            count += 1
            coords = box.cpu().numpy() if hasattr(box, "cpu") else box
            x1, y1, x2, y2 = map(int, coords)
            cv2.rectangle(annotated, (x1, y1), (x2, y2), green, 2)
            cv2.putText(annotated, "person", (x1, y1 - 6),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, green, 1)

    cv2.putText(annotated, f"People: {count}", (10, 30),
                cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0, 0, 255), 2)
    return count, annotated



In [None]:
# fastmcp is optional: FastMCP stays None when the import does not succeed,
# and later cells test `FastMCP is not None` before using it.
FastMCP = None
try:
    from fastmcp import FastMCP
except Exception:
    # Any failure while importing is treated the same as "not installed".
    pass

def count_from_b64image(image_b64):
    """Count people in a base64-encoded image and email an alert if crowded.

    Args:
        image_b64: Base64 text of an encoded image, opened via base64_to_pil().

    Returns:
        dict with "count" (number of people detected) and "snapshot_b64"
        (the annotated frame, encoded by image_to_base64_pil()).

    Side effects:
        When the count is strictly greater than ALERT_THRESHOLD and more than
        ALERT_COOLDOWN_SEC seconds have passed since the previous attempt,
        updates the global ``last_alert_time`` and sends an alert email.
        A failure to send is logged and does not affect the returned result.
    """
    global last_alert_time

    # Normalise to 3-channel RGB first: grayscale or palette images decode to
    # 2-D arrays, which the RGB->BGR conversion below rejects.
    pil_img = base64_to_pil(image_b64).convert("RGB")
    frame_bgr = cv2.cvtColor(np.array(pil_img), cv2.COLOR_RGB2BGR)

    count, annotated = detect_people_and_annotate(frame_bgr)
    annotated_pil = pil_from_bgr(annotated)
    snapshot_b64 = image_to_base64_pil(annotated_pil)

    now = time.time()
    if count > ALERT_THRESHOLD and (now - last_alert_time) > ALERT_COOLDOWN_SEC:
        # Stamp the attempt before sending so that a failing mail server is
        # not retried on every single call.
        last_alert_time = now
        try:
            send_alert_email(f"[ALERT] Room occupancy: {count} people",
                             f"Detected {count} people in the room at {time.asctime()}",
                             image_pil=annotated_pil)
        except (smtplib.SMTPException, OSError):
            # The email is a side channel: report the problem but still
            # return the detection result to the caller.
            logging.exception("Failed to send alert email")

    return {"count": count, "snapshot_b64": snapshot_b64}

# Register MCP tool
if FastMCP is not None:
    mcp = FastMCP("room-people-counter")

    @mcp.tool()
    def get_people_count(image_base64: Optional[str] = None, use_camera: bool = False):
        """Count the people in a supplied image or in a fresh camera frame.

        Args:
            image_base64: Base64-encoded image to analyse. Ignored when
                ``use_camera`` is true.
            use_camera: When true, grab one frame from VIDEO_SOURCE and
                analyse that instead.

        Returns:
            The dict produced by count_from_b64image(): "count" and
            "snapshot_b64".

        Raises:
            ValueError: if the camera yields no frame, or if neither an image
                nor ``use_camera=True`` was provided.
        """
        if use_camera:
            cap = cv2.VideoCapture(VIDEO_SOURCE)
            try:
                ret, frame = cap.read()
            finally:
                # Always hand the device back, even if the read itself fails.
                cap.release()
            if not ret:
                raise ValueError("Could not capture frame from camera")
            image_base64 = image_to_base64_pil(pil_from_bgr(frame))

        if image_base64 is None:
            raise ValueError("Either image_base64 or use_camera=True required")

        return count_from_b64image(image_base64)

    print("MCP server ready. Call mcp.run() to start.")
else:
    print("FastMCP not installed — MCP tool not registered.")



In [None]:
if FastMCP is not None:
    # Option 1: Local LLM testing (stdio transport; the MCP client launches
    # this process itself)
    # mcp.run()

    # Option 2: HTTP access (recommended)
    # FastMCP servers are started through run(). run() starts its own event
    # loop, which cannot be done on the notebook's main thread where the
    # kernel's loop is already running, so the server runs on a daemon thread.
    # That also keeps the kernel free for the cells that follow.
    # NOTE: host 0.0.0.0 listens on every network interface; anyone who can
    # reach this machine can trigger the camera. Use 127.0.0.1 for local use.
    _existing_thread = globals().get("mcp_server_thread")
    if _existing_thread is not None and _existing_thread.is_alive():
        # Re-running the cell must not try to bind the port a second time.
        print("MCP HTTP server is already running.")
    else:
        mcp_server_thread = threading.Thread(
            target=mcp.run,
            kwargs={"transport": "http", "host": "0.0.0.0", "port": 8080},
            name="mcp-http-server",
            daemon=True,
        )
        mcp_server_thread.start()


In [None]:
import requests

url = "http://127.0.0.1:8080/get_people_count"
payload = {"use_camera": True}

# NOTE(review): an MCP HTTP server normally speaks the MCP protocol on a
# single path (FastMCP's default appears to be /mcp) rather than exposing one
# REST route per tool. Confirm this URL, or call the tool through an MCP
# client instead.
# A timeout keeps the cell from hanging forever if the server is down.
resp = requests.post(url, json=payload, timeout=30)
# Fail with a clear HTTP error instead of a confusing JSON decode error when
# the server answers with a 4xx/5xx status.
resp.raise_for_status()
data = resp.json()
print(data)
