In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [2]:
spark = SparkSession.builder.appName("kpis").getOrCreate()

In [3]:
spark

In [4]:
# Loading data
locations = spark.read.csv("/content/locations.csv", header=True, inferSchema=True)
transactions = spark.read.csv("/content/rental_transactions.csv", header=True, inferSchema=True)
users = spark.read.csv("/content/users.csv", header=True, inferSchema=True)
vehicles = spark.read.csv("/content/vehicles.csv", header=True, inferSchema=True)

In [5]:
locations.describe().show()

+-------+------------------+--------------------+--------------------+-------+-----+------------------+-------------------+------------------+
|summary|       location_id|       location_name|             address|   city|state|          zip_code|           latitude|         longitude|
+-------+------------------+--------------------+--------------------+-------+-----+------------------+-------------------+------------------+
|  count|               300|                 300|                 300|    300|  300|               300|                300|               300|
|   mean| 5531.393333333333|                NULL|                NULL|   NULL| NULL| 93076.04333333333|-3.1412114716666664|-5.494931896666663|
| stddev|2617.3563531842715|                NULL|                NULL|   NULL| NULL|1774.4153009719298| 53.347049168486706|100.88512074727106|
|    min|              1154|Acosta, Jackson a...|    00165 Laura Loaf|Anaheim|   CA|             90006|        -89.3601815|        -179.34875|

In [6]:
transactions.describe().show()

+-------+----------+----------+----------+-----------------+-----------------+-----------------+
|summary| rental_id|   user_id|vehicle_id|  pickup_location| dropoff_location|     total_amount|
+-------+----------+----------+----------+-----------------+-----------------+-----------------+
|  count|     20080|     20080|     20080|            20080|            20080|            20080|
|   mean|  Infinity|  Infinity|  Infinity|5551.445766932271|5544.133515936255|806.8037848605578|
| stddev|       NaN|       NaN|       NaN|2613.643533954626| 2602.56020105812|772.6180142164975|
|    min|00004f8d17|00020a8d92|00031378a1|             1154|             1154|             10.0|
|    max|fffbab1511|fffc6f2162|fff99606d9|             9933|             9933|           3600.0|
+-------+----------+----------+----------+-----------------+-----------------+-----------------+



In [7]:
users.describe().show()

+-------+----------+----------+---------+--------------------+-------------------+---------------------+-------------------+
|summary|   user_id|first_name|last_name|               email|       phone_number|driver_license_number|          is_active|
+-------+----------+----------+---------+--------------------+-------------------+---------------------+-------------------+
|  count|     30000|     30000|    30000|               30000|              30000|                30000|              30000|
|   mean|  Infinity|      NULL|     NULL|                NULL|5.103159923375977E9|                 NULL|             0.8004|
| stddev|       NaN|      NULL|     NULL|                NULL|2.849777308098635E9|                 NULL|0.39970634909332775|
|    min|0001795396|     Aaron|   Abbott|aaron.adams@gmail...|  (000)183-6483x011|             AA016383|                  0|
|    max|ffffb2978c|       Zoe|   Zuniga|zoe.rodriguez@yah...|         9999121042|             ZZ920200|                  1|


In [8]:
vehicles.describe().show()

+-------+-------------------+----------------------+--------------------+----------------+---------------+---------------------+------------------+---------------------+--------------------+----------+---------------------+------+------------+
|summary|             active|vehicle_license_number|   registration_name|    license_type|expiration_date|permit_license_number|      vehicle_year|base_telephone_number|        base_address|vehicle_id|last_update_timestamp| brand|vehicle_type|
+-------+-------------------+----------------------+--------------------+----------------+---------------+---------------------+------------------+---------------------+--------------------+----------+---------------------+------+------------+
|  count|             109584|                109584|              109584|          109584|         109584|               109584|            109584|               109584|              109584|    109584|               109584|109584|      109584|
|   mean|  0.89965688421

In [9]:
locations.printSchema()
transactions.printSchema()
users.printSchema()
vehicles.printSchema()

root
 |-- location_id: integer (nullable = true)
 |-- location_name: string (nullable = true)
 |-- address: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zip_code: integer (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)

root
 |-- rental_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- vehicle_id: string (nullable = true)
 |-- rental_start_time: timestamp (nullable = true)
 |-- rental_end_time: timestamp (nullable = true)
 |-- pickup_location: integer (nullable = true)
 |-- dropoff_location: integer (nullable = true)
 |-- total_amount: double (nullable = true)

