In [1]:
from pyspark.sql import SparkSession

# you need these two to transform the json strings to dataframes
from pyspark.sql.types import MapType,StringType
from pyspark.sql.functions import from_json

# Spark session & context
spark = (SparkSession
         .builder
         .master('local')
         .appName('kafka-mongo-streaming')     
         # Add kafka package and mongodb package. Make sure to to this as one string!
         # Versions need to match the Spark version (trial & error)
         .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5,org.mongodb.spark:mongo-spark-connector_2.11:2.4.0")
         # Mongo config including the username and password from compose file
         .config("spark.mongodb.input.uri","mongodb://root:example@mongo:27017/docstreaming.invoices?authSource=admin")
         .config("spark.mongodb.output.uri","mongodb://root:example@mongo:27017/docstreaming.invoices?authSource=admin")
         .getOrCreate())


In [2]:
# Read the message from the kafka stream
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "kafka:9092") \
  .option("subscribe", "ingestion-topic") \
  .load()

# convert the binary values to string
df1 = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

In [3]:
#Create a temporary view for SparkSQL
df1.createOrReplaceTempView("message")

In [4]:
# Write out the message to the console of the environment
res = spark.sql("SELECT * from message")
res.writeStream.format("console") \
            .outputMode("append") \
            .start()

<pyspark.sql.streaming.StreamingQuery at 0x7f9bee27ce10>

In [5]:
 

# Write the message into MongoDB
def foreach_batch_function(df, epoch_id):
    # Transform and write batchDF in this foreach

    # writes the dataframe with complete kafka message into mongodb
    #df.write.format("com.mongodb.spark.sql.DefaultSource").mode("append").save()
    
    #Transform the values of all rows in column value and create a dataframe out of it (will also only have one row)
    df2=df.withColumn("value",from_json(df.value,MapType(StringType(),StringType())))    
   
    # Transform the dataframe so that it will have individual columns 
    df3= df2.select(["value.Quantity","value.UnitPrice","value.Country","value.CustomerID","value.StockCode","value.Description","value.InvoiceDate","value.InvoiceNo"])
    
    # Send the dataframe into MongoDB which will create a JSON document out of it
    df3.write.format("com.mongodb.spark.sql.DefaultSource").mode("append").save()
    
    pass

In [6]:
# Start the MongoDB stream and wait for termination
df1.writeStream.foreachBatch(foreach_batch_function).start().awaitTermination()

StreamingQueryException: 'An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):\n  File "/usr/local/spark/python/pyspark/sql/utils.py", line 63, in deco\n    return f(*a, **kw)\n  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value\n    format(target_id, ".", name), value)\npy4j.protocol.Py4JJavaError: An error occurred while calling o84.save.\n: java.lang.IllegalArgumentException: Missing database name. Set via the \'spark.mongodb.output.uri\' or \'spark.mongodb.output.database\' property\n\tat com.mongodb.spark.config.MongoCompanionConfig$class.databaseName(MongoCompanionConfig.scala:260)\n\tat com.mongodb.spark.config.WriteConfig$.databaseName(WriteConfig.scala:37)\n\tat com.mongodb.spark.config.WriteConfig$.apply(WriteConfig.scala:214)\n\tat com.mongodb.spark.config.WriteConfig$.apply(WriteConfig.scala:37)\n\tat com.mongodb.spark.config.MongoCompanionConfig$class.apply(MongoCompanionConfig.scala:124)\n\tat com.mongodb.spark.config.WriteConfig$.apply(WriteConfig.scala:37)\n\tat com.mongodb.spark.config.MongoCompanionConfig$class.apply(MongoCompanionConfig.scala:113)\n\tat com.mongodb.spark.config.WriteConfig$.apply(WriteConfig.scala:37)\n\tat com.mongodb.spark.sql.DefaultSource.createRelation(DefaultSource.scala:64)\n\tat org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)\n\tat org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)\n\tat org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)\n\tat org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)\n\tat org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)\n\tat org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)\n\tat org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)\n\tat org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)\n\tat org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)\n\tat org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)\n\tat org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:81)\n\tat org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)\n\tat org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)\n\tat org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)\n\tat org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)\n\tat org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)\n\tat org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)\n\tat org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)\n\tat org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.lang.reflect.Method.invoke(Method.java:498)\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)\n\tat py4j.Gateway.invoke(Gateway.java:282)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat py4j.GatewayConnection.run(GatewayConnection.java:238)\n\tat java.lang.Thread.run(Thread.java:748)\n\n\nDuring handling of the above exception, another exception occurred:\n\nTraceback (most recent call last):\n  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 2381, in _call_proxy\n    return_value = getattr(self.pool[obj_id], method)(*params)\n  File "/usr/local/spark/python/pyspark/sql/utils.py", line 191, in call\n    raise e\n  File "/usr/local/spark/python/pyspark/sql/utils.py", line 188, in call\n    self.func(DataFrame(jdf, self.sql_ctx), batch_id)\n  File "<ipython-input-5-160e0a02fac9>", line 15, in foreach_batch_function\n    df3.write.format("com.mongodb.spark.sql.DefaultSource").mode("append").save()\n  File "/usr/local/spark/python/pyspark/sql/readwriter.py", line 737, in save\n    self._jwrite.save()\n  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__\n    answer, self.gateway_client, self.target_id, self.name)\n  File "/usr/local/spark/python/pyspark/sql/utils.py", line 79, in deco\n    raise IllegalArgumentException(s.split(\': \', 1)[1], stackTrace)\npyspark.sql.utils.IllegalArgumentException: "Missing database name. Set via the \'spark.mongodb.output.uri\' or \'spark.mongodb.output.database\' property"\n\n=== Streaming Query ===\nIdentifier: [id = af210dca-f477-4ad6-8558-7bd4aca8beeb, runId = 2541e7d1-136a-4710-bc19-a568ec74d228]\nCurrent Committed Offsets: {}\nCurrent Available Offsets: {KafkaV2[Subscribe[ingestion-topic]]: {"ingestion-topic":{"0":0}}}\n\nCurrent State: ACTIVE\nThread State: RUNNABLE\n\nLogical Plan:\nProject [cast(key#7 as string) AS key#21, cast(value#8 as string) AS value#22]\n+- StreamingExecutionRelation KafkaV2[Subscribe[ingestion-topic]], [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13]\n'