# Práctica 1

## Importaciones

In [10]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import month, when
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline

## Inicio de sesión en Spark

In [11]:
# Inicio de la sesión de Spark.
spark = SparkSession.builder \
    .appName("MarathonRegression") \
    .getOrCreate()

## Visualización inicial de los datos

### Carga del dataset

In [12]:
# Cargar el dataset.
df = spark.read.csv(r"C:\Users\Usuario\Desktop\ALEX\GCED\7º cuatri\aprendizaje automático a gran escala\run_ww_2020_d.csv", header=True, inferSchema=True)

### Muestra de las cinco primeras filas y del esquema

In [13]:
# Mostrar las cinco primeras filas.
df.show(5, truncate=False)

# Mostrar esquema de columnas.
df.printSchema()

+---+----------+-------+--------+------------------+------+---------+--------------+-----------------------+
|_c0|datetime  |athlete|distance|duration          |gender|age_group|country       |major                  |
+---+----------+-------+--------+------------------+------+---------+--------------+-----------------------+
|0  |2020-01-01|0      |0.0     |0.0               |F     |18 - 34  |United States |CHICAGO 2019           |
|1  |2020-01-01|1      |5.72    |31.633333333333333|M     |35 - 54  |Germany       |BERLIN 2016            |
|2  |2020-01-01|2      |0.0     |0.0               |M     |35 - 54  |United Kingdom|LONDON 2018,LONDON 2019|
|3  |2020-01-01|3      |0.0     |0.0               |M     |18 - 34  |United Kingdom|LONDON 2017            |
|4  |2020-01-01|4      |8.07    |38.61666666666667 |M     |35 - 54  |United States |BOSTON 2017            |
+---+----------+-------+--------+------------------+------+---------+--------------+-----------------------+
only showing top 5 

### Muestra de filas y variables y principales estadísticas del dataset

In [14]:
# Número de filas y variables.
print(f"Filas: {df.count()}, Variables: {len(df.columns)}")

# Estadísticas descriptivas de variables numéricas.
df.describe(['distance', 'duration']).show()

# Distribución por género y grupo de edad.
df.groupBy("gender").count().show()
df.groupBy("age_group").count().show()

Filas: 13326792, Variables: 9
+-------+-----------------+------------------+
|summary|         distance|          duration|
+-------+-----------------+------------------+
|  count|         13326792|          13326792|
|   mean|3.864718473881684| 21.39197543564929|
| stddev|6.661547347662435| 39.27358918572176|
|    min|              0.0|               0.0|
|    max|           347.95|2299.9666666666667|
+-------+-----------------+------------------+

+------+--------+
|gender|   count|
+------+--------+
|     F| 3253374|
|     M|10073418|
+------+--------+

+---------+-------+
|age_group|  count|
+---------+-------+
|     55 +| 940254|
|  35 - 54|7905966|
|  18 - 34|4480572|
+---------+-------+



## Preparación del dataset

### Creación de la variable "season" a partir de la datetime

In [15]:
df = df.withColumn("month", month("datetime"))

df = df.withColumn(
    "season",
    when((df.month >= 3) & (df.month <= 5), "spring")
    .when((df.month >= 6) & (df.month <= 8), "summer")
    .when((df.month >= 9) & (df.month <= 11), "autumn")
    .otherwise("winter"))

### Filtrado de filas

Como tenemos un gran número de filas, vamos a filtrar. Primero, nos quedamos solo con los registros que no estén vacíos, es decir, aquellos cuya duración y distancia sea mayor a 0.

In [16]:
# Filtrado de filas.
df_filtered = df.filter((df.duration > 0) & (df.distance > 0))

# Comprobar tamaño.
print(f"Filas tras filtrar: {df_filtered.count()}")

Filas tras filtrar: 4581764


Como seguimos teniendo gran número de filas, volvemos a filtrar. Ahora vamos a filtrar por número de atletas. Inicialmente tenemos 36.7k atletas. Vamos a probar con cuantos nos quedamos con el número de filas más adecuado.

In [17]:
# Filtrar atletas con ID < 5000.
df_filtered_5000 = df_filtered.filter(df_filtered.athlete < 5000)
print(f"Filas con athlete < 5000: {df_filtered_5000.count()}")

# Filtrar atletas con ID < 8000.
df_filtered_8000 = df_filtered.filter(df_filtered.athlete < 8000)
print(f"Filas con athlete < 8000: {df_filtered_8000.count()}")

# Filtrar atletas con ID < 10000.
df_filtered_10000 = df_filtered.filter(df_filtered.athlete < 10000)
print(f"Filas con athlete < 10000: {df_filtered_10000.count()}")

# Filtrar atletas con ID < 15000.
df_final = df_filtered.filter(df_filtered.athlete < 15000)
print(f"Filas con athlete < 15000: {df_final.count()}")

