In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [4]:
spark = SparkSession.builder.appName('EMR').getOrCreate()

## Schema Definintion

In [34]:
# Schema definition for the datasets

# users schema
users_schema = StructType([
    StructField("user_id", StringType(), True),
    StructField("first_name", StringType(), True),
    StructField("last_name", StringType(), True),
    StructField("email", StringType(), True),
    StructField("phone_number", StringType(), True),
    StructField("driver_license_number", StringType(), True),
    StructField("driver_license_expiry", DateType(),True),
    StructField("creation_date", DateType(),True),
    StructField("is_active", IntegerType(),True),
])

# vehicles schema
vehicles_schema = StructType([
    StructField("active", IntegerType(), True),
    StructField("vehicle_license_number", IntegerType(), True),
    StructField("registration_name", StringType(), True),
    StructField("license_type", StringType(), True),
    StructField("expiration_date", StringType(), True),
    StructField("permit_license_number", StringType(), True),
    StructField("certification_date", StringType(), True),
    StructField("vehicle_year", IntegerType(), True),
    StructField("base_telephone_number", StringType(), True),
    StructField("base_address", StringType(), True),
    StructField("vehicle_id", StringType(), True),
    StructField("last_update_timestamp", StringType(), True),
    StructField("brand", StringType(), True),
    StructField("vehicle_type", StringType(), True),
])

# locations schema
locations_schema = StructType([
    StructField("location_id", StringType(), True),
    StructField("location_name", StringType(), True),
    StructField("address", StringType(), True),
    StructField("city", StringType(), True),
    StructField("state", StringType(), True),
    StructField("zip_code", StringType(), True),
    StructField("latitude", FloatType(), True),
    StructField("longitude", FloatType(), True),
])

# rental locations schema
rental_transactions_schema = StructType([
    StructField("rental_id", StringType(), True),
    StructField("user_id", StringType(), True),
    StructField("vehicle_id", StringType(), True),
    StructField("rental_start_time", StringType(), True),
    StructField("rental_end_time", StringType(), True),
    StructField("pickup_location", IntegerType(), True),
    StructField("dropoff_location", IntegerType(), True),
    StructField("total_amount", DoubleType(), True)
])

## Loading Data

In [35]:
# Loading the data
users_df = spark.read.csv("/content/users.csv", header=True, schema=users_schema)
vehicles_df = spark.read.csv("/content/vehicles.csv", header=True, schema=vehicles_schema)
locations_df = spark.read.csv("/content/locations.csv", header=True, schema=locations_schema)
rental_txn_df = spark.read.csv("/content/rental_transactions.csv", header=True, schema=rental_transactions_schema)

In [36]:
def inspect_data(df):
  # print schema with head
  df.printSchema()
  df.show(5)

In [37]:
inspect_data(users_df)

root
 |-- user_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- phone_number: string (nullable = true)
 |-- driver_license_number: string (nullable = true)
 |-- driver_license_expiry: date (nullable = true)
 |-- creation_date: date (nullable = true)
 |-- is_active: integer (nullable = true)

+----------+----------+---------+--------------------+------------------+---------------------+---------------------+-------------+---------+
|   user_id|first_name|last_name|               email|      phone_number|driver_license_number|driver_license_expiry|creation_date|is_active|
+----------+----------+---------+--------------------+------------------+---------------------+---------------------+-------------+---------+
|26d08ab733|      Lisa|   Parker|lisa.parker@gmail...|334.271.2972x60554|             MO028963|           2033-06-21|   2024-05-26|        1|
|0a0430e6f9|  Courtney|   Martin|c

In [38]:
inspect_data(vehicles_df)

root
 |-- active: integer (nullable = true)
 |-- vehicle_license_number: integer (nullable = true)
 |-- registration_name: string (nullable = true)
 |-- license_type: string (nullable = true)
 |-- expiration_date: string (nullable = true)
 |-- permit_license_number: string (nullable = true)
 |-- certification_date: string (nullable = true)
 |-- vehicle_year: integer (nullable = true)
 |-- base_telephone_number: string (nullable = true)
 |-- base_address: string (nullable = true)
 |-- vehicle_id: string (nullable = true)
 |-- last_update_timestamp: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- vehicle_type: string (nullable = true)

+------+----------------------+--------------------+----------------+---------------+---------------------+------------------+------------+---------------------+--------------------+----------+---------------------+---------+------------+
|active|vehicle_license_number|   registration_name|    license_type|expiration_date|permit_license_

In [28]:
inspect_data(locations_df)

root
 |-- location_id: string (nullable = true)
 |-- location_name: string (nullable = true)
 |-- address: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zip_code: string (nullable = true)
 |-- latitude: float (nullable = true)
 |-- longitude: float (nullable = true)

