In [1]:
# Mount Google Drive at /content/drive; every path used by the following cells
# (config, generated alert modules, model weights, videos) lives under it.
from google.colab import drive
drive.mount('/content/drive')


Mounted at /content/drive


In [2]:
import os, json

# Root folder of the alert system on Drive; generated logs go to <BASE>/outputs.
BASE = "/content/drive/MyDrive/Project/Dataset/yolov8_ppe_detection/alert_system"
OUT = os.path.join(BASE, "outputs")
os.makedirs(OUT, exist_ok=True)

# Which delivery channels the alert manager should use.
channel_flags = {
    "console": True,
    "email": False,   # keep False until you add secure env vars
    "gui": False
}

# SMTP settings. The account name is used both as sender and as login user.
# NOTE(review): the value of EMAIL_PASS (when set) is written in plain text to
# alert_config.json on Drive by this cell.
smtp_account = os.getenv("EMAIL_USER")
email_settings = {
    "smtp_server": "smtp.gmail.com",
    "smtp_port": 587,
    "sender": smtp_account,
    "recipients": ["your_recipient@example.com"],
    "username": smtp_account,
    "password": os.getenv("EMAIL_PASS")
}

# Full configuration consumed by AlertManager (key order is the JSON order).
cfg = {
    "enabled_channels": channel_flags,
    "email": email_settings,
    "suppression_window_seconds": 30,
    "outputs_dir": OUT,
    "log_csv": "alert_log.csv",
    "log_text": "alert_summary.txt",
    "alert_timeout_seconds": 2,
    "max_alerts_per_minute": 60
}

# Persist the configuration next to the alert modules.
cfg_path = os.path.join(BASE, "alert_config.json")
serialized_cfg = json.dumps(cfg, indent=2)
with open(cfg_path, "w") as f:
    f.write(serialized_cfg)

print("alert_config.json written at:", cfg_path)
print("Edit config to enable email once credentials are available (use env vars EMAIL_USER/EMAIL_PASS).")


alert_config.json written at: /content/drive/MyDrive/Project/Dataset/yolov8_ppe_detection/alert_system/alert_config.json
Edit config to enable email once credentials are available (use env vars EMAIL_USER/EMAIL_PASS).


In [3]:
import textwrap, os

# Directory on Drive that receives the generated alert modules
# (console_alert.py, email_alert.py, alert_manager.py) at the end of this cell.
base_path = "/content/drive/MyDrive/Project/Dataset/yolov8_ppe_detection/alert_system"
os.makedirs(base_path, exist_ok=True)

# Source text of console_alert.py.
# It configures the "alert_system" logger (a StreamHandler is attached only when
# the logger has no handlers yet) and defines send_console_alert(violation_event),
# which logs one WARNING line built from the event's 'video', 'frame' and
# 'violations' keys and returns {"status": "sent", "channel": "console", "ts": <UTC ISO time>}.
console_code = textwrap.dedent("""
import logging
from datetime import datetime, timezone

logger = logging.getLogger("alert_system")
logger.setLevel(logging.INFO)
if not logger.handlers:
    ch = logging.StreamHandler()
    ch.setLevel(logging.INFO)
    formatter = logging.Formatter("[%(asctime)s] %(levelname)s: %(message)s")
    ch.setFormatter(formatter)
    logger.addHandler(ch)

def send_console_alert(violation_event):
    ts = datetime.now(timezone.utc).isoformat()
    msg = f"ALERT - {violation_event['video']} Frame:{violation_event['frame']} Violations:{violation_event['violations']} Timestamp:{ts}"
    logger.warning(msg)
    return {"status": "sent", "channel": "console", "ts": ts}
""")

