In [1]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, lag, sum as _sum, when, expr, row_number, unix_timestamp, 
    current_timestamp, first, when, min, desc, to_timestamp, lit
)
from pyspark.sql.window import Window
from pyspark.sql.window import Window


In [2]:
# Keep each line of Spark's .show() tables unwrapped so wide output scrolls
# horizontally instead of breaking across lines.
# Use the public IPython.display API and import `display` explicitly instead
# of relying on the name IPython injects into the notebook namespace.
from IPython.display import HTML, display

display(HTML("<style>pre { white-space: pre !important; }</style>"))

In [3]:

# Spark session and PostgreSQL connection configuration.
# Every endpoint and credential can be overridden through environment
# variables; the fallbacks reproduce the previously hard-coded values.
# TODO: drop the credential fallbacks once the environment always provides
# F1_DB_USER / F1_DB_PASSWORD, so no secret lives in the notebook.
spark = (
    SparkSession.builder
    .master(os.environ.get("SPARK_MASTER_URL", "spark://spark-master:7077"))
    .appName("Formula1FeaturesIngestion")
    # PostgreSQL JDBC driver needed by the JDBC reads and the final write.
    .config("spark.jars.packages", "org.postgresql:postgresql:42.2.20")
    .getOrCreate()
)

# JDBC connection settings shared by read_table() and the final write.
jdbc_url = os.environ.get("F1_JDBC_URL", "jdbc:postgresql://postgres:5432/mydatabase")
jdbc_properties = {
    "user": os.environ.get("F1_DB_USER", "admin"),
    "password": os.environ.get("F1_DB_PASSWORD", "admin_password"),
    "driver": "org.postgresql.Driver",
}


In [4]:

# Helper for loading PostgreSQL tables through Spark's JDBC reader.

def read_table(table_name, where=None):
    """Load a PostgreSQL table (optionally filtered) as a Spark DataFrame.

    Args:
        table_name: Schema-qualified table name, e.g. ``"formula1.laps"``.
        where: Optional SQL predicate without the ``WHERE`` keyword, e.g.
            ``"session_key = '9110'"``. When given, the table is read through
            the subquery ``(SELECT * FROM <table> WHERE <where>) AS temp``, so
            PostgreSQL applies the filter. The text is interpolated verbatim
            into SQL: pass only trusted, developer-written predicates.

    Returns:
        The DataFrame returned by ``spark.read.jdbc`` for that table/subquery.
    """
    if not where:
        source = table_name
    else:
        source = f"(SELECT * FROM {table_name} WHERE {where}) AS temp"
    return spark.read.jdbc(url=jdbc_url, table=source, properties=jdbc_properties)

# Source tables used by this pipeline. Columns whose names recur across tables
# (date, date_start, lap_start, ...) get a table-specific prefix here so they
# stay distinguishable after the joins below.
# Other formula1 tables (meetings, drivers, car_data, race_control, location,
# intervals) are not read because nothing below uses them.
sessions = (
    read_table("formula1.sessions")
    .withColumnRenamed("date_start", "session_date_start")
    .withColumnRenamed("date_end", "session_date_end")
)

laps = read_table("formula1.laps")

position = read_table("formula1.position").withColumnRenamed("date", "position_date")

pit = read_table("formula1.pit").withColumnRenamed("date", "pit_date")

stints = (
    read_table("formula1.stints")
    .withColumnRenamed("lap_start", "stint_lap_start")
    .withColumnRenamed("lap_end", "stint_lap_end")
    .withColumnRenamed("compound", "stint_compound")
    .withColumnRenamed("tyre_age_at_start", "stint_tyre_age_at_start")
)

In [5]:
# Attach session metadata to every lap and derive the lap start timestamp.
# Lap 1 takes the session start instead of the lap's own date_start
# (NOTE(review): presumably because lap 1 lacks a usable date_start in the
# source data — confirm).
laps_ts = (
    laps
    .join(sessions, "session_key", "left")
    .withColumnRenamed("date_start", "laps_date_start")
    .drop("meeting_key")
    .withColumn(
        "laps_date_start_ts",
        when(col("lap_number") == 1, to_timestamp("session_date_start"))
        .otherwise(to_timestamp("laps_date_start")),
    )
)


In [6]:

