# Análisis Exploratorio de Tráfico de Red

Análisis completo de patrones de tráfico de red, detección de anomalías y caracterización de ataques.

## 1. Configuración e Inicialización

In [10]:
# Inicialización de Spark con configuración optimizada
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark import StorageLevel
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.window import Window
import numpy as np

# Spark session for this notebook. Settings are kept in one table so they are
# easy to audit; they are applied to the builder in the listed order.
spark_settings = [
    ("spark.driver.memory", "8g"),
    ("spark.executor.memory", "8g"),
    ("spark.sql.adaptive.enabled", "true"),
    ("spark.sql.adaptive.coalescePartitions.enabled", "true"),
]

session_builder = SparkSession.builder.appName("NetworkTrafficAnalysis")
for setting_name, setting_value in spark_settings:
    session_builder = session_builder.config(setting_name, setting_value)

# getOrCreate() returns an already-running session if there is one.
spark = session_builder.getOrCreate()

# Keep the notebook output readable: only warnings and errors from Spark.
spark.sparkContext.setLogLevel("WARN")
print(f"Spark Version: {spark.version}")

Spark Version: 3.5.0


## 2. Carga y Preparación de Datos

In [11]:
# Load the raw CSV with a header row and let Spark infer the column types.
# NOTE(review): the recorded attack-type output contains U+FFFD characters in
# the "Web Attack" labels, which suggests the file is not UTF-8 — confirm the
# source encoding and, if needed, set the reader's `encoding` option.
input_path = '../data/raw/network_data.csv'
csv_reader = spark.read.option('header', 'true').option('inferSchema', 'true')
df_raw = csv_reader.csv(input_path)


def _clean_column_name(raw_name):
    """Trim surrounding whitespace and replace ' ', '/' and '.' with '_'."""
    cleaned = raw_name.strip()
    for separator in (' ', '/', '.'):
        cleaned = cleaned.replace(separator, '_')
    return cleaned


# Rename every column positionally to its cleaned form.
df = df_raw.toDF(*[_clean_column_name(name) for name in df_raw.columns])

row_total = df.count()
column_total = len(df.columns)
print(f"Dimensiones del dataset: {row_total} filas x {column_total} columnas")
print(f"\nPrimeras 5 columnas: {df.columns[:5]}")

Dimensiones del dataset: 2830736 filas x 79 columnas

Primeras 5 columnas: ['Destination_Port', 'Flow_Duration', 'Total_Fwd_Packets', 'Total_Backward_Packets', 'Total_Length_of_Fwd_Packets']


In [12]:
# Short snake_case names for the columns this notebook works with.
# Columns that are not listed keep their cleaned original name.
column_mapping = {
    'Destination_Port': 'destination_port',
    'Flow_Duration': 'flow_duration',
    'Total_Fwd_Packets': 'total_fwd_packets',
    'Total_Backward_Packets': 'total_backward_packets',
    'Total_Length_of_Fwd_Packets': 'total_fwd_bytes',
    'Total_Length_of_Bwd_Packets': 'total_bwd_bytes',
    'Label': 'attack_type'
}

# Rename in a single positional pass; names absent from the mapping (or a
# mapping entry whose source column does not exist) are simply left alone.
df = df.toDF(*[column_mapping.get(name, name) for name in df.columns])

# --- Derived metrics -------------------------------------------------------
# Missing directional counters are treated as zero before adding them up.
fwd_packets = coalesce(col('total_fwd_packets'), lit(0))
bwd_packets = coalesce(col('total_backward_packets'), lit(0))
df = df.withColumn('total_packets', fwd_packets + bwd_packets)

fwd_bytes = coalesce(col('total_fwd_bytes'), lit(0))
bwd_bytes = coalesce(col('total_bwd_bytes'), lit(0))
df = df.withColumn('total_bytes', fwd_bytes + bwd_bytes)

# Average packet size; flows without packets get 0 instead of dividing by zero.
has_packets = col('total_packets') > 0
mean_packet_size = col('total_bytes') / col('total_packets')
df = df.withColumn('packet_size_avg', when(has_packets, mean_packet_size).otherwise(0))

