The goal of the project is to utilize Python’s powerful library for big data processing to gain insights from Uber’s dataset and answer questions below. 
Data sets and analysis questions are from
<a href="https://www.kaggle.com/datasets/nanasahebshinde/uber-case-study">Kaggle’s Uber Case Study</a>



<b>Dataset Columns</b>:<br>
Date<br>
Time<br>
Eyeballs = Customers who launch the app looking for riders. It is a good measure of demand<br>
Zeroes = Customers who open the app and see no cars in the area.<br>
Requests = Customers who make requests for a car.<br>
Completed Trip = The point from when a customer is picked<br>
Unique Drivers<br><br>

<b>Using the provided dataset, answer the following questions</b>:
1. Which date had the most trips completed?<br>
2. What was the highest number of completed trips within a 24 hour period?<br>
3. Which hour of the day had the most requests during?<br>
4. What percentages of all zeroes occurred on weekends (Friday at 5 pm to Sunday at 3 am)?<br>
5. In drafting a driver schedule in terms of 8 hour shifts, when are the busiest 8 consecutive hours in terms of unique requests? A new shift starts every 8 hours. Assume that a driver will work the same shift each day.<br>


In [8]:
#Import the findspark module, initialize the module, and import pyspark
import findspark
findspark.init()
import pyspark

In [15]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql import functions as sf

import warnings
warnings.filterwarnings('ignore')

# Create a SparkSession
spark = SparkSession.builder.appName("MyUberData").getOrCreate()


In [16]:
# Load the dataset into a DataFrame
df = spark.read.csv("uber_dataset.csv", header=True, inferSchema=True)

In [17]:
df.columns

['Date',
 'Time (Local)',
 'Eyeballs ',
 'Zeroes ',
 'Completed Trips ',
 'Requests ',
 'Unique Drivers']

In [18]:
df.show()

+---------+------------+---------+-------+----------------+---------+--------------+
|     Date|Time (Local)|Eyeballs |Zeroes |Completed Trips |Requests |Unique Drivers|
+---------+------------+---------+-------+----------------+---------+--------------+
|10-Sep-12|           7|        5|      0|               2|        2|             9|
|10-Sep-12|           8|        6|      0|               2|        2|            14|
|10-Sep-12|           9|        8|      3|               0|        0|            14|
|10-Sep-12|          10|        9|      2|               0|        1|            14|
|10-Sep-12|          11|       11|      1|               4|        4|            11|
|10-Sep-12|          12|       12|      0|               2|        2|            11|
|10-Sep-12|          13|        9|      1|               0|        0|             9|
|10-Sep-12|          14|       12|      1|               0|        0|             9|
|10-Sep-12|          15|       11|      2|               1|      

In [19]:
# Which date had the most trips completed?
#To answer this question, we will sum up completed trips by date, sort in descending order and select top row
from pyspark.sql.functions import sum

# Group the data by date and sum the completed trips
completed_trips_by_date = df.groupBy("Date").sum("Completed Trips ")

# Find the date with the most completed trips
date_with_most_completed_trips = completed_trips_by_date \
    .orderBy(sum("Completed Trips "), ascending=False)

print(f'{date_with_most_completed_trips.first()[0]} with {date_with_most_completed_trips.first()[1]} trips has the most number of trips.')

22-Sep-12 with 248 trips has the most number of trips.


In [63]:
#Convert Date and Hour fields in Datetime field
#Concatenate Date string and Hour string in the new column
df = df.withColumn('joined_date', sf.concat(sf.col('Date'),sf.lit(' '), sf.col('Time (Local)')))
#Use unit_timestamp converstion to get Datetime in string format
df = df.withColumn('datetime_srt', from_unixtime(unix_timestamp('joined_date', 'd-LLL-yy H'),'MM-dd-yyyy HH:mm:ss'))
#Convert string into DateTime type
df=df.withColumn('Datetime', sf.to_timestamp('datetime_srt','MM-dd-yyyy HH:mm:ss'))
df.show()

+---------+------------+---------+-------+----------------+---------+--------------+------------+-------------------+-------------------+
|     Date|Time (Local)|Eyeballs |Zeroes |Completed Trips |Requests |Unique Drivers| joined_date|       datetime_srt|           Datetime|
+---------+------------+---------+-------+----------------+---------+--------------+------------+-------------------+-------------------+
|10-Sep-12|           7|        5|      0|               2|        2|             9| 10-Sep-12 7|09-10-2012 07:00:00|2012-09-10 07:00:00|
|10-Sep-12|           8|        6|      0|               2|        2|            14| 10-Sep-12 8|09-10-2012 08:00:00|2012-09-10 08:00:00|
|10-Sep-12|           9|        8|      3|               0|        0|            14| 10-Sep-12 9|09-10-2012 09:00:00|2012-09-10 09:00:00|
|10-Sep-12|          10|        9|      2|               0|        1|            14|10-Sep-12 10|09-10-2012 10:00:00|2012-09-10 10:00:00|
|10-Sep-12|          11|       11|

