In [2]:
from sklearn import datasets
from sklearn.model_selection import cross_val_predict
from sklearn import linear_model
import seaborn as sns
import matplotlib.pyplot as plt
%matplotlib inline
import pandasql as pdsql
import pandas as pd
pysql = lambda q: pdsql.sqldf(q, globals())
import numpy as np

from pyspark.sql.session import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder.getOrCreate()

from pyspark.sql import SQLContext

In [3]:
# load data
df=spark.read.csv('yellow_tripdata_2017-06.csv',header='true', inferSchema='true')
df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)



In [4]:
# create new features: weekdays (0=Monday, 1=Tuesday,.., 6=Sunday), pickup_time (0-24) and trip_hours
df.createOrReplaceTempView("df")
df2=spark.sql("select *, case when diff_days=0 then trip_hrs else 24+trip_hrs end as trip_hr \
from (select VendorID,tpep_pickup_datetime, tpep_dropoff_datetime, \
datediff(tpep_dropoff_datetime,tpep_pickup_datetime) as diff_days, weekday(tpep_pickup_datetime) as weekday, \
Hour(tpep_pickup_datetime) as pickup_time,\
((Hour(tpep_dropoff_datetime)*3600+Minute(tpep_dropoff_datetime)*60+Second(tpep_dropoff_datetime)) - \
(Hour(tpep_pickup_datetime)*3600+Minute(tpep_pickup_datetime)*60+Second(tpep_pickup_datetime)))/3600 as trip_hrs, \
passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,\
mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount \
from df) as a where diff_days in (0,1)  ")
df2.show(4) 

+--------+--------------------+---------------------+---------+-------+-----------+------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|diff_days|weekday|pickup_time|          trip_hrs|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|           trip_hr|
+--------+--------------------+---------------------+---------+-------+-----------+------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+------------------+
|       2| 2017-06-08 07:52:31|  2017-06-08 08:01:32|        0|      3|          7

In [9]:
# create trip_hours as bins by every 0.5 hours
df2.createOrReplaceTempView("df2")
df3=spark.sql("select VendorID, weekday,pickup_time,  \
case when trip_hr>=0 and trip_hr<=0.5 then 0.5 \
     when trip_hr>0.5 and trip_hr<=1.0 then 1.0 \
     when trip_hr>1.0 and trip_hr<=1.5 then 1.5 \
     when trip_hr>1.5 and trip_hr<=2.0 then 2.0  \
     when trip_hr>2.0 and trip_hr<=2.5 then 2.5 \
     when trip_hr>2.5 and trip_hr<=3.0 then 3.0 \
     when trip_hr>3.0 and trip_hr<=3.5 then 3.5  \
     when trip_hr>3.5 and trip_hr<=4.0 then 4.0 \
     when trip_hr>4.0 and trip_hr<=4.5 then 4.5 \
     when trip_hr>4.5 and trip_hr<=5.0 then 5.0  \
     when trip_hr>5.0 and trip_hr<=5.5 then 5.5 \
     when trip_hr>5.5 and trip_hr<=6.0 then 6.0 \
     when trip_hr>6.0 and trip_hr<=6.5 then 6.5  \
     when trip_hr>6.5 and trip_hr<=7.0 then 7.0 \
     when trip_hr>7.0 and trip_hr<=7.5 then 7.5 \
     when trip_hr>7.5 and trip_hr<=8.0 then 8.0  \
     when trip_hr>8.0 and trip_hr<=8.5 then 8.5 \
     when trip_hr>8.5 and trip_hr<=9.0 then 9.0 \
     when trip_hr>9.0 and trip_hr<=9.5 then 9.5  \
     when trip_hr>9.5 and trip_hr<=10.0 then 10.0 \
     when trip_hr>10.0 and trip_hr<=10.5 then 10.5 \
     when trip_hr>10.5 and trip_hr<=11.0 then 11.0 \
     when trip_hr>11.0 and trip_hr<=11.5 then 11.5 \
     when trip_hr>11.5 and trip_hr<=12.0 then 12.0\
     when trip_hr>12.0 and trip_hr<=12.5 then 12.5 \
     when trip_hr>12.5 and trip_hr<=13.0 then 13.0 \
     when trip_hr>13.0 and trip_hr<=13.5 then 13.5 \
     when trip_hr>13.5 and trip_hr<=14.0 then 14.0  \
     when trip_hr>14.0 and trip_hr<=14.5 then 14.5 \
     when trip_hr>14.5 and trip_hr<=15.0 then 15.0 \
     when trip_hr>15.0 and trip_hr<=15.5 then 15.5  \
     when trip_hr>15.5 and trip_hr<=16.0 then 16.0 \
     when trip_hr>16.0 and trip_hr<=16.5 then 16.5 \
     when trip_hr>16.5 and trip_hr<=17.0 then 17.0  \
     when trip_hr>17.0 and trip_hr<=17.5 then 17.5 \
     when trip_hr>17.5 and trip_hr<=18.0 then 18.0 \
     when trip_hr>18.0 and trip_hr<=18.5 then 18.5  \
     when trip_hr>18.5 and trip_hr<=19.0 then 19.0 \
     when trip_hr>19.0 and trip_hr<=19.5 then 19.5 \
     when trip_hr>19.5 and trip_hr<=20.0 then 20.0  \
     when trip_hr>20.0 and trip_hr<=20.5 then 20.5 \
     when trip_hr>20.5 and trip_hr<=21.0 then 21.0 \
     when trip_hr>21.0 and trip_hr<=21.5 then 21.5  \
     when trip_hr>21.5 and trip_hr<=22.0 then 22.0 \
     when trip_hr>22.0 and trip_hr<=22.5 then 22.5 \
     when trip_hr>22.5 and trip_hr<=23.0 then 23.0 \
     when trip_hr>23.0 and trip_hr<=23.5 then 23.5 \
     when trip_hr>23.5 and trip_hr<=24.0 then 24.0\
      end as trip_hours, \
trip_distance,RatecodeID,store_and_fwd_flag,payment_type,total_amount, \
passenger_count, PULocationID,DOLocationID \
from df2")
df3.show(4)   
#passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,\
#mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount

+--------+-------+-----------+----------+-------------+----------+------------------+------------+------------+---------------+------------+------------+
|VendorID|weekday|pickup_time|trip_hours|trip_distance|RatecodeID|store_and_fwd_flag|payment_type|total_amount|passenger_count|PULocationID|DOLocationID|
+--------+-------+-----------+----------+-------------+----------+------------------+------------+------------+---------------+------------+------------+
|       2|      3|          7|       0.5|         1.03|         1|                 N|           1|       11.16|              6|         161|         140|
|       2|      3|          8|       0.5|         1.03|         1|                 N|           1|       10.14|              6|         162|         233|
|       2|      3|          8|       7.5|         5.63|         1|                 N|           2|        23.3|              6|         137|          41|
|       2|      3|         15|       0.5|         1.43|         1|          

In [11]:
# Random to pick 100000 samples
df3.createOrReplaceTempView("df3")
df_pd=spark.sql("select * from df3 ORDER BY RAND() limit 100000").toPandas() 
df_pd.info()
df_pd.head()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100000 entries, 0 to 99999
Data columns (total 12 columns):
VendorID              100000 non-null int32
weekday               100000 non-null int32
pickup_time           100000 non-null int32
trip_hours            100000 non-null object
trip_distance         100000 non-null float64
RatecodeID            100000 non-null int32
store_and_fwd_flag    100000 non-null object
payment_type          100000 non-null int32
total_amount          100000 non-null float64
passenger_count       100000 non-null int32
PULocationID          100000 non-null int32
DOLocationID          100000 non-null int32
dtypes: float64(2), int32(8), object(2)
memory usage: 6.1+ MB


Unnamed: 0,VendorID,weekday,pickup_time,trip_hours,trip_distance,RatecodeID,store_and_fwd_flag,payment_type,total_amount,passenger_count,PULocationID,DOLocationID
0,2,4,10,0.5,1.28,1,N,2,7.8,1,244,116
1,1,1,16,0.5,1.2,1,N,2,10.8,2,236,140
2,1,6,2,0.5,2.1,1,N,1,12.95,1,148,137
3,1,1,17,0.5,1.7,1,N,1,11.75,1,48,239
4,1,0,5,0.5,1.6,1,N,1,9.55,1,236,237


