In [1]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
spark=SparkSession.builder \
      .appName("MyApp") \
      .master("local[*]") \
      .getOrCreate()
print("Spark version:", spark.version)
sc=spark.sparkContext

Spark version: 3.5.5


In [2]:
spark

In [3]:
cus_df=spark.read.csv('Customer_table.csv',header=True,inferSchema=True,mode="DROPMALFORMED")
cus_df.show(10)

+---------------+---------------+-----------+-------------------+-------------+-------+---------------+---------------+-------------+---------+------------+---------------+--------------+----------------------+---------+----------------+------------+--------------+----------------+--------+-----------+
|Pickup DateTime|  Drop DateTime|Driver Name|Driver Phone Number|Trip Distance|Trip ID|Pickup Location|  Drop Location|Trip Duration|Trip Fare|Vehicle Type|   Trip Purpose|Passenger Name|Passenger Phone Number|Scheduled|Passenger Rating|Driver Rated|Payment Method|Payment Discount|Net Fare|Trip Status|
+---------------+---------------+-----------+-------------------+-------------+-------+---------------+---------------+-------------+---------+------------+---------------+--------------+----------------------+---------+----------------+------------+--------------+----------------+--------+-----------+
| 1/1/2016 21:11| 1/1/2016 21:17|     Almire|         9298608912|           21| 318886| 

In [4]:
cus_df.printSchema()

root
 |-- Pickup DateTime: string (nullable = true)
 |-- Drop DateTime: string (nullable = true)
 |-- Driver Name: string (nullable = true)
 |-- Driver Phone Number: long (nullable = true)
 |-- Trip Distance: integer (nullable = true)
 |-- Trip ID: integer (nullable = true)
 |-- Pickup Location: string (nullable = true)
 |-- Drop Location: string (nullable = true)
 |-- Trip Duration: double (nullable = true)
 |-- Trip Fare: double (nullable = true)
 |-- Vehicle Type: string (nullable = true)
 |-- Trip Purpose: string (nullable = true)
 |-- Passenger Name: string (nullable = true)
 |-- Passenger Phone Number: long (nullable = true)
 |-- Scheduled: string (nullable = true)
 |-- Passenger Rating: double (nullable = true)
 |-- Driver Rated: string (nullable = true)
 |-- Payment Method: string (nullable = true)
 |-- Payment Discount: string (nullable = true)
 |-- Net Fare: double (nullable = true)
 |-- Trip Status: string (nullable = true)



In [5]:
admin_df=spark.read.csv('Admin_data.csv',header=True,inferSchema=True,mode="DROPMALFORMED")
admin_df.show(10)

+---------------+---------------+-------+----------+---+---------+---------------+-----------+---------------+-----+-----------+-------------+---------------+-----+------+--------+--------+-----+-------------+---------+---------+---------+----------+----------+-----------+--------------+---------------+---------+----------+------------+--------+----------+---------+
|     Start_time|       End_time|   Name|    Mobile|Age|Pin-Codes|         Source|Vaccine_cus|    Destination|Miles|Est_Costing|Ride_category|        Purpose| temp|clouds|pressure|humidity| wind|accquire_vehi|free_vehi|Lattitute|Longitude|locationID|rating_cus|Riders_Name|Riders_contact|Trusted_Contact|Rating_RI|Vaccine_Ri|Payment_mode|Discount|Final_cost|   Status|
+---------------+---------------+-------+----------+---+---------+---------------+-----------+---------------+-----+-----------+-------------+---------------+-----+------+--------+--------+-----+-------------+---------+---------+---------+----------+----------+-

In [6]:
admin_df.printSchema()

