### Installing importing the necessary packages

In [None]:
# Install Pyspark
!pip install pyspark



In [53]:
# Importing packages
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, FloatType, DateType, TimestampType, BooleanType, DecimalType
from pyspark.sql.window import Window


### Initailizing a Spark Session

In [2]:
spark = SparkSession.builder.appName("Rental_Marketplace").getOrCreate()

### Loading the data

In [39]:
# Defining custom schemas for the data

# Apartments_schema
apartments_schema = StructType([
    StructField("id", LongType(), True),
    StructField("title", StringType(), True),
    StructField("source", StringType(), True),
    StructField("price", DecimalType(10, 2), True),
    StructField("currency", StringType(), True),
    StructField("listing_created_on", StringType(), True),
    StructField("is_active", BooleanType(), True),
    StructField("last_modified_timestamp", StringType(), True)
])

# Apartments_attributes_schema
apartments_attributes_schema = StructType([
    StructField("id", LongType(), True),
    StructField("category", StringType(), True),
    StructField("body", StringType(), True),
    StructField("amenities", StringType(), True),
    StructField("bathrooms", IntegerType(), True),
    StructField("bedrooms", IntegerType(), True),
    StructField("fee", DecimalType(10, 2), True),
    StructField("has_photo", BooleanType(), True),
    StructField("pets_allowed", BooleanType(), True),
    StructField("price_display", StringType(), True),
    StructField("price_type", StringType(), True),
    StructField("square_feet", DecimalType(10, 2), True),
    StructField("address", StringType(), True),
    StructField("cityname", StringType(), True),
    StructField("state", StringType(), True),
    StructField("latitude", FloatType(), True),
    StructField("longitude", FloatType(), True)
])

# user_viewing_schema
user_viewing_schema = StructType([
    StructField("user_id", LongType(), True),
    StructField("apartment_id", LongType(), True),
    StructField("viewed_at", StringType(), True),
    StructField("is_wishlisted", BooleanType(), True),
    StructField("call_to_action", StringType(), True)
])

# Bookings_schema
bookings_schema = StructType([
    StructField("booking_id", LongType(), True),
    StructField("user_id", LongType(), True),
    StructField("apartment_id", LongType(), True),
    StructField("booking_date", StringType(), True),
    StructField("checkin_date", StringType(), True),
    StructField("checkout_date", StringType(), True),
    StructField("total_price", DecimalType(10, 2), True),
    StructField("currency", StringType(), True),
    StructField("booking_status", StringType(), True)
])

In [40]:
# Load the apartments, apartment_attributes and user_viewings data
apartments = spark.read.csv("drive/MyDrive/Colab Notebooks/apartments-data/apartments.csv", schema=apartments_schema, header=True,)
apartment_attributes = spark.read.csv("drive/MyDrive/Colab Notebooks/apartments-data/apartment_attributes.csv", schema=apartments_attributes_schema, header=True,)
user_viewing = spark.read.csv("drive/MyDrive/Colab Notebooks/apartments-data/user_viewing.csv", schema=user_viewing_schema, header=True,)
bookings = spark.read.csv("drive/MyDrive/Colab Notebooks/apartments-data/bookings.csv", schema=bookings_schema, header=True,)

In [41]:
# Confirming the data schemas
print("Apartments Schema:")
apartments.printSchema()

print("\nApartment Attributes Schema:")
apartment_attributes.printSchema()

print("\nUser Viewings Schema:")
user_viewing.printSchema()

print("\nBookings Schema:")
bookings.printSchema()

Apartments Schema:
root
 |-- id: long (nullable = true)
 |-- title: string (nullable = true)
 |-- source: string (nullable = true)
 |-- price: decimal(10,2) (nullable = true)
 |-- currency: string (nullable = true)
 |-- listing_created_on: string (nullable = true)
 |-- is_active: boolean (nullable = true)
 |-- last_modified_timestamp: string (nullable = true)


