#### **RhythmIQ: Macrotendencias y Dinámicas del Mainstream (2020-2024)**

**1. Contexto y Objetivo del proyecto** 

A través de la integración de múltiples fuentes de datos, este proyecto trasciende el análisis de consumo convencional para identificar las macrotendencias que han definido la industria en su mayor apogeo mainstream. El objetivo central es analizar las dinámicas dominantes del mercado entre **2020 y 2024**.

---

**2. Fuentes de Información** 

* **Spotify API:** Métricas oficiales de tracks, artistas, popularidad, seguidores y géneros.
* **Last.fm API:** Datos de comportamiento (oyentes únicos) y etiquetas generadas por la comunidad.
* **RhythmIQ DB:** Almacenamiento estructurado para ejecución de consultas analíticas.

---

**3. Alcance** 

* **Periodo:** 2020 - 2024.
* **Géneros:** Pop, Reggaetón/Latin, Indie, Hip-Hop.

#### **Fase 1. Extracción de datos** 

#### **1. Spotify API:** 


In [2]:
# -----------------------------------------------------------------------------------------------------------------------
# CARGA DE VARIABLES DE ENTORNO
# -----------------------------------------------------------------------------------------------------------------------

# Importamos las librerías necesarias para acceder a las variables de entorno del sistema operativo
# y para cargar automáticamente las variables definidas en el archivo .env.

import os
from dotenv import load_dotenv

# Cargamos las variables de entorno desde el archivo .env ubicado en la raíz del proyecto.
# Este archivo contiene información sensible (claves de API) y no se incluye en el repositorio.

load_dotenv()

# -----------------------------------------------------------------------------------------------------------------------
# OBTENCIÓN DE CLAVES DE ACCESO A LAS APIs
# -----------------------------------------------------------------------------------------------------------------------

# Recuperamos las claves de acceso a las APIs desde las variables de entorno utilizando os.getenv().
# De este modo, evitamos incluir información sensible directamente en el código fuente.

key_sp = os.getenv("spotipy_api_key")
pass_sp = os.getenv("spotipy_client_secret")
lastfm_key = os.getenv("lastfm_api_key")


In [3]:
# -----------------------------------------------------------------------------------------------------------------------
# CONFIGURACIÓN DEL ENTORNO
# -----------------------------------------------------------------------------------------------------------------------

# Importamos las librerías necesarias para la manipulación de datos (Pandas), control de tiempos de espera (Time) 
# y la interfaz oficial para la interacción con la API de Spotify (Spotipy).

import pandas as pd
import time
import spotipy
from spotipy.oauth2 import SpotifyClientCredentials

# -----------------------------------------------------------------------------------------------------------------------
# AUTENTICACIÓN API SPOTIFY
# -----------------------------------------------------------------------------------------------------------------------

client_id = key_sp
client_secret = pass_sp

auth_manager = SpotifyClientCredentials(client_id=client_id, client_secret=client_secret)
sp = spotipy.Spotify(auth_manager=auth_manager)

##### **1.1 Spotify: Extracción de datos de TRACKS: Muestreo por género y año.**

In [None]:
# -----------------------------------------------------------------------------------------------------------------------
# 1.1.1 EXTRACCIÓN DE DATOS GÉNERO POP (2020-2024)
# -----------------------------------------------------------------------------------------------------------------------

# Definición de los parámetros de búsqueda:
genero = "pop"
años = list(range(2020, 2025))
limite = 50  #Máximo de resultados por petición
max_items_por_año = 500  #Tamaño muestral por año
type_tracks = "track"
all_tracks_pop = []


