In [1]:
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.streaming import StreamingContext
from pyspark.sql import Row,SparkSession
import time
from cassandra.cluster import Cluster

In [2]:
# Connection settings for the Cassandra node that receives the query results.
cassandra_keyspace = "assignment3"  # keyspace in which the q1..q4 tables are created
cassandra_port = 9042
cassandra_host = ["127.0.0.1"]  # kept as a list: it is handed to Cluster(...) as-is

In [3]:
# Directory handed to textFileStream as the source of input files.
inputDirectoryPath = '/home/ishan/Desktop/Assignment3/streaming/'

# Spark configuration: local master, Cassandra endpoint, and the connector jars
# needed by the "org.apache.spark.sql.cassandra" data source used by the save functions.
conf = SparkConf()
conf.setAppName("Assignment3")
conf.setMaster('local[*]')
conf.set('spark.cassandra.connection.host', cassandra_host[0])
conf.set('spark.cassandra.connection.port', cassandra_port)
conf.set('spark.jars', ','.join([
    '/home/ishan/Desktop/Assignment3/jars/spark-cassandra-connector_2.11-2.3.1.jar',
    '/home/ishan/Desktop/Assignment3/jars/jsr166e-1.1.0.jar',
]))

sc = SparkContext.getOrCreate(conf=conf)
ssc = StreamingContext(sc, 5)  # batch duration of 5 (seconds, per the task statement)
spark = SparkSession.builder.config(conf=conf).getOrCreate()

# Every text line is split on ',' into a list of raw fields; surrounding quotes
# are left in place and stripped later by the individual queries.
rows = ssc.textFileStream(inputDirectoryPath).map(lambda line: line.split(','))

In [4]:
def _createTableQuery(table, keyColumn, valueColumn):
    """Build the CREATE TABLE statement for one result table.

    Args:
        table: Table name (q1..q4).
        keyColumn: Name of the text column that, together with `time`, forms the primary key.
        valueColumn: Column definition of the stored value, e.g. 'count int'.

    Returns:
        The CQL statement as a string.
    """
    return ('create table if not exists {0} (time text, {1} text, {2}, '
            'primary key(time, {1}));').format(table, keyColumn, valueColumn)


# Statements are listed in the order they must run: keyspace first, then
# 'use' so the unqualified table names below resolve inside that keyspace.
cassandra_create_queries = [
    "CREATE KEYSPACE IF NOT EXISTS {} WITH replication = {{'class':'SimpleStrategy', 'replication_factor':1}};".format(cassandra_keyspace),
    'use {};'.format(cassandra_keyspace),
    _createTableQuery('q1', 'classname', 'count int'),
    _createTableQuery('q2', 'classname', 'avg_speed double'),
    _createTableQuery('q3', 'counter_site', 'count int'),
    _createTableQuery('q4', 'classname', 'count int'),
]

def setupCassandra():
    """Create the keyspace and result tables by running cassandra_create_queries.

    Connects to the nodes in `cassandra_host` on `cassandra_port` (the same
    port the Spark connector is configured with), executes every statement in
    order while echoing it, and always shuts the cluster connection down
    afterwards so no driver connections are left open in the kernel.

    Raises:
        Whatever the Cassandra driver raises on connection or query failure;
        the cluster is still shut down in that case.
    """
    cluster = Cluster(cassandra_host, port=cassandra_port)
    try:
        session = cluster.connect()
        for query in cassandra_create_queries:
            print("Executing - {}".format(query))
            session.execute(query)
    finally:
        # Releases the session and its connections even if a statement failed.
        cluster.shutdown()

In [5]:
# Create the keyspace and the q1..q4 tables; every CREATE statement uses
# IF NOT EXISTS, so re-running this cell does not fail on existing objects.
setupCassandra()

  # This is added back by InteractiveShellApp.init_path()


Executing - CREATE KEYSPACE IF NOT EXISTS assignment3 WITH replication = {'class':'SimpleStrategy', 'replication_factor':1};
Executing - use assignment3;
Executing - create table if not exists q1 (time text, classname text, count int, primary key(time, classname));
Executing - create table if not exists q2 (time text, classname text, avg_speed double, primary key(time, classname));
Executing - create table if not exists q3 (time text, counter_site text, count int, primary key(time, counter_site));
Executing - create table if not exists q4 (time text, classname text, count int, primary key(time, classname));


In [6]:
# 2. Prepare the streaming application to read the data streams from the streaming directory using a batch length
# of 5 seconds.

In [7]:
# Define the following streaming computations (every 5 seconds):

In [8]:
# 1. Show total number of counts (on each site of M50) by vehicle class.

In [9]:
# Position of the vehicle-class field within a split input row.
vehicleClassColIndex = 14

# Per batch: emit (class name without quotes, 1) for every record and sum the
# ones per class name.
# NOTE(review): the task text mentions "on each site", but the key here is the
# vehicle class only - confirm that per-class totals are what is wanted.
vehicleCountPerClass = (
    rows
    .map(lambda fields: (fields[vehicleClassColIndex].strip('"'), 1))
    .reduceByKey(lambda left, right: left + right)
)

def saveQ1(time, rdd):
    """Append one batch of per-class vehicle counts to the Cassandra table q1.

    Args:
        time: Batch time passed in by foreachRDD; stored in the `time` column.
        rdd: RDD of (classname, count) pairs for the batch.

    Empty batches are skipped, so nothing is written for them.
    """
    if rdd.isEmpty():
        return

    records = rdd.map(lambda pair: Row(time=time, classname=pair[0], count=pair[1]))
    writer = spark.createDataFrame(records).write.format("org.apache.spark.sql.cassandra")
    writer = writer.mode('append').options(table="q1", keyspace=cassandra_keyspace)
    writer.save()

