## Projeto de extração de dados I

**Resumo:** <br>
* Campos: Genômica e medicina personalizada. <br>
* Dados atualizados: (tendências, novidades e tratamentos) dessas áresas. <br>
* Fazer um processo para receber os dados, carregar e tratar essas informações gerando insights valiosos.

#### Instalando bibliotecas

In [0]:
!pip install flask

#### Importando bibliotecas

In [0]:
import requests                     # Requisições de API.
import pyspark.pandas as ps         # Trabalhar com dados estruturados.
from pyspark.dbutils import DBUtils # Realizar operações ELT.
import flask                        # Criar Webhook.
from datetime import datetime       # Trabalhar com datas.
import time as t                    # Trabalhar com tempos (Temporazidor).

#### Definições e variáveis

In [0]:
# Importar a Key da API.
%run ./api_key

In [0]:
# Diretórios para cada camada da Arquitetura Midellion.
dir_base = '/FileStore/tables/Projetos'                     # Diretório base.
dir_landing_zone = f'{dir_base}/landing_zone'              # Dados brutos da API.
dir_bronze = f'{dir_base}/bronze/dado_consolidado.parquet' # Dados consolidados.
dir_silver = f'{dir_base}/silver/dado_limpo.parquet'       # Dado consolidado e limpos.
dir_gold = f'{dir_base}/gold'                              # Dados transformados.

#### Funções

##### Função para Extração dos dados (Landing zone)

In [0]:
def consulta_api(current_datetime : str) -> dict:
    """
    Consulta API e retorna os dados.
    
    Parâmetros:
        current_datetime (str) : data para consulta.
    
    Retorno:
        dados (dict): Dados da API.
    """
    
    # Parâmetros API.
    by='popularity'

    q_base = 'genome OR (personalized AND medicine)'
    q_filtro1 = '(+DNA)'
    q_filtro2 = '(+gene AND +therapies)'
    q_filtro3 = '(+genetic AND +diseases)'
    q_search = f'{q_base} AND ({q_filtro1} OR {q_filtro2} OR {q_filtro3})' 

    url = f'https://newsapi.org/v2/everything?q={q_search}&sortBy={by}&apiKey={API_KEY}&from={current_datetime}&to={current_datetime}'

    # Buscar dados da API.
    response = requests.get(url)

    if response.status_code == 200: # Caso consiga fazer a requisição.
        dados = response.json()
        
        # Pegar os dados principais.
        dados = dados['articles']
        
        if len(dados) > 0:
            message = 'Aquisição de dados via API concluída'
        else:
            message = 'Sem dados de retorno via API'
            dados = None

    else: # Caso tenha um código diferente de 200.
        dados = None
        message = 'Aquisição de dados via API falhou, código de status: ' + str(response.status_code) 

    return dados, message

In [0]:
def melhora_dados_api(dados : dict) -> dict:
    """
    Função que melhora os dados da API.
    
    Parâmetros:
        dados (dict): Dados da API.
        
    Retorno:
        dados (dict): Dados melhorados.
    """

    # data da consulta.
    data_atual = datetime.now().strftime('%Y-%m-%d')

    # Para cada item.
    for i in range(len(dados)):
        # Arrumando a coluna 'source'.
        name = dados[i]['source']['name']
        dados[i]['source'] = name
        
        # Adicionar data atual no dicionário.
        dados[i]['data_load'] = data_atual

        # Criar uma key para cada item.
        key = dados[i]['url'] + dados[i]['publishedAt']

        # Adicionar key.
        dados[i]['key'] = key

    return dados

In [0]:
def extract_data(current_datetime : str) -> ps.DataFrame:
    """
    Função que extrai os dados da API e retorna um DataFrame, e salva um arquivo no diretório Landing_zone.
    
    Parâmetros:
        current_datetime (str) : data para importar os dados.

    Retorno:
        df_result (ps.DataFrame): DataFrame com os dados extraídos da API.
    """

    # Faz a consulta na API e retorna um dicionário.
    dados, message = consulta_api(current_datetime)
    
    print(message) # Mostra a mensagem do retorno da API.
    if dados:
        # Melhorar dados e adiciona a Key.
        dados = melhora_dados_api(dados)

        # Transforma o dicionário em um DataFrame.
        df_result = ps.DataFrame(dados) 
        
        # Exportar os dados para um arquivo CSV na Landing Zone.
        df_result.to_csv(f'{dir_landing_zone}/dados_extraidos.csv')

        print('Resultado extraído com sucesso!')

        return df_result
    else:
        print('Falha: Nenhum dado foi extraído da API!')

        return None


##### Função para Carregamento dos dados extraídos (Bronze)

In [0]:
def load_data(df_new : ps.DataFrame) -> str:
    """
    Concatena os dados extraídos para o parquet consolidado.
    
    Parâmetros:
        df_new (ps.DataFrame): DataFrame com os dados extraídos da API.
        
    Retorno:
        message (str): mensagem detalhando o sucesso ou falha da operação.
    """
    
    try: # Se tiver arquivos.
        arquivo = dbutils.fs.ls(dir_bronze)         # Lista o arquivo se existir.
        df_result = ps.read_parquet(dir_bronze)     # Carrega em df.
        df_result = ps.concat([df_result, df_new])  # Concatena os novos dados no df consolidado.
        
        # Verificação de arquivos duplicados.
        df_result = df_result.sort_values(by='data_load')
        df_result = df_result.drop_duplicates(subset='key', keep='last')

        df_result.to_parquet(dir_bronze)            # Exportar o df consolidade (Parquet).

        message = 'Resultado carregado com sucesso!'

    except Exception as e: # Se não tiver arquivos.
        if 'java.io.FileNotFoundException' in str(e):
            message = "Arquivo não encontrado para load_data, primeiro processamento"
            df_new.to_parquet(dir_bronze)

        else: 
            message = 'Erro na carga: ' + str(e)

    # Remove os dados brutos que foram carregados.
    dbutils.fs.rm(f'{dir_landing_zone}/dados_extraidos.csv', True)

    return message
        