# Definición de la función para la extracción en la API:
def get_tracks_por_año_pop():
    tracks_año = []
    offset = 0

    while len(tracks_año) < max_items_por_año and offset < 1000: 
        try:
            #Consulta a la API aplicando filtros de genero y año. Maneja la paginación mendiante offset.
            results = sp.search(q=f"genre:{genero} year:{año}", type="track", limit=limite, offset=offset)
        except Exception as e:
            print(f"Error en año {año}, offset {offset}: {e}")
            break

        items = results['tracks']['items']
        if not items:
            break

        # Extracción de información relevante para el análisis:
        for t in items:
            lista_nombres_artistas = [artist["name"] for artist in t["artists"]]
            lista_ids_artistas = [artist["id"] for artist in t["artists"]]
            tracks_año.append({
                "track_id": t["id"],
                "track": t["name"],
                "artist": t["artists"][0]["name"], # Artista principal
                "artistid": t["artists"][0]["id"], # ID para cruzar con artist_info
                "all_artists_names": lista_nombres_artistas, # Lista completa de nombres
                "all_artists_ids": lista_ids_artistas, # Lista completa de IDs
                "collaboration": len(lista_ids_artistas) > 1, # True si hay más de 1 artista
                "album": t["album"]["name"],
                "album_id": t["album"]["id"], #ID para cruzar con album_info
                "track_release_date": t["album"]["release_date"],
                "track_popularity": t["popularity"],
                "track_year": año,
                "genre_extracted": genero
            })

        # Avance de página y pausa para respetar el rate limit.
        offset += limite
        time.sleep(0.2)

    return tracks_año


# Ejecutamos el proceso para el rango de años especificado:
for año in años:
    print(f"Extrayendo tracks del año {año}...")
    tracks = get_tracks_por_año_pop()
    all_tracks_pop.extend(tracks)

print(f"Total tracks extraídos: {len(all_tracks_pop)}")

In [None]:
# -----------------------------------------------------------------------------------------------------------------------
# 1.1.2 EXTRACCIÓN DE DATOS GÉNERO LATIN/REGGAETON (2020-2024)
# -----------------------------------------------------------------------------------------------------------------------

# Definición de los parámetros de búsqueda:
genero = "latin"
años = list(range(2020, 2025))
limite = 50
max_items_por_año = 500
type_tracks = "track"
all_tracks_latin = []


# Definición de la función para la extracción en la API:
def get_tracks_por_año_latin():
    tracks_año = []
    offset = 0

    while len(tracks_año) < max_items_por_año and offset < 1000: 
        try:
            results = sp.search(q=f"genre:{genero} year:{año}", type="track", limit=limite, offset=offset)
        except Exception as e:
            print(f"Error en año {año}, offset {offset}: {e}")
            break

        items = results['tracks']['items']
        if not items:
            break

        for t in items:
            lista_nombres_artistas = [artist["name"] for artist in t["artists"]]
            lista_ids_artistas = [artist["id"] for artist in t["artists"]]
            tracks_año.append({
                "track_id": t["id"],
                "track": t["name"],
                "artist": t["artists"][0]["name"], 
                "artistid": t["artists"][0]["id"], 
                "all_artists_names": lista_nombres_artistas, 
                "all_artists_ids": lista_ids_artistas, 
                "collaboration": len(lista_ids_artistas) > 1, 
                "album": t["album"]["name"],
                "album_id": t["album"]["id"], 
                "track_release_date": t["album"]["release_date"],
                "track_popularity": t["popularity"],
                "track_year": año,
                "genre_extracted": genero
            })

        offset += limite
        time.sleep(0.2)

    return tracks_año


# Ejecutamos el proceso para el rango de años especificado:
for año in años:
    print(f"Extrayendo tracks del año {año}...")
    tracks = get_tracks_por_año_latin()
    all_tracks_latin.extend(tracks)

print(f"Total tracks extraídos: {len(all_tracks_latin)}")

In [None]:
# -----------------------------------------------------------------------------------------------------------------------
# 1.1.3 EXTRACCIÓN DE DATOS GÉNERO INDIE (2020-2024)
# -----------------------------------------------------------------------------------------------------------------------

# Definición de los parámetros de búsqueda:
genero = "indie"
años = list(range(2020, 2025))
limite = 50
max_items_por_año = 500
type_tracks = "track"
all_tracks_indie = []


