# Entrenamiento modelo para detección de Phising a partir de una URL

Importamos la librerías necesarias para la realización del proyecto

In [None]:
import os
import tempfile
from pyspark.sql import SparkSession

# Detectar automáticamente Java 17 o 11 (PySpark 4.1.0+ requiere Java 17+)
if 'JAVA_HOME' not in os.environ:
    try:
        # Intentar Java 17 primero (requerido para PySpark 4.1.0+)
        java_home_17 = os.popen('/usr/libexec/java_home -v 17 2>/dev/null').read().strip()
        if java_home_17:
            os.environ['JAVA_HOME'] = java_home_17
        else:
            # Si no hay Java 17, intentar Java 11
            java_home_11 = os.popen('/usr/libexec/java_home -v 11 2>/dev/null').read().strip()
            if java_home_11:
                os.environ['JAVA_HOME'] = java_home_11
    except:
        pass

# Path dinámico para warehouse (compatible con cualquier OS)
warehouse_dir = os.path.join(tempfile.gettempdir(), "spark-warehouse")
os.makedirs(warehouse_dir, exist_ok=True)

spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("Ejemplo pySparkSQL") \
    .config("spark.driver.host", "127.0.0.1") \
    .config("spark.driver.bindAddress", "127.0.0.1") \
    .config("spark.sql.warehouse.dir", f"file://{warehouse_dir}") \
    .getOrCreate()

sc = spark.sparkContext

In [None]:
%matplotlib inline 
from pyspark.sql import Row, DataFrame
import matplotlib.pyplot as plt
from pyspark.sql.functions import *
from pyspark.sql.functions import col
from functools import reduce
import matplotlib.pyplot as plt
from pyspark.sql.types import NumericType
from pyspark.mllib.evaluation import MulticlassMetrics
import matplotlib.pyplot as plt
import numpy as np
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler

from pyspark.ml import *
from pyspark.ml.param import *
from pyspark.ml.tuning import *
from pyspark.ml.feature import *
from pyspark.ml.evaluation import *
from pyspark.ml.classification import *

Guardamos el path del csv

In [None]:
csv_path = os.path.join(os.getcwd(), "phishing_features.csv")
print(f"CSV path: {csv_path}")

Cargamos el csv en un dataframe e imprimimos el schema del mismo

In [None]:
df = spark.read.csv(csv_path, inferSchema=True, header=True)
print("Elementos en DataFrame a partir de datos/personas.csv: " + str(df.count()) + "\nEsquema: ")
print (df.printSchema())
type(df)

Lo primero que vamos a comprobar es si nuestro dataset está desbalanceado o no. Al tener un problema binario, lo ideal sería tener un 50% de ejemplos de cada clase. En el caso de que la diferencia sea muy obvia, tendremos que aplicar ciertas medidas para que no afecte al rendimiento del modelo

In [None]:
count_1 = df.filter(df['label'] == 1).count()
count_0 = df.filter(df['label'] == 0).count()
total = df.count()


perc_0 = (count_0 / total) * 100
perc_1 = (count_1 / total) * 100

print()

print(f"Benign (0): {count_0} records ({perc_0:.2f}%)")
print(f"Phishing (1): {count_1} records ({perc_1:.2f}%)")

Comprobamos que en efecto, nuestro dataset está desbalanceado

In [None]:
labels = ["Benign (0)", "Phishing (1)"]
percentages = [perc_0, perc_1]

plt.figure()
bars = plt.bar(labels, percentages)
plt.ylabel("Percentage (%)")
plt.title("Label distribution (%)")

# Añadir el texto del porcentaje encima de cada barra
for bar, perc in zip(bars, percentages):
    plt.text(
        bar.get_x() + bar.get_width() / 2,
        bar.get_height(),
        f"{perc:.2f}%",
        ha="center",
        va="bottom"
    )

plt.show()

Vamos a dejar pasar esto por ahora a ver que tal funciona una regresión logística con estos datos.

A continuación, vamos a buscar si nuestro dataset posee valores nulos. Muchos modelos no aceptan valores nulos en su entrenamiento. Si no los tratamos, probablemente nos salte un error y no podremos continuar hasta que lo solucionemos. Si, por lo que sea, el modelo acepta los valores nulos, estos pueden suponer un problema que afecta al rendimiento del modelo ya que perderemos información que puede ser valiosa en el entrenamiento o podemos caer en sesgos. Buscamos valores nulos en el dataset para tratarlo de forma adecuada.

In [None]:
condition = reduce(
    lambda a, b: a | b,
    [col(c).isNull() for c in df.columns]
)

n = df.filter(condition).count()
df.filter(condition).show(n, truncate=False)

Vemos que hay varios registros cuyo tld (dominio) es nulo. Vamos a recorrer todas las url cuyo tld sea nulo y vamos a asignarles su 

