In [1]:
from pyspark import SparkContext
from pyspark.sql import SparkSession 
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json

In [2]:
# Create (or reuse) the SparkSession for this notebook via the documented
# `SparkSession.builder` entry point rather than instantiating the nested
# `Builder` class directly.
# `spark.jars` ships the Kafka 0-8 streaming assembly jar with the application;
# presumably this is what provides the JVM side of `pyspark.streaming.kafka` --
# TODO confirm the jar version matches the cluster's Spark/Scala version.
ss = (
    SparkSession.builder
    .appName("Average")
    .master("spark://spark-master-2:7077")
    .config("spark.jars", "./spark-streaming-kafka-0-8-assembly_2.11-2.0.0-preview.jar")
    .getOrCreate()
)

In [3]:
# The streaming context shares the session's SparkContext and cuts the
# stream into batches of 20 (the batchDuration argument, in seconds).
sc = ss.sparkContext
ssc = StreamingContext(sc, batchDuration=20)
# `sc` is the same object as `ss.sparkContext`, so set the log level on it directly.
sc.setLogLevel('WARN')

In [4]:
# Comma-separated host:port list handed to Kafka as "metadata.broker.list".
brokers = 'kafka-1:9092,kafka-2:9092'
# Name of the Kafka topic the stream subscribes to.
topic = 'BTC'

In [5]:
# Direct (receiver-less) Kafka stream over the single configured topic.
ks = KafkaUtils.createDirectStream(
    ssc,
    topics=[topic],
    kafkaParams={"metadata.broker.list": brokers},
)

In [6]:
def processing(rdd):
    """Summarise one micro-batch and store the result in MongoDB.

    Used as the ``foreachRDD`` callback of the stream. For a non-empty batch
    it prints the record count, computes the latest timestamp together with
    the mean buy and sell prices, and inserts one document holding those
    three values into the ``average`` collection of the ``btc`` database.
    Empty batches are skipped without touching Spark SQL or MongoDB.

    Args:
        rdd: RDD whose rows are ``[timestamp, buy_price, sell_price]``
            sequences (the shape produced by ``get_list``).

    Returns:
        None.
    """
    # Bail out first so an empty batch costs nothing beyond the emptiness check.
    if rdd.isEmpty():
        return

    from pyspark.sql import functions as F
    from pymongo import MongoClient

    # count() only returns a number to the driver; collect() would ship every
    # row of the batch to the driver just to measure its length.
    print(f"Recieved {rdd.count()} records")

    # `ss` is the module-level SparkSession; it is only read here, so no
    # `global` declaration is required.
    df = ss.createDataFrame(
        rdd,
        schema=["timestamp", "buy_price", "sell_price"],
    )

    # A single aggregation job replaces the former sort + two separate mean
    # jobs; max(timestamp) is the value the descending sort used to surface.
    summary = df.agg(
        F.max('timestamp').alias('timestamp'),
        F.mean('buy_price').alias('buy_price'),
        F.mean('sell_price').alias('sell_price'),
    ).first()

    # Save to database. insert_one() replaces the deprecated Collection.insert(),
    # and the finally clause closes the client even when the write fails.
    conn = MongoClient('mongodb', 27017)
    try:
        conn.btc.average.insert_one({
            'timestamp': summary['timestamp'],
            'buy_price': summary['buy_price'],
            'sell_price': summary['sell_price'],
        })
    finally:
        conn.close()

In [7]:
def get_list(s):
    """Extract ``[timestamp, buy_price, sell_price]`` from one stream element.

    The element's second item (``s[1]``) is parsed as JSON and the three
    fields are looked up by key, in that order. A missing key raises
    ``KeyError``; invalid JSON raises whatever ``json.loads`` raises.
    """
    payload = json.loads(s[1])
    return [payload[field] for field in ('timestamp', 'buy_price', 'sell_price')]

In [None]:
# Reduce every stream element to its [timestamp, buy_price, sell_price] list,
# then hand each batch RDD to `processing`.
lines = ks.map(get_list)
lines.foreachRDD(processing)

# Start the streaming job; awaitTermination() blocks this cell until the
# context is stopped or fails.
ssc.start()
ssc.awaitTermination()

Recieved 1 records




Recieved 3 records
Recieved 1 records
Recieved 3 records
Recieved 4 records
Recieved 4 records
Recieved 3 records
Recieved 4 records
Recieved 3 records
Recieved 4 records
Recieved 3 records
Recieved 3 records
Recieved 4 records
Recieved 3 records
Recieved 4 records
Recieved 3 records
Recieved 3 records
Recieved 3 records
Recieved 4 records
Recieved 3 records
Recieved 3 records
Recieved 4 records
Recieved 3 records
Recieved 3 records
Recieved 3 records
Recieved 4 records
Recieved 4 records
Recieved 3 records
Recieved 3 records
