## Projeto - Extração de Dados I

### Sistema de Monitoramento de Avanços no Campo da Genômica

###Contexto:

#### O grupo trabalha no time de engenharia de dados na HealthGen, uma empresa especializada em genômica e pesquisa de medicina personalizada. A genômica é o estudo do conjunto completo de genes de um organismo, desempenha um papel fundamental na medicina personalizada e na pesquisa biomédica. Permite a análise do DNA para identificar variantes genéticas e mutações associadas a doenças e facilita a personalização de tratamentos com base nas características genéticas individuais dos pacientes.

#### A empresa precisa se manter atualizada sobre os avanços mais recentes na genômica, identificar oportunidades para pesquisa e desenvolvimento de tratamentos personalizados e acompanhar as tendências em genômica que podem influenciar estratégias de pesquisa e desenvolvimento. Pensando nisso, o time de dados apresentou uma proposta de desenvolvimento de um sistema que coleta, analisa e apresenta as últimas notícias relacionadas à genômica e à medicina personalizada, e também estuda o avanço do campo nos últimos anos. 

#### O time de engenharia de dados tem como objetivo desenvolver e garantir um pipeline de dados confiável e estável. As principais atividades são:

> #### 1. Consumo de dados com a News API: 
> #### Implementar um mecanismo para consumir dados de notícias de fontes confiáveis e especializadas em genômica e medicina personalizada, a partir da News API: 
https://newsapi.org/

> #### 2. Definir Critérios de Relevância:

> #### Desenvolver critérios precisos de relevância para filtrar as notícias. Por exemplo, o time pode se concentrar em notícias que mencionem avanços em sequenciamento de DNA, terapias genéticas personalizadas ou descobertas relacionadas a doenças genéticas específicas.

> #### 3. Cargas em Batches:

> #### Armazenar as notícias relevantes em um formato estruturado e facilmente acessível para consultas e análises posteriores. Essa carga deve acontecer 1 vez por hora. Se as notícias extraídas já tiverem sidos armazenadas na carga anterior, o processo deve ignorar e não armazenar as notícias novamente, os dados carregados não podem ficar duplicados.

<br>

<div style="text-align: center;">
<img src="https://drive.google.com/uc?export=view&id=1QLZBxgK4c4_yysUnvtamuwXzRJm4nNit"  width="70%" height="40%">
<br>
<br>

</div>

> #### 4. Dados transformados para consulta do público final

> #### A partir dos dados carregados, aplicar as seguintes transformações e armazenar o resultado final para a consulta do público final:

> #### 4.1 - Quantidade de notícias por ano, mês e dia de publicação;
> #### 4.2 - Quantidade de notícias por fonte e autor;
> #### 4.3 - Quantidade de aparições de 3 palavras chaves por ano, mês e dia de publicação (as 3 palavras chaves serão as mesmas usadas para fazer os filtros de relevância do item 2 (2. Definir Critérios de Relevância)).

> #### Atualizar os dados transformados 1 vez por dia.

<br>

<div style="text-align: center;">
<img src="https://drive.google.com/uc?export=view&id=1QOFkzKrWqb-9CY3kC3_1XkTWNVNE05dd"  width="70%" height="40%">
<br>
<br>

</div>

###Além das atividades principais, existe a necessidade de busca de dados por eventos em tempo real quando é necessário, para isso foi desenhado duas opções:

> #### Opção 1 - Apache Kafka e Spark Streaming:

> #### Preparar um pipeline com Apache Kafka e Spark Streaming para receber os dados do Produtor Kafka representado por um evento manual e consumir os dados com o Spark Streaming armazenando os resultados temporariamente. Em um processo paralelo, verificar os resultados armazenados temporiamente e armazenar no mesmo destino do item 3 (3. Cargas em Batches) aqueles resultados que ainda não foram armazenados no destino (os dados carregados não podem ficar duplicados). E por fim, eliminar os dados temporários após a verificação e a eventual carga.

<br>

<div style="text-align: center;">
<img src="https://drive.google.com/uc?export=view&id=1PvAxBXU0fvwEtJg36ZJ1VfBVSGETBpUZ"  width="70%" height="40%">
<br>
<br>

</div>


> #### Opção 2 - Webhooks com notificações por eventos:

> #### Configurar um webhook para adquirir as últimas notícias a partir de um evento representado por uma requisição POST e fazer a chamada da API e por fim armazenar os resultados temporariamente. Em um processo paralelo, verificar os resultados armazenados temporiamente e armazenar no mesmo destino do item 3 (3. Cargas em Batches) aqueles resultados que ainda não foram armazenados no destino (os dados carregados não podem ficar duplicados). E por fim, eliminar os dados temporários após a verificação e a eventual carga.

<br>

<div style="text-align: center;">
<img src="https://drive.google.com/uc?export=view&id=1Px6Jp3aNuF-wpn_9earonylEMebzOcBW"  width="70%" height="40%">
<br>
<br>

</div>

## Atividades que precisam ser realizadas pelo grupo:

#### O grupo precisa construir o pipeline de dados seguindo os requisitos das atividades principais e escolher entre a Opção 1 e Opção 2 para desenvolvimento.

### Parte 01 - Extração e Carregamento dos Dados em Lotes

Dependência de projeto

In [0]:
import requests
import time

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, dayofmonth, month, year, count, expr
from pyspark.sql import DataFrame
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

API URL

In [0]:
API_KEY = '4e8db03627bd414b941e81dfade51063'

