# Desafio -> Consumo de dados da API do Spotify

Este notebook tem como objetivo implementar um pipeline de extração e processamento de dados da API do Spotify para análises.

In [None]:
import os

import findspark
findspark.init()
import requests
import urllib

from dotenv import load_dotenv
from loguru import logger
from time import sleep

from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType
from pyspark.sql.types import StringType
from pyspark.sql.types import StructField
from pyspark.sql.types import StructType

# Carrega variáveis do arquivo .env
load_dotenv()

CLIENT_ID = os.getenv("SPOTIFY_CLIENT_ID")
CLIENT_SECRET = os.getenv("SPOTIFY_CLIENT_SECRET")
REDIRECT_URI = os.getenv("SPOTIFY_REDIRECT_URI")
SCOPE = os.getenv("SPOTIFY_SCOPE")
AUTH_URL = os.getenv("AUTH_URL")
TOKEN_URL = os.getenv("TOKEN_URL")


spark = SparkSession.builder \
      .master("local[*]") \
      .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
      .config("spark.sql.parquet.compression.codec", "snappy") \
      .appName("Desafio_Spotify") \
      .getOrCreate()

logger.info("Sessão Spark iniciada.")


[32m2025-06-05 10:43:48.896[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m36[0m - [1mSessão Spark iniciada.[0m


In [None]:

def build_auth_url() -> str:
    """
    Gera a URL de consentimento do usuário no Spotify.

    Returns:
        str: URL de autenticação para o navegador.
    """
    params = {
        "client_id": CLIENT_ID,
        "response_type": "code",
        "redirect_uri": REDIRECT_URI,
        "scope": SCOPE
    }
    return f"{AUTH_URL}?{urllib.parse.urlencode(params)}"

def get_code_manual() -> str:
    """
    Imprime a URL de autenticação e solicita a URL colada manualmente pelo usuário para obter o 'code'.

    Returns:
        str: Código de autorização retornado pelo Spotify.
    """
    url = build_auth_url()
    logger.warning(f"Abra este link no navegador, faça login e autorize o acesso: \n{url}")
    logger.info("\nApós login, copie a url completa da barra de endereços.")
    url_with_code = input("Cole a url: ").strip()
    code = url_with_code.split("code=")[1].split("&")[0]
    return code

def exchange_code_for_tokens(code: str) -> dict:
    """
    Troca o authorization code por access_token e refresh_token.

    Args:
        code (str): Authorization code recebido pelo Spotify.

    Returns:
        dict: Dados do token de acesso.
    """
    data = {
        "grant_type": "authorization_code",
        "code": code,
        "redirect_uri": REDIRECT_URI,
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET
    }
    response = requests.post(TOKEN_URL, data=data)
    try:
        response.raise_for_status()
    except requests.exceptions.HTTPError: 
        logger.error(f"ERRO AO TROCAR O CODE PELO TOKEN: {response.text}")
        raise
    return response.json()

def refresh_access_token(refresh_token: str) -> dict:
    """
    Usa o refresh_token para obter um novo access_token.

    Args:
        refresh_token (str): Refresh token salvo.

    Returns:
        dict: Contém novo 'access_token' e (opcionalmente) novo 'refresh_token'.
    """
    data = {
        "grant_type": "refresh_token",
        "refresh_token": refresh_token,
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET
    }
    response = requests.post(TOKEN_URL, data=data)
    response.raise_for_status()
    return response.json()



if __name__ == "__main__":
    logger.info("Autenticação Spotify OAuth2.")
    code = get_code_manual()
    logger.info(f"Authorization code recebido: {code}")

    logger.info("Trocando o code por access_token e refresh_token...")
    tokens = exchange_code_for_tokens(code)
    access_token = tokens["access_token"]
    refresh_token = tokens["refresh_token"]
    logger.info(f"Access Token: {access_token}")
    logger.info(f"Refresh Token: {refresh_token}")


[32m2025-06-05 10:43:53.856[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m78[0m - [1mAutenticação Spotify OAuth2.[0m
https://accounts.spotify.com/authorize?client_id=4c2e0691a2f44cfa8c2561fd6718deb2&response_type=code&redirect_uri=http%3A%2F%2F127.0.0.1%3A8888%2Fcallback&scope=playlist-read-private[0m
[32m2025-06-05 10:43:53.858[0m | [1mINFO    [0m | [36m__main__[0m:[36mget_code_manual[0m:[36m25[0m - [1m
Após login, copie a url completa da barra de endereços.[0m
[32m2025-06-05 10:44:23.120[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m80[0m - [1mAuthorization code recebido: AQCZBrNQprka37SXf_fOBQ7hefyycO-aE_BV6WYHsQjpsgCTDNkKX_YY_9VVqQ2NOyQWmvkVz9DxxKHTee5mciC91EAdyzZmQ7eBLWIXXtpzTvMEs3uKwH8GUeqx0P_ogUbnBf5h2vcUg6nR_T7vxjQX7n-DxPvhG4fiLGZfjc1kaS4tV13bvjI4boQvea3w_TdO5MxbO-X7[0m
[32m2025-06-05 10:44:23.124[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m82[0m - [1mTrocando o code por access_token e refresh

In [None]:
# Intervalo de tempo entre as requisições para não atingir o limite permitido pela API do Spotify.
intervalo_iter = 2

def search(auth_token: str, genero: str) -> dict:
    """
    Busca tracks de um gênero específico.

    Args:
        auth_token (str): Token de autenticação.
        genero (str): Gênero do artista.
    
    Returns:
        dict: Dicionário com as informações das tracks.
    """
    limit = 3
    offset = 0

    query = f"genre:{genero}"
    query_encoded = urllib.parse.quote(query)

    url = f"https://api.spotify.com/v1/search"
    headers = {"Authorization": f"Bearer {auth_token}"}
    params = {
        "q": query_encoded,
        "type": "track",
        "limit": limit,
        "offset": offset
    }

    response = requests.get(url, params=params, headers=headers)

    try:
        response.raise_for_status()
    except requests.exceptions.HTTPError: 
        logger.error(f"Erro ao realizar a requisição: {response.text} | Status code: {response.status_code}")
        raise
    return response.json()


def artist(auth_token: str, artist_id: str) -> dict:
    """
    Obtém os dados de um artista específico.

    Args:
        auth_token (str): Token de autenticação.
        artist_id (str): ID do artista.

    Returns:
        dict: Dicionário com as informações do artista.
    """
    url = f"https://api.spotify.com/v1/artists/{artist_id}"
    headers = {"Authorization": f"Bearer {auth_token}"}
    payload = {}

    response = requests.get(url, headers=headers, data=payload)

    try:
        response.raise_for_status()
    except requests.exceptions.HTTPError: 
        logger.error(f"Erro ao realizar a requisição: {response.text} | Status code: {response.status_code}")
        raise
    return response.json()


def artists_albums(auth_token: str, artist_id: str) -> list[dict]:
    """
    Obtém os albums de um artista específico.

    Args:
        auth_token (str): Token de autenticação.
        artist_id (str): ID do artista.

    Returns:
        list[dict]: Lista de dicionários com as informações dos albums.
    """
    limit = 50
    offset = 0
    all_items = []

    while True:
        url = f"https://api.spotify.com/v1/artists/{artist_id}/albums"
        headers = {"Authorization": f"Bearer {auth_token}"}
        params = {
            "limit": limit,
            "offset": offset
        }
        payload = {}

        response = requests.get(url, params=params, headers=headers, data=payload)

        try:
            response.raise_for_status()
        except requests.exceptions.HTTPError: 
            logger.error(f"Erro ao realizar a requisição: {response.text} | Status code: {response.status_code}")
            raise

        json_response = response.json()
        items = json_response["items"]

        if not items:
            break

        all_items.extend(items)
        offset += limit
        sleep(intervalo_iter)

    return all_items


def album_tracks(auth_token: str, album_id: str) -> list[dict]:
    """
    Obtém as tracks de um album específico.

    Args:
        auth_token (str): Token de autenticação.
        album_id (str): ID do album.

    Returns:
        list[dict]: Lista de dicionários com as informações das tracks.
    """
    limit = 50
    offset = 0
    all_items = []

    while True:
        url = f"https://api.spotify.com/v1/albums/{album_id}/tracks"
        headers = {"Authorization": f"Bearer {auth_token}"}
        params = {
            "limit": limit,
            "offset": offset
        }
        payload = {}

        response = requests.get(url, params=params, headers=headers, data=payload)

        try:
            response.raise_for_status()
        except requests.exceptions.HTTPError: 
            logger.error(f"Erro ao realizar a requisição: {response.text} | Status code: {response.status_code}")
            raise

        json_response = response.json()
        items = json_response["items"]

        if not items:
            break

        all_items.extend(items)
        offset += limit
        sleep(intervalo_iter)

    return all_items


def playlist_tracks(auth_token: str, album_id: str) -> list[dict]:
    """
    Obtém as tracks de uma playlist específica.

    Args:
        auth_token (str): Token de autenticação.
        album_id (str): ID do album.

    Returns:
        list[dict]: Lista de dicionários com as informações das tracks.
    """
    limit = 50
    offset = 0
    all_items = []

    while True:
        url = f"https://api.spotify.com/v1/playlists/{album_id}/tracks"
        headers = {"Authorization": f"Bearer {auth_token}"}
        params = {
            "limit": limit,
            "offset": offset
        }
        payload = {}

        response = requests.get(url, params=params, headers=headers, data=payload)

        try:
            response.raise_for_status()
        except requests.exceptions.HTTPError: 
            logger.error(f"Erro ao realizar a requisição: {response.text} | Status code: {response.status_code}")
            raise

        json_response = response.json()
        items = json_response["items"]

        if not items:
            break

        all_items.extend(items)
        offset += limit
        sleep(intervalo_iter)

    return all_items


def current_user_playlists(access_token: str) -> list[dict]:
    """
    Obtém as informações das playlists salvas do usuário atual.

    Args:
        access_token (str): Token de autorização.

    Returns:
        list[dict]: Lista de dicionários com as informações das playlists.
    """
    limit = 50
    offset = 0
    all_items = []

    while True:
        url = f"https://api.spotify.com/v1/me/playlists"
        headers = {"Authorization": f"Bearer {access_token}"}
        params = {
            "limit": limit,
            "offset": offset
        }
        payload = {}

        response = requests.get(url, params=params, headers=headers, data=payload)

        try:
            response.raise_for_status()
        except requests.exceptions.HTTPError: 
            logger.error(f"Erro ao realizar a requisição: {response.text} | Status code: {response.status_code}")
            raise

        json_response = response.json()
        items = json_response["items"]

        if not items:
            break

        all_items.extend(items)
        offset += limit
        sleep(intervalo_iter)

    return all_items


In [None]:
# Gera novo token de acesso para não ultrapassar o tempo de validade
new_tokens = refresh_access_token(refresh_token)
access_token = new_tokens["access_token"]

# Gêneros existentes mais semenlhantes aos solicitados ("Rock Nacional", "Piseiro/Arrocha" e "Pop Internacional") 
generos_buscados = ["brazilian rock", "piseiro", "arrocha", "pop"]

tracks_gerais = []

# Para cada gênero, obtém as tracks e artistas
for genero in generos_buscados:
    tracks_genero = search(auth_token=access_token, genero=genero)
    
    for track in tracks_genero["tracks"]["items"]:
        # Salva dicionário da track para cada artista da track
        for artista in track["artists"]:
            try:
                dados = {
                    "artists_id": artista["id"],
                    "artists_name": artista["name"],
                    "track_id": track["id"],
                    "track_name": track["name"]
                }
                tracks_gerais.append(dados)
            except Exception as e:
                logger.error(f"Erro ao processar item: {e}")


# Define schema para os dados das tracks gerais e gera DataFrame
schema_tracks_gerais = StructType([
    StructField("artists_id", StringType(), True),
    StructField("artists_name", StringType(), True),
    StructField("track_id", StringType(), True),
    StructField("track_name", StringType(), True)
])

df_tracks_gerais = spark.createDataFrame(tracks_gerais, schema=schema_tracks_gerais)
# # Visualizar DataFrame
# df_tracks_gerais.show()

# Gera parquet dos dados das tracks gerais
output_path_tracks_gerais = os.path.abspath("tracks_gerais")

df_tracks_gerais.write.mode("overwrite").partitionBy("artists_id").parquet(output_path_tracks_gerais)

# Realiza leitura do parquet e obtém artistas únicos
parquet_tracks_gerais = spark.read.parquet(output_path_tracks_gerais)

all_artist_ids_df = parquet_tracks_gerais.select("artists_id").distinct()
all_artist_ids = [row.artists_id for row in all_artist_ids_df.collect()]

logger.info(f"Total de artistas únicos: {len(all_artist_ids)}")

[32m2025-06-05 08:25:54.786[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m52[0m - [1mTotal de artistas únicos: 16[0m


In [None]:
# Gera novo token de acesso para não ultrapassar o tempo de validade
new_tokens = refresh_access_token(refresh_token)
access_token = new_tokens["access_token"]

dados_artistas = []

for id_artista in all_artist_ids:
    try:
        artista = artist(auth_token=access_token, artist_id=id_artista)

        generos_artista = artista["genres"]
        # Verifica correspondência de gêneros
        if not any(gb in g for gb in generos_buscados for g in generos_artista):
            continue

        artist_row = {
            "artist_id": artista["id"],
            "artist_name": artista["name"],
            "genres": artista["genres"]
        }
        dados_artistas.append(artist_row)

    except Exception as e:
        logger.error(f"Erro ao obter os dados do artista: {e}")
        pass

logger.info(f"Dados dos artistas obtidos.")

[32m2025-06-05 08:26:05.854[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m27[0m - [1mDados dos artistas obtidos.[0m


In [None]:
new_tokens = refresh_access_token(refresh_token)
access_token = new_tokens["access_token"]

dados_musicas = []

for id_artista in all_artist_ids:

    # Obtendo os dados dos albums do artista
    albums = artists_albums(auth_token=access_token, artist_id=id_artista)

    for album in albums:
        id_album = album["id"]

        # Obtendo as músicas do album
        tracks = album_tracks(auth_token=access_token, album_id=id_album)
        for musica in tracks:
            musica_row = {
                "album_id": id_album,
                "album_name": album["name"],
                "artist_id": id_artista,
                "track_id": musica["id"],
                "track_name": musica["name"]
            }

            dados_musicas.append(musica_row)

logger.info("Dados das músicas obtidos.")

[32m2025-06-05 09:15:13.194[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m27[0m - [1mDados das músicas obtidos.[0m


In [None]:
new_tokens = refresh_access_token(refresh_token)
access_token = new_tokens["access_token"]

dados_user_playlists = []

c_user_playlists = current_user_playlists(access_token=access_token)

for playlist in c_user_playlists:
    playlist_row = {
        "playlist_id": playlist["id"],
        "playlist_name": playlist["name"],
        "owner_id": playlist["owner"]["id"],
        "owner_name": playlist["owner"]["display_name"],
    }
    dados_user_playlists.append(playlist_row)

logger.info("Dados das playlists salvas pelo usuário obtidos.")

[32m2025-06-05 09:15:39.723[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m17[0m - [1mDados das playlists salvas pelo usuário obtidos.[0m


In [None]:
new_tokens = refresh_access_token(refresh_token)
access_token = new_tokens['access_token']

# Obtém artistas das músicas de cada playlist salva pelo usuário
for p in dados_user_playlists:
    tracks_playlist = playlist_tracks(auth_token=access_token, album_id=p["playlist_id"])
    for t in tracks_playlist:
        try:
            p['artists_ids'] = [a['id'] for a in t['track']['artists']]
        except TypeError as e:
            logger.error(f'{e}: {t}') # Erro quando o retorno da track específica tem valores None
            pass

logger.info("Dados das playlists salvas pelo usuário atualizados.")

[32m2025-06-05 09:16:23.558[0m | [31m[1mERROR   [0m | [36m__main__[0m:[36m<module>[0m:[36m11[0m - [31m[1m'NoneType' object is not subscriptable: {'added_at': '2021-10-16T15:46:24Z', 'added_by': {'external_urls': {'spotify': 'https://open.spotify.com/user/22xyiobj3fkyns4oxfoc5zshi'}, 'href': 'https://api.spotify.com/v1/users/22xyiobj3fkyns4oxfoc5zshi', 'id': '22xyiobj3fkyns4oxfoc5zshi', 'type': 'user', 'uri': 'spotify:user:22xyiobj3fkyns4oxfoc5zshi'}, 'is_local': False, 'primary_color': None, 'track': None, 'video_thumbnail': {'url': None}}[0m
[32m2025-06-05 09:16:23.560[0m | [31m[1mERROR   [0m | [36m__main__[0m:[36m<module>[0m:[36m11[0m - [31m[1m'NoneType' object is not subscriptable: {'added_at': '2021-10-18T00:18:11Z', 'added_by': {'external_urls': {'spotify': 'https://open.spotify.com/user/22xyiobj3fkyns4oxfoc5zshi'}, 'href': 'https://api.spotify.com/v1/users/22xyiobj3fkyns4oxfoc5zshi', 'id': '22xyiobj3fkyns4oxfoc5zshi', 'type': 'user', 'uri': 'spotify:use

In [None]:
# Define schema para os dados dos artistas e gera DataFrame
schema_artistas = StructType([
    StructField("artist_id", StringType(), True),
    StructField("artist_name", StringType(), True),
    StructField("genres", ArrayType(StringType(), True), True),
])

df_artistas = spark.createDataFrame(dados_artistas, schema=schema_artistas)
df_artistas.show(truncate=False)

# Define schema para os dados das músicas dos albums e gera DataFrame
schema_musicas = StructType([
    StructField("album_id", StringType(), True),
    StructField("album_name", StringType(), True),
    StructField("artist_id", StringType(), True),
    StructField("track_id", StringType(), True),
    StructField("track_name", StringType(), True),
])

df_musicas = spark.createDataFrame(dados_musicas, schema=schema_musicas)
df_musicas.show(truncate=False)

# Define schema para os dados dasn playlists do usuário e gera DataFrame
schema_playlists = StructType([
    StructField("playlist_id", StringType(), True),
    StructField("playlist_name", StringType(), True),
    StructField("owner_id", StringType(), True),
    StructField("owner_name", StringType(), True),
    StructField("artists_ids", ArrayType(StringType(), True), True)
])

df_user_playlists = spark.createDataFrame(dados_user_playlists, schema=schema_playlists)
df_user_playlists.show(truncate=False)

+----------------------+----------------+----------------------------------------------------+
|artist_id             |artist_name     |genres                                              |
+----------------------+----------------+----------------------------------------------------+
|1gveCzOmKMjqEMwuB5kKNH|Grandão Vaqueiro|[piseiro, forró, arrocha, agronejo, sertanejo]      |
|4gKSUeHW82tGBeJsJSB1JC|J. Eskine       |[arrocha, piseiro, seresta, forró, brega]           |
|3Yaa2NJiGdT82KIJMnHUZ7|Forró do HF     |[piseiro, forró, arrocha, brega]                    |
|1SXhEXzOTF7YeuQX59m7pT|NATTAN          |[piseiro, forró, sertanejo, arrocha, brega, seresta]|
|0aSE67DwCWR6JKhdVUmz6I|Alef Donk       |[arrocha, piseiro, forró, seresta, brega]           |
|3whgFbrRxsOmYVrq3t4hCY|Paulo Pires     |[piseiro, forró, arrocha, sertanejo]                |
|7skt0YXuBGQZr4LGkyTShp|ÉaBest          |[piseiro, forró, brazilian hip hop]                 |
|4v4g6B3AmczdtPlNE4t39X|GREGO           |[piseiro,

In [None]:
# Cria tabelas temporárias para DataFrames de artistas e playlists
df_artistas.createOrReplaceTempView("artistas_view")
df_user_playlists.createOrReplaceTempView("user_playlists_view")

# Realiza consulta sql para selecionar em ordem alfabética, os artistas da tabela temporária 
# de artistas que também estão dentre os da tabela de playlists do usuário
consulta = """
SELECT DISTINCT a.*
FROM artistas_view a
JOIN user_playlists_view p
ON array_contains(p.artists_ids, a.artist_id)
ORDER BY a.artist_name ASC
"""
artistas_selecionados = spark.sql(consulta)

# # Visualizar resultado
# artistas_selecionados.printSchema()
# artistas_selecionados.show(truncate=False)


# Gera parquet com o resultado dos artistas selecionados
output_path_art_enc = os.path.abspath("artistas_selecionados")

artistas_selecionados.write.mode("overwrite").parquet(output_path_art_enc)

# # Leitura do parquet gerado
# parquet_artistas = spark.read.parquet(output_path_art_enc)
# parquet_artistas.show()