# Intrusion Prevention System (IPS) based on this: https://github.com/TheKreator666/Ekpa-ai-cyber-security-course/blob/main/IDS_colab_notebook.ipynb

# In [48]: (notebook cell marker)
# ------------------ Imports ------------------
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import time
from datetime import datetime
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from sklearn.utils import class_weight
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import Sequential
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.layers import Input, Conv1D, GlobalMaxPooling1D, Dropout, Dense
import threading
import json

# ------------------ IPS Configuration ------------------
class IPSConfig:
    """Static policy tables consumed by the IPS action engine.

    Attributes:
        actions: maps each response action name to an on/off switch.
        threat_levels: maps a severity level to the attack label names
            classified at that level.
        response_rules: maps a severity level to the ordered list of
            response action names to run for it.
    """

    def __init__(self):
        # Every known response action starts enabled.
        self.actions = dict.fromkeys(
            (
                'BLOCK_TRAFFIC',
                'QUARANTINE_DEVICE',
                'ALERT_ADMIN',
                'ADJUST_FIREWALL',
                'LOG_INCIDENT',
            ),
            True,
        )

        # Severity -> attack labels. Fresh lists are built on every call so
        # instances never share mutable state.
        self.threat_levels = dict(
            CRITICAL=['Syn', 'WebDDoS', 'DrDoS_NTP', 'DrDoS_SSDP'],
            HIGH=['DrDoS_UDP', 'DrDoS_NetBIOS', 'DrDoS_DNS'],
            MEDIUM=['UDP-lag', 'TFTP', 'DrDoS_MSSQL'],
            LOW=['DrDoS_SNMP'],
        )

        # Severity -> ordered response actions.
        rule_table = (
            ('CRITICAL', ('BLOCK_TRAFFIC', 'QUARANTINE_DEVICE', 'ALERT_ADMIN', 'ADJUST_FIREWALL')),
            ('HIGH', ('BLOCK_TRAFFIC', 'ALERT_ADMIN', 'ADJUST_FIREWALL')),
            ('MEDIUM', ('ALERT_ADMIN', 'ADJUST_FIREWALL')),
            ('LOW', ('ALERT_ADMIN', 'LOG_INCIDENT')),
        )
        self.response_rules = {level: list(steps) for level, steps in rule_table}

# ------------------ IPS Action Engine ------------------
class IPSActionEngine:
    """Turn model predictions into simulated prevention actions and log them.

    The engine looks up the attack name for a predicted class index, grades
    it with ``config.threat_levels``, runs the enabled actions listed in
    ``config.response_rules`` and records an incident dict for every call.

    Args:
        config: object exposing ``actions``, ``threat_levels`` and
            ``response_rules`` dictionaries (see ``IPSConfig``).
        label_map: optional mapping of attack name -> class index. When
            omitted, the module-level ``label_map`` is used at call time,
            which keeps the original notebook usage working.
    """

    def __init__(self, config, label_map=None):
        self.config = config
        self.label_map = label_map
        self.incident_log = []
        self.blocked_ips = set()
        self.quarantined_devices = set()

    def determine_threat_level(self, attack_type):
        """Return the configured severity for ``attack_type`` ('LOW' if unlisted)."""
        for level, attacks in self.config.threat_levels.items():
            if attack_type in attacks:
                return level
        return 'LOW'

    def _attack_name(self, predicted_class):
        """Return the first attack name mapped to ``predicted_class``.

        Raises:
            ValueError: if no entry of the label map has that class index.
        """
        mapping = self.label_map
        if mapping is None:
            # Backward compatibility: fall back to the notebook-level global.
            mapping = label_map
        for name, class_id in mapping.items():
            if class_id == predicted_class:
                return name
        raise ValueError(
            f"Predicted class {predicted_class!r} has no entry in the label map"
        )

    @staticmethod
    def _simulated_source_ip(packet_data):
        """Build a fake 192.168.x.y address from the first two features.

        The octets are clamped to 0-255 so that features outside [0, 1]
        still produce a syntactically valid IPv4 address.
        """
        third, fourth = (
            min(255, max(0, int(packet_data[position] * 255)))
            for position in (0, 1)
        )
        return f"192.168.{third}.{fourth}"

    def execute_actions(self, packet_data, predicted_class, confidence, original_label):
        """Run the response actions for one prediction and log the incident.

        Args:
            packet_data: indexable feature vector; the first two entries are
                used to synthesize a source IP (simulation only).
            predicted_class: class index predicted by the model.
            confidence: prediction confidence, stored as a float.
            original_label: ground-truth class index, stored as an int.

        Returns:
            The incident dict that was appended to ``incident_log``.

        Raises:
            ValueError: if ``predicted_class`` is not present in the label map.
        """
        attack_type = self._attack_name(predicted_class)
        threat_level = self.determine_threat_level(attack_type)
        actions_to_take = self.config.response_rules.get(threat_level, ['LOG_INCIDENT'])

        # Extract source IP (assuming it's in the first few features)
        source_ip = self._simulated_source_ip(packet_data)

        incident = {
            'timestamp': datetime.now().isoformat(),
            'source_ip': source_ip,
            'attack_type': attack_type,
            'threat_level': threat_level,
            'confidence': float(confidence),
            'predicted_class': int(predicted_class),
            'original_label': int(original_label),
            'actions_taken': []
        }

        # Action name -> zero-argument callable performing it.
        handlers = {
            'BLOCK_TRAFFIC': lambda: self.block_traffic(source_ip),
            'QUARANTINE_DEVICE': lambda: self.quarantine_device(source_ip),
            'ALERT_ADMIN': lambda: self.alert_admin(incident),
            'ADJUST_FIREWALL': lambda: self.adjust_firewall_rules(attack_type),
            # Logging is the append at the end of this method; nothing extra to do.
            'LOG_INCIDENT': lambda: None,
        }

        for action in actions_to_take:
            handler = handlers.get(action)
            if handler is None or not self.config.actions.get(action, False):
                continue
            try:
                handler()
            except Exception as e:
                # Best effort: one failing action must not prevent the others.
                print(f"Error executing action {action}: {e}")
            else:
                incident['actions_taken'].append(action)

        self.incident_log.append(incident)
        return incident

    def block_traffic(self, ip_address):
        """Simulate blocking traffic from an IP"""
        self.blocked_ips.add(ip_address)
        print(f"🚫 BLOCKED traffic from {ip_address}")
        # In real implementation: iptables -A INPUT -s {ip_address} -j DROP

    def quarantine_device(self, ip_address):
        """Simulate quarantining a device"""
        self.quarantined_devices.add(ip_address)
        print(f"🔒 QUARANTINED device {ip_address}")
        # In real implementation: isolate device from network

    def alert_admin(self, incident):
        """Simulate alerting administrator"""
        message = f"🚨 ALERT: {incident['attack_type']} detected from {incident['source_ip']} " \
                 f"(Confidence: {incident['confidence']:.2%}, Threat: {incident['threat_level']})"
        print(message)
        # In real implementation: send email/SMS/notification

    def adjust_firewall_rules(self, attack_type):
        """Simulate dynamic firewall rule adjustment"""
        print(f"🛡️  Adjusting firewall rules for {attack_type} protection")
        # In real implementation: update firewall rules based on attack type

    def get_stats(self):
        """Return incident/block/quarantine counts plus the last five incidents."""
        return {
            'total_incidents': len(self.incident_log),
            'blocked_ips': len(self.blocked_ips),
            'quarantined_devices': len(self.quarantined_devices),
            'recent_incidents': self.incident_log[-5:]
        }

