# Big Data - Dashboard
## ALBICHARI Kaïs - D'HOSE Tanguy - ULB

In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
import sys
import os
from datetime import datetime, time, timedelta
from dateutil.relativedelta import relativedelta
from pathlib import Path
from kafka import KafkaConsumer
from kafka.errors import KafkaError

In [2]:
import headtail
headtail.head('data/data.conv.txt', 5)

['2017-03-31 03:38:16.508 1-0 122.153 2.03397\n',
 '2017-03-31 03:38:15.967 1-1 -3.91901 2.09397\n',
 '2017-03-31 03:38:16.577 1-2 11.04 2.07397\n',
 '2017-02-28 00:59:16.359 1-0 19.9884 2.74964\n',
 '2017-02-28 00:59:16.803 1-1 37.0933 2.76964\n']

In [3]:
STREAM_IN = "stream-IN"

sc = SparkContext("local[*]", "test")
sc.setLogLevel("WARN")   #Make sure warnings and errors observed by spark are printed.

ssc = StreamingContext(sc, 30)  #generate a mini-batch every 5 seconds

filestream = ssc.textFileStream(STREAM_IN) #monitor new files in folder stream-IN
print(filestream)

<pyspark.streaming.dstream.DStream object at 0x7ff902776e48>


In [4]:

def parseRow(row):
    '''parses a single row into a dictionary'''
    print("test")
    v = row.split(" ")
    try:
        return [{"topic": int(v[0]),
                 "time": datetime.strptime(v[1] + " "+ v[2], "%Y-%m-%d %H:%M:%S.%f"),
                 "p-i": values[3],
                 "measurement": float(values[4]),
                 "voltage": float(values[5]),
                "municipality": values[6]}]
    except Exception as err:
        print("Unexpected error: %s" % (err))


def checkAttributes(sensor_type, space_tag, time_tag):
    if sensor_type < 0 or sensor_type > 3:
        print("Sensor type value is not supported. Give a value in [0,3]")
        return False
    elif space_tag < 0 or space_tag > 2:
        print("Space tag value is not supported. Give a value in [0,2].")
        print("0: grouped per space")
        print("1: grouped per municipality")
        print("2: grouped for the entirety of Brussels")
        return False
    elif time_tag <0 or time_tag > 4:
        print("Time tag value is not supported. Give a value in [0,4].")
        print("0: last 24h")
        print("1: last 2 days")
        print("2: last week")
        print("3: last month")
        print("4: last year")
        return False
    else:
        return True
    
    
def basicStats(space_tag, time_tag):
    
    #filestream = ssc.textFileStream(STREAM_IN)
    print(filestream)
    rows = filestream.flatMap(parseRow)
    #print(rows.pprint())
    if space_tag == 0: #Group per place
        group_space = rows.map(lambda r: ((r[0],[2].split("-")[0]), r))
        #key = (sensor_type, place), value = row
        
    elif space_tag == 1: #Group per municipality
        group_space = rows.map(lambda r: ((r[0],[-1]), r))
        #key = (sensor_type, municipality), value = row
        
    else:                #Group for Brussels
        group_space = rows.map(lambda r: ((r[0]), r))
        #key = (sensor_type), value = row
    #print(group_space.pprint())
    #current_time = datetime.now()   #Ideally
    current_time = datetime.strptime("2017-02-28 00:09:19.196", "%Y-%m-%d %H:%M:%S.%f") #As in stream_to_kafka file
    if time_tag == 0:   #Last 24h
        d1 = relativedelta(days=1)
        group_time = group_space.filter(lambda r: d1 + r[1][1] > current_time)
        
    elif time_tag == 1:  #Last 48h
        d2 = relativedelta(days=2)
        group_time = group_space.filter(lambda r: d2 + r[1][1] > current_time)
        
    elif time_tag == 2:  #Last week
        w1 = relativedelta(weeks=1)
        group_time = group_space.filter(lambda r: w1 + r[1][1] > current_time)
        
    elif time_tag == 3:  #Last month
        m1 = relativedelta(months=1)
        group_time = group_space.filter(lambda r: m1 + r[1][1] > current_time)
        
    else:                #Last year
        y1 = relativedelta(years=1)
        group_time = group_space.filter(lambda r: y1 + r[1][1] > current_time)
        
    max_by_group = group_time.reduceByKey(lambda r1, r2: max(r1, r2, key=lambda r: r[-3]))
    min_by_group = group_time.reduceByKey(lambda r1, r2: min(r1, r2, key=lambda r: r[-3]))
    print(type(group_time))
    mean_by_group = group_time.transform(lambda rdd: rdd.aggregateByKey((0,0), 
                    lambda acc1, value: (acc1[0] + value[-3]   , acc1[1] + 1   ),
                    lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]) )\
    .mapValues(lambda v: v[0]/v[1]) \
    .sortBy(lambda pair: pair[1], False))


    max_by_group.pprint()
    min_by_group.pprint()
    mean_by_group.pprint()
    ssc.start()
    ssc.awaitTermination()

    return max_by_group, min_by_group, mean_by_group


