In [0]:
from pyspark.sql.types import *
from pyspark.sql import DataFrame
from functools import reduce
from pyspark.sql.functions import *
from pyspark.sql import functions as F
import os
from pyspark.sql import *
from pyspark.sql.functions import col, sum

In [0]:
new_folder_name = "Uber_Gold_tables"

new_folder_path = "dbfs:/FileStore/" + new_folder_name + "/"

dbutils.fs.mkdirs(new_folder_path)

try:
    dbutils.fs.mkdirs(new_folder_path)
    print(f"Folder '{new_folder_name}' created successfully at path '{new_folder_path}'.")
except Exception as e:
    print(f"Failed to create folder '{new_folder_name}' at path '{new_folder_path}'.")
    print(f"Error: {e}")

Folder 'Uber_Gold_tables' created successfully at path 'dbfs:/FileStore/Uber_Gold_tables/'.


In [0]:
dbutils.fs.ls("dbfs:/FileStore/Uber_Gold_tables/")

Out[3]: []

In [0]:
spark.conf.set("spark.databricks.delta.formatCheck.enabled", "false")

In [0]:
spark.sql(f'DROP table if exists Uber_GOLD')
create_db_qry=f"""CREATE DATABASE IF NOT EXISTS Uber_Gold"""
spark.sql(create_db_qry)

Out[2]: DataFrame[]

In [0]:
file_path = "dbfs:/FileStore/UberPickUps/Silver_Tables/Uber_AprSep14/"
dfs_Uber_AprSep14 = spark.read.format("delta").option("header", "true").load(file_path)

In [0]:
file_path = "dbfs:/FileStore/UberPickUps/Silver_Tables/Uber_JanJun15/"
dfs_Uber_JanJun15 = spark.read.format("delta").option("header", "true").load(file_path)

In [0]:
file_path = "dbfs:/FileStore/UberPickUps/Silver_Tables/FHV_Data/"
dfs_FHV_Data = spark.read.format("delta").option("header", "true").load(file_path)

In [0]:
file_path = "dbfs:/FileStore/UberPickUps/Silver_Tables/UBERandFHV/"
dfs_UBERandFHV = spark.read.format("delta").option("header", "true").load(file_path)


In [0]:
dfs_Uber_AprSep14.show(2)
dfs_Uber_JanJun15.show(2)
dfs_UBERandFHV.show(2)
dfs_FHV_Data.show(2)

+----------+-------+--------+--------------------+-------------------+--------+--------+---------+
|      Date|    Lat|     Lon|Dispatching_Base_Num|           Category|    Time| trip_ID|Base_Name|
+----------+-------+--------+--------------------+-------------------+--------+--------+---------+
|2014-04-01| 40.769|-73.9549|              B02512|uber_raw_data_apr14|00:11:00|11947673|    Unter|
|2014-04-01|40.7267|-74.0345|              B02512|uber_raw_data_apr14|00:17:00|11947673|    Unter|
+----------+-------+--------+--------------------+-------------------+--------+--------+---------+
only showing top 2 rows

+--------------------+--------------------+----------+--------+--------+---------+--------+
|Dispatching_Base_Num|            Category|      Date|    Time| trip_ID|Base_Name|Price[$]|
+--------------------+--------------------+----------+--------+--------+---------+--------+
|              B02617|uber_raw_data_jan...|2015-05-17|09:47:00|40000000|   Weiter|   93.66|
|            

**Location-Based Marketing: Identify high-demand areas and peak times to target promotions and incentives to both riders and drivers.**

In [0]:
columns = ["Date", "Lat", "Lon", "Dispatching_Base_Num", "Category", "Time", "trip_ID", "Base_Name"]
dfs_Uber_AprSep14 = dfs_Uber_AprSep14.withColumn("Hour", hour(col("Time")))

dfs_Uber_AprSep14 = dfs_Uber_AprSep14.withColumn(
    "TimeInterval",
    when((col("Hour") >= 8) & (col("Hour") < 12), "8am to 12pm")
    .when((col("Hour") >= 12) & (col("Hour") < 16), "12pm to 4pm")
    .when((col("Hour") >= 16) & (col("Hour") < 24), "4pm to 12am")
    .otherwise("12am to 8am")
)
dfs_Uber_AprSep14 = dfs_Uber_AprSep14.withColumn("Truncated_Lat", floor(col("Lat"))) \
                                     .withColumn("Truncated_Lon", floor(col("Lon")))

dfs_Uber_AprSep14_grouped = dfs_Uber_AprSep14.groupBy("Truncated_Lat", "Truncated_Lon", "Date", "TimeInterval") \
               .agg(count("trip_ID").alias("Total_Trips"))

