In [2]:
import os
import sys

# --- FORCE JAVA 17 ---
# Override any inherited JAVA_HOME so this notebook process points at the
# Java 17 installation (must be set before the Spark session is created).
os.environ.update({"JAVA_HOME": "/usr/lib/jvm/java-17-openjdk-amd64"})
# ---------------------

from pyspark.sql import SparkSession

# 1. Initialize Spark Session
# Build (or reuse) a local session that uses every available core and gives
# the driver 2 GB of memory. `spark` and `sc` are used by the code that follows.
print("Initializing Spark...")
try:
    spark = (
        SparkSession.builder
        .appName("HW2_Task1_RDD")
        .master("local[*]")
        .config("spark.driver.memory", "2g")
        .getOrCreate()
    )

    sc = spark.sparkContext
    sc.setLogLevel("ERROR")  # only show errors from here on

    print("✅ Spark Initialized Successfully!")
    print(f"Spark Version: {spark.version}")

except Exception as e:
    print("❌ Error still occurring. Please read below:")
    print(e)
    # Nothing below can work without `spark`. Re-raise so execution stops here
    # instead of continuing and failing later with a misleading NameError.
    raise

# 2. Load the Dataset
# The CSV is looked up relative to the directory Jupyter was started from.
# The first line is treated as a header and column types are inferred.
try:
    df = (
        spark.read
        .options(header="true", inferSchema="true")
        .csv("yellow_tripdata_2016-03.csv")
    )

    raw_rdd = df.rdd
    row_total = raw_rdd.count()
    print(f"✅ Data Loaded. Total Rows: {row_total}")
    first_row = raw_rdd.first()
    print(f"First Row: {first_row}")

except Exception as e:
    print("\n⚠️ Data loading failed. Check your file path.")
    print(e)

Initializing Spark...


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/11/25 22:09:38 WARN Utils: Your hostname, MSI, resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/11/25 22:09:38 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/25 22:09:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


✅ Spark Initialized Successfully!
Spark Version: 4.0.1

⚠️ Data loading failed. Check your file path.
[PATH_NOT_FOUND] Path does not exist: file:/mnt/c/Users/rasha/Downloads/yellow_tripdata_2016-03.csv. SQLSTATE: 42K03


In [3]:
# Load the dataset from its actual location inside the "archive (13)" folder.
file_path = "archive (13)/yellow_tripdata_2016-03.csv"

print(f"Attempting to load: {file_path}")

try:
    df = (
        spark.read
        .option("header", "true")
        .option("inferSchema", "true")
        .csv(file_path)
    )

    raw_rdd = df.rdd
    print(f"✅ Success! Data Loaded. Total Rows: {raw_rdd.count()}")
    print(f"First Row: {raw_rdd.first()}")

except Exception as e:
    print("❌ Still not found. Let's find the file location below.")
    # Show the real error: the failure is not necessarily a missing file
    # (it could be a parse error, a stopped session, etc.).
    print(e)
    print("Files in current folder:")
    print(os.listdir("."))
    # Derive the folder from file_path so the diagnostic follows the path above.
    data_dir = os.path.dirname(file_path)
    if data_dir and os.path.isdir(data_dir):
        print(f"\nFiles in '{data_dir}':")
        print(os.listdir(data_dir))

Attempting to load: archive (13)/yellow_tripdata_2016-03.csv


                                                                                

✅ Success! Data Loaded. Total Rows: 12210952


[Stage 3:>                                                          (0 + 1) / 1]

First Row: Row(VendorID=1, tpep_pickup_datetime=datetime.datetime(2016, 3, 1, 0, 0), tpep_dropoff_datetime=datetime.datetime(2016, 3, 1, 0, 7, 55), passenger_count=1, trip_distance=2.5, pickup_longitude=-73.97674560546875, pickup_latitude=40.76515197753906, RatecodeID=1, store_and_fwd_flag='N', dropoff_longitude=-74.00426483154297, dropoff_latitude=40.74612808227539, payment_type=1, fare_amount=9.0, extra=0.5, mta_tax=0.5, tip_amount=2.05, tolls_amount=0.0, improvement_surcharge=0.3, total_amount=12.35)


                                                                                

In [4]:
# 1. Schema: column names together with their inferred types.
print("=== SCHEMA ===")
df.printSchema()

# 2. First five rows, with truncation disabled so every cell is shown in full.
print("\n=== SAMPLE DATA ===")
df.show(n=5, truncate=False)

# 3. Summary statistics (count, mean, stddev, min, max) for the key numeric
# columns, useful for spotting suspicious values such as negative fares.
print("\n=== STATISTICS ===")
stats_columns = ["passenger_count", "trip_distance", "fare_amount"]
df.select(*stats_columns).describe().show()

=== SCHEMA ===
root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)


