# In [None]:
"""
 Reads IoT sensor messages from a Kafka direct stream (newline-delimited
   text, processed in 2-second batches) and prints running-average advice
   for light, air humidity and air temperature.
 Usage: kafka-direct-iotmsg.py <broker_list> <topic>

 To run this on your local machine, you need to set up Kafka and create a
   producer first; see:
 http://kafka.apache.org/documentation.html#quickstart

 and then run the example
    `$ bin/spark-submit --jars \
      external/kafka-assembly/target/scala-*/spark-streaming-kafka-assembly-*.jar \
      kafka-direct-iotmsg.py \
      localhost:9092 iotmsgs`
"""
from __future__ import print_function

import sys
import re

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

from operator import add


if __name__ == "__main__":
    # Expect exactly two CLI arguments: the Kafka broker list and the topic.
    if len(sys.argv) != 3:
        print("Usage: kafka-direct-iotmsg.py <broker_list> <topic>", file=sys.stderr)
        # Use sys.exit rather than the bare exit() helper: exit() is injected
        # by the `site` module and is not guaranteed to exist (e.g. `python -S`).
        sys.exit(-1)

    sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
    # Micro-batch interval of 2 seconds.
    ssc = StreamingContext(sc, 2)

    sc.setLogLevel("WARN")

    ###############
    # Globals
    ###############
    # Running total / count / average for each sensor. An `if` block does not
    # open a new scope, so these are module-level names; the process*RDD
    # callbacks update them through `global` declarations.

    lightTotal = 0.0
    lightCount = 0
    lightAvg = 0.0

    airHumidityTotal = 0.0
    airHumidityCount = 0
    airHumidityAvg = 0.0

    airTempratureTotal = 0.0
    airTempratureCount = 0
    airTempratureAvg = 0.0

    brokers, topic = sys.argv[1:]
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})

    # Records are presumably (key, value) pairs; only the value (x[1]) is kept.
    lines = kvs.map(lambda x: x[1])
    # Remove all whitespace so the key/value parsers further down can match
    # a fixed `"key":` prefix regardless of how the producer spaced its JSON.
    jsonLines = lines.map(lambda x: re.sub(r"\s+", "", x, flags=re.UNICODE))
  
    def processLightRDD(time, rdd):
        """Fold one batch of light readings into the running average, then advise.

        Intended for ``foreachRDD``: every reading in ``rdd`` is collected to
        the driver, converted with ``float`` and added to the module-level
        ``lightTotal`` / ``lightCount`` / ``lightAvg`` statistics. One line of
        advice is printed from the cumulative average on every call, so an
        empty batch re-reports the previous average (0.0 before any data).

        Args:
            time: batch time supplied by ``foreachRDD``; not used here.
            rdd: RDD whose elements are light readings as strings.
        """
        global lightTotal, lightCount, lightAvg

        for reading in rdd.collect():
            lightTotal += float(reading)
            lightCount += 1
            lightAvg = lightTotal / lightCount

        if 0 <= lightAvg < 30:
            advice = "Your plants need light. Turn on the light switch."
        elif 30 <= lightAvg < 40:
            advice = "Light is a little bit below the normalranges but still OK . "
        elif 40 <= lightAvg <= 50:
            advice = "Light is in the normal range: 40-50 watt. "
        elif 50 < lightAvg < 60:
            advice = "Light is a little bit above the normal ranges but still OK. "
        elif 60 <= lightAvg < 80:
            advice = "Light is above the normal ranges. Raise the blinder. "
        else:
            # Everything not matched above: negative averages or >= 80.
            advice = "Raise the blinder immediately or your plants gonna die soon."
        print(advice)
 
             

    def processAirHumidityRDD(time, rdd):
        """Fold one batch of air-humidity readings into the running average, then advise.

        Intended for ``foreachRDD``: readings are collected to the driver,
        converted with ``float`` and accumulated into the module-level
        ``airHumidityTotal`` / ``airHumidityCount`` / ``airHumidityAvg``.
        Advice for the cumulative average is printed on every call, including
        for empty batches.

        Args:
            time: batch time supplied by ``foreachRDD``; not used here.
            rdd: RDD whose elements are humidity readings as strings.
        """
        global airHumidityTotal, airHumidityCount, airHumidityAvg

        def _humidityAdvice(avg):
            # Guard-clause form of the humidity bands; first match wins and
            # anything left over (>= 70 or negative) is "too humid".
            if 0 <= avg < 40:
                return "Air is dry. Turn on the humidifier button. "
            if 40 <= avg < 50:
                return "Humidity is a little bit below the normal range. It is still ok or You can turn off the dehumidifier button."
            if 50 <= avg <= 60:
                return "Humidity is in the normal range:50 - 60%."
            if 60 < avg < 70:
                return "Humidity is a little  bit above the normal range. It is still ok or You can turn off the humidifier button."
            return "Air is too humid. Turn on the dehumidifier button."

        for value in rdd.collect():
            airHumidityTotal += float(value)
            airHumidityCount += 1
            airHumidityAvg = airHumidityTotal / airHumidityCount

        print(_humidityAdvice(airHumidityAvg))
            
            

    def processAirTempratureRDD(time, rdd):
        """Fold one batch of air-temperature readings into the running average, then advise.

        Intended for ``foreachRDD``: readings are collected to the driver,
        converted with ``float`` and accumulated into the module-level
        ``airTempratureTotal`` / ``airTempratureCount`` / ``airTempratureAvg``.
        Advice for the cumulative temperature average is printed on every
        call, including for empty batches.

        The advice bands depend on the temperature average only. The normal
        band's message says "65 - 75F", so the readings are presumably
        Fahrenheit.

        Args:
            time: batch time supplied by ``foreachRDD``; not used here.
            rdd: RDD whose elements are temperature readings as strings.
        """
        global airTempratureTotal, airTempratureCount, airTempratureAvg

        for value in rdd.collect():
            airTempratureTotal += float(value)
            airTempratureCount += 1
            airTempratureAvg = airTempratureTotal / airTempratureCount

        temp = airTempratureAvg
        # Disjoint bands in ascending order; each one compares the
        # temperature average (never the humidity average).
        if 0 <= temp < 40:
            print("Temperature is dangerously low. Plants is dying. Turn on the heater.")
        elif 40 <= temp < 65:
            print("Temperature is a little bit below the normal range but still OK.")
        elif 65 <= temp <= 75:
            print("Temperature is in the normal range: 65 - 75F ")
        elif 75 < temp <= 90:
            print("Temperature is a little bit above the normal range but still OK.")
        elif 90 < temp <= 100:
            print("Temperature is quite high. Cool dowm the heater for 30 minutes.")
        elif temp > 100:
            print("Temperature is dangerously high. Plants is dying. Turn off the heater.")
        else:
            # Below 0 (or a NaN average).
            print("You plants are freezing.")




    ############
    # Wiring
    ############
    # Route each whitespace-stripped line to the sensor whose key it contains
    # and echo the matching raw lines. Parsing drops the `"key":` prefix and
    # everything from the first comma on, leaving the reading as a string;
    # float conversion happens in the sorters and process*RDD callbacks.
    lightValues = jsonLines.filter(lambda x: "light" in x)
    lightValues.pprint(num=10000)
    parsedlightValues = lightValues.map(lambda x: re.sub(r"\"light\":", "", x).split(',')[0])

    airHumidityValues = jsonLines.filter(lambda x: "airHumidity" in x)
    airHumidityValues.pprint(num=10000)
    parsedairHumidityValues = airHumidityValues.map(lambda x: re.sub(r"\"airHumidity\":", "", x).split(',')[0])

    airTempratureValues = jsonLines.filter(lambda x: "airTemprature" in x)
    airTempratureValues.pprint(num=10000)
    parsedairTempratureValues = airTempratureValues.map(lambda x: re.sub(r"\"airTemprature\":", "", x).split(',')[0])

    # Per-batch message count for each sensor (nothing is emitted for a batch
    # with no matching lines). `str` replaces the Python 2-only `unicode`,
    # which raised NameError under Python 3; the count is an int, so the
    # printed text is unchanged.
    countMap1 = parsedlightValues.map(lambda x: 1).reduce(add)
    valueCount1 = countMap1.map(lambda x: "Total Count of Msgs: " + str(x))
    valueCount1.pprint()

    countMap2 = parsedairHumidityValues.map(lambda x: 1).reduce(add)
    valueCount2 = countMap2.map(lambda x: "Total Count of Msgs: " + str(x))
    valueCount2.pprint()

    countMap3 = parsedairTempratureValues.map(lambda x: 1).reduce(add)
    valueCount3 = countMap3.map(lambda x: "Total Count of Msgs: " + str(x))
    valueCount3.pprint()

    # Print each batch's readings in numeric order. The parsed values are
    # strings, so sorting them directly would be lexicographic ("100" < "45").
    sortedValues1 = parsedlightValues.transform(lambda batch: batch.sortBy(float))
    sortedValues1.pprint(num=10000)

    sortedValues2 = parsedairHumidityValues.transform(lambda batch: batch.sortBy(float))
    sortedValues2.pprint(num=10000)

    sortedValues3 = parsedairTempratureValues.transform(lambda batch: batch.sortBy(float))
    sortedValues3.pprint(num=10000)

    # Update the running averages and print advice once per batch.
    parsedlightValues.foreachRDD(processLightRDD)
    parsedairHumidityValues.foreachRDD(processAirHumidityRDD)
    parsedairTempratureValues.foreachRDD(processAirTempratureRDD)

    ssc.start()
    ssc.awaitTermination()
