In [34]:
import os

from pyspark.sql import SparkSession

# Location of the raw taxi Parquet data. The default is the path this notebook
# was originally written against; set the TAXI_RAW_DATA_DIR environment
# variable to point the notebook at another location without editing code.
RAW_DATA_PATH = os.environ.get("TAXI_RAW_DATA_DIR", "/home/jovyan/work/data/raw")

# getOrCreate() returns the already-running session when there is one, so
# re-running this cell does not start a second Spark application.
spark = SparkSession.builder.appName("TaxiDataExploration").getOrCreate()

# Read the Parquet data at RAW_DATA_PATH into the DataFrame used by every
# later cell.
df = spark.read.parquet(RAW_DATA_PATH)

# Print column names/types, then preview the first 10 rows as a pandas table
# (limit() keeps the amount of data pulled to the driver small).
df.printSchema()
df.limit(10).toPandas()

root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)



Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
0,2,2023-01-01 00:32:10,2023-01-01 00:40:36,1.0,0.97,1.0,N,161,141,2,9.3,1.0,0.5,0.0,0.0,1.0,14.3,2.5,0.0
1,2,2023-01-01 00:55:08,2023-01-01 01:01:27,1.0,1.1,1.0,N,43,237,1,7.9,1.0,0.5,4.0,0.0,1.0,16.9,2.5,0.0
2,2,2023-01-01 00:25:04,2023-01-01 00:37:49,1.0,2.51,1.0,N,48,238,1,14.9,1.0,0.5,15.0,0.0,1.0,34.9,2.5,0.0
3,1,2023-01-01 00:03:48,2023-01-01 00:13:25,0.0,1.9,1.0,N,138,7,1,12.1,7.25,0.5,0.0,0.0,1.0,20.85,0.0,1.25
4,2,2023-01-01 00:10:29,2023-01-01 00:21:19,1.0,1.43,1.0,N,107,79,1,11.4,1.0,0.5,3.28,0.0,1.0,19.68,2.5,0.0
5,2,2023-01-01 00:50:34,2023-01-01 01:02:52,1.0,1.84,1.0,N,161,137,1,12.8,1.0,0.5,10.0,0.0,1.0,27.8,2.5,0.0
6,2,2023-01-01 00:09:22,2023-01-01 00:19:49,1.0,1.66,1.0,N,239,143,1,12.1,1.0,0.5,3.42,0.0,1.0,20.52,2.5,0.0
7,2,2023-01-01 00:27:12,2023-01-01 00:49:56,1.0,11.7,1.0,N,142,200,1,45.7,1.0,0.5,10.74,3.0,1.0,64.44,2.5,0.0
8,2,2023-01-01 00:21:44,2023-01-01 00:36:40,1.0,2.95,1.0,N,164,236,1,17.7,1.0,0.5,5.68,0.0,1.0,28.38,2.5,0.0
9,2,2023-01-01 00:39:42,2023-01-01 00:50:36,1.0,3.01,1.0,N,141,107,2,14.9,1.0,0.5,0.0,0.0,1.0,19.9,2.5,0.0


In [35]:
print("=== information about the data ===")
# Number of rows in the DataFrame, printed with thousands separators.
print(f"Total of records: {df.count():,}")
# len(df.columns) is the number of columns, so the label must say columns
# (the previous label called this figure rows).
print(f"Total of columns: {len(df.columns)}")

=== information about the data ===
TOtal of records: 3,066,766
Ttoal of raws: 19


In [36]:
# 2. Summary statistics (count, mean, stddev, min, max) for four numeric columns
print("\n=== Basic statistics ===")
df.describe(
    'fare_amount',
    'trip_distance',
    'passenger_count',
    'total_amount',
).show()


=== Basic statistics ===
+-------+------------------+------------------+------------------+------------------+
|summary|       fare_amount|     trip_distance|   passenger_count|      total_amount|
+-------+------------------+------------------+------------------+------------------+
|  count|           3066766|           3066766|           2995023|           3066766|
|   mean| 18.36706861234247|3.8473420306601414|1.3625321074328978| 27.02038310708492|
| stddev|17.807821939337924|249.58375606858166|0.8961199745510026|22.163588952492184|
|    min|            -900.0|               0.0|               0.0|            -751.0|
|    max|            1160.1|         258928.15|               9.0|            1169.4|
+-------+------------------+------------------+------------------+------------------+



