# PySpark Installation

In [1]:
!pip install -q findspark

In [2]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 46 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 58.1 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=928fd38ee91088fd9fe86a1424f8f7aeff9c0915a145fc1a3f9c102eba4cf7d2
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


In [3]:
# Initialise findspark before importing pyspark.
# NOTE(review): presumably redundant here, since pyspark itself was pip-installed
# above and is importable directly — confirm before removing.
import findspark
findspark.init()

In [4]:
from pyspark.sql import SparkSession

In [7]:
# Build (or reuse, via getOrCreate) a local Spark session named "Colab".
# NOTE(review): the BigQuery writes at the end of this notebook need the
# spark-bigquery connector on the classpath (e.g. via `spark.jars.packages`),
# which this builder does not configure — confirm before running those cells.
session_builder = SparkSession.builder.master("local").appName("Colab")
spark = session_builder.getOrCreate()

In [8]:
# Display the active SparkSession as this cell's output.
spark

# Load Data

In [9]:
!wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2021-02.parquet

--2022-10-03 04:30:04--  https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2021-02.parquet
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 65.9.84.31, 65.9.84.167, 65.9.84.37, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|65.9.84.31|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 21777258 (21M) [application/x-www-form-urlencoded]
Saving to: ‘yellow_tripdata_2021-02.parquet’


2022-10-03 04:30:09 (3.94 MB/s) - ‘yellow_tripdata_2021-02.parquet’ saved [21777258/21777258]



In [10]:
!wget https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2021-02.parquet

--2022-10-03 04:32:43--  https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2021-02.parquet
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 52.222.137.206, 52.222.137.90, 52.222.137.46, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|52.222.137.206|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1145679 (1.1M) [binary/octet-stream]
Saving to: ‘green_tripdata_2021-02.parquet’


2022-10-03 04:32:44 (53.5 MB/s) - ‘green_tripdata_2021-02.parquet’ saved [1145679/1145679]



In [11]:
!wget https://d37ci6vzurychx.cloudfront.net/trip-data/fhv_tripdata_2021-02.parquet

--2022-10-03 04:41:56--  https://d37ci6vzurychx.cloudfront.net/trip-data/fhv_tripdata_2021-02.parquet
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 65.9.84.37, 65.9.84.11, 65.9.84.31, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|65.9.84.37|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 10645466 (10M) [binary/octet-stream]
Saving to: ‘fhv_tripdata_2021-02.parquet’


2022-10-03 04:41:56 (89.6 MB/s) - ‘fhv_tripdata_2021-02.parquet’ saved [10645466/10645466]



In [12]:
!wget https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2021-02.parquet

--2022-10-03 05:12:23--  https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2021-02.parquet
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 65.9.84.31, 65.9.84.37, 65.9.84.167, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|65.9.84.31|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 302633211 (289M) [application/x-www-form-urlencoded]
Saving to: ‘fhvhv_tripdata_2021-02.parquet’


2022-10-03 05:12:25 (163 MB/s) - ‘fhvhv_tripdata_2021-02.parquet’ saved [302633211/302633211]



In [14]:
# Load the four February 2021 trip files downloaded above, one DataFrame per service.
df_yellow, df_green, df_fhv, df_fhvhv = (
    spark.read.parquet(path)
    for path in (
        "yellow_tripdata_2021-02.parquet",
        "green_tripdata_2021-02.parquet",
        "fhv_tripdata_2021-02.parquet",
        "fhvhv_tripdata_2021-02.parquet",
    )
)

In [15]:
# Yellow (tpep_*) and green (lpep_*) use different prefixes for the same timestamps;
# give both the same pickup/dropoff names so they can be combined later.
df_yellow = (
    df_yellow
    .withColumnRenamed('tpep_pickup_datetime', 'pickup_datetime')
    .withColumnRenamed('tpep_dropoff_datetime', 'dropoff_datetime')
)
df_green = (
    df_green
    .withColumnRenamed('lpep_pickup_datetime', 'pickup_datetime')
    .withColumnRenamed('lpep_dropoff_datetime', 'dropoff_datetime')
)

In [16]:
# Display the DataFrame repr, which lists the yellow columns and their types.
df_yellow

DataFrame[VendorID: bigint, pickup_datetime: timestamp, dropoff_datetime: timestamp, passenger_count: double, trip_distance: double, RatecodeID: double, store_and_fwd_flag: string, PULocationID: bigint, DOLocationID: bigint, payment_type: bigint, fare_amount: double, extra: double, mta_tax: double, tip_amount: double, tolls_amount: double, improvement_surcharge: double, total_amount: double, congestion_surcharge: double, airport_fee: double]

In [17]:
# Print the first 20 yellow rows (long cell values truncated).
df_yellow.show(n=20, truncate=True)