# Source text of email_alert.py.
# Changes compared with the first version of this module:
#   * STARTTLS now uses ssl.create_default_context(), so the server certificate
#     and hostname are verified before the credentials are sent.
#   * sender/username/password fall back to the EMAIL_USER / EMAIL_PASS
#     environment variables when the config values are missing or empty, so the
#     secret does not have to be stored in alert_config.json.
#   * Missing settings are reported as a "failed" result with a clear message
#     instead of an opaque login error (or an exception for a None sender).
email_code = textwrap.dedent("""
import os
import smtplib
import ssl
from email.message import EmailMessage
from datetime import datetime, timezone


def _utc_now():
    # ISO-8601 UTC timestamp used in every result dict.
    return datetime.now(timezone.utc).isoformat()


def _failed(error):
    # Uniform failure result; the error text is kept for the alert log.
    return {"status": "failed", "channel": "email", "error": str(error), "ts": _utc_now()}


def send_email_alert(violation_event, email_cfg):
    \"\"\"Email one PPE violation event over SMTP with STARTTLS.

    violation_event must contain 'video', 'frame', 'violations' (list of str)
    and 'timestamp'.
    email_cfg must include smtp_server, smtp_port and recipients (list).
    sender, username and password are taken from email_cfg and fall back to
    the EMAIL_USER / EMAIL_PASS environment variables when missing or empty.

    Returns a dict with 'status' ("sent" or "failed"), 'channel', 'ts' and,
    on failure, 'error'. Delivery problems are reported, never raised.
    \"\"\"
    username = email_cfg.get("username") or os.getenv("EMAIL_USER")
    password = email_cfg.get("password") or os.getenv("EMAIL_PASS")
    sender = email_cfg.get("sender") or username
    recipients = email_cfg.get("recipients") or []

    # Fail fast (without opening a connection) when required settings are absent.
    required = (("recipients", recipients), ("username", username), ("password", password))
    missing = [name for name, value in required if not value]
    if missing:
        return _failed("missing email settings: " + ", ".join(missing))

    msg = EmailMessage()
    subject = f"PPE Violation: {', '.join(violation_event['violations'])} - {violation_event['video']}"
    body = f\"\"\"PPE Violation detected.
Video: {violation_event['video']}
Frame: {violation_event['frame']}
Violations: {', '.join(violation_event['violations'])}
Timestamp (UTC): {violation_event['timestamp']}
-- Alert System\"\"\"
    msg["From"] = sender
    msg["To"] = ", ".join(recipients)
    msg["Subject"] = subject
    msg.set_content(body)

    try:
        # Default context enables certificate and hostname verification.
        tls_context = ssl.create_default_context()
        with smtplib.SMTP(email_cfg["smtp_server"], email_cfg["smtp_port"], timeout=10) as server:
            server.ehlo()
            server.starttls(context=tls_context)
            server.ehlo()  # re-identify over the encrypted channel
            server.login(username, password)
            server.send_message(msg)
        return {"status": "sent", "channel": "email", "ts": _utc_now()}
    except Exception as e:
        return _failed(e)
""")

