In [1]:
import os
# Java
os.environ['JAVA_HOME'] = '/usr/lib/jvm/zulu-8'
# Python del entorno
os.environ['PYSPARK_PYTHON'] = '/home/debian1/BigData_UPAO/bigdata_env/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/home/debian1/BigData_UPAO/bigdata_env/bin/python3'

# Archivos de configuración de Hadoop/YARN
os.environ['HADOOP_CONF_DIR'] = '/opt/hadoop-3.3.6/etc/hadoop'
os.environ['YARN_CONF_DIR'] = '/opt/hadoop-3.3.6/etc/hadoop'

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
        .appName("BetGol-Model")
        .master("yarn")
        .config("spark.driver.memory", "3g")
        .config("spark.executor.memory", "3g")
        .config("spark.executor.cores", "2")
        .config("spark.sql.shuffle.partitions", "150")
        .config("spark.sql.parquet.compression.codec", "snappy")
        .getOrCreate()
)

print("SparkSession iniciada correctamente")
print("Versión:", spark.version)
print("Master:", spark.sparkContext.master)


25/11/23 20:58:04 WARN Utils: Your hostname, vbox resolves to a loopback address: 127.0.1.1; using 192.168.18.70 instead (on interface enp0s3)
25/11/23 20:58:04 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/23 20:58:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/11/23 20:58:11 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


SparkSession iniciada correctamente
Versión: 3.5.1
Master: yarn


# Dividir train/test

In [2]:
from pyspark.sql.functions import col

df_ml_ready = spark.read.parquet("hdfs:///user/johan/data/processed/")

print("Dataset cargado correctamente desde PROCESSED.")
print("Filas totales:", df_ml_ready.count())


                                                                                

Dataset cargado correctamente desde PROCESSED.


[Stage 1:>                                                          (0 + 1) / 1]

Filas totales: 29348


                                                                                

## split

In [3]:
print("--- DIVISIÓN TRAIN / TEST (POR FECHA) ---")

FECHA_CORTE = "2024-01-01"

train_data = df_ml_ready.filter(col("fecha") < FECHA_CORTE)
test_data  = df_ml_ready.filter(col("fecha") >= FECHA_CORTE)

n_train = train_data.count()
n_test  = test_data.count()
total   = n_train + n_test

print(f"Fecha de Corte: {FECHA_CORTE}")
print(f"Entrenamiento: {n_train} partidos ({n_train/total:.1%})")
print(f"Prueba:        {n_test} partidos ({n_test/total:.1%})")


--- DIVISIÓN TRAIN / TEST (POR FECHA) ---


                                                                                

Fecha de Corte: 2024-01-01
Entrenamiento: 26164 partidos (89.2%)
Prueba:        3184 partidos (10.8%)


# Resultado del Partido (1X2)

* numTrees: Usamos árboles para que voten (mayor estabilidad).
* maxDepth: Profundidad media para capturar patrones sin memorizar (overfitting).
* seed=42: Semilla para que el resultado sea reproducible.

In [11]:
from pyspark.ml.classification import RandomForestClassifier
import time

print("--- ENTRENAMIENTO DEL MODELO (Random Forest) ---")


rf = RandomForestClassifier(
    labelCol="label_resultado",
    featuresCol="features",
    numTrees=70,
    maxDepth=15,
    seed=42
)

print("Iniciando entrenamiento con datos históricos...")
start_time = time.time()

rf_model = rf.fit(train_data)

end_time = time.time()
print(f"Modelo entrenado exitosamente en {end_time - start_time:.2f} segundos.")

--- ENTRENAMIENTO DEL MODELO (Random Forest) ---
Iniciando entrenamiento con datos históricos...


25/11/23 21:50:18 WARN DAGScheduler: Broadcasting large task binary with size 1296.2 KiB
25/11/23 21:50:20 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
25/11/23 21:50:22 WARN DAGScheduler: Broadcasting large task binary with size 3.9 MiB
25/11/23 21:50:25 WARN DAGScheduler: Broadcasting large task binary with size 6.4 MiB
25/11/23 21:50:28 WARN DAGScheduler: Broadcasting large task binary with size 1325.7 KiB
25/11/23 21:50:29 WARN DAGScheduler: Broadcasting large task binary with size 10.0 MiB
25/11/23 21:50:32 WARN DAGScheduler: Broadcasting large task binary with size 1834.6 KiB
25/11/23 21:50:34 WARN DAGScheduler: Broadcasting large task binary with size 14.8 MiB
25/11/23 21:50:36 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
25/11/23 21:50:40 WARN DAGScheduler: Broadcasting large task binary with size 21.2 MiB
25/11/23 21:50:43 WARN DAGScheduler: Broadcasting large task binary with size 2.9 MiB
25/11/23 21:50:47 WARN DAGScheduler: Broad

