Neste notebook é feita a configuração do KafkaConsumer, que recebe as informações enviadas pelo Webhook e as salva na mesma tabela em que são salvos os resultados das requisições automáticas.

In [0]:
pip install kafka-python

Python interpreter will be restarted.
Collecting kafka-python
  Using cached kafka_python-2.0.2-py2.py3-none-any.whl (246 kB)
Installing collected packages: kafka-python
Successfully installed kafka-python-2.0.2
Python interpreter will be restarted.


In [0]:
import kafka
from kafka import KafkaConsumer
import requests
import json
import time
import urllib.parse
from datetime import datetime, timedelta
from pyspark.sql.types import StructType, StructField, StringType, MapType
from pyspark.sql.functions import col 
import json

In [0]:
# Minutes of the hour at which the Kafka report is consumed, kept as strings
# because the scheduler compares them against str(datetime.now().minute).
# Every 5-minute mark is included except 0 and 30.
# NOTE(review): presumably 0 and 30 are left to the automatic requests — confirm.
MINUTES_TO_RUN_KAFKA_REPORT = [str(minute) for minute in range(5, 60, 5) if minute != 30]

# Schema applied to each article dict when building the Spark DataFrame.
# All fields are nullable strings, except "source", which is a nullable
# string -> string map.
schema = (
    StructType()
    .add('author', StringType(), True)
    .add('content', StringType(), True)
    .add('description', StringType(), True)
    .add('publishedAt', StringType(), True)
    .add('source', MapType(StringType(), StringType(), True), True)
    .add('title', StringType(), True)
    .add('url', StringType(), True)
    .add('urlToImage', StringType(), True)
)

def data_save():
    """Read the 'responseManualRequest' Kafka topic and collect its articles.

    A consumer is opened against the broker at localhost:9092 with
    auto_offset_reset='earliest' and consumer_timeout_ms=10000, so iteration
    ends once no message has arrived for 10 seconds. The consumer is always
    closed before returning, even if decoding a message fails.

    Each message value is expected to be UTF-8 JSON holding an object with an
    'articles' list. The payload may be JSON-encoded once or twice (an object
    serialised again as a JSON string); both forms are accepted. Messages
    whose decoded payload is not an object, or that carry no 'articles'
    entry, contribute nothing to the result.

    NOTE(review): no group_id is configured, so the topic is presumably
    re-read from the beginning on every call — confirm this is intended.

    Returns:
        list: the 'articles' entries of every message read, concatenated in
        arrival order. Empty when the topic yields no messages.

    Raises:
        json.JSONDecodeError: if a message value is not valid JSON.
        UnicodeDecodeError: if a message value is not valid UTF-8.
    """
    consumer = KafkaConsumer(
        'responseManualRequest',
        bootstrap_servers=['localhost:9092'],
        auto_offset_reset='earliest',
        consumer_timeout_ms=10000,
    )
    kafka_consumer_report = []
    try:
        for message in consumer:
            report_dict = json.loads(message.value.decode())
            # A doubly-encoded payload decodes to a str on the first pass;
            # decode once more to reach the actual object.
            if isinstance(report_dict, str):
                report_dict = json.loads(report_dict)
            # Skip payloads without articles instead of raising: a single
            # such message would otherwise fail every call that re-reads it.
            if isinstance(report_dict, dict):
                kafka_consumer_report.extend(report_dict.get('articles') or [])
    finally:
        # Release the broker connection; this function is called repeatedly.
        consumer.close()

    return kafka_consumer_report

# Scheduler loop: runs forever. On each minute listed in
# MINUTES_TO_RUN_KAFKA_REPORT it consumes the Kafka topic, keeps only the
# articles not already stored, and appends them to 'table_articles'.
while True:
    if str(datetime.now().minute) not in MINUTES_TO_RUN_KAFKA_REPORT:
        # Not a scheduled minute: wait briefly rather than spinning the CPU.
        # One second is short enough that a scheduled minute cannot be missed.
        time.sleep(1)
        continue

    kafka_consumer_report = data_save()
    df_report_kafka = spark.createDataFrame(kafka_consumer_report, schema=schema)
    # Overwrite the map-typed "source" column with a string column holding
    # only the source name (the source id is discarded).
    df_report_kafka = df_report_kafka.withColumn('source', col('source.name'))

    if spark.catalog.tableExists('table_articles'):
        # Remove every article that is already stored in the table.
        df_temp = spark.read.parquet('/FileStore/tables/table_articles')
        df_report_kafka = df_report_kafka.subtract(df_temp)

    df_report_kafka = df_report_kafka.filter(~col('source').contains('[Removed]'))
    df_report_kafka = df_report_kafka.dropDuplicates()

    # DataFrames are lazy: counting after the append would re-run the whole
    # plan. Materialise once so the reported count is exactly what is written.
    df_report_kafka = df_report_kafka.cache()
    new_articles_count = df_report_kafka.count()

    df_report_kafka.write.mode('append').format('parquet').option(
        'path', '/FileStore/tables/table_articles').saveAsTable('table_articles')
    df_report_kafka.unpersist()

    print(f'Data consumed! Total NEW articles found: {new_articles_count}')
    print(f'New articles loaded in [/FileStore/tables/table_articles]')
    print('---------------------------------------------------------')

    # Sleep past the end of the current minute so the same scheduled minute
    # is not processed twice.
    time.sleep(100) # 100 seconds


Data consumed! Total NEW articles found: 1
New articles loaded in [/FileStore/tables/table_articles]
---------------------------------------------------------
Data consumed! Total NEW articles found: 0
New articles loaded in [/FileStore/tables/table_articles]
---------------------------------------------------------
Data consumed! Total NEW articles found: 0
New articles loaded in [/FileStore/tables/table_articles]
---------------------------------------------------------
