In [1]:
##Creating SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Uber Data Analysis").getOrCreate()

In [2]:
##reading Uber Trip Dataset
df_trip = spark.read.csv("dataset.csv", header = True, inferSchema = True)

In [3]:
df_trip.show()

+---------+------------+---------+-------+----------------+---------+--------------+
|     Date|Time (Local)|Eyeballs |Zeroes |Completed Trips |Requests |Unique Drivers|
+---------+------------+---------+-------+----------------+---------+--------------+
|10-Sep-12|           7|        5|      0|               2|        2|             9|
|     NULL|           8|        6|      0|               2|        2|            14|
|     NULL|           9|        8|      3|               0|        0|            14|
|     NULL|          10|        9|      2|               0|        1|            14|
|     NULL|          11|       11|      1|               4|        4|            11|
|     NULL|          12|       12|      0|               2|        2|            11|
|     NULL|          13|        9|      1|               0|        0|             9|
|     NULL|          14|       12|      1|               0|        0|             9|
|     NULL|          15|       11|      2|               1|      

In [4]:
##Cleaning the dataset
from pyspark.sql.window import Window
from pyspark.sql.functions import lit, col, last, to_date, date_format, window, lpad, concat

w = Window.orderBy(lit(1)).rowsBetween(Window.unboundedPreceding, Window.currentRow)
df_cleaned = df_trip.withColumn("dates", last(col("Date"), ignorenulls = True).over(w))

df_final = df_cleaned.withColumn("Trip_Date", date_format(to_date(col("dates"), 'dd-MMM-yy'),'dd-MM-yyyy')) \
           .select('Trip_Date',col('Time (Local)').alias('Time'), col('Eyeballs ').alias('Eyeballs'),col('Zeroes ').alias('Zeroes'),col('Completed Trips ').alias('Completed_Trips'),col('Requests ').alias('Requests'),col('Unique Drivers').alias('Unique_Drivers'))
df_final = df_final.withColumn('Time_padded', concat(col('Trip_Date'), lit(" "), lpad(col('Time').cast('string'),2,"0"), lit(":00:00")))
df_final = df_final.withColumn('Trip_Date',to_date(col('Trip_Date'),'dd-MM-yyyy'))
df_final.printSchema()


root
 |-- Trip_Date: date (nullable = true)
 |-- Time: integer (nullable = true)
 |-- Eyeballs: integer (nullable = true)
 |-- Zeroes: integer (nullable = true)
 |-- Completed_Trips: integer (nullable = true)
 |-- Requests: integer (nullable = true)
 |-- Unique_Drivers: integer (nullable = true)
 |-- Time_padded: string (nullable = true)



In [5]:
df_final.show()

+----------+----+--------+------+---------------+--------+--------------+-------------------+
| Trip_Date|Time|Eyeballs|Zeroes|Completed_Trips|Requests|Unique_Drivers|        Time_padded|
+----------+----+--------+------+---------------+--------+--------------+-------------------+
|2012-09-10|   7|       5|     0|              2|       2|             9|10-09-2012 07:00:00|
|2012-09-10|   8|       6|     0|              2|       2|            14|10-09-2012 08:00:00|
|2012-09-10|   9|       8|     3|              0|       0|            14|10-09-2012 09:00:00|
|2012-09-10|  10|       9|     2|              0|       1|            14|10-09-2012 10:00:00|
|2012-09-10|  11|      11|     1|              4|       4|            11|10-09-2012 11:00:00|
|2012-09-10|  12|      12|     0|              2|       2|            11|10-09-2012 12:00:00|
|2012-09-10|  13|       9|     1|              0|       0|             9|10-09-2012 13:00:00|
|2012-09-10|  14|      12|     1|              0|       0|  

In [6]:
##Which date had the most completed trips during the two-week period?
from pyspark.sql.functions import sum
df_complete = df_final.groupBy('Trip_Date').agg(sum('Completed_Trips').alias("Total_Completed_Trips"))
df_complete.orderBy("Total_Completed_Trips", ascending=False).limit(1).show()

+----------+---------------------+
| Trip_Date|Total_Completed_Trips|
+----------+---------------------+
|2012-09-22|                  248|
+----------+---------------------+



In [7]:
##What was the highest number of completed trips within a 24-hour period?
from pyspark.sql.functions import to_timestamp
df_final = df_final.withColumn('Time_padded',to_timestamp(col('Time_padded'),'dd-MM-yyyy HH:mm:ss'))

df_final.show()
df_window_24hr = df_final.groupBy(window('Time_padded', '24 hours')).agg(sum('Completed_Trips').alias("Total_Completed_Trips"))
df_window_24hr.orderBy("Total_Completed_Trips",ascending=False).limit(1).show()

