In [49]:
# @title 1. Install Dependencies
!pip install -q ultralytics langchain-openai reportlab opencv-python-headless
!pip install -q git+https://github.com/facebookresearch/segment-anything.git

import os
import cv2
import sys
import json
import smtplib
import torch
import numpy as np
import matplotlib.pyplot as plt
from datetime import datetime
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.application import MIMEApplication
from reportlab.lib import colors
from reportlab.lib.pagesizes import letter
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Image as ReportLabImage, Table, TableStyle
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from langchain_openai import ChatOpenAI
from langchain_core.prompts import PromptTemplate
from google.colab import userdata

# Create directories
os.makedirs('/content/violations', exist_ok=True)
os.makedirs('/content/reports', exist_ok=True)

print("‚úÖ Dependencies installed and directories created.")

  Preparing metadata (setup.py) ... [?25l[?25hdone
‚úÖ Dependencies installed and directories created.


In [None]:
# @title 2. Global Configuration
class Config:
    # --- SITE INFO ---
    SITE_NAME = "Construction Site A"
    SITE_LOCATION = "Zone 3, Building B"
    COMPANY_NAME = "SafeBuild Construction Inc."

    # --- PATHS ---
    IMAGE_PATH = '/content/Worker-working-without-safety-boots-hand-gloves-head-protection_Q320.jpg'
    YOLO_WEIGHTS = '/content/best.pt'
    SAM_WEIGHTS = '/content/sam3.pt'
    VIOLATIONS_DIR = '/content/violations'
    REPORTS_DIR = '/content/reports'


    # --- API KEYS (Try Colab Secrets first, else Manual) ---
    try:
        HF_TOKEN = userdata.get('HF_TOKEN')
        OPENAI_API_KEY = userdata.get('OPENAI_API_KEY')
        EMAIL_SENDER = userdata.get('EMAIL_SENDER')
        EMAIL_PASSWORD = userdata.get('EMAIL_PASSWORD') # Gmail App Password
        EMAIL_RECIPIENTS = userdata.get('EMAIL_RECIPIENTS', '').split(',')
    except:
        # FALLBACK: Enter keys here if not using Colab Secrets
        OPENAI_API_KEY = "sk-..."
        EMAIL_SENDER = "your_email@gmail.com"
        EMAIL_PASSWORD = "your_app_password"
        EMAIL_RECIPIENTS = ["manager@example.com"]

    # --- SETTINGS ---
    EMAIL_ENABLED = True # Set to True to actually send emails
    SAM_IMAGE_SIZE = 1024
    CONFIDENCE_THRESHOLD = 0.4

    # --- CLASS MAPPING ---
    TARGET_CLASSES = {
        'person': [6],
        'helmet': [1],
        'vest': [2],
        'no_helmet': [7]
    }

config = Config()
print(f"‚öôÔ∏è Configuration loaded for {config.SITE_NAME}")

‚öôÔ∏è Configuration loaded for Construction Site A


In [51]:
from huggingface_hub import hf_hub_download, login
from huggingface_hub import login
login(new_session=False)

In [None]:
#!wget --header="Authorization: Bearer YOUR_HF_TOKEN" "https://huggingface.co/facebook/sam3/resolve/main/sam3.pt"


In [53]:
# @title 3. Phase 1: Hierarchical Detection System
from ultralytics import YOLO
from ultralytics.models.sam import SAM3SemanticPredictor

class SafetyDetector:
    def __init__(self):
        print("üöÄ Initializing Hierarchical Detection System...")
        self.yolo_model = YOLO(config.YOLO_WEIGHTS)

        # Load SAM 3
        overrides = dict(model=config.SAM_WEIGHTS, task="segment", mode="predict", conf=0.15)
        self.sam_model = SAM3SemanticPredictor(overrides=overrides)

        # Update Class IDs dynamically if possible
        names = self.yolo_model.names
        name_to_id = {v: k for k, v in names.items()}
        for key in config.TARGET_CLASSES:
            if key in name_to_id: config.TARGET_CLASSES[key] = [name_to_id[key]] # Fallback to strict map if key name differs

    def box_iou(self, box1, box2):
        x1 = max(box1[0], box2[0])
        y1 = max(box1[1], box2[1])
        x2 = min(box1[2], box2[2])
        y2 = min(box1[3], box2[3])
        inter = max(0, x2 - x1) * max(0, y2 - y1)
        if inter == 0: return 0
        box2_area = (box2[2] - box2[0]) * (box2[3] - box2[1])
        return inter / box2_area

    def run_sam_rescue(self, image_path, search_prompts, roi_box, h, w):
        """Runs SAM only on ROI"""
        try:
            res = self.sam_model(image_path, text=search_prompts, imgsz=config.SAM_IMAGE_SIZE, verbose=False)
            if not res[0].masks: return False
            masks = [m.cpu().numpy().astype(np.uint8) for m in res[0].masks.data]
            for m in masks:
                if m.shape[:2] != (h, w): m = cv2.resize(m, (w, h), interpolation=cv2.INTER_NEAREST)
                roi = m[roi_box[1]:roi_box[3], roi_box[0]:roi_box[2]]
                if np.sum(roi) > 0: return True
        except: pass
        return False

    def detect(self, image_path):
        img = cv2.imread(image_path)
        img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        h, w = img_rgb.shape[:2]

        # --- YOLO SCAN ---
        results = self.yolo_model.predict(image_path, conf=config.CONFIDENCE_THRESHOLD, verbose=False)
        detections = {'person': [], 'helmet': [], 'vest': [], 'no_helmet': []}

        for box in results[0].boxes:
            cls = int(box.cls[0])
            coords = box.xyxy[0].cpu().numpy().astype(int)
            # Map YOLO classes to our buckets
            for key, ids in config.TARGET_CLASSES.items():
                if cls in ids: detections[key].append(coords)

        violations = []

        # --- HIERARCHICAL LOGIC ---
        for p_box in detections['person']:
            has_helmet, has_vest, unsafe_explicit = False, False, False

            # Check Overlaps
            for eq in detections['helmet']:
                if self.box_iou(p_box, eq) > 0.3: has_helmet = True
            for eq in detections['vest']:
                if self.box_iou(p_box, eq) > 0.3: has_vest = True
            for eq in detections['no_helmet']:
                if self.box_iou(p_box, eq) > 0.3: unsafe_explicit = True

            status = "SAFE"
            missing = []

            # 1. Fast Unsafe
            if unsafe_explicit:
                status = "UNSAFE"
                missing.append("Helmet")

            # 2. Fast Safe
            elif has_helmet and has_vest:
                status = "SAFE"

            # 3. Rescue Vest
            elif has_helmet and not has_vest:
                body_roi = [p_box[0], int(p_box[1] + (p_box[3]-p_box[1])*0.2), p_box[2], p_box[3]]
                if not self.run_sam_rescue(image_path, ["vest"], body_roi, h, w):
                    status = "UNSAFE"
                    missing.append("Vest")

            # 4. Rescue Helmet
            elif has_vest and not has_helmet:
                head_roi = [p_box[0], p_box[1], p_box[2], int(p_box[1] + (p_box[3]-p_box[1])*0.4)]
                if not self.run_sam_rescue(image_path, ["helmet"], head_roi, h, w):
                    status = "UNSAFE"
                    missing.append("Helmet")

            # 5. Full Rescue
            else:
                head_roi = [p_box[0], p_box[1], p_box[2], int(p_box[1] + (p_box[3]-p_box[1])*0.4)]
                body_roi = [p_box[0], int(p_box[1] + (p_box[3]-p_box[1])*0.2), p_box[2], p_box[3]]
                found_h = self.run_sam_rescue(image_path, ["helmet"], head_roi, h, w)
                found_v = self.run_sam_rescue(image_path, ["vest"], body_roi, h, w)

                if not found_h or not found_v:
                    status = "UNSAFE"
                    if not found_h: missing.append("Helmet")
                    if not found_v: missing.append("Vest")

            # --- LOG VIOLATION IF UNSAFE ---
            if status == "UNSAFE":
                timestamp = datetime.now()
                # Draw box on copy of image for report
                evidence_img = img_rgb.copy()
                cv2.rectangle(evidence_img, (p_box[0], p_box[1]), (p_box[2], p_box[3]), (255, 0, 0), 3)

                evidence_path = f"{config.VIOLATIONS_DIR}/violation_{timestamp.strftime('%H%M%S')}.jpg"
                cv2.imwrite(evidence_path, cv2.cvtColor(evidence_img, cv2.COLOR_RGB2BGR))

                violation_data = {
                    "timestamp": timestamp,
                    "location": config.SITE_LOCATION,
                    "description": f"Worker detected without {', '.join(missing)}",
                    "missing_items": missing,
                    "confidence": 0.85, # Aggregate confidence
                    "image_path": evidence_path,
                    "bbox": p_box
                }
                violations.append(violation_data)

        return violations

detector = SafetyDetector()
print("‚úÖ Detector Ready")

üöÄ Initializing Hierarchical Detection System...
‚úÖ Detector Ready


In [None]:
# @title 4. Phase 2: AI Compliance Agent
class ComplianceAgent:
    def __init__(self):
        self.has_key = config.OPENAI_API_KEY and config.OPENAI_API_KEY.startswith("sk-")
        if self.has_key:
            self.llm = ChatOpenAI(model="gpt-4", openai_api_key=config.OPENAI_API_KEY, temperature=0.3)
            self.prompt = PromptTemplate(
                input_variables=["date", "time", "location", "description", "missing"],
                template="""
                    You are an OSHA Safety Compliance Officer.
                    Details: {date} at {location}.
                    Violation: {description}. Missing: {missing}.

                    Generate a Citation Report Section:
                    1. **Regulation Cited:** (Map '{missing}' to specific OSHA 1926 codes).
                    2. **Observation:** (Professional description of the worker's non-compliance).
                    3. **Severity:** (High/Medium/Low based on the missing gear).
                    4. **Required Action:** (Immediate corrective steps).
                    """
            )
        else:
            print("‚ö†Ô∏è Agent initialized in MOCK mode (No API Key)")

    def generate_report_text(self, violation):
        if not self.has_key:
            return f"""
            **MOCK AI REPORT**
            **Incident:** {violation['description']} detected at {violation['location']}.
            **Regulation:** Likely violates OSHA 1926.100 (Head Protection) or 1926.102 (PPE).
            **Action:** Immediately halt work and provide required PPE to the worker.
            """

        try:
            chain = self.prompt | self.llm
            response = chain.invoke({
                "date": violation['timestamp'].strftime("%Y-%m-%d"),
                "time": violation['timestamp'].strftime("%H:%M:%S"),
                "location": violation['location'],
                "description": violation['description'],
                "missing": ", ".join(violation['missing_items'])
            })
            return response.content
        except Exception as e:
            return f"Error generating AI report: {e}"

agent = ComplianceAgent()
print("‚úÖ Agent Ready")

‚úÖ Agent Ready


In [55]:
# @title 5. Phase 3: PDF Report Generator
class PDFGenerator:
    def generate_pdf(self, violation, report_text):
        filename = f"{config.REPORTS_DIR}/Incident_{violation['timestamp'].strftime('%Y%m%d_%H%M%S')}.pdf"
        doc = SimpleDocTemplate(filename, pagesize=letter)
        styles = getSampleStyleSheet()
        story = []

        # Header
        story.append(Paragraph(f"SAFETY INCIDENT REPORT", styles['Title']))
        story.append(Spacer(1, 12))

        # Meta Data Table
        data = [
            ["Date:", violation['timestamp'].strftime("%Y-%m-%d %H:%M:%S")],
            ["Site:", config.SITE_NAME],
            ["Location:", config.SITE_LOCATION],
            ["Violation Type:", violation['description']]
        ]
        t = Table(data, colWidths=[100, 300])
        t.setStyle(TableStyle([('FONT', (0,0), (-1,-1), 'Helvetica-Bold'), ('GRID', (0,0), (-1,-1), 1, colors.black)]))
        story.append(t)
        story.append(Spacer(1, 20))

        # Evidence Image
        if os.path.exists(violation['image_path']):
            im = ReportLabImage(violation['image_path'], width=400, height=300)
            story.append(im)
            story.append(Paragraph("Fig 1. Automated Detection Evidence", styles['Italic']))
            story.append(Spacer(1, 20))

        # AI Analysis Section
        story.append(Paragraph("Officer Analysis (AI Generated):", styles['Heading2']))
        story.append(Paragraph(report_text.replace("\n", "<br/>"), styles['Normal']))

        # Footer
        story.append(Spacer(1, 30))
        story.append(Paragraph(f"Generated by {config.COMPANY_NAME} Safety System", styles['Italic']))

        doc.build(story)
        print(f"üìÑ PDF Generated: {filename}")
        return filename

pdf_gen = PDFGenerator()
print("‚úÖ PDF Generator Ready")

‚úÖ PDF Generator Ready


In [56]:
# @title 6. Phase 4: Email Notification System
class NotificationService:
    def send_email(self, violation, pdf_path):
        if not config.EMAIL_ENABLED:
            print(f"üìß EMAIL SKIPPED (Config Disabled). Report saved at {pdf_path}")
            return

        msg = MIMEMultipart()
        msg['From'] = config.EMAIL_SENDER
        msg['To'] = ", ".join(config.EMAIL_RECIPIENTS)
        msg['Subject'] = f"üö® Safety Violation: {violation['description']}"

        body = f"""
        URGENT: Safety Violation Detected

        Site: {config.SITE_NAME}
        Time: {violation['timestamp']}
        Violation: {violation['description']}

        Please see attached official report.
        """
        msg.attach(MIMEText(body, 'plain'))

        with open(pdf_path, "rb") as f:
            attach = MIMEApplication(f.read(), _subtype="pdf")
            attach.add_header('Content-Disposition', 'attachment', filename=os.path.basename(pdf_path))
            msg.attach(attach)

        try:
            server = smtplib.SMTP('smtp.gmail.com', 587)
            server.starttls()
            server.login(config.EMAIL_SENDER, config.EMAIL_PASSWORD)
            server.send_message(msg)
            server.quit()
            print(f"‚úÖ Email Sent to {config.EMAIL_RECIPIENTS}")
        except Exception as e:
            print(f"‚ùå Email Failed: {e}")

notifier = NotificationService()
print("‚úÖ Notification Service Ready")

‚úÖ Notification Service Ready


In [57]:
# @title 7. Run Full Ecosystem
def run_safety_ecosystem(image_path):
    print("="*60)
    print("üé¨ STARTING SAFETY AUDIT PIPELINE")
    print("="*60)

    # 1. DETECT
    print("\nüîç PHASE 1: Detection")
    violations = detector.detect(image_path)

    if not violations:
        print("‚úÖ No violations detected. Site is safe.")
        return

    print(f"‚ö†Ô∏è  Found {len(violations)} Violations. Processing...")

    for v in violations:
        print(f"   üëâ Processing: {v['description']}")

        # 2. AGENT
        print("   ü§ñ PHASE 2: AI Agent Analysis")
        report_text = agent.generate_report_text(v)

        # 3. REPORT
        print("   üìÑ PHASE 3: Generating Documents")
        pdf_path = pdf_gen.generate_pdf(v, report_text)

        # 4. NOTIFY
        print("   Aa PHASE 4: Notification")
        notifier.send_email(v, pdf_path)

    print("\n‚úÖ PIPELINE COMPLETE")
    print("="*60)

# Run it
if os.path.exists(config.IMAGE_PATH):
    run_safety_ecosystem(config.IMAGE_PATH)
else:
    print(f"‚ùå Error: Image not found at {config.IMAGE_PATH}")

üé¨ STARTING SAFETY AUDIT PIPELINE

üîç PHASE 1: Detection

Ultralytics 8.3.240 üöÄ Python-3.12.12 torch-2.9.0+cu126 CUDA:0 (Tesla T4, 15095MiB)
image 1/1 /content/Worker-working-without-safety-boots-hand-gloves-head-protection_Q320.jpg: 644x644 (no detections), 900.7ms
Speed: 2.2ms preprocess, 900.7ms inference, 0.6ms postprocess per image at shape (1, 3, 644, 644)
Results saved to [1m/content/runs/segment/predict4[0m

image 1/1 /content/Worker-working-without-safety-boots-hand-gloves-head-protection_Q320.jpg: 644x644 (no detections), 812.0ms
Speed: 2.2ms preprocess, 812.0ms inference, 0.9ms postprocess per image at shape (1, 3, 644, 644)
Results saved to [1m/content/runs/segment/predict4[0m
‚ö†Ô∏è  Found 1 Violations. Processing...
   üëâ Processing: Worker detected without Helmet, Vest
   ü§ñ PHASE 2: AI Agent Analysis
   üìÑ PHASE 3: Generating Documents
üìÑ PDF Generated: /content/reports/Incident_20251221_065202.pdf
   Aa PHASE 4: Notification
‚úÖ Email Sent to ['sheza