# Seminario de Estadística II - Tarea 1 Parte 2

**Integrantes del equipo:**
* Azpeitia Medina Samuel
* Castro Pérez Juan Antonio
* Rodríguez Rodríguez Donovan Zuriel

Primero vamos a configurar nuestro espacio de trabajo en Unity Catalog usando SQL. Con esto nos aseguramos de tener listos el catálogo, la base de datos y el volumen donde estarán los archivos que vamos a usar a lo largo del ejercicio.

In [0]:
%sql
CREATE CATALOG IF NOT EXISTS dev;
CREATE DATABASE IF NOT EXISTS dev.ciencias_data;
CREATE VOLUME IF NOT EXISTS dev.ciencias_data.session_data;

### 1. Cree una tabla bronce en formato delta y particionado por hora para session_part1.csv.

**Solución:** 
Para este primer punto vamos a leer el archivo csv original indicando que el separador es el pipe `|`. Después, crearemos una columna nueva llamada `part_hour` que extraiga la fecha y la hora del campo `timestamp` para usarla como nuestra partición. Finalmente, guardamos el dataframe como una tabla en formato delta dentro de nuestra base de datos.

In [0]:
import ast
import re
import pyspark.sql.functions as F
from pyspark.sql.types import *

# Leemos el archivo csv de la primera parte
df_part1 = spark.read.format("csv").option("sep", "|").option("header", "true").load("/Volumes/dev/ciencias_data/session_data/sessions_part1.csv")

# Creamos la columna para particionar por hora
df_bronce = df_part1.withColumn("part_hour", F.date_format(F.to_timestamp("timestamp"), "yyyy-MM-dd-HH"))

# Guardamos la tabla particionada en formato delta
df_bronce.write.format("delta").mode("overwrite").option("overwriteSchema", "true").partitionBy("part_hour").saveAsTable("dev.ciencias_data.bronze_sessions")

# Visualizamos la tabla bronce
df_bronce.display()

### 2. El dataset de session part1.csv corresponden a datos de tráfico de red realizados por el sniffer Arkime, cada fila es una sesión. A continuación realice lo siguiente:

**Investigue el posible significado de cada campo y redacte un posible diccionario de datos.**

**Solución:**
Buscando información sobre el sniffer Arkime, armamos este diccionario de datos básico:
* **timestamp**: Es la fecha y hora exacta en la que se capturó la sesión.
* **srcIp / dstIp**: La dirección IP de origen (quien inició la conexión) y la de destino.
* **srcPort / dstPort**: Los puertos que se usaron para la comunicación.
* **srcMac / dstMac**: Arreglo con las direcciones MAC físicas por donde pasaron los datos.
* **protocol**: Arreglo de los protocolos de red identificados (por ejemplo: tcp, udp, dns).
* **totPackets / srcPackets / dstPackets**: El número de paquetes que se mandaron y recibieron en total.
* **totBytes**: El tamaño total de toda la sesión en bytes.
* **totDataBytes**: El tamaño en bytes pero solo de los datos útiles (sin contar encabezados de red).
* **firstPacket / lastPacket**: El momento (en epoch) en el que se vio el primer y el último paquete.
* **packetLen**: Un arreglo que dice cuánto pesó cada uno de los paquetes enviados.
* **srcGEO / dstGEO**: El país de donde viene y a donde va la conexión, sacado por la IP.
* **http**: Datos extra si la conexión fue a una página web, como el host o URL.

**Estructure la información en un dataframe donde cada columna corresponda una clave del json y cada fila son los valores del dataframe, puede ignorar las siguientes claves del json cert y packetPos**

**Solución:** 
Para extraer la información del JSON de forma segura y evitar que algún error de formato en el texto rompa el proceso, vamos a crear una función (UDF) apoyándonos en la librería `ast` de Python. 

Para cumplir con la instrucción de ignorar las claves `cert` y `packetPos`, lo que haremos será no incluirlas cuando definamos nuestro esquema (`StructType`). De esta manera, al momento en que PySpark aplane los datos, descartará automáticamente esos dos campos sin necesidad de borrarlos después.

In [0]:
import ast
from pyspark.sql.types import *
from pyspark.sql.functions import col, udf, explode, size

