In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import Window
import datetime

In [0]:
spark = SparkSession.builder.appName("Uber Data Analysis").getOrCreate()

In [0]:
df = spark.read.csv("/FileStore/tables/dataset-1.csv",header=True,inferSchema=True)

In [0]:
df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Time (Local): integer (nullable = true)
 |-- Eyeballs : integer (nullable = true)
 |-- Zeroes : integer (nullable = true)
 |-- Completed Trips : integer (nullable = true)
 |-- Requests : integer (nullable = true)
 |-- Unique Drivers: integer (nullable = true)



In [0]:
df = df.withColumn("Date",to_date(unix_timestamp(df.Date, "dd-MMM-yy").cast("timestamp")))
display(df)

Date,Time (Local),Eyeballs,Zeroes,Completed Trips,Requests,Unique Drivers
2012-09-10,7,5,0,2,2,9
,8,6,0,2,2,14
,9,8,3,0,0,14
,10,9,2,0,1,14
,11,11,1,4,4,11
,12,12,0,2,2,11
,13,9,1,0,0,9
,14,12,1,0,0,9
,15,11,2,1,2,7
,16,11,2,3,4,6


In [0]:
display(df)

Date,Time (Local),Eyeballs,Zeroes,Completed Trips,Requests,Unique Drivers
2012-09-10,7,5,0,2,2,9
,8,6,0,2,2,14
,9,8,3,0,0,14
,10,9,2,0,1,14
,11,11,1,4,4,11
,12,12,0,2,2,11
,13,9,1,0,0,9
,14,12,1,0,0,9
,15,11,2,1,2,7
,16,11,2,3,4,6


In [0]:
rows = df.collect()
rows_list = [row.asDict() for row in rows]

# Iterate over each row in the list and replace null values in the "Date" column
prev_date = None
for row in rows_list:
    if row["Date"] is not None:
        prev_date = row["Date"]
    else:
        row["Date"] = prev_date

# Create a new DataFrame from the modified list
df_modified = spark.createDataFrame(rows_list)

# Show the resulting DataFrame
df_modified.show()

+----------------+----------+---------+---------+------------+--------------+-------+
|Completed Trips |      Date|Eyeballs |Requests |Time (Local)|Unique Drivers|Zeroes |
+----------------+----------+---------+---------+------------+--------------+-------+
|               2|2012-09-10|        5|        2|           7|             9|      0|
|               2|2012-09-10|        6|        2|           8|            14|      0|
|               0|2012-09-10|        8|        0|           9|            14|      3|
|               0|2012-09-10|        9|        1|          10|            14|      2|
|               4|2012-09-10|       11|        4|          11|            11|      1|
|               2|2012-09-10|       12|        2|          12|            11|      0|
|               0|2012-09-10|        9|        0|          13|             9|      1|
|               0|2012-09-10|       12|        0|          14|             9|      1|
|               1|2012-09-10|       11|        2|     

In [0]:
df_modified = df_modified.withColumnRenamed("Completed Trips ", "CompletedTrips") \
                .withColumnRenamed("Eyeballs ", "Eyeballs") \
                .withColumnRenamed("Requests ", "Requests") \
                .withColumnRenamed("Time (Local)", "TimeLocal") \
                .withColumnRenamed("Unique Drivers", "UniqueDrivers") \
                .withColumnRenamed("Zeroes ", "Zeroes")

In [0]:
null_counts = df_modified.select([sum(col(c).isNull().cast("integer")).alias(c) for c in df_modified.columns])

# Show the results
null_counts.show()

+--------------+----+--------+--------+---------+-------------+------+
|CompletedTrips|Date|Eyeballs|Requests|TimeLocal|UniqueDrivers|Zeroes|
+--------------+----+--------+--------+---------+-------------+------+
|             0|   0|       0|       0|        0|            0|     0|
+--------------+----+--------+--------+---------+-------------+------+



In [0]:
#df_modified = df_modified.withColumnRenamed("Completed Trips","Complete_trips")
print(df_modified.groupby("Date").agg(sum("CompletedTrips").alias("Trips")).orderBy("Trips",ascending=False).select("Date").first()["Date"])