In [None]:
df = df.withColumn(
    "tld",
    when(col("tld").isNull(),    
        element_at(split(lit(col("url")), r"\."), -1).alias("tld")
    ).otherwise(col("tld"))
)

df = df.cache()
df.count()

df.filter((col("url") == 'google.com') | (col('url') == 'wikipedia.org') | (col('url') == 'safeexample99.net')).show(truncate=False)




Vemos en tres de URL antes filtradas que ya no tienen tld nulo

### Atención: es importante darle un valor numérico a cada dominio (los modelos no aceptan strings)

Vemos cuantos domonios y de cada tipo hay

In [None]:

from pyspark.sql.functions import countDistinct

unique_tlds = df.select("tld").distinct().count()
print(f"Total de TLDs únicos en el dataset: {unique_tlds}")

print("\nTLDs más frecuentes:")
tld_counts = df.groupBy("tld").count().orderBy(col("count").desc())
tld_counts.show(20, truncate=False)

print("\nNOTA: La conversión de TLD a numérico se realizará dentro de la Pipeline")
print("para asegurar que el StringIndexer aprenda solo del conjunto de entrenamiento.")

## Implementación de Modelos

Dividimos nuestros datos en train y test

In [None]:
seed = 12418
train, test_total = df.randomSplit([0.8, 0.2], seed=seed)

train.cache()
test_total.cache()

train_count = train.count() 
test_count_total = test_total.count()

print(f"Train count: {train_count}")
print(f"Test count: {test_count_total}")

In [None]:
val, test = df.randomSplit([0.5, 0.5], seed=seed)

val.cache()
test.cache()

val_count = val.count()
test_count = test.count()

print(f"Validation count: {val_count}")
print(f"Test count: {test_count}")


In [None]:
tld_indexer = StringIndexer(
    inputCol="tld",
    outputCol="tld_indexed", 
    handleInvalid="keep"  
)


feature_cols = ['url_length', 'num_dots', 'has_https', 'has_ip', 'num_subdirs', 
                'num_params', 'suspicious_words', 'tld_indexed', 'special_char_count', 
                'digits_count', 'entropy']

assembler = VectorAssembler(
    inputCols=feature_cols, 
    outputCol="features"
)

logistic_regression = LogisticRegression(
    maxIter=20, 
    regParam=0.01, 
    featuresCol='features', 
    labelCol='label'
)


pipeline = Pipeline(stages=[tld_indexer, assembler, logistic_regression])

print("Pipeline creada con los siguientes stages:")
print("1. StringIndexer (TLD -> numérico)")
print("2. VectorAssembler (features -> vector)")
print("3. LogisticRegression (modelo)")
print("\nEntrenando pipeline...")


pipeline_model = pipeline.fit(train)


tld_indexer_model = pipeline_model.stages[0]  
tld_labels = tld_indexer_model.labels
print("\nMapeo de TLD -> Índice numérico (aprendido del conjunto de entrenamiento):")
print("=" * 60)
for idx, tld in enumerate(tld_labels[:10]):  # Mostrar los primeros 10
    print(f"{tld:25s} -> {idx}")
if len(tld_labels) > 10:
    print(f"... y {len(tld_labels) - 10} TLDs más")
print(f"\nTotal de TLDs únicos aprendidos: {len(tld_labels)}")
print("=" * 60)

print("\nAplicando pipeline a datos de validación...")
prediction = pipeline_model.transform(val)


evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy"
)

accuracy = evaluator.evaluate(prediction)
print(f"\nAccuracy: {accuracy:.4f}")


Vemos que nos da un accuracy muy bueno, sin embargo, este resultado es engañoso. Vamos a consultar la matriz de confusión para que está ocurriendo

In [None]:
def matrizConfusion(prediction):
    rdd = prediction.select("prediction", "label") \
        .rdd.map(lambda r: (float(r["prediction"]), float(r["label"])))

    metrics = MulticlassMetrics(rdd)
    cm = metrics.confusionMatrix().toArray()

    labels = np.unique(prediction.select("label").toPandas())

    plt.figure(figsize=(6,5))
    plt.imshow(cm, cmap='Blues')
    plt.colorbar()
    plt.xticks(range(len(labels)), labels)
    plt.yticks(range(len(labels)), labels)

    plt.xlabel("Predicción")
    plt.ylabel("Etiqueta real")
    plt.title("Matriz de confusión")

    for i in range(cm.shape[0]):
        for j in range(cm.shape[1]):
            plt.text(j, i, int(cm[i, j]),
                    ha="center", va="center")

    plt.show()

    TN = cm[0,0]
    FP = cm[0,1]
    FN = cm[1,0]
    TP = cm[1,1]

    TPR = TP / (TP + FN) if (TP + FN) else 0.0   # recall clase 1
    TNR = TN / (TN + FP) if (TN + FP) else 0.0   # recall clase 0

    recall_medio = (TPR + TNR) / 2
    print(f"Recall medio (TPR+TNR)/2: {recall_medio:.4f}")