# Throughput in bytes per second. flow_duration is scaled by 1e6, i.e. it is
# handled as microseconds; non-positive durations yield 0.
has_duration = col('flow_duration') > 0
throughput = col('total_bytes') * 1000000.0 / col('flow_duration')
df = df.withColumn('bytes_per_second', when(has_duration, throughput).otherwise(0))

# Mark the prepared dataset for caching (memory first, spilling to disk).
# persist() is lazy: the data is materialised by the first action that runs.
df = df.repartition(16).persist(StorageLevel.MEMORY_AND_DISK)
print("Dataset procesado y almacenado en cache")

Dataset procesado y almacenado en cache


## 3. Análisis Descriptivo General

In [13]:
# Overall size of the prepared dataset.
print("=== ESTADÍSTICAS GENERALES ===")
record_total = df.count()
column_total = len(df.columns)
print(f"Total de registros: {record_total:,}")
print(f"Total de columnas: {column_total}")

# count / mean / stddev / min / max for the key numeric metrics.
numeric_columns = ['total_packets', 'total_bytes', 'flow_duration', 'packet_size_avg', 'bytes_per_second']
print("\n=== ESTADÍSTICAS DESCRIPTIVAS ===")
df.describe(*numeric_columns).show()

=== ESTADÍSTICAS GENERALES ===
Total de registros: 2,830,736
Total de columnas: 83

=== ESTADÍSTICAS DESCRIPTIVAS ===
+-------+------------------+------------------+--------------------+------------------+-------------------+
|summary|     total_packets|       total_bytes|       flow_duration|   packet_size_avg|   bytes_per_second|
+-------+------------------+------------------+--------------------+------------------+-------------------+
|  count|           2830736|           2830736|             2830736|           2830736|            2830736|
|   mean|19.754900492310128|16711.972962155425|1.4785620264671803E7|184.72267293328247| 1490183.0986010963|
| stddev|1746.6651284911054| 2266645.477299444| 3.365367906074919E7| 331.1007762091756|2.590706378761566E7|
|    min|                 2|                 0|                 -13|               0.0|                0.0|
|    max|            511681|         656776408|           119999998|3893.3333333333335|            2.071E9|
+-------+------------------+------------------+--------------------+------------------+-------------------+

## 4. Análisis por Puertos y Protocolos

In [14]:
# Classify each flow by destination port. Explicitly named services are
# checked first; the remaining ports fall into the three numeric ranges
# 1-1023, 1024-49151 and >= 49152. Anything else (e.g. port 0 or a null
# port) ends up as UNKNOWN.
port = col('destination_port')
named_ports = [(80, 'HTTP'), (443, 'HTTPS'), (22, 'SSH'), (53, 'DNS')]

(first_port, first_label), *other_named_ports = named_ports
port_category_expr = when(port == first_port, first_label)
for port_number, port_label in other_named_ports:
    port_category_expr = port_category_expr.when(port == port_number, port_label)

# between() is inclusive on both ends.
port_category_expr = (
    port_category_expr
    .when(port.between(1, 1023), 'SYSTEM_OTHER')
    .when(port.between(1024, 49151), 'USER_REGISTERED')
    .when(port >= 49152, 'DYNAMIC_PRIVATE')
    .otherwise('UNKNOWN')
)
df = df.withColumn('port_category', port_category_expr)

print("=== ANÁLISIS POR CATEGORÍAS DE PUERTOS ===")
# NOTE: sum/avg/count are the Spark column functions brought in by the
# star import at the top of the notebook, not the Python builtins.
port_metrics = [
    count('*').alias('total_flows'),
    sum('total_packets').alias('total_packets'),
    sum('total_bytes').alias('total_bytes'),
    avg('packet_size_avg').alias('avg_packet_size'),
]
port_analysis = (
    df.groupBy('port_category')
    .agg(*port_metrics)
    .orderBy(col('total_flows').desc())
)

port_analysis.show()

