## Import Library

In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
import collections
from pyspark.sql import functions as f

In [2]:
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [3]:
spark

In [4]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

In [5]:
# Mendownload dataset (Running di terminal wsl)
# !wget https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2021-02.parquet  # green_tripdata
# !wget https://d37ci6vzurychx.cloudfront.net/trip-data/fhv_tripdata_2021-02.parquet    #fhv_tripdata

## Load and View Dataset

In [6]:
# Meload file yang telah terdownload
df = spark.read \
        .option("header","true") \
        .parquet(r"D:\13. Iykra Data Felowship\Project\Project 6\green_tripdata_2021-02.parquet")      

df2 = spark.read \
        .option("header","true") \
        .parquet(r"D:\13. Iykra Data Felowship\Project\Project 6\fhv_tripdata_2021-02.parquet")

In [7]:
# Melakukan Quey SQL di Spark

df.createOrReplaceTempView('green')            # Cara2 : df.registerTempTable('taxi')
df2.createOrReplaceTempView('fhv')

In [8]:
# Show database green taxi trip records

spark.sql('select * from green').show(5)

+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|VendorID|lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|       2| 2021-02-01 07:34:03|  2021-02-01 07:51:58|                 N|       1.0|         130|         205|            5.0|         3.66|       14.0|  0.5|    0.

In [9]:
# Show data for hire vehicle trip records

spark.sql("select * from fhv").show(5)

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B00013|2021-02-01 07:01:00|2021-02-01 08:33:00|        null|        null|   null|                B00014|
|     B00021         |2021-02-01 07:55:40|2021-02-01 08:06:20|       173.0|        82.0|   null|       B00021         |
|     B00021         |2021-02-01 07:14:03|2021-02-01 07:28:37|       173.0|        56.0|   null|       B00021         |
|     B00021         |2021-02-01 07:27:48|2021-02-01 07:35:45|        82.0|       129.0|   null|       B00021         |
|              B00037|2021-02-01 07:12:50|2021-02-01 07:26:38|        null|       225.0|   null|                B00037|
+--------------------+------------------

## Problem

**1. How many taxi trips were there on February 15?**

In [10]:
df \
    .withColumn('lpep_pickup_datetime', f.to_date(df.lpep_pickup_datetime)) \
    .select("lpep_pickup_datetime").where("lpep_pickup_datetime=='2021-02-15'").count()     

1811

**2. Find the longest trip for each day ?**

In [11]:
trip = df \
    .withColumn('lpep_pickup_datetime', f.to_date(df.lpep_pickup_datetime)) \
    .select("lpep_pickup_datetime","trip_distance") \
    .groupBy("lpep_pickup_datetime") \
    .agg({'trip_distance':'max'}) \
    .orderBy('lpep_pickup_datetime').show()

+--------------------+------------------+
|lpep_pickup_datetime|max(trip_distance)|
+--------------------+------------------+
|          2009-01-01|               0.0|
|          2021-02-01|             27.52|
|          2021-02-02|              48.1|
|          2021-02-03|             36.33|
|          2021-02-04|         102620.98|
|          2021-02-05|             36.37|
|          2021-02-06|             38.75|
|          2021-02-07|              90.0|
|          2021-02-08|            5634.0|
|          2021-02-09|             34.64|
|          2021-02-10|           60382.7|
|          2021-02-11|          43174.56|
|          2021-02-12|          66659.27|
|          2021-02-13|             47.79|
|          2021-02-14|             58.03|
|          2021-02-15|             44.04|
|          2021-02-16|          16191.56|
|          2021-02-17|          16240.75|
|          2021-02-18|          29501.25|
|          2021-02-19|             34.95|
+--------------------+------------

**3. Find Top 5 Most frequent `dispatching_base_num` ?**

In [12]:
spark.sql("""
SELECT 
    `dispatching_base_num`,
    COUNT(`dispatching_base_num`) AS `Count`
FROM `fhv`
GROUP BY `dispatching_base_num`
ORDER BY `Count` DESC
LIMIT 5
""").show()

+--------------------+-----+
|dispatching_base_num|Count|
+--------------------+-----+
|              B00856|35077|
|              B01312|33089|
|              B01145|31114|
|              B02794|30397|
|              B03016|29794|
+--------------------+-----+



**4. Find Top 5 Most common location pairs (PUlocationID and DOlocationID) ?**

In [13]:
spark.sql("""
SELECT 
    PULocationID,DOLocationID, COUNT(*)
FROM fhv
GROUP BY PULocationID, DOLocationID
LIMIT 5;
""").show()

+------------+------------+--------+
|PULocationID|DOLocationID|count(1)|
+------------+------------+--------+
|        62.0|        85.0|       2|
|       169.0|       185.0|      29|
|       119.0|       229.0|       3|
|       246.0|       142.0|      11|
|         7.0|       193.0|      69|
+------------+------------+--------+

