# Big Data Processing Systems — 2020/21
## Project nº 1

Suppose you want to process information about taxi rides in a city to better understand the
behavior of the community and help taxi drivers maximize their profit.
To this end, you have a data set with information about taxi rides, including the pickup
and drop-off locations.

### pySpark Exercises

You are asked to use pySpark to create indexes for answering the following queries:

1.	(2 points) To have a rough understanding of the fluctuation in the demand for taxis per month of the year, create an index reporting:

What is the accumulated number of taxi trips per month?
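
As a rough illustration of what this index contains, the sketch below counts trips per month in plain Python, without Spark. The three timestamps are made-up sample values, and the format string assumes timestamps laid out like `05/17/2016 03:45:00 PM`.

```python
from collections import Counter
from datetime import datetime

# Hypothetical sample timestamps (not taken from the data set)
sample_timestamps = [
    "05/17/2016 03:45:00 PM",
    "05/18/2016 09:10:00 AM",
    "12/01/2016 11:59:00 PM",
]

# Assumed timestamp layout: month/day/year, 12-hour clock with AM/PM
TIMESTAMP_FORMAT = "%m/%d/%Y %I:%M:%S %p"


def month_of(timestamp):
    """Return the month (1-12) of a timestamp string."""
    return datetime.strptime(timestamp, TIMESTAMP_FORMAT).month


# Count how many sample trips start in each month
trips_per_month = Counter(month_of(ts) for ts in sample_timestamps)
print(dict(trips_per_month))
```

The Spark version follows the same idea: map every line to a `(month, 1)` pair and sum the ones per key.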



In [10]:
%%time

import pyspark
from operator import add
from datetime import datetime

# 1-based position of the "Trip Start Timestamp" column
start_timestamp_position = 3


def get_month(line):
    """Return the month (1-12) of the trip start timestamp in a line."""
    columns = line.split(';')
    start_timestamp = columns[start_timestamp_position - 1]
    start_timestamp = datetime.strptime(start_timestamp, "%m/%d/%Y %I:%M:%S %p")
    return start_timestamp.month


sc = pyspark.SparkContext('local[*]')
try:
    # load the file as an RDD of lines
    lines = sc.textFile('Taxi_Trips_151MB.csv')
    # data quality: remove empty lines
    non_empty_lines = lines.filter(lambda line: len(line) > 0)
    # create pairs (month, 1) so that trips can be counted per month
    line_months = non_empty_lines.map(lambda line: (get_month(line), 1))
    # sum the counts of each month
    occurrences = line_months.reduceByKey(add)
    # computes and iterates for all months
    for (k, v) in occurrences.collect():
        print(k, v)

finally:
    # always release the Spark context, even if the job fails
    sc.stop()


5 34979
10 33618
1 30357
6 35016
11 30017
2 31013
12 29252
7 32141
3 35260
8 32747
4 32884
9 31466
CPU times: user 26.3 ms, sys: 17.8 ms, total: 44.1 ms
Wall time: 3.77 s
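
The months above appear in no particular order, since `collect()` returns the pairs as they sit in the partitions. A minimal sketch of how the collected pairs could be sorted by month before printing, using the counts reported above; calling `sortByKey()` on the RDD before `collect()` would be an alternative inside Spark.

```python
# (month, trips) pairs copied from the output above
occurrences = [
    (5, 34979), (10, 33618), (1, 30357), (6, 35016),
    (11, 30017), (2, 31013), (12, 29252), (7, 32141),
    (3, 35260), (8, 32747), (4, 32884), (9, 31466),
]

# Sort by month number so the seasonal pattern is easier to read
for month, trips in sorted(occurrences):
    print(month, trips)

# The busiest and quietest months follow directly from the counts
busiest = max(occurrences, key=lambda pair: pair[1])
quietest = min(occurrences, key=lambda pair: pair[1])
print("busiest:", busiest, "quietest:", quietest)
```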



2.	(3 points) To have a rough understanding of the taxi service, create an index reporting:

For each pickup region, report the list of unique dropoff regions.
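
To illustrate the shape of this index, the sketch below groups made-up (pickup, dropoff) pairs in plain Python. The region IDs are hypothetical, and a set per pickup region stands in for the de-duplication step.

```python
from collections import defaultdict

# Hypothetical (pickup, dropoff) region pairs; "" marks a missing region
trips = [
    ("A", "B"),
    ("A", "B"),   # duplicate pair, should be reported once
    ("A", "C"),
    ("B", "A"),
    ("", "C"),    # missing pickup region, should be ignored
]

dropoffs_by_pickup = defaultdict(set)
for pickup, dropoff in trips:
    # skip trips where either region is missing
    if pickup and dropoff:
        dropoffs_by_pickup[pickup].add(dropoff)

for pickup, dropoffs in sorted(dropoffs_by_pickup.items()):
    print(pickup, sorted(dropoffs))
```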

In [8]:
%%time

import pyspark

pickup_location_position = 7
dropoff_location_position = 8


def get_pickup_dropoff_pairs(line):
    """ Returns the (pickup, dropoff) location pair from a line"""
    columns = line.split(';')
    pickup_location = columns[pickup_location_position - 1]
    dropoff_location = columns[dropoff_location_position - 1]
    return (pickup_location, dropoff_location)


sc = pyspark.SparkContext('local[*]')
try:
    # assign file to context
    lines = sc.textFile('Taxi_Trips_151MB.csv')
    # data quality: remove empty lines
    non_empty_lines = lines.filter(lambda line: len(line) > 0)
    # obtain pickup dropoff pairs
    pickup_dropoff_pairs = non_empty_lines.map(lambda line: (get_pickup_dropoff_pairs(line)))
    # remove duplicate pairs
    distinct_pairs = pickup_dropoff_pairs.distinct()
    # remove inconsistent data
    non_empty_distinct_pairs = distinct_pairs.filter(lambda pair:  len(pair[0]) > 0 and len(pair[1]) > 0)
    # group dropoff locations by pickup locations
    pickup_dropoff_pairs = non_empty_distinct_pairs.groupByKey()
    # computes and iterates for all pickup locations
    for pickup, dropoffs in pickup_dropoff_pairs.collect():
        # materialize the grouped values instead of relying on the internal .data attribute
        print(pickup, list(dropoffs))

finally:
    # always release the Spark context, even if the job fails
    sc.stop()


17031839100 ['17031833000', '17031320100', '17031081800', '17031081201', '17031063303', '17031320400', '17031240300', '17031070200', '17031841000', '17031071200', '17031838200', '17031063302', '17031241400', '17031032100', '17031241500', '17031070300', '17031071100', '17031838100', '17031841100', '17031060800', '17031062000', '17031063200', '17031050200', '17031310800', '17031283100', '17031831900', '17031062900', '17031010400', '17031031300', '17031351000', '17031243200', '17031160800', '17031030500', '17031242100', '17031832900', '17031050500', '17031842900', '17031062300', '17031020602', '17031243100', '17031809400', '17031062500', '17031839200', '17031760801', '17031010502', '17031220701', '17031040201', '17031242800', '17031241300', '17031838000', '17031240800', '17031411200', '17031843500', '17031030703', '17031081401', '17031071500', '17031061000', '17031281900', '17031081000', '17031160602', '17031040100', '17031071400', '17031832600', '17031842200', '17031070102', '17031081202

3.	(4 points) To have a rough understanding of the total price of each taxi ride (based on the pickup and drop-off regions), create an index reporting:

What is the expected duration and distance of a taxi ride, given the pickup region ID, the weekday (0=Monday, 6=Sunday) and time in format “hour AM/PM”?
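
As a small illustration of this index, the sketch below builds a key from the pickup region, the weekday and the hour, and then averages duration and distance per key in plain Python. The trips and the region name `R1` are made up, the timestamp layout is an assumption, and "expected" is read here as the arithmetic mean.

```python
from collections import defaultdict
from datetime import datetime

# Assumed timestamp layout: month/day/year, 12-hour clock with AM/PM
TIMESTAMP_FORMAT = "%m/%d/%Y %I:%M:%S %p"

# Hypothetical trips: (pickup region, start timestamp, seconds, miles)
trips = [
    ("R1", "05/17/2016 03:45:00 PM", 600, 2.0),
    ("R1", "05/24/2016 03:10:00 PM", 900, 3.0),
    ("R1", "05/17/2016 08:00:00 AM", 300, 1.0),
]


def make_key(pickup, timestamp):
    """Build a 'region_weekday_hourAM/PM' key, with weekday 0=Monday."""
    start = datetime.strptime(timestamp, TIMESTAMP_FORMAT)
    return "{}_{}_{}".format(pickup, start.weekday(), start.strftime("%I%p"))


# Accumulate (total seconds, total miles, trip count) per key
totals = defaultdict(lambda: (0, 0.0, 0))
for pickup, timestamp, seconds, miles in trips:
    key = make_key(pickup, timestamp)
    total_seconds, total_miles, count = totals[key]
    totals[key] = (total_seconds + seconds, total_miles + miles, count + 1)

# The expected duration and distance are the per-key averages
averages = {
    key: (total_seconds / count, total_miles / count)
    for key, (total_seconds, total_miles, count) in totals.items()
}
for key, (avg_seconds, avg_miles) in sorted(averages.items()):
    print(key, "{:0.2f}".format(avg_seconds), "{:0.2f}".format(avg_miles))
```

Keeping the trip count next to the sums is what makes the averages computable after partial results from different partitions are merged.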

In [9]:
%%time

import pyspark
from operator import add
from datetime import datetime

# 1-based column positions
start_timestamp_position = 3
trip_seconds_position = 5
trip_miles_position = 6
pickup_location_position = 7


def get_line_data(line):
    """ Returns (key, (trip_seconds, trip_miles)) from a line.
        All members are strings
    """
    columns = line.split(';')
    start_timestamp = columns[start_timestamp_position - 1]
    start_timestamp = datetime.strptime(start_timestamp, "%m/%d/%Y %I:%M:%S %p")
    # drop thousands separators so the values can be converted later
    trip_seconds = columns[trip_seconds_position - 1].replace(",", "")
    trip_miles = columns[trip_miles_position - 1].replace(",", "")
    pickup_location = columns[pickup_location_position - 1]
    # key example: 17031281900_3_12PM (pickup region, weekday, hour AM/PM)
    key = "{}_{}_{}".format(
        pickup_location,
        start_timestamp.weekday(),
        start_timestamp.strftime("%I%p"))

    return key, (trip_seconds, trip_miles)


def is_line_data_valid(elem):
    # key not empty / pickup location exists / trip_seconds not empty / trip_miles not empty
    return len(elem[0]) > 0 and (not elem[0].startswith("_")) \
           and len(elem[1][0]) > 0 and len(elem[1][1]) > 0


def convert_data(elem):
    # (key, (trip_seconds as int, trip_miles as float, count of 1))
    return elem[0], (int(elem[1][0]), float(elem[1][1]), 1)


# Defining the sequence and combiner operations
# Sequence operation: adds one element to a partition-local accumulator
def seq_op(accumulator, element):
    # a little bit of design by contract can help debugging the infrastructure
    # assert allows defining invariants
    assert isinstance(element[0], int)
    assert isinstance(element[1], float)
    assert isinstance(element[2], int)
    # sums the tuples element by element
    return tuple(map(add, accumulator, element))


# Combiner operation: merges the accumulators of two partitions
def comb_op(accumulator1, accumulator2):
    # sums the tuples element by element
    return tuple(map(add, accumulator1, accumulator2))


sc = pyspark.SparkContext('local[*]')
try:
    # assign file to context
    lines = sc.textFile('Taxi_Trips_151MB.csv')
    # data quality: remove empty lines
    non_empty_lines = lines.filter(lambda line: len(line) > 0)
    # obtain data columns
    data = non_empty_lines.map(lambda line: (get_line_data(line)))
    # remove invalid lines
    data = data.filter(lambda elem: is_line_data_valid(elem))
    # convert strings to numeric values
    data = data.map(lambda elem: convert_data(elem))
    # define the initial value to the accumulator
    zero_val = (0, 0.0, 0)
    # sums trip_seconds,trip_miles, counts using sequence and combine functions
    data = data.aggregateByKey(zero_val, seq_op, comb_op)
    # transforms into (key , (avg trip_seconds, avg trip_miles))
    data = data.map(lambda elem: (elem[0], (elem[1][0] / elem[1][2], elem[1][1] / elem[1][2])))
    # computes and iterates for all pickup locations
    for k, v in data.collect():
        # key / avg trip_seconds / avg trip_miles
        print(k, "{:0.2f}".format(v[0]), "{:0.2f}".format(v[1]))

except Exception as e:
    # report the failure instead of hiding it
    print(e)
finally:
    # always release the Spark context
    sc.stop()


17031980000_5_11PM 1741.41 13.65
17031320100_3_04PM 874.51 2.49
17031081300_2_10AM 804.94 2.51
17031243500_4_10PM 406.67 1.08
17031320400_5_07PM 598.93 1.45
17031281900_3_11AM 623.63 1.53
17031063100_1_02AM 1020.00 0.53
17031030900_3_10PM 433.67 2.07
17031080202_6_02AM 543.26 1.74
17031081500_4_04PM 778.08 1.79
17031081700_5_08PM 491.47 1.09
17031280100_2_09AM 598.11 1.75
17031081201_2_05PM 893.34 1.80
17031160700_5_03PM 1200.00 6.70
17031151002_2_05PM 0.00 0.00
17031081500_4_07PM 657.36 1.54
17031081500_5_02PM 705.17 2.12
17031242300_4_10PM 752.57 2.67
17031071500_1_01PM 445.17 1.20
17031833100_5_07PM 643.00 1.43
17031063200_2_11PM 1920.00 7.18
17031080300_6_02AM 465.79 1.52
17031842200_2_08PM 477.86 1.71
17031839100_0_10AM 489.83 1.66
17031081800_0_06AM 1021.58 3.34
17031081403_1_08AM 534.14 1.43
17031080202_0_04PM 385.56 0.72
17031833000_3_05PM 854.86 1.97
17031080201_5_10AM 482.00 2.50
17031081401_0_10AM 719.77 2.12
17031081800_6_12AM 496.05 1.45
17031081700_3_01PM 508.23 2.85
1703