Apartment Attributes Schema:
root
 |-- id: long (nullable = true)
 |-- category: string (nullable = true)
 |-- body: string (nullable = true)
 |-- amenities: string (nullable = true)
 |-- bathrooms: integer (nullable = true)
 |-- bedrooms: integer (nullable = true)
 |-- fee: decimal(10,2) (nullable = true)
 |-- has_photo: boolean (nullable = true)
 |-- pets_allowed: boolean (nullable = true)
 |-- price_display: string (nullable = true)
 |-- price_type: string (nullable = true)
 |-- square_feet: decimal(10,2) (nullable = true)
 |-- address: string (nullable = true)
 |-- cityname: string (nullable = true)
 |-- state: string (nulla

In [42]:
# displaying sample data from the various dataset
print("Apartments Data:")
apartments.describe().show()
apartments.show(10)

print("\nApartment Attributes Data:")
apartment_attributes.describe().show()
apartment_attributes.show(10)

print("\nUser Viewings Data:")
user_viewing.describe().show()
user_viewing.show(10)

print("\nBookings Data:")
bookings.describe().show()
bookings.show(10)

Apartments Data:
+-------+-----------------+------------+------+------------------+--------+------------------+-----------------------+
|summary|               id|       title|source|             price|currency|listing_created_on|last_modified_timestamp|
+-------+-----------------+------------+------+------------------+--------+------------------+-----------------------+
|  count|           200000|      200000|200000|            200000|  200000|            200000|                 200000|
|   mean|         100000.5|        NULL|  NULL|       2995.724602|    NULL|              NULL|                   NULL|
| stddev|57735.17125634945|        NULL|  NULL|1157.8568826871642|    NULL|              NULL|                   NULL|
|    min|                1|Abbott Group|Airbnb|           1000.01|     EUR|        01/01/2020|             01/01/2020|
|    max|           200000| Zuniga-Wood|Zillow|           4999.94|     USD|        31/12/2024|             31/12/2024|
+-------+-----------------+----

### Raw Data

In [43]:
# Raw data to be populated into the redshift raw layer
raw_apartments_data = apartments
raw_apartment_attributes_data = apartment_attributes
raw_user_viewing_data = user_viewing
raw_bookings_data = bookings

### Cleaning the data

In [44]:
# Update currency to 'USD'
apartments = apartments.withColumn("currency", lit("USD"))
bookings = bookings.withColumn("currency", lit("USD"))

apartments.select("currency").show()
bookings.select("currency").show()

+--------+
|currency|
+--------+
|     USD|
|     USD|
|     USD|
|     USD|
|     USD|
|     USD|
|     USD|
|     USD|
|     USD|
|     USD|
|     USD|
|     USD|
|     USD|
|     USD|
|     USD|
|     USD|
|     USD|
|     USD|
|     USD|
|     USD|
+--------+
only showing top 20 rows

+--------+
|currency|
+--------+
|     USD|
|     USD|
|     USD|
|     USD|
|     USD|
|     USD|
|     USD|
|     USD|
|     USD|
|     USD|
|     USD|
|     USD|
|     USD|
|     USD|
|     USD|
|     USD|
|     USD|
|     USD|
|     USD|
|     USD|
+--------+
only showing top 20 rows



In [45]:
# Standardize Dates
apartments_date_columns = ['listing_created_on', 'last_modified_timestamp']
for col_name in apartments_date_columns:
    apartments = apartments.withColumn(col_name, to_date(col(col_name), 'dd/MM/yyyy'))

bookings_date_columns = ['booking_date', 'checkin_date', 'checkout_date']
for col_name in bookings_date_columns:
    bookings = bookings.withColumn(col_name, to_date(col(col_name), 'dd/MM/yyyy'))

user_viewing = user_viewing.withColumn("viewed_at", to_date(col("viewed_at"), 'dd/MM/yyyy'))

apartments.select(apartments_date_columns).show(5)
bookings.select(bookings_date_columns).show(5)
user_viewing.select("viewed_at").show(5)

+------------------+-----------------------+
|listing_created_on|last_modified_timestamp|
+------------------+-----------------------+
|        2024-03-27|             2021-12-27|
|        2023-07-23|             2023-02-10|
|        2024-04-15|             2020-01-02|
|        2020-10-27|             2023-06-23|
|        2021-02-01|             2022-07-25|
+------------------+-----------------------+
only showing top 5 rows

+------------+------------+-------------+
|booking_date|checkin_date|checkout_date|
+------------+------------+-------------+
|  2022-11-11|  2024-09-07|   2023-06-30|
|  2024-05-17|  2024-07-07|   2021-09-06|
|  2022-11-26|  2025-02-12|   2023-04-07|
|  2020-07-09|  2024-10-25|   2021-02-08|
|  2024-05-08|  2022-01-31|   2024-10-29|
+------------+------------+-------------+
only showing top 5 rows

+----------+
| viewed_at|
+----------+
|2023-01-23|
|2021-05-14|
|2023-06-27|
|2022-10-15|
|2020-02-06|
+----------+
only showing top 5 rows



In [46]:
# checking for duplicates
print("Apartments Duplicates:")
print(apartments.count()-apartments.distinct().count())

print("\nApartment Attributes Duplicates:")
print(apartment_attributes.count()-apartment_attributes.distinct().count())

print("\nUser Viewings Duplicates:")
print(user_viewing.count()-user_viewing.distinct().count())

print("\nBookings Duplicates:")
print(bookings.count()-bookings.distinct().count())

# droping duplicates
apartments = apartments.dropDuplicates()
apartment_attributes = apartment_attributes.dropDuplicates()
user_viewing = user_viewing.dropDuplicates()
bookings = bookings.dropDuplicates()

Apartments Duplicates:
0

Apartment Attributes Duplicates:
0

User Viewings Duplicates:
0

Bookings Duplicates:
0


In [47]:
# Checking for null values
print("Apartments Null Values:")
apartments.select([count(when(col(c).isNull(), c)).alias(c) for c in apartments.columns]).show()
print("\nApartment Attributes Null Values:")
apartment_attributes.select([count(when(col(c).isNull(), c)).alias(c) for c in apartment_attributes.columns]).show()
print("\nUser Viewings Null Values:")
user_viewing.select([count(when(col(c).isNull(), c)).alias(c) for c in user_viewing.columns]).show()
print("\nBookings Null Values:")
bookings.select([count(when(col(c).isNull(), c)).alias(c) for c in bookings.columns]).show()

Apartments Null Values:
+---+-----+------+-----+--------+------------------+---------+-----------------------+
| id|title|source|price|currency|listing_created_on|is_active|last_modified_timestamp|
+---+-----+------+-----+--------+------------------+---------+-----------------------+
|  0|    0|     0|    0|       0|                 0|        0|                      0|
+---+-----+------+-----+--------+------------------+---------+-----------------------+


Apartment Attributes Null Values:
+---+--------+----+---------+---------+--------+---+---------+------------+-------------+----------+-----------+-------+--------+-----+--------+---------+
| id|category|body|amenities|bathrooms|bedrooms|fee|has_photo|pets_allowed|price_display|price_type|square_feet|address|cityname|state|latitude|longitude|
+---+--------+----+---------+---------+--------+---+---------+------------+-------------+----------+-----------+-------+--------+-----+--------+---------+
|  0|       0|   0|        0|        0| 

In [50]:
# Combine apartments and apartments attributes data
curated_apartments = apartments.join(apartment_attributes, apartments.id == apartment_attributes.id).drop(apartment_attributes.id)

In [55]:
# Calculate stay duration
curated_bookings = bookings.withColumn(
    "stay_days",
    abs(datediff(
        col("checkout_date"),
        col("checkin_date")))
).withColumn(
    "booking_week",
    weekofyear(col("booking_date"))
)

In [52]:
# Displaying curated apartments and bookings
print("Curated Apartments Data:")
curated_apartments.describe().show()
curated_apartments.show(5)

print("\nCurated Bookings Data:")
curated_bookings.describe().show()
curated_bookings.show(5)

Curated Apartments Data:
+-------+-----------------+------------+------+----------------+--------+--------+--------------------+--------------------+------------------+------------------+----------------+-------------+----------+-----------------+--------------------+---------+-------+--------------------+-------------------+
|summary|               id|       title|source|           price|currency|category|                body|           amenities|         bathrooms|          bedrooms|             fee|price_display|price_type|      square_feet|             address| cityname|  state|            latitude|          longitude|
+-------+-----------------+------------+------+----------------+--------+--------+--------------------+--------------------+------------------+------------------+----------------+-------------+----------+-----------------+--------------------+---------+-------+--------------------+-------------------+
|  count|           200000|      200000|200000|          200000|  

In [68]:
# Average Listing price
# Filter active listings and calculate weekly averages
weekly_avg_price = curated_apartments.filter(col("is_active") == True) \
    .groupBy(
        year("listing_created_on").alias("year"),
        weekofyear("listing_created_on").alias("week")
    ) \
    .agg(
        avg("price").alias("average_price"),
        date_format(
            min("listing_created_on"),
            "yyyy-MM-dd"
        ).alias("week_start_date"),
        date_format(
            max("listing_created_on"),
            "yyyy-MM-dd"
        ).alias("week_end_date")
    )

weekly_avg_price = weekly_avg_price.select(
    "year",
    "week",
    "week_start_date",
    "week_end_date",
    "average_price",
).orderBy("year", "week")

# Show results
weekly_avg_price.show(truncate=False)

+----+----+---------------+-------------+-------------+
|year|week|week_start_date|week_end_date|average_price|
+----+----+---------------+-------------+-------------+
|2020|1   |2020-01-01     |2020-01-05   |3069.052797  |
|2020|2   |2020-01-06     |2020-01-12   |2978.171657  |
|2020|3   |2020-01-13     |2020-01-19   |2988.534457  |
|2020|4   |2020-01-20     |2020-01-26   |3162.005817  |
|2020|5   |2020-01-27     |2020-02-02   |2881.686546  |
|2020|6   |2020-02-03     |2020-02-09   |3138.746108  |
|2020|7   |2020-02-10     |2020-02-16   |3067.635195  |
|2020|8   |2020-02-17     |2020-02-23   |3007.358688  |
|2020|9   |2020-02-24     |2020-03-01   |2952.400923  |
|2020|10  |2020-03-02     |2020-03-08   |2970.938320  |
|2020|11  |2020-03-09     |2020-03-15   |3009.121108  |
|2020|12  |2020-03-16     |2020-03-22   |3008.349499  |
|2020|13  |2020-03-23     |2020-03-29   |3064.337552  |
|2020|14  |2020-03-30     |2020-04-05   |3027.744523  |
|2020|15  |2020-04-06     |2020-04-12   |3058.73

In [69]:
# Occupancy Rate
# Step 1: Get active listings count per month
# (Assuming listings stay active until marked inactive)
active_listings_monthly = curated_apartments.filter(col("is_active") == True) \
    .select("id") \
    .crossJoin(
        spark.sql("SELECT explode(sequence(to_date('2020-01-01'), to_date('2025-12-31'), interval 1 month)) as month")
    ) \
    .groupBy(year("month").alias("year"), month("month").alias("month")) \
    .agg(count("id").alias("active_listings"))

# Step 2: Calculate booked nights per month
confirmed_bookings = curated_bookings.filter(col("booking_status") == "confirmed")

bookings_with_nights = confirmed_bookings.withColumn(
    "nights_booked",
    datediff(col("checkout_date"), col("checkin_date"))
)

# Create a DataFrame with all months needed
all_months = spark.sql("""
    SELECT
        year(date) as year,
        month(date) as month,
        day(last_day(date)) as days_in_month
    FROM (
        SELECT explode(sequence(
            to_date('2020-01-01'),
            to_date('2025-12-31'),
            interval 1 month
        )) as date
    )
""")

# Step 3: Join and calculate metrics
monthly_occupancy = all_months.join(
    active_listings_monthly,
    ["year", "month"],
    "left"
).fillna(0, subset=["active_listings"]).join(
    bookings_with_nights.groupBy(
        year("checkin_date").alias("year"),
        month("checkin_date").alias("month")
    ).agg(
        sum("nights_booked").alias("total_booked_nights"),
        count("booking_id").alias("total_bookings")
    ),
    ["year", "month"],
    "left"
).fillna(0, subset=["total_booked_nights", "total_bookings"])

# Calculate final metrics
monthly_occupancy = monthly_occupancy.withColumn(
    "available_nights",
    col("active_listings") * col("days_in_month")
).withColumn(
    "occupancy_rate",
    round((col("total_booked_nights") / col("available_nights")) * 100, 2)
).select(
    "year",
    "month",
    "active_listings",
    "days_in_month",
    "available_nights",
    "total_bookings",
    "total_booked_nights",
    "occupancy_rate"
).orderBy("year", "month")

# Show results
monthly_occupancy.show(truncate=False)

+----+-----+---------------+-------------+----------------+--------------+-------------------+--------------+
|year|month|active_listings|days_in_month|available_nights|total_bookings|total_booked_nights|occupancy_rate|
+----+-----+---------------+-------------+----------------+--------------+-------------------+--------------+
|2020|1    |100203         |31           |3106293         |2679          |2497828            |80.41         |
|2020|2    |100203         |29           |2905887         |2548          |2337624            |80.44         |
|2020|3    |100203         |31           |3106293         |2721          |2429867            |78.22         |
|2020|4    |100203         |30           |3006090         |2673          |2261830            |75.24         |
|2020|5    |100203         |31           |3106293         |2775          |2280921            |73.43         |
|2020|6    |100203         |30           |3006090         |2687          |2082654            |69.28         |
|2020|7   

In [71]:
# Most Popular Locations
window_spec = Window.partitionBy("year", "week").orderBy(col("booking_count").desc())
popular_locations = curated_bookings.filter(col('booking_status') == 'confirmed') \
    .join(curated_apartments, curated_bookings.apartment_id == curated_apartments.id) \
    .groupBy(
        year(col('booking_date')).alias('year'),
        weekofyear(col('booking_date')).alias('week'),
        col('cityname')
    ).agg(count('*').alias('booking_count')) \
    .orderBy("year","week",col('booking_count').desc())

popular_locations = popular_locations.withColumn("rank", row_number().over(window_spec)) \
    .filter(col("rank") <= 5)

popular_locations.show()

+----+----+------------+-------------+----+
|year|week|    cityname|booking_count|rank|
+----+----+------------+-------------+----+
|2020|   1|     Phoenix|           52|   1|
|2020|   1|   San Diego|           50|   2|
|2020|   1|      Dallas|           44|   3|
|2020|   1|    New York|           41|   4|
|2020|   1| Los Angeles|           40|   5|
|2020|   2|     Houston|           73|   1|
|2020|   2|      Austin|           70|   2|
|2020|   2|Philadelphia|           62|   3|
|2020|   2|    New York|           62|   4|
|2020|   2|     Chicago|           60|   5|
|2020|   3|      Dallas|           80|   1|
|2020|   3| San Antonio|           78|   2|
|2020|   3|     Houston|           67|   3|
|2020|   3|     Chicago|           67|   4|
|2020|   3|Philadelphia|           65|   5|
|2020|   4|   San Diego|           71|   1|
|2020|   4|      Dallas|           69|   2|
|2020|   4|    New York|           66|   3|
|2020|   4|     Houston|           66|   4|
|2020|   4|     Chicago|        

In [74]:
# Top Performing Listings: Track properties with the highest confirmed revenue per week.

# Step 1: Filter confirmed bookings and join with apartment data
confirmed_bookings_with_details = curated_bookings.filter(col('booking_status') == 'confirmed') \
    .join(curated_apartments,
          curated_bookings.apartment_id == curated_apartments.id,
          'inner')

# Step 2: Calculate weekly revenue per property
weekly_revenue = confirmed_bookings_with_details.groupBy(
    year(col('booking_date')).alias('year'),
    weekofyear(col('booking_date')).alias('week'),
    col('id'),  # apartment_id
    col('title'),
    col('cityname')
).agg(
    sum('total_price').alias('week_revenue')
)

# Step 3: Rank properties by revenue within each week
window_spec = Window.partitionBy("year", "week").orderBy(desc("week_revenue"))

top_performing_listings = weekly_revenue \
    .withColumn("rank", rank().over(window_spec)) \
    .filter(col("rank") <= 5) \
    .orderBy("year", "week", "rank")

# Show results
top_performing_listings.show(truncate=False)


+----+----+------+-----------------------------+------------+------------+----+
|year|week|id    |title                        |cityname    |week_revenue|rank|
+----+----+------+-----------------------------+------------+------------+----+
|2020|1   |77201 |Payne LLC                    |Houston     |4975.47     |1   |
|2020|1   |88382 |Brown Ltd                    |San Antonio |4946.55     |2   |
|2020|1   |148834|Yang-Williams                |Austin      |4933.35     |3   |
|2020|1   |26783 |Coffey, Harris and Spears    |Phoenix     |4922.19     |4   |
|2020|1   |122554|Fields PLC                   |Phoenix     |4912.87     |5   |
|2020|2   |117340|Robinson-Cole                |Chicago     |4989.89     |1   |
|2020|2   |105057|Delacruz PLC                 |Los Angeles |4986.52     |2   |
|2020|2   |117630|Johnson, Monroe and Cervantes|Los Angeles |4970.92     |3   |
|2020|2   |194248|Peterson, Richards and Miller|San Diego   |4967.31     |4   |
|2020|2   |182042|Beck, Hobbs and Boyd  

In [75]:
user_weekly_bookings = curated_bookings.filter(col('booking_status') == 'confirmed') \
    .groupBy(
        year('booking_date').alias('year'),
        weekofyear('booking_date').alias('week'),
        'user_id'
    ) \
    .agg(count('booking_id').alias('total_bookings')) \
    .orderBy('year', 'week', col('total_bookings').desc())

# Show top users each week
user_weekly_bookings.show()

+----+----+-------+--------------+
|year|week|user_id|total_bookings|
+----+----+-------+--------------+
|2020|   1|   7578|             3|
|2020|   1|   6417|             2|
|2020|   1|   4386|             2|
|2020|   1|   3102|             2|
|2020|   1|   4954|             2|
|2020|   1|   3299|             2|
|2020|   1|   9447|             2|
|2020|   1|   5609|             2|
|2020|   1|   9848|             2|
|2020|   1|   8641|             1|
|2020|   1|   6211|             1|
|2020|   1|   4474|             1|
|2020|   1|   6314|             1|
|2020|   1|   1524|             1|
|2020|   1|   7684|             1|
|2020|   1|   1257|             1|
|2020|   1|   2019|             1|
|2020|   1|   1591|             1|
|2020|   1|   1954|             1|
|2020|   1|   9259|             1|
+----+----+-------+--------------+
only showing top 20 rows



In [76]:
booking_duration = curated_bookings.filter(col('booking_status') == 'confirmed') \
    .withColumn('duration_nights', datediff('checkout_date', 'checkin_date')) \
    .groupBy(
        year('checkin_date').alias('year'),
        weekofyear('checkin_date').alias('week')  # Change to month() for monthly
    ) \
    .agg(avg('duration_nights').alias('avg_booking_duration')) \
    .orderBy('year', 'week')

booking_duration.show()

+----+----+--------------------+
|year|week|avg_booking_duration|
+----+----+--------------------+
|2020|   1|   912.7377777777778|
|2020|   2|              973.05|
|2020|   3|   950.0348432055749|
|2020|   4|   912.2760330578512|
|2020|   5|   919.2632398753894|
|2020|   6|   949.5647840531561|
|2020|   7|   919.5730337078652|
|2020|   8|   893.3169934640523|
|2020|   9|   899.7744610281924|
|2020|  10|   885.7065573770492|
|2020|  11|   885.6378737541528|
|2020|  12|   916.5257048092869|
|2020|  13|   865.2751572327044|
|2020|  14|   894.4098101265823|
|2020|  15|   865.4759535655058|
|2020|  16|    827.879282218597|
|2020|  17|    826.092879256966|
|2020|  18|   840.1052631578947|
|2020|  19|   852.9112627986348|
|2020|  20|   802.4167962674961|
+----+----+--------------------+
only showing top 20 rows



In [77]:
# Step 1: Identify users with >1 booking in 30 days
repeat_users = curated_bookings.filter(col('booking_status') == 'confirmed') \
    .groupBy(
        'user_id',
        window('booking_date', '30 days').alias('30_day_window')
    ) \
    .agg(count('booking_id').alias('booking_count')) \
    .filter(col('booking_count') > 1)

# Step 2: Calculate rate
total_users = curated_bookings.filter(col('booking_status') == 'confirmed') \
    .agg(countDistinct('user_id').alias('total_users')) \
    .first()['total_users']

repeat_rate = (repeat_users.select(countDistinct('user_id')).first()[0] / total_users) * 100
print(f"Repeat Customer Rate: {repeat_rate:.2f}%")

Repeat Customer Rate: 84.00%


In [83]:
monthly_repeats = (
    curated_bookings
    .filter(col('booking_status') == 'confirmed')
    .groupBy(
        year('booking_date').alias('year'),
        month('booking_date').alias('month'),
        'user_id'
    )
    .agg(count('*').alias('bookings'))
    .groupBy('year', 'month')
    .agg(
        count('user_id').alias('total_users'),
        sum(when(col('bookings') > 1, 1).otherwise(0)).alias('repeat_users'),
        round(sum(when(col('bookings') > 1, 1).otherwise(0)) / count('user_id') * 100, 2).alias('repeat_rate')
    )
    .orderBy('year', 'month')
)

monthly_repeats.show()

+----+-----+-----------+------------+-----------+
|year|month|total_users|repeat_users|repeat_rate|
+----+-----+-----------+------------+-----------+
|2020|    1|       2414|         316|      13.09|
|2020|    2|       2240|         285|      12.72|
|2020|    3|       2359|         322|      13.65|
|2020|    4|       2306|         293|      12.71|
|2020|    5|       2334|         291|      12.47|
|2020|    6|       2318|         279|      12.04|
|2020|    7|       2358|         307|      13.02|
|2020|    8|       2290|         290|      12.66|
|2020|    9|       2270|         300|      13.22|
|2020|   10|       2336|         302|      12.93|
|2020|   11|       2232|         292|      13.08|
|2020|   12|       2302|         278|      12.08|
|2021|    1|       2392|         324|      13.55|
|2021|    2|       2114|         243|      11.49|
|2021|    3|       2356|         319|      13.54|
|2021|    4|       2203|         288|      13.07|
|2021|    5|       2378|         287|      12.07|
