# NYC Taxi Dataset Project - Data Prep

## Overall Steps

**Step 0:** Prerequisites

**Step 1:** Start Spark Cluster

**Step 2:** Upload this notebook and packages

**Step 3:** Clean Data and Calculate Features

**Step 4:** Save RDD to file on s3


Note: Step 1 is based on the CS109 [instructions](https://piazza.com/class/icf0cypdc3243c?cid=1369). However, it has been modified to optimize performance for this project.

### Step 0: Prerequisites

1. You need the files CS109.pem and credentials.csv. If you followed the CS109 instructions (for lab8 or HW5), you will already have these files.

2. You will need a directory containing the following files:
    
    a) CS109.pem
    
    b) credentials.csv
    
    c) Setup Project.ipynb
    
    d) myConfig.json
    
    e) DataPrepNew.ipynb (this notebook)
    
    f) geohash.py

3. You must have completed the instructions in Setup Project.ipynb

#### Note: The notebook was updated, so the data files must be downloaded again in order to be properly preprocessed.

### Step 1: Start Spark cluster and sanity check

#### Step 1a) Start your Spark cluster as described in Step 1 from Setup Project (unless your Spark cluster is already running)



export CLUSTER_ID=`aws emr create-cluster --name "CS109 Proj Spark cluster" \
--release-label emr-4.2.0 --applications Name=Spark Name=Ganglia --ec2-attributes KeyName=mykeypair \
--instance-type c3.2xlarge --instance-count 5 --configurations file://myConfig.json --use-default-roles \
--bootstrap-actions Path=s3://cs109-2015/install-anaconda-emr,Name=Install_Anaconda \
--query 'ClusterId' --output text` && echo $CLUSTER_ID

#### Step 1c) Wait for the cluster to be ready: AWS web console has to show "WAITING"

#### Step 1d) Get the cluster master's IP:

export DNS_NAME=`aws emr describe-cluster --cluster-id $CLUSTER_ID \
--query 'Cluster.MasterPublicDnsName' --output text` && echo $DNS_NAME

#### Step 1e) Run the script to configure Spark 

ssh -o ServerAliveInterval=10 -i mykeypair.pem hadoop@$DNS_NAME 'sh configure-spark.sh'

#### Step 1f) Create an SSH tunnel to the AWS box and connect to the cluster. This command assumes your SSH key is in the same directory you are invoking the SSH command from. At the end of this you will be in a terminal session on the cluster's master node.

ssh -o ServerAliveInterval=10 -i mykeypair.pem hadoop@$DNS_NAME -L 8989:localhost:8888

pyspark

### Step 2: Upload this notebook and geohash.py

#### Upload this Jupyter Notebook and geohash.py using the console from http://localhost:8989

Notes: 
1. All the steps in Step 2 are to be executed from the Jupyter Notebook itself.
2. We will frequently be loading data from the s3 bucket you created in Step 3 of Setup Project (I will use the bucket name "sdaultonbucket1", but replace this with your own).

#### Sanity check: make sure Spark cluster is working

In [1]:
import sys
rdd = sc.parallelize(xrange(10),10)
aa = rdd.map(lambda x: sys.version)
aa.count()

10

#### Make sure geohash.py was copied properly

In [2]:
import geohash
geohash.encode(40,74,6)

'txheec'

In [None]:
# Ship geohash.py to every executor so that data_cleaner can import it on the workers
sc.addPyFile("geohash.py")

### Step 3: Clean Data and Calculate Features

#### Read yellow and green cab data
Note: You must replace "sdaultonbucket1" with the name of your own s3 bucket

In [11]:
y_rdd = sc.textFile("s3://sdaultonbucket1/nyc/*")
y_rdd = y_rdd.map(lambda line: tuple(line.split(',')))

In [12]:
g_rdd = sc.textFile("s3://sdaultonbucket1/nycg/*")
g_rdd = g_rdd.map(lambda line: tuple(line.split(',')))

### Data preparation specification
Given a granularity in location (geohash length g), a granularity in time (bins per day b) and a chosen width (w) of the neighbourhood we want to look at, the aggregated data should in the end have the following columns:

#### "geohash"
Geohash with length g (categorical feature). This column will not actually be used in the prediction. It is just an id and can be used when calculating the distances between the geohashes.
#### "time_cat" 
Time of the day as a categorical feature. If b = 24 (one bin for every hour), then "time_cat" for a pickup at 14:20:00 should be the string "14:00". If b = 96 (one bin for every quarter of an hour), then "time_cat" for a pickup at 14:20:00 should be the string "14:15".
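A minimal sketch of this binning rule, assuming bins start at midnight and a pickup belongs to the bin that starts at or before it (floor, not round). `time_cat` here is a hypothetical standalone helper, not part of the notebook's code:

```python
def time_cat(hour, minute, b):
    """Label of the time bin containing hour:minute, for b bins per day.

    Assumes b evenly divides 1440 (minutes per day), so each bin spans
    a whole number of minutes.
    """
    minutes_per_bin = 1440 // b
    num_minutes = hour * 60 + minute
    # Floor to the start of the bin, in minutes since midnight
    bin_start = (num_minutes // minutes_per_bin) * minutes_per_bin
    # Zero-padded "HH:MM" label
    return "%02d:%02d" % (bin_start // 60, bin_start % 60)

print(time_cat(14, 20, 24))  # 14:00
print(time_cat(14, 20, 96))  # 14:15
```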
#### "time_num" 
Time of the day as a (binned!) floating point number between 0 and 1: the center of the time bin, expressed as a fraction of the day. So if b = 24, then "time_num" for a pickup at 14:20:00 should be 14.5 / 24 = 0.6042. If b = 96, it should be 14.375 / 24 = 0.5990.
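The same floor-then-center rule can be sketched as follows; `time_num` is a hypothetical helper that takes the raw hour and minute and returns the center of its bin as a fraction of the day:

```python
def time_num(hour, minute, b):
    """Center of the time bin containing hour:minute, as a fraction of the day."""
    minutes_per_bin = 1440.0 / b
    num_minutes = hour * 60 + minute
    # Index of the bin the pickup falls into (floor)
    bin_index = int(num_minutes // minutes_per_bin)
    # Start of bin plus half a bin, divided by the minutes in a day
    return (bin_index + 0.5) * minutes_per_bin / 1440.0

print(round(time_num(14, 20, 24), 4))  # 0.6042
print(round(time_num(14, 20, 96), 4))  # 0.599
```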
#### "time_cos" 
The binned "time_num" variable converted to a cosine version so that time nicely 'loops' rather than going saw-like when it traverses midnight. I'm not sure if it adds much predictive power, but only one way to find out. "time_cos" = cos(time_num * 2 * Pi). So for 24 bins, 14:20:00 would translate to cos(0.6042 * 2 * Pi) = -0.7932.
#### "time_sin" 
Same as "time_cos", but with sine. So, "time_sin" = sin(time_num * 2 * Pi). For 24 bins per day, 14:20:00 would translate to sin(0.6042 * 2 * Pi) = -0.6089.
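The cosine/sine pair places each time on the unit circle, so times just before and just after midnight end up close together. A small sketch (the helper name `cyclic` is hypothetical); it uses the unrounded 14.5 / 24, so the last digit differs slightly from the values above, which plug in the rounded 0.6042:

```python
import math

def cyclic(fraction):
    """Map a fraction of a cycle (0..1) onto the unit circle as (cos, sin)."""
    angle = 2 * math.pi * fraction
    return math.cos(angle), math.sin(angle)

# 14:00-15:00 bin (b = 24): center is 14.5 / 24 of the day
c, s = cyclic(14.5 / 24)
print(round(c, 4), round(s, 4))  # -0.7934 -0.6088

# 23:30 and 00:30 are 23/24 of a day apart as raw fractions,
# but only about 0.26 apart on the unit circle
late, early = cyclic(23.5 / 24), cyclic(0.5 / 24)
dist = math.hypot(late[0] - early[0], late[1] - early[1])
print(round(dist, 2))  # 0.26
```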
#### "day_cat" 
Day of the week as a categorical feature: "Monday", "Tuesday", etc.
#### "day_num" 
Day of the week as a (binned) numerical feature going from 0 (Monday morning, start of the week) to 1 (Sunday night). Yeah, it's European to start the week on Monday. Whatever, haha :P. Anyway, with 24 bins, Tuesday afternoon 14:20:00 would translate to (1 + 14.5/24)/7 = 0.2292.
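Python's `datetime.date.weekday()` already uses Monday = 0 through Sunday = 6, so the weekly position is just the weekday plus the binned time of day, divided by 7. A sketch with a hypothetical helper `day_num`, using 2015-12-01 (a Tuesday) as the example date:

```python
from datetime import date

def day_num(d, time_fraction):
    """Position in the week: 0 = Monday 00:00, approaching 1 at Sunday 24:00.

    d is a datetime.date; time_fraction is the binned time of day in [0, 1).
    """
    return (d.weekday() + time_fraction) / 7.0

# 2015-12-01 was a Tuesday; 14:20 falls in the 14:00 bin when b = 24
print(round(day_num(date(2015, 12, 1), 14.5 / 24), 4))  # 0.2292
```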
#### "day_cos" 
Binned "day_num" variable converted to a cosine version. "day_cos" = cos(day_num * 2 * Pi)
#### "day_sin" 
Binned "day_num" variable converted to a sine version. "day_sin" = sin(day_num * 2 * Pi)
#### "weekend" 
0 if weekday, 1 if weekend (Saturday/Sunday)
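The weekly cyclic encoding and the weekend flag can be sketched together. The helper names `day_cyclic` and `weekend_flag` are hypothetical; the example shows that the last hourly bin of Sunday and the first hourly bin of Monday get nearly identical "day_cos" values and opposite "day_sin" values:

```python
import math

def day_cyclic(day_num):
    """Encode the weekly position day_num (0..1) as (day_cos, day_sin)."""
    angle = 2 * math.pi * day_num
    return math.cos(angle), math.sin(angle)

def weekend_flag(weekday):
    """1 for Saturday (5) or Sunday (6) in datetime's Monday = 0 convention."""
    return 1 if weekday in (5, 6) else 0

# Sunday 23:00-24:00 bin and Monday 00:00-01:00 bin (b = 24)
sunday_late = (6 + 23.5 / 24) / 7.0
monday_early = (0 + 0.5 / 24) / 7.0
print([round(v, 3) for v in day_cyclic(sunday_late)])
print([round(v, 3) for v in day_cyclic(monday_early)])
print(weekend_flag(5), weekend_flag(0))  # 1 0
```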
#### "holiday" (not included yet)
0 if normal day, 1 if holiday. Not sure if it's easy to find all the holidays in NYC from 2013-2015.
#### Location features 
For each binned location, there should be a feature. The name of each feature should be the geohash. To calculate the value of the feature, you need the distance between the location of the row and the location of the column. The value should be calculated as 1/(distance_between_locations_in_km + 1)^w (i.e. inverse distance weighting). The larger w gets, the quicker everything drops to 0. So, for w = 4, the value for the location itself will be 1/(0 + 1)^4 = 1. For a location 2 km away, it will be 1/(2 + 1)^4 = 1/3^4 = 0.012, so information about pickup rates from locations 2 km away is barely taken into account when predicting for a certain location. If w = 2, the value for a location 2 km away is a bit larger: 1/3^2 = 0.11. If w = 1, it is even larger: 1/3 = 0.33. So w is a parameter with which we can determine how 'smooth' our predictions are going to be: the smaller we take w, the more weight neighbouring locations get, and the smoother the predictions will be. What the optimal w is, I don't know. I think we should try a few different options and see which works best on the validation set.
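A sketch of the weighting under one assumption the text leaves open: that the distance between two locations is the great-circle (haversine) distance between their geohash cell centers, using a spherical Earth with radius 6371 km. The helper names and the two Midtown Manhattan coordinates are hypothetical:

```python
import math

def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance in km on a sphere of radius 6371 km."""
    r = 6371.0
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlmb = math.radians(lon2 - lon1)
    a = (math.sin(dphi / 2) ** 2
         + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2)
    return 2 * r * math.asin(math.sqrt(a))

def location_weight(distance_km, w):
    """Inverse distance weighting: 1 / (distance + 1) ** w."""
    return 1.0 / (distance_km + 1.0) ** w

# The worked numbers from the text: a location 2 km away
for w in (1, 2, 4):
    print(w, round(location_weight(2.0, w), 3))  # 0.333, 0.111, 0.012

# Two hypothetical cell centers roughly 1 km apart
d = haversine_km(40.7580, -73.9855, 40.7484, -73.9857)
print(round(d, 2), round(location_weight(d, 2), 3))
```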

In the end, this should result in a number of records equal to 7 * b * x, where x is the number of locations. For b = 24 and a geohash length g of 7 (assuming roughly 10,000 distinct locations), this comes down to about 24 * 7 * 10,000 = 1.68 million records, i.e. roughly 1 to 2 million.
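The same arithmetic, extended to the number of columns: assuming the "geohash" id plus the 9 time/day columns (holiday left out, as it is not included yet) and one location column per location. `table_size` is a hypothetical helper; the resulting cell count suggests the location columns may be better computed on the fly or stored sparsely than materialized densely:

```python
def table_size(b, num_locations):
    """Rows, columns and total cells of the fully expanded table."""
    rows = 7 * b * num_locations
    # geohash id + 9 time/day columns + one weight column per location
    cols = 10 + num_locations
    return rows, cols, rows * cols

rows, cols, cells = table_size(24, 10000)
print(rows)   # 1680000
print(cells)  # 16816800000
```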


In [13]:
import time
from datetime import date
import math
def date_extractor(date_str,b,minutes_per_bin):
    # Takes a pickup datetime string of the form "YYYY-MM-DD HH:MM:SS",
    # the number of bins per day b and the bin width in minutes, and returns
    # a tuple of the form (as per the data specification):
    # (time_cat, time_num, time_cos, time_sin, day_cat, day_num, day_cos, day_sin, weekend)
    # or (None,) if the string cannot be parsed (e.g. a header row).

    # Split date string into [date, time]
    d = date_str.split()

    # safety check
    if len(d) != 2:
        return (None,)

    # TIME
    # list of [hour, min, sec]
    try:
        time_list = [int(t) for t in d[1].split(':')]
    except ValueError:
        return (None,)

    # safety check
    if len(time_list) != 3:
        return (None,)

    # number of minutes since midnight
    num_minutes = time_list[0] * 60 + time_list[1]

    # Floor to the start of the bin, in minutes since midnight
    # (14:20 -> 14:00 for b = 24, 14:15 for b = 96)
    time_bin = (num_minutes // minutes_per_bin) * minutes_per_bin

    hour_bin = time_bin // 60
    min_bin = time_bin % 60

    # time_cat as a zero-padded "HH:MM" string
    time_cat = "%02d:%02d" % (hour_bin, min_bin)

    # Center of the time bin as a fraction of the day, between 0 and 1
    time_num = (time_bin + minutes_per_bin / 2.0) / (24 * 60.0)

    time_cos = math.cos(time_num * 2 * math.pi)
    time_sin = math.sin(time_num * 2 * math.pi)

    # DATE
    # Parse year, month, day
    try:
        date_list = d[0].split('-')
        d_obj = date(int(date_list[0]),int(date_list[1]),int(date_list[2]))
    except (ValueError, IndexError):
        return (None,)
    day_to_str = {0: "Monday",
                  1: "Tuesday",
                  2: "Wednesday",
                  3: "Thursday",
                  4: "Friday",
                  5: "Saturday",
                  6: "Sunday"}
    day_of_week = d_obj.weekday()  # Monday = 0 ... Sunday = 6
    day_cat = day_to_str[day_of_week]
    # time_num is already a fraction of a day, so add it to the day index directly
    day_num = (day_of_week + time_num)/7.0
    day_cos = math.cos(day_num * 2 * math.pi)
    day_sin = math.sin(day_num * 2 * math.pi)

    # 1 on Saturday/Sunday, 0 otherwise
    weekend = 1 if day_of_week in [5,6] else 0

    return (time_cat, time_num, time_cos, time_sin, day_cat, day_num, day_cos, day_sin, weekend)

import geohash
def data_cleaner(zipped_row):
    # Takes a tuple (row, g, b, minutes_per_bin) and returns a tuple of the form
    # (time_cat, time_num, time_cos, time_sin, day_cat, day_num, day_cos, day_sin,
    #  weekend, geohash), or None if the row cannot be cleaned.
    row, g, b, minutes_per_bin = zipped_row
    # The indices of pickup datetime, latitude, and longitude respectively
    indices = (1, 6, 5)

    # safety check: make sure row has enough features
    if len(row) < 7:
        return None

    # extract the binned time and day features
    date_str = row[indices[0]]
    clean_date = date_extractor(date_str,b,minutes_per_bin)
    # date_extractor returns (None,) for dates it cannot parse
    if clean_date[0] is None:
        return None

    # parse coordinates; header rows and empty fields are not numeric
    try:
        latitude = float(row[indices[1]])
        longitude = float(row[indices[2]])
    except ValueError:
        return None

    # safety check: keep only coordinates roughly around New York City
    if not (35 < latitude < 50 and -80 < longitude < -50):
        return None

    # get geohash
    location = geohash.encode(latitude, longitude, g)
    return tuple(list(clean_date) + [location])

#### Specify Parameters

In [14]:
# Clean Data
# Create Data as Specified
# Parameters
g = 7 #geohash length
b = 24 # number of time bins per day
# Note: b must evenly divide 1440 (the number of minutes in a day)
minutes_per_bin = 1440 // b  # bin width in minutes (60 for b = 24)
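A small guard for the constraint on b; `minutes_per_bin_for` is a hypothetical helper (the notebook computes minutes_per_bin inline) that fails early instead of silently producing bins of uneven width:

```python
def minutes_per_bin_for(b):
    """Bin width in minutes for b bins per day; b must divide 1440."""
    if b <= 0 or 1440 % b != 0:
        raise ValueError("b must evenly divide 1440 (minutes per day), got %r" % (b,))
    return 1440 // b

print(minutes_per_bin_for(24))  # 60
print(minutes_per_bin_for(96))  # 15

# All bin counts up to 100 that give whole-minute bins
print([b for b in range(1, 101) if 1440 % b == 0])
```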


In [15]:
gclean_rdd = g_rdd.map(lambda row: (row, g, b, minutes_per_bin))\
                .map(data_cleaner)\
                .filter(lambda row: row is not None)\
                .map(lambda row: (row,1))\
                .reduceByKey(lambda x, y: x + y)\
                .map(lambda row: (row,'g'))

In [16]:
yclean_rdd = y_rdd.map(lambda row: (row, g, b, minutes_per_bin))\
                .map(data_cleaner)\
                .filter(lambda row: row is not None)\
                .map(lambda row: (row,1))\
                .reduceByKey(lambda x, y: x + y)\
                .map(lambda row: (row, 'y'))

#### Combine rows from both rdds

In [17]:
combined_rdd = yclean_rdd.union(gclean_rdd)
# drop the 'g'/'y' tags and sum the pickup counts of both fleets per key
final_rdd = combined_rdd.map(lambda row: row[0])\
                        .reduceByKey(lambda x, y: x + y)
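What the per-fleet counting and the union-then-reduce step compute can be illustrated in plain Python, with `collections.Counter` standing in for `reduceByKey`. The keys are shortened to (time_cat, day_cat, geohash) and the geohash strings are hypothetical:

```python
from collections import Counter

# Hypothetical, shortened keys: (time_cat, day_cat, geohash)
yellow = [("14:00", "Tuesday", "dr5ru7c"), ("14:00", "Tuesday", "dr5ru7c"),
          ("15:00", "Tuesday", "dr5ru7c")]
green = [("14:00", "Tuesday", "dr5ru7c"), ("14:00", "Tuesday", "dr72h8k")]

# Like map(lambda row: (row, 1)).reduceByKey(add), once per fleet
yellow_counts = Counter(yellow)
green_counts = Counter(green)

# Like union(...) followed by reduceByKey(add) across both fleets
combined = yellow_counts + green_counts
print(sorted(combined.items()))
```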

In [18]:
%time final_rdd.count()

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 86, ip-172-31-58-151.ec2.internal): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  [... Python worker and Java stack frames omitted ...]
ImportError: ('No module named geohash', <function subimport at 0x7fbc666802a8>, ('geohash',))

Note: this run failed because the executors could not import geohash; the sc.addPyFile("geohash.py") cell in Step 2 had not been executed. Run that cell before calling an action such as count().


### Step 4: Save RDD to file on s3

In [None]:
#save the RDD to s3
# this RDD has key = (time_cat, time_num, time_cos, time_sin, day_cat, day_num, day_cos, day_sin, weekend, geohash), value = number of pickups
final_rdd.saveAsTextFile("s3n://sdaultonbucket1/final_rdd")
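To load the result again, note that PySpark's saveAsTextFile writes each non-string element as its string representation, so each output line should look like a Python tuple literal. Assuming that, `ast.literal_eval` can turn a line back into a (key, count) pair. `parse_saved_line` and the sample line are hypothetical, with the key shortened for readability:

```python
import ast

def parse_saved_line(line):
    """Turn one line written by saveAsTextFile back into (key_tuple, count)."""
    key, count = ast.literal_eval(line)
    return key, count

# Hypothetical, shortened line; Python 2 may write unicode strings as u'...'
sample = "(('14:00', 0.6041666666666666, u'Tuesday', 'dr5ru7c'), 12)"
key, count = parse_saved_line(sample)
print(key, count)
```

On the cluster, the same function could be applied with something like sc.textFile("s3n://sdaultonbucket1/final_rdd").map(parse_saved_line).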