matrizConfusion(prediction)

Comprobamos que al ser un problema excesivamente desbalanceado, el accuracy no nos sirve para medir el rendimiento del modelo. En la matriz de confusión vemos que el modelo clasifica todos los registros como clase 1 (phising). Al ser la mayoría ejemplos de clase 1, el accuracy nos sale muy alto. Sin embargo, el modelo no está funcionando bien porque no es capaz de clasificar correctamente un ejemplo que no es phising.

Vamos a tomar ciertas medidas para balancear un poco los ejemplos del problema para ver si el rendimiento del modelo aumenta

### Undersampling

In [None]:
# Todo: MEDIDAS PARA TRATAR DESBALANCEO DE CLASES
phising_examples = train.filter(col("label") == 1)
non_phising_examples = train.filter(col("label") == 0)

print(f"Train original → phising: {phising_examples.count()}, no phising: {non_phising_examples.count()}")

fraction = non_phising_examples.count() / phising_examples.count()

majority_under = phising_examples.sample(
    withReplacement=False,
    fraction=fraction,
    seed=42
)

train_under = non_phising_examples.unionByName(majority_under)

print("Train balanceado:")
train_under.groupBy("label").count().show()

In [None]:
tld_indexer = StringIndexer(
    inputCol="tld",
    outputCol="tld_indexed", 
    handleInvalid="keep"  
)


feature_cols = ['url_length', 'num_dots', 'has_https', 'has_ip', 'num_subdirs', 
                'num_params', 'suspicious_words', 'tld_indexed', 'special_char_count', 
                'digits_count', 'entropy']

assembler = VectorAssembler(
    inputCols=feature_cols, 
    outputCol="features"
)

logistic_regression = LogisticRegression(
    maxIter=20, 
    regParam=0.01, 
    featuresCol='features', 
    labelCol='label'
)


pipeline = Pipeline(stages=[tld_indexer, assembler, logistic_regression])

print("Pipeline creada con los siguientes stages:")
print("1. StringIndexer (TLD -> numérico)")
print("2. VectorAssembler (features -> vector)")
print("3. LogisticRegression (modelo)")
print("\nEntrenando pipeline...")


pipeline_model = pipeline.fit(train_under)


tld_indexer_model = pipeline_model.stages[0]  
tld_labels = tld_indexer_model.labels
print("\nMapeo de TLD -> Índice numérico (aprendido del conjunto de entrenamiento):")
print("=" * 60)
for idx, tld in enumerate(tld_labels[:10]):  # Mostrar los primeros 10
    print(f"{tld:25s} -> {idx}")
if len(tld_labels) > 10:
    print(f"... y {len(tld_labels) - 10} TLDs más")
print(f"\nTotal de TLDs únicos aprendidos: {len(tld_labels)}")
print("=" * 60)

print("\nAplicando pipeline a datos de validación...")
prediction = pipeline_model.transform(val)

In [None]:
matrizConfusion(prediction)

Podemos observar el recall medio por clases mejora considerablemente (de 0.5 a 0.9980). Vemos que ahora hay 400 ejemplos de la clase no-phising que se están clasificando correctamente, es decir, ya no clasifica todo como clase phising. Todavía hay bastantes falsos negativos pero en general los resultados son bastante satisifactorios aplicando undersampling

### Oversampling

In [None]:
phising_examples = train.filter(col("label") == 1)
non_phising_examples = train.filter(col("label") == 0)

print(f"Train original → phising: {phising_examples.count()}, no phising: {non_phising_examples.count()}")

fraction = phising_examples.count() / non_phising_examples.count()

majority_under = non_phising_examples.sample(
    withReplacement=True,
    fraction=fraction,
    seed=42
)

train_over = phising_examples.unionByName(majority_under)

print("Train balanceado:")
train_over.groupBy("label").count().show()

In [None]:
tld_indexer = StringIndexer(
    inputCol="tld",
    outputCol="tld_indexed", 
    handleInvalid="keep"  
)


feature_cols = ['url_length', 'num_dots', 'has_https', 'has_ip', 'num_subdirs', 
                'num_params', 'suspicious_words', 'tld_indexed', 'special_char_count', 
                'digits_count', 'entropy']

assembler = VectorAssembler(
    inputCols=feature_cols, 
    outputCol="features"
)

logistic_regression = LogisticRegression(
    maxIter=20, 
    regParam=0.01, 
    featuresCol='features', 
    labelCol='label'
)

pipeline = Pipeline(stages=[tld_indexer, assembler, logistic_regression])