=== ANÁLISIS POR CATEGORÍAS DE PUERTOS ===
+---------------+-----------+-------------+-----------+------------------+
|  port_category|total_flows|total_packets|total_bytes|   avg_packet_size|
+---------------+-----------+-------------+-----------+------------------+
|            DNS|     957971|      3515828|  267505873| 77.97614235002439|
|           HTTP|     618934|     30032606|33925655371| 470.2674314806422|
|          HTTPS|     505710|     18532141|12453534233|235.85317539405386|
|USER_REGISTERED|     319256|      1013618|  117143517| 26.62045525207347|
|DYNAMIC_PRIVATE|     309238|       998673|  275562682| 72.99750338632634|
|   SYSTEM_OTHER|     100992|      1054143|  204489336|  58.3908678083341|
|            SSH|      16939|       594313|   63292483|52.840954298968185|
|        UNKNOWN|       1696|       179586|          0|               0.0|
+---------------+-----------+-------------+-----------+------------------+



In [15]:
# The 20 destination ports with the most flows, with their packet volume
# and mean throughput.
print("=== TOP 20 PUERTOS MÁS UTILIZADOS ===")
per_port_metrics = [
    count('*').alias('flow_count'),
    sum('total_packets').alias('total_packets'),  # Spark sum, not the builtin
    avg('bytes_per_second').alias('avg_speed'),
]
flows_by_port = df.groupBy('destination_port').agg(*per_port_metrics)
top_ports = flows_by_port.orderBy(col('flow_count').desc()).limit(20)

top_ports.show()

=== TOP 20 PUERTOS MÁS UTILIZADOS ===
+----------------+----------+-------------+--------------------+
|destination_port|flow_count|total_packets|           avg_speed|
+----------------+----------+-------------+--------------------+
|              53|    957971|      3515828|  503632.01583282725|
|              80|    618934|     30032606|  235978.60993893622|
|             443|    505710|     18532141|   300309.0718942788|
|             123|     23880|        73059|  143598.81054866142|
|              22|     16939|       594313|   54058.10868003305|
|              21|     13522|       228015|   569362.6515374259|
|             137|      7917|       111341| 2.316246182393559E7|
|             389|      6405|       133717|   1499237.405293274|
|              88|      5579|        38571|  3042668.6218578396|
|             465|      3817|        77970|  1645082.2491154023|
|             139|      2881|        67541|   87313.05365284432|
|            8080|      2777|        24216|   79036.

## 5. Análisis de Patrones de Tráfico

In [16]:
# Label each flow by which directions carried packets.
fwd = col('total_fwd_packets')
bwd = col('total_backward_packets')

pattern_expr = (
    when((fwd > 0) & (bwd == 0), 'UNIDIRECTIONAL_FWD')
    .when((fwd == 0) & (bwd > 0), 'UNIDIRECTIONAL_BWD')
    .when((fwd > 0) & (bwd > 0), 'BIDIRECTIONAL')
    # Reached when no branch above matches (e.g. both counters 0 or null).
    .otherwise('NO_PACKETS')
)
df = df.withColumn('communication_pattern', pattern_expr)

print("=== PATRONES DE COMUNICACIÓN ===")
pattern_metrics = [
    count('*').alias('flow_count'),
    avg('total_packets').alias('avg_packets'),
    avg('bytes_per_second').alias('avg_speed'),
]
communication_stats = (
    df.groupBy('communication_pattern')
    .agg(*pattern_metrics)
    .orderBy(col('flow_count').desc())
)

communication_stats.show()

=== PATRONES DE COMUNICACIÓN ===
+---------------------+----------+------------------+-----------------+
|communication_pattern|flow_count|       avg_packets|        avg_speed|
+---------------------+----------+------------------+-----------------+
|        BIDIRECTIONAL|   2377219|22.910163935253756|342215.6086347985|
|   UNIDIRECTIONAL_FWD|    453517|3.2158243241157445|7507532.235524234|
+---------------------+----------+------------------+-----------------+



In [17]:
# Bucket flows by throughput. Bucket edges are 1 KiB/s, 1 MiB/s and 10 MiB/s;
# each bucket includes its lower edge and excludes its upper edge.
kib_per_s = 1024
mib_per_s = 1024 * 1024
ten_mib_per_s = 1024 * 1024 * 10
rate = col('bytes_per_second')

