# Cyber-Attack Detection in a Smart Home System

In [1]:
from main import AttackDetector
detector = AttackDetector()

### Normal Operations (should not trigger alerts)

In [2]:
normal_scenarios = [
    ("login_attempt", "USER", "user123", "192.168.1.100", {"success": True}),
    ("toggle_device", "USER", "user123", "192.168.1.100", {"device_id": "light_001", "action": "on"}),
    ("power_reading", "ADMIN", "admin123", "device_001", {"device_id": "device_001", "value": 95.0})
]

for event_name, user_role, user_id, source_id, context in normal_scenarios:
    threat = detector.instrument(event_name, user_role, user_id, source_id, context=context)
    print(f"  {event_name:15} | Role: {user_role:7} | Threat: {'YES' if threat else 'NO'}")

  login_attempt   | Role: USER    | Threat: NO
  toggle_device   | Role: USER    | Threat: NO
  power_reading   | Role: ADMIN   | Threat: NO


### Attack Scenarios (should trigger alerts)

In [3]:
# Test: Failed Login Attack
print("TESTING BRUTE FORCE LOGIN ATTACK:")

# Simulate 6 failed login attempts in quick succession
for i in range(6):
    threat = detector.instrument(
        "login_attempt", "USER", f"attacker{i}", "192.168.1.666",
        context={"success": False, "attempt": i+1}
    )
    print(f"  Login attempt {i+1:2} | Threat detected: {'YES' if threat else 'NO'}")



TESTING BRUTE FORCE LOGIN ATTACK:
  Login attempt  1 | Threat detected: NO
  Login attempt  2 | Threat detected: NO
  Login attempt  3 | Threat detected: NO
  Login attempt  4 | Threat detected: NO
  Login attempt  5 | Threat detected: NO
  Login attempt  6 | Threat detected: YES


In [4]:
# Test: Device Toggle Spam Attack
print("TESTING DEVICE TOGGLE SPAM ATTACK:")

# Simulate rapid device toggling
for i in range(12):
    threat = detector.instrument(
        "toggle_device", "USER", "spammer123", "192.168.1.777",
        context={"device_id": "light_002", "action": "toggle", "sequence": i+1}
    )
    print(f"  Toggle command {i+1:2} | Threat detected: {'YES' if threat else 'NO'}")



TESTING DEVICE TOGGLE SPAM ATTACK:
  Toggle command  1 | Threat detected: NO
  Toggle command  2 | Threat detected: NO
  Toggle command  3 | Threat detected: NO
  Toggle command  4 | Threat detected: NO
  Toggle command  5 | Threat detected: NO
  Toggle command  6 | Threat detected: NO
  Toggle command  7 | Threat detected: NO
  Toggle command  8 | Threat detected: NO
  Toggle command  9 | Threat detected: NO
  Toggle command 10 | Threat detected: NO
  Toggle command 11 | Threat detected: YES
  Toggle command 12 | Threat detected: YES


In [5]:
# Test: Power Anomaly Detection
print("TESTING POWER ANOMALY DETECTION:")

power_tests = [
    (90.0, "Normal reading"),
    (160.0, "Spike (160% of average)"),
    (-50.0, "Invalid negative value"),
    (300.0, "Extreme spike")
]

for power_value, description in power_tests:
    threat = detector.instrument(
        "power_reading", "USER", "monitoring", "device_001",
        context={"device_id": "device_001", "value": power_value}
    )
    print(f"  {description:25} | Value: {power_value:6.1f} | Threat: {'YES' if threat else 'NO'}")



TESTING POWER ANOMALY DETECTION:
  Normal reading            | Value:   90.0 | Threat: NO
  Spike (160% of average)   | Value:  160.0 | Threat: YES
  Invalid negative value    | Value:  -50.0 | Threat: YES
  Extreme spike             | Value:  300.0 | Threat: YES


### Role-aware filtering during business hours

In [6]:
from datetime import datetime
business_hour_time = datetime.now().replace(hour=10, minute=0, second=0)

# Test: Failed Login Attack
print("TESTING BRUTE FORCE LOGIN ATTACK:")

