<a href="https://colab.research.google.com/github/ekaterina533/dataset/blob/main/1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Новый раздел

In [None]:
!pip install scapy dpkt pandas numpy scikit-learn matplotlib ipywidgets

Collecting scapy
  Downloading scapy-2.6.1-py3-none-any.whl.metadata (5.6 kB)
Collecting dpkt
  Downloading dpkt-1.9.8-py3-none-any.whl.metadata (1.7 kB)
Collecting jedi>=0.16 (from ipython>=4.0.0->ipywidgets)
  Downloading jedi-0.19.2-py2.py3-none-any.whl.metadata (22 kB)
Downloading scapy-2.6.1-py3-none-any.whl (2.4 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.4/2.4 MB[0m [31m28.6 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading dpkt-1.9.8-py3-none-any.whl (194 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m195.0/195.0 kB[0m [31m12.1 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading jedi-0.19.2-py2.py3-none-any.whl (1.6 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.6/1.6 MB[0m [31m49.8 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: dpkt, scapy, jedi
Successfully installed dpkt-1.9.8 jedi-0.19.2 scapy-2.6.1


In [None]:
%%writefile security_system.py
import os
import zipfile
import pandas as pd
import numpy as np
from sklearn.preprocessing import RobustScaler, LabelEncoder
from sklearn.ensemble import IsolationForest
import dpkt
import socket
from collections import defaultdict, Counter
import matplotlib.pyplot as plt
from urllib.request import urlretrieve
import ipywidgets as widgets
from IPython.display import display, clear_output
import time
from typing import List, Dict, Any, Optional

# 1. Класс для анализа трафика
class TrafficAnalyzer:
    def __init__(self):
        self.model = IsolationForest(n_estimators=100, contamination=0.05, random_state=42)
        self.encoders = {}
        self.scaler = RobustScaler()
        self.attack_stats = defaultdict(int)
        self.traffic_stats = defaultdict(int)
        self.feature_order = ['length', 'ttl', 'src_port', 'dst_port', 'src_ip', 'dst_ip', 'protocol']  # Фиксированный порядок

    def preprocess(self, df: pd.DataFrame) -> pd.DataFrame:
        """Предобработка данных с сохранением порядка признаков"""
        # Заполнение отсутствующих значений
        for col in ['src_port', 'dst_port']:
            if col in df.columns:
                df[col] = df[col].fillna(0).astype(int)

        # Кодирование категориальных признаков
        for col in ['src_ip', 'dst_ip', 'protocol']:
            if col in df.columns:
                self.encoders[col] = LabelEncoder()
                df[col] = self.encoders[col].fit_transform(df[col].astype(str))

        # Нормализация числовых признаков
        numeric_cols = ['length', 'ttl', 'src_port', 'dst_port']
        numeric_cols = [col for col in numeric_cols if col in df.columns]

        if numeric_cols:
            df[numeric_cols] = self.scaler.fit_transform(df[numeric_cols])

        # Возвращаем только нужные признаки в фиксированном порядке
        return df[[col for col in self.feature_order if col in df.columns]]

    def train(self, normal_traffic: pd.DataFrame):
        """Обучение модели с контролем порядка признаков"""
        # Оставляем только нужные столбцы в правильном порядке
        train_data = normal_traffic[[col for col in self.feature_order if col in normal_traffic.columns]]
        self.model.fit(train_data)

    def detect_anomalies(self, packets: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Обнаружение аномальных пакетов"""
        features = []
        valid_packets = []

        for pkt in packets:
            feat = self._extract_features(pkt)
            if feat is not None:
                features.append(feat)
                valid_packets.append(pkt)

        if features:
            try:
                # Создаем DataFrame с правильным порядком признаков
                feature_df = pd.DataFrame(features, columns=[col for col in self.feature_order if col in self.encoders or col in ['length', 'ttl', 'src_port', 'dst_port']])
                preds = self.model.predict(feature_df)
                return [pkt for pkt, pred in zip(valid_packets, preds) if pred == -1]
            except Exception as e:
                print(f"Ошибка предсказания: {str(e)}")
                print(f"Использованные признаки: {feature_df.columns.tolist()}")
                print(f"Ожидаемые признаки: {self.model.feature_names_in_}")
                return []
        return []

    def _extract_features(self, pkt: Dict[str, Any]) -> Optional[List[float]]:
        """Извлечение признаков из пакета"""
        try:
            features = []
            for col in self.feature_order:
                if col in ['length', 'ttl', 'src_port', 'dst_port']:
                    features.append(pkt.get(col, 0))
                elif col in self.encoders:
                    val = str(pkt.get(col, '0'))
                    if val in self.encoders[col].classes_:
                        features.append(self.encoders[col].transform([val])[0])
                    else:
                        features.append(0)
            return features
        except Exception as e:
            print(f"Ошибка извлечения признаков: {str(e)}")
            return None

# 2. Класс для управления трафиком
class TrafficManager:
    def __init__(self):
        self.blocked_ips = set()
        self.rate_limited_ips = {}
        self.load_history = []

    def get_recommendations(self, attack_type, src_ip):
        recommendations = []

        if attack_type == "DDoS":
            recommendations.extend([
                "1. Блокировать IP на firewall",
                "2. Ограничить скорость для этого IP",
                "3. Перенаправить трафик через scrubbing center"
            ])
        elif attack_type == "Port Scan":
            recommendations.extend([
                "1. Закрыть неиспользуемые порты",
                "2. Включить stealth mode",
                "3. Добавить IP в blacklist"
            ])

        recommendations.append("4. Отправить уведомление администратору")
        return recommendations

    def get_load_balance_recommendations(self, cpu_load, mem_load):
        recommendations = []

        if cpu_load > 80:
            recommendations.append("1. Увеличить количество worker-процессов")
            recommendations.append("2. Перенести часть нагрузки на backup-сервер")

        if mem_load > 80:
            recommendations.append("3. Оптимизировать кэширование")
            recommendations.append("4. Увеличить swap-пространство")

        if not recommendations:
            recommendations.append("Система работает в нормальном режиме")

        return recommendations

# 3. Парсер PCAP-файлов
def parse_pcap(file_path: str) -> List[Dict[str, Any]]:
    """Парсинг PCAP-файла в список пакетов"""
    packets = []
    with open(file_path, 'rb') as f:
        try:
            pcap = dpkt.pcap.Reader(f)
            for ts, buf in pcap:
                try:
                    eth = dpkt.ethernet.Ethernet(buf)
                    if not isinstance(eth.data, dpkt.ip.IP):
                        continue

                    ip = eth.data
                    transport = ip.data

                    packet = {
                        'timestamp': ts,
                        'src_ip': socket.inet_ntoa(ip.src),
                        'dst_ip': socket.inet_ntoa(ip.dst),
                        'length': ip.len,
                        'ttl': ip.ttl,
                        'protocol': 'other'
                    }

                    if isinstance(transport, dpkt.tcp.TCP):
                        packet.update({
                            'protocol': 'tcp',
                            'src_port': transport.sport,
                            'dst_port': transport.dport,
                            'flags': transport.flags
                        })
                    elif isinstance(transport, dpkt.udp.UDP):
                        packet.update({
                            'protocol': 'udp',
                            'src_port': transport.sport,
                            'dst_port': transport.dport
                        })

                    packets.append(packet)
                except Exception as e:
                    continue
        except Exception as e:
            print(f"Ошибка чтения файла {file_path}: {str(e)}")

    return packets

# 4. Интерфейс администратора
class AdminInterface:
    def __init__(self, analyzer, manager):
        self.analyzer = analyzer
        self.manager = manager
        self.setup_ui()

    def setup_ui(self):
        self.output = widgets.Output()
        self.recommendation_dropdown = widgets.Dropdown(options=[], description='Действие:')
        self.execute_button = widgets.Button(description="Выполнить")
        self.execute_button.on_click(self.execute_action)

        display(self.output)

    def show_attack_alert(self, attack_packets):
        with self.output:
            clear_output()
            print("🚨 Обнаружена потенциальная атака!")
            print(f"Тип атаки: {self.detect_attack_type(attack_packets)}")
            print("Рекомендуемые действия:")

            src_ip = attack_packets[0]['src_ip']
            recommendations = self.manager.get_recommendations(self.detect_attack_type(attack_packets), src_ip)
            self.recommendation_dropdown.options = recommendations

            display(self.recommendation_dropdown)
            display(self.execute_button)

    def show_load_alert(self, cpu_load, mem_load):
        with self.output:
            clear_output()
            print("⚖️ Нагрузка на систему:")
            print(f"CPU: {cpu_load}%, MEM: {mem_load}%")
            print("Рекомендации по балансировке:")

            recommendations = self.manager.get_load_balance_recommendations(cpu_load, mem_load)
            self.recommendation_dropdown.options = recommendations

            display(self.recommendation_dropdown)
            display(self.execute_button)

    def detect_attack_type(self, packets):
        if len(packets) > 1000:
            return "DDoS"
        elif len(set(p['dst_port'] for p in packets)) > 20:
            return "Port Scan"
        else:
            return "Unknown"

    def execute_action(self, b):
        with self.output:
            action = self.recommendation_dropdown.value
            print(f"Выполняется: {action}...")
            time.sleep(1)
            print("✅ Действие выполнено")

# 5. Главная система
class SecuritySystem:
    def __init__(self):
        self.analyzer = TrafficAnalyzer()
        self.manager = TrafficManager()
        self.interface = AdminInterface(self.analyzer, self.manager)

        # Имитация системного мониторинга
        self.cpu_load = 30
        self.mem_load = 45

    def load_dataset(self, pcap_files: List[str]) -> List[Dict[str, Any]]:
        all_packets = []
        for file in pcap_files:
            print(f"Обработка файла: {os.path.basename(file)}")
            packets = parse_pcap(file)
            print(f"Извлечено пакетов: {len(packets)}")

            # Проверка наличия обязательных полей
            if packets and all(key in packets[0] for key in ['length', 'ttl', 'src_ip', 'dst_ip', 'protocol']):
                all_packets.extend(packets)
            else:
                print(f"Файл {file} не содержит необходимых полей")

        if not all_packets:
            raise ValueError("Не удалось извлечь пакеты с необходимыми полями")

        df = pd.DataFrame(all_packets)
        processed_df = self.analyzer.preprocess(df)

        # Используем 70% данных для обучения
        train_size = int(0.7 * len(processed_df))
        self.analyzer.train(processed_df.iloc[:train_size])

        return all_packets

    def simulate_attacks(self, packets: List[Dict[str, Any]]):
        """Имитация обнаружения атак и высокой нагрузки"""
        # Проверяем первые 500 пакетов (увеличили для лучшего обнаружения)
        test_packets = packets[:1000]

        # Обнаружение аномалий
        attack_packets = self.analyzer.detect_anomalies(test_packets)
        if attack_packets:
            attack_type = self.interface.detect_attack_type(attack_packets)
            self.analyzer.attack_stats[attack_type] += len(attack_packets)
            print(f"Обнаружено {len(attack_packets)} аномальных пакетов ({attack_type})")
            self.interface.show_attack_alert(attack_packets)
        else:
            print("Аномалий не обнаружено")

        # Имитация высокой нагрузки (30% chance)
        if np.random.random() > 0.7:
            self.cpu_load = min(100, self.cpu_load + np.random.randint(20, 60))
            self.mem_load = min(100, self.mem_load + np.random.randint(10, 40))
            print(f"Нагрузка на систему: CPU={self.cpu_load}%, MEM={self.mem_load}%")
            self.interface.show_load_alert(self.cpu_load, self.mem_load)

# 6. Загрузка данных
def download_and_extract_zip():
    zip_url = "https://github.com/westermo/network-traffic-dataset/raw/main/data/extended/pcaps/right.zip"
    zip_path = "right.zip"
    extract_dir = "right"

    if not os.path.exists(extract_dir):
        print("⬇️ Загрузка ZIP-архива...")
        urlretrieve(zip_url, zip_path)

        print("📦 Распаковка архива...")
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(extract_dir)

        os.remove(zip_path)

    return [os.path.join(extract_dir, f) for f in os.listdir(extract_dir) if f.endswith('.pcap')]

# 7. Запуск системы
def main():
    print("🔄 Запуск системы безопасности...")

    # Загрузка данных
    pcap_files = download_and_extract_zip()
    print(f"Найдено {len(pcap_files)} PCAP-файлов")

    # Инициализация системы
    system = SecuritySystem()
    packets = system.load_dataset(pcap_files)

    # Анализ трафика
    print("🔍 Анализ трафика...")
    system.simulate_attacks(packets)

    print("\n✅ Система готова к работе")
    print("Мониторинг трафика и нагрузки выполняется в фоновом режиме...")

if __name__ == "__main__":
    main()


Overwriting security_system.py


In [None]:
%%writefile generate_test_traffic.py
import random
import time
from scapy.all import *
from scapy.layers.inet import IP, TCP, UDP, Ether

def generate_normal_traffic(output_file="normal_traffic.pcap", packet_count=1000):
    """Генерация нормального трафика с Ethernet-заголовками"""
    packets = []
    for i in range(packet_count):
        eth = Ether(src="00:11:22:33:44:55", dst="66:77:88:99:aa:bb")
        ip = IP(src=f"192.168.{random.randint(0, 255)}.{random.randint(1, 254)}",
                dst=f"10.0.{random.randint(0, 255)}.{random.randint(1, 254)}")

        if random.choice([True, False]):
            pkt = eth/ip/TCP(sport=random.randint(1024, 65535),
                            dport=random.choice([80, 443, 22, 21]))
        else:
            pkt = eth/ip/UDP(sport=random.randint(1024, 65535),
                            dport=random.choice([53, 67, 68]))

        packets.append(pkt)

    wrpcap(output_file, packets)
    print(f"Сгенерировано {packet_count} нормальных пакетов в {output_file}")

def generate_ddos_attack(output_file="ddos_attack.pcap", packet_count=300):
    """Генерация DDoS атаки"""
    packets = []
    target_ip = "10.0.0.1"

    for i in range(packet_count):
        eth = Ether(src="00:11:22:33:44:55", dst="66:77:88:99:aa:bb")
        ip = IP(src=f"{random.randint(1, 255)}.{random.randint(1, 255)}.{random.randint(1, 255)}.{random.randint(1, 255)}",
                dst=target_ip)
        pkt = eth/ip/TCP(dport=80, flags="S")
        packets.append(pkt)

    wrpcap(output_file, packets)
    print(f"Сгенерировано {packet_count} DDoS пакетов в {output_file}")

if __name__ == "__main__":
    generate_normal_traffic()
    generate_ddos_attack()

Overwriting generate_test_traffic.py


In [None]:
%%writefile test_model.py
import os
from security_system import SecuritySystem

def test_model():
    system = SecuritySystem()
    test_files = ["normal_traffic.pcap", "ddos_attack.pcap"]

    for file in test_files:
        if not os.path.exists(file):
            print(f"Файл {file} не найден!")
            return

    print("\n=== Тест нормального трафика ===")
    normal_packets = system.load_dataset(["normal_traffic.pcap"])
    system.simulate_attacks(normal_packets)

    print("\n=== Тест DDoS атаки ===")
    attack_packets = system.load_dataset(["ddos_attack.pcap"])
    system.simulate_attacks(attack_packets)

if __name__ == "__main__":
    test_model()

Overwriting test_model.py


In [None]:
%%writefile perfomance_test.py
import time
import random
import pandas as pd
from security_system import TrafficAnalyzer

def test_performance():
    analyzer = TrafficAnalyzer()

    # Генерация тестовых данных
    data = {
        'src_ip': [f"192.168.1.{i}" for i in range(1, 1001)],
        'dst_ip': [f"10.0.0.{i%100}" for i in range(1, 1001)],
        'protocol': ['tcp' if i%2 else 'udp' for i in range(1, 1001)],
        'length': [random.randint(40, 1500) for _ in range(1000)],
        'ttl': [random.randint(32, 128) for _ in range(1000)],
        'src_port': [random.randint(1024, 65535) for _ in range(1000)],
        'dst_port': [random.choice([80, 443, 22, 53]) for _ in range(1000)]
    }
    df = pd.DataFrame(data)

    # Тест предобработки
    start_time = time.time()
    processed_df = analyzer.preprocess(df)
    preprocess_time = time.time() - start_time

    # Тест обучения
    start_time = time.time()
    analyzer.train(processed_df)
    train_time = time.time() - start_time

    # Тест предсказания
    test_data = df.iloc[:100].to_dict('records')
    start_time = time.time()
    anomalies = analyzer.detect_anomalies(test_data)
    predict_time = time.time() - start_time

    print("\n=== Результаты тестирования производительности ===")
    print(f"Время предобработки (1000 записей): {preprocess_time:.4f} сек")
    print(f"Время обучения модели: {train_time:.4f} сек")
    print(f"Время предсказания (100 записей): {predict_time:.4f} сек")
    print(f"Обнаружено аномалий: {len(anomalies)}")

if __name__ == "__main__":
    test_performance()

Overwriting perfomance_test.py


In [None]:
%%writefile accuracy_test.py
import numpy as np
import pandas as pd
from sklearn.metrics import classification_report
from security_system import TrafficAnalyzer

def test_accuracy():
    # 1. Инициализация анализатора
    analyzer = TrafficAnalyzer()

    # 2. Генерация данных с фиксированным порядком признаков
    num_samples = 2000
    normal_samples = int(num_samples * 0.7)

    # Сначала создаем DataFrame с явным порядком столбцов
    columns_order = ['length', 'ttl', 'src_port', 'dst_port', 'src_ip', 'dst_ip', 'protocol']

    # Нормальный трафик
    normal_data = pd.DataFrame({
        'length': np.clip(np.random.normal(500, 100, normal_samples), 40, 1500).astype(int),
        'ttl': np.random.randint(50, 64, normal_samples),
        'src_port': np.random.randint(1024, 65535, normal_samples),
        'dst_port': np.random.choice([80, 443, 22, 53], normal_samples),
        'src_ip': [f"192.168.1.{i}" for i in range(normal_samples)],
        'dst_ip': [f"10.0.0.{i%10}" for i in range(normal_samples)],
        'protocol': ['tcp' if i%2 else 'udp' for i in range(normal_samples)]
    })[columns_order]  # Явно задаем порядок столбцов

    # Аномальный трафик
    attack_data = pd.DataFrame({
        'length': np.clip(np.random.normal(1500, 10, num_samples-normal_samples), 40, 1500).astype(int),
        'ttl': np.random.randint(32, 40, num_samples-normal_samples),
        'src_port': np.random.randint(1024, 65535, num_samples-normal_samples),
        'dst_port': np.random.randint(1, 1024, num_samples-normal_samples),
        'src_ip': [f"172.16.1.{i}" for i in range(num_samples-normal_samples)],
        'dst_ip': ["10.0.0.1"] * (num_samples-normal_samples),
        'protocol': ['tcp'] * (num_samples-normal_samples)
    })[columns_order]  # Тот же порядок столбцов

    # Объединение данных
    df = pd.concat([normal_data, attack_data], ignore_index=True)
    labels = np.array([1]*normal_samples + [-1]*(num_samples-normal_samples))

    # 3. Предобработка
    processed_df = analyzer.preprocess(df)

    # 4. Проверка порядка признаков после предобработки
    print("Порядок признаков после предобработки:", processed_df.columns.tolist())

    # 5. Разделение на train/test
    train_size = int(0.7 * len(processed_df))
    X_train = processed_df.iloc[:train_size]
    X_test = processed_df.iloc[train_size:]
    y_train = labels[:train_size]
    y_test = labels[train_size:]

    # 6. Обучение модели
    analyzer.train(X_train)

    # 7. Проверка feature_names_in_ в модели
    if hasattr(analyzer.model, 'feature_names_in_'):
        print("Признаки модели:", analyzer.model.feature_names_in_)

    # 8. Предсказание с явным контролем порядка признаков
    try:
        preds = analyzer.model.predict(X_test[analyzer.model.feature_names_in_])
        print("\n=== Отчет о точности модели ===")
        print(classification_report(y_test, preds,
                                 target_names=['Аномалия', 'Нормальный']))
    except Exception as e:
        print("\n=== Ошибка предсказания ===")
        print(str(e))
        print("\nСравнение признаков:")
        print("Ожидалось:", analyzer.model.feature_names_in_)
        print("Фактически:", X_test.columns.tolist())

if __name__ == "__main__":
    test_accuracy()

Overwriting accuracy_test.py


In [None]:
%%writefile integration_test.py
import os
from security_system import SecuritySystem

def run_integration_test():
    print("=== Запуск интеграционного теста системы безопасности ===")

    # 1. Инициализация системы
    system = SecuritySystem()
    print("[✓] Система инициализирована")

    # 2. Загрузка тестовых данных
    test_files = ["normal_traffic.pcap", "ddos_attack.pcap"]
    missing_files = [f for f in test_files if not os.path.exists(f)]

    if missing_files:
        print(f"[×] Отсутствуют тестовые файлы: {missing_files}")
        print("Сначала запустите generate_test_traffic.py")
        return

    try:
        packets = system.load_dataset(test_files)
        print("[✓] Тестовые данные успешно загружены")
    except Exception as e:
        print(f"[×] Ошибка загрузки данных: {str(e)}")
        return

    # 3. Тестирование обнаружения атак
    print("\nТестирование обнаружения атак...")
    system.simulate_attacks(packets)

    # 4. Проверка статистики
    print("\n=== Статистика обнаружения ===")
    for attack_type, count in system.analyzer.attack_stats.items():
        print(f"{attack_type}: {count} пакетов")

    print("\n=== Интеграционный тест завершен ===")

if __name__ == "__main__":
    run_integration_test()

Overwriting integration_test.py


In [None]:
!python generate_test_traffic.py

Сгенерировано 1000 нормальных пакетов в normal_traffic.pcap
Сгенерировано 300 DDoS пакетов в ddos_attack.pcap


In [None]:
!python test_model.py

Output()

=== Тест нормального трафика ===
Обработка файла: normal_traffic.pcap
Извлечено пакетов: 1000
Обнаружено 901 аномальных пакетов (Unknown)
🚨 Обнаружена потенциальная атака!
Тип атаки: Unknown
Рекомендуемые действия:
Dropdown(description='Действие:', options=('4. Отправить уведомление администратору',), value='4. Отправить уведомление администратору')
Button(description='Выполнить', style=ButtonStyle())

=== Тест DDoS атаки ===
Обработка файла: ddos_attack.pcap
Извлечено пакетов: 300
Обнаружено 18 аномальных пакетов (Unknown)
🚨 Обнаружена потенциальная атака!
Тип атаки: Unknown
Рекомендуемые действия:
Dropdown(description='Действие:', options=('4. Отправить уведомление администратору',), value='4. Отправить уведомление администратору')
Button(description='Выполнить', style=ButtonStyle())
Нагрузка на систему: CPU=52%, MEM=83%
⚖️ Нагрузка на систему:
CPU: 52%, MEM: 83%
Рекомендации по балансировке:
Dropdown(description='Действие:', options=('3. Оптимизировать кэширование', '4. Ув

In [None]:
!python perfomance_test.py


=== Результаты тестирования производительности ===
Время предобработки (1000 записей): 0.0121 сек
Время обучения модели: 0.2000 сек
Время предсказания (100 записей): 0.0144 сек
Обнаружено аномалий: 52


In [None]:
!python accuracy_test.py

Порядок признаков после предобработки: ['length', 'ttl', 'src_port', 'dst_port', 'src_ip', 'dst_ip', 'protocol']
Признаки модели: ['length' 'ttl' 'src_port' 'dst_port' 'src_ip' 'dst_ip' 'protocol']

=== Отчет о точности модели ===

=== Ошибка предсказания ===
Number of classes, 1, does not match size of target_names, 2. Try specifying the labels parameter

Сравнение признаков:
Ожидалось: ['length' 'ttl' 'src_port' 'dst_port' 'src_ip' 'dst_ip' 'protocol']
Фактически: ['length', 'ttl', 'src_port', 'dst_port', 'src_ip', 'dst_ip', 'protocol']


In [None]:
!python integration_test.py

=== Запуск интеграционного теста системы безопасности ===
Output()
[✓] Система инициализирована
Обработка файла: normal_traffic.pcap
Извлечено пакетов: 1000
Обработка файла: ddos_attack.pcap
Извлечено пакетов: 300
[✓] Тестовые данные успешно загружены

Тестирование обнаружения атак...
Обнаружено 799 аномальных пакетов (Unknown)
🚨 Обнаружена потенциальная атака!
Тип атаки: Unknown
Рекомендуемые действия:
Dropdown(description='Действие:', options=('4. Отправить уведомление администратору',), value='4. Отправить уведомление администратору')
Button(description='Выполнить', style=ButtonStyle())

=== Статистика обнаружения ===
Unknown: 799 пакетов

=== Интеграционный тест завершен ===


In [None]:
!python security_system.py

🔄 Запуск системы безопасности...
Найдено 1 PCAP-файлов
Output()
Обработка файла: right.pcap
Извлечено пакетов: 22902
🔍 Анализ трафика...
Обнаружено 927 аномальных пакетов (Port Scan)
🚨 Обнаружена потенциальная атака!
Тип атаки: Port Scan
Рекомендуемые действия:
Dropdown(description='Действие:', options=('1. Закрыть неиспользуемые порты', '2. Включить stealth mode', '3. Добавить IP в blacklist', '4. Отправить уведомление администратору'), value='1. Закрыть неиспользуемые порты')
Button(description='Выполнить', style=ButtonStyle())
Нагрузка на систему: CPU=59%, MEM=72%
⚖️ Нагрузка на систему:
CPU: 59%, MEM: 72%
Рекомендации по балансировке:
Dropdown(description='Действие:', options=('Система работает в нормальном режиме',), value='Система работает в нормальном режиме')
Button(description='Выполнить', style=ButtonStyle())

✅ Система готова к работе
Мониторинг трафика и нагрузки выполняется в фоновом режиме...