dfs_Uber_AprSep14_grouped.show(5)

+-------------+-------------+----------+------------+-----------+
|Truncated_Lat|Truncated_Lon|      Date|TimeInterval|Total_Trips|
+-------------+-------------+----------+------------+-----------+
|           40|          -74|2014-05-01| 12am to 8am|         74|
|           40|          -75|2014-05-01| 12am to 8am|         26|
|           41|          -75|2014-09-01| 12am to 8am|          1|
|           40|          -74|2014-09-01| 12am to 8am|         45|
|           40|          -75|2014-09-01| 12am to 8am|         11|
+-------------+-------------+----------+------------+-----------+
only showing top 5 rows



**Location-Based Marketing: Identify variations in trip counts by location and date**

In [0]:
windowSpec = Window.partitionBy("Truncated_Lat", "Truncated_Lon").orderBy("Date")

Uber_AprSep14_lag_lead = dfs_Uber_AprSep14_grouped.withColumn("Previous_Total_Trips", lag("Total_Trips").over(windowSpec))
Uber_AprSep14_lag_lead = Uber_AprSep14_lag_lead.withColumn("Diff_in_Trips", col("Total_Trips") - col("Previous_Total_Trips"))
Uber_AprSep14_lag_lead = Uber_AprSep14_lag_lead.drop("TimeInterval", "Previous_Total_Trips")
Uber_AprSep14_lag_lead.show(5)

+-------------+-------------+----------+-----------+-------------+
|Truncated_Lat|Truncated_Lon|      Date|Total_Trips|Diff_in_Trips|
+-------------+-------------+----------+-----------+-------------+
|           40|          -75|2014-04-01|         28|         null|
|           40|          -75|2014-05-01|         26|           -2|
|           40|          -75|2014-06-01|         33|            7|
|           40|          -75|2014-07-01|         23|          -10|
|           40|          -75|2014-08-01|         33|           10|
+-------------+-------------+----------+-----------+-------------+
only showing top 5 rows



**Performance Benchmarking: Compare performance metrics across different dispatching bases on particular days to identify best days of week.**

In [0]:
dfs_Uber_JanJun15 = dfs_Uber_JanJun15.withColumn("DayOfWeek", date_format(col("Date"), "EEEE"))
df_total_price_per_day  = dfs_Uber_JanJun15.groupBy("Dispatching_Base_Num", "Date", "DayOfWeek", "Base_Name").agg(sum("Price[$]").alias("Total_Price_per_day"))
df_total_price_per_day = df_total_price_per_day.withColumn("Total_Price_per_day", col("Total_Price_per_day").cast("float"))
df_total_price_per_day.show(5)

+--------------------+----------+---------+---------+-------------------+
|Dispatching_Base_Num|      Date|DayOfWeek|Base_Name|Total_Price_per_day|
+--------------------+----------+---------+---------+-------------------+
|              B02617|2015-05-17|   Sunday|   Weiter|            5528.02|
+--------------------+----------+---------+---------+-------------------+



**Ride Pricing Optimization: Use data to optimize pricing strategies during different times and for different ride categories.​**

In [0]:
dfs_Uber_JanJun15 = dfs_Uber_JanJun15.withColumn("Hour", hour(col("Time")))

dfs_Uber_JanJun15 = dfs_Uber_JanJun15.withColumn(
    "TimeInterval",
    when((col("Hour") >= 8) & (col("Hour") < 12), "8am to 12pm")
    .when((col("Hour") >= 12) & (col("Hour") < 16), "12pm to 4pm")
    .when((col("Hour") >= 16) & (col("Hour") < 24), "4pm to 12am")
    .otherwise("12am to 8am")
)

P_T_time = dfs_Uber_JanJun15.groupBy("TimeInterval") \
               .agg(count("trip_ID").alias("Total_Rides"), sum("Price[$]").alias("Total_Price"))
P_T_time = P_T_time.withColumn("Total_Price", col("Total_Price").cast("float"))

P_T_time.show()

+------------+-----------+-----------+
|TimeInterval|Total_Rides|Total_Price|
+------------+-----------+-----------+
| 8am to 12pm|        100|    5528.02|
+------------+-----------+-----------+



**Cumulative Trips and Active Vehicles: Use the data to get the cumulative number of trips and active vehicles based on the Base names and Category**