In [37]:
# 3. Row counts per distinct value of the vendor and payment-type columns
for heading, group_key in (
    ("\n=== Types of sellers ===", "VendorID"),
    ("\n=== payment methods ===", "payment_type"),
):
    print(heading)
    df.groupBy(group_key).count().show()


=== Types of sellers ===
+--------+-------+
|VendorID|  count|
+--------+-------+
|       1| 827367|
|       2|2239399|
+--------+-------+


=== payment methods ===
+------------+-------+
|payment_type|  count|
+------------+-------+
|           0|  71743|
|           1|2411462|
|           3|  18023|
|           2| 532241|
|           4|  33297|
+------------+-------+



In [38]:
# 4. Range check: smallest and largest fare amount and trip distance
# NOTE(review): this wildcard import rebinds Python builtins such as min, max
# and sum to Spark column functions for the rest of the kernel session. It is
# kept because later cells call sum/avg/count/col unqualified and rely on it.
from pyspark.sql.functions import *
print("\n=== Value ranges ===")
# min/max below are the Spark aggregate functions brought in by the import
# above, not the Python builtins.
df.agg(
    min("fare_amount").alias("min_fare"),
    max("fare_amount").alias("max_fare"),
    min("trip_distance").alias("min_distance"),
    max("trip_distance").alias("max_distance"),
).show()


=== Value ranges ===
+--------+--------+------------+------------+
|min_fare|max_fare|min_distance|max_distance|
+--------+--------+------------+------------+
|  -900.0|  1160.1|         0.0|   258928.15|
+--------+--------+------------+------------+



In [65]:
"""
Step 3: Basic Analysis & Initial Indicators

Description: Calculate basic metrics and identify simple patterns  
Level: Beginner to Intermediate  
Estimated Time: 15 minutes  
Expected Output: Basic KPIs, distributions, group comparisons

How to Run:
1. Make sure the data is loaded correctly  
2. Execute the code step by step  
3. Review the results and validate their logic
"""

# Qualified access to Spark's column functions. The bare names this cell used
# before (sum, avg, count, col) only resolved if an earlier cell had already
# run `from pyspark.sql.functions import *`, which also rebinds the builtin sum.
from pyspark.sql import functions as F

# 1. Basic Metrics
print("=== Basic Metrics ===")
# All four scalars are computed in a single aggregation instead of one
# separate Spark job (and full scan) per metric.
basic_metrics = df.agg(
    F.sum("total_amount").alias("total_revenue"),
    F.avg("fare_amount").alias("avg_fare"),
    F.avg("trip_distance").alias("avg_distance"),
    F.avg("passenger_count").alias("avg_passengers"),
).first()
total_revenue = basic_metrics["total_revenue"]
avg_fare = basic_metrics["avg_fare"]
avg_distance = basic_metrics["avg_distance"]
avg_passengers = basic_metrics["avg_passengers"]

print(f"Total Revenue: ${total_revenue:,.2f}")
print(f"Average Fare: ${avg_fare:.2f}")
print(f"Average Trip Distance: {avg_distance:.2f} miles")
print(f"Average Passenger Count: {avg_passengers:.1f}")

# 2. Distribution of Passenger Count
print("\n=== Distribution of Passenger Count ===")
passenger_dist = (
    df.groupBy("passenger_count")
      .count()
      .orderBy("passenger_count")
)
passenger_dist.show()

# 3. Vendor Comparison: trip volume, mean fare and mean distance per vendor
print("\n=== Vendor Comparison ===")
vendor_comparison = (
    df.groupBy("VendorID")
      .agg(
          F.count("*").alias("total_trips"),
          F.avg("fare_amount").alias("avg_fare"),
          F.avg("trip_distance").alias("avg_distance"),
      )
      .orderBy("VendorID")
)
vendor_comparison.show()

# 4. Highest and Lowest Fares
# The heading promises both ends of the range, so both are shown; previously
# only the five highest fares were printed.
print("\n=== Highest and Lowest Fares ===")
fare_view = df.select("fare_amount", "trip_distance", "passenger_count")

print("Top 5 highest fares:")
fare_view.orderBy(F.col("fare_amount").desc()).show(5)

