**Load the data**

In [0]:
# Import necessary libraries
from pyspark.sql import SparkSession

# Create a Spark Session
spark = SparkSession.builder.appName("Local_CSV_Reader").getOrCreate()

# Correct path to the CSV file in DBFS (without '/dbfs' prefix)
file_path = "/FileStore/tables/uber_data.csv"  # Corrected path

# Read the CSV file into a DataFrame
df = spark.read.option("header", "true").option("inferSchema", "true").csv(file_path)

# Display the DataFrame schema
df.printSchema()

# Show a sample of the data
df.show(5)


root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)

+--------+--------------------+---------------------+---------------+-------------+------------------+---------------

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp, hour, dayofmonth, dayofweek, month, year, monotonically_increasing_id

**Convert tpep_pickup_datetime and tpep_dropoff_datetime to datetime type**

In [0]:
df = df.withColumn("tpep_pickup_datetime", to_timestamp(col("tpep_pickup_datetime"))) \
       .withColumn("tpep_dropoff_datetime", to_timestamp(col("tpep_dropoff_datetime")))

**Remove duplicates and reset index**

In [0]:
df = df.dropDuplicates()

# Add a trip_id column
df = df.withColumn("trip_id", monotonically_increasing_id())

**Create datetime_dim table**

In [0]:
datetime_dim = df.select("tpep_pickup_datetime", "tpep_dropoff_datetime") \
                 .withColumn("pickup_hour", hour(col("tpep_pickup_datetime"))) \
                 .withColumn("pickup_day", dayofmonth(col("tpep_pickup_datetime"))) \
                 .withColumn("pickup_weekday", dayofweek(col("tpep_pickup_datetime"))) \
                 .withColumn("pickup_month", month(col("tpep_pickup_datetime"))) \
                 .withColumn("pickup_year", year(col("tpep_pickup_datetime"))) \
                 .withColumn("dropoff_hour", hour(col("tpep_dropoff_datetime"))) \
                 .withColumn("dropoff_day", dayofmonth(col("tpep_dropoff_datetime"))) \
                 .withColumn("dropoff_weekday", dayofweek(col("tpep_dropoff_datetime"))) \
                 .withColumn("dropoff_month", month(col("tpep_dropoff_datetime"))) \
                 .withColumn("dropoff_year", year(col("tpep_dropoff_datetime")))

# Add datetime_id column
datetime_dim = datetime_dim.withColumn("datetime_id", monotonically_increasing_id())

# Reorder columns
datetime_dim = datetime_dim.select("datetime_id", *datetime_dim.columns[:-1])

datetime_dim.show()

+-----------+--------------------+---------------------+-----------+----------+--------------+------------+-----------+------------+-----------+---------------+-------------+------------+
|datetime_id|tpep_pickup_datetime|tpep_dropoff_datetime|pickup_hour|pickup_day|pickup_weekday|pickup_month|pickup_year|dropoff_hour|dropoff_day|dropoff_weekday|dropoff_month|dropoff_year|
+-----------+--------------------+---------------------+-----------+----------+--------------+------------+-----------+------------+-----------+---------------+-------------+------------+
|          0| 2016-03-01 00:00:56|  2016-03-01 00:04:43|          0|         1|             3|           3|       2016|           0|          1|              3|            3|        2016|
|          1| 2016-03-10 07:09:06|  2016-03-10 08:00:54|          7|        10|             5|           3|       2016|           8|         10|              5|            3|        2016|
|          2| 2016-03-10 07:10:25|  2016-03-10 07:27:30|    

**Create passenger_count_dim table**

In [0]:
passenger_count_dim = df.select("passenger_count").distinct() \
                        .withColumn("passenger_count_id", monotonically_increasing_id())

# Reorder columns
passenger_count_dim = passenger_count_dim.select("passenger_count_id", "passenger_count")

passenger_count_dim.show()

+------------------+---------------+
|passenger_count_id|passenger_count|
+------------------+---------------+
|                 0|              1|
|                 1|              6|
|                 2|              3|
|                 3|              5|
|                 4|              4|
|                 5|              2|
|                 6|              0|
+------------------+---------------+



**Create trip_distance_dim table**

