In [1]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.ml.feature import Normalizer, StandardScaler
import random
import logging
import time
import json
from datetime import datetime

from pymongo import MongoClient
import threading


# MongoDB handles used by the streaming cell below: one database (ICS5114)
# and one collection per Kafka topic that main() is pointed at.
client = MongoClient(host='localhost', port=27017)
db = client['ICS5114']

collection_iceland = db['glacial_iceland_collection']
collection_europe = db['glacial_europe_collection']


def main(topic, mongoDB_collection, kafka_bootstrap_servers='localhost:9092'):
    """Stream JSON records from a Kafka topic into MongoDB and report the mean ``dh``.

    Starts a Spark Structured Streaming query that, every 5 seconds, reads the
    new messages on ``topic``, stores each decoded JSON document in
    ``mongoDB_collection`` and prints the running average of the ``dh`` field
    (elevation change) over every record seen so far by this call.
    Records whose ``dh`` is missing or null are stored but not averaged.

    Blocks until the streaming query terminates.

    Parameters
    ----------
    topic : str
        Kafka topic to subscribe to.
    mongoDB_collection : pymongo collection
        Collection that receives every decoded record.
    kafka_bootstrap_servers : str, optional
        Kafka bootstrap servers to connect to (default ``'localhost:9092'``).
    """
    # Running statistics, updated by data_process() across micro-batches.
    total_docs = 0
    current_total_dh = 0
    average_dh = 0

    # getOrCreate() returns an already-running session if there is one, so
    # concurrent calls of main() end up sharing the same SparkSession.
    spark = (
        SparkSession.builder
        .appName("Structured Streaming ")
        .master("local[*]")
        .getOrCreate()
    )
    spark.sparkContext.setLogLevel("ERROR")

    # Streaming DataFrame over the requested Kafka topic.
    kafka_df = (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", kafka_bootstrap_servers)
        .option("failOnDataLoss", "false")
        .option("subscribe", topic)
        .option("startingOffsets", "latest")
        # NOTE(review): this key is a DStream (spark-streaming-kafka) setting;
        # the Structured Streaming Kafka source does not read it, so it most
        # likely has no effect. The source-level rate limit is
        # "maxOffsetsPerTrigger" - confirm the intended limit before switching.
        .option("spark.streaming.kafka.maxRatePerPartition", "10")
        .load()
    )

    def data_process(df, batch_id):
        """Persist one micro-batch to MongoDB and update the running ``dh`` average."""
        nonlocal average_dh, total_docs, current_total_dh
        print("processing....")

        # Only the raw message payload is needed; select it before collecting
        # so the other Kafka columns are not shipped to the driver.
        raw_values = [row.value for row in df.select("value").collect()]
        records = [
            json.loads(value.decode("utf-8"))
            for value in raw_values
            if value is not None  # null-valued Kafka messages carry no document
        ]

        if records:
            # One round trip per batch; insert_many rejects an empty list.
            mongoDB_collection.insert_many(records)

        for record in records:
            # .get(): a record without "dh" is skipped instead of raising KeyError.
            dh = record.get("dh")
            if dh is not None:
                total_docs += 1
                current_total_dh += dh

        if total_docs:
            average_dh = current_total_dh / total_docs
        print(topic + " - current elevation change average : " + str(average_dh) + "\n")

    query = (
        kafka_df.writeStream
        .foreachBatch(data_process)
        .trigger(processingTime="5 seconds")
        .start()
    )
    query.awaitTermination()


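# Per-row alternative, kept for reference only. DataStreamWriter.foreach is available in PySpark since Spark 2.4 (the same release that added foreachBatch), but foreachBatch is used above because it delivers each micro-batch as a whole DataFrame --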
# def row_process(row):
#     print(row)
    
# k_w = (kafka_df.writeStream \
#     .foreach(row_process) \
#     .trigger(processingTime="1 second") \
#     .start() \
#     .awaitTermination())
# ---------------------------------------------------------------------------------------

In [None]:
# One streaming query per Kafka topic, each driven by its own thread; every
# thread writes into the MongoDB collection paired with its topic.
topic_to_collection = (
    ("ICS5114-iceland", collection_iceland),
    ("ICS5114-europe", collection_europe),
)
thread_1, thread_2 = (
    threading.Thread(target=main, args=(topic_name, target_collection))
    for topic_name, target_collection in topic_to_collection
)

for worker in (thread_1, thread_2):
    worker.start()

# main() blocks on awaitTermination(), so this keeps the cell running until
# both streaming queries stop.
for worker in (thread_1, thread_2):
    worker.join()


processing....
processing....
ICS5114-iceland - current elevation change average : 0
ICS5114-europe - current elevation change average : 0


processing....
processing....
ICS5114-iceland - current elevation change average : -4.195477329692933

ICS5114-europe - current elevation change average : -6.7458083657587515

processing....
processing....
ICS5114-europe - current elevation change average : -7.039806341480489

processing....
ICS5114-iceland - current elevation change average : -4.641864895863084

ICS5114-europe - current elevation change average : -7.146801832307499

processing....
ICS5114-europe - current elevation change average : -7.093093858500265

processing....
ICS5114-iceland - current elevation change average : -4.690632913537511

processing....
processing....
ICS5114-europe - current elevation change average : -7.085387520260451

processing....
ICS5114-iceland - current elevation change average : -4.69261063055779

processing....
ICS5114-iceland - current elevation change