In [None]:
# Run nvidia-smi in the shell to report the GPU attached to this runtime (if any).
!nvidia-smi

## Step # 01 Install the Ultralytics Package

In [None]:
!pip install ultralytics

## Step # 02 Import All the Required Libraries

In [None]:
import ultralytics
# Call Ultralytics' built-in checks() helper (presumably reports the installed
# version and detected hardware - see the Ultralytics docs to confirm).
ultralytics.checks()

In [None]:
from ultralytics import YOLO
from IPython.display import Image

## Step # 03 Download Dataset from Roboflow

In [None]:
!pip install roboflow

from roboflow import Roboflow
rf = Roboflow(api_key="lDv9p5jJjAOyjFiEBTiR")
project = rf.workspace("aniruddha-paul").project("industrial-helmet")
version = project.version(1)
dataset = version.download("yolov11")

In [None]:
# Show where the downloaded dataset lives; later cells build paths from this value.
dataset.location

## Step # 04 Train YOLO11 Model on a Custom Dataset

In [None]:
# Train a detection model via the yolo CLI: starts from yolo11n.pt, uses the
# dataset's data.yaml, runs 50 epochs at image size 640.
!yolo task=detect mode=train data={dataset.location}/data.yaml model="yolo11n.pt" epochs=50 imgsz=640

## Step # 05 Examine Training Results

In [None]:
from IPython.display import Image
# Display confusion_matrix.png from the "train" run directory
# (assumes the training run was saved as runs/detect/train).
Image("/content/runs/detect/train/confusion_matrix.png", width=600)

In [None]:
# Display labels.jpg from the "train" run directory.
Image("/content/runs/detect/train/labels.jpg", width=600)

In [None]:
# Display results.png from the "train" run directory.
Image("/content/runs/detect/train/results.png", width=600)

In [None]:
# Display train_batch0.jpg from the "train" run directory.
Image("/content/runs/detect/train/train_batch0.jpg", width=600)

In [None]:
# Display val_batch0_pred.jpg from the "train" run directory.
Image("/content/runs/detect/train/val_batch0_pred.jpg", width=600)

In [None]:
# Display val_batch1_pred.jpg from the "train" run directory.
Image("/content/runs/detect/train/val_batch1_pred.jpg", width=600)

## Step # 08 Inference with Custom Model on Images

In [None]:
# Run the trained best.pt weights over the dataset's test images with a 0.25
# confidence threshold and save the annotated outputs.
!yolo task=detect mode=predict model="/content/runs/detect/train/weights/best.pt" conf=0.25 source={dataset.location}/test/images save=True

In [None]:
import glob
import os
from IPython.display import Image as IPyImage, display

# Find every predict* run folder and pick the most recently modified one.
predict_dirs = glob.glob('/content/runs/detect/predict*/')
if not predict_dirs:
    # max() on an empty list would raise an opaque ValueError; fail with a clear hint instead.
    raise FileNotFoundError(
        "No prediction folders found under /content/runs/detect/ - run the predict cell first."
    )
latest_folder = max(predict_dirs, key=os.path.getmtime)

# glob returns files in arbitrary order, so sort to make the preview reproducible.
# The [1:4] slice (2nd to 4th image) is kept from the original selection.
for img in sorted(glob.glob(os.path.join(latest_folder, '*.jpg')))[1:4]:
    display(IPyImage(filename=img, width=600))
    print("\n")

In [None]:
!yolo task=detect mode=predict model= "/content/runs/detect/train/weights/best.pt" conf=0.25 source=helmet.jpg save=True

In [None]:
import glob
import os

# The numeric suffix of the predict folder depends on how many predict runs have
# been executed in this session, so resolve the newest predict* folder instead
# of hard-coding "predict5" (which does not exist on a fresh Run All).
latest_predict_dir = max(glob.glob("/content/runs/detect/predict*/"), key=os.path.getmtime)
Image(os.path.join(latest_predict_dir, "helmet.jpg"), width=600)

## Step # 09 Inference with Custom Model on Videos

In [None]:
!yolo task=detect mode=predict model= "/content/runs/detect/train/weights/best.pt" conf=0.25 source="PPE_Part1.mp4" save=True

In [None]:
from ultralytics import YOLO
import cv2
import os
import glob
from IPython.display import HTML, display
from base64 import b64encode
import subprocess

