## Installing and importing the necessary packages

In [1]:
# Install Pyspark
!pip install pyspark



In [25]:
# Importing packages
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, FloatType, DateType, TimestampType, BooleanType, DecimalType, DoubleType
from pyspark.sql.window import Window

### Initializing a Spark session

In [26]:
# Create the notebook's Spark session, or reuse one that is already running.
spark = (
    SparkSession.builder
    .appName("car_rental_marketplace")
    .getOrCreate()
)

### Loading dataset

In [27]:
# Schema definition for the datasets

# users schema: every column is nullable; the two date columns are declared as
# strings here and parsed into dates in the cleaning step.
users_schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in [
        ("user_id", StringType()),
        ("first_name", StringType()),
        ("last_name", StringType()),
        ("email", StringType()),
        ("phone_number", StringType()),
        ("driver_license_number", StringType()),
        ("driver_license_expiry", StringType()),
        ("creation_date", StringType()),
        ("is_active", IntegerType()),
    ]
])

# vehicles schema: every column is nullable; date/timestamp columns are declared
# as strings here and parsed in the cleaning step.
vehicles_schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in [
        ("active", IntegerType()),
        ("vehicle_license_number", IntegerType()),
        ("registration_name", StringType()),
        ("license_type", StringType()),
        ("expiration_date", StringType()),
        ("permit_license_number", StringType()),
        ("certification_date", StringType()),
        ("vehicle_year", IntegerType()),
        ("base_telephone_number", StringType()),
        ("base_address", StringType()),
        ("vehicle_id", StringType()),
        ("last_update_timestamp", StringType()),
        ("brand", StringType()),
        ("vehicle_type", StringType()),
    ]
])

# locations schema: every column is nullable; coordinates are single-precision floats.
# NOTE(review): location_id is a string while the rental transactions schema declares
# pickup_location/dropoff_location as integers — confirm the intended key type.
locations_schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in [
        ("location_id", StringType()),
        ("location_name", StringType()),
        ("address", StringType()),
        ("city", StringType()),
        ("state", StringType()),
        ("zip_code", StringType()),
        ("latitude", FloatType()),
        ("longitude", FloatType()),
    ]
])

# rental transactions schema: every column is nullable; start/end times are declared
# as strings here and parsed into timestamps in the cleaning step.
rental_transactions_schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in [
        ("rental_id", StringType()),
        ("user_id", StringType()),
        ("vehicle_id", StringType()),
        ("rental_start_time", StringType()),
        ("rental_end_time", StringType()),
        ("pickup_location", IntegerType()),
        ("dropoff_location", IntegerType()),
        ("total_amount", DoubleType()),
    ]
])

In [28]:
# Loading the data: each CSV has a header row and is read with its explicit schema.
data_dir = "drive/MyDrive/Colab Notebooks/car_rental_data"

users_df = spark.read.csv(f"{data_dir}/users.csv", schema=users_schema, header=True)
vehicles_df = spark.read.csv(f"{data_dir}/vehicles.csv", schema=vehicles_schema, header=True)
locations_df = spark.read.csv(f"{data_dir}/locations.csv", schema=locations_schema, header=True)
rental_transactions_df = spark.read.csv(
    f"{data_dir}/rental_transactions.csv",
    schema=rental_transactions_schema,
    header=True,
)


In [29]:
# Confirming the schemas: print each DataFrame's schema in load order.

for loaded_df in (users_df, vehicles_df, locations_df, rental_transactions_df):
    loaded_df.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- phone_number: string (nullable = true)
 |-- driver_license_number: string (nullable = true)
 |-- driver_license_expiry: string (nullable = true)
 |-- creation_date: string (nullable = true)
 |-- is_active: integer (nullable = true)

root
 |-- active: integer (nullable = true)
 |-- vehicle_license_number: integer (nullable = true)
 |-- registration_name: string (nullable = true)
 |-- license_type: string (nullable = true)
 |-- expiration_date: string (nullable = true)
 |-- permit_license_number: string (nullable = true)
 |-- certification_date: string (nullable = true)
 |-- vehicle_year: integer (nullable = true)
 |-- base_telephone_number: string (nullable = true)
 |-- base_address: string (nullable = true)
 |-- vehicle_id: string (nullable = true)
 |-- last_update_timestamp: string (nullable = true)
 |-- brand: string 

