In [None]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json
from pyspark.sql.types import MapType,StringType

# Spark session & context.
# Maven coordinates for the Kafka structured-streaming source and the MongoDB
# Spark connector (both built for Scala 2.11 / Spark 2.4).
SPARK_PACKAGES = ",".join([
    "org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5",
    "org.mongodb.spark:mongo-spark-connector_2.11:2.4.0",
])

# MongoDB target (database "docstreaming", collection "invoices"), used for both
# reading and writing. Override it via the MONGO_URI environment variable rather
# than editing credentials into the notebook. The fallback is the original value,
# which presumably points at a local dev container (host "mongo", user
# root/example) — NOTE(review): do not rely on it outside local development.
MONGO_URI = os.environ.get(
    "MONGO_URI",
    "mongodb://root:example@mongo:27017/docstreaming.invoices?authSource=admin",
)

spark = (
    SparkSession.builder
    # 'local' runs Spark in-process with a single worker thread;
    # use 'local[N]' for N threads or 'local[*]' for one thread per CPU core.
    .master("local")
    .appName("kafka-mongo-streaming")
    # Kafka source + MongoDB connector jars
    .config("spark.jars.packages", SPARK_PACKAGES)
    # MongoDB connection used by the connector's reader and writer
    .config("spark.mongodb.input.uri", MONGO_URI)
    .config("spark.mongodb.output.uri", MONGO_URI)
    # Reuse the active SparkSession if there is one, otherwise create it
    .getOrCreate()
)


In [None]:
# Kafka source settings: the broker to bootstrap from and the topic to follow.
KAFKA_SOURCE_OPTIONS = {
    "kafka.bootstrap.servers": "kafka:9092",
    "subscribe": "ingestion-topic",
}

# Open the Kafka topic as a streaming DataFrame.
df = (
    spark.readStream
    .format("kafka")
    .options(**KAFKA_SOURCE_OPTIONS)
    .load()
)

# Kafka delivers key and value as binary; cast both to text so the JSON
# payload in `value` can be parsed downstream.
df1 = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

In [None]:
# Expose the decoded stream to Spark SQL under the view name "message"
# (replacing any earlier view of that name, so re-running the cell is safe).
df1.createOrReplaceTempView(name="message")

In [None]:
# Echo every incoming micro-batch to Spark's console sink for debugging.
# The console sink prints on the Spark driver's stdout; for a local Jupyter
# kernel that is usually the notebook server's terminal/log, not this cell.
res = spark.sql("SELECT * from message")

# Keep a handle on the running query so it can be inspected
# (console_query.status) or stopped (console_query.stop()) later; start()
# returns immediately and the query keeps running in the background.
console_query = (
    res.writeStream
    .format("console")
    .outputMode("append")
    .start()
)
console_query

In [None]:
# Write the message into MongoDB.
# The MongoDB connector is used through the batch DataFrame writer (df.write),
# so each streaming micro-batch is bridged to it with foreachBatch. Writing df1
# as-is would store the whole JSON payload as a single string under "value";
# instead the JSON is parsed so every invoice field becomes its own field.

# Keys extracted from each JSON message, in output column order.
INVOICE_FIELDS = [
    "Quantity", "UnitPrice", "Country", "CustomerID",
    "StockCode", "Description", "InvoiceDate", "InvoiceNo",
]


def foreach_batch_function(df, epoch_id):
    """Parse one micro-batch of Kafka messages and append it to MongoDB.

    Args:
        df: micro-batch DataFrame with a string ``value`` column that is
            expected to hold one JSON object per row.
        epoch_id: micro-batch id supplied by foreachBatch; not used here.

    All fields are parsed as strings (the schema is a string->string map), so
    numeric fields such as Quantity and UnitPrice are stored as strings.
    Rows whose ``value`` does not parse as a JSON object are skipped.
    """
    # Parse each row's JSON text into a string->string map. A micro-batch may
    # contain any number of rows, including none.
    parsed = df.withColumn(
        "value", from_json(df.value, MapType(StringType(), StringType()))
    )
    # from_json yields null for malformed JSON (and for null Kafka values);
    # without this filter those rows would be written as documents with only
    # null fields.
    parsed = parsed.filter(parsed.value.isNotNull())
    # One column per invoice field, looked up in the map by key.
    invoices = parsed.select(["value." + field for field in INVOICE_FIELDS])
    # Append to the collection configured by spark.mongodb.output.uri; the
    # connector turns each row into a BSON document.
    invoices.write.format("com.mongodb.spark.sql.DefaultSource").mode("append").save()

In [None]:
# Launch the MongoDB sink: each micro-batch of df1 is handed to
# foreach_batch_function. awaitTermination() blocks this cell (and the kernel)
# until the query is stopped or fails.
mongo_query = df1.writeStream.foreachBatch(foreach_batch_function).start()
mongo_query.awaitTermination()