# Armamos el esquema sin poner cert ni packetPos
esquema_json = StructType([
    StructField("srcIp", StringType(), True),
    StructField("dstIp", StringType(), True),
    StructField("srcPort", LongType(), True),
    StructField("dstPort", LongType(), True),
    StructField("srcMac", ArrayType(StringType()), True),
    StructField("dstMac", ArrayType(StringType()), True),
    StructField("protocol", ArrayType(StringType()), True),
    StructField("totPackets", LongType(), True),
    StructField("srcPackets", LongType(), True),
    StructField("dstPackets", LongType(), True),
    StructField("totBytes", LongType(), True),
    StructField("srcBytes", LongType(), True),
    StructField("dstBytes", LongType(), True),
    StructField("totDataBytes", StringType(), True),
    StructField("firstPacket", LongType(), True),
    StructField("lastPacket", LongType(), True),
    StructField("srcGEO", StringType(), True),
    StructField("dstGEO", StringType(), True),
    StructField("packetLen", ArrayType(LongType()), True),
    StructField("http", StructType([StructField("host", ArrayType(StringType()), True)]), True)
])

esquema_salida = ArrayType(esquema_json)

# Ponemos la funcion del profe
def parseo_seguro(data_str):
    if data_str is None:
        return[]
    try:
        parsed = ast.literal_eval(data_str)
        if isinstance(parsed, list):
            return parsed
        elif isinstance(parsed, dict):
            return [parsed]
        return[]
    except Exception as e:
        return[]

udf_parseo = udf(parseo_seguro, esquema_salida)

# Aplicamos la funcion a la tabla bronce que hicimos en el paso 1
df_parseado = df_bronce.withColumn("data_parsed", udf_parseo(col("data")))
df_parseado = df_parseado.filter(size(col("data_parsed")) > 0)
df_explotado = df_parseado.select("timestamp", "part_hour", explode(col("data_parsed")).alias("columna_json"))

# Aplanamos la estructura para que queden como columnas normales
df_estructurado = df_explotado.select("timestamp", "part_hour", "columna_json.*")

#Visualizamos la tabla
df_estructurado.display()

**Sustituya la columna packetLen por los sumarizados: suma total, media, mínimo y máximo. Y asigne el tipo correcto a los datos por ejemplo lastPacket debe ser transformado a fecha.**

**Solución:** 
Primero vamos a convertir los campos `firstPacket` y `lastPacket` a formato Timestamp. Como vienen en milisegundos, tenemos que dividirlos entre 1000. También aprovechamos para castear `totDataBytes` que a veces viene como string vacío.
Después, usaremos funciones de arreglos de Spark para sacar las estadísticas de `packetLen` y borraremos la columna original.

In [0]:
import pyspark.sql.functions as F

# 1. Arreglamos los tipos de datos y fechas
df_tipos = df_estructurado.withColumn("totDataBytes", F.when(F.col("totDataBytes") == "", None).otherwise(F.col("totDataBytes")).cast(LongType()))
df_tipos = df_tipos.withColumn("firstPacket", (F.col("firstPacket") / 1000).cast(TimestampType()))
df_tipos = df_tipos.withColumn("lastPacket", (F.col("lastPacket") / 1000).cast(TimestampType()))

# 2. Sacamos las estadisticas del arreglo packetLen
df_tipos = df_tipos.withColumn("tamano_array", F.size(F.col("packetLen")))
df_tipos = df_tipos.withColumn("packet_len_suma", F.expr("aggregate(packetLen, 0L, (acc, x) -> acc + x)"))
df_tipos = df_tipos.withColumn("packet_len_min", F.array_min(F.col("packetLen")))
df_tipos = df_tipos.withColumn("packet_len_max", F.array_max(F.col("packetLen")))
df_tipos = df_tipos.withColumn("packet_len_media", F.col("packet_len_suma") / F.col("tamano_array"))

# Borramos la columna original como pide la instruccion
df_transformado = df_tipos.drop("packetLen", "tamano_array")

#Visualizamos la tabla
df_transformado.display()

**Cree una tabla silver con la información transformada, los nombres de los campos deben estar en snake_case y carga únicamente la información de session_part1.csv**

**Solución:** 
Para no cambiar los nombres uno por uno a mano, vamos a hacer un ciclo `for` que pase por todas las columnas y use una expresión regular para convertir las mayúsculas en minúsculas separadas por un guión bajo (snake_case). Al final, guardamos todo en nuestra base de datos.

In [0]:
import re

# Funcion sencilla en python para cambiar formato a snake_case
def convertir_snake_case(nombre):
    s1 = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', nombre)
    return re.sub('([a-z0-9])([A-Z])', r'\1_\2', s1).lower()

df_silver_final = df_transformado

# Recorremos todas las columnas y las renombramos
for columna in df_silver_final.columns:
    df_silver_final = df_silver_final.withColumnRenamed(columna, convertir_snake_case(columna))

# Guardamos como tabla silver 
df_silver_final.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable("dev.ciencias_data.silver_sessions")

#Visualizamos la tabla
df_silver_final.display()

### 3. Diseñe un proceso que realice una carga incremental y cargue los datos de session_part2.csv en la tabla anteriormente creada, para ello debera aplicar las mismas transformaciones y agregaciones realizadas para la parte 1.

