In [6]:
def spark_setup(mongo_uri=None):
    """
    Create (or reuse) the SparkSession configured for Kafka and MongoDB.

    The session is bound to the notebook-global name ``spark`` (read by
    ``consumer_func``) and is also returned, so the notebook no longer depends
    on a session that was created and then thrown away as a local variable.

    :param mongo_uri: MongoDB connection URI used for both the input and the
        output connector. When omitted, the ``MONGO_URI`` environment variable
        is used, falling back to the original local docker-compose URI.
    :return: the active SparkSession.
    """
    import os

    from pyspark.sql import SparkSession

    global spark

    if mongo_uri is None:
        # NOTE(review): the fallback embeds credentials; prefer setting MONGO_URI.
        mongo_uri = os.environ.get(
            "MONGO_URI",
            "mongodb://root:example@mongo:27017/stock-data-streaming.*?authSource=admin",
        )

    # Setup the connection between spark, Kafka and MongoDB.
    spark = (
        SparkSession.builder.master("local")
        .appName("stock-price-streaming")
        .config(
            "spark.jars.packages",
            "org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5,"
            "org.mongodb.spark:mongo-spark-connector_2.11:2.4.0",
        )
        .config("spark.mongodb.input.uri", mongo_uri)
        .config("spark.mongodb.output.uri", mongo_uri)
        .getOrCreate()
    )
    return spark

In [7]:
def consumer_func(company):
    """
    Subscribe to the Kafka topic ``<company>-topic`` and return its messages as strings.

    Side effects:
      * registers the streaming DataFrame as the temp view ``<company>_message``;
      * starts a console sink (append mode) that prints every received message.

    :param company: company name used to derive the topic and view names, e.g. "amazon".
    :return: streaming DataFrame with string columns ``key`` and ``value``.
    """
    from pyspark.sql import SparkSession

    # Reuse the session built by spark_setup() (getOrCreate returns the active
    # one) instead of relying on a notebook-global `spark` variable.
    session = SparkSession.builder.getOrCreate()

    topic_name = f"{company}-topic"
    temp_table = f"{company}_message"

    # Subscribe to the topic.
    raw_df = (
        session.readStream.format("kafka")
        .option("kafka.bootstrap.servers", "kafka:9092")
        .option("subscribe", topic_name)
        .load()
    )

    # Kafka delivers key/value as bytes; cast them to strings.
    messages_df = raw_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

    # Create a temporary view so the stream can be queried with SQL.
    messages_df.createOrReplaceTempView(temp_table)

    # Print the received messages in the console.
    console_df = session.sql(f"select * from {temp_table}")
    console_df.writeStream.format("console").outputMode("append").start()

    return messages_df

In [8]:
# Write the message into MongoDB
def foreach_batch_function(df, epoch_id, collection_name):
    """
    Parse one Kafka micro-batch and append it to MongoDB.

    Intended for ``writeStream.foreachBatch``.

    :param df: micro-batch DataFrame with a string ``value`` column holding a
        JSON object (as produced by ``consumer_func``).
    :param epoch_id: micro-batch id supplied by Spark (unused).
    :param collection_name: collection in the ``stock-data-streaming`` database
        that the parsed rows are appended to.
    """
    # Imported here: the imports inside spark_setup() are local to that
    # function and are not visible in this scope.
    from pyspark.sql.functions import from_json
    from pyspark.sql.types import MapType, StringType

    # Parse the JSON payload into a string -> string map.
    parsed_df = df.withColumn(
        "value", from_json(df.value, MapType(StringType(), StringType()))
    )

    # One output column per field of the payload.
    # NOTE(review): "Stock" is capitalised unlike the other keys — confirm
    # against the producer's message format.
    stock_df = parsed_df.select([
        "value.time",
        "value.open",
        "value.high",
        "value.low",
        "value.close",
        "value.volume",
        "value.Stock",
    ])

    # Each row becomes a document in the requested collection.
    (
        stock_df.write.format("com.mongodb.spark.sql.DefaultSource")
        .mode("append")
        .option("database", "stock-data-streaming")
        .option("collection", collection_name)
        .save()
    )


In [11]:
 def trigger_amazon_write_stream(company, collection):
        spark_setup()
        amazon = consumer_func(company)
        amazon.writeStream.foreachBatch(lambda df, epoch_id: foreach_batch_function(df, epoch_id, collection)).start().awaitTermination()

In [None]:
# Run the Amazon pipeline: Kafka topic "amazon-topic" -> MongoDB collection "amazon".
trigger_amazon_write_stream(collection="amazon", company="amazon")

In [None]:
# amazon.writeStream.foreachBatch(lambda df, epoch_id: foreach_batch_function(df, epoch_id, amazon_collection)).start().awaitTermination()