In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
               .appName('ml') \
               .getOrCreate()
               

In [2]:
df = spark.read.parquet("C:/Users/gema2/OneDrive/Escritorio/MAESTRIA/DatosMasivos/data/yellow_tripdata_2025-01.parquet")

In [3]:
from pyspark.sql.functions import col, year, month, dayofmonth, date_format, unix_timestamp

## Preprocesamiento

In [4]:
df = df.withColumn('duracion_viaje',(unix_timestamp(col('tpep_dropoff_datetime'))-unix_timestamp(col('tpep_pickup_datetime')))/60)

In [5]:
df = df.drop('RatecodeID','store_and_fwd_flag','PULocationID','DOLocationID')

In [6]:
df = df.withColumn('año',year(col('tpep_pickup_datetime')))

In [7]:
df = df.withColumn('día',dayofmonth(col('tpep_pickup_datetime')))

In [8]:
df = df.withColumn("dia_semana_nombre", date_format(col("tpep_pickup_datetime"), "EEEE"))

In [9]:
df = df.withColumn('mes',month(col('tpep_pickup_datetime')))

In [10]:
df_limpio = df.filter((col("passenger_count") > 0) & (col("trip_distance") > 0) & (col("total_amount") > 0) & (col("año") == 2025))

In [11]:
df_limpio.na.drop()

DataFrame[VendorID: int, tpep_pickup_datetime: timestamp_ntz, tpep_dropoff_datetime: timestamp_ntz, passenger_count: bigint, trip_distance: double, payment_type: bigint, fare_amount: double, extra: double, mta_tax: double, tip_amount: double, tolls_amount: double, improvement_surcharge: double, total_amount: double, congestion_surcharge: double, Airport_fee: double, cbd_congestion_fee: double, duracion_viaje: double, año: int, día: int, dia_semana_nombre: string, mes: int]

In [12]:
df_limpio.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)
 |-- cbd_congestion_fee: double (nullable = true)
 |-- duracion_viaje: double (nullable = true)
 |-- año: integer (nullable = true)
 |-- día: integer (nullable = true)
 |-- dia_semana_nombre: string (nullable = true)
 |-- mes: integer (nullable = true)



In [13]:
df_calculado = df_limpio.withColumn("fare_per_mile", col("fare_amount") / col("trip_distance"))

In [14]:
from pyspark.sql.functions import max, min

In [15]:
df_calculado.show(3)

+--------+--------------------+---------------------+---------------+-------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+--------------+----+---+-----------------+---+-------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|cbd_congestion_fee|duracion_viaje| año|día|dia_semana_nombre|mes|fare_per_mile|
+--------+--------------------+---------------------+---------------+-------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+--------------+----+---+-----------------+---+-------------+
|       1| 2025-01-01 00:18:38|  2025-01-01 00:26:59|              1|          1.6|           1|       10.0|  3.5|    

## Spark MLlib
Spark MLlib requiere que todas las características (features) estén agrupadas en una sola columna de tipo vector.
Se usará Regresión Lineal para predecir el costo del viaje (fare_amount).

In [16]:
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import StandardScaler

#### Preparación de características
Indexar → Vectorizar → Escalar → Seleccionar → Entrenar.

In [35]:
indexer = StringIndexer(inputCol="dia_semana_nombre", outputCol="dia_index")
df_ml = indexer.fit(df_limpio).transform(df_limpio)

In [36]:
assembler = VectorAssembler().setInputCols(['trip_distance', 'dia_index','passenger_count']).setOutputCol("vector_unificado")

In [37]:
scaler = StandardScaler(inputCol="vector_unificado", outputCol="features_scaled", withStd=True, withMean=True)

In [38]:
from pyspark.ml.feature import UnivariateFeatureSelector

In [39]:
selector = UnivariateFeatureSelector(featuresCol="features_scaled", labelCol="fare_amount", outputCol="selectedFeatures")
selector.setFeatureType("continuous").setLabelType("continuous").setSelectionThreshold(2)

UnivariateFeatureSelector_d61f27c617d3

#### Modelo

In [41]:
from pyspark.ml.evaluation import RegressionEvaluator

In [43]:
lr = LinearRegression(featuresCol="features_scaled", labelCol="fare_amount")

pipeline = Pipeline(stages=[indexer, assembler, scaler, selector, lr])

train, test = df_limpio.randomSplit([0.7, 0.3], seed=42)

model = pipeline.fit(train)
predictions = model.transform(test)

evaluator = RegressionEvaluator(labelCol="fare_amount", predictionCol="prediction", metricName="rmse")  
rmse = evaluator.evaluate(predictions)

print(f"Modelo entrenado con éxito.")
print(f"Error promedio (RMSE): ${round(rmse, 2)}")

Modelo entrenado con éxito.
Error promedio (RMSE): $17.3


In [None]:
selector_model = model.stages[3]

indices_seleccionados = selector_model.selectedFeatures

nombres_originales = ['trip_distance', 'dia_index', 'passenger_count']
variables_finales = [nombres_originales[i] for i in indices_seleccionados]

print(f"Indices elegidos: {indices_seleccionados}")
print(f"Las 2 variables con mayor impacto en el precio son: {variables_finales}")

Indices elegidos: [0, 1]
Las 2 variables con mayor impacto en el precio son: ['trip_distance', 'dia_index']
