In [1]:
from pyspark.sql import SparkSession

# Example sizing for a node with 8 cores and 128GB of total memory.
# NOTE(review): 7 executors x 18g + 6g driver = 132g, which is above the
# 128GB quoted here -- confirm the intended sizing.
session_builder = (
    SparkSession.builder
    .config("spark.driver.memory", "6g")
    .config("spark.executor.memory", "18g")
    .config("spark.executor.instances", 7)
)

# Reuse an already-running session if there is one, otherwise start a new one.
spark = session_builder.getOrCreate()

In [2]:
# Location of the taxi dataset that is read as Parquet below.
data_path = "/expanse/lustre/projects/uci157/awatson4/datasets/taxi"

# Build the raw DataFrame that the rest of the notebook works from.
parquet_reader = spark.read
df = parquet_reader.parquet(data_path)

In [3]:
# Print the column names and types, then preview the first five rows.
df.printSchema()
df.show(n=5)

root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: integer (nullable = true)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+---

### Data Exploration
*How many observations does your dataset have?*

In [4]:
# Total number of rows (observations); the bare name on the last line is the cell output.
total_observations = df.count()
total_observations

140151844

In [5]:
# Frequency of every passenger_count value; NULL is reported as its own group.
passenger_count_freq = df.groupBy("passenger_count").count()
passenger_count_freq.show()

+---------------+--------+
|passenger_count|   count|
+---------------+--------+
|            8.0|     384|
|            0.0| 2719138|
|            7.0|     585|
|           NULL| 2733045|
|            1.0|98122695|
|            4.0| 2566033|
|            3.0| 5620491|
|            2.0|20555368|
|            6.0| 2966021|
|            5.0| 4867771|
|            9.0|     311|
|          112.0|       1|
|           96.0|       1|
+---------------+--------+



In [6]:
# Trip counts per (VendorID, PULocationID) pair; show() prints only the first 20 rows.
vendor_pickup_counts = df.groupBy("VendorID", "PULocationID").count()
vendor_pickup_counts.show()

+--------+------------+-------+
|VendorID|PULocationID|  count|
+--------+------------+-------+
|       1|          94|   1041|
|       1|         185|   1691|
|       2|         179|  43171|
|       1|          51|   2477|
|       2|          52|  43261|
|       1|          96|    432|
|       4|          61|     15|
|       2|         227|   5674|
|       4|         238|   4368|
|       2|          76|  39356|
|       1|         176|     20|
|       2|         230|2844901|
|       2|         157|   9917|
|       4|         261|   1562|
|       5|         161|     14|
|       1|         138|1075811|
|       2|          41| 426463|
|       1|         158| 457328|
|       2|         133|   7995|
|       5|         163|      8|
+--------+------------+-------+
only showing top 20 rows



# List the distinct values that occur in the airport_fee column.
airport_fee_values = df.select("airport_fee").distinct()
airport_fee_values.show()

Categorical Variables (Nominal Scale)
* VendorID (long): nominal, taxi provider ID, small number of discrete values
* RatecodeID (double): nominal, fare rate category, values represent pricing rules
* store_and_fwd_flag (string): binary categorical (Y / N), indicates whether the trip record was stored before being transmitted
* PULocationID / DOLocationID (long): nominal, pickup and dropoff zones, high-cardinality categorical variables
* payment_type (long): nominal, payment method encoded as a small set of discrete codes
* airport_fee (integer): categorical / binary-like, airport surcharge presence

Interval Scale Variables
* tpep_pickup_datetime / tpep_dropoff_datetime (timestamp_ntz): interval scale, pickup and drop-off times used to compute trip duration; their distribution reflects human activity patterns

In [7]:
from pyspark.sql.functions import expr

# Trip length in minutes, computed from the pickup and drop-off timestamps.
trip_duration_expr = expr(
    "timestampdiff(MINUTE, tpep_pickup_datetime, tpep_dropoff_datetime)"
)

# Rebind df so that later cells see the derived column.
df = df.withColumn("trip_duration_minutes", trip_duration_expr)
df.show(n=10)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+---------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|trip_duration_minutes|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+---------------------+
|       1| 2019-03-01 00:24:41|  2019-03-01 00:25:31|            1.0|          0.0|       1.0|                 N|        

Continuous Variables (Ratio Scale)

* trip_distance (double):    Continuous variable, expected to be non-negative. Right-skewed distribution (many short trips).
* fare_amount (double):    Continuous monetary value. Positively skewed.
* extra, mta_tax, improvement_surcharge, congestion_surcharge (double):    Continuous but low-variance monetary adjustments.
* tip_amount (double):    Continuous non-negative variable. Highly right-skewed with many zeros.
* tolls_amount (double):    Continuous zero-inflated variable.
* total_amount (double):    Continuous monetary variable representing total trip cost. Strongly right-skewed.
* passenger_count (double):    Discrete count variable (ratio scale, but not continuous) with a small integer range; stored as a double.