+--------+-------------------+-------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|    pickup_datetime|   dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+--------+-------------------+-------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       1|2021-02-01 00:40:47|2021-02-01 00:48:28|            1.0|          2.3|       1.0|                 N|         141|         226|           2|        8.5|  3.0|    0.5|       0.0|         0

In [18]:
# Display the DataFrame repr, which lists the green columns and their types.
df_green

DataFrame[VendorID: bigint, pickup_datetime: timestamp, dropoff_datetime: timestamp, store_and_fwd_flag: string, RatecodeID: double, PULocationID: bigint, DOLocationID: bigint, passenger_count: double, trip_distance: double, fare_amount: double, extra: double, mta_tax: double, tip_amount: double, tolls_amount: double, ehail_fee: int, improvement_surcharge: double, total_amount: double, payment_type: double, trip_type: double, congestion_surcharge: double]

In [20]:
# Print the first 20 green rows (long cell values truncated).
df_green.show(n=20, truncate=True)

+--------+-------------------+-------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|VendorID|    pickup_datetime|   dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|
+--------+-------------------+-------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|       2|2021-02-01 00:34:03|2021-02-01 00:51:58|                 N|       1.0|         130|         205|            5.0|         3.66|       14.0|  0.5|    0.5|      10.0

In [21]:
# Columns present in both the green and yellow frames, kept in green's column order
# so both selections below line up.
yellow_columns = set(df_yellow.columns)
common_colums = [name for name in df_green.columns if name in yellow_columns]

In [22]:
# Display the list of columns shared by the green and yellow frames.
common_colums

['VendorID',
 'pickup_datetime',
 'dropoff_datetime',
 'store_and_fwd_flag',
 'RatecodeID',
 'PULocationID',
 'DOLocationID',
 'passenger_count',
 'trip_distance',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'payment_type',
 'congestion_surcharge']

In [23]:
from pyspark.sql import functions as F

In [24]:
# Keep only the shared columns and tag every row as a yellow-cab trip.
df_yellow_sel = df_yellow.select(*common_colums, F.lit('yellow').alias('service_type'))

In [25]:
# Keep only the shared columns and tag every row as a green-cab trip.
df_green_sel = df_green.select(*common_colums, F.lit('green').alias('service_type'))

In [26]:
# Stack green and yellow trips into one frame (duplicates kept, like UNION ALL).
# unionByName matches columns by name rather than position, so the result stays
# correct even if the two selections ever list their columns in different orders.
df_trips_data = df_green_sel.unionByName(df_yellow_sel)

In [27]:
# Print the first 20 combined rows (long cell values truncated).
df_trips_data.show(n=20, truncate=True)

+--------+-------------------+-------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------------------+------------+------------+--------------------+------------+
|VendorID|    pickup_datetime|   dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|payment_type|congestion_surcharge|service_type|
+--------+-------------------+-------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------------------+------------+------------+--------------------+------------+
|       2|2021-02-01 00:34:03|2021-02-01 00:51:58|                 N|       1.0|         130|         205|            5.0|         3.66|       14.0|  0.5|    0.5|      10.0|         0.0|       

# 1. How many taxi trips (green + yellow) were there on February 15?

In [32]:
# Count green + yellow trips whose pickup falls on 2021-02-15.
# Truncate the pickup timestamp to a date and filter on that derived date. The
# previous `<= '2021-02-16 00:00:00'` upper bound also admitted trips starting
# exactly at midnight on the 16th, which would have added a second group row.
# Output columns are unchanged: pickup_datetime (date) and count.
df_taxi_trips = (
    df_trips_data
    .withColumn('pickup_datetime', F.to_date(df_trips_data.pickup_datetime))
    .filter(F.col('pickup_datetime') == '2021-02-15')
    .groupBy('pickup_datetime')
    .count()
)
df_taxi_trips.show()

+---------------+-----+
|pickup_datetime|count|
+---------------+-----+
|     2021-02-15|42100|
+---------------+-----+



# 2. Find the longest trip (by distance) for each day

In [34]:
# Longest trip distance per pickup day in February 2021 (green + yellow).
# - Filter on the derived date column and bound the range at both ends. The
#   original lower bound suggests stray pickup dates outside the month exist;
#   presumably later ones do too — confirm.
# - Alias the aggregate: the default name `max(trip_distance)` is not a valid
#   BigQuery column name, and this frame is written to BigQuery below.
# NOTE: trip_distance contains implausible outliers (e.g. 186079.73 on
# 2021-02-03); no cleaning is applied here.
df_longest_trip = (
    df_trips_data
    .withColumn('pickup_datetime', F.to_date(df_trips_data.pickup_datetime))
    .filter((F.col('pickup_datetime') >= '2021-02-01') & (F.col('pickup_datetime') < '2021-03-01'))
    .groupBy('pickup_datetime')
    .agg(F.max('trip_distance').alias('max_trip_distance'))
    .sort('pickup_datetime')
)
df_longest_trip.show()

