In [3]:
pip install pyspark

Note: you may need to restart the kernel to use updated packages.


In [5]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Taxi Trip Data Analysis") \
    .getOrCreate()


In [11]:
# Load your dataset from the specified path
df = spark.read.option("header", "true").csv("ELiteTEch_Intern/taxi_tripdata.csv")
df.show(10)


+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|VendorID|lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|       1| 2021-07-01 00:30:52|  2021-07-01 00:35:36|                 N|         1|          74|         168|              1|         1.20|          6|  0.5|    0.

In [37]:
df.columns

['VendorID',
 'lpep_pickup_datetime',
 'lpep_dropoff_datetime',
 'store_and_fwd_flag',
 'RatecodeID',
 'PULocationID',
 'DOLocationID',
 'passenger_count',
 'trip_distance',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'ehail_fee',
 'improvement_surcharge',
 'total_amount',
 'payment_type',
 'trip_type',
 'congestion_surcharge']

In [15]:
df.orderBy(col("trip_distance").desc()).show(5)


+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|VendorID|lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|    NULL| 2021-07-19 05:15:00|  2021-07-19 05:29:00|              NULL|      NULL|         108|         150|           NULL|     99762.26|      17.69| 2.75|      

In [17]:
df.select("fare_amount").describe().show()


+-------+------------------+
|summary|       fare_amount|
+-------+------------------+
|  count|             83691|
|   mean| 20.38830459667026|
| stddev|15.583552357872819|
|    min|                -1|
|    max|              99.5|
+-------+------------------+



In [19]:
df.groupBy("payment_type").count().show()


+------------+-----+
|payment_type|count|
+------------+-----+
|           3|  307|
|           5|    1|
|           1|29990|
|           4|   44|
|           2|20831|
|        NULL|32518|
+------------+-----+



In [51]:
from pyspark.sql.functions import hour, col

# Replace "pickup_datetime" with the actual column name if it's different
df = df.withColumn("lpep_pickup_datetime", hour(col("lpep_pickup_datetime")))
df.groupBy("lpep_pickup_datetime").count().orderBy("lpep_pickup_datetime").show()


+--------------------+-----+
|lpep_pickup_datetime|count|
+--------------------+-----+
|                   0| 1030|
|                   1|  642|
|                   2|  431|
|                   3|  363|
|                   4|  501|
|                   5|  950|
|                   6| 1935|
|                   7| 3358|
|                   8| 5030|
|                   9| 5798|
|                  10| 6096|
|                  11| 6092|
|                  12| 5766|
|                  13| 5534|
|                  14| 5626|
|                  15| 5744|
|                  16| 5280|
|                  17| 5166|
|                  18| 5183|
|                  19| 4268|
+--------------------+-----+
only showing top 20 rows


In [53]:
from pyspark.sql.functions import col

df = df.withColumn("trip_distance", col("trip_distance").cast("float")) \
       .withColumn("fare_amount", col("fare_amount").cast("float"))
filtered_df = df.filter(col("trip_distance").isNotNull() & col("fare_amount").isNotNull())
correlation = filtered_df.stat.corr("trip_distance", "fare_amount")
print(f"Correlation between trip_distance and fare_amount: {correlation:.2f}")


Correlation between trip_distance and fare_amount: 0.03
