In [None]:
import polars as pl

'''загрузка  из CVS'''
df=pl.read_parquet("NF-CSE-CIC-IDS2018-V2.parquet")
print(df)





Задания (пошагово)
1. Загрузка и первичный анализ:
Загрузите файл NF-CSE-CIC-IDS2018-V2.parquet в Polars DataFrame.
Выведите:
форму датасета (количество строк и колонок);
типы колонок (последние пять);
уникальные значения в колонке Label;
уникальные значения в колонке Attack.

In [None]:
# Вывод формы датасета
print(f"Форма датасета: {df.shape}")
print(f"Количество строк: {df.shape[0]:,}")
print(f"Количество колонок: {df.shape[1]}")

# Дополнительная информация для первичного анализа
print("\nПервые 5 строк:")
print(df.head())

print("\nИнформация о колонках:")
print(df.schema)

print("\nСтатистика по целевым колонкам:")
print(df.select(['Label', 'Attack']).describe())

print("\nРаспределение меток Label (0 = Benign, 1 = Attack):")
print(df['Label'].value_counts())

print("\nРаспределение типов атак (первые 10):")
print(df['Attack'].value_counts().head(10))

print("\nТипы протоколов:")
print(df['PROTOCOL'].value_counts())

2. Распределение по меткам:
Подсчитайте:
сколько записей помечено как Label == 0 ("Benign");
сколько записей помечено как Label == 1 ("Attack").
Выведите процентное соотношение.

In [None]:

# Подсчет количества записей по меткам
label_counts = df['Label'].value_counts()

# Извлечение значений
benign_count = label_counts.filter(pl.col('Label') == 0)['count'].item()
attack_count = label_counts.filter(pl.col('Label') == 1)['count'].item()
total = len(df)

# Расчет процентного соотношения
benign_percent = (benign_count / total) * 100
attack_percent = (attack_count / total) * 100

# Вывод результатов
print("=" * 50)
print("РАСПРЕДЕЛЕНИЕ ПО МЕТКАМ LABEL")
print("=" * 50)
print(f"Всего записей: {total:,}")
print()
print(f"Label == 0 (Benign - нормальный трафик):")
print(f"  Количество: {benign_count:,}")
print(f"  Процент: {benign_percent:.2f}%")
print()
print(f"Label == 1 (Attack - атаки):")
print(f"  Количество: {attack_count:,}")
print(f"  Процент: {attack_percent:.2f}%")
print("=" * 50)


3. Создание бинарного признака:
Добавьте колонку is_attack, которая:
1, если Label != 0 (атака);
0, если Label == 0 (норма).

In [None]:


# Создание бинарного признака is_attack
df = df.with_columns(
    (pl.col('Label') != 0).cast(pl.Int8).alias('is_attack')
)

# Проверка результатов
print("=" * 50)
print("ПРОВЕРКА СОЗДАНИЯ ПРИЗНАКА is_attack")
print("=" * 50)

# Вывод первых 10 строк для сравнения
print("Первые 10 строк (сравнение Label и is_attack):")
print(df.select(['Label', 'is_attack', 'Attack']).head(10))

# Проверка соответствия значений
print("\n" + "=" * 50)
print("ПРОВЕРКА СООТВЕТСТВИЯ ЗНАЧЕНИЙ")
print("=" * 50)

# Проверка для Label = 0
check_benign = df.filter(pl.col('Label') == 0).select([
    pl.col('is_attack').min().alias('min_is_attack'),
    pl.col('is_attack').max().alias('max_is_attack'),
    pl.col('is_attack').mean().alias('mean_is_attack')
])
print("Для Label = 0 (Benign):")
print(check_benign)
print("✓ Все is_attack должны быть 0")

# Проверка для Label = 1
check_attack = df.filter(pl.col('Label') == 1).select([
    pl.col('is_attack').min().alias('min_is_attack'),
    pl.col('is_attack').max().alias('max_is_attack'),
    pl.col('is_attack').mean().alias('mean_is_attack')
])
print("\nДля Label = 1 (Attack):")
print(check_attack)
print("✓ Все is_attack должны быть 1")