# Define custom colors for classes (tuples are in BGR order, as used by OpenCV drawing calls).
# Based on the training output, class IDs are: 0: 'head', 1: 'helmet', 2: 'person'
custom_colors = {
    0: (0, 0, 255),  # Red for 'head' (people not wearing helmets)
    1: (0, 255, 0),  # Green for 'helmet' (people wearing helmets)
    2: (0, 0, 0)   # Black for 'person' (if detected, based on previous training output)
}

# Load the trained YOLO model
model = YOLO("/content/runs/detect/train/weights/best.pt")

# Define input and output video paths
input_video_path = "PPE_Part1.mp4"
custom_results_dir = "/content/runs/detect/custom_color_predict"
output_video_path_raw = os.path.join(custom_results_dir, "PPE_Part1_custom_color.avi")
compressed_output_video_path = "/content/PPE_Part1_custom_color_compressed.mp4" # Using a distinct name for the compressed video

os.makedirs(custom_results_dir, exist_ok=True)

# Process the video frame by frame
cap = cv2.VideoCapture(input_video_path)
if not cap.isOpened():
    raise IOError(f"Cannot open video file {input_video_path}")

out = None
frame_count = 0
try:
    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    # Keep the fractional frame rate (e.g. 29.97): truncating it to an int
    # changes the playback speed/duration of the written video.
    fps = cap.get(cv2.CAP_PROP_FPS)
    if not fps > 0:
        # Some containers report 0/NaN; fall back to a common default so the writer can open.
        fps = 30.0

    # Define the codec and create VideoWriter object
    # Using 'MJPG' codec for .avi for broad compatibility before re-encoding
    fourcc = cv2.VideoWriter_fourcc(*'MJPG')
    out = cv2.VideoWriter(output_video_path_raw, fourcc, fps, (frame_width, frame_height))
    if not out.isOpened():
        raise IOError(f"Cannot open video writer for {output_video_path_raw}")

    print("Processing video with custom colors...")
    while True:
        ret, frame = cap.read()
        if not ret:
            break

        # Perform inference on the frame
        results = model.predict(frame, conf=0.25, verbose=False)

        # Draw bounding boxes with custom colors
        for r in results:
            for box in r.boxes:
                # Get bounding box coordinates, confidence, and class ID
                x1, y1, x2, y2 = map(int, box.xyxy[0])
                conf = round(float(box.conf[0]), 2)
                cls = int(box.cls[0])
                class_name = model.names[cls]

                # Get custom color for the class
                color = custom_colors.get(cls, (255, 255, 255)) # Default to white if class not in custom_colors

                # Draw rectangle
                cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2) # 2 is thickness

                # Put label: above the box when there is room, otherwise just inside its top edge
                label = f"{class_name}: {conf}"
                text_size = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.7, 2)[0]
                text_x = x1
                text_y = y1 - 10 if y1 - 10 > text_size[1] else y1 + text_size[1]
                cv2.rectangle(frame, (text_x, text_y - text_size[1] - 5), (text_x + text_size[0] + 5, text_y + 5), color, -1)
                cv2.putText(frame, label, (text_x + 5, text_y), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2) # White text

        out.write(frame)
        frame_count += 1
finally:
    # Always release the capture/writer, even if inference fails part-way,
    # so the partially written file is flushed and handles are not leaked.
    cap.release()
    if out is not None:
        out.release()

print(f"Finished processing {frame_count} frames. Raw video saved to {output_video_path_raw}")


# Compress the video
try:
    print(f"Attempting to compress video from {output_video_path_raw} to {compressed_output_video_path}")
    result = subprocess.run(
        ['ffmpeg', '-y', '-i', output_video_path_raw, '-vcodec', 'libx264', '-crf', '28', '-preset', 'fast', compressed_output_video_path],
        capture_output=True, text=True, check=True
    )
    print("ffmpeg stdout:", result.stdout)
    print("ffmpeg stderr:", result.stderr)

    # Show video if successful
    if os.path.exists(compressed_output_video_path):
        # Read through a context manager so the file handle is closed promptly.
        with open(compressed_output_video_path, 'rb') as video_file:
            mp4 = video_file.read()
        data_url = "data:video/mp4;base64," + b64encode(mp4).decode()
        display(HTML(f"""
        <video width=400 controls>
              <source src=\"{data_url}\" type=\"video/mp4\">
        </video>
        """))
    else:
        print(f"Error: Compressed video not found at {compressed_output_video_path}")

