# Jupyter Notebook with a Kafka Producer and a Spark Streaming Processor
## With windowed processing

<img src="work/image2.png">

In [None]:
from kafka import KafkaConsumer, KafkaProducer
import sys, os, json

# to get the appropriate version of spark
import findspark
findspark.init()

#    Spark
from pyspark import SparkContext
#    Spark Streaming
from pyspark.streaming import StreamingContext
#    Kafka
from pyspark.streaming.kafka import KafkaUtils


In [None]:
# Kafka topic written by the producer cell and read by the Spark Streaming job.
topic_name = "alerts"

def publish_message(producer_instance, topic_name, key, value):
    try:
        key_bytes = bytes(key, encoding='utf-8')
        value_bytes = bytes(value, encoding='utf-8')
        producer_instance.send(topic_name, key=key_bytes, value=value_bytes)
        producer_instance.flush()
#         print( ' Message published successfully.')
    except Exception as ex:
        print('Exception in publishing message')
        print(str(ex))


def connect_kafka_producer(bootstrap_servers=('localhost:9092',), api_version=(0, 10)):
    """Create a KafkaProducer, or return None if construction fails.

    Args:
        bootstrap_servers: iterable of "host:port" broker addresses
            (default: a single broker on localhost:9092).
        api_version: protocol version tuple passed unchanged to KafkaProducer.

    Returns:
        The producer, or None when ``KafkaProducer(...)`` raised an
        ``Exception``. The error is printed rather than raised, so callers
        must check for None.
    """
    try:
        return KafkaProducer(bootstrap_servers=list(bootstrap_servers),
                             api_version=api_version)
    except Exception as ex:
        # Best effort: report the failure once and let the caller decide.
        # (No `return` in `finally`: that would also swallow
        # KeyboardInterrupt/SystemExit.)
        print('Exception while connecting Kafka', ex)
        return None

In [None]:
import pandas as pd

# Load the alert records and show a couple of rows as a sanity check.
df = pd.read_csv('work/alerts.csv')
print(df.head(2))

kafka_producer = connect_kafka_producer()

if kafka_producer is None:
    # connect_kafka_producer() has already printed the reason. Without this
    # guard every row would fail inside publish_message on a None producer.
    print('No Kafka producer available; skipped publishing %d alerts' % len(df))
else:
    try:
        # Send every CSV row as one JSON message. All messages currently share
        # the key 'alert'. TODO: use a per-alert identifier column as the key
        # if the CSV has one.
        for row_index, row in df.iterrows():
            publish_message(kafka_producer, topic_name, 'alert', row.to_json())
    finally:
        # Release the producer even if publishing is interrupted.
        kafka_producer.close()

In [None]:
# Spark submit options.
# This job is started from Jupyter rather than via `spark-submit` on the
# command line, so the Kafka 0.8 streaming connector package is requested
# here. It must be set before the SparkContext is created further down.
# NOTE(review): the coordinates (Scala 2.11, Spark 2.0.2) presumably have to
# match the installed Spark build — confirm.
os.environ.update(
    PYSPARK_SUBMIT_ARGS='--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 pyspark-shell'
)

In [None]:
def sparkWindowedContext(checkpoint_dir='work/checkpoint_wednesday'):
    """Build (but do not start) the windowed Spark Streaming job for the alerts topic.

    Intended as the setup function for ``StreamingContext.getOrCreate``. The job
    reads JSON alerts from the Kafka topic ``topic_name`` in 5-second batches and,
    for every batch, prints:

    * the alert count of the batch and a one-minute rolling alert count, and
    * the per-``src`` counts of the batch and over a one-minute rolling window.

    Args:
        checkpoint_dir: Spark Streaming checkpoint directory. It is required
            because the windowed counts are maintained incrementally (with an
            inverse reduce function). It should be the same path that is passed
            to ``StreamingContext.getOrCreate`` so the context can be recovered.

    Returns:
        The configured, not-yet-started StreamingContext.
    """
    batch_seconds = 5    # micro-batch interval
    window_seconds = 60  # length of the "one minute rolling" windows
    slide_seconds = 5    # re-evaluate the windows on every batch

    # Create the Spark context and the streaming context on top of it.
    sc = SparkContext(appName="PythonSparkWindowedStreamingKafka")
    sc.setLogLevel("WARN")
    ssc = StreamingContext(sc, batch_seconds)
    # countByWindow / countByValueAndWindow are stateful (inverse-reduce)
    # windows. Spark will not start them without a checkpoint directory.
    ssc.checkpoint(checkpoint_dir)

    # Receiver-based Kafka input DStream:
    #   ZooKeeper quorum  = localhost:2181
    #   consumer group id = spark-streaming
    #   topics            = {topic_name: number of receiver threads for the topic}
    kafkaStream = KafkaUtils.createStream(ssc, 'localhost:2181', 'spark-streaming', {topic_name: 1})

    # Number of alerts in the current batch, and over the rolling window.
    count_this_batch = kafkaStream.count().map(lambda x: 'Alert count this batch: %s' % x)
    alert_count_windowed = (
        kafkaStream
        .countByWindow(window_seconds, slide_seconds)
        .map(lambda x: 'Total alerts (One minute rolling count): %s' % x)
    )

    # Each record is a (key, value) pair; the value holds the JSON-encoded alert.
    parsed = kafkaStream.map(lambda v: json.loads(v[1]))
    source_dstream = parsed.map(lambda alert: alert['src'])

    # Occurrences of each source value in the batch, and over the rolling window.
    count_src_this_batch = (
        source_dstream
        .countByValue()
        .map(lambda x: "Source counts this batch:\tValue %s\tCount %s" % (x[0], x[1]))
    )
    count_src_windowed = (
        source_dstream
        .countByValueAndWindow(window_seconds, slide_seconds)
        .map(lambda x: "Alert source counts (One minute rolling):\tValue %s\tCount %s" % (x[0], x[1]))
    )

    # Print up to 5 source counts per batch for each stream.
    count_src_this_batch.pprint(5)
    count_src_windowed.pprint(5)

    # Union the two alert-count streams so they print together in one block
    # instead of via two separate pprint calls.
    count_this_batch.union(alert_count_windowed).pprint()

    return ssc


In [None]:
# Recover the streaming context from the checkpoint if one exists; otherwise
# build a new one with sparkWindowedContext(). When a checkpoint is recovered,
# the setup function is NOT called. Delete CHECKPOINT_DIR after changing
# sparkWindowedContext, or the old job graph will be restored.
CHECKPOINT_DIR = 'work/checkpoint_wednesday'
ssc = StreamingContext.getOrCreate(CHECKPOINT_DIR, sparkWindowedContext)
ssc.start()
try:
    # Blocks until the stream stops or the kernel is interrupted.
    ssc.awaitTermination()
finally:
    # Stop streaming and the underlying SparkContext. Otherwise, after a kernel
    # interrupt, re-running the notebook in the same kernel fails because a
    # SparkContext is still active.
    ssc.stop(stopSparkContext=True, stopGraceFully=False)