In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("MySparkApp").master("local[*]").getOrCreate()
from pyspark.sql.functions import col,count,avg, month, year, to_date, current_date, when
spark

In [2]:
admin_df = spark.read.csv("Uber/Uber/Admin_data.csv", header = True, inferSchema = True)
customer_df = spark.read.csv("Uber/Uber/Customer_table.csv", header = True, inferSchema = True)
rider_df = spark.read.csv("Uber/Uber/Riders_data.csv", inferSchema = True)

In [3]:
admin_df.show(5)

+--------------+--------------+-------+----------+---+---------+---------------+-----------+---------------+-----+-----------+-------------+---------------+-----+------+--------+--------+-----+-------------+---------+---------+---------+----------+----------+-----------+--------------+---------------+---------+----------+------------+--------+----------+---------+
|    Start_time|      End_time|   Name|    Mobile|Age|Pin-Codes|         Source|Vaccine_cus|    Destination|Miles|Est_Costing|Ride_category|        Purpose| temp|clouds|pressure|humidity| wind|accquire_vehi|free_vehi|Lattitute|Longitude|locationID|rating_cus|Riders_Name|Riders_contact|Trusted_Contact|Rating_RI|Vaccine_Ri|Payment_mode|Discount|Final_cost|   Status|
+--------------+--------------+-------+----------+---+---------+---------------+-----------+---------------+-----+-----------+-------------+---------------+-----+------+--------+--------+-----+-------------+---------+---------+---------+----------+----------+-------

In [4]:
customer_df.show(5)

+---------------+--------------+-----------+-------------------+-------------+-------+---------------+---------------+-------------+---------+------------+---------------+--------------+----------------------+---------+----------------+------------+--------------+----------------+--------+-----------+
|Pickup DateTime| Drop DateTime|Driver Name|Driver Phone Number|Trip Distance|Trip ID|Pickup Location|  Drop Location|Trip Duration|Trip Fare|Vehicle Type|   Trip Purpose|Passenger Name|Passenger Phone Number|Scheduled|Passenger Rating|Driver Rated|Payment Method|Payment Discount|Net Fare|Trip Status|
+---------------+--------------+-----------+-------------------+-------------+-------+---------------+---------------+-------------+---------+------------+---------------+--------------+----------------------+---------+----------------+------------+--------------+----------------+--------+-----------+
| 1/1/2016 21:11|1/1/2016 21:17|     Almire|         9298608912|           21| 318886|    F

In [5]:
rider_df.show(5)

+--------------+--------------+-------+----------+---+------+---------------+---------------+----+------+----+-------+----------+----+----+----+----+-----------+----+-------+---------+
|           _c0|           _c1|    _c2|       _c3|_c4|   _c5|            _c6|            _c7| _c8|   _c9|_c10|   _c11|      _c12|_c13|_c14|_c15|_c16|       _c17|_c18|   _c19|     _c20|
+--------------+--------------+-------+----------+---+------+---------------+---------------+----+------+----+-------+----------+----+----+----+----+-----------+----+-------+---------+
|1/1/2016 21:11|1/1/2016 21:17| Almire|9298608912| 21|318886|    Fort Pierce|    Fort Pierce| 5.1| 49.98|Bike|Johanna|9181026109|   3| 141| YES| YES|       Gpay|  0%|  49.98|Cancelled|
|1/2/2016 20:25|1/2/2016 20:38|Frazier|8621617385| 27|318886|    Fort Pierce|    Fort Pierce| 4.8| 47.04|Bike|Charlot|9855403124|   4|  65| YES| YES|     Phonpe|  0%|  47.04|  Arrived|
|1/5/2016 17:31|1/5/2016 17:45| Editha|9954004976| 20|318886|    Fort Pierc

## 1. No. Of Customers Take trip from the same location 

In [6]:
# Count number of customers taking trip from the same pickup location
customer_df.groupBy("Pickup Location") \
    .agg(count("*").alias("Number_of_Trips")) \
    .orderBy("Number_of_Trips", ascending=False) \
    .show(truncate=False)


+-----------------+---------------+
|Pickup Location  |Number_of_Trips|
+-----------------+---------------+
|Fort Pierce      |108            |
|Midtown          |78             |
|West Palm Beach  |54             |
|Cary             |52             |
|Lower Manhattan  |26             |
|Midtown East     |26             |
|Flatiron District|26             |
|East Harlem      |26             |
|Hudson Square    |26             |
|Jamaica          |26             |
|New York         |26             |
|Elmhurst         |26             |
+-----------------+---------------+