# Simulate 6 failed login attempts in quick succession during business hours
for i in range(6):
    threat = detector.instrument(
        "login_attempt", "ADMIN", f"admin{i}", "192.168.1.666", business_hour_time,
        context={"success": False, "attempt": i+1}
    )
    print(f"  Login attempt {i+1:2} | Threat detected: {'YES' if threat else 'NO'}")

# Test: Device Toggle Spam Attack
print("=" * 80)
print("TESTING DEVICE TOGGLE SPAM ATTACK:")

# Simulate rapid device toggling during business hours
for i in range(12):
    threat = detector.instrument(
        "toggle_device", "MANAGER", "manager123", "192.168.1.777", business_hour_time,
        context={"device_id": "light_002", "action": "toggle", "sequence": i+1}
    )
    print(f"  Toggle command {i+1:2} | Threat detected: {'YES' if threat else 'NO'}")

# Test: Power Anomaly Detection
print("=" * 80)
print("TESTING POWER ANOMALY DETECTION:")

power_tests = [
    (90.0, "Normal reading"),
    (160.0, "Spike (160% of average)"),
    (-50.0, "Invalid negative value"),
    (300.0, "Extreme spike")
]

for power_value, description in power_tests:
    threat = detector.instrument(
        "power_reading", "ADMIN", "monitoring", "device_001", business_hour_time,
        context={"device_id": "device_001", "value": power_value}
    )
    print(f"  {description:25} | Value: {power_value:6.1f} | Threat: {'YES' if threat else 'NO'}")

TESTING BRUTE FORCE LOGIN ATTACK:
  Login attempt  1 | Threat detected: NO
  Login attempt  2 | Threat detected: NO
  Login attempt  3 | Threat detected: NO
  Login attempt  4 | Threat detected: NO
  Login attempt  5 | Threat detected: NO
  Login attempt  6 | Threat detected: NO
TESTING DEVICE TOGGLE SPAM ATTACK:
  Toggle command  1 | Threat detected: NO
  Toggle command  2 | Threat detected: NO
  Toggle command  3 | Threat detected: NO
  Toggle command  4 | Threat detected: NO
  Toggle command  5 | Threat detected: NO
  Toggle command  6 | Threat detected: NO
  Toggle command  7 | Threat detected: NO
  Toggle command  8 | Threat detected: NO
  Toggle command  9 | Threat detected: NO
  Toggle command 10 | Threat detected: NO
  Toggle command 11 | Threat detected: NO
  Toggle command 12 | Threat detected: NO
TESTING POWER ANOMALY DETECTION:
  Normal reading            | Value:   90.0 | Threat: NO
  Spike (160% of average)   | Value:  160.0 | Threat: NO
  Invalid negative value    | Valu

### Testing non business hour scenarios in role-aware filtering

In [7]:
from datetime import datetime
non_business_hour_time = datetime.now().replace(hour=23, minute=0, second=0)

# Test: Failed Login Attack
print("TESTING BRUTE FORCE LOGIN ATTACK:")

# Simulate 6 failed login attempts in quick succession during business hours
for i in range(6):
    threat = detector.instrument(
        "login_attempt", "ADMIN", f"admin{i}", "192.168.1.666", non_business_hour_time,
        context={"success": False, "attempt": i+1}
    )
    print(f"  Login attempt {i+1:2} | Threat detected: {'YES' if threat else 'NO'}")

# Test: Device Toggle Spam Attack
print("=" * 80)
print("TESTING DEVICE TOGGLE SPAM ATTACK:")

# Simulate rapid device toggling during business hours
for i in range(12):
    threat = detector.instrument(
        "toggle_device", "MANAGER", "manager123", "192.168.1.777", non_business_hour_time,
        context={"device_id": "light_002", "action": "toggle", "sequence": i+1}
    )
    print(f"  Toggle command {i+1:2} | Threat detected: {'YES' if threat else 'NO'}")

# Test: Power Anomaly Detection
print("=" * 80)
print("TESTING POWER ANOMALY DETECTION:")

power_tests = [
    (90.0, "Normal reading"),
    (160.0, "Spike (160% of average)"),
    (-50.0, "Invalid negative value"),
    (300.0, "Extreme spike")
]

