# Installations

In [None]:
# Install requirements
# NOTE(review): opencv-python, gTTS (spelled `gtts` once) and supervision are
# each requested by more than one command in this cell; the repeats look
# redundant — confirm before consolidating, since install order can matter.
# NOTE(review): no command in this cell installs `openai`, which a later cell
# imports — presumably the runtime ships it preinstalled; confirm.
# Core scientific + visualization libraries
!pip install numpy matplotlib pillow

# Computer vision
!pip install opencv-python

# Roboflow client + inference SDK
!pip install roboflow inference

# Supervision library (for annotations, tracking, etc.)
# Pinned to 0.19.0 here.
!pip install supervision==0.19.0

# Optional: Jupyter/Colab utilities
!pip install ipython

# Text-to-speech used for the spoken alert
!pip install gtts

# Remaining dependencies in one quiet install (only pydantic carries a version constraint)
!pip -q install ultralytics opencv-python timm gTTS groq langgraph pydantic==2.* torch torchvision

# NOTE(review): this request does not restate the 0.19.0 pin used above.
!pip install supervision


# Import needed modules
import matplotlib.animation as animation
import matplotlib.pyplot as plt
import supervision as sv
import numpy as np
import shutil
import cv2

import os

from inference_sdk import InferenceHTTPClient
from google.colab.patches import cv2_imshow
from PIL import Image as im

!pip install groq

# Key Settings

In [None]:
# Run this cell BEFORE the workflow that reads os.environ[...]
#
# Either fill in the placeholders below, or leave them empty and provide the
# same names through the process environment. An empty placeholder never
# overwrites a value that is already present in os.environ.

ROBOFLOW_API_KEY = ""      # e.g., rf_...
GROQ_API_KEY     = ""          # e.g., gsk_...
EMAIL_USER       = ""      # e.g., you@gmail.com
EMAIL_PASS       = ""
CAREGIVER_EMAIL  = ""

# Export to environment so your existing code (which uses os.environ) keeps working unchanged
import os

_KEY_PLACEHOLDERS = {
    "ROBOFLOW_API_KEY": ROBOFLOW_API_KEY,
    "GROQ_API_KEY": GROQ_API_KEY,
    "EMAIL_USER": EMAIL_USER,
    "EMAIL_PASS": EMAIL_PASS,
    "CAREGIVER_EMAIL": CAREGIVER_EMAIL,
}
for _key_name, _key_value in _KEY_PLACEHOLDERS.items():
    # Only export values that were actually filled in; exporting "" would
    # clobber a key the user already configured outside this cell.
    if _key_value:
        os.environ[_key_name] = _key_value

# Fail early, naming exactly which keys are still missing.
_missing_keys = [_name for _name in _KEY_PLACEHOLDERS if not os.environ.get(_name)]
assert not _missing_keys, f"❌ Missing required keys: {', '.join(_missing_keys)}"
print("✅ Hard-coded keys loaded into environment.")

from openai import OpenAI
import os


# Base URL of the OpenAI-compatible endpoint; override with the OPENAI_BASE_URL env var.
OPENAI_BASE_URL = os.environ.get("OPENAI_BASE_URL", "https://api.aimlapi.com/v1")

# The OpenAI-compatible key is optional: _build_clients() returns no OpenAI
# client when it is empty and the fallback helpers then use Groq only, so a
# missing key must not abort the notebook.
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "").strip()
if not OPENAI_API_KEY:
    print("⚠️ OPENAI_API_KEY missing — GPT-5 will be skipped and Groq will be used instead.")




# 🧭 Agentic Workflow (Video → Roboflow → GPT-5 → Email)

🎥 **Video Stream** ➡️ 🧿 **Roboflow (CV Annotation)** ➡️ 🧠 **GPT-5 (Fall Detection & Reasoning)** ➡️ 📧 **Email Caregiver (Alert)**

# LangGraph + Roboflow + GPT-5 + Email Alerts

