In [5]:
%%writefile Accident.py

import streamlit as st
import cv2
import numpy as np
from PIL import Image
from ultralytics import YOLO
import tempfile
import os
import time
from contextlib import contextmanager

# Set page config
st.set_page_config(
    page_title="Accident Detection System",
    page_icon="🚗",
    layout="wide"
)

# Constants
MAX_VIDEO_SIZE_MB = 100  # 100MB limit
SUPPORTED_IMAGE_TYPES = ["jpg", "jpeg", "png"]
SUPPORTED_VIDEO_TYPES = ["mp4", "avi", "mov"]

# Load model with caching
@st.cache_resource
def load_model():
    try:
        model = YOLO("D:/All Documents/Projects/Real-Time Accident Detection and Alert System/Models/best.pt")  # or 'model/best.onnx'
        return model
    except Exception as e:
        st.error(f"🚨 Model loading failed: {str(e)}")
        st.stop()

model = load_model()

# Context manager for temp files
@contextmanager
def temp_video_file(uploaded_file):
    temp = tempfile.NamedTemporaryFile(delete=False, suffix='.mp4')
    try:
        temp.write(uploaded_file.read())
        temp.close()
        yield temp.name
    finally:
        try:
            os.unlink(temp.name)
        except:
            pass

# Sidebar
st.sidebar.title("Settings")
confidence_threshold = st.sidebar.slider(
    "Confidence Threshold", 
    min_value=0.1, 
    max_value=0.9, 
    value=0.3, 
    step=0.05,
    help="Adjust the sensitivity of accident detection"
)

# Main app
st.title("🚗 Real-Time Accident Detection System")
st.markdown("""
    Upload images or videos to detect potential accidents using AI.
    The system will highlight detected accidents with bounding boxes.
""")

# Option selection
option = st.radio(
    "Select input type:",
    ("Image", "Video"),
    horizontal=True,
    index=0
)

def process_image(uploaded_file):
    """Process and display image with accident detection"""
    try:
        image = Image.open(uploaded_file)
        col1, col2 = st.columns(2)
        
        with col1:
            st.image(image, caption="Original Image", use_column_width=True)
        
        with st.spinner("🔍 Analyzing image..."):
            start_time = time.time()
            results = model.predict(image, conf=confidence_threshold)
            processing_time = time.time() - start_time
            
            with col2:
                if len(results[0].boxes) > 0:
                    st.warning(f"🚨 ACCIDENT DETECTED (Confidence: {max(results[0].boxes.conf).item():.2f})")
                    result_img = Image.fromarray(results[0].plot()[:, :, ::-1])  # BGR to RGB
                else:
                    st.success("✅ NO ACCIDENT DETECTED")
                    result_img = Image.fromarray(results[0].orig_img[:, :, ::-1])
                
                st.image(result_img, caption="Processed Result", use_column_width=True)
                st.caption(f"Processing time: {processing_time:.2f} seconds")
    
    except Exception as e:
        st.error(f"Error processing image: {str(e)}")

def process_video(uploaded_file):
    """Process and display video with accident detection"""
    try:
        # Check file size
        if uploaded_file.size > MAX_VIDEO_SIZE_MB * 1024 * 1024:
            st.error(f"File too large. Maximum size is {MAX_VIDEO_SIZE_MB}MB")
            return
        
        st.video(uploaded_file)
        
        if st.button("Process Video", type="primary"):
            with temp_video_file(uploaded_file) as input_path:
                # Setup video capture
                cap = cv2.VideoCapture(input_path)
                if not cap.isOpened():
                    st.error("Error opening video file")
                    return
                
                # Get video properties
                frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
                frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
                fps = int(cap.get(cv2.CAP_PROP_FPS))
                total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
                
                # Create output temp file
                output_temp = tempfile.NamedTemporaryFile(delete=False, suffix='.mp4')
                output_temp.close()
                
                # Video writer
                fourcc = cv2.VideoWriter_fourcc(*'mp4v')
                out = cv2.VideoWriter(output_temp.name, fourcc, fps, (frame_width, frame_height))
                
                # UI elements
                progress_bar = st.progress(0)
                status_text = st.empty()
                result_placeholder = st.empty()
                time_placeholder = st.empty()
                
                frame_count = 0
                accident_detected = False
                start_time = time.time()
                
                try:
                    while cap.isOpened():
                        ret, frame = cap.read()
                        if not ret:
                            break
                            
                        # Process frame
                        results = model.predict(frame, conf=confidence_threshold)
                        annotated_frame = results[0].plot()
                        
                        # Check for accidents
                        if len(results[0].boxes) > 0:
                            accident_detected = True
                        
                        # Write frame
                        out.write(annotated_frame)
                        
                        # Update progress
                        frame_count += 1
                        progress = frame_count / total_frames
                        progress_bar.progress(progress)
                        status_text.text(f"📊 Processing: {frame_count}/{total_frames} frames")
                        time_placeholder.text(f"⏱️ Elapsed time: {time.time() - start_time:.1f} seconds")
                    
                    # Release resources
                    cap.release()
                    out.release()
                    
                    # Show results
                    if accident_detected:
                        result_placeholder.warning("🚨 ACCIDENT(S) DETECTED IN VIDEO")
                    else:
                        result_placeholder.success("✅ NO ACCIDENTS DETECTED IN VIDEO")
                    
                    # Display processed video
                    st.video(output_temp.name)
                    
                    # Download button
                    with open(output_temp.name, "rb") as f:
                        st.download_button(
                            label="Download Processed Video",
                            data=f,
                            file_name="processed_video.mp4",
                            mime="video/mp4"
                        )
                
                finally:
                    # Cleanup
                    if 'cap' in locals() and cap.isOpened():
                        cap.release()
                    if 'out' in locals():
                        out.release()
                    try:
                        os.unlink(output_temp.name)
                    except:
                        pass
    
    except Exception as e:
        st.error(f"Error processing video: {str(e)}")

# File upload based on selected option
if option == "Image":
    uploaded_file = st.file_uploader(
        "Upload an image",
        type=SUPPORTED_IMAGE_TYPES,
        accept_multiple_files=False,
        help="Supported formats: JPG, JPEG, PNG"
    )
    if uploaded_file is not None:
        process_image(uploaded_file)
else:
    uploaded_file = st.file_uploader(
        "Upload a video",
        type=SUPPORTED_VIDEO_TYPES,
        accept_multiple_files=False,
        help="Supported formats: MP4, AVI, MOV (Max 100MB)"
    )
    if uploaded_file is not None:
        process_video(uploaded_file)

Overwriting Accident.py


In [9]:
%%writefile Accident.py

import streamlit as st
import cv2
import numpy as np
from PIL import Image
from ultralytics import YOLO
import tempfile
import os
import time
from contextlib import contextmanager

# Set page config
st.set_page_config(
    page_title="Accident Detection System",
    page_icon="🚗",
    layout="wide"
)

# Constants
MAX_VIDEO_SIZE_MB = 100  # 100MB limit
SUPPORTED_IMAGE_TYPES = ["jpg", "jpeg", "png"]
SUPPORTED_VIDEO_TYPES = ["mp4", "avi", "mov"]
FRAME_SKIP = 2  # Process every 2nd frame for faster results

# Load model with caching
@st.cache_resource
def load_model():
    try:
        model = YOLO("D:/All Documents/Projects/Real-Time Accident Detection and Alert System/Models/best.pt")  # or 'model/best.onnx'
        return model
    except Exception as e:
        st.error(f"🚨 Model loading failed: {str(e)}")
        st.stop()

model = load_model()

# Context manager for temp files
@contextmanager
def temp_video_file(uploaded_file):
    temp = tempfile.NamedTemporaryFile(delete=False, suffix='.mp4')
    try:
        temp.write(uploaded_file.read())
        temp.close()
        yield temp.name
    finally:
        try:
            os.unlink(temp.name)
        except:
            pass

# Sidebar with performance options
st.sidebar.title("Performance Settings")
confidence_threshold = st.sidebar.slider(
    "Confidence Threshold", 
    min_value=0.1, 
    max_value=0.9, 
    value=0.3, 
    step=0.05
)

frame_skip = st.sidebar.selectbox(
    "Frame Processing Rate",
    options=[1, 2, 3, 4],
    index=1,
    help="Higher values process fewer frames (faster but less accurate)"
)

# Main app
st.title("🚗 Real-Time Accident Detection System")
st.markdown("""
    <style>
    .fast-processing {
        color: #4CAF50;
        font-weight: bold;
    }
    </style>
    <p class="fast-processing">⚡ Optimized for faster processing</p>
""", unsafe_allow_html=True)

# Option selection
option = st.radio(
    "Select input type:",
    ("Image", "Video"),
    horizontal=True
)

def process_image(uploaded_file):
    """Optimized image processing"""
    try:
        image = Image.open(uploaded_file)
        
        with st.spinner("🔍 Analyzing image (optimized)..."):
            start_time = time.perf_counter()
            
            # Use half precision for faster inference if available
            results = model.predict(
                image, 
                conf=confidence_threshold,
                half=True  # Enable FP16 inference if supported
            )
            
            processing_time = time.perf_counter() - start_time
            
            col1, col2 = st.columns(2)
            with col1:
                st.image(image, caption="Original Image", use_column_width=True)
            
            with col2:
                if len(results[0].boxes) > 0:
                    st.warning(f"🚨 ACCIDENT DETECTED (Confidence: {max(results[0].boxes.conf).item():.2f})")
                    result_img = Image.fromarray(results[0].plot()[:, :, ::-1])
                else:
                    st.success("✅ NO ACCIDENT DETECTED")
                    result_img = Image.fromarray(results[0].orig_img[:, :, ::-1])
                
                st.image(result_img, caption="Processed Result", use_column_width=True)
                st.caption(f"⚡ Processing time: {processing_time:.3f} seconds")
    
    except Exception as e:
        st.error(f"Error processing image: {str(e)}")

