# Spark streamer

In [1]:
# Importing all the required modules
import logging  # python logging module

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext


In [2]:
# Log record layout: timestamp, level, originating function:line, then the message.
logFormat = "%(asctime)s - [%(levelname)s] (%(funcName)s:%(lineno)d) %(message)s"

# Send this notebook's log records (INFO and above) to SparkStreaming.log,
# appending to the file if it already exists.
# Note: logging.basicConfig is a no-op when the root logger already has
# handlers, so re-running this cell in the same kernel does not reconfigure it.
logging.basicConfig(filename="SparkStreaming.log", level=logging.INFO,
                    format=logFormat, datefmt="%Y-%m-%d %H:%M:%S")


In [3]:
# Create the SparkContext, or reuse the one already active in this kernel.
# Calling SparkContext(...) a second time raises "Cannot run multiple
# SparkContexts at once", so getOrCreate keeps this cell safe to re-run.
# If a context already exists, its existing configuration (including the app
# name) is kept and the SparkConf below is ignored.
sc = SparkContext.getOrCreate(SparkConf().setAppName("TwitterAnalysis"))
# Restrict Spark's own log output to ERROR level to keep the notebook readable.
sc.setLogLevel("ERROR")


In [4]:
# Streaming context on top of the SparkContext: incoming data is cut into
# micro-batches of BATCH_INTERVAL_SECONDS each.
BATCH_INTERVAL_SECONDS = 30
ssc = StreamingContext(sc, batchDuration=BATCH_INTERVAL_SECONDS)


In [5]:
# Input DStream of text lines read over TCP. Spark's receiver connects to
# host:port as a client, so the data producer is expected to be serving
# newline-delimited text there.
host, port = "localhost", 8890
socketStream = ssc.socketTextStream(hostname=host, port=port)

In [6]:
# Re-window the raw socket stream: every emitted RDD covers the last
# WINDOW_LENGTH_SECONDS of received lines. No slide interval is passed, so a
# new window is produced once per batch interval of the StreamingContext.
# The window length must be a multiple of that batch interval.
WINDOW_LENGTH_SECONDS = 60
lines = socketStream.window(WINDOW_LENGTH_SECONDS)

def parse(line):
    """Parse one ``"<key>:<count>"`` record into a ``(key, count)`` pair.

    Intended for ``DStream.flatMap``. It returns a one-element list on
    success and an empty list for a malformed record, so bad lines are
    dropped instead of failing the batch.

    The count is taken from after the *last* colon, so keys that themselves
    contain ``:`` stay intact (``"a:b:3"`` -> ``[("a:b", 3)]``).

    Args:
        line: A single text line from the socket stream.

    Returns:
        ``[(key, count)]`` if the record has a colon and the text after it
        parses as an int, otherwise ``[]``.
    """
    if not isinstance(line, str):
        # Non-text input cannot be a valid record; drop it like other bad lines.
        return []
    try:
        key, raw_count = line.rsplit(":", 1)
        return [(key, int(raw_count))]
    except ValueError:
        # No colon (unpacking fails) or a non-integer count: skip the record.
        # Narrow on purpose so KeyboardInterrupt etc. are not swallowed.
        return []

# Turn each windowed line into a (key, count) pair; malformed lines are dropped by parse.
parsed = lines.flatMap(parse)

# Order every windowed RDD by count, highest first.
# NOTE(review): counts are not aggregated per key (no reduceByKey), so a key
# reported in more than one batch inside the window can appear more than
# once — confirm whether that is intended.
sorted_ = parsed.transform(
    lambda rdd: rdd.sortBy(lambda kv: kv[1], ascending=False))

# Print the first TOP_N records of each windowed RDD to stdout.
TOP_N = 20
sorted_.pprint(TOP_N)

In [7]:
# Start the streaming job and block until it terminates. The context is always
# stopped on the way out, so interrupting the kernel does not leave the
# receiver and batch jobs running in the background.
try:
    ssc.start()
    ssc.awaitTermination()
except KeyboardInterrupt:
    # A user-requested stop is expected, not an error.
    logging.info("Stopping the streaming")
except Exception:
    # logging.exception records the message together with the full traceback.
    logging.exception("An unhandled exception has occured.")
finally:
    # Also stops the underlying SparkContext so the notebook can be re-run
    # from the top and create a fresh one.
    ssc.stop(stopSparkContext=True, stopGraceFully=False)