## This notebook is part of Hadoop and Spark training delivered by IT-DB group
### SPARK Streaming Hands-On Lab
_by Prasanth Kothuri_

### Hands-On 1 - Stream processing using Spark Streaming and Kafka
*This demonstrates processing unbounded data from a Kafka topic and performing simple string manipulations and aggregations*

#### Import the required modules

In [None]:
import os
import json
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

#### Make spark streaming kafka module available to Spark executors

In [None]:
# Submit arguments for the PySpark shell: pull in the Kafka 0-8 streaming
# connector (Scala 2.11 build, version 2.1.0) through --packages.
os.environ['PYSPARK_SUBMIT_ARGS'] = ' '.join([
    '--packages',
    'org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0',
    'pyspark-shell',
])

#### Create SparkContext

In [None]:
# Spark configuration: local master (`local[*]`) with 2g configured for both
# the driver and the executor.
conf = (
    SparkConf()
    .setMaster("local[*]")
    .set("spark.driver.memory", "2g")
    .set("spark.executor.memory", "2g")
)
sc = SparkContext(conf=conf)

#### Create streaming context

In [None]:
# Streaming context on top of `sc` with a batch duration of 60 (seconds).
ssc = StreamingContext(sparkContext=sc, batchDuration=60)

#### Hook up to the Kafka topic

In [None]:
# Receiver-based Kafka stream: ZooKeeper endpoint, consumer group id, and a
# dict mapping each topic to the number of partitions to consume.
kafkaStream = KafkaUtils.createStream(
    ssc,
    zkQuorum='sstreaming:2181',
    groupId='spark-streaming-pkothuri',
    topics={'twitter_json': 1},
)

#### Parse the messages into json

In [None]:
# Each stream element is indexed as a pair; element [1] is decoded from JSON
# (presumably the message value, with [0] being the Kafka key -- confirm).
tweets_json = kafkaStream.map(
    lambda key_value: json.loads(key_value[1])
)

#### Number of tweets in each batch

In [None]:
# Count the elements of every batch, format the count as a message, and
# register it for printing.
(
    tweets_json
    .count()
    .map(lambda batch_total: 'Number of tweets in this batch: %s' % batch_total)
    .pprint()
)

#### Count tweets by location

In [None]:
# Per-batch stream of (location, count) pairs, keyed on the tweet author's
# `location` field.
location_counts = (
    tweets_json
    .map(lambda tweet: tweet['payload']['user']['location'])
    .countByValue()
)

In [None]:
# Sort each batch's (location, count) pairs by descending count, then keep only
# the first five by taking them to the driver and re-parallelizing them.
top_locations = (
    location_counts
    .transform(lambda counts_rdd: counts_rdd.sortBy(lambda pair: -pair[1]))
    .transform(lambda sorted_rdd: sc.parallelize(sorted_rdd.take(5)))
)

In [None]:
# Register the top-locations stream for printing (up to 10 elements per batch,
# which is pprint's default).
top_locations.pprint(num=10)

#### High frequency words in the tweets

In [None]:
# Split every tweet's text on single spaces, count each distinct token per
# batch, order the counts from most to least frequent, and print them.
(
    tweets_json
    .flatMap(lambda tweet: tweet['payload']['text'].split(" "))
    .countByValue()
    .transform(lambda counts_rdd: counts_rdd.sortBy(lambda pair: -pair[1]))
    .pprint()
)

#### Start the streaming context

In [None]:
# Start the streaming computation defined in the cells above.
ssc.start()
# Wait for termination, giving up after a timeout of 180 (seconds) so the
# notebook cell returns on its own.
ssc.awaitTermination(180)

#### Stop the streaming context

In [None]:
# Stop streaming; the defaults are spelled out: the underlying SparkContext is
# stopped too, and the stop is not graceful.
ssc.stop(stopSparkContext=True, stopGraceFully=False)

### Hands-On 2 - Stream processing using Spark Streaming and Kafka
*This demonstrates spark streaming window operations*

#### Restart the kernel to clear all the variables; we are going to create streaming context again
In the top menu, Kernel -> Restart

#### Import the required modules

In [None]:
import os
import json
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

