 <font color=blue size=4> 
Case 2 - Integração com API Spotify
 </font>

In [None]:
# Instalação de bibliotecas
%pip install requests google-cloud-bigquery google-cloud-secret-manager google-cloud-storage

# Importação de bibliotecas
import os
import json
import base64
import requests
from google.cloud import bigquery
from google.cloud import secretmanager
from google.cloud import storage
from google.oauth2 import service_account

# Função para baixar o arquivo de chave JSON do Cloud Storage
def service_account_key(bucket_name, file_name, destination_file_name):
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(file_name)

    # Baixar o arquivo da chave para o destino
    blob.download_to_filename(destination_file_name)
    print(f"Chave de service account baixada para {destination_file_name}")

# Função para configurar o Secret Manager e obter as credenciais do Spotify
def get_secret(secret_id):
    client = secretmanager.SecretManagerServiceClient()
    secret_name = f"projects/{os.environ['GOOGLE_CLOUD_PROJECT']}/secrets/{secret_id}/versions/latest" # Garante que a credencial utilizada será a mais atualizada
    response = client.access_secret_version(request={"name": secret_name})
    return json.loads(response.payload.data.decode("UTF-8"))

# Obter as credenciais do Spotify armazenadas no Secret Manager
spotify_credentials = get_secret("spotify-credentials")

# Função para obter o token de acesso do Spotify
def acess_token_spotify(client_id, client_secret):
    auth_url = "https://accounts.spotify.com/api/token"
    credentials = f"{client_id}:{client_secret}"
    encoded_credentials = base64.b64encode(credentials.encode()).decode()

    headers = {
        "Authorization": f"Basic {encoded_credentials}",
        "Content-Type": "application/x-www-form-urlencoded"
    }
    data = {
        "grant_type": "client_credentials"
    }

    response = requests.post(auth_url, headers=headers, data=data)
    if response.status_code != 200:
        print(f"Erro ao obter token: {response.text}")
        return None
    return response.json().get("access_token")

# Função para buscar os primeiros 50 resultados de podcasts relacionados ao termo “data hackers”
def busca_podcasts(token, search_term="data hackers"):
    search_url = f"https://api.spotify.com/v1/search?q={search_term}&type=show&market=BR&limit=50"
    headers = {
        "Authorization": f"Bearer {token}"
    }

    response = requests.get(search_url, headers=headers)
    if response.status_code != 200:
        print(f"Erro na busca de podcasts: {response.text}")
        return None

    response_data = response.json()
    
    # Exibir apenas os 2 primeiros resultados para otimizar visualização durante a execução
    limited_data = response_data.get('shows', {}).get('items', [])[:2]
    print("Primeiros 10 podcasts encontrados:", json.dumps(limited_data, indent=2)) 
    return response_data.get('shows', {}).get('items', [])
    
# Função para buscar todos os episódios lançados pelos Data Hackers
def busca_episodios(token, show_id):
    episodes_url = f"https://api.spotify.com/v1/shows/{show_id}/episodes"
    headers = {
        "Authorization": f"Bearer {token}"
    }
    
    all_episodes = []  # Lista para armazenar todos os episódios
    params = {'limit': 50}  # Define um limite de 50 por página

    while True:
        response = requests.get(episodes_url, headers=headers, params=params)
        if response.status_code != 200:
            print(f"Erro ao buscar episódios: {response.text}")
            return None
        
        response_data = response.json()
        all_episodes.extend(response_data.get('items', []))  # Adiciona os episódios da página atual

        # Verifica se há mais páginas
        if not response_data.get('next'):  # Se não houver próximo, sai do loop
            break
        
        # Atualiza a URL para a próxima página
        episodes_url = response_data.get('next')
        
    return all_episodes  # Retorna todos os episódios encontrados