query = 'genomics OR personalized medicine OR DNA'
sources = 'nature,sciencedaily,medical-news-today'
year_from = '2023'
month_from = '10'
year_to = '2023'
month_to = '10'

url = f'https://newsapi.org/v2/everything?from={year_from}-{month_from}-01&to={year_to}-{month_to}-30&sources={sources}&sortBy=publishedAt&apiKey={API_KEY}'

Verificação API

In [0]:
response = requests.get(url)
if response.status_code == 200:
    print("Total de resultados: ", response.json()["totalResults"])
else:
    print(f"status code: {response.status_code}")

Total de resultados:  11


Schema do DataFrame

In [0]:
schema_url = StructType([
    StructField("source", StructType([
        StructField("id", StringType(), True),
        StructField("name", StringType(), True)
    ]), True),
    StructField("author", StringType(), True),
    StructField("title", StringType(), True),
    StructField("description", StringType(), True),
    StructField("url", StringType(), True),
    StructField("urlToImage", StringType(), True),
    StructField("publishedAt", StringType(), True),
    StructField("content", StringType(), True)
])

In [0]:
schema_raw = StructType([
    StructField("id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("author", StringType(), True),
    StructField("title", StringType(), True),
    StructField("description", StringType(), True),
    StructField("url", StringType(), True),
    StructField("urlToImage", StringType(), True),
    StructField("publishedAt", StringType(), True),
    StructField("content", StringType(), True)
])

#### Extração dos Dados

In [0]:
def extract() -> DataFrame:
    df = spark.createDataFrame([], schema=schema_raw)
    if response.status_code == 200:
        data = response.json()
        
        for article in data['articles']:
            article_df = spark.createDataFrame([article], schema=schema_url)
            article_df = article_df \
                .withColumn("id", col("source.id")) \
                .withColumn("name", col("source.name")) \
                .drop("source")
            df = df.union(article_df)
        return df
    else:
        print('Falha ao obter notícias. Código de status:', response.status_code)

#### Carregamento dos Dados

Path dos arquivos

In [0]:
parquet_raw = f"/FileStore/raw/raw_zone_{year}_{month}.parquet"

Carregamento

In [0]:
def load(df_new: DataFrame, path: str) -> None:
    try:
        data = dbutils.fs.ls(path)
        df_res = spark.read.parquet(path)
        new_data = df_new.exceptAll(df_res)
        df_res.union(new_data)
    except Exception as e:
        if "java.io.FileNotFoundException" in str(e):
            print("data not search, first process")
            df_new.write.mode('overwrite').parquet(path)

#### Função Principal da Parte 1

In [0]:
def main_step_1(loop:bool = True) -> None:
    while loop:
        print("start extract data")
        df = extract()
        print("start loading raw data")
        load(df, parquet_raw)
        print("sleep for 1 hour")
        time_sleep = 3600
        time.sleep(time_sleep)

In [0]:
main_step_1()

start extract data
start loading raw data
data not search, first process
sleep for 1 hour


In [0]:
data = spark.read.parquet(parquet_raw)

In [0]:
display(data.head(5))

id,name,author,title,description,url,urlToImage,publishedAt,content
Corrie Pelc,Jamais vu: What happens in the brain when the familiar feels new?,"The phenomenon of jamais vu, when a familiar experience feels new, is a common occurrence, but why does it happen, and should it worry us when it does? Here is what the experts and the current research say.",https://www.medicalnewstoday.com/articles/jamais-vu-whatjamais-vu-the-opposite-of-deja-vu-why-does-the-familiar-feel-strangely-new-sometimesexplains-the-opposite-of-deja-vu,https://post.medicalnewstoday.com/wp-content/uploads/sites/3/2023/10/jamais_vu_GettyImages1192800429_Thumb.jpg,2023-10-03T16:54:25Z,Many people have heard of and probably experienced déjà vu the strange feeling you have already seen or experienced something as you are seeing or experiencing it. But very few people know about the … [+9330 chars],medical-news-today,Medical News Today
Jessica Freeborn,Short bursts of exercise may lower the risk of heart attack and stroke,A recent study found that non-exercise physical activity was associated with a lower risk for major adverse cardiovascular events such as heart attack and stroke. Short bursts of physical activity among non-exercisers were also linked to a reduced risk of all…,https://www.medicalnewstoday.com/articles/short-bursts-exercise-heart-health,https://post.medicalnewstoday.com/wp-content/uploads/sites/3/2023/10/Female-Steps-Building-732x549-Thumbnail.jpg,2023-10-02T17:50:54Z,A recent study found that non-exercise physical activity was associated with a lower risk for major adverse cardiovascular events.Short bursts of physical activity among non-exercise… [+6994 chars],medical-news-today,Medical News Today
Eileen Bailey,Parkinson's: How loneliness can affect progression of the disease,Researchers say they have established an association between loneliness and the development of Parkinson's disease. Experts say it's important for people with the condition to have friends and other social connections.,https://www.medicalnewstoday.com/articles/how-loneliness-may-be-associated-with-parkinson-disease,https://post.medicalnewstoday.com/wp-content/uploads/sites/3/2023/10/Male-Serious-Alone-732x549-Thumbnail.jpg,2023-10-03T09:11:00Z,Researchers report that loneliness is associated with a higher risk of Parkinsons disease.They note that people with Parkinsons may worry that their illness will get in the way of fr… [+9501 chars],medical-news-today,Medical News Today
Corrie Pelc,What role do micronutrients play in neurological health?,"A review of the existing evidence looks at how micronutrients including vitamin and minerals could help treat the symptoms of different neurological diseases, including Parkinson's and Alzheimer's.",https://www.medicalnewstoday.com/articles/can-micronutrients-help-treat-neurological-diseases-such-as-parkinsons,https://post.medicalnewstoday.com/wp-content/uploads/sites/3/2023/10/leafs-science-lab-micronutrients-732x549-thumbnail.jpg,2023-10-02T11:10:55Z,Micronutrients refer to all the vitamins and minerals a body needs to stay healthy and function correctly. More than 2 billion people globally are micronutrient deficient. N… [+7296 chars],medical-news-today,Medical News Today
Eileen Bailey,Type 2 diabetes: Why diagnosis should be changed for women under 50,Researchers are recommending that the diagnosis threshold for type 2 diabetes be lowered for women under 50 because menstruation can affect blood sugar levels,https://www.medicalnewstoday.com/articles/type-2-diabetes-why-experts-say-diagnosis-threshold-should-be-lowered-for-women-under-50,https://post.medicalnewstoday.com/wp-content/uploads/sites/3/2023/10/Blood-Sugar-Doctor-732x549-Thumbnail.jpg,2023-10-02T07:53:00Z,"Researchers suggest lowering the threshold for a type 2 diabetes diagnosis in women under 50 might be more accurate.They say that women are less likely to receive treatment, risk-red… [+7394 chars]",medical-news-today,Medical News Today


Excluir todo o Raw Data - Para Testes

In [0]:
dbutils.fs.rm(parquet_raw, True)

### Parte 2 - Tranformação dos Dados em Lotes (Raw Zone -> Processing Zone)

Vizualização dos Dados

In [0]:
data = spark.read.parquet(parquet_raw)
display(data.head(5))

id,name,author,title,description,url,urlToImage,publishedAt,content
Corrie Pelc,Jamais vu: What happens in the brain when the familiar feels new?,"The phenomenon of jamais vu, when a familiar experience feels new, is a common occurrence, but why does it happen, and should it worry us when it does? Here is what the experts and the current research say.",https://www.medicalnewstoday.com/articles/jamais-vu-whatjamais-vu-the-opposite-of-deja-vu-why-does-the-familiar-feel-strangely-new-sometimesexplains-the-opposite-of-deja-vu,https://post.medicalnewstoday.com/wp-content/uploads/sites/3/2023/10/jamais_vu_GettyImages1192800429_Thumb.jpg,2023-10-03T16:54:25Z,Many people have heard of and probably experienced déjà vu the strange feeling you have already seen or experienced something as you are seeing or experiencing it. But very few people know about the … [+9330 chars],medical-news-today,Medical News Today
Jessica Freeborn,Short bursts of exercise may lower the risk of heart attack and stroke,A recent study found that non-exercise physical activity was associated with a lower risk for major adverse cardiovascular events such as heart attack and stroke. Short bursts of physical activity among non-exercisers were also linked to a reduced risk of all…,https://www.medicalnewstoday.com/articles/short-bursts-exercise-heart-health,https://post.medicalnewstoday.com/wp-content/uploads/sites/3/2023/10/Female-Steps-Building-732x549-Thumbnail.jpg,2023-10-02T17:50:54Z,A recent study found that non-exercise physical activity was associated with a lower risk for major adverse cardiovascular events.Short bursts of physical activity among non-exercise… [+6994 chars],medical-news-today,Medical News Today
Eileen Bailey,Parkinson's: How loneliness can affect progression of the disease,Researchers say they have established an association between loneliness and the development of Parkinson's disease. Experts say it's important for people with the condition to have friends and other social connections.,https://www.medicalnewstoday.com/articles/how-loneliness-may-be-associated-with-parkinson-disease,https://post.medicalnewstoday.com/wp-content/uploads/sites/3/2023/10/Male-Serious-Alone-732x549-Thumbnail.jpg,2023-10-03T09:11:00Z,Researchers report that loneliness is associated with a higher risk of Parkinsons disease.They note that people with Parkinsons may worry that their illness will get in the way of fr… [+9501 chars],medical-news-today,Medical News Today
Corrie Pelc,What role do micronutrients play in neurological health?,"A review of the existing evidence looks at how micronutrients including vitamin and minerals could help treat the symptoms of different neurological diseases, including Parkinson's and Alzheimer's.",https://www.medicalnewstoday.com/articles/can-micronutrients-help-treat-neurological-diseases-such-as-parkinsons,https://post.medicalnewstoday.com/wp-content/uploads/sites/3/2023/10/leafs-science-lab-micronutrients-732x549-thumbnail.jpg,2023-10-02T11:10:55Z,Micronutrients refer to all the vitamins and minerals a body needs to stay healthy and function correctly. More than 2 billion people globally are micronutrient deficient. N… [+7296 chars],medical-news-today,Medical News Today
Eileen Bailey,Type 2 diabetes: Why diagnosis should be changed for women under 50,Researchers are recommending that the diagnosis threshold for type 2 diabetes be lowered for women under 50 because menstruation can affect blood sugar levels,https://www.medicalnewstoday.com/articles/type-2-diabetes-why-experts-say-diagnosis-threshold-should-be-lowered-for-women-under-50,https://post.medicalnewstoday.com/wp-content/uploads/sites/3/2023/10/Blood-Sugar-Doctor-732x549-Thumbnail.jpg,2023-10-02T07:53:00Z,"Researchers suggest lowering the threshold for a type 2 diabetes diagnosis in women under 50 might be more accurate.They say that women are less likely to receive treatment, risk-red… [+7394 chars]",medical-news-today,Medical News Today


In [0]:
data.printSchema()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- author: string (nullable = true)
 |-- title: string (nullable = true)
 |-- description: string (nullable = true)
 |-- url: string (nullable = true)
 |-- urlToImage: string (nullable = true)
 |-- publishedAt: string (nullable = true)
 |-- content: string (nullable = true)



Quantidade de nulos

In [0]:
def count_nulls(column):
    return data.filter(col(column).isNull()).count()
for column in data.columns:
    null_count = count_nulls(column)
    print(f"Coluna '{column}': {null_count} valores nulos")

Coluna 'id': 0 valores nulos
Coluna 'name': 0 valores nulos
Coluna 'author': 0 valores nulos
Coluna 'title': 0 valores nulos
Coluna 'description': 0 valores nulos
Coluna 'url': 0 valores nulos
Coluna 'urlToImage': 0 valores nulos
Coluna 'publishedAt': 0 valores nulos
Coluna 'content': 0 valores nulos


Modificação dos nomes das colunas

In [0]:
def rename_columns(df: DataFrame) -> DataFrame:
    return df.withColumnRenamed("id", "autor")\
      .withColumnRenamed("name", "titulo")\
      .withColumnRenamed("author", "descricao")\
      .withColumnRenamed("title", "url_artigo")\
      .withColumnRenamed("description", "imagem")\
      .withColumnRenamed("url", "data")\
      .withColumnRenamed("urlToImage", "html")\
      .withColumnRenamed("content", "fonte")\
      .drop("publishedAt")

In [0]:
data_new = rename_columns(data)

In [0]:
display(data_new.head(5))

autor,titulo,descricao,url_artigo,imagem,data,html,fonte
Corrie Pelc,Jamais vu: What happens in the brain when the familiar feels new?,"The phenomenon of jamais vu, when a familiar experience feels new, is a common occurrence, but why does it happen, and should it worry us when it does? Here is what the experts and the current research say.",https://www.medicalnewstoday.com/articles/jamais-vu-whatjamais-vu-the-opposite-of-deja-vu-why-does-the-familiar-feel-strangely-new-sometimesexplains-the-opposite-of-deja-vu,https://post.medicalnewstoday.com/wp-content/uploads/sites/3/2023/10/jamais_vu_GettyImages1192800429_Thumb.jpg,2023-10-03T16:54:25Z,Many people have heard of and probably experienced déjà vu the strange feeling you have already seen or experienced something as you are seeing or experiencing it. But very few people know about the … [+9330 chars],Medical News Today
Jessica Freeborn,Short bursts of exercise may lower the risk of heart attack and stroke,A recent study found that non-exercise physical activity was associated with a lower risk for major adverse cardiovascular events such as heart attack and stroke. Short bursts of physical activity among non-exercisers were also linked to a reduced risk of all…,https://www.medicalnewstoday.com/articles/short-bursts-exercise-heart-health,https://post.medicalnewstoday.com/wp-content/uploads/sites/3/2023/10/Female-Steps-Building-732x549-Thumbnail.jpg,2023-10-02T17:50:54Z,A recent study found that non-exercise physical activity was associated with a lower risk for major adverse cardiovascular events.Short bursts of physical activity among non-exercise… [+6994 chars],Medical News Today
Eileen Bailey,Parkinson's: How loneliness can affect progression of the disease,Researchers say they have established an association between loneliness and the development of Parkinson's disease. Experts say it's important for people with the condition to have friends and other social connections.,https://www.medicalnewstoday.com/articles/how-loneliness-may-be-associated-with-parkinson-disease,https://post.medicalnewstoday.com/wp-content/uploads/sites/3/2023/10/Male-Serious-Alone-732x549-Thumbnail.jpg,2023-10-03T09:11:00Z,Researchers report that loneliness is associated with a higher risk of Parkinsons disease.They note that people with Parkinsons may worry that their illness will get in the way of fr… [+9501 chars],Medical News Today
Corrie Pelc,What role do micronutrients play in neurological health?,"A review of the existing evidence looks at how micronutrients including vitamin and minerals could help treat the symptoms of different neurological diseases, including Parkinson's and Alzheimer's.",https://www.medicalnewstoday.com/articles/can-micronutrients-help-treat-neurological-diseases-such-as-parkinsons,https://post.medicalnewstoday.com/wp-content/uploads/sites/3/2023/10/leafs-science-lab-micronutrients-732x549-thumbnail.jpg,2023-10-02T11:10:55Z,Micronutrients refer to all the vitamins and minerals a body needs to stay healthy and function correctly. More than 2 billion people globally are micronutrient deficient. N… [+7296 chars],Medical News Today
Eileen Bailey,Type 2 diabetes: Why diagnosis should be changed for women under 50,Researchers are recommending that the diagnosis threshold for type 2 diabetes be lowered for women under 50 because menstruation can affect blood sugar levels,https://www.medicalnewstoday.com/articles/type-2-diabetes-why-experts-say-diagnosis-threshold-should-be-lowered-for-women-under-50,https://post.medicalnewstoday.com/wp-content/uploads/sites/3/2023/10/Blood-Sugar-Doctor-732x549-Thumbnail.jpg,2023-10-02T07:53:00Z,"Researchers suggest lowering the threshold for a type 2 diabetes diagnosis in women under 50 might be more accurate.They say that women are less likely to receive treatment, risk-red… [+7394 chars]",Medical News Today


In [0]:
def transform_date(df: DataFrame) -> DataFrame:
    return df.withColumn("dia", dayofmonth(col("data")))\
             .withColumn("mes", month(col("data")))\
             .withColumn("ano", year(col("data")))\
             .drop("data")

In [0]:
data_new = transform_date(data_new)

In [0]:
display(data_new.head(5))

autor,titulo,descricao,url_artigo,imagem,html,fonte,dia,mes,ano
Corrie Pelc,Jamais vu: What happens in the brain when the familiar feels new?,"The phenomenon of jamais vu, when a familiar experience feels new, is a common occurrence, but why does it happen, and should it worry us when it does? Here is what the experts and the current research say.",https://www.medicalnewstoday.com/articles/jamais-vu-whatjamais-vu-the-opposite-of-deja-vu-why-does-the-familiar-feel-strangely-new-sometimesexplains-the-opposite-of-deja-vu,https://post.medicalnewstoday.com/wp-content/uploads/sites/3/2023/10/jamais_vu_GettyImages1192800429_Thumb.jpg,Many people have heard of and probably experienced déjà vu the strange feeling you have already seen or experienced something as you are seeing or experiencing it. But very few people know about the … [+9330 chars],Medical News Today,3,10,2023
Jessica Freeborn,Short bursts of exercise may lower the risk of heart attack and stroke,A recent study found that non-exercise physical activity was associated with a lower risk for major adverse cardiovascular events such as heart attack and stroke. Short bursts of physical activity among non-exercisers were also linked to a reduced risk of all…,https://www.medicalnewstoday.com/articles/short-bursts-exercise-heart-health,https://post.medicalnewstoday.com/wp-content/uploads/sites/3/2023/10/Female-Steps-Building-732x549-Thumbnail.jpg,A recent study found that non-exercise physical activity was associated with a lower risk for major adverse cardiovascular events.Short bursts of physical activity among non-exercise… [+6994 chars],Medical News Today,2,10,2023
Eileen Bailey,Parkinson's: How loneliness can affect progression of the disease,Researchers say they have established an association between loneliness and the development of Parkinson's disease. Experts say it's important for people with the condition to have friends and other social connections.,https://www.medicalnewstoday.com/articles/how-loneliness-may-be-associated-with-parkinson-disease,https://post.medicalnewstoday.com/wp-content/uploads/sites/3/2023/10/Male-Serious-Alone-732x549-Thumbnail.jpg,Researchers report that loneliness is associated with a higher risk of Parkinsons disease.They note that people with Parkinsons may worry that their illness will get in the way of fr… [+9501 chars],Medical News Today,3,10,2023
Corrie Pelc,What role do micronutrients play in neurological health?,"A review of the existing evidence looks at how micronutrients including vitamin and minerals could help treat the symptoms of different neurological diseases, including Parkinson's and Alzheimer's.",https://www.medicalnewstoday.com/articles/can-micronutrients-help-treat-neurological-diseases-such-as-parkinsons,https://post.medicalnewstoday.com/wp-content/uploads/sites/3/2023/10/leafs-science-lab-micronutrients-732x549-thumbnail.jpg,Micronutrients refer to all the vitamins and minerals a body needs to stay healthy and function correctly. More than 2 billion people globally are micronutrient deficient. N… [+7296 chars],Medical News Today,2,10,2023
Eileen Bailey,Type 2 diabetes: Why diagnosis should be changed for women under 50,Researchers are recommending that the diagnosis threshold for type 2 diabetes be lowered for women under 50 because menstruation can affect blood sugar levels,https://www.medicalnewstoday.com/articles/type-2-diabetes-why-experts-say-diagnosis-threshold-should-be-lowered-for-women-under-50,https://post.medicalnewstoday.com/wp-content/uploads/sites/3/2023/10/Blood-Sugar-Doctor-732x549-Thumbnail.jpg,"Researchers suggest lowering the threshold for a type 2 diabetes diagnosis in women under 50 might be more accurate.They say that women are less likely to receive treatment, risk-red… [+7394 chars]",Medical News Today,2,10,2023


#### Quantidade de notícias por ano, mês e dia de publicação

In [0]:
def count_per_day_month_year(df: DataFrame) -> DataFrame:
 return data_new.groupBy("ano", "mes", "dia").agg(count("*").alias("quantidade_por_data"))

In [0]:
df_count_date = count_per_day_month_year(data_new)

In [0]:
display(df_count_date.head(5))

ano,mes,dia,quantidade_por_data
2023,10,3,4
2023,10,2,5
2023,10,1,2


#### Quantidade de notícias por fonte e autor

In [0]:
def count_source_author(df: DataFrame) -> DataFrame:
    return df.groupBy("fonte", "autor").agg(count("*").alias("quantidade_por_fonte_autor"))

In [0]:
df_count_source_author = count_source_author(data_new)

In [0]:
display(df_count_source_author.head(5))

fonte,autor,quantidade_por_fonte_autor
Medical News Today,Jessica Freeborn,1
Medical News Today,Corrie Pelc,2
Medical News Today,Eileen Bailey,2
Medical News Today,Bob Curley,1
Medical News Today,Dan Gray,1


#### Quantidade de aparições de 3 palavras chaves por ano, mês e dia de publicação (as 3 palavras chaves serão as mesmas usadas para fazer os filtros de relevância do item 2 (2. Definir Critérios de Relevância))

In [0]:
def count_key_words(df: DataFrame, key_words: list[str]) -> DataFrame:
    for key_word in key_words:
        df = df.withColumn(f"{key_word}_quantidade_descricao", expr(f"size(filter(split(descricao, ' '), x -> x = '{key_word}'))").cast("int"))
        df.head(1)
    return df.groupBy("ano", "mes", "dia").agg(
        count("*").alias("total_de_aparicoes"),
        *([sum(f"{key_word}_quantidade_descricao").alias(f"{key_word}_aparicoes_descricao") for key_word in key_words])
        )

In [0]:
df_count_key_words = count_key_words(data_new, ["genomics"])

[0;31m---------------------------------------------------------------------------[0m
[0;31mTypeError[0m                                 Traceback (most recent call last)
File [0;32m<command-4215692481192661>:1[0m
[0;32m----> 1[0m df_count_key_words [38;5;241m=[39m [43mcount_key_words[49m[43m([49m[43mdata_new[49m[43m,[49m[43m [49m[43m[[49m[38;5;124;43m"[39;49m[38;5;124;43mgenomics[39;49m[38;5;124;43m"[39;49m[43m][49m[43m)[49m

File [0;32m<command-4215692481192660>:7[0m, in [0;36mcount_key_words[0;34m(df, key_words)[0m
[1;32m      3[0m     df [38;5;241m=[39m df[38;5;241m.[39mwithColumn([38;5;124mf[39m[38;5;124m"[39m[38;5;132;01m{[39;00mkey_word[38;5;132;01m}[39;00m[38;5;124m_quantidade_descricao[39m[38;5;124m"[39m, expr([38;5;124mf[39m[38;5;124m"[39m[38;5;124msize(filter(split(descricao, [39m[38;5;124m'[39m[38;5;124m [39m[38;5;124m'[39m[38;5;124m), x -> x = [39m[38;5;124m'[39m[38;5;132;01m{[39;00mkey_word[38;5;132;

#### Função Principal da Parte 2

Path da Processing Zone

In [0]:
path_processing = f"/FileStore/processing/processing_zone_{year}_{month}.parquet"
path_count_per_day_month_year = f"/FileStore/processing/count_per_day_month_year_{year}_{month}.parquet"
path_count_source_author = f"/FileStore/processing/count_source_author_{year}_{month}.parquet"

In [0]:
def main_step_2(loop:bool = True) -> None:
    first_loop_step_2 = True
    loop_step_2 = 0
    while loop:
        main_step_1(loop=False)
        if first_loop_step_2 == True or loop_step_2 == 24: 
            print("extract raw zone")
            data = spark.read.parquet(parquet_raw)
            print("processing zone")
            data_processing = rename_columns(data)
            data_processing = transform_date(data_processing)
            load(data_processing, path_processing)
            print("count per day, month and year")
            data_count_per_day_month_year = count_per_day_month_year(data_processing)
            load(data_count_per_day_month_year, path_count_per_day_month_year)
            print("count source author")
            data_count_source_author = count_source_author(data_processing)
            load(data_count_source_author, path_count_source_author)
            print("end")
            first_loop_step_2 = False
            loop_step_2 = 0
        loop_step_2 += 1
        time_sleep = 3600
        time.sleep(time_sleep)

In [0]:
main_step_2()

extract raw zone
processing zone
data not search, first process
count per day, month and year
data not search, first process
count source author
data not search, first process
end


Parte 2 - Testes

In [0]:
data_processing = spark.read.parquet(path_processing)
display(data_processing.head(5))

autor,titulo,descricao,url_artigo,imagem,html,fonte,dia,mes,ano
Corrie Pelc,Jamais vu: What happens in the brain when the familiar feels new?,"The phenomenon of jamais vu, when a familiar experience feels new, is a common occurrence, but why does it happen, and should it worry us when it does? Here is what the experts and the current research say.",https://www.medicalnewstoday.com/articles/jamais-vu-whatjamais-vu-the-opposite-of-deja-vu-why-does-the-familiar-feel-strangely-new-sometimesexplains-the-opposite-of-deja-vu,https://post.medicalnewstoday.com/wp-content/uploads/sites/3/2023/10/jamais_vu_GettyImages1192800429_Thumb.jpg,Many people have heard of and probably experienced déjà vu the strange feeling you have already seen or experienced something as you are seeing or experiencing it. But very few people know about the … [+9330 chars],Medical News Today,3,10,2023
Jessica Freeborn,Short bursts of exercise may lower the risk of heart attack and stroke,A recent study found that non-exercise physical activity was associated with a lower risk for major adverse cardiovascular events such as heart attack and stroke. Short bursts of physical activity among non-exercisers were also linked to a reduced risk of all…,https://www.medicalnewstoday.com/articles/short-bursts-exercise-heart-health,https://post.medicalnewstoday.com/wp-content/uploads/sites/3/2023/10/Female-Steps-Building-732x549-Thumbnail.jpg,A recent study found that non-exercise physical activity was associated with a lower risk for major adverse cardiovascular events.Short bursts of physical activity among non-exercise… [+6994 chars],Medical News Today,2,10,2023
Mandy French,"Feeling of impending doom: Meaning, causes, and more",A feeling of impending doom is a sense that something tragic or life threatening is about to happen. Learn more here.,https://www.medicalnewstoday.com/articles/feeling-of-impending-doom,https://post.medicalnewstoday.com/wp-content/uploads/sites/3/2023/08/umbrella-rain-impending-doom-732x549-thumbnail.jpg,A feeling of impending doom is a sense that something tragic or life threatening is about to happen. It can occur due to mental health or medical conditions. People may describe a feeling of impend… [+5780 chars],Medical News Today,1,10,2023
Eileen Bailey,Parkinson's: How loneliness can affect progression of the disease,Researchers say they have established an association between loneliness and the development of Parkinson's disease. Experts say it's important for people with the condition to have friends and other social connections.,https://www.medicalnewstoday.com/articles/how-loneliness-may-be-associated-with-parkinson-disease,https://post.medicalnewstoday.com/wp-content/uploads/sites/3/2023/10/Male-Serious-Alone-732x549-Thumbnail.jpg,Researchers report that loneliness is associated with a higher risk of Parkinsons disease.They note that people with Parkinsons may worry that their illness will get in the way of fr… [+9501 chars],Medical News Today,3,10,2023
Corrie Pelc,What role do micronutrients play in neurological health?,"A review of the existing evidence looks at how micronutrients including vitamin and minerals could help treat the symptoms of different neurological diseases, including Parkinson's and Alzheimer's.",https://www.medicalnewstoday.com/articles/can-micronutrients-help-treat-neurological-diseases-such-as-parkinsons,https://post.medicalnewstoday.com/wp-content/uploads/sites/3/2023/10/leafs-science-lab-micronutrients-732x549-thumbnail.jpg,Micronutrients refer to all the vitamins and minerals a body needs to stay healthy and function correctly. More than 2 billion people globally are micronutrient deficient. N… [+7296 chars],Medical News Today,2,10,2023


In [0]:
data_count_per_day_month_year = spark.read.parquet(path_count_per_day_month_year)
display(data_count_per_day_month_year.head(5))

ano,mes,dia,quantidade_por_data
2023,10,3,4
2023,10,2,5
2023,10,1,2


In [0]:
data_count_source_author = spark.read.parquet(path_count_source_author)
display(data_count_source_author.head(5))

fonte,autor,quantidade_por_fonte_autor
Medical News Today,Jessica Freeborn,1
Medical News Today,Corrie Pelc,2
Medical News Today,Eileen Bailey,2
Medical News Today,Bob Curley,1
Medical News Today,Dan Gray,1


### Parte 3 - Extração e Carregamento em Streaming (Kafka)

Instalando o Kafka

In [0]:
%sh
sudo wget https://downloads.apache.org/kafka/3.5.1/kafka_2.12-3.5.1.tgz

--2023-10-04 22:23:39--  https://downloads.apache.org/kafka/3.5.1/kafka_2.12-3.5.1.tgz
Resolving downloads.apache.org (downloads.apache.org)... 88.99.95.219, 135.181.214.104, 2a01:4f9:3a:2c57::2, ...
Connecting to downloads.apache.org (downloads.apache.org)|88.99.95.219|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 106956505 (102M) [application/x-gzip]
Saving to: ‘kafka_2.12-3.5.1.tgz’

     0K .......... .......... .......... .......... ..........  0%  140K 12m23s
    50K .......... .......... .......... .......... ..........  0%  226K 10m3s
   100K .......... .......... .......... .......... ..........  0%  369K 8m16s
   150K .......... .......... .......... .......... ..........  0% 1.13M 6m34s
   200K .......... .......... .......... .......... ..........  0%  373K 6m11s
   250K .......... .......... .......... .......... ..........  0% 17.6M 5m10s
   300K .......... .......... .......... .......... ..........  0% 1.16M 4m38s
   350K .......... .........

In [0]:
%sh
tar -xvf kafka_2.12-3.5.1.tgz

kafka_2.12-3.5.1/
kafka_2.12-3.5.1/LICENSE
kafka_2.12-3.5.1/NOTICE
kafka_2.12-3.5.1/bin/
kafka_2.12-3.5.1/bin/connect-distributed.sh
kafka_2.12-3.5.1/bin/connect-mirror-maker.sh
kafka_2.12-3.5.1/bin/connect-standalone.sh
kafka_2.12-3.5.1/bin/kafka-acls.sh
kafka_2.12-3.5.1/bin/kafka-broker-api-versions.sh
kafka_2.12-3.5.1/bin/kafka-cluster.sh
kafka_2.12-3.5.1/bin/kafka-configs.sh
kafka_2.12-3.5.1/bin/kafka-console-consumer.sh
kafka_2.12-3.5.1/bin/kafka-console-producer.sh
kafka_2.12-3.5.1/bin/kafka-consumer-groups.sh
kafka_2.12-3.5.1/bin/kafka-consumer-perf-test.sh
kafka_2.12-3.5.1/bin/kafka-delegation-tokens.sh
kafka_2.12-3.5.1/bin/kafka-delete-records.sh
kafka_2.12-3.5.1/bin/kafka-dump-log.sh
kafka_2.12-3.5.1/bin/kafka-e2e-latency.sh
kafka_2.12-3.5.1/bin/kafka-features.sh
kafka_2.12-3.5.1/bin/kafka-get-offsets.sh
kafka_2.12-3.5.1/bin/kafka-jmx.sh
kafka_2.12-3.5.1/bin/kafka-leader-election.sh
kafka_2.12-3.5.1/bin/kafka-log-dirs.sh
kafka_2.12-3.5.1/bin/kafka-metadata-quorum.sh
kafka_2.1

Instalando o Zookeeper

In [0]:
%sh
./kafka_2.12-3.5.1/bin/zookeeper-server-start.sh ./kafka_2.12-3.5.1/config/zookeeper.properties

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/databricks/jars/----ws_3_3--mvn--hadoop3--org.apache.logging.log4j--log4j-slf4j-impl--org.apache.logging.log4j__log4j-slf4j-impl__2.18.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/databricks/driver/kafka_2.12-3.5.1/libs/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
[2023-10-04 22:24:13,778] INFO Reading configuration from: ./kafka_2.12-3.5.1/config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2023-10-04 22:24:13,828] INFO clientPortAddress is 0.0.0.0:2181 (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2023-10-04 22:24:13,829] INFO secureClientPort is not set (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2023-10-04 22:24:13,829] INFO obse

Chamada do Server do Kafka

In [0]:
%sh
./kafka_2.12-3.5.1/bin/kafka-server-start.sh ./kafka_2.12-3.5.1/config/server.properties

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/databricks/jars/----ws_3_3--mvn--hadoop3--org.apache.logging.log4j--log4j-slf4j-impl--org.apache.logging.log4j__log4j-slf4j-impl__2.18.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/databricks/driver/kafka_2.12-3.5.1/libs/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
[2023-10-04 22:25:38,613] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2023-10-04 22:25:39,604] INFO Setting -D jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated TLS renegotiation (org.apache.zookeeper.common.X509Util)
[2023-10-04 22:25:39,902] INFO Registered signal handlers for TERM, INT, HUP (org.apache.kafka.common.utils.LoggingSignalHandler

Criando um tópico no Kafka

In [0]:
%sh
./kafka_2.12-3.5.1/bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic primeiro_topico --partitions 1 --replication-factor 1

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/databricks/jars/----ws_3_3--mvn--hadoop3--org.apache.logging.log4j--log4j-slf4j-impl--org.apache.logging.log4j__log4j-slf4j-impl__2.18.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/databricks/driver/kafka_2.12-3.5.1/libs/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Created topic primeiro_topico.


Executando o console para consumir os dados do tópico criado

In [0]:
%sh
./kafka_2.12-3.5.1/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic primeiro_topico --from-beginning

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/databricks/jars/----ws_3_3--mvn--hadoop3--org.apache.logging.log4j--log4j-slf4j-impl--org.apache.logging.log4j__log4j-slf4j-impl__2.18.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/databricks/driver/kafka_2.12-3.5.1/libs/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]


Instalação da biblioteca "kafka-python"

In [0]:
%sh
pip install --upgrade pip
pip install kafka-python

Collecting pip
  Using cached pip-23.2.1-py3-none-any.whl (2.1 MB)
Installing collected packages: pip
  Attempting uninstall: pip
    Found existing installation: pip 21.2.4
    Uninstalling pip-21.2.4:
      Successfully uninstalled pip-21.2.4
Successfully installed pip-23.2.1
Collecting kafka-python
  Using cached kafka_python-2.0.2-py2.py3-none-any.whl (246 kB)
Installing collected packages: kafka-python
Successfully installed kafka-python-2.0.2


Criando o produto para o tópico criado anteriormente - Kafka Producer

In [0]:
import json

from kafka import KafkaProducer

In [0]:
def streaming(df: DataFrame) -> None:
    producer = KafkaProducer(bootstrap_servers=["localhost:9092"])
    topic = "primeiro_topico"

    # json_data = df.toJSON().collect()
    json_data = df
    mensagem = json.dumps(json_data[0]).encode('utf-8')

    producer.send(topic, mensagem)

In [0]:
df = extract()
for index in range(1, len(df.collect()) + 1):
    linha_selecionada = df.head(index)
    streaming(linha_selecionada)

Consumindo os arquivos do Spark Streaming

In [0]:
dbutils.fs.ls("/FileStore/streaming_data/")

Out[79]: [FileInfo(path='dbfs:/FileStore/streaming_data/data/', name='data/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/streaming_data/data.parquet/', name='data.parquet/', size=0, modificationTime=0)]

In [0]:
data_streaming = spark.read.parquet("/FileStore/streaming_data/")

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-4215692481192687>:1[0m
[0;32m----> 1[0m data_streaming [38;5;241m=[39m [43mspark[49m[38;5;241;43m.[39;49m[43mread[49m[38;5;241;43m.[39;49m[43mparquet[49m[43m([49m[38;5;124;43m"[39;49m[38;5;124;43m/FileStore/streaming_data/data[39;49m[38;5;124;43m"[39;49m[43m)[49m

File [0;32m/databricks/spark/python/pyspark/instrumentation_utils.py:48[0m, in [0;36m_wrap_function.<locals>.wrapper[0;34m(*args, **kwargs)[0m
[1;32m     46[0m start [38;5;241m=[39m time[38;5;241m.[39mperf_counter()
[1;32m     47[0m [38;5;28;01mtry[39;00m:
[0;32m---> 48[0m     res [38;5;241m=[39m [43mfunc[49m[43m([49m[38;5;241;43m*[39;49m[43margs[49m[43m,[49m[43m [49m[38;5;241;43m*[39;49m[38;5;241;43m*[39;49m[43mkwargs[49m[43m)[49m
[1;32m     49[0m     logger[38;5;241m

In [0]:
data_streaming.head(5)

Out[143]: [Row(id='1', name=None, author=None, title=None, description=None, url=None, urlToImage=None, publishedAt=None, content=None),
 Row(id='1', name=None, author=None, title=None, description=None, url=None, urlToImage=None, publishedAt=None, content=None),
 Row(id='1', name=None, author=None, title=None, description=None, url=None, urlToImage=None, publishedAt=None, content=None),
 Row(id='1', name=None, author=None, title=None, description=None, url=None, urlToImage=None, publishedAt=None, content=None),
 Row(id='1', name=None, author=None, title=None, description=None, url=None, urlToImage=None, publishedAt=None, content=None)]