In [0]:
%run "./conexion_datalake"

#Limpieza de datos: NBA

###Variables

In [0]:
tables = ["teams", "ranking", "players", "players_info", "games", "games_details", "players_salaries", "active_players"]

In [0]:
#Importe de funciones de sparksql
import pyspark.sql.functions as F

###Carga de archivos

In [0]:
for table in tables:
    exec(f'''{table} = spark.read.load(
        f"abfss://{container}@{datalake}.dfs.core.windows.net/integrador/raw_data/{table}.csv",
        format = "csv",
        sep = ",",
        header = "true",
        inferSchema = True)''')

In [0]:
df_teams = teams
df_ranking = ranking
df_players = players
df_players_info = players_info
df_games = games
df_games_details = games_details
df_players_salaries = players_salaries
df_active_players = active_players

##Limpieza de Datos

In [0]:
#hacemos lowercase los nombres de las columnas
for table in tables:
   exec(f'''for column_name in df_{table}.columns:
                df_{table} = df_{table}.withColumnRenamed(column_name, column_name.lower())
        ''')

####Teams

In [0]:
#eliminamos columnas inncessarias para el analisis
df_teams = df_teams.drop(
    "league_id", "max_year", "min_year", "dleagueaffiliation", "owner", "generalmanager"
)

In [0]:
#cambiamos nombres de columnas
df_teams = (
    df_teams.withColumnRenamed("abbreviation", "abbr")
    .withColumnRenamed("yearfounded", "year_founded")
    .withColumnRenamed("arenacapacity", "arena_capacity")
)

In [0]:
#asignamos imagen al equipo con formato https://cdn.ssref.net/req/202305101/tlogo/bbr/{abbr}-2023.png
df_teams = df_teams.withColumn("team_logo", F.concat(F.lit("https://cdn.ssref.net/req/202305101/tlogo/bbr/"), F.col("abbr"), F.lit("-2023.png")))

####Active Players

In [0]:
df_active_players = df_active_players.drop("first_name", "last_name")

####Players Info

In [0]:
cols_to_delete = ['first_name',
                  'last_name',           
 'display_last_comma_first',
 'display_fi_last',
 'player_slug',
 'school',
 'last_affiliation',
 'weight',
 'jersey',
 'rosterstatus',
 'games_played_current_season_flag',
 'team_id',
 'team_name',
 'team_abbreviation',
 'team_code',
 'team_city',
 'playercode',
 'from_year',
 'to_year',
 'dleague_flag',
 'nba_flag',
 'games_played_flag','greatest_75_flag']


In [0]:
# eliminamos columnas y renombramos. Casteamos birhtdate
df_players_info = (
    df_players_info.drop(*cols_to_delete)
    .withColumnRenamed("display_first_last", "player_name")
    .withColumnRenamed("person_id", "player_id")
).withColumn("birthdate", F.to_date(F.col("birthdate"), ("dd-MM-yyy")))

In [0]:
#Generamos un slug para los jugadores para asi asignar una imagen al jugador y utilizarla en el dashboard de powerbi
#Formato: https://www.basketball-reference.com/req/202106291/images/headshots/{player_slug}.jpg
df_players_info = df_players_info.fillna("")
df_players_info = df_players_info.withColumn(
        "player_slug",
        F.when(
            F.length(F.split(df_players_info.player_name, " ")[1]) >= 5,
            F.concat(
                F.lower(
                    F.split(F.regexp_replace(df_players_info.player_name, "'", ""), " ")[1][
                        0:5
                    ]
                ),
                F.lower(F.split(df_players_info.player_name, " ")[0][0:2]),
                F.lit("01"),
            ),
        ).otherwise(
            F.concat(
                F.lower(F.split(df_players_info.player_name, " ")[1]),
                F.lower(F.split(df_players_info.player_name, " ")[0][0:2]),
                F.lit("01"),
            )
        ),
    )