# Função para salvar os resultados dos podcasts no BigQuery (Tabela 5)
def ingestao_bq_podcasts(podcasts, dataset_id, table_id, credentials_path):
    # Carregar as credenciais baixadas para service account
    credentials = service_account.Credentials.from_service_account_file(credentials_path)
    client = bigquery.Client(credentials=credentials)

    # Nome da tabela atualizada
    table_id = "tb_datahackers_limitada"
    
    # Referência para a tabela no BigQuery
    table_ref = client.dataset(dataset_id).table(table_id)

    # Definir o schema da Tabela 5
    schema = [
        bigquery.SchemaField("name", "STRING", description="Nome do podcast"),
        bigquery.SchemaField("description", "STRING", description="Descrição sobre o programa de podcast"),
        bigquery.SchemaField("id", "STRING", description="Identificador único do programa"),
        bigquery.SchemaField("total_episodes", "INTEGER", description="Total de episódios lançados até o momento")
    ]

    # Criar a tabela se não existir
    try:
        client.get_table(table_ref)
    except Exception:
        table = bigquery.Table(table_ref, schema=schema)
        table.description = "Tabela 5 - Primeiros 50 resultados referente a podcasts procurando pelo termo 'data hackers'"
        client.create_table(table)

    # Preparar os dados para ingestão
    rows_to_insert = []
    for podcast in podcasts:
        if podcast:  # Verifica se o podcast não é None
            rows_to_insert.append({
                "name": podcast.get('name'),
                "description": podcast.get('description'),
                "id": podcast.get('id'),
                "total_episodes": podcast.get('total_episodes', 0),  # Valor padrão se não existir
            })

    # Inserir os dados na tabela
    if rows_to_insert:
        errors = client.insert_rows_json(table_ref, rows_to_insert)
        if errors:
            print(f"Erro ao inserir dados no BigQuery: {errors}")
        else:
            print("Dados inseridos com sucesso na tabela tb_datahackers_limitada.")

# Função para salvar todos os episódios lançados pelo Data Hackers no BigQuery (Tabela 6)
def ingestao_bq_episodios(episodios, dataset_id, table_id, credentials_path):
    # Carregar as credenciais
    credentials = service_account.Credentials.from_service_account_file(credentials_path)
    client = bigquery.Client(credentials=credentials)

    # Nome da tabela atualizada
    table_id = "tb_datahackers_episodios_total"
    
    # Referência para a tabela no BigQuery
    table_ref = client.dataset(dataset_id).table(table_id)

    # Definir o schema da Tabela 6
    schema = [
        bigquery.SchemaField("id", "STRING", description="Identificação do episódio"),
        bigquery.SchemaField("name", "STRING", description="Nome do episódio"),
        bigquery.SchemaField("description", "STRING", description="Descrição do episódio"),
        bigquery.SchemaField("release_date", "STRING", description="Data de lançamento do episódio"),
        bigquery.SchemaField("duration_ms", "INTEGER", description="Duração em milissegundos do episódio"),
        bigquery.SchemaField("language", "STRING", description="Idioma do episódio"),
        bigquery.SchemaField("explicit", "BOOLEAN", description="Flag se o episódio possui conteúdo explícito"),
        bigquery.SchemaField("type", "STRING", description="Tipo de faixa de áudio")
    ]

    # Criar a tabela se não existir
    try:
        client.get_table(table_ref)
    except Exception:
        table = bigquery.Table(table_ref, schema=schema)
        table.description = "Tabela 6 - Resultado de todos os episódios lançados pelos Data Hackers"
        client.create_table(table)

    # Preparar os dados para ingestão
    rows_to_insert = []
    for episodio in episodios:
        if episodio:  # Verifica se retornou o episódio, evita erro na execução quando não for localizado
            rows_to_insert.append({
                "id": episodio.get('id'),
                "name": episodio.get('name'),
                "description": episodio.get('description'),
                "release_date": episodio.get('release_date'),
                "duration_ms": episodio.get('duration_ms'),
                "language": episodio.get('language'),
                "explicit": episodio.get('explicit'),
                "type": episodio.get('type'),
            })

    # Inserir os dados na tabela
    if rows_to_insert:
        errors = client.insert_rows_json(table_ref, rows_to_insert)
        if errors:
            print(f"Erro ao inserir dados no BigQuery: {errors}")
        else:
            print("Dados inseridos com sucesso na tabela tb_datahackers_episodios_total.")