**Solución:** 
Para hacer la carga incremental, vamos a leer el segundo archivo (`sessions_part2.csv`). Primero le agregaremos la partición por hora y luego le aplicaremos exactamente la misma secuencia de transformaciones que usamos en el paso anterior (el parseo con la UDF, el casteo de fechas, los cálculos estadísticos de `packetLen` y los nombres en snake_case). 

La clave de este paso es que al momento de guardar el dataframe en nuestra tabla silver, usaremos el modo `append` en lugar de `overwrite`. Esto hará que los datos nuevos se sumen a los que ya teníamos de la parte 1.

In [0]:
# Leemos el segundo archivo csv
df_part2 = spark.read.format("csv").option("sep", "|").option("header", "true").load("/Volumes/dev/ciencias_data/session_data/sessions_part2.csv")

# Agregamos la partición por hora
df_bronce_p2 = df_part2.withColumn("part_hour", F.date_format(F.to_timestamp("timestamp"), "yyyy-MM-dd-HH"))

# Parseamos el JSON con la UDF que ya teníamos registrada arriba
df_parseado_p2 = df_bronce_p2.withColumn("data_parsed", udf_parseo(F.col("data")))
df_parseado_p2 = df_parseado_p2.filter(F.size(F.col("data_parsed")) > 0)
df_explotado_p2 = df_parseado_p2.select("timestamp", "part_hour", F.explode(F.col("data_parsed")).alias("columna_json"))

# Aplanamos la estructura
df_estructurado_p2 = df_explotado_p2.select("timestamp", "part_hour", "columna_json.*")

# Transformaciones de tipos y packetLen (igual que en la parte 1)
df_tipos_p2 = df_estructurado_p2.withColumn("totDataBytes", F.when(F.col("totDataBytes") == "", None).otherwise(F.col("totDataBytes")).cast(LongType()))
df_tipos_p2 = df_tipos_p2.withColumn("firstPacket", (F.col("firstPacket") / 1000).cast(TimestampType()))
df_tipos_p2 = df_tipos_p2.withColumn("lastPacket", (F.col("lastPacket") / 1000).cast(TimestampType()))

df_tipos_p2 = df_tipos_p2.withColumn("tamano_array", F.size(F.col("packetLen")))
df_tipos_p2 = df_tipos_p2.withColumn("packet_len_suma", F.expr("aggregate(packetLen, 0L, (acc, x) -> acc + x)"))
df_tipos_p2 = df_tipos_p2.withColumn("packet_len_min", F.array_min(F.col("packetLen")))
df_tipos_p2 = df_tipos_p2.withColumn("packet_len_max", F.array_max(F.col("packetLen")))
df_tipos_p2 = df_tipos_p2.withColumn("packet_len_media", F.col("packet_len_suma") / F.col("tamano_array"))

df_transformado_p2 = df_tipos_p2.drop("packetLen", "tamano_array")

# Pasamos las columnas a snake_case usando nuestra funcion de python
df_silver_p2 = df_transformado_p2
for columna in df_silver_p2.columns:
    df_silver_p2 = df_silver_p2.withColumnRenamed(columna, convertir_snake_case(columna))

# Guardamos en la misma tabla pero con modo APPEND para la carga incremental
df_silver_p2.write.format("delta").mode("append").saveAsTable("dev.ciencias_data.silver_sessions")

# Cargamos nuestra tabla silver completa
df_silver = spark.sql("SELECT * FROM dev.ciencias_data.silver_sessions")

#Visualizamos la tabla
df_silver.display()

## 4. Una vez que tenga la tabla silver estructurada realice lo siguiente:

**Obtenga el número de sesiones por países destino y países origen.**

**Solución:** 
Usaremos las columnas `src_geo` y `dst_geo`. Solo necesitamos agrupar por ambos campos, contarlos y ordenarlos de mayor a menor para ver las rutas más comunes.

In [0]:
df_paises = df_silver.groupBy("src_geo", "dst_geo").count().orderBy(F.desc("count"))
df_paises.display()

**Obtenga el número de sesiones por srcIP y dstIP, así también totBytes, totDataBytes y totPackets por srcIP y Protocolo.**

**Solución:**
Esta pregunta nos pide dos cosas distintas, así que haremos dos cálculos en esta celda:
1. Primero, agrupamos por IP de origen (`src_ip`) y de destino (`dst_ip`) y contamos cuántas sesiones hay entre ellas.
2. Segundo, para los totales por protocolo, como la columna `protocol` es un arreglo, usamos `explode` para separar cada protocolo en una fila. Luego agrupamos por la IP de origen y el protocolo individual para sumar los bytes y paquetes.

