In [23]:
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
import logging
from pyspark.sql.types import StringType, StructField, StructType, ArrayType, LongType,TimestampType

In [52]:
def spark_connection(
    app_name="Spark Streaming",
    master="local[*]",
    kafka_package="org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0",
    shuffle_partitions=6,
):
    """Build (or reuse via ``getOrCreate``) a SparkSession with the Kafka connector.

    Parameters
    ----------
    app_name : str
        Application name passed to ``builder.appName``.
    master : str
        Spark master URL; the default uses all local cores.
    kafka_package : str
        Maven coordinate set as ``spark.jars.packages``.
        NOTE(review): the Scala (2.12) / Spark (3.3.0) versions in this
        coordinate presumably must match the installed Spark runtime — confirm.
    shuffle_partitions : int
        Value for ``spark.sql.shuffle.partitions``.

    Returns
    -------
    SparkSession or None
        The session, or ``None`` if building it raised. The error is logged
        rather than re-raised so callers can test the result for ``None``.
    """
    try:
        return (
            SparkSession
            .builder
            .appName(app_name)
            .config("spark.streaming.stopGracefullyOnShutdown", "true")
            .config("spark.sql.shuffle.partitions", shuffle_partitions)
            .config("spark.jars.packages", kafka_package)
            .master(master)
            .getOrCreate()
        )
    except Exception as e:
        # Log (like kafka_connection does) instead of print, but keep the
        # best-effort None return the __main__ driver relies on.
        logging.error("Error initializing Spark session: %s", e)
        return None

In [53]:
def kafka_connection(spark, bootstrap_servers="ed-kafka:29092", topic="business",
                     starting_offsets="earliest"):
    """Load a Kafka topic into a DataFrame using a batch read (``spark.read``).

    Note this is a one-off batch read, not ``spark.readStream``.

    Parameters
    ----------
    spark : SparkSession
        Session to read with (e.g. the result of ``spark_connection``).
    bootstrap_servers : str
        Value for the ``kafka.bootstrap.servers`` option.
    topic : str
        Topic name passed to the ``subscribe`` option.
    starting_offsets : str
        Value for the ``startingOffsets`` option.

    Returns
    -------
    DataFrame or None
        The raw Kafka DataFrame, or ``None`` if building it raised (including
        when ``spark`` is ``None``). The error is logged, not re-raised.
    """
    try:
        kafka_read = (
            spark
            .read
            .format("kafka")
            .option("kafka.bootstrap.servers", bootstrap_servers)
            .option("subscribe", topic)
            .option("startingOffsets", starting_offsets)
            .load()
        )
    except Exception as e:
        # A failed read yields no data, so log at ERROR rather than WARNING.
        logging.error("Error creating Kafka DataFrame: %s", e)
        return None

    logging.info("Kafka DataFrame created successfully")
    return kafka_read

In [54]:
def create_struct(kafka, schema=None):
    """Parse each Kafka message ``value`` as JSON and flatten it into columns.

    Parameters
    ----------
    kafka : DataFrame
        DataFrame with a ``value`` column, e.g. the result of
        ``kafka_connection``. NOTE(review): ``value`` is assumed to hold
        JSON text once cast to string — confirm against the producer.
    schema : StructType, optional
        Schema handed to ``from_json``. Defaults to the news-article schema
        below (id, title, description, url, author, image, language,
        category list, published timestamp).

    Returns
    -------
    DataFrame
        One column per top-level field of ``schema``.
    """
    if schema is None:
        # NOTE(review): from_json is believed to treat all fields as nullable,
        # so nullable=False on "id" is likely not enforced — verify.
        schema = StructType([
            StructField("id", StringType(), nullable=False),
            StructField("title", StringType(), nullable=True),
            StructField("description", StringType(), nullable=True),
            StructField("url", StringType(), nullable=True),
            StructField("author", StringType(), nullable=True),
            StructField("image", StringType(), nullable=True),
            StructField("language", StringType(), nullable=True),
            StructField("category", ArrayType(StringType()), nullable=True),
            StructField("published", TimestampType(), nullable=True),
        ])

    return (
        kafka
        # Cast the payload to string in place instead of via a SQL expr string.
        .select(from_json(col("value").cast("string"), schema).alias("data"))
        .select("data.*")
    )

In [60]:
if __name__ == "__main__":
    spark_conn = spark_connection()

    if spark_conn is not None:
        kafka_frame = kafka_connection(spark_conn)
        # kafka_connection returns None (after logging the cause) when the read
        # fails; guard here instead of crashing inside create_struct.
        if kafka_frame is not None:
            create_struct(kafka_frame).show()

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------+-------------------+-------------------+
|                  id|               title|         description|                 url|              author|               image|language|           category|          published|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------+-------------------+-------------------+
|ec2e5a3e-d398-4cb...|Uttarakhand: 3 Ki...|Three people died...|https://www.ndtvp...|                 PTI|https://media.ass...|      en|[general, business]|2024-07-21 09:40:58|
|3b9c3c9d-5b33-4df...|Tata Technologies...|NDTV Profit’s spe...|https://www.ndtvp...|    Icici Securities|https://media.ass...|      en|[general, business]|2024-07-21 09:37:22|
|6708efb3-b3ad-4ce...|JSW Steel Q1 Resu...|NDTV Profit’s spe...|https://www.ndtvp...|Motilal Oswal Fin...|https://m