In [1]:
print ('Hello World')

Hello World


In [2]:
!pip install pyspark



In [2]:
import pyspark
pyspark.__version__

'3.5.1'

**Mounting Drive and importing files**

In [3]:

# Mount Drive
from google.colab import drive
drive.mount('/content/drive/')

Drive already mounted at /content/drive/; to attempt to forcibly remount, call drive.mount("/content/drive/", force_remount=True).


In [4]:
# Check if files exist
import os
os.path.isfile('/content/drive/MyDrive/pyspark_training/yellow_tripdata_2025-01.parquet')

True

**Initiating SparkSession**

In [5]:
# Initiate SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark

PySparkRuntimeError: [JAVA_GATEWAY_EXITED] Java gateway process exited before sending its port number.

1. Install Java and Spark/PySpark Dependencies
Since Spark runs on the JVM, you must ensure that Java (typically OpenJDK 8) and the necessary PySpark libraries are installed.

In [6]:
# Install Java 8
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [7]:
# Initiate SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark

**Data formats and loading data**

Sources for the dataset:  
https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page  
For ths exercise was use the data from
"Taxi Zone Lookup Table (CSV)"   
and Trip Record Data Download Links 2025 January and February  
Inside Googel Drive the folder was created
MyDrive/pyspark_training/ to follow the informaation from the
https://www.linkedin.com/learning/pyspark-essential-training-introduction-to-building-data-pipelines/



In [8]:
# Read in data file
df = spark.read.parquet('/content/drive/MyDrive/pyspark_training/yellow_tripdata_2025-01.parquet')

In [9]:
# Show the top 5 rows of df
df.show(5)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|cbd_congestion_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|       1| 2025-01-01 00:18:38|  2025-01-01 00:26:59|              1|          1.6|         1|                 N|         229|    

In [10]:
display(df)

DataFrame[VendorID: int, tpep_pickup_datetime: timestamp_ntz, tpep_dropoff_datetime: timestamp_ntz, passenger_count: bigint, trip_distance: double, RatecodeID: bigint, store_and_fwd_flag: string, PULocationID: int, DOLocationID: int, payment_type: bigint, fare_amount: double, extra: double, mta_tax: double, tip_amount: double, tolls_amount: double, improvement_surcharge: double, total_amount: double, congestion_surcharge: double, Airport_fee: double, cbd_congestion_fee: double]

In [11]:
# Check rowcount in dataframe :How many Rows with have in the dataset
df.count()

3475226

In [12]:
# Show colum names : Names od the Columns and Types ot the data Types
df.schema.names

['VendorID',
 'tpep_pickup_datetime',
 'tpep_dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'RatecodeID',
 'store_and_fwd_flag',
 'PULocationID',
 'DOLocationID',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'congestion_surcharge',
 'Airport_fee',
 'cbd_congestion_fee']

In [13]:
# Show column names + datatypes : print the column_name, dataType and properties of the dataframe "df"
df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)
 |-- cbd_congestion_fee: double (nullable = true)



In [14]:
# Descriptive stats for a subset of columns
df.describe(['passenger_count', 'total_amount']).show()

+-------+------------------+------------------+
|summary|   passenger_count|      total_amount|
+-------+------------------+------------------+
|  count|           2935077|           3475226|
|   mean|1.2978589658806226|25.611291697295062|
| stddev| 0.750750275480471| 463.6584784502283|
|    min|                 0|            -901.0|
|    max|                 9|         863380.37|
+-------+------------------+------------------+



# **Basic querying: Selecting columns**

In [15]:
# Select a column
df.passenger_count

Column<'passenger_count'>

In [16]:
# Alternative notation to select a column
df['passenger_count']

Column<'passenger_count'>

In [17]:
# Alternative notation to select a column
df.select('passenger_count')

DataFrame[passenger_count: bigint]

In [18]:
# Select a column and display it
df.select('passenger_count').show()

+---------------+
|passenger_count|
+---------------+
|              1|
|              1|
|              1|
|              3|
|              3|
|              2|
|              0|
|              0|
|              0|
|              1|
|              1|
|              1|
|              3|
|              1|
|              1|
|              3|
|              1|
|              1|
|              1|
|              2|
+---------------+
only showing top 20 rows



In [19]:
# Alternative notation to select a column and display it
from pyspark.sql.functions import col
df.select(col('passenger_count')).show()