In [21]:
#What was the highest number of completed trips within a 24 hour period?


# Group the data by 24-hour windows and sum the completed trips
completed_trips_by_window = df \
    .groupBy(sf.window("Datetime", "24 hours")) \
    .agg(sum("Completed Trips ").alias("Total Completed Trips")) \
    .orderBy("Total Completed Trips", ascending=False)


completed_trips_by_window.show()

highest_completed_trips_in_24_hours = completed_trips_by_window \
    .select("Total Completed Trips") \
    .first()["Total Completed Trips"]

print(f'The highest number of completed trips in 24 hour period is {highest_completed_trips_in_24_hours}')

+--------------------+---------------------+
|              window|Total Completed Trips|
+--------------------+---------------------+
|{2012-09-21 19:00...|                  256|
|{2012-09-22 19:00...|                  189|
|{2012-09-14 19:00...|                  181|
|{2012-09-15 19:00...|                  161|
|{2012-09-20 19:00...|                  130|
|{2012-09-11 19:00...|                   96|
|{2012-09-13 19:00...|                   63|
|{2012-09-16 19:00...|                   51|
|{2012-09-17 19:00...|                   47|
|{2012-09-19 19:00...|                   46|
|{2012-09-18 19:00...|                   42|
|{2012-09-12 19:00...|                   40|
|{2012-09-10 19:00...|                   24|
|{2012-09-09 19:00...|                   20|
|{2012-09-23 19:00...|                   19|
+--------------------+---------------------+

The highest number of completed trips in 24 hour period is 256


In [22]:
#Which hour of the day had the most requests?
hourly_requests = df \
    .groupBy(sf.hour("Datetime").alias("hour")) \
    .agg(sum("Requests ").alias("total_requests")) \
    .orderBy("total_requests", ascending=False)

most_requested_hour = hourly_requests.select("hour").first()[0]
print("The hour with the most requests is:", most_requested_hour)

The hour with the most requests is: 23


In [23]:
#What percentages of all zeroes occurred on weekends (Friday at 5 pm to Sunday at 3 am)?

#First, let's calculate number of "Zeros" occurred on the weekend
weekend_zeros = df.filter((hour("Datetime") >= 17) | (hour("Datetime") < 3)) \
    .filter((dayofweek("Datetime") == 6) | (dayofweek("Datetime") == 7)) \
    .agg(sum("Zeroes ").alias("weekend_zeros")).collect()[0]["weekend_zeros"]

#Then, let's calculate total number of "Zero" incidents in the dataset
total_zeros = df.agg(sum("Zeroes ").alias("total_zeros")).collect()[0]["total_zeros"]

pcnt_weekend_zeros = weekend_zeros / total_zeros * 100

print("The percentage of \"Zero\" events that occurred on weekends is: ", pcnt_weekend_zeros, "%")


The percentage of "Zero" events that occurred on weekends is:  29.111266620014 %


In [62]:
# Question: In drafting a driver schedule in terms of 8 hours shifts, when are the busiest 8 consecutive hours in terms of unique requests? 
# A new shift starts every 8 hours. Assume that a driver will work the same shift each day.

#First, we will find the number of unique requests per hour of each day. Then create a window of 8 hours to find 8 consecutive hours
#with the highest number of unique requests. 

import re

busiest_8_consecutive_hours = df \
    .groupBy(sf.window("Datetime", "8 hours")) \
    .agg(sum("Requests ").alias("total_requests")) \
    .orderBy(sum("Requests "), ascending=False)

start_date = busiest_8_consecutive_hours.select(sf.date_format(sf.col('window.start'), 'yyyy-MM-dd HH:mm'))

start_date.show(1)

end_date = busiest_8_consecutive_hours.select(sf.date_format(sf.col('window.end'), 'yyyy-MM-dd HH:mm'))
end_date.show(1)

#.str.extract(r'"start":"([^"]*)","end":"([^"]+)')

#print("the busiest 8 consecutive hours are from", start_date )
#, " to ",
#      busiest_8_consecutive_hours.collect()[0]("end"), " with ", busiest_8_consecutive_hours.collect()[0]("total_requests"), 
#      " total requests")
#print(busiest_8_consecutive_hours.collect()[0])



+-------------------------------------------+
|date_format(window.start, yyyy-MM-dd HH:mm)|
+-------------------------------------------+
|                           2012-09-22 19:00|
+-------------------------------------------+
only showing top 1 row

+-----------------------------------------+
|date_format(window.end, yyyy-MM-dd HH:mm)|
+-----------------------------------------+
|                         2012-09-23 03:00|
+-----------------------------------------+
only showing top 1 row