In [None]:
# =========================================================
# Agentic Fall Monitoring (LangGraph)
# - Node 1: Roboflow annotate + ByteTrack + timers  → vision.mp4
# - Node 2: GPT-5 fall scan (Groq fallback) on vision.mp4 → voice + email + JSON
# =========================================================



# ---------- Imports ----------
import os, cv2, base64, mimetypes, json, time, shutil, ssl, smtplib
import numpy as np
from typing import Dict, Optional, Tuple, Any, List
from PIL import Image as PILImage
from IPython.display import display, Audio, Video
import supervision as sv
from inference_sdk import InferenceHTTPClient
from groq import Groq
from gtts import gTTS
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.base import MIMEBase
from email import encoders

from pydantic import BaseModel, Field
from langgraph.graph import StateGraph, END

# =========================================================
#                       CONFIG
# =========================================================
# ---- Filesystem locations ----
INPUT_VIDEO = "/content/sample_detection.mp4"   # source clip (downloaded if missing)
FRAMES_DIR = "/content/frames"
ANNOTATED_DIR = "/content/annotated"
ANNOTATED_VIDEO = "/content/vision.mp4"         # written by Node 1
GROQ_REPORT_PATH = "/content/groq_fall_report.json"
TRIGGER_FRAME_PATH = "/content/fall_detected.jpg"
TMP_JPEG_PATH = "/content/_groq_frame.jpg"

# ---- Roboflow detection model ----
MODEL_ID = "fall-detection-real/1"   # classes: standing / falling / fallen

# ---- LLM model identifiers (primary, then fallback) ----
OPENAI_MODEL = "openai/gpt-5-chat-latest"
GROQ_MODEL = "meta-llama/llama-4-scout-17b-16e-instruct"

# ---- Prompts sent with every sampled frame ----
SYSTEM_PROMPT = " ".join(
    [
        "You analyze annotated video frames for fall detection.",
        "Use posture and floor contact primarily; labels may be small or missing.",
        "If any person appears fallen, reply exactly: 'A person has fallen'.",
        "Otherwise reply exactly: 'No fall detected'.",
    ]
)
USER_PROMPT = "Strict fall check. Reply EXACTLY one of the two phrases."

# ---- Timers / thresholds ----
FALL_SECONDS_THRESHOLD = 30    # seconds a track must stay "fallen" to count as prolonged
TARGET_SAMPLE_SEC = 0.25       # spacing between frames sent to the LLM (~0.25s)
HEARTBEAT_VOICE = True
HEARTBEAT_EVERY_SEC = 10
PLAY_AUDIO_ON_DETECT = True    # gate for the spoken alert

# ---- Runtime detection: are we inside Google Colab? ----
try:
    from google.colab import userdata
except ImportError:
    IN_COLAB = False
else:
    IN_COLAB = True


# =========================================================
#                 ENV / Keys (assertions)
# =========================================================
def _read_env(name: str) -> str:
    """Return environment variable `name` with surrounding whitespace removed ("" if unset)."""
    return os.environ.get(name, "").strip()


# Re-read every credential from the environment so this cell does not depend
# on Python variables left behind by another cell.
ROBOFLOW_API_KEY = _read_env("ROBOFLOW_API_KEY")
GROQ_API_KEY = _read_env("GROQ_API_KEY")
EMAIL_USER = _read_env("EMAIL_USER")
EMAIL_PASS = _read_env("EMAIL_PASS")
CAREGIVER_EMAIL = _read_env("CAREGIVER_EMAIL")

# Stop here if anything required is missing.
assert ROBOFLOW_API_KEY, "❌ ROBOFLOW_API_KEY missing."
assert GROQ_API_KEY,     "❌ GROQ_API_KEY missing."
assert all((EMAIL_USER, EMAIL_PASS, CAREGIVER_EMAIL)), "❌ EMAIL_USER/EMAIL_PASS/CAREGIVER_EMAIL missing."