# asc_nulls_last() keeps any NULL fares from being listed as the lowest values.
print("Top 5 lowest fares:")
fare_view.orderBy(F.col("fare_amount").asc_nulls_last()).show(5)


=== Basic Metrics ===
Total Revenue: $82,865,192.22
Average Fare: $18.37
Average Trip Distance: 3.85 miles
Average Passenger Count: 1.4

=== Distribution of Passenger Count ===
+---------------+-------+
|passenger_count|  count|
+---------------+-------+
|           NULL|  71743|
|            0.0|  51164|
|            1.0|2261400|
|            2.0| 451536|
|            3.0| 106353|
|            4.0|  53745|
|            5.0|  42681|
|            6.0|  28124|
|            7.0|      6|
|            8.0|     13|
|            9.0|      1|
+---------------+-------+


=== Vendor Comparison ===
+--------+-----------+------------------+-----------------+
|VendorID|total_trips|          avg_fare|     avg_distance|
+--------+-----------+------------------+-----------------+
|       1|     827367|17.429674993078745|3.116383176994046|
|       2|    2239399|  18.7133974919165|4.117401646602275|
+--------+-----------+------------------+-----------------+


=== Highest and Lowest Fares ===
+---------

In [68]:
"""
Step 3: Basic Analysis & Initial Indicators

Description: Calculate basic metrics and identify simple patterns  
Level: Beginner to Intermediate  
Estimated Time: 15 minutes  
Expected Output: Basic KPIs, distributions, group comparisons

How to Run:
1. Make sure the data is loaded correctly  
2. Execute the code step by step  
3. Review the results and validate their logic
"""

# NOTE(review): this cell repeats the "Step 3" cell directly above it; one of
# the two can probably be removed.

# Qualified access to Spark's column functions. The bare names this cell used
# before (sum, avg, count, col) only resolved if an earlier cell had already
# run `from pyspark.sql.functions import *`, which also rebinds the builtin sum.
from pyspark.sql import functions as F

# 1. Basic Metrics
print("=== Basic Metrics ===")
# All four scalars are computed in a single aggregation instead of one
# separate Spark job (and full scan) per metric.
basic_metrics = df.agg(
    F.sum("total_amount").alias("total_revenue"),
    F.avg("fare_amount").alias("avg_fare"),
    F.avg("trip_distance").alias("avg_distance"),
    F.avg("passenger_count").alias("avg_passengers"),
).first()
total_revenue = basic_metrics["total_revenue"]
avg_fare = basic_metrics["avg_fare"]
avg_distance = basic_metrics["avg_distance"]
avg_passengers = basic_metrics["avg_passengers"]

print(f"Total Revenue: ${total_revenue:,.2f}")
print(f"Average Fare: ${avg_fare:.2f}")
print(f"Average Trip Distance: {avg_distance:.2f} miles")
print(f"Average Passenger Count: {avg_passengers:.1f}")

# 2. Distribution of Passenger Count
print("\n=== Distribution of Passenger Count ===")
passenger_dist = (
    df.groupBy("passenger_count")
      .count()
      .orderBy("passenger_count")
)
passenger_dist.show()

# 3. Vendor Comparison: trip volume, mean fare and mean distance per vendor
print("\n=== Vendor Comparison ===")
vendor_comparison = (
    df.groupBy("VendorID")
      .agg(
          F.count("*").alias("total_trips"),
          F.avg("fare_amount").alias("avg_fare"),
          F.avg("trip_distance").alias("avg_distance"),
      )
      .orderBy("VendorID")
)
vendor_comparison.show()

# 4. Highest and Lowest Fares
# The heading promises both ends of the range, so both are shown; previously
# only the five highest fares were printed.
print("\n=== Highest and Lowest Fares ===")
fare_view = df.select("fare_amount", "trip_distance", "passenger_count")

print("Top 5 highest fares:")
fare_view.orderBy(F.col("fare_amount").desc()).show(5)

# asc_nulls_last() keeps any NULL fares from being listed as the lowest values.
print("Top 5 lowest fares:")
fare_view.orderBy(F.col("fare_amount").asc_nulls_last()).show(5)


=== Basic Metrics ===
Total Revenue: $82,865,192.22
Average Fare: $18.37
Average Trip Distance: 3.85 miles
Average Passenger Count: 1.4

