 ## Exercise 43 - Bike Sharing

In [24]:
# Input files: the station readings (CSV) and the per-station list of neighbors
inputPathReadings = "/data/students/bigdata-01QYD/ex_data/Ex43/data/readings.txt"
inputPathNeighbors = "/data/students/bigdata-01QYD/ex_data/Ex43/data/neighbors.txt"

# Output folders, one per stored result
# (outputPath3 is presumably meant for the selected readings of task 6 - confirm)
outputPath = "out_Ex43/"
outputPath2 = "out_Ex43_2/"
outputPath3 = "out_Ex43_3/"

# A reading is critical when its number of free slots is below this value
threshFreeSlots = 3
# A station / (station, timeslot) pair is selected when its ratio of
# critical readings is strictly above this value
threshCriticalPercentage = 0.8

__Input__ : 

* A textual csv file containing the occupancy of the stations of a bike sharing system. Each line of the file contains one sensor reading/sample and has the following format (stationId,date,hour,minute,num_of_bikes,num_of_free_slots). Some readings are missing due to temporary malfunctions of the stations. Hence, the number of samples is not exactly the same for all stations. The number of distinct stations is 100.

* A second textual csv file containing the list of  neighbors of each station. Each line of the file has the following format stationIdx, list of neighbors of stationIdx.

__Output__ : 

1. A file containing one line for each question. Each line contains a question and the list of answers to that question (QuestionId, TextOfTheQuestion, list of Answers).
   * A station is in a critical situation if the number of free slots is below a user provided threshold (e.g., 3 slots)
   *  The percentage of critical situations for a station Si is defined as (number of critical readings associated with Si)/(total number of readings associated with Si) 
   
2. Store in an HDFS file the stations with a percentage of critical situations higher than 80%  (i.e., stations that are almost always in a critical situation and need to be extended)
    * Each line of the output file is associated with one of the selected stations and contains the percentage of critical situations and the stationId. Sort the stored stations by percentage of critical situations

3. Compute the percentage of critical situations for each pair (timeslot, station)
    * Timeslot can assume the following 6 values : [0-3],[4-7],[8-11],[12-15],[16-19],[20-23]
    
4. Store in an HDFS file the pairs (timeslot, station) with a percentage of critical situations higher than  80% (i.e., stations that need rebalancing operations in specific timeslots)
    * Each line of the output file is associated with one of the selected pairs (timeslot, station) and contains the percentage of critical situations and the pair (timeslot, stationId). Sort the result by percentage of critical situations
    
5. Select a reading (i.e., a line) of the first input file if and only if the following constraints are true
    * The line is associated with a full station situation. 
    * All the neighbor stations of the station Si are full in the time stamp associated with the current line
  
6.  Store the selected readings/lines in an HDFS file and print on the standard output the total number of such lines

In [11]:
# Load the readings file as an RDD of text lines (one reading per line)
readingsRDD = sc.textFile(inputPathReadings)
# Preview only: take(3) fetches just the first rows instead of collecting
# the whole RDD on the driver and slicing it afterwards
readingsRDD.take(3)

['s1,2015-05-01,00,00,5,4',
 's2,2015-05-01,00,00,4,4',
 's3,2015-05-01,00,00,6,3']

In [12]:
# Load the neighbors file as an RDD of text lines (one station per line)
neighborsRDD = sc.textFile(inputPathNeighbors)
# Preview only: take(3) fetches just the first rows instead of collecting
# the whole RDD on the driver and slicing it afterwards
neighborsRDD.take(3)

['s1,s2 s3', 's2,s1 s5', 's3,s1']

__2__ : Store in an HDFS file the stations with a percentage of critical situations higher than 80%

In [25]:
# recall : the number of free slots is the last element

def mapReadings(line):
    """Turn one CSV reading into (stationId, (criticalFlag, 1)).

    The station id is the first comma-separated field and the number of
    free slots is the last one. criticalFlag is 1 when the number of free
    slots is below the global threshFreeSlots, 0 otherwise; the trailing 1
    counts the reading itself.
    """
    fields = line.split(",")
    criticalFlag = 1 if int(fields[-1]) < threshFreeSlots else 0
    return (fields[0], (criticalFlag, 1))
        

# Map every reading to
# (sId, (x,y))
#    x = 1 if the reading is critical, 0 otherwise
#    y = 1, so that summing by key yields the total number of readings
pairsReadingsRDD = readingsRDD.map(mapReadings)
# Preview only: take(3) avoids collecting the whole RDD on the driver
pairsReadingsRDD.take(3)