## LangGraph State & Utility Functions

In [None]:
# =========================================================
#               STATE (Pydantic) for LangGraph
# =========================================================
class AssistState(BaseModel):
    """State object handed from node to node in the LangGraph workflow.

    The path fields default to the module-level configuration constants; the
    remaining fields start empty and are filled in by the pipeline stages.
    """

    # Where the source clip, intermediate frames and annotated output live
    input_video: str = INPUT_VIDEO
    annotated_video: str = ANNOTATED_VIDEO
    frames_dir: str = FRAMES_DIR
    annotated_dir: str = ANNOTATED_DIR

    # Filled in by the annotate/track stage
    num_frames: int = Field(default=0)
    video_fps: float = Field(default=0.0)
    prolonged_fall_ids: List[int] = Field(default_factory=list)

    # Filled in by the LLM scan stage
    groq_confirmed: bool = Field(default=False)
    groq_first_detection_frame: Optional[int] = Field(default=None)
    groq_first_detection_time_s: Optional[float] = Field(default=None)
    trigger_frame_path: Optional[str] = Field(default=None)
    groq_report_path: str = GROQ_REPORT_PATH

    # Messages recorded through log()
    logs: List[str] = Field(default_factory=list)

# =========================================================
#                     UTIL HELPERS
# =========================================================
def log(state: AssistState, msg: str) -> None:
    """Print `msg` and append it to `state.logs`.

    Args:
        state: Workflow state whose `logs` list receives the message.
        msg: Text to print and record.
    """
    print(msg)
    state.logs.append(msg)

def speak_once(text="Fall detected.", path="/content/fall_alert.mp3"):
    """Synthesize `text` to an MP3 at `path` and play it in the notebook.

    Does nothing when PLAY_AUDIO_ON_DETECT is false. The voice alert is
    best-effort: a failure is reported on stdout but never raised, so it
    cannot interrupt the rest of the alerting flow.

    Args:
        text: Sentence to speak (English).
        path: File the synthesized audio is saved to.
    """
    if not PLAY_AUDIO_ON_DETECT:
        return
    try:
        gTTS(text=text, lang="en").save(path)
        display(Audio(path, autoplay=True))
    except Exception as exc:
        # Keep going, but make the failure visible instead of swallowing it.
        print(f"⚠️ Voice alert failed: {exc}")

def send_email_alert(subject: str, body: str, to_email: str, attach_path: Optional[str] = None):
    """Send a plain-text email from EMAIL_USER through Gmail SMTP (STARTTLS).

    Args:
        subject: Subject line.
        body: Plain-text message body.
        to_email: Recipient address.
        attach_path: Optional file to attach; ignored when empty or when the
            file does not exist.

    Errors raised by smtplib (connection, authentication, ...) propagate to
    the caller.
    """
    message = MIMEMultipart()
    for header, value in (("From", EMAIL_USER), ("To", to_email), ("Subject", subject)):
        message[header] = value
    message.attach(MIMEText(body, "plain"))

    has_attachment = bool(attach_path) and os.path.exists(attach_path)
    if has_attachment:
        with open(attach_path, "rb") as handle:
            payload = handle.read()
        attachment = MIMEBase("application", "octet-stream")
        attachment.set_payload(payload)
        encoders.encode_base64(attachment)
        file_name = os.path.basename(attach_path)
        attachment.add_header("Content-Disposition", f'attachment; filename="{file_name}"')
        message.attach(attachment)

    tls_context = ssl.create_default_context()
    with smtplib.SMTP("smtp.gmail.com", 587, timeout=20) as smtp:
        smtp.starttls(context=tls_context)
        smtp.login(EMAIL_USER, EMAIL_PASS)
        smtp.send_message(message)

