In [1]:
from datetime import datetime 

In [2]:
# Tab-separated input files: per-station slot readings and station coordinates
inputPath1 = "/data/students/bigdata-01QYD/Lab7_DBD/register.csv"
inputPath2 = "/data/students/bigdata-01QYD/Lab7_DBD/stations.csv"

# Folder that receives the generated KML placemarks
outputPath = "Lab7/res_Lab_7"

# Minimum share of "full" readings (free slots == 0) for a
# (station, weekday, hour) timeslot to be kept as critical
threshold = 0.6

In [3]:
# Read the readings file as an RDD of raw text lines; the header and the
# unusable rows are still present at this point.
# NOTE(review): `sc` is not defined in this notebook - presumably the
# SparkContext injected by the PySpark kernel; confirm.
registerRDD_Header_wrongData = sc.textFile(inputPath1)

def cleanData(line):
    """Tell whether a raw readings line should be kept.

    A line is rejected when:
      * it starts with 's' (this is how the header row is recognised),
      * it cannot be parsed (fewer than four tab-separated columns, or
        non-integer used/free slot counts) - e.g. a blank line,
      * both the used-slots and the free-slots counters are 0.

    Parameters
    ----------
    line : str
        One tab-separated line; column 2 holds the used slots and column 3
        the free slots.

    Returns
    -------
    bool
        True to keep the line, False to drop it.
    """
    if line.startswith('s'):
        return False

    fields = line.split("\t")
    try:
        usedslots = int(fields[2])
        freeslots = int(fields[3])
    except (IndexError, ValueError):
        # Unparseable rows are dropped here instead of raising inside the
        # Spark filter, where a single bad line would abort the whole job.
        return False

    # A reading with no used and no free slots carries no information
    return not (usedslots == 0 and freeslots == 0)
    
# Keep only the lines that cleanData accepts
registerRDD = registerRDD_Header_wrongData.filter(cleanData)

In [4]:
def checkFull(line):
    """Turn one readings line into a keyed pair of counters.

    Parameters
    ----------
    line : str
        Tab-separated line: column 0 is the station id, column 1 a timestamp
        formatted as "%Y-%m-%d %H:%M:%S", column 3 the number of free slots.

    Returns
    -------
    tuple
        ((stationId, weekdayName, hour), (1, isFull)) where weekdayName is
        the "%A" rendering of the timestamp, hour is its integer hour, and
        isFull is 1 when the reading has zero free slots, otherwise 0.
    """
    columns = line.split("\t")
    station, rawTimestamp, slotsFree = columns[0], columns[1], int(columns[3])

    moment = datetime.strptime(rawTimestamp, "%Y-%m-%d %H:%M:%S")
    timeslotKey = (station, moment.strftime("%A"), moment.hour)

    # (number of readings, number of readings with the station full)
    counters = (1, 1 if slotsFree == 0 else 0)
    return (timeslotKey, counters)


# One ((stationId, weekday, hour), (1, isFull)) pair per cleaned reading
stationWeekDayHour = registerRDD.map(checkFull)

In [5]:
# Merge the counters of identical (station, weekday, hour) keys:
# (total readings, full readings)
stationWeekDayHourCount = stationWeekDayHour.reduceByKey(
    lambda left, right: (left[0] + right[0], left[1] + right[1])
)

# Criticality of a timeslot = full readings / total readings
stationWeekDayHourCriticality = stationWeekDayHourCount.mapValues(
    lambda counters: counters[1] / counters[0]
)

# Keep only the timeslots whose criticality reaches the threshold
stationWeekDayHour_Critic = stationWeekDayHourCriticality.filter(
    lambda pair: pair[1] >= threshold
)

# Re-key by station: (stationId, (weekday, hour, criticality))
station_WeekDayHourCritic = stationWeekDayHour_Critic.map(
    lambda pair: (pair[0][0], (pair[0][1], pair[0][2], pair[1]))
)

In [6]:
def compareCriticality(line1, line2):
    """Pick the more critical of two (weekday, hour, criticality) tuples.

    Ordering, applied strictly in this sequence:
      1. higher criticality wins;
      2. on equal criticality, the lower hour wins;
      3. on equal criticality and hour, the weekday name that sorts first
         as a string wins (alphabetical, not calendar order).
    When the two tuples tie on all three fields, line2 is returned.

    The weekday tie-break is only consulted when the hours are equal too.
    Without that restriction the choice depends on argument order, and a
    function passed to reduceByKey must be commutative and associative to
    give a deterministic result.

    Parameters
    ----------
    line1, line2 : tuple
        (weekday, hour, criticality) candidates for the same station.

    Returns
    -------
    tuple
        Either line1 or line2, unchanged.
    """
    def rank(line):
        # Negated criticality so that "smaller rank" means "more critical",
        # then earliest hour, then weekday name.
        return (-line[2], line[1], line[0])

    return line1 if rank(line1) < rank(line2) else line2


# Reduce every station's candidate timeslots to the single
# (weekday, hour, criticality) tuple selected by compareCriticality
stationMostCritic = station_WeekDayHourCritic.reduceByKey(compareCriticality)

In [7]:
# Read the stations file as an RDD of raw text lines.
# NOTE(review): no header filtering is applied to this file - if it has a
# header row, that row is carried along until the join; confirm.
StationsRDD = sc.textFile(inputPath2)

def longitudeLatitude(line):
    """Split a stations line into a (stationId, (longitude, latitude)) pair.

    Parameters
    ----------
    line : str
        Tab-separated line; column 0 is used as the station id and columns
        1 and 2 as the coordinates (longitude then latitude, going by the
        function name - values are kept as strings).

    Returns
    -------
    tuple
        (column 0, (column 1, column 2))
    """
    parts = line.split("\t")
    coordinates = (parts[1], parts[2])
    return (parts[0], coordinates)

# (stationId, (longitude, latitude)) pairs, keyed by station id
stationIdLongitudeLatitude=StationsRDD.map(longitudeLatitude)

In [8]:
# Join on stationId ->
# (stationId, ((weekday, hour, criticality), (longitude, latitude)))
resultLocations = stationMostCritic.join(stationIdLongitudeLatitude)

In [9]:
# Return a string that represents a KML marker
def formatKMLMarker(pair):
    """Render one joined record as a single-line KML <Placemark> string.

    Parameters
    ----------
    pair : tuple
        (stationId, ((weekday, hour, criticality), (long, lat))) where
        stationId, weekday, long and lat are strings; hour and criticality
        are converted with str().

    Returns
    -------
    str
        A <Placemark> whose <name> is the station id, whose <ExtendedData>
        carries DayWeek, Hour and Criticality, and whose <Point> holds
        "long,lat" as coordinates.
    """
    stationId = pair[0]
    timeslot, location = pair[1][0], pair[1][1]

    pieces = [
        "<Placemark><name>", stationId, "</name>", "<ExtendedData>",
        "<Data name=\"DayWeek\"><value>", timeslot[0], "</value></Data>",
        "<Data name=\"Hour\"><value>", str(timeslot[1]), "</value></Data>",
        "<Data name=\"Criticality\"><value>", str(timeslot[2]), "</value></Data>",
        "</ExtendedData>", "<Point>", "<coordinates>",
        location[0] + "," + location[1],
        "</coordinates>", "</Point>", "</Placemark>",
    ]
    return "".join(pieces)


# One KML <Placemark> string per joined station
resultKML = resultLocations.map(formatKMLMarker)

In [10]:
# Collapse to one partition before saving so the output folder ends up with
# a single part file
resultKML.coalesce(1).saveAsTextFile(outputPath)

                                                                                