# Enunciado


## Projeto - Extração de Dados I
------------------------------
## Sistema de Monitoramento de Avanços no Campo da Genômica  

## Contexto:  
O grupo trabalha no time de engenharia de dados na HealthGen, uma empresa especializada em genômica e pesquisa de medicina personalizada. A genômica é o estudo do conjunto completo de genes de um organismo, desempenha um papel fundamental na medicina personalizada e na pesquisa biomédica. Permite a análise do DNA para identificar variantes genéticas e mutações associadas a doenças e facilita a personalização de tratamentos com base nas características genéticas individuais dos pacientes.

A empresa precisa se manter atualizada sobre os avanços mais recentes na genômica, identificar oportunidades para pesquisa e desenvolvimento de tratamentos personalizados e acompanhar as tendências em genômica que podem influenciar estratégias de pesquisa e desenvolvimento. Pensando nisso, o time de dados apresentou uma proposta de desenvolvimento de um sistema que coleta, analisa e apresenta as últimas notícias relacionadas à genômica e à medicina personalizada, e também estuda o avanço do campo nos últimos anos.

O time de engenharia de dados tem como objetivo desenvolver e garantir um pipeline de dados confiável e estável. As principais atividades são:

### 1. Consumo de dados com a News API:  
Implementar um mecanismo para consumir dados de notícias de fontes confiáveis e especializadas em genômica e medicina personalizada, a partir da News API:  
https://newsapi.org/

### 2. Definir Critérios de Relevância:  
Desenvolver critérios precisos de relevância para filtrar as notícias. Por exemplo, o time pode se concentrar em notícias que mencionem avanços em sequenciamento de DNA, terapias genéticas personalizadas ou descobertas relacionadas a doenças genéticas específicas.

### 3. Cargas em Batches:  
Armazenar as notícias relevantes em um formato estruturado e facilmente acessível para consultas e análises posteriores. Essa carga deve acontecer 1 vez por hora. Se as notícias extraídas já tiverem sidos armazenadas na carga anterior, o processo deve ignorar e não armazenar as notícias novamente, os dados carregados não podem ficar duplicados.  
![Alt text](../imagens/image.png)

### 4. Dados transformados para consulta do público final  
A partir dos dados carregados, aplicar as seguintes transformações e armazenar o resultado final para a consulta do público final:  

4.1 - Quantidade de notícias por ano, mês e dia de publicação;  

4.2 - Quantidade de notícias por fonte e autor;  

4.3 - Quantidade de aparições de 3 palavras chaves por ano, mês e dia de publicação (as 3 palavras chaves serão as mesmas usadas para fazer os filtros de relevância do item 2 (2. Definir Critérios de Relevância)).  

Atualizar os dados transformados 1 vez por dia.  

![Alt text](../imagens/image-1.png)

----------------------------------------

Além das atividades principais, existe a necessidade de busca de dados por eventos em tempo real quando é necessário, para isso foi desenhado duas opções:

### Opção 1 - Apache Kafka e Spark Streaming:  

Preparar um pipeline com Apache Kafka e Spark Streaming para receber os dados do Produtor Kafka representado por um evento manual e consumir os dados com o Spark Streaming armazenando os resultados temporariamente. Em um processo paralelo, verificar os resultados armazenados temporiamente e armazenar no mesmo destino do item 3 (3. Cargas em Batches) aqueles resultados que ainda não foram armazenados no destino (os dados carregados não podem ficar duplicados). E por fim, eliminar os dados temporários após a verificação e a eventual carga.

![Alt text](../imagens/image-2.png)

### Opção 2 - Webhooks com notificações por eventos:  
Configurar um webhook para adquirir as últimas notícias a partir de um evento representado por uma requisição POST e fazer a chamada da API e por fim armazenar os resultados temporariamente. Em um processo paralelo, verificar os resultados armazenados temporiamente e armazenar no mesmo destino do item 3 (3. Cargas em Batches) aqueles resultados que ainda não foram armazenados no destino (os dados carregados não podem ficar duplicados). E por fim, eliminar os dados temporários após a verificação e a eventual carga.

![Alt text](../imagens/image-3.png)

Atividades que precisam ser realizadas pelo grupo definido em aula.  