In [0]:
#creamos la columna player_headshot
df_players_info = df_players_info.withColumn("player_headshot", F.concat(F.lit("https://www.basketball-reference.com/req/202106291/images/headshots/"), F.col("player_slug"), F.lit(".jpg")))

In [0]:
#Convertimos la estatura de pies-inches a CM
# Dividimos la columna "height" en pies y pulgadas
#df_players_info = df_players_info.withColumn("feet", F.split(F.col("height"), "-")[0].cast("integer"))
#df_players_info = df_players_info.withColumn("inches", F.split(F.col("height"), "-")[1].cast("integer"))

# Calcular la altura en centímetros
df_players_info = df_players_info.withColumn("height", F.round(((F.split(F.col("height"), "-")[0].cast("integer")) * 30.48) + ((F.split(F.col("height"), "-")[1].cast("integer") ) * 2.54)))

####Players Salaries

In [0]:
#eliminamos columnas innecesarias
df_players_salaries = df_players_salaries.drop("rank", "position", "team")

###Games

In [0]:
#eliminamos columnas innecesarias
df_games = df_games.drop("game_status_text", "visitor_team_id", "team_id_home")

In [0]:
#creamos columna con el id del equipo ganador
df_games = df_games.withColumn("winner_team_id", F.when(F.col("home_team_wins") == 1, F.col("home_team_id")).otherwise(F.col("home_team_id")))


In [0]:
#cambiamos nombre de team_id_away para conservar misma estructura
df_games = df_games.withColumnRenamed("team_id_away", "away_team_id")

###Game details

In [0]:
#elimino las columnas innecesarias, algunas tienen datos que estan en otras tablas, con las cuales voy a hacer join
df_games_details = df_games_details.drop("team_abbreviation", "team_city", "player_name", "nickname", "start_position", "comment")

In [0]:
#rellenamos valores nulos para poder realizar promedios luego. Creamos columna con minutos y segundos jugado
df_games_details = df_games_details.fillna(0).fillna({"min": 0})
df_games_details = df_games_details.withColumn("min_played", F.split(F.col("min"), ":")[0].cast("integer")).withColumn("sec_played", F.split(F.col("min"), ":")[0].cast("integer")).drop("min")

###Rankings

In [0]:
#cambiamos nombres de columnas, y borramos columnas innecesarias
df_ranking = (
     df_ranking.withColumnRenamed("g", "games_played")
    .withColumnRenamed("w", "wins")
    .withColumnRenamed("l", "loses")
    .withColumnRenamed("w_pct", "win_pct")
    .drop("league_id", "team", "returntoplay")
)

In [0]:
#dejamos temporada como un año (el numero antes del año representa a la conferencia)
df_ranking = df_ranking.withColumn("season_id", F.substring(F.col("season_id"), 2, 5))

In [0]:
#nos quedamos con el ultimo registro que se tomo por cada equipo, por temporada
from pyspark.sql.window import Window

# Definir una ventana particionada por temporada y equipo, ordenada por la fecha de registro en orden descendente
window = Window.partitionBy("SEASON_ID", "TEAM_ID").orderBy(F.desc("STANDINGSDATE"))

# Agregar una columna que enumere los registros dentro de la ventana
df_with_rownum = df_ranking.withColumn("row_number", F.row_number().over(window))

# Filtrar solo los registros con row_number igual a 1 (último registro por temporada y equipo)
df_ranking = df_with_rownum.filter(F.col("row_number") == 1)


##Limpieza General: duplicados

In [0]:
for table in tables:
    exec(f"df_{table} = df_{table}.dropna(how = 'all')")
    exec(f"df_{table} = df_{table}.dropDuplicates()")


###Guardamos tablas en el datalake en formato parquet

In [0]:
for table in tables:
    exec(f"df_{table}.coalesce(1).write.save(path=f'abfss://{container}@{datalake}.dfs.core.windows.net/integrador/trusted_data/{table}.parquet', format='parquet', mode='overwrite')")