## 2.what is priority for each ride category from each location

In [7]:
# Count of each ride category per pickup location
admin_df.groupBy("Source", "Ride_category") \
    .agg(count("*").alias("Trip_Count")) \
    .orderBy("Source", "Trip_Count", ascending=[True, False]) \
    .show(truncate=False)

+-----------------+-------------+----------+
|Source           |Ride_category|Trip_Count|
+-----------------+-------------+----------+
|Cary             |Prime        |12        |
|Cary             |Auto         |12        |
|Cary             |Uber-Mini    |12        |
|Cary             |Uber-Micro   |8         |
|Cary             |Bike         |8         |
|East Harlem      |Bike         |6         |
|East Harlem      |Uber-Micro   |6         |
|East Harlem      |Uber-Mini    |6         |
|East Harlem      |Auto         |4         |
|East Harlem      |Prime        |4         |
|Elmhurst         |Auto         |6         |
|Elmhurst         |Uber-Micro   |6         |
|Elmhurst         |Prime        |5         |
|Elmhurst         |Bike         |5         |
|Elmhurst         |Uber-Mini    |4         |
|Flatiron District|Uber-Mini    |6         |
|Flatiron District|Uber-Micro   |6         |
|Flatiron District|Bike         |6         |
|Flatiron District|Auto         |4         |
|Flatiron 

## 3.what are the longest locations of customer travelled.

In [8]:
# Sort by Trip Distance descending and show top 10
longest_trips = customer_df.select(
    "Passenger Name", "Pickup Location", "Drop Location", "Trip Distance"
).orderBy(col("Trip Distance").desc())

longest_trips.show(10, truncate=False)

+--------------+---------------+-------------+-------------+
|Passenger Name|Pickup Location|Drop Location|Trip Distance|
+--------------+---------------+-------------+-------------+
|Price         |East Harlem    |Whitebridge  |80           |
|Darlleen      |Fort Pierce    |Cary         |80           |
|Virginie      |Elmhurst       |Cary         |80           |
|Charlena      |Midtown East   |Durham       |80           |
|Haskel        |Cary           |Whitebridge  |80           |
|Jacky         |West Palm Beach|Houston      |80           |
|Collette      |Fort Pierce    |Tanglewood   |80           |
|Moss          |Lower Manhattan|Morrisville  |80           |
|Elsy          |New York       |Cary         |79           |
|Daron         |Fort Pierce    |Cary         |79           |
+--------------+---------------+-------------+-------------+
only showing top 10 rows



## 4. Drivers who completed ride with non-vaccinated customers.

In [9]:
# Filter non-vaccinated customer rides that were completed
completed_rides_df = admin_df.filter(
    (col("Vaccine_cus") == "NO") &
    (col("Status").isin("Arrived", "Assigned"))
)

# Select distinct driver names and phone numbers
completed_rides_df.select("Name").distinct().show(truncate=False)


+---------+
|Name     |
+---------+
|Baron    |
|Aubert   |
|Randee   |
|Violetta |
|Ula      |
|Charissa |
|Cassius  |
|Danielle |
|Bernadine|
|Mathian  |
|Gabbey   |
|Ray      |
|Laurice  |
|Fabio    |
|Sherwin  |
|Reggie   |
|Bjorn    |
|Terrye   |
|Anallise |
|Clim     |
+---------+
only showing top 20 rows



## 5.How many vaccinated customers have travelled.

In [10]:
# Count vaccinated customer rides
admin_df.filter(col("Vaccine_cus") == "YES") \
    .agg(count("*").alias("Vaccinated_Customers_Trips")) \
    .show()

+--------------------------+
|Vaccinated_Customers_Trips|
+--------------------------+
|                       400|
+--------------------------+



## 6.Customers who completed ride with non-vaccinated Drivers.

In [11]:
# Filter for non-vaccinated drivers and completed rides
non_vaccinated_driver_rides = admin_df.filter(
    (col("Vaccine_Ri") == "NO") &
    (col("Status").isin("Arrived", "Assigned"))
)

# Show unique customers who took those rides
non_vaccinated_driver_rides.select("Name", "Riders_Name") \
    .distinct() \
    .show(truncate=False)


