# Detección de anomalías basada en KMeans + distancia

Este análisis utiliza los datos de ayudas PAC 2024, realiza ingeniería de variables, entrena KMeans, calcula una puntuación de anomalía por distancia al cluster y por último genera un dataset enriquecido listo para publicarse en un espacio de datos.

- *FEADA* -> Fondo Europeo Agrícola de Garantía
- *FEADER* -> Fondo Europeo Agrícola de Desarrollo Rural
- *IMPORTECOFIN* -> Cofinanciación nacional/regional
- *FEADER_COFIN* -> Parte cofinanciada dentro del FEADER
- *IMPORTE_EUROS* -> Importe total findal de la ayuda

## Set up

In [None]:
from pyspark.sql import SparkSession, functions as F, types as T
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler, CountVectorizer, RegexTokenizer
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.linalg import DenseVector, VectorUDT
from pyspark import StorageLevel

spark = SparkSession.builder.appName("Anomalias-KMeans").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/09/08 22:58:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [None]:
input_path = "data/espacio_datos/pac_2024_clean"
output_path = "data/espacio_datos/pac_2024_anomalias_kmeans/"

# Definir rango para buscar k
k_range = list(range(3, 5)) 
# Definir percentil para marcar anomalías (98% = top 2% más alejados)
pctil_of_anomaly = 0.98  

In [None]:
# Leer datos
df = spark.read.parquet(input_path)
df.show(20,truncate=False)

In [None]:
# Normalización de nulos numéricos
num_cols = ["FEAGA", "FEADER", "IMPORTECOFIN", "FEADER_COFIN", "IMPORTE_EUROS"]
fill_num = {c: 0.0 for c in num_cols}
df = df.fillna(fill_num)

# Aplicar log transform para reducir asimetría y estabilizar varianza
df = (df.withColumn("IMPORTE_ABS", F.abs(F.col("IMPORTE_EUROS")))
       .withColumn("IMPORTE_SIGN", F.when(F.col("IMPORTE_EUROS") < 0, -1.0).otherwise(1.0))
       .withColumn("IMPORTE_LOG", F.col("IMPORTE_SIGN") * F.log1p(F.col("IMPORTE_ABS")))
    )

## Feature engeeniring

### Crear ratios de composición

In [None]:
# Usar el total absoluto para que las proporciones sean interpretables incluso en recuperaciones
safe_den = F.when(F.col("IMPORTE_ABS") > 0, F.col("IMPORTE_ABS")).otherwise(F.lit(None))


df = (df.withColumn("FEAGA_ratio", F.col("FEAGA")/safe_den)
       .withColumn("FEADER_ratio", F.col("FEADER")/safe_den)
       .withColumn("COFIN_ratio", F.col("IMPORTECOFIN")/safe_den))

# Reemplazo de posibles nulls de ratio por 0.0
for c in ["FEAGA_ratio","FEADER_ratio","COFIN_ratio"]:
    df = df.withColumn(c, F.coalesce(F.col(c), F.lit(0.0)))

In [None]:
# Tokenizar etiqueta con multipes posibles (OE4|OE5|OE6)
splitter = RegexTokenizer(
    inputCol="OBJETIVO_ESP",
    outputCol="obj_esp_tokens",
    pattern="\\|",
    gaps=True
)
cv = CountVectorizer(inputCol="obj_esp_tokens", outputCol="obj_esp_vec", binary=True, minDF=2)

# One hot encode de variables categóricas
cat_cols = ["PROVINCIA_SAFE", "MEDIDA"]
indexers = [StringIndexer(inputCol=c, outputCol=f"{c}_idx", handleInvalid="keep") for c in cat_cols]
encoder = OneHotEncoder(
    inputCols=[f"{c}_idx" for c in cat_cols],
    outputCols=[f"{c}_oh" for c in cat_cols],
    handleInvalid="keep" 
)

# Variables muméricas
numeric_cols = ["IMPORTE_ABS", "IMPORTE_LOG", "IMPORTE_SIGN","FEAGA_ratio", "FEADER_ratio", "COFIN_ratio","IS_RECUP_ANY"
]
assembler = VectorAssembler(
    inputCols=["obj_esp_vec"] + [f"{c}_oh" for c in cat_cols] + numeric_cols,
    outputCol="features_raw",
    handleInvalid="keep")