def process_video(uploaded_file):
    """Optimized video processing with frame skipping"""
    try:
        if uploaded_file.size > MAX_VIDEO_SIZE_MB * 1024 * 1024:
            st.error(f"File too large. Maximum size is {MAX_VIDEO_SIZE_MB}MB")
            return
        
        st.video(uploaded_file)
        
        if st.button("Process Video (Optimized)", type="primary"):
            with temp_video_file(uploaded_file) as input_path:
                cap = cv2.VideoCapture(input_path)
                if not cap.isOpened():
                    st.error("Error opening video file")
                    return
                
                # Get video properties
                frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
                frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
                fps = int(cap.get(cv2.CAP_PROP_FPS))
                total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
                
                # Adjust FPS based on frame skip
                output_fps = fps / frame_skip
                
                # Create output temp file
                output_temp = tempfile.NamedTemporaryFile(delete=False, suffix='.mp4')
                output_temp.close()
                
                # Video writer with adjusted FPS
                fourcc = cv2.VideoWriter_fourcc(*'mp4v')
                out = cv2.VideoWriter(output_temp.name, fourcc, output_fps, (frame_width, frame_height))
                
                # UI elements
                progress_bar = st.progress(0)
                status_text = st.empty()
                result_placeholder = st.empty()
                time_placeholder = st.empty()
                
                frame_count = 0
                processed_count = 0
                accident_detected = False
                start_time = time.perf_counter()
                
                try:
                    while cap.isOpened():
                        ret, frame = cap.read()
                        if not ret:
                            break
                        
                        frame_count += 1
                        
                        # Skip frames according to frame_skip setting
                        if frame_count % frame_skip != 0:
                            continue
                            
                        # Process frame with half precision
                        results = model.predict(
                            frame, 
                            conf=confidence_threshold,
                            half=True
                        )
                        
                        # Check for accidents
                        if len(results[0].boxes) > 0:
                            accident_detected = True
                        
                        # Write frame
                        out.write(results[0].plot())
                        processed_count += 1
                        
                        # Update progress less frequently for better performance
                        if processed_count % 5 == 0 or frame_count == total_frames:
                            progress = frame_count / total_frames
                            progress_bar.progress(progress)
                            status_text.text(f"📊 Processed {processed_count} frames (skipping {frame_skip-1} frames each)")
                            time_placeholder.text(f"⏱️ Elapsed: {time.perf_counter() - start_time:.1f}s | Est. remaining: {(total_frames-frame_count)/(fps/frame_skip):.1f}s")
                    
                    # Release resources
                    cap.release()
                    out.release()
                    
                    # Show results
                    if accident_detected:
                        result_placeholder.warning(f"🚨 ACCIDENT(S) DETECTED IN {processed_count} PROCESSED FRAMES")
                    else:
                        result_placeholder.success(f"✅ NO ACCIDENTS DETECTED IN {processed_count} PROCESSED FRAMES")
                    
                    # Display optimized video
                    st.video(output_temp.name)
                    
                    # Performance summary
                    total_time = time.perf_counter() - start_time
                    st.info(f"""
                        **Performance Summary:**
                        - Total frames: {total_frames}
                        - Processed frames: {processed_count}
                        - Frame skip rate: {frame_skip}x
                        - Processing speed: {processed_count/total_time:.1f} FPS
                        - Total processing time: {total_time:.1f} seconds
                    """)
                    
                    # Download button
                    with open(output_temp.name, "rb") as f:
                        st.download_button(
                            label="Download Processed Video",
                            data=f,
                            file_name="processed_video.mp4",
                            mime="video/mp4"
                        )
                
                finally:
                    # Cleanup
                    if 'cap' in locals() and cap.isOpened():
                        cap.release()
                    if 'out' in locals():
                        out.release()
                    try:
                        os.unlink(output_temp.name)
                    except:
                        pass
    
    except Exception as e:
        st.error(f"Error processing video: {str(e)}")

# File upload based on selected option
if option == "Image":
    uploaded_file = st.file_uploader(
        "Upload an image",
        type=SUPPORTED_IMAGE_TYPES,
        help="Supported formats: JPG, JPEG, PNG"
    )
    if uploaded_file is not None:
        process_image(uploaded_file)
else:
    uploaded_file = st.file_uploader(
        "Upload a video",
        type=SUPPORTED_VIDEO_TYPES,
        help=f"Supported formats: MP4, AVI, MOV (Max {MAX_VIDEO_SIZE_MB}MB)"
    )
    if uploaded_file is not None:
        process_video(uploaded_file)

# Footer
st.markdown("---")
st.caption("""
    ⚡ Optimized Accident Detection System | Powered by YOLOv8
    - Adjust frame skip rate in sidebar for faster processing
    - Higher values process fewer frames (faster but less accurate)
""")

Overwriting Accident.py


# Working Great

In [11]:
%%writefile Accident.py

import streamlit as st
import cv2
import numpy as np
from PIL import Image
from ultralytics import YOLO
import tempfile
import os
import time
from contextlib import contextmanager

# Set page config
st.set_page_config(
    page_title="Accident Detection System",
    page_icon="🚗",
    layout="wide"
)

# Constants
MAX_VIDEO_SIZE_MB = 100
SUPPORTED_IMAGE_TYPES = ["jpg", "jpeg", "png"]
SUPPORTED_VIDEO_TYPES = ["mp4", "avi", "mov"]
FRAME_SKIP = 2

# Load model with caching
@st.cache_resource
def load_model():
    try:
        model = YOLO("D:/All Documents/Projects/Real-Time Accident Detection and Alert System/Models/best.pt")  # or 'model/best.onnx'
        return model
    except Exception as e:
        st.error(f"🚨 Model loading failed: {str(e)}")
        st.stop()

model = load_model()

# Context manager for temp files
@contextmanager
def temp_video_file(uploaded_file):
    temp = tempfile.NamedTemporaryFile(delete=False, suffix='.mp4')
    try:
        temp.write(uploaded_file.read())
        temp.close()
        yield temp.name
    finally:
        try:
            os.unlink(temp.name)
        except:
            pass

# Sidebar
st.sidebar.title("Settings")
confidence_threshold = st.sidebar.slider(
    "Confidence Threshold", 
    min_value=0.1, 
    max_value=0.9, 
    value=0.3, 
    step=0.05
)

frame_skip = st.sidebar.selectbox(
    "Frame Processing Rate",
    options=[1, 2, 3, 4],
    index=1,
    help="Higher values process fewer frames (faster but less accurate)"
)

# Main app
st.title("🚗 Enhanced Accident Detection System")
st.markdown("""
    <style>
    .object-info {
        padding: 10px;
        margin: 5px 0;
        border-radius: 5px;
        background-color: #f0f2f6;
    }
    </style>
""", unsafe_allow_html=True)

# Option selection
option = st.radio(
    "Select input type:",
    ("Image", "Video"),
    horizontal=True
)

def get_object_details(boxes):
    """Extract detailed information about detected objects"""
    details = []
    for box in boxes:
        obj = {
            "type": model.names[int(box.cls)],
            "confidence": float(box.conf),
            "coordinates": {
                "x1": float(box.xyxy[0][0]),
                "y1": float(box.xyxy[0][1]),
                "x2": float(box.xyxy[0][2]),
                "y2": float(box.xyxy[0][3])
            }
        }
        details.append(obj)
    return details

def process_image(uploaded_file):
    """Process image with detailed object information"""
    try:
        image = Image.open(uploaded_file)
        
        with st.spinner("🔍 Analyzing image..."):
            results = model.predict(image, conf=confidence_threshold)
            
            col1, col2 = st.columns(2)
            with col1:
                st.image(image, caption="Original Image", use_column_width=True)
            
            with col2:
                if len(results[0].boxes) > 0:
                    st.warning("🚨 ACCIDENT DETECTED")
                    result_img = Image.fromarray(results[0].plot()[:, :, ::-1])
                    st.image(result_img, caption="Processed Result", use_column_width=True)
                    
                    # Display object details
                    st.subheader("Detected Objects:")
                    objects = get_object_details(results[0].boxes)
                    for obj in objects:
                        st.markdown(f"""
                            <div class="object-info">
                                <b>Type:</b> {obj['type']}<br>
                                <b>Confidence:</b> {obj['confidence']:.2f}<br>
                                <b>Coordinates:</b> ({obj['coordinates']['x1']:.0f}, {obj['coordinates']['y1']:.0f}) to ({obj['coordinates']['x2']:.0f}, {obj['coordinates']['y2']:.0f})
                            </div>
                        """, unsafe_allow_html=True)
                else:
                    st.success("✅ NO ACCIDENT DETECTED")
                    st.image(image, caption="No detections", use_column_width=True)
    
    except Exception as e:
        st.error(f"Error processing image: {str(e)}")

def process_video(uploaded_file):
    """Process video with frame-by-frame object information"""
    try:
        if uploaded_file.size > MAX_VIDEO_SIZE_MB * 1024 * 1024:
            st.error(f"File too large. Maximum size is {MAX_VIDEO_SIZE_MB}MB")
            return
        
        st.video(uploaded_file)
        
        if st.button("Process Video with Details", type="primary"):
            with temp_video_file(uploaded_file) as input_path:
                cap = cv2.VideoCapture(input_path)
                if not cap.isOpened():
                    st.error("Error opening video file")
                    return
                
                # Get video properties
                frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
                frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
                fps = int(cap.get(cv2.CAP_PROP_FPS))
                total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
                
                # Create output temp file
                output_temp = tempfile.NamedTemporaryFile(delete=False, suffix='.mp4')
                output_temp.close()
                
                # Video writer
                fourcc = cv2.VideoWriter_fourcc(*'mp4v')
                out = cv2.VideoWriter(output_temp.name, fourcc, fps, (frame_width, frame_height))
                
                # UI elements
                progress_bar = st.progress(0)
                status_text = st.empty()
                result_placeholder = st.empty()
                details_placeholder = st.empty()
                
                frame_count = 0
                processed_count = 0
                accident_frames = []
                
                try:
                    while cap.isOpened():
                        ret, frame = cap.read()
                        if not ret:
                            break
                        
                        frame_count += 1
                        
                        # Skip frames according to frame_skip setting
                        if frame_count % frame_skip != 0:
                            continue
                            
                        # Process frame
                        results = model.predict(frame, conf=confidence_threshold)
                        annotated_frame = results[0].plot()
                        out.write(annotated_frame)
                        processed_count += 1
                        
                        # Check for accidents
                        if len(results[0].boxes) > 0:
                            # Get object details for this frame
                            objects = get_object_details(results[0].boxes)
                            accident_frames.append({
                                "frame_number": frame_count,
                                "time_seconds": frame_count/fps,
                                "objects": objects
                            })
                        
                        # Update progress
                        progress = frame_count / total_frames
                        progress_bar.progress(progress)
                        status_text.text(f"📊 Processing frame {frame_count}/{total_frames}")
                    
                    # Release resources
                    cap.release()
                    out.release()
                    
                    # Show results
                    if accident_frames:
                        result_placeholder.warning(f"🚨 ACCIDENTS DETECTED IN {len(accident_frames)} FRAMES")
                        
                        # Display detailed accident information
                        with st.expander("📝 Detailed Accident Report", expanded=True):
                            st.write(f"Total frames with accidents: {len(accident_frames)}")
                            
                            for accident in accident_frames:
                                st.markdown(f"""
                                    ### Frame {accident['frame_number']} (Time: {accident['time_seconds']:.1f}s)
                                """)
                                
                                for obj in accident['objects']:
                                    st.markdown(f"""
                                        <div class="object-info">
                                            <b>Object Type:</b> {obj['type']}<br>
                                            <b>Confidence:</b> {obj['confidence']:.2f}<br>
                                            <b>Bounding Box:</b> ({obj['coordinates']['x1']:.0f}, {obj['coordinates']['y1']:.0f}) to ({obj['coordinates']['x2']:.0f}, {obj['coordinates']['y2']:.0f})
                                        </div>
                                    """, unsafe_allow_html=True)
                                st.markdown("---")
                    else:
                        result_placeholder.success("✅ NO ACCIDENTS DETECTED IN VIDEO")
                    
                    # Display processed video
                    st.video(output_temp.name)
                    
                    # Download button
                    with open(output_temp.name, "rb") as f:
                        st.download_button(
                            label="Download Processed Video",
                            data=f,
                            file_name="processed_video.mp4",
                            mime="video/mp4"
                        )
                
                finally:
                    # Cleanup
                    if 'cap' in locals() and cap.isOpened():
                        cap.release()
                    if 'out' in locals():
                        out.release()
                    try:
                        os.unlink(output_temp.name)
                    except:
                        pass
    
    except Exception as e:
        st.error(f"Error processing video: {str(e)}")

# File upload based on selected option
if option == "Image":
    uploaded_file = st.file_uploader(
        "Upload an image",
        type=SUPPORTED_IMAGE_TYPES,
        help="Supported formats: JPG, JPEG, PNG"
    )
    if uploaded_file is not None:
        process_image(uploaded_file)
