In [1]:
import os
import requests
import pyspark
pyspark.__file__

import warnings
warnings.filterwarnings("ignore")

### Download data using requests

In [3]:
data_yellow = requests.get('https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2021-02.parquet')
data_green = requests.get('https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2021-02.parquet')
data_fhv = requests.get('https://d37ci6vzurychx.cloudfront.net/trip-data/fhv_tripdata_2021-02.parquet')
data_hvfhv = requests.get('https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2021-02.parquet')

In [4]:
# download yellow trip data
with open(os.path.join(os.getcwd(),'yellow_tripdata_2021-02.parquet'), 'wb') as f:
    f.write(data_yellow.content)
    f.close()

In [5]:
# download green trip data
with open(os.path.join(os.getcwd(),'green_tripdata_2021-02.parquet'), 'wb') as f:
    f.write(data_green.content)
    f.close()

In [6]:
# download fhv trip data
with open(os.path.join(os.getcwd(),'fhv_tripdata_2021-02.parquet'), 'wb') as f:
    f.write(data_fhv.content)
    f.close()

In [7]:
# download hvfhv trip data
with open(os.path.join(os.getcwd(),'fhvhv_tripdata_2021-02.parquet'), 'wb') as f:
    f.write(data_hvfhv.content)
    f.close()

### Starting SparkSession

In [2]:
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

In [4]:
df_green = spark.read.parquet(os.path.join(os.getcwd(),'green_tripdata_2021-02.parquet'))

In [5]:
df_green = df_green \
    .withColumnRenamed('lpep_pickup_datetime', 'pickup_datetime') \
    .withColumnRenamed('lpep_dropoff_datetime', 'dropoff_datetime')

In [6]:
df_yellow = spark.read.parquet(os.path.join(os.getcwd(), 'yellow_tripdata_2021-02.parquet'))

In [7]:
df_yellow = df_yellow \
    .withColumnRenamed('tpep_pickup_datetime', 'pickup_datetime') \
    .withColumnRenamed('tpep_dropoff_datetime', 'dropoff_datetime')

In [8]:
common_colums = []

yellow_columns = set(df_yellow.columns)

for col in df_green.columns:
    if col in yellow_columns:
        common_colums.append(col)

In [9]:
from pyspark.sql import functions as F

In [10]:
df_green_sel = df_green \
    .select(common_colums) \
    .withColumn('service_type', F.lit('green'))

In [11]:
df_yellow_sel = df_yellow \
    .select(common_colums) \
    .withColumn('service_type', F.lit('yellow'))

In [12]:
df_trips_data = df_green_sel.unionAll(df_yellow_sel)

In [13]:
df_trips_data.groupBy('service_type').count().show()

+------------+-------+
|service_type|  count|
+------------+-------+
|       green|  64572|
|      yellow|1371709|
+------------+-------+



In [14]:
df_trips_data.columns

['VendorID',
 'pickup_datetime',
 'dropoff_datetime',
 'store_and_fwd_flag',
 'RatecodeID',
 'PULocationID',
 'DOLocationID',
 'passenger_count',
 'trip_distance',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'payment_type',
 'congestion_surcharge',
 'service_type']

In [15]:
df_trips_data.registerTempTable('trips_data')

In [16]:
spark.sql("""
SELECT
    service_type,
    count(1)
FROM
    trips_data
GROUP BY 
    service_type
""").show()

+------------+--------+
|service_type|count(1)|
+------------+--------+
|       green|   64572|
|      yellow| 1371709|
+------------+--------+



