In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions

In [2]:
# A SparkSession is the entry point for the DataFrame and SQL calls below.
# Hive support is switched on on the builder before the session is obtained
# (presumably for the CREATE TABLE statements issued later - confirm).
session_builder = SparkSession.builder.enableHiveSupport()
spark = session_builder.getOrCreate()

In [3]:
# Load the trip records from the local Parquet file into a DataFrame.
parquet_path = "./data/tripdata.parquet"
df = spark.read.parquet(parquet_path)

In [4]:
# Parquet is a self-describing file format: the schema is stored in the file itself,
# which is why no schema was passed when reading it. Print the schema of the loaded DataFrame.
df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)



In [5]:
# Display only the first record. vertical=True prints one "column | value" line
# per field, which stays readable for a row with this many columns.
df.show(n=1, vertical=True)

-RECORD 0------------------------------------
 VendorID              | 1                   
 tpep_pickup_datetime  | 2024-05-01 00:59:15 
 tpep_dropoff_datetime | 2024-05-01 01:23:50 
 passenger_count       | 1                   
 trip_distance         | 6.1                 
 RatecodeID            | 1                   
 store_and_fwd_flag    | N                   
 PULocationID          | 138                 
 DOLocationID          | 145                 
 payment_type          | 1                   
 fare_amount           | 28.2                
 extra                 | 7.75                
 mta_tax               | 0.5                 
 tip_amount            | 5.0                 
 tolls_amount          | 0.0                 
 improvement_surcharge | 1.0                 
 total_amount          | 42.45               
 congestion_surcharge  | 0.0                 
 Airport_fee           | 1.75                
only showing top 1 row



In [6]:
# Total number of rows in the DataFrame.
# The bare expression is the last line of the cell, so its value is displayed as the cell output.
df.count()

3723833

In [7]:
# SQL statements can be used instead of the pyspark DataFrame functions.
# To make the DataFrame addressable from SQL, register it as a view first;
# the SQL queries below refer to it by the name "dataTable".
df.createOrReplaceTempView("dataTable")

In [8]:
# SQL statements are executed through spark.sql(), which returns a DataFrame.
# This one counts all rows of dataTable.
query = "SELECT count(*) FROM dataTable"
row_count_df = spark.sql(query)
row_count_df.show()

+--------+
|count(1)|
+--------+
| 3723833|
+--------+



### Average Revenue Per Vendor

In [9]:
# DataFrame API: mean total_amount for each VendorID.
revenue_by_vendor = df.select("VendorID", "total_amount").groupBy("VendorID")
revenue_by_vendor.avg("total_amount").show()

+--------+-----------------+
|VendorID|avg(total_amount)|
+--------+-----------------+
|       1|27.77309818844534|
|       2|28.56437778683938|
|       6|44.27804444444447|
+--------+-----------------+



In [10]:
# SQL: mean total_amount for each VendorID, read from the dataTable view.
query = "SELECT VendorID, avg(total_amount) FROM dataTable GROUP BY VendorID"
avg_revenue_df = spark.sql(query)
avg_revenue_df.show()

+--------+-----------------+
|VendorID|avg(total_amount)|
+--------+-----------------+
|       1|27.77309818844534|
|       2|28.56437778683938|
|       6|44.27804444444447|
+--------+-----------------+



### Trip Distance

In [11]:
# Mean trip distance over the whole data set.
# A DataFrame has no avg() method of its own, so df.avg('column') raises an error.
# Grouping is not required, though: df.agg(...) aggregates over the entire DataFrame
# (it is shorthand for df.groupBy().agg(...)), so no empty groupBy() is needed.
df.agg(functions.avg("trip_distance")).show()

+------------------+
|avg(trip_distance)|
+------------------+
| 5.367049832793204|
+------------------+



In [12]:
# SQL: mean trip distance across every row of dataTable.
query = "SELECT avg(trip_distance) FROM dataTable"
avg_distance_df = spark.sql(query)
avg_distance_df.show()

+------------------+
|avg(trip_distance)|
+------------------+
| 5.367049832793204|
+------------------+



In [13]:
# DataFrame API: mean trip distance for each VendorID.
(
    df.select("VendorID", "trip_distance")
    .groupBy("VendorID")
    .avg("trip_distance")
    .show()
)

+--------+------------------+
|VendorID|avg(trip_distance)|
+--------+------------------+
|       1|3.2256287155426717|
|       2| 6.046133441822597|
|       6|10.220799999999999|
+--------+------------------+