for power_value, description in power_tests:
    threat = detector.instrument(
        "power_reading", "ADMIN", "monitoring", "device_001", non_business_hour_time,
        context={"device_id": "device_001", "value": power_value}
    )
    print(f"  {description:25} | Value: {power_value:6.1f} | Threat: {'YES' if threat else 'NO'}")



TESTING BRUTE FORCE LOGIN ATTACK:
  Login attempt  1 | Threat detected: NO
  Login attempt  2 | Threat detected: NO
  Login attempt  3 | Threat detected: NO
  Login attempt  4 | Threat detected: NO
  Login attempt  5 | Threat detected: NO
  Login attempt  6 | Threat detected: YES
TESTING DEVICE TOGGLE SPAM ATTACK:
  Toggle command  1 | Threat detected: NO
  Toggle command  2 | Threat detected: NO
  Toggle command  3 | Threat detected: NO
  Toggle command  4 | Threat detected: NO
  Toggle command  5 | Threat detected: NO
  Toggle command  6 | Threat detected: NO
  Toggle command  7 | Threat detected: NO
  Toggle command  8 | Threat detected: NO
  Toggle command  9 | Threat detected: NO
  Toggle command 10 | Threat detected: NO
  Toggle command 11 | Threat detected: YES
  Toggle command 12 | Threat detected: YES
TESTING POWER ANOMALY DETECTION:
  Normal reading            | Value:   90.0 | Threat: NO
  Spike (160% of average)   | Value:  160.0 | Threat: YES
  Invalid negative value    | 

### Additional anomaly detection functions

1. Detect suspicious command patterns

In [8]:
# Test: Suspicious command execution
print("TESTING SUSPICIOUS COMMAND EXECUTION:")
suspicious_commands = [
    ("execute_command", "USER", "admin123", "192.168.1.888", {"command": "rm -rf /"}),
    ("execute_command", "USER", "user123", "192.168.1.999", {"command": "sudo su"}),
    ("execute_command", "USER", "guest123", "192.168.1.100", {"command": "cat /etc/passwd"}),
    ("execute_command", "USER", "malicious_user", "192.168.1.101", {"command": "curl http://malicious.com"}),
    ("execute_command", "USER", "normaluser", "192.168.1.102", {"command": "ls -la"}),
    ("execute_command", "USER", "admin123", "192.168.1.103", {"command": "git status"})
]

for command, role, user, source, context in suspicious_commands:
    threat = detector.instrument(
        command, role, user, source, context=context
    )
    print(f"  Command: {context['command']} | Threat detected: {'YES' if threat else 'NO'}")



TESTING SUSPICIOUS COMMAND EXECUTION:
  Command: rm -rf / | Threat detected: YES
  Command: sudo su | Threat detected: YES
  Command: cat /etc/passwd | Threat detected: YES
  Command: curl http://malicious.com | Threat detected: YES
  Command: ls -la | Threat detected: NO
  Command: git status | Threat detected: NO


In [9]:
# Testing role-aware filtering in business and non-business hours
from datetime import datetime
business_hour_time = datetime.now().replace(hour=10, minute=0, second=0)
non_business_hour_time = datetime.now().replace(hour=23, minute=0, second=0)
suspicious_commands = [
    ("execute_command", "ADMIN", "admin123", "192.168.1.888", {"command": "rm -rf /"}),
    ("execute_command", "MANAGER", "user123", "192.168.1.999", {"command": "sudo su"}),
    ("execute_command", "ADMIN", "guest123", "192.168.1.100", {"command": "cat /etc/passwd"}),
    ("execute_command", "MANAGER", "malicious_user", "192.168.1.101", {"command": "curl http://malicious.com"}),
    ("execute_command", "ADMIN", "normaluser", "192.168.1.102", {"command": "ls -la"}),
    ("execute_command", "MANAGER", "admin123", "192.168.1.103", {"command": "git status"})
]

print("TESTING ROLE-AWARE FILTERING IN BUSINESS HOURS:")
for command, role, user, source, context in suspicious_commands:
    threat = detector.instrument(
        command, role, user, source, business_hour_time, context=context
    )
    print(f"  Command: {context['command']} | Threat detected: {'YES' if threat else 'NO'}")