# Attach a position value to each lap.
# The position keys are renamed so they do not clash with laps_ts, and each
# driver's position records are numbered in timestamp order.
# NOTE(review): the N-th position record of a driver is joined to lap N. If the
# position table holds timestamped change events rather than one row per lap,
# only the first record lines up with a real lap. The recalculation further
# down keeps the joined value only for lap 1 of "Race" sessions — confirm.
position_filtered = position.select(
    col("driver_number").alias("pos_driver_number"),
    col("session_key").alias("pos_session_key"),
    "position_date",
    "position",
)

window_spec_position = (
    Window
    .partitionBy("pos_driver_number", "pos_session_key")
    .orderBy("position_date")
)

position_with_lap = position_filtered.withColumn(
    "position_lap_number", row_number().over(window_spec_position)
)

laps_ts_position = laps_ts.join(
    position_with_lap,
    (laps_ts["driver_number"] == position_with_lap["pos_driver_number"])
    & (laps_ts["session_key"] == position_with_lap["pos_session_key"])
    & (laps_ts["lap_number"] == position_with_lap["position_lap_number"]),
    "left",
).drop("pos_driver_number", "pos_session_key", "position_date", "position_lap_number")

In [7]:
# Attach pit-stop data by (driver, session, lap). Laps without a pit record keep
# nulls in pit_date / pit_duration. A lap with several pit records is
# duplicated here; the per-lap groupBy further down collapses it again.
pit_filtered = pit.select("driver_number", "session_key", "pit_date", "pit_duration", "lap_number")

laps_final = (
    laps_ts_position
    .join(pit_filtered, ["driver_number", "session_key", "lap_number"], "left")
    # Presumably a no-op: `pit` already renamed its `date` column to `pit_date`.
    .withColumnRenamed("date", "pit_date")
)


In [8]:
# Stint columns plus an integer encoding of the tyre compound
# (-1 for a null or unrecognised compound).
# NOTE(review): stints_compound is not referenced later in this notebook; the
# stint join below starts again from `stints`, so stint_compound_numeric never
# reaches the output.
stints_select = stints.select(
    "driver_number", "session_key", "stint_number",
    "stint_lap_start", "stint_lap_end",
    "stint_compound", "stint_tyre_age_at_start",
)

stints_compound = stints_select.withColumn(
    "stint_compound_numeric",
    when(col("stint_compound") == "SOFT", 1)
    .when(col("stint_compound") == "MEDIUM", 2)
    .when(col("stint_compound") == "HARD", 3)
    .when(col("stint_compound") == "INTERMEDIATE", 4)
    .when(col("stint_compound") == "WET", 5)
    .when(col("stint_compound") == "TEST_UNKNOWN", 6)
    .when(col("stint_compound") == "UNKNOWN", 7)
    .otherwise(-1),
)



In [9]:
# Start timestamp of every stint: the date_start of the stint's first lap, or
# the session start when that first lap is lap 1.
# The result keeps the aliased columns of all three sides (s, l, se), including
# duplicate driver_number/session_key columns, so consumers must qualify them.
lap_lookup_condition = None  # placeholder removed below; kept out of scope
del lap_lookup_condition
stints_date = (
    stints.alias("s")
    .join(
        laps.select("driver_number", "session_key", "lap_number", "date_start").alias("l"),
        (col("s.driver_number") == col("l.driver_number"))
        & (col("s.session_key") == col("l.session_key"))
        & (col("s.stint_lap_start") == col("l.lap_number")),
        "left",
    )
    .join(
        sessions.select("session_key", "session_date_start").alias("se"),
        col("se.session_key") == col("l.session_key"),
        "left",
    )
    .withColumn(
        "stint_date_start",
        when(col("lap_number") == 1, to_timestamp("se.session_date_start"))
        .otherwise(to_timestamp("l.date_start")),
    )
    .drop("lap_number", "date_start")
)