else:
    uploaded_file = st.file_uploader(
        "Upload a video",
        type=SUPPORTED_VIDEO_TYPES,
        help=f"Supported formats: MP4, AVI, MOV (Max {MAX_VIDEO_SIZE_MB}MB)"
    )
    if uploaded_file is not None:
        process_video(uploaded_file)

# Footer
st.markdown("---")
st.caption("""
    Enhanced Accident Detection System | Powered by YOLOv8
    - Shows detailed object information for each detection
    - Includes frame numbers and timestamps for video analysis
""")

Overwriting Accident.py


In [21]:
%%writefile Accident.py

import streamlit as st
import cv2
import numpy as np
from PIL import Image
from ultralytics import YOLO
import tempfile
import os
import time
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.image import MIMEImage
from datetime import datetime
from contextlib import contextmanager

# Set page config
st.set_page_config(
    page_title="Accident Detection Alert System",
    page_icon="🚨",
    layout="wide"
)

# Constants
MAX_VIDEO_SIZE_MB = 100
SUPPORTED_IMAGE_TYPES = ["jpg", "jpeg", "png"]
SUPPORTED_VIDEO_TYPES = ["mp4", "avi", "mov"]
FRAME_SKIP = 2

# Email Configuration (HARDCODED - FOR DEMONSTRATION ONLY)
EMAIL_SENDER = "rnithin691@gmail.com"
EMAIL_PASSWORD = "apvnnfurzihocefq"  # App-specific password
EMAIL_RECEIVER = "nithinreddycheerapureddy@gmail.com"

# Load model with caching
@st.cache_resource
def load_model():
    try:
        model = YOLO("D:/All Documents/Projects/Real-Time Accident Detection and Alert System/Models/best.pt")
        return model
    except Exception as e:
        st.error(f"🚨 Model loading failed: {str(e)}")
        st.stop()

model = load_model()

# Context manager for temp files
@contextmanager
def temp_video_file(uploaded_file):
    temp = tempfile.NamedTemporaryFile(delete=False, suffix='.mp4')
    try:
        temp.write(uploaded_file.read())
        temp.close()
        yield temp.name
    finally:
        try:
            os.unlink(temp.name)
        except:
            pass

def send_email_alert(subject, message, attachment_path=None):
    """Send email notification with attachment"""
    try:
        msg = MIMEMultipart()
        msg['From'] = EMAIL_SENDER
        msg['To'] = EMAIL_RECEIVER
        msg['Subject'] = subject
        
        msg.attach(MIMEText(message, 'plain'))
        
        if attachment_path and os.path.exists(attachment_path):
            with open(attachment_path, 'rb') as f:
                img = MIMEImage(f.read())
                img.add_header('Content-Disposition', 'attachment', 
                             filename=os.path.basename(attachment_path))
                msg.attach(img)
        
        with smtplib.SMTP_SSL('smtp.gmail.com', 465) as server:
            server.login(EMAIL_SENDER, EMAIL_PASSWORD)
            server.send_message(msg)
        
        st.success("📧 Email alert sent successfully")
        return True
    except Exception as e:
        st.error(f"Failed to send email: {str(e)}")
        return False

# Sidebar
st.sidebar.title("Settings")
confidence_threshold = st.sidebar.slider(
    "Confidence Threshold", 
    min_value=0.1, 
    max_value=0.9, 
    value=0.3, 
    step=0.05
)

frame_skip = st.sidebar.selectbox(
    "Frame Processing Rate",
    options=[1, 2, 3, 4],
    index=1,
    help="Higher values process fewer frames (faster but less accurate)"
)

enable_email = st.sidebar.checkbox("Enable Email Alerts", True)

# Main app
st.title("🚨 Accident Detection & Alert System")
st.markdown("""
    <style>
    .object-info {
        padding: 10px;
        margin: 5px 0;
        border-radius: 5px;
        background-color: #f0f2f6;
    }
    </style>
""", unsafe_allow_html=True)

def get_object_details(boxes):
    """Extract detailed information about detected objects"""
    details = []
    for box in boxes:
        obj = {
            "type": model.names[int(box.cls)],
            "confidence": float(box.conf),
            "coordinates": {
                "x1": float(box.xyxy[0][0]),
                "y1": float(box.xyxy[0][1]),
                "x2": float(box.xyxy[0][2]),
                "y2": float(box.xyxy[0][3])
            }
        }
        details.append(obj)
    return details

def process_image(uploaded_file):
    """Process image with detailed object information"""
    try:
        image = Image.open(uploaded_file)
        
        with st.spinner("🔍 Analyzing image..."):
            results = model.predict(image, conf=confidence_threshold)
            
            col1, col2 = st.columns(2)
            with col1:
                st.image(image, caption="Original Image", use_column_width=True)
            
            with col2:
                if len(results[0].boxes) > 0:
                    st.warning("🚨 ACCIDENT DETECTED")
                    result_img = Image.fromarray(results[0].plot()[:, :, ::-1])
                    st.image(result_img, caption="Processed Result", use_column_width=True)
                    
                    # Display object details
                    st.subheader("Detected Objects:")
                    objects = get_object_details(results[0].boxes)
                    for obj in objects:
                        st.markdown(f"""
                            <div class="object-info">
                                <b>Type:</b> {obj['type']}<br>
                                <b>Confidence:</b> {obj['confidence']:.2f}<br>
                                <b>Bounding Box:</b> ({obj['coordinates']['x1']:.0f}, {obj['coordinates']['y1']:.0f}) to ({obj['coordinates']['x2']:.0f}, {obj['coordinates']['y2']:.0f})
                            </div>
                        """, unsafe_allow_html=True)
                    
                    # Send email alert if enabled
                    if enable_email:
                        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                        alert_msg = f"Accident detected in image at {timestamp}"
                        
                        # Save detection image
                        result_path = "accident_detected.jpg"
                        result_img.save(result_path)
                        
                        send_email_alert(
                            "🚨 Accident Detected", 
                            alert_msg,
                            result_path
                        )
                        
                        # Cleanup
                        os.unlink(result_path)
                else:
                    st.success("✅ NO ACCIDENT DETECTED")
                    st.image(image, caption="No detections", use_container_width=True)
    
    except Exception as e:
        st.error(f"Error processing image: {str(e)}")

def process_video(uploaded_file):
    """Process video with frame-by-frame object information"""
    try:
        if uploaded_file.size > MAX_VIDEO_SIZE_MB * 1024 * 1024:
            st.error(f"File too large. Maximum size is {MAX_VIDEO_SIZE_MB}MB")
            return
        
        st.video(uploaded_file)
        
        if st.button("Process Video with Details", type="primary"):
            with temp_video_file(uploaded_file) as input_path:
                cap = cv2.VideoCapture(input_path)
                if not cap.isOpened():
                    st.error("Error opening video file")
                    return
                
                # Get video properties
                frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
                frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
                fps = int(cap.get(cv2.CAP_PROP_FPS))
                total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
                
                # Create output temp file
                output_temp = tempfile.NamedTemporaryFile(delete=False, suffix='.mp4')
                output_temp.close()
                
                # Video writer
                fourcc = cv2.VideoWriter_fourcc(*'mp4v')
                out = cv2.VideoWriter(output_temp.name, fourcc, fps, (frame_width, frame_height))
                
                # UI elements
                progress_bar = st.progress(0)
                status_text = st.empty()
                result_placeholder = st.empty()
                details_placeholder = st.empty()
                
                frame_count = 0
                processed_count = 0
                accident_frames = []
                
                try:
                    while cap.isOpened():
                        ret, frame = cap.read()
                        if not ret:
                            break
                        
                        frame_count += 1
                        
                        # Skip frames according to frame_skip setting
                        if frame_count % frame_skip != 0:
                            continue
                            
                        # Process frame
                        results = model.predict(frame, conf=confidence_threshold)
                        annotated_frame = results[0].plot()
                        out.write(annotated_frame)
                        processed_count += 1
                        
                        # Check for accidents
                        if len(results[0].boxes) > 0:
                            # Get object details for this frame
                            objects = get_object_details(results[0].boxes)
                            accident_frames.append({
                                "frame_number": frame_count,
                                "time_seconds": frame_count/fps,
                                "objects": objects
                            })
                        
                        # Update progress
                        progress = frame_count / total_frames
                        progress_bar.progress(progress)
                        status_text.text(f"📊 Processing frame {frame_count}/{total_frames}")
                    
                    # Release resources
                    cap.release()
                    out.release()
                    
                    # Show results
                    if accident_frames:
                        result_placeholder.warning(f"🚨 ACCIDENTS DETECTED IN {len(accident_frames)} FRAMES")
                        
                        # Display detailed accident information
                        with st.expander("📝 Detailed Accident Report", expanded=True):
                            st.write(f"Total frames with accidents: {len(accident_frames)}")
                            
                            for accident in accident_frames:
                                st.markdown(f"""
                                    ### Frame {accident['frame_number']} (Time: {accident['time_seconds']:.1f}s)
                                """)
                                
                                for obj in accident['objects']:
                                    st.markdown(f"""
                                        <div class="object-info">
                                            <b>Object Type:</b> {obj['type']}<br>
                                            <b>Confidence:</b> {obj['confidence']:.2f}<br>
                                            <b>Bounding Box:</b> ({obj['coordinates']['x1']:.0f}, {obj['coordinates']['y1']:.0f}) to ({obj['coordinates']['x2']:.0f}, {obj['coordinates']['y2']:.0f})
                                        </div>
                                    """, unsafe_allow_html=True)
                                st.markdown("---")
                        
                        # Send email alert if enabled
                        if enable_email:
                            timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                            # Fixed f-string syntax here
                            time_list = [f"{a['time_seconds']:.1f}s" for a in accident_frames]
                            alert_msg = (
                                f"Accidents detected in video at {timestamp}\n"
                                f"Occurred at: {', '.join(time_list)}"
                            )
                            
                            send_email_alert(
                                "🚨 Multiple Accidents Detected",
                                alert_msg,
                                output_temp.name if len(accident_frames) <= 5 else None
                            )
                    else:
                        result_placeholder.success("✅ NO ACCIDENTS DETECTED IN VIDEO")
                    
                    # Display processed video
                    st.video(output_temp.name)
                    
                    # Download button
                    with open(output_temp.name, "rb") as f:
                        st.download_button(
                            label="Download Processed Video",
                            data=f,
                            file_name="processed_video.mp4",
                            mime="video/mp4"
                        )
                
                finally:
                    # Cleanup
                    if 'cap' in locals() and cap.isOpened():
                        cap.release()
                    if 'out' in locals():
                        out.release()
                    try:
                        os.unlink(output_temp.name)
                    except:
                        pass
    
    except Exception as e:
        st.error(f"Error processing video: {str(e)}")

# Option selection
option = st.radio(
    "Select input type:",
    ("Image", "Video"),
    horizontal=True
)

# File upload based on selected option
if option == "Image":
    uploaded_file = st.file_uploader(
        "Upload an image",
        type=SUPPORTED_IMAGE_TYPES,
        help="Supported formats: JPG, JPEG, PNG"
    )
    if uploaded_file is not None:
        process_image(uploaded_file)
else:
    uploaded_file = st.file_uploader(
        "Upload a video",
        type=SUPPORTED_VIDEO_TYPES,
        help=f"Supported formats: MP4, AVI, MOV (Max {MAX_VIDEO_SIZE_MB}MB)"
    )
    if uploaded_file is not None:
        process_video(uploaded_file)

# Security warning
st.warning("""
**Important Security Notice:**
- This implementation contains hardcoded email credentials
- Never use this in production or share the code publicly
- For production use, use environment variables or secrets management
""")

