In [None]:
# Data Analysis and Root Cause Identification for Security Systems (CCTV, Access Control, Intercom)

import psycopg2
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime

# Set seaborn style for better visuals
sns.set(style="whitegrid")

# PostgreSQL connection details
DB_HOST = 'localhost'
DB_PORT = '5432'
DB_NAME = 'security_systems'
DB_USER = 'your_user'
DB_PASSWORD = 'your_password'

# Function to load data from PostgreSQL
def load_data_from_db(query):
    try:
        conn = psycopg2.connect(
            host=DB_HOST,
            port=DB_PORT,
            dbname=DB_NAME,
            user=DB_USER,
            password=DB_PASSWORD
        )
        df = pd.read_sql_query(query, conn)
        conn.close()
        return df
    except Exception as e:
        print(f"Error connecting to the PostgreSQL database: {e}")
        return None

# Load data from CCTV, Access Control, and Intercom systems
cctv_data = load_data_from_db("SELECT * FROM cctv_logs")
access_data = load_data_from_db("SELECT * FROM access_control_logs")
intercom_data = load_data_from_db("SELECT * FROM intercom_logs")

# Convert timestamp columns to datetime objects
cctv_data['timestamp'] = pd.to_datetime(cctv_data['timestamp'])
access_data['timestamp'] = pd.to_datetime(access_data['timestamp'])
intercom_data['timestamp'] = pd.to_datetime(intercom_data['timestamp'])

# Preview the data
print("CCTV Data:")
print(cctv_data.head())
print("\nAccess Control Data:")
print(access_data.head())
print("\nIntercom Data:")
print(intercom_data.head())


In [None]:
# --- Phase 1: Exploratory Data Analysis (EDA) ---

# 1. CCTV Motion Detection Analysis
def analyze_cctv_motion_detection(df):
    """Analyze motion detection events from CCTV data."""
    df['motion_detected'] = df['motion_detected'].astype(int)
    
    # Plot the number of motion detected events over time
    plt.figure(figsize=(12, 6))
    sns.lineplot(data=df, x='timestamp', y='motion_detected', marker='o')
    plt.title('Motion Detection Events Over Time (CCTV)', fontsize=14)
    plt.xlabel('Timestamp', fontsize=12)
    plt.ylabel('Motion Detected', fontsize=12)
    plt.xticks(rotation=45)
    plt.grid(True)
    plt.tight_layout()
    plt.show()

    # Motion detection count by camera
    motion_count_by_camera = df.groupby('camera_id')['motion_detected'].sum().reset_index()
    plt.figure(figsize=(12, 6))
    sns.barplot(data=motion_count_by_camera, x='camera_id', y='motion_detected', palette='Blues_d')
    plt.title('Total Motion Detection Events by Camera', fontsize=14)
    plt.xlabel('Camera ID', fontsize=12)
    plt.ylabel('Motion Detected', fontsize=12)
    plt.xticks(rotation=90)
    plt.tight_layout()
    plt.show()

analyze_cctv_motion_detection(cctv_data)


In [None]:
# 2. CCTV Uptime Analysis
def analyze_cctv_uptime(df):
    """Analyze uptime status of CCTV cameras."""
    df['is_online'] = df['status'].apply(lambda x: 1 if x == 'online' else 0)

    # Plot CCTV uptime over time
    plt.figure(figsize=(12, 6))
    sns.lineplot(data=df, x='timestamp', y='is_online', marker='o')
    plt.title('CCTV Camera Uptime Over Time', fontsize=14)
    plt.xlabel('Timestamp', fontsize=12)
    plt.ylabel('Online Status (1 = Online, 0 = Offline)', fontsize=12)
    plt.xticks(rotation=45)
    plt.grid(True)
    plt.tight_layout()
    plt.show()

    # Uptime percentage by camera
    uptime_by_camera = df.groupby('camera_id')['is_online'].mean().reset_index()
    plt.figure(figsize=(12, 6))
    sns.barplot(data=uptime_by_camera, x='camera_id', y='is_online', palette='Greens_d')
    plt.title('CCTV Camera Uptime Percentage by Camera', fontsize=14)
    plt.xlabel('Camera ID', fontsize=12)
    plt.ylabel('Uptime Percentage', fontsize=12)
    plt.xticks(rotation=90)
    plt.tight_layout()
    plt.show()

analyze_cctv_uptime(cctv_data)

In [None]:
# 3. Access Control Analysis
def analyze_access_control(df):
    """Analyze access control success and failure rates."""
    total_attempts = len(df)
    granted_access = df['access_granted'].sum()

    success_rate = (granted_access / total_attempts) * 100
    print(f"Access Control Success Rate: {success_rate:.2f}%")

    # Plot access control success/failure rates
    plt.figure(figsize=(6, 6))
    labels = 'Access Granted', 'Access Denied'
    sizes = [granted_access, total_attempts - granted_access]
    plt.pie(sizes, labels=labels, autopct='%1.1f%%', startangle=90, colors=['#4CAF50', '#FF5722'])
    plt.axis('equal')
    plt.title('Access Control Success Rate')
    plt.show()

    # Access attempts by door
    access_by_door = df.groupby('door_id')['access_granted'].sum().reset_index()
    plt.figure(figsize=(12, 6))
    sns.barplot(data=access_by_door, x='door_id', y='access_granted', palette='Purples_d')
    plt.title('Total Access Granted by Door', fontsize=14)
    plt.xlabel('Door ID', fontsize=12)
    plt.ylabel('Access Granted', fontsize=12)
    plt.xticks(rotation=90)
    plt.tight_layout()
    plt.show()

