In [100]:
import logging
from collections import namedtuple

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.streaming import StreamingContext

In [101]:
# Reuse the SparkContext the PySpark kernel already created, or create one when the
# notebook runs in a plain Python kernel, so this cell works on a fresh kernel
# instead of depending on a pre-existing global `sc`.
sc = SparkContext.getOrCreate()

# Micro-batch interval of 5 seconds. Spark requires window/slide durations used
# on this stream to be multiples of it.
ssc = StreamingContext(sc, 5)
sqlContext = SQLContext(sc)

In [102]:
def _save_to_cassandra(df, table, keyspace="streaming"):
    """Append the rows of ``df`` to the Cassandra table ``keyspace.table``.

    Writes through the ``org.apache.spark.sql.cassandra`` data source
    (presumably the spark-cassandra-connector, which must be on Spark's classpath).
    """
    (df.write
        .format("org.apache.spark.sql.cassandra")
        .options(table=table, keyspace=keyspace)
        .save(mode="append"))


def solution(time, rdd):
    """Aggregate one streaming batch of traffic records and append the results to Cassandra.

    Registered via ``DStream.foreachRDD``, so Spark calls it once per batch.

    Args:
        time: batch time passed by ``foreachRDD``; only used in error logs.
        rdd: RDD of ``Record`` namedtuples (every field is a string).

    Side effects:
        Registers the temp tables ``records``, ``temp`` and ``temp2`` and appends
        to the ``streaming`` keyspace tables ``query1``, ``query2``, ``query3a``
        and ``query3b``.

    An empty batch is skipped. Any other error is logged with its traceback and
    the remainder of that batch is skipped, so one bad batch does not stop the
    stream -- but failures are no longer invisible.
    """
    # toDF() infers the schema from the data and raises on an empty RDD, so skip
    # quiet batches up front instead of treating them as errors.
    if rdd.isEmpty():
        return

    try:
        rdd.toDF().registerTempTable("records")

        # Solution 1: total traffic volume per region.
        query1 = sqlContext.sql("""
            SELECT Region, SUM(SumOfSumVolume) AS totalTrafficVolume
            FROM records
            GROUP BY Region
        """)
        _save_to_cassandra(query1, "query1")

        # Per-(Region, Site) totals, joined back onto every record as temp2.
        temp = sqlContext.sql("""
            SELECT Region, Site, SUM(SumOfSumVolume) AS totalTrafficVolume
            FROM records
            GROUP BY Region, Site
        """)
        temp.registerTempTable("temp")

        temp2 = sqlContext.sql("""
            SELECT records.*, temp.totalTrafficVolume
            FROM records
            LEFT JOIN temp
              ON records.Region = temp.Region AND records.Site = temp.Site
        """)
        temp2.registerTempTable("temp2")

        # Solution 2: total traffic volume per (Region, Site).
        # A row_number() rank was never selected and the GROUP BY over the
        # selected columns collapsed it, so DISTINCT yields the same rows without
        # the extra window sort. No ORDER BY: Cassandra orders rows by its own
        # keys, not by insertion order.
        query2 = sqlContext.sql("""
            SELECT DISTINCT Region, Site, totalTrafficVolume
            FROM temp2
        """)
        _save_to_cassandra(query2, "query2")

        # Solution 3a: total volume of each site whose name starts with 'A'.
        # WHERE filters before aggregating (same result as HAVING on the grouping
        # column). No trailing ';' -- some Spark SQL versions reject it.
        query3a = sqlContext.sql("""
            SELECT Site_Description_Cap AS Site, SUM(SumOfSumVolume) AS totalTrafficVolume
            FROM records
            WHERE Site_Description_Cap LIKE 'A%'
            GROUP BY Site_Description_Cap
        """)
        _save_to_cassandra(query3a, "query3a")

        # Solution 3b: volume of each site in January.
        # Month is a string column; Spark casts it for the comparison with 1.
        query3b = sqlContext.sql("""
            SELECT Site, Month, SUM(SumOfSumVolume) AS totalTrafficVolume
            FROM records
            WHERE Month = 1
            GROUP BY Site, Month
        """)
        _save_to_cassandra(query3b, "query3b")

    except Exception:
        # Best-effort per batch: report the failure and let the stream continue.
        logging.getLogger(__name__).exception(
            "Failed to process streaming batch at %s", time)

In [103]:
# Column names of each '|'-delimited input line, in file order. Record is the
# row type the parsing pipeline builds; rdd.toDF() takes its column names
# from these namedtuple fields.
fields = (
    "Year",
    "Month",
    "Day",
    "Site",
    "SumOfSumVolume",
    "Site_Description_Cap",
    "Region",
)
Record = namedtuple('Record', fields)

In [104]:
# Directory monitored for newly added text files; each new file's lines become
# part of the next micro-batch.
# NOTE(review): hardcoded absolute local path (containing a space) -- consider
# making it configurable.
STREAM_DIR = 'file:///home/bdm/Assignment 3/Streaming/'
text_stream = ssc.textFileStream(STREAM_DIR)

In [105]:
# 20-second window over the input stream. No slideDuration is given, so the window
# slides by the batch interval: every line falls into several consecutive windows
# and its aggregates are recomputed (and appended) once per window.
# NOTE(review): confirm this overlap is intended rather than a tumbling window.
lines = text_stream.window(windowDuration=20)

In [106]:
# Number of '|'-separated columns a line must have to build a Record.
_RECORD_WIDTH = len(Record._fields)

# Parse each '|'-delimited line into a Record and hand every batch to solution().
# Dropped: blank lines, lines with too few columns (these used to raise
# IndexError on cols[6] and abort the whole batch), and lines whose Region
# (column index 6) is empty. Extra trailing columns are ignored.
record = (
    lines
    .filter(bool)
    .map(lambda line: line.split("|"))
    .filter(lambda cols: len(cols) >= _RECORD_WIDTH and cols[6] != '')
    .map(lambda cols: Record(*cols[:_RECORD_WIDTH]))
)
record.foreachRDD(solution)

In [107]:
# Start the streaming computation: from now on a batch runs every batch interval
# and the registered foreachRDD callback (solution) is invoked for each one.
# start() does not block; the stream keeps running until ssc.stop() is called.
ssc.start()

In [108]:
# Stop the streaming computation but keep the SparkContext alive so the kernel can
# keep using it. A stopped StreamingContext cannot be restarted -- re-run the setup
# cells to build a new one.
# NOTE(review): stopGraceFully is left at its default (False), so in-flight batches
# are not awaited; pass stopGraceFully=True to let them finish their writes.
ssc.stop(stopSparkContext=False)