# Homework - Spark Programming on Taxicab Report Dataset 

The purpose of this exercise is to write `pyspark` code that performs computations over a large dataset. Specifically, your Spark program will analyze a dataset of New York City taxi trip reports from the year 2013. The dataset was released under FOIL (the Freedom of Information Law) and made public by Chris Whong (https://chriswhong.com/open-data/foiling-nycs-boro-taxi-trip-data/).

The dataset is a simple `csv` file. Each taxi trip report is on a separate line of the file. Among other things, each trip report includes the starting point, the drop-off point, the corresponding timestamps, and payment information. The reports are ordered by the time the trip ended, i.e., by drop-off timestamp.
The attributes present on each line of the file are, in order:

| attribute       | description                                                         |
| --------------- | ------------------------------------------------------------------- |
| medallion       | an MD5 hash of the taxi identifier, bound to the vehicle (taxi ID)  |
| hack_license    | an MD5 hash of the taxi license identifier (driver ID)              |
| vendor_id       | identifier of the vendor                                            |
| pickup_datetime | time when the passenger(s) were picked up                           |
| payment_type    | the payment method (credit card or cash)                            |
| fare_amount     | fare amount in dollars                                              |
| surcharge       | surcharge in dollars                                                |
| mta_tax         | tax in dollars                                                      |
| tip_amount      | tip in dollars                                                      |
| tolls_amount    | bridge and tunnel tolls in dollars                                  |
| total_amount    | total paid amount in dollars                                        |
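The Spark code below addresses fields by position (`row[1]`, `row[3]`, `row[10]`). A name-to-index map derived from this table can make that code easier to read. This is a minimal sketch: `COLUMNS`, `COL`, `DOLLAR_FIELDS` and `parse_line` are hypothetical helpers, not part of the assignment, and the sketch assumes every line has exactly these 11 comma-separated fields.

```python
# Column order from the attribute table; index = position after line.split(',').
COLUMNS = [
    "medallion", "hack_license", "vendor_id", "pickup_datetime", "payment_type",
    "fare_amount", "surcharge", "mta_tax", "tip_amount", "tolls_amount",
    "total_amount",
]
COL = {name: i for i, name in enumerate(COLUMNS)}

# The six dollar-valued fields are the last six columns.
DOLLAR_FIELDS = COLUMNS[COL["fare_amount"]:]


def parse_line(line):
    """Split one CSV line into a dict keyed by attribute name.

    zip() stops at the shorter sequence, so a malformed line with too few
    fields yields a dict with missing keys instead of raising an error.
    """
    return dict(zip(COLUMNS, line.split(",")))


print(COL["pickup_datetime"], COL["hack_license"], COL["total_amount"])  # 3 1 10
```

In a Spark cell, for example, `row[COL["total_amount"]]` refers to the same field as `row[10]`.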

Data files:
* `taxi_small_subset.csv` - A subset of the entire big file. You can examine this file to see what the data look like, and use it to run and debug your code on a single-node platform (e.g., in Vocareum) before running it on the big file in the cluster.
* `2013_weekdays.csv` - A file listing the 365 dates of the year 2013 with their corresponding weekdays. It is used for the join in task 4.
* S3 URI `s3://comp643bucket/homework/spark_taxicab/trip*` - The address of the entire dataset in S3, which is a big file (18.4 GB). Once you have debugged your code on the small subset, your final task is to run it on this big file on an EMR cluster in AWS.

**For this homework, you need to complete 5 tasks described below.** 

**For tasks 1 through 4, write your Spark code in this Jupyter Notebook and run it on the small subset of data, i.e., `taxi_small_subset.csv`, in Vocareum. Debugging is easier this way, since you are running on an interactive single-node platform with a small dataset.**

**Once you have debugged your code on the small dataset, for task 5, execute your Spark code for tasks 1 through 4 in an AWS EMR cluster on the big dataset stored in S3 (`s3://comp643bucket/homework/spark_taxicab/trip*`).**

In [1]:
import pyspark

In [2]:
# pyspark works best with Java 8
# set the JAVA_HOME environment variable to the Java 8 path
%env JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64

env: JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64


In [3]:
sc = pyspark.SparkContext()

**Read the data file into an RDD**

In [4]:
taxi = sc.textFile('data/taxi_small_subset.csv')

In [5]:
taxi.count()

71153

In [6]:
taxi.take(3)

['medallion, hack_license, vendor_id, pickup_datetime, payment_type, fare_amount, surcharge, mta_tax, tip_amount, tolls_amount, total_amount',
 '7DD1D6A5E432ACBD68A734587B589B9B,EF3FD28F7D39F614BF68B51F0256B050,CMT,2013-08-28 06:53:33,CSH,12,0,0.5,0,0,12.5',
 'CEBDF34FE2DA2E9233B87C2E703004FF,D9EA31E70BE082F423D42860FD4BD240,CMT,NULL,CSH,7,1,0.5,0,0,8.5']

## Task 1 - clean the dataset (20 pts)

Write a Spark program that reads the dataset into an RDD, splits each line by `,` to extract field values, and cleans the RDD through the following steps:
* Remove lines with any missing value indicated by `NULL` 
* Validate the type of the following fields and remove lines with any invalid field value:
    * `pickup_datetime` must match this pattern 'YYYY-MM-DD HH:MM:SS'
    * All fields in dollars (`fare_amount`, `surcharge`, `mta_tax`, `tip_amount`, `tolls_amount`, `total_amount`) must be non-negative numbers (with or without a decimal point)
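The three cleaning steps can also be expressed as a single predicate, which makes it easy to unit-test the logic on a few hand-written rows before running it on Spark. This is a minimal sketch that assumes the 11-column layout from the attribute table. `is_clean` is a hypothetical name, and the field-count guard is an extra defensive check that the task does not require. The sketch uses `re.fullmatch`, so the whole field, not just its prefix, must match.

```python
import re

# Full-string patterns for Task 1 (re.fullmatch anchors both ends).
DATETIME_RE = re.compile(r"\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}")
AMOUNT_RE = re.compile(r"\d+(\.\d+)?")


def is_clean(row):
    """Return True if a split line passes all three Task 1 cleaning steps."""
    if len(row) != 11:                       # assumption: require exactly 11 fields
        return False
    if "NULL" in row:                        # step 1: missing values
        return False
    if not DATETIME_RE.fullmatch(row[3]):    # step 2: pickup_datetime format
        return False
    # step 3: all six dollar fields are non-negative numbers
    return all(AMOUNT_RE.fullmatch(field.strip()) for field in row[5:])


sample = "7DD1D6A5E432ACBD68A734587B589B9B,EF3FD28F7D39F614BF68B51F0256B050,CMT,2013-08-28 06:53:33,CSH,12,0,0.5,0,0,12.5"
print(is_clean(sample.split(",")))  # True
```

`taxi.map(lambda line: line.split(',')).filter(is_clean)` would produce a cleaned RDD in one pass. The task asks for a `count()` after each step, though, so the separate step-by-step filters are still useful.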
    
After each step of cleaning, run `count()` on your RDD, to see how many lines have been left. 

Below, we give you a set of cells you can use to walk through the analysis process. You are also welcome to simply write all of your code in one cell, following your own logic.

### Split each line by `,` to extract field values

In [7]:

split_data= taxi.map(lambda line: line.split(','))
# split_data.take(3)

In [9]:
split_data.take(3)

[['medallion',
  ' hack_license',
  ' vendor_id',
  ' pickup_datetime',
  ' payment_type',
  ' fare_amount',
  ' surcharge',
  ' mta_tax',
  ' tip_amount',
  ' tolls_amount',
  ' total_amount'],
 ['7DD1D6A5E432ACBD68A734587B589B9B',
  'EF3FD28F7D39F614BF68B51F0256B050',
  'CMT',
  '2013-08-28 06:53:33',
  'CSH',
  '12',
  '0',
  '0.5',
  '0',
  '0',
  '12.5'],
 ['CEBDF34FE2DA2E9233B87C2E703004FF',
  'D9EA31E70BE082F423D42860FD4BD240',
  'CMT',
  'NULL',
  'CSH',
  '7',
  '1',
  '0.5',
  '0',
  '0',
  '8.5']]

### Clean the RDD

**Remove lines with any `NULL` value**

In [10]:
# removed_nulls_line = split_data.filter(lambda row: not any(x=='NULL' for x in row))
# removed_nulls_line.take(5)

In [11]:
removed_nulls_line = split_data.filter(lambda row: 'NULL' not in row)
removed_nulls_line.take(3)

[['medallion',
  ' hack_license',
  ' vendor_id',
  ' pickup_datetime',
  ' payment_type',
  ' fare_amount',
  ' surcharge',
  ' mta_tax',
  ' tip_amount',
  ' tolls_amount',
  ' total_amount'],
 ['7DD1D6A5E432ACBD68A734587B589B9B',
  'EF3FD28F7D39F614BF68B51F0256B050',
  'CMT',
  '2013-08-28 06:53:33',
  'CSH',
  '12',
  '0',
  '0.5',
  '0',
  '0',
  '12.5'],
 ['A6E8AD830F49F7B358D52419084D42A0',
  'B1F1E21144EC5D9EC144AF9E4FBF320E',
  'CMT',
  '2013/08/29 12:59:08',
  'CSH',
  '6',
  '0',
  '0.5',
  '0',
  '0',
  '6.5']]

**Run `count()` on your RDD to see how many lines have been left**

In [12]:
removed_nulls_line.count()

71141

In [13]:

# pickup_datetime_rdd = removed_nulls_line.map(lambda row: row[3])
# pickup_datetime_rdd.take(5)

**Remove lines with `pickup_datetime` that does not match this pattern 'YYYY-MM-DD HH:MM:SS'**

For this task, you can use Python `re` module along with your Spark code.
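One subtlety here is that `re.match` anchors the pattern only at the start of the string. A value with extra characters after the seconds therefore still passes. The sketch below shows the difference. It also includes an optional stricter check, the hypothetical helper `is_valid_datetime`, which uses `datetime.strptime` to reject impossible calendar dates. Whether calendar validity matters for this assignment is an assumption.

```python
import re
from datetime import datetime

pat = r'\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}'

# re.match anchors only at the start, so trailing junk is accepted.
loose = bool(re.match(pat, '2013-08-28 06:53:33xyz'))
# re.fullmatch requires the entire string to match.
strict = bool(re.fullmatch(pat, '2013-08-28 06:53:33xyz'))
print(loose, strict)  # True False


def is_valid_datetime(value):
    """Exact 'YYYY-MM-DD HH:MM:SS' shape AND a real calendar date/time."""
    if not re.fullmatch(pat, value):
        return False
    try:
        datetime.strptime(value, '%Y-%m-%d %H:%M:%S')
    except ValueError:
        return False
    return True


print(is_valid_datetime('2013-08-28 06:53:33'))  # True
print(is_valid_datetime('2013-02-30 10:00:00'))  # False (no February 30)
```

The regex enforces the zero-padded shape, which `strptime` alone would not, and `strptime` enforces calendar validity, which the regex cannot. In the filter, this check would be `removed_nulls_line.filter(lambda row: is_valid_datetime(row[3]))`.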

In [15]:
import re

In [16]:
pat = r'\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}'
# bool(re.match(pat, a))

In [17]:
cleaned_dates = removed_nulls_line.filter(lambda row: re.match(pat, row[3]))
cleaned_dates.take(3)

[['7DD1D6A5E432ACBD68A734587B589B9B',
  'EF3FD28F7D39F614BF68B51F0256B050',
  'CMT',
  '2013-08-28 06:53:33',
  'CSH',
  '12',
  '0',
  '0.5',
  '0',
  '0',
  '12.5'],
 ['506DF1356EA4AAA9DE38EE3103CDAAF9',
  'EBE5414C3D3E4DC06885874F0022917A',
  'CMT',
  '2013-08-27 17:26:48',
  'CSH',
  '$17.5',
  '1',
  '0.5',
  '0',
  '5.33',
  '24.33'],
 ['801D1D2678B78ABC6F4FDBF224108FAB',
  'FBF69C3FCBC582B89CF9646BEC45A724',
  'CMT',
  '2013-08-31 02:32:14',
  'CSH',
  '5.5',
  '0.5',
  '0.5',
  '0',
  '0',
  '$6.5']]

**Run `count()` on your RDD to see how many lines have been left**

In [18]:
cleaned_dates.count()

71134

In [19]:
# # Extract the  dollars fields from each line.
# dollars = cleaned_dates.map(lambda row: row[5:])
# dollars.take(10)

**All the fields indicating an amount in dollars (`fare_amount`, `surcharge`, `mta_tax`, `tip_amount`, `tolls_amount`, `total_amount`) must be non-negative numeric values (with or without a decimal point). Remove lines with any value that does not match this pattern.**

For this task, you can use Python `re` module along with your Spark code.
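You can see what the dollar-amount pattern accepts by running it on a few sample strings outside Spark. The samples below are made up, except `$17.5`, which appears in the `cleaned_dates` output above. Note that `5.` and `.5` are rejected. That is one reasonable reading of "with or without a decimal point", but it is a judgment call.

```python
import re

# Same pattern as pat2 in the next cell, written as a raw string.
pat2 = r'^\d+(\.\d+)?$'

samples = ['12', '0.5', ' 24.33 ', '$17.5', '-3', '5.', '.5', '1e3']
# Strip surrounding whitespace first, as the filter below does.
accepted = [s for s in samples if re.match(pat2, s.strip())]
print(accepted)  # ['12', '0.5', ' 24.33 ']
```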

In [20]:
# non-negative number with an optional decimal part; fields are stripped before matching
pat2 = r'^\d+(\.\d+)?$'

In [24]:
cleaned_amts = cleaned_dates.filter(lambda row: all(re.match(pat2, field.strip()) for field in row[5:]))

In [25]:
cleaned_amts.take(3)

[['7DD1D6A5E432ACBD68A734587B589B9B',
  'EF3FD28F7D39F614BF68B51F0256B050',
  'CMT',
  '2013-08-28 06:53:33',
  'CSH',
  '12',
  '0',
  '0.5',
  '0',
  '0',
  '12.5'],
 ['3AEEDFFB3524409B4C17DE01866FFD8D',
  '34CFC6CDA6FD2B0546E4B42383D6216D',
  'CMT',
  '2013-08-31 22:39:51',
  'CSH',
  '8',
  '0.5',
  '0.5',
  '0',
  '0',
  '9'],
 ['D5C5F9B31B67E0C95286065BFA945A4E',
  '2BF3D7601BE18817EBF5A21F65F36549',
  'CMT',
  '2013-08-27 09:54:33',
  'CSH',
  '5.5',
  '0',
  '0.5',
  '0',
  '0',
  '6']]

**Run `count()` on your RDD to see how many lines have been left**

In [26]:
cleaned_amts.count()

71128

## Task 2 - compute total revenue by dates (20 pts)

Write a Spark program on your derived cleaned RDD (from task 1) that computes the total revenue (`total_amount` field) for each date (the `pickup_datetime` field without the time portion, i.e., only the date). Then, sort your RDD by total revenue in ascending order and print the 5 lines with the smallest total revenue. These are the 5 dates with the least total revenue.
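The `map` / `reduceByKey` / `sortBy` / `take` pipeline in the next cell can be traced on a few rows in plain Python. The sketch below uses made-up `(pickup_datetime, total_amount)` pairs and is only an illustration of the logic. Two side notes, offered as suggestions:

* In PySpark, `date_revenue_rdd.takeOrdered(5, key=lambda x: x[1])` returns the 5 smallest totals without fully sorting the RDD.
* Long tails such as `3071194.309999993` come from summing binary floats, so `round(total, 2)` is reasonable for display.

```python
from collections import defaultdict

# Made-up (pickup_datetime, total_amount) pairs for illustration.
rows = [
    ("2013-08-28 06:53:33", "12.5"),
    ("2013-08-28 19:01:10", "9"),
    ("2013-08-27 09:54:33", "6"),
    ("2013-08-31 22:39:51", "9"),
    ("2013-08-27 17:26:48", "14.5"),
]

# map: keep only the date part; reduceByKey: sum the amounts per date.
totals = defaultdict(float)
for pickup, amount in rows:
    totals[pickup.split(" ")[0]] += float(amount)

# sortBy(ascending) + take(5); rounding is for display only.
smallest = sorted(totals.items(), key=lambda kv: kv[1])[:5]
print([(d, round(t, 2)) for d, t in smallest])
# [('2013-08-31', 9.0), ('2013-08-27', 20.5), ('2013-08-28', 21.5)]
```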

In [27]:
# Extract the date from the pickup_datetime field and map it to (date, total_amount)
revenue_rdd = cleaned_amts.map(lambda row: (row[3].split(' ')[0], float(row[10])))

# Compute the total revenue for each date
date_revenue_rdd = revenue_rdd.reduceByKey(lambda x, y: x + y)

# Sort the RDD by total revenue in ascending order
sorted_rdd = date_revenue_rdd.sortBy(lambda x: x[1])

# Print the 5 lines with the smallest total revenue
smallest_revenues = sorted_rdd.take(5)
print(smallest_revenues)

[('2013-01-23', 8.5),
 ('2013-08-10', 23.5),
 ('2013-04-07', 26.0),
 ('2013-03-24', 28.0),
 ('2013-03-21', 31.25)]

## Task 3 - compute total revenue by taxi drivers  (20 pts)

Write a Spark program on your derived cleaned RDD (from task 1) that computes the total revenue (`total_amount` field) for each taxi driver (`hack_license`). Then, sort your RDD by total revenue in descending order and print the top 5 lines with the largest total revenue. These are the 5 taxi drivers with the most total revenue.
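In PySpark, `RDD.top(num, key=...)` returns the largest elements in descending order without sorting the whole RDD. `takeOrdered` with a negated key does the same. The pure-Python sketch below mirrors this with `heapq.nlargest` on made-up data.

The sketch also checks one observation you may want to verify before interpreting the results. `CFCD208495D565EF66E7DFF9F98764DA` is the top license on the small subset and the second-highest on the big file, and it is the MD5 hash of the string `"0"`. This suggests it may be a placeholder for a missing license rather than a single real driver. That is an inference; the dataset description does not state it.

```python
import hashlib
import heapq
from collections import defaultdict

# Made-up (hack_license, total_amount) pairs; real licenses are 32-hex-digit MD5 strings.
pairs = [("A", 12.5), ("B", 30.0), ("A", 20.0), ("C", 8.0), ("B", 1.5), ("D", 25.0)]

# reduceByKey equivalent: total revenue per driver.
revenue = defaultdict(float)
for driver, amount in pairs:
    revenue[driver] += amount

# Equivalent of driver_revenue_rdd.top(3, key=lambda kv: kv[1]): largest totals first.
top3 = heapq.nlargest(3, revenue.items(), key=lambda kv: kv[1])
print(top3)  # [('A', 32.5), ('B', 31.5), ('D', 25.0)]

# MD5 of the string "0", for comparison with the top license in the outputs.
print(hashlib.md5(b"0").hexdigest().upper())  # CFCD208495D565EF66E7DFF9F98764DA
```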

In [28]:
# Extract the hack_license and total_amount fields and map them to (hack_license, total_amount)
revenue_rdd = cleaned_amts.map(lambda row: (row[1], float(row[10])))

# Compute the total revenue for each taxi driver
driver_revenue_rdd = revenue_rdd.reduceByKey(lambda x, y: x + y)

# Sort the RDD by total revenue in descending order
sorted_rdd = driver_revenue_rdd.sortBy(lambda x: x[1], ascending=False)

# Print the top 5 lines with the largest total revenue
largest_revenues = sorted_rdd.take(5)
print(largest_revenues)

[('CFCD208495D565EF66E7DFF9F98764DA', 508.5),
 ('178C58D2C909125EE599C388CC1A311C', 356.9),
 ('83DDCD2CC7035BEBED7AC4255688308A', 355.0),
 ('B9E81BA07F0DDA5B2FBCA9B33CCC7C9A', 335.3),
 ('98949EA21D9A4DA151ADEE27E4DEDE7C', 333.32)]

## Task 4 - compute total revenue by weekday through join operation (20 pts)

Write a Spark program on your derived cleaned RDD (from task 1) that computes the total revenue (`total_amount` field) for each of the 7 days of the week (Sunday through Saturday).

To extract the weekdays, and to experiment more with Spark, we suggest that you use the `join` RDD operation to join the taxi dataset with the provided `2013_weekdays.csv` file, which contains the 365 dates of the year 2013 and their corresponding weekdays.

First, read `2013_weekdays.csv` into an RDD, and split each line by `,` to extract the field values.

Then, manipulate this RDD and your derived cleaned RDD of taxi dataset (from task 1), to be able to join the two and compute the total revenue by weekday.  

Finally, sum the total amount per weekday, and return the result in descending order of the total revenue.
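The join step is easiest to reason about on a handful of rows. The pure-Python sketch below uses toy data, not the dataset. It mirrors `revenue_rdd.join(weekdays_mapping)` followed by re-keying on weekday. Because this is an inner join, keys present on only one side, including the `('Date', 'WeekDay')` header row, simply disappear.

Two optional Spark-side variations are worth considering, both suggestions rather than requirements:

* Calling `reduceByKey` on the `(date, amount)` pairs before the join shrinks the join input to at most one row per date.
* Because the lookup table has only 365 entries, `sc.broadcast(weekdays_mapping.collectAsMap())` with a map-side lookup would avoid the join shuffle altogether. The task, however, asks you to use `join`.

```python
from collections import defaultdict

# Rows as split from 2013_weekdays.csv; the header row is kept on purpose.
weekday_rows = [["Date", "WeekDay"], ["2013-01-01", "Tuesday"], ["2013-01-02", "Wednesday"]]
weekday_of = {date: day for date, day in weekday_rows}

# Made-up (date, total_amount) pairs standing in for the cleaned taxi RDD.
revenue = [("2013-01-01", 10.0), ("2013-01-02", 5.5), ("2013-01-01", 2.5), ("2013-12-31", 7.0)]

# Inner join on date, like revenue_rdd.join(weekdays_mapping): keys found on only
# one side (the "Date" header, a date missing from this toy lookup) are dropped.
joined = [(d, (amt, weekday_of[d])) for d, amt in revenue if d in weekday_of]

# Re-key by weekday and sum, like map(...).reduceByKey(...).
per_weekday = defaultdict(float)
for _, (amt, day) in joined:
    per_weekday[day] += amt

result = sorted(per_weekday.items(), key=lambda kv: kv[1], reverse=True)
print(result)  # [('Tuesday', 12.5), ('Wednesday', 5.5)]
```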

In [29]:
# Read 2013_weekdays.csv into an RDD and split each line by comma
weekdays_rdd = sc.textFile("data/2013_weekdays.csv").map(lambda line: line.split(','))
weekdays_rdd.take(5)

[['Date', 'WeekDay'],
 ['2013-01-01', 'Tuesday'],
 ['2013-01-02', 'Wednesday'],
 ['2013-01-03', 'Thursday'],
 ['2013-01-04', 'Friday']]

In [30]:
# Map the weekdays RDD to (date, weekday) key-value pairs
weekdays_mapping = weekdays_rdd.map(lambda row: (row[0], row[1]))
weekdays_mapping.take(5)

[('Date', 'WeekDay'),
 ('2013-01-01', 'Tuesday'),
 ('2013-01-02', 'Wednesday'),
 ('2013-01-03', 'Thursday'),
 ('2013-01-04', 'Friday')]

In [31]:
# Map the cleaned RDD to (date, total_amount) key-value pairs
revenue_rdd = cleaned_amts.map(lambda row: (row[3].split(' ')[0], float(row[10])))

revenue_rdd.take(5)

[('2013-08-28', 12.5),
 ('2013-08-31', 9.0),
 ('2013-08-27', 6.0),
 ('2013-08-30', 6.0),
 ('2013-08-27', 14.5)]

In [32]:
# Join the revenue RDD with weekdays mapping RDD
joined_rdd = revenue_rdd.join(weekdays_mapping)
# joined_rdd.take(5)

In [33]:
# Compute the total revenue by weekday
weekday_revenue_rdd = joined_rdd.map(lambda row: (row[1][1], row[1][0])).reduceByKey(lambda x, y: x + y)
# Sort the RDD by total revenue in descending order
sorted_rdd = weekday_revenue_rdd.sortBy(lambda x: x[1], ascending=False)
print(sorted_rdd.collect())

[('Thursday', 161163.15000000014),
 ('Saturday', 160515.6800000002),
 ('Friday', 160117.9600000003),
 ('Tuesday', 155380.2500000001),
 ('Wednesday', 154230.5700000003),
 ('Monday', 138941.37000000017),
 ('Sunday', 137148.43000000014)]

## Task 5 - run on a big file in EMR cluster (20 pts)

For the last part of this homework, you need to run your Spark code for tasks 1 through 4 on a big file in S3, in an AWS EMR cluster.

Follow the instructions in `Lab - Spark Intro (AWS)` to create and connect to an EMR cluster in AWS and run Spark programs there.

**For better efficiency, in the hardware configuration of your cluster, choose `m5.xlarge` as the instance type and set the number of instances to 4.**

The big file is available at this S3 URI: `s3://comp643bucket/homework/spark_taxicab/trip*.csv`

To read the big file from S3 into an RDD, use the code below:

`taxi = sc.textFile("s3://comp643bucket/homework/spark_taxicab/trip*.csv")`

Repeat tasks 1 through 4 on this `taxi` RDD created from the big file, and copy your results into the markdown cells below (keep the results from the small subset above).
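Rather than re-running each cell on the cluster, you can bundle the four tasks into one function for the EMR notebook. The sketch below makes several assumptions that you should check before relying on it:

* `run_all` and the helper predicates are hypothetical names.
* `run_all` takes an existing `SparkContext`, so nothing in the block imports Spark.
* `cache()` is added because tasks 2 through 4 all reuse the cleaned RDD.
* `takeOrdered` and `top` replace `sortBy(...).take(5)`.
* Task 4 joins per-date totals instead of per-trip amounts. This joins only about 365 keys, but it may change the last floating-point digits of the sums.
* The predicates use `re.fullmatch`, which is stricter than the `re.match` calls above, so the counts could differ slightly.

```python
import re
from operator import add

TAXI_URI = "s3://comp643bucket/homework/spark_taxicab/trip*.csv"
WEEKDAYS_URI = "s3://comp643bucket/homework/spark_taxicab/2013_weekdays.csv"

DATETIME_RE = re.compile(r"\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}")
AMOUNT_RE = re.compile(r"\d+(\.\d+)?")


def no_nulls(row):
    """Task 1, step 1: drop rows containing a NULL field."""
    return "NULL" not in row


def valid_datetime(row):
    """Task 1, step 2: pickup_datetime must be exactly 'YYYY-MM-DD HH:MM:SS'."""
    return len(row) > 3 and DATETIME_RE.fullmatch(row[3]) is not None


def valid_amounts(row):
    """Task 1, step 3: the six dollar fields must be non-negative numbers.

    The 11-field check is an added assumption so that row[10] is always safe.
    """
    return len(row) == 11 and all(AMOUNT_RE.fullmatch(f.strip()) for f in row[5:])


def run_all(sc, taxi_uri=TAXI_URI, weekdays_uri=WEEKDAYS_URI):
    """Run tasks 1-4 with an existing SparkContext and return the results."""
    rows = sc.textFile(taxi_uri).map(lambda line: line.split(","))
    step1 = rows.filter(no_nulls)
    step2 = step1.filter(valid_datetime)
    clean = step2.filter(valid_amounts).cache()  # reused by tasks 2-4
    counts = (step1.count(), step2.count(), clean.count())

    # Task 2: (date, total) pairs, 5 smallest totals.
    by_date = clean.map(lambda r: (r[3].split(" ")[0], float(r[10]))).reduceByKey(add)
    least_dates = by_date.takeOrdered(5, key=lambda kv: kv[1])

    # Task 3: (driver, total) pairs, 5 largest totals.
    by_driver = clean.map(lambda r: (r[1], float(r[10]))).reduceByKey(add)
    top_drivers = by_driver.top(5, key=lambda kv: kv[1])

    # Task 4: join per-date totals with (date, weekday), then sum per weekday.
    weekdays = (sc.textFile(weekdays_uri)
                .map(lambda line: line.split(","))
                .filter(lambda r: len(r) == 2)
                .map(lambda r: (r[0], r[1])))
    by_weekday = (by_date.join(weekdays)                  # (date, (total, weekday))
                  .map(lambda kv: (kv[1][1], kv[1][0]))   # (weekday, total)
                  .reduceByKey(add)
                  .sortBy(lambda kv: kv[1], ascending=False)
                  .collect())
    return counts, least_dates, top_drivers, by_weekday
```

In the EMR notebook, where `sc` already exists, you would call it as `counts, least_dates, top_drivers, by_weekday = run_all(sc)`.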

**Repeat task 1 on the big file in your EMR cluster - print the number of lines (`count()`) of your cleaned RDD from the big file, here:** 


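Step 1 (after removing lines with `NULL`) - 173179771

Step 2 (after validating `pickup_datetime`) - 173179759

Step 3 (after validating the dollar fields) - 173176128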

**Repeat task 2 on the big file in your EMR cluster - copy your result, i.e., the 5 dates with the least total revenue from the big file, here:**


[('2013-08-11', 3071194.309999993), ('2013-08-04', 3073608.610000252), ('2013-08-03', 3182263.760000205), ('2013-12-25', 3465875.3099999367), ('2013-08-02', 3530800.6300003207)]  

**Repeat task 3 on the big file in your EMR cluster - copy your result, i.e., the top 5 drivers with the most revenue from the big file, here:**


[('664927CDE376A32789BA48BF55DFB7E3', 728594.3300000001), ('CFCD208495D565EF66E7DFF9F98764DA', 615220.1499999996), ('E4F99C9ABE9861F18BCD38BC63D007A9', 563445.9400000002), ('D85749E8852FCC66A990E40605607B2F', 246374.59), ('1EDF99EE9DAC182027330EF48828B54A', 242656.10000000012)]  

**Repeat task 4 on the big file in your EMR cluster. `2013_weekdays.csv` is also available in S3 through this URI `s3://comp643bucket/homework/spark_taxicab/2013_weekdays.csv`. Copy your result, which is the sum of revenue per weekday in descending order of total revenue, from the big file, here:**  


[('Friday', 394663376.43002135), ('Thursday', 386085572.97001326), ('Wednesday', 372733935.33000714), ('Saturday', 368934554.23002493), ('Tuesday', 362666230.20001316), ('Sunday', 341485998.91002494), ('Monday', 334818004.12003046)]