root
 |-- Start_time: string (nullable = true)
 |-- End_time: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Mobile: long (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Pin-Codes: integer (nullable = true)
 |-- Source: string (nullable = true)
 |-- Vaccine_cus: string (nullable = true)
 |-- Destination: string (nullable = true)
 |-- Miles: double (nullable = true)
 |-- Est_Costing: double (nullable = true)
 |-- Ride_category: string (nullable = true)
 |-- Purpose: string (nullable = true)
 |-- temp: double (nullable = true)
 |-- clouds: double (nullable = true)
 |-- pressure: double (nullable = true)
 |-- humidity: double (nullable = true)
 |-- wind: double (nullable = true)
 |-- accquire_vehi: integer (nullable = true)
 |-- free_vehi: string (nullable = true)
 |-- Lattitute: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- locationID: integer (nullable = true)
 |-- rating_cus: integer (nullable = true)
 |-- Riders_Name: string (null

# 1. Top 5 cities with most customers

In [7]:
from pyspark.sql.functions import count
cus_df.groupBy("Pickup Location").agg(count("Passenger Name").alias("customer_count")).orderBy("customer_count",ascending=False).show()

+-----------------+--------------+
|  Pickup Location|customer_count|
+-----------------+--------------+
|      Fort Pierce|           108|
|          Midtown|            78|
|  West Palm Beach|            54|
|             Cary|            52|
|  Lower Manhattan|            26|
|     Midtown East|            26|
|Flatiron District|            26|
|      East Harlem|            26|
|    Hudson Square|            26|
|          Jamaica|            26|
|         New York|            26|
|         Elmhurst|            26|
+-----------------+--------------+



# 2. The total amount of customers who opted ride category as bike.

In [8]:
from pyspark.sql.functions import count
admin_df.filter(admin_df.Ride_category=="Bike").agg(count("Riders_Name").alias("customer_count")).orderBy('customer_count',ascending=False).show()

+--------------+
|customer_count|
+--------------+
|           100|
+--------------+



# 3. Find cities with more than 50 customers

In [9]:
new_df=admin_df.groupBy("Source").agg(count("Riders_Name").alias("customer_count")).orderBy("customer_count",ascending=False)
new_df.filter(new_df.customer_count > 50).show()

+---------------+--------------+
|         Source|customer_count|
+---------------+--------------+
|    Fort Pierce|           108|
|        Midtown|            78|
|West Palm Beach|            54|
|           Cary|            52|
+---------------+--------------+



# 4. Find the top rated drivers in each category

In [10]:
from pyspark.sql.functions import first
admin_df.filter(admin_df.Rating_RI>4.5).groupBy('Ride_category').agg(first("Name").alias("top_rated_driver")).show()

+-------------+----------------+
|Ride_category|top_rated_driver|
+-------------+----------------+
|         Auto|            Clim|
|         Bike|          Editha|
|        Prime|           Glori|
|   Uber-Micro|          Stefan|
|    Uber-Mini|            Berk|
+-------------+----------------+



# 5. Find the most active driver

In [11]:
from pyspark.sql.functions import count
cus_df.groupBy("Driver Name").agg(count("*").alias("total_rides")).orderBy('total_rides',ascending=False).limit(1).show()

+-----------+-----------+
|Driver Name|total_rides|
+-----------+-----------+
|    Mathian|          2|
+-----------+-----------+



# 6. Find the name of customers whose ride got cancelled due to clouds

In [12]:
admin_df.filter((admin_df.clouds == True)&(admin_df.Status=="Cancelled")).select("Riders_Name").show()

+------------+
| Riders_Name|
+------------+
|     Johanna|
|Maximilianus|
|       Daron|
|      Sherry|
|       Marta|
|       Natty|
|       Margi|
|      Pattin|
|       Dalli|
|   Katherina|
+------------+



# 7.  Find the customers who choose the payment mode as Phonepe

In [13]:
admin_df.filter(admin_df.Payment_mode=="Phonpe").select("Riders_Name").show()

+-----------+
|Riders_Name|
+-----------+
|    Charlot|
|        Kim|
|      Margi|
|        Kip|
|      Dalli|
|  Wadsworth|
|   Virginie|
|       Roxy|
|    Carolus|
|    Johanna|
|     Ashlee|
|    Carmine|
|     Shaine|
|      Cindi|
|     Damien|
|       Edin|
|    Chelsie|
|      Hagan|
|    Romonda|
|      Amara|
+-----------+
only showing top 20 rows



# 8. Find the driver who accepted ride when the weather is cloudy

In [14]:
admin_df.filter((admin_df.clouds==True)&(admin_df.Status=="Arrived")).select("Name").show()

+---------+
|     Name|
+---------+
|  Frazier|
|    Bryan|
|    Aymer|
|    Moore|
|     Berk|
|  Donavon|
|     Karl|
| Valentia|
|   Aubert|
|Annamaria|
| Faulkner|
|    Yance|
|   Robina|
|Westbrook|
|  Othello|
|   Sydney|
|   Hadlee|
|    Abbey|
| Clarette|
|     Etti|
+---------+
only showing top 20 rows



# 9. Find the most used payment method

In [15]:
from pyspark.sql.functions import count
admin_df.groupBy("Payment_mode").agg(count("*").alias("usage_count")).orderBy('usage_count',ascending=False).limit(1).show()

+------------+-----------+
|Payment_mode|usage_count|
+------------+-----------+
| Uber wallet|        226|
+------------+-----------+



# 10. Find the customer who got the maximum discount and display their payment method


In [16]:
from pyspark.sql.functions import max,col
max_discount = admin_df.agg(max("Discount").alias("max_disc")).collect()[0]["max_disc"]
admin_df.filter(admin_df.Discount==max_discount).select("Name","Discount","Payment_mode").show()

+-------+--------+------------+
|   Name|Discount|Payment_mode|
+-------+--------+------------+
|  Deeyn|     10%| Uber wallet|
|  Bryan|     10%| Uber wallet|
| Crissy|     10%| Uber wallet|
| Kendre|     10%| Uber wallet|
|  Aymer|     10%| Uber wallet|
|Mathian|     10%| Uber wallet|
|Maurice|     10%| Uber wallet|
|   Timi|     10%| Uber wallet|
|  Janey|     10%| Uber wallet|
|Huntlee|     10%| Uber wallet|
|Pauline|     10%| Uber wallet|
|  Tobye|     10%| Uber wallet|
|Skipton|     10%| Uber wallet|
|Geordie|     10%| Uber wallet|
|Etienne|     10%| Uber wallet|
|  Lotty|     10%| Uber wallet|
|  Raviv|     10%| Uber wallet|
|    Ray|     10%| Uber wallet|
|  Sally|     10%| Uber wallet|
|  Gusta|     10%| Uber wallet|
+-------+--------+------------+
only showing top 20 rows



# 11. Find the most popular route

In [17]:
from pyspark.sql.functions import count
admin_df.groupBy("Source","Destination").agg(count("*").alias("route_count")).orderBy('route_count',ascending=False).limit(1).show()

+-------+-----------+-----------+
| Source|Destination|route_count|
+-------+-----------+-----------+
|Midtown|       Cary|         18|
+-------+-----------+-----------+



# 13. Does a driver's vaccination status impact customer ratings

In [18]:
from pyspark.sql.functions import avg,sum
admin_df.groupBy("Vaccine_Ri").agg(avg("Rating_RI").alias("Avg_Rating")).show()

+----------+-----------------+
|Vaccine_Ri|       Avg_Rating|
+----------+-----------------+
|       YES|4.062500000000001|
|        NO|4.900000000000004|
+----------+-----------------+



# 14. Do vaccinated drivers travel long distances on average than unvaccinated drivers

In [19]:
from pyspark.sql.functions import avg
admin_df.groupBy("Vaccine_Ri").agg(avg("Miles").alias("avg_distance")).show()

+----------+------------------+
|Vaccine_Ri|      avg_distance|
+----------+------------------+
|       YES|10.595000000000002|
|        NO|12.945000000000004|
+----------+------------------+



# 15. Are there any rides where final cost deviates from estimated price

In [25]:
admin_df.filter(admin_df.Est_Costing != admin_df.Final_cost).select("Name","Source","Destination","Riders_Name","Status").show()

+-------+---------------+---------------+-----------+--------+
|   Name|         Source|    Destination|Riders_Name|  Status|
+-------+---------------+---------------+-----------+--------+
|  Deeyn|    Fort Pierce|West Palm Beach|     Luelle|Assigned|
|  Bryan|West Palm Beach|     Palm Beach|     Duncan| Arrived|
| Crissy|           Cary|           Cary|     Carine|Assigned|
| Kendre|           Cary|    Morrisville|   Collette|Assigned|
|  Aymer|        Jamaica|       New York|     Killie| Arrived|
|Mathian|        Midtown|   Midtown East|     Mikkel| Arrived|
|Maurice|        Midtown|  Hudson Square|  Stanislas|Assigned|
|   Timi|  Hudson Square|Lower Manhattan|       Yvon| Arrived|
|  Janey|Lower Manhattan|  Hudson Square|      Jaine|Assigned|
|Huntlee|    Fort Pierce| Hell's Kitchen|       Abbi|Assigned|
|Pauline|           Cary|Jamestown Court|   Dorothee|Assigned|
|  Tobye|        Jamaica|         Durham|       Jens| Arrived|
|Skipton|       New York|           Cary|       Elsy| A

# 16. Average Trip Duration by Vehicle Type

In [31]:
from pyspark.sql.functions import avg
cus_df.groupBy("Vehicle Type").agg(avg("Trip Duration").alias("avg_trip_duration")).show()

+------------+------------------+
|Vehicle Type| avg_trip_duration|
+------------+------------------+
|        Bike|14.595000000000004|
|       Prime|10.278000000000002|
|        Auto|             13.68|
|   Uber-Mini|             7.602|
|  Uber-Micro| 9.169999999999995|
+------------+------------------+



# 17. Aggregate total miles covered by city or pin code

In [35]:
from pyspark.sql.functions import sum
admin_df.groupBy("Pin-Codes").agg(sum("miles").alias("total_miles_covered")).show()

+---------+-------------------+
|Pin-Codes|total_miles_covered|
+---------+-------------------+
|   646882|              288.5|
|   509117| 197.70000000000002|
|   625132|  850.7999999999998|
|   491563| 257.99999999999994|
|   125733| 267.70000000000005|
|   210314| 355.69999999999993|
|   484320|  388.8999999999999|
|   318886| 1637.2999999999993|
|   386077| 173.59999999999997|
|   265814|              534.4|
|   475241| 427.69999999999993|
|   223672|              152.2|
+---------+-------------------+



# 18. Total cost per unique customer.

In [37]:
admin_df.groupBy("Name").agg(sum("Final_cost").alias("Final_Costing")).show()

+--------+-------------+
|    Name|Final_Costing|
+--------+-------------+
|  Sianna|        60.75|
|   Tessi|        65.66|
|   Baron|        252.0|
|  Earvin|        297.0|
|    Trev|        141.0|
|Rosemary|         42.5|
|  Lorrin|         59.4|
|Melisent|        178.5|
|  Susana|       128.25|
| Nanette|        27.44|
|  Mikkel|        135.0|
|   Rocky|       68.796|
|  Aubert|        202.0|
|Madelene|       1710.0|
|  Randee|         76.5|
|Violetta|       2389.5|
|   Paulo|       68.796|
|     Ula|        113.4|
|Charissa|         83.7|
|Cathleen|        187.2|
+--------+-------------+
only showing top 20 rows



# 19. Identify locations with frequent availability of free vehicles

In [39]:
admin_df.filter(admin_df.free_vehi== True).select("Source").show()

+---------------+
|         Source|
+---------------+
|    Fort Pierce|
|        Midtown|
|   Midtown East|
|        Midtown|
|West Palm Beach|
|Lower Manhattan|
|   Midtown East|
|West Palm Beach|
|    Fort Pierce|
|    Fort Pierce|
|    Fort Pierce|
|       New York|
|    Fort Pierce|
|           Cary|
|       New York|
|    East Harlem|
|   Midtown East|
+---------------+



#