# AI Surveillance System - Google Colab Version
# This combines all  modules into a single notebook-friendly implementation


In [1]:

import cv2
import os
import pickle
import numpy as np
import time
from datetime import datetime
import argparse
from google.colab.patches import cv2_imshow
from google.colab import files
import matplotlib.pyplot as plt
from IPython.display import display, HTML, clear_output
import ipywidgets as widgets
from threading import Thread
import base64
from io import BytesIO
from PIL import Image


# ====================================================================
# UTILS MODULE
# ======================================================================


In [3]:
def load_env_file(file_path):
    """Load environment variables from a file."""
    config = {}
    try:
        with open(file_path, 'r') as f:
            for line in f:
                line = line.strip()
                if not line or line.startswith('#'):
                    continue
                key, value = line.split('=', 1)
                config[key] = value
        return config
    except Exception as e:
        print(f"Error loading config: {e}")
        return {}


In [4]:

def parse_coordinates(coord_str):
    """Parse coordinates from string to tuple."""
    if not coord_str or coord_str.lower() == 'none':
        return None
    try:
        coords = list(map(int, coord_str.split(',')))
        if len(coords) == 2:
            return tuple(coords)
        elif len(coords) == 4:
            return (tuple(coords[:2]), tuple(coords[2:]))
        else:
            return None
    except (ValueError, TypeError):
        return None


In [5]:

def ensure_directory(directory):
    """Ensure a directory exists, create if it doesn't."""
    if not os.path.exists(directory):
        os.makedirs(directory)
        print(f"Created directory: {directory}")
    return directory


In [6]:
def get_timestamp(format_str="%Y-%m-%d %H:%M:%S"):
    """Get current timestamp as string."""
    return datetime.now().strftime(format_str)


In [7]:

def get_file_timestamp():
    """Get timestamp format suitable for filenames."""
    return datetime.now().strftime("%Y%m%d_%H%M%S")


In [8]:

