## ACTIVIDAD 3
## 1. Introducción teórica

* **El aprendizaje supervisado** es una técnica, la cual trabaja por medio de etiquetas en donde se busca encontrar un algoritmo en donde se establecen relaciones entre las características y la salida, teniendo como resultado un modelo o función del conjunto de datos. Los modelos de aprendizaje supervisados se dividen en dos tipos: regresión y clasificación, cuya distinción depende del tipo de variable objetivo. Algunos ejemplos de aplicaciones de aprendizaje supervisado son detección de fraudes, predicción de ventas y reconocimiento de objetos. Los algoritmos más reconocidos e implementados en PySpark de aprendizaje supervisado son:
  1.  Regresión lineal y logística

  2.  Árboles de decisión, Random Forest, GBT

  3.  Multilayer Perceptron Classifier (red neuronal multicapa)

  4.  Linear Support Vector Machine (LinearSVC)


* **El aprendizaje no supervisado** a diferencia del supervisado, este no requiere de etiquetas o datos estructurados. Los modelos de aprendizaje no supervisados se utilizan para tres tareas principales: agrupación, asociación y reducción de la dimensionalidad. Algunos ejemplos de aplicaciones de aprendizaje supervisado son sistemas de recomendación, imagenología y secciones de noticias: Google News utiliza el aprendizaje no supervisado para categorizar artículos.

Los algoritmos más reconocidos e implementados en PySpark de aprendizaje no supervisado son:

  1. K-means

  2. PCA

  3. Gaussian Mixture






## Inicialización de spark


In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder \
    .appName("MuestreoSeguridad") \
    .getOrCreate()

In [3]:
df = spark.read.csv(r"C:\Users\fanny\Documents\Big_data\Documents\GUIDE_Train.csv", header=True,  inferSchema=True)

## Selección de Datos

In [4]:
df = df.drop("Timestamp")

In [5]:
# Ver cuántas categorías hay y cuántos registros tiene cada una
df.groupBy("Category").count().show()

+-------------------+-------+
|           Category|  count|
+-------------------+-------+
|  CommandAndControl| 826691|
| SuspiciousActivity|1003933|
| CredentialStealing|    388|
|             Impact| 750885|
|         Collection|  14753|
|          Discovery| 129342|
|      InitialAccess|4293041|
|         WebExploit|     38|
|    LateralMovement|  41754|
|          Execution| 267594|
|         Ransomware|  18974|
|     DefenseEvasion|  46662|
|            Exploit|   4648|
|        Persistence|  72682|
|PrivilegeEscalation|   4671|
|            Malware| 144081|
|   UnwantedSoftware|  18211|
|   CredentialAccess| 300518|
|       Exfiltration|1577965|
|      Weaponization|      6|
+-------------------+-------+



In [6]:
print("Número de registros: " + str(df.count()))
print("Número de columnas: " + str(len(df.columns)))

Número de registros: 9516837
Número de columnas: 44


In [7]:
partitioning_var1 = "Category"
partitioning_var2 = "IncidentGrade"

In [8]:
print(f"\nCalculando proporciones para combinaciones de '{partitioning_var1}' y '{partitioning_var2}'...")

total_count = df.count()
print(f"Total de registros en D: {total_count}")

partition_counts = df.groupBy(partitioning_var1, partitioning_var2).count()

partition_proportions = partition_counts.withColumn("proportion", F.col("count") / total_count)
partition_proportions.orderBy(F.col("count").desc()).show(truncate=False)


Calculando proporciones para combinaciones de 'Category' y 'IncidentGrade'...
Total de registros en D: 9516837
+------------------+--------------+-------+--------------------+
|Category          |IncidentGrade |count  |proportion          |
+------------------+--------------+-------+--------------------+
|InitialAccess     |TruePositive  |2266513|0.23815822420831628 |
|Exfiltration      |BenignPositive|1080813|0.11356851020985229 |
|InitialAccess     |BenignPositive|1063722|0.11177264042664595 |
|InitialAccess     |FalsePositive |954880 |0.10033585738623033 |
|CommandAndControl |BenignPositive|563544 |0.059215472535675454|
|SuspiciousActivity|BenignPositive|507279 |0.053303319159506465|
|Exfiltration      |FalsePositive |368866 |0.038759306269509505|
|SuspiciousActivity|TruePositive  |356286 |0.03743743851029496 |
|Impact            |BenignPositive|336977 |0.03540850809990757 |
|Impact            |TruePositive  |242524 |0.025483676982173804|
|Impact            |FalsePositive |171348 |