root
 |-- user_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- phone_number: string (nullable = true)
 |-- driver_license_number: string (nullable = true)
 |-- driver_license_expiry: date (nulla

In [10]:
locations.show(5)

+-----------+--------------------+--------------------+---------+-----+--------+-----------+----------+
|location_id|       location_name|             address|     city|state|zip_code|   latitude| longitude|
+-----------+--------------------+--------------------+---------+-----+--------+-----------+----------+
|       2702|Jackson, Velazque...|3140 Heath Radial...|  Modesto|   CA|   94540|   86.25802| -169.2448|
|       4380|            Bean LLC|51144 Patrick Isl...|  Fontana|   CA|   92188|-74.4558925|-42.279882|
|       7709|     Gilbert-Simmons|    4738 Lewis Locks|Roseville|   CA|   91032|-65.4309305|-64.763489|
|       8607|    Coleman-Robinson|  324 Robin Causeway|  Modesto|   CA|   93714| -64.281076|-77.669631|
|       5499|        Deleon Group|    51725 Evans View|Roseville|   CA|   91849| 18.4951575|-154.76578|
+-----------+--------------------+--------------------+---------+-----+--------+-----------+----------+
only showing top 5 rows



In [11]:
transactions.show(5)

+----------+----------+----------+-------------------+-------------------+---------------+----------------+------------+
| rental_id|   user_id|vehicle_id|  rental_start_time|    rental_end_time|pickup_location|dropoff_location|total_amount|
+----------+----------+----------+-------------------+-------------------+---------------+----------------+------------+
|b139d8e1b2|320be8068b|0d52304987|2024-02-28 08:05:00|2024-03-01 05:05:00|           1497|            6785|       450.0|
|7afd60f6d3|320be8068b|975d72985c|2024-01-07 20:16:00|2024-01-09 21:16:00|           5345|            2608|      2450.0|
|733a9361bc|8f31b734a6|0d9f0f0fb9|2024-01-07 09:36:00|2024-01-07 17:36:00|           2546|            5442|        80.0|
|6e546b69dd|8f31b734a6|967fdab45e|2024-01-05 11:30:00|2024-01-07 04:30:00|           8147|            4380|      2050.0|
|acc192b64a|8f31b734a6|32d58ea4b7|2024-03-06 18:19:00|2024-03-09 14:19:00|           6290|            8932|      1360.0|
+----------+----------+---------

In [12]:
users.show(5)

+----------+----------+---------+--------------------+------------------+---------------------+---------------------+-------------+---------+
|   user_id|first_name|last_name|               email|      phone_number|driver_license_number|driver_license_expiry|creation_date|is_active|
+----------+----------+---------+--------------------+------------------+---------------------+---------------------+-------------+---------+
|26d08ab733|      Lisa|   Parker|lisa.parker@gmail...|334.271.2972x60554|             MO028963|           2033-06-21|   2024-05-26|        1|
|0a0430e6f9|  Courtney|   Martin|courtney.martin@y...|  826-262-0518x252|             VW966518|           2028-09-28|   2024-05-22|        0|
|eb5d10cccd|    Andrew|  Mcclain|andrew.mcclain@ho...|   +1-467-858-1702|             WL839491|           2028-09-01|   2024-01-29|        1|
|2a59127ee0|   Michael|   Hoover|michael.hoover@ya...|  001-220-342-6250|             UI603163|           2028-11-29|   2024-03-22|        1|
|e3a46

In [13]:
vehicles.show(5)

+------+----------------------+--------------------+----------------+---------------+---------------------+------------------+------------+---------------------+--------------------+----------+---------------------+---------+------------+
|active|vehicle_license_number|   registration_name|    license_type|expiration_date|permit_license_number|certification_date|vehicle_year|base_telephone_number|        base_address|vehicle_id|last_update_timestamp|    brand|vehicle_type|
+------+----------------------+--------------------+----------------+---------------+---------------------+------------------+------------+---------------------+--------------------+----------+---------------------+---------+------------+
|     1|               5818886|CITY,LIVERY,LEASI...|FOR HIRE VEHICLE|     27-09-2025|             6EPABCVK|        2018-01-09|        2018|        (646)780-0129|1515 THIRD STREET...|67789f742d|  04-06-2024 13:25:00|  Ferrari|    high_end|
|     1|               5520432|    FERNANDEZ

## Spark Job 1


In [18]:
transactions_location_vehicle = transactions.join(locations, transactions.pickup_location == locations.location_id, "left") \
    .join(vehicles, transactions.vehicle_id == vehicles.vehicle_id, "left")

transactions_location_vehicle.show(5)

+----------+----------+----------+-------------------+-------------------+---------------+----------------+------------+-----------+--------------------+--------------------+--------------+-----+--------+-----------+-----------+------+----------------------+--------------------+----------------+---------------+---------------------+------------------+------------+---------------------+--------------------+----------+---------------------+-------+------------+
| rental_id|   user_id|vehicle_id|  rental_start_time|    rental_end_time|pickup_location|dropoff_location|total_amount|location_id|       location_name|             address|          city|state|zip_code|   latitude|  longitude|active|vehicle_license_number|   registration_name|    license_type|expiration_date|permit_license_number|certification_date|vehicle_year|base_telephone_number|        base_address|vehicle_id|last_update_timestamp|  brand|vehicle_type|
+----------+----------+----------+-------------------+------------------

In [32]:
#  Revenue per Location
revenue_per_location_df = transactions.groupBy("pickup_location").agg(F.sum("total_amount").alias("total_revenue"))\
.join(locations, transactions.pickup_location == \
            locations.location_id, "left").select("pickup_location",
    "location_name",
    "total_revenue")
revenue_per_location_df.show(5)

+---------------+--------------------+-------------+
|pickup_location|       location_name|total_revenue|
+---------------+--------------------+-------------+
|           1959|           Lopez Inc|      58090.0|
|           8928|Phelps, Robinson ...|      43470.0|
|           1507|Smith, Alexander ...|      57120.0|
|           8932|Costa, Whitaker a...|      53890.0|
|           9182|         Montoya Inc|      63900.0|
+---------------+--------------------+-------------+
only showing top 5 rows



In [33]:
# Total Transactions per Location
transactions_per_location_df = transactions.groupBy("pickup_location")\
.agg(F.count("rental_id").alias("total_transactions")).join \
(locations, transactions.pickup_location == \
            locations.location_id, "left").select("pickup_location",
    "location_name",
    "total_transactions")
transactions_per_location_df.show(5)

+---------------+--------------------+------------------+
|pickup_location|       location_name|total_transactions|
+---------------+--------------------+------------------+
|           1959|           Lopez Inc|                69|
|           8928|Phelps, Robinson ...|                66|
|           1507|Smith, Alexander ...|                77|
|           8932|Costa, Whitaker a...|                65|
|           9182|         Montoya Inc|                75|
+---------------+--------------------+------------------+
only showing top 5 rows



In [34]:
# Average, Max, and Min Transaction Amounts
transaction_amounts_df = transactions.groupBy("pickup_location").agg(
    F.avg("total_amount").alias("avg_transaction"),
    F.max("total_amount").alias("max_transaction"),
    F.min("total_amount").alias("min_transaction")
).join(locations, transactions.pickup_location == \
            locations.location_id, "left").select(
    "pickup_location",
    "location_name",
    "avg_transaction",
    "max_transaction",
    "min_transaction")
transaction_amounts_df.show(5)

+---------------+--------------------+-----------------+---------------+---------------+
|pickup_location|       location_name|  avg_transaction|max_transaction|min_transaction|
+---------------+--------------------+-----------------+---------------+---------------+
|           1959|           Lopez Inc|841.8840579710145|         3600.0|           60.0|
|           8928|Phelps, Robinson ...|658.6363636363636|         3200.0|           30.0|
|           1507|Smith, Alexander ...|741.8181818181819|         3250.0|           40.0|
|           8932|Costa, Whitaker a...|829.0769230769231|         3500.0|           40.0|
|           9182|         Montoya Inc|            852.0|         3450.0|           20.0|
+---------------+--------------------+-----------------+---------------+---------------+
only showing top 5 rows



In [20]:
# unique_vehicles_df =transactions.groupBy("pickup_location").agg(F.countDistinct("vehicle_id").alias("unique_vehicles"))
# unique_vehicles_df.show(5)

In [35]:
# Unique Vehicles Used at Each Location
unique_vehicles_per_location_df = transactions.groupBy("pickup_location")\
.agg(F.countDistinct("vehicle_id").alias("unique_vehicles"))\
.join(locations, transactions.pickup_location == \
            locations.location_id, "left").select("pickup_location",
    "location_name",
    "unique_vehicles")
unique_vehicles_per_location_df.show(5)

+---------------+--------------------+---------------+
|pickup_location|       location_name|unique_vehicles|
+---------------+--------------------+---------------+
|           1959|           Lopez Inc|             69|
|           8928|Phelps, Robinson ...|             66|
|           9182|         Montoya Inc|             75|
|           8932|Costa, Whitaker a...|             65|
|           1507|Smith, Alexander ...|             77|
+---------------+--------------------+---------------+
only showing top 5 rows



In [22]:
# Join transactions with vehicles to get vehicle_type
transactions_with_vehicle_type = transactions.join(
    vehicles, transactions.vehicle_id == vehicles.vehicle_id, "left"
)

# Calculate rental duration metrics by vehicle type
rental_duration_by_vehicle_type_df = transactions_with_vehicle_type.withColumn(
    "rental_duration_hours",
    (F.col("rental_end_time").cast("long") - F.col("rental_start_time").cast("long")) / 3600
).groupBy("vehicle_type").agg(
    F.count("*").alias("total_rentals_by_vehicle"),
    F.avg("rental_duration_hours").alias("avg_rental_duration_by_vehicle"),
    F.sum("rental_duration_hours").alias("total_rental_duration_by_vehicle")
)

# Display results
rental_duration_by_vehicle_type_df.show(5)

+------------+------------------------+------------------------------+--------------------------------+
|vehicle_type|total_rentals_by_vehicle|avg_rental_duration_by_vehicle|total_rental_duration_by_vehicle|
+------------+------------------------+------------------------------+--------------------------------+
|    high_end|                    4010|            36.900498753117205|                        147971.0|
|       basic|                    7931|             36.59878955995461|                        290265.0|
|     premium|                    8139|             36.24167588155793|                        294971.0|
+------------+------------------------+------------------------------+--------------------------------+



In [31]:
# Rental Duration and Revenue by Location
# Calculate rental duration in hours and aggregate revenue and duration by location
rental_duration_revenue_by_location_df = transactions.withColumn(
    "rental_duration_hours",
    (F.col("rental_end_time").cast("long") - F.col("rental_start_time").cast("long")) / 3600
).groupBy("pickup_location").agg(
    F.sum("total_amount").alias("total_revenue_by_location"),
    F.sum("rental_duration_hours").alias("total_rental_duration_by_location")
).join(locations, transactions.pickup_location == locations.location_id, "left").select(
    "pickup_location",
    "location_name",
    "total_revenue_by_location",
    "total_rental_duration_by_location")

# Display results
rental_duration_revenue_by_location_df.show(5)

+---------------+--------------------+-------------------------+---------------------------------+
|pickup_location|       location_name|total_revenue_by_location|total_rental_duration_by_location|
+---------------+--------------------+-------------------------+---------------------------------+
|           1959|           Lopez Inc|                  58090.0|                           2770.0|
|           8928|Phelps, Robinson ...|                  43470.0|                           2230.0|
|           1507|Smith, Alexander ...|                  57120.0|                           2897.0|
|           8932|Costa, Whitaker a...|                  53890.0|                           2318.0|
|           9182|         Montoya Inc|                  63900.0|                           2626.0|
+---------------+--------------------+-------------------------+---------------------------------+
only showing top 5 rows



In [39]:
# Join all KPIs on location (and vehicle_type where applicable)

location_kpi_df = revenue_per_location_df \
        .join(transactions_per_location_df, ["pickup_location", "location_name"], "left") \
        .join(transaction_amounts_df, ["pickup_location", "location_name"], "left") \
        .join(unique_vehicles_per_location_df, ["pickup_location", "location_name"], "left") \
        .join(rental_duration_revenue_by_location_df, ["pickup_location", "location_name"], "left")

In [40]:
location_kpi_df.show(10, truncate=False)

+---------------+-----------------------------+-------------+------------------+-----------------+---------------+---------------+---------------+-------------------------+---------------------------------+
|pickup_location|location_name                |total_revenue|total_transactions|avg_transaction  |max_transaction|min_transaction|unique_vehicles|total_revenue_by_location|total_rental_duration_by_location|
+---------------+-----------------------------+-------------+------------------+-----------------+---------------+---------------+---------------+-------------------------+---------------------------------+
|1959           |Lopez Inc                    |58090.0      |69                |841.8840579710145|3600.0         |60.0           |69             |58090.0                  |2770.0                           |
|8928           |Phelps, Robinson and Myers   |43470.0      |66                |658.6363636363636|3200.0         |30.0           |66             |43470.0                  |

## Job 2

In [27]:
# Convert rental_start_time to date for daily aggregations
transactions = transactions.withColumn("rental_date", F.date_format(F.col("rental_start_time"), "yyyy-MM-dd"))

In [28]:
# Compute Total Transactions per Day
transactions_per_day = transactions.groupBy("rental_date").agg(F.count("rental_id").alias("total_transactions"))
transactions_per_day.show(5)

+-----------+------------------+
|rental_date|total_transactions|
+-----------+------------------+
| 2024-01-19|               115|
| 2024-04-03|               124|
| 2024-02-28|               144|
| 2024-02-08|               135|
| 2024-01-13|               124|
+-----------+------------------+
only showing top 5 rows



In [29]:
# Compute Revenue per Day
revenue_per_day = transactions.groupBy("rental_date").agg(F.sum("total_amount").alias("total_revenue"))
revenue_per_day.show(5)

+-----------+-------------+
|rental_date|total_revenue|
+-----------+-------------+
| 2024-01-19|      93910.0|
| 2024-04-03|     101350.0|
| 2024-02-28|     104200.0|
| 2024-02-08|     106030.0|
| 2024-01-13|      96870.0|
+-----------+-------------+
only showing top 5 rows



In [32]:
transactions_and_revenue = transactions.groupBy("rental_date").agg(
    F.count("rental_id").alias("total_transactions"),
    F.sum("total_amount").alias("total_revenue")
)
transactions_and_revenue.count()

152

In [39]:
# Compute User-specific Spending and Rental Duration Metrics
user_metrics = transactions.groupBy("user_id").agg(
    F.sum("total_amount").alias("total_amoun_spent"),
    F.avg(F.col("rental_end_time").cast("long") - F.col("rental_start_time").cast("long")).alias("avg_rental_duration")
)
user_metrics.show(5)
user_metrics.count()

+----------+-----------------+-------------------+
|   user_id|total_amoun_spent|avg_rental_duration|
+----------+-----------------+-------------------+
|5144945047|           1640.0|           172800.0|
|0e672a709e|           1280.0|           144000.0|
|a39cfd7ab5|            510.0|           183600.0|
|010ad799a1|           1440.0|           259200.0|
|b31a334b34|           1470.0|           172800.0|
+----------+-----------------+-------------------+
only showing top 5 rows



10000

In [40]:
max_min_spending_per_user = transactions.groupBy("user_id").agg(
    F.max("total_amount").alias("max_spending"),
    F.min("total_amount").alias("min_spending")
)
max_min_spending_per_user.show(5)
max_min_spending_per_user.count()

+----------+------------+------------+
|   user_id|max_spending|min_spending|
+----------+------------+------------+
|5144945047|       690.0|       400.0|
|0e672a709e|       680.0|       600.0|
|a39cfd7ab5|       510.0|       510.0|
|010ad799a1|      1440.0|      1440.0|
|b31a334b34|      1020.0|       450.0|
+----------+------------+------------+
only showing top 5 rows



10000

In [42]:
# Compute Maximum and Minimum Transaction Amounts
transaction_amounts = transactions.agg(
    F.max("total_amount").alias("max_transaction_amount"),
    F.min("total_amount").alias("min_transaction_amount")
)
transaction_amounts.show(5)

+----------------------+----------------------+
|max_transaction_amount|min_transaction_amount|
+----------------------+----------------------+
|                3600.0|                  10.0|
+----------------------+----------------------+



In [43]:
daily_kpi_df = transactions_per_day \
    .join(revenue_per_day, "rental_date", "inner") \
    .join(transaction_amounts)

In [44]:
daily_kpi_df.show(5)
daily_kpi_df.count()

+-----------+------------------+-------------+----------------------+----------------------+
|rental_date|total_transactions|total_revenue|max_transaction_amount|min_transaction_amount|
+-----------+------------------+-------------+----------------------+----------------------+
| 2024-01-19|               115|      93910.0|                3600.0|                  10.0|
| 2024-04-03|               124|     101350.0|                3600.0|                  10.0|
| 2024-02-28|               144|     104200.0|                3600.0|                  10.0|
| 2024-02-08|               135|     106030.0|                3600.0|                  10.0|
| 2024-01-13|               124|      96870.0|                3600.0|                  10.0|
+-----------+------------------+-------------+----------------------+----------------------+
only showing top 5 rows



152