In [30]:
# Inspecting the data: for each dataset print a heading, its summary
# statistics, and its first five rows.

datasets_to_inspect = [
    ("Users data:", users_df),
    ("Vehicles data:", vehicles_df),
    ("Locations data:", locations_df),
    ("Rental transactions data:", rental_transactions_df),
]

for heading, dataset in datasets_to_inspect:
    print(heading)
    dataset.describe().show()
    dataset.show(5)

Users data:
+-------+----------+----------+---------+--------------------+-------------------+---------------------+---------------------+-------------+-------------------+
|summary|   user_id|first_name|last_name|               email|       phone_number|driver_license_number|driver_license_expiry|creation_date|          is_active|
+-------+----------+----------+---------+--------------------+-------------------+---------------------+---------------------+-------------+-------------------+
|  count|     30000|     30000|    30000|               30000|              30000|                30000|                30000|        30000|              30000|
|   mean|  Infinity|      NULL|     NULL|                NULL|5.103159923375977E9|                 NULL|                 NULL|         NULL|             0.8004|
| stddev|       NaN|      NULL|     NULL|                NULL|2.849777308098635E9|                 NULL|                 NULL|         NULL|0.39970634909332775|
|    min|0001795396|  

### Cleaning the data

In [31]:
# Checking for null values: one row per dataset with the number of NULLs in each column.
null_check_targets = [
    ("users", users_df),
    ("vehicles", vehicles_df),
    ("locations", locations_df),
    ("rental transactions", rental_transactions_df),
]

for label, dataset in null_check_targets:
    print(f"Null values in {label} data:")
    # when(...) yields a non-null value only for NULL cells, so count() tallies the NULLs.
    null_counts = [
        count(when(col(name).isNull(), name)).alias(name)
        for name in dataset.columns
    ]
    dataset.select(null_counts).show()

Null values in users data:
+-------+----------+---------+-----+------------+---------------------+---------------------+-------------+---------+
|user_id|first_name|last_name|email|phone_number|driver_license_number|driver_license_expiry|creation_date|is_active|
+-------+----------+---------+-----+------------+---------------------+---------------------+-------------+---------+
|      0|         0|        0|    0|           0|                    0|                    0|            0|        0|
+-------+----------+---------+-----+------------+---------------------+---------------------+-------------+---------+

Null values in vehicles data:
+------+----------------------+-----------------+------------+---------------+---------------------+------------------+------------+---------------------+------------+----------+---------------------+-----+------------+
|active|vehicle_license_number|registration_name|license_type|expiration_date|permit_license_number|certification_date|vehicle_year|

In [34]:
# Converting date strings to proper date formats and timestamp strings to timestamps.
# NOTE(review): expiration_date and last_update_timestamp use day-first patterns while
# certification_date is year-first — confirm these match the vehicles CSV.
users_df = (
    users_df
    .withColumn("driver_license_expiry", to_date(col("driver_license_expiry"), "yyyy-MM-dd"))
    .withColumn("creation_date", to_date(col("creation_date"), "yyyy-MM-dd"))
)

vehicles_df = (
    vehicles_df
    .withColumn("expiration_date", to_date(col("expiration_date"), "dd-MM-yyyy"))
    .withColumn("certification_date", to_date(col("certification_date"), "yyyy-MM-dd"))
    .withColumn(
        "last_update_timestamp",
        to_timestamp(col("last_update_timestamp"), "dd-MM-yyyy HH:mm:ss"),
    )
)

rental_transactions_df = (
    rental_transactions_df
    .withColumn("rental_start_time", to_timestamp(col("rental_start_time"), "yyyy-MM-dd HH:mm:ss"))
    .withColumn("rental_end_time", to_timestamp(col("rental_end_time"), "yyyy-MM-dd HH:mm:ss"))
)