O grupo precisa construir o pipeline de dados seguindo os requisitos das atividades principais e escolher entre a Opção 1 e Opção 2 para desenvolvimento.  

# Resolução

## 1 Bibliotecas
Para instalar as bibliotecas necessárias do projeto:

In [7]:
pip install requests newsapi-python

Note: you may need to restart the kernel to use updated packages.


You should consider upgrading via the 'c:\Users\Danie\AppData\Local\Programs\Python\Python39\python.exe -m pip install --upgrade pip' command.


Para importas as bibliotecas e definir as variáveis necessárias:

In [16]:
from newsapi import NewsApiClient
import requests
import time
import pandas as pd
from datetime import datetime
import re
import os

API_KEY = '9a77398581d74beebbd29dbebd159a53'
PALAVRAS_CHAVES_GERAIS = '(genômica OR genômico) AND (terapia OR sequenciamento OR doença)'
PALAVRAS_CHAVES_ESPECIFICAS = ['terapia', 'sequenciamento', 'doença']

## 2 Definições de Funções

Abaixo está localizado todas as definições de funções que será usado no projeto:

In [94]:
# 1. Consumo de dados com a News API -------------------------------------------------------------------------------------------------

## 1 Usando Biblioteca NewsAPI
def fazer_a_request_1( API_KEY:str='9a77398581d74beebbd29dbebd159a53' , PALAVRAS_CHAVES:str='(genômica OR genômico) AND (terapia OR sequenciamento OR doença)' ):
    # A função realiza o request com API_KEY e PALAVRAS_CHAVES declaradas e é retornado um json
    newsapi = NewsApiClient(api_key=API_KEY)
    response_lib = newsapi.get_everything(q=PALAVRAS_CHAVES,
                                        language='pt',
                                        sort_by='publishedAt'
    )
    return response_lib

## 2 Usando requests
def fazer_a_request_2( API_KEY:str='9a77398581d74beebbd29dbebd159a53' , PALAVRAS_CHAVES:str='(genômica OR genômico) AND (terapia OR sequenciamento OR doença)' ):
    # A função realiza o request com API_KEY e PALAVRAS_CHAVES declaradas e é retornado um json
    url = f'https://newsapi.org/v2/everything?q={PALAVRAS_CHAVES}&language=pt&sortBy=publishedAt&apiKey={API_KEY}'
    response_requests = requests.get(url).json()
    return response_requests



# 2. Definir Critérios de Relevância -------------------------------------------------------------------------------------------------

def tratar_dados(json_de_noticias) -> pd.DataFrame:
    df = pd.json_normalize(json_de_noticias['articles'])
    df['publishedAt'] = pd.to_datetime(df['publishedAt']).dt.tz_localize(None) # inserido, pq linha abaixo estava dando erro "TypeError: Cannot use .astype to convert from timezone-aware dtype to timezone-naive dtype"
    df['publishedAt'] = df['publishedAt'].astype('datetime64[ms]')
    df.fillna("desconhecido", inplace=True)

    # Remover noticias sem informações nas colunas: descrição, conteúdo e nome de fonte 
    df = df[~df['description'].str.contains('desconhecido', case=False)]
    df = df[~df['content'].str.contains('desconhecido', case=False)]
    df = df[~df['source.name'].str.contains('desconhecido', case=False)]

    return df



# 3. Cargas em Batches: ---------------------------------------------------------------------------------------------------------------

def armazenar_noticias(df) -> None:

    # Verifica se já existe um arquivo com os dados
    if os.path.isfile('noticias.csv'):
        # Lê o arquivo existente
        df_existente = carrega_armazenamento()

        # Identifica as notícias repetidas
        df_repetidas = df[df['title'].isin(df_existente['title'])]

        # Exclui as notícias repetidas
        df = df.drop(df_repetidas.index)

        # Grava o dataframe em um arquivo CSV
        df.to_csv('noticias.csv', index=0, mode='a', header=False)
    else:
        df.to_csv('noticias.csv', index=0)

    return None

def carrega_armazenamento():
    df = pd.read_csv('noticias.csv')
    return df 



# 4. Dados transformados para consulta do público final ------------------------------------------------------------------------------

