In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, explode, expr
from pyspark.sql.types import *

# Schema of one "people" JSON message: scalar id/name/age, a nested contacts
# struct (email + list of phones), a list of skills, and a list of
# {name, duration} projects. Every field is nullable.
myschema = (
    StructType()
    .add("id", LongType(), True)
    .add("name", StringType(), True)
    .add("age", LongType(), True)
    .add(
        "contacts",
        StructType()
        .add("email", StringType(), True)
        .add("phones", ArrayType(StringType()), True),
        True,
    )
    .add("skills", ArrayType(StringType()), True)
    .add(
        "projects",
        ArrayType(
            StructType()
            .add("name", StringType(), True)
            .add("duration", StringType(), True)
        ),
        True,
    )
)

# Local Spark session that pulls the Kafka connector for Spark 3.3 / Scala 2.12.
# NOTE: spark.jars.packages is only honoured when the session is first created.
# If PySpark logs "Using an existing Spark session; only runtime SQL
# configurations will take effect", restart the kernel so the package loads.
spark = SparkSession.builder \
    .appName("Kafka to CSV") \
    .master("local[*]") \
    .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0') \
    .getOrCreate()

# Streaming source: subscribe to the "people" topic on the broker at kafka:9092,
# replaying from the earliest retained offset.
kafka_df = spark.readStream.format("kafka").options(**{
    "kafka.bootstrap.servers": "kafka:9092",
    "subscribe": "people",
    "startingOffsets": "earliest",
}).load()

# Kafka delivers the payload as binary: decode it to text, then parse it with
# myschema and promote the parsed fields to top-level columns.
json_df = kafka_df.select(col("value").cast("string").alias("json_string"))
parsed_df = json_df.select(from_json(col("json_string"), myschema).alias("data")).select("data.*")

# Flatten to columns: id, name, age, email, phone, skill, project_name, project_duration.
# NOTE(review): the three explodes multiply, so one person yields
# len(phones) * len(skills) * len(projects) rows. explode() also drops the person
# entirely when any of these arrays is null or empty; use explode_outer if such
# records must be kept.
df_flat = (
    parsed_df
    .withColumn("email", col("contacts.email"))
    .withColumn("phone", explode(col("contacts.phones")))
    .withColumn("skill", explode(col("skills")))
    .withColumn("project", explode(col("projects")))
    .withColumn("project_name", col("project.name"))
    .withColumn("project_duration", col("project.duration"))
    .drop("contacts", "skills", "projects", "project")
)

# Append the flattened rows as CSV files under data/output/.
# awaitTermination() blocks this cell (and the kernel) until the query stops,
# so interrupt the kernel before running later cells.
(
    df_flat.writeStream
    .outputMode("append")
    .format("csv")
    .options(checkpointLocation="checkpoint_dir_csv")
    .start(path="data/output/")
    .awaitTermination()
)


25/04/11 10:22:11 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
25/04/11 10:22:12 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


                                                                                

In [5]:
# Streaming source for the "people" topic, read from the earliest retained offset.
# Kafka exposes key/value as binary columns (see the schema printed below).
kafka_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "people") \
    .option("startingOffsets", "earliest") \
    .load()

In [6]:
# Inspect the raw Kafka source columns: key and value arrive as binary and must be
# cast to string before JSON parsing.
kafka_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [None]:
from pyspark.sql.functions import expr, explode, col,from_json
from pyspark.sql.types import StructType, StructField, StringType, LongType, ArrayType

# Replace the binary Kafka payload column with its string decoding (same column name).
kafka_json_df = kafka_df.withColumn("value", col("value").cast("string"))

# Schema of one person record: id/name/age scalars, a nested contacts struct,
# a skills list and a list of {name, duration} projects; all fields nullable.
myschema = (
    StructType()
    .add("id", LongType(), nullable=True)
    .add("name", StringType(), nullable=True)
    .add("age", LongType(), nullable=True)
    .add("contacts",
         StructType()
         .add("email", StringType(), nullable=True)
         .add("phones", ArrayType(StringType()), nullable=True),
         nullable=True)
    .add("skills", ArrayType(StringType()), nullable=True)
    .add("projects",
         ArrayType(StructType()
                   .add("name", StringType(), nullable=True)
                   .add("duration", StringType(), nullable=True)),
         nullable=True)
)

# Parse each JSON string with myschema and expose its fields as top-level columns.
streaming_df = (
    kafka_json_df
    .select(from_json(col("value"), myschema).alias("data"))
    .select("data.*")
)

# Count how many people list each skill: one row per (person, skill) pair after
# the explode, then one row per distinct skill after the groupBy.
skill_counts = (
    streaming_df
    .withColumn("skill", explode("skills"))
    .groupBy("skill")
    .count()
)

# Dedicated output and checkpoint locations. "data/output/" already belongs to the
# flattened-people file sink, and overwriting it would destroy that sink's output
# and _spark_metadata. A fresh checkpoint avoids resuming from whatever earlier
# query may have used "checkpoint_dir_kafka" (NOTE(review): delete the old dir if unused).
SKILL_COUNTS_PATH = "data/output_skill_counts/"
SKILL_COUNTS_CHECKPOINT = "checkpoint_dir_skill_counts"


def write_skill_counts(batch_df, batch_id):
    """Overwrite the CSV snapshot with the latest complete skill counts.

    Streaming file sinks (csv, parquet, ...) only support append output mode,
    and an un-watermarked aggregation cannot be emitted in append mode. So the
    query runs in complete mode, and each micro-batch's full result table is
    written with the batch writer instead.

    Args:
        batch_df: the complete aggregated result table for this trigger.
        batch_id: micro-batch id supplied by Structured Streaming (unused).
    """
    (
        batch_df
        .coalesce(1)  # one row per distinct skill: small enough for a single file
        .write
        .mode("overwrite")
        .option("header", True)
        .csv(SKILL_COUNTS_PATH)
    )


# Complete mode re-emits the whole aggregation table on every trigger.
# Keep the StreamingQuery handle: awaitTermination() returns None, not the query.
query = (
    skill_counts
    .writeStream
    .outputMode("complete")
    .foreachBatch(write_skill_counts)
    .option("checkpointLocation", SKILL_COUNTS_CHECKPOINT)
    .start()
)

query.awaitTermination()