M, m, avg = basicStats(2,0)
print(M.pprint())
print()
print(m.pprint())
print()
print(avg.pprint())

<pyspark.streaming.dstream.DStream object at 0x7ff902776e48>
<class 'pyspark.streaming.dstream.TransformedDStream'>
-------------------------------------------
Time: 2019-04-30 16:40:30
-------------------------------------------

-------------------------------------------
Time: 2019-04-30 16:40:30
-------------------------------------------

-------------------------------------------
Time: 2019-04-30 16:40:30
-------------------------------------------

-------------------------------------------
Time: 2019-04-30 16:41:00
-------------------------------------------

-------------------------------------------
Time: 2019-04-30 16:41:00
-------------------------------------------

-------------------------------------------
Time: 2019-04-30 16:41:00
-------------------------------------------

-------------------------------------------
Time: 2019-04-30 16:41:30
-------------------------------------------

-------------------------------------------
Time: 2019-04-30 16:41:30
---------

KeyboardInterrupt: 

In [48]:
data_file = sc.parallelize([
    (0, datetime.strptime("2017-02-28 00:04:18.147", "%Y-%m-%d %H:%M:%S.%f"), "12-0", 17.46, 2.68143, 1000),
    (0, datetime.strptime("2017-02-28 00:04:18.304", "%Y-%m-%d %H:%M:%S.%f"), "11-0", 18.048, 2.59901, 1040),
    (0, datetime.strptime("2017-02-28 00:04:19.494", "%Y-%m-%d %H:%M:%S.%f"), "42-0", 17.46, 2.65143, 1000),
    (1, datetime.strptime("2017-02-28 00:04:18.311", "%Y-%m-%d %H:%M:%S.%f"), "12-1", 43.7518, 2.65143, 1000),
    (1, datetime.strptime("2017-02-28 00:04:18.319", "%Y-%m-%d %H:%M:%S.%f"), "40-1", 40.3652, 2.73532, 1050),
    (1, datetime.strptime("2017-02-28 00:04:19.346", "%Y-%m-%d %H:%M:%S.%f"), "42-1", 43.3858, 2.71143, 1000),
    (2, datetime.strptime("2017-02-28 00:04:18.518", "%Y-%m-%d %H:%M:%S.%f"), "11-2", 3.22, 2.63901, 1040),
    (2, datetime.strptime("2017-02-28 00:04:18.606", "%Y-%m-%d %H:%M:%S.%f"), "12-2", 0.46, 2.68143, 1000),
    (2, datetime.strptime("2017-02-28 00:04:19.570", "%Y-%m-%d %H:%M:%S.%f"), "42-2", 3.68, 2.72143, 1000)])

current_time = datetime.strptime("2017-02-28 00:04:19.394", "%Y-%m-%d %H:%M:%S.%f")
print("max per sensor type - per space - for last 24h")

        
#max_by_group = (data_file.map(lambda x: ((x[0],x[1], x[2].split("-")[0]), x)))  # Convert to PairwiseRD
group_space = data_file.map(lambda x: ((x[0], x[2].split("-")[0]), x))

for group in group_space.collect():
    print(group)
group_time = group_space.filter(lambda x: x[1][1] < current_time)

print("time filter ")
for group in group_time.collect():
    print(group)

max_by_group = group_time.reduceByKey(lambda x1, x2: max(x1, x2, key=lambda x: x[-3]))
print()
for group in max_by_group.collect():
    print(group)
    