In [9]:
cond_p1 = (F.col("Category") == 'InitialAccess') & (F.col("IncidentGrade") == 'TruePositive')
cond_p2 = (F.col("Category") == 'Exfiltration') & (F.col("IncidentGrade") == 'BenignPositive')
cond_p3 = (F.col("Category") == 'InitialAccess') & (F.col("IncidentGrade") == 'BenignPositive')
cond_p4 = (F.col("Category") == 'InitialAccess') & (F.col("IncidentGrade") == 'FalsePositive')
cond_p5 = (F.col("Category") == 'CommandAndControl') & (F.col("IncidentGrade") == 'BenignPositive')
cond_p6 = (F.col("Category") == 'SuspiciousActivity') & (F.col("IncidentGrade") == 'BenignPositive')

prop_p1 = 0.23815822420831628
prop_p2 = 0.11356851020985229
prop_p3 = 0.11177264042664595
prop_p4 = 0.10033585738623033
prop_p5 = 0.059215472535675454
prop_p6 = 0.053303319159506465

In [10]:
partition_rules = [
    {
        "name": "P1_InitAccess_TruePos",
        "filter_expr": cond_p1,
        "est_proportion": prop_p1
    },
    {
        "name": "P2_Exfiltration_BenignPos",
        "filter_expr": cond_p2,
        "est_proportion": prop_p2
    },
    {
        "name": "P3_InitAccess_BenignPos",
        "filter_expr": cond_p3,
        "est_proportion": prop_p3
    },
    {
        "name": "P4_InitAccess_FalsePos",
        "filter_expr": cond_p4,
        "est_proportion": prop_p4
    },
    {
        "name": "P5_CmdCtrl_BenignPos",
        "filter_expr": cond_p5,
        "est_proportion": prop_p5
     },
     {
        "name": "P6_SuspAct_BenignPos",
        "filter_expr": cond_p6,
        "est_proportion": prop_p6
     }
]

print("Reglas de particionamiento definidas:")
total_prop_check = 0
for rule in partition_rules:
    print(f"- {rule['name']}: Estimación {rule['est_proportion']:.4%}")
    total_prop_check += rule['est_proportion']
print(f"\nSuma total de proporciones estimadas: {total_prop_check:.4f}")

Reglas de particionamiento definidas:
- P1_InitAccess_TruePos: Estimación 23.8158%
- P2_Exfiltration_BenignPos: Estimación 11.3569%
- P3_InitAccess_BenignPos: Estimación 11.1773%
- P4_InitAccess_FalsePos: Estimación 10.0336%
- P5_CmdCtrl_BenignPos: Estimación 5.9215%
- P6_SuspAct_BenignPos: Estimación 5.3303%

Suma total de proporciones estimadas: 0.6764


In [11]:
partition_dfs = {} # Diccionario para guardar los dataframes de cada partición

for rule in partition_rules:
    partition_name = rule['name']
    filter_condition = rule['filter_expr']

    print(f"\nFiltrando para la partición: {partition_name}")
    print(f"Condición: {filter_condition}")

    partition_df = df.filter(filter_condition)
    partition_dfs[partition_name] = partition_df

    print(f"Verificación para {partition_name}:")
    partition_df.show(5, truncate=False)
    count_partition = partition_df.count()
    print(f"Registros encontrados en {partition_name}: {count_partition}")


Filtrando para la partición: P1_InitAccess_TruePos
Condición: Column<'((Category = InitialAccess) AND (IncidentGrade = TruePositive))'>
Verificación para P1_InitAccess_TruePos:
+-------------+-----+----------+-------+----------+----------+-------------+---------------+-------------+-------------+--------------+-----------------+------------+--------+------+---------+------+----------+----------+---------------+-----------+----------+----------------+--------------+-----------+-----------------+-----------------+-------------+---------------+------------------+------------+--------+----------+--------------+------------+-----+--------+---------+-----------------+--------------+-----------+-----------+-----+-----+
|Id           |OrgId|IncidentId|AlertId|DetectorId|AlertTitle|Category     |MitreTechniques|IncidentGrade|ActionGrouped|ActionGranular|EntityType       |EvidenceRole|DeviceId|Sha256|IpAddress|Url   |AccountSid|AccountUpn|AccountObjectId|AccountName|DeviceName|NetworkMessageId|

