In [1]:
# This notebook continuously watches the "data/input" directory. Whenever a new file appears, Spark reads its contents and inserts them into MongoDB.

In [2]:
import findspark
# Initialise findspark before the pyspark imports in the following cell.
# NOTE(review): an empty string is passed as the Spark home; presumably
# findspark then falls back to auto-detecting the installation — confirm.
findspark.init("")

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pymongo
import pandas as pd
import datetime

In [4]:
# Connect with MongoClient's defaults (no host/port given), then resolve the
# "humour" database and its "jokes" collection used by the batch writer.
client = pymongo.MongoClient()
db = client.get_database("humour")
col = db.get_collection("jokes")


In [5]:
def foreach_batch_function(df, epoch_id):
    """Insert one streaming micro-batch into the MongoDB ``col`` collection.

    The whole batch is stored as a single document of the form
    ``{"date": <"%Y-%m-%d %H:%M:%S" string>, "jokes": [<row dict>, ...]}``,
    with the ``id`` column renamed to ``id_`` in every row dict.

    Args:
        df: Spark DataFrame holding the rows of the current micro-batch.
        epoch_id: Batch identifier passed by ``foreachBatch``; not used.

    Returns:
        None. Nothing is written when the batch contains no rows.
    """
    # Spark DataFrame -> pandas DataFrame -> list of per-row dicts.
    records = df.toPandas().rename(columns={'id': 'id_'}).to_dict('records')

    # An empty micro-batch would otherwise store a document whose "jokes"
    # list is empty, so skip the write (and the log line) entirely.
    if not records:
        return

    now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    # The batch is one document, so insert_one is the direct call.
    col.insert_one({"date": now, "jokes": records})
    print(now + ': Data has been inserted to collection of humour.jokes')

In [None]:
if __name__ == "__main__":
    # Entry point: start a local Spark session, stream CSV files that land in
    # "data/input", and hand every micro-batch to foreach_batch_function.

    spark = (
        SparkSession.builder
        .appName("File Streaming Application - MongoDB")
        .master("local[*]")
        .getOrCreate()
    )

    print('File Streaming Application - MongoDB has been started!\n')

    # Streaming file sources need an explicit schema; every column is nullable.
    input_schema = StructType([
        StructField("id", IntegerType(), True),
        StructField("type", StringType(), True),
        StructField("setup", StringType(), True),
        StructField("punchline", StringType(), True)
    ])

    # CSV files with a header row are read from the input directory.
    streaming_df = (
        spark.readStream
        .format("csv")
        .option("header", "true")
        .schema(input_schema)
        .load(path="data/input")
    )

    print('Schema details are:\n')
    streaming_df.printSchema()

    # NOTE(review): no checkpointLocation option is set on the writer;
    # presumably files already processed are read again after a restart —
    # confirm whether that is intended.
    streaming_df_query = (
        streaming_df.writeStream
        .foreachBatch(foreach_batch_function)
        .start()
    )

    try:
        # Blocks until the query terminates or the wait is interrupted.
        streaming_df_query.awaitTermination()
    finally:
        # Stop the query on any exit path (including a kernel interrupt) so
        # it does not keep running in the background.
        streaming_df_query.stop()

File Streaming Application - MongoDB has been started!

Schema details are:

root
 |-- id: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- setup: string (nullable = true)
 |-- punchline: string (nullable = true)

2021-05-22 16:22:55: Data has been inserted to collection of humour.jokes
2021-05-22 16:23:06: Data has been inserted to collection of humour.jokes