+--------+-----------+
|Name    |Riders_Name|
+--------+-----------+
|Horton  |Dora       |
|Madison |Kip        |
|Charissa|Kimberly   |
|Viviana |Kain       |
|Ernaline|Yasmeen    |
|Joelle  |Dora       |
|Kristo  |Ellette    |
|Arliene |Yasmeen    |
|Gracia  |Carolus    |
|Anton   |Marta      |
|Roderigo|Pattin     |
|Renard  |Haskel     |
|Riley   |Abbi       |
|Kristel |Carolus    |
|Mathian |Mikkel     |
|Anallise|Pattin     |
|Geordie |Pattin     |
|Gwennie |Romonda    |
|Lindy   |Haskel     |
|Marv    |Abbi       |
+--------+-----------+
only showing top 20 rows



# WRONG
## 7.who is the customer completed highest no of rides

In [12]:
# Filter completed rides
completed_rides = customer_df.filter(col("Trip Status").isin("Arrived", "Assigned"))

# Group by Passenger Name and count rides
top_customer = completed_rides.groupBy("Passenger Name", "Passenger Phone Number") \
    .agg(count("*").alias("Completed_Rides")) \
    .orderBy(col("Completed_Rides").desc()) \
    .limit(1)

top_customer.show(truncate=False)

+--------------+----------------------+---------------+
|Passenger Name|Passenger Phone Number|Completed_Rides|
+--------------+----------------------+---------------+
|Siegfried     |9419083292            |5              |
+--------------+----------------------+---------------+



## 8.who is the driver completed highest no of rides.

In [17]:
# Filter for completed rides
completed_rides = admin_df.filter(col("Status").isin("Arrived", "Assigned"))

# Group by driver and count number of completed rides
top_driver = completed_rides.groupBy("Riders_Name", "Riders_contact") \
    .agg(count("*").alias("Completed_Rides")) \
    .orderBy(col("Completed_Rides").desc()) \
    .limit(5)

top_driver.show(truncate=False)

+-----------+--------------+---------------+
|Riders_Name|Riders_contact|Completed_Rides|
+-----------+--------------+---------------+
|Rhonda     |9626523261    |5              |
|Marve      |9503219756    |5              |
|Aurlie     |9450752773    |5              |
|Siegfried  |9419083292    |5              |
|Ede        |9659266471    |5              |
+-----------+--------------+---------------+



## 9.what are first 10 age groups which uses uber services mostly.

In [14]:
top_age_groups = admin_df.groupBy("Age") \
    .agg(count("*").alias("Usage_Count")) \
    .orderBy(col("Usage_Count").desc()) \
    .limit(10)

top_age_groups.show()

+---+-----------+
|Age|Usage_Count|
+---+-----------+
| 69|         16|
| 15|         15|
| 32|         15|
| 28|         12|
| 78|         11|
| 43|         11|
| 49|         11|
| 21|         11|
| 23|         10|
| 44|         10|
+---+-----------+



## 10.what is the count of different destination locations from same start location and also completed ride

In [15]:
completed_rides = customer_df.filter(col("Trip Status").isin("Arrived", "Assigned"))

destination_counts = completed_rides.groupBy("Pickup Location", "Drop Location") \
    .agg(count("*").alias("Trip_Count")) \
    .orderBy("Pickup Location", "Trip_Count", ascending=[True, False])

destination_counts.show(20, truncate=False)


+---------------+---------------------+----------+
|Pickup Location|Drop Location        |Trip_Count|
+---------------+---------------------+----------+
|Cary           |Cary                 |12        |
|Cary           |Morrisville          |7         |
|Cary           |Whitebridge          |3         |
|Cary           |Kissimmee            |2         |
|Cary           |Lake Reams           |2         |
|Cary           |Houston              |2         |
|Cary           |Raleigh              |1         |
|Cary           |South Congress       |1         |
|Cary           |Cedar Hill           |1         |
|Cary           |Kenner               |1         |
|Cary           |Tanglewood           |1         |
|Cary           |Lakeview             |1         |
|Cary           |R?walpindi           |1         |
|Cary           |Hazelwood            |1         |
|Cary           |Chessington          |1         |
|Cary           |Westpark Place       |1         |
|Cary           |Lower Garden D

## 11.what is the most expensive Drive

In [23]:
most_expensive_trip = customer_df.select(
    "Passenger Name", "Driver Name", "Pickup Location", "Drop Location", "Trip Fare"
).orderBy(col("Trip Fare").desc()).limit(1)

most_expensive_trip.show()

+--------------+-----------+---------------+-------------+---------+
|Passenger Name|Driver Name|Pickup Location|Drop Location|Trip Fare|
+--------------+-----------+---------------+-------------+---------+
|        Aurlie|        Ely|           Cary|         Cary|   5406.0|
+--------------+-----------+---------------+-------------+---------+

