In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types

In [2]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

22/10/29 12:49:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
spark.version


'3.0.1'

In [4]:
! wget https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2021-02.parquet

--2022-10-29 12:49:08--  https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2021-02.parquet
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 52.84.193.91, 52.84.193.3, 52.84.193.213, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|52.84.193.91|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 302633211 (289M) [application/x-www-form-urlencoded]
Saving to: ‘fhvhv_tripdata_2021-02.parquet.1’


2022-10-29 12:49:10 (176 MB/s) - ‘fhvhv_tripdata_2021-02.parquet.1’ saved [302633211/302633211]



In [5]:
schema = types.StructType([
    types.StructField('hvfhs_license_num', types.StringType(), True),
    types.StructField('dispatching_base_num', types.StringType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),
    types.StructField('dropoff_datetime', types.TimestampType(), True),
    types.StructField('PULocationID', types.IntegerType(), True),
    types.StructField('DOLocationID', types.IntegerType(), True),
    types.StructField('SR_Flag', types.StringType(), True)
])

In [6]:
df = spark.read.parquet('fhvhv_tripdata_2021-02.parquet')

[Stage 0:>                                                          (0 + 1) / 1]                                                                                

In [7]:
from pyspark.sql import functions as F

Q3: How many taxi trips were there on February 15?



In [8]:
df \
    .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \
    .filter("pickup_date = '2021-02-15'") \
    .count()

                                                                                

367170

In [9]:
df.registerTempTable('fhvhv_2021_02')

In [10]:
spark.sql("""
SELECT
    COUNT(1)
FROM 
    fhvhv_2021_02
WHERE
    to_date(pickup_datetime) = '2021-02-15';
""").show()



+--------+
|count(1)|
+--------+
|  367170|
+--------+



                                                                                

Q4: Longest trip for each day

In [11]:
df.columns

['hvfhs_license_num',
 'dispatching_base_num',
 'originating_base_num',
 'request_datetime',
 'on_scene_datetime',
 'pickup_datetime',
 'dropoff_datetime',
 'PULocationID',
 'DOLocationID',
 'trip_miles',
 'trip_time',
 'base_passenger_fare',
 'tolls',
 'bcf',
 'sales_tax',
 'congestion_surcharge',
 'airport_fee',
 'tips',
 'driver_pay',
 'shared_request_flag',
 'shared_match_flag',
 'access_a_ride_flag',
 'wav_request_flag',
 'wav_match_flag']

In [12]:
df \
    .withColumn('duration', df.dropoff_datetime.cast('long') - df.pickup_datetime.cast('long')) \
    .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \
    .groupBy('pickup_date') \
        .max('duration') \
    .orderBy('max(duration)', ascending=False) \
    .limit(5) \
    .show()

22/10/29 13:10:26 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

+-----------+-------------+
|pickup_date|max(duration)|
+-----------+-------------+
| 2021-02-11|        75540|
| 2021-02-17|        57221|
| 2021-02-20|        44039|
| 2021-02-03|        40653|
| 2021-02-19|        37577|
+-----------+-------------+





In [14]:
spark.sql("""
SELECT
    to_date(pickup_datetime) AS pickup_date,
    MAX((CAST(dropoff_datetime AS LONG) - CAST(pickup_datetime AS LONG))/60) AS duration
FROM 
    fhvhv_2021_02
GROUP BY
    1
ORDER BY
    2 DESC
LIMIT 10;     
    
""").show()



+-----------+-----------------+
|pickup_date|         duration|
+-----------+-----------------+
| 2021-02-11|           1259.0|
| 2021-02-17|953.6833333333333|
| 2021-02-20|733.9833333333333|
| 2021-02-03|           677.55|
| 2021-02-19|626.2833333333333|
| 2021-02-25|            583.5|
| 2021-02-18|576.8666666666667|
| 2021-02-10|569.4833333333333|
| 2021-02-21|           537.05|
| 2021-02-09|534.7833333333333|
+-----------+-----------------+





## Q5: Most frequent dispatching_base_num

How many stages this spark job has?

In [17]:
spark.sql("""
SELECT
    dispatching_base_num,
    COUNT(1)
FROM 
    fhvhv_2021_02
GROUP BY
    1
ORDER BY
    2 DESC
LIMIT 5;     
    
""").show()



+--------------------+--------+
|dispatching_base_num|count(1)|
+--------------------+--------+
|              B02510| 3233664|
|              B02764|  965568|
|              B02872|  882689|
|              B02875|  685390|
|              B02765|  559768|
+--------------------+--------+





In [18]:
df \
    .groupBy('dispatching_base_num') \
        .count() \
    .orderBy('count', ascending=False) \
    .limit(5) \
    .show()



+--------------------+-------+
|dispatching_base_num|  count|
+--------------------+-------+
|              B02510|3233664|
|              B02764| 965568|
|              B02872| 882689|
|              B02875| 685390|
|              B02765| 559768|
+--------------------+-------+