[('s1', (0, 1)), ('s2', (0, 1)), ('s3', (0, 1))]

In [27]:
# Ratio of critical readings per station
#   step 1: add up, key by key, the (critical, total) counters
#   step 2: divide the critical count by the total count
countsPerStationRDD = pairsReadingsRDD.reduceByKey(
    lambda left, right: (left[0] + right[0], left[1] + right[1])
)
criticalPercentagesRDD = countsPerStationRDD.mapValues(
    lambda counts: counts[0] / counts[1]
)
criticalPercentagesRDD.collect()

[('s1', 0.2), ('s3', 0.4), ('s2', 0.25), ('s4', 1.0), ('s5', 0.2)]

In [28]:
# Keep the stations whose critical ratio is strictly above the threshold,
# then sort them by decreasing ratio
aboveThresholdRDD = criticalPercentagesRDD.filter(
    lambda stationRatio: stationRatio[1] > threshCriticalPercentage
)
criticalStationsRDD = aboveThresholdRDD.sortBy(
    lambda stationRatio: stationRatio[1], ascending=False
)
criticalStationsRDD.collect()

[('s4', 1.0)]

In [None]:
# Write the selected (stationId, ratio) pairs as text files under outputPath
criticalStationsRDD.saveAsTextFile(outputPath)

__3__ : Compute the percentage of critical situations for each pair (timeslot, station).

Timeslot can assume the following 6 values : [0-3],[4-7],[8-11],[12-15],[16-19],[20-23]

In [42]:
# Timeslot label -> list of the hours of the day that belong to it
dic_mapping = {
    "[0-3]": list(range(0, 4)),
    "[4-7]": list(range(4, 8)),
    "[8-11]": list(range(8, 12)),
    "[12-15]": list(range(12, 16)),
    "[16-19]": list(range(16, 20)),
    "[20-23]": list(range(20, 24)),
}

def mapReadingsV2(line):
    """Turn one CSV reading into ((stationId, timeslot), (criticalFlag, 1)).

    The hour is the third comma-separated field. The timeslot is the label
    of the dic_mapping entry whose hour list contains that hour; it is the
    empty string when no entry contains it. criticalFlag is 1 when the last
    field (number of free slots) is below the global threshFreeSlots,
    0 otherwise; the trailing 1 counts the reading itself.
    """
    fields = line.split(",")
    hourOfDay = int(fields[2])

    # Find the timeslot label this hour falls into
    timeslot = ""
    for label, hours in dic_mapping.items():
        if hourOfDay in hours:
            timeslot = label

    criticalFlag = 1 if int(fields[-1]) < threshFreeSlots else 0
    return ((fields[0], timeslot), (criticalFlag, 1))
        

# Map every reading to
# ((sId, timeslot), (x,y))
#    x = 1 if the reading is critical, 0 otherwise
#    y = 1, so that summing by key yields the total number of readings
pairsTimeSlotsRDD = readingsRDD.map(mapReadingsV2)
# Preview only: take(3) avoids collecting the whole RDD on the driver
pairsTimeSlotsRDD.take(3)

[(('s1', '[0-3]'), (0, 1)),
 (('s2', '[0-3]'), (0, 1)),
 (('s3', '[0-3]'), (0, 1))]

In [44]:
# Same pipeline as for the single stations, keyed by (stationId, timeslot):
# sum the counters, turn them into a ratio, keep the ratios strictly above
# the threshold and sort them by decreasing ratio
countsPerSlotRDD = pairsTimeSlotsRDD.reduceByKey(
    lambda left, right: (left[0] + right[0], left[1] + right[1])
)
ratiosPerSlotRDD = countsPerSlotRDD.mapValues(
    lambda counts: counts[0] / counts[1]
)
criticalPercentagesV2RDD = ratiosPerSlotRDD.filter(
    lambda keyRatio: keyRatio[1] > threshCriticalPercentage
).sortBy(
    lambda keyRatio: keyRatio[1], ascending=False
)
criticalPercentagesV2RDD.collect()

[(('s4', '[0-3]'), 1.0), (('s4', '[12-15]'), 1.0)]

In [45]:
# Store the ((stationId, timeslot), ratio) pairs above the threshold.
# They live in criticalPercentagesV2RDD; criticalStationsRDD is the
# per-station result, which is already written to outputPath.
criticalPercentagesV2RDD.saveAsTextFile(outputPath2)

__5__ : Select a reading (i.e., a line) of the first input file if and only if the following constraints are true

* The line is associated with a full station situation.
* All the neighbor stations of the station Si are full in the time stamp associated with the current line



In [47]:
## from the professor

