# Initial setup

In [1]:
!pip install pyspark



In [2]:
# Confirm that PySpark is importable and display the installed version
import pyspark
pyspark.__version__

'3.5.5'

# Mounting Drive and importing files

In [5]:
# Google Colab only (left disabled): uncomment the two statements below to
# mount Google Drive at /content/drive/ when running on Colab.
# # Mount Drive
# from google.colab import drive
# drive.mount('/content/drive/')

Mounted at /content/drive/


In [6]:
# Google Colab only (left disabled): verifies that the Parquet file is present
# on the mounted Drive. The notebook later reads the file from a relative path.
# # Check if files exist
# import os
# os.path.isfile('/content/drive/MyDrive/pyspark_training/yellow_tripdata_2025-01.parquet')

True

In [4]:
# Set the SPARK_LOCAL_IP environment variable to the loopback address.
# NOTE(review): presumably so a local Spark session binds to localhost — confirm.
%env SPARK_LOCAL_IP=127.0.0.1

env: SPARK_LOCAL_IP=127.0.0.1


# Initiating SparkSession

In [5]:
# Obtain the SparkSession: reuse an existing one or create it if none exists,
# then display it as the cell output.
from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .getOrCreate()
)
spark

# Loading & inspecting data

In [6]:
# Read the trip-record Parquet file into a DataFrame.
# Parquet stores its schema (column names and types) in the file itself, so
# the CSV-style `header` option does not apply here and has been dropped.
df = spark.read.parquet('./yellow_tripdata_2025-01.parquet')

                                                                                

In [7]:
# Preview the first five rows of the dataframe
df.show(n=5)

                                                                                

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|cbd_congestion_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|       1| 2025-01-01 00:18:38|  2025-01-01 00:26:59|              1|          1.6|         1|                 N|         229|    

In [8]:
# Count the total number of rows in the dataframe
df.count()

3475226

In [9]:
# List the column names of the dataframe
df.columns

['VendorID',
 'tpep_pickup_datetime',
 'tpep_dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'RatecodeID',
 'store_and_fwd_flag',
 'PULocationID',
 'DOLocationID',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'congestion_surcharge',
 'Airport_fee',
 'cbd_congestion_fee']

In [13]:
# Print the schema as a tree: each column's name, data type and nullability
df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)
 |-- cbd_congestion_fee: double (nullable = true)



In [14]:
# Summary statistics (count, mean, stddev, min, max) for two selected columns
df.describe('passenger_count', 'total_amount').show()

+-------+------------------+------------------+
|summary|   passenger_count|      total_amount|
+-------+------------------+------------------+
|  count|           2935077|           3475226|
|   mean|1.2978589658806226|25.611291697295062|
| stddev| 0.750750275480471| 463.6584784502283|
|    min|                 0|            -901.0|
|    max|                 9|         863380.37|
+-------+------------------+------------------+



# Selecting columns

In [15]:
# Attribute access yields a Column object (a reference to the column), not its data
df.passenger_count

Column<'passenger_count'>

In [16]:
# Bracket access with the column name yields the same kind of Column object
df['passenger_count']

Column<'passenger_count'>

In [17]:
# select() returns a new DataFrame holding only the requested column;
# without show() only its repr (column name and type) is displayed
df.select('passenger_count')

DataFrame[passenger_count: bigint]

In [18]:
# Chain show() to print the selected column's rows
df.select('passenger_count').show()

+---------------+
|passenger_count|
+---------------+
|              1|
|              1|
|              1|
|              3|
|              3|
|              2|
|              0|
|              0|
|              0|
|              1|
|              1|
|              1|
|              3|
|              1|
|              1|
|              3|
|              1|
|              1|
|              1|
|              2|
+---------------+
only showing top 20 rows



In [19]:
# col() builds a Column from a column name; select() accepts it in place of a plain string
from pyspark.sql.functions import col
df.select(col('passenger_count')).show()

+---------------+
|passenger_count|
+---------------+
|              1|
|              1|
|              1|
|              3|
|              3|
|              2|
|              0|
|              0|
|              0|
|              1|
|              1|
|              1|
|              3|
|              1|
|              1|
|              3|
|              1|
|              1|
|              1|
|              2|
+---------------+
only showing top 20 rows