def draw_timestamp(frame, timestamp=None):
    """Draw timestamp on the frame."""
    if timestamp is None:
        timestamp = get_timestamp()
    cv2.putText(frame, timestamp, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
    return frame


In [9]:

def draw_roi(frame, roi_start, roi_end):
    """Draw ROI rectangle on the frame."""
    if roi_start is not None and roi_end is not None:
        cv2.rectangle(frame, roi_start, roi_end, (0, 255, 0), 2)
    return frame


In [10]:

def extract_roi(frame, roi_start, roi_end):
    """Extract the region of interest from frame."""
    if roi_start is None or roi_end is None:
        return frame
    x1, y1 = roi_start
    x2, y2 = roi_end
    h, w = frame.shape[:2]
    x1 = max(0, min(x1, w-1))
    y1 = max(0, min(y1, h-1))
    x2 = max(0, min(x2, w-1))
    y2 = max(0, min(y2, h-1))
    return frame[y1:y2, x1:x2]


In [11]:

def save_frame(frame, directory, prefix="detection"):
    """Save a frame to disk with timestamp in filename."""
    timestamp = get_file_timestamp()
    filename = f"{prefix}_{timestamp}.jpg"
    filepath = os.path.join(directory, filename)
    ensure_directory(directory)
    cv2.imwrite(filepath, frame)
    print(f"Saved: {filepath}")
    return filepath


In [12]:

def str_to_bool(value):
    """Convert string value to boolean."""
    if isinstance(value, bool):
        return value
    if value.lower() in ('true', 'yes', '1', 'y'):
        return True
    elif value.lower() in ('false', 'no', '0', 'n'):
        return False
    else:
        return False



# ====================================================================
# LOGGER MODULE
# ======================================================================


In [13]:

class Logger:
    """Simple logging utility."""
    def __init__(self, log_file=None, output_dir='surveillance_footage'):
        self.output_dir = ensure_directory(output_dir)
        if log_file is None:
            log_file = os.path.join(self.output_dir, 'security_log.txt')
        self.log_file = log_file

    def log(self, event_type, details=None):
        """Log an event to file and console."""
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        details_str = f" - {details}" if details else ""
        log_entry = f"[{timestamp}] {event_type}{details_str}\n"
        print(log_entry, end="")
        try:
            with open(self.log_file, 'a') as f:
                f.write(log_entry)
        except Exception as e:
            print(f"Error writing to log file: {e}")



# Global logger

In [14]:

security_logger = None

def init_logger(log_file=None, output_dir='surveillance_footage'):
    """Initialize the global logger."""
    global security_logger
    security_logger = Logger(log_file, output_dir)
    return security_logger


In [15]:

def log_event(event_type, details=None):
    """Log an event using the global logger."""
    global security_logger
    if security_logger is None:
        security_logger = Logger()
    security_logger.log(event_type, details)



# ===================================================================
# MOTION DETECTION MODULE
# =====================================================================


In [None]:

bg_subtractor = None

def init_motion_detector():
    """Initialize the motion detector."""
    global bg_subtractor
    bg_subtractor = cv2.createBackgroundSubtractorMOG2(history=500, varThreshold=16, detectShadows=True)
    log_event("INIT", "Motion detector initialized")
    return bg_subtractor


In [16]:

def detect_motion(frame, roi_start=None, roi_end=None, min_area=5000):
    """Detect motion in the frame or region of interest."""
    global bg_subtractor
    if bg_subtractor is None:
        bg_subtractor = init_motion_detector()

    if roi_start is not None and roi_end is not None:
        roi_frame = extract_roi(frame, roi_start, roi_end)
    else:
        roi_frame = frame

    fg_mask = bg_subtractor.apply(roi_frame)
    _, thresh = cv2.threshold(fg_mask, 25, 255, cv2.THRESH_BINARY)

    kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
    thresh = cv2.morphologyEx(thresh, cv2.MORPH_OPEN, kernel)
    thresh = cv2.morphologyEx(thresh, cv2.MORPH_CLOSE, kernel)
    thresh = cv2.dilate(thresh, kernel, iterations=2)

    contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    motion_detected = False
    motion_contours = []

    for contour in contours:
        area = cv2.contourArea(contour)
        if area > min_area:
            motion_detected = True
            motion_contours.append(contour)

    return motion_detected, thresh, motion_contours


In [17]:

def draw_motion_contours(frame, contours, roi_start=None, roi_end=None):
    """Draw motion contours on the frame."""
    if not contours:
        return frame

    result = frame.copy()

    if roi_start is not None:
        x_offset, y_offset = roi_start
        for contour in contours:
            shifted_contour = contour + np.array([x_offset, y_offset])
            cv2.drawContours(result, [shifted_contour], -1, (0, 255, 255), 2)
    else:
        cv2.drawContours(result, contours, -1, (0, 255, 255), 2)

    return result



# ====================================================================
# FACE DETECTION MODULE
# ======================================================================


In [18]:

face_detector = None
face_recognition_loaded = False
known_face_encodings = []
known_face_names = []


In [19]:

def init_face_detector():
    """Initialize the face detector."""
    global face_detector
    face_cascade_path = os.path.join(cv2.data.haarcascades, 'haarcascade_frontalface_default.xml')
    face_detector = cv2.CascadeClassifier(face_cascade_path)

    if face_detector.empty():
        log_event("ERROR", "Failed to load face detector")
        return None

    log_event("INIT", "Face detector initialized")
    return face_detector


In [20]:

def load_face_recognition():
    """Load face recognition library if available."""
    global face_recognition_loaded
    try:
        global face_recognition
        import face_recognition
        face_recognition_loaded = True
        log_event("INIT", "Face recognition library loaded")
        return True
    except ImportError:
        log_event("WARNING", "face_recognition library not available. Installing...")
        # Install face_recognition in Colab
        os.system("pip install face_recognition")
        try:
            import face_recognition
            face_recognition_loaded = True
            log_event("INIT", "Face recognition library installed and loaded")
            return True
        except ImportError:
            log_event("WARNING", "face_recognition library installation failed. Using basic detection only.")
            return False


In [21]:

def load_known_faces(known_faces_path='known_faces'):
    """Load known faces from the database."""
    global known_face_encodings, known_face_names, face_recognition_loaded

    ensure_directory(known_faces_path)

    if not face_recognition_loaded and not load_face_recognition():
        log_event("WARNING", "Face recognition not available. Cannot load known faces.")
        return False

    encodings_file = os.path.join(known_faces_path, "face_encodings.pickle")

    if os.path.exists(encodings_file):
        log_event("INFO", "Loading pre-computed face encodings...")
        try:
            with open(encodings_file, "rb") as f:
                data = pickle.load(f)
                known_face_encodings = data["encodings"]
                known_face_names = data["names"]
            log_event("INFO", f"Loaded {len(known_face_names)} known faces")
            return True
        except Exception as e:
            log_event("ERROR", f"Failed to load face encodings: {e}")

    log_event("INFO", "No pre-computed encodings found. Processing image files...")
    known_face_encodings = []
    known_face_names = []

    for filename in os.listdir(known_faces_path):
        if filename.endswith(('.jpg', '.jpeg', '.png')):
            name = os.path.splitext(filename)[0].replace('_', ' ').title()
            image_path = os.path.join(known_faces_path, filename)

            try:
                image = face_recognition.load_image_file(image_path)
                face_locations = face_recognition.face_locations(image)

                if face_locations:
                    face_encoding = face_recognition.face_encodings(image, [face_locations[0]])[0]
                    known_face_encodings.append(face_encoding)
                    known_face_names.append(name)
                    log_event("INFO", f"Loaded face: {name}")
                else:
                    log_event("WARNING", f"No face detected in {filename}")
            except Exception as e:
                log_event("ERROR", f"Error processing {filename}: {e}")

    if known_face_encodings:
        try:
            log_event("INFO", "Saving face encodings to file...")
            data = {"encodings": known_face_encodings, "names": known_face_names}
            with open(encodings_file, "wb") as f:
                pickle.dump(data, f)
        except Exception as e:
            log_event("ERROR", f"Failed to save face encodings: {e}")

    log_event("INFO", f"Loaded {len(known_face_names)} known faces")
    return len(known_face_names) > 0


In [22]:
def detect_faces(frame):
    """Basic face detection using Haar cascade."""
    global face_detector
    if face_detector is None:
        face_detector = init_face_detector()

    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    faces = face_detector.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30))
    return faces