+---------------+
|passenger_count|
+---------------+
|              1|
|              1|
|              1|
|              3|
|              3|
|              2|
|              0|
|              0|
|              0|
|              1|
|              1|
|              1|
|              3|
|              1|
|              1|
|              3|
|              1|
|              1|
|              1|
|              2|
+---------------+
only showing top 20 rows



In [20]:
# Select multiple columns and display them
df.select('passenger_count',  'total_amount').show()

+---------------+------------+
|passenger_count|total_amount|
+---------------+------------+
|              1|        18.0|
|              1|       12.12|
|              1|        12.1|
|              3|         9.7|
|              3|         8.3|
|              2|        24.1|
|              0|       11.75|
|              0|        19.1|
|              0|        27.1|
|              1|        16.4|
|              1|        16.4|
|              1|       12.96|
|              3|        19.2|
|              1|        12.9|
|              1|        38.9|
|              3|        22.7|
|              1|       25.55|
|              1|       -8.54|
|              1|        12.2|
|              2|        20.6|
+---------------+------------+
only showing top 20 rows



# **Sorting dataframes**

In [21]:
# Sort df descending by the total_amount column
df.sort('total_amount', ascending=False).show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|cbd_congestion_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|       1| 2025-01-20 12:07:18|  2025-01-20 12:12:42|              1|          1.6|         1|                 N|         138|    

In [22]:
# Sort df by two columns in descending order
df.sort(['total_amount', 'passenger_count'], ascending=[False, False]).show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|cbd_congestion_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|       1| 2025-01-20 12:07:18|  2025-01-20 12:12:42|              1|          1.6|         1|                 N|         138|    

**Filtering data**

In [23]:
# Filter dataframe
df.filter('Airport_fee > 0').show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|cbd_congestion_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|       1| 2025-01-01 00:51:41|  2025-01-01 01:06:26|              1|          7.2|         1|                 N|         132|    

In [24]:
# Filter dataframe for two criteria
df.filter('Airport_fee > 0 and passenger_count > 2').show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|cbd_congestion_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|       2| 2025-01-01 00:36:08|  2025-01-01 00:54:52|              4|        11.86|         1|                 N|         138|    

In [None]:
# Filter the dataframe and retrieve only three columns
df.filter('Airport_fee > 0 and passenger_count > 2').select('VendorID', 'passenger_count', 'total_amount').show()

**Challenge solution: Querying a dataframe**

In [25]:
# Chaining filter, select, and sort and then displaying the result
df.filter('Airport_fee = 0 and passenger_count > 2')\
.select('VendorID', 'passenger_count', 'total_amount')\
.sort('passenger_count', ascending=False)\
.show()

+--------+---------------+------------+
|VendorID|passenger_count|total_amount|
+--------+---------------+------------+
|       2|              9|      111.32|
|       2|              9|       101.0|
|       2|              9|       107.5|
|       2|              8|       102.0|
|       2|              8|        86.0|
|       2|              8|        81.0|
|       2|              8|        81.0|
|       2|              7|       74.25|
|       2|              7|       100.5|
|       2|              6|       19.35|
|       2|              6|        22.2|
|       2|              6|       21.54|
|       2|              6|        42.0|
|       2|              6|       34.69|
|       1|              6|        21.3|
|       1|              6|        20.0|
|       2|              6|       23.69|
|       2|              6|       24.06|
|       2|              6|       23.88|
|       2|              6|       14.82|
+--------+---------------+------------+
only showing top 20 rows



**Overview: Essential PySpark Data Manipulation**  
In this chapter, you'll learn:  

How to handle missing data in PySpark DataFrames  
How to create new columns in a DataFrame and drop columns from a DataFrame  
How to union and join two DataFrames  
How to calculate aggregates such as counts and averages  
How to write data back to file  

# Handling missing data

In [26]:
# Import functions for finding missing data
from pyspark.sql.functions import col, isnull

In [27]:
# Count the number of NULL fare_amount values
df.filter(isnull(col('fare_amount'))).count()

0

In [28]:
# Count the number of NULL passenger_count values
df.filter(isnull(col('passenger_count'))).count()

540149

In [29]:
# Backfill all NULL fields with 1
df1 = df.fillna({'passenger_count': 1})

