In [1]:
# Create (or reuse) a local Spark session with the Kafka connector on the classpath.
from pyspark.sql import SparkSession

# NOTE(review): the connector coordinate pins Scala 2.12 / version 3.3.0; it presumably
# has to match the installed Spark build — confirm before upgrading Spark.
KAFKA_CONNECTOR_PACKAGE = "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0"

spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Streaming from Kafka")
    .config("spark.jars.packages", KAFKA_CONNECTOR_PACKAGE)
    # Small local run: 4 shuffle partitions instead of the default 200.
    .config("spark.sql.shuffle.partitions", 4)
    # NOTE(review): spark.streaming.* settings target the legacy DStream API; confirm
    # whether this has any effect on the Structured Streaming query used below.
    .config("spark.streaming.stopGracefullyOnShutdown", True)
    .getOrCreate()
)

spark

In [21]:
# Read the `customer_data` Kafka topic as an unbounded streaming DataFrame.
# "latest" means a brand-new query only sees records produced after it starts;
# once a checkpoint exists, the query resumes from the checkpointed offsets instead.
kafka_source_options = {
    "kafka.bootstrap.servers": "ed-kafka:29092",
    "subscribe": "customer_data",
    "startingOffsets": "latest",
}

kafka_df = (
    spark.readStream
    .format("kafka")
    .options(**kafka_source_options)
    .load()
)


In [22]:
kafka_df.printSchema()
# Rows cannot be previewed with .show() here: on a streaming DataFrame Spark raises an
# AnalysisException ("Queries with streaming sources must be executed with
# writeStream.start()"). Inspect data through a writeStream sink instead.

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [23]:
# Decode the Kafka `value` column from binary to string into kafka_array_df; the
# result holds the JSON text that is parsed with customer_schema further down.
# NOTE: despite its name, kafka_array_df contains no array column — it is kafka_df
# with `value` cast to string (name kept because later cells reference it).
from pyspark.sql.functions import expr

kafka_array_df = kafka_df.withColumn("value", expr("cast(value as string)"))

In [24]:
kafka_array_df.printSchema()
# .show() is not available on streaming DataFrames (Spark raises an AnalysisException);
# use a writeStream sink to look at actual rows.

root
 |-- key: binary (nullable = true)
 |-- value: string (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [25]:
from pyspark.sql.types import StringType, StructField, StructType

# Field names expected in each customer JSON payload, in schema order.
# Every field is read as a nullable string; no numeric casting happens here.
CUSTOMER_COLUMNS = [
    "customerID", "gender", "SeniorCitizen", "Partner", "Dependents",
    "tenure", "PhoneService", "MultipleLines", "InternetService",
    "OnlineSecurity", "OnlineBackup", "DeviceProtection", "TechSupport",
    "StreamingTV", "StreamingMovies", "Contract", "PaperlessBilling",
    "PaymentMethod", "MonthlyCharges", "TotalCharges",
]

customer_schema = StructType(
    [StructField(column_name, StringType(), nullable=True) for column_name in CUSTOMER_COLUMNS]
)

In [26]:
# Parse each JSON payload with customer_schema and flatten the resulting struct so
# every customer field becomes its own top-level column (Kafka metadata is dropped).
# from_json does not fail on bad input by default: unparseable fields come back null.
from pyspark.sql.functions import from_json, col

parsed_customer = from_json(col("value"), customer_schema)

streaming_df = (
    kafka_array_df
    .withColumn("values_json", parsed_customer)
    .select("values_json.*")
)

In [27]:
streaming_df.printSchema()
# No .show() preview: streaming DataFrames only execute via writeStream.start(),
# so calling .show() here would raise an AnalysisException.

root
 |-- customerID: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- SeniorCitizen: string (nullable = true)
 |-- Partner: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- tenure: string (nullable = true)
 |-- PhoneService: string (nullable = true)
 |-- MultipleLines: string (nullable = true)
 |-- InternetService: string (nullable = true)
 |-- OnlineSecurity: string (nullable = true)
 |-- OnlineBackup: string (nullable = true)
 |-- DeviceProtection: string (nullable = true)
 |-- TechSupport: string (nullable = true)
 |-- StreamingTV: string (nullable = true)
 |-- StreamingMovies: string (nullable = true)
 |-- Contract: string (nullable = true)
 |-- PaperlessBilling: string (nullable = true)
 |-- PaymentMethod: string (nullable = true)
 |-- MonthlyCharges: string (nullable = true)
 |-- TotalCharges: string (nullable = true)



In [None]:
# Append the parsed customer rows to CSV part files every 3 seconds. The checkpoint
# directory stores query progress (including Kafka offsets) so a restart resumes there.
# NOTE: awaitTermination() blocks this cell until the query is stopped or fails.
# NOTE(review): the CSV sink writes no header row by default — confirm that downstream
# readers know the column order.
csv_writer = (
    streaming_df.writeStream
    .format("csv")
    .outputMode("append")
    .options(path="Streaming_csv_results", checkpointLocation="checkpoint_directory")
    .trigger(processingTime="3 seconds")
)

csv_writer.start().awaitTermination()