# Footer
st.markdown("---")
st.caption("""
    🚨 Accident Detection & Alert System | Powered by YOLOv8
    - Detects accidents in images and videos
    - Sends email alerts with detection details
""")

Overwriting Accident.py


In [22]:
# Video Sending

In [8]:
%%writefile Accident.py

import streamlit as st
import cv2
import numpy as np
from PIL import Image
from ultralytics import YOLO
import tempfile
import os
import time
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.image import MIMEImage
from email.mime.base import MIMEBase
from email import encoders
from datetime import datetime
from contextlib import contextmanager

# Set page config
st.set_page_config(
    page_title="Accident Detection Alert System",
    page_icon="🚨",
    layout="wide"
)

# Constants
MAX_VIDEO_SIZE_MB = 100
SUPPORTED_IMAGE_TYPES = ["jpg", "jpeg", "png"]
SUPPORTED_VIDEO_TYPES = ["mp4", "avi", "mov"]
FRAME_SKIP = 2

# Email Configuration (HARDCODED - FOR DEMONSTRATION ONLY)
EMAIL_SENDER = "rnithin691@gmail.com"
EMAIL_PASSWORD = "apvnnfurzihocefq"  # App-specific password
EMAIL_RECEIVER = "rishiande9999@gmail.com"

# Load model with caching
@st.cache_resource
def load_model():
    try:
        model = YOLO("D:/All Documents/Projects/Real-Time Accident Detection and Alert System/Models/best.pt")
        return model
    except Exception as e:
        st.error(f"🚨 Model loading failed: {str(e)}")
        st.stop()

model = load_model()

# Context manager for temp files
@contextmanager
def temp_video_file(uploaded_file):
    temp = tempfile.NamedTemporaryFile(delete=False, suffix='.mp4')
    try:
        temp.write(uploaded_file.read())
        temp.close()
        yield temp.name
    finally:
        try:
            os.unlink(temp.name)
        except:
            pass

def send_email_alert(subject, message, attachment_path=None, video_path=None):
    """Send email notification with attachments"""
    try:
        msg = MIMEMultipart()
        msg['From'] = EMAIL_SENDER
        msg['To'] = EMAIL_RECEIVER
        msg['Subject'] = subject
        
        msg.attach(MIMEText(message, 'plain'))
        
        # Attach image if provided
        if attachment_path and os.path.exists(attachment_path):
            with open(attachment_path, 'rb') as f:
                img = MIMEImage(f.read())
                img.add_header('Content-Disposition', 'attachment', 
                             filename=os.path.basename(attachment_path))
                msg.attach(img)
        
        # Attach video if provided
        if video_path and os.path.exists(video_path):
            part = MIMEBase('application', 'octet-stream')
            with open(video_path, 'rb') as f:
                part.set_payload(f.read())
            encoders.encode_base64(part)
            part.add_header('Content-Disposition',
                           'attachment',
                           filename=os.path.basename(video_path))
            msg.attach(part)
        
        with smtplib.SMTP_SSL('smtp.gmail.com', 465) as server:
            server.login(EMAIL_SENDER, EMAIL_PASSWORD)
            server.send_message(msg)
        
        st.success("📧 Email alert with attachments sent successfully")
        return True
    except Exception as e:
        st.error(f"Failed to send email: {str(e)}")
        return False

# Sidebar
st.sidebar.title("Settings")
confidence_threshold = st.sidebar.slider(
    "Confidence Threshold", 
    min_value=0.1, 
    max_value=0.9, 
    value=0.3, 
    step=0.05
)

frame_skip = st.sidebar.selectbox(
    "Frame Processing Rate",
    options=[1, 2, 3, 4],
    index=1,
    help="Higher values process fewer frames (faster but less accurate)"
)

enable_email = st.sidebar.checkbox("Enable Email Alerts", True)

# Main app
st.title("🚨 Accident Detection & Alert System")
st.markdown("""
    <style>
    .object-info {
        padding: 10px;
        margin: 5px 0;
        border-radius: 5px;
        background-color: #f0f2f6;
    }
    </style>
""", unsafe_allow_html=True)

def get_object_details(boxes):
    """Extract detailed information about detected objects"""
    details = []
    for box in boxes:
        obj = {
            "type": model.names[int(box.cls)],
            "confidence": float(box.conf),
            "coordinates": {
                "x1": float(box.xyxy[0][0]),
                "y1": float(box.xyxy[0][1]),
                "x2": float(box.xyxy[0][2]),
                "y2": float(box.xyxy[0][3])
            }
        }
        details.append(obj)
    return details

def process_image(uploaded_file):
    """Process image with detailed object information"""
    try:
        image = Image.open(uploaded_file)
        
        with st.spinner("🔍 Analyzing image..."):
            results = model.predict(image, conf=confidence_threshold)
            
            col1, col2 = st.columns(2)
            with col1:
                st.image(image, caption="Original Image", use_container_width=True)
            
            with col2:
                if len(results[0].boxes) > 0:
                    st.warning("🚨 ACCIDENT DETECTED")
                    result_img = Image.fromarray(results[0].plot()[:, :, ::-1])
                    st.image(result_img, caption="Processed Result", use_container_width=True)
                    
                    # Display object details
                    st.subheader("Detected Objects:")
                    objects = get_object_details(results[0].boxes)
                    for obj in objects:
                        st.markdown(f"""
                            <div class="object-info">
                                <b>Type:</b> {obj['type']}<br>
                                <b>Confidence:</b> {obj['confidence']:.2f}<br>
                                <b>Bounding Box:</b> ({obj['coordinates']['x1']:.0f}, {obj['coordinates']['y1']:.0f}) to ({obj['coordinates']['x2']:.0f}, {obj['coordinates']['y2']:.0f})
                            </div>
                        """, unsafe_allow_html=True)
                    
                    # Send email alert if enabled
                    if enable_email:
                        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                        alert_msg = f"Accident detected in image at {timestamp}"
                        
                        # Save detection image
                        result_path = "accident_detected.jpg"
                        result_img.save(result_path)
                        
                        send_email_alert(
                            "🚨 Accident Detected", 
                            alert_msg,
                            result_path
                        )
                        
                        # Cleanup
                        os.unlink(result_path)
                else:
                    st.success("✅ NO ACCIDENT DETECTED")
                    st.image(image, caption="No detections", use_container_width=True)
    
    except Exception as e:
        st.error(f"Error processing image: {str(e)}")

def process_video(uploaded_file):
    """Process video with frame-by-frame object information"""
    try:
        if uploaded_file.size > MAX_VIDEO_SIZE_MB * 1024 * 1024:
            st.error(f"File too large. Maximum size is {MAX_VIDEO_SIZE_MB}MB")
            return
        
        st.video(uploaded_file)
        
        if st.button("Process Video with Details", type="primary"):
            with temp_video_file(uploaded_file) as input_path:
                cap = cv2.VideoCapture(input_path)
                if not cap.isOpened():
                    st.error("Error opening video file")
                    return
                
                # Get video properties
                frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
                frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
                fps = int(cap.get(cv2.CAP_PROP_FPS))
                total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
                
                # Create output temp file
                output_path = "processed_video.mp4"
                fourcc = cv2.VideoWriter_fourcc(*'mp4v')
                out = cv2.VideoWriter(output_path, fourcc, fps, (frame_width, frame_height))
                
                # UI elements
                progress_bar = st.progress(0)
                status_text = st.empty()
                result_placeholder = st.empty()
                details_placeholder = st.empty()
                
                frame_count = 0
                processed_count = 0
                accident_frames = []
                
                try:
                    while cap.isOpened():
                        ret, frame = cap.read()
                        if not ret:
                            break
                        
                        frame_count += 1
                        
                        # Skip frames according to frame_skip setting
                        if frame_count % frame_skip != 0:
                            continue
                            
                        # Process frame
                        results = model.predict(frame, conf=confidence_threshold)
                        annotated_frame = results[0].plot()
                        out.write(annotated_frame)
                        processed_count += 1
                        
                        # Check for accidents
                        if len(results[0].boxes) > 0:
                            # Get object details for this frame
                            objects = get_object_details(results[0].boxes)
                            accident_frames.append({
                                "frame_number": frame_count,
                                "time_seconds": frame_count/fps,
                                "objects": objects
                            })
                        
                        # Update progress
                        progress = frame_count / total_frames
                        progress_bar.progress(progress)
                        status_text.text(f"📊 Processing frame {frame_count}/{total_frames}")
                    
                    # Release resources
                    cap.release()
                    out.release()
                    
                    # Show results
                    if accident_frames:
                        result_placeholder.warning(f"🚨 ACCIDENTS DETECTED IN {len(accident_frames)} FRAMES")
                        
                        # Display detailed accident information
                        with st.expander("📝 Detailed Accident Report", expanded=True):
                            st.write(f"Total frames with accidents: {len(accident_frames)}")
                            
                            for accident in accident_frames:
                                st.markdown(f"""
                                    ### Frame {accident['frame_number']} (Time: {accident['time_seconds']:.1f}s)
                                """)
                                
                                for obj in accident['objects']:
                                    st.markdown(f"""
                                        <div class="object-info">
                                            <b>Object Type:</b> {obj['type']}<br>
                                            <b>Confidence:</b> {obj['confidence']:.2f}<br>
                                            <b>Bounding Box:</b> ({obj['coordinates']['x1']:.0f}, {obj['coordinates']['y1']:.0f}) to ({obj['coordinates']['x2']:.0f}, {obj['coordinates']['y2']:.0f})
                                        </div>
                                    """, unsafe_allow_html=True)
                                st.markdown("---")
                        
                        # Send email alert if enabled
                        if enable_email:
                            timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                            time_list = [f"{a['time_seconds']:.1f}s" for a in accident_frames]
                            alert_msg = (
                                f"Accidents detected in video at {timestamp}\n"
                                f"Occurred at: {', '.join(time_list)}"
                            )
                            
                            send_email_alert(
                                "🚨 Multiple Accidents Detected",
                                alert_msg,
                                video_path=output_path  # Send the processed video
                            )
                    else:
                        result_placeholder.success("✅ NO ACCIDENTS DETECTED IN VIDEO")
                    
                    # Display processed video
                    st.video(output_path)
                    
                    # Download button
                    with open(output_path, "rb") as f:
                        st.download_button(
                            label="Download Processed Video",
                            data=f,
                            file_name="processed_video.mp4",
                            mime="video/mp4"
                        )
                
                finally:
                    # Cleanup
                    if 'cap' in locals() and cap.isOpened():
                        cap.release()
                    if 'out' in locals():
                        out.release()
                    try:
                        os.unlink(output_path)
                    except:
                        pass
    
    except Exception as e:
        st.error(f"Error processing video: {str(e)}")

# Option selection
option = st.radio(
    "Select input type:",
    ("Image", "Video"),
    horizontal=True
)

# File upload based on selected option
if option == "Image":
    uploaded_file = st.file_uploader(
        "Upload an image",
        type=SUPPORTED_IMAGE_TYPES,
        help="Supported formats: JPG, JPEG, PNG"
    )
    if uploaded_file is not None:
        process_image(uploaded_file)
else:
    uploaded_file = st.file_uploader(
        "Upload a video",
        type=SUPPORTED_VIDEO_TYPES,
        help=f"Supported formats: MP4, AVI, MOV (Max {MAX_VIDEO_SIZE_MB}MB)"
    )
    if uploaded_file is not None:
        process_video(uploaded_file)