print("Pipeline creada con los siguientes stages:")
print("1. StringIndexer (TLD -> numérico)")
print("2. VectorAssembler (features -> vector)")
print("3. LogisticRegression (modelo)")
print("\nEntrenando pipeline...")

pipeline_model = pipeline.fit(train_over)

tld_indexer_model = pipeline_model.stages[0]  
tld_labels = tld_indexer_model.labels
print("\nMapeo de TLD -> Índice numérico (aprendido del conjunto de entrenamiento):")
print("=" * 60)
for idx, tld in enumerate(tld_labels[:10]):  # Mostrar los primeros 10
    print(f"{tld:25s} -> {idx}")
if len(tld_labels) > 10:
    print(f"... y {len(tld_labels) - 10} TLDs más")
print(f"\nTotal de TLDs únicos aprendidos: {len(tld_labels)}")
print("=" * 60)

print("\nAplicando pipeline a datos de validación...")
prediction = pipeline_model.transform(val)

In [None]:
matrizConfusion(prediction)

Vemos unos resultados bastante similares a los anteriores. El recall medio por clases sigue siendo de 0.9980. Los verdaderos negativos se mantienen igual, han aumentado ligeramente los falsos negativos y descendendido los verdaderos positivos. Por tanto, pese a que las diferencias son super pequeñas. Nos quedamos con la opción de undersampling.

# SVM

In [None]:
def SVM_Pipeline(train:DataFrame, regParam: float = 0.1 , maxIter: int = 50) -> PipelineModel:
    tld_indexer = StringIndexer(
        inputCol="tld",
        outputCol="tld_indexed",
        handleInvalid="keep"
    )

    assembler = VectorAssembler(
        inputCols=feature_cols, 
        outputCol="features"
    )
    
    scaler = StandardScaler(
        inputCol="features",
        outputCol="scaledFeatures",
        withMean=True,
        withStd=True
    )

    svm = LinearSVC(
        labelCol="label",
        featuresCol="scaledFeatures",
        maxIter=maxIter,
        regParam=regParam
    )

    svm_pipeline = Pipeline(stages=[
        tld_indexer,
        assembler,
        scaler,
        svm
    ])

    svm_model = svm_pipeline.fit(train)

    return svm_model

svm_model = SVM_Pipeline(train)
svm_predictions = svm_model.transform(test)

evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="f1"
)

f1_score = evaluator.evaluate(svm_predictions)
print(f"\nF1-Score: {f1_score:.4f}")

print("\nEjemplos de predicciones (primeras 10 filas):")
svm_predictions.select("url", "label", "prediction").show(10, truncate=False)

matrizConfusion(svm_predictions)

In [None]:
cm_df = svm_predictions.groupBy("label", "prediction").count()

cm_df.show()

# Arbol de decisión

In [None]:
def arbolDecision_Pipeline(train:DataFrame, maxDepth: int = 8, minInstancesPerNode: int = 20 ,maxBins:int = 32, seed:int = 12418  ) -> PipelineModel:
    """
    Arbol de Decisión con Pipeline para phising_url
    """
    tld_indexer = StringIndexer(
        inputCol="tld",
        outputCol="tld_indexed",
        handleInvalid="keep"
    )

    # transforma los indices en un vector binario, no sera necesario un VectorIndexer
    tld_ohe = OneHotEncoder(
        inputCol="tld_indexed",
        outputCol="tld_ohe",
        dropLast=True
    )

    feature_cols = ['url_length', 'num_dots', 'has_https', 'has_ip', 'num_subdirs', 
                    'num_params', 'suspicious_words',"tld_ohe", 'special_char_count', 
                    'digits_count', 'entropy']
    
    assembler_arbol = VectorAssembler(
        inputCols=feature_cols, 
        outputCol="features"
    )

    dt_arbol = DecisionTreeClassifier(
        labelCol="label",
        featuresCol="features",
        maxDepth=maxDepth,
        maxBins=maxBins,
        minInstancesPerNode=minInstancesPerNode, # para evitar overfitting
        seed=seed,
    )

    dt_pipeline = Pipeline(stages=[
        tld_indexer,
        tld_ohe,
        assembler_arbol,
        dt_arbol
    ])

    dt_model = dt_pipeline.fit(train)

    return dt_model

dt_model = arbolDecision_Pipeline(train)

dt_predictions = dt_model.transform(test)

evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="f1"
)

f1_score = evaluator.evaluate(dt_predictions)
print(f"\nF1-Score: {f1_score:.4f}")

print("\nEjemplos de predicciones (primeras 10 filas):")
dt_predictions.select("url", "label", "prediction").show(10, truncate=False)


matrizConfusion(dt_predictions)


In [None]:
cm_df = dt_predictions.groupBy("label", "prediction").count()

cm_df.show()