In [None]:
# Mount to Google Drive
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Installing PySpark
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 41 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 55.4 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=a53b4f8a059a64841b249fdac65e7db797a27c291c8fb6b11d81f0bdadf2f0e3
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


In [None]:
# Initialize Spark Session
import pyspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.context import SparkContext

credentials_location = '/content/drive/MyDrive/practical-case-session-6/data-fellowship-batch-7-9a47d25ff887.json'

conf = SparkConf() \
    .setMaster('local[*]') \
    .setAppName('Colab') \
    .set("spark.jars", "./lib/gcs-connector-hadoop3-2.2.5.jar") \
    .set("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
    .set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", credentials_location)

In [None]:
sc = SparkContext(conf=conf)
spark = SparkSession.builder \
    .config(conf=sc.getConf()) \
    .getOrCreate()

# Print Spark Session
spark

In [None]:
%%writefile parsing.py
import argparse
from pyspark.sql import functions as F

parser = argparse.ArgumentParser()

parser.add_argument('--output', required=True)

args = parser.parse_args()

output = args.output

Writing parsing.py


In [None]:
# Read Yellow Trip Data
df1 = spark.read.parquet("/content/drive/MyDrive/practical-case-session-6/yellow_tripdata_2021-02.parquet")
df1.show(5)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       1| 2021-02-01 00:40:47|  2021-02-01 00:48:28|            1.0|          2.3|       1.0|                 N|         141|         226|           2|        8.5|  3.0|    0.5|       0.

In [None]:
# Read Green Trip Data
df2 = spark.read.parquet("/content/drive/MyDrive/practical-case-session-6/green_tripdata_2021-02.parquet")
df2.show(5)

+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|VendorID|lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|       2| 2021-02-01 00:34:03|  2021-02-01 00:51:58|                 N|       1.0|         130|         205|            5.0|         3.66|       14.0|  0.5|    0.

In [None]:
# Read FHV Trip Data
df3 = spark.read.parquet("/content/drive/MyDrive/practical-case-session-6/fhv_tripdata_2021-02.parquet")
df3.show(5)

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B00013|2021-02-01 00:01:00|2021-02-01 01:33:00|        null|        null|   null|                B00014|
|     B00021         |2021-02-01 00:55:40|2021-02-01 01:06:20|       173.0|        82.0|   null|       B00021         |
|     B00021         |2021-02-01 00:14:03|2021-02-01 00:28:37|       173.0|        56.0|   null|       B00021         |
|     B00021         |2021-02-01 00:27:48|2021-02-01 00:35:45|        82.0|       129.0|   null|       B00021         |
|              B00037|2021-02-01 00:12:50|2021-02-01 00:26:38|        null|       225.0|   null|                B00037|
+--------------------+------------------

In [None]:
# Read FHVHV Trip Data
df4 = spark.read.parquet("/content/drive/MyDrive/practical-case-session-6/fhvhv_tripdata_2021-02.parquet")
df4.show(5)

+-----------------+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+------------+------------+----------+---------+-------------------+-----+----+---------+--------------------+-----------+----+----------+-------------------+-----------------+------------------+----------------+--------------+
|hvfhs_license_num|dispatching_base_num|originating_base_num|   request_datetime|  on_scene_datetime|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|trip_miles|trip_time|base_passenger_fare|tolls| bcf|sales_tax|congestion_surcharge|airport_fee|tips|driver_pay|shared_request_flag|shared_match_flag|access_a_ride_flag|wav_request_flag|wav_match_flag|
+-----------------+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+------------+------------+----------+---------+-------------------+-----+----+---------+--------------------+-----------+--

In [None]:
# 1. How many taxi trips were there on February 15?
df1.createOrReplaceTempView("yellow_tripdata")

count_yellow_taxi_trip_15_feb = spark.sql("SELECT COUNT(*) AS count_yellow_taxi_trip_15_feb FROM yellow_tripdata WHERE tpep_pickup_datetime BETWEEN '2021-02-15 00:00:00' AND '2021-02-15 23:59:59'")
count_yellow_taxi_trip_15_feb.show()
count_yellow_taxi_trip_15_feb.printSchema()

+-----------------------------+
|count_yellow_taxi_trip_15_feb|
+-----------------------------+
|                        40322|
+-----------------------------+

root
 |-- count_yellow_taxi_trip_15_feb: long (nullable = false)



In [None]:
# 2. Find the longest trip for each day?
find_longest_trip_monday = spark.sql(" SELECT MAX(trip_distance) AS find_longest_trip_monday FROM yellow_tripdata \
WHERE ( tpep_pickup_datetime BETWEEN '2021-02-01 00:00:00' AND '2021-02-01 23:59:59' ) OR \
( tpep_pickup_datetime BETWEEN '2021-02-08 00:00:00' AND '2021-02-08 23:59:59' ) OR \
( tpep_pickup_datetime BETWEEN '2021-02-15 00:00:00' AND '2021-02-15 23:59:59' ) OR \
( tpep_pickup_datetime BETWEEN '2021-02-22 00:00:00' AND '2021-02-22 23:59:59' )")
find_longest_trip_monday.show()
find_longest_trip_monday.printSchema()

+------------------------+
|find_longest_trip_monday|
+------------------------+
|               186617.92|
+------------------------+

root
 |-- find_longest_trip_monday: double (nullable = true)



In [None]:
find_longest_trip_tuesday = spark.sql("SELECT MAX(trip_distance) AS find_longest_trip_tuesday FROM yellow_tripdata \
WHERE ( tpep_pickup_datetime BETWEEN '2021-02-02 00:00:00' AND '2021-02-02 23:59:59' ) OR \
( tpep_pickup_datetime BETWEEN '2021-02-09 00:00:00' AND '2021-02-09 23:59:59' ) OR \
( tpep_pickup_datetime BETWEEN '2021-02-16 00:00:00' AND '2021-02-16 23:59:59' ) OR \
( tpep_pickup_datetime BETWEEN '2021-02-23 00:00:00' AND '2021-02-23 23:59:59' )")
find_longest_trip_tuesday.show()
find_longest_trip_tuesday.printSchema()

+-------------------------+
|find_longest_trip_tuesday|
+-------------------------+
|                221188.25|
+-------------------------+

root
 |-- find_longest_trip_tuesday: double (nullable = true)



In [None]:
find_longest_trip_wednesday = spark.sql("SELECT MAX(trip_distance) AS find_longest_trip_wednesday FROM yellow_tripdata \
WHERE ( tpep_pickup_datetime BETWEEN '2021-02-03 00:00:00' AND '2021-02-03 23:59:59' ) OR \
( tpep_pickup_datetime BETWEEN '2021-02-10 00:00:00' AND '2021-02-10 23:59:59' ) OR \
( tpep_pickup_datetime BETWEEN '2021-02-17 00:00:00' AND '2021-02-17 23:59:59' ) OR \
( tpep_pickup_datetime BETWEEN '2021-02-24 00:00:00' AND '2021-02-24 23:59:59' )")
find_longest_trip_wednesday.show()
find_longest_trip_wednesday.printSchema()

+---------------------------+
|find_longest_trip_wednesday|
+---------------------------+
|                  186079.73|
+---------------------------+

root
 |-- find_longest_trip_wednesday: double (nullable = true)



In [None]:
find_longest_trip_thursday = spark.sql("SELECT MAX(trip_distance) AS find_longest_trip_thursday FROM yellow_tripdata \
WHERE ( tpep_pickup_datetime BETWEEN '2021-02-04 00:00:00' AND '2021-02-04 23:59:59' ) OR \
( tpep_pickup_datetime BETWEEN '2021-02-11 00:00:00' AND '2021-02-11 23:59:59' ) OR \
( tpep_pickup_datetime BETWEEN '2021-02-18 00:00:00' AND '2021-02-18 23:59:59' ) OR \
( tpep_pickup_datetime BETWEEN '2021-02-25 00:00:00' AND '2021-02-25 23:59:59' )")
find_longest_trip_thursday.show()
find_longest_trip_thursday.printSchema()

+--------------------------+
|find_longest_trip_thursday|
+--------------------------+
|                     82.19|
+--------------------------+

root
 |-- find_longest_trip_thursday: double (nullable = true)



In [None]:
find_longest_trip_friday = spark.sql("SELECT MAX(trip_distance) AS find_longest_trip_friday FROM yellow_tripdata \
WHERE ( tpep_pickup_datetime BETWEEN '2021-02-05 00:00:00' AND '2021-02-05 23:59:59' ) OR \
( tpep_pickup_datetime BETWEEN '2021-02-12 00:00:00' AND '2021-02-12 23:59:59' ) OR \
( tpep_pickup_datetime BETWEEN '2021-02-19 00:00:00' AND '2021-02-19 23:59:59' ) OR \
( tpep_pickup_datetime BETWEEN '2021-02-26 00:00:00' AND '2021-02-26 23:59:59' )")
find_longest_trip_friday.show()
find_longest_trip_friday.printSchema()

+------------------------+
|find_longest_trip_friday|
+------------------------+
|                91134.16|
+------------------------+

root
 |-- find_longest_trip_friday: double (nullable = true)



In [None]:
find_longest_trip_saturday = spark.sql("SELECT MAX(trip_distance) AS find_longest_trip_saturday FROM yellow_tripdata \
WHERE ( tpep_pickup_datetime BETWEEN '2021-02-06 00:00:00' AND '2021-02-06 23:59:59' ) OR \
( tpep_pickup_datetime BETWEEN '2021-02-13 00:00:00' AND '2021-02-13 23:59:59' ) OR \
( tpep_pickup_datetime BETWEEN '2021-02-20 00:00:00' AND '2021-02-20 23:59:59' ) OR \
( tpep_pickup_datetime BETWEEN '2021-02-27 00:00:00' AND '2021-02-27 23:59:59' )")
find_longest_trip_saturday.show()
find_longest_trip_saturday.printSchema()

+--------------------------+
|find_longest_trip_saturday|
+--------------------------+
|                 188054.03|
+--------------------------+

root
 |-- find_longest_trip_saturday: double (nullable = true)



In [None]:
find_longest_trip_sunday = spark.sql("SELECT MAX(trip_distance) AS find_longest_trip_sunday FROM yellow_tripdata \
WHERE ( tpep_pickup_datetime BETWEEN '2021-02-07 00:00:00' AND '2021-02-07 23:59:59' ) OR \
( tpep_pickup_datetime BETWEEN '2021-02-14 00:00:00' AND '2021-02-14 23:59:59' ) OR \
( tpep_pickup_datetime BETWEEN '2021-02-21 00:00:00' AND '2021-02-21 23:59:59' ) OR \
( tpep_pickup_datetime BETWEEN '2021-02-28 00:00:00' AND '2021-02-28 23:59:59' )")
find_longest_trip_sunday.show()
find_longest_trip_sunday.printSchema()

+------------------------+
|find_longest_trip_sunday|
+------------------------+
|               186510.67|
+------------------------+

root
 |-- find_longest_trip_sunday: double (nullable = true)



In [None]:
# Find Top 5 Most frequent `dispatching_base_num` ?
df3.createOrReplaceTempView("fhv_tripdata")

find_most_frequent_dispatching_base_num = spark.sql("SELECT dispatching_base_num, COUNT(dispatching_base_num) AS find_most_frequent_dispatching_base_num \
FROM fhv_tripdata \
GROUP BY dispatching_base_num \
ORDER BY COUNT(dispatching_base_num) DESC \
LIMIT 5")
find_most_frequent_dispatching_base_num.show()
find_most_frequent_dispatching_base_num.printSchema()

+--------------------+---------------------------------------+
|dispatching_base_num|find_most_frequent_dispatching_base_num|
+--------------------+---------------------------------------+
|              B00856|                                  35077|
|              B01312|                                  33089|
|              B01145|                                  31114|
|              B02794|                                  30397|
|              B03016|                                  29794|
+--------------------+---------------------------------------+

root
 |-- dispatching_base_num: string (nullable = true)
 |-- find_most_frequent_dispatching_base_num: long (nullable = false)



In [102]:
# Find Top 5 Most common location pairs (PUlocationID and DOlocationID) ?
find_most_common_location_pair = spark.sql("SELECT PUlocationID, DOlocationID, COUNT(PUlocationID) AS find_most_common_location_pair \
FROM fhv_tripdata \
WHERE PUlocationID == DOlocationID \
GROUP BY PUlocationID, DOlocationID \
ORDER BY COUNT(PUlocationID) DESC \
LIMIT 5")
find_most_common_location_pair.show()
find_most_common_location_pair.printSchema()

+------------+------------+------------------------------+
|PUlocationID|DOlocationID|find_most_common_location_pair|
+------------+------------+------------------------------+
|       206.0|       206.0|                          2374|
|       129.0|       129.0|                          1902|
|         7.0|         7.0|                          1829|
|       179.0|       179.0|                          1736|
|       221.0|       221.0|                          1562|
+------------+------------+------------------------------+

root
 |-- PUlocationID: double (nullable = true)
 |-- DOlocationID: double (nullable = true)
 |-- find_most_common_location_pair: long (nullable = false)



In [None]:
# Write all of the result to BigQuery table (additional - point plus)
count_yellow_taxi_trip_15_feb.coalesce(1).write.option("inferSchema","true").csv('/content/drive/MyDrive/practical-case-session-6/no1', header='true')

In [None]:
find_longest_trip_monday.coalesce(1).write.option("inferSchema","true").csv('/content/drive/MyDrive/practical-case-session-6/no2-mon', header='true')
find_longest_trip_tuesday.coalesce(1).write.option("inferSchema","true").csv('/content/drive/MyDrive/practical-case-session-6/no2-tue', header='true')
find_longest_trip_wednesday.coalesce(1).write.option("inferSchema","true").csv('/content/drive/MyDrive/practical-case-session-6/no2-wed', header='true')
find_longest_trip_thursday.coalesce(1).write.option("inferSchema","true").csv('/content/drive/MyDrive/practical-case-session-6/no2-thu', header='true')
find_longest_trip_friday.coalesce(1).write.option("inferSchema","true").csv('/content/drive/MyDrive/practical-case-session-6/no2-fri', header='true')
find_longest_trip_saturday.coalesce(1).write.option("inferSchema","true").csv('/content/drive/MyDrive/practical-case-session-6/no2-sat', header='true')
find_longest_trip_sunday.coalesce(1).write.option("inferSchema","true").csv('/content/drive/MyDrive/practical-case-session-6/no2-sun', header='true')

In [None]:
find_most_frequent_dispatching_base_num.coalesce(1).write.option("inferSchema","true").csv('/content/drive/MyDrive/practical-case-session-6/no3', header='true')

In [103]:
find_most_common_location_pair.coalesce(1).write.option("inferSchema","true").csv('/content/drive/MyDrive/practical-case-session-6/nomor4', header='true')