In [None]:
import os
from pyspark.streaming.kafka import KafkaUtils
from pyspark.streaming import StreamingContext
from pyspark import SparkContext
from pyspark.sql import SQLContext
import redis
from datetime import datetime

In [None]:
# Connection / job settings. Defaults reproduce the original hard-coded values;
# override them through environment variables instead of editing the notebook.
KAFKA_BROKERS = os.environ.get("KAFKA_BROKERS", "kafka:9093")
KAFKA_TOPIC = os.environ.get("KAFKA_TOPIC", "water")
REDIS_HOST = os.environ.get("REDIS_HOST", "redis")
REDIS_PORT = int(os.environ.get("REDIS_PORT", "6379"))
BATCH_INTERVAL_SECONDS = 5  # micro-batch interval for the streaming context

# getOrCreate() reuses an already-running SparkContext, so re-running this cell does not
# fail with "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate()
ssc = StreamingContext(sc, BATCH_INTERVAL_SECONDS)
directKafkaStream = KafkaUtils.createDirectStream(
    ssc, [KAFKA_TOPIC], {"metadata.broker.list": KAFKA_BROKERS}
)
sql = SQLContext(sc)
# decode_responses=True makes r.get() return str rather than bytes.
r = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=0, decode_responses=True)

In [None]:
def setSetByHouse(row, timestamp, ttl=300, client=None):
    """Store ``row.lit`` under a per-id, per-timestamp Redis key if it beats the stored value.

    The key is ``"<row.id>-total-<timestamp>"``. A write happens only when
    ``int(row.lit)`` is strictly greater than the integer currently stored (a
    missing key counts as 0). As a result, each key holds the largest ``lit``
    seen for that id/timestamp. Every write resets the key's expiry to ``ttl``
    seconds.

    Args:
        row: record exposing ``id`` and ``lit`` attributes (e.g. a Spark ``Row``).
        timestamp: key suffix, e.g. the ``YYYYmmddHHMM`` minute bucket.
        ttl: expiry in seconds applied on each write (default 300).
        client: Redis client to use; defaults to the notebook-level ``r``.
    """
    client = r if client is None else client
    # %-formatting (rather than +) also accepts a non-string id such as an int.
    key = "%s-total-%s" % (row.id, timestamp)
    stored = client.get(key)  # single round trip instead of two
    previous_value = 0 if stored is None else int(stored)
    if previous_value < int(row.lit):
        # One atomic SET with expiry. The old delete + set + expire sequence could
        # leave a key without a TTL if interrupted between set and expire.
        # NOTE(review): the get-then-set check is not atomic; concurrent writers
        # to the same key could still race.
        client.set(key, row.lit, ex=ttl)

In [None]:
def _minute_bucket(time_str, fmt="%Y-%m-%d %H:%M:%S"):
    """Return the ``YYYYmmddHHMM`` bucket for a time string, dropping the seconds.

    The full string is still parsed with ``fmt``, so malformed input raises ``ValueError``.
    """
    return datetime.strptime(time_str, fmt).strftime("%Y%m%d%H%M")


def process(kafkaRdd):
    """Handle one micro-batch of the Kafka direct stream.

    The value element (index 1) of each Kafka record is parsed as JSON into a
    DataFrame. Each row is then passed to ``setSetByHouse`` together with its
    minute bucket derived from ``row.time``.

    Keep this a single-argument function: ``DStream.foreachRDD`` inspects the
    argument count and would pass ``(time, rdd)`` to a two-argument function.
    """
    row_count = kafkaRdd.count()
    print("No. of rows: %d" % row_count)
    if row_count == 0:
        # Empty batch: skip JSON schema inference and the collect entirely.
        return

    kafka_values = kafkaRdd.map(lambda record: record[1])
    dataFrame = sql.read.json(kafka_values)
    # Rows are brought to the driver because the Redis client `r` lives there.
    for row in dataFrame.collect():
        setSetByHouse(row, _minute_bucket(row.time))

In [None]:
# Register the per-batch handler: Spark calls `process` on the driver with each micro-batch RDD.
directKafkaStream.foreachRDD(func=process)

In [None]:
ssc.start()  # start consuming micro-batches
try:
    ssc.awaitTermination()  # blocks until the streaming context stops or fails
except KeyboardInterrupt:
    # Interrupting the kernel only unblocks this wait. Stop the streaming context
    # explicitly so ingestion does not keep running in the background. Keep the
    # SparkContext alive for the rest of the notebook.
    ssc.stop(stopSparkContext=False, stopGraceFully=True)