# Projeto Final | Extração de Dados I
----
**Sistema de Monitoramento de Avanços no Campo da Genômica**

<div style="background-color: #f2f2f2; text-align: center; padding: 10px;">
  <h3>Script para implementação da ETL </h3>
</div>

%------------------------------------------------------------------------------------------------------%<br>
<br>
``Autores:`` Andrea Elias, Anthony Heimlich, Éverton Donato, Julia Midori e Luana Kruger  <br>
<br>
``Instituição:`` ADA Tech<br>
<br>
``Projeto:`` Santander Coders 2023.2<br>
<br>
``Descrição:`` Este código implementa um sistema de monitoramento de avanços no campo da genômica. O sistema coleta, analisa e apresenta as últimas notícias relacionadas à genômica e à medicina personalizada. <br>
<br>
``Repositório GitHub:`` https://github.com/evertondcavalcante/PD-I_Santander_Coders23  <br>
<br>

%------------------------------------------------------------------------------------------------------%

## Carregar bibliotecas

In [None]:
!pip install flask 

In [None]:
from pyspark.dbutils import DBUtils
from pyspark.sql import SparkSession

import requests as req
import datetime as dt
import pyspark.pandas as ps

import flask
import json

## Carregar Funções

#### Função para extração de dados com a News API

In [None]:
def extract_data(url, acess_code, searched_word, initial_data = None, final_data = None):
    """
    Realiza extração dos dados da API da NEWSAPI com base dos parâmetros recebidos

    Parameters
    ----------
    url: string
        String do método get da API.

    acess_code: string
        String com a chave usada para coletar os dados da API.

    searched_word: string
        String com a palavra a ser buscada na notícia.
        
    initial_data: date
        Data inicial a ser buscada, no formato 'YYYY-MM-DD'.

    final_data: date
        Data final a ser buscada, 'YYYY-MM-DD'.

    Returns
    -------
    response_df: float
        DataFrame com o retorno dos dados da API

    """
    if initial_data == None:
        initial_data = dt.datetime.now().date() - dt.timedelta(days=1)

    if final_data == None:
        final_data = dt.datetime.now().date() - dt.timedelta(days=1)

    parameters = {
        'q': searched_word,
        'apiKey': acess_code,
        'sortBy': 'publishedAt',
        'from': initial_data,
        'to': final_data
    }

    response = req.get(url, params = parameters)
    response_dic = response.json()

    if response_dic['status'] != 'ok':
        messageerror = response_dic['message']
        raise Exception(messageerror)
        
    dict_res = {}
    
    # Percorrer todas as noticias encontradas no resultado da chamada para criar uma chave única
    for item in response_dic['articles']:
        chave = item['url'] + item['publishedAt']

        # Transformar data em um datetime
        publishedAt = dt.datetime.strptime(item['publishedAt'], '%Y-%m-%dT%H:%M:%SZ')

        dict_res[chave] = {
            "source": item['source']['name']
            ,"author": item['author']
            ,"title": item['title']
            ,"description": item['description']
            ,"url":  item['url']
            ,"urlToImage": item['urlToImage']
            ,"publishedAt": publishedAt
            ,"content": item['content']
        }

    response_df = ps.DataFrame.from_dict(dict_res, orient='index').reset_index().rename(columns={"index": "id"})

    print("Dados extraídos com sucesso")
    
    return response_df

#### Função para carregar os dados extraídos

In [None]:
def load_data(new_data, path, file_name, key):
    """
    Realiza a leitura do arquivo com os dados já salvo e adiciona os novos dados

    Parameters
    ----------
    new_data: DataFrame
        Dataframe contendo os dados coletados da API.

    path: string
        Diretório dentro do dbfs onde os dados serão salvos.

    file_name: string
        Nome do arquivo em formato parquet onde os dados serão salvos

    key: string
        Nome do campo que identifica o registro como único
    

    Returns
    -------
    None

    """
    # Caso já exista o arquivo transformado, segue direto com a concatenação e com a carga do resultado final
    try: 
        complete_path = path + file_name
        arquivo = dbutils.fs.ls(complete_path)

        df_res = ps.read_parquet(complete_path)
        df_res = ps.concat([df_res, new_data])
        # df_res.to_parquet(complete_path)

        # elimina os registros duplicados baseado na chave passada antes de armazenar o resultado final
        df_res = df_res.sort_values(by="publishedAt")
        df_res = df_res.drop_duplicates(subset=key, keep='last')

        # armazena o resultado final
        df_res.to_parquet(complete_path)
        message = "Carga realizada com sucesso"

    # Caso o arquivo transformando ainda não exista, quer dizer que é o primeiro processo do pipeline e é preciso criar o arquivo destino
    except Exception as e:
        # caso o arquivo destino não exista, faz a escrita direta do resultado 
        if 'java.io.FileNotFoundException' in str(e):
            print("Arquivo não encontrado, primeiro processamento")
            new_data.to_parquet(complete_path)
            message = "Carga realizada com sucesso"
        else:
            message = "Erro na carga: " + str(e)

    print(message)

    # return message

#### Função para verificação do diretório

In [None]:
def check_folder(path):
    """
    Realiza verificação de existência do diretório onde os dados serão salvos

    Parameters
    ----------
    path: string
        Diretório dentro do dbfs onde os dados serão salvos.

    Returns
    -------
    None

    """
    try:
        #Verifica se o diretório passado existe. Caso não, cria
        dbutils.fs.ls(path)
        
    except Exception as e:
        if 'java.io.FileNotFoundException' in str(e):
            print("Diretório não existente. Criando diretório.")
            dbutils.fs.mkdirs(path) 