# Função para salvar episódios com participação do Grupo Boticário no BigQuery (Tabela 7)
def ingestao_bq_episodios_grupo_boticario(episodios, dataset_id, table_id, credentials_path):
    # Carregar as credenciais
    credentials = service_account.Credentials.from_service_account_file(credentials_path)
    client = bigquery.Client(credentials=credentials)

    # Nome da tabela atualizada
    table_id = "tb_datahackers_episodios_boticario"
    
    # Referência para a tabela no BigQuery
    table_ref = client.dataset(dataset_id).table(table_id)

    # Definir o schema da Tabela 7
    schema = [
        bigquery.SchemaField("id", "STRING", description="Identificação do episódio"),
        bigquery.SchemaField("name", "STRING", description="Nome do episódio"),
        bigquery.SchemaField("description", "STRING", description="Descrição do episódio"),
        bigquery.SchemaField("release_date", "STRING", description="Data de lançamento do episódio"),
        bigquery.SchemaField("duration_ms", "INTEGER", description="Duração em milissegundos do episódio"),
        bigquery.SchemaField("language", "STRING", description="Idioma do episódio"),
        bigquery.SchemaField("explicit", "BOOLEAN", description="Flag se o episódio possui conteúdo explícito"),
        bigquery.SchemaField("type", "STRING", description="Tipo de faixa de áudio")
    ]

    # Criar a tabela se não existir
    try:
        client.get_table(table_ref)
    except Exception:
        table = bigquery.Table(table_ref, schema=schema)
        table.description = "Tabela 7 - Episódios com participação do Grupo Boticário"
        client.create_table(table)

    # Preparar os dados para ingestão
    rows_to_insert = []
    for episodio in episodios:
        if episodio and 'Grupo Boticário' in episodio.get('description', ''):  # Filtra episódios que contém "Grupo Boticário" na descrição.
            rows_to_insert.append({
                "id": episodio.get('id'),
                "name": episodio.get('name'),
                "description": episodio.get('description'),
                "release_date": episodio.get('release_date'),
                "duration_ms": episodio.get('duration_ms'),
                "language": episodio.get('language'),
                "explicit": episodio.get('explicit'),
                "type": episodio.get('type'),
            })

    # Inserir os dados na tabela
    if rows_to_insert:
        errors = client.insert_rows_json(table_ref, rows_to_insert)
        if errors:
            print(f"Erro ao inserir dados no BigQuery: {errors}")
        else:
            print("Dados inseridos com sucesso na tabela tb_datahackers_episodios_boticario.")


# Função principal que agrupa a lógica de uso das funções
def main():
    # Variáveis e parâmetros
    token = acess_token_spotify(spotify_credentials['client_id'], spotify_credentials['client_secret'])
    bucket_name = "spotify-case-podcasts" #Informar nome do bucket criado no Cloud Storage 
    file_name = "chave_sa.json" # Informar nome do arquivo .json criado com chave da service account
    destination_file_name = "/tmp/chave_sa.json"
    dataset_id = "spotify_podcasts" # Informar nome do dataset criado para armazenar tabelas criadas.
    
    # Baixar a chave da service account
    service_account_key(bucket_name, file_name, destination_file_name)

    # Buscar os 50 primeiros resultados para buscar por termo Data Hackers e fazer ingestão no BigQuery
    podcasts = busca_podcasts(token)
    if podcasts:
        ingestao_bq_podcasts(podcasts, dataset_id, "tb_datahackers_limitada", destination_file_name)

    # Buscar todos os episódios lançados pelo Data Hackers e episódios com participação do Grupo Boticário no BigQuery para fazer ingestão no BigQuery
    if podcasts:
        podcast_id = podcasts[0]['id']
        episodios = busca_episodios(token, podcast_id)
        if episodios:
            ingestao_bq_episodios(episodios, dataset_id, "tb_datahackers_episodios_total", destination_file_name)
            ingestao_bq_episodios_grupo_boticario(episodios, dataset_id, "tb_datahackers_episodios_boticario", destination_file_name)

# Execução da função principal
if __name__ == "__main__":
    main()