# Definición de la función para la extracción en la API:
def get_tracks_por_año_indie():
    tracks_año = []
    offset = 0

    while len(tracks_año) < max_items_por_año and offset < 1000: 
        try:
            results = sp.search(q=f"genre:{genero} year:{año}", type="track", limit=limite, offset=offset)
        except Exception as e:
            print(f"Error en año {año}, offset {offset}: {e}")
            break

        items = results['tracks']['items']
        if not items:
            break

        for t in items:
            lista_nombres_artistas = [artist["name"] for artist in t["artists"]]
            lista_ids_artistas = [artist["id"] for artist in t["artists"]]
            tracks_año.append({
                "track_id": t["id"],
                "track": t["name"],
                "artist": t["artists"][0]["name"], 
                "artistid": t["artists"][0]["id"], 
                "all_artists_names": lista_nombres_artistas, 
                "all_artists_ids": lista_ids_artistas, 
                "collaboration": len(lista_ids_artistas) > 1, 
                "album": t["album"]["name"],
                "album_id": t["album"]["id"], 
                "track_release_date": t["album"]["release_date"],
                "track_popularity": t["popularity"],
                "track_year": año,
                "genre_extracted": genero
            })

        offset += limite
        time.sleep(0.2)

    return tracks_año


# Ejecutamos el proceso para el rango de años especificado:
for año in años:
    print(f"Extrayendo tracks del año {año}...")
    tracks = get_tracks_por_año_indie()
    all_tracks_indie.extend(tracks)

print(f"Total tracks extraídos: {len(all_tracks_indie)}")

In [None]:
# -----------------------------------------------------------------------------------------------------------------------
# 1.1.4 EXTRACCIÓN DE DATOS GÉNERO HIP-HOP (2020-2024)
# -----------------------------------------------------------------------------------------------------------------------

# Definición de los parámetros de búsqueda:
genero = "hip-hop"
años = list(range(2020, 2025))
limite = 50
max_items_por_año = 500
type_tracks = "track"
all_tracks_hiphop = []


# Definición de la función para la extracción en la API:
def get_tracks_por_año_hiphop():
    tracks_año = []
    offset = 0

    while len(tracks_año) < max_items_por_año and offset < 1000: 
        try:
            results = sp.search(q=f"genre:{genero} year:{año}", type="track", limit=limite, offset=offset)
        except Exception as e:
            print(f"Error en año {año}, offset {offset}: {e}")
            break

        items = results['tracks']['items']
        if not items:
            break

        for t in items:
            lista_nombres_artistas = [artist["name"] for artist in t["artists"]]
            lista_ids_artistas = [artist["id"] for artist in t["artists"]]
            tracks_año.append({
                "track_id": t["id"],
                "track": t["name"],
                "artist": t["artists"][0]["name"], 
                "artistid": t["artists"][0]["id"], 
                "all_artists_names": lista_nombres_artistas, 
                "all_artists_ids": lista_ids_artistas, 
                "collaboration": len(lista_ids_artistas) > 1, 
                "album": t["album"]["name"],
                "album_id": t["album"]["id"], 
                "track_release_date": t["album"]["release_date"],
                "track_popularity": t["popularity"],
                "track_year": año,
                "genre_extracted": genero
            })

        offset += limite
        time.sleep(0.2)

    return tracks_año


# Ejecutamos el proceso para el rango de años especificado:
for año in años:
    print(f"Extrayendo tracks del año {año}...")
    tracks = get_tracks_por_año_hiphop()
    all_tracks_hiphop.extend(tracks)

print(f"Total tracks extraídos: {len(all_tracks_hiphop)}")

##### **1.2. Spotify: Extracción de datos de ARTISTAS: Muestreo por track extraído.**

In [None]:
# Cruza la lista de tracks extraídos con la API de artistas para obtener sus métricas relevantes:
# Popularidad del artista, número de seguidores y géneros asociados.

# 2.1. Integración de la información extraída por género y año en una estructura global:
all_tracks = all_tracks_pop + all_tracks_latin + all_tracks_indie + all_tracks_hiphop
print(f"Total registros brutos extraídos: {len(all_tracks)}")

# 2.2. Identificación de duplicados:

# Como es posible que un track aparezca en múltiples géneros, usamos el 'track_id' para realizar una depuración previa a la extracción para evitar consultas redundantes a la API.
tracks_unicos_dict = {t['track_id']: t for t in all_tracks}
all_tracks_cleaned = list(tracks_unicos_dict.values())