def frame_to_data_url(bgr, path=TMP_JPEG_PATH):
    """Encode an image array as a base64 `data:` URL.

    The frame is written to `path` with OpenCV (JPEG quality 95), read back
    and base64-encoded. The MIME type is guessed from `path`, falling back to
    "image/jpeg".

    Args:
        bgr: Image array as accepted by cv2.imwrite.
        path: Scratch file used for the encoded image.

    Returns:
        A string of the form "data:<mime>;base64,<payload>".

    Raises:
        IOError: If OpenCV reports that the frame could not be written.
    """
    # cv2.imwrite signals failure through its return value; without this check
    # a failed write would silently re-encode whatever an earlier call left at
    # `path`, i.e. the wrong frame.
    if not cv2.imwrite(path, bgr, [int(cv2.IMWRITE_JPEG_QUALITY), 95]):
        raise IOError(f"Could not write frame to {path}")
    mime = mimetypes.guess_type(path)[0] or "image/jpeg"
    with open(path, "rb") as f:
        b64 = base64.b64encode(f.read()).decode("utf-8")
    return f"data:{mime};base64,{b64}"

# =========================================================
#               NODE: Prepare/Download video
# =========================================================
def prepare_video_node(state: AssistState) -> dict:
    """Make sure the input video exists, downloading the sample clip in Colab.

    If `state.input_video` already exists it is used as is. Otherwise, when
    running in Colab, the sample clip is downloaded with wget to
    `state.input_video`.

    Returns:
        An empty dict (this node makes no state updates).

    Raises:
        FileNotFoundError: If the video is missing and could not be downloaded.
    """
    import subprocess

    video_path = state.input_video
    if os.path.exists(video_path):
        log(state, f"Using existing video: {video_path}")
        return {}

    if IN_COLAB:
        # sample video from blog
        url = "https://drive.google.com/uc?export=download&id=1nSFmlcLAiBiFfQkMEisdR5DMICcYSax0"
        parent = os.path.dirname(video_path)
        if parent:
            os.makedirs(parent, exist_ok=True)
        # NOTE(review): --no-check-certificate disables TLS verification; kept
        # from the original command, consider removing it.
        result = subprocess.run(
            ["wget", "--no-check-certificate", url, "-O", video_path],
            check=False,
        )
        downloaded = (
            result.returncode == 0
            and os.path.exists(video_path)
            and os.path.getsize(video_path) > 0
        )
        if downloaded:
            log(state, f"Downloaded sample video to {video_path}")
            return {}
        # wget -O creates the target even when the transfer fails; remove the
        # stub so a later run does not mistake it for a real video.
        if os.path.exists(video_path):
            os.remove(video_path)
        log(state, f"❌ Download of sample video failed (wget exit code {result.returncode}).")

    raise FileNotFoundError(f"Input video not found: {video_path}")


def _build_clients():
    """Create the LLM clients for which an API key is configured.

    Returns:
        A tuple `(openai_client, groq_client)`; an entry is None when the
        corresponding key (OPENAI_API_KEY / GROQ_API_KEY) is empty.
    """
    openai_client = None
    if OPENAI_API_KEY:
        openai_client = OpenAI(base_url=OPENAI_BASE_URL, api_key=OPENAI_API_KEY)

    groq_client = None
    if GROQ_API_KEY:
        groq_client = Groq(api_key=GROQ_API_KEY)

    return openai_client, groq_client

