In [1]:
import polars as pl
import time
from dotenv import load_dotenv
import os

In [2]:
# Load environment variables (e.g. from a local .env file) into the process.
load_dotenv()

# Storage-account credentials handed to every cloud read/write in this notebook.
storage_options = {
    "account_name": os.getenv("ACCOUNT_NAME"),
    "account_key": os.getenv("ACCOUNT_KEY"),
}

# Fail fast with a clear message: os.getenv returns None for unset variables,
# which would otherwise only surface later, at the first storage call.
missing_settings = [key.upper() for key, value in storage_options.items() if not value]
if missing_settings:
    raise RuntimeError(
        "Missing required environment variables: " + ", ".join(missing_settings)
    )

### Carga de las canciones

Obtenemos tres dataframes:

1. Todas las canciones en el top 50 diario para todos los países.
2. El total de apariciones de cada canción en el top 50 diario para España.
3. Todas las canciones que fueron top 1 en el periodo de tiempo para España.

In [3]:

# Source dataset:
# https://www.kaggle.com/datasets/asaniczka/top-spotify-songs-in-73-countries-daily-updated
path = "abfs://spotify-songs/"

# Start of the timed section; the elapsed time is reported in a later cell.
start = time.perf_counter()

# Rows without a country code are relabelled "WO". Depending on reader settings
# an empty CSV field may arrive as null or as "", so both are normalised to ""
# before the comparison (a plain `== ""` never matches null values).
country_or_world = (
    pl.when(pl.col("country").fill_null("") == "")
    .then(pl.lit("WO"))
    .otherwise(pl.col("country"))
    .alias("country")
)

# Keep the query lazy and collect once at the end, so the projection is part of
# the scan plan instead of being applied after every column is materialised.
songs_df = (
    pl.scan_csv(path, storage_options=storage_options)
        .select(
            "album_name",
            "name",
            "artists",
            "album_release_date",
            "daily_rank",
            "daily_movement",
            "weekly_movement",
            country_or_world,
            "snapshot_date",
            "popularity",
            # Milliseconds -> whole seconds; the integer cast drops the fraction.
            (pl.col("duration_ms") / 1000).cast(pl.Int64).alias("duration_seconds"),
        )
        .collect()
)

# Appearances of each song in the rows labelled "ES", most frequent first.
# A song is identified by its (album_name, name, artists) triple.
spain_rows = songs_df.filter(pl.col("country").eq("ES"))

appearances_per_song = spain_rows.group_by(["album_name", "name", "artists"]).agg(
    pl.col("name").count().alias("total_appearances")
)

total_aparences_top_50_es_df = appearances_per_song.sort(
    "total_appearances", descending=True
)

# Songs ranked first in the rows labelled "ES", ordered by snapshot date.
is_spain = pl.col("country") == "ES"
is_number_one = pl.col("daily_rank") == 1

top_first_per_day_es_df = (
    songs_df.filter(is_spain & is_number_one)
    .select(["album_name", "name", "artists", "snapshot_date"])
    .sort("snapshot_date")
)

### Guardamos en el lago de datos

In [4]:
# Destination Delta tables, one per dataframe.
path_songs = "abfs://spotify-delta-lake/polars/top_fifty_songs_daily"
path_total_aparences = "abfs://spotify-delta-lake/polars/total_aparences_es"
path_top_first_per_day = "abfs://spotify-delta-lake/polars/top_first_per_day_es"

# Each dataframe is rechunked and written in overwrite mode to its destination.
for frame, destination in (
    (songs_df, path_songs),
    (total_aparences_top_50_es_df, path_total_aparences),
    (top_first_per_day_es_df, path_top_first_per_day),
):
    frame.rechunk().write_delta(
        destination,
        mode="overwrite",
        storage_options=storage_options,
    )


In [5]:
# Stop the timer and report the seconds elapsed since `start` was recorded.
end = time.perf_counter()
elapsed = end - start

print(f"Tiempo de ejecución: {elapsed:.6f} segundos")

Tiempo de ejecución: 27.293924 segundos
