In [None]:

import pandas as pd
from google.cloud import bigquery
from google.oauth2 import service_account
import requests
import base64
import access
import datetime

#---------------------------------------------------------------------------------------------------------------------------------------------
#3. Você deverá criar um método para realizar uma pesquisa no Spotify via requests e 
#trazer os primeiros 50 resultados referente a podcasts procurando pelo termo “data hackers” e criar uma tabela apenas com os campos abaixo: 
#a. Tabela 5: name = Nome do poscast.
#description = Descrição sobre o programa de poscast.
#id = Identificador único do programa. total_episodes = Total de episódios lançados até o momento.
#
###############################################################################################
#    CARGA TRASIENT COM RETORNO DA API SPOTIFY - PODCASTS PESQUISAR - LIKE  'DATA HACKERS'    #
###############################################################################################

# Função para obter o token de acesso do Spotify
def get_spotify_access_token(client_id, client_secret):
    token_url = 'https://accounts.spotify.com/api/token'
    token_data = {'grant_type': 'client_credentials'}
    credentials = base64.b64encode(f'{client_id}:{client_secret}'.encode('utf-8')).decode('utf-8')
    headers = {'Authorization': f'Basic {credentials}'}
    response = requests.post(token_url, headers=headers, data=token_data)

    if response.status_code == 200:
        token = response.json().get('access_token')
        return token
    else:
        print(f'Erro ao obter token. Código de status: {response.status_code}')
        print('Resposta da API:')
        print(response.text)
        return None

# Função para buscar podcasts no Spotify
def search_spotify_podcasts(query, token):
    search_url = 'https://api.spotify.com/v1/search'
    params = {'query': query, 'type': 'show', 'market': 'BR', 'limit': 50}
    headers = {'Authorization': f'Bearer {token}'}
    response = requests.get(search_url, params=params, headers=headers)
    
    if response.status_code == 200:
        return response.json()
    else:
        print(f'Erro na busca do Spotify. Código de status: {response.status_code}')
        print('Resposta da API:')
        print(response.text)
        return None


# Função para buscar episódios de um podcast específico no Spotify
# Função para buscar episódios de um podcast específico no Spotify
# Função para buscar episódios de um podcast específico no Spotify
def get_spotify_podcast_episodes_all(podcast_id, token):
    all_episodes = []
    limit = 50
    offset = 0

    # Buscar o número total de episódios
    total_episodes_url = f'https://api.spotify.com/v1/shows/{podcast_id}'
    headers = {'Authorization': f'Bearer {token}'}
    response = requests.get(total_episodes_url, headers=headers)
    total_episodes = response.json().get('total_episodes', 0)

    while offset < total_episodes:
        episodes_url = f'https://api.spotify.com/v1/shows/{podcast_id}/episodes?limit={limit}&offset={offset}'
        headers = {'Authorization': f'Bearer {token}'}
        response = requests.get(episodes_url, headers=headers)

        if response.status_code == 200:
            episodes_data = response.json().get('items', [])
            all_episodes.extend(episodes_data)

            offset += limit
            print(offset)
        else:
            print(f'Erro na busca de episódios no Spotify. Código de status: {response.status_code}')
            print('Resposta da API:')
            print(response.text)
            return None

    return all_episodes
        
# Função para carregar dados no BigQuery
def load_to_bigquery(df, dataset_id, table_id, credentials):
    try:
        client = bigquery.Client(credentials=credentials)
        table_ref = client.dataset(dataset_id).table(table_id)

        job_config = bigquery.LoadJobConfig(
            skip_leading_rows=0,
            source_format=bigquery.SourceFormat.CSV,
        )

        job = client.load_table_from_dataframe(df, table_ref, job_config=job_config)
        job.result()  # Aguardar término do job

        print(f'Dados carregados na tabela {table_id} no BigQuery.')
    except Exception as e:
        print(f'Erro ao carregar dados no BigQuery: {e}')


# Definir as credenciais do Spotify
client_id = access.client_id
client_secret = access.secret_id 
key_path = access.key_path

# Obter o token de acesso do Spotify
spotify_token = get_spotify_access_token(client_id, client_secret)