# Security warning
st.warning("""
**Important Security Notice:**
- This implementation contains hardcoded email credentials
- Never use this in production or share the code publicly
- For production use, use environment variables or secrets management
""")


Overwriting Accident.py


# Trails on Call and Loccation

In [10]:
%%writefile Accident.py

import streamlit as st
import cv2
import numpy as np
from PIL import Image
from ultralytics import YOLO
import tempfile
import os
import time
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.image import MIMEImage
from email.mime.base import MIMEBase
from email import encoders
from datetime import datetime
from contextlib import contextmanager
import geocoder
from twilio.rest import Client

# Set page config
st.set_page_config(
    page_title="Accident Detection Alert System",
    page_icon="🚨",
    layout="wide"
)

# Constants
MAX_VIDEO_SIZE_MB = 100
SUPPORTED_IMAGE_TYPES = ["jpg", "jpeg", "png"]
SUPPORTED_VIDEO_TYPES = ["mp4", "avi", "mov"]
FRAME_SKIP = 2

# Alert Configuration (HARDCODED - FOR DEMONSTRATION ONLY)
EMAIL_SENDER = "rnithin691@gmail.com"
EMAIL_PASSWORD = "apvnnfurzihocefq"  # App-specific password
EMAIL_RECEIVER = "rishiande9999@gmail.com"

# Twilio Configuration (HARDCODED - FOR DEMONSTRATION ONLY)
TWILIO_ACCOUNT_SID = "AC4bf21d204946a249f04a25e781499540"
TWILIO_AUTH_TOKEN = "b08509d4fcec16a713d387813251cb77"
TWILIO_PHONE_NUMBER = "+17042868037"      # Your Twilio phone number
RECIPIENT_PHONE_NUMBER = "+91 63028 10409"   # Where to send alerts

# Load model with caching
@st.cache_resource
def load_model():
    try:
        model = YOLO("D:/All Documents/Projects/Real-Time Accident Detection and Alert System/Models/best.pt")
        return model
    except Exception as e:
        st.error(f"🚨 Model loading failed: {str(e)}")
        st.stop()

model = load_model()

# Context manager for temp files
@contextmanager
def temp_video_file(uploaded_file):
    temp = tempfile.NamedTemporaryFile(delete=False, suffix='.mp4')
    try:
        temp.write(uploaded_file.read())
        temp.close()
        yield temp.name
    finally:
        try:
            os.unlink(temp.name)
        except:
            pass

def get_current_location():
    """Get current GPS coordinates"""
    try:
        g = geocoder.ip('me')
        if g.ok:
            return {
                'latitude': g.latlng[0],
                'longitude': g.latlng[1],
                'address': g.address
            }
        return None
    except Exception as e:
        st.warning(f"Could not get location: {str(e)}")
        return None

def send_sms_alert(message):
    """Send SMS notification via Twilio"""
    try:
        client = Client(TWILIO_ACCOUNT_SID, TWILIO_AUTH_TOKEN)
        
        message = client.messages.create(
            body=message,
            from_=TWILIO_PHONE_NUMBER,
            to=RECIPIENT_PHONE_NUMBER
        )
        
        st.success(f"📱 SMS alert sent! SID: {message.sid}")
        return True
    except Exception as e:
        st.error(f"Failed to send SMS: {str(e)}")
        return False

def send_email_alert(subject, message, attachment_path=None, video_path=None):
    """Send email notification with attachments"""
    try:
        msg = MIMEMultipart()
        msg['From'] = EMAIL_SENDER
        msg['To'] = EMAIL_RECEIVER
        msg['Subject'] = subject
        
        msg.attach(MIMEText(message, 'plain'))
        
        # Attach image if provided
        if attachment_path and os.path.exists(attachment_path):
            with open(attachment_path, 'rb') as f:
                img = MIMEImage(f.read())
                img.add_header('Content-Disposition', 'attachment', 
                             filename=os.path.basename(attachment_path))
                msg.attach(img)
        
        # Attach video if provided
        if video_path and os.path.exists(video_path):
            part = MIMEBase('application', 'octet-stream')
            with open(video_path, 'rb') as f:
                part.set_payload(f.read())
            encoders.encode_base64(part)
            part.add_header('Content-Disposition',
                           'attachment',
                           filename=os.path.basename(video_path))
            msg.attach(part)
        
        with smtplib.SMTP_SSL('smtp.gmail.com', 465) as server:
            server.login(EMAIL_SENDER, EMAIL_PASSWORD)
            server.send_message(msg)
        
        st.success("📧 Email alert with attachments sent successfully")
        return True
    except Exception as e:
        st.error(f"Failed to send email: {str(e)}")
        return False

def send_all_alerts(subject, message, location_info=None, attachment_path=None, video_path=None):
    """Unified alert function for all notification types"""
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    
    # Add location to message
    full_message = f"{message}\n\nTimestamp: {timestamp}"
    if location_info:
        if isinstance(location_info, dict):  # GPS coordinates
            full_message += (f"\nLocation: {location_info.get('address', 'Unknown')}"
                           f"\nCoordinates: {location_info.get('latitude')}, {location_info.get('longitude')}"
                           f"\nGoogle Maps: https://maps.google.com/?q={location_info.get('latitude')},{location_info.get('longitude')}")
        else:  # Manual location text
            full_message += f"\nLocation: {location_info}"
    
    # Send email
    if enable_email:
        send_email_alert(subject, full_message, attachment_path, video_path)
    
    # Send SMS
    if enable_sms:
        sms_success = send_sms_alert(f"ACCIDENT ALERT: {subject}\n{full_message}")
        
        # If SMS fails, show warning
        if not sms_success:
            st.warning("SMS notification failed (check Twilio configuration)")

# Sidebar
st.sidebar.title("Settings")
confidence_threshold = st.sidebar.slider(
    "Confidence Threshold", 
    min_value=0.1, 
    max_value=0.9, 
    value=0.3, 
    step=0.05
)

frame_skip = st.sidebar.selectbox(
    "Frame Processing Rate",
    options=[1, 2, 3, 4],
    index=1,
    help="Higher values process fewer frames (faster but less accurate)"
)

# Alert Settings
st.sidebar.subheader("Alert Settings")
enable_email = st.sidebar.checkbox("Enable Email Alerts", True)
enable_sms = st.sidebar.checkbox("Enable SMS Alerts", True)

# Location Settings
st.sidebar.subheader("Location Settings")
location_method = st.sidebar.radio(
    "Location Method",
    ("Automatic (GPS)", "Manual"),
    help="Automatic requires device location permissions"
)
manual_location = ""
if location_method == "Manual":
    manual_location = st.sidebar.text_input(
        "Location Description", 
        "Intersection of Main St & 5th Ave"
    )

# Main app
st.title("🚨 Accident Detection & Alert System")
st.markdown("""
    <style>
    .object-info {
        padding: 10px;
        margin: 5px 0;
        border-radius: 5px;
        background-color: #f0f2f6;
    }
    </style>
""", unsafe_allow_html=True)

def get_object_details(boxes):
    """Extract detailed information about detected objects"""
    details = []
    for box in boxes:
        obj = {
            "type": model.names[int(box.cls)],
            "confidence": float(box.conf),
            "coordinates": {
                "x1": float(box.xyxy[0][0]),
                "y1": float(box.xyxy[0][1]),
                "x2": float(box.xyxy[0][2]),
                "y2": float(box.xyxy[0][3])
            }
        }
        details.append(obj)
    return details

def process_image(uploaded_file):
    """Process image with detailed object information"""
    try:
        image = Image.open(uploaded_file)
        
        with st.spinner("🔍 Analyzing image..."):
            results = model.predict(image, conf=confidence_threshold)
            
            col1, col2 = st.columns(2)
            with col1:
                st.image(image, caption="Original Image", use_container_width=True)
            
            with col2:
                if len(results[0].boxes) > 0:
                    st.warning("🚨 ACCIDENT DETECTED")
                    result_img = Image.fromarray(results[0].plot()[:, :, ::-1])
                    st.image(result_img, caption="Processed Result", use_container_width=True)
                    
                    # Display object details
                    st.subheader("Detected Objects:")
                    objects = get_object_details(results[0].boxes)
                    for obj in objects:
                        st.markdown(f"""
                            <div class="object-info">
                                <b>Type:</b> {obj['type']}<br>
                                <b>Confidence:</b> {obj['confidence']:.2f}<br>
                                <b>Bounding Box:</b> ({obj['coordinates']['x1']:.0f}, {obj['coordinates']['y1']:.0f}) to ({obj['coordinates']['x2']:.0f}, {obj['coordinates']['y2']:.0f})
                            </div>
                        """, unsafe_allow_html=True)
                    
                    # Get location
                    location = None
                    if location_method == "Automatic (GPS)":
                        location = get_current_location()
                    else:
                        location = manual_location
                    
                    # Send alerts if enabled
                    if enable_email or enable_sms:
                        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                        alert_msg = f"Accident detected in image at {timestamp}"
                        
                        # Save detection image
                        result_path = "accident_detected.jpg"
                        result_img.save(result_path)
                        
                        send_all_alerts(
                            "🚨 Accident Detected",
                            alert_msg,
                            location_info=location,
                            attachment_path=result_path
                        )
                        
                        # Cleanup
                        os.unlink(result_path)
                else:
                    st.success("✅ NO ACCIDENT DETECTED")
                    st.image(image, caption="No detections", use_container_width=True)
    
    except Exception as e:
        st.error(f"Error processing image: {str(e)}")