+-----------+--------------------+--------------------+---------+-----+--------+----------+----------+
|location_id|       location_name|             address|     city|state|zip_code|  latitude| longitude|
+-----------+--------------------+--------------------+---------+-----+--------+----------+----------+
|       2702|Jackson, Velazque...|3140 Heath Radial...|  Modesto|   CA|   94540|  86.25802| -169.2448|
|       4380|            Bean LLC|51144 Patrick Isl...|  Fontana|   CA|   92188|-74.455894| -42.27988|
|       7709|     Gilbert-Simmons|    4738 Lewis Locks|Roseville|   CA|   91032| -65.43093| -64.76349|
|       8607|    Coleman-Robinson|  324 Robin Causeway|  

In [29]:
inspect_data(rental_txn_df)

root
 |-- rental_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- vehicle_id: string (nullable = true)
 |-- rental_start_time: string (nullable = true)
 |-- rental_end_time: string (nullable = true)
 |-- pickup_location: integer (nullable = true)
 |-- dropoff_location: integer (nullable = true)
 |-- total_amount: double (nullable = true)

+----------+----------+----------+-------------------+-------------------+---------------+----------------+------------+
| rental_id|   user_id|vehicle_id|  rental_start_time|    rental_end_time|pickup_location|dropoff_location|total_amount|
+----------+----------+----------+-------------------+-------------------+---------------+----------------+------------+
|b139d8e1b2|320be8068b|0d52304987|2024-02-28 08:05:00|2024-03-01 05:05:00|           1497|            6785|       450.0|
|7afd60f6d3|320be8068b|975d72985c|2024-01-07 20:16:00|2024-01-09 21:16:00|           5345|            2608|      2450.0|
|733a9361bc|8f31b734a6|0d9f0f0

In [31]:
# Checking for null values
print("Null values in users data:")
users_df.select([count(when(col(c).isNull(), c)).alias(c) for c in users_df.columns]).show()
print("Null values in vehicles data:")
vehicles_df.select([count(when(col(c).isNull(), c)).alias(c) for c in vehicles_df.columns]).show()
print("Null values in locations data:")
locations_df.select([count(when(col(c).isNull(), c)).alias(c) for c in locations_df.columns]).show()
print("Null values in rental transactions data:")
rental_txn_df.select([count(when(col(c).isNull(), c)).alias(c) for c in rental_txn_df.columns]).show()

Null values in users data:
+-------+----------+---------+-----+------------+---------------------+---------------------+-------------+---------+
|user_id|first_name|last_name|email|phone_number|driver_license_number|driver_license_expiry|creation_date|is_active|
+-------+----------+---------+-----+------------+---------------------+---------------------+-------------+---------+
|      0|         0|        0|    0|           0|                    0|                    0|            0|        0|
+-------+----------+---------+-----+------------+---------------------+---------------------+-------------+---------+

Null values in vehicles data:
+------+----------------------+-----------------+------------+---------------+---------------------+------------------+------------+---------------------+------------+----------+---------------------+-----+------------+
|active|vehicle_license_number|registration_name|license_type|expiration_date|permit_license_number|certification_date|vehicle_year|

In [33]:
# print rows that contain null values in vehicles_df in any of them columns
vehicles_df.where(col("vehicle_license_number").isNull()).show()

+------+----------------------+--------------------+----------------+---------------+---------------------+------------------+------------+---------------------+--------------------+----------+---------------------+-------------+------------+
|active|vehicle_license_number|   registration_name|    license_type|expiration_date|permit_license_number|certification_date|vehicle_year|base_telephone_number|        base_address|vehicle_id|last_update_timestamp|        brand|vehicle_type|
+------+----------------------+--------------------+----------------+---------------+---------------------+------------------+------------+---------------------+--------------------+----------+---------------------+-------------+------------+
|     1|                  NULL|ROYAL,MONGER,TRAN...|FOR HIRE VEHICLE|     01-05-2025|             AQG7ONJG|        2018-05-20|        2018|        (646)780-0129|1515 THIRD STREET...|b88a97d40f|  04-06-2024 13:25:00|    Chevrolet|       basic|
|     1|                  NU

In [39]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, sum, avg, max, min, countDistinct, to_timestamp, datediff, hour, date_format, when, round
from pyspark.sql.types import *
from pyspark.sql.window import Window

# Data Processing
# Convert string timestamps to timestamp type and calculate rental duration in hours
rental_txn_df = rental_txn_df.withColumn("rental_start_time", to_timestamp(col("rental_start_time"), "yyyy-MM-dd HH:mm:ss")) \
                            .withColumn("rental_end_time", to_timestamp(col("rental_end_time"), "yyyy-MM-dd HH:mm:ss"))

rental_txn_df = rental_txn_df.withColumn(
    "rental_duration_hours",
    (col("rental_end_time").cast("long") - col("rental_start_time").cast("long")) / 3600
)

# Convert location IDs to string for joining
rental_txn_df = rental_txn_df.withColumn("pickup_location", col("pickup_location").cast("string")) \
                            .withColumn("dropoff_location", col("dropoff_location").cast("string"))

# Extract date from rental_start_time for daily metrics
rental_txn_df = rental_txn_df.withColumn("rental_date", date_format(col("rental_start_time"), "yyyy-MM-dd"))