# Source text of alert_manager.py.
# Changes compared with the first version of this module:
#   * The suppression key is built from the *distinct* violation types, so the
#     same violation seen on a different number of objects no longer bypasses
#     the suppression window.
#   * The 'detail' value keeps a channel's error text (e.g. "failed: <reason>")
#     instead of repeating the status.
#   * The returned status is "sent" only when at least one channel delivered
#     the alert; otherwise it is "failed".
#   * Config and log files are opened with an explicit UTF-8 encoding.
manager_code = textwrap.dedent("""
import os, json, csv, time
from datetime import datetime, timezone
from collections import defaultdict

from console_alert import send_console_alert
from email_alert import send_email_alert

CONFIG_PATH_DEFAULT = os.path.join(os.path.dirname(__file__), "alert_config.json")

class AlertManager:
    \"\"\"Dispatch PPE violation events to the enabled channels and log them.

    Configuration is read from a JSON file (alert_config.json next to this
    module unless config_path is given). Every dispatched alert is appended
    to a CSV log and a text log inside cfg['outputs_dir'].
    \"\"\"

    def __init__(self, config_path=None):
        self.config_path = config_path or CONFIG_PATH_DEFAULT
        self._load_config()
        os.makedirs(self.cfg['outputs_dir'], exist_ok=True)
        self.log_csv_path = os.path.join(self.cfg['outputs_dir'], self.cfg['log_csv'])
        self.log_text_path = os.path.join(self.cfg['outputs_dir'], self.cfg['log_text'])
        self._init_csv()
        # suppression key -> time.time() of the last dispatched alert
        self.last_alert_time = defaultdict(lambda: 0)

    def _load_config(self):
        # Read the JSON configuration into self.cfg.
        with open(self.config_path, "r", encoding="utf-8") as f:
            self.cfg = json.load(f)

    def _init_csv(self):
        # Create the CSV log with its header row on first use only.
        if not os.path.exists(self.log_csv_path):
            with open(self.log_csv_path, "w", newline='', encoding="utf-8") as f:
                csv.writer(f).writerow(["utc_timestamp","video","frame","violations","channels","status","detail"])

    def _should_alert(self, key):
        # True when the suppression window for this key has elapsed.
        now = time.time()
        last = self.last_alert_time.get(key, 0)
        return (now - last) >= self.cfg.get("suppression_window_seconds", 30)

    def _record_alert(self, event, channels, status, detail=""):
        # Append one row to the CSV log and one line to the text log.
        ts = datetime.now(timezone.utc).isoformat()
        violations = event.get("violations", [])
        with open(self.log_csv_path, "a", newline='', encoding="utf-8") as f:
            csv.writer(f).writerow([ts, event.get("video"), event.get("frame"), ";".join(violations), ",".join(channels), status, detail])
        with open(self.log_text_path, "a", encoding="utf-8") as f:
            f.write(f\"[{ts}] {event.get('video')} Frame:{event.get('frame')} Violations:{','.join(violations)} Channels:{','.join(channels)} Status:{status} Detail:{detail}\\n\")

    @staticmethod
    def _describe(result):
        # "sent" on success, "failed: <error>" when the channel reported an error.
        status = str(result.get("status"))
        error = result.get("error")
        return f"{status}: {error}" if error else status

    def handle_violation(self, event):
        \"\"\"Send one violation event through every enabled channel.

        event must contain video, frame, violations(list), timestamp.

        Returns {"status": "suppressed"} when the same video / violation-type
        combination was alerted within the suppression window. Otherwise
        returns {"status", "channels", "detail"} where status is "sent" if at
        least one channel delivered the alert and "failed" if none did.
        \"\"\"
        # Distinct violation types only: the number of detections must not
        # change the suppression key.
        signature = "|".join(sorted(set(event.get("violations", []))))
        key = f\"{event.get('video')}::{signature}\"
        if not self._should_alert(key):
            return {"status":"suppressed"}

        enabled = self.cfg["enabled_channels"]
        results = []  # (channel name, result dict) in dispatch order

        if enabled.get("console", False):
            results.append(("console", send_console_alert(event)))

        if enabled.get("email", False):
            results.append(("email", send_email_alert(event, self.cfg["email"])))

        channels = [name for name, _ in results]
        statuses = [str(r.get("status")) for _, r in results]
        detail = ";".join(self._describe(r) for _, r in results)

        # The window restarts even after a failure so a broken channel is not
        # retried on every frame.
        self.last_alert_time[key] = time.time()
        self._record_alert(event, channels, " | ".join(statuses), detail)
        overall = "sent" if "sent" in statuses else "failed"
        return {"status":overall,"channels":channels,"detail":detail}
""")

# File name -> module source for every alert module generated by this cell.
files = dict(zip(
    ("console_alert.py", "email_alert.py", "alert_manager.py"),
    (console_code, email_code, manager_code),
))