speed_expr = (
    when(rate < kib_per_s, 'LOW_B/s')
    .when((rate >= kib_per_s) & (rate < mib_per_s), 'MEDIUM_KB/s')
    .when((rate >= mib_per_s) & (rate < ten_mib_per_s), 'HIGH_MB/s')
    .when(rate >= ten_mib_per_s, 'VERY_HIGH_MB/s')
    # Only reached when none of the comparisons is true (e.g. a null rate).
    .otherwise('NO_DATA')
)
df = df.withColumn('speed_category', speed_expr)

print("=== DISTRIBUCIÓN DE VELOCIDADES ===")
flows_per_speed = df.groupBy('speed_category').count()
speed_distribution = flows_per_speed.orderBy(col('count').desc())
speed_distribution.show()

=== DISTRIBUCIÓN DE VELOCIDADES ===
+--------------+-------+
|speed_category|  count|
+--------------+-------+
|   MEDIUM_KB/s|1342880|
|       LOW_B/s|1090200|
|     HIGH_MB/s| 350612|
|VERY_HIGH_MB/s|  47044|
+--------------+-------+



## 6. Análisis de Seguridad y Anomalías

In [19]:

# Metrics that take part in the anomaly score, in the order the thresholds
# are reported below.
anomaly_metrics = ['total_packets', 'total_bytes', 'flow_duration', 'bytes_per_second']

# Approximate 95th percentile of every metric (relative error 0.01).
# The result holds one list of quantile values per metric.
quantiles = df.approxQuantile(anomaly_metrics, [0.95], 0.01)

# Take the single requested quantile per metric; an empty result maps to 0.
thresholds = []
for metric_quantiles in quantiles:
    thresholds.append(metric_quantiles[0] if metric_quantiles else 0)

packets_p95, bytes_p95, duration_p95, speed_p95 = thresholds

print("=== UMBRALES DE ANOMALÍAS (95º PERCENTIL) ===")
print(f"Paquetes: {packets_p95:.0f}")
print(f"Bytes: {bytes_p95:.0f}")
print(f"Duración: {duration_p95:.0f} μs")
print(f"Velocidad: {speed_p95:.0f} B/s")

# One 0/1 flag per metric (1 when the value exceeds its threshold); the
# anomaly score is the number of flags raised, so it ranges from 0 to 4.
# The flags are added with `+` in a loop because the name `sum` is the Spark
# aggregate in this notebook, not the Python builtin.
exceed_flags = [
    (col(metric) > limit).cast('int')
    for metric, limit in zip(anomaly_metrics, thresholds)
]
score_expr = exceed_flags[0]
for flag in exceed_flags[1:]:
    score_expr = score_expr + flag
df = df.withColumn('anomaly_score', score_expr)

# Translate the score into a risk label.
score = col('anomaly_score')
risk_expr = (
    when(score == 0, 'LOW')
    .when(score == 1, 'MEDIUM')
    .when(score == 2, 'HIGH')
    .when(score >= 3, 'CRITICAL')
    # Reached when the score matches no branch (e.g. it is null).
    .otherwise('UNKNOWN')
)
df = df.withColumn('risk_level', risk_expr)


=== UMBRALES DE ANOMALÍAS (95º PERCENTIL) ===
Paquetes: 35
Bytes: 11977
Duración: 99726677 μs
Velocidad: 2115183 B/s


In [20]:
# Share of flows per risk level.
print("=== DISTRIBUCIÓN DE NIVELES DE RIESGO ===")
all_flows = df.count()  # denominator for the percentage column
risk_metrics = [
    count('*').alias('flow_count'),
    (count('*') * 100.0 / all_flows).alias('percentage'),
]
risk_distribution = (
    df.groupBy('risk_level')
    .agg(*risk_metrics)
    .orderBy(col('flow_count').desc())
)

risk_distribution.show()

# Where do the HIGH / CRITICAL flows concentrate, by port category?
print("\n=== ANOMALÍAS POR CATEGORÍA DE PUERTO ===")
elevated_risk = df.where(col('risk_level').isin('HIGH', 'CRITICAL'))
anomalies_by_port = (
    elevated_risk.groupBy('port_category')
    .agg(
        count('*').alias('anomaly_count'),
        avg('anomaly_score').alias('avg_score'),
    )
    .orderBy(col('anomaly_count').desc())
)