def process_video(uploaded_file):
    """Process video with frame-by-frame object information"""
    try:
        if uploaded_file.size > MAX_VIDEO_SIZE_MB * 1024 * 1024:
            st.error(f"File too large. Maximum size is {MAX_VIDEO_SIZE_MB}MB")
            return
        
        st.video(uploaded_file)
        
        if st.button("Process Video with Details", type="primary"):
            with temp_video_file(uploaded_file) as input_path:
                cap = cv2.VideoCapture(input_path)
                if not cap.isOpened():
                    st.error("Error opening video file")
                    return
                
                # Get video properties
                frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
                frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
                fps = int(cap.get(cv2.CAP_PROP_FPS))
                total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
                
                # Create output temp file
                output_path = "processed_video.mp4"
                fourcc = cv2.VideoWriter_fourcc(*'mp4v')
                out = cv2.VideoWriter(output_path, fourcc, fps, (frame_width, frame_height))
                
                # UI elements
                progress_bar = st.progress(0)
                status_text = st.empty()
                result_placeholder = st.empty()
                details_placeholder = st.empty()
                
                frame_count = 0
                processed_count = 0
                accident_frames = []
                
                try:
                    while cap.isOpened():
                        ret, frame = cap.read()
                        if not ret:
                            break
                        
                        frame_count += 1
                        
                        # Skip frames according to frame_skip setting
                        if frame_count % frame_skip != 0:
                            continue
                            
                        # Process frame
                        results = model.predict(frame, conf=confidence_threshold)
                        annotated_frame = results[0].plot()
                        out.write(annotated_frame)
                        processed_count += 1
                        
                        # Check for accidents
                        if len(results[0].boxes) > 0:
                            # Get object details for this frame
                            objects = get_object_details(results[0].boxes)
                            accident_frames.append({
                                "frame_number": frame_count,
                                "time_seconds": frame_count/fps,
                                "objects": objects
                            })
                        
                        # Update progress
                        progress = frame_count / total_frames
                        progress_bar.progress(progress)
                        status_text.text(f"📊 Processing frame {frame_count}/{total_frames}")
                    
                    # Release resources
                    cap.release()
                    out.release()
                    
                    # Show results
                    if accident_frames:
                        result_placeholder.warning(f"🚨 ACCIDENTS DETECTED IN {len(accident_frames)} FRAMES")
                        
                        # Display detailed accident information
                        with st.expander("📝 Detailed Accident Report", expanded=True):
                            st.write(f"Total frames with accidents: {len(accident_frames)}")
                            
                            for accident in accident_frames:
                                st.markdown(f"""
                                    ### Frame {accident['frame_number']} (Time: {accident['time_seconds']:.1f}s)
                                """)
                                
                                for obj in accident['objects']:
                                    st.markdown(f"""
                                        <div class="object-info">
                                            <b>Object Type:</b> {obj['type']}<br>
                                            <b>Confidence:</b> {obj['confidence']:.2f}<br>
                                            <b>Bounding Box:</b> ({obj['coordinates']['x1']:.0f}, {obj['coordinates']['y1']:.0f}) to ({obj['coordinates']['x2']:.0f}, {obj['coordinates']['y2']:.0f})
                                        </div>
                                    """, unsafe_allow_html=True)
                                st.markdown("---")
                        
                        # Get location
                        location = None
                        if location_method == "Automatic (GPS)":
                            location = get_current_location()
                        else:
                            location = manual_location
                        
                        # Send alerts if enabled
                        if enable_email or enable_sms:
                            timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                            time_list = [f"{a['time_seconds']:.1f}s" for a in accident_frames]
                            alert_msg = (
                                f"Accidents detected in video at {timestamp}\n"
                                f"Occurred at: {', '.join(time_list)}"
                            )
                            
                            send_all_alerts(
                                "🚨 Multiple Accidents Detected",
                                alert_msg,
                                location_info=location,
                                video_path=output_path
                            )
                    else:
                        result_placeholder.success("✅ NO ACCIDENTS DETECTED IN VIDEO")
                    
                    # Display processed video
                    st.video(output_path)
                    
                    # Download button
                    with open(output_path, "rb") as f:
                        st.download_button(
                            label="Download Processed Video",
                            data=f,
                            file_name="processed_video.mp4",
                            mime="video/mp4"
                        )
                
                finally:
                    # Cleanup
                    if 'cap' in locals() and cap.isOpened():
                        cap.release()
                    if 'out' in locals():
                        out.release()
                    try:
                        os.unlink(output_path)
                    except:
                        pass
    
    except Exception as e:
        st.error(f"Error processing video: {str(e)}")

# Option selection
option = st.radio(
    "Select input type:",
    ("Image", "Video"),
    horizontal=True
)

# File upload based on selected option
if option == "Image":
    uploaded_file = st.file_uploader(
        "Upload an image",
        type=SUPPORTED_IMAGE_TYPES,
        help="Supported formats: JPG, JPEG, PNG"
    )
    if uploaded_file is not None:
        process_image(uploaded_file)
else:
    uploaded_file = st.file_uploader(
        "Upload a video",
        type=SUPPORTED_VIDEO_TYPES,
        help=f"Supported formats: MP4, AVI, MOV (Max {MAX_VIDEO_SIZE_MB}MB)"
    )
    if uploaded_file is not None:
        process_video(uploaded_file)

# Security warning
st.warning("""
**Important Security Notice:**
- This implementation contains hardcoded credentials
- Never use this in production or share the code publicly
- For production use:
  - Use environment variables
  - Implement proper secrets management
  - Set up proper authentication
""")

Overwriting Accident.py


# Using AWS

In [None]:
%%writefile Accident.py

import streamlit as st
import cv2
import numpy as np
from PIL import Image
from ultralytics import YOLO
import tempfile
import os
import time
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.image import MIMEImage
from email.mime.base import MIMEBase
from email import encoders
from datetime import datetime
from contextlib import contextmanager
import geocoder
import boto3
import json

# Set page config
st.set_page_config(
    page_title="Accident Detection Alert System",
    page_icon="🚨",
    layout="wide"
)

# Constants
MAX_VIDEO_SIZE_MB = 100
SUPPORTED_IMAGE_TYPES = ["jpg", "jpeg", "png"]
SUPPORTED_VIDEO_TYPES = ["mp4", "avi", "mov"]
FRAME_SKIP = 2
EMERGENCY_NUMBERS = {
    "police": "+91100",  # India Police
    "ambulance": "+91108",  # India Ambulance
}

# AWS Configuration (Use environment variables in production)
AWS_CONFIG = {
    "access_key": "YOUR_AWS_ACCESS_KEY",
    "secret_key": "YOUR_AWS_SECRET_KEY",
    "region": "ap-south-1",
    "sns_sender_id": "ACCIDENT",
    "connect_instance_id": "YOUR_CONNECT_INSTANCE_ID",
    "connect_flow_id": "YOUR_CONTACT_FLOW_ID",
    "connect_source_phone": "+1234567890"  # Your Amazon Connect number
}

# Email Configuration
EMAIL_CONFIG = {
    "sender": "your_email@gmail.com",
    "password": "your_app_password",
    "receiver": "recipient_email@gmail.com"
}

# Initialize AWS clients
try:
    sns_client = boto3.client(
        'sns',
        aws_access_key_id=AWS_CONFIG["access_key"],
        aws_secret_access_key=AWS_CONFIG["secret_key"],
        region_name=AWS_CONFIG["region"]
    )
    
    connect_client = boto3.client(
        'connect',
        aws_access_key_id=AWS_CONFIG["access_key"],
        aws_secret_access_key=AWS_CONFIG["secret_key"],
        region_name=AWS_CONFIG["region"]
    )
except Exception as e:
    st.error(f"AWS initialization failed: {str(e)}")
    st.stop()

# Load model with caching
@st.cache_resource
def load_model():
    try:
        model = YOLO("path/to/your/model.pt")
        return model
    except Exception as e:
        st.error(f"🚨 Model loading failed: {str(e)}")
        st.stop()

model = load_model()

# Context manager for temp files
@contextmanager
def temp_video_file(uploaded_file):
    temp = tempfile.NamedTemporaryFile(delete=False, suffix='.mp4')
    try:
        temp.write(uploaded_file.read())
        temp.close()
        yield temp.name
    finally:
        try:
            os.unlink(temp.name)
        except:
            pass

def get_current_location():
    """Get current GPS coordinates"""
    try:
        g = geocoder.ip('me')
        if g.ok:
            return {
                'latitude': g.latlng[0],
                'longitude': g.latlng[1],
                'address': g.address
            }
        return None
    except Exception as e:
        st.warning(f"Could not get location: {str(e)}")
        return None

def send_sms_aws(phone_number, message):
    """Send SMS using AWS SNS"""
    try:
        response = sns_client.publish(
            PhoneNumber=phone_number,
            Message=message,
            MessageAttributes={
                'AWS.SNS.SMS.SenderID': {
                    'DataType': 'String',
                    'StringValue': AWS_CONFIG["sns_sender_id"]
                }
            }
        )
        st.success(f"📱 SMS sent! Message ID: {response['MessageId']}")
        return True
    except Exception as e:
        st.error(f"Failed to send SMS: {str(e)}")
        return False

def make_emergency_call_aws(phone_number, message):
    """Make voice call using Amazon Connect"""
    try:
        response = connect_client.start_outbound_voice_contact(
            DestinationPhoneNumber=phone_number,
            ContactFlowId=AWS_CONFIG["connect_flow_id"],
            InstanceId=AWS_CONFIG["connect_instance_id"],
            SourcePhoneNumber=AWS_CONFIG["connect_source_phone"],
            Attributes={
                'message': message
            }
        )
        st.success(f"📞 Call initiated! Contact ID: {response['ContactId']}")
        return True
    except Exception as e:
        st.error(f"Failed to make call: {str(e)}")
        return False

def send_email_alert(subject, message, attachment_path=None, video_path=None):
    """Send email notification with attachments"""
    try:
        msg = MIMEMultipart()
        msg['From'] = EMAIL_CONFIG["sender"]
        msg['To'] = EMAIL_CONFIG["receiver"]
        msg['Subject'] = subject
        
        msg.attach(MIMEText(message, 'plain'))
        
        if attachment_path and os.path.exists(attachment_path):
            with open(attachment_path, 'rb') as f:
                img = MIMEImage(f.read())
                img.add_header('Content-Disposition', 'attachment', 
                             filename=os.path.basename(attachment_path))
                msg.attach(img)
        
        if video_path and os.path.exists(video_path):
            part = MIMEBase('application', 'octet-stream')
            with open(video_path, 'rb') as f:
                part.set_payload(f.read())
            encoders.encode_base64(part)
            part.add_header('Content-Disposition',
                           'attachment',
                           filename=os.path.basename(video_path))
            msg.attach(part)
        
        with smtplib.SMTP_SSL('smtp.gmail.com', 465) as server:
            server.login(EMAIL_CONFIG["sender"], EMAIL_CONFIG["password"])
            server.send_message(msg)
        
        st.success("📧 Email alert sent successfully")
        return True
    except Exception as e:
        st.error(f"Failed to send email: {str(e)}")
        return False

def send_all_alerts(subject, message, location_info=None, attachment_path=None, video_path=None):
    """Unified alert function"""
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    
    full_message = f"{message}\n\nTimestamp: {timestamp}"
    if location_info:
        if isinstance(location_info, dict):
            full_message += (f"\nLocation: {location_info.get('address', 'Unknown')}"
                           f"\nCoordinates: {location_info.get('latitude')}, {location_info.get('longitude')}"
                           f"\nGoogle Maps: https://maps.google.com/?q={location_info.get('latitude')},{location_info.get('longitude')}")
        else:
            full_message += f"\nLocation: {location_info}"
    
    if enable_email:
        send_email_alert(subject, full_message, attachment_path, video_path)
    
    if enable_sms:
        send_sms_aws(RECIPIENT_PHONE_NUMBER, f"ACCIDENT ALERT: {subject}\n{full_message}")
    
    if enable_emergency_calls and location_info:
        # Call police
        call_message = (f"Emergency alert: Accident detected at {location_info.get('address', 'unknown location')}. "
                       f"Coordinates: {location_info.get('latitude')}, {location_info.get('longitude')}")
        make_emergency_call_aws(EMERGENCY_NUMBERS["police"], call_message)
        
        # Call ambulance
        make_emergency_call_aws(EMERGENCY_NUMBERS["ambulance"], call_message)

# Sidebar Configuration
st.sidebar.title("Settings")

# Model Settings
confidence_threshold = st.sidebar.slider(
    "Confidence Threshold", 0.1, 0.9, 0.3, 0.05
)

frame_skip = st.sidebar.selectbox(
    "Frame Processing Rate",
    options=[1, 2, 3, 4],
    index=1,
    help="Higher values process fewer frames"
)

# Alert Settings
st.sidebar.subheader("Alert Settings")
enable_email = st.sidebar.checkbox("Enable Email Alerts", True)
enable_sms = st.sidebar.checkbox("Enable SMS Alerts", True)
enable_emergency_calls = st.sidebar.checkbox(
    "Enable Emergency Calls", 
    False,
    help="Will automatically call emergency services"
)

# Location Settings
st.sidebar.subheader("Location Settings")
location_method = st.sidebar.radio(
    "Location Method",
    ("Automatic (GPS)", "Manual"),
    help="Automatic requires location permissions"
)

manual_location = ""
if location_method == "Manual":
    manual_location = st.sidebar.text_input(
        "Location Description", 
        "Intersection of Main St & 5th Ave"
    )