In [10]:
# Match every lap to the stint it belongs to and derive tyre-age features.
#
# Only the stint ("s") side of stints_date is projected before the join.
# stints_date also carries driver_number/session_key copies from its lap
# lookup ("l"), and those are null whenever a stint's start lap is missing from
# `laps`. Selecting them (as before) produced rows with null keys. The
# using-join keys from laps_final are always populated.
#
# The range filter drops laps that fall inside no stint (including drivers
# without stint rows), so the left join effectively behaves as an inner join.
laps_with_stints = (
    laps_final.join(
        stints_date.select(
            col("s.driver_number").alias("driver_number"),
            col("s.session_key").alias("session_key"),
            "stint_number",
            "stint_lap_start",
            "stint_lap_end",
            "stint_compound",
            "stint_tyre_age_at_start",
            "stint_date_start",
        ),
        on=["driver_number", "session_key"],
        how="left",
    )
    .where(
        (col("lap_number") >= col("stint_lap_start"))
        & (col("lap_number") <= col("stint_lap_end"))
    )
    .select(
        "driver_number",
        "session_key",
        "lap_number",
        "lap_duration",
        "circuit_key",
        "laps_date_start_ts",
        "pit_duration",
        "position",
        "stint_number",
        "stint_lap_start",
        "stint_date_start",
        "stint_lap_end",
        "stint_compound",
        "stint_tyre_age_at_start",
        "duration_sector_1",
        "duration_sector_2",
        "duration_sector_3",
        "session_type",
    )
    # 1 on the stint's first lap, 2 on the next, ...
    .withColumn("laps_on_current_tire", col("lap_number") - col("stint_lap_start") + 1)
    # Seconds between the stint start and this lap's start (timestamps cast to
    # epoch seconds as doubles, so the fractional part is kept).
    .withColumn(
        "time_since_last_box_stop",
        col("laps_date_start_ts").cast("double") - col("stint_date_start").cast("double"),
    )
)

In [12]:
# Per-driver, per-session lap ordering used by the window features below.
window_spec = Window.partitionBy("driver_number", "session_key").orderBy("lap_number")

# 1) Collapse the joined data to exactly one row per lap *before* computing any
#    window feature. The pit (and possibly stint) joins can duplicate a lap.
#    Over duplicated rows, lag() would return the same lap's duration, and the
#    running sum would count that lap more than once, because rows sharing a
#    lap_number are peers in the default RANGE frame.
laps_aggregated = laps_with_stints.groupBy(
    "driver_number", "session_key", "lap_number", "laps_date_start_ts", "circuit_key"
).agg(
    first("lap_duration").alias("current_lap_time"),
    first("stint_compound").alias("current_tire"),
    first("laps_on_current_tire").alias("laps_on_current_tire"),
    # Number of pit records on this lap (0 when there is none); not cumulative.
    _sum(when(col("pit_duration").isNotNull(), 1).otherwise(0)).alias("box_stops"),
    first("position").alias("position_in_race"),
    # TODO: time_difference_with_leader from the intervals table; it is
    # currently derived from lap start timestamps in a later cell.
    first("duration_sector_1").alias("sector_1_time"),
    first("duration_sector_2").alias("sector_2_time"),
    first("duration_sector_3").alias("sector_3_time"),
    first("pit_duration").alias("box_stop_time"),
    first("time_since_last_box_stop").alias("time_since_last_box_stop"),
    first("session_type").alias("session_type"),
)

# 2) Window features over the one-row-per-lap data.
features = (
    laps_aggregated
    .withColumn("previous_lap_time", lag("current_lap_time").over(window_spec))
    .withColumn("lap_time_delta", col("current_lap_time") - col("previous_lap_time"))
    # Running total of lap durations up to and including this lap.
    .withColumn("accumulated_time", _sum("current_lap_time").over(window_spec))
    # Relative to this driver's last recorded lap in the session.
    # NOTE(review): a driver who stops early reaches 100% on their last lap;
    # partition by session_key only if overall race progress is intended.
    .withColumn(
        "race_percentage_completed",
        (col("lap_number") / expr("max(lap_number) over (partition by driver_number, session_key)")) * 100,
    )
    .select(
        "driver_number",
        "session_key",
        "lap_number",
        "laps_date_start_ts",
        "circuit_key",
        "current_lap_time",
        "race_percentage_completed",
        "current_tire",
        "laps_on_current_tire",
        "box_stops",
        "previous_lap_time",
        "lap_time_delta",
        "accumulated_time",
        "position_in_race",
        "sector_1_time",
        "sector_2_time",
        "sector_3_time",
        "box_stop_time",
        "time_since_last_box_stop",
        "session_type",
    )
)