In [14]:
# SQL: mean trip distance for each VendorID.
query = 'SELECT VendorID, avg(trip_distance) FROM dataTable GROUP BY VendorID'
distance_by_vendor_df = spark.sql(query)
distance_by_vendor_df.show()

+--------+------------------+
|VendorID|avg(trip_distance)|
+--------+------------------+
|       1|3.2256287155426717|
|       2| 6.046133441822597|
|       6|10.220799999999999|
+--------+------------------+



### Pickup Location

#### Top 10 pickup locations

In [15]:
# Count the rows per PULocationID, then show the ten ids with the highest counts.
pickup_counts = df.select("PULocationID").groupBy("PULocationID").count()
pickup_counts.orderBy(functions.col("count").desc()).show(10)

+------------+------+
|PULocationID| count|
+------------+------+
|         237|183490|
|         161|174675|
|         236|168379|
|         132|167546|
|         138|127137|
|         162|125417|
|         142|124892|
|         230|121996|
|         186|114356|
|         163|107303|
+------------+------+
only showing top 10 rows



In [16]:
# SQL: the ten PULocationID values with the most rows.
# count(*) counts every row of a group, which is what groupBy().count() does in
# the DataFrame API; count(PULocationID) would skip NULL ids and therefore
# report 0 for a NULL group instead of its real size.
query = "SELECT PULocationID, count(*) as count FROM dataTable GROUP BY PULocationID ORDER BY count(*) desc LIMIT 10"
spark.sql(query).show()

+------------+------+
|PULocationID| count|
+------------+------+
|         237|183490|
|         161|174675|
|         236|168379|
|         132|167546|
|         138|127137|
|         162|125417|
|         142|124892|
|         230|121996|
|         186|114356|
|         163|107303|
+------------+------+



### Dropoff Location

In [17]:
# Count the rows per DOLocationID, then show the ten ids with the highest counts.
dropoff_counts = df.select("DOLocationID").groupBy("DOLocationID").count()
dropoff_counts.orderBy(functions.col("count").desc()).show(10)

+------------+------+
|DOLocationID| count|
+------------+------+
|         236|172072|
|         237|165143|
|         161|141823|
|         230|119124|
|         170|108171|
|         162|106543|
|         142|106372|
|         239|102809|
|          68| 95180|
|         141| 94888|
+------------+------+
only showing top 10 rows



In [18]:
# SQL: the ten DOLocationID values with the most rows.
# count(*) counts every row of a group, which is what groupBy().count() does in
# the DataFrame API; count(DOLocationID) would skip NULL ids and therefore
# report 0 for a NULL group instead of its real size.
query = "SELECT DOLocationID, count(*) as count FROM dataTable GROUP BY DOLocationID ORDER BY count(*) desc LIMIT 10"
spark.sql(query).show()

+------------+------+
|DOLocationID| count|
+------------+------+
|         236|172072|
|         237|165143|
|         161|141823|
|         230|119124|
|         170|108171|
|         162|106543|
|         142|106372|
|         239|102809|
|          68| 95180|
|         141| 94888|
+------------+------+



In [19]:
# Locations that appear in BOTH the top-10 pickup and the top-10 dropoff ranking.
# Each ranking is stored as a table (Top10PU / Top10DO) and the two tables are
# inner-joined on the location id.
query1 = "CREATE TABLE Top10PU AS SELECT PULocationID, count(PULocationID) as count FROM dataTable GROUP BY PULocationID ORDER BY count(PULocationID) desc LIMIT 10"
query2 = "CREATE TABLE Top10DO AS SELECT DOLocationID, count(DOLocationID) as count FROM dataTable GROUP BY DOLocationID ORDER BY count(DOLocationID) desc LIMIT 10"
query3 = "SELECT PULocationID as BusyLocationsID FROM Top10PU INNER JOIN Top10DO on Top10PU.PULocationID = Top10DO.DOLocationID"

# CREATE TABLE fails when the table already exists, so a second run of this cell
# (or a Restart & Run All against the same warehouse) would raise an error.
# Dropping any earlier copy first keeps the cell re-runnable.
spark.sql("DROP TABLE IF EXISTS Top10PU")
spark.sql("DROP TABLE IF EXISTS Top10DO")

spark.sql(query1)
spark.sql(query2)
spark.sql(query3).show()

++
||
++
++

++
||
++
++

+---------------+
|BusyLocationsID|
+---------------+
|            237|
|            161|
|            236|
|            162|
|            142|
|            230|
+---------------+

