In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf

In [4]:
spark = SparkSession.builder.appName("RentalMarketPlace").getOrCreate()  

In [None]:
# Initialize SparkSession to interface with Spark
# spark = SparkSession.builder.master("local").appName("GettingStarted").getOrCreate()  # Create or retrieve a SparkSession with the specified configurations

In [5]:
spark

In [7]:
#read the data from the csv file
df_attributes = spark.read.csv("../data_source/apartment_attributes.csv", header=True, inferSchema=True)
df_apartments = spark.read.csv("../data_source/apartments.csv", header=True, inferSchema=True)
df_users = spark.read.csv("../data_source/user_viewing.csv", header=True, inferSchema=True)

In [8]:
df_attributes.show()

+---+---------+--------------------+--------------------+---------+--------+------+---------+------------+-------------+----------+-----------+--------------------+------------+------------+----------+-----------+
| id| category|                body|           amenities|bathrooms|bedrooms|   fee|has_photo|pets_allowed|price_display|price_type|square_feet|             address|    cityname|       state|  latitude|  longitude|
+---+---------+--------------------+--------------------+---------+--------+------+---------+------------+-------------+----------+-----------+--------------------+------------+------------+----------+-----------+
|  1|     2BHK|Happy product mod...|Balcony, Air Cond...|        1|       1|168.84|     true|       false|     $1588.42|   Monthly|       1463|92525 Holt Turnpi...|    New York|  California| 20.457092|    0.46622|
|  2|Penthouse|Technology past m...|Garden, Pet-frien...|        2|       2|202.99|    false|        true|     $3585.32|    Yearly|        722|8

In [12]:
df_apartments.printSchema()

root
 |-- id: long (nullable = true)
 |-- title: string (nullable = true)
 |-- source: string (nullable = true)
 |-- price: integer (nullable = true)
 |-- currency: string (nullable = true)
 |-- listing_created_on: timestamp (nullable = true)
 |-- is_active: integer (nullable = true)
 |-- last_modified_timestamp: timestamp (nullable = true)



In [13]:
df_users.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- apartment_id: long (nullable = true)
 |-- viewed_at: timestamp (nullable = true)
 |-- is_wishlisted: string (nullable = true)
 |-- call_to_action: string (nullable = true)



In [14]:
df_attributes.printSchema()

root
 |-- id: long (nullable = true)
 |-- category: string (nullable = true)
 |-- body: string (nullable = true)
 |-- amenities: string (nullable = true)
 |-- bathrooms: string (nullable = true)
 |-- bedrooms: string (nullable = true)
 |-- fee: string (nullable = true)
 |-- has_photo: string (nullable = true)
 |-- pets_allowed: string (nullable = true)
 |-- price_display: string (nullable = true)
 |-- price_type: string (nullable = true)
 |-- square_feet: string (nullable = true)
 |-- address: string (nullable = true)
 |-- cityname: string (nullable = true)
 |-- state: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)



In [15]:
df_apartments.describe().show()

+-------+-------------------+--------------------+-----------+------------------+--------+-------------------+
|summary|                 id|               title|     source|             price|currency|          is_active|
+-------+-------------------+--------------------+-----------+------------------+--------+-------------------+
|  count|              10000|               10000|      10000|             10000|   10000|              10000|
|   mean|  5.6233956528752E9|                NULL|       NULL|         1486.2775|    NULL|             0.8304|
| stddev|7.021025204484247E7|                NULL|       NULL|1076.5079675665083|    NULL|0.37529978016580134|
|    min|         5508654087|$1,010 / Two BR -...| GoSection8|               200|     USD|                  0|
|    max|         5668662559|wood Apartments f...|tenantcloud|             52500|     USD|                  1|
+-------+-------------------+--------------------+-----------+------------------+--------+-------------------+



In [16]:
df_users.describe().show()

+-------+------------------+-------------------+-------------+--------------+
|summary|           user_id|       apartment_id|is_wishlisted|call_to_action|
+-------+------------------+-------------------+-------------+--------------+
|  count|              4999|               4999|         4999|          4999|
|   mean|2510.2942588517703|5.624324336230446E9|         NULL|          NULL|
| stddev|1420.3478927655854|6.977098255417298E7|         NULL|          NULL|
|    min|                 1|         5508654149|            n| contact_agent|
|    max|              5000|         5668662559|            y|   shortlisted|
+-------+------------------+-------------------+-------------+--------------+



In [17]:
df_attributes.describe().show()

+-------+-------------------+--------------------+--------------------+-----------+--------------------+--------------------+------------------+------------------+------------------+--------------------+------------------+-----------------+------------------+------------------+--------------------+-----------------+------------------+
|summary|                 id|            category|                body|  amenities|           bathrooms|            bedrooms|               fee|         has_photo|      pets_allowed|       price_display|        price_type|      square_feet|           address|          cityname|               state|         latitude|         longitude|
+-------+-------------------+--------------------+--------------------+-----------+--------------------+--------------------+------------------+------------------+------------------+--------------------+------------------+-----------------+------------------+------------------+--------------------+-----------------+---------