In [20]:
# Select several columns at once by passing each name as its own argument
df.select(
    'passenger_count',
    'total_amount',
).show()

+---------------+------------+
|passenger_count|total_amount|
+---------------+------------+
|              1|        18.0|
|              1|       12.12|
|              1|        12.1|
|              3|         9.7|
|              3|         8.3|
|              2|        24.1|
|              0|       11.75|
|              0|        19.1|
|              0|        27.1|
|              1|        16.4|
|              1|        16.4|
|              1|       12.96|
|              3|        19.2|
|              1|        12.9|
|              1|        38.9|
|              3|        22.7|
|              1|       25.55|
|              1|       -8.54|
|              1|        12.2|
|              2|        20.6|
+---------------+------------+
only showing top 20 rows



# Sorting dataframes

In [21]:
# Sort by total_amount in descending order (largest totals first)
df.sort('total_amount', ascending=False).show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|cbd_congestion_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|       1| 2025-01-20 12:07:18|  2025-01-20 12:12:42|              1|          1.6|         1|                 N|         138|    

In [22]:
# Sort by total_amount, then passenger_count, both in descending order
df.sort('total_amount', 'passenger_count', ascending=False).show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|cbd_congestion_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|       1| 2025-01-20 12:07:18|  2025-01-20 12:12:42|              1|          1.6|         1|                 N|         138|    

# Filtering data

In [23]:
# filter() accepts a SQL expression string: keep only rows with a positive Airport_fee
df.filter('Airport_fee > 0').show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|cbd_congestion_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|       1| 2025-01-01 00:51:41|  2025-01-01 01:06:26|              1|          7.2|         1|                 N|         132|    

In [24]:
# Combine conditions with `and` inside the SQL expression string
df.filter('Airport_fee > 0 and passenger_count > 2').show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|cbd_congestion_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|       2| 2025-01-01 00:36:08|  2025-01-01 00:54:52|              4|        11.86|         1|                 N|         138|    

In [25]:
# Filter the rows first, then keep only the three columns of interest
(
    df
    .filter('Airport_fee > 0 and passenger_count > 2')
    .select('VendorID', 'passenger_count', 'total_amount')
    .show()
)

+--------+---------------+------------+
|VendorID|passenger_count|total_amount|
+--------+---------------+------------+
|       2|              4|       91.19|
|       2|              3|       40.39|
|       2|              4|       61.27|
|       2|              3|      151.35|
|       2|              3|       29.35|
|       2|              3|       32.81|
|       2|              3|       88.92|
|       1|              3|       62.49|
|       1|              4|       25.75|
|       2|              4|       25.37|
|       2|              4|       92.69|
|       2|              4|       83.39|
|       2|              4|      102.25|
|       1|              3|       96.19|
|       2|              3|       62.16|
|       2|              3|       31.75|
|       2|              3|       69.05|
|       1|              4|       69.63|
|       2|              3|       47.16|
|       2|              4|       82.69|
+--------+---------------+------------+
only showing top 20 rows



# Challenge solution: Querying a dataframe

In [10]:
# Rows with no airport fee and more than two passengers, reduced to three
# columns and ordered by passenger_count from highest to lowest
(
    df
    .filter('Airport_fee = 0 and passenger_count > 2')
    .select('VendorID', 'passenger_count', 'total_amount')
    .sort('passenger_count', ascending=False)
    .show()
)

+--------+---------------+------------+
|VendorID|passenger_count|total_amount|
+--------+---------------+------------+
|       2|              9|       107.5|
|       2|              9|      111.32|
|       2|              9|       101.0|
|       2|              8|       102.0|
|       2|              8|        86.0|
|       2|              8|        81.0|
|       2|              8|        81.0|
|       2|              7|       100.5|
|       2|              7|       74.25|
|       2|              6|        22.2|
|       2|              6|       11.25|
|       2|              6|        42.0|
|       2|              6|       19.35|
|       1|              6|        21.3|
|       2|              6|       18.85|
|       2|              6|       23.69|
|       2|              6|       21.54|
|       2|              6|       23.88|
|       2|              6|       11.76|
|       2|              6|       15.48|
+--------+---------------+------------+
only showing top 20 rows