def ask_on_image_with_fallback(data_url: str, system_prompt: str, user_prompt: str):
    """Ask a vision LLM about one image: OpenAI endpoint first, Groq as fallback.

    Args:
        data_url: Image encoded as a `data:` URL.
        system_prompt: System message for the model.
        user_prompt: User text sent together with the image.

    Returns:
        A tuple `(reply_text, provider_name)`. `provider_name` is
        "OpenAI GPT-5" or "Groq"; when no provider produced a non-empty reply
        the result is `("", "none")`.
    """
    openai_client, groq_client = _build_clients()

    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": [
            {"type": "text", "text": user_prompt},
            {"type": "image_url", "image_url": {"url": data_url}},
        ]},
    ]

    # Providers in priority order; an unconfigured client is None and skipped.
    providers = (
        (openai_client, OPENAI_MODEL, "OpenAI GPT-5"),
        (groq_client, GROQ_MODEL, "Groq"),
    )
    for client, model, provider_name in providers:
        if not client:
            continue
        try:
            r = client.chat.completions.create(
                model=model, temperature=0, max_tokens=8, messages=messages
            )
            txt = (r.choices[0].message.content or "").strip()
        except Exception as exc:
            # Fall through to the next provider, but surface the failure: a
            # silently failing provider would look like "no fall detected".
            print(f"⚠️ {provider_name} image request failed: {exc}")
            continue
        if txt:
            return txt, provider_name

    return "", "none"

def chat_with_fallback(system_prompt: str, user_text: str, max_tokens: int = 64, temperature: float = 0.0):
    """Text-only chat helper: OpenAI endpoint first, Groq as fallback.

    Args:
        system_prompt: System message for the model.
        user_text: User message.
        max_tokens: Completion token limit passed to the provider.
        temperature: Sampling temperature passed to the provider.

    Returns:
        A tuple `(reply_text, provider_name)`. `provider_name` is
        "OpenAI GPT-5" or "Groq" according to which provider actually
        answered; `("", "none")` when neither returned a non-empty reply.
    """
    openai_client, groq_client = _build_clients()

    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user",   "content": user_text},
    ]

    # Providers in priority order; an unconfigured client is None and skipped.
    providers = (
        (openai_client, OPENAI_MODEL, "OpenAI GPT-5"),
        (groq_client, GROQ_MODEL, "Groq"),
    )
    for client, model, provider_name in providers:
        if not client:
            continue
        try:
            r = client.chat.completions.create(
                model=model, temperature=temperature, max_tokens=max_tokens,
                messages=messages,
            )
            txt = (r.choices[0].message.content or "").strip()
        except Exception as exc:
            # Try the next provider, but do not hide why this one failed.
            print(f"⚠️ {provider_name} chat request failed: {exc}")
            continue
        if txt:
            # Report the provider that really produced the reply.
            return txt, provider_name

    return "", "none"



## Two-Stage Fall Detection Pipeline
Node 1: Annotate/Track; Node 2: GPT-5 Scan (Groq fallback) + Alerts