##### Função para Limpeza dos dados (Silver)

In [0]:
def clear_data() -> bool:
    """
    Função que limpa os dados consolidados.
    
    Parâmetros:
        None
    
    Retorno:
        bool (bool): valor de sucesso ou falha da execução da função.
    """
    try:
        # Ler o dado consolidado.
        df = ps.read_parquet(dir_bronze)

        # Remover linhas com valores nulos.
        df = df.dropna()

        # Mudar tipo de dados da coluna publishedAt.
        # Arrumar a data de publicação.
        df['publishedAt'] = df['publishedAt'].apply(lambda x : x.split('T')[0])
        df['publishedAt'] = ps.to_datetime(df['publishedAt'], format='%Y-%m-%d')

        # Armazenar parquet limpo.
        df.to_parquet(dir_silver)

        print('Limpeza realizada com sucesso!')

        return True
    except:
        print('Falha: não foi possível realizar a limpeza dos dados!')

        return False


##### Função para Transformação dos dados (Gold)

In [0]:
def agrupar_by_data(df : ps.DataFrame) -> ps.DataFrame:
    # Criar novas colunas.
    df['Year'] = df['publishedAt'].dt.year
    df['Month'] = df['publishedAt'].dt.month
    df['Day'] = df['publishedAt'].dt.day

    # Agrupar -> Por ano, mês e dia.
    df = df.groupby(['Year', 'Month', 'Day']).agg(Qtde_noticias = ('title', 'Count'))

    # Reinicia o index e não exclui o anterior.
    df = df.reset_index(drop=False)

    return df

In [0]:
def agrupar_by_source(df : ps.DataFrame) -> ps.DataFrame:
    # Agrupar -> Por source e author.
    df = df.groupby(['source', 'author']).agg(Qtde_noticias = ('title', 'Count'))

    # Reinicia o index e não exclui o anterior.
    df = df.reset_index(drop=False)

    return df

In [0]:
def agrupar_by_key(df : ps.DataFrame) -> ps.DataFrame:
    # Criar novas colunas.
    df['Year'] = df['publishedAt'].dt.year
    df['Month'] = df['publishedAt'].dt.month
    df['Day'] = df['publishedAt'].dt.day
    #df['Words'] = df['description'].apply(lambda x: x.split())

    # Agrupar -> Por ano, mês e dia.
    df = df.groupby(['Year', 'Month', 'Day']).agg(Qtde_noticias = ('title', 'Count'))

    # Reinicia o index e não exclui o anterior.
    df = df.reset_index(drop=False)

    return df

In [0]:
def transform_data() -> None:
    
    # Ler o dado limpo.
    df = ps.read_parquet(dir_silver)

    # 4.1 - Quantidade de notícias por ano, mês e dia de publicação;
    df_groupDate = agrupar_by_data(df)

    # 4.2 - Quantidade de notícias por fonte e autor;
    df_groupSource = agrupar_by_source(df)

    # 4.3 - Quantidade de notícias por palavra-chave;
    df_groupWordKey = agrupar_by_key(df)

    # Armazena os resultados transformados.
    df_groupDate.to_parquet(f'{dir_gold}/dado_transformado_by_data.parquet')
    df_groupSource.to_parquet(f'{dir_gold}/dado_transformado_by_source.parquet')
    df_groupWordKey.to_parquet(f'{dir_gold}/dado_transformado_by_key.parquet')

    print('Transformação realizada com sucesso!')

##### Função ELT

In [0]:
def elt(current_datetime : str):
    """
    Função que executa o processo de ELT.

    Parâmetros:
        current_datetime (str) : data para importar os dados.

    Retorno:
        None
    """

    print("** inicializa o ELT **\n")

    try: # Verifica se existe novos dados para serem processados.

        # Faz a chamada da extração, da carga e da transformação dos dados.
        df_new = extract_data(current_datetime)
        
        if df_new is not None: # Verifica se ocorreu a extração de dados corretamente.
            message = load_data(df_new)
            if message == 'Resultado carregado com sucesso!' or message == "Arquivo não encontrado para load_data, primeiro processamento": # Verifica se conseguiu carregar os dados.
                print(message)
                limpou = clear_data()
                if limpou: # Verifica se conseguiu limpar os dados.
                    transform_data()
                    message = '\n** ELT realizado com sucesso! **\n'
        else:
            message = '\n** ELT interrompido! **\n'

    except Exception as e: #caso não exista nenhum dado novo, retorna com a mensagem e encerra o processo
            message = '\nErro no ELT:' + str(e) + '\n'
    
    return message

#### Webhook

In [0]:
# Inicialização da aplicação Flask
app = flask.Flask(__name__)

# Definição da rota "/webhook" com suporte a requisições HTTP POST
@app.route("/webhook", methods=["POST"])
def handle_webhook():
    # Recupera o conteúdo da requisição como um dicionário em Python
    data = flask.request.get_json()
    
    # Imprime o conteúdo da requisição
    print("Received data:", data)
    
    # Data recebida.
    current_datetime = data.get('current_datetime')

    # Executar o elt.
    message = elt(current_datetime)

    # Mostra a mensagem!
    print(message)

    # Retorna uma resposta HTTP simples
    return message

# Verifica se o script está sendo executado como um módulo principal
if __name__ == "__main__":
    # Inicia a execução da aplicação
    app.run()