In [20]:
# Definir tamaño objetivo por partición
tamano_por_particion = 1500

In [14]:
muestras_por_particion = []

# Iterar sobre cada partición creada previamente
for nombre, df_particion in partition_dfs.items():
    # Calcular cuántos registros hay por categoría
    conteo_cat = df_particion.groupBy("Category").count().collect()
    total_cat = sum(r["count"] for r in conteo_cat)

    # Calcular fracciones de muestreo por categoría
    fracciones = {
        r["Category"]: (tamano_por_particion * (r["count"] / total_cat)) / r["count"]
        for r in conteo_cat if r["count"] > 0
    }

    # Aplicar el muestreo estratificado
    muestra = df_particion.sampleBy("Category", fractions=fracciones, seed=42)

    # Agregar al conjunto de muestras
    muestras_por_particion.append(muestra)

In [21]:
# Unir todas las muestras en una sola muestra final M
muestra_final = muestras_por_particion[0]
for muestra in muestras_por_particion[1:]:
    muestra_final = muestra_final.union(muestra)

# Mostrar resumen
print(f"Número total de registros en la muestra final: {muestra_final.count()}")
muestra_final.groupBy("Category").count().show()

Número total de registros en la muestra final: 8958
+------------------+-----+
|          Category|count|
+------------------+-----+
|     InitialAccess| 4520|
|      Exfiltration| 1476|
| CommandAndControl| 1475|
|SuspiciousActivity| 1487|
+------------------+-----+



## Preparación de los datos

In [16]:
total_registros = 8958

In [26]:
from pyspark.sql.functions import col, when, count, mean, stddev, trim, sum as _sum
from pyspark.sql.types import NumericType

# Paso 1: Calcular porcentaje de valores nulos o vacíos por columna de forma eficiente
conteo_nulos = muestra_final.select([
    _sum(when(col(c).isNull() | (trim(col(c)) == ""), 1).otherwise(0)).alias(c)
    for c in muestra_final.columns
]).collect()[0].asDict()

# Calcular diccionario con porcentaje de nulos
porcentaje_nulos = {
    c: conteo_nulos[c] / total_registros
    for c in muestra_final.columns
}

In [24]:
print(type(porcentaje_nulos)) 

<class 'pyspark.sql.dataframe.DataFrame'>


In [27]:
# Paso 2: Eliminar columnas con más del 50% de valores nulos o vacíos
umbral_nulos = 0.5
columnas_validas = [c for c, p in porcentaje_nulos.items() if p <= umbral_nulos]
muestra_limpia = muestra_final.select(columnas_validas)


In [28]:
# Paso 3: Eliminar filas con valores nulos o vacíos restantes
muestra_limpia = muestra_limpia.replace("", None).na.drop()

In [29]:
# Paso 4: Convertir columnas numéricas a tipo double
numeric_cols = [f.name for f in muestra_limpia.schema.fields if isinstance(f.dataType, NumericType)]
for col_name in numeric_cols:
    muestra_limpia = muestra_limpia.withColumn(col_name, col(col_name).cast("double"))

In [30]:
# Verificar resultado final
print(f"Número de registros finales: {muestra_limpia.count()}")
muestra_limpia.show(5)

Número de registros finales: 8958
+-----------------+-----+----------+--------+----------+----------+-------------+-------------+-----------------+------------+--------+--------+---------+--------+----------+----------+---------------+-----------+----------+----------------+-----------+-----------------+-----------------+-------------+---------------+------------------+--------+----------+--------------+--------+---------+-----------+------+-------+
|               Id|OrgId|IncidentId| AlertId|DetectorId|AlertTitle|     Category|IncidentGrade|       EntityType|EvidenceRole|DeviceId|  Sha256|IpAddress|     Url|AccountSid|AccountUpn|AccountObjectId|AccountName|DeviceName|NetworkMessageId|RegistryKey|RegistryValueName|RegistryValueData|ApplicationId|ApplicationName|OAuthApplicationId|FileName|FolderPath|ResourceIdName|OSFamily|OSVersion|CountryCode| State|   City|
+-----------------+-----+----------+--------+----------+----------+-------------+-------------+-----------------+------------+