In [0]:
cumulative_vehicles_trips = dfs_UBERandFHV
cumulative_vehicles_trips = cumulative_vehicles_trips.groupby(['Dispatching_Base_Num', 'Category']).agg({'Active_Vehicles': 'sum','Trips': 'sum'})
cumulative_vehicles_trips = cumulative_vehicles_trips.orderBy("Dispatching_Base_Num", "Category")
cumulative_vehicles_trips = cumulative_vehicles_trips.withColumnRenamed('sum(Active_Vehicles)', 'Total_Active_Vehicles')\
                                            .withColumnRenamed('sum(Trips)', 'Total_Trips')
cumulative_vehicles_trips.show(5)

+--------------------+--------------------+---------------------+-----------+
|Dispatching_Base_Num|            Category|Total_Active_Vehicles|Total_Trips|
+--------------------+--------------------+---------------------+-----------+
|              B00013|other_FHV_service...|                 59.0|      105.0|
|              B00014|other_FHV_service...|                 86.0|      155.0|
|              B00029|other_FHV_service...|                 82.0|      731.0|
|              B00053|other_FHV_service...|                 31.0|       36.0|
|              B00095|other_FHV_service...|                144.0|     1644.0|
+--------------------+--------------------+---------------------+-----------+
only showing top 5 rows



**Time and Category Analysis: Determine the total number of trips conducted by non-Uber FHV vehicles within a particular category and time range.**

In [0]:
dfs_FHV_Data = dfs_FHV_Data.withColumn(
    "TimeInterval",
    when((col("Time") >= "08:00") & (col("Time") < "12:00"), "8am to 12pm")
    .when((col("Time") >= "12:00") & (col("Time") < "16:00"), "12pm to 4pm")
    .when((col("Time") >= "16:00") & (col("Time") < "24:00"), "4pm to 12am")
    .otherwise("12am to 4am")
)

df_category_time_analysis = dfs_FHV_Data.groupBy("Category", "TimeInterval") \
    .agg(count("trip_ID").alias("Total_Trips"))

df_category_time_analysis.show()

+--------------------+------------+-----------+
|            Category|TimeInterval|Total_Trips|
+--------------------+------------+-----------+
| other_Federal_02216| 12am to 4am|        100|
|other_Prestige_B0...| 12am to 4am|        100|
|other_Skyline_B00111| 8am to 12pm|         12|
|other_Skyline_B00111| 12pm to 4pm|         18|
|other_Skyline_B00111| 12am to 4am|         23|
|other_Skyline_B00111| 4pm to 12am|         47|
|other_Highclass_B...| 12am to 4am|        100|
|other_Firstclass_...| 12am to 4am|        100|
| other_Carmel_B00256| 12am to 4am|        100|
|   other_Lyft_B02510| 8am to 12pm|          1|
|   other_Lyft_B02510| 12am to 4am|         58|
|   other_Lyft_B02510| 4pm to 12am|         36|
|   other_Lyft_B02510| 12pm to 4pm|          5|
|other_American_B0...| 12am to 4am|        100|
+--------------------+------------+-----------+



In [0]:
cumulative_vehicles_trips.write.format("delta").mode("overwrite").option("path", "dbfs:/FileStore/Uber_Gold_tables/Cumulative_Vehicles_Trips/").save()
cumulative_vehicles_trips.write.mode("overwrite").saveAsTable('Uber_Gold.cumulative_vehicles_trips')

In [0]:
dfs_Uber_AprSep14_grouped.write.format("delta").mode("overwrite").option("path", "dbfs:/FileStore/Uber_Gold_tables/Uber_AprSep14_grouped/").save()
dfs_Uber_AprSep14_grouped.write.mode("overwrite").saveAsTable('Uber_Gold.Uber_AprSep14_grouped')

In [0]:
df_total_price_per_day.write.format("delta").mode("overwrite").option("path", "dbfs:/FileStore/Uber_Gold_tables/total_price_per_day/").save()
df_total_price_per_day.write.mode("overwrite").saveAsTable('Uber_Gold.total_price_per_day')

In [0]:
P_T_time.write.format("delta").mode("overwrite").option("path", "dbfs:/FileStore/Uber_Gold_tables/P_T_time/").save()
P_T_time.write.mode("overwrite").saveAsTable('Uber_Gold.P_T_time')

In [0]:
df_category_time_analysis.write.format("delta").mode("overwrite").option("path", "dbfs:/FileStore/Uber_Gold_tables/category_time_analysis/").save()
df_category_time_analysis.write.mode("overwrite").saveAsTable('Uber_Gold.category_time_analysis')

In [0]:
Uber_AprSep14_lag_lead.write.format("delta").mode("overwrite").option("path", "dbfs:/FileStore/Uber_Gold_tables/Uber_AprSep14_lag_lead/").save()
Uber_AprSep14_lag_lead.write.mode("overwrite").saveAsTable('Uber_Gold.Uber_AprSep14_lag_lead')