In [0]:
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import IntegerType, StringType, StructType, DateType


# Streaming DataFrame backed by the Kafka topic "1st_topic_news" on the
# local broker. Source options are collected in one mapping so they can be
# read at a glance:
#   - failOnDataLoss="false": presumably keeps the query running when offsets
#     are no longer available — confirm against the Kafka integration guide.
#   - startingOffsets="earliest": read the topic from its earliest offsets.
df = (
    spark.readStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": "localhost:9092",
        "subscribe": "1st_topic_news",
        "failOnDataLoss": "false",
        "startingOffsets": "earliest",
    })
    .load()
)

# Schema of the streaming result table, derived from the expected structure
# of the Kafka message. Every field is declared as a string (including
# "publishedAt"), in the order listed below.
schema = StructType()
for _field_name in (
    "source",
    "sourceId",
    "author",
    "title",
    "description",
    "url",
    "urlToImage",
    "publishedAt",
    "content",
    "termo",
):
    schema.add(_field_name, StringType())

# Translate each Kafka message into typed columns using the schema above.
df_res = (
    # Keep only the record value, cast to a string.
    df.selectExpr("CAST(value AS STRING)")
    # Parse that string as JSON into a single struct column named "data".
    .select(from_json(col("value"), schema).alias("data"))
    # Promote the struct's fields to top-level columns.
    .select("data.*")
)

# Append the parsed rows to the table HEALTHGEN.tb_news and block until the
# streaming query terminates.
# NOTE(review): the original comment called this a "delta table", but the sink
# format below is "parquet" — confirm which format is intended.
# NOTE(review): "failOnDataLoss" is documented as a Kafka *source* option;
# presumably it has no effect on this writer — confirm before removing.
(
    df_res.writeStream
    .format("parquet")
    .outputMode("append")
    .option("checkpointLocation", "/FileStore/tables/projeto/checkpoint")
    .option("failOnDataLoss", "false")
    .toTable('HEALTHGEN.tb_news')
    .awaitTermination()
)
