# Final Project
### Team: Data Miner(Weirong He, Michael Gainey, Tri Pham)
### Topic: Taxi Data Analysis

In [1]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import numpy as np
import pyarrow.parquet as pq

from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, r2_score

In [2]:
sc = SparkContext()
session = SparkSession(sc)

21/11/17 10:52:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


## Data Process
Data set could be download at https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page
In this project, we are focusing on year 2020, so we have 12 month taxi data in 2020. However, this dataset is too large for this project, so we decide to only take data at the first week of each month which will lead to result of 5M records.

In [3]:
taxi_record_1 = session.read.csv("data/records/yellow_tripdata_2020-01.csv", header=True)
taxi_record_2 = session.read.csv("data/records/yellow_tripdata_2020-02.csv", header=True)
taxi_record_3 = session.read.csv("data/records/yellow_tripdata_2020-03.csv", header=True)
taxi_record_4 = session.read.csv("data/records/yellow_tripdata_2020-04.csv", header=True)
taxi_record_5 = session.read.csv("data/records/yellow_tripdata_2020-05.csv", header=True)
taxi_record_6 = session.read.csv("data/records/yellow_tripdata_2020-06.csv", header=True)
taxi_record_7 = session.read.csv("data/records/yellow_tripdata_2020-07.csv", header=True)
taxi_record_8 = session.read.csv("data/records/yellow_tripdata_2020-08.csv", header=True)
taxi_record_9 = session.read.csv("data/records/yellow_tripdata_2020-09.csv", header=True)
taxi_record_10 = session.read.csv("data/records/yellow_tripdata_2020-10.csv", header=True)
taxi_record_11 = session.read.csv("data/records/yellow_tripdata_2020-11.csv", header=True)
taxi_record_12 = session.read.csv("data/records/yellow_tripdata_2020-12.csv", header=True)
location_df = session.read.csv("data/taxi+_zone_lookup.csv", header=True)

In [4]:
dates = ("2020-01-01",  "2020-01-07")
taxi_record_1 = taxi_record_1.where(F.col('tpep_pickup_datetime').between(*dates))
dates = ("2020-02-01",  "2020-02-07")
taxi_record_2 = taxi_record_2.where(F.col('tpep_pickup_datetime').between(*dates))
dates = ("2020-03-01",  "2020-03-07")
taxi_record_3 = taxi_record_3.where(F.col('tpep_pickup_datetime').between(*dates))
dates = ("2020-04-01",  "2020-04-07")
taxi_record_4 = taxi_record_4.where(F.col('tpep_pickup_datetime').between(*dates))
dates = ("2020-05-01",  "2020-05-07")
taxi_record_5 = taxi_record_5.where(F.col('tpep_pickup_datetime').between(*dates))
dates = ("2020-06-01",  "2020-06-07")
taxi_record_6 = taxi_record_6.where(F.col('tpep_pickup_datetime').between(*dates))
dates = ("2020-07-01",  "2020-07-07")
taxi_record_7 = taxi_record_7.where(F.col('tpep_pickup_datetime').between(*dates))
dates = ("2020-08-01",  "2020-08-07")
taxi_record_8 = taxi_record_8.where(F.col('tpep_pickup_datetime').between(*dates))
dates = ("2020-09-01",  "2020-09-07")
taxi_record_9 = taxi_record_9.where(F.col('tpep_pickup_datetime').between(*dates))
dates = ("2020-10-01",  "2020-10-07")
taxi_record_10 = taxi_record_10.where(F.col('tpep_pickup_datetime').between(*dates))
dates = ("2020-11-01",  "2020-11-07")
taxi_record_11 = taxi_record_11.where(F.col('tpep_pickup_datetime').between(*dates))
dates = ("2020-12-01",  "2020-12-07")
taxi_record_12 = taxi_record_12.where(F.col('tpep_pickup_datetime').between(*dates))

In [5]:
taxi_record = (taxi_record_1.union(taxi_record_2).union(taxi_record_3)
               .union(taxi_record_4).union(taxi_record_5).union(taxi_record_6)
               .union(taxi_record_7).union(taxi_record_8).union(taxi_record_9)
               .union(taxi_record_10).union(taxi_record_11).union(taxi_record_12)
               .drop("VendorID", "RatecodeID", "payment_type", "store_and_fwd_flag",
                     "improvement_surcharge", "mta_tax", "tolls_amount", "congestion_surcharge")
              )

In [6]:
taxi_record.head()

Row(tpep_pickup_datetime='2020-01-01 00:28:15', tpep_dropoff_datetime='2020-01-01 00:33:03', passenger_count='1', trip_distance='1.20', PULocationID='238', DOLocationID='239', fare_amount='6', extra='3', tip_amount='1.47', total_amount='11.27')

### Machine Learning
We can use Machine Learning to help taxi driver to predict the fare amout of next trip. In real life, taxi passengers alway like to ask the price right after they stating the destination location. Many drivers with more experience can quickly response a rough price. And this model can look through 5M trips to become a experienced taxi driver and make a prediction of the fare amount based on the pickup location and drop off location.

In [7]:
# data process in spark
loc_price = taxi_record.select(F.col("PULocationID").alias("PU"),
                   F.col("DOLocationID").alias("DO"),
                   F.col("fare_amount").alias("Price"))
# filter null value record and unknown record(locationID: 264 and 265 are unknown)
loc_price = loc_price.filter(loc_price.PU.isNotNull() & loc_price.PU.between(1, 263)
                             & loc_price.DO.isNotNull() & loc_price.DO.between(1, 263)
                             & loc_price.Price.isNotNull())

In [8]:
%%time
# convert spark df to pandas df
loc_price.write.mode("overwrite").parquet("loc_price.parquet")
pd_df = pq.read_pandas('loc_price.parquet').to_pandas()

                                                                                

CPU times: user 1.18 s, sys: 345 ms, total: 1.52 s
Wall time: 43.5 s


In [13]:
# data process in pandas df
pickup = np.array(pd_df.PU).astype(float)
dropoff = np.array(pd_df.DO).astype(float)

location = np.vstack((pickup, dropoff)).T
price = np.array(pd_df.Price).astype(float)
# data split
X_train, X_test, y_train, y_test = train_test_split(location, price, test_size=0.2)

In [10]:
%%time
# fit model
regr = RandomForestRegressor(max_depth=3, random_state=0)
regr.fit(X_train, y_train)
y_pred = regr.predict(X_test)

CPU times: user 2min 42s, sys: 3.25 s, total: 2min 45s
Wall time: 2min 45s


In [11]:
def cal_accuracy(pred, actual):
    count = 0;
    for i in range(len(actual)):
        if (abs(pred[i] - actual[i]) <= 5):
            count = count + 1
    accuracy = count/len(actual)
    return accuracy
print("price accuracy(accept +- 5 error) : %.2f" %cal_accuracy(y_pred, y_test))
print("Coefficient of determination: %.2f" % r2_score(y_test, y_pred))
print("Mean squared error: %.2f" % mean_squared_error(y_test, y_pred))

price accuracy(accept +- 5 error) : 0.56
Coefficient of determination: 0.23
Mean squared error: 93.08