anomalies_by_port.show()

=== DISTRIBUCIÓN DE NIVELES DE RIESGO ===
+----------+----------+------------------+
|risk_level|flow_count|        percentage|
+----------+----------+------------------+
|       LOW|   2398000| 84.71295097812018|
|    MEDIUM|    292399| 10.32943375857021|
|      HIGH|     98808|3.4905409759158044|
|  CRITICAL|     41529|1.4670742873938085|
+----------+----------+------------------+


=== ANOMALÍAS POR CATEGORÍA DE PUERTO ===
+---------------+-------------+------------------+
|  port_category|anomaly_count|         avg_score|
+---------------+-------------+------------------+
|          HTTPS|       101054| 2.329655431749362|
|           HTTP|        33526|2.2170255920777904|
|USER_REGISTERED|         1702|2.1668625146886016|
|   SYSTEM_OTHER|         1486|2.4246298788694483|
|DYNAMIC_PRIVATE|         1396|2.0458452722063036|
|        UNKNOWN|         1173|               2.0|
+---------------+-------------+------------------+



## 7. Análisis por Tipos de Ataque

In [21]:
# How many flows carry each attack label, and what share of the total.
print("=== DISTRIBUCIÓN DE TIPOS DE ATAQUE ===")
all_flows = df.count()  # denominator for the percentage column
flows_by_attack = df.groupBy('attack_type')
attack_distribution = flows_by_attack.agg(
    count('*').alias('flow_count'),
    (count('*') * 100.0 / all_flows).alias('percentage'),
).orderBy(col('flow_count').desc())

attack_distribution.show()

# Mean traffic profile of every attack label, listed alphabetically.
print("\n=== CARACTERÍSTICAS POR TIPO DE ATAQUE ===")
profile_metrics = [
    avg('total_packets').alias('avg_packets'),
    avg('total_bytes').alias('avg_bytes'),
    avg('bytes_per_second').alias('avg_speed'),
    avg('anomaly_score').alias('avg_anomaly_score'),
]
attack_characteristics = flows_by_attack.agg(*profile_metrics).orderBy('attack_type')

attack_characteristics.show(truncate=False)

=== DISTRIBUCIÓN DE TIPOS DE ATAQUE ===
+--------------------+----------+--------------------+
|         attack_type|flow_count|          percentage|
+--------------------+----------+--------------------+
|              BENIGN|   2273090|   80.30031765590292|
|            DoS Hulk|    231073|   8.163000717834514|
|            PortScan|    158930|   5.614440908654145|
|                DDoS|    128027|   4.522746027888154|
|       DoS GoldenEye|     10293|  0.3636156815753924|
|         FTP-Patator|      7938|  0.2804217701686063|
|         SSH-Patator|      5897| 0.20832038028272507|
|       DoS slowloris|      5796| 0.20475240361517288|
|    DoS Slowhttptest|      5499| 0.19426043262246992|
|                 Bot|      1966| 0.06945190226146133|
|Web Attack � Brut...|      1507|0.053237038000011304|
|    Web Attack � XSS|       652|0.023032879081624002|
|        Infiltration|        36|0.001271754059721...|
|Web Attack � Sql ...|        21|7.418565348375829E-4|
|          Heartbleed|   

## 8. Análisis de Correlaciones

In [None]:
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler
import builtins  # the star import from pyspark.sql.functions shadows abs()

# Correlation matrix across the main numeric metrics.
print("=== ANÁLISIS DE CORRELACIONES ===")

correlation_vars = ['total_packets', 'total_bytes', 'flow_duration', 'packet_size_avg', 'bytes_per_second']

# Rows with a null in any of the metrics are dropped before assembling.
df_corr = df.select(*correlation_vars).dropna()

# Correlation.corr works on a single vector column, so pack the metrics.
assembler = VectorAssembler(inputCols=correlation_vars, outputCol="features")
df_vector = assembler.transform(df_corr).select("features")

# The result is a one-row DataFrame whose only cell is the matrix.
correlation_matrix = Correlation.corr(df_vector, "features").first()[0]

# Dense NumPy array, indexed in the same order as correlation_vars.
correlation_array = correlation_matrix.toArray()