2012-09-22


In [0]:
#c_t = df_modified.select("CompletedTrips","TimeLocal").groupby("TimeLocal").agg(max(df_modified["CompletedTrips"]).alias("com_trips"))
print(c_t.agg(sum(c_t["com_trips"])).first()[0])

329


In [0]:
df_modified.select("CompletedTrips","TimeLocal").groupby("TimeLocal").agg(max(df_modified["CompletedTrips"])).show()

+---------+-------------------+
|TimeLocal|max(CompletedTrips)|
+---------+-------------------+
|       19|                 21|
|        0|                 23|
|       22|                 25|
|        7|                  3|
|        6|                  4|
|        9|                  5|
|       17|                 26|
|        5|                  1|
|        1|                 17|
|       10|                  3|
|        3|                  9|
|       12|                 12|
|        8|                  8|
|       11|                  7|
|        2|                 22|
|        4|                  2|
|       13|                 12|
|       18|                 15|
|       14|                 16|
|       21|                 15|
+---------+-------------------+
only showing top 20 rows



In [0]:
display(df_modified)

CompletedTrips,Date,Eyeballs,Requests,TimeLocal,UniqueDrivers,Zeroes
2,2012-09-10,5,2,7,9,0
2,2012-09-10,6,2,8,14,0
0,2012-09-10,8,0,9,14,3
0,2012-09-10,9,1,10,14,2
4,2012-09-10,11,4,11,11,1
2,2012-09-10,12,2,12,11,0
0,2012-09-10,9,0,13,9,1
0,2012-09-10,12,0,14,9,1
1,2012-09-10,11,2,15,7,2
3,2012-09-10,11,4,16,6,2


In [0]:
df_modified.groupby("Date","TimeLocal").agg(sum("Requests").alias("Request")).orderBy("Request",ascending=False).limit(1).show(10)

+----------+---------+-------+
|      Date|TimeLocal|Request|
+----------+---------+-------+
|2012-09-21|       23|     46|
+----------+---------+-------+



In [0]:
df_modified.show()

+--------------+----------+--------+--------+---------+-------------+------+
|CompletedTrips|      Date|Eyeballs|Requests|TimeLocal|UniqueDrivers|Zeroes|
+--------------+----------+--------+--------+---------+-------------+------+
|             2|2012-09-10|       5|       2|        7|            9|     0|
|             2|2012-09-10|       6|       2|        8|           14|     0|
|             0|2012-09-10|       8|       0|        9|           14|     3|
|             0|2012-09-10|       9|       1|       10|           14|     2|
|             4|2012-09-10|      11|       4|       11|           11|     1|
|             2|2012-09-10|      12|       2|       12|           11|     0|
|             0|2012-09-10|       9|       0|       13|            9|     1|
|             0|2012-09-10|      12|       0|       14|            9|     1|
|             1|2012-09-10|      11|       2|       15|            7|     2|
|             3|2012-09-10|      11|       4|       16|            6|     2|

In [0]:

weekend_df = df_modified.filter(
    (col("TimeLocal") >= 17) & (dayofweek(col("Date")) == 6) |
    (dayofweek(col("Date")) == 7) |  
    (dayofweek(col("Date")) == 1) & (col("TimeLocal") < 3)
)

weekend_df.show(10)




+--------------+----------+--------+--------+---------+-------------+------+
|CompletedTrips|      Date|Eyeballs|Requests|TimeLocal|UniqueDrivers|Zeroes|
+--------------+----------+--------+--------+---------+-------------+------+
|             3|2012-09-14|      34|       5|       17|           13|     4|
|             8|2012-09-14|      40|       9|       18|           14|     2|
|             9|2012-09-14|      46|      10|       19|           15|     6|
|             8|2012-09-14|      38|       9|       20|           14|     4|
|             8|2012-09-14|      49|       9|       21|           17|     6|
|            18|2012-09-14|      60|      20|       22|           19|     3|
|            24|2012-09-14|      68|      29|       23|           18|    18|
|            23|2012-09-15|      45|      24|        0|           19|     2|
|            12|2012-09-15|      37|      14|        1|           18|     1|
|            22|2012-09-15|      38|      27|        2|           12|    17|