In [30]:
# Show df1
df1.show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|cbd_congestion_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|       1| 2025-01-01 00:18:38|  2025-01-01 00:26:59|              1|          1.6|         1|                 N|         229|    

In [31]:
#Check number of NULL passenger_count values again
df1.filter(isnull(col('passenger_count'))).count()

0

**Creating new dataframes and columns**

In [32]:
# Import timestamp function and rounding function
from pyspark.sql.functions import unix_timestamp, round

In [33]:
# Create a new column with the calculated trip duration in minutes
df1 = df.withColumn(
'trip_duration_minutes',
round(
(unix_timestamp('tpep_dropoff_datetime') - unix_timestamp('tpep_pickup_datetime') ) / 60,
1)
)

In [34]:
# Renaming columns
df2 = df1.select('tpep_pickup_datetime', 'tpep_dropoff_datetime', \
                 'trip_duration_minutes', 'PULocationID', 'DOLocationID', \
                 'passenger_count', 'fare_amount', 'Airport_fee', 'total_amount',).\
withColumnsRenamed({'tpep_pickup_datetime': 'pu_datetime',
                    'tpep_dropoff_datetime': 'do_datetime', \
                    'PULocationID': 'pu_location_id', \
                    'DOLocationID': 'do_location_id', \
                    'Airport_fee' : 'airport_fee'}
                   )

In [35]:
# Show df2
df2.show()

+-------------------+-------------------+---------------------+--------------+--------------+---------------+-----------+-----------+------------+
|        pu_datetime|        do_datetime|trip_duration_minutes|pu_location_id|do_location_id|passenger_count|fare_amount|airport_fee|total_amount|
+-------------------+-------------------+---------------------+--------------+--------------+---------------+-----------+-----------+------------+
|2025-01-01 00:18:38|2025-01-01 00:26:59|                  8.4|           229|           237|              1|       10.0|        0.0|        18.0|
|2025-01-01 00:32:40|2025-01-01 00:35:13|                  2.6|           236|           237|              1|        5.1|        0.0|       12.12|
|2025-01-01 00:44:04|2025-01-01 00:46:01|                  2.0|           141|           141|              1|        5.1|        0.0|        12.1|
|2025-01-01 00:14:27|2025-01-01 00:20:01|                  5.6|           244|           244|              3|        7

In [36]:
# Drop two columns - remember, df1 is immutable, so this only drops from the visible output
df1.drop('VendorID', 'RatecodeID').show()

+--------------------+---------------------+---------------+-------------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+---------------------+
|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|cbd_congestion_fee|trip_duration_minutes|
+--------------------+---------------------+---------------+-------------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+---------------------+
| 2025-01-01 00:18:38|  2025-01-01 00:26:59|              1|          1.6|                 N|         229|         237|     

**Unions and joins**

In [37]:
# Create new dataframe with February data
df_feb = spark.read.parquet('/content/drive/MyDrive/pyspark_training/yellow_tripdata_2025-02.parquet')

In [38]:
# Assign unioned dataframe to new dataframe
df_2025 = df.union(df_feb)

In [39]:
# Record count
df_2025.count()

7052769

In [40]:
# Load new dataset
taxi_zone_lookup = spark.read.option('header', 'true').csv('/content/drive/MyDrive/pyspark_training/taxi_zone_lookup.csv')

In [41]:
# Show new dataset
taxi_zone_lookup.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [42]:
# Join dataframes on the LocationID columns
df_joined = df_2025.join(taxi_zone_lookup, df_2025.PULocationID == taxi_zone_lookup.LocationID, 'left')
df_joined.show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+----------+---------+--------------------+------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|cbd_congestion_fee|LocationID|  Borough|                Zone|service_zone|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+----------+---------+

# **Aggregating**

In [43]:
# Calculate how often each payment type is used
df.groupBy('payment_type').count().sort('payment_type').show()

+------------+-------+
|payment_type|  count|
+------------+-------+
|           0| 540149|
|           1|2444393|
|           2| 390429|
|           3|  23773|
|           4|  76481|
|           5|      1|
+------------+-------+



# Data dictionary for taxi lookup
* 0 = Flex Fare trip
* 1 = Credit card payment
* 2 = Cash payment
* 3 = No charge
* 4 = Dispute
* 5 = Unknown
* 6 = Voided trip

In [44]:
# Get the average amount for each payment type
df.groupBy('payment_type').avg('total_amount').show()

