In [1]:
from google.colab import drive
# Mount Google Drive so the later cells can read/write files under /content/drive.
drive.mount(mountpoint='/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [8]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp, abs, row_number, lit, concat
from pyspark.sql.window import Window

# -----------------------------------------------------------------------------
# Spark session initialisation
# -----------------------------------------------------------------------------
# getOrCreate() returns the already-active session if there is one, so
# re-running this cell in the same kernel does not build a second session.
spark = (
    SparkSession.builder
    .appName("StarSchemaETL")
    .getOrCreate()
)

# -----------------------------------------------------------------------------
# Input and output locations
# -----------------------------------------------------------------------------
data_dir = "/content/drive/MyDrive/clean_data/"      # cleaned source CSVs
output_dir = "/content/drive/MyDrive/star_schema/"   # star-schema tables

# Source files. data_dir already ends with "/", so plain concatenation suffices.
# Note: the valuations file is named "players_valuations" (plural "players").
competitions_file = data_dir + "competitions_limpio.csv"
clubs_file = data_dir + "clubs_limpio.csv"
players_file = data_dir + "players_limpio.csv"
games_file = data_dir + "games_limpio.csv"
appearances_file = data_dir + "appearances_limpio.csv"
player_valuations_file = data_dir + "players_valuations_limpio.csv"

# -----------------------------------------------------------------------------
# Load the source CSV files
# -----------------------------------------------------------------------------
def _read_csv_with_header(path):
    """Read one CSV file whose first line holds the column names.

    No schema inference is requested, so every column is loaded as a string;
    later steps cast the columns they need (dates, match metrics).
    """
    return spark.read.csv(path, header=True)


print("Cargando archivos CSV...")
competitions = _read_csv_with_header(competitions_file)
clubs = _read_csv_with_header(clubs_file)
players = _read_csv_with_header(players_file)
games = _read_csv_with_header(games_file)
appearances = _read_csv_with_header(appearances_file)
player_valuations = _read_csv_with_header(player_valuations_file)
print("Archivos cargados.")

# -----------------------------------------------------------------------------
# Parse the date columns into timestamps
# -----------------------------------------------------------------------------
# No explicit format is given, so Spark's default timestamp parsing is used.
# NOTE(review): under Spark's default (non-ANSI) settings unparseable values
# become null — confirm for the Spark version in use.
games = games.withColumn("date", to_timestamp(col("date")))
player_valuations = player_valuations.withColumn("date", to_timestamp(col("date")))

# -----------------------------------------------------------------------------
# Dimension tables (dim_*)
# -----------------------------------------------------------------------------
print("Creando tablas dimensionales...")


def _write_single_csv(df, table_name):
    """Write ``df`` with a header row to ``output_dir + table_name``, overwriting.

    coalesce(1) collapses the data into one partition so the output holds a
    single part file; Spark still creates a directory named ``table_name``.
    """
    df.coalesce(1).write.mode("overwrite").csv(output_dir + table_name, header=True)


# (output name, source DataFrame, columns kept) — written in this order.
_dimension_specs = [
    ("dim_competition.csv", competitions,
     ["competition_id", "name", "type", "country_name"]),
    ("dim_club.csv", clubs,
     ["club_id", "name", "domestic_competition_id", "squad_size",
      "average_age", "foreigners_number", "national_team_players", "stadium_name"]),
    ("dim_player.csv", players,
     ["player_id", "name", "position", "sub_position", "date_of_birth", "height_in_cm"]),
    ("dim_game.csv", games,
     ["game_id", "date", "home_club_id", "away_club_id",
      "home_club_goals", "away_club_goals", "round", "referee"]),
]

for _table_name, _source_df, _columns in _dimension_specs:
    _write_single_csv(_source_df.select(*_columns), _table_name)

# Season dimension (dim_season)
# Assumption: each season starts on 1 July and ends on 30 June of the next year.
#
# The int cast happens BEFORE dropna()/distinct():
# - values that fail the cast turn into null and are then dropped, instead of
#   surviving as a null-keyed row;
# - distinct raw strings that cast to the same year (e.g. "2012" and "2012.0"
#   under Spark's default non-ANSI casting) collapse into one row, so `season`
#   stays unique as a dimension key.
seasons = (
    games.select(col("season").cast("int").alias("season"))
    .dropna()
    .distinct()
)

# Start/end dates are built as "YYYY-07-01" / "(YYYY+1)-06-30" strings.
dim_season = (
    seasons
    .withColumn("season_start_date",
                concat(col("season").cast("string"), lit("-07-01")))
    .withColumn("season_end_date",
                concat((col("season") + 1).cast("string"), lit("-06-30")))
)

dim_season.coalesce(1).write.mode("overwrite").csv(output_dir + "dim_season.csv", header=True)

print("Tablas dimensionales creadas.")

# -----------------------------------------------------------------------------
# Fact table: fact_performance
# -----------------------------------------------------------------------------
print("Creando tabla de hechos...")

# Cast the per-appearance metrics from string to int. withColumn replaces each
# column in place, so the column order of `appearances` is unchanged.
for _metric in ("goals", "assists", "minutes_played", "yellow_cards", "red_cards"):
    appearances = appearances.withColumn(_metric, col(_metric).cast("int"))

# Only the columns the fact table needs are kept from appearances.
appearances_selected = appearances.select(
    "game_id", "player_id", "player_club_id", "competition_id",
    "goals", "assists", "minutes_played", "yellow_cards", "red_cards",
)

# Left-join games to attach each appearance's match date and season, rename the
# club key to "club_id" (the key name used by dim_club), and fix column order.
game_date_season = games.select("game_id", "date", "season")
perf = (
    appearances_selected
    .join(game_date_season, on="game_id", how="left")
    .withColumnRenamed("player_club_id", "club_id")
    .select(
        "game_id", "player_id", "club_id", "competition_id", "season", "date",
        "goals", "assists", "minutes_played", "yellow_cards", "red_cards",
    )
)

# -----------------------------------------------------------------------------
# Match each appearance with the player's valuation closest in time to the game
# -----------------------------------------------------------------------------
print("Calculando valuaciones de mercado más cercanas...")

# Take only the columns needed for the match from player_valuations, so the join
# cannot bring in extra columns that collide with perf's columns, and less data
# is shuffled.
valuations_slim = player_valuations.select(
    "player_id",
    col("date").alias("valuation_date"),
    "market_value_in_eur",
)

# Left join: every appearance is paired with every valuation of the same player.
# date_diff = |valuation_date - game date| in seconds; it is null when either
# date is null or the player has no valuation at all.
# Closest in either direction: a valuation dated after the game can be chosen.
# NOTE(review): this fan-out grows with (#appearances x #valuations) per player.
joined = perf.join(valuations_slim, on="player_id", how="left").withColumn(
    "date_diff", abs(col("valuation_date").cast("long") - col("date").cast("long"))
)

# Keep the closest valuation per (game, player):
# - asc_nulls_last(): Spark's default ascending order puts NULLS FIRST, which
#   would let an undated valuation (null date_diff) beat a genuinely close one.
# - valuation_date breaks ties deterministically, preferring the earlier one.
window = Window.partitionBy("game_id", "player_id").orderBy(
    col("date_diff").asc_nulls_last(),
    col("valuation_date").asc_nulls_last(),
)
ranked = joined.withColumn("rn", row_number().over(window)).filter(col("rn") == 1)

# Final fact-table columns.
# - uuid() produces a fresh random id each time the plan is executed.
# - If the surviving row still has a null date_diff, no valuation has a
#   measurable distance to the game, so the market value is reported as null
#   instead of an arbitrary valuation.
fact_performance = ranked.selectExpr(
    "uuid() as performance_id",
    "game_id",
    "player_id",
    "club_id",
    "competition_id",
    "season",
    "goals",
    "assists",
    "minutes_played",
    "yellow_cards",
    "red_cards",
    "CASE WHEN date_diff IS NULL THEN NULL ELSE market_value_in_eur END AS market_value_in_eur",
)

# Saved as a single-part CSV (Spark creates a directory with this name).
fact_performance.coalesce(1).write.mode("overwrite").csv(output_dir + "fact_performance.csv", header=True)

print("Tabla de hechos guardada.")

# -----------------------------------------------------------------------------
# End of the process
# -----------------------------------------------------------------------------
print("\n✅ Star schema creado correctamente en:", output_dir)


Cargando archivos CSV...
Archivos cargados.
Creando tablas dimensionales...
Tablas dimensionales creadas.
Creando tabla de hechos...
Calculando valuaciones de mercado más cercanas...
Tabla de hechos guardada.

✅ Star schema creado correctamente en: /content/drive/MyDrive/star_schema/