except subprocess.CalledProcessError as e:
    print(f"ffmpeg command failed with error code {e.returncode}")
    print("ffmpeg stdout:", e.stdout)
    print("ffmpeg stderr:", e.stderr)
    print(f"Error: Failed to create compressed video at {compressed_output_video_path}")
except FileNotFoundError as e:
    # subprocess.run raises FileNotFoundError when the ffmpeg executable itself
    # cannot be found; a missing input video surfaces as CalledProcessError instead.
    print(f"Error: {e}")
    print("Please ensure ffmpeg is installed and available on PATH.")
except Exception as e:
    print(f"An unexpected error occurred: {e}")


In [None]:
import matplotlib.pyplot as plt
import pandas as pd

# Calculate total counts from the DataFrame
total_helmets = df_frame_counts['Helmets Detected'].sum()
total_heads = df_frame_counts['Heads Detected'].sum()

# Data for the bar chart
labels = ['Helmets', 'Heads']
counts = [total_helmets, total_heads]
colors = ['green', 'red']

# Create the bar chart
plt.figure(figsize=(8, 6))
plt.bar(labels, counts, color=colors)
plt.xlabel('Object Type')
plt.ylabel('Total Number of Detections')
plt.title('Total Number of Detected Helmets and Heads in Video')
plt.grid(axis='y', linestyle='--', alpha=0.7)

# Add count labels on top of the bars
for i, count in enumerate(counts):
    plt.text(labels[i], count + 0.5, str(count), ha='center', va='bottom')

plt.show()


In [None]:
from ultralytics import YOLO
import cv2
import pandas as pd

# Load the trained YOLO model
model = YOLO("/content/runs/detect/train/weights/best.pt")

# Define input video path
input_video_path = "PPE_Part1.mp4"

# List to store frame-by-frame counts
frame_data = []

# Process the video frame by frame
cap = cv2.VideoCapture(input_video_path)
if not cap.isOpened():
    raise IOError(f"Cannot open video file {input_video_path}")

print("Collecting frame-by-frame counts...")
frame_number = 0
while True:
    ret, frame = cap.read()
    if not ret:
        break

    # Perform inference on the frame
    results = model.predict(frame, conf=0.25, verbose=False)

    current_frame_helmets = 0
    current_frame_heads = 0

    for r in results:
        boxes = r.boxes
        for box in boxes:
            cls = int(box.cls[0])
            class_name = model.names[cls]

            if class_name == 'helmet':
                current_frame_helmets += 1
            elif class_name == 'head':
                current_frame_heads += 1

    frame_data.append({
        'Frame': frame_number,
        'Helmets Detected': current_frame_helmets,
        'Heads Detected': current_frame_heads
    })
    frame_number += 1

cap.release()
print("Finished collecting data.")

# Create a DataFrame from the collected data
df_frame_counts = pd.DataFrame(frame_data)

# Display the DataFrame
display(df_frame_counts)


In [None]:
# Persist the per-frame counts. to_csv() returns None when given a path, so the
# result is not assigned (the old `csv` name held None and shadowed the stdlib module name).
df_frame_counts.to_csv('frame_counts.csv', index=False)

In [None]:
# Calculate total counts
total_helmets = df_frame_counts['Helmets Detected'].sum()
total_heads = df_frame_counts['Heads Detected'].sum()

if total_helmets + total_heads == 0:
    # With no detections every wedge would be zero, so there is no ratio to draw.
    print("No helmets or heads were detected; skipping the donut chart.")
else:
    # Data for the donut chart
    labels = ['Helmets', 'Heads']
    sizes = [total_helmets, total_heads]
    colors = ['#4CAF50', '#F44336'] # Green for helmets, Red for heads
    explode = (0.05, 0.05)  # Slightly separate both slices

    # Create a figure and a set of subplots
    fig, ax = plt.subplots(figsize=(8, 8))

    # Draw pie chart
    wedges, texts, autotexts = ax.pie(sizes, colors=colors,
                                      autopct='%1.1f%%', startangle=90,
                                      pctdistance=0.85, explode=explode)

    # Draw white circle in the middle to make it a donut chart
    centre_circle = plt.Circle((0,0),0.70,fc='white')
    ax.add_artist(centre_circle)

    # Equal aspect ratio ensures that pie is drawn as a circle.
    ax.axis('equal')

    # Add labels and title
    ax.legend(wedges, labels, title="Detection Type", loc="center left", bbox_to_anchor=(1, 0, 0.5, 1))
    ax.set_title('Ratio of Detected Helmets vs. Heads', fontsize=16)

    plt.setp(autotexts, size=12, weight="bold", color="white")

    plt.show()