+------------+------------------+
|payment_type| avg(total_amount)|
+------------+------------------+
|           5|               0.0|
|           1|28.079206040112336|
|           3| 6.145837294409612|
|           2| 21.74035097290552|
|           4|11.573446869157074|
|           0|20.085374294870405|
+------------+------------------+



In [45]:
# Alternative syntax for aggregates
from pyspark.sql.functions import avg
df.groupBy('payment_type').agg(avg('total_amount')).show()

+------------+------------------+
|payment_type| avg(total_amount)|
+------------+------------------+
|           5|               0.0|
|           1|28.079206040112336|
|           3| 6.145837294409612|
|           2| 21.74035097290552|
|           4|11.573446869157074|
|           0|20.085374294870405|
+------------+------------------+



In [46]:
# Rename the aggregate column
df.groupBy('payment_type').agg(avg('total_amount').alias('avg_amount')).show()

+------------+------------------+
|payment_type|        avg_amount|
+------------+------------------+
|           5|               0.0|
|           1|28.079206040112336|
|           3| 6.145837294409612|
|           2| 21.74035097290552|
|           4|11.573446869157074|
|           0|20.085374294870405|
+------------+------------------+



# **Writing dataframes to files**

In [47]:
# Create new dataframe from aggregate
avg_fare = df.groupBy('payment_type').agg(avg('total_amount')).sort('payment_type')

In [48]:
# Write dataframe to file
avg_fare.write.csv('/content/drive/MyDrive/pyspark_training/avg_fare', header=True, mode='overwrite')

# Challenge solution: Essential PySpark data manipulation

In [49]:
# Step 1: Read in data files
df_jan_2025 = spark.read.parquet('/content/drive/MyDrive/pyspark_training/yellow_tripdata_2025-01.parquet')
df_feb_2025 = spark.read.parquet('/content/drive/MyDrive/pyspark_training/yellow_tripdata_2025-02.parquet')

In [50]:
# Step 2: Union dataframes
df_2025_combined = df_jan_2025.union(df_feb_2025)

In [51]:
# Step 3: Select and rename columns
df_2025_combined = df_2025_combined.\
select('tpep_pickup_datetime', 'tpep_dropoff_datetime',\
       'PULocationID', 'DOLocationID', 'passenger_count',\
       'fare_amount', 'Airport_fee', 'total_amount', 'payment_type').\
withColumnsRenamed({
    'tpep_pickup_datetime': 'pu_datetime',\
    'tpep_dropoff_datetime': 'do_datetime',\
    'PULocationID': 'pu_location_id',\
    'DOLocationID': 'do_location_id',\
    'Airport_fee' : 'airport_fee'})


In [52]:
# Step 4: Create a new dataframe with the zone lookup table:
taxi_zones = spark.read.option('header', 'true')\
  .csv('/content/drive/MyDrive/pyspark_training/taxi_zone_lookup.csv')

In [53]:
#Step 5: Join the dataframes on the location IDs to get the dropoff boro:
df_2025_combined = df_2025_combined.join(taxi_zones, df_2025_combined.do_location_id == taxi_zones.LocationID, 'left')

In [54]:
# Step 6: Drop and rename columns:
df_2025_combined = df_2025_combined.drop('LocationID', 'Zone', 'service_zone')\
.withColumnsRenamed({'Borough': 'pu_boro'})

In [55]:
# Step 7: Show the resulting dataframe:
df_2025_combined.show()

+-------------------+-------------------+--------------+--------------+---------------+-----------+-----------+------------+------------+---------+
|        pu_datetime|        do_datetime|pu_location_id|do_location_id|passenger_count|fare_amount|airport_fee|total_amount|payment_type|  pu_boro|
+-------------------+-------------------+--------------+--------------+---------------+-----------+-----------+------------+------------+---------+
|2025-01-01 00:18:38|2025-01-01 00:26:59|           229|           237|              1|       10.0|        0.0|        18.0|           1|Manhattan|
|2025-01-01 00:32:40|2025-01-01 00:35:13|           236|           237|              1|        5.1|        0.0|       12.12|           1|Manhattan|
|2025-01-01 00:44:04|2025-01-01 00:46:01|           141|           141|              1|        5.1|        0.0|        12.1|           1|Manhattan|
|2025-01-01 00:14:27|2025-01-01 00:20:01|           244|           244|              3|        7.2|        0.0| 