# Проверка уникальных комбинаций
print("\n" + "=" * 50)
print("УНИКАЛЬНЫЕ КОМБИНАЦИИ LABEL И IS_ATTACK")
print("=" * 50)
print(df.group_by(['Label', 'is_attack']).agg([
    pl.len().alias('count')
]).sort(['Label', 'is_attack']))

# Проверка распределения нового признака
print("\n" + "=" * 50)
print("РАСПРЕДЕЛЕНИЕ ПРИЗНАКА is_attack")
print("=" * 50)
print(df['is_attack'].value_counts().sort('is_attack'))

# Информация о размерности после добавления признака
print("\n" + "=" * 50)
print("РАЗМЕРНОСТЬ ДАТАСЕТА")
print("=" * 50)
print(f"Количество строк: {df.shape[0]:,}")
print(f"Количество колонок: {df.shape[1]}")

# Сохраняем обновленный DataFrame для следующих заданий
# df.write_parquet('NF-CSE-CIC-IDS2018-V2_with_is_attack.parquet')
# print("\n✓ DataFrame сохранен с новым признаком")

4. Агрегация по типам атак:
Отфильтруйте только атаки (Label != 0).
Сгруппируйте данные по колонке Attack.
Посчитайте:
среднюю длительность потока (FLOW_DURATION_MILLISECONDS);
среднее количество входящих байт (IN_BYTES);
общее количество записей для каждого типа атаки.
Отсортируйте по убыванию avg_in_bytes.

In [None]:

# Фильтруем только атаки и группируем по типам
result = (df
    .filter(pl.col('Label') == 1)  # только атаки
    .group_by('Attack')  # группировка по типу атаки
    .agg([
        pl.col('FLOW_DURATION_MILLISECONDS').mean().alias('avg_duration'),
        pl.col('IN_BYTES').mean().alias('avg_in_bytes'),
        pl.len().alias('total_records')
    ])
    .sort('avg_in_bytes', descending=True)  # сортировка по убыванию входящих байт
)

print("Агрегация по типам атак:")
print(result)

5. Топ-3 атак по трафику:
Выведите топ-3 типа атак по среднему объёму входящего трафика (avg_in_bytes).

In [None]:


top_3 = (df
    .filter(pl.col('Label') == 1)
    .group_by('Attack')
    .agg(pl.mean('IN_BYTES').alias('avg_in_bytes'))
    .sort('avg_in_bytes', descending=True)
    .head(3)
)

print(top_3)

6. Распределение по протоколам:
Выведите:
общее распределение по PROTOCOL;
распределение по PROTOCOL только для Benign;
распределение по PROTOCOL только для Attack (с разбивкой по Attack);
сравнение PROTOCOL между Benign и Attack.

In [None]:


# 1. Общее распределение по PROTOCOL
print("1. ОБЩЕЕ РАСПРЕДЕЛЕНИЕ ПО ПРОТОКОЛАМ:")
print(df['PROTOCOL'].value_counts().sort('count', descending=True))
print()

# 2. Распределение по PROTOCOL только для Benign
print("2. РАСПРЕДЕЛЕНИЕ ПО ПРОТОКОЛАМ (BENIGN):")
print(df.filter(pl.col('Label') == 0)['PROTOCOL'].value_counts().sort('count', descending=True))
print()

# 3. Распределение по PROTOCOL только для Attack (с разбивкой по Attack)
print("3. РАСПРЕДЕЛЕНИЕ ПО ПРОТОКОЛАМ (ATTACK):")
print(df.filter(pl.col('Label') == 1).group_by(['PROTOCOL', 'Attack']).len().sort('len', descending=True))
print()

# 4. Сравнение PROTOCOL между Benign и Attack
print("4. СРАВНЕНИЕ ПРОТОКОЛОВ (BENIGN vs ATTACK):")
comparison = df.group_by(['PROTOCOL', 'Label']).len().pivot(
    index='PROTOCOL', 
    columns='Label', 
    values='len'
).fill_null(0).rename({'0': 'Benign', '1': 'Attack'})
print(comparison)

7. Сравнение метрик: Benign vs Attack:
Сгруппируйте по is_attack и посчитайте:
средние значения IN_BYTES, OUT_BYTES, FLOW_DURATION_MILLISECONDS;
общее количество записей.