def qtd_noticia_ano_mes_dia(df_inteiro):

    # Calcula a quantidade de notícias por ano, mês/ano e dia/mês/ano
    df_inteiro['ano'] = df_inteiro['publishedAt'].dt.year
    df_inteiro['mes_ano'] = df_inteiro['publishedAt'].dt.to_period('M')
    df_inteiro['dia_mes_ano'] = df_inteiro['publishedAt'].dt.to_period('D')
    
    qtd_por_ano = df_inteiro['ano'].value_counts().sort_index().reset_index()
    qtd_por_ano.columns = ['ano', 'quantidade']

    qtd_por_mes = df_inteiro['mes_ano'].value_counts().sort_index().reset_index()
    qtd_por_mes.columns = ['mes_ano', 'quantidade']

    qtd_por_dia = df_inteiro['dia_mes_ano'].value_counts().sort_index().reset_index()
    qtd_por_dia.columns = ['dia_mes_ano', 'quantidade']
    
    return qtd_por_ano, qtd_por_mes, qtd_por_dia 

def qtd_noticia_fonte_autor(df_inteiro):
    # Calcula a quantidade de notícias por fonte
    qtd_por_fonte = df_inteiro['source.name'].value_counts().reset_index()
    qtd_por_fonte.columns = ['fonte', 'quantidade']
    
    # Calcula a quantidade de notícias por autor
    qtd_por_autor = df_inteiro['author'].value_counts().reset_index()
    qtd_por_autor.columns = ['autor', 'quantidade']
    
    return qtd_por_fonte, qtd_por_autor

def qtd_aparicao_palavras_chaves(df_inteiro, palavras_chave):  
    # Adiciona colunas para cada palavra-chave no DataFrame
    for palavra in palavras_chave:
        df_inteiro[palavra] = df_inteiro['description'].str.contains(fr'\b{palavra}\b', case=False)
    
    df_inteiro['ano'] = df_inteiro['publishedAt'].dt.year
    df_inteiro['mes'] = df_inteiro['publishedAt'].dt.month # Aproveitei aqui para deixar as colunas apenas do mês e do dia, pois serão útil na função "qtd_aparicao_palavras_chaves"
    df_inteiro['dia'] = df_inteiro['publishedAt'].dt.day

    # Agrupa por ano, mês e dia e conta a quantidade de aparições de cada palavra-chave
    df_inteiro = df_inteiro.groupby(['ano', 'mes', 'dia'])[palavras_chave].sum().reset_index()
    
    # Cria um dataframe com as colunas 'ano', 'mes', 'dia' e a contagem de cada palavra-chave
    df_contagem_palavras_chave = df_inteiro.melt(id_vars=['ano', 'mes', 'dia'], var_name='palavra_chave', value_name='quantidade')
    
    return df_contagem_palavras_chave   

def armazenar_dados_estatisticos(qtd_por_ano, qtd_por_mes, qtd_por_dia, qtd_por_fonte, qtd_por_autor, df_contagem_palavras_chave):
    print("Quantidade de notícias por ano:")
    print(qtd_por_ano)

    print("\nQuantidade de notícias por mês:")
    print(qtd_por_mes)

    print("\nQuantidade de notícias por dia:")
    print(qtd_por_dia)

    print("\nQuantidade de notícias por fonte:")
    print(qtd_por_fonte)

    print("\nQuantidade de notícias por autor:")
    print(qtd_por_autor)

    print("\nContagem de aparições das palavras-chave:")
    print(df_contagem_palavras_chave)
    
    estatisticas = pd.concat([qtd_por_ano, qtd_por_mes, qtd_por_dia, qtd_por_fonte, qtd_por_autor, df_contagem_palavras_chave], axis=1)
    estatisticas.to_csv('resultado_final.csv', index = False)
    #estatisticas.to_excel('resultado_final.xlsx', index = False)
    return None





# # Descentralizando...

def procedimento_hora_hora():
    # 1. Consumo de dados com a News API
    response = fazer_a_request_1(API_KEY, PALAVRAS_CHAVES_GERAIS)

    # 2. Definir Critérios de Relevância
    df_agora = tratar_dados(response)

    # 3. Cargas em Batches:
    armazenar_noticias(df_agora)
    

