#### 1. Instalação de Bibliotecas

In [0]:
pip install kafka-python

Python interpreter will be restarted.
Collecting kafka-python
  Using cached kafka_python-2.0.2-py2.py3-none-any.whl (246 kB)
Installing collected packages: kafka-python
Successfully installed kafka-python-2.0.2
Python interpreter will be restarted.


#### 2. Imports

In [0]:
from kafka import KafkaConsumer
import json
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
from pyspark.sql.utils import AnalysisException

#### 3. Constantes

In [0]:
topico = 'meu-topico'

path = '/Filestore/projeto/dados_raw'

schema = StructType([
        StructField("source", StructType((
            StructField("id", StringType(), True),
            StructField("name", StringType(), True)
        )), True),
        StructField("author", StringType(), True),
        StructField("title", StringType(), True),
        StructField("description", StringType(), True),
        StructField("url", StringType(), True),
        StructField("urlToImage", StringType(), True),
        StructField("publishedAt", StringType(), True),
        StructField("content", StringType(), True)])

#### 4. Persistência dos dados

In [0]:
def obter_dados_nao_salvos(df):
  try:
    df_leitura = spark.read.parquet(path)
    df_left_join = df.alias("df1").join(df_leitura.alias("df2"), on="url", how="left")
    left_exclude = df_left_join.filter(col("df2.url").isNull()).select("df1.*")
    return left_exclude
  except AnalysisException:
    return df

def processamento_raw(df):
  df = df.dropDuplicates(['url'])
  df = df.withColumn('data_processamento', current_timestamp())
  return df

def save_data(df):
  df.write.format('delta').mode('append').parquet(path)
  print(f"{df.count()} novas notícias foram persistidas")

#### 5. Recebe os dados e persiste no arquivo parquet a cada 50 mensagens lidas

In [0]:
# Função de desserialização
def deserializer(message):
    return json.loads(message.decode('utf-8'))

# Criando o consumidor no grupo "grupo-de-consumo-1"
consumer = KafkaConsumer(
    topico,
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='latest',
    # group_id='grupo-de-consumo-1',  # Definindo o grupo de consumidores
    value_deserializer=deserializer
)

mensagens = []
contador = 0
for message in consumer:
    contador += 1
    valor = message.value
    mensagens.append(valor)
    
    if contador == 50:
        contador = 0
        df = spark.createDataFrame(mensagens, schema)
        df_nova_noticia = obter_dados_nao_salvos(processamento_raw(df))
        save_data(df_nova_noticia)

# Fechando o consumer
consumer.close()

49 novas notícias foram persistidas
47 novas notícias foram persistidas
50 novas notícias foram persistidas
188 novas notícias foram persistidas
0 novas notícias foram persistidas
0 novas notícias foram persistidas
0 novas notícias foram persistidas
0 novas notícias foram persistidas
0 novas notícias foram persistidas
0 novas notícias foram persistidas
0 novas notícias foram persistidas
