In [1]:
#import modules
import os

from pyspark.sql import *
#transform json strings to dataframe
from pyspark.sql.types import *
from pyspark.sql.functions import from_json,col


In [2]:
# Schema of the JSON records published to the `ride-bookings` Kafka topic.
# Field names must match the producer's JSON keys exactly (spaces included),
# and every field is nullable.
# Numeric fields use DoubleType rather than FloatType. BSON has no 32-bit
# float, so float32 values get widened to double on write and are stored with
# rounding artifacts (e.g. 5.73 becomes 5.730000019073486).
BOOKING_FIELDS = [
    ("Date", StringType()),
    ("Time", StringType()),
    ("Booking ID", StringType()),
    ("Booking Status", StringType()),
    ("Customer ID", StringType()),
    ("Vehicle Type", StringType()),
    ("Pickup Location", StringType()),
    ("Drop Location", StringType()),
    ("Booking Value", DoubleType()),
    ("Ride Distance", DoubleType()),
    ("Driver Ratings", DoubleType()),
    ("Customer Rating", DoubleType()),
    ("Payment Method", StringType()),
]

schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in BOOKING_FIELDS
])

In [3]:
# Create the Spark session with the Kafka source and MongoDB sink connectors.
#
# The MongoDB URI embeds credentials, so it is read from the MONGO_URI
# environment variable. The fallback is the local development value this
# notebook previously hardcoded. Set MONGO_URI for any shared or
# non-local deployment.
MONGO_URI = os.environ.get(
    "MONGO_URI",
    "mongodb://root:example@mongo:27017/uberstreaming.bookings?authSource=admin",
)

# Connector coordinates resolved via spark.jars.packages. These are applied
# only when getOrCreate() starts a new SparkContext, so restart the kernel
# after changing them.
SPARK_PACKAGES = ",".join([
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0",
    "org.mongodb.spark:mongo-spark-connector_2.12:10.3.0",
])

spark = (
    SparkSession.builder
    .appName("KafkaToMongoStream")
    .config("spark.jars.packages", SPARK_PACKAGES)
    .config("spark.mongodb.write.connection.uri", MONGO_URI)
    .getOrCreate()
)

In [4]:
# Display the active SparkSession. As the cell's last expression, Jupyter renders its repr.
spark

In [5]:
# Subscribe to the `ride-bookings` topic as an unbounded streaming source.
# Kafka delivers key/value as binary columns; the next cell decodes them.
kafka_source_options = {
    "kafka.bootstrap.servers": "kafka:9092",
    "subscribe": "ride-bookings",
}

df = (
    spark.readStream
    .format("kafka")
    .options(**kafka_source_options)
    .load()
)

In [6]:
# Decode the binary Kafka key/value into strings.
# The column names stay `key` and `value`.
df_output = df.select(
    col("key").cast("string").alias("key"),
    col("value").cast("string").alias("value"),
)

In [7]:
# Register the decoded stream as the temporary view `message` so Spark SQL can query it.
df_output.createOrReplaceTempView(name="message")

In [8]:
# Debug sink: echo the raw key/value rows with the console sink.
# It prints from the Spark driver, so output typically appears in the
# server/container logs rather than in this notebook.
#
# Keep the returned handle. This query runs alongside the MongoDB query
# started later and reads the topic independently. Call
# console_query.stop() once the debug output is no longer needed.
res = spark.sql("SELECT * from message")
console_query = (
    res.writeStream
    .format("console")
    .outputMode("append")
    .queryName("console_debug")
    .start()
)

<pyspark.sql.streaming.query.StreamingQuery at 0x7f1284d4df90>

In [9]:
# Parse each JSON `value` string against `schema`.
# The result has a single struct column named `data`.
parsed_stream = df_output.select(
    from_json(df_output["value"], schema).alias("data")
)

In [10]:
# Promote every field of the `data` struct to a top-level column.
flattened_df = parsed_stream.select(col("data.*"))

In [11]:
# Rename columns by replacing every space with an underscore
# (e.g. "Booking ID" -> "Booking_ID"; "Date" and "Time" are unchanged).
# The new names are derived from the columns themselves, so this step stays
# in sync with `schema` when fields are added or renamed. A hand-written list
# of renames would silently skip new fields.
clean_df = flattened_df.toDF(
    *[column_name.replace(" ", "_") for column_name in flattened_df.columns]
)

In [None]:
# Write the cleaned stream into MongoDB (database `uberstreaming`,
# collection `bookings`).

# Reuse the URI configured on the session builder rather than repeating the
# credential-bearing string here.
MONGO_WRITE_URI = spark.conf.get("spark.mongodb.write.connection.uri")

# Offset/progress state for this query. Restarting with the same directory
# resumes from the last committed offsets. Note that /tmp may be cleared when
# the container restarts.
MONGO_CHECKPOINT_DIR = "/tmp/mongo_checkpoint"

dataStreamWriter = (
    clean_df.writeStream
    .format("mongodb")
    # An explicit checkpoint location is used, so settings that only concern
    # temporary checkpoints (forceDeleteTempCheckpointLocation) do not apply.
    .option("checkpointLocation", MONGO_CHECKPOINT_DIR)
    .option("spark.mongodb.connection.uri", MONGO_WRITE_URI)
    .option("spark.mongodb.database", "uberstreaming")
    .option("spark.mongodb.collection", "bookings")
    .outputMode("append")
)

# Keep the StreamingQuery handle and then block on it. awaitTermination()
# returns None, so chaining it onto start() would leave `query` unusable.
# To shut down cleanly, interrupt the kernel and call query.stop().
query = dataStreamWriter.start()
query.awaitTermination()