In [1]:
"""
CREATE KEYSPACE streaming_vehicle_report WITH replication = {'class': 'SimpleStrategy', 'replication_factor':1};
USE streaming_vehicle_report;
CREATE TABLE cosit_class_count(time text, cosit text, classname text, count int, primary key(time, cosit, count));
CREATE TABLE cosit_class_averagesp(time text, cosit text, classname text, averagesp float, primary key(time, cosit, averagesp));
CREATE TABLE busy_sites(time text, cosit text, count int, primary key(time, cosit, count));
CREATE TABLE count_hgb(time text, count int, primary key(time, count));
"""
from pyspark.sql import *

def cosit_class_count(time, RDD):
    """Append one micro-batch of per-site, per-class vehicle counts to Cassandra.

    Used as a ``foreachRDD`` callback.

    Args:
        time: Batch time supplied by ``foreachRDD``; written to the ``time`` column.
        RDD: Records indexed as ``((cosit, classname), count)``.

    Returns:
        None. Empty batches are skipped; otherwise the rows are appended to
        table ``cosit_class_count`` in keyspace ``streaming_vehicle_report``.
    """
    # An interval without records has nothing to persist.
    if RDD.isEmpty():
        return

    def as_row(record):
        site_and_class = record[0]
        return Row(time=time, cosit=site_and_class[0], classname=site_and_class[1], count=record[1])

    frame = spark.createDataFrame(RDD.map(as_row))
    writer = frame.write.format("org.apache.spark.sql.cassandra")
    writer = writer.options(table="cosit_class_count", keyspace="streaming_vehicle_report")
    writer.save(mode="append")
    
def cosit_class_averagesp(time, RDD):
    """Append one micro-batch of per-site, per-class average speeds to Cassandra.

    Used as a ``foreachRDD`` callback.

    Args:
        time: Batch time supplied by ``foreachRDD``; written to the ``time`` column.
        RDD: Records indexed as ``((cosit, classname), average_speed)``.

    Returns:
        None. Empty batches are skipped; otherwise the rows are appended to
        table ``cosit_class_averagesp`` in keyspace ``streaming_vehicle_report``.
    """
    # An interval without records has nothing to persist.
    if RDD.isEmpty():
        return

    def as_row(record):
        site_and_class = record[0]
        return Row(time=time, cosit=site_and_class[0], classname=site_and_class[1], averagesp=record[1])

    frame = spark.createDataFrame(RDD.map(as_row))
    writer = frame.write.format("org.apache.spark.sql.cassandra")
    writer = writer.options(table="cosit_class_averagesp", keyspace="streaming_vehicle_report")
    writer.save(mode="append")
    
def busy_sites(time, RDD):
    """Append the first three per-site counts of a micro-batch to Cassandra.

    Used as a ``foreachRDD`` callback. Only the first three rows of the batch
    are written, so the caller decides which sites count as the "top" ones by
    the order of the records it passes in.

    Args:
        time: Batch time supplied by ``foreachRDD``; written to the ``time`` column.
        RDD: Records indexed as ``(cosit, count)``.

    Returns:
        None. Empty batches are skipped; otherwise the rows are appended to
        table ``busy_sites`` in keyspace ``streaming_vehicle_report``.
    """
    # An interval without records has nothing to persist.
    if RDD.isEmpty():
        return

    def as_row(record):
        return Row(time=time, cosit=record[0], count=record[1])

    frame = spark.createDataFrame(RDD.map(as_row))
    # Keep just the leading three rows before writing.
    leading_rows = frame.limit(3)
    writer = leading_rows.write.format("org.apache.spark.sql.cassandra")
    writer = writer.options(table="busy_sites", keyspace="streaming_vehicle_report")
    writer.save(mode="append")
    
def count_hgb(time, RDD):
    """Append the HGV total of one micro-batch to Cassandra.

    Used as a ``foreachRDD`` callback.

    Args:
        time: Batch time supplied by ``foreachRDD``; written to the ``time`` column.
        RDD: Records whose second element (index 1) is the count; the first
            element is not stored.

    Returns:
        None. Empty batches are skipped; otherwise the rows are appended to
        table ``count_hgb`` in keyspace ``streaming_vehicle_report``.
    """
    # An interval without records has nothing to persist.
    if RDD.isEmpty():
        return

    def as_row(record):
        return Row(time=time, count=record[1])

    frame = spark.createDataFrame(RDD.map(as_row))
    writer = frame.write.format("org.apache.spark.sql.cassandra")
    writer = writer.options(table="count_hgb", keyspace="streaming_vehicle_report")
    writer.save(mode="append")


In [2]:
from pyspark.streaming import StreamingContext
import time



# Comma-separated column names, in the order the fields appear on each streamed line.
header = 'cosit,year,month,day,hour,minute,second,millisecond,minuteofday,lane,lanename,straddlelane,straddlelanename,class,classname,length,headway,gap,speed,weight,temperature,duration,validitycode,numberofaxles,axleweights,axlespacings'


def to_dict(header, line):
    """Pair the column names in ``header`` with the values of one CSV ``line``.

    Both strings are split on every comma; surrounding double quotes are
    removed from each value. Pairing stops at the shorter of the two
    sequences, so surplus columns or surplus values are dropped.

    Args:
        header: Comma-separated column names.
        line: One comma-separated record.

    Returns:
        dict mapping column name to its (string) value.
    """
    column_names = header.split(',')
    values = [field.strip("\"") for field in line.split(",")]
    return dict(zip(column_names, values))