In [23]:

def recognize_faces(frame, threshold=0.6):
    """Detect and recognize faces in the frame."""
    global face_recognition_loaded, known_face_encodings, known_face_names

    display_frame = frame.copy()
    detected_faces = []

    if not face_recognition_loaded:
        if not load_face_recognition():
            faces = detect_faces(frame)
            for (x, y, w, h) in faces:
                cv2.rectangle(display_frame, (x, y), (x+w, y+h), (0, 0, 255), 2)
                name = "Unknown"
                cv2.putText(display_frame, name, (x, y-10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 0, 255), 2)
                detected_faces.append({'name': name, 'location': (x, y, x+w, y+h), 'recognized': False})
            return display_frame, detected_faces

    if not known_face_encodings:
        load_known_faces()

    small_frame = cv2.resize(frame, (0, 0), fx=0.25, fy=0.25)
    rgb_small_frame = cv2.cvtColor(small_frame, cv2.COLOR_BGR2RGB)

    face_locations = face_recognition.face_locations(rgb_small_frame)
    face_encodings = face_recognition.face_encodings(rgb_small_frame, face_locations)

    for (top, right, bottom, left), face_encoding in zip(face_locations, face_encodings):
        top *= 4
        right *= 4
        bottom *= 4
        left *= 4

        matches = []
        name = "Unknown"

        if known_face_encodings:
            matches = face_recognition.compare_faces(known_face_encodings, face_encoding, tolerance=threshold)
            if True in matches:
                first_match_index = matches.index(True)
                name = known_face_names[first_match_index]

        cv2.rectangle(display_frame, (left, top), (right, bottom), (0, 0, 255), 2)
        cv2.rectangle(display_frame, (left, bottom - 35), (right, bottom), (0, 0, 255), cv2.FILLED)
        cv2.putText(display_frame, name, (left + 6, bottom - 6), cv2.FONT_HERSHEY_DUPLEX, 1.0, (255, 255, 255), 1)

        detected_faces.append({'name': name, 'location': (left, top, right, bottom), 'recognized': name != "Unknown"})

    return display_frame, detected_faces



# ====================================================================
# MAIN SURVEILLANCE SYSTEM
# ======================================================================


In [24]:

class SurveillanceSystem:
    def __init__(self):
        self.config = self.load_default_config()
        self.running = False
        self.motion_counter = 0
        self.last_alert_time = 0

        # Initialize components
        init_logger()
        log_event("SYSTEM", "Surveillance System Initialized")

    def load_default_config(self):
        """Load default configuration for Colab environment."""
        return {
            'camera_source': 0,  # Use webcam
            'roi_start_point': None,
            'roi_end_point': None,
            'detection_mode': 'both',
            'min_contour_area': 5000,
            'persistence_threshold': 5,
            'alert_interval': 10,
            'face_recognition_enabled': True,
            'face_recognition_threshold': 0.6,
            'output_dir': 'surveillance_footage'
        }

    def process_frame(self, frame):
        """Process a single frame for motion and face detection."""
        result_frame = frame.copy()
        alerts = []

        # Add timestamp
        result_frame = draw_timestamp(result_frame)

        # Draw ROI if specified
        if self.config['roi_start_point'] and self.config['roi_end_point']:
            result_frame = draw_roi(result_frame, self.config['roi_start_point'], self.config['roi_end_point'])

        # Motion detection
        if self.config['detection_mode'] in ['motion', 'both']:
            motion_detected, motion_mask, motion_contours = detect_motion(
                frame,
                self.config['roi_start_point'],
                self.config['roi_end_point'],
                self.config['min_contour_area']
            )

            if motion_detected:
                self.motion_counter += 1
                result_frame = draw_motion_contours(result_frame, motion_contours,
                                                 self.config['roi_start_point'], self.config['roi_end_point'])

                if self.motion_counter >= self.config['persistence_threshold']:
                    current_time = time.time()
                    if current_time - self.last_alert_time > self.config['alert_interval']:
                        alerts.append("MOTION DETECTED")
                        log_event("ALERT", "Motion detected")
                        save_frame(result_frame, self.config['output_dir'], "motion")
                        self.last_alert_time = current_time
            else:
                self.motion_counter = 0

        # Face detection/recognition
        if self.config['detection_mode'] in ['face', 'both'] and self.config['face_recognition_enabled']:
            result_frame, detected_faces = recognize_faces(result_frame, self.config['face_recognition_threshold'])

            for face in detected_faces:
                if not face['recognized']:
                    alerts.append(f"UNKNOWN FACE DETECTED")
                    log_event("ALERT", f"Unknown face detected")
                    save_frame(result_frame, self.config['output_dir'], "unknown_face")
                else:
                    log_event("INFO", f"Known face detected: {face['name']}")

        return result_frame, alerts

    def run_on_image(self, image_path):
        """Process a single image."""
        frame = cv2.imread(image_path)
        if frame is None:
            print(f"Could not load image: {image_path}")
            return None

        processed_frame, alerts = self.process_frame(frame)

        print(f"Alerts: {alerts}")
        return processed_frame

    def setup_dataset(self, dataset_path):
        """Setup known faces from dataset."""
        if not os.path.exists(dataset_path):
            print(f"Dataset path {dataset_path} does not exist")
            return False

        known_faces_dir = ensure_directory('known_faces')

        # Copy some sample images from dataset to known_faces
        count = 0
        for person_dir in os.listdir(dataset_path):
            person_path = os.path.join(dataset_path, person_dir)
            if os.path.isdir(person_path) and count < 5:  # Limit to 5 people for demo
                images = [f for f in os.listdir(person_path) if f.endswith(('.jpg', '.jpeg', '.png'))]
                if images:
                    # Take first image for each person
                    src_image = os.path.join(person_path, images[0])
                    dst_image = os.path.join(known_faces_dir, f"{person_dir}.jpg")

                    # Copy the image
                    import shutil
                    shutil.copy2(src_image, dst_image)
                    print(f"Added {person_dir} to known faces")
                    count += 1

        # Load the known faces
        load_known_faces()
        return True



# ====================================================================
# COLAB INTERFACE FUNCTIONS
# ======================================================================


In [25]:

def demo_image_processing():
    """Demo function for processing uploaded images."""
    print("Upload an image to test the surveillance system:")
    uploaded = files.upload()

    if not uploaded:
        print("No file uploaded")
        return

    # Initialize surveillance system
    system = SurveillanceSystem()

    for filename in uploaded.keys():
        print(f"Processing {filename}...")

        # Save uploaded file
        with open(filename, 'wb') as f:
            f.write(uploaded[filename])

        # Process the image
        processed_frame = system.run_on_image(filename)

        if processed_frame is not None:
            # Display results
            plt.figure(figsize=(12, 8))
            plt.subplot(1, 2, 1)
            original = cv2.imread(filename)
            plt.imshow(cv2.cvtColor(original, cv2.COLOR_BGR2RGB))
            plt.title('Original Image')
            plt.axis('off')

            plt.subplot(1, 2, 2)
            plt.imshow(cv2.cvtColor(processed_frame, cv2.COLOR_BGR2RGB))
            plt.title('Processed Image')
            plt.axis('off')

            plt.tight_layout()
            plt.show()


In [26]:

def setup_lfw_dataset():
    """Setup Labeled Faces in the Wild dataset."""
    print("Setting up LFW dataset...")
    print("Please upload the LFW dataset or run this cell after downloading it:")
    print("!wget http://vis-www.cs.umass.edu/lfw/lfw.tgz")
    print("!tar -xzf lfw.tgz")

    # Check if dataset exists
    if os.path.exists('lfw'):
        system = SurveillanceSystem()
        system.setup_dataset('lfw')
        print("Dataset setup complete!")
    else:
        print("LFW dataset not found. Please download it first.")



# ====================================================================
# USAGE INSTRUCTIONS
# ======================================================================


In [27]:

def print_usage_instructions():
    """Print usage instructions for Colab."""
    print("""
    🚀 AI SURVEILLANCE SYSTEM - COLAB VERSION
    ==========================================

    Your surveillance system includes:
    ✅ Motion Detection using background subtraction
    ✅ Face Detection using Haar cascades
    ✅ Face Recognition using face_recognition library
    ✅ Configurable ROI (Region of Interest)
    ✅ Event logging and frame saving
    ✅ Modular architecture maintained

    QUICK START GUIDE:
    ------------------

    1. Test with an image:
       demo_image_processing()

    2. Setup LFW dataset for face recognition:
       setup_lfw_dataset()

    3. Create surveillance system instance:
       system = SurveillanceSystem()

    4. Process an image:
       frame = cv2.imread('your_image.jpg')
       processed, alerts = system.process_frame(frame)

    5. Check logs:
       !cat surveillance_footage/security_log.txt

    FILES CREATED:
    --------------
    📁 surveillance_footage/ - Saved detection frames and logs
    📁 known_faces/ - Database of known faces
    📄 security_log.txt - Event logs

    """)


# Initialize and show instructions


In [28]:
print_usage_instructions()


    🚀 AI SURVEILLANCE SYSTEM - COLAB VERSION
    
    Your surveillance system includes:
    ✅ Motion Detection using background subtraction
    ✅ Face Detection using Haar cascades  
    ✅ Face Recognition using face_recognition library
    ✅ Configurable ROI (Region of Interest)
    ✅ Event logging and frame saving
    ✅ Modular architecture maintained
    
    QUICK START GUIDE:
    ------------------
    
    1. Test with an image:
       demo_image_processing()
    
    2. Setup LFW dataset for face recognition:
       setup_lfw_dataset()
    
    3. Create surveillance system instance:
       system = SurveillanceSystem()
       
    4. Process an image:
       frame = cv2.imread('your_image.jpg')
       processed, alerts = system.process_frame(frame)
    
    5. Check logs:
       !cat surveillance_footage/security_log.txt
    
    FILES CREATED:
    --------------
    📁 surveillance_footage/ - Saved detection frames and logs
    📁 known_faces/ - Database of known faces
    📄 s

In [29]:
       demo_image_processing()


Upload an image to test the surveillance system:


Saving 3face1.jpg to 3face1.jpg
Created directory: surveillance_footage
[2025-05-30 14:04:33] SYSTEM - Surveillance System Initialized
Processing 3face1.jpg...
[2025-05-30 14:04:33] INIT - Motion detector initialized


RuntimeError: Error while calling cudaGetDevice(&the_device_id) in file /tmp/.tmpYAwLzy/sdists-v9/pypi/dlib/19.24.6/b_ZuI215-z7cRkD5Gta0t/src/dlib/cuda/gpu_data.cpp:204. code: 35, reason: CUDA driver version is insufficient for CUDA runtime version

In [None]:
       setup_lfw_dataset()


In [None]:
       system = SurveillanceSystem()