In [None]:
# =========================================================
#   NODE 1: Roboflow annotate + tracking + timers → MP4
# =========================================================
def rf_annotate_node(state: AssistState) -> dict:
    """Annotate every frame with Roboflow detections and write the result as MP4.

    Steps:
      1. Extract the frames of `state.input_video` into `state.frames_dir`.
      2. Run MODEL_ID on each frame, track the detections with ByteTrack and
         time how long each track has held its current class.
      3. Draw boxes and "<class>: <seconds>s" labels and save the frames to
         `state.annotated_dir`.
      4. Stitch the annotated frames into `state.annotated_video`.

    Tracks whose class is "fallen" for more than FALL_SECONDS_THRESHOLD
    seconds are collected in `prolonged_fall_ids`.

    Returns:
        Dict with `num_frames`, `video_fps` and `prolonged_fall_ids`.

    Raises:
        ValueError: If no frame could be read from the input video.
    """
    # Clean dirs
    for d in (state.frames_dir, state.annotated_dir):
        if os.path.exists(d):
            shutil.rmtree(d)
        os.makedirs(d, exist_ok=True)

    # Extract frames. The generator yields OpenCV (BGR) arrays, so they are
    # written with cv2.imwrite; saving them through PIL would store them with
    # red and blue swapped, which is what Roboflow would then be shown.
    num_frames = 0
    for i, frame in enumerate(sv.get_video_frames_generator(state.input_video)):
        cv2.imwrite(f"{state.frames_dir}/{i}.jpg", frame)
        num_frames = i + 1
    log(state, f"🖼️ Saved {num_frames} frames to {state.frames_dir}")
    if num_frames == 0:
        raise ValueError(f"No frames could be read from {state.input_video}")

    # Setup inference
    client = InferenceHTTPClient(api_url="https://detect.roboflow.com", api_key=ROBOFLOW_API_KEY)
    BBOX_ANN = sv.BoundingBoxAnnotator()
    LABEL_ANN = sv.LabelAnnotator()
    tracker = sv.ByteTrack()
    video_info = sv.VideoInfo.from_video_path(state.input_video)

    class FPS_Timer:
        """Per-track stopwatch that measures elapsed time in video frames."""

        def __init__(self, fps: int = 30) -> None:
            self.fps = fps
            self.frames_id = 0
            # tracker id -> frame index at which its current timer started
            self.tracker_id2frame_id: Dict[int, int] = {}

        def reset_time(self, tracker_id: int) -> None:
            """Restart the timer of one track at the current frame."""
            self.tracker_id2frame_id[tracker_id] = self.frames_id

        def reset_all(self):
            """Restart the timers of all known tracks at the current frame."""
            for k in self.tracker_id2frame_id:
                self.tracker_id2frame_id[k] = self.frames_id

        def tick(self, detections: sv.Detections) -> np.ndarray:
            """Advance one frame and return the elapsed seconds per detection."""
            self.frames_id += 1
            times = []
            for raw_tid in detections.tracker_id:
                tid = int(raw_tid)
                if tid not in self.tracker_id2frame_id:
                    self.tracker_id2frame_id[tid] = self.frames_id
                start = self.tracker_id2frame_id[tid]
                times.append((self.frames_id - start) / self.fps)
            return np.array(times)

    timers = FPS_Timer(fps=video_info.fps)
    last_status_by_id: Dict[int, str] = {}
    prolonged_ids: set = set()

    # Per-frame processing
    for i in range(num_frames):
        img_path = f"{state.frames_dir}/{i}.jpg"
        frame_bgr = cv2.imread(img_path)

        result = client.infer(img_path, model_id=MODEL_ID)
        detections = sv.Detections.from_inference(result)
        detections = tracker.update_with_detections(detections)

        class_names = detections.data.get("class_name", [])
        times = timers.tick(detections)

        status_by_id, times_by_id = {}, {}
        for idx, raw_tid in enumerate(detections.tracker_id):
            tid = int(raw_tid)
            status = class_names[idx] if idx < len(class_names) else "unknown"
            elapsed = float(times[idx])
            # Reset timer on status change. The time measured by tick() still
            # belongs to the previous status, so it must not be carried over:
            # otherwise someone who stood for a long time would be reported as
            # a prolonged fall on the very first "fallen" frame.
            if last_status_by_id.get(tid) != status:
                timers.reset_time(tid)
                elapsed = 0.0
            status_by_id[tid] = status
            times_by_id[tid] = elapsed
        last_status_by_id.update(status_by_id)

        # Detect prolonged falls (model-based)
        for tid, status in status_by_id.items():
            if status == "fallen" and times_by_id.get(tid, 0.0) > FALL_SECONDS_THRESHOLD:
                prolonged_ids.add(tid)

        labels = [f"{status_by_id[int(tid)]}: {times_by_id[int(tid)]:.1f}s" for tid in detections.tracker_id]
        # Annotate the BGR frame directly and save it with OpenCV so the colour
        # order stays consistent all the way into the video writer.
        annotated = BBOX_ANN.annotate(scene=frame_bgr.copy(), detections=detections)
        annotated = LABEL_ANN.annotate(scene=annotated, detections=detections, labels=labels)
        cv2.imwrite(f"{state.annotated_dir}/{i}.jpg", annotated)

    # Stitch to MP4
    first = cv2.imread(os.path.join(state.annotated_dir, "0.jpg"))
    h, w = first.shape[:2]
    writer = cv2.VideoWriter(state.annotated_video, cv2.VideoWriter_fourcc(*"mp4v"), video_info.fps, (w, h))
    try:
        for j in range(num_frames):
            writer.write(cv2.imread(os.path.join(state.annotated_dir, f"{j}.jpg")))
    finally:
        # Always finalize the container, even if a frame fails to load.
        writer.release()

    state.num_frames = num_frames
    state.video_fps = float(video_info.fps)
    state.prolonged_fall_ids = sorted(list(prolonged_ids))
    log(state, f"✅ Annotated video written to: {state.annotated_video}")
    if state.prolonged_fall_ids:
        log(state, f"[ALERT] Prolonged fallen IDs: {state.prolonged_fall_ids}")
    try:
        display(Video(state.annotated_video, embed=True))
    except Exception as exc:
        # The inline preview is optional; never fail the node because of it.
        log(state, f"ℹ️ Video preview unavailable: {exc}")
    return {"num_frames": state.num_frames, "video_fps": state.video_fps, "prolonged_fall_ids": state.prolonged_fall_ids}

