<a href="https://colab.research.google.com/github/ekaterina533/2.6/blob/main/%D0%A1%D0%B8%D1%81%D1%82%D0%B5%D0%BC%D0%B0%20%D0%BC%D0%BE%D0%BD%D0%B8%D1%82%D0%BE%D1%80%D0%B8%D0%BD%D0%B3%D0%B0.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Новый раздел

In [1]:
!pip install scapy dpkt pandas numpy scikit-learn matplotlib ipywidgets

Collecting scapy
  Downloading scapy-2.6.1-py3-none-any.whl.metadata (5.6 kB)
Collecting dpkt
  Downloading dpkt-1.9.8-py3-none-any.whl.metadata (1.7 kB)
Downloading scapy-2.6.1-py3-none-any.whl (2.4 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.4/2.4 MB[0m [31m17.4 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading dpkt-1.9.8-py3-none-any.whl (194 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m195.0/195.0 kB[0m [31m10.1 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: dpkt, scapy
Successfully installed dpkt-1.9.8 scapy-2.6.1


In [25]:
import os
import zipfile
import pandas as pd
import numpy as np
from sklearn.preprocessing import RobustScaler, LabelEncoder
from sklearn.ensemble import IsolationForest
import dpkt
import socket
from collections import defaultdict, Counter
import matplotlib.pyplot as plt
from urllib.request import urlretrieve
import ipywidgets as widgets
from IPython.display import display, clear_output
import time
from typing import List, Dict, Any, Optional

In [26]:
# 1. Класс для анализа трафика
class TrafficAnalyzer:
    def __init__(self):
        self.model = IsolationForest(n_estimators=100, contamination=0.05, random_state=42)
        self.encoders = {}
        self.scaler = RobustScaler()
        self.attack_stats = defaultdict(int)
        self.traffic_stats = defaultdict(int)
        self.feature_columns = []  # Сохраняем список признаков для обучения

    def preprocess(self, df: pd.DataFrame) -> pd.DataFrame:
        """Предобработка данных: кодирование и нормализация"""
        # Сохраняем список числовых признаков
        numeric_cols = ['length', 'ttl', 'src_port', 'dst_port']
        self.feature_columns = [col for col in numeric_cols if col in df.columns]

        # Кодирование категориальных признаков
        for col in ['src_ip', 'dst_ip', 'protocol']:
            if col in df.columns:
                self.encoders[col] = LabelEncoder()
                df[col] = self.encoders[col].fit_transform(df[col].astype(str))
                self.feature_columns.append(col)

        # Нормализация числовых признаков
        if numeric_cols:
            df[numeric_cols] = self.scaler.fit_transform(df[numeric_cols])

        return df

    def train(self, normal_traffic: pd.DataFrame):
        """Обучение модели на нормальном трафике"""
        # Оставляем только те признаки, которые были в обучении
        train_data = normal_traffic[self.feature_columns]
        self.model.fit(train_data)

    def detect_anomalies(self, packets: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Обнаружение аномальных пакетов"""
        features = []
        valid_packets = []

        for pkt in packets:
            feat = self._extract_features(pkt)
            if feat is not None:
                features.append(feat)
                valid_packets.append(pkt)

        if features:
            try:
                # Преобразуем в DataFrame для сохранения порядка признаков
                feature_df = pd.DataFrame(features, columns=self.feature_columns)
                preds = self.model.predict(feature_df)
                return [pkt for pkt, pred in zip(valid_packets, preds) if pred == -1]
            except Exception as e:
                print(f"Ошибка предсказания: {e}")
                return []
        return []

    def _extract_features(self, pkt: Dict[str, Any]) -> Optional[List[float]]:
        """Извлечение признаков из пакета"""
        try:
            features = []

            # Числовые признаки
            for col in ['length', 'ttl', 'src_port', 'dst_port']:
                if col in self.feature_columns:
                    features.append(pkt.get(col, 0))

            # Категориальные признаки
            for col in ['src_ip', 'dst_ip', 'protocol']:
                if col in self.feature_columns:
                    encoder = self.encoders.get(col)
                    if encoder:
                        val = str(pkt.get(col, '0'))
                        # Если значение не было в обучающих данных, используем 0
                        if val in encoder.classes_:
                            features.append(encoder.transform([val])[0])
                        else:
                            features.append(0)
                    else:
                        features.append(0)

            return features
        except Exception as e:
            print(f"Ошибка извлечения признаков: {e}")
            return None


In [27]:
# 1. Класс для анализа трафика
class TrafficAnalyzer:
    def __init__(self):
        self.model = IsolationForest(n_estimators=100, contamination=0.05, random_state=42)
        self.encoders = {}
        self.scaler = RobustScaler()
        self.attack_stats = defaultdict(int)
        self.traffic_stats = defaultdict(int)
        self.feature_columns = []  # Сохраняем список признаков для обучения

    def preprocess(self, df: pd.DataFrame) -> pd.DataFrame:
        """Предобработка данных: кодирование и нормализация"""
        # Сохраняем список числовых признаков
        numeric_cols = ['length', 'ttl', 'src_port', 'dst_port']
        self.feature_columns = [col for col in numeric_cols if col in df.columns]

        # Кодирование категориальных признаков
        for col in ['src_ip', 'dst_ip', 'protocol']:
            if col in df.columns:
                self.encoders[col] = LabelEncoder()
                df[col] = self.encoders[col].fit_transform(df[col].astype(str))
                self.feature_columns.append(col)

        # Нормализация числовых признаков
        if numeric_cols:
            df[numeric_cols] = self.scaler.fit_transform(df[numeric_cols])

        return df

    def train(self, normal_traffic: pd.DataFrame):
        """Обучение модели на нормальном трафике"""
        # Оставляем только те признаки, которые были в обучении
        train_data = normal_traffic[self.feature_columns]
        self.model.fit(train_data)

    def detect_anomalies(self, packets: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Обнаружение аномальных пакетов"""
        features = []
        valid_packets = []

        for pkt in packets:
            feat = self._extract_features(pkt)
            if feat is not None:
                features.append(feat)
                valid_packets.append(pkt)

        if features:
            try:
                # Преобразуем в DataFrame для сохранения порядка признаков
                feature_df = pd.DataFrame(features, columns=self.feature_columns)
                preds = self.model.predict(feature_df)
                return [pkt for pkt, pred in zip(valid_packets, preds) if pred == -1]
            except Exception as e:
                print(f"Ошибка предсказания: {e}")
                return []
        return []

    def _extract_features(self, pkt: Dict[str, Any]) -> Optional[List[float]]:
        """Извлечение признаков из пакета"""
        try:
            features = []

            # Числовые признаки
            for col in ['length', 'ttl', 'src_port', 'dst_port']:
                if col in self.feature_columns:
                    features.append(pkt.get(col, 0))

            # Категориальные признаки
            for col in ['src_ip', 'dst_ip', 'protocol']:
                if col in self.feature_columns:
                    encoder = self.encoders.get(col)
                    if encoder:
                        val = str(pkt.get(col, '0'))
                        # Если значение не было в обучающих данных, используем 0
                        if val in encoder.classes_:
                            features.append(encoder.transform([val])[0])
                        else:
                            features.append(0)
                    else:
                        features.append(0)

            return features
        except Exception as e:
            print(f"Ошибка извлечения признаков: {e}")
            return None

In [18]:
# 3. Парсер PCAP-файлов
def parse_pcap(file_path: str) -> List[Dict[str, Any]]:
    """Парсинг PCAP-файла в список пакетов"""
    packets = []
    with open(file_path, 'rb') as f:
        try:
            pcap = dpkt.pcap.Reader(f)
            for ts, buf in pcap:
                try:
                    eth = dpkt.ethernet.Ethernet(buf)
                    if not isinstance(eth.data, dpkt.ip.IP):
                        continue

                    ip = eth.data
                    transport = ip.data

                    packet = {
                        'timestamp': ts,
                        'src_ip': socket.inet_ntoa(ip.src),
                        'dst_ip': socket.inet_ntoa(ip.dst),
                        'length': ip.len,
                        'ttl': ip.ttl,
                        'protocol': 'other'
                    }

                    if isinstance(transport, dpkt.tcp.TCP):
                        packet.update({
                            'protocol': 'tcp',
                            'src_port': transport.sport,
                            'dst_port': transport.dport
                        })
                    elif isinstance(transport, dpkt.udp.UDP):
                        packet.update({
                            'protocol': 'udp',
                            'src_port': transport.sport,
                            'dst_port': transport.dport
                        })

                    packets.append(packet)
                except Exception as e:
                    continue
        except Exception as e:
            print(f"Ошибка чтения файла {file_path}: {e}")

    return packets

In [19]:
# 4. Интерфейс администратора
class AdminInterface:
    def __init__(self, analyzer: TrafficAnalyzer, manager: TrafficManager):
        self.analyzer = analyzer
        self.manager = manager
        self.setup_ui()

    def setup_ui(self):
        """Инициализация пользовательского интерфейса"""
        self.output = widgets.Output()
        self.recommendation_dropdown = widgets.Dropdown(options=[], description='Действие:')
        self.execute_button = widgets.Button(description="Выполнить", button_style='danger')
        self.execute_button.on_click(self.execute_action)

        display(self.output)

    def show_attack_alert(self, attack_packets: List[Dict[str, Any]]):
        """Отображение предупреждения об атаке"""
        with self.output:
            clear_output()
            print("🚨 Обнаружена потенциальная атака!")
            print(f"Тип атаки: {self.detect_attack_type(attack_packets)}")
            print(f"Источник: {attack_packets[0]['src_ip']}")
            print(f"Количество пакетов: {len(attack_packets)}")
            print("\nРекомендуемые действия:")

            src_ip = attack_packets[0]['src_ip']
            recommendations = self.manager.get_recommendations(
                self.detect_attack_type(attack_packets),
                src_ip
            )
            self.recommendation_dropdown.options = recommendations

            display(self.recommendation_dropdown)
            display(self.execute_button)

    def show_load_alert(self, cpu_load: float, mem_load: float):
        """Отображение предупреждения о нагрузке"""
        with self.output:
            clear_output()
            print("⚖️ Нагрузка на систему:")
            print(f"CPU: {cpu_load}%, MEM: {mem_load}%")
            print("\nРекомендации по балансировке:")

            recommendations = self.manager.get_load_balance_recommendations(cpu_load, mem_load)
            self.recommendation_dropdown.options = recommendations

            display(self.recommendation_dropdown)
            display(self.execute_button)

    def detect_attack_type(self, packets: List[Dict[str, Any]]) -> str:
        """Определение типа атаки"""
        if len(packets) > 1000:
            return "DDoS"
        elif len(set(p['dst_port'] for p in packets if 'dst_port' in p)) > 20:
            return "Port Scan"
        else:
            return "Unknown"

    def execute_action(self, b):
        """Обработка выбранного действия"""
        with self.output:
            action = self.recommendation_dropdown.value
            print(f"Выполняется: {action}...")

            # Имитация выполнения действия
            if "блокиров" in action.lower():
                print(f"⛔ IP заблокирован")
            elif "ограничить" in action.lower():
                print("🐢 Скорость ограничена")
            elif "перенаправить" in action.lower():
                print("🔄 Трафик перенаправлен")
            elif "уведомление" in action.lower():
                print("✉️ Уведомление отправлено")

            time.sleep(1)
            print("✅ Действие выполнено")

In [30]:
# 5. Главная система
class SecuritySystem:
    def __init__(self):
        self.analyzer = TrafficAnalyzer()
        self.manager = TrafficManager()
        self.interface = AdminInterface(self.analyzer, self.manager)

        # Имитация системного мониторинга
        self.cpu_load = 30
        self.mem_load = 45

    def load_dataset(self, pcap_files: List[str]) -> List[Dict[str, Any]]:
        """Загрузка и обработка PCAP-файлов"""
        all_packets = []
        for file in pcap_files:
            print(f"Обработка файла: {os.path.basename(file)}")
            packets = parse_pcap(file)
            print(f"Извлечено пакетов: {len(packets)}")
            all_packets.extend(packets)

        if not all_packets:
            raise ValueError("Не удалось извлечь пакеты из файлов")

        df = pd.DataFrame(all_packets)

        # Заполняем отсутствующие порты нулями
        for col in ['src_port', 'dst_port']:
            if col in df.columns:
                df[col] = df[col].fillna(0).astype(int)

        processed_df = self.analyzer.preprocess(df)

        # Используем 70% данных для обучения
        train_size = int(0.7 * len(processed_df))
        self.analyzer.train(processed_df.iloc[:train_size])

        return all_packets

    def simulate_attacks(self, packets: List[Dict[str, Any]]):
        """Имитация обнаружения атак и высокой нагрузки"""
        # Проверяем первые 500 пакетов (увеличили для лучшего обнаружения)
        test_packets = packets[:2000]

        # Обнаружение аномалий
        attack_packets = self.analyzer.detect_anomalies(test_packets)
        if attack_packets:
            attack_type = self.interface.detect_attack_type(attack_packets)
            self.analyzer.attack_stats[attack_type] += len(attack_packets)
            print(f"Обнаружено {len(attack_packets)} аномальных пакетов ({attack_type})")
            self.interface.show_attack_alert(attack_packets)
        else:
            print("Аномалий не обнаружено")

        # Имитация высокой нагрузки (30% chance)
        if np.random.random() > 0.7:
            self.cpu_load = min(100, self.cpu_load + np.random.randint(20, 50))
            self.mem_load = min(100, self.mem_load + np.random.randint(10, 40))
            print(f"Нагрузка на систему: CPU={self.cpu_load}%, MEM={self.mem_load}%")
            self.interface.show_load_alert(self.cpu_load, self.mem_load)

In [21]:
# 6. Загрузка данных
def download_and_extract_zip():
    zip_url = "https://github.com/westermo/network-traffic-dataset/raw/main/data/extended/pcaps/right.zip"
    zip_path = "right.zip"
    extract_dir = "right"

    if not os.path.exists(extract_dir):
        print("⬇️ Загрузка ZIP-архива...")
        urlretrieve(zip_url, zip_path)

        print("📦 Распаковка архива...")
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(extract_dir)

        os.remove(zip_path)

    return [os.path.join(extract_dir, f) for f in os.listdir(extract_dir) if f.endswith('.pcap')]

In [31]:
# 7. Запуск системы
def main():
    print("🔄 Запуск системы безопасности...")

    try:
        # Загрузка данных
        pcap_files = download_and_extract_zip()
        if not pcap_files:
            print("❌ Не удалось загрузить файлы для анализа")
            return

        # Инициализация системы
        system = SecuritySystem()

        # Загрузка и обработка данных
        print("\n🔍 Загрузка и анализ трафика...")
        packets = system.load_dataset(pcap_files)

        # Анализ трафика
        print("\n🔒 Запуск мониторинга трафика...")
        system.simulate_attacks(packets)

        print("\n✅ Система готова к работе")
        print("Мониторинг трафика и нагрузки выполняется в фоновом режиме...")

    except Exception as e:
        print(f"❌ Критическая ошибка: {str(e)}")

if __name__ == "__main__":
    main()

🔄 Запуск системы безопасности...


Output()


🔍 Загрузка и анализ трафика...
Обработка файла: right.pcap
Извлечено пакетов: 22902

🔒 Запуск мониторинга трафика...
Обнаружено 1926 аномальных пакетов (DDoS)

✅ Система готова к работе
Мониторинг трафика и нагрузки выполняется в фоновом режиме...