In [35]:
# Re-check each DataFrame after the type conversions: schema first, then five sample rows.
for converted_df in (users_df, vehicles_df, locations_df, rental_transactions_df):
    converted_df.printSchema()
    converted_df.show(5)

root
 |-- user_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- phone_number: string (nullable = true)
 |-- driver_license_number: string (nullable = true)
 |-- driver_license_expiry: date (nullable = true)
 |-- creation_date: date (nullable = true)
 |-- is_active: integer (nullable = true)

+----------+----------+---------+--------------------+------------------+---------------------+---------------------+-------------+---------+
|   user_id|first_name|last_name|               email|      phone_number|driver_license_number|driver_license_expiry|creation_date|is_active|
+----------+----------+---------+--------------------+------------------+---------------------+---------------------+-------------+---------+
|26d08ab733|      Lisa|   Parker|lisa.parker@gmail...|334.271.2972x60554|             MO028963|           2033-06-21|   2024-05-26|        1|
|0a0430e6f9|  Courtney|   Martin|c

In [36]:
# Joining rental_transactions, vehicles and locations (left joins keep every rental).
same_vehicle = rental_transactions_df.vehicle_id == vehicles_df.vehicle_id
rentals_with_vehicles = (
    rental_transactions_df
    .join(vehicles_df, on=same_vehicle, how="left")
    .drop(vehicles_df.vehicle_id)  # keep only the rental side's vehicle_id column
)

# Locations are attached through the pickup location only.
picked_up_at = rentals_with_vehicles.pickup_location == locations_df.location_id
joined_df = rentals_with_vehicles.join(locations_df, on=picked_up_at, how="left")

In [37]:
# Preview the first five rows of the joined rentals/vehicles/locations data.
joined_df.show(n=5)

+----------+----------+----------+-------------------+-------------------+---------------+----------------+------------+------+----------------------+--------------------+----------------+---------------+---------------------+------------------+------------+---------------------+--------------------+---------------------+-------+------------+-----------+--------------------+--------------------+--------------+-----+--------+----------+----------+
| rental_id|   user_id|vehicle_id|  rental_start_time|    rental_end_time|pickup_location|dropoff_location|total_amount|active|vehicle_license_number|   registration_name|    license_type|expiration_date|permit_license_number|certification_date|vehicle_year|base_telephone_number|        base_address|last_update_timestamp|  brand|vehicle_type|location_id|       location_name|             address|          city|state|zip_code|  latitude| longitude|
+----------+----------+----------+-------------------+-------------------+---------------+--------

In [38]:
# Calculating the rental duration in hours.
# rental_start_time and rental_end_time were already parsed into timestamps in the
# cleaning step, so they are passed to unix_timestamp directly instead of being
# re-parsed with to_timestamp. unix_timestamp returns seconds; / 3600 gives hours.
joined_df = joined_df.withColumn(
    "rental_duration_hours",
    (unix_timestamp("rental_end_time") - unix_timestamp("rental_start_time")) / 3600,
)

In [39]:
# Spot-check the computed duration next to the pickup location.
joined_df.select("rental_duration_hours", "pickup_location").show(n=5)

+---------------------+---------------+
|rental_duration_hours|pickup_location|
+---------------------+---------------+
|                 45.0|           1497|
|                 49.0|           5345|
|                  8.0|           2546|
|                 41.0|           8147|
|                 68.0|           6290|
+---------------------+---------------+
only showing top 5 rows



### Location and Vehicle Performance:

In [53]:
# Location Performance Metrics: revenue, volume, and vehicle variety per pickup location.
# sum/round/avg/max/min here are the Spark column functions from the star import.
location_aggregations = [
    sum("total_amount").alias("total_revenue"),
    count("rental_id").alias("total_transactions"),
    round(avg("total_amount"), 2).alias("avg_transaction_amount"),
    max("total_amount").alias("max_transaction_amount"),
    min("total_amount").alias("min_transaction_amount"),
    countDistinct("vehicle_id").alias("unique_vehicles_used"),
]

location_metrics = (
    joined_df
    .groupBy("pickup_location", "location_name", "city", "state")
    .agg(*location_aggregations)
)