=== SAMPLE DATA ===
+--------+--------------------+---------------------+---------------+------------



+-------+------------------+------------------+------------------+
|summary|   passenger_count|     trip_distance|       fare_amount|
+-------+------------------+------------------+------------------+
|  count|          12210952|          12210952|          12210952|
|   mean| 1.659580022917132| 6.131769762914482|12.795078448428898|
| stddev|1.3121892188373814|6156.4826446949155|134.09792335003755|
|    min|                 0|               0.0|            -376.0|
|    max|                 9|      1.90726288E7|         429496.72|
+-------+------------------+------------------+------------------+



                                                                                

In [5]:
# 1. Filter() Transformation
# Objective: remove noisy records by keeping only rows whose trip_distance
# is present and strictly positive.
def _has_positive_distance(row):
    """Return True when the row has a non-null trip_distance greater than 0."""
    distance = row['trip_distance']
    return distance is not None and distance > 0


filtered_rdd = raw_rdd.filter(_has_positive_distance)

print("1. Filter complete.")
valid_trip_count = filtered_rdd.count()
print(f"   (We are processing {valid_trip_count} valid trips...)")

1. Filter complete.




   (We are processing 12139826 valid trips...)


                                                                                

In [6]:
# 2. Map() Transformation
# Objective: derive a structured tuple from each row.
# Each filtered row becomes a (VendorID, trip_distance-as-float) pair so that
# distances can later be aggregated per vendor.
def _vendor_and_distance(row):
    """Return the (VendorID, float trip_distance) pair for one row."""
    return (row['VendorID'], float(row['trip_distance']))


vendor_distance_pair = filtered_rdd.map(_vendor_and_distance)

sample_pairs = vendor_distance_pair.take(5)
print(f"2. Map complete. Sample: {sample_pairs}")

[Stage 9:>                                                          (0 + 1) / 1]

2. Map complete. Sample: [(1, 2.5), (1, 2.9), (2, 19.98), (2, 10.78), (2, 30.43)]


                                                                                

In [11]:
# 3. FlatMap() Transformation
# Objective: Break complex fields into smaller elements.
# We turn the single datetime field (e.g., "2016-03-01 00:00:00") into a list of [Date, Time].
# Since Spark loaded it as a real datetime object, we convert it with str() first so it can be split.
def split_date_time(row):
    """Split a row's pickup timestamp into separate date and time strings.

    Args:
        row: Mapping-like record (e.g. a Spark ``Row``) that supports
            ``row['tpep_pickup_datetime']``.

    Returns:
        list[str]: The string form of the timestamp split on the space, e.g.
        ``['2016-03-01', '00:00:00']``. An empty list is returned when the
        timestamp is missing, so ``flatMap`` contributes no elements for that
        row instead of the literal string ``'None'``.
    """
    pickup = row['tpep_pickup_datetime']
    if pickup is None:
        # The column is nullable; str(None) would otherwise yield ['None'].
        return []
    # str() on a datetime gives 'YYYY-MM-DD HH:MM:SS', split at the single space.
    return str(pickup).split(' ')

# Flatten the per-row [date, time] lists into a single RDD of strings.
flat_mapped_rdd = filtered_rdd.flatMap(split_date_time)

sample_elements = flat_mapped_rdd.take(10)
print(f"3. FlatMap complete. Sample elements: {sample_elements}")

[Stage 11:>                                                         (0 + 1) / 1]

3. FlatMap complete. Sample elements: ['2016-03-01', '00:00:00', '2016-03-01', '00:00:00', '2016-03-01', '00:00:00', '2016-03-01', '00:00:00', '2016-03-01', '00:00:00']


                                                                                

In [12]:
# 4. ReduceByKey() Transformation
# Objective: compute an aggregate per key.
# Starting from the (VendorID, distance) pairs built in step 2, add up the
# distances that share a VendorID to get each vendor's total distance.
total_distance_by_vendor = vendor_distance_pair.reduceByKey(
    lambda running_total, distance: running_total + distance
)

In [13]:
# 5. Collect() Action
# Objective: bring the aggregated output back to the driver for printing.
# The RDD now holds one (vendor, total) pair per vendor.
results = total_distance_by_vendor.collect()

print("\n=== FINAL RESULTS: Total Miles per Vendor ===")
for vendor, miles in results:
    print("Vendor {}: {:,.2f} miles".format(vendor, miles))




=== FINAL RESULTS: Total Miles per Vendor ===
Vendor 1: 55,384,892.00 miles
Vendor 2: 19,489,854.25 miles


                                                                                

In [14]:
# --- TASK 2: SPARK SQL (30 Points) ---

# 1. Create a Temporary View
# Registering the DataFrame under a name lets spark.sql() query it by that name.
view_name = "taxi_trips"
df.createOrReplaceTempView(view_name)
print(f"✅ Table '{view_name}' created successfully!")