In [12]:
# Build model and then summary
from statsmodels.formula.api import ols
#df_pd2=df_pd.drop(['VendorID'],axis=1)
ls=ols("total_amount ~ VendorID+weekday+pickup_time+trip_hours+trip_distance+RatecodeID+store_and_fwd_flag+payment_type \
+passenger_count+PULocationID+DOLocationID",df_pd).fit()
ls.summary()

0,1,2,3
Dep. Variable:,total_amount,R-squared:,0.834
Model:,OLS,Adj. R-squared:,0.834
Method:,Least Squares,F-statistic:,15650.0
Date:,"Sun, 14 Jul 2019",Prob (F-statistic):,0.0
Time:,04:04:23,Log-Likelihood:,-319630.0
No. Observations:,100000,AIC:,639300.0
Df Residuals:,99967,BIC:,639600.0
Df Model:,32,,
Covariance Type:,nonrobust,,

0,1,2,3,4,5,6
,coef,std err,t,P>|t|,[0.025,0.975]
Intercept,8.9006,0.118,75.282,0.000,8.669,9.132
trip_hours[T.Decimal('1.0')],7.0828,0.088,80.256,0.000,6.910,7.256
trip_hours[T.Decimal('1.5')],8.6276,0.200,43.245,0.000,8.237,9.019
trip_hours[T.Decimal('2.0')],13.9238,0.539,25.822,0.000,12.867,14.981
trip_hours[T.Decimal('2.5')],33.3599,2.100,15.887,0.000,29.244,37.476
trip_hours[T.Decimal('3.0')],-47.3873,5.920,-8.005,0.000,-58.990,-35.785
trip_hours[T.Decimal('3.5')],56.6956,5.916,9.584,0.000,45.101,68.290
trip_hours[T.Decimal('4.5')],5.4389,5.916,0.919,0.358,-6.156,17.034
trip_hours[T.Decimal('5.0')],-1.9385,5.916,-0.328,0.743,-13.533,9.656

0,1,2,3
Omnibus:,190515.261,Durbin-Watson:,1.992
Prob(Omnibus):,0.0,Jarque-Bera (JB):,1691479422.851
Skew:,14.146,Prob(JB):,0.0
Kurtosis:,639.518,Cond. No.,76100.0


In [None]:
'''
A. Exercise
1. Imagine that you decide to drive a taxi for 10 hours each week to earn a little extra money.  Show how you would approach 
maximizing your income as a taxi driver.  

Answer: From the above summary, the Adj. R-squared reaches 0.834 and a very high F-statistic indicates the model is good.
coef positive means feature value higher then total income higher and p value <=0.05 means 
statistical significance. Features store_and_fwd_flag and VendorID are not significant (p >0.05). The feature weekday coef 
negative means more money you can make in Monday,Tuesday,and Wed would be better than Thur, Friday and weekends.The 
pickup_time coef positive means evening better than afternoon and morning, but not obvious since the coef quite samll. As a 
taxi driver, you are not able to pick trip_distance, RatecodeID, PULocationID, DOLocationID, payment_type, and trip_hours 
although they are significant. Howver you can drive a taxi in Monday, Tuesday and Wed, and also a later time such as 
evening and afternoon.

2. If you could enrich the dataset, what would you add?  Is there anything in the dataset that you don’t find especially 
useful? 

Answer: I would like to add pickup location because the guests from different location may give different tips. From the 
summary, we find passenger_count seems not especially useful because it is not statistical significant and also total 
income may already include that informtion.

B. Response
Some of the things we like to see in responses are...
An explanation of why you selected or constructed the variable you used to represent income.
Thoughtful exploration of model variables prior to modeling.
Conclusions summarized so that a wide variety of people can understand them.

Answer: Generated three important features weekday (from Monday to Sunday), pickup_time (from early morning to the midnight),
and also trip_hours which greatly improve the model performance, and the total_income as a target is very correlated with
these three features and also other features listed above summary output. In conclusion, we build a a quite good linear 
regression model with randomly extracted samples from NY public data (yellow_tripdata_2017-06.csv).

'''