## Preparación del conjunto de entrenamiento y prueba

In [32]:
# 1. Filtrar registros con IncidentGrade no nulo
df_filtrado = muestra_limpia.filter(col("IncidentGrade").isNotNull())

# 2. Calcular el número total de registros
total_count = df_filtrado.count()

# 3. Obtener el conteo por clase
stratum_counts = df_filtrado.groupBy("IncidentGrade").count().collect()


In [33]:
# 4. Calcular fracciones para test (30%) de cada clase
stratum_fractions = {
    row["IncidentGrade"]: 0.3 * (row["count"] / total_count)
    for row in stratum_counts
}

print("Fracciones por clase:", stratum_fractions)

# 5. Aplicar muestreo estratificado
test_data = df_filtrado.sampleBy("IncidentGrade", fractions=stratum_fractions, seed=42)

# 6. Conjunto de entrenamiento: lo que no está en test
train_data = df_filtrado.exceptAll(test_data)

Fracciones por clase: {'TruePositive': 0.05170797052913597, 'BenignPositive': 0.19835900870730075, 'FalsePositive': 0.04993302076356329}


In [34]:
# 7. Mostrar resultados
print(f"Registros en entrenamiento: {train_data.count()}")
print(f"Registros en prueba: {test_data.count()}")

train_data.groupBy("IncidentGrade").count().show()
test_data.groupBy("IncidentGrade").count().show()

Registros en entrenamiento: 7683
Registros en prueba: 1275
+--------------+-----+
| IncidentGrade|count|
+--------------+-----+
|BenignPositive| 4784|
| FalsePositive| 1430|
|  TruePositive| 1469|
+--------------+-----+

+--------------+-----+
| IncidentGrade|count|
+--------------+-----+
|  TruePositive|   75|
|BenignPositive| 1139|
| FalsePositive|   61|
+--------------+-----+



## Construcción de modelos de aprendizaje supervisado y no supervisado

In [36]:
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator 

# Indexar etiqueta (IncidentGrade)
indexer = StringIndexer(inputCol="IncidentGrade", outputCol="label")
train_indexed = indexer.fit(train_data).transform(train_data)
test_indexed = indexer.fit(train_data).transform(test_data)

# Seleccionar columnas numéricas como características
features_cols = [c for c in train_indexed.columns if c.endswith("_zscore")]
assembler = VectorAssembler(inputCols=features_cols, outputCol="features")

train_ready = assembler.transform(train_indexed)
test_ready = assembler.transform(test_indexed)


In [37]:
# Modelo
lr = LogisticRegression(featuresCol="features", labelCol="label")
modelo = lr.fit(train_ready)

# Predicciones
predicciones = modelo.transform(test_ready)

# Evaluación
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predicciones)
print(f"Accuracy: {accuracy}")

Accuracy: 0.9027450980392157


In [41]:
from pyspark.ml.feature import VectorAssembler, PCA
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# 1. Ensamblar las características
assembler = VectorAssembler(inputCols=features_cols, outputCol="features")
df_assembled = assembler.transform(muestra_limpia)

# 2. Aplicar PCA para reducir dimensionalidad
pca = PCA(k=5, inputCol="features", outputCol="pca_features")  # Puedes ajustar k
pca_model = pca.fit(df_assembled)
df_pca = pca_model.transform(df_assembled)

In [42]:
# 3. Aplicar KMeans sobre los datos transformados por PCA
kmeans = KMeans(featuresCol="pca_features", k=3, seed=42)
model = kmeans.fit(df_pca)
predictions = model.transform(df_pca)

# 4. Evaluar con Silhouette Score
evaluator = ClusteringEvaluator(featuresCol="pca_features", metricName="silhouette")
silhouette = evaluator.evaluate(predictions)
print(f"Silhouette Score con PCA: {silhouette}")


Silhouette Score con PCA: 0.7435578891165091
