<a href="https://colab.research.google.com/github/AminuHabib/New-York-Cabs-Travel-Behavior-Data-Pipeline/blob/main/app.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**Connecting Drive to Colab** \
Mounting Google Drive

In [146]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


**Reading Data from Drive** \
Unzipping the data

In [147]:
!unzip "/content/drive/MyDrive/New-York-Data/train.zip"

Archive:  /content/drive/MyDrive/New-York-Data/train.zip
replace train.csv? [y]es, [n]o, [A]ll, [N]one, [r]ename: n


**Setting up PySpark in Colab** 

In [148]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

Downloading Apache Spark 3.1.1 with Hadoop 2.7 from the link

In [149]:
!wget -q https://downloads.apache.org/spark/spark-3.1.1/spark-3.1.1-bin-hadoop2.7.tgz

To extract the downloaded archive

In [150]:
!tar xf spark-3.1.1-bin-hadoop2.7.tgz

Install findspark library 

In [151]:
!pip install -q findspark

To set the environment path

In [152]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop2.7"
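
Both variables must point at directories that actually exist, otherwise Spark fails to start later on. A minimal sketch of a sanity check that could be run before locating Spark; the helper name missing_homes is hypothetical, and the demo uses a temporary directory in place of the real install paths:

```python
import os
import tempfile

def missing_homes(env, names=("JAVA_HOME", "SPARK_HOME")):
    """Return the variable names that are unset or do not point to a directory."""
    return [n for n in names if not os.path.isdir(env.get(n, ""))]

# Demo: a temporary directory stands in for a real install location.
with tempfile.TemporaryDirectory() as tmp:
    demo_env = {
        "JAVA_HOME": tmp,                                  # exists
        "SPARK_HOME": os.path.join(tmp, "not-extracted"),  # does not exist
    }
    print(missing_homes(demo_env))
```

In the notebook itself the call would be missing_homes(os.environ), and an empty list means both paths are in place.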

To locate Spark in the system

In [153]:
import findspark
findspark.init()

To check the location where Spark is installed

In [154]:
findspark.find()

'/content/spark-3.1.1-bin-hadoop2.7'

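To view the Spark UI through an ngrok tunnel on port 4050 (the spark.ui.port set for the session); the curl call queries ngrok's own local API on port 4040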

In [155]:
!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip
get_ipython().system_raw('./ngrok http 4050 &')
!curl -s http://localhost:4040/api/tunnels

--2021-05-11 19:57:41--  https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
Resolving bin.equinox.io (bin.equinox.io)... 52.204.190.140, 34.193.189.47, 34.233.212.111, ...
Connecting to bin.equinox.io (bin.equinox.io)|52.204.190.140|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 13832437 (13M) [application/octet-stream]
Saving to: ‘ngrok-stable-linux-amd64.zip.1’


2021-05-11 19:57:44 (6.55 MB/s) - ‘ngrok-stable-linux-amd64.zip.1’ saved [13832437/13832437]

Archive:  ngrok-stable-linux-amd64.zip
replace ngrok? [y]es, [n]o, [A]ll, [N]one, [r]ename: n
{"tunnels":[{"name":"command_line","uri":"/api/tunnels/command_line","public_url":"https://31c85f4c7aeb.ngrok.io","proto":"https","config":{"addr":"http://localhost:4050","inspect":true},"metrics":{"conns":{"count":0,"gauge":0,"rate1":0,"rate5":0,"rate15":0,"p50":0,"p90":0,"p95":0,"p99":0},"http":{"count":0,"rate1":0,"rate5":0,"rate15":0,"p50":0,"p90":0,"p95":0,"p99":0}}},{"name":"command_line (http
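
The response from ngrok's local API is JSON, so the public address of the tunnel can be extracted rather than read by eye. A small sketch, assuming a shortened payload with the same shape as the response above; the helper name public_urls is hypothetical:

```python
import json

def public_urls(payload):
    """Return the public_url of every tunnel in an ngrok /api/tunnels response."""
    return [t["public_url"] for t in json.loads(payload).get("tunnels", [])]

# Shortened payload with the same fields as the response printed above.
sample = (
    '{"tunnels":[{"name":"command_line",'
    '"public_url":"https://31c85f4c7aeb.ngrok.io","proto":"https",'
    '"config":{"addr":"http://localhost:4050","inspect":true}}]}'
)
print(public_urls(sample))
```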

Loading data into PySpark (this uses the spark session created in cell In [196] further down, which has to be run first)

In [156]:
data = spark.read.csv("train.csv", header=True, inferSchema=True)

Understanding the Data

In [157]:
data.printSchema()

root
 |-- id: string (nullable = true)
 |-- vendor_id: integer (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- dropoff_datetime: string (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- trip_duration: integer (nullable = true)



To display the first five rows

In [158]:
data.show(5)

+---------+---------+-------------------+-------------------+---------------+------------------+------------------+------------------+------------------+------------------+-------------+
|       id|vendor_id|    pickup_datetime|   dropoff_datetime|passenger_count|  pickup_longitude|   pickup_latitude| dropoff_longitude|  dropoff_latitude|store_and_fwd_flag|trip_duration|
+---------+---------+-------------------+-------------------+---------------+------------------+------------------+------------------+------------------+------------------+-------------+
|id2875421|        2|2016-03-14 17:24:55|2016-03-14 17:32:30|              1| -73.9821548461914| 40.76793670654297|-73.96463012695312|40.765602111816406|                 N|          455|
|id2377394|        1|2016-06-12 00:43:35|2016-06-12 00:54:38|              1|-73.98041534423828|40.738563537597656|-73.99948120117188| 40.73115158081055|                 N|          663|
|id3858529|        2|2016-01-19 11:35:24|2016-01-19 12:10:48|    

To count the number of rows

In [159]:
data.count()

1458644

In [160]:
data.select("pickup_datetime","dropoff_datetime").show(5)

+-------------------+-------------------+
|    pickup_datetime|   dropoff_datetime|
+-------------------+-------------------+
|2016-03-14 17:24:55|2016-03-14 17:32:30|
|2016-06-12 00:43:35|2016-06-12 00:54:38|
|2016-01-19 11:35:24|2016-01-19 12:10:48|
|2016-04-06 19:32:31|2016-04-06 19:39:40|
|2016-03-26 13:30:55|2016-03-26 13:38:10|
+-------------------+-------------------+
only showing top 5 rows



In [161]:
data.describe().show()

+-------+---------+------------------+-------------------+-------------------+------------------+-------------------+------------------+-------------------+-------------------+------------------+-----------------+
|summary|       id|         vendor_id|    pickup_datetime|   dropoff_datetime|   passenger_count|   pickup_longitude|   pickup_latitude|  dropoff_longitude|   dropoff_latitude|store_and_fwd_flag|    trip_duration|
+-------+---------+------------------+-------------------+-------------------+------------------+-------------------+------------------+-------------------+-------------------+------------------+-----------------+
|  count|  1458644|           1458644|            1458644|            1458644|           1458644|            1458644|           1458644|            1458644|            1458644|           1458644|          1458644|
|   mean|     null|1.5349502688798637|               null|               null|1.6645295219395548| -73.97348630489282|40.750920908391734|  -73.97

In [162]:
#from pyspark.sql.column import Column as col, _to_java_column, _to_seq

In [163]:
from pyspark.sql.types import *

Converting into timestamp

In [164]:
data_conv = data.withColumn("pickup_datetime", data["pickup_datetime"].cast(TimestampType()))

In [165]:
data_conv2 = data_conv.withColumn("dropoff_datetime", data_conv["dropoff_datetime"].cast(TimestampType()))

In [166]:
data_conv2.printSchema()

root
 |-- id: string (nullable = true)
 |-- vendor_id: integer (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- trip_duration: integer (nullable = true)
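
Once both columns are real timestamps, calendar fields and durations can be read off them directly. A plain-Python sketch on the first row shown earlier, using datetime rather than Spark; the helper name trip_features is hypothetical:

```python
from datetime import datetime

FMT = "%Y-%m-%d %H:%M:%S"
# Indexed by datetime.weekday(), where Monday is 0.
DAY_NAMES = ["Monday", "Tuesday", "Wednesday", "Thursday",
             "Friday", "Saturday", "Sunday"]

def trip_features(pickup, dropoff):
    """Parse two timestamp strings and derive a few calendar features."""
    p = datetime.strptime(pickup, FMT)
    d = datetime.strptime(dropoff, FMT)
    return {
        "pickup_day": DAY_NAMES[p.weekday()],
        "pickup_hour": p.hour,
        "pickup_month": p.month,
        "duration_s": int((d - p).total_seconds()),
    }

# First row of the dataset as displayed above.
print(trip_features("2016-03-14 17:24:55", "2016-03-14 17:32:30"))
```

The computed duration of 455 seconds matches the trip_duration value of that row, which is a quick way to confirm the parsing is correct.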



In [167]:
import pyspark.sql.functions as f

New columns to show the pickup and dropoff days

In [168]:
data_conv3 = data_conv2.withColumn("pickup_day", f.date_format("pickup_datetime", "EEEE"))

In [169]:
data_conv3.show(5)

+---------+---------+-------------------+-------------------+---------------+------------------+------------------+------------------+------------------+------------------+-------------+----------+
|       id|vendor_id|    pickup_datetime|   dropoff_datetime|passenger_count|  pickup_longitude|   pickup_latitude| dropoff_longitude|  dropoff_latitude|store_and_fwd_flag|trip_duration|pickup_day|
+---------+---------+-------------------+-------------------+---------------+------------------+------------------+------------------+------------------+------------------+-------------+----------+
|id2875421|        2|2016-03-14 17:24:55|2016-03-14 17:32:30|              1| -73.9821548461914| 40.76793670654297|-73.96463012695312|40.765602111816406|                 N|          455|    Monday|
|id2377394|        1|2016-06-12 00:43:35|2016-06-12 00:54:38|              1|-73.98041534423828|40.738563537597656|-73.99948120117188| 40.73115158081055|                 N|          663|    Sunday|
|id3858529

In [170]:
data_conv4 = data_conv3.withColumn("dropoff_day", f.date_format("dropoff_datetime", "EEEE"))

In [171]:
data_conv4.show()

+---------+---------+-------------------+-------------------+---------------+------------------+------------------+------------------+------------------+------------------+-------------+----------+-----------+
|       id|vendor_id|    pickup_datetime|   dropoff_datetime|passenger_count|  pickup_longitude|   pickup_latitude| dropoff_longitude|  dropoff_latitude|store_and_fwd_flag|trip_duration|pickup_day|dropoff_day|
+---------+---------+-------------------+-------------------+---------------+------------------+------------------+------------------+------------------+------------------+-------------+----------+-----------+
|id2875421|        2|2016-03-14 17:24:55|2016-03-14 17:32:30|              1| -73.9821548461914| 40.76793670654297|-73.96463012695312|40.765602111816406|                 N|          455|    Monday|     Monday|
|id2377394|        1|2016-06-12 00:43:35|2016-06-12 00:54:38|              1|-73.98041534423828|40.738563537597656|-73.99948120117188| 40.73115158081055|       

New columns to show the pickup and dropoff day numbers

In [172]:
# dayofweek returns 1 = Sunday ... 7 = Saturday; the "F" pattern of date_format is not the day of the week
data_conv5 = data_conv4.withColumn("pickup_day_no", f.dayofweek("pickup_datetime"))

In [173]:
data_conv5.printSchema()

root
 |-- id: string (nullable = true)
 |-- vendor_id: integer (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- trip_duration: integer (nullable = true)
 |-- pickup_day: string (nullable = true)
 |-- dropoff_day: string (nullable = true)
 |-- pickup_day_no: integer (nullable = true)



In [174]:
# dayofweek returns 1 = Sunday ... 7 = Saturday; the "F" pattern of date_format is not the day of the week
data_conv6 = data_conv5.withColumn("dropoff_day_no", f.dayofweek("dropoff_datetime"))

In [175]:
data_conv6.printSchema()

root
 |-- id: string (nullable = true)
 |-- vendor_id: integer (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- trip_duration: integer (nullable = true)
 |-- pickup_day: string (nullable = true)
 |-- dropoff_day: string (nullable = true)
 |-- pickup_day_no: integer (nullable = true)
 |-- dropoff_day_no: integer (nullable = true)
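
For reference, a weekday number can be cross-checked in plain Python. This sketch follows the convention of Spark's dayofweek function (1 = Sunday through 7 = Saturday); the helper name spark_style_dayofweek is hypothetical:

```python
from datetime import date

def spark_style_dayofweek(d):
    """Weekday number with 1 = Sunday ... 7 = Saturday.

    date.isoweekday() gives 1 = Monday ... 7 = Sunday, so shift by one
    and wrap Sunday around to 1.
    """
    return d.isoweekday() % 7 + 1

# Pickup dates of the first two rows: a Monday and a Sunday.
for d in (date(2016, 3, 14), date(2016, 6, 12)):
    print(d.isoformat(), spark_style_dayofweek(d))
```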



New columns to show the pickup and dropoff hours

In [176]:
data_conv7 = data_conv6.withColumn("pickup_hour", f.date_format("pickup_datetime", "H").cast(IntegerType()))

In [177]:
data_conv7.printSchema()

root
 |-- id: string (nullable = true)
 |-- vendor_id: integer (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- trip_duration: integer (nullable = true)
 |-- pickup_day: string (nullable = true)
 |-- dropoff_day: string (nullable = true)
 |-- pickup_day_no: integer (nullable = true)
 |-- dropoff_day_no: integer (nullable = true)
 |-- pickup_hour: integer (nullable = true)



In [178]:
data_conv8 = data_conv7.withColumn("dropoff_hour", f.date_format("dropoff_datetime", "H").cast(IntegerType()))

In [179]:
data_conv8.printSchema()

root
 |-- id: string (nullable = true)
 |-- vendor_id: integer (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- trip_duration: integer (nullable = true)
 |-- pickup_day: string (nullable = true)
 |-- dropoff_day: string (nullable = true)
 |-- pickup_day_no: integer (nullable = true)
 |-- dropoff_day_no: integer (nullable = true)
 |-- pickup_hour: integer (nullable = true)
 |-- dropoff_hour: integer (nullable = true)



New columns to show the pickup and dropoff months

In [180]:
data_conv9 = data_conv8.withColumn("pickup_month", f.date_format("pickup_datetime", "M").cast(IntegerType()))

In [181]:
data_conv9.printSchema()

root
 |-- id: string (nullable = true)
 |-- vendor_id: integer (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- trip_duration: integer (nullable = true)
 |-- pickup_day: string (nullable = true)
 |-- dropoff_day: string (nullable = true)
 |-- pickup_day_no: integer (nullable = true)
 |-- dropoff_day_no: integer (nullable = true)
 |-- pickup_hour: integer (nullable = true)
 |-- dropoff_hour: integer (nullable = true)
 |-- pickup_month: integer (nullable = true)



In [182]:
data_conv10 = data_conv9.withColumn("dropoff_month", f.date_format("dropoff_datetime", "M").cast(IntegerType()))

In [183]:
data_conv10.printSchema()

root
 |-- id: string (nullable = true)
 |-- vendor_id: integer (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- trip_duration: integer (nullable = true)
 |-- pickup_day: string (nullable = true)
 |-- dropoff_day: string (nullable = true)
 |-- pickup_day_no: integer (nullable = true)
 |-- dropoff_day_no: integer (nullable = true)
 |-- pickup_hour: integer (nullable = true)
 |-- dropoff_hour: integer (nullable = true)
 |-- pickup_month: integer (nullable = true)
 |-- dropoff_month: integer (nullable = true)



In [184]:
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType
from pyspark.sql.functions import udf
from pyspark.sql import Row

A function to label the hour of day: three 4-hour periods from 06:00 to 17:59, with every other hour labelled Night

In [185]:
def time_of_day(x):
    if x in range(6,10):
        return "Morning"
    elif x in range(10,14):
        return "Afternoon"
    elif x in range(14,18):
        return "Evening"
    else:
        return "Night"

col_time_of_day = udf(lambda z: time_of_day(z))
spark.udf.register("col_time_of_day", time_of_day, StringType())

<function __main__.time_of_day>

New columns to show the pickup and dropoff time of the day

In [186]:
data_conv11 = data_conv10.withColumn("pickup_timeofday", col_time_of_day("pickup_hour"))

In [187]:
data_conv11.show()

+---------+---------+-------------------+-------------------+---------------+------------------+------------------+------------------+------------------+------------------+-------------+----------+-----------+-------------+--------------+-----------+------------+------------+-------------+----------------+
|       id|vendor_id|    pickup_datetime|   dropoff_datetime|passenger_count|  pickup_longitude|   pickup_latitude| dropoff_longitude|  dropoff_latitude|store_and_fwd_flag|trip_duration|pickup_day|dropoff_day|pickup_day_no|dropoff_day_no|pickup_hour|dropoff_hour|pickup_month|dropoff_month|pickup_timeofday|
+---------+---------+-------------------+-------------------+---------------+------------------+------------------+------------------+------------------+------------------+-------------+----------+-----------+-------------+--------------+-----------+------------+------------+-------------+----------------+
|id2875421|        2|2016-03-14 17:24:55|2016-03-14 17:32:30|              1

In [188]:
data_conv12 = data_conv11.withColumn("dropoff_timeofday", col_time_of_day("dropoff_hour"))

In [189]:
data_conv12.show()

+---------+---------+-------------------+-------------------+---------------+------------------+------------------+------------------+------------------+------------------+-------------+----------+-----------+-------------+--------------+-----------+------------+------------+-------------+----------------+-----------------+
|       id|vendor_id|    pickup_datetime|   dropoff_datetime|passenger_count|  pickup_longitude|   pickup_latitude| dropoff_longitude|  dropoff_latitude|store_and_fwd_flag|trip_duration|pickup_day|dropoff_day|pickup_day_no|dropoff_day_no|pickup_hour|dropoff_hour|pickup_month|dropoff_month|pickup_timeofday|dropoff_timeofday|
+---------+---------+-------------------+-------------------+---------------+------------------+------------------+------------------+------------------+------------------+-------------+----------+-----------+-------------+--------------+-----------+------------+------------+-------------+----------------+-----------------+
|id2875421|        2|2

In [190]:
data_conv12.printSchema()

root
 |-- id: string (nullable = true)
 |-- vendor_id: integer (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- trip_duration: integer (nullable = true)
 |-- pickup_day: string (nullable = true)
 |-- dropoff_day: string (nullable = true)
 |-- pickup_day_no: integer (nullable = true)
 |-- dropoff_day_no: integer (nullable = true)
 |-- pickup_hour: integer (nullable = true)
 |-- dropoff_hour: integer (nullable = true)
 |-- pickup_month: integer (nullable = true)
 |-- dropoff_month: integer (nullable = true)
 |-- pickup_timeofday: string (nullable = true)
 |-- dropoff_timeofday: string (nullable = true)
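
The labels in these two columns come from the time_of_day function, which can be checked in plain Python without Spark. The sketch below repeats the function and counts how many of the 24 hours fall under each label; it shows that Night covers 12 hours while the other labels cover 4 each, which is worth keeping in mind when comparing counts per label:

```python
from collections import Counter

def time_of_day(x):
    """Same bucketing as the notebook's time_of_day function."""
    if x in range(6, 10):
        return "Morning"
    elif x in range(10, 14):
        return "Afternoon"
    elif x in range(14, 18):
        return "Evening"
    else:
        return "Night"

# How many clock hours map to each label.
hours_per_label = Counter(time_of_day(h) for h in range(24))
print(dict(hours_per_label))

# Pickup hours of the first five rows shown earlier.
print([time_of_day(h) for h in (17, 0, 11, 19, 13)])
```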



In [191]:
from geopy.distance import great_circle

A function to calculate the distance between two coordinates

In [192]:
def cal_distance(pickup_lat , pickup_long , dropoff_lat, dropoff_long):
    start_coordinates = pickup_lat, pickup_long
    stop_coordinates = dropoff_lat, dropoff_long
    
    return great_circle(start_coordinates, stop_coordinates).km

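# udf() without a return type defaults to StringType, so the "distance" column is stored as a string;
# pass DoubleType() as the second argument to udf() to get a numeric column instead
cal_distance_udf = udf(lambda x1,x2,y1,y2: cal_distance(x1,x2,y1,y2))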
spark.udf.register("cal_distance_udf", cal_distance, DoubleType())

<function __main__.cal_distance>

New column to show the distances in km of trips

In [193]:
data_conv13 = data_conv12.withColumn("distance", cal_distance_udf("pickup_latitude", "pickup_longitude", "dropoff_latitude", "dropoff_longitude"))

In [194]:
data_conv13.printSchema()

root
 |-- id: string (nullable = true)
 |-- vendor_id: integer (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- trip_duration: integer (nullable = true)
 |-- pickup_day: string (nullable = true)
 |-- dropoff_day: string (nullable = true)
 |-- pickup_day_no: integer (nullable = true)
 |-- dropoff_day_no: integer (nullable = true)
 |-- pickup_hour: integer (nullable = true)
 |-- dropoff_hour: integer (nullable = true)
 |-- pickup_month: integer (nullable = true)
 |-- dropoff_month: integer (nullable = true)
 |-- pickup_timeofday: string (nullable = true)
 |-- dropoff_timeofday: string (nullable = true)
 |-- distance: string (null
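
The distance column is produced by geopy's great_circle, which treats the Earth as a sphere. As an illustration of that kind of calculation, here is a minimal sketch of the haversine formula in plain Python. The radius constant and the helper name haversine_km are assumptions of this sketch and are not taken from geopy, so small differences from the notebook's values are possible:

```python
import math

EARTH_RADIUS_KM = 6371.009  # assumed mean Earth radius

def haversine_km(lat1, lon1, lat2, lon2):
    """Spherical distance in km between two (latitude, longitude) points in degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlmb = math.radians(lon2 - lon1)
    a = (math.sin(dphi / 2) ** 2
         + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2)
    return 2 * EARTH_RADIUS_KM * math.asin(math.sqrt(a))

# Pickup and dropoff coordinates of the first row shown earlier.
d = haversine_km(40.76793670654297, -73.9821548461914,
                 40.765602111816406, -73.96463012695312)
print(round(d, 3))
```

For that first trip the result is roughly 1.5 km, which is plausible for a ride of about seven and a half minutes.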

In [195]:
data_conv13.show()

+---------+---------+-------------------+-------------------+---------------+------------------+------------------+------------------+------------------+------------------+-------------+----------+-----------+-------------+--------------+-----------+------------+------------+-------------+----------------+-----------------+------------------+
|       id|vendor_id|    pickup_datetime|   dropoff_datetime|passenger_count|  pickup_longitude|   pickup_latitude| dropoff_longitude|  dropoff_latitude|store_and_fwd_flag|trip_duration|pickup_day|dropoff_day|pickup_day_no|dropoff_day_no|pickup_hour|dropoff_hour|pickup_month|dropoff_month|pickup_timeofday|dropoff_timeofday|          distance|
+---------+---------+-------------------+-------------------+---------------+------------------+------------------+------------------+------------------+------------------+-------------+----------+-----------+-------------+--------------+-----------+------------+------------+-------------+----------------+---

In [196]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local[*]")\
        .appName("New York Cab Info")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()
print("A Technical Case Study of New York Cab Trips")

A Technical Case Study of New York Cab Trips


In [197]:
# os.environ['PYSPARK_SUBMIT_ARGS']= spark.sparkContext.addPyFile('../')

In [198]:
spark

SQL query to display the total number of trips

In [209]:
data_conv13.createOrReplaceTempView("data_conv13")
spark.sql("SELECT COUNT(id) AS total_trip from data_conv13").show()


+----------+
|total_trip|
+----------+
|   1458644|
+----------+



To show the number of trips made according to the day of the week

In [208]:
spark.sql("SELECT pickup_day, COUNT(id) AS total_trips FROM data_conv13 GROUP BY pickup_day ORDER BY total_trips DESC").show()


+----------+-----------+
|pickup_day|total_trips|
+----------+-----------+
|    Friday|     223533|
|  Saturday|     220868|
|  Thursday|     218574|
| Wednesday|     210136|
|   Tuesday|     202749|
|    Sunday|     195366|
|    Monday|     187418|
+----------+-----------+



The same count per day of the week using COUNT(vendor_id) instead of COUNT(id); the totals match because neither column contains nulls

In [210]:
spark.sql("SELECT pickup_day, COUNT(vendor_id) AS total_trips FROM data_conv13 GROUP BY pickup_day ORDER BY total_trips DESC").show()

+----------+-----------+
|pickup_day|total_trips|
+----------+-----------+
|    Friday|     223533|
|  Saturday|     220868|
|  Thursday|     218574|
| Wednesday|     210136|
|   Tuesday|     202749|
|    Sunday|     195366|
|    Monday|     187418|
+----------+-----------+



To show the number of trips made according to the time of day

In [211]:
spark.sql("SELECT pickup_timeofday, COUNT(vendor_id) AS total_trips_time_of_day FROM data_conv13 GROUP BY pickup_timeofday ORDER BY total_trips_time_of_day DESC").show()

+----------------+-----------------------+
|pickup_timeofday|total_trips_time_of_day|
+----------------+-----------------------+
|           Night|                 670922|
|         Evening|                 286899|
|       Afternoon|                 277259|
|         Morning|                 223564|
+----------------+-----------------------+



The same count per time of day using COUNT(id) instead of COUNT(vendor_id)

In [212]:
spark.sql("SELECT pickup_timeofday, COUNT(id) AS total_trips_time_of_day FROM data_conv13 GROUP BY pickup_timeofday ORDER BY total_trips_time_of_day DESC").show()

+----------------+-----------------------+
|pickup_timeofday|total_trips_time_of_day|
+----------------+-----------------------+
|           Night|                 670922|
|         Evening|                 286899|
|       Afternoon|                 277259|
|         Morning|                 223564|
+----------------+-----------------------+
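
The GROUP BY / COUNT / ORDER BY pattern used in these queries can be tried on a handful of rows without Spark. This sketch runs the same query shape in an in-memory SQLite database from the standard library. The table name trips and the row ids are hypothetical; the pickup hours are those of the first five rows shown earlier:

```python
import sqlite3

def time_of_day(x):
    """Same bucketing as the notebook's time_of_day function."""
    if x in range(6, 10):
        return "Morning"
    elif x in range(10, 14):
        return "Afternoon"
    elif x in range(14, 18):
        return "Evening"
    return "Night"

pickup_hours = [17, 0, 11, 19, 13]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE trips (id TEXT, pickup_timeofday TEXT)")
conn.executemany(
    "INSERT INTO trips VALUES (?, ?)",
    [(f"row{i}", time_of_day(h)) for i, h in enumerate(pickup_hours)],
)

# Same shape as the Spark SQL query: count trips per label, largest first.
result = conn.execute(
    "SELECT pickup_timeofday, COUNT(id) AS total_trips_time_of_day "
    "FROM trips GROUP BY pickup_timeofday "
    "ORDER BY total_trips_time_of_day DESC"
).fetchall()
conn.close()
print(result)
```

Labels with equal counts may come back in either order, since the query only sorts by the count.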



To show the number of km traveled per day of the week

In [213]:
spark.sql("SELECT pickup_day, SUM(distance) AS km_traveled_per_day FROM data_conv13 GROUP BY pickup_day ORDER BY km_traveled_per_day DESC").show()

+----------+-------------------+
|pickup_day|km_traveled_per_day|
+----------+-------------------+
|    Friday|  758725.5431796226|
|  Thursday|   747678.685340323|
|  Saturday|  736412.1719889197|
|    Sunday|  726454.2760555025|
| Wednesday|  702919.7880676301|
|   Tuesday|  678329.0532968312|
|    Monday|  668483.0576452918|
+----------+-------------------+



Save to file (Spark writes a directory of part files at this path, not a single CSV file)

In [205]:
data_conv13.write.csv("/content/drive/MyDrive/New-York-Data/processed_final_data.csv", header=True)

To display the number of partitions

In [206]:
data_conv13.rdd.getNumPartitions()

2