<a href="https://colab.research.google.com/github/chaarumathi1/Intrusion-Detection-System/blob/main/IDS.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install scikit-learn



In [None]:
!pip install scapy
!pip install python-nmap
!pip install numpy

Collecting scapy
  Downloading scapy-2.6.1-py3-none-any.whl.metadata (5.6 kB)
Downloading scapy-2.6.1-py3-none-any.whl (2.4 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.4/2.4 MB[0m [31m15.2 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: scapy
Successfully installed scapy-2.6.1
Collecting python-nmap
  Downloading python-nmap-0.7.1.tar.gz (44 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m44.4/44.4 kB[0m [31m2.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: python-nmap
  Building wheel for python-nmap (setup.py) ... [?25l[?25hdone
  Created wheel for python-nmap: filename=python_nmap-0.7.1-py2.py3-none-any.whl size=20634 sha256=2eda29e3b7a1cd8164551b3f4e7db703b45bed178d68f988fd626a737b7128cd
  Stored in directory: /root/.cache/pip/wheels/4a/8c/1a/aaade88fbb18b99e001cea0921931af9c05bca4c4a72868b51
Successfully built python-nmap
Installin

In [None]:
from scapy.all import sniff, IP, TCP
from collections import defaultdict
import threading
import queue

class PacketCapture:
    """Background sniffer that queues every captured IP/TCP packet.

    NOTE: a class with the same name is defined again in the next cell; when
    the notebook is run top to bottom the later definition replaces this one.

    Attributes:
        packet_queue: ``queue.Queue`` that receives each captured packet
            containing both an IP and a TCP layer.
        stop_capture: ``threading.Event`` set by :meth:`stop` to end the capture.
        capture_thread: the worker ``threading.Thread``, or ``None`` before
            :meth:`start_capture` has been called.
    """

    def __init__(self):
        self.packet_queue = queue.Queue()
        self.stop_capture = threading.Event()
        # Defined up front so stop() is safe even if capture never started.
        self.capture_thread = None

    def packet_callback(self, packet):
        """Queue ``packet`` if it carries both an IP and a TCP layer."""
        if IP in packet and TCP in packet:
            self.packet_queue.put(packet)

    def start_capture(self, interface="eth0", poll_interval=1.0):
        """Start sniffing ``interface`` on a background thread.

        Args:
            interface: name of the network interface to sniff.
            poll_interval: seconds each ``sniff`` call may run before the stop
                event is re-checked. Bounds how long :meth:`stop` can block.

        Calling this while a capture is already running is a no-op.
        """
        if self.capture_thread is not None and self.capture_thread.is_alive():
            return

        # Allow a stopped capture to be started again.
        self.stop_capture.clear()

        def capture_thread():
            # stop_filter is only evaluated when a packet arrives, so on a
            # quiet interface a single sniff() call would never notice the
            # stop event. Sniff in bounded slices and re-check in between.
            while not self.stop_capture.is_set():
                sniff(iface=interface,
                      prn=self.packet_callback,
                      store=0,
                      timeout=poll_interval,
                      stop_filter=lambda _: self.stop_capture.is_set())

        # Daemon thread: a stuck capture must not keep the interpreter alive.
        self.capture_thread = threading.Thread(target=capture_thread, daemon=True)
        self.capture_thread.start()

    def stop(self):
        """Signal the capture thread to finish and wait for it to exit.

        Safe to call before :meth:`start_capture` and safe to call repeatedly.
        """
        self.stop_capture.set()
        worker = self.capture_thread
        if worker is not None and worker is not threading.current_thread():
            worker.join()

In [None]:
from scapy.all import sniff, IP, TCP
from collections import defaultdict
import threading
import queue

class PacketCapture:
    """Background sniffer that queues every captured IP/TCP packet.

    NOTE: this redefines the ``PacketCapture`` class from the previous cell;
    after this cell runs, this definition is the one in effect.

    Attributes:
        packet_queue: ``queue.Queue`` that receives each captured packet
            containing both an IP and a TCP layer.
        stop_capture: ``threading.Event`` set by :meth:`stop` to end the capture.
        capture_thread: the worker ``threading.Thread``, or ``None`` before
            :meth:`start_capture` has been called.
    """

    def __init__(self):
        self.packet_queue = queue.Queue()
        self.stop_capture = threading.Event()
        # Defined up front so stop() is safe even if capture never started.
        self.capture_thread = None

    def packet_callback(self, packet):
        """Queue ``packet`` if it carries both an IP and a TCP layer."""
        if IP in packet and TCP in packet:
            self.packet_queue.put(packet)

    def start_capture(self, interface="eth0", poll_interval=1.0):
        """Start sniffing ``interface`` on a background thread.

        Args:
            interface: name of the network interface to sniff.
            poll_interval: seconds each ``sniff`` call may run before the stop
                event is re-checked. Bounds how long :meth:`stop` can block.

        Calling this while a capture is already running is a no-op.
        """
        if self.capture_thread is not None and self.capture_thread.is_alive():
            return

        # Allow a stopped capture to be started again.
        self.stop_capture.clear()

        def capture_thread():
            # stop_filter is only evaluated when a packet arrives, so on a
            # quiet interface a single sniff() call would never notice the
            # stop event. Sniff in bounded slices and re-check in between.
            while not self.stop_capture.is_set():
                sniff(iface=interface,
                      prn=self.packet_callback,
                      store=0,
                      timeout=poll_interval,
                      stop_filter=lambda _: self.stop_capture.is_set())

        # Daemon thread: a stuck capture must not keep the interpreter alive.
        self.capture_thread = threading.Thread(target=capture_thread, daemon=True)
        self.capture_thread.start()

    def stop(self):
        """Signal the capture thread to finish and wait for it to exit.

        Safe to call before :meth:`start_capture` and safe to call repeatedly.
        """
        self.stop_capture.set()
        worker = self.capture_thread
        if worker is not None and worker is not threading.current_thread():
            worker.join()

class TrafficAnalyzer:
    """Keeps per-flow TCP statistics and derives a feature dict for each packet.

    A flow is identified by the directional 4-tuple
    ``(source IP, destination IP, source port, destination port)``.

    Attributes:
        connections: ``defaultdict(list)``; not read or written by this class.
        flow_stats: maps a flow key to its running ``packet_count``,
            ``byte_count``, ``start_time`` and ``last_time``.
    """

    def __init__(self):
        self.connections = defaultdict(list)
        # NOTE: entries are only ever added, so this grows with the number of
        # distinct flows seen during the lifetime of the analyzer.
        self.flow_stats = defaultdict(lambda: {
            'packet_count': 0,
            'byte_count': 0,
            'start_time': None,
            'last_time': None
        })

    def analyze_packet(self, packet):
        """Update the statistics of ``packet``'s flow and return its features.

        Args:
            packet: object supporting ``IP in packet``, ``packet[IP]``,
                ``len(packet)`` and a numeric ``packet.time``.

        Returns:
            The feature dict built by :meth:`extract_features`, or ``None``
            when the packet does not contain both an IP and a TCP layer.
        """
        if IP not in packet or TCP not in packet:
            return None

        ip_layer = packet[IP]
        tcp_layer = packet[TCP]
        flow_key = (ip_layer.src, ip_layer.dst, tcp_layer.sport, tcp_layer.dport)

        # Update flow statistics
        stats = self.flow_stats[flow_key]
        stats['packet_count'] += 1
        stats['byte_count'] += len(packet)

        # Normalise to a plain float so durations and rates are ordinary
        # floats regardless of the numeric type used for packet.time.
        current_time = float(packet.time)

        # `is None` rather than truthiness: a legitimate timestamp of 0 must
        # not be mistaken for "no start time recorded yet".
        if stats['start_time'] is None:
            stats['start_time'] = current_time
        stats['last_time'] = current_time

        return self.extract_features(packet, stats)

    def extract_features(self, packet, stats):
        """Build the feature dict for ``packet`` from its flow ``stats``.

        Rates are reported as 0 while the flow duration is not positive
        (for example on the first packet of a flow).

        Returns:
            dict with keys ``packet_size``, ``flow_duration``, ``packet_rate``,
            ``byte_rate``, ``tcp_flags`` and ``window_size``.
        """
        flow_duration = stats['last_time'] - stats['start_time']
        if flow_duration > 0:
            packet_rate = stats['packet_count'] / flow_duration
            byte_rate = stats['byte_count'] / flow_duration
        else:
            packet_rate = 0
            byte_rate = 0

        return {
            'packet_size': len(packet),
            'flow_duration': flow_duration,
            'packet_rate': packet_rate,
            'byte_rate': byte_rate,
            'tcp_flags': packet[TCP].flags,
            'window_size': packet[TCP].window
        }

In [None]:
from sklearn.ensemble import IsolationForest
import numpy as np

class DetectionEngine:
    """Threat detection combining signature rules with an IsolationForest score.

    Args:
        contamination: forwarded to ``IsolationForest``.
        anomaly_threshold: a sample is reported as anomalous when its
            ``score_samples`` value is below this threshold.
        random_state: forwarded to ``IsolationForest``.

    Attributes:
        anomaly_detector: the ``IsolationForest`` instance.
        anomaly_threshold: see above.
        signature_rules: mapping of rule name to ``{'condition': callable}``.
        training_data: empty list; not used by this class.
        is_trained: ``True`` once :meth:`train_anomaly_detector` has succeeded.
    """

    def __init__(self, contamination=0.1, anomaly_threshold=-0.5, random_state=42):
        self.anomaly_detector = IsolationForest(
            contamination=contamination,
            random_state=random_state
        )
        self.anomaly_threshold = anomaly_threshold
        self.signature_rules = self.load_signature_rules()
        self.training_data = []
        self.is_trained = False

    def load_signature_rules(self):
        """Return the built-in signature rules.

        Each rule's ``condition`` takes a feature dict and returns a truthy
        value when the rule matches.
        """
        return {
            'syn_flood': {
                'condition': lambda features: (
                    features['tcp_flags'] == 2 and  # SYN flag
                    features['packet_rate'] > 100
                )
            },
            'port_scan': {
                'condition': lambda features: (
                    features['packet_size'] < 100 and
                    features['packet_rate'] > 50
                )
            }
        }

    def train_anomaly_detector(self, normal_traffic_data):
        """Fit the anomaly detector on ``normal_traffic_data``.

        The data is passed straight to ``fit``; :meth:`detect_threats` scores
        vectors of ``[packet_size, packet_rate, byte_rate]``, so the training
        rows should use the same three columns.
        """
        self.anomaly_detector.fit(normal_traffic_data)
        self.is_trained = True

    def detect_threats(self, features):
        """Evaluate ``features`` against all rules and the anomaly detector.

        Args:
            features: dict containing at least ``packet_size``,
                ``packet_rate``, ``byte_rate`` and ``tcp_flags``.

        Returns:
            List of threat dicts. Signature hits have ``type``, ``rule`` and
            ``confidence``; anomaly hits have ``type``, ``score`` and
            ``confidence``. If the detector has not been trained, only
            signature hits are returned.
        """
        threats = []

        # Signature-based detection
        for rule_name, rule in self.signature_rules.items():
            if rule['condition'](features):
                threats.append({
                    'type': 'signature',
                    'rule': rule_name,
                    'confidence': 1.0
                })

        # Scoring an unfitted model raises and would throw away the signature
        # matches collected above; fall back to signature-only detection.
        if not self.is_trained:
            return threats

        # Anomaly-based detection
        feature_vector = np.array([[
            features['packet_size'],
            features['packet_rate'],
            features['byte_rate']
        ]], dtype=float)

        # Plain float keeps NumPy scalar types out of the threat dict.
        anomaly_score = float(self.anomaly_detector.score_samples(feature_vector)[0])
        if anomaly_score < self.anomaly_threshold:
            threats.append({
                'type': 'anomaly',
                'score': anomaly_score,
                'confidence': min(1.0, abs(anomaly_score))
            })

        return threats

In [None]:
import logging
import json
from datetime import datetime

class AlertSystem:
    """Writes IDS alerts as JSON lines through the ``IDS_Alerts`` logger.

    Args:
        log_file: path of the file that receives the alerts.

    Attributes:
        logger: the shared ``logging.Logger`` named ``"IDS_Alerts"``.
    """

    def __init__(self, log_file="ids_alerts.log"):
        import os

        self.logger = logging.getLogger("IDS_Alerts")
        self.logger.setLevel(logging.INFO)

        # logging.getLogger returns the same logger object every time, so
        # unconditionally adding a handler would write every alert once per
        # AlertSystem ever created. Attach one handler per target file.
        target = os.path.abspath(os.fspath(log_file))
        already_attached = any(
            isinstance(existing, logging.FileHandler)
            and existing.baseFilename == target
            for existing in self.logger.handlers
        )
        if not already_attached:
            handler = logging.FileHandler(log_file)
            formatter = logging.Formatter(
                '%(asctime)s - %(levelname)s - %(message)s'
            )
            handler.setFormatter(formatter)
            self.logger.addHandler(handler)

    def generate_alert(self, threat, packet_info):
        """Log ``threat`` as a warning, and as critical when confidence > 0.8.

        Args:
            threat: dict with a ``type`` key and an optional ``confidence``
                (treated as 0.0 when missing).
            packet_info: dict; ``source_ip`` and ``destination_ip`` are read
                with ``.get`` and may be absent.
        """
        confidence = threat.get('confidence', 0.0)
        alert = {
            'timestamp': datetime.now().isoformat(),
            'threat_type': threat['type'],
            'source_ip': packet_info.get('source_ip'),
            'destination_ip': packet_info.get('destination_ip'),
            'confidence': confidence,
            'details': threat
        }

        # default=str: a detail value that JSON cannot encode natively is
        # logged as its string form instead of aborting the alert.
        payload = json.dumps(alert, default=str)
        self.logger.warning(payload)

        if confidence > 0.8:
            self.logger.critical(
                f"High confidence threat detected: {payload}"
            )
            # Implement additional notification methods here
            # (e.g., email, Slack, SIEM integration)

In [None]:
class IntrusionDetectionSystem:
    """Wires packet capture, traffic analysis, detection and alerting together.

    Args:
        interface: network interface handed to the packet capture.
        normal_traffic_data: optional training data for the anomaly detector,
            passed unchanged to ``DetectionEngine.train_anomaly_detector``.
            When omitted, placeholder data is generated (see below).
    """

    def __init__(self, interface="eth0", normal_traffic_data=None):
        self.packet_capture = PacketCapture()
        self.traffic_analyzer = TrafficAnalyzer()
        self.detection_engine = DetectionEngine()
        self.alert_system = AlertSystem()

        self.interface = interface

        if normal_traffic_data is None:
            # Placeholder for demonstration only: 100 samples x 3 features
            # drawn uniformly from [0, 1), seeded so runs are repeatable.
            # NOTE: real packet sizes and rates lie far outside this range,
            # so real traffic is likely to score as anomalous. Pass
            # `normal_traffic_data` measured on your own network instead.
            import numpy as np
            normal_traffic_data = np.random.default_rng(42).random((100, 3))
        self.detection_engine.train_anomaly_detector(normal_traffic_data)

    def _process_packet(self, packet):
        """Analyze one packet and raise an alert for every detected threat."""
        features = self.traffic_analyzer.analyze_packet(packet)
        if not features:
            return

        threats = self.detection_engine.detect_threats(features)
        if not threats:
            return

        # Identical for every threat of this packet, so build it once.
        packet_info = {
            'source_ip': packet[IP].src,
            'destination_ip': packet[IP].dst,
            'source_port': packet[TCP].sport,
            'destination_port': packet[TCP].dport
        }
        for threat in threats:
            self.alert_system.generate_alert(threat, packet_info)

    def start(self):
        """Run the capture/detect/alert loop until interrupted.

        Blocks the caller. ``KeyboardInterrupt`` ends the loop cleanly; any
        other exception propagates to the caller. In both cases the packet
        capture is stopped before this method returns or raises.
        """
        print(f"Starting IDS on interface {self.interface}")
        self.packet_capture.start_capture(self.interface)

        try:
            while True:
                try:
                    packet = self.packet_capture.packet_queue.get(timeout=1)
                except queue.Empty:
                    continue
                self._process_packet(packet)
        except KeyboardInterrupt:
            print("Stopping IDS...")
        finally:
            # Without this, an unexpected error leaves the sniffer thread
            # running and filling the queue with nobody consuming it.
            self.packet_capture.stop()

# Entry point: build the IDS with its default interface and run the blocking
# capture loop. `ids` stays bound at module level after the loop ends.
if __name__ == "__main__":
    ids = IntrusionDetectionSystem()
    ids.start()

CRITICAL:IDS_Alerts:High confidence threat detected: {"timestamp": "2025-07-17T03:22:48.423191", "threat_type": "signature", "source_ip": "172.28.0.1", "destination_ip": "172.28.0.12", "confidence": 1.0, "details": {"type": "signature", "rule": "port_scan", "confidence": 1.0}}


Starting IDS on interface eth0


CRITICAL:IDS_Alerts:High confidence threat detected: {"timestamp": "2025-07-17T03:22:48.467020", "threat_type": "signature", "source_ip": "172.28.0.1", "destination_ip": "172.28.0.12", "confidence": 1.0, "details": {"type": "signature", "rule": "port_scan", "confidence": 1.0}}
CRITICAL:IDS_Alerts:High confidence threat detected: {"timestamp": "2025-07-17T03:22:48.509754", "threat_type": "signature", "source_ip": "172.28.0.1", "destination_ip": "172.28.0.12", "confidence": 1.0, "details": {"type": "signature", "rule": "port_scan", "confidence": 1.0}}
CRITICAL:IDS_Alerts:High confidence threat detected: {"timestamp": "2025-07-17T03:22:48.552635", "threat_type": "signature", "source_ip": "172.28.0.1", "destination_ip": "172.28.0.12", "confidence": 1.0, "details": {"type": "signature", "rule": "port_scan", "confidence": 1.0}}
CRITICAL:IDS_Alerts:High confidence threat detected: {"timestamp": "2025-07-17T03:22:48.586218", "threat_type": "signature", "source_ip": "172.28.0.1", "destination_i

Stopping IDS...


In [None]:
from scapy.all import IP, TCP

def test_ids():
    # Create test packets to simulate various scenarios
    test_packets = [
        # Normal traffic
        IP(src="192.168.1.1", dst="192.168.1.2") / TCP(sport=1234, dport=80, flags="A"),
        IP(src="192.168.1.3", dst="192.168.1.4") / TCP(sport=1235, dport=443, flags="P"),

        # SYN flood simulation
        IP(src="10.0.0.1", dst="192.168.1.2") / TCP(sport=5678, dport=80, flags="S"),
        IP(src="10.0.0.2", dst="192.168.1.2") / TCP(sport=5679, dport=80, flags="S"),
        IP(src="10.0.0.3", dst="192.168.1.2") / TCP(sport=5680, dport=80, flags="S"),

        # Port scan simulation
        IP(src="192.168.1.100", dst="192.168.1.2") / TCP(sport=4321, dport=22, flags="S"),
        IP(src="192.168.1.100", dst="192.168.1.2") / TCP(sport=4321, dport=23, flags="S"),
        IP(src="192.168.1.100", dst="192.168.1.2") / TCP(sport=4321, dport=25, flags="S"),
    ]

    ids = IntrusionDetectionSystem()

    # Simulate packet processing and threat detection
    print("Starting IDS Test...")
    for i, packet in enumerate(test_packets, 1):
        print(f"\nProcessing packet {i}: {packet.summary()}")

        # Analyze the packet
        features = ids.traffic_analyzer.analyze_packet(packet)

        if features:
            # Detect threats based on features
            threats = ids.detection_engine.detect_threats(features)

            if threats:
                print(f"Detected threats: {threats}")
            else:
                print("No threats detected.")
        else:
            print("Packet does not contain IP/TCP layers or is ignored.")

    print("\nIDS Test Completed.")

# Run the offline packet-replay test when this code executes as the main module.
if __name__ == "__main__":
    test_ids()

Starting IDS Test...

Processing packet 1: IP / TCP 192.168.1.1:1234 > 192.168.1.2:80 A
Detected threats: [{'type': 'anomaly', 'score': np.float64(-0.6392552151919073), 'confidence': np.float64(0.6392552151919073)}]

Processing packet 2: IP / TCP 192.168.1.3:1235 > 192.168.1.4:443 P
Detected threats: [{'type': 'anomaly', 'score': np.float64(-0.6392552151919073), 'confidence': np.float64(0.6392552151919073)}]

Processing packet 3: IP / TCP 10.0.0.1:5678 > 192.168.1.2:80 S
Detected threats: [{'type': 'anomaly', 'score': np.float64(-0.6392552151919073), 'confidence': np.float64(0.6392552151919073)}]

Processing packet 4: IP / TCP 10.0.0.2:5679 > 192.168.1.2:80 S
Detected threats: [{'type': 'anomaly', 'score': np.float64(-0.6392552151919073), 'confidence': np.float64(0.6392552151919073)}]

Processing packet 5: IP / TCP 10.0.0.3:5680 > 192.168.1.2:80 S
Detected threats: [{'type': 'anomaly', 'score': np.float64(-0.6392552151919073), 'confidence': np.float64(0.6392552151919073)}]

Processing 