print("TESTING ROLE-AWARE FILTERING IN NON-BUSINESS HOURS:")
for command, role, user, source, context in suspicious_commands:
    threat = detector.instrument(
        command, role, user, source, non_business_hour_time, context=context
    )
    print(f"  Command: {context['command']} | Threat detected: {'YES' if threat else 'NO'}")



TESTING ROLE-AWARE FILTERING IN BUSINESS HOURS:
  Command: rm -rf / | Threat detected: NO
  Command: sudo su | Threat detected: NO
  Command: cat /etc/passwd | Threat detected: NO
  Command: curl http://malicious.com | Threat detected: NO
  Command: ls -la | Threat detected: NO
  Command: git status | Threat detected: NO
TESTING ROLE-AWARE FILTERING IN NON-BUSINESS HOURS:
  Command: rm -rf / | Threat detected: YES
  Command: sudo su | Threat detected: YES
  Command: cat /etc/passwd | Threat detected: YES
  Command: curl http://malicious.com | Threat detected: YES
  Command: ls -la | Threat detected: NO
  Command: git status | Threat detected: NO


In [10]:
# Excessive normal command execution
print("TESTING EXCESSIVE NORMAL COMMAND EXECUTION:")
excessive_commands = [
    ("execute_command", "USER", "user123", "192.168.1.888", {"command": "git status"})
]

for i in range(7):
    threat = detector.instrument(
        "execute_command", "USER", "user123", "192.168.1.888",
        context={"command": "git status", "sequence": i+1}
    )
    print(f"  Command execution {i+1:2} | Threat detected: {'YES' if threat else 'NO'}")



TESTING EXCESSIVE NORMAL COMMAND EXECUTION:
  Command execution  1 | Threat detected: NO
  Command execution  2 | Threat detected: NO
  Command execution  3 | Threat detected: NO
  Command execution  4 | Threat detected: NO
  Command execution  5 | Threat detected: NO
  Command execution  6 | Threat detected: YES
  Command execution  7 | Threat detected: YES


2. Detect session hijacking

In [11]:
# Test: Session Hijacking Detection
print("TESTING SESSION HIJACKING DETECTION:")
print("   User-Agent helps detect when attackers use different tools/browsers")

# Scenario 1: Normal session usage (same IP, same user agent)
print("  Scenario 1: Normal session usage")
session_token = "session_abc123"
# Typical Chrome browser user agent
user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"

# Initial login
threat = detector.instrument(
    "login_attempt", "USER", "victim_user", "192.168.1.50",
    context={"success": True, "session_token": session_token, "user_agent": user_agent}
)
print(f"    Initial login     | Threat detected: {'YES' if threat else 'NO'}")

# Normal API call from same source
threat = detector.instrument(
    "api_call", "USER", "victim_user", "192.168.1.50",
    
    context={"endpoint": "/profile", "session_token": session_token, "user_agent": user_agent}
)
print(f"    Normal API call   | Threat detected: {'YES' if threat else 'NO'}")

TESTING SESSION HIJACKING DETECTION:
   User-Agent helps detect when attackers use different tools/browsers
  Scenario 1: Normal session usage
    Initial login     | Threat detected: NO
    Normal API call   | Threat detected: NO


In [12]:
# Scenario 2: Session hijacking - different IP, same session token
print("\n  Scenario 2: Session hijacking - different IP")

# Attacker uses the same session token from different IP
threat = detector.instrument(
    "api_call", "USER", "victim_user", "10.0.0.100",  # Different IP
    context={"endpoint": "/sensitive", "session_token": session_token, "user_agent": user_agent}
)
print(f"    Hijacked API call | Threat detected: {'YES' if threat else 'NO'}")




  Scenario 2: Session hijacking - different IP
    Hijacked API call | Threat detected: YES


In [13]:
# Scenario 3: Session hijacking with user agent change
print("\n  Scenario 3: Session hijacking - different IP and User-Agent")
print("             (Legitimate user vs. attacker using command-line tools)")

new_session = "session_xyz789"
# Normal browser user agent
original_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
# Suspicious command-line tool user agent
malicious_agent = "curl/7.68.0"

