In [26]:
import sys
from datetime import datetime

In [85]:
# Field delimiter used to split the rows of both input files (tab character)
sep = '\t'

# Read each input file as an RDD of raw text lines.
# NOTE(review): `sc` is not created in this notebook; presumably it is the
# SparkContext provided by the PySpark kernel/shell - confirm before a fresh run.
stationsInput = sc.textFile("input/stations.csv")
registerInput = sc.textFile("input/registerSample.csv")

In [86]:
#remove headers of both files
#Only the first column is inspected: a substring test on the whole line would
#also discard data rows whose text happens to contain "id" or "station"
#(for example inside a name column). The sampled data rows start with a
#numeric station id, so they are always kept.
#NOTE(review): assumes the header's first column is the id/station column - confirm.
stationsInput = stationsInput.filter(lambda x: "id" not in x.split(sep, 1)[0])
registerInput = registerInput.filter(lambda x: "station" not in x.split(sep, 1)[0])

In [87]:
# Minimum criticality (share of readings with no free slots) a timeslot must reach to be kept
crit_thres = 0.4

def filterStations(line):
    """Tell whether a register row should be kept.

    `line` is one row split on the module-level `sep`, laid out as
    stationId, timestamp, usedslots, freeslots.

    Returns False when both the used-slots and the free-slots columns are the
    string '0', True otherwise.
    """
    columns = line.split(sep)
    # Keep the row as soon as one of the two counters is not '0'
    return columns[2] != '0' or columns[3] != '0'

# Drop the rows rejected by filterStations; cached because later cells reuse this RDD
registerRDD = registerInput.filter(filterStations).cache()

In [88]:
# Preview the first five rows. take(5) returns the same leading elements as
# collect()[0:5] without shipping the whole RDD to the driver first.
registerRDD.take(5)

                                                                                

['1\t2008-05-15 12:01:00\t0\t18',
 '1\t2008-05-15 12:02:00\t0\t18',
 '1\t2008-05-15 12:04:00\t0\t18',
 '1\t2008-05-15 12:06:00\t0\t18',
 '1\t2008-05-15 12:08:00\t0\t18']

In [89]:
def mapRegRDD(line):
    """Turn one register row into a ((stationId, timeslot), (n, m)) pair.

    `line` is split on the module-level `sep` and is laid out as
    stationId, timestamp, usedslots, freeslots, with the timestamp formatted
    as "%Y-%m-%d %H:%M:%S".

    The timeslot is "<weekday name>,<hour>", e.g. "Thursday,13".
    n is 1 when the free-slots column parses to 0 and 0 otherwise; m is always 1,
    so that summing the pairs gives (readings with no free slot, readings).
    """
    columns = line.split(sep)
    reading_time = datetime.strptime(columns[1], "%Y-%m-%d %H:%M:%S")
    # Weekday name and hour of day identify the timeslot
    timeslot = reading_time.strftime("%A") + "," + str(reading_time.hour)

    # Flag the reading when the station has no free slot
    n = 1 if int(columns[3]) == 0 else 0
    return ((columns[0], timeslot), (n, 1))

In [90]:
def mapStatRDD(line):
    """Turn one stations row into a (stationId, (lon, lat)) pair.

    `line` is split on the module-level `sep`. Column 0 is kept as a string id;
    columns 1 and 2 are parsed as floats and returned, in that order, as the
    coordinate pair.
    NOTE(review): this assumes the row layout is stationId, lon, lat, ... -
    confirm against the header of the stations file.
    """
    columns = line.split(sep)
    station_id = columns[0]
    coordinates = (float(columns[1]), float(columns[2]))
    return (station_id, coordinates)

In [91]:
# Parse every stations row into (stationId, (lon, lat)); cached because later cells reuse this RDD
stationsRDD = stationsInput.map(mapStatRDD).cache()

In [92]:
# Preview the first five stations. take(5) returns the same leading elements as
# collect()[0:5] without shipping the whole RDD to the driver first.
stationsRDD.take(5)

[('1', (2.180019, 41.397978)),
 ('2', (2.176414, 41.394381)),
 ('3', (2.181164, 41.39375)),
 ('4', (2.1814, 41.393364)),
 ('5', (2.180214, 41.391072))]

In [93]:
# Criticality of every (stationId, timeslot) pair:
#   1. map each reading to ((stationId, timeslot), (n, 1)) with mapRegRDD
#   2. sum the pairs component-wise per key
#   3. divide the summed n by the summed count of readings
criticalRDD = (
    registerRDD.map(mapRegRDD)
    .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
    .mapValues(lambda totals: float(totals[0]) / float(totals[1]))
)

In [94]:
# Preview the first five criticality values. take(5) returns the same leading
# elements as collect()[0:5] without shipping the whole RDD to the driver first.
criticalRDD.take(5)

                                                                                

[(('1', 'Thursday,13'), 0.007532956685499058),
 (('1', 'Thursday,14'), 0.04189435336976321),
 (('1', 'Thursday,15'), 0.032758620689655175),
 (('1', 'Thursday,16'), 0.04424778761061947),
 (('1', 'Thursday,19'), 0.0)]