#### Função para transformação dos dados

In [None]:
def transform_data(path, file_name):

    complete_path = path + file_name

    df_new = ps.read_parquet(complete_path)

    # Garantir que os anos estão dentro de uma faixa "normal" e que não é um "bad data"
    year_range_mask = df_new["publishedAt"].dt.year>=2024
    df_new = df_new[year_range_mask]

    # Extrair ano, mês e dia
    df_new["year_published"] = df_new["publishedAt"].dt.year
    df_new["month_published"] = df_new["publishedAt"].dt.month
    df_new["day_published"] = df_new["publishedAt"].dt.day


    # Quantidade de notícias por ano, mês e dia de publicação
    df_grouped1 = df_new.groupby(["year_published", "month_published", "day_published"]).agg(quantidade_noticias=('id', 'count'))
    df_grouped1 = df_grouped1.reset_index(drop=False) # reinicia o index e não exclui o anterior


    # Quantidade de notícias por fonte e autor
    df_grouped2 = df_new.groupby(["source", "author"]).agg(quantidade_noticias=('id', 'count'))
    df_grouped2 = df_grouped2.reset_index(drop=False)


    # Quantidade de aparições de 3 palavras chaves por ano, mês e dia de publicação 
    keywords = ["cancer", "DNA", "genetic"]   # Insira as palavras chaves que deseja procurar
    for keyword in keywords:
        df_new[keyword + "_contagem"] = df_new["content"].str.count(keyword)

    df_grouped3 = df_new.groupby(["year_published", "month_published", "day_published"])[[keyword + "_contagem" for keyword in keywords]].sum()
    df_grouped3 = df_grouped3.reset_index(drop=False)



    # Salvar dados transformados
    df_grouped1.to_parquet(path + "qtd_noticias_ano_mes_dia.parquet")
    df_grouped2.to_parquet(path + "qtd_noticias_fonte_autor.parquet")
    df_grouped3.to_parquet(path + "qtd_keywords.parquet")

    return "Transformações realizadas com sucesso"

#### Função para ELT

In [None]:
def elt(url, acess_code, searched_word, path, file_name, key):

    print("Inicializa o ELT")

    try: 
        # Verifica se existe novos dados para serem processados
        arquivo = dbutils.fs.ls(path)
        message = "Arquivos novos encontrados"
        print(message)

        # Faz a chamada da extração, da carga e da transformação dos dados
        response_df = extract_data(url, acess_code, searched_word)
        check_folder(path)
        load_data(response_df, path, file_name, key)
        transform_data(path, file_name,)

        message = "ETL realizada com sucesso"

    except Exception as e: # caso não exista nenhum dado novo, retorna com a mensagem e encerra o processo
        if 'java.io.FileNotFoundException' in str(e):
            message = "Nenhum dado novo"
            print(message)
        else:
            message = "erro no ELT:" + str(e)
            print(message)
    
    return message

## Definição dos parâmetros para a execução final do processamento em lotes

In [None]:
## Definir o diretório onde será armazenado o arquivo Parquet final que conterá os dados carregados

path = "/dbfs/data_newsapi/"
file_name = "data_newsapi.parquet"

In [None]:
## Dados de acesso a News API

url = "https://newsapi.org/v2/everything?"
acess_code = "ed08e4049379498ebde784dee9d1ede8"
searched_word = "cancer OR DNA OR genetic"
key = "id"

#### Códigos úteis

In [None]:
# Remover pasta
# dbutils.fs.rm(path, True)

## Webhook aguardando a chamada de uma nova requisição

In [None]:
# Inicialização da aplicação Flask
app = flask.Flask(__name__)

# Definição da rota "/webhook" com suporte a requisições HTTP POST
@app.route("/webhook", methods=["POST"])

def handle_webhook():
    # Recupera o conteúdo da requisição como um dicionário em Python
    data = flask.request.get_json()
    
    # Imprime o conteúdo da requisição
    print("Received data:", data)
    
    message = elt(url, acess_code, searched_word, path, file_name, key)

    # Retorna uma resposta HTTP simples
    return "message"

# Verifica se o script está sendo executado como um módulo principal
if __name__ == "__main__":
    # Inicia a execução da aplicação
    app.run()

## Testes de Execução 

In [None]:
## Extração e Carga

response_df = extract_data(url, acess_code, searched_word)
check_folder(path)
load_data(response_df,path,file_name,key)

  A field of type StructType expects a pandas.DataFrame, but got: <class 'pandas.core.series.Series'>
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
  warn(msg)


Arquivo não encontrado, primeiro processamento
resultado carregado com sucesso


In [None]:
### Dados extraídos
response_df.head(2)

In [None]:
### Dados carregados
df_load = ps.read_parquet(path + file_name)
df_load.head(2)

In [None]:
## Transformação

transform_data(path, file_name)

In [None]:
### Dados transformados
df_transform1 = ps.read_parquet(path + "qtd_noticias_ano_mes_dia.parquet")
df_transform2 = ps.read_parquet(path + "qtd_noticias_fonte_autor.parquet")
# df_transform3 = ps.read_parquet(path + "qtd_keywords.parquet")

In [None]:
df_transform1

In [None]:
df_transform2.head(2)

In [None]:
# Faz a chamada do ELT
elt(url, acess_code, searched_word, path, file_name, key)