[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/fisamz/Repositorio_MCDAA/blob/main/Tarea4_5/Tarea4_5.ipynb)

# Assignments 4 and 5: MLlib PySpark
**Student:** Fisam Zavala  
**Dataset:** Soccer match results and bookmaker odds.  
**Source:** [European Soccer Database](https://www.kaggle.com/datasets/hugomathien/soccer)


In [3]:
#%pip install pyspark
import pyspark
pyspark.__version__

'4.1.1'


## Objective

The goal of this exercise is to build a binary classification model with Apache Spark's MLlib that predicts the probability of a home-team win in professional soccer matches, using only information available before kickoff.

---



In [None]:
# (Optional / reference) Download from Kaggle:
# !pip install kaggle
# !kaggle datasets download -d hugomathien/soccer
# !unzip soccer.zip -d data/

#import sqlite3, pandas as pd

#conn = sqlite3.connect("../data/database.sqlite")

#df_match = pd.read_sql("SELECT * FROM Match", conn)
#df_match.to_csv("../data/match.csv", index=False)

#conn.close()


In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Tarea4_MLlib") \
    .master("local[*]") \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")


In [5]:
from pyspark.sql import functions as F

df_match = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("../data/match.csv")
)

print("Rows:", df_match.count())
df_match.printSchema()
df_match.show(5, truncate=False)


Rows: 27383
root
 |-- id: string (nullable = true)
 |-- country_id: string (nullable = true)
 |-- league_id: integer (nullable = true)
 |-- season: string (nullable = true)
 |-- stage: integer (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- match_api_id: integer (nullable = true)
 |-- home_team_api_id: integer (nullable = true)
 |-- away_team_api_id: integer (nullable = true)
 |-- home_team_goal: integer (nullable = true)
 |-- away_team_goal: integer (nullable = true)
 |-- home_player_X1: double (nullable = true)
 |-- home_player_X2: double (nullable = true)
 |-- home_player_X3: double (nullable = true)
 |-- home_player_X4: double (nullable = true)
 |-- home_player_X5: double (nullable = true)
 |-- home_player_X6: double (nullable = true)
 |-- home_player_X7: double (nullable = true)
 |-- home_player_X8: double (nullable = true)
 |-- home_player_X9: double (nullable = true)
 |-- home_player_X10: double (nullable = true)
 |-- home_player_X11: double (nullable = true)
 |--

## Data preparation

The *European Soccer Database* was used, from which match information and the odds offered by several bookmakers were extracted.

Implied probabilities were computed from the decimal odds and normalized to sum to 1, which removes the bookmakers' margin. The probabilities from the four bookmakers used (columns prefixed `B365`, `BW`, `IW`, and `WH`) were then averaged to obtain a more robust estimate of the market consensus.
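
As a minimal sketch of the normalization described above, the plain-Python function below converts one bookmaker's decimal odds into probabilities that sum to 1. The function name `implied_probabilities` and the odds values are hypothetical, chosen only for illustration; they are not taken from the dataset.

```python
def implied_probabilities(home_odds, draw_odds, away_odds):
    """Convert decimal odds into probabilities that sum to 1."""
    # The inverse of a decimal odd is the raw implied probability.
    raw = [1.0 / home_odds, 1.0 / draw_odds, 1.0 / away_odds]
    # The raw values add up to more than 1 when the bookmaker keeps a margin.
    overround = sum(raw)
    # Dividing by the total rescales the three outcomes to sum to 1.
    return [p / overround for p in raw]


# Hypothetical odds for a single match (assumption, not real data).
p_home, p_draw, p_away = implied_probabilities(2.0, 3.5, 4.0)
print(p_home, p_draw, p_away)
```

With these odds the raw inverses add up to about 1.036, so roughly 3.6% of margin is removed by the rescaling.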

Contextual variables such as season, stage (matchday), league, country, and the participating teams were also included.

Rows with missing or inconsistent values in any feature or in the label were dropped to ensure data quality, leaving 22,481 of the 27,383 rows read.

---

## Response variable

The target variable (`label`) is defined as:

- `label = 1` if the home team won the match.
- `label = 0` otherwise (draw or away win).

This definition frames the problem as a binary classification task.

---

## Building the predictor variables

The main variables used were:

- Average probabilities of a home win, a draw, and an away win.
- Favoritism gap (`fav_strength`): the average home-win probability minus the average away-win probability.
- Season year.
- Stage (matchday) of the competition.
- League, country, and team identifiers.

These variables represent information available before the match starts, which avoids using anything determined after the result.
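
The market features listed above can be sketched in plain Python as follows. This is only an illustration: the helper name `average_market` and the input probabilities are hypothetical, and it assumes each bookmaker's probabilities have already been normalized to sum to 1.

```python
def average_market(probabilities_by_bookmaker):
    """Average (home, draw, away) probabilities across bookmakers.

    Each element is a (p_home, p_draw, p_away) tuple for one bookmaker.
    """
    n = len(probabilities_by_bookmaker)
    if n == 0:
        raise ValueError("at least one bookmaker is required")
    p_home = sum(p[0] for p in probabilities_by_bookmaker) / n
    p_draw = sum(p[1] for p in probabilities_by_bookmaker) / n
    p_away = sum(p[2] for p in probabilities_by_bookmaker) / n
    return {
        "p_home_avg": p_home,
        "p_draw_avg": p_draw,
        "p_away_avg": p_away,
        # Positive when the market favors the home team, negative otherwise.
        "fav_strength": p_home - p_away,
    }


# Hypothetical normalized probabilities from two bookmakers (assumption).
features = average_market([(0.50, 0.30, 0.20), (0.60, 0.25, 0.15)])
print(features)
```

A `fav_strength` close to 0 marks an evenly matched game, while values near 1 or -1 mark a strong home or away favorite.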

---

In [6]:
from pyspark.sql.functions import col

# ============ 1. Create label ============
df = df_match.withColumn(
    "label",
    (col("home_team_goal") > col("away_team_goal")).cast("int")
)

# ============ 2. Implied-probability function ============
def implied_prob(df, h, d, a, prefix):
    """Add normalized implied probabilities for one bookmaker's home/draw/away odds."""

    df = df.withColumn(f"{prefix}_ph", 1/col(h)) \
           .withColumn(f"{prefix}_pd", 1/col(d)) \
           .withColumn(f"{prefix}_pa", 1/col(a))

    df = df.withColumn(
        f"{prefix}_sum",
        col(f"{prefix}_ph")+col(f"{prefix}_pd")+col(f"{prefix}_pa")
    )

    df = df.withColumn(f"{prefix}_ph", col(f"{prefix}_ph")/col(f"{prefix}_sum")) \
           .withColumn(f"{prefix}_pd", col(f"{prefix}_pd")/col(f"{prefix}_sum")) \
           .withColumn(f"{prefix}_pa", col(f"{prefix}_pa")/col(f"{prefix}_sum"))

    return df


# ============ 3. Apply to each bookmaker ============
df = implied_prob(df, "B365H","B365D","B365A","b365")
df = implied_prob(df, "BWH","BWD","BWA","bw")
df = implied_prob(df, "IWH","IWD","IWA","iw")
df = implied_prob(df, "WHH","WHD","WHA","wh")


# ============ 4. Average across bookmakers ============
df = df.withColumn(
    "p_home_avg",
    (col("b365_ph")+col("bw_ph")+col("iw_ph")+col("wh_ph"))/4
).withColumn(
    "p_draw_avg",
    (col("b365_pd")+col("bw_pd")+col("iw_pd")+col("wh_pd"))/4
).withColumn(
    "p_away_avg",
    (col("b365_pa")+col("bw_pa")+col("iw_pa")+col("wh_pa"))/4
)

# Extra feature: home-minus-away probability gap
df = df.withColumn(
    "fav_strength",
    col("p_home_avg") - col("p_away_avg")
)


In [7]:
df.select(
    "p_home_avg",
    "p_draw_avg",
    "p_away_avg",
    "fav_strength",
    "label"
).show(5)


+-------------------+-------------------+-------------------+--------------------+-----+
|         p_home_avg|         p_draw_avg|         p_away_avg|        fav_strength|label|
+-------------------+-------------------+-------------------+--------------------+-----+
| 0.5134253755813658|0.27204496342190043|0.21452966099673376| 0.29889571458463204|    0|
| 0.4793486838444711|0.27564460780362987|0.24500670835189906| 0.23434197549257205|    0|
|0.36347576467707754| 0.2768749038016514|0.35964933152127115|0.003826433155806...|    0|
| 0.6302556885423144|0.23262589986618726|0.13711841159149835| 0.49313727695081605|    1|
|0.19870182784368112| 0.2617382683146468| 0.5395599038416721|-0.34085807599799095|    0|
+-------------------+-------------------+-------------------+--------------------+-----+
only showing top 5 rows


In [13]:
from pyspark.sql import functions as F

# Cast to integer ONLY if the value is numeric; otherwise leave NULL.
def to_int_safe(c):
    return F.when(F.col(c).cast("string").rlike(r"^-?\d+$"), F.col(c).cast("int")).otherwise(F.lit(None))

df2 = (
    df
    .withColumn("season_year", F.substring("season", 1, 4).cast("int"))
    .withColumn("stage", to_int_safe("stage"))
    .withColumn("league_id", to_int_safe("league_id"))
    .withColumn("country_id", to_int_safe("country_id"))
    .withColumn("home_team_api_id", to_int_safe("home_team_api_id"))
    .withColumn("away_team_api_id", to_int_safe("away_team_api_id"))
)

feature_cols = [
    "p_home_avg","p_draw_avg","p_away_avg","fav_strength",
    "season_year","stage","league_id","country_id",
    "home_team_api_id","away_team_api_id"
]

df2 = df2.dropna(subset=feature_cols + ["label"])

print("Rows for modeling:", df2.count())

df2.select(*(feature_cols + ["label"])).show(10, truncate=False)


Rows for modeling: 22481
+-------------------+-------------------+-------------------+---------------------+-----------+-----+---------+----------+----------------+----------------+-----+
|p_home_avg         |p_draw_avg         |p_away_avg         |fav_strength         |season_year|stage|league_id|country_id|home_team_api_id|away_team_api_id|label|
+-------------------+-------------------+-------------------+---------------------+-----------+-----+---------+----------+----------------+----------------+-----+
|0.5134253755813658 |0.27204496342190043|0.21452966099673376|0.29889571458463204  |2008       |1    |1        |1         |9987            |9993            |0    |
|0.4793486838444711 |0.27564460780362987|0.24500670835189906|0.23434197549257205  |2008       |1    |1        |1         |10000           |9994            |0    |
|0.36347576467707754|0.2768749038016514 |0.35964933152127115|0.0038264331558063858|2008       |1    |1        |1         |9984            |8635            |0  

## Predictive model

A Logistic Regression model from MLlib was used, wrapped in a Pipeline consisting of:

- VectorAssembler, which builds the feature vector.
- LogisticRegression, which trains the model.

The data were split into training (80%) and test (20%) sets, with 18,003 and 4,478 rows respectively, to evaluate the model's performance.
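
To make the model's output easier to read, here is a minimal plain-Python sketch of how a logistic regression turns a feature vector into the `[P(label=0), P(label=1)]` pair and the hard prediction shown in the notebook's `probability` and `prediction` columns. The weights, intercept, and feature values are hypothetical and are not the fitted coefficients; the 0.5 threshold is assumed to be the default.

```python
import math


def sigmoid(z):
    """Logistic function: maps any real number into (0, 1)."""
    return 1.0 / (1.0 + math.exp(-z))


def predict_proba(features, weights, intercept):
    """Return [P(label=0), P(label=1)] for one feature vector."""
    z = intercept + sum(w * x for w, x in zip(weights, features))
    p1 = sigmoid(z)
    return [1.0 - p1, p1]


def predict(probability, threshold=0.5):
    """Predict 1.0 when P(label=1) exceeds the threshold, else 0.0."""
    return 1.0 if probability[1] > threshold else 0.0


# Hypothetical coefficients and features (assumption, not the fitted model).
weights = [2.0, -1.0]
intercept = -0.5
proba = predict_proba([0.6, 0.2], weights, intercept)
print(proba, predict(proba))
```

Here the linear score is 0.5, so P(label=1) is about 0.62 and the prediction is 1.0.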

---

## Model evaluation

Model performance was evaluated with the area under the ROC curve (AUC), which came out at approximately 0.71 on the test set.

This indicates a moderate ability to discriminate between matches won and not won by the home team across classification thresholds (an AUC of 0.5 corresponds to random ranking and 1.0 to perfect ranking).

Accuracy was also computed as a complementary reference; it was approximately 0.65 on the test set.
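
As a rough illustration of what the two metrics measure, the sketch below computes AUC as the fraction of (positive, negative) pairs in which the positive example gets the higher score, and accuracy at a 0.5 threshold. The helper names `roc_auc` and `accuracy` are hypothetical. The sample uses only the ten test-set predictions printed by the notebook, so its values differ from the metrics reported for the full test set.

```python
def roc_auc(labels, scores):
    """AUC as the share of positive/negative pairs ranked correctly (ties count 0.5)."""
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    if not positives or not negatives:
        raise ValueError("both classes must be present")
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))


def accuracy(labels, scores, threshold=0.5):
    """Share of rows where thresholding the score reproduces the label."""
    hits = sum(1 for y, s in zip(labels, scores) if (1 if s > threshold else 0) == y)
    return hits / len(labels)


# Labels and P(label=1) from the ten prediction rows printed by the notebook.
labels = [1, 1, 1, 0, 1, 1, 0, 1, 0, 1]
scores = [0.6957539622896757, 0.7126936708028696, 0.5215198741213523,
          0.5767749715720406, 0.6247879583563707, 0.32834937779830264,
          0.5103003742605763, 0.18196885609721825, 0.307712637843652,
          0.6065899177573888]

sample_auc = roc_auc(labels, scores)
sample_acc = accuracy(labels, scores)
print("Sample AUC:", sample_auc, "Sample accuracy:", sample_acc)
```

On this ten-row sample, 15 of the 21 positive/negative pairs are ranked correctly (AUC of about 0.714) and 6 of the 10 predictions match the label.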

In [10]:
train, test = df2.randomSplit([0.8, 0.2], seed=123)
print(train.count(), test.count())


18003 4478


In [11]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

lr = LogisticRegression(
    featuresCol="features",
    labelCol="label",
    maxIter=50,
    regParam=0.1,      # regularization strength (helps limit overfitting)
    elasticNetParam=0  # 0 = ridge (L2), 1 = lasso (L1)
)

pipeline = Pipeline(stages=[assembler, lr])

model = pipeline.fit(train)
pred = model.transform(test)
pred.select("label","probability","prediction").show(10, truncate=False)


+-----+----------------------------------------+----------+
|label|probability                             |prediction|
+-----+----------------------------------------+----------+
|1    |[0.3042460377103243,0.6957539622896757] |1.0       |
|1    |[0.2873063291971304,0.7126936708028696] |1.0       |
|1    |[0.4784801258786478,0.5215198741213523] |1.0       |
|0    |[0.4232250284279594,0.5767749715720406] |1.0       |
|1    |[0.37521204164362937,0.6247879583563707]|1.0       |
|1    |[0.6716506222016974,0.32834937779830264]|0.0       |
|0    |[0.48969962573942377,0.5103003742605763]|1.0       |
|1    |[0.8180311439027818,0.18196885609721825]|0.0       |
|0    |[0.692287362156348,0.307712637843652]   |0.0       |
|1    |[0.39341008224261115,0.6065899177573888]|1.0       |
+-----+----------------------------------------+----------+
only showing top 10 rows


In [12]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator(
    labelCol="label",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC"
)

auc = evaluator.evaluate(pred)
print("AUC:", auc)


AUC: 0.708817655804235


In [14]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

acc_eval = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy"
)

acc = acc_eval.evaluate(pred)
print("Accuracy:", acc)


Accuracy: 0.6523001339883877