# Phone Number Configuration
RECIPIENT_PHONE_NUMBER = st.sidebar.text_input(
    "Recipient Phone Number",
    "+911234567890",
    help="Include country code"
)

# Main Application UI
st.title("🚨 Accident Detection & Alert System")
st.markdown("""
    <style>
    .object-info {
        padding: 10px;
        margin: 5px 0;
        border-radius: 5px;
        background-color: #f0f2f6;
    }
    </style>
""", unsafe_allow_html=True)

def get_object_details(boxes):
    """Extract detailed information about detected objects"""
    details = []
    for box in boxes:
        obj = {
            "type": model.names[int(box.cls)],
            "confidence": float(box.conf),
            "coordinates": {
                "x1": float(box.xyxy[0][0]),
                "y1": float(box.xyxy[0][1]),
                "x2": float(box.xyxy[0][2]),
                "y2": float(box.xyxy[0][3])
            }
        }
        details.append(obj)
    return details

def process_image(uploaded_file):
    """Process image with detailed object information"""
    try:
        image = Image.open(uploaded_file)
        
        with st.spinner("🔍 Analyzing image..."):
            results = model.predict(image, conf=confidence_threshold)
            
            col1, col2 = st.columns(2)
            with col1:
                st.image(image, caption="Original Image", use_container_width=True)
            
            with col2:
                if len(results[0].boxes) > 0:
                    st.warning("🚨 ACCIDENT DETECTED")
                    result_img = Image.fromarray(results[0].plot()[:, :, ::-1])
                    st.image(result_img, caption="Processed Result", use_container_width=True)
                    
                    st.subheader("Detected Objects:")
                    objects = get_object_details(results[0].boxes)
                    for obj in objects:
                        st.markdown(f"""
                            <div class="object-info">
                                <b>Type:</b> {obj['type']}<br>
                                <b>Confidence:</b> {obj['confidence']:.2f}<br>
                                <b>Bounding Box:</b> ({obj['coordinates']['x1']:.0f}, {obj['coordinates']['y1']:.0f}) to ({obj['coordinates']['x2']:.0f}, {obj['coordinates']['y2']:.0f})
                            </div>
                        """, unsafe_allow_html=True)
                    
                    location = get_current_location() if location_method == "Automatic (GPS)" else manual_location
                    
                    if enable_email or enable_sms or enable_emergency_calls:
                        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                        alert_msg = f"Accident detected in image at {timestamp}"
                        
                        result_path = "accident_detected.jpg"
                        result_img.save(result_path)
                        
                        send_all_alerts(
                            "🚨 Accident Detected",
                            alert_msg,
                            location_info=location,
                            attachment_path=result_path
                        )
                        
                        os.unlink(result_path)
                else:
                    st.success("✅ NO ACCIDENT DETECTED")
                    st.image(image, caption="No detections", use_container_width=True)
    
    except Exception as e:
        st.error(f"Error processing image: {str(e)}")

def process_video(uploaded_file):
    """Process video with frame-by-frame object information"""
    try:
        if uploaded_file.size > MAX_VIDEO_SIZE_MB * 1024 * 1024:
            st.error(f"File too large. Maximum size is {MAX_VIDEO_SIZE_MB}MB")
            return
        
        st.video(uploaded_file)
        
        if st.button("Process Video with Details", type="primary"):
            with temp_video_file(uploaded_file) as input_path:
                cap = cv2.VideoCapture(input_path)
                if not cap.isOpened():
                    st.error("Error opening video file")
                    return
                
                frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
                frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
                fps = int(cap.get(cv2.CAP_PROP_FPS))
                total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
                
                output_path = "processed_video.mp4"
                fourcc = cv2.VideoWriter_fourcc(*'mp4v')
                out = cv2.VideoWriter(output_path, fourcc, fps, (frame_width, frame_height))
                
                progress_bar = st.progress(0)
                status_text = st.empty()
                result_placeholder = st.empty()
                details_placeholder = st.empty()
                
                frame_count = 0
                processed_count = 0
                accident_frames = []
                
                try:
                    while cap.isOpened():
                        ret, frame = cap.read()
                        if not ret:
                            break
                        
                        frame_count += 1
                        
                        if frame_count % frame_skip != 0:
                            continue
                            
                        results = model.predict(frame, conf=confidence_threshold)
                        annotated_frame = results[0].plot()
                        out.write(annotated_frame)
                        processed_count += 1
                        
                        if len(results[0].boxes) > 0:
                            objects = get_object_details(results[0].boxes)
                            accident_frames.append({
                                "frame_number": frame_count,
                                "time_seconds": frame_count/fps,
                                "objects": objects
                            })
                        
                        progress = frame_count / total_frames
                        progress_bar.progress(progress)
                        status_text.text(f"📊 Processing frame {frame_count}/{total_frames}")
                    
                    cap.release()
                    out.release()
                    
                    if accident_frames:
                        result_placeholder.warning(f"🚨 ACCIDENTS DETECTED IN {len(accident_frames)} FRAMES")
                        
                        with st.expander("📝 Detailed Accident Report", expanded=True):
                            st.write(f"Total frames with accidents: {len(accident_frames)}")
                            
                            for accident in accident_frames:
                                st.markdown(f"""
                                    ### Frame {accident['frame_number']} (Time: {accident['time_seconds']:.1f}s)
                                """)
                                
                                for obj in accident['objects']:
                                    st.markdown(f"""
                                        <div class="object-info">
                                            <b>Object Type:</b> {obj['type']}<br>
                                            <b>Confidence:</b> {obj['confidence']:.2f}<br>
                                            <b>Bounding Box:</b> ({obj['coordinates']['x1']:.0f}, {obj['coordinates']['y1']:.0f}) to ({obj['coordinates']['x2']:.0f}, {obj['coordinates']['y2']:.0f})
                                        </div>
                                    """, unsafe_allow_html=True)
                                st.markdown("---")
                        
                        location = get_current_location() if location_method == "Automatic (GPS)" else manual_location
                        
                        if enable_email or enable_sms or enable_emergency_calls:
                            timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                            time_list = [f"{a['time_seconds']:.1f}s" for a in accident_frames]
                            alert_msg = (
                                f"Accidents detected in video at {timestamp}\n"
                                f"Occurred at: {', '.join(time_list)}"
                            )
                            
                            send_all_alerts(
                                "🚨 Multiple Accidents Detected",
                                alert_msg,
                                location_info=location,
                                video_path=output_path
                            )
                    else:
                        result_placeholder.success("✅ NO ACCIDENTS DETECTED IN VIDEO")
                    
                    st.video(output_path)
                    
                    with open(output_path, "rb") as f:
                        st.download_button(
                            label="Download Processed Video",
                            data=f,
                            file_name="processed_video.mp4",
                            mime="video/mp4"
                        )
                
                finally:
                    if 'cap' in locals() and cap.isOpened():
                        cap.release()
                    if 'out' in locals():
                        out.release()
                    try:
                        os.unlink(output_path)
                    except:
                        pass
    
    except Exception as e:
        st.error(f"Error processing video: {str(e)}")

# Main Application Flow
option = st.radio(
    "Select input type:",
    ("Image", "Video"),
    horizontal=True
)

if option == "Image":
    uploaded_file = st.file_uploader(
        "Upload an image",
        type=SUPPORTED_IMAGE_TYPES,
        help="Supported formats: JPG, JPEG, PNG"
    )
    if uploaded_file is not None:
        process_image(uploaded_file)
else:
    uploaded_file = st.file_uploader(
        "Upload a video",
        type=SUPPORTED_VIDEO_TYPES,
        help=f"Supported formats: MP4, AVI, MOV (Max {MAX_VIDEO_SIZE_MB}MB)"
    )
    if uploaded_file is not None:
        process_video(uploaded_file)

# Security warning
st.warning("""
**Important Security Notice:**
- This implementation contains sensitive credentials
- For production use:
  - Use AWS IAM roles instead of access keys
  - Store secrets in AWS Secrets Manager
  - Enable AWS CloudTrail logging
  - Comply with local emergency service regulations
""")

In [12]:
%%writefile Accident.py

import streamlit as st
import cv2
import numpy as np
from PIL import Image
from ultralytics import YOLO
import tempfile
import os
import time
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.image import MIMEImage
from email.mime.base import MIMEBase
from email import encoders
from datetime import datetime
from contextlib import contextmanager
import geocoder
import boto3
import json

# Set page config
st.set_page_config(
    page_title="Accident Detection Alert System",
    page_icon="🚨",
    layout="wide"
)

# Constants
MAX_VIDEO_SIZE_MB = 100
SUPPORTED_IMAGE_TYPES = ["jpg", "jpeg", "png"]
SUPPORTED_VIDEO_TYPES = ["mp4", "avi", "mov"]
FRAME_SKIP = 2
EMERGENCY_NUMBERS = {
    "police": "+916302810409",  # India Police
    "ambulance": "+918008016773",  # India Ambulance
}

# Alert Configuration (HARDCODED - FOR DEMONSTRATION ONLY)
EMAIL_SENDER = "rnithin691@gmail.com"
EMAIL_PASSWORD = "apvnnfurzihocefq"  # App-specific password
EMAIL_RECEIVER = "rishiande9999@gmail.com"

# AWS SNS Configuration (HARDCODED - FOR DEMONSTRATION ONLY)
AWS_SNS_CONFIG = {
    "access_key": "YOUR_AWS_ACCESS_KEY",
    "secret_key": "YOUR_AWS_SECRET_KEY",
    "region": "ap-south-1",
    "sender_id": "ACCIDENTALERT"
}

# Initialize AWS SNS client
try:
    sns_client = boto3.client(
        'sns',
        aws_access_key_id=AWS_SNS_CONFIG["access_key"],
        aws_secret_access_key=AWS_SNS_CONFIG["secret_key"],
        region_name=AWS_SNS_CONFIG["region"]
    )
except Exception as e:
    st.error(f"AWS SNS initialization failed: {str(e)}")
    st.stop()

# Load model with caching
@st.cache_resource
def load_model():
    try:
        model = YOLO("D:/All Documents/Projects/Real-Time Accident Detection and Alert System/Models/best.pt")
        return model
    except Exception as e:
        st.error(f"🚨 Model loading failed: {str(e)}")
        st.stop()

model = load_model()

# Context manager for temp files
@contextmanager
def temp_video_file(uploaded_file):
    temp = tempfile.NamedTemporaryFile(delete=False, suffix='.mp4')
    try:
        temp.write(uploaded_file.read())
        temp.close()
        yield temp.name
    finally:
        try:
            os.unlink(temp.name)
        except:
            pass

def get_current_location():
    """Get current GPS coordinates"""
    try:
        g = geocoder.ip('me')
        if g.ok:
            return {
                'latitude': g.latlng[0],
                'longitude': g.latlng[1],
                'address': g.address
            }
        return None
    except Exception as e:
        st.warning(f"Could not get location: {str(e)}")
        return None

def send_sms_aws(phone_number, message):
    """Send SMS using AWS SNS"""
    try:
        response = sns_client.publish(
            PhoneNumber=phone_number,
            Message=message,
            MessageAttributes={
                'AWS.SNS.SMS.SenderID': {
                    'DataType': 'String',
                    'StringValue': AWS_SNS_CONFIG["sender_id"]
                }
            }
        )
        st.success(f"📱 SMS sent via AWS! Message ID: {response['MessageId']}")
        return True
    except Exception as e:
        st.error(f"Failed to send SMS via AWS: {str(e)}")
        return False