+----------+----+--------+------+---------------+--------+--------------+-------------------+
| Trip_Date|Time|Eyeballs|Zeroes|Completed_Trips|Requests|Unique_Drivers|        Time_padded|
+----------+----+--------+------+---------------+--------+--------------+-------------------+
|2012-09-10|   7|       5|     0|              2|       2|             9|2012-09-10 07:00:00|
|2012-09-10|   8|       6|     0|              2|       2|            14|2012-09-10 08:00:00|
|2012-09-10|   9|       8|     3|              0|       0|            14|2012-09-10 09:00:00|
|2012-09-10|  10|       9|     2|              0|       1|            14|2012-09-10 10:00:00|
|2012-09-10|  11|      11|     1|              4|       4|            11|2012-09-10 11:00:00|
|2012-09-10|  12|      12|     0|              2|       2|            11|2012-09-10 12:00:00|
|2012-09-10|  13|       9|     1|              0|       0|             9|2012-09-10 13:00:00|
|2012-09-10|  14|      12|     1|              0|       0|  

In [8]:
##Which hour of the day had the most requests during the two-week period?

df_most_request = df_final.groupBy(col('Time').alias('Hour')).agg(sum('Requests').alias('Total_Requests')).orderBy('Total_Requests', ascending=False)
df_most_request.limit(1).show()

+----+--------------+
|Hour|Total_Requests|
+----+--------------+
|  23|           184|
+----+--------------+



In [9]:
##What percentages of all zeroes during the two-week period occurred on weekends (Friday at 5 pm to Sunday at 3 am)?
from pyspark.sql.functions import dayofweek
df_final = df_final.withColumn('Day', dayofweek('Trip_Date'))
#df_final.printSchema()
#df_final.show()

total_zeroes = df_final.agg(sum('Zeroes')).collect()[0][0]
weekend_data = df_final.filter((col('Day') == 7) | ((col('Day') == 6) & (col('Time') >= 17)) | ((col('Day') == 1) & (col('Time') <= 3)))
weekend_zeroes = weekend_data.agg(sum('Zeroes')).collect()[0][0]

#weekend_data.show()

print("Total zeroes:", total_zeroes)
print("Total zeroes during weekend:",weekend_zeroes)
print("Percentage:", round((weekend_zeroes/total_zeroes)*100,2))

Total zeroes: 1429
Total zeroes during weekend: 644
Percentage: 45.07


In [10]:
#What is the weighted average ratio of completed trips per driver during the two-week period?

from pyspark.sql.functions import avg
ratio_data = df_final.withColumn('completed_to_driver', (col('Completed_Trips')/col('Unique_Drivers')))
ratio_data = ratio_data.groupBy('Time').agg(avg('completed_to_driver').alias('avg_completed_to_driver'), sum('Completed_Trips').alias('total_completed_trips'))
ratio_data = ratio_data.withColumn('weighted_avg', col('avg_completed_to_driver')*col('total_completed_trips'))

total_completed_trips = ratio_data.agg(sum('total_completed_trips')).collect()[0][0]
total_weighted_ratio = ratio_data.agg(sum('weighted_avg')).collect()[0][0]
#ratio_data.show()
print('Total Completed Trips:',total_completed_trips)
print('Total Weighted Ratio:', round(total_weighted_ratio,3))
print('Average Weighted Ratio:',round(total_weighted_ratio/total_completed_trips,2))

Total Completed Trips: 1365
Total Weighted Ratio: 779.1
Average Weighted Ratio: 0.57


In [11]:
#In drafting a driver schedule in terms of 8 hours shifts, when are the busiest 8 consecutive hours over the two-week period 
#in terms of unique requests? A new shift starts every 8 hours. Assume that a driver will work the same shift each day.
from pyspark.sql.functions import countDistinct
request_data = df_final.groupBy('Time').agg(countDistinct('Requests').alias('unique_request'))

eight_hr_window = Window.orderBy(col('Time')).rowsBetween(0,7)
request_data = request_data.withColumn('total_request', sum(col('unique_Request')).over(eight_hr_window))
request_data.show()

+----+--------------+-------------+
|Time|unique_request|total_request|
+----+--------------+-------------+
|   0|            11|           51|
|   1|             8|           46|
|   2|             8|           44|
|   3|             7|           40|
|   4|             3|           41|
|   5|             4|           46|
|   6|             5|           50|
|   7|             5|           53|
|   8|             6|           56|
|   9|             6|           58|
|  10|             4|           62|
|  11|             8|           67|
|  12|             8|           69|
|  13|             8|           73|
|  14|             8|           75|
|  15|             8|           77|
|  16|             8|           77|
|  17|            10|           69|
|  18|             9|           59|
|  19|            10|           50|
+----+--------------+-------------+
only showing top 20 rows