print(f"Total tracks únicos tras depuración: {len(all_tracks_cleaned)}")
all_tracks_cleaned

In [None]:
# -----------------------------------------------------------------------------------------------------------------------
# 2.3 EXTRACCIÓN DE DATOS DEL ARTISTA
# -----------------------------------------------------------------------------------------------------------------------

def artist_info(all_tracks_cleaned, sp):
    # Identificamos los IDs únicos de artistas para optimizar las consultas (cogemos todos, artista principal y colaboradores).
    # Usamos un set() para asegurar que cada artista se consulte una sola vez, independientemente de cuántos tracks tenga en la lista.
    artist_ids = set() 

    for t in all_tracks_cleaned:
        if t.get("all_artists_ids"):
            artist_ids.update(t["all_artists_ids"])

    artist_ids = list(artist_ids)
    artist_dict = {}

    # Procesamiento por lotes (Batch Processing): 
    # La API de Spotify permite consultar hasta 50 IDs en una sola petición.
    for i in range(0, len(artist_ids), 50):
        batch_ids = artist_ids[i:i+50]
        try:
            results = sp.artists(batch_ids)
            artists = results["artists"]
            
            for a in artists:
                if a: # Verifica por si la API devuelve un nulo
                    artist_dict[a["id"]] = {
                        "artist_popularity": a["popularity"],
                        "artist_followers": a["followers"]["total"],
                        "artist_genres": a["genres"]
                    }
        except Exception as e:
            print(f"Error en el índice {i}: {e}")
    
        time.sleep(0.2)

    # Mapea la info del artista a cada track
    # Para el análisis de tablas, mantenemos las columnas de artista referido  al artista principal.
    for t in all_tracks_cleaned:
        main_artist_id = t.get("artistid")
        if main_artist_id in artist_dict:
            t.update(artist_dict[main_artist_id])
            
    print("Mapeo finalizado con éxito. Toda la información del artista principal se ha integrado con su track.")
    return all_tracks_cleaned, artist_dict # Devolvemos también el diccionario para análisis de colaboraciones
    

In [None]:
all_tracks_cleaned, maestro_artistas = artist_info(all_tracks_cleaned, sp)

print(f"Tracks listos: {len(all_tracks_cleaned)}") 
print(f"Artistas únicos en el diccionario maestro: {len(maestro_artistas)}")

In [None]:
all_tracks_cleaned

##### **1.3. Spotify: Extracción de datos de ALBUMES: Muestreo por track extraído.**

In [None]:
# -----------------------------------------------------------------------------------------------------------------------
# EXTRACCIÓN DE DATOS DEL ALBUM
# -----------------------------------------------------------------------------------------------------------------------

def album_info(all_tracks_cleaned, sp):
    # Ampliamos ahora la información de los tracks con detalles específicos del album.
    
    # Identificamos los IDs únicos de album para optimizar las consultas. Usamos un set para ello.
    album_ids = set()
    for t in all_tracks_cleaned:
        if t.get("album_id"):
            album_ids.add(t["album_id"])

    album_ids = list(album_ids)
    album_dict = {}

    # Procesamiento por lotes (Batch Processing) de 20 en 20.

    for i in range(0, len(album_ids), 20):
        batch_ids = album_ids[i:i+20]
        try:
            results = sp.albums(batch_ids)
            albums_data = results['albums']

            for a in albums_data:
                if a: # Verificamos que el álbum no sea nulo
                    album_dict[a["id"]] = {
                        'album_name': a['name'],
                        'album_type': a['album_type'],
                        'total_tracks': a['total_tracks'],
                        'album_release_date': a['release_date'],
                        'label': a.get('label', 'Unknown'), # Sello discográfico 
                        'album_artists': a['artists'][0]['name'] # Referencia del artista principal del álbum
                    }
        except Exception as e:
            print(f"Error en el índice {i}: {e}")
        
        time.sleep(0.2)

    # Mapea la info del album a cada track
    for t in all_tracks_cleaned:
        alb_id = t.get("album_id")
        if alb_id in album_dict:
            t.update(album_dict[alb_id])

    print('Mapeo finalizado con éxito. Toda la información del album se ha integrado con los tracks')        
    return all_tracks_cleaned