Modelo entrenado exitosamente en 57.17 segundos.


## Generar predicciones

* Transformar el set de prueba
* Esto genera las columnas 'prediction' (0.0, 1.0, 2.0) y 'probability' (Vector)

In [12]:
print("--- GENERANDO PREDICCIONES (Test Set) ---")

predicciones_rf = rf_model.transform(test_data)

print("Predicciones realizadas.")

print("\n--- Ejemplo de Predicciones (Probabilidades) ---")
predicciones_rf.select(
    "fecha", "equipo_local", "equipo_visitante",
    "label_resultado", "prediction", "probability"
).show(5, truncate=False)

--- GENERANDO PREDICCIONES (Test Set) ---
Predicciones realizadas.

--- Ejemplo de Predicciones (Probabilidades) ---


25/11/23 21:52:17 WARN DAGScheduler: Broadcasting large task binary with size 25.5 MiB
[Stage 107:>                                                        (0 + 1) / 1]

+----------+------------+----------------+---------------+----------+------------------------------------------------------------+
|fecha     |equipo_local|equipo_visitante|label_resultado|prediction|probability                                                 |
+----------+------------+----------------+---------------+----------+------------------------------------------------------------+
|2024-01-19|alaves      |cadiz           |0.0            |0.0       |[0.6026224239794182,0.20241173024367987,0.19496584577690196]|
|2024-02-03|alaves      |barcelona       |1.0            |1.0       |[0.23191948145912994,0.5269736647539378,0.24110685378693225]|
|2024-02-10|alaves      |villarreal      |2.0            |0.0       |[0.4200976565888217,0.29985853123584205,0.28004381217533625]|
|2024-02-24|alaves      |mallorca        |2.0            |0.0       |[0.4547889988824172,0.28440987748672736,0.2608011236308555] |
|2024-03-10|alaves      |vallecano       |0.0            |0.0       |[0.43100342073

                                                                                

Aplica el modelo a los datos de 2024–2025 y devuelve:

* prediction → La clase predicha (0 = H, 1 = D, 2 = A)
* probability → Vector de probabilidad [p_H, p_D, p_A]

## Evaluacion

### Accuracy (Exactitud)

In [13]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

print("=== MÉTRICA 1: ACCURACY ===")

eval_accuracy = MulticlassClassificationEvaluator(
    labelCol="label_resultado",
    predictionCol="prediction",
    metricName="accuracy"
)

accuracy = eval_accuracy.evaluate(predicciones_rf)
print(f"Accuracy del modelo: {accuracy:.4f}")

=== MÉTRICA 1: ACCURACY ===


25/11/23 21:52:40 WARN DAGScheduler: Broadcasting large task binary with size 25.5 MiB
[Stage 108:>                                                        (0 + 1) / 1]

Accuracy del modelo: 0.5223


                                                                                

### F1-Score (Weighted)

In [14]:
print("=== MÉTRICA 2: F1-SCORE (Weighted) ===")

eval_f1 = MulticlassClassificationEvaluator(
    labelCol="label_resultado",
    predictionCol="prediction",
    metricName="f1"
)

f1_score = eval_f1.evaluate(predicciones_rf)
print(f"F1-Score ponderado: {f1_score:.4f}")

=== MÉTRICA 2: F1-SCORE (Weighted) ===


25/11/23 21:52:49 WARN DAGScheduler: Broadcasting large task binary with size 25.5 MiB
[Stage 110:>                                                        (0 + 1) / 1]

F1-Score ponderado: 0.4664


                                                                                

### Log Loss

In [15]:
import pyspark.sql.functions as F
import math

print("=== MÉTRICA 3: LOG LOSS ===")

# UDF para calcular logloss por fila
def log_loss_udf(probabilities, label):
    p = probabilities[int(label)]
    p = max(min(p, 1 - 1e-15), 1e-15)  # evitar log(0)
    return float(-math.log(p))

logloss_udf = F.udf(log_loss_udf)

# Cálculo de Log Loss fila a fila
df_log = predicciones_rf.withColumn(
    "logloss_individual",
    logloss_udf(F.col("probability"), F.col("label_resultado"))
)

# Log Loss global
logloss_global = df_log.agg(F.mean("logloss_individual")).first()[0]

print(f"Log Loss del modelo: {logloss_global:.4f}")


=== MÉTRICA 3: LOG LOSS ===


25/11/23 21:52:54 WARN DAGScheduler: Broadcasting large task binary with size 25.5 MiB
[Stage 112:>                                                        (0 + 1) / 1]

Log Loss del modelo: 0.9840


                                                                                

### Matriz de confusion

In [16]:
print("=== MATRIZ DE CONFUSIÓN (H-D-A) ===")

predicciones_rf.groupBy(
    "label_resultado", "prediction"
).count().orderBy("label_resultado", "prediction").show()

=== MATRIZ DE CONFUSIÓN (H-D-A) ===


25/11/23 21:53:01 WARN DAGScheduler: Broadcasting large task binary with size 25.5 MiB
25/11/23 21:53:04 WARN DAGScheduler: Broadcasting large task binary with size 25.5 MiB
[Stage 117:>                                                        (0 + 1) / 1]

+---------------+----------+-----+
|label_resultado|prediction|count|
+---------------+----------+-----+
|            0.0|       0.0| 1076|
|            0.0|       1.0|  224|
|            0.0|       2.0|   50|
|            1.0|       0.0|  413|
|            1.0|       1.0|  541|
|            1.0|       2.0|   67|
|            2.0|       0.0|  522|
|            2.0|       1.0|  245|
|            2.0|       2.0|   46|
+---------------+----------+-----+



                                                                                

# Over2.5/Under2.5

## GBTClassifier

In [37]:
from pyspark.sql.functions import col, when

print("--- CREANDO LABEL OVER/UNDER 2.5 ---")

df_over = df_ml_ready.withColumn(
    "label_over25",
    when((col("goles_local") + col("goles_visitante")) >= 3, 1.0).otherwise(0.0)
)

df_over.select("goles_local", "goles_visitante", "label_over25").show(5)
print("Etiqueta binaria creada correctamente.")


--- CREANDO LABEL OVER/UNDER 2.5 ---
+-----------+---------------+------------+
|goles_local|goles_visitante|label_over25|
+-----------+---------------+------------+
|          1|              1|         0.0|
|          3|              1|         1.0|
|          1|              3|         1.0|
|          1|              3|         1.0|
|          0|              2|         0.0|
+-----------+---------------+------------+
only showing top 5 rows

Etiqueta binaria creada correctamente.


## División train/test

In [38]:
print("--- DIVISIÓN TRAIN / TEST (POR FECHA) ---")

FECHA_CORTE = "2024-01-01"

train_over = df_over.filter(col("fecha") < FECHA_CORTE)
test_over  = df_over.filter(col("fecha") >= FECHA_CORTE)

print(f"Train: {train_over.count()} filas")
print(f"Test:  {test_over.count()} filas")


--- DIVISIÓN TRAIN / TEST (POR FECHA) ---
Train: 26164 filas
Test:  3184 filas


## ENTRENAMIENTO DEL MODELO GBT (USANDO VECTOR features)

In [50]:
from pyspark.ml.classification import GBTClassifier
import time

print("--- ENTRENANDO MODELO GBT (Over/Under 2.5) ---")

gbt = GBTClassifier(
    labelCol="label_over25",
    featuresCol="features",
    maxIter=40,
    maxDepth=5,
    stepSize=0.1,
    subsamplingRate=0.8,
    seed=42
)

inicio = time.time()
gbt_model = gbt.fit(train_over)
fin = time.time()

print(f"Modelo entrenado en {fin - inicio:.2f} segundos.")


--- ENTRENANDO MODELO GBT (Over/Under 2.5) ---
Modelo entrenado en 16.00 segundos.


## Prediccion

In [51]:
print("--- GENERANDO PREDICCIONES (TEST SET) ---")
pred_over = gbt_model.transform(test_over)

pred_over.select(
    "fecha", "equipo_local", "equipo_visitante",
    "goles_local", "goles_visitante",
    "label_over25", "probability", "prediction"
).show(10, truncate=False)


--- GENERANDO PREDICCIONES (TEST SET) ---
+----------+------------+----------------+-----------+---------------+------------+----------------------------------------+----------+
|fecha     |equipo_local|equipo_visitante|goles_local|goles_visitante|label_over25|probability                             |prediction|
+----------+------------+----------------+-----------+---------------+------------+----------------------------------------+----------+
|2024-01-19|alaves      |cadiz           |1          |0              |0.0         |[0.6223013473745512,0.37769865262544877]|0.0       |
|2024-02-03|alaves      |barcelona       |1          |3              |1.0         |[0.3791256411405692,0.6208743588594308] |1.0       |
|2024-02-10|alaves      |villarreal      |1          |1              |0.0         |[0.4709661521518405,0.5290338478481595] |1.0       |
|2024-02-24|alaves      |mallorca        |1          |1              |0.0         |[0.5501286679154002,0.44987133208459984]|0.0       |
|2024-

## EVALUACIÓN (AUC ROC)

In [52]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

print("--- EVALUANDO MODELO (AUC ROC) ---")

evaluator = BinaryClassificationEvaluator(
    labelCol="label_over25",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC"
)

auc = evaluator.evaluate(pred_over)
print(f"AUC del modelo: {auc:.4f}")


--- EVALUANDO MODELO (AUC ROC) ---
AUC del modelo: 0.6008


### Probab. del modelo

In [53]:
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

get_prob_over = udf(lambda v: float(v[1]), FloatType())

pred_probs = pred_over.withColumn("Prob_Modelo_Over", get_prob_over("probability"))

pred_probs.select(
    "fecha", "equipo_local", "equipo_visitante",
    "Prob_Modelo_Over", "prediction", "label_over25"
).show(10, truncate=False)


+----------+------------+----------------+----------------+----------+------------+
|fecha     |equipo_local|equipo_visitante|Prob_Modelo_Over|prediction|label_over25|
+----------+------------+----------------+----------------+----------+------------+
|2024-01-19|alaves      |cadiz           |0.37769866      |0.0       |0.0         |
|2024-02-03|alaves      |barcelona       |0.62087435      |1.0       |1.0         |
|2024-02-10|alaves      |villarreal      |0.52903384      |1.0       |0.0         |
|2024-02-24|alaves      |mallorca        |0.44987133      |0.0       |0.0         |
|2024-03-10|alaves      |vallecano       |0.35359052      |0.0       |0.0         |
|2024-03-31|alaves      |sociedad        |0.33087698      |0.0       |0.0         |
|2024-04-21|alaves      |ath madrid      |0.527472        |1.0       |0.0         |
|2024-04-27|alaves      |celta           |0.4200433       |0.0       |1.0         |
|2024-05-10|alaves      |girona          |0.63156617      |1.0       |1.0   

                                                                                

## ANÁLISIS DE VALUE BETTING SI HAY CUOTA MAS 2.5

In [54]:
from pyspark.sql.functions import col

if "cuota_mas_2_5" in df_over.columns:
    final_value = pred_probs.withColumn(
        "Prob_Impl_Casa", 1 / col("cuota_mas_2_5")
    ).withColumn(
        "Valor", col("Prob_Modelo_Over") - col("Prob_Impl_Casa")
    )

    print("\n--- TOP OPORTUNIDADES (Value Bets) ---")
    final_value.filter(col("Valor") > 0.05) \
        .select("fecha", "equipo_local", "equipo_visitante",
                "cuota_mas_2_5", "Prob_Modelo_Over", "Valor") \
        .orderBy(col("Valor").desc()) \
        .show(10)
else:
    print("\nNo existe la columna cuota_mas_2_5 en tu dataset, se omite Value Betting.")



--- TOP OPORTUNIDADES (Value Bets) ---


[Stage 2450:>                                                       (0 + 1) / 1]

+----------+------------+----------------+-------------+----------------+-------------------+
|     fecha|equipo_local|equipo_visitante|cuota_mas_2_5|Prob_Modelo_Over|              Valor|
+----------+------------+----------------+-------------+----------------+-------------------+
|2025-04-07|     bologna|          napoli|          2.5|      0.71690154|0.31690154075622556|
|2024-09-18|       betis|          getafe|          3.0|      0.64360535|0.31027201811472577|
|2025-05-10|    mallorca|      valladolid|          2.0|       0.7845036| 0.2845035791397095|
|2024-09-25|   barcelona|          getafe|         1.57|       0.9148822|0.27793950791571553|
|2025-10-26|        lyon|      strasbourg|         1.67|       0.8327308|0.23392843450614786|
|2025-08-29|       elche|         levante|          2.3|       0.6662629|0.23148031597552088|
|2024-05-11|    mallorca|      las palmas|         2.63|       0.5988938|0.21866568435734213|
|2025-02-26|nottm forest|         arsenal|          2.2|    

                                                                                

# Guardar modelos

## Modelo 1X2

In [55]:
rf_model.save("hdfs:///user/johan/modelos/modelo_1x2_rf/")

25/11/24 02:30:24 WARN TaskSetManager: Stage 2455 contains a task of very large size (12829 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

In [None]:
gbt_model.save("hdfs:///user/johan/modelos/modelo_over25_gbt/")

# Guardar predicciones

In [None]:
predicciones_rf.write.mode("overwrite").parquet("hdfs:///user/johan/salidas/predicciones_1x2/")

CSV NO soporta columnas complejas por eso:

In [61]:
from pyspark.ml.functions import vector_to_array
predicciones_rf_conv = predicciones_rf.withColumn(
    "prob_array", vector_to_array("probability")
)

In [62]:
predicciones_rf_csv = predicciones_rf_conv.select(
    "fecha",
    "equipo_local",
    "equipo_visitante",
    "label_resultado",
    "prediction",
    col("prob_array")[0].alias("prob_local"),
    col("prob_array")[1].alias("prob_empate"),
    col("prob_array")[2].alias("prob_visita")
)

In [64]:
predicciones_rf_csv.show(5)

25/11/24 02:59:07 WARN DAGScheduler: Broadcasting large task binary with size 25.5 MiB
[Stage 2467:>                                                       (0 + 1) / 1]

+----------+------------+----------------+---------------+----------+-------------------+-------------------+-------------------+
|     fecha|equipo_local|equipo_visitante|label_resultado|prediction|         prob_local|        prob_empate|        prob_visita|
+----------+------------+----------------+---------------+----------+-------------------+-------------------+-------------------+
|2024-01-19|      alaves|           cadiz|            0.0|       0.0| 0.6026224239794182|0.20241173024367987|0.19496584577690196|
|2024-02-03|      alaves|       barcelona|            1.0|       1.0|0.23191948145912994| 0.5269736647539378|0.24110685378693225|
|2024-02-10|      alaves|      villarreal|            2.0|       0.0| 0.4200976565888217|0.29985853123584205|0.28004381217533625|
|2024-02-24|      alaves|        mallorca|            2.0|       0.0| 0.4547889988824172|0.28440987748672736| 0.2608011236308555|
|2024-03-10|      alaves|       vallecano|            0.0|       0.0| 0.4310034207307532|0

                                                                                

In [63]:
predicciones_rf_csv.write.mode("overwrite").csv("hdfs:///user/johan/salidas_csv/predicciones_1x2/")


25/11/24 02:56:15 WARN DAGScheduler: Broadcasting large task binary with size 25.7 MiB
                                                                                

## Modelo Over/Under

In [65]:
pred_probs.write.mode("overwrite").parquet("hdfs:///user/johan/salidas/predicciones_over25/")

                                                                                

In [67]:
from pyspark.ml.functions import vector_to_array
from pyspark.sql.functions import col

pred_probs_clean = pred_probs.withColumn(
    "prob_array", vector_to_array(col("probability"))
)

In [68]:
pred_over_csv = pred_probs_clean.select(
    "fecha",
    "equipo_local",
    "equipo_visitante",
    "goles_local",
    "goles_visitante",
    "label_over25",
    "prediction",
    col("prob_array")[1].alias("prob_over25")
)

In [71]:
pred_over_csv.show(5)

+----------+------------+----------------+-----------+---------------+------------+----------+-------------------+
|     fecha|equipo_local|equipo_visitante|goles_local|goles_visitante|label_over25|prediction|        prob_over25|
+----------+------------+----------------+-----------+---------------+------------+----------+-------------------+
|2024-01-19|      alaves|           cadiz|          1|              0|         0.0|       0.0|0.37769865262544877|
|2024-02-03|      alaves|       barcelona|          1|              3|         1.0|       1.0| 0.6208743588594308|
|2024-02-10|      alaves|      villarreal|          1|              1|         0.0|       1.0| 0.5290338478481595|
|2024-02-24|      alaves|        mallorca|          1|              1|         0.0|       0.0|0.44987133208459984|
|2024-03-10|      alaves|       vallecano|          1|              0|         0.0|       0.0| 0.3535905231939943|
+----------+------------+----------------+-----------+---------------+----------

In [70]:
pred_over_csv.write.mode("overwrite").csv(
    "hdfs:///user/johan/salidas_csv/predicciones_over25/",
    header=True
)