In [None]:
# Select the frames in which more heads than helmets were counted and show them.
# NOTE(review): frames with some bare heads but at least as many helmets are not
# selected by this rule - confirm that is the intended definition of "unsafe".
more_heads_than_helmets = df_frame_counts['Heads Detected'].gt(df_frame_counts['Helmets Detected'])
unsafe_frames_df = df_frame_counts.loc[more_heads_than_helmets]
display(unsafe_frames_df)

In [None]:
import os
from getpass import getpass

# Credentials and addresses are never stored in the notebook. Each value is read
# from an environment variable, with an interactive prompt as the fallback
# (getpass hides the password while it is typed).
sender_email = os.environ.get("ALERT_SENDER_EMAIL") or input("Sender email: ")
sender_password = os.environ.get("ALERT_SENDER_PASSWORD") or getpass("Sender app password: ")
recipient_email = os.environ.get("ALERT_RECIPIENT_EMAIL") or input("Recipient email: ")

print("Email settings configured.")

In [None]:
import smtplib
import textwrap
from email.message import EmailMessage

# Configure SMTP server details
SMTP_SERVER = 'smtp.gmail.com'
SMTP_PORT = 587 # For TLS

# sender_email, sender_password and recipient_email come from the email settings
# cell above. They are intentionally not repeated here so that no credential is
# written into this cell.

print("Attempting to send email notifications for unsafe frames...")

if not unsafe_frames_df.empty:
    # Refuse to run with empty values or with the literal placeholder strings.
    credentials_missing = (
        not sender_email or sender_email == "sender_email"
        or not sender_password or sender_password == "sender_password"
        or not recipient_email or recipient_email == "recipient_email"
    )
    if credentials_missing:
        print("Error: Please replace the placeholder email and password with your actual credentials before running.")
    else:
        try:
            # Establish a secure connection to the SMTP server
            with smtplib.SMTP(SMTP_SERVER, SMTP_PORT) as server:
                server.starttls() # Upgrade connection to secure TLS
                server.login(sender_email, sender_password)

                # NOTE(review): one email is sent per unsafe frame; for long videos
                # this can be a very large number of messages - consider a single digest.
                for _, row in unsafe_frames_df.iterrows():
                    frame_number = int(row['Frame'])
                    heads_detected = int(row['Heads Detected'])
                    helmets_detected = int(row['Helmets Detected'])

                    # Create the email message
                    msg = EmailMessage()
                    msg['Subject'] = f'UNSAFE INSTANCE DETECTED - Frame {frame_number}'
                    msg['From'] = sender_email
                    msg['To'] = recipient_email
                    # dedent() removes the source-code indentation so the body is
                    # not delivered with every line shifted to the right.
                    msg.set_content(textwrap.dedent(f"""\
                        Dear Recipient,

                        An unsafe instance has been detected in the video feed.

                        Details:
                        - Frame Number: {frame_number}
                        - Heads Detected (without helmet): {heads_detected}
                        - Helmets Detected: {helmets_detected}

                        Please review the footage at your earliest convenience.

                        Best regards,
                        Automated Safety System
                        """))

                    server.send_message(msg)
                    print(f"Email notification sent for Frame {frame_number}.")
            print("All email notifications sent successfully.")
        except smtplib.SMTPAuthenticationError:
            print("Error: Failed to authenticate with the SMTP server. Check your email and password, and ensure 'Less secure app access' is enabled or use an App Password.")
        except smtplib.SMTPConnectError:
            print("Error: Failed to connect to the SMTP server. Check server address and port.")
        except Exception as e:
            print(f"An unexpected error occurred: {e}")
else:
    print("No unsafe instances found. No emails to send.")


In [None]:
import os

import pandas as pd

EMPLOYEES_CSV = 'employees.csv'