In [17]:
df_result = spark.sql("""
SELECT 
    -- Reveneue grouping 
    PULocationID AS revenue_zone,
    date_trunc('month', pickup_datetime) AS revenue_month, 
    service_type, 

    -- Revenue calculation 
    SUM(fare_amount) AS revenue_monthly_fare,
    SUM(extra) AS revenue_monthly_extra,
    SUM(mta_tax) AS revenue_monthly_mta_tax,
    SUM(tip_amount) AS revenue_monthly_tip_amount,
    SUM(tolls_amount) AS revenue_monthly_tolls_amount,
    SUM(improvement_surcharge) AS revenue_monthly_improvement_surcharge,
    SUM(total_amount) AS revenue_monthly_total_amount,
    SUM(congestion_surcharge) AS revenue_monthly_congestion_surcharge,

    -- Additional calculations
    AVG(passenger_count) AS avg_montly_passenger_count,
    AVG(trip_distance) AS avg_montly_trip_distance
FROM
    trips_data
GROUP BY
    1, 2, 3
""")

In [18]:
# df_result.coalesce(1).write.parquet('data/report/revenue/', mode='overwrite')

#### 1. How many taxi trips were there on February 15?

In [19]:
df_green.registerTempTable('trips_data_green')
df_yellow.registerTempTable('trips_data_yellow')

In [20]:
# Green taxi data

spark.sql("""
SELECT
    VendorID,
    count(1) as total_trip
FROM
    trips_data_green
WHERE
    pickup_datetime>='2021-02-15 00:00:00' and pickup_datetime<='2021-02-16 23:59:59'
GROUP BY
    VendorID
""").show()

+--------+----------+
|VendorID|total_trip|
+--------+----------+
|       1|       450|
|       2|      4005|
+--------+----------+



In [21]:
# Yellow taxi data

spark.sql("""
SELECT
    VendorID,
    count(1) as total_trip
FROM
    trips_data_yellow
WHERE
    pickup_datetime>='2021-02-15 00:00:00' and pickup_datetime<='2021-02-15 23:59:59'
GROUP BY
    VendorID
""").show()

+--------+----------+
|VendorID|total_trip|
+--------+----------+
|       6|       314|
|       1|     13599|
|       2|     29773|
+--------+----------+



In [22]:
# yellow and green taxi data

spark.sql("""
SELECT
    service_type,
    count(1) as total_trip
FROM
    trips_data
WHERE
    pickup_datetime>='2021-02-15 00:00:00' and pickup_datetime<='2021-02-15 23:59:59'
GROUP BY
    service_type
""").show()

+------------+----------+
|service_type|total_trip|
+------------+----------+
|       green|      1811|
|      yellow|     43686|
+------------+----------+



#### 2. Find the longest trip for each day ?

In [23]:
from pyspark.sql.functions import *

In [25]:
fixed_date_green = df_green.select("*").withColumn("fixed_date", date_format("pickup_datetime", "yyyy-MM-dd"))
fixed_date_yellow = df_yellow.select("*").withColumn("fixed_date", date_format("pickup_datetime", "yyyy-MM-dd"))
fixed_date_agg = df_trips_data.select("*").withColumn("fixed_date", date_format("pickup_datetime", "yyyy-MM-dd"))

In [26]:
fixed_date_green.registerTempTable('trips_data_green_fixed')
fixed_date_yellow.registerTempTable('trips_data_yellow_fixed')
fixed_date_agg.registerTempTable('trips_data_agg_fixed')

In [27]:
# green taxi data

spark.sql("""
SELECT
    fixed_date,
    max(trip_distance) as longest_trip
FROM
    trips_data_green_fixed
GROUP BY
    fixed_date
ORDER BY
    fixed_date ASC
""").show()

+----------+------------+
|fixed_date|longest_trip|
+----------+------------+
|2009-01-01|         0.0|
|2021-02-01|       27.52|
|2021-02-02|        48.1|
|2021-02-03|       36.33|
|2021-02-04|   102620.98|
|2021-02-05|       36.37|
|2021-02-06|       38.75|
|2021-02-07|        90.0|
|2021-02-08|      5634.0|
|2021-02-09|       34.64|
|2021-02-10|     60382.7|
|2021-02-11|    43174.56|
|2021-02-12|    66659.27|
|2021-02-13|       47.79|
|2021-02-14|       58.03|
|2021-02-15|       44.04|
|2021-02-16|    16191.56|
|2021-02-17|    16240.75|
|2021-02-18|    29501.25|
|2021-02-19|       34.95|
+----------+------------+
only showing top 20 rows



