# SPBD Assignment 1

This notebook contains the code developed to implement the proposed solutions to this course assignment.

Developed by:
    * Lucas Fischer, nº54659
    * Joana Martins, nº54707

## HDFS directories setup

The first step is to create directories in the HDFS cluster.
1. Create a directory for the group
2. Create a directory for the results

In [10]:
!hdfs dfs -mkdir /user/jovyan/SPBD-1819/Lucas_Joana
!hdfs dfs -mkdir /user/jovyan/SPBD-1819/Lucas_Joana/results
!hdfs dfs -ls /user/jovyan/SPBD-1819/Lucas_Joana/results

mkdir: `/user/jovyan/SPBD-1819/Lucas_Joana': File exists
mkdir: `/user/jovyan/SPBD-1819/Lucas_Joana/results': File exists
Found 3 items
drwxr-xr-x   - jovyan supergroup          0 2018-11-14 19:31 /user/jovyan/SPBD-1819/Lucas_Joana/results/18-11-14-19-30-54
drwxr-xr-x   - jovyan supergroup          0 2018-11-14 19:35 /user/jovyan/SPBD-1819/Lucas_Joana/results/18-11-14-19-35-10
drwxr-xr-x   - jovyan supergroup          0 2018-11-14 19:53 /user/jovyan/SPBD-1819/Lucas_Joana/results/18-11-14-19-52-11
Found 6 items
drwxr-xr-x   - jovyan supergroup          0 2018-11-14 18:51 /user/jovyan/SPBD-1819/44987
drwxr-xr-x   - jovyan supergroup          0 2018-11-14 18:34 /user/jovyan/SPBD-1819/Lucas_Joana
drwxr-xr-x   - jovyan supergroup          0 2018-11-14 14:36 /user/jovyan/SPBD-1819/example
drwxr-xr-x   - jovyan supergroup          0 2018-11-14 15:03 /user/jovyan/SPBD-1819/results
-rw-r--r--   1 jovyan supergroup      12322 2018-11-14 20:03 /user/jovyan/SPBD-1819/taxi_zone_lookup.csv
-rw-r--r-

# Spark RDD solution
The first solution uses Spark and its RDDs (Spark's core data abstraction).
It builds an inverted index whose key is the tuple (weekday, pick-up zone ID, drop-off zone ID) and whose value is a tuple holding the average trip duration and the average total amount of the matching trips.
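
To make the shape of this index concrete, the following is a minimal pure-Python sketch of the same aggregation, without Spark. The sample records reuse the values shown in the results listing at the end of this notebook; `SAMPLE_TRIPS` and `build_index` are hypothetical names, and the running sum/count accumulator is a variant chosen for the illustration, not the list-based approach used in the Spark code.

```python
from collections import defaultdict

# Hypothetical sample records:
# (weekday, pickup_zone_id, dropoff_zone_id, duration_minutes, total_amount)
SAMPLE_TRIPS = [
    ("monday", "246", "238", 11, 15.96),
    ("monday", "246", "238", 11, 16.56),
    ("tuesday", "246", "238", 9, 13.30),
]


def build_index(trips):
    """Return {(weekday, pu_id, do_id): (avg_duration, avg_amount)}."""
    # Keep [duration_sum, amount_sum, count] per key instead of full lists.
    totals = defaultdict(lambda: [0.0, 0.0, 0])
    for weekday, pu_id, do_id, duration, amount in trips:
        acc = totals[(weekday, pu_id, do_id)]
        acc[0] += duration
        acc[1] += amount
        acc[2] += 1
    return {key: (d / n, a / n) for key, (d, a, n) in totals.items()}


index = build_index(SAMPLE_TRIPS)
for key, averages in sorted(index.items()):
    print(key, averages)
```

Keeping sums and a count per key means the amount of data held for each key stays constant, whereas concatenating lists grows with the number of matching trips.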


In [1]:
import pyspark
import traceback
import datetime
from datetime import datetime as dt
import calendar
import time
import numpy as np

sc = pyspark.SparkContext('local[*]') #Create spark context

#PU/DO zone ids range from 1 to 265, see taxi_zone_lookup.csv

#Main implementation

def get_user_options():
    """
        Function that gets all the users input for creating the inverted index.
        This function gets the desired weekday, time, pickup and dropoff zone
    """

    pickup_correct = False
    dropoff_correct = False
    weekday_correct = False
    time_correct = False
    pickup_id = ""
    dropoff_id = ""
    weekday = ""
    hour = ""
    minutes = ""                                                

    #Continue asking the user until he/she gives us a weekday
    while(not weekday_correct):
        weekday = input("\nPlease insert your weekday (1- Monday, 2- Tuesday, ..., 7- Sunday): ")
        try:
            if(int(weekday) >= 1 and int(weekday) <= 7):
                weekday_correct = True
        except ValueError:
            #User didn't send us a number
            print("\nPlease insert a number between 1 - 7\n")

    #Continue asking the user until he/she gives us an hour
    while(not time_correct):
        time_input = input("\nPlease insert the desired time (hh:mm): ")
        try:
            user_time = time.strptime(time_input, '%H:%M') # Check time is in proper format
            time_correct = True
            hour = user_time.tm_hour #Get hour
            minutes = user_time.tm_min #Get minutes

        except ValueError:
            #User didn't send us a valid time
            print("\nPlease insert a time in the format hh:mm where hh (00-23) and mm (00-59) \n")



    #Continue asking the user until he/she gives us a number between 1 and 265
    while(not pickup_correct):
        pickup_id = input("\nPlease insert your Pick-Up location ID (1 - 265): ")
        try:
            if(int(pickup_id) >= 1 and int(pickup_id) <= 265):
                pickup_correct = True
        except ValueError:
            #User didn't send us a number
            print("\nPlease insert a number between 1 - 265\n")

        

    #Continue asking the user until he/she gives us a number between 1 and 265
    while(not dropoff_correct):
        dropoff_id = input("\nPlease insert your Drop-Off location ID (1 - 265): ")
        try:
            if(int(dropoff_id) >= 1 and int(dropoff_id) <= 265):
                dropoff_correct = True
        except ValueError:
            #User didn't send us a number
            print("\nPlease insert a number between 1 - 265\n")


    return(weekday, pickup_id, dropoff_id, hour, minutes)


def filter_dates(input_date_time, user_weekday, user_hour, user_minutes):
    """
        Predicate function that returns True if input_date_time falls within the 30 minutes that follow the user's desired weekday and time, False otherwise

        Params:
            input_date_time - String in YYYY-MM-DD HH:MM:SS format
            user_weekday - Integer ranging from 1 to 7 representing the weekday
            user_hour - Integer representing the hour
            user_minutes - Integer representing the minutes

        Returns:
            True if input_date_time falls within the 30 minutes that follow the user's desired weekday and time, False otherwise
    """

    date_obj = dt.strptime(input_date_time, '%Y-%m-%d %H:%M:%S')

    input_weekday = date_obj.weekday()
    user_weekday -= 1   #since input_weekday is between [0, 6] we need to subtract 1 to our user_weekday

    #Start of the window, placed on the same calendar day as the trip
    user_date = date_obj.replace(hour = user_hour, minute = user_minutes, second = 0)

    if(input_weekday == (user_weekday + 1) % 7):
        #Trip happened on the day after the user's weekday: it can only match a window that started the day before and crossed midnight
        user_date -= datetime.timedelta(days = 1)
    elif(input_weekday != user_weekday):
        return False

    time_plus_30_min = user_date + datetime.timedelta(minutes = 30)
    return user_date <= date_obj <= time_plus_30_min


def create_key_value(line, user_weekday):
    """
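        Function that creates the key value structure for every line of interest

        Params:
            line - A raw line of the CSV file that was not filtered out
            user_weekday - Integer ranging from 1 to 7 representing the weekday chosen by the user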
    """
    splitted = line.split(",")
    pick_up_datetime = splitted[1]

    week_day = (calendar.day_name[user_weekday - 1]).lower()
    hour =  pick_up_datetime[11:13]
    minute = pick_up_datetime[14:16]

    pick_up_id = splitted[7]
    dropoff_up_id = splitted[8]

    key = (week_day, pick_up_id, dropoff_up_id)

    duration = get_duration(pick_up_datetime,splitted[2])
    total_amount = float(splitted[16])
    
    value = ([duration], [total_amount])

    return (key, value)



def get_duration(pick_up_datetime, drop_off_datetime):
    """
        Get duration of trip in minutes from pick up and drop off times
    """

    drop_off = dt.strptime(drop_off_datetime, '%Y-%m-%d %H:%M:%S')
    pick_up = dt.strptime(pick_up_datetime, '%Y-%m-%d %H:%M:%S')
    #Subtract the datetimes directly so the result does not depend on the machine's local time zone
    return int((drop_off - pick_up).total_seconds() / 60)


def create_inverted_index(user_weekday = 1, user_puid = 41, user_doid = 24, user_hour = 0, user_minutes = 21, filename = 'hdfs:/user/jovyan/SPBD-1819/yellow_tripdata_2018-01.csv'):
    """
        Function that creates the inverted index. This function holds the main implementation of spark code to create the inverted index
        
        Params:
            user_weekday - An integer ranging from 1 to 7 representing the day of the week chosen by the user
            user_puid - An integer ranging from 1 to 265 representing the pick-up zone ID chosen by the user
            user_doid - An integer ranging from 1 to 265 representing the drop off zone ID chosen by the user
            user_hour - An integer representing the hour chosen by the user
            user_minutes - An integer representing the minutes chosen by the user
            filename - Name of the file to read the information from
    """
    
    try :
        beforeT = dt.now()
        print("Calculating results, please stand by")
        lines = sc.textFile(filename) #Read the CSV file
        first_line = lines.first()

        #Filter out the header line and empty lines
        non_empty_lines = lines.filter(lambda line: len(line) > 0 and line != first_line)

        #Filter out lines that don't match user's pickup-ID and dropoff-ID
        lines_with_piud_doid = non_empty_lines.filter(lambda line: line.split(",")[7] == str(user_puid) and line.split(",")[8] == str(user_doid))

        #Filter out lines that are not within the user's time radius
        lines_with_hour = lines_with_piud_doid.filter(lambda line: filter_dates(line.split(",")[1], user_weekday, user_hour, user_minutes))

        # ((weekday, PU_ID, DO_ID), ([duration], [Total_Amount]))
        organized_lines = lines_with_hour.map(lambda line: create_key_value(line, user_weekday))
        
        #Reduce everything by key returning a tuple
        #(list of durations, list of amounts)
        grouped = organized_lines.reduceByKey(lambda accum, elem: (accum[0] + elem[0], accum[1] + elem[1]))
        
        #Map each of the values to be the mean of the list of durations and of the list of amounts
        grouped_with_averages = grouped.mapValues(lambda tup: (np.mean(tup[0]), np.mean(tup[1])))
        
        grouped_with_averages.saveAsTextFile('hdfs:/user/jovyan/SPBD-1819/Lucas_Joana/results/sparkrdd_' + beforeT.strftime("%y-%m-%d-%H-%M-%S"))
        
        afterT = datetime.datetime.now()
        diffT = afterT - beforeT
        #Elapsed time in milliseconds (microseconds alone would drop the whole seconds)
        print( "Time to compute : " + str(diffT.total_seconds() * 1000))
        
    except:
        traceback.print_exc()
    finally:
        #Stop the context whether or not the job succeeded
        sc.stop()



user_weekday, user_puid, user_doid, user_hour, user_minutes = get_user_options()

create_inverted_index(int(user_weekday), user_puid, user_doid, int(user_hour), int(user_minutes))


Please insert you weekday (1- Monday, 2- Tuesday, ..., 7- Sunday): 1

Please insert the desired time (hh:mm): 02:08

Please insert you Pick-Up location ID (1 - 265): 246

Please insert you Drop-Off location ID (1 - 265): 239
Calculating results, please stand by
Time to compute : 567.357

For monday at 02:08, a trip from (ID: 246) to (ID: 239) takes an average of 8.0 minutes and costs about $12.94111111111111
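
A note on reading elapsed times measured with `datetime`: a `timedelta` stores days, seconds and microseconds separately, so its `microseconds` attribute only carries the sub-second part, while `total_seconds()` covers the whole interval. The sketch below illustrates the difference with a made-up duration (the 2 minutes and 3 seconds are an assumption for the example, not a measurement from this run).

```python
import datetime

# A made-up elapsed time: 2 minutes, 3 seconds and 567357 microseconds.
elapsed = datetime.timedelta(minutes=2, seconds=3, microseconds=567357)

# .microseconds is only the fractional-second component (0 to 999999).
partial_ms = elapsed.microseconds / 1000

# total_seconds() accounts for days, seconds and microseconds together.
total_ms = elapsed.total_seconds() * 1000

print("sub-second part in ms:", partial_ms)
print("whole interval in ms:", total_ms)
```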


# Hadoop (Map-Reduce)
The second solution pursues the same goal using Hadoop MapReduce, run through Hadoop Streaming with a Python mapper and reducer.
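
As a rough sketch of how the averaging could be split between a streaming mapper and reducer, the functions below work on plain strings so they can be tried without a cluster. Everything here is an assumption made for illustration: the names `map_line`, `reduce_lines` and `make_row` are hypothetical, the column positions (1, 2, 7, 8, 16) are the ones the Spark code reads, the sample rows are made up, and the user's zone and time filters are left out. In a real streaming job stdin carries the data, so the user's options would have to reach the mapper some other way, for example as command-line arguments.

```python
import calendar
from datetime import datetime
from itertools import groupby

TIME_FORMAT = "%Y-%m-%d %H:%M:%S"


def map_line(line):
    """Turn one CSV row into 'weekday,pu,do<TAB>duration<TAB>amount', or None."""
    fields = line.strip().split(",")
    try:
        pickup = datetime.strptime(fields[1], TIME_FORMAT)
        dropoff = datetime.strptime(fields[2], TIME_FORMAT)
        amount = float(fields[16])
    except (IndexError, ValueError):
        return None  # header, blank or malformed row
    weekday = calendar.day_name[pickup.weekday()].lower()
    duration = int((dropoff - pickup).total_seconds() / 60)
    return "{},{},{}\t{}\t{}".format(weekday, fields[7], fields[8], duration, amount)


def reduce_lines(sorted_lines):
    """Yield 'key<TAB>avg_duration<TAB>avg_amount' for each run of equal keys."""
    rows = (line.rstrip("\n").split("\t") for line in sorted_lines)
    for key, group in groupby(rows, key=lambda row: row[0]):
        values = [(float(d), float(a)) for _, d, a in group]
        count = len(values)
        avg_duration = sum(v[0] for v in values) / count
        avg_amount = sum(v[1] for v in values) / count
        yield "{}\t{}\t{}".format(key, avg_duration, avg_amount)


def make_row(pickup, dropoff, pu_id, do_id, amount):
    """Build a made-up 17-column row; only the columns read above are filled in."""
    fields = ["0"] * 17
    fields[1], fields[2] = pickup, dropoff
    fields[7], fields[8] = pu_id, do_id
    fields[16] = str(amount)
    return ",".join(fields)


rows = [
    "header,line",
    make_row("2018-01-01 02:10:00", "2018-01-01 02:21:00", "246", "238", 15.96),
    make_row("2018-01-08 02:15:00", "2018-01-08 02:26:00", "246", "238", 16.56),
]

# Hadoop sorts mapper output by key before the reduce phase; sorted() stands in for that.
mapped = sorted(m for m in map(map_line, rows) if m is not None)
for result in reduce_lines(mapped):
    print(result)
```

Relying on sorted input lets the reducer process one key at a time with `groupby` instead of holding every key in memory.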


In [6]:
%%file mapper.py
#!/usr/bin/env python

import sys
import traceback
import datetime
from datetime import datetime as dt
import calendar
import time
import numpy as np


def get_user_options():
    """
        Function that gets all the users input for creating the inverted index.
        This function gets the desired weekday, time, pickup and dropoff zone
    """

    pickup_correct = False
    dropoff_correct = False
    weekday_correct = False
    time_correct = False
    pickup_id = ""
    dropoff_id = ""
    weekday = ""
    hour = ""
    minutes = ""                                                

    #Continue asking the user until he/she gives us a weekday
    while(not weekday_correct):
        weekday = input("\nPlease insert your weekday (1- Monday, 2- Tuesday, ..., 7- Sunday): ")
        try:
            if(int(weekday) >= 1 and int(weekday) <= 7):
                weekday_correct = True
        except ValueError:
            #User didn't send us a number
            print("\nPlease insert a number between 1 - 7\n")

    #Continue asking the user until he/she gives us an hour
    while(not time_correct):
        time_input = input("\nPlease insert the desired time (hh:mm): ")
        try:
            user_time = time.strptime(time_input, '%H:%M') # Check time is in proper format
            time_correct = True
            hour = user_time.tm_hour #Get hour
            minutes = user_time.tm_min #Get minutes

        except ValueError:
            #User didn't send us a valid time
            print("\nPlease insert a time in the format hh:mm where hh (00-23) and mm (00-59) \n")



    #Continue asking the user until he/she gives us a number between 1 and 265
    while(not pickup_correct):
        pickup_id = input("\nPlease insert your Pick-Up location ID (1 - 265): ")
        try:
            if(int(pickup_id) >= 1 and int(pickup_id) <= 265):
                pickup_correct = True
        except ValueError:
            #User didn't send us a number
            print("\nPlease insert a number between 1 - 265\n")

        

    #Continue asking the user until he/she gives us a number between 1 and 265
    while(not dropoff_correct):
        dropoff_id = input("\nPlease insert your Drop-Off location ID (1 - 265): ")
        try:
            if(int(dropoff_id) >= 1 and int(dropoff_id) <= 265):
                dropoff_correct = True
        except ValueError:
            #User didn't send us a number
            print("\nPlease insert a number between 1 - 265\n")


    return(weekday, pickup_id, dropoff_id, hour, minutes)


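#Identity mapper: echo each input line unchanged (write() avoids the extra newline print() would add)
for line in sys.stdin:
    sys.stdout.write(line)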

Writing mapper.py


In [7]:
%%file reducer.py
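#!/usr/bin/env python

import sys

#Identity reducer: echo each input line unchanged (write() avoids the extra newline print() would add)
for line in sys.stdin:
    sys.stdout.write(line)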

Writing reducer.py


In [10]:
!chmod a+x mapper.py && chmod a+x reducer.py

In [19]:
!hadoop jar /opt/hadoop-2.9.1/share/hadoop/tools/lib/hadoop-*streaming*.jar -files mapper.py,reducer.py -mapper mapper.py -reducer reducer.py -input hdfs:/user/jovyan/SPBD-1819/yellow_tripdata_2018-01.csv -output results_words

packageJobJar: [/tmp/hadoop-unjar1132358989608038696/] [] /tmp/streamjob6256315878908127680.jar tmpDir=null
18/11/14 19:19:21 INFO client.RMProxy: Connecting to ResourceManager at resourcemanager/172.24.32.251:8032
18/11/14 19:19:22 INFO client.AHSProxy: Connecting to Application History server at historyserver/172.24.233.172:10200
18/11/14 19:19:22 INFO client.RMProxy: Connecting to ResourceManager at resourcemanager/172.24.32.251:8032
18/11/14 19:19:22 INFO client.AHSProxy: Connecting to Application History server at historyserver/172.24.233.172:10200
18/11/14 19:19:42 INFO ipc.Client: Retrying connect to server: resourcemanager/172.24.32.251:8032. Already tried 0 time(s); maxRetries=45
18/11/14 19:19:43 INFO ipc.Client: Retrying connect to server: resourcemanager/172.24.32.251:8032. Already tried 0 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
18/11/14 19:19:44 INFO ipc.Client: Retrying connect to server: resourcemanager/172

18/11/14 19:27:42 INFO ipc.Client: Retrying connect to server: resourcemanager/172.24.32.251:8032. Already tried 4 time(s); maxRetries=45
18/11/14 19:28:02 INFO ipc.Client: Retrying connect to server: resourcemanager/172.24.32.251:8032. Already tried 5 time(s); maxRetries=45
18/11/14 19:28:22 INFO ipc.Client: Retrying connect to server: resourcemanager/172.24.32.251:8032. Already tried 6 time(s); maxRetries=45
18/11/14 19:28:42 INFO ipc.Client: Retrying connect to server: resourcemanager/172.24.32.251:8032. Already tried 7 time(s); maxRetries=45
18/11/14 19:29:02 INFO ipc.Client: Retrying connect to server: resourcemanager/172.24.32.251:8032. Already tried 8 time(s); maxRetries=45
^C


## Check results

In [22]:
!hdfs dfs -ls /user/jovyan/SPBD-1819/Lucas_Joana/results

Found 2 items
drwxr-xr-x   - jovyan supergroup          0 2018-11-14 19:31 /user/jovyan/SPBD-1819/Lucas_Joana/results/18-11-14-19-30-54
drwxr-xr-x   - jovyan supergroup          0 2018-11-14 19:35 /user/jovyan/SPBD-1819/Lucas_Joana/results/18-11-14-19-35-10


In [23]:
!hdfs dfs -cat /user/jovyan/SPBD-1819/Lucas_Joana/results/18-11-14-19-30-54/*


(('monday', '246', '238'), ([11, 11], [15.96, 16.56]))
(('tuesday', '246', '238'), ([9], [13.3]))
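
The saved lines in this run hold the raw lists of durations and amounts rather than their averages. Since each line is a valid Python literal, one way to inspect them, sketched here under that assumption, is to parse them with `ast.literal_eval` and average the lists afterwards. `RESULT_LINES` and `summarize` are hypothetical names; the two lines are copied from the listing above.

```python
import ast
from statistics import mean

# The two lines printed by `hdfs dfs -cat` above, copied verbatim.
RESULT_LINES = [
    "(('monday', '246', '238'), ([11, 11], [15.96, 16.56]))",
    "(('tuesday', '246', '238'), ([9], [13.3]))",
]


def summarize(line):
    """Parse one saved tuple and return (key, avg_duration, avg_amount)."""
    key, (durations, amounts) = ast.literal_eval(line)
    return key, mean(durations), mean(amounts)


for line in RESULT_LINES:
    (weekday, pu_id, do_id), avg_duration, avg_amount = summarize(line)
    print("{} {}->{}: {:.1f} min, ${:.2f}".format(
        weekday, pu_id, do_id, avg_duration, avg_amount))
```

`ast.literal_eval` only accepts literals, which makes it a safer choice than `eval` for reading text written by `saveAsTextFile`.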