In [None]:
all_tracks_cleaned = album_info(all_tracks_cleaned, sp)
all_tracks_cleaned

##### **1.4. Spotify: Tratamiento y almacenamiento de la información extraída**

In [None]:
#Generamos un dataframe con toda la información extraída sobre la que hacer un mínimo de estandarizaciones:
df_spotipy_tracks = pd.DataFrame(all_tracks_cleaned)

#Aseguramos el formato fecha para la posterior carga de datos. Estandarizamos: 
df_spotipy_tracks['track_release_date'] = pd.to_datetime(df_spotipy_tracks['track_release_date'], errors='coerce')
df_spotipy_tracks['album_release_date'] = pd.to_datetime(df_spotipy_tracks['album_release_date'], errors='coerce')

#Tratamiento de las listas para la lectura en csv
cols_listas = ['all_artists_names', 'all_artists_ids', 'artist_genres']
for col in cols_listas:
    if col in df_spotipy_tracks.columns:
        df_spotipy_tracks[col] = df_spotipy_tracks[col].apply(lambda x: ', '.join(x) if isinstance(x, list) else x)

#Guardamos la información en un csv
df_spotipy_tracks.to_csv('dataset_spotify_tracks.csv', index=False, encoding='utf-8-sig')
print(f"Dataframe guardado con éxito con {len(df_spotipy_tracks)} registros.")


In [None]:
#Ahora convertimos el maestro de artistas a una estructura compatible con DataFrame:
maestro_artistas_lista = []
for artista_id, info in maestro_artistas.items():
    fila = {'artist_id': artista_id}
    fila.update(info) 
    maestro_artistas_lista.append(fila)

# Ahora sí generamos el DataFrame
df_maestro_artistas = pd.DataFrame(maestro_artistas_lista)

# 3. Estandarizamos los géneros (de lista a texto separado por comas)
if 'artist_genres' in df_maestro_artistas.columns:
    df_maestro_artistas['artist_genres'] = df_maestro_artistas['artist_genres'].apply(
        lambda x: ', '.join(x) if isinstance(x, list) else x
    )

# Guardamos la información en un csv
df_maestro_artistas.to_csv('maestro_artistas.csv', index=False, encoding='utf-8-sig')
print(f"Dataframe guardado con éxito con {len(df_maestro_artistas)} registros.")


#### **2. Last.fm API:** 

In [None]:
# -----------------------------------------------------------------------------------------------------------------------
# CONFIGURACIÓN DEL ENTORNO
# -----------------------------------------------------------------------------------------------------------------------

# Importamos las librerías necesarias para la extracción de datos con la API de Last.fm
# Además de las librerías para el formateo de datos json, hasta el momento no utilizadas.

import requests
import json
from urllib.parse import unquote

# -----------------------------------------------------------------------------------------------------------------------
# AUTENTICACIÓN API Last.fm
# -----------------------------------------------------------------------------------------------------------------------

api_key = lastfm_key
base_url = "https://ws.audioscrobbler.com/2.0/"

##### **2.1. Last.fm: Extracción de datos de la API: Muestreo por track-artista extraído de Spotify.**

In [None]:
def llamar_api(url):
    llamada = requests.get(url)
    print(f"La llamada a la API nos ha dado una respuesta de tipo: {llamada.status_code}")
    if llamada.status_code != 200:
        print(f"El motivo por el que la llamada falló es {llamada.reason}")
    else:
        return llamada.json()

In [None]:
# -----------------------------------------------------------------------------------------------------------------------
# 2.1. EXTRACCION DE DATOS DE LAST.FM
# -----------------------------------------------------------------------------------------------------------------------

# Listas de control: 'track_results' para los datos nuevos y 'error_tracks' para auditar fallos o no coincidencias
track_results = []
error_tracks = []