# **PySpark SQL**

In [56]:
# Read in the taxi file
taxi = spark.read.parquet('/content/drive/MyDrive/pyspark_training/yellow_tripdata_2025-01.parquet')

In [57]:
# Create a temporary view
taxi.createOrReplaceTempView('taxi')

In [58]:
# Select rows where the total_amount is greater than 50
spark.sql('SELECT * FROM taxi WHERE total_amount > 50').show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|cbd_congestion_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|       2| 2025-01-01 00:15:41|  2025-01-01 01:03:03|              4|         3.05|         1|                 N|         114|    

In [59]:
# Chain a SQL query with a filter and select and display the result
spark.sql('SELECT * FROM taxi WHERE total_amount > 50')\
  .filter('passenger_count > 2')\
  .select('payment_type', 'passenger_count',  'total_amount')\
  .show()

+------------+---------------+------------+
|payment_type|passenger_count|total_amount|
+------------+---------------+------------+
|           1|              4|       50.76|
|           2|              9|      111.32|
|           1|              3|       56.38|
|           1|              4|        58.3|
|           1|              3|       51.55|
|           1|              4|       91.19|
|           2|              3|        59.1|
|           1|              4|       61.27|
|           3|              4|      123.44|
|           1|              3|       59.09|
|           2|              3|       192.4|
|           1|              4|       62.52|
|           1|              3|      151.35|
|           1|              3|       88.92|
|           1|              4|       100.0|
|           1|              4|      115.05|
|           2|              4|       80.94|
|           1|              3|        58.1|
|           1|              3|       62.49|
|           1|              3|  

In [61]:
# Write a query as triple-quoted text string
query = '''
SELECT
	payment_type,
	passenger_count,
	total_amount
FROM taxi
WHERE
	total_amount > 50
	AND
	passenger_count > 2
'''

In [62]:
# Execute the query and display results
spark.sql(query).show()

+------------+---------------+------------+
|payment_type|passenger_count|total_amount|
+------------+---------------+------------+
|           1|              4|       50.76|
|           2|              9|      111.32|
|           1|              3|       56.38|
|           1|              4|        58.3|
|           1|              3|       51.55|
|           1|              4|       91.19|
|           2|              3|        59.1|
|           1|              4|       61.27|
|           3|              4|      123.44|
|           1|              3|       59.09|
|           2|              3|       192.4|
|           1|              4|       62.52|
|           1|              3|      151.35|
|           1|              3|       88.92|
|           1|              4|       100.0|
|           1|              4|      115.05|
|           2|              4|       80.94|
|           1|              3|        58.1|
|           1|              3|       62.49|
|           1|              3|  

**Challenge solution: PySpark SQL**

In [63]:
# Step 1: Load the taxi ride data and create a temporary view:

taxi_jan2025 = spark.read.parquet('/content/drive/MyDrive/pyspark_training/yellow_tripdata_2025-01.parquet')
taxi_jan2025.createOrReplaceTempView('taxi_jan2025')

In [64]:
# Step 2: Load the lookup data and create a temporary view:

taxi_lookup = spark.read.option('header', 'true').csv('/content/drive/MyDrive/pyspark_training/taxi_zone_lookup.csv')
taxi_lookup.createOrReplaceTempView('taxi_lookup')

In [65]:
# Step 3: Join the two tables using SQL and assign the result to a dataframe called joined_df:

query = '''
SELECT
  DOLocationID,
  Borough,
  total_amount
FROM taxi_jan2025
LEFT JOIN taxi_lookup
  ON DOLocationID = LocationID
'''

joined_df = spark.sql(query)

In [66]:
# Step 4: Group the dataframe by Borough, calculate the average total fare amount,
# alias the average amount column, and display the result:

joined_df.groupBy('Borough').agg(avg('total_amount').alias('avg_amount')).show()

+-------------+------------------+
|      Borough|        avg_amount|
+-------------+------------------+
|       Queens| 51.61163036538719|
|          EWR|123.16705659828357|
|      Unknown|25.926336005344027|
|     Brooklyn| 42.40119152829651|
|Staten Island| 88.12902995720401|
|          N/A|107.62149760052951|
|    Manhattan|22.818652849443495|
|        Bronx| 42.89454769447338|
+-------------+------------------+



**End of the notebook!  **  
**by Carlos Araque : 10/11/2025**