def send_email_alert(subject, message, attachment_path=None, video_path=None):
    """Send email notification with attachments"""
    try:
        msg = MIMEMultipart()
        msg['From'] = EMAIL_SENDER
        msg['To'] = EMAIL_RECEIVER
        msg['Subject'] = subject
        
        msg.attach(MIMEText(message, 'plain'))
        
        if attachment_path and os.path.exists(attachment_path):
            with open(attachment_path, 'rb') as f:
                img = MIMEImage(f.read())
                img.add_header('Content-Disposition', 'attachment', 
                             filename=os.path.basename(attachment_path))
                msg.attach(img)
        
        if video_path and os.path.exists(video_path):
            part = MIMEBase('application', 'octet-stream')
            with open(video_path, 'rb') as f:
                part.set_payload(f.read())
            encoders.encode_base64(part)
            part.add_header('Content-Disposition',
                           'attachment',
                           filename=os.path.basename(video_path))
            msg.attach(part)
        
        with smtplib.SMTP_SSL('smtp.gmail.com', 465) as server:
            server.login(EMAIL_SENDER, EMAIL_PASSWORD)
            server.send_message(msg)
        
        st.success("📧 Email alert sent successfully")
        return True
    except Exception as e:
        st.error(f"Failed to send email: {str(e)}")
        return False

def send_all_alerts(subject, message, location_info=None, attachment_path=None, video_path=None):
    """Unified alert function"""
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    
    full_message = f"{message}\n\nTimestamp: {timestamp}"
    if location_info:
        if isinstance(location_info, dict):
            full_message += (f"\nLocation: {location_info.get('address', 'Unknown')}"
                           f"\nCoordinates: {location_info.get('latitude')}, {location_info.get('longitude')}"
                           f"\nGoogle Maps: https://maps.google.com/?q={location_info.get('latitude')},{location_info.get('longitude')}")
        else:
            full_message += f"\nLocation: {location_info}"
    
    # Send email
    if enable_email:
        send_email_alert(subject, full_message, attachment_path, video_path)
    
    # Send AWS SMS
    if enable_sms and RECIPIENT_PHONE_NUMBER:
        send_sms_aws(RECIPIENT_PHONE_NUMBER, f"ACCIDENT ALERT: {subject}\n{full_message}")
    
    # Emergency calls would go here (if implementing Amazon Connect)

# Sidebar Configuration
st.sidebar.title("Settings")

# Model Settings
confidence_threshold = st.sidebar.slider(
    "Confidence Threshold", 0.1, 0.9, 0.3, 0.05
)

frame_skip = st.sidebar.selectbox(
    "Frame Processing Rate",
    options=[1, 2, 3, 4],
    index=1,
    help="Higher values process fewer frames"
)

# Alert Settings
st.sidebar.subheader("Alert Settings")
enable_email = st.sidebar.checkbox("Enable Email Alerts", True)
enable_sms = st.sidebar.checkbox("Enable SMS Alerts", True)
enable_emergency_calls = st.sidebar.checkbox(
    "Enable Emergency Calls", 
    False,
    help="Will automatically call emergency services"
)

# Location Settings
st.sidebar.subheader("Location Settings")
location_method = st.sidebar.radio(
    "Location Method",
    ("Automatic (GPS)", "Manual"),
    help="Automatic requires location permissions"
)

manual_location = ""
if location_method == "Manual":
    manual_location = st.sidebar.text_input(
        "Location Description", 
        "Intersection of Main St & 5th Ave"
    )

# Phone Number Configuration
RECIPIENT_PHONE_NUMBER = st.sidebar.text_input(
    "Recipient Phone Number",
    "+911234567890",
    help="Include country code"
)

# Main Application UI
st.title("🚨 Accident Detection & Alert System")
st.markdown("""
    <style>
    .object-info {
        padding: 10px;
        margin: 5px 0;
        border-radius: 5px;
        background-color: #f0f2f6;
    }
    </style>
""", unsafe_allow_html=True)

def get_object_details(boxes):
    """Extract detailed information about detected objects"""
    details = []
    for box in boxes:
        obj = {
            "type": model.names[int(box.cls)],
            "confidence": float(box.conf),
            "coordinates": {
                "x1": float(box.xyxy[0][0]),
                "y1": float(box.xyxy[0][1]),
                "x2": float(box.xyxy[0][2]),
                "y2": float(box.xyxy[0][3])
            }
        }
        details.append(obj)
    return details

def process_image(uploaded_file):
    """Process image with detailed object information"""
    try:
        image = Image.open(uploaded_file)
        
        with st.spinner("🔍 Analyzing image..."):
            results = model.predict(image, conf=confidence_threshold)
            
            col1, col2 = st.columns(2)
            with col1:
                st.image(image, caption="Original Image", use_container_width=True)
            
            with col2:
                if len(results[0].boxes) > 0:
                    st.warning("🚨 ACCIDENT DETECTED")
                    result_img = Image.fromarray(results[0].plot()[:, :, ::-1])
                    st.image(result_img, caption="Processed Result", use_container_width=True)
                    
                    st.subheader("Detected Objects:")
                    objects = get_object_details(results[0].boxes)
                    for obj in objects:
                        st.markdown(f"""
                            <div class="object-info">
                                <b>Type:</b> {obj['type']}<br>
                                <b>Confidence:</b> {obj['confidence']:.2f}<br>
                                <b>Bounding Box:</b> ({obj['coordinates']['x1']:.0f}, {obj['coordinates']['y1']:.0f}) to ({obj['coordinates']['x2']:.0f}, {obj['coordinates']['y2']:.0f})
                            </div>
                        """, unsafe_allow_html=True)
                    
                    location = get_current_location() if location_method == "Automatic (GPS)" else manual_location
                    
                    if enable_email or enable_sms or enable_emergency_calls:
                        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                        alert_msg = f"Accident detected in image at {timestamp}"
                        
                        result_path = "accident_detected.jpg"
                        result_img.save(result_path)
                        
                        send_all_alerts(
                            "🚨 Accident Detected",
                            alert_msg,
                            location_info=location,
                            attachment_path=result_path
                        )
                        
                        os.unlink(result_path)
                else:
                    st.success("✅ NO ACCIDENT DETECTED")
                    st.image(image, caption="No detections", use_container_width=True)
    
    except Exception as e:
        st.error(f"Error processing image: {str(e)}")

def process_video(uploaded_file):
    """Process video with frame-by-frame object information"""
    try:
        if uploaded_file.size > MAX_VIDEO_SIZE_MB * 1024 * 1024:
            st.error(f"File too large. Maximum size is {MAX_VIDEO_SIZE_MB}MB")
            return
        
        st.video(uploaded_file)
        
        if st.button("Process Video with Details", type="primary"):
            with temp_video_file(uploaded_file) as input_path:
                cap = cv2.VideoCapture(input_path)
                if not cap.isOpened():
                    st.error("Error opening video file")
                    return
                
                frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
                frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
                fps = int(cap.get(cv2.CAP_PROP_FPS))
                total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
                
                output_path = "processed_video.mp4"
                fourcc = cv2.VideoWriter_fourcc(*'mp4v')
                out = cv2.VideoWriter(output_path, fourcc, fps, (frame_width, frame_height))
                
                progress_bar = st.progress(0)
                status_text = st.empty()
                result_placeholder = st.empty()
                details_placeholder = st.empty()
                
                frame_count = 0
                processed_count = 0
                accident_frames = []
                
                try:
                    while cap.isOpened():
                        ret, frame = cap.read()
                        if not ret:
                            break
                        
                        frame_count += 1
                        
                        if frame_count % frame_skip != 0:
                            continue
                            
                        results = model.predict(frame, conf=confidence_threshold)
                        annotated_frame = results[0].plot()
                        out.write(annotated_frame)
                        processed_count += 1
                        
                        if len(results[0].boxes) > 0:
                            objects = get_object_details(results[0].boxes)
                            accident_frames.append({
                                "frame_number": frame_count,
                                "time_seconds": frame_count/fps,
                                "objects": objects
                            })
                        
                        progress = frame_count / total_frames
                        progress_bar.progress(progress)
                        status_text.text(f"📊 Processing frame {frame_count}/{total_frames}")
                    
                    cap.release()
                    out.release()
                    
                    if accident_frames:
                        result_placeholder.warning(f"🚨 ACCIDENTS DETECTED IN {len(accident_frames)} FRAMES")
                        
                        with st.expander("📝 Detailed Accident Report", expanded=True):
                            st.write(f"Total frames with accidents: {len(accident_frames)}")
                            
                            for accident in accident_frames:
                                st.markdown(f"""
                                    ### Frame {accident['frame_number']} (Time: {accident['time_seconds']:.1f}s)
                                """)
                                
                                for obj in accident['objects']:
                                    st.markdown(f"""
                                        <div class="object-info">
                                            <b>Object Type:</b> {obj['type']}<br>
                                            <b>Confidence:</b> {obj['confidence']:.2f}<br>
                                            <b>Bounding Box:</b> ({obj['coordinates']['x1']:.0f}, {obj['coordinates']['y1']:.0f}) to ({obj['coordinates']['x2']:.0f}, {obj['coordinates']['y2']:.0f})
                                        </div>
                                    """, unsafe_allow_html=True)
                                st.markdown("---")
                        
                        location = get_current_location() if location_method == "Automatic (GPS)" else manual_location
                        
                        if enable_email or enable_sms or enable_emergency_calls:
                            timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                            time_list = [f"{a['time_seconds']:.1f}s" for a in accident_frames]
                            alert_msg = (
                                f"Accidents detected in video at {timestamp}\n"
                                f"Occurred at: {', '.join(time_list)}"
                            )
                            
                            send_all_alerts(
                                "🚨 Multiple Accidents Detected",
                                alert_msg,
                                location_info=location,
                                video_path=output_path
                            )
                    else:
                        result_placeholder.success("✅ NO ACCIDENTS DETECTED IN VIDEO")
                    
                    st.video(output_path)
                    
                    with open(output_path, "rb") as f:
                        st.download_button(
                            label="Download Processed Video",
                            data=f,
                            file_name="processed_video.mp4",
                            mime="video/mp4"
                        )
                
                finally:
                    if 'cap' in locals() and cap.isOpened():
                        cap.release()
                    if 'out' in locals():
                        out.release()
                    try:
                        os.unlink(output_path)
                    except:
                        pass
    
    except Exception as e:
        st.error(f"Error processing video: {str(e)}")

# Main Application Flow
option = st.radio(
    "Select input type:",
    ("Image", "Video"),
    horizontal=True
)

if option == "Image":
    uploaded_file = st.file_uploader(
        "Upload an image",
        type=SUPPORTED_IMAGE_TYPES,
        help="Supported formats: JPG, JPEG, PNG"
    )
    if uploaded_file is not None:
        process_image(uploaded_file)
else:
    uploaded_file = st.file_uploader(
        "Upload a video",
        type=SUPPORTED_VIDEO_TYPES,
        help=f"Supported formats: MP4, AVI, MOV (Max {MAX_VIDEO_SIZE_MB}MB)"
    )
    if uploaded_file is not None:
        process_video(uploaded_file)

# Security warning
st.warning("""
**Important Security Notice:**
- This implementation contains sensitive credentials
- For production use:
  - Use AWS IAM roles instead of access keys
  - Store secrets in AWS Secrets Manager
  - Enable AWS CloudTrail logging
  - Comply with local emergency service regulations
""")

Overwriting Accident.py