# Each line is "stationId,neighbor1 neighbor2 ...": split it once on the comma,
# then build the pair (stationId, list of neighbor ids) by splitting the
# second field on spaces
nPairRDD = neighborsRDD.map(lambda line: line.split(","))\
                       .map(lambda parts: (parts[0], parts[1].split(" ")))
nPairRDD.collect()

[('s1', ['s2', 's3']),
 ('s2', ['s1', 's5']),
 ('s3', ['s1']),
 ('s4', ['s5']),
 ('s5', ['s4', 's2'])]

In [53]:
# Bring the (stationId, neighbors) pairs to the driver as a plain dictionary
# stationId -> list of neighbors.
# With only 100 stations this mapping is assumed to fit in main memory.
neighbors = nPairRDD.collectAsMap()

In [54]:
# Keep only the readings of full stations, i.e. those whose sixth field
# (number of free slots) is 0
fullStatusLines = readingsRDD.filter(
    lambda reading: int(reading.split(",")[5]) == 0
)

In [55]:
def extractTimestamp(reading):
    """Return the timestamp of a CSV reading.

    The timestamp is the plain concatenation (no separator) of the second,
    third and fourth comma-separated fields: date, hour and minute.
    """
    parts = reading.split(",")
    date, hour, minute = parts[1], parts[2], parts[3]
    return "".join((date, hour, minute))

In [56]:
# Key every full-station reading by its timestamp (date + hour + minute, as
# returned by extractTimestamp): pairs (timestamp, reading)
fullLinesPRDD = fullStatusLines.keyBy(extractTimestamp)
fullLinesPRDD.collect()

[('2015-05-020000', 's1,2015-05-02,00,00,9,0'),
 ('2015-05-020000', 's2,2015-05-02,00,00,8,0'),
 ('2015-05-020000', 's3,2015-05-02,00,00,9,0')]

In [57]:
# Gather, for every timestamp, all the full-station readings recorded at it:
# one pair (timestamp, iterable of readings) per distinct timestamp
fullReadingsPerTimestamp = fullLinesPRDD.groupByKey()
# Display only: turn each group into a list so that it prints readably
fullReadingsPerTimestamp.mapValues(list).collect()

[('2015-05-020000',
  ['s1,2015-05-02,00,00,9,0',
   's2,2015-05-02,00,00,8,0',
   's3,2015-05-02,00,00,9,0'])]

In [61]:
def selectReadingssFunc(pairTimeStampListReadings, neighborsByStation=None):
    """Select the readings whose station has all its neighbors in the same group.

    Args:
        pairTimeStampListReadings: pair (timestamp, readings), where readings
            is an iterable of CSV lines whose first field is the station id.
            The function does not look at the free-slot count: every station
            that appears in the pair is treated as full at that timestamp.
        neighborsByStation: optional mapping stationId -> iterable of
            neighbor station ids. When omitted, the notebook-level
            ``neighbors`` dictionary is used.

    Returns:
        The list of readings, in input order, whose station has every one of
        its neighbors among the stations appearing in the pair.

    Raises:
        KeyError: if a station of the pair has no entry in the mapping.
    """
    if neighborsByStation is None:
        neighborsByStation = neighbors

    # Materialize once: the readings are traversed twice, which would
    # silently yield nothing the second time on a one-shot iterator
    readings = list(pairTimeStampListReadings[1])

    # Stations that are full at this timestamp; a set makes each membership
    # test constant-time instead of a scan of a list
    fullStations = {reading.split(",")[0] for reading in readings}

    selectedReadings = []
    for reading in readings:
        stationId = reading.split(",")[0]
        # all() stops at the first neighbor that is not full
        if all(neighborStation in fullStations
               for neighborStation in neighborsByStation[stationId]):
            selectedReadings.append(reading)

    return selectedReadings

In [62]:
# Each pair holds a timestamp and the readings with 0 free slots recorded
# at that timestamp.
# selectReadingssFunc returns, for each pair, the readings whose station has
# all of its neighbors present in the same group of readings.
# flatMap flattens the returned lists, so the result contains one string per
# selected reading.
selectedReadingsRDD = fullReadingsPerTimestamp.flatMap(selectReadingssFunc)

In [63]:
# Bring the selected readings to the driver to inspect them
# NOTE(review): task 6 also asks to store these lines in an HDFS file
# (presumably under outputPath3) and to print their total number; neither
# is done in the cells visible here - confirm against the rest of the notebook
selectedReadingsRDD.collect()

['s1,2015-05-02,00,00,9,0', 's3,2015-05-02,00,00,9,0']