=== Distribution of Passenger Count ===
+---------------+-------+
|passenger_count|  count|
+---------------+-------+
|           NULL|  71743|
|            0.0|  51164|
|            1.0|2261400|
|            2.0| 451536|
|            3.0| 106353|
|            4.0|  53745|
|            5.0|  42681|
|            6.0|  28124|
|            7.0|      6|
|            8.0|     13|
|            9.0|      1|
+---------------+-------+


=== Vendor Comparison ===
+--------+-----------+------------------+-----------------+
|VendorID|total_trips|          avg_fare|     avg_distance|
+--------+-----------+------------------+-----------------+
|       1|     827367|17.429674993078745|3.116383176994046|
|       2|    2239399|  18.7133974919165|4.117401646602275|
+--------+-----------+------------------+-----------------+


=== Highest and Lowest Fares ===
+---------

In [67]:
"""
Step 4: Temporal Pattern Analysis

Description: Identify peak hours, daily and weekly ride patterns  
Level: Intermediate  
Estimated Time: 20 minutes  
Expected Output: Peak hours, daily and weekly patterns, busiest days

How to Run:
1. Make sure the data is loaded correctly  
2. Execute the code step by step  
3. Review the results and validate their logic
"""

# Import every Spark function this cell uses. avg, col and count were
# previously missing from the list and only resolved because an earlier cell
# ran `from pyspark.sql.functions import *`.
from pyspark.sql.functions import avg, col, count, date_format, dayofweek, hour

# 1. Add time-based columns derived from the pickup timestamp
df_time = (
    df.withColumn("pickup_hour", hour(col("tpep_pickup_datetime")))
      .withColumn("pickup_day", dayofweek(col("tpep_pickup_datetime")))
      .withColumn("pickup_date", date_format(col("tpep_pickup_datetime"), "yyyy-MM-dd"))
)

# 2. Trip distribution by hour (Peak times)
print("=== Trip Distribution by Hour (Peak Hours) ===")
hourly_trips = (
    df_time.groupBy("pickup_hour")
           .count()
           .orderBy("pickup_hour")
)
# show(24) so that every hour of the day is printed (the default is 20 rows).
hourly_trips.show(24)

# 3. Average fare by hour
print("\n=== Average Fare by Hour ===")
hourly_fare = (
    df_time.groupBy("pickup_hour")
           .agg(
               avg("fare_amount").alias("avg_fare"),
               count("*").alias("trip_count"),
           )
           .orderBy("pickup_hour")
)
hourly_fare.show(24)

# 4. Trip distribution by day of the week
print("\n=== Trip Distribution by Day of the Week ===")
# dayofweek() numbering: 1=Sunday, 2=Monday, ..., 7=Saturday
daily_trips = (
    df_time.groupBy("pickup_day")
           .agg(
               count("*").alias("trip_count"),
               avg("fare_amount").alias("avg_fare"),
           )
           .orderBy("pickup_day")
)
daily_trips.show()

# 5. Busiest days by number of trips
print("\n=== Busiest Days ===")
# Secondary sort on pickup_date so that days with equal trip counts always
# appear in the same order from run to run.
daily_volume = (
    df_time.groupBy("pickup_date")
           .count()
           .orderBy(col("count").desc(), col("pickup_date").asc())
)
daily_volume.show(10)


=== Trip Distribution by Hour (Peak Hours) ===
+-----------+------+
|pickup_hour| count|
+-----------+------+
|          0| 84969|
|          1| 59799|
|          2| 42040|
|          3| 27438|
|          4| 17835|
|          5| 18011|
|          6| 43860|
|          7| 86877|
|          8|116865|
|          9|131111|
|         10|143666|
|         11|154157|
|         12|169858|
|         13|178739|
|         14|191604|
|         15|196424|
|         16|195977|
|         17|209493|
|         18|215889|
|         19|192801|
|         20|165862|
|         21|161548|
|         22|147415|
|         23|114528|
+-----------+------+


=== Average Fare by Hour ===
+-----------+------------------+----------+
|pickup_hour|          avg_fare|trip_count|
+-----------+------------------+----------+
|          0|19.586434346644225|     84969|
|          1| 17.74733005568661|     59799|
|          2|16.734721693625268|     42040|
|          3| 17.67398461987038|     27438|
|          4| 22.000816932