In [8]:
from pyspark.sql.functions import min, max

# Print the observed [min, max] range of each numeric column, one table per column.
# min/max here are the Spark column functions imported in this cell, not the builtins.
for range_column in (
    "trip_distance",
    "fare_amount",
    "total_amount",
    "trip_duration_minutes",
):
    df.select(min(range_column), max(range_column)).show()

+------------------+------------------+
|min(trip_distance)|max(trip_distance)|
+------------------+------------------+
|         -37264.53|         351613.36|
+------------------+------------------+

+----------------+----------------+
|min(fare_amount)|max(fare_amount)|
+----------------+----------------+
|         -1856.0|       998310.03|
+----------------+----------------+

+-----------------+-----------------+
|min(total_amount)|max(total_amount)|
+-----------------+-----------------+
|          -1871.8|       1084772.17|
+-----------------+-----------------+

+--------------------------+--------------------------+
|min(trip_duration_minutes)|max(trip_duration_minutes)|
+--------------------------+--------------------------+
|                   -531231|                     43648|
+--------------------------+--------------------------+



In [9]:
from pyspark.sql.functions import col, count, when

# Columns in which a negative value indicates an invalid record.
non_negative_columns = [
    "trip_distance",
    "fare_amount",
    "total_amount",
    "trip_duration_minutes",
]

# Count the negative values of every column in ONE pass over the data instead
# of launching a separate filter + count job per column.
# when() without otherwise() yields NULL unless the condition is true, and
# count() skips NULLs, so each aggregate equals df.filter(col(c) < 0).count()
# (and is 0, not NULL, when nothing matches).
negative_counts = df.select(
    [count(when(col(c) < 0, 1)).alias(c) for c in non_negative_columns]
).first()

# Same output as before: one count per line, in the original column order.
for column_name in non_negative_columns:
    print(negative_counts[column_name])

1799

In [10]:
from pyspark.sql.functions import col

# Keep only rows whose distance, fare, total and duration are all non-negative.
# NOTE(review): rows where any of these columns is NULL are presumably dropped
# as well (NULL >= 0 is not true in Spark SQL) -- confirm this is intended.
has_valid_distance = col("trip_distance") >= 0
has_valid_fare = col("fare_amount") >= 0
has_valid_total = col("total_amount") >= 0
has_valid_duration = col("trip_duration_minutes") >= 0

df_clean = df.filter(
    has_valid_distance & has_valid_fare & has_valid_total & has_valid_duration
)

In [11]:
from pyspark.sql.functions import min, max

# Re-check the [min, max] range of each numeric column after cleaning,
# one table per column. min/max are the Spark column functions imported in this cell.
for range_column in (
    "trip_distance",
    "fare_amount",
    "total_amount",
    "trip_duration_minutes",
):
    df_clean.select(min(range_column), max(range_column)).show()

+------------------+------------------+
|min(trip_distance)|max(trip_distance)|
+------------------+------------------+
|               0.0|         351613.36|
+------------------+------------------+

+----------------+----------------+
|min(fare_amount)|max(fare_amount)|
+----------------+----------------+
|             0.0|       998310.03|
+----------------+----------------+

+-----------------+-----------------+
|min(total_amount)|max(total_amount)|
+-----------------+-----------------+
|              0.0|       1084772.17|
+-----------------+-----------------+

+--------------------------+--------------------------+
|min(trip_duration_minutes)|max(trip_duration_minutes)|
+--------------------------+--------------------------+
|                         0|                     43648|
+--------------------------+--------------------------+



Target column: **total_amount**
- Expected to be right-skewed: most taxi rides are presumably short and therefore cheap, with a small number of long, expensive trips as outliers.

In [None]:
from pyspark.sql.functions import min, max, mean

# Minimum, maximum and mean of the target column on the cleaned data.
target_column = "total_amount"
total_amount_summary = df_clean.select(
    min(target_column).alias("min_total"),
    max(target_column).alias("max_total"),
    mean(target_column).alias("mean_total"),
)
total_amount_summary.show()

*Do you have missing and duplicate values in your dataset?*

In [None]:
from pyspark.sql.functions import col, sum, when

# Build one aggregate per column: the number of rows in which it is NULL.
# sum/when/col are the Spark column functions imported in this cell.
null_count_exprs = []
for column_name in df.columns:
    is_missing = when(col(column_name).isNull(), 1).otherwise(0)
    null_count_exprs.append(sum(is_missing).alias(column_name))

# A single-row result with one NULL count per original column.
missing_counts = df.select(null_count_exprs)

missing_counts.show()

In [None]:
# Duplicates = rows that disappear when fully identical rows are collapsed.
total_rows = df.count()
unique_rows = df.dropDuplicates().count()
duplicates = total_rows - unique_rows

# One labelled figure per line.
print(
    f"Total rows: {total_rows}",
    f"Unique rows: {unique_rows}",
    f"Duplicate rows: {duplicates}",
    sep="\n",
)