# Streaming context layered on the existing SparkContext; batch interval is 5 seconds.
ssc = StreamingContext(sc, 5)

# Development source: read lines from a local socket instead.
# lines = ssc.socketTextStream("localhost", 9999)

# Production source: text files under ./streams/.
lines = ssc.textFileStream("./streams/")

# Turn each raw line into a {column: value} dict so fields can be read by name.
dict_data = lines.map(lambda line: to_dict(header, line))

# Counter-site ids that belong to the M50, written without leading zeros.
m50_cosits = [
    '1012',
    '1500', '1501', '1502', '1503', '1504',
    '1505', '1506', '1507', '1508', '1509',
    '15010', '15011', '15012',
]

# Keep only M50 records; the streamed cosit is zero-padded, so strip the padding first.
m50_data = dict_data.filter(lambda row: row['cosit'].lstrip('0') in m50_cosits)




In [3]:
# Show total number of counts (on each site of M50) by vehicle class.

# Key every M50 record by (cosit, classname) with a count of 1, then add the
# ones up per key. Counting this way shuffles small integers only, instead of
# collecting the full row dicts of every key into a list just to take its length.
# Result format: (('xxxcosit', 'xxxclassname'), count)
m50_vehicle_count = (
    m50_data
    .map(lambda row: ((row['cosit'], row['classname']), 1))
    .reduceByKey(lambda a, b: a + b)
)

# displaying in console
m50_vehicle_count.pprint()

# saving each record into cassandra
m50_vehicle_count.foreachRDD(cosit_class_count)



In [4]:

# Show average speed (on each site of M50) by vehicle class.

# Key every M50 record by (cosit, classname) and carry (speed, 1) as the value;
# reducing adds the speeds and the ones, giving (sum_of_speeds, record_count)
# per key without building a list of every speed.
m50_speed_sum_count = (
    m50_data
    .map(lambda row: ((row['cosit'], row['classname']), (float(row['speed']), 1)))
    .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
)
# average_speed = sum / count (count is at least 1 for every key that exists)
m50_grouped_by_classname_cosit_speed_averaged = m50_speed_sum_count.map(
    lambda row: (row[0], row[1][0] / row[1][1])
)
# displaying in console
m50_grouped_by_classname_cosit_speed_averaged.pprint()
# saving each record into cassandra
m50_grouped_by_classname_cosit_speed_averaged.foreachRDD(cosit_class_averagesp)




In [5]:
# Find top 3 busiest counter sites on M50

# grouping by cosit and counting records belonging to cosit
m50_grouped_by_cosit_count = m50_data.map(lambda row: (row['cosit'], 1)).reduceByKey(lambda a,b:a+b)
# sorting by the record count (the value), busiest site first. Sorting by key
# would order the batch by site id, and busy_sites only keeps the first three
# rows, so the order here decides which sites are stored as the "top 3".
m50_grouped_by_cosit_count_sorted = m50_grouped_by_cosit_count.transform(
    lambda rdd: rdd.sortBy(lambda site_count: site_count[1], ascending=False)
)
# displaying in console
m50_grouped_by_cosit_count_sorted.pprint()
# saving each record into cassandra
m50_grouped_by_cosit_count_sorted.foreachRDD(busy_sites)





In [6]:


# Find total number of counts of HGV on M50

# Keep the M50 rows whose class name is HGV_RIG (ignoring case and surrounding
# whitespace), tag each one with a 1 under a single shared key, and add the ones up.
# NOTE(review): only 'HGV_RIG' is matched; the captured output also shows an
# 'HGV_ART' class, which is not counted here -- confirm that is intended.
m50_total_HGV = (
    m50_data
    .filter(lambda row: 'HGV_RIG' == row['classname'].strip().upper())
    .map(lambda row: ('HGV_RIG', 1))
    .reduceByKey(lambda left, right: left + right)
)
# Print the first records of each batch to the console.
m50_total_HGV.pprint()
# Persist each batch total to Cassandra.
m50_total_HGV.foreachRDD(count_hgb)




In [7]:

# starting execution
ssc.start()
try:
    # running for 600 seconds = 10 min
    # (interrupt the kernel to end the run earlier)
    time.sleep(600)
finally:
    # Always stop the streaming context, also when the sleep is cut short by a
    # kernel interrupt; otherwise the stream keeps running in the background.
    # The SparkContext itself is kept alive for further use.
    ssc.stop(stopSparkContext=False)

-------------------------------------------
Time: 2020-05-04 08:12:40
-------------------------------------------

-------------------------------------------
Time: 2020-05-04 08:12:40
-------------------------------------------

-------------------------------------------
Time: 2020-05-04 08:12:40
-------------------------------------------

-------------------------------------------
Time: 2020-05-04 08:12:40
-------------------------------------------

-------------------------------------------
Time: 2020-05-04 08:12:45
-------------------------------------------
(('000000001012', 'HGV_ART'), 1)
(('000000001507', 'BUS'), 1)
(('000000001503', 'HGV_ART'), 3)
(('000000001506', 'CAR'), 8)
(('000000001506', 'HGV_RIG'), 1)
(('000000001509', 'LGV'), 1)
(('000000001509', 'HGV_ART'), 2)
(('000000015012', 'LGV'), 2)
(('000000001504', 'LGV'), 3)
(('000000015010', 'HGV_ART'), 3)
...



KeyboardInterrupt: 