In [0]:
weekend_count = weekend_df.filter(weekend_df["Zeroes"] == 0).count()
total_count = df_modified.filter(df_modified["Zeroes"] == 0).count()

print(f"the percentage of zeroes are {(weekend_count/total_count) * 100}")

the percentage of zeroes are 3.8461538461538463


In [0]:
weighted_avg = df_modified.withColumn("completed_per_driver", df_modified["CompletedTrips"] / df_modified["UniqueDrivers"]) \
                 .groupBy("Date", "TimeLocal") \
                 .agg(avg("completed_per_driver").alias("avg_completed_per_driver"), sum("CompletedTrips").alias("total_completed_trips")) \
                 .withColumn("weighted_ratio", col("avg_completed_per_driver") * col("total_completed_trips")) \
                 .agg(sum("weighted_ratio") / sum("total_completed_trips")).collect()[0][0]

print("The weighted average ratio of completed trips per driver is:", weighted_avg)

+--------------------------------------------------+
|(sum(weighted_ratio) / sum(total_completed_trips))|
+--------------------------------------------------+
|                                0.8276707747535552|
+--------------------------------------------------+



In [0]:
# Calculate the number of unique requests for each hour of the day
hourly_unique_requests = (df_modified
  .groupby("TimeLocal")
  .agg(countDistinct("Requests").alias("unique_requests"))
)

# Slide a window of 8 hours to find the busiest 8 consecutive hours
window = Window.orderBy(col("unique_requests").desc()).rowsBetween(0, 7)
busiest_8_consecutive_hours = (hourly_unique_requests
  .select("*", sum("unique_requests").over(window).alias("consecutive_sum"))
  .orderBy(col("consecutive_sum").desc())
  .limit(1)
)

# Print the result
busiest_8_consecutive_hours.show()

+---------+---------------+---------------+
|TimeLocal|unique_requests|consecutive_sum|
+---------+---------------+---------------+
|       20|             12|             80|
+---------+---------------+---------------+



In [0]:
period_ratios = (df_modified
  .groupBy(((col("Date").cast("timestamp").cast("long") / (72*3600)).cast("int")).alias("period"))
  .agg(sum("Zeroes").alias("zeroes"), sum("Eyeballs").alias("eyeballs"))
  .withColumn("ratio", col("zeroes") / col("eyeballs"))
)

# Find the period with the highest ratio
highest_ratio_period = period_ratios.orderBy(col("ratio").desc()).limit(1)

# Print the result
highest_ratio_period.show()

+------+------+--------+-------------------+
|period|zeroes|eyeballs|              ratio|
+------+------+--------+-------------------+
|  5199|   443|    1763|0.25127623369256946|
+------+------+--------+-------------------+



In [0]:
# Calculate requests per unique driver for each hour
requests_per_driver = (df_modified.groupBy('TimeLocal').agg(
    (sum('Requests') / countDistinct('UniqueDrivers')).alias('requests_per_driver'))
)

# Show the hour with the highest ratio
requests_per_driver.orderBy(desc('requests_per_driver')).show(1)

+---------+-------------------+
|TimeLocal|requests_per_driver|
+---------+-------------------+
|        2|               20.0|
+---------+-------------------+
only showing top 1 row



In [0]:

avg_trips_and_drivers = (df_modified.groupBy('TimeLocal').agg(
    mean('CompletedTrips').alias('avg_completed_trips'),
    mean('UniqueDrivers').alias('avg_unique_drivers')
))

# Show the hour with the lowest average completed trips and unique drivers
avg_trips_and_drivers.orderBy('avg_completed_trips', 'avg_unique_drivers').show(1)

+---------+-------------------+------------------+
|TimeLocal|avg_completed_trips|avg_unique_drivers|
+---------+-------------------+------------------+
|        4|0.14285714285714285|0.6428571428571429|
+---------+-------------------+------------------+
only showing top 1 row