# Filtrar atletas con ID < 20000.
df_filtered_20000 = df_filtered.filter(df_filtered.athlete < 20000)
print(f"Filas con athlete < 20000: {df_filtered_20000.count()}")

Filas con athlete < 5000: 629313
Filas con athlete < 8000: 1001656
Filas con athlete < 10000: 1255519
Filas con athlete < 15000: 1878765
Filas con athlete < 20000: 2483092


Nos quedamos con 15000 atletas (1878765 filas), ya que creemos que es el valor más adecuado para lograr un equilibrio entre cantidad para un correcto aprendizaje y velocidad de procesamiento. 

### Muestra de filas y variables y principales estadísticas del dataset filtrado

In [18]:
# Número de filas y variables.
print(f"Filas: {df_final.count()}, Variables: {len(df_final.columns)}")

# Estadísticas descriptivas de variables numéricas.
df_final.describe(['distance', 'duration']).show()

# Distribución por género y grupo de edad.
df_final.groupBy("gender").count().show()
df_final.groupBy("age_group").count().show()

Filas: 1878765, Variables: 11
+-------+------------------+--------------------+
|summary|          distance|            duration|
+-------+------------------+--------------------+
|  count|           1878765|             1878765|
|   mean|11.337875722616882|    62.6373797680926|
| stddev|  6.85062966629291|  44.442787948108126|
|    min|              0.01|0.016666666666666666|
|    max|            263.37|              2202.0|
+-------+------------------+--------------------+

+------+-------+
|gender|  count|
+------+-------+
|     F| 466030|
|     M|1412735|
+------+-------+

+---------+-------+
|age_group|  count|
+---------+-------+
|     55 +| 138997|
|  35 - 54|1123696|
|  18 - 34| 616072|
+---------+-------+



### Seleccionamos solo las columnas relevantes

In [19]:
# Seleccionamos las columnas que no son datetime o major.
df_final = df_final.select("athlete", "distance", "duration", "gender", "age_group", "country", "season")

In [28]:
df_final = df_final.dropna(subset=["athlete", "distance", "duration", "gender", "age_group", "country", "season"])

## Preparación de los datos para el entrenamiento

### Convertimos los datos categóricos en vectores one-hot.

In [29]:
# Convertimos las columnas de tipo categórico en índices numéricos. 
gender_indexer = StringIndexer(inputCol="gender", outputCol="gender_index")
age_indexer = StringIndexer(inputCol="age_group", outputCol="age_index")
country_indexer = StringIndexer(inputCol="country", outputCol="country_index")
season_indexer = StringIndexer(inputCol="season", outputCol="season_index")

# Convertimos los índices numéricos del paso anterior en vectores one-hot.
encoder = OneHotEncoder(inputCols=["gender_index", "age_index", "country_index", "season_index"],
                        outputCols=["gender_vec", "age_vec", "country_vec", "season_vec"])

### Combinamos las columnas numéricas en un solo vector.

In [30]:
# Creamos un vector features que combine todas las variables numéricas.
assembler = VectorAssembler(inputCols=["distance", "gender_vec", "age_vec", "country_vec", "season_vec"],
                            outputCol="features")

### Creamos un pipeline para aplicar todas las transformaciones

In [31]:
# Creamos el Pipeline con todas las etapas.
pipeline = Pipeline(stages=[gender_indexer, age_indexer, country_indexer, season_indexer, encoder, assembler])

# Ajustamos el pipeline y transformamos los datos.
df_prepared = pipeline.fit(df_final).transform(df_final)

### División de los datos en entrenamiento y test

In [None]:
# Obtenemos los atletas.
athletes = df_final.select("athlete").distinct()

# Asignamos aleatoriamente el 80% de atletas a train y el 20% a test.
train_athletes, test_athletes = athletes.randomSplit([0.8, 0.2], seed=42)

# Filtramos filas según atletas asignados.
train_df = df_prepared.join(train_athletes, on="athlete", how="inner")
test_df = df_prepared.join(test_athletes, on="athlete", how="inner")

# Mostramos el número de filas en train y test.
print(f"Train: {train_df.count()} filas, Test: {test_df.count()} filas")

## Creación de los modelos

### Modelo Regresión Lineal

In [24]:
from pyspark.sql.functions import col

# Definir la columna objetivo
df_model = df_prepared.withColumnRenamed("duration", "label")

# Comprobamos
df_model.select("features", "label").show(5, truncate=False)