In [13]:
# Recompute the running order from the lap start timestamps. Within each
# session and lap, the driver who started the lap earliest is ranked first.
# Lap 1 keeps the value taken from the position table, and sessions other than
# "Race" get no position at all.
#
# Spark's ascending sort places nulls first, so a driver whose lap start
# timestamp is unknown used to be ranked P1. Nulls are now sorted last and
# left unranked (null position), which keeps the ranks of the known
# timestamps intact. driver_number breaks exact ties deterministically.
position_window = Window.partitionBy("session_key", "lap_number").orderBy(
    col("laps_date_start_ts").asc_nulls_last(),
    col("driver_number").asc(),
)

features = (
    features
    .withColumn(
        "calc_position",
        when(col("laps_date_start_ts").isNotNull(), row_number().over(position_window)),
    )
    .withColumn(
        "position_in_race",
        when(
            col("session_type") == "Race",
            when(col("lap_number") == 1, col("position_in_race")).otherwise(col("calc_position")),
        ).otherwise(lit(None)),
    )
    .drop("calc_position")
)

In [14]:
# features = features.withColumn("laps_date_start_ts", to_timestamp("laps_date_start"))

In [15]:
# Gap to the leader. The earliest lap start among all drivers in a given
# session and lap is taken as the leader's; each driver's gap is their lap
# start minus that time, in seconds. Timestamps are cast to epoch seconds as
# doubles, so the fractional part is kept. Only "Race" sessions get a value;
# any other session type yields null.
# Despite its name, leader_lap_time holds a timestamp, not a duration.
leader_window = Window.partitionBy("session_key", "lap_number")

features_with_leader_time = features.withColumn(
    "leader_lap_time", min("laps_date_start_ts").over(leader_window)
)

features_with_time_difference = features_with_leader_time.withColumn(
    "time_difference_with_leader",
    when(
        col("session_type") == "Race",
        col("laps_date_start_ts").cast("double") - col("leader_lap_time").cast("double"),
    ),
)


In [17]:
# Final feature set with explicit column types for the PostgreSQL table.
# A type of None leaves the column's Spark type unchanged.
features_selected = features_with_time_difference.select(
    col("laps_date_start_ts").alias("date"),
    *[
        col(name) if dtype is None else col(name).cast(dtype)
        for name, dtype in (
            ("circuit_key", "string"),
            ("session_key", "string"),
            ("driver_number", "int"),
            ("lap_number", "int"),
            ("current_lap_time", None),
            ("race_percentage_completed", None),
            ("current_tire", "string"),
            ("laps_on_current_tire", "int"),
            ("box_stops", "double"),
            ("previous_lap_time", "double"),
            ("lap_time_delta", "double"),
            ("accumulated_time", "double"),
            ("position_in_race", "int"),
            ("time_difference_with_leader", "double"),
            ("sector_1_time", "double"),
            ("sector_2_time", "double"),
            ("sector_3_time", "double"),
            ("box_stop_time", "double"),
            ("time_since_last_box_stop", "double"),
            ("session_type", "string"),
        )
    ],
)


In [None]:
# Append the features to PostgreSQL. A failure is printed rather than raised,
# so the remaining cells still run.
# NOTE(review): mode="append" means re-running this cell inserts the same
# rows again.
try:
    features_selected.write.jdbc(
        jdbc_url,
        "f1_consolidated.features",
        mode="append",
        properties=jdbc_properties,
    )
except Exception as exc:
    print(f"Error al escribir en PostgreSQL: {exc}")
else:
    print("Datos escritos exitosamente en PostgreSQL.")



In [21]:
features_selected.show(n=20, truncate=True)  # Spark's defaults; recomputes the pipeline since nothing is cached

+--------------------+-----------+-----------+-------------+----------+----------------+-------------------------+------------+--------------------+---------+------------------+-------------------+------------------+----------------+---------------------------+------------------+------------------+------------------+-------------+------------------------+------------+
|                date|circuit_key|session_key|driver_number|lap_number|current_lap_time|race_percentage_completed|current_tire|laps_on_current_tire|box_stops| previous_lap_time|     lap_time_delta|  accumulated_time|position_in_race|time_difference_with_leader|     sector_1_time|     sector_2_time|     sector_3_time|box_stop_time|time_since_last_box_stop|session_type|
+--------------------+-----------+-----------+-------------+----------+----------------+-------------------------+------------+--------------------+---------+------------------+-------------------+------------------+----------------+-------------------------

In [23]:
# Stop the Spark session (and its underlying SparkContext).
spark.stop()