In [95]:
 # Keep only the (station, timeslot) pairs whose criticality reaches crit_thres; cached for reuse in later cells
 criticalRDD = criticalRDD.filter(lambda x: x[1] >= crit_thres).cache()

In [96]:
# Preview up to five of the critical timeslots. take(5) returns the same leading
# elements as collect()[0:5] without shipping the whole RDD to the driver first.
criticalRDD.take(5)

[(('1', 'Thursday,0'), 0.4581005586592179),
 (('1', 'Thursday,1'), 0.4329608938547486),
 (('1', 'Sunday,4'), 0.403899721448468)]

In [97]:
'''
At this point each record is ((stationId, timestamp), criticality).
For each station we have to select the most critical timestamp.
If two or more timeslots share the highest criticality value for the same station,
select only one of them: the one associated with the earliest hour.
If the hour is also the same, use the lexicographical order of the
name of the week day.
'''

def customReducerByKey(x, y):
    """Pick the preferred of two (timeslot, criticality) values.

    Each argument is a pair whose first item is a "<weekday>,<hour>" string and
    whose second item is the criticality (anything float() accepts).

    Preference order:
      1. the higher criticality;
      2. on equal criticality, the earlier hour;
      3. on equal hour, the lexicographically smaller week-day name.
    When all three tie, `y` is returned.
    """
    crit_x = float(x[1])
    crit_y = float(y[1])
    if crit_x > crit_y:
        return x
    if crit_x < crit_y:
        return y

    # Same criticality: the timeslot strings are only parsed from here on
    slot_x = x[0].split(',')
    hour_x = int(slot_x[1])
    slot_y = y[0].split(',')
    hour_y = int(slot_y[1])

    # Tuple comparison orders by hour first and by week-day name second;
    # a complete tie compares as "not smaller", so y is returned
    return x if (hour_x, slot_x[0]) < (hour_y, slot_y[0]) else y

In [98]:
#Re-key criticalRDD by station: ((stationId, timestamp), criticality) -> (stationId, (timestamp, criticality))
#Use the custom reducer to keep the most critical timestamp for each station
#The join with stationsRDD that adds (lon, lat) is done in a later cell, not here
criticalityTSPerStationRDD = criticalRDD.map(lambda t: (t[0][0], (t[0][1], t[1])))\
    .reduceByKey(customReducerByKey)

In [99]:
# Display every (stationId, (timestamp, criticality)) record; reduceByKey left one value per station id
criticalityTSPerStationRDD.collect()

[('1', ('Thursday,0', 0.4581005586592179))]

In [100]:
#Join with the stations table to get lon and lat for each station -> (stationId, ((timestamp, criticality), (lon, lat)))
#join() keeps only the station ids that appear in both RDDs
#NOTE: the result is bound to the same name, so running this cell a second time joins
#the already-joined RDD again and nests the value one level deeper
criticalityTSPerStationRDD = criticalityTSPerStationRDD.join(stationsRDD)

In [101]:
# Display the joined records: (stationId, ((timestamp, criticality), (lon, lat)))
criticalityTSPerStationRDD.collect()

[('1', (('Thursday,0', 0.4581005586592179), (2.180019, 41.397978)))]

In [102]:
def mapToKMLPlacemark(record):
    '''
    Render one joined record as a single-line KML Placemark string.

    record is (stationId, ((timestamp, criticality), (lon, lat))), where
    stationId is a string and timestamp is "<weekday>,<hour>".

    The station id becomes the Placemark name, the weekday, hour and
    criticality become the "DayWeek", "Hour" and "Criticality" Data values,
    and the point coordinates are written as "lon,lat".

    Example of the returned text:
    <Placemark><name>1</name><ExtendedData><Data
    name="DayWeek"><value>Thursday</value></Data><Data
    name="Hour"><value>0</value></Data><Data
    name="Criticality"><value>0.4581005586592179</value></Data></ExtendedData><
    Point><coordinates>2.180019,41.397978</coordinates></Point></Placemark>
    (emitted without the line breaks shown here)
    '''
    station_id = record[0]
    timeslot = record[1][0][0].split(',')
    criticality = record[1][0][1]
    position = record[1][1]

    # Literal fragments alternate with the values taken from the record
    pieces = [
        "<Placemark><name>", station_id, "</name><ExtendedData><Data ",
        "name=\"DayWeek\"><value>", timeslot[0], "</value></Data><Data ",
        "name=\"Hour\"><value>", timeslot[1], "</value></Data><Data ",
        "name=\"Criticality\"><value>", str(criticality), "</value></Data></ExtendedData><",
        "Point><coordinates>", str(position[0]), ",", str(position[1]),
        "</coordinates></Point></Placemark>",
    ]
    return "".join(pieces)

In [103]:
# Render every joined record as a KML Placemark string and display the resulting list
criticalityTSPerStationRDD.map(mapToKMLPlacemark).collect()

['<Placemark><name>1</name><ExtendedData><Data name="DayWeek"><value>Thursday</value></Data><Data name="Hour"><value>0</value></Data><Data name="Criticality"><value>0.4581005586592179</value></Data></ExtendedData><Point><coordinates>2.180019,41.397978</coordinates></Point></Placemark>']