✅ Table 'taxi_trips' created successfully!


In [15]:
# 2. Aggregation Query (10 pts)
# Requirement: an aggregate (average, count, ...) per category.
# For every passenger_count group, compute the average fare (rounded to two
# decimals) and the number of trips, ordered by passenger_count.
print("\n--- Query 1: Aggregation (Avg Fare by Passenger Count) ---")
avg_fare_sql = """
    SELECT 
        passenger_count,
        ROUND(AVG(fare_amount), 2) as avg_fare,
        COUNT(*) as total_trips
    FROM taxi_trips
    WHERE passenger_count IS NOT NULL
    GROUP BY passenger_count
    ORDER BY passenger_count ASC
"""
q1 = spark.sql(avg_fare_sql)
q1.show()


--- Query 1: Aggregation (Avg Fare by Passenger Count) ---




+---------------+--------+-----------+
|passenger_count|avg_fare|total_trips|
+---------------+--------+-----------+
|              0|   25.92|        608|
|              1|   12.61|    8690240|
|              2|   13.44|    1730864|
|              3|   13.43|     499376|
|              4|   13.44|     239827|
|              5|   12.85|     644712|
|              6|   12.64|     405255|
|              7|   42.96|         24|
|              8|   54.18|         24|
|              9|   55.29|         22|
+---------------+--------+-----------+



                                                                                

In [16]:
# 3. Filtering Query (10 pts)
# Requirement: a WHERE clause that combines conditions with AND/OR.
# "High value" trips are those with a fare above $50 AND a distance above
# 10 miles; only the first five matches are displayed.
print("\n--- Query 2: Filtering (High Value Trips) ---")
high_value_sql = """
    SELECT 
        tpep_pickup_datetime,
        trip_distance,
        fare_amount,
        payment_type
    FROM taxi_trips
    WHERE fare_amount > 50 AND trip_distance > 10
"""
q2 = spark.sql(high_value_sql)
q2.show(5)


--- Query 2: Filtering (High Value Trips) ---
+--------------------+-------------+-----------+------------+
|tpep_pickup_datetime|trip_distance|fare_amount|payment_type|
+--------------------+-------------+-----------+------------+
| 2016-03-01 00:00:00|        19.98|       54.5|           1|
| 2016-03-01 00:00:00|        30.43|       98.0|           1|
| 2016-03-01 00:00:03|        16.81|       52.0|           1|
| 2016-03-01 00:00:07|        18.03|       52.0|           1|
| 2016-03-01 00:00:21|        19.35|       52.0|           1|
+--------------------+-------------+-----------+------------+
only showing top 5 rows


In [18]:
# 4. Join Query (10 pts)
# Requirement: join the main data with a helper table.
# Build a small lookup table that maps payment-type codes (1, 2, ...) to a
# text description, and register it as a view so it can be joined in SQL.

# A. Create the Lookup Data
# Codes are consecutive integers starting at 1, in the order listed here.
payment_descriptions = [
    "Credit Card",
    "Cash",
    "No Charge",
    "Dispute",
    "Unknown",
    "Voided",
]
payment_data = list(enumerate(payment_descriptions, start=1))
payment_df = spark.createDataFrame(
    payment_data,
    schema=["payment_type_id", "payment_desc"],
)
payment_df.createOrReplaceTempView("payment_lookup")

In [20]:
# B. Perform the JOIN
# Inner-join trips to the payment lookup on the payment-type code so each
# displayed trip carries its text description; show the first ten rows.
print("\n--- Query 3: Join (Trips + Payment Description) ---")
payment_join_sql = """
    SELECT 
        t.VendorID,
        t.fare_amount,
        t.payment_type,
        p.payment_desc
    FROM taxi_trips t
    INNER JOIN payment_lookup p 
    ON t.payment_type = p.payment_type_id
"""
q3 = spark.sql(payment_join_sql)
q3.show(10)


--- Query 3: Join (Trips + Payment Description) ---


[Stage 27:>                                                         (0 + 1) / 1]

+--------+-----------+------------+------------+
|VendorID|fare_amount|payment_type|payment_desc|
+--------+-----------+------------+------------+
|       1|        9.0|           1| Credit Card|
|       1|       11.0|           1| Credit Card|
|       2|       54.5|           1| Credit Card|
|       2|       31.5|           1| Credit Card|
|       2|       98.0|           1| Credit Card|
|       2|       23.5|           1| Credit Card|
|       1|        5.5|           1| Credit Card|
|       2|       23.5|           1| Credit Card|
|       1|        5.5|           1| Credit Card|
|       2|        9.0|           1| Credit Card|
+--------+-----------+------------+------------+
only showing top 10 rows


                                                                                