+---------------+------------------+
|pickup_datetime|max(trip_distance)|
+---------------+------------------+
|     2021-02-01|             38.89|
|     2021-02-02|             73.24|
|     2021-02-03|         186079.73|
|     2021-02-04|             82.19|
|     2021-02-05|          91134.16|
|     2021-02-06|              43.5|
|     2021-02-07|         186510.67|
|     2021-02-08|         186617.92|
|     2021-02-09|           60382.7|
|     2021-02-10|           60382.7|
|     2021-02-11|          43174.56|
|     2021-02-12|          66659.27|
|     2021-02-13|         115928.92|
|     2021-02-14|             58.03|
|     2021-02-15|             52.89|
|     2021-02-16|         221188.25|
|     2021-02-17|         140145.44|
|     2021-02-18|             75.81|
|     2021-02-19|              70.4|
|     2021-02-20|         188054.03|
+---------------+------------------+
only showing top 20 rows



# 3. Find the top 5 most frequent `dispatching_base_num` values (FHV trips)

In [35]:
# Trip count per dispatching base in the FHV data, keeping the 5 busiest bases.
most_frequent = (
    df_fhv
    .groupBy("dispatching_base_num")
    .count()
    .orderBy(F.desc('count'))
    .limit(5)
)

In [36]:
# Print the top-5 dispatching bases.
most_frequent.show(n=20, truncate=True)

+--------------------+-----+
|dispatching_base_num|count|
+--------------------+-----+
|              B00856|35077|
|              B01312|33089|
|              B01145|31114|
|              B02794|30397|
|              B03016|29794|
+--------------------+-----+



# 4. Find the top 5 most common location pairs (`PULocationID`, `DOLocationID`)

In [39]:
# Top 5 (pickup zone, drop-off zone) pairs across green + yellow trips, ignoring
# rows where either zone is missing. Use the columns' real spelling
# (PULocationID / DOLocationID) rather than relying on Spark's case-insensitive
# name resolution.
common_location_pair = (
    df_trips_data
    .dropna(subset=['PULocationID', 'DOLocationID'])
    .groupBy('PULocationID', 'DOLocationID')
    .count()
    .sort('count', ascending=False)
    .limit(5)
)

In [40]:
# Print the top-5 location pairs.
common_location_pair.show(n=20, truncate=True)

+------------+------------+-----+
|PUlocationID|DOlocationID|count|
+------------+------------+-----+
|         237|         236|11455|
|         236|         237| 9909|
|         236|         236| 8844|
|         237|         237| 7324|
|         264|         264| 5827|
+------------+------------+-----+



# 5. Write all of the results to BigQuery tables

In [None]:
# Save the Q1 result. mode('overwrite') lets the cell be re-run once the table
# exists; the writer's default save mode raises on an existing table.
# NOTE(review): requires the spark-bigquery connector on the Spark classpath and,
# for the connector's default (indirect) write, a `temporaryGcsBucket` option —
# neither is configured in this notebook; confirm before running.
(
    df_taxi_trips.write
    .format('bigquery')
    .option('table', 'cleaver-1.taxi.answer_1')
    .mode('overwrite')
    .save()
)

In [None]:
# Save the Q2 result. mode('overwrite') lets the cell be re-run once the table
# exists; the writer's default save mode raises on an existing table.
# NOTE(review): requires the spark-bigquery connector on the Spark classpath and,
# for the connector's default (indirect) write, a `temporaryGcsBucket` option —
# neither is configured in this notebook; confirm before running.
(
    df_longest_trip.write
    .format('bigquery')
    .option('table', 'cleaver-1.taxi.answer_2')
    .mode('overwrite')
    .save()
)

In [None]:
# Save the Q3 result. mode('overwrite') lets the cell be re-run once the table
# exists; the writer's default save mode raises on an existing table.
# NOTE(review): requires the spark-bigquery connector on the Spark classpath and,
# for the connector's default (indirect) write, a `temporaryGcsBucket` option —
# neither is configured in this notebook; confirm before running.
(
    most_frequent.write
    .format('bigquery')
    .option('table', 'cleaver-1.taxi.answer_3')
    .mode('overwrite')
    .save()
)

In [None]:
# Save the Q4 result. mode('overwrite') lets the cell be re-run once the table
# exists; the writer's default save mode raises on an existing table.
# NOTE(review): requires the spark-bigquery connector on the Spark classpath and,
# for the connector's default (indirect) write, a `temporaryGcsBucket` option —
# neither is configured in this notebook; confirm before running.
(
    common_location_pair.write
    .format('bigquery')
    .option('table', 'cleaver-1.taxi.answer_4')
    .mode('overwrite')
    .save()
)