In [1]:
# Mount Google Drive at /content/gdrive so that files stored in Drive
# (e.g. the parquet file read later from /content/gdrive/MyDrive/...) are
# reachable from this Colab runtime.
from google.colab import drive 
drive.mount('/content/gdrive')

In [2]:
# Print the current working directory of the runtime.
!pwd

/content


In [3]:
# import os 
# COLAB_PATH ="gdrive/MyDrive/spark"
# os.chdir(COLAB_PATH)

In [4]:
# Install Java (OpenJDK 8, headless); -qq and the redirect to /dev/null
# keep the cell output quiet.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Download the Spark 3.0.0 (Hadoop 3.2 build) tarball from the Apache archive.
# Change the version number here (and in the cells that reference the
# extracted folder name) if a different release is needed.
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz

# Extract the Spark tarball into the current folder.
!tar xf spark-3.0.0-bin-hadoop3.2.tgz



In [5]:
# Print the current working directory (the Spark tarball was extracted here).
!pwd

/content


In [6]:
# Set the JAVA_HOME and SPARK_HOME environment variables for this process.
# The paths are expected to match the JDK and the Spark folder set up by the
# install cell; update SPARK_HOME if a different Spark version was downloaded.
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"


# Install findspark using pip (-q keeps the output quiet).
!pip install -q findspark

In [7]:
# Print the current working directory of the runtime.
!pwd

/content


In [8]:
# findspark.init() is called before importing pyspark on purpose: it is what
# makes the Spark installation's Python package importable in this kernel
# (presumably located through SPARK_HOME -- see the findspark docs).
import findspark
findspark.init()
from pyspark.sql import SparkSession
# Start (or reuse) a Spark session running locally; "local[*]" asks Spark to
# use all available cores on this machine.
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [9]:
# !wget https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2021-02.parquet

In [10]:
import pandas as pd
from pyspark.sql import types

In [11]:
# List the working directory to confirm the Drive mount and Spark folder exist.
!ls

gdrive	sample_data  spark-3.0.0-bin-hadoop3.2	spark-3.0.0-bin-hadoop3.2.tgz


In [12]:
# Load the February 2021 FHVHV trip data from the mounted Drive folder into a
# Spark DataFrame.
df = spark.read.parquet('/content/gdrive/MyDrive/spark/fhvhv_tripdata_2021-02.parquet')

In [13]:
# Preview the first two rows.
df.show(2)

+-----------------+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+------------+------------+----------+---------+-------------------+-----+----+---------+--------------------+-----------+----+----------+-------------------+-----------------+------------------+----------------+--------------+
|hvfhs_license_num|dispatching_base_num|originating_base_num|   request_datetime|  on_scene_datetime|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|trip_miles|trip_time|base_passenger_fare|tolls| bcf|sales_tax|congestion_surcharge|airport_fee|tips|driver_pay|shared_request_flag|shared_match_flag|access_a_ride_flag|wav_request_flag|wav_match_flag|
+-----------------+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+------------+------------+----------+---------+-------------------+-----+----+---------+--------------------+-----------+--

In [14]:
# List the column names.
df.columns

['hvfhs_license_num',
 'dispatching_base_num',
 'originating_base_num',
 'request_datetime',
 'on_scene_datetime',
 'pickup_datetime',
 'dropoff_datetime',
 'PULocationID',
 'DOLocationID',
 'trip_miles',
 'trip_time',
 'base_passenger_fare',
 'tolls',
 'bcf',
 'sales_tax',
 'congestion_surcharge',
 'airport_fee',
 'tips',
 'driver_pay',
 'shared_request_flag',
 'shared_match_flag',
 'access_a_ride_flag',
 'wav_request_flag',
 'wav_match_flag']

In [15]:
# Show each column's Spark data type as (name, type) pairs.
df.dtypes

[('hvfhs_license_num', 'string'),
 ('dispatching_base_num', 'string'),
 ('originating_base_num', 'string'),
 ('request_datetime', 'timestamp'),
 ('on_scene_datetime', 'timestamp'),
 ('pickup_datetime', 'timestamp'),
 ('dropoff_datetime', 'timestamp'),
 ('PULocationID', 'bigint'),
 ('DOLocationID', 'bigint'),
 ('trip_miles', 'double'),
 ('trip_time', 'bigint'),
 ('base_passenger_fare', 'double'),
 ('tolls', 'double'),
 ('bcf', 'double'),
 ('sales_tax', 'double'),
 ('congestion_surcharge', 'double'),
 ('airport_fee', 'double'),
 ('tips', 'double'),
 ('driver_pay', 'double'),
 ('shared_request_flag', 'string'),
 ('shared_match_flag', 'string'),
 ('access_a_ride_flag', 'string'),
 ('wav_request_flag', 'string'),
 ('wav_match_flag', 'string')]

In [45]:
# Preview the first two rows of the DataFrame.
df.show(2)