# =========================================================
#   NODE 2: GPT-5 fall scan (Groq fallback) on annotated video
#           → voice + email + JSON report
#   (keeps the state.groq_* fields and state.groq_report_path)
# =========================================================
def groq_scan_node(state: AssistState) -> dict:
    """Scan the annotated video with the vision LLM and alert on the first fall.

    Roughly one frame every TARGET_SAMPLE_SEC seconds is sent to
    ask_on_image_with_fallback(). On the first frame judged as a fall the
    frame is saved to TRIGGER_FRAME_PATH, a voice alert is played, an email
    is sent to CAREGIVER_EMAIL and scanning stops. A JSON report is always
    written to `state.groq_report_path`.

    The `groq_*` field names are kept for compatibility even though the
    OpenAI endpoint is tried first.

    Returns:
        Dict with `groq_confirmed`, `groq_first_detection_frame`,
        `groq_first_detection_time_s` and `trigger_frame_path`.

    Raises:
        AssertionError: If the annotated video cannot be opened.
    """
    cap = cv2.VideoCapture(state.annotated_video)
    assert cap.isOpened(), f"Cannot open video: {state.annotated_video}"
    fps   = cap.get(cv2.CAP_PROP_FPS) or (state.video_fps or 30.0)
    total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    stride = max(1, int(round(fps * TARGET_SAMPLE_SEC)))

    confirmed = False
    first_detection: Optional[Tuple[int, float]] = None
    last_provider: Optional[str] = None   # provider of the most recent non-empty answer
    sampled_frames = 0
    answered_samples = 0
    frame_idx = 0
    start_time = time.time()

    try:
        while True:
            ok, frame_bgr = cap.read()
            if not ok:
                break

            if frame_idx % stride == 0:
                sampled_frames += 1
                data_url = frame_to_data_url(frame_bgr, TMP_JPEG_PATH)
                ans, provider = ask_on_image_with_fallback(data_url, SYSTEM_PROMPT, USER_PROMPT)
                ts  = frame_idx / fps
                if ans:
                    answered_samples += 1
                    last_provider = provider

                # Only act/log if a fall is detected
                if "a person has fallen" in ans.lower():
                    confirmed = True
                    first_detection = (frame_idx, ts)
                    cv2.imwrite(TRIGGER_FRAME_PATH, frame_bgr)
                    log(state, "\n🛑 FINAL VERDICT: FALL DETECTED")
                    log(state, f"🧠 Verdict provider: {provider}")
                    log(state, f"🖼️ Saved trigger frame: {TRIGGER_FRAME_PATH}")
                    speak_once("Alert. A fall has been detected.")
                    try:
                        subject = "⚠️ Fall Alert Detected"
                        body = (
                            "A fall has been detected by the monitoring system.\n\n"
                            "The trigger frame is attached."
                        )
                        send_email_alert(subject, body, CAREGIVER_EMAIL, attach_path=TRIGGER_FRAME_PATH)
                        log(state, f"📧 Email sent to {CAREGIVER_EMAIL}")
                    except smtplib.SMTPAuthenticationError as e:
                        log(state, "❌ SMTP auth error. Use a Gmail App Password for EMAIL_PASS.")
                        log(state, f"Details: {e}")
                    except Exception as e:
                        log(state, f"❌ Failed to send email: {e}")
                    break

            frame_idx += 1
    finally:
        # Release the capture even if encoding or the LLM call raises.
        cap.release()

    if sampled_frames and not answered_samples:
        # Every request failed or came back empty: the "no_fall" verdict below
        # is then unverified, so say so instead of staying silent.
        log(state, "⚠️ No vision provider answered for any sampled frame; 'no_fall' verdict is unverified.")

    # Report (keep original field names for compatibility)
    state.groq_confirmed = confirmed
    if confirmed and first_detection:
        state.groq_first_detection_frame = int(first_detection[0])
        state.groq_first_detection_time_s = float(first_detection[1])
        state.trigger_frame_path = TRIGGER_FRAME_PATH
    # (No "NO FALL DETECTED" log to stay silent unless positive)

    # Record the model that actually answered; default to the primary model
    # when no provider answered at all.
    model_by_provider = {"OpenAI GPT-5": OPENAI_MODEL, "Groq": GROQ_MODEL}

    report = {
        "input_video": state.input_video,
        "annotated_video": state.annotated_video,
        "model": model_by_provider.get(last_provider, OPENAI_MODEL),
        "fps": float(fps),
        "total_frames": int(total),
        "stride_frames": int(stride),
        "sample_interval_seconds": round(stride / fps, 3),
        "first_detection": (
            {"frame": state.groq_first_detection_frame, "time_s": state.groq_first_detection_time_s}
            if state.groq_first_detection_frame is not None else None
        ),
        "trigger_frame_path": state.trigger_frame_path if confirmed else None,
        "verdict": "fallen" if confirmed else "no_fall",
        "elapsed_seconds": round(time.time() - start_time, 2),
        "prolonged_fall_ids_model_stage": state.prolonged_fall_ids,
        "provider": last_provider,
        "sampled_frames": sampled_frames,
        "answered_samples": answered_samples,
    }
    with open(state.groq_report_path, "w") as f:
        json.dump(report, f, indent=2)

    return {
        "groq_confirmed": state.groq_confirmed,
        "groq_first_detection_frame": state.groq_first_detection_frame,
        "groq_first_detection_time_s": state.groq_first_detection_time_s,
        "trigger_frame_path": state.trigger_frame_path,
    }





# =========================================================
#           NODE: Terminal (no-op, for clarity)
# =========================================================
def finalize_node(state: AssistState) -> dict:
    """Terminal node: log a completion message and return no state updates."""
    log(state, "✅ Workflow complete.")
    return {}



## Building and Invoking the Graph

In [None]:
# =========================================================
#                 BUILD THE AGENTIC GRAPH
# =========================================================
graph = StateGraph(AssistState)

graph.add_node("prepare_video", prepare_video_node)
graph.add_node("rf_annotate",   rf_annotate_node)
graph.add_node("groq_scan",     groq_scan_node)
graph.add_node("finalize",      finalize_node)

# Linear flow: prepare → annotate → groq → finalize → END
graph.set_entry_point("prepare_video")
graph.add_edge("prepare_video", "rf_annotate")
graph.add_edge("rf_annotate",   "groq_scan")
graph.add_edge("groq_scan",     "finalize")
# Terminate explicitly so "finalize" is not left as a dead-end node.
graph.add_edge("finalize",      END)

workflow = graph.compile()

# =========================================================
#                     RUN THE WORKFLOW
# =========================================================
# Start from the default state (paths come from the config constants).
state = AssistState()
final_state = workflow.invoke(state)