# ------------------ Real-time IPS Monitor ------------------
class RealTimeIPS:
    """Replay samples through a classifier and trigger IPS actions on attacks.

    Args:
        model: object with a Keras-style ``predict(batch, verbose=0)`` method
            returning per-class scores for the batch.
        label_map: mapping of class name -> class index. Its ``'BENIGN'``
            entry, when present, identifies the class that never triggers
            actions; otherwise class 1 is assumed to be benign.
        action_engine: object providing ``execute_actions(...)`` and an
            ``incident_log`` list (see ``IPSActionEngine``).
        confidence_threshold: a non-benign prediction triggers actions only
            when its top score is strictly greater than this value.
    """

    # Class index treated as benign when ``label_map`` has no 'BENIGN' entry.
    DEFAULT_BENIGN_CLASS = 1

    def __init__(self, model, label_map, action_engine, confidence_threshold=0.7):
        self.model = model
        self.label_map = label_map
        self.action_engine = action_engine
        self.confidence_threshold = confidence_threshold
        self.is_monitoring = False

    def _benign_class(self):
        """Return the benign class index taken from ``label_map`` if available."""
        if isinstance(self.label_map, dict):
            return self.label_map.get('BENIGN', self.DEFAULT_BENIGN_CLASS)
        return self.DEFAULT_BENIGN_CLASS

    def start_monitoring(self, X_test, y_test, interval=1.0):
        """Simulate real-time monitoring on test data

        Iterates over ``X_test``/``y_test`` pairs, predicts one sample at a
        time and forwards confident non-benign predictions to the action
        engine. Sleeps ``interval`` seconds after every sample and stops
        early once ``stop_monitoring`` has been called.
        """
        self.is_monitoring = True
        benign_class = self._benign_class()
        print("🟢 Starting real-time IPS monitoring...")

        try:
            for i, (packet, true_label) in enumerate(zip(X_test, y_test)):
                if not self.is_monitoring:
                    break

                # Reshape for prediction: add a trailing axis and a leading batch axis
                packet_reshaped = np.expand_dims(np.expand_dims(packet, axis=-1), axis=0)

                # Predict
                prediction = self.model.predict(packet_reshaped, verbose=0)
                predicted_class = np.argmax(prediction)
                confidence = np.max(prediction)

                # Only take action on confident, non-benign predictions
                if predicted_class != benign_class and confidence > self.confidence_threshold:
                    incident = self.action_engine.execute_actions(
                        packet, predicted_class, confidence, true_label
                    )

                    # Print brief incident report
                    print(f"📋 Incident #{len(self.action_engine.incident_log)}: "
                          f"{incident['attack_type']} from {incident['source_ip']}")

                time.sleep(interval)

                # Print progress every 100 packets
                if (i + 1) % 100 == 0:
                    print(f"📊 Processed {i + 1} packets...")
        finally:
            # The flag must not claim monitoring is active once the loop has
            # ended, whether it finished, was stopped, or raised.
            self.is_monitoring = False

    def stop_monitoring(self):
        """Ask a running ``start_monitoring`` loop to stop before its next sample."""
        self.is_monitoring = False
        print("🔴 Stopping IPS monitoring")

