In [1]:
import pandas as pd
import findspark

import pymongo
from pymongo import MongoClient

# Initialise findspark before the first `pyspark` import (which happens in the
# next cell). Per the findspark docs this locates the Spark installation and
# makes `pyspark` importable in this kernel.
findspark.init()

In [2]:
from pyspark.sql import SparkSession

# Build (or reuse) the notebook's Spark session: local master with two
# worker threads, application name "CA2".
spark = (
    SparkSession.builder
    .master("local[2]")
    .appName("CA2")
    .getOrCreate()
)

In [3]:
from pyspark.sql.types import *

# Location (relative to the working directory) that the stream reads CSV from.
filePath = "data"

# Explicit schema for the incoming CSV rows; a streaming file source is given
# its schema up front rather than inferring it.
schema = (
    StructType()
    .add("product_id", "integer")
    .add("quantity", "integer")
    .add("time", "timestamp")
)

# Streaming DataFrame over the CSV data found at `filePath`.
sdf = (
    spark.readStream
    .schema(schema)
    .csv(filePath)
)

In [4]:
# Console sink in append mode: rows from `sdf` are echoed to the console as
# they arrive.
# NOTE: `query` holds the configured stream *writer*; the running query handle
# is whatever `start()` returns (shown as this cell's output).
query = (
    sdf.writeStream
    .outputMode("append")
    .format("console")
)
query.start()

<pyspark.sql.streaming.StreamingQuery at 0x1ec0f81ff08>

In [5]:
from pyspark.sql.functions import desc, window

# Total quantity per 60-second event-time window, newest window first.
# Rows arriving more than 3 minutes behind the watermark on `time` are
# eligible to be dropped.
# NOTE: after this statement the name `window` refers to the aggregated
# DataFrame, no longer to the imported `window()` function. The right-hand
# side is fully evaluated before the rebinding, and the import above restores
# the function whenever the cell is re-run.
window = (
    sdf.withWatermark("time", "3 minutes")
    .groupBy(window(sdf.time, "60 seconds"))
    .sum("quantity")
    .sort(desc("window"))
)


def store_aggregated_data(row):
    """Upsert one aggregated window row into MongoDB (`bakeinc.user`).

    The document is keyed by the window start time, so a re-emitted window
    overwrites its previous total instead of creating a duplicate.

    Parameters
    ----------
    row : mapping-like
        Must provide ``row["window"]["start"]`` (stored as ``str``) and
        ``row["sum(quantity)"]`` (stored as ``int``).

    Returns
    -------
    None

    Notes
    -----
    Failures are reported on stdout and swallowed (best effort), so one bad
    row does not abort the caller. A ``KeyboardInterrupt`` prints a message
    and exits via ``sys.exit()``.
    """
    # Imported at the top of the function so that `sys` is bound in every
    # handler below (previously it was only imported inside one handler,
    # which left it unbound in the KeyboardInterrupt branch).
    import sys

    client = None
    try:
        client = MongoClient()
        collection = client.bakeinc.user

        time_window = str(row["window"]["start"])
        num_transactions = int(row["sum(quantity)"])

        data = {
            'num_transactions': num_transactions,
            'time_window': time_window,
        }

        # Single atomic upsert: update the document for this window if it
        # exists, otherwise insert it.
        collection.update_one(
            {'time_window': time_window},
            {"$set": data},
            upsert=True,
        )

        print("Data Inserted")

    except KeyboardInterrupt:
        print("Keyboard Interrupted ...")
        sys.exit()

    except Exception as exc:
        # Best effort: log and carry on with the next row.
        print("Error in store_aggregated_data")
        print(type(exc))
        print(exc)

    finally:
        # Always release the connection; a client is opened per call.
        if client is not None:
            client.close()


# Stream the aggregated windows to MongoDB: in "complete" output mode every
# row of the result is handed to `store_aggregated_data`.
# NOTE: `agg_sdf` is the configured stream writer; the running query handle
# is the value returned by `start()` (shown as this cell's output).
agg_sdf = (
    window.writeStream
    .outputMode("complete")
    .foreach(store_aggregated_data)
)
agg_sdf.start()



<pyspark.sql.streaming.StreamingQuery at 0x1ec0f874848>