+-------------------------------------------+------------------+
|features                                   |label             |
+-------------------------------------------+------------------+
|(113,[0,1,2,6,112],[5.72,1.0,1.0,1.0,1.0]) |31.633333333333333|
|(113,[0,1,2,4,112],[8.07,1.0,1.0,1.0,1.0]) |38.61666666666667 |
|(113,[0,2,4,112],[10.09,1.0,1.0,1.0])      |43.56666666666667 |
|(113,[0,1,4,112],[9.82,1.0,1.0,1.0])       |50.53333333333333 |
|(113,[0,1,2,5,112],[10.05,1.0,1.0,1.0,1.0])|59.05             |
+-------------------------------------------+------------------+
only showing top 5 rows


In [27]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# Asegurarnos de que la columna 'label' existe
train_df = train_df.withColumnRenamed("duration", "label")
test_df = test_df.withColumnRenamed("duration", "label")

# Crear el modelo de regresión lineal
lr = LinearRegression(featuresCol="features", labelCol="label")

# Entrenar el modelo
lr_model = lr.fit(train_df)

# Generar predicciones sobre el conjunto de test
lr_predictions = lr_model.transform(test_df)

# Evaluador para regresión
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction")

# Calcular métricas
rmse = evaluator.setMetricName("rmse").evaluate(lr_predictions)
r2 = evaluator.setMetricName("r2").evaluate(lr_predictions)

# Mostrar resultados
print("=== Linear Regression ===")
print(f"RMSE: {rmse:.2f}")
print(f"R²: {r2:.4f}")

# (Opcional) Mostrar algunas predicciones reales vs. predichas
lr_predictions.select("athlete", "label", "prediction").show(10, truncate=False)


Py4JJavaError: An error occurred while calling o575.fit.
: org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] User defined function (`StringIndexerModel$$Lambda$5161/0x00000114112b2a58`: (string) => double) failed due to: org.apache.spark.SparkException: StringIndexer encountered NULL value. To handle or skip NULLS, try setting StringIndexer.handleInvalid.. SQLSTATE: 39000
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:195)
	at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50)
	at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:171)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:57)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:111)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171)
	at org.apache.spark.scheduler.Task.run(Task.scala:147)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:647)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:80)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:77)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:650)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:842)
	at org.apache.spark.util.Utils$.getTryWithCallerStacktrace(Utils.scala:1439)
	at org.apache.spark.util.LazyTry.get(LazyTry.scala:58)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:266)
	at org.apache.spark.sql.classic.Dataset.materializedRdd$lzycompute(Dataset.scala:1578)
	at org.apache.spark.sql.classic.Dataset.materializedRdd(Dataset.scala:1576)
	at org.apache.spark.sql.classic.Dataset.$anonfun$rdd$1(Dataset.scala:1586)
	at org.apache.spark.sql.classic.Dataset.$anonfun$withNewRDDExecutionId$1(Dataset.scala:2221)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$8(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.withSessionTagsApplied(SQLExecution.scala:272)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$7(SQLExecution.scala:125)
	at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
	at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:112)
	at org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:106)
	at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:111)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:295)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:124)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:237)
	at org.apache.spark.sql.classic.Dataset.withNewRDDExecutionId(Dataset.scala:2219)
	at org.apache.spark.sql.classic.Dataset.rdd$lzycompute(Dataset.scala:1586)
	at org.apache.spark.sql.classic.Dataset.rdd(Dataset.scala:1584)
	at org.apache.spark.ml.util.Instrumentation.logDataset(Instrumentation.scala:64)
	at org.apache.spark.ml.regression.LinearRegression.$anonfun$train$1(LinearRegression.scala:330)
	at org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:226)
	at scala.util.Try$.apply(Try.scala:217)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:226)
	at org.apache.spark.ml.regression.LinearRegression.train(LinearRegression.scala:328)
	at org.apache.spark.ml.regression.LinearRegression.train(LinearRegression.scala:185)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:115)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:184)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:108)
	at java.base/java.lang.Thread.run(Thread.java:842)
	Suppressed: org.apache.spark.util.Utils$OriginalTryStackTraceException: Full stacktrace of original doTryWithCallerStacktrace caller
		at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:195)
		at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
		at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
		at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
		at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50)
		at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
		at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:171)
		at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:57)
		at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:111)
		at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
		at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171)
		at org.apache.spark.scheduler.Task.run(Task.scala:147)
		at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:647)
		at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:80)
		at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:77)
		at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:99)
		at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:650)
		at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
		at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
		... 1 more
Caused by: org.apache.spark.SparkException: StringIndexer encountered NULL value. To handle or skip NULLS, try setting StringIndexer.handleInvalid.
	at org.apache.spark.ml.feature.StringIndexerModel.$anonfun$getIndexer$1(StringIndexer.scala:377)
	at org.apache.spark.ml.feature.StringIndexerModel.$anonfun$getIndexer$1$adapted(StringIndexer.scala:372)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50)
	at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:171)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:57)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:111)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171)
	at org.apache.spark.scheduler.Task.run(Task.scala:147)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:647)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:80)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:77)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:650)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	... 1 more
