## Overview

This notebook will show you how to create and query a table or DataFrame that you uploaded to DBFS. [DBFS](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html) is a Databricks File System that allows you to store data for querying inside of Databricks. This notebook assumes that you have a file already inside of DBFS that you would like to read from.

This notebook is written in **Python** so the default cell type is Python. However, you can use different languages by using the `%LANGUAGE` syntax. Python, Scala, SQL, and R are all supported.

In [2]:
# Where the uploaded file lives in DBFS, and which reader format to use
file_location = "/FileStore/tables/train.csv"
file_type = "csv"

# Reader settings
infer_schema = "false"
first_row_is_header = "true"
delimiter = ","

# These options are CSV-specific; for other file types they will be ignored.
df = (
    spark.read.format(file_type)
    .options(inferSchema=infer_schema, header=first_row_is_header, sep=delimiter)
    .load(file_location)
)

display(df)

id,vendor_id,pickup_datetime,dropoff_datetime,passenger_count,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,store_and_fwd_flag,trip_duration
id2875421,2,2016-03-14 17:24:55,2016-03-14 17:32:30,1,-73.98215484619139,40.76793670654297,-73.96463012695312,40.765602111816406,N,455
id2377394,1,2016-06-12 00:43:35,2016-06-12 00:54:38,1,-73.98041534423827,40.738563537597656,-73.99948120117188,40.731151580810554,N,663
id3858529,2,2016-01-19 11:35:24,2016-01-19 12:10:48,1,-73.97902679443358,40.763938903808594,-74.00533294677734,40.710086822509766,N,2124
id3504673,2,2016-04-06 19:32:31,2016-04-06 19:39:40,1,-74.01004028320312,40.719970703125,-74.01226806640625,40.70671844482422,N,429
id2181028,2,2016-03-26 13:30:55,2016-03-26 13:38:10,1,-73.97305297851561,40.79320907592773,-73.9729232788086,40.782520294189446,N,435
id0801584,2,2016-01-30 22:01:40,2016-01-30 22:09:03,6,-73.98285675048828,40.74219512939453,-73.99208068847656,40.74918365478516,N,443
id1813257,1,2016-06-17 22:34:59,2016-06-17 22:40:40,4,-73.9690170288086,40.75783920288086,-73.95740509033203,40.76589584350586,N,341
id1324603,2,2016-05-21 07:54:58,2016-05-21 08:20:49,1,-73.96927642822266,40.79777908325195,-73.92247009277344,40.76055908203125,N,1551
id1301050,1,2016-05-27 23:12:23,2016-05-27 23:16:38,1,-73.99948120117188,40.738399505615234,-73.98578643798828,40.73281478881836,N,255
id0012891,2,2016-03-10 21:45:01,2016-03-10 22:05:26,1,-73.98104858398438,40.744338989257805,-73.9729995727539,40.78998947143555,N,1225


In [3]:
# Create a view or table

# Name under which the DataFrame is exposed to the %sql cells
temp_table_name = "train_csv"

# Register (or overwrite) a temporary view backed by the current `df`
df.createOrReplaceTempView(temp_table_name)

In [4]:
%sql

/* Read every row back from the temp view created from the DataFrame */

SELECT *
FROM train_csv

id,vendor_id,pickup_datetime,dropoff_datetime,passenger_count,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,store_and_fwd_flag,trip_duration
id2875421,2,2016-03-14 17:24:55,2016-03-14 17:32:30,1,-73.98215484619139,40.76793670654297,-73.96463012695312,40.765602111816406,N,455
id2377394,1,2016-06-12 00:43:35,2016-06-12 00:54:38,1,-73.98041534423827,40.738563537597656,-73.99948120117188,40.731151580810554,N,663
id3858529,2,2016-01-19 11:35:24,2016-01-19 12:10:48,1,-73.97902679443358,40.763938903808594,-74.00533294677734,40.710086822509766,N,2124
id3504673,2,2016-04-06 19:32:31,2016-04-06 19:39:40,1,-74.01004028320312,40.719970703125,-74.01226806640625,40.70671844482422,N,429
id2181028,2,2016-03-26 13:30:55,2016-03-26 13:38:10,1,-73.97305297851561,40.79320907592773,-73.9729232788086,40.782520294189446,N,435
id0801584,2,2016-01-30 22:01:40,2016-01-30 22:09:03,6,-73.98285675048828,40.74219512939453,-73.99208068847656,40.74918365478516,N,443
id1813257,1,2016-06-17 22:34:59,2016-06-17 22:40:40,4,-73.9690170288086,40.75783920288086,-73.95740509033203,40.76589584350586,N,341
id1324603,2,2016-05-21 07:54:58,2016-05-21 08:20:49,1,-73.96927642822266,40.79777908325195,-73.92247009277344,40.76055908203125,N,1551
id1301050,1,2016-05-27 23:12:23,2016-05-27 23:16:38,1,-73.99948120117188,40.738399505615234,-73.98578643798828,40.73281478881836,N,255
id0012891,2,2016-03-10 21:45:01,2016-03-10 22:05:26,1,-73.98104858398438,40.744338989257805,-73.9729995727539,40.78998947143555,N,1225


In [5]:
# With this registered as a temp view, it will only be available to this particular notebook. If you'd like other users to be able to query this table, you can also create a table from the DataFrame.
# Once saved, this table will persist across cluster restarts as well as allow various users across different notebooks to query this data.
# To do so, choose your table name and uncomment the bottom line.

#permanent_table_name = "yellow_tripdata_2017_01_8bceb_csv"

# df.write.format("parquet").saveAsTable(permanent_table_name)

In [6]:
#df.select('id').distinct().show()

Count the number of unique entries (distinct trip `id` values)

In [8]:
from pyspark.sql.functions import col, countDistinct

# Number of distinct trip ids, shown as a one-row frame with a single column named "count"
df.select(countDistinct(col("id")).alias("count")).show()

Count the number of null or NaN values in each column

In [10]:
from pyspark.sql.functions import isnan, when, count, col

# For every column, count the rows whose value is either SQL NULL or NaN.
# isnan() alone is never true for NULL, so the NULL check has to be OR-ed in explicitly.
# The second argument of when() is just a non-null marker (the column name as a literal)
# for count() to pick up on matching rows.
df.select(
    [count(when(col(c).isNull() | isnan(c), c)).alias(c) for c in df.columns]
).show()

In [11]:
df.printSchema()

In [12]:
# Target Spark type for each column that needs re-typing
# (the CSV was read with inferSchema disabled)
column_types = {
    "pickup_datetime": "timestamp",
    "dropoff_datetime": "timestamp",
    "passenger_count": "int",
    "trip_duration": "float",
    "pickup_longitude": "float",
    "pickup_latitude": "float",
    "dropoff_longitude": "float",
    "dropoff_latitude": "float",
}

for column_name, spark_type in column_types.items():
    df = df.withColumn(column_name, df[column_name].cast(spark_type))

In [13]:
df.printSchema()

In [14]:
display(df)

id,vendor_id,pickup_datetime,dropoff_datetime,passenger_count,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,store_and_fwd_flag,trip_duration
id2875421,2,2016-03-14T17:24:55.000+0000,2016-03-14T17:32:30.000+0000,1,-73.982155,40.767937,-73.96463,40.765602,N,455.0
id2377394,1,2016-06-12T00:43:35.000+0000,2016-06-12T00:54:38.000+0000,1,-73.980415,40.738564,-73.99948,40.73115,N,663.0
id3858529,2,2016-01-19T11:35:24.000+0000,2016-01-19T12:10:48.000+0000,1,-73.97903,40.76394,-74.00533,40.710087,N,2124.0
id3504673,2,2016-04-06T19:32:31.000+0000,2016-04-06T19:39:40.000+0000,1,-74.01004,40.71997,-74.01227,40.70672,N,429.0
id2181028,2,2016-03-26T13:30:55.000+0000,2016-03-26T13:38:10.000+0000,1,-73.97305,40.79321,-73.97292,40.78252,N,435.0
id0801584,2,2016-01-30T22:01:40.000+0000,2016-01-30T22:09:03.000+0000,6,-73.98286,40.742195,-73.99208,40.749184,N,443.0
id1813257,1,2016-06-17T22:34:59.000+0000,2016-06-17T22:40:40.000+0000,4,-73.96902,40.75784,-73.957405,40.765896,N,341.0
id1324603,2,2016-05-21T07:54:58.000+0000,2016-05-21T08:20:49.000+0000,1,-73.96928,40.79778,-73.92247,40.76056,N,1551.0
id1301050,1,2016-05-27T23:12:23.000+0000,2016-05-27T23:16:38.000+0000,1,-73.99948,40.7384,-73.98579,40.732815,N,255.0
id0012891,2,2016-03-10T21:45:01.000+0000,2016-03-10T22:05:26.000+0000,1,-73.98105,40.74434,-73.973,40.78999,N,1225.0


In [15]:
# NOTE: this wildcard import also brings in Spark versions of names such as `round`, `pow`,
# `sum`, `min` and `max`, which replace the Python builtins for the rest of the notebook.
# Later cells call round(...), sqrt(...), atan2(...), lit(...) etc. without importing them,
# so they depend on this line having been executed.
from pyspark.sql.functions import *
# Day name of the pickup, formatted with the 'EEEE' pattern (the outputs show e.g. "Monday")
df = df.withColumn("weekday", date_format(df.pickup_datetime, 'EEEE'))
# Numeric day of week from dayofweek() (the outputs show Sunday as 1 and Saturday as 7)
df = df.withColumn("weekday_num", dayofweek(df.pickup_datetime))
# Month number of the pickup
df = df.withColumn("month", month(df.pickup_datetime))
# Hour of day of the pickup
df = df.withColumn("pickup_hour", hour(df.pickup_datetime))

In [16]:
df.select('weekday').distinct().show()

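Haversine formula

The great-circle distance between the pickup point $(\varphi_1, \lambda_1)$ and the drop-off point $(\varphi_2, \lambda_2)$ (latitude, longitude, in radians) is computed in two steps:

$$a = \sin^2\left(\frac{\varphi_2 - \varphi_1}{2}\right) + \cos\varphi_1 \cos\varphi_2 \sin^2\left(\frac{\lambda_2 - \lambda_1}{2}\right)$$

$$d = 2R \cdot \operatorname{atan2}\left(\sqrt{a}, \sqrt{1 - a}\right), \qquad R = 6371\ \text{km}$$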

In [18]:
# Haversine intermediate term:
#   a = sin^2(dlat / 2) + cos(lat1) * cos(lat2) * sin^2(dlon / 2)
# The coordinates are stored in degrees, so every angle goes through radians() first.
# `radians` is the current name of the deprecated `toRadians` alias; both come from the
# wildcard import of pyspark.sql.functions made earlier in the notebook.
lat_term = pow(sin(radians(df.dropoff_latitude - df.pickup_latitude) / 2), 2)
lon_term = pow(sin(radians(df.dropoff_longitude - df.pickup_longitude) / 2), 2)
df = df.withColumn(
    "a",
    lat_term + cos(radians(df.pickup_latitude)) * cos(radians(df.dropoff_latitude)) * lon_term,
)

In [19]:
# Distance from the haversine term `a`: atan2(sqrt(a), sqrt(1 - a)) * 2 gives the central angle,
# which is scaled by 6371 (Earth radius in km) and rounded to two decimals.
central_angle = atan2(sqrt(df.a), sqrt(-df.a + 1)) * 2
df = df.withColumn("distance", round(central_angle * 6371, 2))

In [20]:
# Speed in km/h for further insights: distance divided by the trip duration expressed in hours
# (trip_duration / 3600), rounded to two decimals.
trip_hours = df.trip_duration / 3600
df = df.withColumn("speed", round(df.distance / trip_hours, 2))

In [21]:
display(df)

id,vendor_id,pickup_datetime,dropoff_datetime,passenger_count,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,store_and_fwd_flag,trip_duration,weekday,weekday_num,month,pickup_hour,a,distance,speed
id2875421,2,2016-03-14T17:24:55.000+0000,2016-03-14T17:32:30.000+0000,1,-73.982155,40.767937,-73.96463,40.765602,N,455.0,Monday,2,3,17,1.3830896636179652e-08,1.5,11.87
id2377394,1,2016-06-12T00:43:35.000+0000,2016-06-12T00:54:38.000+0000,1,-73.980415,40.738564,-73.99948,40.73115,N,663.0,Sunday,1,6,0,2.0078128522717423e-08,1.81,9.83
id3858529,2,2016-01-19T11:35:24.000+0000,2016-01-19T12:10:48.000+0000,1,-73.97903,40.76394,-74.00533,40.710087,N,2124.0,Tuesday,3,1,11,2.511076618143245e-07,6.39,10.83
id3504673,2,2016-04-06T19:32:31.000+0000,2016-04-06T19:39:40.000+0000,1,-74.01004,40.71997,-74.01227,40.70672,N,429.0,Wednesday,4,4,19,1.359155624250055e-08,1.49,12.5
id2181028,2,2016-03-26T13:30:55.000+0000,2016-03-26T13:38:10.000+0000,1,-73.97305,40.79321,-73.97292,40.78252,N,435.0,Saturday,7,3,13,8.701373594095704e-09,1.19,9.85
id0801584,2,2016-01-30T22:01:40.000+0000,2016-01-30T22:09:03.000+0000,6,-73.98286,40.742195,-73.99208,40.749184,N,443.0,Saturday,7,1,22,7.438317364467243e-09,1.1,8.94
id1813257,1,2016-06-17T22:34:59.000+0000,2016-06-17T22:40:40.000+0000,4,-73.96902,40.75784,-73.957405,40.765896,N,341.0,Friday,6,6,22,1.0834136581882085e-08,1.33,14.04
id1324603,2,2016-05-21T07:54:58.000+0000,2016-05-21T08:20:49.000+0000,1,-73.96928,40.79778,-73.92247,40.76056,N,1551.0,Saturday,7,5,7,2.0116586779003733e-07,5.71,13.25
id1301050,1,2016-05-27T23:12:23.000+0000,2016-05-27T23:16:38.000+0000,1,-73.99948,40.7384,-73.98579,40.732815,N,255.0,Friday,6,5,23,1.0575516792025634e-08,1.31,18.49
id0012891,2,2016-03-10T21:45:01.000+0000,2016-03-10T22:05:26.000+0000,1,-73.98105,40.74434,-73.973,40.78999,N,1225.0,Thursday,5,3,21,1.6153317501260625e-07,5.12,15.05


In [22]:
df.printSchema()

In [23]:
# Columns to treat as categorical: re-type them as strings, leaving every other column
# (and the column order) untouched.
cols = ['passenger_count', 'month', 'weekday_num', 'pickup_hour']
df = df.select(
    *[df[name].cast('string').alias(name) if name in cols else df[name] for name in df.columns]
)

df.printSchema()

In [24]:
# Re-register the view so the %sql cells see the columns added and re-typed since it was first created
df.createOrReplaceTempView(temp_table_name)

In [25]:
from pyspark.sql.functions import count
df.groupBy('passenger_count').count().show()

In [26]:
%sql
-- Trips per passenger_count (GROUP BY already yields one row per value)
SELECT passenger_count, COUNT(passenger_count) AS Count
FROM train_csv
GROUP BY passenger_count
ORDER BY passenger_count

passenger_count,Count
0,60
1,1033540
2,210318
3,59896
4,28404
5,78088
6,48333
7,3
8,1
9,1


In [27]:
df.describe('passenger_count').show()

Replace a passenger count of 0 with 2, since the mean passenger count is 1.66 (which rounds to 2)

In [29]:
from pyspark.sql import functions as F

# passenger_count was cast to string earlier in the notebook, so compare against and
# substitute string literals instead of relying on implicit int <-> string coercion
# (this also matches the string comparisons used by the filters on 7, 8 and 9 below).
df = df.withColumn(
    "passenger_count",
    F.when(F.col("passenger_count") == "0", "2").otherwise(F.col("passenger_count")),
)

In [30]:
from pyspark.sql.functions import count
df.groupBy('passenger_count').count().show()

Delete the trips with a passenger count of 7, 8 or 9

In [32]:
# Keep only the rows whose passenger_count is not "7"
df = df.where(df["passenger_count"] != "7")

In [33]:
# Keep only the rows whose passenger_count is neither "8" nor "9"
df = df.where(~df["passenger_count"].isin("8", "9"))

In [34]:
df.groupBy('passenger_count').count().show()

In [35]:
# Re-register the view so the %sql cells below query the DataFrame after the passenger_count clean-up
df.createOrReplaceTempView(temp_table_name)

In [36]:
%sql
-- Trips per passenger_count after the clean-up (GROUP BY already yields one row per value)
SELECT passenger_count, COUNT(passenger_count) AS Count
FROM train_csv
GROUP BY passenger_count
ORDER BY passenger_count

passenger_count,Count
1,1033540
2,210378
3,59896
4,28404
5,78088
6,48333


In [37]:
%sql
-- Trips per vendor
SELECT vendor_id, COUNT(vendor_id) AS Count
FROM train_csv
GROUP BY vendor_id
ORDER BY vendor_id

vendor_id,Count
1,678342
2,780297


In [38]:
df.describe('distance').show()

In [39]:
%sql
-- Trips per (rounded) distance value, shortest first
SELECT distance, COUNT(distance) AS Count
FROM train_csv
GROUP BY distance
ORDER BY distance

distance,Count
0.0,7931
0.01,1256
0.02,729
0.03,632
0.04,530
0.05,495
0.06,385
0.07,413
0.08,421
0.09,353


In [40]:
df.describe('trip_duration').show()

In [41]:
# Rescale trip_duration by 1/60 and round to two decimals; the notes further down read the
# result as minutes (900 is described as 15 hours).
# NOTE: the column is overwritten in place, so re-running this cell divides by 60 again.
df = df.withColumn("trip_duration", round(df.trip_duration/60,2))

In [42]:
df.describe('trip_duration').show()

In [43]:
# Re-register the view so the %sql cells below see the rescaled trip_duration
df.createOrReplaceTempView(temp_table_name)

In [44]:
%sql
-- Trips per (rounded) trip_duration value, shortest first
SELECT trip_duration, COUNT(trip_duration) AS Count
FROM train_csv
GROUP BY trip_duration
ORDER BY trip_duration

trip_duration,Count
0.02,33
0.03,177
0.05,318
0.07,317
0.08,284
0.1,192
0.12,218
0.13,206
0.15,239
0.17,182


In [45]:
%sql
select * from `train_csv` where trip_duration >= 900

id,vendor_id,pickup_datetime,dropoff_datetime,passenger_count,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,store_and_fwd_flag,trip_duration,weekday,weekday_num,month,pickup_hour,a,distance,speed
id3307903,2,2016-02-20T04:03:06.000+0000,2016-02-21T03:33:00.000+0000,3,-74.0081,40.74149,-74.00996,40.71461,N,1409.9,Saturday,7,2,4,5.5167732515428266e-08,2.99,0.13
id1091477,2,2016-05-07T18:36:22.000+0000,2016-05-08T18:32:11.000+0000,1,-73.99024,40.75092,-73.97628,40.75089,N,1435.82,Saturday,7,5,18,8.519393142812997e-09,1.18,0.05
id3431345,2,2016-06-07T12:58:48.000+0000,2016-06-08T12:58:00.000+0000,6,-73.954956,40.77765,-73.98103,40.743713,N,1439.2,Tuesday,3,6,12,1.174121079743924e-07,4.37,0.18
id1487069,2,2016-02-13T00:21:49.000+0000,2016-02-14T00:19:05.000+0000,1,-73.96859,40.799217,-73.97958,40.784714,N,1437.27,Saturday,7,2,0,2.1295006781177292e-08,1.86,0.08
id3674870,2,2016-03-18T11:54:20.000+0000,2016-03-19T11:34:17.000+0000,1,-73.98909,40.73699,-73.972336,40.75151,N,1419.95,Friday,6,3,11,2.8323140205250334e-08,2.14,0.09
id3632390,2,2016-06-08T08:54:33.000+0000,2016-06-09T07:58:09.000+0000,5,-73.98056,40.742466,-73.98472,40.74861,N,1383.6,Wednesday,4,6,8,3.631841993406624e-09,0.77,0.03
id3354426,2,2016-05-05T15:18:41.000+0000,2016-05-06T15:11:07.000+0000,1,-73.98904,40.773514,-73.98068,40.78152,N,1432.43,Thursday,5,5,15,7.93024990478118e-09,1.13,0.05
id0773526,2,2016-04-02T14:58:45.000+0000,2016-04-03T14:19:55.000+0000,6,-73.98799,40.76134,-74.00292,40.75628,N,1401.17,Saturday,7,4,14,1.1691946367015729e-08,1.38,0.06
id3617210,2,2016-03-15T17:51:32.000+0000,2016-03-16T17:18:04.000+0000,1,-73.96561,40.76578,-73.97265,40.753418,N,1406.53,Tuesday,3,3,17,1.380720493346012e-08,1.5,0.06
id0067152,2,2016-02-27T21:04:05.000+0000,2016-02-28T21:03:22.000+0000,5,-73.993744,40.727444,-74.001335,40.729244,N,1439.28,Saturday,7,2,21,2.767137309897393e-09,0.67,0.03


In [46]:
%sql
select COUNT(*) from `train_csv` where trip_duration >= 900

count(1)
1973


There are 1973 trips whose duration is greater than or equal to 15 hours (900 minutes).

In [48]:
%sql
select COUNT(*) from `train_csv` where trip_duration >= 60

count(1)
12334


In [49]:
df.describe('speed').show()

In [50]:
%sql
select speed from `train_csv`

speed
11.87
9.83
10.83
12.5
9.85
8.94
14.04
13.25
18.49
15.05


Most of the time the speed is between 9 and 18 km/h, which is expected in a city area.
Some trips have a speed above 60 km/h, perhaps because they are long trips outside the city; we investigate that next.

In [52]:
%sql
select * from `train_csv` where trip_duration >= 900 and speed >= 50

id,vendor_id,pickup_datetime,dropoff_datetime,passenger_count,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,store_and_fwd_flag,trip_duration,weekday,weekday_num,month,pickup_hour,a,distance,speed


In [53]:
%sql
-- Materialise the temp view as a Delta table.
-- The explicit CREATE TABLE ... AS SELECT form is used because a bare "FROM <table>" query body
-- (without a SELECT list) is not accepted by every Spark SQL version.
CREATE TABLE train_csv_delta
USING delta
AS SELECT * FROM train_csv

In [54]:
%sql
DROP table train_csv_delta

In [55]:
%sql
select MAX(speed) from `train_csv` where trip_duration >= 800 

max(speed)
2.55


Trips that take 800 minutes or more are clearly invalid records: the maximum speed among them is only 2.55 km/h. So we can delete these trips.

In [57]:
# Keep only trips with trip_duration below 800 (the column was divided by 60 earlier in the notebook)
df = df.filter(col("trip_duration") < 800)

In [58]:
# Re-register the view so the %sql cells below query the DataFrame after the trip_duration filter
df.createOrReplaceTempView(temp_table_name)

In [59]:
%sql
select COUNT(speed) from `train_csv` where trip_duration >= 800 

count(speed)
0


In [60]:
%sql
select COUNT(speed) from `train_csv` where speed >= 120 

count(speed)
135


We can also delete implausibly fast trips, i.e. those with a speed of more than 100 km/h.

In [62]:
# Keep only trips with speed <= 100.
# NOTE(review): rows whose speed is NULL would also be dropped here, since the comparison is
# not true for them — confirm that this is intended.
df = df.filter(col("speed") <= 100)

In [63]:
# Re-register the view so the %sql cells below query the DataFrame after the speed filter
df.createOrReplaceTempView(temp_table_name)

In [64]:
%sql
select COUNT(speed) from `train_csv` where speed > 100 

count(speed)
0


In [65]:
df.printSchema()

In [66]:
%sql
-- Trips per store_and_fwd_flag value
SELECT store_and_fwd_flag, COUNT(store_and_fwd_flag) AS Count
FROM train_csv
GROUP BY store_and_fwd_flag
ORDER BY store_and_fwd_flag

store_and_fwd_flag,Count
N,1448441
Y,8044


In [67]:
%sql
-- Trips per pickup hour, least busy hour first
SELECT pickup_hour, COUNT(pickup_hour) AS Count
FROM train_csv
GROUP BY pickup_hour
ORDER BY Count

pickup_hour,Count
5,14977
4,15752
3,20842
2,27906
6,33213
1,38491
0,53160
7,55546
16,64214
10,65366


In [68]:
%sql
-- Trips per weekday, least busy day first
SELECT weekday, COUNT(weekday) AS Count
FROM train_csv
GROUP BY weekday
ORDER BY Count

weekday,Count
Monday,187198
Sunday,195014
Tuesday,202479
Wednesday,209872
Thursday,218267
Saturday,220467
Friday,223188


In [69]:
%sql
-- Trips per month, least busy month first
SELECT month, COUNT(month) AS TripCounts
FROM train_csv
GROUP BY month
ORDER BY TripCounts

month,TripCounts
1,229360
6,233942
2,237977
5,248119
4,251277
3,255810


In [70]:
%sql
-- Average trip_duration per pickup hour (no ordering requested)
SELECT pickup_hour, AVG(trip_duration) AS AvgDuration
FROM train_csv
GROUP BY pickup_hour

pickup_hour,AvgDuration
7,12.648269722392254
15,16.212865354297758
11,14.669792824141798
3,11.739814317244024
8,13.912657220496898
22,13.415201941626735
16,16.1709882580123
0,13.021907072987208
5,11.872748213928023
18,14.378360048640271


In [71]:
%sql
-- Average trip_duration per day-of-week number
SELECT weekday_num, AVG(trip_duration) AS AvgDuration
FROM train_csv
GROUP BY weekday_num
ORDER BY weekday_num

weekday_num,AvgDuration
1,12.75195201370157
2,13.54589418690373
3,14.300533833138282
4,14.672807330182222
5,14.978421978585851
6,14.487531632525036
7,13.006548644468344


In [72]:
%sql
-- Average trip_duration per month
SELECT month, AVG(trip_duration) AS AvgDuration
FROM train_csv
GROUP BY month
ORDER BY month

month,AvgDuration
1,13.210178365887684
2,13.19360438193608
3,13.670138188499283
4,14.16830772414506
5,14.7588331405495
6,14.87146450829692


In [73]:
%sql
select distinct trip_duration, distance  from `train_csv`

trip_duration,distance
8.78,2.76
11.72,1.97
4.37,0.84
9.42,1.91
4.97,1.2
8.32,0.69
4.93,1.0
45.37,8.72
9.32,2.95
12.87,1.67


In [74]:
%sql
select distinct pickup_longitude, pickup_latitude  from `train_csv`

pickup_longitude,pickup_latitude
-73.989815,40.72965
-73.9913,40.71707
-73.98217,40.75572
-74.01551,40.71584
-73.974686,40.783226
-74.00416,40.71297
-74.0091,40.70707
-73.99668,40.72586
-73.97695,40.764442
-73.99577,40.725147


In [75]:
%sql
select distinct dropoff_longitude, dropoff_latitude  from `train_csv`

dropoff_longitude,dropoff_latitude
-73.99624,40.72537
-73.963104,40.768974
-73.98538,40.69084
-74.00432,40.73326
-73.98217,40.769073
-73.94968,40.713264
-74.0048,40.70965
-73.99486,40.726948
-73.99143,40.754784
-74.003624,40.717438


In [76]:
df.printSchema()

In [77]:
# Adds a column holding the literal 16 on every row.
# NOTE(review): presumably an assumed overall average speed in km/h — confirm. Because the
# value is identical for all rows, it cannot help a model tell one trip from another.
df = df.withColumn("avgSpeed", lit(16))

In [78]:
df.printSchema()

In [79]:
display(df)

id,vendor_id,pickup_datetime,dropoff_datetime,passenger_count,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,store_and_fwd_flag,trip_duration,weekday,weekday_num,month,pickup_hour,a,distance,speed,avgSpeed
id2875421,2,2016-03-14T17:24:55.000+0000,2016-03-14T17:32:30.000+0000,1,-73.982155,40.767937,-73.96463,40.765602,N,7.58,Monday,2,3,17,1.3830896636179652e-08,1.5,11.87,16
id2377394,1,2016-06-12T00:43:35.000+0000,2016-06-12T00:54:38.000+0000,1,-73.980415,40.738564,-73.99948,40.73115,N,11.05,Sunday,1,6,0,2.0078128522717423e-08,1.81,9.83,16
id3858529,2,2016-01-19T11:35:24.000+0000,2016-01-19T12:10:48.000+0000,1,-73.97903,40.76394,-74.00533,40.710087,N,35.4,Tuesday,3,1,11,2.511076618143245e-07,6.39,10.83,16
id3504673,2,2016-04-06T19:32:31.000+0000,2016-04-06T19:39:40.000+0000,1,-74.01004,40.71997,-74.01227,40.70672,N,7.15,Wednesday,4,4,19,1.359155624250055e-08,1.49,12.5,16
id2181028,2,2016-03-26T13:30:55.000+0000,2016-03-26T13:38:10.000+0000,1,-73.97305,40.79321,-73.97292,40.78252,N,7.25,Saturday,7,3,13,8.701373594095704e-09,1.19,9.85,16
id0801584,2,2016-01-30T22:01:40.000+0000,2016-01-30T22:09:03.000+0000,6,-73.98286,40.742195,-73.99208,40.749184,N,7.38,Saturday,7,1,22,7.438317364467243e-09,1.1,8.94,16
id1813257,1,2016-06-17T22:34:59.000+0000,2016-06-17T22:40:40.000+0000,4,-73.96902,40.75784,-73.957405,40.765896,N,5.68,Friday,6,6,22,1.0834136581882085e-08,1.33,14.04,16
id1324603,2,2016-05-21T07:54:58.000+0000,2016-05-21T08:20:49.000+0000,1,-73.96928,40.79778,-73.92247,40.76056,N,25.85,Saturday,7,5,7,2.0116586779003733e-07,5.71,13.25,16
id1301050,1,2016-05-27T23:12:23.000+0000,2016-05-27T23:16:38.000+0000,1,-73.99948,40.7384,-73.98579,40.732815,N,4.25,Friday,6,5,23,1.0575516792025634e-08,1.31,18.49,16
id0012891,2,2016-03-10T21:45:01.000+0000,2016-03-10T22:05:26.000+0000,1,-73.98105,40.74434,-73.973,40.78999,N,20.42,Thursday,5,3,21,1.6153317501260625e-07,5.12,15.05,16


Convert the categorical columns to one-hot encoded vectors

In [81]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler

try:
    # Spark 2.3 / 2.4 name of the multi-column one-hot encoder
    from pyspark.ml.feature import OneHotEncoderEstimator
except ImportError:
    # Spark 3.0+ renamed the class to OneHotEncoder; it keeps the inputCols/outputCols parameters
    from pyspark.ml.feature import OneHotEncoder as OneHotEncoderEstimator

categoricalColumns = ["passenger_count", "weekday_num", "month", "pickup_hour"]
stages = []
for categoricalCol in categoricalColumns:
    # Map each category string to a numeric index in "<column>Index"
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    # One-hot encode that index into a sparse vector in "<column>classVec"
    encoder = OneHotEncoderEstimator(
        inputCols=[stringIndexer.getOutputCol()],
        outputCols=[categoricalCol + "classVec"],
    )
    # Stages are only collected here; they are fitted later as part of the Pipeline.
    stages += [stringIndexer, encoder]

Use a VectorAssembler to combine all the feature columns into a single vector column

In [83]:
print(stages)

In [84]:
numericCols = ["pickup_longitude", "pickup_latitude", "dropoff_longitude", "dropoff_latitude", "distance","avgSpeed"]

# Feature vector layout: the encoded categorical vectors first, then the raw numeric columns
assemblerInputs = [name + "classVec" for name in categoricalColumns]
assemblerInputs.extend(numericCols)

assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages.append(assembler)

In [85]:
print(stages)

In [86]:
#df_stages =  assembler.transform(df)

In [87]:
# Fit the collected indexer / encoder / assembler stages on `df`, then apply them to the same
# frame to obtain the encoded columns and the assembled "features" vector.
partialPipeline = Pipeline(stages=stages)
pipelineModel = partialPipeline.fit(df)
preppedDataDF = pipelineModel.transform(df)

In [88]:
type(preppedDataDF)

In [89]:
# NOTE: collect() materialises every trip_duration row as a Python list on the driver.
# In the visible cells the result is only inspected with type(); consider a small sample instead.
trip_duration = df.select("trip_duration").collect()

In [90]:
type(trip_duration)

In [91]:
# Use the trip duration as the regression label. The column is taken from preppedDataDF itself
# rather than from `df`: a column reference borrowed from another DataFrame only resolves while
# both frames share the same lineage, which is easy to break when cells are edited or re-run.
preppedDataDF = preppedDataDF.withColumn("label", preppedDataDF["trip_duration"])

In [92]:
display(preppedDataDF)

id,vendor_id,pickup_datetime,dropoff_datetime,passenger_count,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,store_and_fwd_flag,trip_duration,weekday,weekday_num,month,pickup_hour,a,distance,speed,avgSpeed,passenger_countIndex,passenger_countclassVec,weekday_numIndex,weekday_numclassVec,monthIndex,monthclassVec,pickup_hourIndex,pickup_hourclassVec,features,label
id2875421,2,2016-03-14T17:24:55.000+0000,2016-03-14T17:32:30.000+0000,1,-73.982155,40.767937,-73.96463,40.765602,N,7.58,Monday,2,3,17,1.3830896636179652e-08,1.5,11.87,16,0.0,"List(0, 5, List(0), List(1.0))",6.0,"List(0, 6, List(), List())",0.0,"List(0, 5, List(0), List(1.0))",5.0,"List(0, 23, List(5), List(1.0))","List(0, 45, List(0, 11, 21, 39, 40, 41, 42, 43, 44), List(1.0, 1.0, 1.0, -73.9821548461914, 40.76793670654297, -73.96463012695312, 40.765602111816406, 1.5, 16.0))",7.58
id2377394,1,2016-06-12T00:43:35.000+0000,2016-06-12T00:54:38.000+0000,1,-73.980415,40.738564,-73.99948,40.73115,N,11.05,Sunday,1,6,0,2.0078128522717423e-08,1.81,9.83,16,0.0,"List(0, 5, List(0), List(1.0))",5.0,"List(0, 6, List(5), List(1.0))",4.0,"List(0, 5, List(4), List(1.0))",17.0,"List(0, 23, List(17), List(1.0))","List(0, 45, List(0, 10, 15, 33, 39, 40, 41, 42, 43, 44), List(1.0, 1.0, 1.0, 1.0, -73.98041534423828, 40.738563537597656, -73.99948120117188, 40.73115158081055, 1.81, 16.0))",11.05
id3858529,2,2016-01-19T11:35:24.000+0000,2016-01-19T12:10:48.000+0000,1,-73.97903,40.76394,-74.00533,40.710087,N,35.4,Tuesday,3,1,11,2.511076618143245e-07,6.39,10.83,16,0.0,"List(0, 5, List(0), List(1.0))",4.0,"List(0, 6, List(4), List(1.0))",5.0,"List(0, 5, List(), List())",11.0,"List(0, 23, List(11), List(1.0))","List(0, 45, List(0, 9, 27, 39, 40, 41, 42, 43, 44), List(1.0, 1.0, 1.0, -73.9790267944336, 40.763938903808594, -74.00533294677734, 40.710086822509766, 6.39, 16.0))",35.4
id3504673,2,2016-04-06T19:32:31.000+0000,2016-04-06T19:39:40.000+0000,1,-74.01004,40.71997,-74.01227,40.70672,N,7.15,Wednesday,4,4,19,1.359155624250055e-08,1.49,12.5,16,0.0,"List(0, 5, List(0), List(1.0))",3.0,"List(0, 6, List(3), List(1.0))",1.0,"List(0, 5, List(1), List(1.0))",1.0,"List(0, 23, List(1), List(1.0))","List(0, 45, List(0, 8, 12, 17, 39, 40, 41, 42, 43, 44), List(1.0, 1.0, 1.0, 1.0, -74.01004028320312, 40.719970703125, -74.01226806640625, 40.70671844482422, 1.49, 16.0))",7.15
id2181028,2,2016-03-26T13:30:55.000+0000,2016-03-26T13:38:10.000+0000,1,-73.97305,40.79321,-73.97292,40.78252,N,7.25,Saturday,7,3,13,8.701373594095704e-09,1.19,9.85,16,0.0,"List(0, 5, List(0), List(1.0))",1.0,"List(0, 6, List(1), List(1.0))",0.0,"List(0, 5, List(0), List(1.0))",9.0,"List(0, 23, List(9), List(1.0))","List(0, 45, List(0, 6, 11, 25, 39, 40, 41, 42, 43, 44), List(1.0, 1.0, 1.0, 1.0, -73.97305297851562, 40.793209075927734, -73.9729232788086, 40.78252029418945, 1.19, 16.0))",7.25
id0801584,2,2016-01-30T22:01:40.000+0000,2016-01-30T22:09:03.000+0000,6,-73.98286,40.742195,-73.99208,40.749184,N,7.38,Saturday,7,1,22,7.438317364467243e-09,1.1,8.94,16,4.0,"List(0, 5, List(4), List(1.0))",1.0,"List(0, 6, List(1), List(1.0))",5.0,"List(0, 5, List(), List())",4.0,"List(0, 23, List(4), List(1.0))","List(0, 45, List(4, 6, 20, 39, 40, 41, 42, 43, 44), List(1.0, 1.0, 1.0, -73.98285675048828, 40.74219512939453, -73.99208068847656, 40.749183654785156, 1.1, 16.0))",7.38
id1813257,1,2016-06-17T22:34:59.000+0000,2016-06-17T22:40:40.000+0000,4,-73.96902,40.75784,-73.957405,40.765896,N,5.68,Friday,6,6,22,1.0834136581882085e-08,1.33,14.04,16,5.0,"List(0, 5, List(), List())",0.0,"List(0, 6, List(0), List(1.0))",4.0,"List(0, 5, List(4), List(1.0))",4.0,"List(0, 23, List(4), List(1.0))","List(0, 45, List(5, 15, 20, 39, 40, 41, 42, 43, 44), List(1.0, 1.0, 1.0, -73.9690170288086, 40.75783920288086, -73.95740509033203, 40.76589584350586, 1.33, 16.0))",5.68
id1324603,2,2016-05-21T07:54:58.000+0000,2016-05-21T08:20:49.000+0000,1,-73.96928,40.79778,-73.92247,40.76056,N,25.85,Saturday,7,5,7,2.0116586779003733e-07,5.71,13.25,16,0.0,"List(0, 5, List(0), List(1.0))",1.0,"List(0, 6, List(1), List(1.0))",2.0,"List(0, 5, List(2), List(1.0))",16.0,"List(0, 23, List(16), List(1.0))","List(0, 45, List(0, 6, 13, 32, 39, 40, 41, 42, 43, 44), List(1.0, 1.0, 1.0, 1.0, -73.96927642822266, 40.79777908325195, -73.92247009277344, 40.76055908203125, 5.71, 16.0))",25.85
id1301050,1,2016-05-27T23:12:23.000+0000,2016-05-27T23:16:38.000+0000,1,-73.99948,40.7384,-73.98579,40.732815,N,4.25,Friday,6,5,23,1.0575516792025634e-08,1.31,18.49,16,0.0,"List(0, 5, List(0), List(1.0))",0.0,"List(0, 6, List(0), List(1.0))",2.0,"List(0, 5, List(2), List(1.0))",10.0,"List(0, 23, List(10), List(1.0))","List(0, 45, List(0, 5, 13, 26, 39, 40, 41, 42, 43, 44), List(1.0, 1.0, 1.0, 1.0, -73.99948120117188, 40.738399505615234, -73.98578643798828, 40.73281478881836, 1.31, 16.0))",4.25
id0012891,2,2016-03-10T21:45:01.000+0000,2016-03-10T22:05:26.000+0000,1,-73.98105,40.74434,-73.973,40.78999,N,20.42,Thursday,5,3,21,1.6153317501260625e-07,5.12,15.05,16,0.0,"List(0, 5, List(0), List(1.0))",2.0,"List(0, 6, List(2), List(1.0))",0.0,"List(0, 5, List(0), List(1.0))",2.0,"List(0, 23, List(2), List(1.0))","List(0, 45, List(0, 7, 11, 18, 39, 40, 41, 42, 43, 44), List(1.0, 1.0, 1.0, 1.0, -73.98104858398438, 40.74433898925781, -73.9729995727539, 40.78998947143555, 5.12, 16.0))",20.42


In [93]:
# 70 / 30 train / test split. The fixed seed keeps the split — and every metric computed from
# it below — comparable between runs; without it a new random seed is drawn on each execution.
splits = preppedDataDF.randomSplit([0.7, 0.3], seed=42)
train_df, test_df = splits

In [94]:
from pyspark.ml.regression import LinearRegression
niter = 500         # maximum number of iterations; set by trial and error
reg = 0.0           # regularization strength; no regularization for now
elastic_reg = 0.0   # elastic-net mixing parameter: 0.0 = pure L2 penalty, 1.0 = pure L1 penalty
tolerance = 1e-5    # convergence tolerance; set by trial and error
intercept = True    # fit an intercept term

# `intercept` is now actually passed to the estimator (it was previously defined but unused;
# True is also the estimator's default, so the fitted model is unchanged).
lr = LinearRegression(
    featuresCol='features',
    labelCol='label',
    maxIter=niter,
    regParam=reg,
    elasticNetParam=elastic_reg,
    tol=tolerance,
    fitIntercept=intercept,
)
lr_model = lr.fit(train_df)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

In [95]:
# `import math` (instead of `from math import sqrt`) keeps the Spark `sqrt` column function,
# which the distance cell relies on, from being replaced by the scalar math.sqrt.
import math

def calcRMSE(df, labelCol='label', predictionCol='prediction'):
  """Calculate the root-mean-squared error between a label column and a prediction column.
    Args:
        df: a DataFrame with labels and predictions
        labelCol: name of the column holding the true values
        predictionCol: name of the column holding the predicted values
    Returns:
        Float
  """
  def _squared_error_and_one(row):
    return ((row[labelCol] - row[predictionCol]) ** 2, 1)

  # A single pass yields both the sum of squared errors and the row count,
  # so no separate df.count() job is needed.
  squared_error, npts = df.rdd.map(_squared_error_and_one).reduce(
    lambda a, b: (a[0] + b[0], a[1] + b[1]))
  return math.sqrt(float(squared_error) / npts)

def calcRsq(df, labelCol='label', predictionCol='prediction'):
  """Calculate the R-squared diagnostic between a label column and a prediction column.
    Args:
        df: a DataFrame with labels and predictions
        labelCol: name of the column holding the true values
        predictionCol: name of the column holding the predicted values
    Returns:
        Float
    Raises:
        ZeroDivisionError: if every label equals the mean label (total squared error is 0)
  """
  def _add_pairs(a, b):
    return (a[0] + b[0], a[1] + b[1])

  # Pass 1: sum of the labels and row count together, giving the mean label.
  label_sum, npts = df.rdd.map(lambda row: (row[labelCol], 1)).reduce(_add_pairs)
  mean_label = float(label_sum) / npts

  # Pass 2: total squared error (around the mean) and residual squared error together.
  def _squared_terms(row):
    return ((row[labelCol] - mean_label) ** 2, (row[labelCol] - row[predictionCol]) ** 2)

  total_squared_error, squared_error = df.rdd.map(_squared_terms).reduce(_add_pairs)
  return (1.0 - float(squared_error) / total_squared_error)

In [96]:
# Baseline model: predict the mean training label for every row
from pyspark.sql.functions import lit

label_total = train_df.select('label').groupBy().sum().first()[0]
mean_label = float(label_total) / train_df.count()

# The same constant prediction is attached to both the training and the test frame
baseline_prediction = lit(mean_label)
train_withPreds_base_df = train_df.withColumn('prediction', baseline_prediction)
test_withPreds_base_df = test_df.withColumn('prediction', baseline_prediction)

In [97]:
# Score both splits with the fitted linear model
train_withPreds_df = lr_model.transform(train_df)
test_withPreds_df = lr_model.transform(test_df)

# Report (RMSE, R-squared) for the regression and for the mean-label baseline, training split first
train_scores = (calcRMSE(train_withPreds_df), calcRsq(train_withPreds_df))
print('Training regression \t...\t RMSE: %.3f,  Rsq: %.3f' % train_scores)
train_base_scores = (calcRMSE(train_withPreds_base_df), calcRsq(train_withPreds_base_df))
print('Training baseline \t...\t RMSE: %.3f,  Rsq: %.3f' % train_base_scores)
print('')
test_scores = (calcRMSE(test_withPreds_df), calcRsq(test_withPreds_df))
print('Test regression \t...\t RMSE: %.3f,  Rsq: %.3f' % test_scores)
test_base_scores = (calcRMSE(test_withPreds_base_df), calcRsq(test_withPreds_base_df))
print('Testing baseline \t...\t RMSE: %.3f,  Rsq: %.3f' % test_base_scores)

In [98]:
from pyspark.ml.regression import LinearRegression
regs = [1e-4, 1e-2, 1.0, 1.e1]   # regularization strengths to compare
niter = 500         # maximum number of iterations; set by trial and error
elastic_reg = 0.0   # elastic-net mixing parameter: 0.0 = pure L2 penalty, 1.0 = pure L1 penalty
tolerance = 1e-5    # convergence tolerance; set by trial and error
intercept = True    # fit an intercept term

# Fit one model per regularization strength and report train / test metrics for each.
# Despite their names, lr_1e_4 / lr_1e_4_model hold the estimator / model for the *current*
# value of `reg`; the names are kept unchanged so nothing that refers to them breaks.
for reg in regs:
  lr_1e_4 = LinearRegression(
    featuresCol='features',
    labelCol='label',
    maxIter=niter,
    regParam=reg,
    elasticNetParam=elastic_reg,
    tol=tolerance,
    fitIntercept=intercept,  # previously defined but never passed (True is also the default)
  )
  lr_1e_4_model = lr_1e_4.fit(train_df)
  train_withPreds_df = lr_1e_4_model.transform(train_df)
  test_withPreds_df = lr_1e_4_model.transform(test_df)
  print(reg)
  print('Training regression \t...\t RMSE: %.3f,  Rsq: %.3f' % (calcRMSE(train_withPreds_df),calcRsq(train_withPreds_df)))
  print ('Test regression \t...\t RMSE: %.3f,  Rsq: %.3f' % (calcRMSE(test_withPreds_df), calcRsq(test_withPreds_df)))
  print('')
  

In [99]:
from pyspark.ml.regression import GBTRegressor

# Gradient-boosted trees with maxIter=10
gbt = GBTRegressor(featuresCol='features', labelCol='label', maxIter=10)
gbt_model = gbt.fit(train_df)
gbt_predictions_train = gbt_model.transform(train_df)
gbt_predictions_test = gbt_model.transform(test_df)

# (RMSE, R-squared) on the training split, then on the test split
gbt_train_scores = (calcRMSE(gbt_predictions_train), calcRsq(gbt_predictions_train))
print('Training Gradient-boosted tree regression \t...\t RMSE: %.3f,  Rsq: %.3f' % gbt_train_scores)
gbt_test_scores = (calcRMSE(gbt_predictions_test), calcRsq(gbt_predictions_test))
print('Test Gradient-boosted tree regression \t...\t RMSE: %.3f,  Rsq: %.3f' % gbt_test_scores)

In [100]:
from pyspark.ml.regression import GBTRegressor

# Gradient-boosted trees with maxIter=20
gbt = GBTRegressor(featuresCol='features', labelCol='label', maxIter=20)
gbt_model = gbt.fit(train_df)
gbt_predictions_train = gbt_model.transform(train_df)
gbt_predictions_test = gbt_model.transform(test_df)

# (RMSE, R-squared) on the training split, then on the test split
gbt_train_scores = (calcRMSE(gbt_predictions_train), calcRsq(gbt_predictions_train))
print('Training Gradient-boosted tree regression \t...\t RMSE: %.3f,  Rsq: %.3f' % gbt_train_scores)
gbt_test_scores = (calcRMSE(gbt_predictions_test), calcRsq(gbt_predictions_test))
print('Test Gradient-boosted tree regression \t...\t RMSE: %.3f,  Rsq: %.3f' % gbt_test_scores)

In [101]:
from pyspark.ml.regression import GBTRegressor

# Compare gradient-boosted trees for several maxIter settings
maxIterList = [30, 40, 50]
for maxIter in maxIterList:
  gbt = GBTRegressor(featuresCol='features', labelCol='label', maxIter=maxIter)
  gbt_model = gbt.fit(train_df)
  gbt_predictions_train = gbt_model.transform(train_df)
  gbt_predictions_test = gbt_model.transform(test_df)

  print(maxIter)
  # (RMSE, R-squared) on the training split, then on the test split
  gbt_train_scores = (calcRMSE(gbt_predictions_train), calcRsq(gbt_predictions_train))
  print('Training Gradient-boosted tree regression \t...\t RMSE: %.3f,  Rsq: %.3f' % gbt_train_scores)
  gbt_test_scores = (calcRMSE(gbt_predictions_test), calcRsq(gbt_predictions_test))
  print('Test Gradient-boosted tree regression \t...\t RMSE: %.3f,  Rsq: %.3f' % gbt_test_scores)
  print('')
  

In [102]:
from pyspark.ml.regression import DecisionTreeRegressor

# Single decision tree with the estimator's default settings
dt = DecisionTreeRegressor(featuresCol='features', labelCol='label')
dt_model = dt.fit(train_df)
dt_predictions_train = dt_model.transform(train_df)
dt_predictions_test = dt_model.transform(test_df)

# (RMSE, R-squared) on the training split, then on the test split
dt_train_scores = (calcRMSE(dt_predictions_train), calcRsq(dt_predictions_train))
print('Training Decision tree regression \t...\t RMSE: %.3f,  Rsq: %.3f' % dt_train_scores)
dt_test_scores = (calcRMSE(dt_predictions_test), calcRsq(dt_predictions_test))
print('Test Decision tree regression \t...\t RMSE: %.3f,  Rsq: %.3f' % dt_test_scores)