# Setting up PySpark enviornment

In [1]:
from pyspark import SparkContext
sc = SparkContext()

In [2]:
from pyspark import SQLContext
sqlContext = SQLContext(sc)

In [3]:
from pyspark.sql.functions import *
from pyspark.sql.types import *    

# Loading and Cleaning the Data

Now that the PySpark enviornment has been setup, the next step is to load in the data from Hadoop. Dataframes can be loaded using PySpark in a fashion that is similar to pandas. The variable `inferSchema` tells PySpark  to guess what the datatype of each column in the dataframe is. If the option is set to ```False``` then all columns will be loaded in as strings. We'll start with 2014 green taxi trips.

In [4]:
%%time
green_2014 = sqlContext.read.csv("hdfs://namenode:8020/am2786/***/Taxi_2014_green.csv", header=True,inferSchema=True)
green_2014.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- Unnamed: 0: integer (nullable = true)
 |-- vendorid: integer (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- dropoff_datetime: string (nullable = true)
 |-- Store_and_fwd_flag: string (nullable = true)
 |-- rate_code: integer (nullable = true)
 |-- Pickup_longitude: double (nullable = true)
 |-- Pickup_latitude: double (nullable = true)
 |-- Dropoff_longitude: double (nullable = true)
 |-- Dropoff_latitude: double (nullable = true)
 |-- Passenger_count: integer (nullable = true)
 |-- Trip_distance: double (nullable = true)
 |-- Fare_amount: double (nullable = true)
 |-- Extra: double (nullable = true)
 |-- MTA_tax: double (nullable = true)
 |-- Tip_amount: double (nullable = true)
 |-- Tolls_amount: double (nullable = true)
 |-- Ehail_fee: string (nullable = true)
 |-- Total_amount: double (nullable = true)
 |-- Payment_type: integer (nullable = true)
 |-- Trip_type: double (nullable = true)
 |-- pickup_neighborhood: s

We can see that most of the columns (including the datetimes of the pickups and dropoffs) have been set to either strings or doubles. The name of the game is reducing computation time, so before we do anything else let's select only the columns that we need from the dataframe, then have a look at the first couple of rows. The `green_2014.show(5, False)` portion of the cell tells PySpark to show the forst 5 rows of the database, and to not truncate any long columns. 


In [5]:
green_2014 = green_2014.select('pickup_datetime', 'dropoff_datetime', 
                      'pickup_neighborhood', 'Passenger_count', 'Tip_amount',
                     'Total_amount', 'Trip_distance', 'Payment_type', 'Fare_amount')

green_2014.show(5, False)

+----------------------+----------------------+-------------------+---------------+----------+------------+-------------+------------+-----------+
|pickup_datetime       |dropoff_datetime      |pickup_neighborhood|Passenger_count|Tip_amount|Total_amount|Trip_distance|Payment_type|Fare_amount|
+----------------------+----------------------+-------------------+---------------+----------+------------+-------------+------------+-----------+
|08/05/2014 11:46:48 AM|08/05/2014 12:05:10 PM|Harlem             |1              |0.0       |16.0        |3.46         |2           |15.5       |
|11/22/2014 02:55:25 AM|11/22/2014 03:04:43 AM|Williamsburg       |2              |0.0       |10.0        |1.89         |2           |9.0        |
|09/16/2014 10:44:40 PM|09/16/2014 10:50:26 PM|Harlem             |1              |1.4       |8.9         |1.23         |1           |6.5        |
|11/18/2014 11:03:59 PM|11/18/2014 11:15:15 PM|Williamsburg       |1              |2.4       |14.9        |2.77       

We can do the same process for the 2014 yellow taxi trips.

In [6]:
%%time
yellow_2014 = sqlContext.read.csv("hdfs://namenode:8020/am2786/***/Taxi_2014_yellow.csv", header=True,inferSchema=True)
yellow_2014.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- Unnamed: 0: integer (nullable = true)
 |-- vendor_id: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- rate_code: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- surcharge: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- pickup_neighborhood: string (nullable = true)
 |-- dropoff_neighborhood: string (nullable = tru

In [7]:
yellow_2014 = yellow_2014.select('pickup_datetime', 'dropoff_datetime', 
                      'pickup_neighborhood', 'Passenger_count', 'Tip_amount',
                     'Total_amount', 'Trip_distance', 'Payment_type',  'Fare_amount')

yellow_2014.show(5, False)

+-------------------+-------------------+-------------------+---------------+----------+------------+-------------+------------+-----------+
|pickup_datetime    |dropoff_datetime   |pickup_neighborhood|Passenger_count|Tip_amount|Total_amount|Trip_distance|Payment_type|Fare_amount|
+-------------------+-------------------+-------------------+---------------+----------+------------+-------------+------------+-----------+
|2014-01-12 17:48:53|2014-01-12 17:58:51|Upper East Side    |2              |1.0       |10.5        |1.7          |CRD         |9.0        |
|2014-01-11 18:56:48|2014-01-11 19:03:22|Carroll Gardens    |1              |1.3       |7.8         |0.8          |CRD         |6.0        |
|2014-01-11 10:00:47|2014-01-11 10:10:35|Financial District |1              |2.0       |10.5        |1.5          |CRD         |8.0        |
|2014-01-10 12:55:42|2014-01-10 13:03:21|Upper East Side    |1              |1.0       |8.5         |1.1          |CRD         |7.0        |
|2014-01-11 1

It's that easy! There are a couple striking differences between the 2014 green and yellow data sets that we should take note of right off the bat:

1. There are *many* more yellow taxi trips than green taxi trips
2. The way that pickup and dropoff datetimes are setup in the green taxi dataset is different from the yellow taxi dataset. PySpark was able to infer that the datetimes from the yellow taxi dataset were indeed datetimes however, it left the green taxi datetimes as strings. We will fix this in the next section.
3. The column `Payment_type` is different for yellow and green taxis. For green taxis it is a number (1 = credit, 2 = cash, etc.) while for yellow taxis it's an abbreviated string ('CRD' = credit card, etc.)

### Assigning columns to proper data types

Since the data has been loaded, the next step is to convert the data type of columns in teh database to the right data types. When we ask PySpark to infer the schema of the columns within a dataframe, we're essentially asking it "What datatype does the format of this column resemble?" If does not resemble anythiing then it will be left as a string. This is what is occuring with some of the columns within the dataframes. The most troublesome one is the `pickup_datetime` feature in the green taxis dataframe. The format of these times is a such: `2014-01-23 06:39:00 PM`. PySpark only recognizes what we in the states call "military time" (24-hour clock). We'll have to fix this at a later time, but for now what we'll do is create a duplicate column of `pickup_datetime` called `pickup_datetime_str`. 

In [8]:
green_2014 = green_2014.withColumn("pickup_datetime_str", col("pickup_datetime"))

A fucntion can be written to convert the columns to the correct data types.

In [9]:
# Write a custom function to convert the data type of DataFrame columns


def convertColumn(df, names, newType):
    """
    df     : DataFrame with the columns of data
    names  : List containing the names of all the columns that will be converted
    newType: New data type
    """
    for name in names: 
        df = df.withColumn(name, df[name].cast(newType))
    return df 

In [10]:
%%time
# List with the names of all the features to be converted
columns = ['Tip_amount', 'Total_amount', 'Trip_distance', 'Fare_amount']

# Convert all features in `columns` to `FloatType()`
green_2014 = convertColumn(green_2014, columns, FloatType())
yellow_2014 = convertColumn(yellow_2014, columns, FloatType())

CPU times: user 8.33 ms, sys: 2.45 ms, total: 10.8 ms
Wall time: 183 ms


If we go back to the 3rd main difference between the yellow and green taxi datasets, we'l' see that the feature `Payment_type` *should* be a string for yellow taxis, but an integer for green taxis. Thus the lists of column names to be converted to integer data types will not be the same for both green and yellow taxis in the cell below. 

In [11]:

# Convert all features listed to `IntegerType()`
green_2014 = convertColumn(green_2014, ['Passenger_count', 'Payment_type'], IntegerType())
yellow_2014 = convertColumn(yellow_2014, ['Passenger_count'], IntegerType())

To convert our dates, PySpark does have a `DateType()` data type similar to IntegerType() and FloatType() however, using this method to convert datetime objects in PySpark can be unreliable and often results in a columns full null values. The most dependable method is re-create the columns with a specified data type and format, as in the cell below. The built in function `to_timestamp` converts strings to DateType() with the given specific format.

In [12]:
# Convert the df columns to datetime()
from pyspark.sql.functions import to_timestamp
green_2014 = green_2014.withColumn("pickup_datetime", to_timestamp(green_2014.pickup_datetime, 'MM/dd/yyyy HH:mm:ss'))
green_2014 = green_2014.withColumn("dropoff_datetime", to_timestamp(green_2014.dropoff_datetime, 'MM/dd/yyyy HH:mm:ss'))
yellow_2014 = yellow_2014.withColumn("pickup_datetime", to_timestamp(yellow_2014.pickup_datetime, 'MM/dd/yyyy HH:mm:ss'))
yellow_2014 = yellow_2014.withColumn("dropoff_datetime", to_timestamp(yellow_2014.dropoff_datetime, 'MM/dd/yyyy HH:mm:ss'))


Awesome we now have all of the taxi trips in New York City for the year 2014, with the data in its proper format! The next step is to clean the datasets. 

#### Outlier and Incorrect Data Entry Detection

With any *real* dataset you can bet your last dollar that there will be some data that has been input incorrectly. In the case of our taxi data, this can be:

- Recording the incorrect number of passengers
- Error with the taxi meter that puts an impossibly *low* or *high* fare

Our safest course of action is to get rid of these incorrect taxi trips. In the case of the first issue pertaining to the number of passengers, we can get rid of any trips that have 0 passengers recorded, as this is impossible. For the fare amount we can set some minimum threshold for the fare amount where any trip with a fare less than that is thrown away. Doing a quick Google search reveals that taxis in New York have a $2.50 minimum, meaning that as soon as you step in that is what you are charged. We can use this as our minimum fare threshold. 

In [13]:
def taxi_data_cleaner(data):
    
    # Getting rid of rides with 0 passengers
    data = data.filter(data.Passenger_count > 0)
    
    # Getting rid of rides with impossibly low fares
    data = data.filter(data.Fare_amount > 1.)
    
    #Making a feature which measures the % a driver was tipped 
    data = data.withColumn("Tip_percentage", 100.*(col("Tip_amount")/(col("Total_amount") - col("Tip_amount"))))
    
    # Adding columns that give the month , day of the week and hour of pickups and dropoffs
    data = data.withColumn("pickup_dayofweek", date_format('pickup_datetime', 'u'))
    data = data.withColumn("dropoff_dayofweek", date_format('dropoff_datetime', 'u'))
    data = data.withColumn("pickup_month", month('pickup_datetime'))
    data = data.withColumn("dropoff_month", month('dropoff_datetime'))
    data = data.withColumn("pickup_hour", hour('pickup_datetime'))
    data = data.withColumn("dropoff_hour", hour('dropoff_datetime'))
    return(data)

In [14]:
%%time
green_2014_cleaned   = taxi_data_cleaner(green_2014)
yellow_2014_cleaned  = taxi_data_cleaner(yellow_2014)

CPU times: user 20.6 ms, sys: 4.27 ms, total: 24.9 ms
Wall time: 382 ms


# Answering questions

Now that we've loaded our data and cleaned we can finally get to the fun part of exploring the dataset. EDA can take a while so a good   to follow is to make a list of questions you want to ivestigate *before* exploring the data. The questions should be directly related to your overachring goals. In our case the over-arching goals are:
- Calculate future growth of company 
- 
-

The preliminary questions to answer these goals are:

1. Which neighborhoods are green/yellow taxis used the most often in?
2. Do yellow or green taxi tend to have longer distance trips?

3. Which neighborhoods in NYC tip the highest percentage on average?
4. Do riders tip more on holidays?

5. How does the number of trips change over the weekdays/weekends for green and yellow taxis?

7. How has the number of taxi trips per day changed over time? Will this number increase or decrease in the future?


We will answer these questions one by one, with the assumption that a story should hopefully unfold from exploring the data. The first step before answering any questions though is to import matplotlib and seaborn for visuals.

In [21]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
% matplotlib inline
import matplotlib.dates as dates

Great! Now we can start counting the number of pickups for each neighborhood. 

In [15]:
green_2014_neighborhoods = green_2014_cleaned.groupby('pickup_neighborhood').count().toPandas()

In [16]:
yellow_2014_neighborhoods = yellow_2014_cleaned.groupby('pickup_neighborhood').count().toPandas()

Taking a look at the top 11 neighborhoods with the most green taxi pickups:

In [17]:
green_2014_neighborhoods.sort_values(by = 'count', ascending = False).head(11)

Unnamed: 0,pickup_neighborhood,count
165,Harlem,348987
79,East Harlem,298041
96,Williamsburg,288515
78,Astoria,169066
240,Elmhurst,125150
237,Morningside Heights,122197
27,Washington Heights,112068
111,Long Island City,103500
14,Jackson Heights,89549
133,Bedford-Stuyvesant,76149


Doing the same for yellow taxis:

In [18]:
yellow_2014_neighborhoods.sort_values(by = 'count', ascending = False).head(11)

Unnamed: 0,pickup_neighborhood,count
16,Midtown,5572662
52,Upper East Side,4319755
215,Chelsea,2918252
126,Upper West Side,2735587
19,Hell's Kitchen,1701725
91,East Village,1297380
128,West Village,1191889
168,Theater District,1158069
204,Murray Hill,841875
193,SoHo,713881


the Python package Folium must be imported to produce the chlorpeth plot. In addition the file containging the shapefiles for all of the neighborhoods must be loaded.

In [19]:
import folium
geo_path = r'NY_neighborhoods.geojson'    # Name of the json file with the neighborhood inofrmation 

The first chloropeth plot will be for green taxi pickups

In [22]:

osm = folium.Map([40.7, -73.8139], zoom_start=10) # producing initial map




#Mapping pickup count values from `green_2014_neighborhoods` to shapefiles from json file

osm.choropleth(geo_data=geo_path, data=green_2014_neighborhoods, 
               fill_color='OrRd', fill_opacity=0.9, line_opacity=0.4, 
               key_on='feature.properties.neighborhood',  
               columns=['pickup_neighborhood','count'], 
              threshold_scale = [10,1000, 10000, 100000, 500000], 
              legend_name='Number of pickups', reset=True)






osm.save('2014_pickup_neighborhood_chloropeth_green.html') #saving chloropeth to html
osm

We can now do the same for yellow taxis:

In [23]:

osm = folium.Map([40.7, -73.8139], zoom_start=10) # producing initial map




#Mapping pickup count values from `yellow_2014_neighborhoods` to shapefiles from json file
osm.choropleth(geo_data=geo_path, data=yellow_2014_neighborhoods, 
               fill_color='OrRd', fill_opacity=0.9, line_opacity=0.4, 
               key_on='feature.properties.neighborhood',  
               columns=['pickup_neighborhood','count'], 
              threshold_scale = [10,1000, 10000, 100000, 500000], 
              legend_name='Number of pickups', reset=True)





osm.save('2014_pickup_neighborhood_chloropeth_yellow.html')  #saving chloropeth to html
osm