In [None]:

# Создание признака is_attack
df = df.with_columns(
    (pl.col('Label') != 0).cast(pl.Int8).alias('is_attack')
)

# Группировка по is_attack и расчет метрик
result = (df
    .group_by('is_attack')
    .agg([
        pl.col('IN_BYTES').mean().alias('avg_in_bytes'),
        pl.col('OUT_BYTES').mean().alias('avg_out_bytes'),
        pl.col('FLOW_DURATION_MILLISECONDS').mean().alias('avg_duration_ms'),
        pl.len().alias('total_records')
    ])
    .sort('is_attack')
)

print("СРАВНЕНИЕ МЕТРИК: BENIGN vs ATTACK")
print("=" * 60)
print(result)

8. (Бонус) Эвристика детектирования:
Создайте колонку is_suspicious, которая равна:
1, если:
bytes_ratio > 10 (входящий трафик >> исходящего);
FLOW_DURATION_MILLISECONDS < 500 (короткий поток);
IN_PKTS > 10 (много пакетов).
0 — в противном случае.
Подсчитайте:
сколько записей помечено как is_suspicious == 1;
сколько из них на самом деле являются атаками (Label != 0).
Вычислите точность эвристики: true_attacks / total_flagged.

In [None]:


# Создание колонки bytes_ratio (отношение входящих к исходящим байтам)
# Добавляем маленькое число, чтобы избежать деления на ноль
df = df.with_columns(
    (pl.col('IN_BYTES') / (pl.col('OUT_BYTES') + 0.001)).alias('bytes_ratio')
)

# Создание колонки is_suspicious по условиям
df = df.with_columns(
    (
        (pl.col('bytes_ratio') > 10) &
        (pl.col('FLOW_DURATION_MILLISECONDS') < 500) &
        (pl.col('IN_PKTS') > 10)
    ).cast(pl.Int8).alias('is_suspicious')
)

# Подсчет результатов
total_flagged = df.filter(pl.col('is_suspicious') == 1).height
true_attacks = df.filter((pl.col('is_suspicious') == 1) & (pl.col('Label') != 0)).height

# Расчет точности
if total_flagged > 0:
    accuracy = (true_attacks / total_flagged) * 100
else:
    accuracy = 0

# Вывод результатов
print("ЭВРИСТИКА ДЕТЕКТИРОВАНИЯ")
print("=" * 50)
print(f"Всего записей, помеченных как подозрительные: {total_flagged:,}")
print(f"Из них реальных атак: {true_attacks:,}")
print(f"Точность эвристики: {accuracy:.2f}%")
print("=" * 50)

# Дополнительно: посмотрим на срабатывания по типам атак
print("\nТипы атак, которые обнаружила эвристика:")
suspicious_attacks = (df
    .filter((pl.col('is_suspicious') == 1) & (pl.col('Label') != 0))
    .group_by('Attack')
    .len()
    .sort('len', descending=True)
)
print(suspicious_attacks)

Сохраните итоговую агрегацию (по типам атак) в файл attack_summary_by_type.parquet.


In [None]:

# Создаем итоговую агрегацию по типам атак
attack_summary = (df
    .filter(pl.col('Label') == 1)  # только атаки
    .group_by('Attack')
    .agg([
        pl.col('FLOW_DURATION_MILLISECONDS').mean().alias('avg_duration_ms'),
        pl.col('IN_BYTES').mean().alias('avg_in_bytes'),
        pl.col('OUT_BYTES').mean().alias('avg_out_bytes'),
        pl.col('IN_PKTS').mean().alias('avg_in_pkts'),
        pl.col('PROTOCOL').mode().alias('most_common_protocol'),
        pl.len().alias('total_records')
    ])
    .sort('total_records', descending=True)
)

# Сохраняем в parquet файл !!!!
attack_summary.write_parquet('attack_summary_by_type.parquet')

print(f"Агрегация сохранена в файл: attack_summary_by_type.parquet")
print(f"Количество типов атак: {attack_summary.height}")
print("\nПервые 5 строк:")
print(attack_summary.head())