In [28]:
# yellow taxi data

spark.sql("""
SELECT
    fixed_date,
    max(trip_distance) as longest_trip
FROM
    trips_data_yellow_fixed
GROUP BY
    fixed_date
ORDER BY
    fixed_date ASC
""").show()

+----------+------------+
|fixed_date|longest_trip|
+----------+------------+
|2009-01-01|        2.89|
|2009-01-02|        0.84|
|2021-02-01|       38.89|
|2021-02-02|       73.24|
|2021-02-03|   186079.73|
|2021-02-04|       82.19|
|2021-02-05|    91134.16|
|2021-02-06|       48.35|
|2021-02-07|   186510.67|
|2021-02-08|   186617.92|
|2021-02-09|    89416.24|
|2021-02-10|       99.96|
|2021-02-11|        54.4|
|2021-02-12|    34346.07|
|2021-02-13|    54381.65|
|2021-02-14|   115928.92|
|2021-02-15|       52.89|
|2021-02-16|   221188.25|
|2021-02-17|   140145.44|
|2021-02-18|       900.0|
+----------+------------+
only showing top 20 rows



In [29]:
# Green and Yellow taxi data

spark.sql("""
SELECT
    fixed_date,
    max(trip_distance) as longest_trip
FROM
    trips_data_agg_fixed
GROUP BY
    fixed_date
ORDER BY
    fixed_date ASC
""").show()

+----------+------------+
|fixed_date|longest_trip|
+----------+------------+
|2009-01-01|        2.89|
|2009-01-02|        0.84|
|2021-02-01|       38.89|
|2021-02-02|       73.24|
|2021-02-03|   186079.73|
|2021-02-04|   102620.98|
|2021-02-05|    91134.16|
|2021-02-06|       48.35|
|2021-02-07|   186510.67|
|2021-02-08|   186617.92|
|2021-02-09|    89416.24|
|2021-02-10|     60382.7|
|2021-02-11|    43174.56|
|2021-02-12|    66659.27|
|2021-02-13|    54381.65|
|2021-02-14|   115928.92|
|2021-02-15|       52.89|
|2021-02-16|   221188.25|
|2021-02-17|   140145.44|
|2021-02-18|    29501.25|
+----------+------------+
only showing top 20 rows



#### 3. Find Top 5 Most frequent `dispatching_base_num` ?

In [30]:
df_fhv = spark.read.parquet(os.path.join(os.getcwd(),'fhv_tripdata_2021-02.parquet'))

In [31]:
df_fhv.registerTempTable('data_fhv')

In [32]:
spark.sql("""
SELECT 
    distinct(dispatching_base_num),
    count(1) as total
FROM
    data_fhv
GROUP BY
    dispatching_base_num
ORDER BY
    total DESC
LIMIT 5
""").show()

+--------------------+-----+
|dispatching_base_num|total|
+--------------------+-----+
|              B00856|35077|
|              B01312|33089|
|              B01145|31114|
|              B02794|30397|
|              B03016|29794|
+--------------------+-----+



#### 4. Find Top 5 Most common location pairs (PUlocationID and DOlocationID) ?

In [33]:
# Green taxi data

spark.sql("""
SELECT
    PUlocationID,
    DOlocationID,
    count(1) as total_pair
from
    trips_data_green
GROUP BY
    PUlocationID,
    DOlocationID
ORDER BY
    total_pair DESC
LIMIT 5
""").show()

+------------+------------+----------+
|PUlocationID|DOlocationID|total_pair|
+------------+------------+----------+
|          74|          75|       994|
|          75|          74|       949|
|          74|          74|       651|
|          41|          42|       535|
|          74|          42|       497|
+------------+------------+----------+



In [34]:
# Yellow taxi data

spark.sql("""
SELECT
    PUlocationID,
    DOlocationID,
    count(1) as total_pair
from
    trips_data_yellow
GROUP BY
    PUlocationID,
    DOlocationID
ORDER BY
    total_pair DESC
LIMIT 5
""").show()