In [18]:
# join df_apartments and df_attributes
df = df_apartments.join(df_attributes, on='id', how='inner')

In [19]:
df.printSchema()

root
 |-- id: long (nullable = true)
 |-- title: string (nullable = true)
 |-- source: string (nullable = true)
 |-- price: integer (nullable = true)
 |-- currency: string (nullable = true)
 |-- listing_created_on: timestamp (nullable = true)
 |-- is_active: integer (nullable = true)
 |-- last_modified_timestamp: timestamp (nullable = true)
 |-- category: string (nullable = true)
 |-- body: string (nullable = true)
 |-- amenities: string (nullable = true)
 |-- bathrooms: string (nullable = true)
 |-- bedrooms: string (nullable = true)
 |-- fee: string (nullable = true)
 |-- has_photo: string (nullable = true)
 |-- pets_allowed: string (nullable = true)
 |-- price_display: string (nullable = true)
 |-- price_type: string (nullable = true)
 |-- square_feet: string (nullable = true)
 |-- address: string (nullable = true)
 |-- cityname: string (nullable = true)
 |-- state: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)



In [20]:
df.show(5)

+----------+--------------------+---------+-----+--------+-------------------+---------+-----------------------+--------------------+--------------------+--------------------+---------+--------+----+---------+------------+-------------+----------+-----------+----------------+----------+-----+--------+---------+
|        id|               title|   source|price|currency| listing_created_on|is_active|last_modified_timestamp|            category|                body|           amenities|bathrooms|bedrooms| fee|has_photo|pets_allowed|price_display|price_type|square_feet|         address|  cityname|state|latitude|longitude|
+----------+--------------------+---------+-----+--------+-------------------+---------+-----------------------+--------------------+--------------------+--------------------+---------+--------+----+---------+------------+-------------+----------+-----------+----------------+----------+-----+--------+---------+
|5668626895|Studio apartment ...|RentLingo|  790|     USD|202

In [23]:
df.describe("fee").show()

+-------+------------------+
|summary|               fee|
+-------+------------------+
|  count|                81|
|   mean|               1.7|
| stddev|0.8232726023485646|
|    min|            325-$1|
|    max|  Pool,Wood Floors|
+-------+------------------+



In [24]:
df.describe("pets_allowed").show()

+-------+------------------+
|summary|      pets_allowed|
+-------+------------------+
|  count|              8283|
|   mean|               1.3|
| stddev|0.4830458915396479|
|    min|          348 - $1|
|    max|               Yes|
+-------+------------------+



In [32]:
from pyspark.sql.functions import to_date, avg

avg_listing_price = df.withColumn("date", to_date("listing_created_on")) \
    .groupBy("date") \
    .agg(avg("price").alias("average_listing_price"))

avg_listing_price.show()

+----------+---------------------+
|      date|average_listing_price|
+----------+---------------------+
|2024-05-16|   1520.2005952380953|
|2024-05-17|   1473.5705049261085|
|2024-05-18|   1464.8419811320755|
+----------+---------------------+



In [33]:
from pyspark.sql.functions import countDistinct

# daily occupied apartments
occupied = df_users.withColumn("date", to_date("viewed_at")) \
    .groupBy("date") \
    .agg(countDistinct("apartment_id").alias("occupied_listings"))

# daily active listings
active_listings = df.withColumn("date", to_date("listing_created_on")) \
    .filter("is_active = 1") \
    .groupBy("date") \
    .agg(countDistinct("id").alias("total_listings"))

occupancy_rate = occupied.join(active_listings, "date", "inner") \
    .withColumn("occupancy_rate", (occupied["occupied_listings"] / active_listings["total_listings"]) * 100)
occupancy_rate.show()

+----------+-----------------+--------------+------------------+
|      date|occupied_listings|total_listings|    occupancy_rate|
+----------+-----------------+--------------+------------------+
|2024-05-16|               75|          2789|2.6891358910003587|
|2024-05-17|              246|          2672| 9.206586826347305|
|2024-05-18|              361|          2843|12.697854379176926|
+----------+-----------------+--------------+------------------+



In [34]:
from pyspark.sql.functions import count

popular_locations = df_users.join(df, df_users.apartment_id == df.id) \
    .withColumn("date", to_date("viewed_at")) \
    .groupBy("date", "cityname") \
    .agg(count("*").alias("view_count")) \
    .orderBy("date", "view_count", ascending=[True, False])
popular_locations.show()

+----------+----------------+----------+
|      date|        cityname|view_count|
+----------+----------------+----------+
|2024-05-16|          Austin|         8|
|2024-05-16|     Sioux Falls|         3|
|2024-05-16|      Cincinnati|         2|
|2024-05-16|          Dallas|         2|
|2024-05-16|         Trenton|         2|
|2024-05-16|         Madison|         2|
|2024-05-16|         Houston|         2|
|2024-05-16|        Portland|         2|
|2024-05-16|   Lawrenceville|         2|
|2024-05-16|           Omaha|         2|
|2024-05-16|     Kansas City|         2|
|2024-05-16|     Los Angeles|         2|
|2024-05-16|          Tucson|         1|
|2024-05-16|     East Orange|         1|
|2024-05-16|Colorado Springs|         1|
|2024-05-16|    Santa Monica|         1|
|2024-05-16|     San Antonio|         1|
|2024-05-16|          Spring|         1|
|2024-05-16|        Stoneham|         1|
|2024-05-16|     Winter Park|         1|
+----------+----------------+----------+
only showing top

