# Projeto - Extração de Dados I

## Sistema de Monitoramento de Avanços no Campo da Genômica

### Contexto:

O grupo trabalha no time de engenharia de dados na HealthGen, uma empresa especializada em genômica e pesquisa de medicina personalizada. A genômica é o estudo do conjunto completo de genes de um organismo, desempenha um papel fundamental na medicina personalizada e na pesquisa biomédica. Permite a análise do DNA para identificar variantes genéticas e mutações associadas a doenças e facilita a personalização de tratamentos com base nas características genéticas individuais dos pacientes.

A empresa precisa se manter atualizada sobre os avanços mais recentes na genômica, identificar oportunidades para pesquisa e desenvolvimento de tratamentos personalizados e acompanhar as tendências em genômica que podem influenciar estratégias de pesquisa e desenvolvimento. Pensando nisso, o time de dados apresentou uma proposta de desenvolvimento de um sistema que coleta, analisa e apresenta as últimas notícias relacionadas à genômica e à medicina personalizada, e também estuda o avanço do campo nos últimos anos.

O time de engenharia de dados tem como objetivo desenvolver e garantir um pipeline de dados confiável e estável. As principais atividades são:

1. **Consumo de dados com a News API**:
    - Implementar um mecanismo para consumir dados de notícias de fontes confiáveis e especializadas em genômica e medicina personalizada, a partir da News API:
      [https://newsapi.org/](https://newsapi.org/)

2. **Definir Critérios de Relevância**:
    - Desenvolver critérios precisos de relevância para filtrar as notícias. Por exemplo, o time pode se concentrar em notícias que mencionem avanços em sequenciamento de DNA, terapias genéticas personalizadas ou descobertas relacionadas a doenças genéticas específicas.

3. **Cargas em Batches**:
    - Armazenar as notícias relevantes em um formato estruturado e facilmente acessível para consultas e análises posteriores. Essa carga deve acontecer 1 vez por hora. Se as notícias extraídas já tiverem sido armazenadas na carga anterior, o processo deve ignorar e não armazenar as notícias novamente, os dados carregados não podem ficar duplicados.

4. **Dados transformados para consulta do público final**:
    - A partir dos dados carregados, aplicar as seguintes transformações e armazenar o resultado final para a consulta do público final:
        - Quantidade de notícias por ano, mês e dia de publicação;
        - Quantidade de notícias por fonte e autor;
        - Quantidade de aparições de 3 palavras-chave por ano, mês e dia de publicação (as 3 palavras-chave serão as mesmas usadas para fazer os filtros de relevância do item 2 (2. Definir Critérios de Relevância)).
    - Atualizar os dados transformados 1 vez por dia.

## ETL
Accessing the API utilizing the chosen keywords and their synonyms, structuring the data and saving it in a delta table without duplicates.

In [0]:
import requests
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import col

API_KEY = '9cfe8d8ea7cf42c480f8e1556dda0eea'
base_url = 'https://newsapi.org/v2/everything'
query = '(epigenetics OR epigenetic OR epigenomics OR epigenetic OR epigenomic) OR (disease OR sickness OR sick) OR (genomic OR genomics OR gene)'

response = requests.get(url=base_url, params={'q': query, 'apiKey': API_KEY})
response = response.json()
response = response['articles']

spark = SparkSession.builder.appName('epigenetics_api').getOrCreate()

schema = StructType([
    StructField("source", StructType([
        StructField("id", StringType(), True),
        StructField("name", StringType(), True)
    ]), True),
    StructField("author", StringType(), True),
    StructField("title", StringType(), True),
    StructField("description", StringType(), True),
    StructField("url", StringType(), True),
    StructField("urlToImage", StringType(), True),
    StructField("publishedAt", StringType(), True),
    StructField("content", StringType(), True),
])

epigenetics_df = spark.createDataFrame(response, schema=schema)

try:    
    table_df = spark.sql("""
        SELECT *
        FROM epigenetics_tb
    """)
    joined_df = epigenetics_df.union(table_df)
    joined_df = joined_df.dropDuplicates(['description'])
    joined_df = joined_df.filter(joined_df.title != '[Removed]')
    joined_df.write.format("delta").mode("overwrite").saveAsTable("epigenetics_tb")
except:
    print("No table exists.\nCreating table.")
    epigenetics_df = epigenetics_df.dropDuplicates(['description'])
    epigenetics_df = epigenetics_df.filter(epigenetics_df.title != '[Removed]')
    epigenetics_df.write.format("delta").mode("overwrite").saveAsTable("epigenetics_tb")

## Preparing the data to be displayed, in the format provided in item 4.
Data can be consulted both in SQL views or in dataframes, which are automatically displayed here.

In [0]:
%sql
-- Quantidade de notícias por ano, mês e dia de publicação;
CREATE OR REPLACE VIEW day_ordered_view AS
SELECT LEFT(publishedAt, 10) AS date_published, COUNT(publishedAt) AS count
FROM epigenetics_tb
GROUP BY date_published
ORDER BY date_published;

CREATE OR REPLACE VIEW month_ordered_view AS
SELECT LEFT(publishedAt, 7) AS date_published, COUNT(publishedAt) AS count
FROM epigenetics_tb
GROUP BY date_published
ORDER BY date_published;

CREATE OR REPLACE VIEW year_ordered_view AS
SELECT LEFT(publishedAt, 4) AS date_published, COUNT(publishedAt) AS count
FROM epigenetics_tb
GROUP BY date_published
ORDER BY date_published;

-- Quantidade de notícias por fonte e autor;
CREATE OR REPLACE VIEW author_count_view AS
SELECT author, COUNT(author) AS count
FROM epigenetics_tb
GROUP BY author;

CREATE OR REPLACE VIEW source_count_view AS
SELECT source.name AS source, COUNT(source.name) AS count
FROM epigenetics_tb
GROUP BY source.name;

-- Quantidade de aparições de 3 palavras-chave por ano, mês e dia de publicação (as 3 palavras-chave serão as mesmas usadas para fazer os filtros de relevância do item 2 (2. Definir Critérios de Relevância))
CREATE OR REPLACE VIEW keyword_day_view AS
SELECT LEFT(publishedAt, 10) AS date_published, 
       SUM(CASE WHEN LOWER(description) LIKE '%epigen%' OR LOWER(title) LIKE '%epigen%' OR LOWER(content) LIKE '%epigen%' THEN 1 ELSE 0 END) AS epigenetic_keyword_count,
       SUM(CASE WHEN LOWER(description) LIKE '%sick%' OR LOWER(title) LIKE '%sick%' OR LOWER(content) LIKE '%sick%' OR LOWER(description) LIKE '%disease%' OR LOWER(title) LIKE '%disease%' OR LOWER(content) LIKE '%disease%' THEN 1 ELSE 0 END) AS disease_keyword_count,
       SUM(CASE WHEN LOWER(description) LIKE '%gene%' OR LOWER(title) LIKE '%gene%' OR LOWER(content) LIKE '%gene%' OR LOWER(description) LIKE '%genom%' OR LOWER(title) LIKE '%genom%' OR LOWER(content) LIKE '%genom%' THEN 1 ELSE 0 END) AS gene_keyword_count
FROM epigenetics_tb
GROUP BY date_published
ORDER BY date_published;

CREATE OR REPLACE VIEW keyword_month_view AS
SELECT LEFT(publishedAt, 7) AS date_published, 
       SUM(CASE WHEN LOWER(description) LIKE '%epigen%' OR LOWER(title) LIKE '%epigen%' OR LOWER(content) LIKE '%epigen%' THEN 1 ELSE 0 END) AS epigenetic_keyword_count,
       SUM(CASE WHEN LOWER(description) LIKE '%sick%' OR LOWER(title) LIKE '%sick%' OR LOWER(content) LIKE '%sick%' OR LOWER(description) LIKE '%disease%' OR LOWER(title) LIKE '%disease%' OR LOWER(content) LIKE '%disease%' THEN 1 ELSE 0 END) AS disease_keyword_count,
       SUM(CASE WHEN LOWER(description) LIKE '%gene%' OR LOWER(title) LIKE '%gene%' OR LOWER(content) LIKE '%gene%' OR LOWER(description) LIKE '%genom%' OR LOWER(title) LIKE '%genom%' OR LOWER(content) LIKE '%genom%' THEN 1 ELSE 0 END) AS gene_keyword_count
FROM epigenetics_tb
GROUP BY date_published
ORDER BY date_published;

CREATE OR REPLACE VIEW keyword_year_view AS
SELECT LEFT(publishedAt, 4) AS date_published, 
       SUM(CASE WHEN LOWER(description) LIKE '%epigen%' OR LOWER(title) LIKE '%epigen%' OR LOWER(content) LIKE '%epigen%' THEN 1 ELSE 0 END) AS epigenetic_keyword_count,
       SUM(CASE WHEN LOWER(description) LIKE '%sick%' OR LOWER(title) LIKE '%sick%' OR LOWER(content) LIKE '%sick%' OR LOWER(description) LIKE '%disease%' OR LOWER(title) LIKE '%disease%' OR LOWER(content) LIKE '%disease%' THEN 1 ELSE 0 END) AS disease_keyword_count,
       SUM(CASE WHEN LOWER(description) LIKE '%gene%' OR LOWER(title) LIKE '%gene%' OR LOWER(content) LIKE '%gene%' OR LOWER(description) LIKE '%genom%' OR LOWER(title) LIKE '%genom%' OR LOWER(content) LIKE '%genom%' THEN 1 ELSE 0 END) AS gene_keyword_count
FROM epigenetics_tb
GROUP BY date_published
ORDER BY date_published;

In [0]:
# Quantidade de notícias por fonte e autor;
source_count_df = spark.sql("""
        SELECT *
        FROM source_count_view
    """)
author_count_df = spark.sql("""
        SELECT *
        FROM author_count_view
    """)

display(source_count_df)
display(author_count_df)

source,count
BBC News,11
Windows Central,1
Frontiersin.org,1
Wired,5
Phys.Org,3
The Indian Express,1
Associated Press,1
Ox.ac.uk,1
Business Insider,4
Substack.com,1


author,count
Chris Thomas,1
Amanda Hoover,1
Keegan Kelly,1
Lakshmi Varanasi,1
Beth Mole,2
Brendan Lowry,1
Joe Hernandez,1
Will Stone,1
Callum Booth,1
Alice Park,1


In [0]:
# Quantidade de notícias por ano, mês e dia de publicação;
day_ordered_df = spark.sql("""
        SELECT *
        FROM day_ordered_view
    """)

month_ordered_df = spark.sql("""
        SELECT *
        FROM month_ordered_view
    """)

year_ordered_df = spark.sql("""
        SELECT *
        FROM year_ordered_view
    """)

display(year_ordered_df)
display(month_ordered_df)
display(day_ordered_df)

date_published,count
2024,98


date_published,count
2024-03,72
2024-04,26


date_published,count
2024-03-07,5
2024-03-08,2
2024-03-09,1
2024-03-10,1
2024-03-11,6
2024-03-12,2
2024-03-13,5
2024-03-14,3
2024-03-15,1
2024-03-17,1


In [0]:
# Quantidade de aparições de 3 palavras-chave por ano, mês e dia de publicação (as 3 palavras-chave serão as mesmas usadas para fazer os filtros de relevância do item 2 (2. Definir Critérios de Relevância))
keyword_day_df = spark.sql("""
        SELECT *
        FROM keyword_day_view
    """)

keyword_month_df = spark.sql("""
        SELECT *
        FROM keyword_month_view
    """)

keyword_year_df = spark.sql("""
        SELECT *
        FROM keyword_year_view
    """)

display(keyword_year_df)
display(keyword_month_df)
display(keyword_day_df)

date_published,epigenetic_keyword_count,disease_keyword_count,gene_keyword_count
2024,4,49,31


date_published,epigenetic_keyword_count,disease_keyword_count,gene_keyword_count
2024-03,4,35,23
2024-04,0,14,8


date_published,epigenetic_keyword_count,disease_keyword_count,gene_keyword_count
2024-03-07,1,2,4
2024-03-08,0,0,1
2024-03-09,0,1,0
2024-03-10,0,0,0
2024-03-11,1,2,2
2024-03-12,0,0,0
2024-03-13,0,3,3
2024-03-14,0,3,0
2024-03-15,0,1,0
2024-03-17,0,0,0


## API

In [0]:
pip install flask fastapi uvicorn kafka-python

Python interpreter will be restarted.
Python interpreter will be restarted.


In [0]:
from flask import Flask, request
from kafka import KafkaProducer
import flask,json

app = Flask("app")
producer= KafkaProducer(bootstrap_servers=['localhost:9092'],api_version=(0,11,5), value_serializer=lambda x: dumps(x).encode('utf-8'))
topic = "views_api"

@app.route("/keyword_year/<path:year>", methods=["GET"])
def filter_keyword_by_year(year):
    data_list = keyword_year_df.collect()
    json_data = json.dumps([row.asDict() for row in data_list])
    return json_data

@app.route("/keyword_month/<path:month>", methods=["GET"])
def filter_keyword_by_month(month):
    data_list = keyword_month_df.collect()
    json_data = json.dumps([row.asDict() for row in data_list])
    return json_data

@app.route("/keyword_day/<path:day>", methods=["GET"])
def filter_keyword_by_day(day):
    data_list = keyword_day_df.collect()
    json_data = json.dumps([row.asDict() for row in data_list])
    return json_data

@app.route("/ordered_year/<path:year>", methods=["GET"])
def filter_by_year(year):
    data_list = year_ordered_df.collect()
    json_data = json.dumps([row.asDict() for row in data_list])
    return json_data

@app.route("/ordered_month/<path:month>", methods=["GET"])
def filter_by_month(month):
    data_list = month_ordered_df.collect()
    json_data = json.dumps([row.asDict() for row in data_list])
    return json_data

@app.route("/ordered_day/<path:day>", methods=["GET"])
def filter_by_day(day):
    data_list = day_ordered_df.collect()
    json_data = json.dumps([row.asDict() for row in data_list])
    return json_data

@app.route("/source/<path:source>", methods=["GET"])
def filter_by_source(source):
    data_list = source_count_df.collect()
    json_data = json.dumps([row.asDict() for row in data_list])
    return json_data

@app.route("/author/<path:author>", methods=["GET"])
def filter_by_author(author):
    data_list = author_count_df.collect()
    json_data = json.dumps([row.asDict() for row in data_list])
    return json_data

@app.route("/views_api", methods=["POST"])
def consome_dados():
    lista = []
    data = json_data
    mensagem = data
    producer.send(topic, mensagem)
    return "Completed"

app.run()

 * Serving Flask app 'app'
 * Debug mode: off


 * Running on http://127.0.0.1:5000
[33mPress CTRL+C to quit[0m
127.0.0.1 - - [08/Apr/2024 22:34:38] "GET /ordered_year/<path:ordered_year> HTTP/1.1" 200 -
127.0.0.1 - - [08/Apr/2024 22:34:47] "GET /keyword_day/<path:keyword_day> HTTP/1.1" 200 -
127.0.0.1 - - [08/Apr/2024 22:35:06] "GET /author/<path:author> HTTP/1.1" 200 -