scaler = StandardScaler(inputCol="features_raw", outputCol="features", withStd=True, withMean=False)

## Spark Model

### Seleccionar K por silhouette 

In [None]:
# Pipeline reutilizable (sin KMeans)
pre_kmeans_stages = [splitter, cv] + indexers + [encoder, assembler, scaler]
pre_pipeline = Pipeline(stages=pre_kmeans_stages)
# Modelo
pre_model = pre_pipeline.fit(df)
df_prepared = pre_model.transform(df)

In [None]:
# Evaluar K en el 20%
df_eval = df_prepared.select("features").sample(False, 0.2, seed=42)
df_eval = df_eval.repartition(200).persist(StorageLevel.MEMORY_AND_DISK)
df_eval.count()

In [None]:
# Encontrar mejor k/score
best_k = None
best_score = float("-inf")
metrics = []
for k in k_range:
    kmeans = KMeans(featuresCol="features", predictionCol="cluster", k=k, seed=17)
    model = kmeans.fit(df_eval)
    pred = model.transform(df_eval)
    evaluator = ClusteringEvaluator(featuresCol="features", predictionCol="cluster", metricName="silhouette")
    score = evaluator.evaluate(pred)
    metrics.append((k, score))
    if score > best_score:
        best_score = score
        best_k = k

print(f"Mejor k por silhouette: {best_k} (score={best_score:.4f})")

### Entrenar modelo con best_k

In [None]:
kmeans_final = KMeans(featuresCol="features", predictionCol="cluster", k=best_k, seed=42)
full_pipeline = Pipeline(stages=pre_kmeans_stages + [kmeans_final])
full_model = full_pipeline.fit(df)
pred = full_model.transform(df)

## Añadir puntuación de anomalía

Se calcula de acuerdo a la distancia euclídea al centroide asignado. 


In [None]:
# Obtener centroides
kmeans_model = full_model.stages[-1]
centers = kmeans_model.clusterCenters()

# UDF para distancia euclídea entre vector denso y centroide
def euclidean_distance(v, center):
    if v is None:
        return None
    if not isinstance(v, DenseVector):
        v = DenseVector(v.toArray())
    return float(sum((v[i] - center[i])**2 for i in range(len(center))) ** 0.5)

dist_udf = F.udf(lambda v, cid: euclidean_distance(v, centers[cid]), T.DoubleType())

pred = pred.withColumn("dist_to_center", dist_udf(F.col("features"), F.col("cluster")))

### Z-score de distancia dentro de cada cluster

In [None]:
cluster_stats = (pred.groupBy("cluster")
                     .agg(F.mean("dist_to_center").alias("mu"), 
                          F.stddev_pop("dist_to_center").alias("sigma")))

pred = (pred.join(cluster_stats, on="cluster", how="left")
            .withColumn("anomaly_score", (F.col("dist_to_center") - F.col("mu")) / F.col("sigma")))

# Percentil global de anomaly_score para umbral
q = pred.approxQuantile("anomaly_score", [pctil_of_anomaly], 1e-3)[0]

pred = pred.withColumn("ANOMALIA", (F.col("anomaly_score") >= F.lit(q)).cast("int"))

## Guardar resultados en parquet



In [None]:
## Seleccionar columnas para outpur
out_cols = [
"BENEFICIARIO", "PROVINCIA_SAFE", "MUNICIPIO_COD", "MUNICIPIO_NOMBRE", "MEDIDA", "OBJETIVO_ESP",
"FEAGA", "FEADER", "IMPORTECOFIN", "FEADER_COFIN", "IMPORTE_EUROS",
"IMPORTE_ABS", "IMPORTE_LOG", "IMPORTE_SIGN",
"FEAGA_ratio", "FEADER_ratio", "COFIN_ratio",
"IS_RECUP_ANY", "cluster", "dist_to_center", "anomaly_score", "ANOMALIA"
]

final_df = pred.select(*out_cols)

In [None]:
(
    final_df
    .write.mode("overwrite")
    .coalesce(1)
    .parquet(output_path)
)