#### Make spark streaming kafka module available to Spark executors

In [None]:
# Submit arguments for the PySpark shell: pull in the Kafka 0-8 streaming
# connector (Scala 2.11 build, version 2.1.0) through --packages.
os.environ['PYSPARK_SUBMIT_ARGS'] = ' '.join([
    '--packages',
    'org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0',
    'pyspark-shell',
])

#### Window functionality stream processing code

In [None]:
def createStreamingContext():
    """Build the StreamingContext used by the window-operations lab.

    Creates a local SparkContext and a StreamingContext with 10-second batches,
    attaches a Kafka receiver for the ``twitter_json`` topic, and registers
    four printed outputs: per-batch tweet count, one-minute rolling tweet
    count, per-batch hashtag counts, and one-minute rolling hashtag counts.

    The windowed counts use inverse-reduce window operations, which need a
    checkpoint directory; this function does not set one and relies on the
    caller doing so (e.g. via ``StreamingContext.getOrCreate``).

    Returns:
        The configured, not yet started, StreamingContext.
    """
    batch_seconds = 10    # batch interval
    window_seconds = 60   # length of the rolling window
    slide_seconds = 10    # how often the rolling window is re-evaluated

    conf = SparkConf().setMaster("local[*]").set("spark.driver.memory", "2g").set("spark.executor.memory", "2g")
    sc = SparkContext(conf = conf)
    ssc = StreamingContext(sc, batch_seconds)

    # Hook up to the Kafka topic (streaming context, ZooKeeper endpoint, consumer group id,
    # dictionary of topic -> number of partitions to consume)
    kafkaStream = KafkaUtils.createStream(ssc, 'sstreaming:2181', 'spark-streaming-pkothuri', {'twitter_json':1})

    # Extract and parse the tweets
    tweets_json = kafkaStream.map(lambda x: json.loads(x[1]))

    # Number of tweets in this batch
    count_this_batch = kafkaStream.count().map(lambda x:('Tweets this batch: %s' % x))

    # One minute rolling counts
    count_windowed = kafkaStream.countByWindow(window_seconds, slide_seconds) \
                                .map(lambda x:('Tweets total (One minute rolling count): %s' % x))

    # Hashtags: whitespace-separated tokens of the tweet text that start with '#'
    hashTags = tweets_json.map(lambda tweet: tweet['payload']['text']) \
                            .flatMap(lambda x : x.split(" ")) \
                            .filter(lambda x: x.startswith("#"))

    # Hashtag counts for the current batch only, most frequent first
    count_hashTags = hashTags.countByValue() \
                                .transform(lambda rdd:rdd .sortBy(lambda x:-x[1])) \
                                .map(lambda x:"hashTag counts this batch:\tValue %s\tCount %s" % (x[0],x[1]))

    # Hashtag counts over the one-minute rolling window, most frequent first.
    # Uses countByValueAndWindow: a plain countByValue() here would just repeat
    # the per-batch counts under a "rolling" label.
    count_hashTags_windowed = hashTags.countByValueAndWindow(window_seconds, slide_seconds) \
                                .transform(lambda rdd:rdd .sortBy(lambda x:-x[1])) \
                                .map(lambda x:"hashTag counts one minute rolling:\tValue %s\tCount %s" % (x[0],x[1]))

    # Write totals to stdout (top 5 rows for the hashtag streams)
    count_this_batch.pprint()
    count_windowed.pprint()
    count_hashTags.pprint(5)
    count_hashTags_windowed.pprint(5)

    return ssc

#### Start the streaming context

In [None]:
# The windowed operations need a checkpoint directory: getOrCreate is given
# '/tmp/checkpoint001' together with the setup function for a new context.
ssc = StreamingContext.getOrCreate(
    checkpointPath='/tmp/checkpoint001',
    setupFunc=createStreamingContext,
)
ssc.start()
# No timeout is given, so this waits until the context terminates.
ssc.awaitTermination(timeout=None)

In [None]:
# Stop streaming; the defaults are spelled out: the underlying SparkContext is
# stopped too, and the stop is not graceful.
ssc.stop(stopSparkContext=True, stopGraceFully=False)