# Sample employee names and email addresses, used only for demonstration.
# In a real-world scenario the file would already exist.
csv_content = """Name,Email
John Doe,john.doe@example.com
Jane Smith,jane.smith@example.com
Peter Jones,peter.jones@example.com
Alice Brown,alice.brown@example.com
"""

# Only write the sample when no file is present, so a real employees.csv is
# never overwritten with dummy data.
if os.path.exists(EMPLOYEES_CSV):
    print("Existing 'employees.csv' found; leaving it untouched.")
else:
    with open(EMPLOYEES_CSV, 'w', encoding='utf-8') as f:
        f.write(csv_content)
    print("Sample 'employees.csv' created.")

# Read the 'employees.csv' file into a pandas DataFrame
employees_df = pd.read_csv(EMPLOYEES_CSV)

# Display the DataFrame to verify the data has been loaded correctly
print("Employee DataFrame loaded successfully:")
display(employees_df)

In [None]:
# Build the recipient list from the Email column. Blank cells are read by pandas
# as NaN, so missing values and whitespace-only entries are dropped to keep
# unusable addresses away from the mailer.
employee_emails = [
    email.strip()
    for email in employees_df['Email'].dropna().astype(str)
    if email.strip()
]

print("Employee email list created:")
print(employee_emails)

In [None]:
import smtplib
import textwrap
from email.message import EmailMessage

# Configure SMTP server details
SMTP_SERVER = 'smtp.gmail.com'
SMTP_PORT = 587 # For TLS

# sender_email and sender_password come from the email settings cell earlier in
# the notebook. They are intentionally not repeated here so that no credential
# is written into this cell.

print("Attempting to send email notifications for unsafe frames...")

if not unsafe_frames_df.empty:
    # Refuse to run with empty values or with the literal placeholder strings.
    credentials_missing = (
        not sender_email or sender_email == "sender_email"
        or not sender_password or sender_password == "sender_password"
    )
    if credentials_missing:
        print("Error: Please replace the placeholder sender email and password with your actual credentials before running.")
    elif not employee_emails: # Check if employee_emails list is empty
        print("Error: No employee email addresses found in the list. Cannot send notifications.")
    else:
        try:
            # Establish a secure connection to the SMTP server
            with smtplib.SMTP(SMTP_SERVER, SMTP_PORT) as server:
                server.starttls() # Upgrade connection to secure TLS
                server.login(sender_email, sender_password)

                # NOTE(review): this sends (unsafe frames x employees) messages; for
                # long videos consider one digest email per employee instead.
                for _, row in unsafe_frames_df.iterrows():
                    frame_number = int(row['Frame'])
                    heads_detected = int(row['Heads Detected'])
                    helmets_detected = int(row['Helmets Detected'])

                    # The body only depends on the frame, so build it once per frame.
                    # dedent() removes the source-code indentation from the text.
                    alert_body = textwrap.dedent(f"""\
                        Dear Employee,

                        This is an automated safety alert.
                        An unsafe instance has been detected in the video feed where a potential helmet non-compliance was observed.

                        Details for the incident at Frame {frame_number}:
                        - Individuals detected without helmets: {heads_detected}
                        - Individuals detected with helmets: {helmets_detected}

                        Please ensure compliance with all safety regulations, including consistent helmet usage in designated areas. Your safety is our top priority.

                        Best regards,
                        Automated Safety System
                        """)

                    for recipient in employee_emails:
                        # Create the email message
                        msg = EmailMessage()
                        msg['Subject'] = 'Safety Alert: Unsafe Instance Detected'
                        msg['From'] = sender_email
                        msg['To'] = recipient
                        msg.set_content(alert_body)

                        server.send_message(msg)
                        print(f"Email notification sent for Frame {frame_number} to {recipient}.")
            print("All email notifications sent successfully.")
        except smtplib.SMTPAuthenticationError:
            print("Error: Failed to authenticate with the SMTP server. Check your sender email and password, and ensure 'Less secure app access' is enabled or use an App Password if 2-Factor Authentication is enabled.")
        except smtplib.SMTPConnectError:
            print("Error: Failed to connect to the SMTP server. Check server address and port.")
        except Exception as e:
            print(f"An unexpected error occurred: {e}")
else:
    print("No unsafe instances found. No emails to send.")