In [64]:
# Vehicle Type Performance Metrics: rental volume, revenue, and rental hours per vehicle type.
# sum/round/avg here are the Spark column functions from the star import.
vehicle_type_aggregations = [
    count("rental_id").alias("rental_count"),
    sum("total_amount").alias("total_revenue"),
    round(avg("total_amount"), 2).alias("avg_revenue"),
    sum("rental_duration_hours").alias("total_rental_hours"),
    round(avg("rental_duration_hours"), 2).alias("avg_rental_duration_hours"),
]

vehicle_type_metrics = (
    joined_df
    .groupBy("vehicle_type")
    .agg(*vehicle_type_aggregations)
)

### User and Transaction Metrics

In [65]:
# Join transactions with users (left join keeps rentals that have no matching user).
same_user = rental_transactions_df.user_id == users_df.user_id
user_transactions = (
    rental_transactions_df
    .join(users_df, on=same_user, how="left")
    .drop(users_df.user_id)  # keep only the rental side's user_id column
)

# Extract date from rental_start_time.
# rental_start_time was already parsed into a timestamp in the cleaning step, so it is
# truncated to a date directly instead of being re-parsed with to_timestamp first.
user_transactions_with_date = user_transactions.withColumn(
    "rental_date",
    to_date("rental_start_time"),
)

In [66]:
# Daily Transaction Metrics: transaction count, revenue, and average value per rental date.
# sum/round/avg here are the Spark column functions from the star import.
daily_aggregations = [
    count("rental_id").alias("total_daily_transactions"),
    sum("total_amount").alias("total_daily_revenue"),
    round(avg("total_amount"), 2).alias("avg_daily_transaction_value"),
]

daily_metrics = (
    user_transactions_with_date
    .groupBy("rental_date")
    .agg(*daily_aggregations)
)

# Calculate rental duration in hours for user metrics.
# rental_start_time and rental_end_time were already parsed into timestamps in the
# cleaning step, so they are passed to unix_timestamp directly instead of being
# re-parsed with to_timestamp. unix_timestamp returns seconds; / 3600 gives hours.
user_transactions_with_duration = user_transactions_with_date.withColumn(
    "rental_duration_hours",
    (unix_timestamp("rental_end_time") - unix_timestamp("rental_start_time")) / 3600,
)

# User Engagement Metrics: per-user transaction count, spending, and total rental hours.
# sum/round/avg/max/min here are the Spark column functions from the star import.
user_aggregations = [
    count("rental_id").alias("total_user_transactions"),
    sum("total_amount").alias("total_user_spending"),
    round(avg("total_amount"), 2).alias("avg_user_transaction_value"),
    max("total_amount").alias("max_user_transaction"),
    min("total_amount").alias("min_user_transaction"),
    sum("rental_duration_hours").alias("total_user_rental_hours"),
]

user_metrics = (
    user_transactions_with_duration
    .groupBy("user_id", "first_name", "last_name")
    .agg(*user_aggregations)
)

In [67]:
# Displaying metrics: print each report title followed by the metric table's default preview.
metric_reports = [
    ("Location Performance Metrics:", location_metrics),
    ("\nVehicle Type Performance Metrics:", vehicle_type_metrics),
    ("\nDaily Transaction Metrics:", daily_metrics),
    ("\nUser Engagement Metrics:", user_metrics),
]

for report_title, report_df in metric_reports:
    print(report_title)
    report_df.show()

Location Performance Metrics:
+---------------+--------------------+-------------+-----+-------------+------------------+----------------------+----------------------+----------------------+--------------------+
|pickup_location|       location_name|         city|state|total_revenue|total_transactions|avg_transaction_amount|max_transaction_amount|min_transaction_amount|unique_vehicles_used|
+---------------+--------------------+-------------+-----+-------------+------------------+----------------------+----------------------+----------------------+--------------------+
|           1463|Pham, Nicholson a...|      Oakland|   CA|      52280.0|                68|                768.82|                3400.0|                  10.0|                  68|
|           6920|            Lane Inc|    Elk Grove|   CA|      46940.0|                66|                711.21|                3600.0|                  20.0|                  66|
|           9297|Holland, Adams an...|     Glendale|   CA|  