In [1]:
import pyspark
import pandas as pd
import time
from datetime import date
import math

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
7437,application_1513605045578_4983,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


## Define some variables 

In [2]:
import geohash2 as gh2

#Global variables
g = 7 #geohash length
b = 48 # number of time bins per day
# Note: b must evenly divide the 24*60 minutes of a day, and the resulting bin
# width must evenly divide 60 so that every bin lies inside a single clock hour
# (the time-of-day bin labels are built as "HH:MM").
minutes_per_bin = (24 * 60) // b
assert (24 * 60) % b == 0 and 60 % minutes_per_bin == 0, \
    "b must give a bin width in whole minutes that evenly divides 60"

## Extract and reformat the data

##### geohash
A categorical representation of (longitude, latitude) used as an id and dropped before prediction.
##### time_cat
A categorical representation of the time of day; the number of categories is controlled by $b$ ($b = 48$ means one bin per half hour). The value of the bin is the start time of its timespan, formatted as HH:MM.
##### time_num
A float representation of the time of day (as bins) between 0 and 1. The center of the bin is converted to a floating point number, e.g. the half-hour bin starting at 20:30 has its center at 20:45, which is converted to $41.5/48 \approx 0.8646$.
#### time_cos
The binned time variable time_num is converted to a cosine value to even out the transition between days.
#### time_sin
The same purpose only using the sine function instead.
#### day_num
Day of week as a binned feature ranging from 0 (Monday morning) to 1 (Sunday night).
#### day_cos
Binned day_num in a cosine representation.
#### day_sin
Binned day_num in a sine representation.
#### weekend
0 if weekday, 1 if weekend.
#### Location features
The geohashed value of the pickup location is returned, together with the latitude and longitude of the pickup location (decoded from the geohashed value, so they carry the geohash's loss of precision). The dropoff location is currently not part of the returned values.

In [3]:
def date_extractor(date_str,b,minutes_per_bin):
    # Takes a datetime object as a parameter
    # and extracts and returns a tuple of the form: (as per the data specification)
    # (time_cat, time_num, time_cos, time_sin, day_cat, day_num, day_cos, day_sin, weekend)
    # Split date string into list of date, time
    
    d = date_str.split()
    
    #safety check
    if len(d) != 2:
        return tuple([None,])
    
    # TIME (eg. for 16:56:20 and 15 mins per bin)
    #list of hour,min,sec (e.g. [16,56,20])
    time_list = [int(t) for t in d[1].split(':')]
    
    #safety check
    if len(time_list) != 3:
        return tuple([None,])
    
    # calculate number of minute into the day (eg. 1016)
    num_minutes = time_list[0] * 60 + time_list[1]
    
    # Time of the start of the bin
    time_bin = num_minutes / minutes_per_bin     # eg. 1005
    hour_bin = num_minutes / 60                  # eg. 16
    min_bin = (time_bin * minutes_per_bin) % 60  # eg. 45
    
    #get time_cat
    hour_str = str(hour_bin) if hour_bin / 10 > 0 else "0" + str(hour_bin)  # eg. "16"
    min_str = str(min_bin) if min_bin / 10 > 0 else "0" + str(min_bin)      # eg. "45"
    time_cat = hour_str + ":" + min_str                                     # eg. "16:45"
    
    # Get a floating point representation of the center of the time bin
    time_num = (hour_bin*60 + min_bin + minutes_per_bin / 2.0)/(60*24)      # eg. 0.7065972222222222
    
    time_cos = math.cos(time_num * 2 * math.pi)
    time_sin = math.sin(time_num * 2 * math.pi)
    
    # DATE
    # Parse year, month, day
    date_list = d[0].split('-')
    d_obj = date(int(date_list[0]),int(date_list[1]),int(date_list[2]))
    day_to_str = {0: "Monday",
                  1: "Tuesday",
                  2: "Wednesday",
                  3: "Thursday",
                  4: "Friday",
                  5: "Saturday",
                  6: "Sunday"}
    day_of_week = d_obj.weekday()
    day_cat = day_to_str[day_of_week]
    day_num = (day_of_week + time_num)/7.0
    day_cos = math.cos(day_num * 2 * math.pi)
    day_sin = math.sin(day_num * 2 * math.pi)
    
    year = d_obj.year
    month = d_obj.month
    day = d_obj.day
    
    weekend = 0
    #check if it is the weekend
    if day_of_week in [5,6]:
        weekend = 1
       
    return (year, month, day, time_cat, time_num, time_cos, time_sin, day_cat, day_num, day_cos, day_sin, weekend)

Define two different data cleaning methods, as the data is structured slightly differently for the yellow and the green data. For this project the main difference is the column position of the dropoff longitude and latitude.

In [4]:
def yellow_data_cleaner(row):
    """Convert one raw yellow-taxi record into a feature tuple, or None if it is unusable.

    Reads the pickup timestamp from row[1], the pickup longitude from row[5] and
    the pickup latitude from row[6]. Relies on the notebook globals g (geohash
    length), b and minutes_per_bin, and on date_extractor.

    Returns
    -------
    tuple or None
        (year, month, day, time_cat, time_num, time_cos, time_sin, day_cat,
         day_num, day_cos, day_sin, weekend, pickup_location,
         decoded_latitude, decoded_longitude),
        or None when the row is too short, the timestamp or coordinates cannot
        be parsed, or the pickup lies outside the NYC bounding box.
    """
    #safety check: make sure row has enough features
    if len(row) < 7:
        return None

    #extract the date and time-of-day features from the pickup timestamp
    date_str = row[1]
    if date_str is None:
        return None
    clean_date = date_extractor(date_str,b,minutes_per_bin)
    # date_extractor signals an unparsable timestamp with the 1-tuple (None,)
    if clean_date[0] is None:
        return None

    # beware the order: longitude comes before latitude in the raw data
    try:
        pickup_longitude = float(row[5])
        pickup_latitude = float(row[6])
    except (TypeError, ValueError):
        # missing or non-numeric coordinates
        return None

    #safety check: make sure latitude and longitude are valid, i.e. inside NYC
    # (done before geohashing so rejected rows cost no encode/decode work)
    if not (40.5 < pickup_latitude < 41.1 and -74.1 < pickup_longitude < -73.6):
        return None

    # geohash the pickup location with the configured length g, then decode it
    # again to obtain the reduced-precision cell coordinates
    pickup_location = gh2.encode(pickup_latitude, pickup_longitude, precision=g)
    (decoded_lat, decoded_long) = gh2.decode(pickup_location)

    #note: the dropoff location is not part of the return values
    return tuple(clean_date) + (pickup_location, decoded_lat, decoded_long)

In [5]:
def green_data_cleaner(row):
    """Convert one raw green-taxi record into a feature tuple, or None if it is unusable.

    Reads the pickup timestamp from row[1], the pickup longitude from row[5] and
    the pickup latitude from row[6]. Relies on the notebook globals g (geohash
    length), b and minutes_per_bin, and on date_extractor.

    Returns
    -------
    tuple or None
        (year, month, day, time_cat, time_num, time_cos, time_sin, day_cat,
         day_num, day_cos, day_sin, weekend, pickup_location,
         decoded_latitude, decoded_longitude),
        or None when the row is too short, the timestamp or coordinates cannot
        be parsed, or the pickup lies outside the NYC bounding box.
    """
    #safety check: make sure row has enough features
    if len(row) < 7:
        return None

    #extract the date and time-of-day features from the pickup timestamp
    date_str = row[1]
    if date_str is None:
        return None
    clean_date = date_extractor(date_str,b,minutes_per_bin)
    # date_extractor signals an unparsable timestamp with the 1-tuple (None,)
    if clean_date[0] is None:
        return None

    # beware the order: longitude comes before latitude in the raw data
    try:
        pickup_longitude = float(row[5])
        pickup_latitude = float(row[6])
    except (TypeError, ValueError):
        # missing or non-numeric coordinates
        return None

    #safety check: make sure latitude and longitude are valid, i.e. inside NYC
    # (done before geohashing so rejected rows cost no encode/decode work)
    if not (40.5 < pickup_latitude < 41.1 and -74.1 < pickup_longitude < -73.6):
        return None

    # geohash the pickup location with the configured length g, then decode it
    # again to obtain the reduced-precision cell coordinates
    pickup_location = gh2.encode(pickup_latitude, pickup_longitude, precision=g)
    (decoded_lat, decoded_long) = gh2.decode(pickup_location)

    #note: the dropoff location is not part of the return values
    return tuple(clean_date) + (pickup_location, decoded_lat, decoded_long)

### Define function for reading the weather data into a DataFrame (pandas) and for preparing data 

We read the weather from a measurement station in Central Park in New York City. It contains data regarding precipitation, snowfall, snow depth, wind speed and minimum and maximum temperatures. From the initial data we extract the year, month and day. The temperatures are converted from Fahrenheit to Celsius.

In [6]:
from IPython.display import display
def weather_extractor():
    """Load the Central Park weather CSV from HDFS and return it as a pandas DataFrame.

    The STATION and STATION_NAME columns are dropped, the numeric DATE column
    is split into year / month / day columns (DATE itself is removed), and the
    measurement columns are rescaled.

    Uses the notebook-level SparkSession `spark`.

    Returns
    -------
    pandas.DataFrame
        Weather rows with year, month, day and the rescaled
        PRCP, SNWD, SNOW, AWND, TMAX and TMIN columns.
    """
    raw_weather = spark.read.load("hdfs:///Projects/labs/nyc_taxi_data/data/central_park_weather.csv", format='com.databricks.spark.csv', header='true',inferSchema='true')
    weather_df = raw_weather.drop("STATION", "STATION_NAME").toPandas()

    # DATE is treated as a yyyymmdd number (assumes inferSchema yields a numeric
    # column - TODO confirm); split it with integer arithmetic.
    weather_df["year"] = (weather_df["DATE"] // 10000).astype(int)
    weather_df["month"] = (weather_df["DATE"].mod(10000) // 100).astype(int)
    weather_df["day"] = weather_df["DATE"].mod(100)

    # Scale by 1/10 - presumably the raw values are stored in tenths of a unit;
    # verify against the dataset documentation.
    # NOTE(review): missing-value sentinels, if the file has any, are scaled too.
    weather_df["PRCP"] = weather_df["PRCP"]/10
    weather_df["SNWD"] = weather_df["SNWD"]/10
    weather_df["SNOW"] = weather_df["SNOW"]/10
    # Additionally multiplied by 3.6 (presumably m/s -> km/h - confirm)
    weather_df["AWND"] = weather_df["AWND"]/10*3.6

    # Fahrenheit -> Celsius
    weather_df["TMAX"] = (weather_df["TMAX"]-32)/1.8
    weather_df["TMIN"] = (weather_df["TMIN"]-32)/1.8

    # axis must be passed by keyword: the positional form was removed in pandas 2.0
    weather_df1 = weather_df.drop("DATE", axis=1)
    return weather_df1

In [None]:
# Get the position of the day within its year as a fraction of 365 days:
# January 1st is 0.0, December 31st is 364/365 (or 1.0 in a leap year).
def get_yearday(df):
    """Return the zero-based day of the year divided by 365.

    Parameters
    ----------
    df : mapping
        Anything indexable by 'year', 'month' and 'day' (e.g. a dict or a
        pandas row); the values are converted with int().

    Returns
    -------
    float
        (day_of_year - 1) / 365, where day_of_year is 1 for January 1st.
    """
    # `date` is the class imported via `from datetime import date`; the module
    # name `datetime` itself is not in scope in this notebook.
    day_of_year = date(int(df['year']), int(df['month']), int(df['day'])).timetuple().tm_yday
    return (day_of_year - 1) / 365.

## Prepare data

Read the traffic data, remove abnormal entries and clean it. Then count the number of pickups for each location and append it as a new column. Thereafter, clean the weather data and merge the two DataFrames into one which is persisted to disk. 

In [None]:
# Load the raw green and yellow trip records from HDFS
graw = spark.read.format("CSV").option("header","true").load("hdfs:///Projects/labs/nyc_taxi_data/data/green_tripdata*")
yraw = spark.read.format("CSV").option("header","true").load("hdfs:///Projects/labs/nyc_taxi_data/data/yellow_tripdata*")

# Clean every record; the cleaners return None for rows that must be discarded
gclean_rdd = graw.rdd.map(green_data_cleaner).filter(lambda row: row is not None)
yclean_rdd = yraw.rdd.map(yellow_data_cleaner).filter(lambda row: row is not None)

# Append a constant 1 to every row so pickups can be counted with a sum
gclean_extracol = gclean_rdd.map(lambda row: row + (1,))
yclean_extracol = yclean_rdd.map(lambda row: row + (1,))

# Wildcard import kept as-is so any names other cells take from it stay available
from pyspark.sql.types import *

# All columns are declared as nullable strings
schemaString = "year month day time_cat time_num time_cos time_sin day_cat day_num day_cos day_sin weekend pickup_location pickup_latitude pickup_longitude count"
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)
ginit_df = spark.createDataFrame(gclean_extracol, schema)
yinit_df = spark.createDataFrame(yclean_extracol, schema)

# Stack green and yellow rides (union replaces the deprecated unionAll)
ygdf = ginit_df.union(yinit_df)

# Total number of pickups per geohashed location
ygdf_summed = ygdf.groupBy("pickup_location").agg({"count": "sum"})

# Attach the per-location total to every ride as "label"; drop the helper columns
ygdf_done = ygdf.join(ygdf_summed, "pickup_location").drop("count", "pickup_location").withColumnRenamed("sum(count)", "label")

from pyspark.sql import SQLContext

weather_pddf = weather_extractor()
# SQLContext expects a SparkContext as its first argument, not a SparkSession.
# The name sqlContext is kept defined in case other cells refer to it.
sqlContext = SQLContext(spark.sparkContext, spark)
# Convert the pandas weather frame through the SparkSession directly
weather_df = spark.createDataFrame(weather_pddf)

# Add the daily weather to every ride
df_merged = ygdf_done.join(weather_df, ["year", "month", "day"])

# Persist the prepared data set as CSV with a header row
df_merged.write.format("com.databricks.spark.csv").option("header", "true").save("hdfs:///Projects/ID2223nyctaxi/prepared_taxirides/yellow_and_green_data_finally2")