# ------------------ Main Execution ------------------
if __name__ == "__main__":
    # Fetch the packet-capture dataset (same source as before)
    url = 'https://raw.githubusercontent.com/kdemertzis/EKPA/main/Data/pcap_data.csv'
    df = pd.read_csv(url)

    # ... [Data loading and preprocessing code from previous implementation] ...
    # [Include all the data loading, cleaning, and preprocessing code here]
    # NOTE: `model`, `label_map`, `X_test` and `y_test` used below are not
    # defined in this cell; they must come from that omitted code.

    # Wire the IPS pieces together once the model is trained
    config = IPSConfig()
    action_engine = IPSActionEngine(config)
    real_time_ips = RealTimeIPS(model, label_map, action_engine)

    # Replay the first 1000 test samples on a daemon worker thread
    monitoring_thread = threading.Thread(
        target=real_time_ips.start_monitoring,
        args=(X_test[:1000], y_test[:1000], 0.1),
        daemon=True,
    )
    monitoring_thread.start()

    # Give the worker 30 seconds (Ctrl-C ends the wait early), then always
    # stop it and wait for the thread to exit
    try:
        time.sleep(30)
    except KeyboardInterrupt:
        pass
    finally:
        real_time_ips.stop_monitoring()
        monitoring_thread.join()

    # Summary banner and counters
    stats = action_engine.get_stats()
    print(
        "\n" + "=" * 50,
        "📊 IPS FINAL STATISTICS",
        "=" * 50,
        f"Total incidents detected: {stats['total_incidents']}",
        f"IP addresses blocked: {stats['blocked_ips']}",
        f"Devices quarantined: {stats['quarantined_devices']}",
        sep="\n",
    )

    print("\nRecent incidents:")
    for incident in stats['recent_incidents']:
        print(
            f"  - {incident['timestamp']}: {incident['attack_type']} "
            f"from {incident['source_ip']} ({incident['threat_level']})"
        )

    # Persist the full incident log as indented JSON
    with open('ips_incident_log.json', 'w') as f:
        f.write(json.dumps(action_engine.incident_log, indent=2))

    print("\n✅ IPS simulation completed. Incident log saved to 'ips_incident_log.json'")

# ------------------ Additional IPS Features ------------------
def generate_security_report(action_engine):
    """Generate a comprehensive security report

    Args:
        action_engine: object whose ``incident_log`` is a list of incident
            dicts carrying ``'timestamp'``, ``'attack_type'`` and
            ``'actions_taken'`` keys.

    Returns:
        A dict with three sections:
        ``summary`` (incident count, number of distinct attack types, first
        and last timestamps, or ``None`` for both when the log is empty),
        ``threat_distribution`` (incident count per attack type) and
        ``action_statistics`` (how often each known action was taken).
        Actions outside the known set are not counted.
    """
    incidents = action_engine.incident_log

    report = {
        'summary': {
            'total_incidents': len(incidents),
            'unique_attack_types': len({inc['attack_type'] for inc in incidents}),
            'time_period': {
                'start': incidents[0]['timestamp'] if incidents else None,
                'end': incidents[-1]['timestamp'] if incidents else None
            }
        },
        'threat_distribution': {},
        'action_statistics': {
            'BLOCK_TRAFFIC': 0,
            'QUARANTINE_DEVICE': 0,
            'ALERT_ADMIN': 0,
            'ADJUST_FIREWALL': 0,
            # LOG_INCIDENT is a configured action too; without a counter here
            # it was silently left out of the report.
            'LOG_INCIDENT': 0
        }
    }

    distribution = report['threat_distribution']
    action_counts = report['action_statistics']

    for incident in incidents:
        # Count threat distribution
        attack_type = incident['attack_type']
        distribution[attack_type] = distribution.get(attack_type, 0) + 1

        # Count actions
        for action in incident['actions_taken']:
            if action in action_counts:
                action_counts[action] += 1

    return report

# Example usage after monitoring:
# security_report = generate_security_report(action_engine)
# print("Security Report:", json.dumps(security_report, indent=2))

# ------------------ Sample output from the notebook run ------------------
# 🟢 Starting real-time IPS monitoring...
# 📊 Processed 100 packets...
# 🔴 Stopping IPS monitoring
#
# 📊 IPS FINAL STATISTICS
# Total incidents detected: 0
# IP addresses blocked: 0
# Devices quarantined: 0
#
# Recent incidents:
#
# ✅ IPS simulation completed. Incident log saved to 'ips_incident_log.json'