In [0]:
# Parte 1: Sesiones por IPs
df_ips = df_silver.groupBy("src_ip", "dst_ip").count().orderBy(F.desc("count"))

print("1. Número de sesiones por IP Origen y Destino:")
df_ips.display()

# Parte 2: Totales por IP y Protocolo
# Separamos el arreglo de protocolos
df_protocolos = df_silver.withColumn("protocolo_ind", F.explode(F.col("protocol")))

# Agrupamos y sumamos
df_metricas_proto = df_protocolos.groupBy("src_ip", "protocolo_ind").agg(F.sum("tot_bytes"), F.sum("tot_data_bytes"), F.sum("tot_packets"))

print("2. Totales por SrcIP y Protocolo:")
df_metricas_proto.display()

**Obtenga el totBytes, totDataBytes y totPackets por srcMac y dstMac.**

**Solución:**
Las columnas de MAC (`src_mac` y `dst_mac`) son arreglos porque los paquetes pasan por varios dispositivos. Spark no deja agrupar por arreglos, así que primero los convertimos a texto separándolos por comas (`concat_ws`) y luego ya podemos agrupar y sumar los totales.

In [0]:
# Convertimos los arreglos a texto para poder agrupar
df_macs_texto = df_silver.withColumn("src_mac_txt", F.concat_ws(",", F.col("src_mac")))
df_macs_texto = df_macs_texto.withColumn("dst_mac_txt", F.concat_ws(",", F.col("dst_mac")))

df_totales_mac = df_macs_texto.groupBy("src_mac_txt", "dst_mac_txt").agg(F.sum("tot_bytes"), F.sum("tot_data_bytes"), F.sum("tot_packets"))

df_totales_mac.display()

**Obtenga los valores mínimo, máximo y promedio para los totBytes, totDataBytes y totPackets por srcIp, dstIp, srcMac y dstMac**

**Solución:**
Vamos a usar el dataframe anterior que ya tiene las MACs como texto (`df_macs_texto`). Agrupamos por las 4 columnas que nos piden (IPs y MACs) y calculamos el `min`, `max` y `avg` (promedio) de cada métrica.

In [0]:
df_estadisticas = df_macs_texto.groupBy("src_ip", "dst_ip", "src_mac_txt", "dst_mac_txt").agg(
    F.min("tot_bytes"), F.max("tot_bytes"), F.avg("tot_bytes"),
    F.min("tot_data_bytes"), F.max("tot_data_bytes"), F.avg("tot_data_bytes"),
    F.min("tot_packets"), F.max("tot_packets"), F.avg("tot_packets")
)

df_estadisticas.display()

**Haga un top 5 de srcIp, srcMac con mayor número de sesiones.**

**Solución:**
Haremos dos conteos rápidos. Uno agrupando solo por la IP de origen y otro por la MAC de origen (usando la versión de texto). Ordenamos de mayor a menor y limitamos a los primeros 5 resultados.

In [0]:
print("Top 5 srcIP:")
df_silver.groupBy("src_ip").count().orderBy(F.desc("count")).limit(5).display()

print("Top 5 srcMac:")
df_macs_texto.groupBy("src_mac_txt").count().orderBy(F.desc("count")).limit(5).display()

**Cuente el número de srcMac, dstMac involucradas de cada sesión, es decir el tamaño del array de esos campos.**

**Solución:**
Simplemente usamos la función `size` sobre las columnas originales que tienen los arreglos para saber cuántas MACs hay en cada lista.

In [0]:
df_tamano_macs = df_silver.select("src_ip", "dst_ip", F.size(F.col("src_mac")).alias("conteo_src_mac"), F.size(F.col("dst_mac")).alias("conteo_dst_mac"))
df_tamano_macs.display()

**¿Cuáles son los protocolos de red más usados?**

**Solución:**
Usamos de nuevo el `explode` en la columna `protocol` para contar cada uno por separado, ordenamos por cantidad y mostramos los más frecuentes.

In [0]:
df_protocolos_top = df_silver.select(F.explode(F.col("protocol")).alias("protocolo")).groupBy("protocolo").count().orderBy(F.desc("count"))
df_protocolos_top.display()

**¿Cuáles son las páginas web más visitadas?**

**Solución:**
El host está dentro de la estructura `http`. Primero filtramos para quedarnos solo con los registros que no sean nulos (que sí sean tráfico web) y luego explotamos el arreglo `http.host` para contar las páginas individuales.

In [0]:
# Filtramos nulos y explotamos
df_webs = df_silver.filter(F.col("http.host").isNotNull()).select(F.explode(F.col("http.host")).alias("pagina_web"))

df_webs_top = df_webs.groupBy("pagina_web").count().orderBy(F.desc("count"))
df_webs_top.display()