if spotify_token:
    # Buscar podcasts no Spotify
    query = 'data hackers'
    resultados_api = search_spotify_podcasts(query, spotify_token)

    if resultados_api:
        # Extrair a lista de itens dos resultados
        items = resultados_api['shows'].get('items', [])

        # Criar uma única instância de DataFrame
        df_resultados = pd.DataFrame(columns=['name', 'description', 'id', 'total_episodes'])
        # Criar uma lista de dicionários
        list_resultados = []
        for item in items:
            if item and all(field in item for field in ['name', 'description', 'id', 'total_episodes']):
                new_row = {
                    'name': item.get('name'),
                    'description': item.get('description'),
                    'id': item.get('id'),
                    'total_episodes': item.get('total_episodes'),
                }
                list_resultados.append(new_row)


        # Carregar credenciais do BigQuery
        bigquery_credentials = service_account.Credentials.from_service_account_file(key_path, scopes=["https://www.googleapis.com/auth/cloud-platform"])

        # Substituir com as informações do seu projeto, dataset e tabela no BigQuery
        dataset_id = 'transient'
        table_id = 'stg_podcasts_pesquisa'
#print(list_resultados)

print('DROP TABELAS TRASIENT')
# Construir a instrução SQL CREATE OR REPLACE --> TRUNCATE STG
sql_create_replace = f"""
CREATE OR REPLACE TABLE transient.stg_podcasts_pesquisa(
    name STRING,
    description STRING,
    id STRING,
    total_episodes INT64
);

CREATE OR REPLACE TABLE transient.stg_podcasts_ep_total (
    RELEASE_DATE DATE,
    NAME STRING,
    DESCRIPTION STRING,
    ID STRING,
    EXTERNAL_URLS STRING,
    NAME_ORIGEM STRING
);

CREATE OR REPLACE TABLE transient.stg_podcasts_ep (
    RELEASE_DATE DATE,
    NAME STRING,
    DESCRIPTION STRING,
    ID STRING,
    EXTERNAL_URLS STRING,
    NAME_ORIGEM STRING
);


"""

# Executar a instrução SQL CREATE OR REPLACE
client = bigquery.Client(credentials=bigquery_credentials)
query_job_create_replace = client.query(sql_create_replace)
query_job_create_replace.result()


if list_resultados:
    print('Dataframe com todos os resultados:')
    df_resultados = pd.DataFrame(list_resultados)
    #print(df_resultados)
    load_to_bigquery(df_resultados, dataset_id, table_id, bigquery_credentials)
else:
    print('DataFrame vazio. Nenhum dado será carregado no BigQuery.')
#TABELA 5
###############################################################################################
#            CARGA RAW DATA PODCASTS PESQUISA - LIKE  'DATA HACKERS'                          #
###############################################################################################

# Substituir com as informações dataset e tabela no BigQuery
dataset_id = 'raw_data'
table_id = 't_podcasts_pesquisa'

# Construir a instrução SQL
sql_query = f"""
SELECT
    a.NAME,
    a.DESCRIPTION,
    a.ID,
    a.TOTAL_EPISODES
FROM transient.stg_podcasts_pesquisa a
LEFT JOIN raw_data.t_podcasts_pesquisa b ON a.ID = b.ID
WHERE b.ID IS NULL;
"""

# Executar a instrução SQL e salvar os resultados em um DataFrame
query_job = client.query(sql_query)
df_result = query_job.to_dataframe()
#print(df_result)
# Verificar se há dados para inserir
if not df_result.empty:
    # Imprimir o DataFrame com os resultados
    print("DataFrame com os resultados:")
    print(df_result)  
    # Carregar dados no BigQuery
    load_to_bigquery(df_result, dataset_id, table_id, bigquery_credentials)

else:
    print(F"Nenhum novo dado para inserir na {dataset_id}.{table_id}.")

#-------------------------------------------------------------------------------------------------------------------------------------------------#
#Realizar a extração de dados de todos os episódios lançados pelos Data Hackers 
#via requests e ingerir esse resultado em duas tabelas seguindo os critérios abaixo: 
#a. Tabela 6: Resultado de todos os episódios. 
#b. Tabela 7: Apenas com os resultados dos episódios com participação do Grupo 
#Boticário. 

###############################################################################################
#    CARGA TRASIENT COM RETORNO DA API SPOTIFY - PODCAST 'DATA HACKERS' - FULL  EPISODIOS     #
###############################################################################################


# Substituir com as informações do seu projeto, dataset e tabelas no BigQuery
dataset_id_transient = 'transient'
table_id_episodes_total = 'stg_podcasts_ep_total'
table_id_episodes = 'stg_podcasts_ep'

# Obtendo ID do Data Hackers a partir dos resultados da pesquisa
data_hackers_id = None
for item in list_resultados:
    if 'Data Hackers' in item.get('name', ''):
        data_hackers_id = item['id']
        total_episodes_data_hackers = item['total_episodes']
        break