In [0]:
trip_distance_dim = df.select("trip_distance").distinct() \
                      .withColumn("trip_distance_id", monotonically_increasing_id())

# Reorder columns
trip_distance_dim = trip_distance_dim.select("trip_distance_id", "trip_distance")

trip_distance_dim.show(5)

+----------------+-------------+
|trip_distance_id|trip_distance|
+----------------+-------------+
|               0|        19.98|
|               1|        17.56|
|               2|         2.86|
|               3|         0.66|
|               4|        10.65|
+----------------+-------------+
only showing top 5 rows



**Create pickup_location_dim table**

In [0]:
pickup_location_dim = df.select("pickup_longitude", "pickup_latitude").distinct() \
                        .withColumn("pickup_location_id", monotonically_increasing_id())

# Reorder columns
pickup_location_dim = pickup_location_dim.select("pickup_location_id", "pickup_longitude", "pickup_latitude")

pickup_location_dim.show()

+------------------+------------------+------------------+
|pickup_location_id|  pickup_longitude|   pickup_latitude|
+------------------+------------------+------------------+
|                 0|-73.93929290771484| 40.84185028076172|
|                 1|-73.97794342041014| 40.77862930297852|
|                 2|-73.97701263427734|40.774707794189446|
|                 3| -73.9498519897461| 40.78435897827149|
|                 4|-73.94937133789062| 40.77684783935546|
|                 5| -73.9841537475586| 40.78036117553711|
|                 6|-73.99024963378906| 40.75651550292969|
|                 7|-73.98612213134764|  40.7264404296875|
|                 8|-73.98419189453125|40.721710205078125|
|                 9| -73.9706802368164| 40.78342819213867|
|                10|-73.99044036865233|40.756053924560554|
|                11|-73.96432495117188|40.764530181884766|
|                12|-73.96515655517578| 40.79605102539063|
|                13|-74.00804901123048| 40.7149391174316

**Create dropoff_location_dim table**

In [0]:
dropoff_location_dim = df.select("dropoff_longitude", "dropoff_latitude").distinct() \
                         .withColumn("dropoff_location_id", monotonically_increasing_id())

# Reorder columns
dropoff_location_dim = dropoff_location_dim.select("dropoff_location_id", "dropoff_longitude", "dropoff_latitude")

dropoff_location_dim.show()

+-------------------+------------------+------------------+
|dropoff_location_id| dropoff_longitude|  dropoff_latitude|
+-------------------+------------------+------------------+
|                  0|-73.97039031982422| 40.75946044921875|
|                  1|-74.01337432861328| 40.70356369018555|
|                  2|-73.96913146972656| 40.76084899902344|
|                  3|-74.00003051757811| 40.74119567871094|
|                  4|-73.96012115478516|40.777191162109375|
|                  5|-73.98291778564453|40.750789642333984|
|                  6|-73.95810699462889| 40.76528930664063|
|                  7|-74.01368713378906| 40.71493148803711|
|                  8|-73.98253631591797| 40.76919555664063|
|                  9|-73.93907928466798| 40.75941848754882|
|                 10| -73.9759292602539| 40.76374816894531|
|                 11|-73.99980926513672|  40.7395896911621|
|                 12|-73.94681549072266| 40.77320861816406|
|                 13|-73.98957061767578|

**Create rate_code_dim table**

In [0]:
from pyspark.sql.functions import create_map, lit, col, monotonically_increasing_id
from itertools import chain

# Create rate_code_type dictionary
rate_code_type = {
    1: 'Standard rate',
    2: 'JFK',
    3: 'Newark',
    4: 'Nassau or Westchester',
    5: 'Negotiated fare',
    6: 'Group ride'
}

# Convert dictionary to a PySpark map using chain
rate_code_mapping = create_map([lit(x) for x in chain(*rate_code_type.items())])

# Create Rate Code Dimension Table
rate_code_dim = df.select("RatecodeID").distinct() \
                  .withColumn("rate_code_id", monotonically_increasing_id()) \
                  .withColumn("rate_code_name", rate_code_mapping[col("RatecodeID")])

# Reorder columns
rate_code_dim = rate_code_dim.select("rate_code_id", "RatecodeID", "rate_code_name")

rate_code_dim.show()