In [12]:
#In which 72-hour period is the ratio of Zeroes to Eyeballs the highest?
from pyspark.sql.functions import round
df_zero_to_eyeball = df_final.groupBy(window('Time_padded', '72 hours')).agg(sum('Eyeballs').alias('total_eyeballs'), sum('Zeroes').alias('total_zeroes'))
df_zero_to_eyeball = df_zero_to_eyeball.withColumn('zeroes_to_eyeballs', round((col('total_zeroes')/col('total_eyeballs')),2))
df_zero_to_eyeball.show()

+--------------------+--------------+------------+------------------+
|              window|total_eyeballs|total_zeroes|zeroes_to_eyeballs|
+--------------------+--------------+------------+------------------+
|{2012-09-08 05:30...|           205|          35|              0.17|
|{2012-09-11 05:30...|          1080|         211|               0.2|
|{2012-09-14 05:30...|          1756|         439|              0.25|
|{2012-09-17 05:30...|          1029|         210|               0.2|
|{2012-09-20 05:30...|          2220|         440|               0.2|
|{2012-09-23 05:30...|           397|          94|              0.24|
+--------------------+--------------+------------+------------------+



In [13]:
#If you could add 5 drivers to any single hour of every day during the two-week period, 
#which hour should you add them to? Hint: Consider both rider eyeballs and driver supply when choosing.

df_req_to_driver = df_final.groupBy('Time').agg(avg('Requests').alias('Avg_Request'), avg('Unique_Drivers').alias('Avg_Driver'))
df_req_to_driver = df_req_to_driver.withColumn('request_to_driver', round(col('Avg_Request')/col('Avg_Driver'), 2))

df_req_to_driver.orderBy('request_to_driver', ascending=False).show()


+----+------------------+------------------+-----------------+
|Time|       Avg_Request|        Avg_Driver|request_to_driver|
+----+------------------+------------------+-----------------+
|   2| 7.142857142857143| 4.428571428571429|             1.61|
|  23|13.142857142857142|               8.5|             1.55|
|   0|10.142857142857142| 7.928571428571429|             1.28|
|   5|               1.0|0.7857142857142857|             1.27|
|  22|12.428571428571429|10.285714285714286|             1.21|
|   1| 6.857142857142857| 6.714285714285714|             1.02|
|   4|0.6428571428571429|0.6428571428571429|              1.0|
|   3|               2.5| 2.857142857142857|             0.88|
|  19|11.142857142857142|12.857142857142858|             0.87|
|   6|               2.0| 2.642857142857143|             0.76|
|  21|               8.0|11.071428571428571|             0.72|
|  18|               8.5|12.428571428571429|             0.68|
|  20| 7.642857142857143|11.642857142857142|           

In [14]:
#Looking at the data from all two weeks, which time might make the most sense to consider a true “end day” instead of midnight? 
#(i.e when are supply and demand at both their natural minimums)

df_trip_driver = df_final.groupBy('time').agg(avg('Completed_Trips').alias('Avg_Completed_Trips'), avg('Unique_Drivers').alias('Avg_Unique_Drivers'))

df_trip_driver.orderBy('Avg_Completed_Trips','Avg_Unique_Drivers').show()

+----+-------------------+------------------+
|time|Avg_Completed_Trips|Avg_Unique_Drivers|
+----+-------------------+------------------+
|   4|0.14285714285714285|0.6428571428571429|
|   5| 0.2857142857142857|0.7857142857142857|
|  10| 1.2857142857142858| 9.214285714285714|
|   6| 1.3571428571428572| 2.642857142857143|
|   7| 1.3571428571428572| 4.285714285714286|
|   9| 1.4285714285714286| 7.857142857142857|
|   3|                1.5| 2.857142857142857|
|   8| 1.7142857142857142| 6.785714285714286|
|  11| 2.5714285714285716|               9.5|
|  13|  3.142857142857143| 8.714285714285714|
|  12| 3.2857142857142856| 9.428571428571429|
|  15|  3.357142857142857| 9.928571428571429|
|  14|                3.5| 8.928571428571429|
|   2|  4.357142857142857| 4.428571428571429|
|  16|  4.857142857142857|10.285714285714286|
|   1|  5.071428571428571| 6.714285714285714|
|  20|  5.428571428571429|11.642857142857142|
|  17|  5.571428571428571|11.785714285714286|
|  21|  6.285714285714286|11.07142

In [15]:
df_final.printSchema()

root
 |-- Trip_Date: date (nullable = true)
 |-- Time: integer (nullable = true)
 |-- Eyeballs: integer (nullable = true)
 |-- Zeroes: integer (nullable = true)
 |-- Completed_Trips: integer (nullable = true)
 |-- Requests: integer (nullable = true)
 |-- Unique_Drivers: integer (nullable = true)
 |-- Time_padded: timestamp (nullable = true)
 |-- Day: integer (nullable = true)