print('id do pod Data Hackers')
print(data_hackers_id)
if data_hackers_id:
    # Buscar episódios do Data Hackers no Spotify
    data_hackers_episodes = get_spotify_podcast_episodes_all(data_hackers_id, spotify_token)

    if data_hackers_episodes:
        # Criar DataFrame para os episódios
        df_episodes_total = pd.DataFrame(columns=['release_date', 'name', 'description', 'id', 'external_urls', 'name_origem'])
        df_episodes = pd.DataFrame(columns=['release_date', 'name', 'description', 'id', 'external_urls', 'name_origem'])

        for episode in data_hackers_episodes:
            release_date = datetime.datetime.strptime(episode.get('release_date'), '%Y-%m-%d').date() if episode.get('release_date') else None
            new_row = {
                'release_date': release_date,
                'name': episode.get('name'),
                'description': episode.get('description'),
                'id': episode.get('id'),
                'external_urls': str(episode.get('external_urls')),
                'name_origem': 'Data Hackers',
            }

            df_episodes_total = pd.concat([df_episodes_total, pd.DataFrame([new_row])], ignore_index=True)
            
            # Filtrar apenas os episódios com release_date
            if release_date:
                df_episodes = pd.concat([df_episodes, pd.DataFrame([new_row])], ignore_index=True)

                # Verificar a participação do Grupo Boticário
                if 'Grupo Boticário' in episode.get('description', ''):
                    # Adicionar os episódios com participação do Grupo Boticário à tabela stg_podcasts_ep
                    load_to_bigquery(pd.DataFrame([new_row]), dataset_id_transient, table_id_episodes, bigquery_credentials)

        # Carregar dados nas tabelas do BigQuery
        load_to_bigquery(df_episodes_total, dataset_id_transient, table_id_episodes_total, bigquery_credentials)
        # Não é necessário carregar novamente df_episodes na tabela stg_podcasts_ep
    else:
        print('Nenhum episódio encontrado para Data Hackers no Spotify.')

else:
    print('ID do Data Hackers não encontrado nos resultados da pesquisa no Spotify.')

#TABELA 6 
###############################################################################################
#  CARGA RAW DATA COM RETORNO DA API SPOTIFY - PODCAST 'DATA HACKERS' - FULL E FILTRADO       #
###############################################################################################

# Substituir com as informações dataset e tabela no BigQuery
dataset_id = 'raw_data'
table_id = 't_podcasts_ep_total'

# Construir a instrução SQL
sql_query_total = f"""
SELECT
    a.RELEASE_DATE ,
    a.NAME ,
    a.DESCRIPTION ,
    a.ID ,
    a.EXTERNAL_URLS ,
    a.NAME_ORIGEM 
FROM  transient.stg_podcasts_ep_total  a
LEFT JOIN     raw_data.t_podcasts_ep_total  b ON    a.ID = b.ID
WHERE  b.ID IS NULL;
"""

# Executar a instrução SQL e salvar os resultados em um DataFrame
query_job = client.query(sql_query_total)
df_result = query_job.to_dataframe()
#print(df_result)
# Verificar se há dados para inserir
if not df_result.empty:
    # Imprimir o DataFrame com os resultados
    print("DataFrame com os resultados:")
    #print(df_result)
    # Carregar dados no BigQuery
    load_to_bigquery(df_result, dataset_id, table_id, bigquery_credentials)

else:
    print(F"Nenhum novo dado para inserir na {dataset_id}.{table_id}.")


#TABELA 7
###############################################################################################
#  CARGA RAW DATA COM RETORNO DA API SPOTIFY - PODCAST 'DATA HACKERS' - FULL E FILTRADO       #
###############################################################################################

# Substituir com as informações dataset e tabela no BigQuery
dataset_id = 'raw_data'
table_id = 't_podcasts_ep'

# Construir a instrução SQL
sql_query_ep = f"""
SELECT
    a.RELEASE_DATE ,
    a.NAME ,
    a.DESCRIPTION ,
    a.ID ,
    a.EXTERNAL_URLS ,
    a.NAME_ORIGEM 
FROM  transient.stg_podcasts_ep  a
LEFT JOIN     raw_data.t_podcasts_ep  b ON    a.ID = b.ID
WHERE  b.ID IS NULL;
"""

# Executar a instrução SQL e salvar os resultados em um DataFrame
query_job = client.query(sql_query_ep)
df_result = query_job.to_dataframe()
#print(df_result)
# Verificar se há dados para inserir
if not df_result.empty:
    # Imprimir o DataFrame com os resultados
    print("DataFrame com os resultados:")
    #print(df_result)
    # Carregar dados no BigQuery
    load_to_bigquery(df_result, dataset_id, table_id, bigquery_credentials)

else:
    print(F"Nenhum novo dado para inserir na {dataset_id}.{table_id}.")