# Legitimate user establishes session
threat = detector.instrument(
    "login_attempt", "USER", "another_user", "192.168.1.75",
    context={"success": True, "session_token": new_session, "user_agent": original_agent}
)
print(f"    User login (browser)     | Threat detected: {'YES' if threat else 'NO'}")

# Attacker hijacks with different IP and User-Agent (curl tool)
threat = detector.instrument(
    "api_call", "USER", "another_user", "203.0.113.5",  # Different IP
    context={"endpoint": "/admin", "session_token": new_session, "user_agent": malicious_agent}
)
print(f"    Hijack (curl tool)       | Threat detected: {'YES' if threat else 'NO'}")
print(f"    User-Agent changed from browser to curl - highly suspicious!")




  Scenario 3: Session hijacking - different IP and User-Agent
             (Legitimate user vs. attacker using command-line tools)
    User login (browser)     | Threat detected: NO
    Hijack (curl tool)       | Threat detected: YES
    User-Agent changed from browser to curl - highly suspicious!


In [14]:
# Scenario 4: Rapid session switch (suspicious timing)
print("\n  Scenario 4: Rapid session location switch")

rapid_session = "session_rapid456"

# User activity from office
threat = detector.instrument(
    "api_call", "USER", "mobile_user", "192.168.1.200",
    context={"endpoint": "/data", "session_token": rapid_session, "user_agent": original_agent}
)
print(f"    Office activity   | Threat detected: {'YES' if threat else 'NO'}")

# Same session used from home immediately (suspicious)
threat = detector.instrument(
    "api_call", "USER", "mobile_user", "198.51.100.10",  # Different location
    context={"endpoint": "/files", "session_token": rapid_session, "user_agent": original_agent}
)
print(f"    Rapid location sw | Threat detected: {'YES' if threat else 'NO'}")




  Scenario 4: Rapid session location switch
    Office activity   | Threat detected: NO
    Rapid location sw | Threat detected: YES


In [15]:
# Testing role-aware filtering in business and non-business hours
from datetime import datetime
business_hour_time = datetime.now().replace(hour=10, minute=0, second=0)
non_business_hour_time = datetime.now().replace(hour=23, minute=0, second=0)

# Scenario 4: Rapid session switch (suspicious timing)
print("\n  Scenario 4: Rapid session location switch")

rapid_session = "session_rapid456"

# User activity from office
threat = detector.instrument(
    "api_call", "ADMIN", "mobile_user", "192.168.1.200", business_hour_time,
    context={"endpoint": "/data", "session_token": rapid_session, "user_agent": original_agent}
)
print(f"    Office activity   | Threat detected: {'YES' if threat else 'NO'}")

# Same session used from home immediately (suspicious)
threat = detector.instrument(
    "api_call", "ADMIN", "mobile_user", "198.51.100.10", business_hour_time, # Different location
    context={"endpoint": "/files", "session_token": rapid_session, "user_agent": original_agent}
)
print(f"    Rapid location sw | Threat detected: {'YES' if threat else 'NO'}")

# Scenario 4: Rapid session switch (suspicious timing)
print("\n  Scenario 4: Rapid session location switch")

rapid_session = "session_rapid456"

# User activity from office
threat = detector.instrument(
    "api_call", "ADMIN", "mobile_user", "192.168.1.200", non_business_hour_time,
    context={"endpoint": "/data", "session_token": rapid_session, "user_agent": original_agent}
)
print(f"    Office activity   | Threat detected: {'YES' if threat else 'NO'}")

# Same session used from home immediately (suspicious)
threat = detector.instrument(
    "api_call", "ADMIN", "mobile_user", "198.51.100.10", non_business_hour_time, # Different location
    context={"endpoint": "/files", "session_token": rapid_session, "user_agent": original_agent}
)
print(f"    Rapid location sw | Threat detected: {'YES' if threat else 'NO'}")




  Scenario 4: Rapid session location switch
    Office activity   | Threat detected: NO
    Rapid location sw | Threat detected: NO

  Scenario 4: Rapid session location switch




    Office activity   | Threat detected: YES
    Rapid location sw | Threat detected: YES