print("\nCorrelaciones fuertes (|r| > 0.7):")
# Walk the upper triangle only, so each pair is reported once.
for row_idx, left_var in enumerate(correlation_vars):
    for col_idx in range(row_idx + 1, len(correlation_vars)):
        coefficient = correlation_array[row_idx][col_idx]
        if not builtins.abs(coefficient) > 0.7:
            continue
        print(f"  {left_var} <-> {correlation_vars[col_idx]}: {coefficient:.3f}")


=== ANÁLISIS DE CORRELACIONES ===

Correlaciones fuertes (|r| > 0.7):
  total_packets <-> total_bytes: 0.996


## 9. Análisis Temporal

In [28]:
# Duration-based view of the flows: convert flow_duration (divided by 1e6,
# i.e. treated as microseconds) to seconds and bucket it.
df = df.withColumn('duration_seconds', col('flow_duration') / 1000000.0)

seconds = col('duration_seconds')
duration_expr = (
    when(seconds < 1, 'INSTANT')
    .when((seconds >= 1) & (seconds < 10), 'SHORT')
    .when((seconds >= 10) & (seconds < 60), 'MEDIUM')
    .when((seconds >= 60) & (seconds < 300), 'LONG')
    .when(seconds >= 300, 'VERY_LONG')
    # Reached only when no comparison is true (e.g. a null duration).
    .otherwise('UNKNOWN')
)
df = df.withColumn('duration_category', duration_expr)

# Sort key that lists the buckets from shortest to longest instead of
# alphabetically; any other label sorts last.
category_rank = [('INSTANT', 1), ('SHORT', 2), ('MEDIUM', 3), ('LONG', 4), ('VERY_LONG', 5)]
(first_category, first_rank), *later_categories = category_rank
rank_expr = when(col('duration_category') == first_category, first_rank)
for category_name, category_position in later_categories:
    rank_expr = rank_expr.when(col('duration_category') == category_name, category_position)
rank_expr = rank_expr.otherwise(6)

print("=== DISTRIBUCIÓN TEMPORAL ===")
duration_metrics = [
    count('*').alias('flow_count'),
    avg('total_packets').alias('avg_packets'),
    avg('bytes_per_second').alias('avg_speed'),
]
temporal_distribution = (
    df.groupBy('duration_category')
    .agg(*duration_metrics)
    .orderBy(rank_expr)
)

temporal_distribution.show()

=== DISTRIBUCIÓN TEMPORAL ===
+-----------------+----------+-----------------+------------------+
|duration_category|flow_count|      avg_packets|         avg_speed|
+-----------------+----------+-----------------+------------------+
|          INSTANT|   2043034|4.516461791629508| 2064068.827264687|
|            SHORT|    279831|15.63239598186048| 2812.039330256801|
|           MEDIUM|    111888|50.49895431145431|1928.0807170125856|
|             LONG|    395983| 92.6023768697141| 882.6805360307583|
+-----------------+----------+-----------------+------------------+



## 10. Resumen Ejecutivo

In [29]:
# Executive summary: headline volume, benign/malicious split, high-risk
# share and the three busiest port categories.
#
# All headline figures come from ONE aggregation pass over the DataFrame
# (instead of a separate action per figure), and every ratio is guarded so an
# empty DataFrame prints zeros rather than raising ZeroDivisionError/TypeError.
print("=== RESUMEN EJECUTIVO ===")

# Conditional counts: when() without otherwise() yields null for rows that do
# not match, and count() skips nulls, so each expression counts matching rows
# and returns 0 (not null) when nothing matches.
is_benign = col('attack_type') == 'BENIGN'
is_high_risk = col('risk_level').isin(['HIGH', 'CRITICAL'])

# NOTE: `sum` here is the Spark aggregate from the notebook's star import.
summary_row = df.agg(
    count('*').alias('total_flows'),
    sum('total_packets').alias('total_packets'),
    sum('total_bytes').alias('total_bytes'),
    count(when(is_benign, 1)).alias('benign_flows'),
    count(when(is_high_risk, 1)).alias('high_risk_flows'),
).first()

