In [1]:
from pyspark import SparkConf, SparkContext
import sys

conf = SparkConf().setAppName("Spark Application")
sc = SparkContext(conf=conf)

In [2]:
timeslotDictionary = {
    "0":"0",
    "1":"0",
    "2":"0",
    "3":"0",
    "4":"1",
    "5":"1",
    "6":"1",
    "7":"1",
    "8":"2",
    "9":"2",
    "10":"2",
    "11":"2",
    "12":"3",
    "13":"3",
    "14":"3",
    "15":"3",
    "16":"4",
    "17":"4",
    "18":"4",
    "19":"4",
    "20":"5",
    "21":"5",
    "22":"5",
    "23":"5"
}

def timeslotMapping(value):
    global timeslotDictionary
    return timeslotDictionary[str(value)]
    

In [3]:
# Read the content of a log file
neighborsRDD = sc.textFile("neighbors.txt") # stationId,neighborstationId1 neighborstationId2 ...
readingsRDD = sc.textFile("readings.txt") # stationId,date,hour,minute,num_of_bikes,num_of_free_slots

# Compute the percentage of critical situations for each station
## A station is in a critical situtation if the number of free slots is below a threshold (eg 3 slots)
## The percentage of critical situations for a station is defined as
##     numOfCriticalReadingsAssociatedWithStation / totalNumberOfReadingsAssociatedWithStation

freeSlotsThreshold = 3

numberOfCriticalSituationsForStationRDD = readingsRDD.filter(lambda value: int(value.split(",")[5]) < freeSlotsThreshold).map(lambda value: (value.split(",")[0], 1)).groupByKey().map(lambda pair: (pair[0], len(list(pair[1]))) )    
numberOfReadingsForStationRDD = readingsRDD.map(lambda value: (value.split(",")[0], 1)).groupByKey().map(lambda pair: (pair[0], len(list(pair[1]))) )

joinReadingsRDD = numberOfCriticalSituationsForStationRDD.join(numberOfReadingsForStationRDD).map(lambda pair: (pair[0], (int(pair[1][0]) / int(pair[1][1]) ) * 100))

for pair in joinReadingsRDD.collect():
    print(pair[0], pair[1])
    
criticalSituationPercentageThreshold = 80
outputJoinReadingsRDD = joinReadingsRDD.map(lambda pair: (pair[1], pair[0])).filter(lambda pair: pair[0] > criticalSituationPercentageThreshold).sortByKey().map(lambda pair: (pair[1], pair[0]))

outputJoinReadingsRDD.saveAsTextFile("critical_situations_stations")

s2 25.0
s3 40.0
s1 20.0
s4 100.0
s5 20.0


In [4]:
# Compute the percentage of critical situations for each pair (timeslot, station)
## Timeslots can assume the following 6 values
### [0-3]
### [4-7]
### [8-11]
### [12-15] 
### [16-19] 
### [20-23]

readingsTimeSlotRDD = readingsRDD.map(lambda value: value.split(",")[0] + "," + value.split(",")[1] + "," + value.split(",")[2] + "," + value.split(",")[3] +  "," + timeslotMapping(value.split(",")[4]) +  "," + value.split(",")[5])

criticalSituationsForStationRDD = readingsTimeSlotRDD.filter(lambda value: int(value.split(",")[5]) < freeSlotsThreshold)
numberOfCriticalSituationsForStationAndTimeslotRDD = criticalSituationsForStationRDD.map(lambda value: ((value.split(",")[0], value.split(",")[4]), 1)).groupByKey().map(lambda pair: (pair[0], len(list(pair[1]))))

numberOfReadingsForStationAndTimeSlotRDD = readingsTimeSlotRDD.map(lambda value: ((value.split(",")[0], value.split(",")[4]), 1)).groupByKey().map(lambda pair: (pair[0], len(list(pair[1]))))
    
joinReadingsRDD = numberOfCriticalSituationsForStationAndTimeslotRDD.join(numberOfReadingsForStationAndTimeSlotRDD).map(lambda pair: (pair[0], (int(pair[1][0]) / int(pair[1][1]) ) * 100))

for pair in joinReadingsRDD.collect():
    print(pair[0], pair[1])
    
criticalSituationPercentageThreshold = 80
outputJoinReadingsRDD = joinReadingsRDD.map(lambda pair: (pair[1], pair[0])).filter(lambda pair: pair[0] > criticalSituationPercentageThreshold).sortByKey().map(lambda pair: (pair[1], pair[0]))

outputJoinReadingsRDD.saveAsTextFile("critical_situations_stations_timeslots")

('s1', '2') 100.0
('s3', '2') 100.0
('s3', '1') 25.0
('s2', '2') 100.0
('s4', '1') 100.0
('s5', '1') 100.0