+------------+------------+----------+
|PUlocationID|DOlocationID|total_pair|
+------------+------------+----------+
|         237|         236|     11455|
|         236|         237|      9901|
|         236|         236|      8819|
|         237|         237|      7324|
|         264|         264|      5732|
+------------+------------+----------+



In [35]:
# Green and Yellow taxi data

spark.sql("""
SELECT
    PUlocationID,
    DOlocationID,
    count(1) as total_pair
from
    trips_data
GROUP BY
    PUlocationID,
    DOlocationID
ORDER BY
    total_pair DESC
LIMIT 5
""").show()

+------------+------------+----------+
|PUlocationID|DOlocationID|total_pair|
+------------+------------+----------+
|         237|         236|     11455|
|         236|         237|      9909|
|         236|         236|      8844|
|         237|         237|      7324|
|         264|         264|      5827|
+------------+------------+----------+



#### 5. Write all of the result to BigQuery table (additional - point plus)
This is not done yet (I'll add it when I already figured it out about Connection to BigQuery

In [36]:
spark.sql('''
select
    count(pickup_datetime) as count
from trips_data_green
where
    pickup_datetime >= '2021-02-15' and pickup_datetime < '2021-02-16'
''').show()

+-----+
|count|
+-----+
| 1811|
+-----+



In [37]:
from pyspark.sql import types

In [38]:
green_schema = types.StructType([
    types.StructField("VendorID", types.IntegerType(), True),
    types.StructField("lpep_pickup_datetime", types.TimestampType(), True),
    types.StructField("lpep_dropoff_datetime", types.TimestampType(), True),
    types.StructField("store_and_fwd_flag", types.StringType(), True),
    types.StructField("RatecodeID", types.IntegerType(), True),
    types.StructField("PULocationID", types.IntegerType(), True),
    types.StructField("DOLocationID", types.IntegerType(), True),
    types.StructField("passenger_count", types.IntegerType(), True),
    types.StructField("trip_distance", types.DoubleType(), True),
    types.StructField("fare_amount", types.DoubleType(), True),
    types.StructField("extra", types.DoubleType(), True),
    types.StructField("mta_tax", types.DoubleType(), True),
    types.StructField("tip_amount", types.DoubleType(), True),
    types.StructField("tolls_amount", types.DoubleType(), True),
    types.StructField("ehail_fee", types.DoubleType(), True),
    types.StructField("improvement_surcharge", types.DoubleType(), True),
    types.StructField("total_amount", types.DoubleType(), True),
    types.StructField("payment_type", types.IntegerType(), True),
    types.StructField("trip_type", types.IntegerType(), True),
    types.StructField("congestion_surcharge", types.DoubleType(), True)
])

yellow_schema = types.StructType([
    types.StructField("VendorID", types.IntegerType(), True),
    types.StructField("tpep_pickup_datetime", types.TimestampType(), True),
    types.StructField("tpep_dropoff_datetime", types.TimestampType(), True),
    types.StructField("passenger_count", types.IntegerType(), True),
    types.StructField("trip_distance", types.DoubleType(), True),
    types.StructField("RatecodeID", types.IntegerType(), True),
    types.StructField("store_and_fwd_flag", types.StringType(), True),
    types.StructField("PULocationID", types.IntegerType(), True),
    types.StructField("DOLocationID", types.IntegerType(), True),
    types.StructField("payment_type", types.IntegerType(), True),
    types.StructField("fare_amount", types.DoubleType(), True),
    types.StructField("extra", types.DoubleType(), True),
    types.StructField("mta_tax", types.DoubleType(), True),
    types.StructField("tip_amount", types.DoubleType(), True),
    types.StructField("tolls_amount", types.DoubleType(), True),
    types.StructField("improvement_surcharge", types.DoubleType(), True),
    types.StructField("total_amount", types.DoubleType(), True),
    types.StructField("congestion_surcharge", types.DoubleType(), True)
])

In [39]:
spark.stop()