+-----------------+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+------------+------------+----------+---------+-------------------+-----+----+---------+--------------------+-----------+----+----------+-------------------+-----------------+------------------+----------------+--------------+
|hvfhs_license_num|dispatching_base_num|originating_base_num|   request_datetime|  on_scene_datetime|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|trip_miles|trip_time|base_passenger_fare|tolls| bcf|sales_tax|congestion_surcharge|airport_fee|tips|driver_pay|shared_request_flag|shared_match_flag|access_a_ride_flag|wav_request_flag|wav_match_flag|
+-----------------+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+------------+------------+----------+---------+-------------------+-----+----+---------+--------------------+-----------+--

In [17]:
# Register the DataFrame as a temporary view named 'df' so the spark.sql()
# queries below can refer to it in their FROM clauses.
df.createOrReplaceTempView('df')

### 1. How many taxi trips were there on February 15?

In [18]:
# Count the trips picked up on 2021-02-15. The filter is the half-open range
# [2021-02-15, 2021-02-16), so every pickup time during that day is included.
# count(pickup_datetime) counts only rows with a non-NULL pickup time.
spark.sql('''
select
    count(pickup_datetime) as count
from df
where
    pickup_datetime >= '2021-02-15' and pickup_datetime < '2021-02-16'
''').show()

+------+
| count|
+------+
|367170|
+------+



### 2. Find the longest trip for each day

In [34]:
# Longest trip per calendar day, where "longest" is measured by trip_miles.
# GROUP BY 1 groups on the pickup date (first select-list item) and already
# yields exactly one row per day, so no DISTINCT is needed on top of it.
# Rows are sorted by the daily maximum, largest first; show(31) prints up to
# 31 rows, which is enough to display every day of any month.
spark.sql('''
select
    date(pickup_datetime),
    max(trip_miles) as longest_trip
from df
group by
    1
order by
    longest_trip desc
''').show(31)

+---------------+------------+
|pickup_datetime|longest_trip|
+---------------+------------+
|     2021-02-18|      527.11|
|     2021-02-10|       512.5|
|     2021-02-09|      480.73|
|     2021-02-27|      454.49|
|     2021-02-22|      347.41|
|     2021-02-20|      340.64|
|     2021-02-19|      329.16|
|     2021-02-17|      324.19|
|     2021-02-16|     307.661|
|     2021-02-24|      301.73|
|     2021-02-02|      282.78|
|     2021-02-05|      275.32|
|     2021-02-06|      266.36|
|     2021-02-08|       253.5|
|     2021-02-12|      250.11|
|     2021-02-11|      240.66|
|     2021-02-07|      216.36|
|     2021-02-15|      215.21|
|     2021-02-01|      212.43|
|     2021-02-21|      207.61|
|     2021-02-13|      207.44|
|     2021-02-04|      203.97|
|     2021-02-28|     200.921|
|     2021-02-26|     192.465|
|     2021-02-03|     191.177|
|     2021-02-14|      184.77|
|     2021-02-25|      173.82|
|     2021-02-23|      163.96|
+---------------+------------+



### 3. Find the top 5 most frequent `dispatching_base_num` values

In [42]:
# Top 5 dispatching bases by number of trips. GROUP BY 1 / ORDER BY 2 refer to
# select-list positions: group on dispatching_base_num, sort by its count in
# descending order. COUNT(dispatching_base_num) counts only non-NULL values.
# The `5_Most_frequent` column holds the per-base trip count.
spark.sql('''
select
    dispatching_base_num, COUNT(dispatching_base_num) as 5_Most_frequent 
from df
GROUP BY 1
ORDER BY 2 DESC
LIMIT 5
''').show()

+--------------------+---------------+
|dispatching_base_num|5_Most_frequent|
+--------------------+---------------+
|              B02510|        3233664|
|              B02764|         965568|
|              B02872|         882689|
|              B02875|         685390|
|              B02765|         559768|
+--------------------+---------------+



### 4. Find the top 5 most common location pairs (PULocationID and DOLocationID)

In [56]:
# Top 5 most common location pairs. LEAST/GREATEST reorder each trip's two
# location IDs into (smaller, larger), so a trip A -> B and a trip B -> A are
# counted as the same (unordered) pair.
# NOTE(review): the output columns are aliased PULocationID / DOLocationID but
# actually hold the smaller / larger ID of the pair, not necessarily the real
# pickup / drop-off location -- confirm that unordered pairs are intended.
spark.sql('''
select
    LEAST(PULocationID, DOLocationID) AS PULocationID,
    GREATEST(PULocationID, DOLocationID) AS DOLocationID,
    COUNT(*) AS journey_count
from df
GROUP BY 1, 2
ORDER BY 3 desc
LIMIT 5
''').show()

+------------+------------+-------------+
|PULocationID|DOLocationID|journey_count|
+------------+------------+-------------+
|          76|          76|        45041|
|          26|          26|        37329|
|          39|          39|        28026|
|          61|          61|        25976|
|          61|         188|        23276|
+------------+------------+-------------+