# Register saveQ1 as the output action for the batches of vehicleCountPerClass.
vehicleCountPerClass.foreachRDD(saveQ1)

In [10]:
# 2. Compute the average speed (on each site on M50) by vehicle class.

In [11]:
# Positions of the vehicle-class and speed fields within a split input row.
vehicleClassColIndex = 14
speedColIndex = 18

# Per batch: average speed per vehicle class.
# Each record contributes a (speed, 1) pair; reduceByKey adds the pairs up
# into (speed sum, record count) per class and the final step divides them.
# This yields the same (classname, average) pairs as grouping all speeds per
# class and averaging the list, without materialising every speed value of a
# class in one place first.
averageSpeedPerClass = (rows
                       .map(lambda row: (row[vehicleClassColIndex].strip('"'),
                                         (float(row[speedColIndex].strip('"')), 1)))
                       .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
                       .mapValues(lambda total: total[0] / total[1]))

def saveQ2(time, rdd):
    """Append one batch of per-class average speeds to the Cassandra table q2.

    Args:
        time: Batch time passed in by foreachRDD; stored in the `time` column.
        rdd: RDD of (classname, avg_speed) pairs for the batch.

    Nothing is written when the batch RDD is empty.
    """
    if rdd.isEmpty():
        return

    toRow = lambda pair: Row(time=time, classname=pair[0], avg_speed=pair[1])
    frame = spark.createDataFrame(rdd.map(toRow))
    target = dict(table="q2", keyspace=cassandra_keyspace)
    frame.write.format("org.apache.spark.sql.cassandra").mode('append').options(**target).save()

# Register saveQ2 as the output action for the batches of averageSpeedPerClass.
averageSpeedPerClass.foreachRDD(saveQ2)

In [12]:
# 3. Find the top 3 busiest counter sites on M50.

In [13]:
# Position of the counter-site field within a split input row.
counterSitesColIndex = 10

# Per batch: number of records per counter site, sorted by that number in
# descending order. Despite the "PerClass" suffix, the key is the counter
# site (column counterSitesColIndex), not the vehicle class.
counterSitesPerClass = (
    rows
    .map(lambda fields: (fields[counterSitesColIndex].strip('"'), 1))
    .reduceByKey(lambda left, right: left + right)
    .transform(lambda batch: batch.sortBy(lambda siteCount: siteCount[1], ascending=False))
)

def saveQ3(time, rdd):
    """Append the three busiest counter sites of one batch to the Cassandra table q3.

    Args:
        time: Batch time passed in by foreachRDD; stored in the `time` column.
        rdd: RDD of (counter_site, count) pairs for the batch.

    Nothing is written when the batch RDD is empty. Ties on the count are
    broken arbitrarily.
    """
    if not rdd.isEmpty():
        rowRDD = rdd.map(lambda row: Row(time=time, counter_site=row[0], count=row[1]))
        df = spark.createDataFrame(rowRDD)
        # Order explicitly before limiting: limit() on its own makes no promise
        # about WHICH rows are kept, so relying on the incoming RDD order could
        # store three arbitrary sites instead of the top three.
        # df['count'] is used because df.count is the DataFrame method.
        (df.orderBy(df['count'].desc()).limit(3).write.format("org.apache.spark.sql.cassandra")
         .mode('append')
         .options(table="q3", keyspace=cassandra_keyspace)
         .save())

# Register saveQ3 as the output action for the batches of counterSitesPerClass.
counterSitesPerClass.foreachRDD(saveQ3)

In [14]:
# 4. Find total number of counts for HGVs on M50.

In [15]:
# Positions of the vehicle-class and counter-site fields within a split row.
vehicleClassColIndex = 14
counterSitesColIndex = 10

# Per batch: keep records whose class field contains "HGV", then count the
# remaining records per counter site (quotes stripped from the site name).
countsForHGVs = (
    rows
    .filter(lambda fields: "HGV" in fields[vehicleClassColIndex])
    .map(lambda fields: (fields[counterSitesColIndex].strip('"'), 1))
    .reduceByKey(lambda left, right: left + right)
)

def saveQ4(time, rdd):
    """Append one batch of HGV counts to the Cassandra table q4.

    Args:
        time: Batch time passed in by foreachRDD; stored in the `time` column.
        rdd: RDD of (key, count) pairs for the batch.

    Nothing is written when the batch RDD is empty.

    NOTE(review): the key is stored in the `classname` column, while the
    stream this function is registered on (countsForHGVs) is keyed by counter
    site - confirm the column name is intended.
    """
    if rdd.isEmpty():
        return

    def toRow(pair):
        return Row(time=time, classname=pair[0], count=pair[1])

    batchFrame = spark.createDataFrame(rdd.map(toRow))
    cassandraWriter = batchFrame.write.format("org.apache.spark.sql.cassandra").mode('append')
    cassandraWriter.options(table="q4", keyspace=cassandra_keyspace).save()

# Register saveQ4 as the output action for the batches of countsForHGVs.
countsForHGVs.foreachRDD(saveQ4)

In [16]:
# Start the streaming computation; all streams and foreachRDD actions above
# must already be defined when this cell runs.
ssc.start()

In [17]:
# Block this cell for up to 300 seconds while the streaming job runs.
# The recorded output of this cell is False; presumably that signals the
# timeout elapsed without the context stopping - confirm against the PySpark docs.
# NOTE(review): no ssc.stop() appears in the visible cells, so the streaming
# context may still be running after this call returns.
ssc.awaitTerminationOrTimeout(timeout = 300)  # Timeout after 5 * 60 = 300 seconds (5 minutes)

False