# Iteramos sobre 'all_tracks_cleaned', que ya contiene los nombres de los tracks y artistas de Spotify.
def lastfm_request(all_tracks_cleaned):
    for i, track in enumerate(all_tracks_cleaned): 
        # Partimos de la info de Spotify: Artist y Track son nuestras keys de búsqueda
        artist_name = track.get('artist')
        track_name = track.get('track')
        
        # Monitor de progreso para procesos largos de ejecución
        if i % 100 == 0:
            print(f"Procesando {i}/{len(all_tracks_cleaned)}...")

        # Configuramos la petición a la API de Last.fm
        # Utilizamos 'track.getInfo' que nos permite buscar por el nombre de la canción y su artista
        params = {
            "method": "track.getInfo",
            "api_key": api_key,
            "artist": artist_name,
            "track": track_name,
            "format": "json",
            "autocorrect": 1 # Permite a la API corregir pequeñas variaciones en el nombre (ej. tildes)
        }

        try:
            # Realizamos la llamada HTTP GET
            r = requests.get(base_url, params=params)
            data = r.json()

            # Si la API localiza la canción exacta en su base de datos global
            if "track" in data:
                t_info = data["track"]
                
                # Construimos el diccionario con las métricas de Last.fm seleccionadas
                info_lastfm = {
                    "artist_spotify": artist_name, # Mantenemos referencia de origen
                    "track_spotify": track_name,
                    "lastfm_url": t_info.get("url"),
                    "listeners": int(t_info.get("listeners", 0)), # Oyentes únicos totales
                    "playcount": int(t_info.get("playcount", 0)), # Total de reproducciones históricas
                    "tags": ", ".join([tag["name"] for tag in t_info.get("toptags", {}).get("tag", [])[:5]]) # Top 5 géneros según comunidad
                }
                
                # Con la información extraída vamos a:
                # Acción A: Actualizar la lista de diccionarios creada para almacenar la información extraída de Last.fm
                track_results.append(info_lastfm)
                
                # Acción B: Incluir los datos en el diccionario original de spotify para tener un dataset final unificado de Spotify + Last.fm
                track.update(info_lastfm)
                
            else:
                # Gestión de 'No Encontrados': Registramos el track para revisión posterior
                error_tracks.append({
                    "artist": artist_name,
                    "track": track_name,
                    "error": "Not Found in Last.fm"
                })

        except Exception as e:
            # Gestión de errores técnicos: Fallos de conexión o errores de servidor (API)
            error_tracks.append({
                "artist": artist_name,
                "track": track_name,
                "error": str(e)
            })
    
    print(f"Extracción finalizada. Encontrados: {len(track_results)} | Fallidos: {len(error_tracks)}")
    return track_results

In [None]:
resultados_lastfm = lastfm_request(all_tracks_cleaned)

##### **2.2. Last.fm: Tratamiento y almacenamiento de la información extraída**

In [None]:
#Generamos un dataframe con toda la información extraída de Last.fm para su revisión:
df_lastfm = pd.DataFrame(track_results)
df_lastfm

#Guardamos la información en un csv
df_lastfm.to_csv('df_lastfm.csv', index=False, encoding='utf-8-sig')

In [None]:
#Como durante la función de extracción de la API actualizamos la lista de diccionarios original, generamos un dataframe con la actualización
df_unificado = pd.DataFrame(all_tracks_cleaned)
df_unificado

In [None]:
#Aseguramos el formato fecha para la posterior carga de datos. Estandarizamos: 
df_unificado['track_release_date'] = pd.to_datetime(df_unificado['track_release_date'], errors='coerce')
df_unificado['album_release_date'] = pd.to_datetime(df_unificado['album_release_date'], errors='coerce')

#Tratamiento de las listas para la lectura en csv
cols_listas = ['all_artists_names', 'all_artists_ids', 'artist_genres']
for col in cols_listas:
    if col in df_unificado.columns:
        df_unificado[col] = df_unificado[col].apply(lambda x: ', '.join(x) if isinstance(x, list) else x)

#Tratamiento de la columna tags
df_unificado['tags'] = df_unificado['tags'].fillna("").astype(str)

#Guardamos la información en un csv
df_unificado.to_csv('dataset_unificado.csv', index=False, encoding='utf-8-sig')
print(f"Dataframe guardado con éxito con {len(df_unificado)} registros.")
print(f"Columnas finales: {list(df_unificado.columns)}")