# Write each module into the alert-system folder on Drive, overwriting old copies.
for module_name in files:
    target_path = os.path.join(base_path, module_name)
    with open(target_path, "w") as f:
        f.write(files[module_name])

print("alert modules written to:", base_path)


alert modules written to: /content/drive/MyDrive/Project/Dataset/yolov8_ppe_detection/alert_system


In [4]:
import sys, os
from datetime import datetime, timezone

# Make the generated alert modules importable from this runtime.
sys.path.append("/content/drive/MyDrive/Project/Dataset/yolov8_ppe_detection/alert_system")
from alert_manager import AlertManager

# Smoke test: push one hand-written violation event through the manager.
am = AlertManager()   # loads config from Drive
event_time = datetime.now(timezone.utc).isoformat()
test_event = {
    "video": "/content/drive/MyDrive/Project/Dataset/yolov8_ppe_detection/testVideos/853867-hd_1920_1080_25fps.mp4",
    "frame": 10,
    "violations": ["NO-Hardhat", "NO-Safety Vest"],
    "timestamp": event_time
}
res = am.handle_violation(test_event)
print("Alert result:", res)

# Show which log files now exist in the outputs folder.
outputs_dir = am.cfg['outputs_dir']
csv_log_path = os.path.join(outputs_dir, am.cfg['log_csv'])
print("--- files in outputs ---")
print(os.listdir(outputs_dir))
print("Open CSV:", csv_log_path)




Alert result: {'status': 'sent', 'channels': ['console'], 'detail': 'sent'}
--- files in outputs ---
['alert_summary.txt', 'alert_log.csv']
Open CSV: /content/drive/MyDrive/Project/Dataset/yolov8_ppe_detection/alert_system/outputs/alert_log.csv


In [6]:
# Pinned to the version this notebook was run with, so a re-run resolves the same
# package; %pip installs into the environment of the running kernel.
%pip install -q ultralytics==8.3.225

Collecting ultralytics
  Downloading ultralytics-8.3.225-py3-none-any.whl.metadata (37 kB)
Collecting ultralytics-thop>=2.0.18 (from ultralytics)
  Downloading ultralytics_thop-2.0.18-py3-none-any.whl.metadata (14 kB)