# General metrics. Spark's sum() is null over zero rows; fall back to 0.
total_flows = summary_row['total_flows']
total_packets_sum = summary_row['total_packets'] or 0
total_bytes_sum = summary_row['total_bytes'] or 0

print(f"\n📊 MÉTRICAS GENERALES:")
print(f"  • Total de flujos: {total_flows:,}")
print(f"  • Total de paquetes: {total_packets_sum:,}")
print(f"  • Volumen total: {total_bytes_sum/1024/1024/1024:.2f} GB")

# Security split: everything not labelled BENIGN is counted as malicious.
benign_count = summary_row['benign_flows']
malicious_count = total_flows - benign_count
if total_flows:
    malicious_percentage = (malicious_count / total_flows) * 100
    benign_percentage = 100 - malicious_percentage
else:
    malicious_percentage = 0.0
    benign_percentage = 0.0

print(f"\n🔒 DISTRIBUCIÓN DE SEGURIDAD:")
print(f"  • Tráfico benigno: {benign_count:,} ({benign_percentage:.1f}%)")
print(f"  • Tráfico malicioso: {malicious_count:,} ({malicious_percentage:.1f}%)")

# Identified risks: flows whose risk level is HIGH or CRITICAL.
high_risk_count = summary_row['high_risk_flows']
risk_percentage = (high_risk_count / total_flows) * 100 if total_flows else 0.0

print(f"\n⚠️ RIESGOS IDENTIFICADOS:")
print(f"  • Flujos de alto riesgo: {high_risk_count:,} ({risk_percentage:.2f}%)")

# Three port categories with the most flows.
top_3_ports = df.groupBy('port_category').count().orderBy(desc('count')).limit(3).collect()
print(f"\n🌐 TOP 3 CATEGORÍAS DE PUERTOS:")
for i, row in enumerate(top_3_ports, 1):
    print(f"  {i}. {row['port_category']}: {row['count']:,} flujos")

=== RESUMEN EJECUTIVO ===

📊 MÉTRICAS GENERALES:
  • Total de flujos: 2,830,736
  • Total de paquetes: 55,920,908
  • Volumen total: 44.06 GB

🔒 DISTRIBUCIÓN DE SEGURIDAD:
  • Tráfico benigno: 2,273,090 (80.3%)
  • Tráfico malicioso: 557,646 (19.7%)

⚠️ RIESGOS IDENTIFICADOS:
  • Flujos de alto riesgo: 140,337 (4.96%)

🌐 TOP 3 CATEGORÍAS DE PUERTOS:
  1. DNS: 957,971 flujos
  2. HTTP: 618,934 flujos
  3. HTTPS: 505,710 flujos


## 11. Preparación para Exportación

In [30]:
# Columns handed over to the BI layer: identifiers, raw metrics, the derived
# categories and the anomaly / attack labels.
bi_columns = [
    'destination_port', 'port_category', 'total_packets', 'total_bytes',
    'flow_duration', 'packet_size_avg', 'bytes_per_second',
    'communication_pattern', 'speed_category', 'duration_category',
    'anomaly_score', 'risk_level', 'attack_type'
]

# Project to the BI columns and add a timestamp column for the analysis run.
df_final = df.select(*bi_columns).withColumn('analysis_timestamp', current_timestamp())

# +1 accounts for the analysis_timestamp column added above.
print(f"Dataset final preparado con {len(bi_columns) + 1} columnas")
export_rows = df_final.count()
print(f"Total de registros para exportación: {export_rows:,}")

# Final sanity figures: row count and label cardinalities.
final_checks = [
    count('*').alias('total_records'),
    countDistinct('attack_type').alias('unique_attacks'),
    countDistinct('port_category').alias('unique_port_categories'),
]
df_final.agg(*final_checks).show()

Dataset final preparado con 14 columnas
Total de registros para exportación: 2,830,736
+-------------+--------------+----------------------+
|total_records|unique_attacks|unique_port_categories|
+-------------+--------------+----------------------+
|      2830736|            15|                     8|
+-------------+--------------+----------------------+



In [36]:
# Shut down the Spark session created at the top of the notebook. After this
# call the DataFrames defined above can no longer be evaluated.
spark.stop()
print("Sesión Spark finalizada")

Sesión Spark finalizada