In [21]:
from pyspark.sql.functions import col, avg, count, max, desc

print("=== TASK 3: DATAFRAMES ===")

# --- 1. Data Cleaning (10 pts) ---
# Requirement: Handle missing values using .dropna() or .fillna().
# Drop every row where passenger_count, trip_distance or fare_amount is null.
cleaned_df = df.dropna(subset=["passenger_count", "trip_distance", "fare_amount"])

# Alternatively, we could fill nulls (uncomment to see):
# cleaned_df = df.fillna(0, subset=["tolls_amount"])

# Count both frames once and report how many rows were actually dropped,
# rather than claiming a removal that may not have happened.
original_count = df.count()
cleaned_count = cleaned_df.count()
rows_dropped = original_count - cleaned_count

print(f"Original Count: {original_count}")
print(f"Cleaned Count:  {cleaned_count}")
print(f"✅ Data cleaning complete ({rows_dropped} rows with nulls removed).")

=== TASK 3: DATAFRAMES ===


                                                                                

Original Count: 12210952




Cleaned Count:  12210952
✅ Data cleaning complete (Rows with nulls removed).


                                                                                

In [22]:
# --- 2. GroupBy + Aggregation (10 pts) ---
# Requirement: compute mean, max, count, etc. on a categorical field.
# Per VendorID: number of trips, average fare and longest trip distance.
vendor_stats = [
    count("*").alias("Total_Trips"),
    avg("fare_amount").alias("Avg_Fare"),
    max("trip_distance").alias("Max_Distance"),
]
agg_df = cleaned_df.groupBy("VendorID").agg(*vendor_stats)

print("\n--- Aggregation Results (Stats by Vendor) ---")
agg_df.show()


--- Aggregation Results (Stats by Vendor) ---




+--------+-----------+------------------+------------+
|VendorID|Total_Trips|          Avg_Fare|Max_Distance|
+--------+-----------+------------------+------------+
|       1|    5731242|12.727348032067002|1.90726288E7|
|       2|    6479710| 12.85498535891264|      390.07|
+--------+-----------+------------------+------------+



                                                                                

In [23]:
# --- 3. Sorting / OrderBy (10 pts) ---
# Requirement: sort by multiple columns.
# Order by passenger_count ascending, then fare_amount descending, so the
# highest fares come first within each passenger-count group.
sort_keys = [
    col("passenger_count").asc(),
    col("fare_amount").desc(),
]
sorted_df = cleaned_df.orderBy(*sort_keys)

print("\n--- Sorting Results (High Fares by Passenger Count) ---")
display_columns = ["passenger_count", "fare_amount", "trip_distance"]
sorted_df.select(*display_columns).show(10)


--- Sorting Results (High Fares by Passenger Count) ---




+---------------+-----------+-------------+
|passenger_count|fare_amount|trip_distance|
+---------------+-----------+-------------+
|              0|      252.3|          0.0|
|              0|      230.0|          0.3|
|              0|      210.0|          0.0|
|              0|      190.0|          0.0|
|              0|      150.0|          0.0|
|              0|      148.0|          0.0|
|              0|     144.71|          0.0|
|              0|      141.0|        28.95|
|              0|      141.0|          0.0|
|              0|      140.5|         0.01|
+---------------+-----------+-------------+
only showing top 10 rows


                                                                                

In [25]:
# Keep only trips that report at least one passenger.
df_no_zero = cleaned_df.where(col("passenger_count") > 0)

# Re-sort: passenger_count ascending, then fare_amount descending.
sorted_df = df_no_zero.orderBy(
    col("passenger_count").asc(),
    col("fare_amount").desc(),
)

print("\n--- Sorting Results (0 Passengers Removed) ---")
shown_columns = ("passenger_count", "fare_amount", "trip_distance")
sorted_df.select(*shown_columns).show(20)


--- Sorting Results (0 Passengers Removed) ---




+---------------+-----------+-------------+
|passenger_count|fare_amount|trip_distance|
+---------------+-----------+-------------+
|              1|  429496.72|          0.0|
|              1|     2002.5|          0.0|
|              1|     1120.5|          0.0|
|              1|     1000.0|          0.0|
|              1|      900.8|          0.0|
|              1|      819.5|        160.8|
|              1|      700.0|          0.0|
|              1|      700.0|          0.4|
|              1|     655.34|          0.8|
|              1|      600.0|        138.1|
|              1|      600.0|          0.2|
|              1|      590.0|          0.0|
|              1|      553.5|         2.75|
|              1|      550.0|        196.0|
|              1|      500.0|         19.2|
|              1|      500.0|       110.74|
|              1|      500.0|          2.8|
|              1|      495.7|          0.0|
|              1|      495.0|       126.54|
|              1|      495.0|   

                                                                                