In [1]:
"""
from pyspark import SparkConf, SparkContext
conf = SparkConf().setAppName("Barcelona_Bike_Sharing")
sc = SparkContext(conf = conf)
"""

'\nfrom pyspark import SparkConf, SparkContext\nconf = SparkConf().setAppName("Barcelona_Bike_Sharing")\nsc = SparkContext(conf = conf)\n'

In [8]:
# Sanity check: print the Spark version reported by the context `sc`.
print(sc.version)

4.0.0


In [9]:
import sys

In [10]:
# Station file, later read with sc.textFile and split on tabs into (id, (long, lat)).
inputStations = "In_data/stations.csv"
# Readings file: tab-separated station, timestamp, used slots, free slots.
inputRegister = "In_data/registerSample.csv"
# Folder handed to saveAsTextFile for the Spark output.
outputPath = "Out_data/"

In [11]:
# Load the readings file as an RDD of raw text lines.
inputRDD = sc.textFile(inputRegister)

In [15]:
def filterFunc(line):
    """Tell whether a raw register line should be kept.

    Returns False for the header (any line starting with 's') and for
    readings whose used-slot and free-slot counters are both zero;
    returns True for every other line.
    """
    # Header row
    if line.startswith('s'):
        return False

    columns = line.split("\t")
    used = int(columns[2])
    free = int(columns[3])

    # A reading is discarded only when both counters are zero
    return not (used == 0 and free == 0)

In [16]:
# Keep only the lines accepted by filterFunc (header and 0/0 readings are dropped).
filteredRDD = inputRDD.filter(filterFunc)

In [19]:
#print(filteredRDD.take(5))

In [20]:
from datetime import datetime

def checkFull(line):
    """Convert one reading into a ((station, weekday, hour), (1, full)) pair.

    The line is tab-separated: station, timestamp, used slots, free slots,
    for example "1<TAB>2008-05-15 12:01:00<TAB>0<TAB>18".
    The second element of the value is 1 when the reading has no free
    slots (station full) and 0 otherwise; the first element is always 1
    so that readings can be counted by summing.
    """
    parts = line.split("\t")
    station = parts[0]
    free = int(parts[3])

    moment = datetime.strptime(parts[1], "%Y-%m-%d %H:%M:%S")
    key = (station, moment.strftime("%A"), moment.hour)

    # (total readings, full readings) contribution of this single line
    fullFlag = 1 if free == 0 else 0
    return (key, (1, fullFlag))



In [21]:
# Map each line to a pair ((stationId, dayOfTheWeek, hour), counts), where
# counts is (1, 1) if the station is full and (1, 0) if it is not
stationWeekDayHour = filteredRDD.map(checkFull)

In [22]:
# Per key, add up the (total readings, full readings) counters position by position
stationWeekDayHourCounts = stationWeekDayHour.reduceByKey(
    lambda acc, cur: (acc[0] + cur[0], acc[1] + cur[1])
)

In [23]:
# Criticality of a key = full readings / total readings
stationWeekDayHourCriticality = stationWeekDayHourCounts.mapValues(
    lambda counts: counts[1] / counts[0]
)

In [39]:
#print(stationWeekDayHourCriticality.take(5))

In [25]:
# Select only the pairs with criticality >= threshold
# NOTE(review): the comparison is inclusive (>=); confirm that a criticality
# exactly equal to the threshold is meant to be selected.
threshold = 0.4
selectedPairs = stationWeekDayHourCriticality.filter(lambda pair: pair[1]>= threshold)

In [38]:
#print(selectedPairs.take(5))

In [27]:
# Re-key every selected pair by station only:
# key = station Id
# value = (DayOfTheWeek, Hour, Criticality)
stationTimeslotCrit = selectedPairs.map(
    lambda keyCrit: (keyCrit[0][0], (keyCrit[0][1], keyCrit[0][2], keyCrit[1]))
)

In [28]:

def compareCriticality(timeslotCrit1, timeslotCrit2):
    """Pick the preferred of two (weekday, hour, criticality) timeslots.

    Preference order: higher criticality first; on equal criticality the
    smaller hour; on equal hour the weekday that compares smaller.
    When the two timeslots tie on all three fields, the second argument
    is returned.
    """
    dayA, hourA, critA = timeslotCrit1[0], timeslotCrit1[1], timeslotCrit1[2]
    dayB, hourB, critB = timeslotCrit2[0], timeslotCrit2[1], timeslotCrit2[2]

    # 1) criticality decides whenever it differs
    if critA != critB:
        return timeslotCrit1 if critA > critB else timeslotCrit2

    # 2) same criticality: the earlier hour wins
    if hourA != hourB:
        return timeslotCrit1 if hourA < hourB else timeslotCrit2

    # 3) same criticality and hour: compare the weekday values
    return timeslotCrit1 if dayA < dayB else timeslotCrit2

In [29]:
# Reduce each station's timeslots to a single one, chosen pairwise by compareCriticality.
resultRDD = stationTimeslotCrit.reduceByKey(compareCriticality)

In [30]:
def extractStationLongLat(line):
    """Split a tab-separated station line into (stationId, (long, lat)).

    The first three fields are used, in that order; all values are kept
    as strings.
    """
    parts = line.split("\t")
    stationId = parts[0]
    location = (parts[1], parts[2])
    return (stationId, location)

In [31]:
# Read the location of the stations as (stationId, (long, lat)) pairs
stationLocation = sc.textFile(inputStations).map(extractStationLongLat)

In [32]:
# Join the locations with the "critical" stations on stationId;
# elements look like (stationId, ((weekday, hour, criticality), (long, lat)))
resultLocations = resultRDD.join(stationLocation)

In [33]:
def formatKMLMarker(pair):
    """Render one joined record as a KML <Placemark> string.

    pair has the shape
    (stationId, ((weekday, hour, criticality), (long, lat))).
    hour and criticality are converted with str(); stationId, weekday and
    the two coordinates are concatenated as they are, so they must
    already be strings.
    """
    stationId = pair[0]
    timeslot = pair[1][0]
    location = pair[1][1]

    weekday, hour, criticality = timeslot[0], timeslot[1], timeslot[2]
    coordinates = location[0] + "," + location[1]

    # Pieces of the marker, in output order
    pieces = [
        "<Placemark><name>", stationId, "</name>",
        "<ExtendedData>",
        "<Data name=\"DayWeek\"><value>", weekday, "</value></Data>",
        "<Data name=\"Hour\"><value>", str(hour), "</value></Data>",
        "<Data name=\"Criticality\"><value>", str(criticality), "</value></Data>",
        "</ExtendedData>",
        "<Point>", "<coordinates>", coordinates, "</coordinates>",
        "</Point>", "</Placemark>",
    ]
    return "".join(pieces)



In [34]:
# Create a string containing the description of a marker, in the KML format, for each
# joined station record (one <Placemark> string per element)
resultKML = resultLocations.map(formatKMLMarker)

In [35]:
# Set the number of partitions to 1 for resultKML and store it in the output folder
# NOTE(review): saveAsTextFile presumably refuses to write into a folder that
# already exists, so re-running this cell may require removing Out_data/ first -- confirm.
resultKML.coalesce(1).saveAsTextFile(outputPath)

In [37]:
#print(resultKML.take(10))  # This will show the first 10 elements in the RDD

In [47]:
# Single part file expected in the output folder after coalesce(1)
spark_output = "Out_data/part-00000"

# Read Spark output; the encoding is pinned so decoding does not depend on
# the platform's default locale encoding
with open(spark_output, "r", encoding="utf-8") as f:
    spark_data = f.read()


In [44]:
# Shell escape: create Out_data/output.kml if it does not exist yet.
!touch Out_data/output.kml

In [49]:

# Write into a KML file: wrap the Spark-generated placemarks in the KML
# root/Document elements. The file carries no XML declaration, so it is
# written explicitly as UTF-8 (the XML default) instead of the platform encoding.
with open("Out_data/output.kml", "w", encoding="utf-8") as f2:
    f2.write(
        """<kml xmlns="http://www.opengis.net/kml/2.2"><Document>"""
        + spark_data +
        """</Document></kml>"""
    )