In [35]:
top_performing_listings = df_users.join(df, df_users.apartment_id == df.id) \
    .withColumn("date", to_date("viewed_at")) \
    .groupBy("date", "apartment_id", "price") \
    .agg(count("*").alias("booking_count")) \
    .withColumn("revenue", (col("price") * col("booking_count"))) \
    .orderBy("date", "revenue", ascending=[True, False])
top_performing_listings.show()

+----------+------------+-----+-------------+-------+
|      date|apartment_id|price|booking_count|revenue|
+----------+------------+-----+-------------+-------+
|2024-05-16|  5509110411| 3565|            1|   3565|
|2024-05-16|  5509047257| 2695|            1|   2695|
|2024-05-16|  5508837746| 2315|            1|   2315|
|2024-05-16|  5664576894| 2295|            1|   2295|
|2024-05-16|  5508978709| 2285|            1|   2285|
|2024-05-16|  5509167461| 2200|            1|   2200|
|2024-05-16|  5668640444| 2195|            1|   2195|
|2024-05-16|  5668611158| 2135|            1|   2135|
|2024-05-16|  5668630433| 2035|            1|   2035|
|2024-05-16|  5508951401| 1905|            1|   1905|
|2024-05-16|  5668613019| 1878|            1|   1878|
|2024-05-16|  5642367760| 1800|            1|   1800|
|2024-05-16|  5664583681| 1800|            1|   1800|
|2024-05-16|  5508955455| 1795|            1|   1795|
|2024-05-16|  5668616274| 1650|            1|   1650|
|2024-05-16|  5668630385| 15

In [36]:
user_bookings = df_users.withColumn("date", to_date("viewed_at")) \
    .groupBy("date", "user_id") \
    .agg(count("*").alias("total_bookings"))
user_bookings.show()

+----------+-------+--------------+
|      date|user_id|total_bookings|
+----------+-------+--------------+
|2024-05-16|    258|             1|
|2024-05-20|   1018|             1|
|2024-05-17|   1463|             1|
|2024-05-21|   3995|             1|
|2024-05-24|   3666|             1|
|2024-05-20|   2663|             1|
|2024-05-17|   1337|             1|
|2024-05-28|   3044|             1|
|2024-05-24|    879|             1|
|2024-05-20|   1645|             1|
|2024-05-19|   4366|             1|
|2024-05-21|   4295|             1|
|2024-05-26|   1054|             1|
|2024-05-22|    879|             1|
|2024-05-23|   1662|             1|
|2024-05-24|   1150|             1|
|2024-05-19|   3682|             1|
|2024-05-25|   1236|             1|
|2024-05-22|   3524|             1|
|2024-05-20|   3376|             1|
+----------+-------+--------------+
only showing top 20 rows



In [37]:
from pyspark.sql.functions import count as count_, countDistinct as count_distinct

user_repeat_counts = df_users.withColumn("date", to_date("viewed_at")) \
    .groupBy("date", "user_id") \
    .agg(count_("apartment_id").alias("num_bookings"))

repeat_customers = user_repeat_counts.filter("num_bookings > 1") \
    .groupBy("date") \
    .agg(count_distinct("user_id").alias("repeat_users"))

total_users = df_users.withColumn("date", to_date("viewed_at")) \
    .groupBy("date") \
    .agg(count_distinct("user_id").alias("total_users"))

repeat_customer_rate = repeat_customers.join(total_users, "date") \
    .withColumn("repeat_customer_rate", (col("repeat_users") / col("total_users")) * 100)
repeat_customer_rate.show()

+----------+------------+-----------+--------------------+
|      date|repeat_users|total_users|repeat_customer_rate|
+----------+------------+-----------+--------------------+
|2024-05-25|          21|        456|   4.605263157894736|
|2024-05-19|          23|        441|   5.215419501133787|
|2024-05-29|           1|         71|  1.4084507042253522|
|2024-05-23|          14|        395|  3.5443037974683547|
|2024-05-21|          18|        409|   4.400977995110025|
|2024-05-16|           1|         74|  1.3513513513513513|
|2024-05-20|          20|        442|   4.524886877828054|
|2024-05-26|          19|        447|   4.250559284116331|
|2024-05-22|          20|        455|   4.395604395604396|
|2024-05-17|           8|        241|   3.319502074688797|
|2024-05-24|          13|        408|  3.1862745098039214|
|2024-05-27|          17|        374|   4.545454545454546|
|2024-05-28|           7|        228|   3.070175438596491|
|2024-05-18|          18|        352|   5.11363636363636