+------------+----------+--------------------+
|rate_code_id|RatecodeID|      rate_code_name|
+------------+----------+--------------------+
|           0|         1|       Standard rate|
|           1|         3|              Newark|
|           2|         5|     Negotiated fare|
|           3|         4|Nassau or Westche...|
|           4|         2|                 JFK|
|           5|         6|          Group ride|
+------------+----------+--------------------+



**Create payment_type_dim table**

In [0]:
from pyspark.sql.functions import create_map, lit, col, monotonically_increasing_id
from itertools import chain

# Create payment_type_name dictionary
payment_type_name = {
    1: 'Credit card',
    2: 'Cash',
    3: 'No charge',
    4: 'Dispute',
    5: 'Unknown',
    6: 'Voided trip'
}

# Convert dictionary to PySpark map with lit()
payment_type_mapping = create_map([lit(x) for x in chain(*payment_type_name.items())])

# Create Payment Type Dimension Table
payment_type_dim = df.select("payment_type").distinct() \
                     .withColumn("payment_type_id", monotonically_increasing_id()) \
                     .withColumn("payment_type_name", payment_type_mapping[col("payment_type")])

# Reorder columns
payment_type_dim = payment_type_dim.select("payment_type_id", "payment_type", "payment_type_name")

payment_type_dim.show()


+---------------+------------+-----------------+
|payment_type_id|payment_type|payment_type_name|
+---------------+------------+-----------------+
|              0|           1|      Credit card|
|              1|           3|        No charge|
|              2|           4|          Dispute|
|              3|           2|             Cash|
+---------------+------------+-----------------+



**Create fact_table**

In [0]:
fact_table = df.join(passenger_count_dim, df.trip_id == passenger_count_dim.passenger_count_id) \
               .join(trip_distance_dim, df.trip_id == trip_distance_dim.trip_distance_id) \
               .join(rate_code_dim, df.trip_id == rate_code_dim.rate_code_id) \
               .join(payment_type_dim, df.trip_id == payment_type_dim.payment_type_id) \
               .join(datetime_dim, df.trip_id == datetime_dim.datetime_id) \
               .join(pickup_location_dim, df.trip_id == pickup_location_dim.pickup_location_id) \
               .join(dropoff_location_dim, df.trip_id == dropoff_location_dim.dropoff_location_id) \
               .select(
                   "trip_id", "VendorID", "datetime_id", "passenger_count_id", "trip_distance_id",
                   "pickup_location_id", "dropoff_location_id", "rate_code_id", "payment_type_id",
                   "fare_amount", "extra", "mta_tax", "tip_amount", "tolls_amount", "improvement_surcharge", "total_amount"
               )

fact_table.show()

+-------+--------+-----------+------------------+----------------+------------------+-------------------+------------+---------------+-----------+-----+-------+----------+------------+---------------------+------------+
|trip_id|VendorID|datetime_id|passenger_count_id|trip_distance_id|pickup_location_id|dropoff_location_id|rate_code_id|payment_type_id|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|
+-------+--------+-----------+------------------+----------------+------------------+-------------------+------------+---------------+-----------+-----+-------+----------+------------+---------------------+------------+
|      0|       2|          0|                 0|               0|                 0|                  0|           0|              0|        4.5|  0.5|    0.5|      1.16|         0.0|                  0.3|        6.96|
|      1|       2|          1|                 1|               1|                 1|                  1|           1|  

**Calculate Trip Duration**

In [0]:
from pyspark.sql.functions import unix_timestamp

# Calculate trip duration in minutes
df = df.withColumn("trip_duration_minutes", 
                   (unix_timestamp(col("tpep_dropoff_datetime")) - unix_timestamp(col("tpep_pickup_datetime"))) / 60)

In [0]:
df.show()

+--------+--------------------+---------------------+---------------+-------------+------------------+------------------+----------+------------------+------------------+------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+-------+---------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|  pickup_longitude|   pickup_latitude|RatecodeID|store_and_fwd_flag| dropoff_longitude|  dropoff_latitude|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|trip_id|trip_duration_minutes|
+--------+--------------------+---------------------+---------------+-------------+------------------+------------------+----------+------------------+------------------+------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+-------+---------------------+
|       2| 2016-03-01 00:00:56|  2016-03-01 00