analyze_access_control(access_data)


In [None]:
# 4. Intercom System Analysis
def analyze_intercom_status(df):
    """Analyze intercom system status."""
    df['is_active'] = df['status'].apply(lambda x: 1 if x == 'active' else 0)

    # Plot intercom system status over time
    plt.figure(figsize=(12, 6))
    sns.lineplot(data=df, x='timestamp', y='is_active', marker='o')
    plt.title('Intercom System Active Status Over Time', fontsize=14)
    plt.xlabel('Timestamp', fontsize=12)
    plt.ylabel('Active Status (1 = Active, 0 = Inactive)', fontsize=12)
    plt.xticks(rotation=45)
    plt.grid(True)
    plt.tight_layout()
    plt.show()

    # Active percentage by intercom
    active_by_intercom = df.groupby('intercom_id')['is_active'].mean().reset_index()
    plt.figure(figsize=(12, 6))
    sns.barplot(data=active_by_intercom, x='intercom_id', y='is_active', palette='Oranges_d')
    plt.title('Intercom System Active Percentage by Intercom', fontsize=14)
    plt.xlabel('Intercom ID', fontsize=12)
    plt.ylabel('Active Percentage', fontsize=12)
    plt.xticks(rotation=90)
    plt.tight_layout()
    plt.show()

analyze_intercom_status(intercom_data)


In [None]:
# --- Phase 2: Root Cause Analysis ---

# 5. CCTV Failure Root Cause Analysis (Identify recurring offline periods)
def analyze_cctv_failures(df, threshold_minutes=10):
    """Identify cameras offline for extended periods and visualize the root causes."""
    offline_cameras = df[df['status'] == 'offline'].copy()
    offline_cameras['offline_duration'] = offline_cameras.groupby('camera_id')['timestamp'].diff().dt.total_seconds() / 60

    offline_anomalies = offline_cameras[offline_cameras['offline_duration'] > threshold_minutes]
    
    if not offline_anomalies.empty:
        print(f"Anomalies detected: {len(offline_anomalies)} events where cameras were offline for more than {threshold_minutes} minutes.")
        print(offline_anomalies[['timestamp', 'camera_id', 'offline_duration']])
        
        # Plot anomalies over time
        plt.figure(figsize=(12, 6))
        sns.lineplot(data=offline_anomalies, x='timestamp', y='offline_duration', hue='camera_id')
        plt.title(f'Offline Duration of CCTV Cameras (Threshold: {threshold_minutes} minutes)', fontsize=14)
        plt.xlabel('Timestamp', fontsize=12)
        plt.ylabel('Offline Duration (Minutes)', fontsize=12)
        plt.xticks(rotation=45)
        plt.grid(True)
        plt.tight_layout()
        plt.show()
    else:
        print(f"No anomalies detected where cameras were offline for more than {threshold_minutes} minutes.")
    
    return offline_anomalies

# Detect offline anomalies in CCTV cameras
cctv_anomalies = analyze_cctv_failures(cctv_data)


In [None]:
# 6. Access Control Bottleneck Identification
def analyze_access_control_bottlenecks(df):
    """Identify doors with high access denials and visualize the root causes."""
    access_denials = df[df['access_granted'] == 0]
    denials_by_door = access_denials.groupby('door_id').size().reset_index(name='denial_count')

    # Plot doors with the most access denials
    plt.figure(figsize=(12, 6))
    sns.barplot(data=denials_by_door, x='door_id', y='denial_count', palette='Reds_d')
    plt.title('Access Denials by Door', fontsize=14)
    plt.xlabel('Door ID', fontsize=12)
    plt.ylabel('Denial Count', fontsize=12)
    plt.xticks(rotation=90)
    plt.tight_layout()
    plt.show()

    # Display doors with highest denial rates
    print("Top doors with highest access denials:")
    print(denials_by_door.sort_values(by='denial_count', ascending=False).head())

# Identify bottlenecks in access control
analyze_access_control_bottlenecks(access_data)


In [None]:
# 7. Suggested Corrective Actions
def plot_corrective_actions():
    """Visualize corrective actions based on root cause analysis."""
    actions = ['Check Network Connectivity', 'Adjust Motion Sensitivity', 'Increase Maintenance', 'Review Access Logs']
    counts = [10, 8, 6, 7]  # Example counts of issues related to these actions

    plt.figure(figsize=(10, 6))
    sns.barplot(x=actions, y=counts, palette='coolwarm')
    plt.title('Suggested Corrective Actions Based on Root Cause Analysis', fontsize=14)
    plt.xlabel('Corrective Action', fontsize=12)
    plt.ylabel('Count of Related Issues', fontsize=12)
    plt.tight_layout()
    plt.show()

# Visualize suggested corrective actions
plot_corrective_actions()