print(type(group_time))
mean_by_group = group_time\
    .aggregateByKey((0,0), 
                    lambda acc1, value: (acc1[0] + value[-3]   , acc1[1] + 1   ),
                    lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]) )\
    .mapValues(lambda v: v[0]/v[1]) \
    .sortBy(lambda pair: pair[1], False)
print(mean_by_group.collect())
    
#print(max_by_group.collect())
## [('u2', 's2', 10), ('u1', 's1', 20)]

    #Number of orders per minute
    #ordersPerMinute = orders.map(lambda o: 1).window(60, 15) # windows lenth = 60 sec, slide = 15 sec
    #orderCountPerMinute = ordersPerMinute.reduce(add)
print()
    
print("max per sensor type - per municipality - for last 24h")
#max_by_group = (data_file.map(lambda x: ((x[0],x[1], x[-1]), x)))  # Convert to PairwiseRD
max_by_group = (data_file.map(lambda x: ((x[0], x[-1]), x))
# Take maximum of the passed arguments by the last element (key)
  # equivalent to:
  # lambda x, y: x if x[-1] > y[-1] else y
  .reduceByKey(lambda x1, x2: max(x1, x2, key=lambda x: x[-3])))
  #.values()) # Drop keys
    
for group in max_by_group.collect():
    print(group)
#print(max_by_group.collect())

print()
print("max per sensor type - Brussels - for last 24h")
#max_by_group = (data_file.map(lambda x: ((x[0],x[1]), x)))  # Convert to PairwiseRD
max_by_group = (data_file.map(lambda x: ((x[0]), x))
# Take maximum of the passed arguments by the last element (key)
  # equivalent to:
  # lambda x, y: x if x[-1] > y[-1] else y
  .reduceByKey(lambda x1, x2: max(x1, x2, key=lambda x: x[-3])))
  #.values()) # Drop keys

for group in max_by_group.collect():
    print(group)
#print(max_by_group.collect())



max per sensor type - per space - for last 24h
((0, '12'), (0, datetime.datetime(2017, 2, 28, 0, 4, 18, 147000), '12-0', 17.46, 2.68143, 1000))
((0, '11'), (0, datetime.datetime(2017, 2, 28, 0, 4, 18, 304000), '11-0', 18.048, 2.59901, 1040))
((0, '42'), (0, datetime.datetime(2017, 2, 28, 0, 4, 19, 494000), '42-0', 17.46, 2.65143, 1000))
((1, '12'), (1, datetime.datetime(2017, 2, 28, 0, 4, 18, 311000), '12-1', 43.7518, 2.65143, 1000))
((1, '40'), (1, datetime.datetime(2017, 2, 28, 0, 4, 18, 319000), '40-1', 40.3652, 2.73532, 1050))
((1, '42'), (1, datetime.datetime(2017, 2, 28, 0, 4, 19, 346000), '42-1', 43.3858, 2.71143, 1000))
((2, '11'), (2, datetime.datetime(2017, 2, 28, 0, 4, 18, 518000), '11-2', 3.22, 2.63901, 1040))
((2, '12'), (2, datetime.datetime(2017, 2, 28, 0, 4, 18, 606000), '12-2', 0.46, 2.68143, 1000))
((2, '42'), (2, datetime.datetime(2017, 2, 28, 0, 4, 19, 570000), '42-2', 3.68, 2.72143, 1000))
time filter 
((0, '12'), (0, datetime.datetime(2017, 2, 28, 0, 4, 18, 147000

In [41]:
current_time = datetime.strptime("2017-02-28 00:16:19.196", "%Y-%m-%d %H:%M:%S.%f")
print(current_time)

past = datetime(2017, 2, 28, 0, 4, 18, 304000)
print(past)
h24 = relativedelta(days=1)
print(current_time -past)
print(h24)
if(past + h24 > current_time):
    print(current_time -past)
    print("past time is within 24h")
#if(current_time - past > h48):
#    print(current_time -past)
#    print("past time is within 48h")

2017-02-28 00:16:19.196000
2017-02-28 00:04:18.304000
0:12:00.892000
relativedelta(days=+1)
0:12:00.892000
past time is within 24h