**Revenue Analysis by Time of Day, Day of Week, and Month**

In [0]:
from pyspark.sql.functions import sum, avg

# Revenue by hour of the day
revenue_by_hour = df.groupBy(hour(col("tpep_pickup_datetime")).alias("pickup_hour")) \
                    .agg(sum("total_amount").alias("total_revenue")) \
                    .orderBy("pickup_hour")

revenue_by_hour.show()

# Revenue by day of the week
revenue_by_weekday = df.groupBy(dayofweek(col("tpep_pickup_datetime")).alias("pickup_weekday")) \
                       .agg(sum("total_amount").alias("total_revenue")) \
                       .orderBy("pickup_weekday")

revenue_by_weekday.show()

# Revenue by month
revenue_by_month = df.groupBy(month(col("tpep_pickup_datetime")).alias("pickup_month")) \
                     .agg(sum("total_amount").alias("total_revenue")) \
                     .orderBy("pickup_month")

revenue_by_month.show()

+-----------+------------------+
|pickup_hour|     total_revenue|
+-----------+------------------+
|          0|123377.42999999851|
|          1| 70962.12999999963|
|          2|41108.150000000125|
|          3| 31724.88000000013|
|          4| 38209.59000000011|
|          5|  68055.1499999998|
|          6|32469.970000000118|
|          7|142396.04999999778|
|          8|177146.46999999828|
|          9|167358.04999999798|
|         10|154726.84999999817|
|         11|154918.10999999798|
|         12|171311.51999999824|
|         13|175717.62999999823|
|         14| 89590.10999999937|
+-----------+------------------+

+--------------+------------------+
|pickup_weekday|     total_revenue|
+--------------+------------------+
|             3| 405907.3000000164|
|             5|1233164.7899998862|
+--------------+------------------+

+------------+------------------+
|pickup_month|     total_revenue|
+------------+------------------+
|           3|1639072.0899997323|
+------------+-----

**Geospatial Analysis (Hotspots)**

In [0]:
from pyspark.sql.functions import count

# Top 10 pickup locations (hotspots)
pickup_hotspots = df.groupBy("pickup_latitude", "pickup_longitude") \
                    .agg(count("*").alias("trip_count")) \
                    .orderBy(col("trip_count").desc()) \
                    .limit(10)

pickup_hotspots.show()

# Top 10 dropoff locations (hotspots)
dropoff_hotspots = df.groupBy("dropoff_latitude", "dropoff_longitude") \
                     .agg(count("*").alias("trip_count")) \
                     .orderBy(col("trip_count").desc()) \
                     .limit(10)

dropoff_hotspots.show()

+------------------+------------------+----------+
|   pickup_latitude|  pickup_longitude|trip_count|
+------------------+------------------+----------+
|               0.0|               0.0|       925|
|40.747291564941406|-73.93831634521484|        21|
| 40.68149185180664|-74.10514068603516|        20|
| 40.71524810791016|-73.75502014160156|        20|
| 40.76255035400391|-73.98658752441406|        20|
|40.812801361083984|-73.96054077148438|        14|
| 40.74533081054688| -73.9708480834961|        13|
| 40.75154113769531|-73.94527435302734|        11|
| 40.83308410644531|-73.94574737548828|         9|
| 40.65737915039063|-73.79387664794923|         8|
+------------------+------------------+----------+

+------------------+------------------+----------+
|  dropoff_latitude| dropoff_longitude|trip_count|
+------------------+------------------+----------+
|               0.0|               0.0|       893|
|40.747291564941406|-73.93831634521484|        21|
| 40.76255035400391|-73.986587

In [0]:
# Average trip distance and revenue by passenger count
passenger_behavior = df.groupBy("passenger_count") \
                       .agg(
                           avg("trip_distance").alias("avg_trip_distance"),
                           avg("total_amount").alias("avg_revenue")
                       ) \
                       .orderBy("passenger_count")

In [0]:
passenger_behavior.show()