Downloading ultralytics-8.3.225-py3-none-any.whl (1.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.1/1.1 MB[0m [31m19.0 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading ultralytics_thop-2.0.18-py3-none-any.whl (28 kB)
Installing collected packages: ultralytics-thop, ultralytics
Successfully installed ultralytics-8.3.225 ultralytics-thop-2.0.18


In [11]:
import os, glob, time
from ultralytics import YOLO
import cv2
import torch
from datetime import datetime, timezone
from alert_manager import AlertManager

# Paths
MODEL_PATH = "/content/drive/MyDrive/Project/Dataset/css-data/cross_validation/results_fold3/weights/best.pt"
VIDEO_DIR = "/content/drive/MyDrive/Project/Dataset/yolov8_ppe_detection/testVideos"
OUTPUT_VIDEO_DIR = "/content/drive/MyDrive/Project/Dataset/yolov8_ppe_detection/visual_demo/outputs/alerts"
os.makedirs(OUTPUT_VIDEO_DIR, exist_ok=True)

# Inference settings
CONF_THRESHOLD = 0.5
IOU_THRESHOLD = 0.45
IMAGE_SIZE = 640
FALLBACK_FPS = 20.0   # used when the container does not report a frame rate
# A freshly loaded YOLO model sits on the CPU, so inspecting model.device always
# selected "cpu"; ask torch directly whether a GPU is available instead.
DEVICE = "cuda:0" if torch.cuda.is_available() else "cpu"

# init
model = YOLO(MODEL_PATH)
alert_mgr = AlertManager()
RED_CLASSES = ["NO-Hardhat", "NO-Mask", "NO-Safety Vest"]
GREEN_CLASSES = ["Hardhat", "Mask", "Safety Vest"]
YELLOW_CLASSES = ["Person", "Machinery", "Vehicle", "Safety Cone"]


def annotate_video(video_path, out_path):
    """Run PPE detection on one video, write the annotated copy, raise alerts.

    Every frame is passed through the model; the annotated frame is written to
    out_path and, when a frame contains any class from RED_CLASSES, a violation
    event is handed to alert_mgr.

    Returns the number of frames processed, or None when the video cannot be
    opened (no output file is created in that case).
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        cap.release()
        return None

    # Keep the fractional frame rate (e.g. 29.97) so the output plays at source speed.
    fps = cap.get(cv2.CAP_PROP_FPS) or FALLBACK_FPS
    frame_size = (int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)))
    out = cv2.VideoWriter(out_path, cv2.VideoWriter_fourcc(*"mp4v"), fps, frame_size)

    video_name = os.path.basename(video_path)
    names = model.names          # class index -> class name
    red_classes = set(RED_CLASSES)
    frame_id = 0
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frame_id += 1

            result = model.predict(frame, conf=CONF_THRESHOLD, iou=IOU_THRESHOLD,
                                   imgsz=IMAGE_SIZE, device=DEVICE, verbose=False)[0]

            # collect detections
            detections = [names[int(cls)] for cls in result.boxes.cls]
            violations = [label for label in detections if label in red_classes]

            # alert hook
            if violations:
                alert_mgr.handle_violation({
                    "video": video_name,
                    "frame": frame_id,
                    "violations": violations,
                    "timestamp": datetime.now(timezone.utc).isoformat()
                })

            out.write(result.plot())   # plot() returns the annotated frame (numpy)
    finally:
        # Always release both handles so the output file is finalized even when
        # inference fails part-way through.
        cap.release()
        out.release()
    return frame_id


video_files = sorted(glob.glob(os.path.join(VIDEO_DIR, "*.mp4")))
print("Videos:", len(video_files))

for video_path in video_files:
    base_name = os.path.basename(video_path)
    print("--- Processing", base_name)
    out_path = os.path.join(OUTPUT_VIDEO_DIR, f"alert_{base_name}")

    t0 = time.time()
    frame_count = annotate_video(video_path, out_path)
    if frame_count is None:
        print("Skipped (could not open):", video_path)
        continue
    elapsed = time.time()-t0
    print(f"Saved annotated video -> {out_path} (frames:{frame_count} time:{elapsed:.1f}s)")
print("All done.")


Videos: 5
--- Processing 1100493329-preview.mp4




Saved annotated video -> /content/drive/MyDrive/Project/Dataset/yolov8_ppe_detection/visual_demo/outputs/alerts/alert_1100493329-preview.mp4 (frames:966 time:414.0s)
--- Processing 2048246-hd_1920_1080_24fps.mp4




Saved annotated video -> /content/drive/MyDrive/Project/Dataset/yolov8_ppe_detection/visual_demo/outputs/alerts/alert_2048246-hd_1920_1080_24fps.mp4 (frames:447 time:218.8s)
--- Processing 2865276-uhd_3840_2160_30fps.mp4




Saved annotated video -> /content/drive/MyDrive/Project/Dataset/yolov8_ppe_detection/visual_demo/outputs/alerts/alert_2865276-uhd_3840_2160_30fps.mp4 (frames:210 time:125.7s)
--- Processing 4480570-hd_1920_1080_30fps.mp4




Saved annotated video -> /content/drive/MyDrive/Project/Dataset/yolov8_ppe_detection/visual_demo/outputs/alerts/alert_4480570-hd_1920_1080_30fps.mp4 (frames:386 time:183.6s)
--- Processing 853867-hd_1920_1080_25fps.mp4




Saved annotated video -> /content/drive/MyDrive/Project/Dataset/yolov8_ppe_detection/visual_demo/outputs/alerts/alert_853867-hd_1920_1080_25fps.mp4 (frames:388 time:187.0s)
All done.