# Join with vehicles to get vehicle type information
rental_vehicle_df = rental_txn_df.join(vehicles_df, on="vehicle_id", how="left")

# 1. LOCATION AND VEHICLE PERFORMANCE KPIs

# Total revenue per location (using pickup location)
total_revenue_per_location = rental_txn_df.groupBy("pickup_location") \
    .agg(sum("total_amount").alias("total_revenue"),
         count("rental_id").alias("total_transactions"),
         avg("total_amount").alias("avg_transaction_amount"),
         max("total_amount").alias("max_transaction_amount"),
         min("total_amount").alias("min_transaction_amount"),
         countDistinct("vehicle_id").alias("unique_vehicles"))

# Join with location names
total_revenue_per_location = total_revenue_per_location.join(
    locations_df,
    total_revenue_per_location.pickup_location == locations_df.location_id,
    "left"
).select(
    col("pickup_location"),
    col("location_name"),
    col("total_revenue"),
    col("total_transactions"),
    col("avg_transaction_amount"),
    col("max_transaction_amount"),
    col("min_transaction_amount"),
    col("unique_vehicles")
)

# Rental duration metrics by vehicle type
rental_duration_by_vehicle_type = rental_vehicle_df.groupBy("vehicle_type") \
    .agg(avg("rental_duration_hours").alias("avg_rental_duration"),
         max("rental_duration_hours").alias("max_rental_duration"),
         min("rental_duration_hours").alias("min_rental_duration"),
         sum("total_amount").alias("total_revenue"),
         count("rental_id").alias("total_rentals"))

# 2. USER AND TRANSACTION METRICS KPIs

# Total daily transactions and revenue
daily_transactions = rental_txn_df.groupBy("rental_date") \
    .agg(count("rental_id").alias("total_transactions"),
         sum("total_amount").alias("total_revenue"))

# Overall average transaction value
avg_transaction_value = rental_txn_df.agg(avg("total_amount").alias("avg_transaction_value"))

# User engagement metrics
user_metrics = rental_txn_df.groupBy("user_id") \
    .agg(count("rental_id").alias("total_transactions"),
         sum("total_amount").alias("total_revenue"),
         avg("total_amount").alias("avg_spending"),
         max("total_amount").alias("max_spending"),
         min("total_amount").alias("min_spending"),
         sum("rental_duration_hours").alias("total_rental_hours"))

# Join with user information
user_metrics = user_metrics.join(users_df, on="user_id", how="left") \
    .select("user_id", "first_name", "last_name", "email",
            "total_transactions", "total_revenue", "avg_spending",
            "max_spending", "min_spending", "total_rental_hours")

# Show the results
print("Location and Vehicle Performance:")
total_revenue_per_location.show(truncate=False)
rental_duration_by_vehicle_type.show(truncate=False)

print("\nUser and Transaction Metrics:")
daily_transactions.show(truncate=False)
avg_transaction_value.show(truncate=False)
user_metrics.show(truncate=False)

# Write results to Parquet format (if required)
# total_revenue_per_location.write.mode("overwrite").parquet("s3://your-bucket/output/total_revenue_per_location")
# rental_duration_by_vehicle_type.write.mode("overwrite").parquet("s3://your-bucket/output/rental_duration_by_vehicle_type")
# daily_transactions.write.mode("overwrite").parquet("s3://your-bucket/output/daily_transactions")
# avg_transaction_value.write.mode("overwrite").parquet("s3://your-bucket/output/avg_transaction_value")
# user_metrics.write.mode("overwrite").parquet("s3://your-bucket/output/user_metrics")

# Additional Analysis: Top revenue-generating locations
top_revenue_locations = total_revenue_per_location.orderBy(col("total_revenue").desc())

# Most rented vehicle types
most_rented_vehicle_types = rental_vehicle_df.groupBy("vehicle_type") \
    .agg(count("rental_id").alias("total_rentals")) \
    .orderBy(col("total_rentals").desc())

# Top spending users
top_spending_users = user_metrics.orderBy(col("total_revenue").desc())

print("\nTop Revenue Generating Locations:")
top_revenue_locations.show(5, truncate=False)

print("\nMost Rented Vehicle Types:")
most_rented_vehicle_types.show(truncate=False)

print("\nTop Spending Users:")
top_spending_users.show(5, truncate=False)

Location and Vehicle Performance:
+---------------+------------------------------+-------------+------------------+----------------------+----------------------+----------------------+---------------+
|pickup_location|location_name                 |total_revenue|total_transactions|avg_transaction_amount|max_transaction_amount|min_transaction_amount|unique_vehicles|
+---------------+------------------------------+-------------+------------------+----------------------+----------------------+----------------------+---------------+
|2136           |Grant-Miller                  |57330.0      |61                |939.8360655737705     |3450.0                |40.0                  |61             |
|2294           |Kelly, Gonzales and Stevens   |32180.0      |52                |618.8461538461538     |2600.0                |10.0                  |52             |
|5149           |Salazar, Powell and Wilson    |46990.0      |70                |671.2857142857143     |3450.0                |50.0