In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, when
# you need these two to transform the json strings to dataframes
from pyspark.sql.types import MapType,StringType
from pyspark.sql.functions import from_json

# Spark session & context
# MongoDB connection details (user/password from the compose file).
# NOTE(review): these are local-dev credentials; inject them from the
# environment before using this outside the compose setup.
MONGO_BASE_URI = "mongodb://root:example@mongo:27017"
MONGO_DB = "docstreaming"

spark = (SparkSession
         .builder
         .master('local')
         .appName('kafka-mongo-streaming')
         # Kafka and MongoDB connector packages. Both must be passed in ONE
         # comma-separated string under a single spark.jars.packages key.
         # The _2.11 / 2.4.x artifact versions must match the Spark/Scala build.
         .config("spark.jars.packages",
                 "org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5,"
                 "org.mongodb.spark:mongo-spark-connector_2.11:2.4.0")
         # The connector expects exactly one connection string per direction.
         # The previous value spliced two URIs (plus indentation spaces) into
         # one invalid string via a backslash-newline inside the literal.
         # Reads of other collections can override it with the connector's
         # `collection` read option.
         .config("spark.mongodb.input.uri",
                 f"{MONGO_BASE_URI}/{MONGO_DB}.product?authSource=admin")
         .config("spark.mongodb.output.uri",
                 f"{MONGO_BASE_URI}/{MONGO_DB}.product?authSource=admin")
         .getOrCreate())
sc = spark.sparkContext

In [2]:
# Streaming source: every record published to the Kafka topic "ingest-product".
df_product = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "ingest-product")
    .load()
)

# Kafka hands key/value over as binary; cast both to strings so the JSON
# payload in `value` can be parsed downstream.
df1_product = df_product.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

In [3]:
# Streaming source: every record published to the Kafka topic "ingest-category".
df_category = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "ingest-category")
    .load()
)

# Kafka hands key/value over as binary; cast both to strings so the JSON
# payload in `value` can be parsed downstream.
df1_category = df_category.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

In [4]:
# Expose the string-typed product stream to Spark SQL as "message_product".
(df1_product
    .createOrReplaceTempView("message_product"))

In [5]:
# Expose the string-typed category stream to Spark SQL as "message_category".
(df1_category
    .createOrReplaceTempView("message_category"))

In [6]:
# Debug sink: echo each new product record to the driver console.
res_product = spark.sql("SELECT * from message_product")
(res_product
    .writeStream
    .outputMode("append")
    .format("console")
    .start())

<pyspark.sql.streaming.StreamingQuery at 0x7f0b32f4aed0>

In [7]:
# Debug sink: echo each new category record to the driver console.
res_category = spark.sql("SELECT * from message_category")
(res_category
    .writeStream
    .outputMode("append")
    .format("console")
    .start())

<pyspark.sql.streaming.StreamingQuery at 0x7f0b33138bd0>

In [8]:
# Write the message into MongoDB
# Fields to pull out of each Kafka message's JSON payload, keyed by the
# target MongoDB collection. Order here is the column order written to Mongo.
MONGO_COLLECTION_FIELDS = {
    'category': ["id", "category_name"],
    'product': ["asin", "price", "boughtInLastMonth", "isBestSeller",
                "imgUrl", "date", "reviews", "productURL",
                "listPrice", "category_id", "title"],
}


def foreach_batch_function(df, epoch_id, collection):
    """Parse one streaming micro-batch and append it to a MongoDB collection.

    Used as the callback of ``DataStreamWriter.foreachBatch``.

    Args:
        df: Micro-batch DataFrame with a string ``value`` column holding one
            JSON object per row. A batch may contain any number of rows.
        epoch_id: Micro-batch id supplied by Spark; not used here.
        collection: Target collection in the ``docstreaming`` database; must
            be a key of ``MONGO_COLLECTION_FIELDS`` ('category' or 'product').

    Raises:
        ValueError: If ``collection`` has no field mapping. Checked before any
            Spark work; previously this surfaced as an UnboundLocalError.
    """
    try:
        fields = MONGO_COLLECTION_FIELDS[collection]
    except KeyError:
        raise ValueError(
            f"Unknown collection {collection!r}; expected one of "
            f"{sorted(MONGO_COLLECTION_FIELDS)}") from None

    # Parse each JSON string into a map<string,string>, so every extracted
    # field ends up as a string column.
    parsed = df.withColumn("value", from_json(df.value, MapType(StringType(), StringType())))

    # Flatten the map into one top-level column per field.
    flat = parsed.select([f"value.{field}" for field in fields])

    # Append to the target collection; the connector writes each row as a BSON document.
    # NOTE(review): credentials are the compose-file defaults.
    (flat.write.format("com.mongodb.spark.sql.DefaultSource")
        .option("spark.mongodb.output.uri",
                f"mongodb://root:example@mongo:27017/docstreaming.{collection}?authSource=admin")
        .mode("append")
        .save())

In [9]:
# Start the MongoDB sink for the product stream. start() returns immediately;
# the last cell of the notebook is what blocks on the running queries.
df1_product.writeStream.foreachBatch(
    lambda batch_df, batch_id: foreach_batch_function(batch_df, batch_id, 'product')
).start()

<pyspark.sql.streaming.StreamingQuery at 0x7f0b32f63110>

In [10]:
# Start the MongoDB sink for the category stream. start() returns immediately;
# the last cell of the notebook is what blocks on the running queries.
df1_category.writeStream.foreachBatch(
    lambda batch_df, batch_id: foreach_batch_function(batch_df, batch_id, 'category')
).start()

<pyspark.sql.streaming.StreamingQuery at 0x7f0b32f596d0>

In [None]:
# Block until any of the streaming queries started above stops or fails.
spark.streams.awaitAnyTermination(timeout=None)