def procedimento_diario():
    # 4. Dados transformados para consulta do público final
    df_inteiro = carrega_armazenamento()
    df_inteiro["publishedAt"] = pd.to_datetime(df_inteiro["publishedAt"])
    a, b, c = qtd_noticia_ano_mes_dia(df_inteiro)
    d, e = qtd_noticia_fonte_autor(df_inteiro)
    f = qtd_aparicao_palavras_chaves(df_inteiro, PALAVRAS_CHAVES_ESPECIFICAS)
    armazenar_dados_estatisticos(a , b , c, d, e, f)

## 3 Teste

In [10]:
# Instanciar objeto
response = fazer_a_request_1()
# Exibir chaves do dicionário
print(f'Chaves retornadas: {response.keys()}')
# Valores do dicionário
print(f"Status: {response['status']}")
print(f"Total de resultados: {response['totalResults']}")
print(f"Exemplo de artigo: {response['articles'][0]}")

Chaves retornadas: dict_keys(['status', 'totalResults', 'articles'])
Status: ok
Total de resultados: 11
Exemplo de artigo: {'source': {'id': None, 'name': 'Metropoles.com'}, 'author': 'Bethânia Nunes', 'title': 'IA: entenda como ela pode ser usada em benefício da saúde', 'description': 'A inteligência artificial (IA) já está presente na medicina no atendimento, tratamento e apoio ao paciente. Veja alguns exemplos de seu uso', 'url': 'https://www.metropoles.com/saude/inteligencia-artificial-uso-saude', 'urlToImage': 'https://uploads.metropoles.com/wp-content/uploads/2023/07/13112836/Mulher-ressonancia-cerebro-inteligencia-artificial.jpg', 'publishedAt': '2023-09-30T05:02:28Z', 'content': 'A inteligência artificial (IA) vem provocando uma grande transformação na medicina, e é provável que ela esteja em um dos consultórios que você frequenta. Embora o termo ainda pareça abstrato para gr… [+4477 chars]'}


In [None]:
# prova real que ambos os metodos de request dão o mesmo resultado
print(fazer_a_request_1() == fazer_a_request_2())
# teste do tratamento de dados
df = tratar_dados(response)
df[~df['author'].str.contains('desconhecido', case=False)]

In [100]:
df = pd.read_csv('noticias.csv')
df 

ParserError: Error tokenizing data. C error: Expected 9 fields in line 3, saw 17


In [None]:
carrega_armazenamento()

In [None]:
procedimento_hora_hora()

In [None]:

# 1. Consumo de dados com a News API
response = fazer_a_request_1(API_KEY, PALAVRAS_CHAVES_GERAIS)

# 2. Definir Critérios de Relevância
df_agora = tratar_dados(response)

df_agora

# 3. Cargas em Batches:
# armazenar_noticias(df_agora)

In [None]:
from newsapi import NewsApiClient
import requests
import time

API_KEY = '9a77398581d74beebbd29dbebd159a53'
PALAVRAS_CHAVES_GERAIS = '(genômica OR genômico) AND (terapia OR sequenciamento OR doença)'
PALAVRAS_CHAVES_ESPECIFICAS = ['terapia', 'sequenciamento', 'doença']

INTERVALO = 3.6 # 3600 segundos = 1 hora
i = 20

while True:

    procedimento_hora_hora()
    if i==23: # i=23 referente as 24h do dia
        procedimento_diario()
        i = 0
        break   
    else:
        i += 1
        
    time.sleep(INTERVALO)

## 4 Algoritmo

In [None]:
from newsapi import NewsApiClient
import requests
import time

API_KEY = '9a77398581d74beebbd29dbebd159a53'
PALAVRAS_CHAVES_GERAIS = '(genômica OR genômico) AND (terapia OR sequenciamento OR doença)'
PALAVRAS_CHAVES_ESPECIFICAS = ['terapia', 'sequenciamento', 'doença']

INTERVALO = 3600 # 3600 segundos = 1 hora
i = 0

while True:

    procedimento_hora_hora()
    if i==23: # i=23 referente as 24h do dia
        procedimento_diario()
        i = 0       
    else:
        i += 1
        
    time.sleep(INTERVALO)