+---------------+-------------------+------------------+
|passenger_count|  avg_trip_distance|       avg_revenue|
+---------------+-------------------+------------------+
|              0|0.02666666666666667| 37.96666666666666|
|              1| 3.0628218282869932| 16.43996091185226|
|              2|  3.104453278867896| 16.62547450579908|
|              3| 2.9159298331697743|16.155846418056885|
|              4|  2.829545934530096|15.626758183738172|
|              5| 2.9596764974851393|   16.386579789666|
|              6| 2.8202863254895507|15.721423399703653|
+---------------+-------------------+------------------+



In [0]:
# Average trip distance and duration by payment type
payment_analysis = df.groupBy("payment_type") \
                     .agg(
                         avg("trip_distance").alias("avg_trip_distance"),
                         avg("trip_duration_minutes").alias("avg_trip_duration")
                     ) \
                     .orderBy("payment_type")

payment_analysis.show()

+------------+------------------+------------------+
|payment_type| avg_trip_distance| avg_trip_duration|
+------------+------------------+------------------+
|           1|3.2281984432289526|17.290641842479616|
|           2|2.6444965816341894|16.189107409972998|
|           3|3.3947976878612716| 9.989980732177266|
|           4|2.6890789473684213| 8.739035087719298|
+------------+------------------+------------------+



In [0]:
# Distribution of trips by rate code
rate_code_distribution = df.groupBy("RatecodeID") \
                           .agg(count("*").alias("trip_count")) \
                           .orderBy(col("trip_count").desc())

rate_code_distribution.show()

+----------+----------+
|RatecodeID|trip_count|
+----------+----------+
|         1|     97199|
|         2|      2207|
|         5|       283|
|         3|       262|
|         4|        48|
|         6|         1|
+----------+----------+



In [0]:
from pyspark.sql.functions import when

# Categorize trip distance into short, medium, and long trips
df = df.withColumn("trip_distance_category", 
                   when(col("trip_distance") < 5, "Short")
                   .when((col("trip_distance") >= 5) & (col("trip_distance") < 15), "Medium")
                   .otherwise("Long"))

df.show()

# Analyze revenue by trip distance category
revenue_by_distance_category = df.groupBy("trip_distance_category") \
                                 .agg(sum("total_amount").alias("total_revenue")) \
                                 .orderBy("trip_distance_category")
revenue_by_distance_category.show()

+--------+--------------------+---------------------+---------------+-------------+------------------+------------------+----------+------------------+------------------+------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+-------+---------------------+----------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|  pickup_longitude|   pickup_latitude|RatecodeID|store_and_fwd_flag| dropoff_longitude|  dropoff_latitude|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|trip_id|trip_duration_minutes|trip_distance_category|
+--------+--------------------+---------------------+---------------+-------------+------------------+------------------+----------+------------------+------------------+------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+-------+---------------------+-

In [0]:
# Identify peak hours based on trip count
peak_hours = df.groupBy(hour(col("tpep_pickup_datetime")).alias("pickup_hour")) \
               .agg(count("*").alias("trip_count")) \
               .orderBy(col("trip_count").desc()) \
               .limit(5)

peak_hours.show()

+-----------+----------+
|pickup_hour|trip_count|
+-----------+----------+
|          8|     11708|
|          9|     10710|
|         12|     10512|
|         13|     10204|
|         10|      9625|
+-----------+----------+



In [0]:
# Calculate revenue per mile
df = df.withColumn("revenue_per_mile", col("total_amount") / col("trip_distance"))

# Average revenue per mile by payment type
revenue_per_mile_by_payment = df.groupBy("payment_type") \
                                .agg(avg("revenue_per_mile").alias("avg_revenue_per_mile")) \
                                .orderBy("payment_type")

revenue_per_mile_by_payment.show()

+------------+--------------------+
|payment_type|avg_revenue_per_mile|
+------------+--------------------+
|           1|   8.938963916700892|
|           2|   9.395030655832459|
|           3|   -4.58687095929926|
|           4| -30.273418361518324|
+------------+--------------------+



In [0]:
# Save revenue_by_hour to Parquet
revenue_by_hour.write.mode("overwrite").parquet("/mnt/uber-insights/revenue_by_hour")

# Save pickup_hotspots to CSV
pickup_hotspots.write.mode("overwrite").csv("/mnt/uber-insights/pickup_hotspots")