In [1]:
!wget https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-latest.jar -O gcs-connector-hadoop3-latest.jar

--2025-06-26 12:22:58--  https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-latest.jar
Resolving storage.googleapis.com (storage.googleapis.com)... 108.177.121.207, 209.85.145.207, 142.250.152.207, ...
Connecting to storage.googleapis.com (storage.googleapis.com)|108.177.121.207|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 40713341 (39M) [application/java-archive]
Saving to: ‘gcs-connector-hadoop3-latest.jar’


2025-06-26 12:22:58 (116 MB/s) - ‘gcs-connector-hadoop3-latest.jar’ saved [40713341/40713341]



In [2]:
# cloud import
from pyspark.sql import SparkSession
database="rental_apartment_app"

In [3]:
spark = (SparkSession.builder
         .master('local[*]')
         .appName('PySpark GCS Colab')
         .config('spark.jars', '/content/gcs-connector-hadoop3-latest.jar')
         .config('spark.driver.extraClassPath', '/content/gcs-connector-hadoop3-latest.jar')
         .config('spark.executor.extraClassPath', '/content/gcs-connector-hadoop3-latest.jar')
         .config('spark.hadoop.fs.gs.impl', 'com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem')
         .config('spark.hadoop.google.cloud.auth.service.account.enable', 'true')
         .config('spark.hadoop.google.cloud.auth.service.account.json.keyfile', '/content/service_account_gcp.json')
         .getOrCreate())

In [4]:
from google.colab import userdata
bucket_name = userdata.get('bucket_name')

In [5]:
import pandas as pd
from pyspark.sql.functions import *
from pyspark.sql.types import StringType, TimestampType, DateType, NumericType, BooleanType
from pyspark.sql.window import Window


In [6]:
user_view = pd.read_csv("/content/user_viewings.csv")

In [7]:
# define schema
schema_userview = 'user_id STRING, apartment_id STRING, viewed_at STRING, is_wishlisted STRING, call_to_action STRING'
schema_apart = 'id STRING, title STRING, source STRING, price INT, currency STRING, listing_created_on TIMESTAMP, is_active TINYINT, last_modified_timestamp TIMESTAMP'
schema_att = 'id STRING, category STRING, body STRING, amenities STRING, bathrooms FLOAT, bedrooms FLOAT, fee FLOAT, has_photo STRING, pets_allowed STRING, price_display STRING, price_type STRING, square_feet INT, address STRING, cityname STRING, state STRING, latitude DOUBLE, longitude DOUBLE'

In [8]:
df_userview = spark.createDataFrame(user_view, schema=schema_userview)
df_userview.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- apartment_id: string (nullable = true)
 |-- viewed_at: string (nullable = true)
 |-- is_wishlisted: string (nullable = true)
 |-- call_to_action: string (nullable = true)



In [9]:
df_apart = spark.read.json(f'gs://{bucket_name}/{database}/apartments/apartments_batch*.json', schema=schema_apart)
df_att = spark.read.json(f'gs://{bucket_name}/{database}/apartment_attributes/apartment_attributes_batch*.json', schema=schema_att)

In [10]:
df_apart = df_apart.cache()
df_att = df_att.cache()
df_userview = df_userview.cache()

In [11]:
df_apart.printSchema(), df_att.printSchema()

root
 |-- id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- source: string (nullable = true)
 |-- price: integer (nullable = true)
 |-- currency: string (nullable = true)
 |-- listing_created_on: timestamp (nullable = true)
 |-- is_active: byte (nullable = true)
 |-- last_modified_timestamp: timestamp (nullable = true)

root
 |-- id: string (nullable = true)
 |-- category: string (nullable = true)
 |-- body: string (nullable = true)
 |-- amenities: string (nullable = true)
 |-- bathrooms: float (nullable = true)
 |-- bedrooms: float (nullable = true)
 |-- fee: float (nullable = true)
 |-- has_photo: string (nullable = true)
 |-- pets_allowed: string (nullable = true)
 |-- price_display: string (nullable = true)
 |-- price_type: string (nullable = true)
 |-- square_feet: integer (nullable = true)
 |-- address: string (nullable = true)
 |-- cityname: string (nullable = true)
 |-- state: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: 

(None, None)

### Functions

In [12]:
def check_missing(df):
    df.select([count(when(col(c).isNull(), 1)).alias(c) for c in df.columns]).show()

In [13]:
from pyspark.sql.functions import col, to_timestamp, when

def check_handle_timestamp_pyspark(df, timestamp_cols):
    # List of supported formats: regex + parsing format
    formats = [
        {
            "regex": r'^\d{4}-(0[1-9]|1[0-2])-(0[1-9]|[12]\d|3[01]) (0\d|1\d|2[0-3]):([0-5]\d):([0-5]\d)$',
            "format": "yyyy-MM-dd HH:mm:ss"
        },
        {
            "regex": r'^(0[1-9]|[12]\d|3[01])-(0[1-9]|1[0-2])-\d{4} (0\d|1\d|2[0-3]):([0-5]\d):([0-5]\d)$',
            "format": "dd-MM-yyyy HH:mm:ss"
        },
        {
            "regex": r'^\d{4}/(0[1-9]|1[0-2])/(0[1-9]|[12]\d|3[01]) (0\d|1\d|2[0-3]):([0-5]\d):([0-5]\d)$',
            "format": "yyyy/MM/dd HH:mm:ss"
        },
        {
            "regex": r'^(0[1-9]|[12]\d|3[01])/(0[1-9]|1[0-2])/\d{4} (0\d|1\d|2[0-3]):([0-5]\d):([0-5]\d)$',
            "format": "dd/MM/yyyy HH:mm:ss"
        },
        {
            "regex": r'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}$',
            "format": "yyyy-MM-dd'T'HH:mm:ss"
        }
    ]

    for col_name in timestamp_cols:
        print(f"Checking timestamp formats in column {col_name}...")

        expr = None
        for f in formats:
            regex = f["regex"]
            fmt = f["format"]

            if expr is None:
                expr = when(col(col_name).rlike(regex), to_timestamp(col(col_name), fmt))
            else:
                expr = expr.when(col(col_name).rlike(regex), to_timestamp(col(col_name), fmt))

        # Fallback jika tidak cocok format apapun
        expr = expr.otherwise(None)

        # Buat kolom baru yang sudah diparse
        df = df.withColumn(col_name, expr)

        # Cek apakah masih ada data yang tidak bisa diparse (jadi null)
        invalid_count = df.filter(col(col_name).isNull()).count()

        if invalid_count > 0:
            print(f"[WARNING] Found {invalid_count} invalid timestamp(s) in column {col_name}.")
        else:
            print(f"[INFO] All values in column {col_name} are in valid timestamp format.")

    return df

In [14]:
def check_handling_nan_pyspark(df, name):
    # Check if there are any nulls in the DataFrame
    any_nan = df.select([col(c).isNull().alias(c) for c in df.columns]).agg(
        *[sum(col(c).cast("int")).alias(c) for c in df.columns]
    ).toPandas().sum(axis=1)[0] > 0

    if any_nan:
        for field in df.schema.fields:
            col_name = field.name
            col_type = field.dataType

            total_count = df.count()
            null_count = df.filter(col(col_name).isNull()).count()

            if null_count == total_count:
                print(f"Column {col_name} in df {name} is fully null -> Dropping column...")
                df = df.drop(col_name)
                continue  # Langsung lanjut ke kolom berikutnya

            if null_count > 0:
                if isinstance(col_type, BooleanType):
                    print(f"Missing values on df {name}, column {col_name} (BooleanType) -> No action taken.")
                    continue  # Skip handling for boolean columns

                print(f"Handle missing values on df {name}, column {col_name}")

                if isinstance(col_type, StringType):
                    df = df.withColumn(col_name, when(col(col_name).isNull(), lit('Unknown')).otherwise(col(col_name)))
                elif isinstance(col_type, TimestampType):
                    df = df.withColumn(col_name, when(col(col_name).isNull(), lit('9999-12-31 23:59:59')).otherwise(col(col_name)))
                elif isinstance(col_type, DateType):
                    df = df.withColumn(col_name, when(col(col_name).isNull(), lit('9999-12-31')).otherwise(col(col_name)))
                elif isinstance(col_type, NumericType):
                    df = df.withColumn(col_name, when(col(col_name).isNull(), lit(0)).otherwise(col(col_name)))
                else:
                    # Fallback untuk tipe yang tidak terdefinisi
                    df = df.withColumn(col_name, when(col(col_name).isNull(), lit('Unknown')).otherwise(col(col_name)))
            else:
                print(f"No missing values on df {name}, column {col_name}")
    else:
        print(f"No missing values found in any columns of df {name}")

    return df

In [15]:
def check_handling_duplicate_pyspark(df, id_column, name):
    # Hitung jumlah total rows dan jumlah unique rows berdasarkan id_column
    total_count = df.count()
    distinct_count = df.select(id_column).distinct().count()

    if total_count > distinct_count:
        print(f"Found duplicate on {name}, drop duplicate...")
        df = df.dropDuplicates([id_column])
    else:
        print(f"No duplicate on {name}")

    return df

### Explore & Cleaning

#### df_userview

In [16]:
# validate format data type
timestamp_cols = [
    "viewed_at"
]

df_userview = check_handle_timestamp_pyspark(df_userview, timestamp_cols)

Checking timestamp formats in column viewed_at...
[INFO] All values in column viewed_at are in valid timestamp format.


In [17]:
df_userview = df_userview.withColumn("is_wishlisted", col('is_wishlisted').cast('boolean'))

In [18]:
df_userview = check_handling_nan_pyspark(df_userview, "User Viewings")

No missing values found in any columns of df User Viewings


In [19]:
df_userview = check_handling_duplicate_pyspark(df_userview, ["user_id","apartment_id"], "User Viewings")

No duplicate on User Viewings


#### df_apart

In [20]:
# validate format data type
timestamp_cols = [
    "listing_created_on",
    "last_modified_timestamp"
]

df_apart = check_handle_timestamp_pyspark(df_apart, timestamp_cols)

Checking timestamp formats in column listing_created_on...
[INFO] All values in column listing_created_on are in valid timestamp format.
Checking timestamp formats in column last_modified_timestamp...
[INFO] All values in column last_modified_timestamp are in valid timestamp format.


In [21]:
df_apart = df_apart.withColumn("is_active", col('is_active').cast('boolean'))

In [22]:
df_apart = check_handling_nan_pyspark(df_apart, "Apartment")

No missing values found in any columns of df Apartment


In [23]:
df_apart = check_handling_duplicate_pyspark(df_apart, ["id"], "Apartment")

No duplicate on Apartment


#### df_att

In [24]:
df_att = check_handling_nan_pyspark(df_att, "Apartment Attributes")

No missing values on df Apartment Attributes, column id
No missing values on df Apartment Attributes, column category
No missing values on df Apartment Attributes, column body
No missing values on df Apartment Attributes, column amenities
No missing values on df Apartment Attributes, column bathrooms
No missing values on df Apartment Attributes, column bedrooms
Column fee in df Apartment Attributes is fully null -> Dropping column...
No missing values on df Apartment Attributes, column has_photo
Handle missing values on df Apartment Attributes, column pets_allowed
No missing values on df Apartment Attributes, column price_display
No missing values on df Apartment Attributes, column price_type
No missing values on df Apartment Attributes, column square_feet
No missing values on df Apartment Attributes, column address
Handle missing values on df Apartment Attributes, column cityname
Handle missing values on df Apartment Attributes, column state
Handle missing values on df Apartment Attri

In [25]:
df_att = check_handling_duplicate_pyspark(df_att, ["id"], "Apartment Attributes")

No duplicate on Apartment Attributes


### Fact & Dim

In [26]:
df_apart.unpersist()
df_att.unpersist()
df_userview.unpersist()

DataFrame[user_id: string, apartment_id: string, viewed_at: timestamp, is_wishlisted: boolean, call_to_action: string]

In [27]:
fact_userview = df_userview.select("*")
dim_apart = df_apart.select("*")
dim_att = df_att.select("*")

### Join

In [28]:
df_apart_att = dim_apart.join(dim_att, "id", 'left')
df_full = fact_userview.join(df_apart_att, fact_userview.apartment_id == df_apart_att.id, 'left')
df_full = df_full.cache()

### Apartment Performance KPI

Informasi apartemen + user engagement (views, wishlists, contact agent)

In [29]:
user_engagement_per_apart = fact_userview.groupBy("apartment_id").agg(
    count("*").alias("total_views"),
    count(when(col("is_wishlisted") == True, 1)).alias("total_wishlists"),
    count(when(col("call_to_action") == "contact_agent", 1)).alias("total_contact_agent")
)
df_apartment_perf = df_apart_att.join(user_engagement_per_apart, df_apart_att.id == user_engagement_per_apart.apartment_id, 'left').drop("apartment_id")

In [30]:
df_apartment_perf = df_apartment_perf.select(
    col("id").alias("apartment_id"),
    col("title"),
    col("category"),
    col("cityname"),
    col("state"),
    col("source"),
    col("price"),
    col("currency"),
    col("is_active"),
    col("listing_created_on"),
    col("last_modified_timestamp"),
    col("total_views"),
    col("total_wishlists"),
    col("total_contact_agent")
)

In [31]:
df_apartment_perf.show(5)

+------------+--------------------+--------------------+-----------+-----+------------+-----+--------+---------+-------------------+-----------------------+-----------+---------------+-------------------+
|apartment_id|               title|            category|   cityname|state|      source|price|currency|is_active| listing_created_on|last_modified_timestamp|total_views|total_wishlists|total_contact_agent|
+------------+--------------------+--------------------+-----------+-----+------------+-----+--------+---------+-------------------+-----------------------+-----------+---------------+-------------------+
|  5508868255|In addition to un...|housing/rent/apar...|     Denver|   CO|RentDigs.com| 1599|     USD|     true|2024-05-16 23:23:15|    2024-05-19 16:07:53|       NULL|           NULL|               NULL|
|  5509239849|Grapevine Luxurio...|housing/rent/apar...|  Grapevine|   TX|RentDigs.com| 1450|     USD|     true|2024-05-18 02:28:56|    2024-05-18 02:28:56|          1|            

### Hour KPI Summary

In [32]:
df_full = df_full.withColumn("view_hour", hour(col("viewed_at")))

In [33]:
df_full = df_full.withColumn("view_day", dayofmonth(col("viewed_at")))
df_full = df_full.withColumn("view_month", month(col("viewed_at")))
df_full = df_full.withColumn("view_year", year(col("viewed_at")))

In [34]:
df_hour_kpi = df_full.groupBy("view_hour").agg(
    count("*").alias("total_views"),
    count(when(col("is_wishlisted") == True, 1)).alias("total_wishlists"),
    count(when(col("call_to_action") == "contact_agent", 1)).alias("total_contact_agent")
)

In [35]:
agg_views = df_full.groupBy("view_hour", "source").agg(
    count("*").alias("total_views")
).orderBy("view_hour", "total_views", ascending=[True, False])

window_spec = Window.partitionBy("view_hour").orderBy(col("total_views").desc())
ranked_views = agg_views.withColumn("row_num", row_number().over(window_spec))
most_viewed_platform_per_hour = ranked_views.filter(col("row_num") == 1).drop("row_num")

In [36]:
df_hour_kpi = df_hour_kpi.join(most_viewed_platform_per_hour.select("view_hour","source"), on="view_hour", how="left")
df_hour_kpi = df_hour_kpi.withColumnRenamed("source", "most_viewed_platform")

In [37]:
agg_apart_views = df_full.groupBy("view_hour", "apartment_id").agg(
    count("*").alias("total_views")
).orderBy("view_hour", "total_views", ascending=[True, False])

In [38]:
top3_apart_views_per_hour = agg_apart_views.withColumn("rank", rank().over(window_spec)).withColumn("row_num", row_number().over(window_spec)).filter(col("row_num") <= 3)

In [39]:
top3_apart_views_per_hour = top3_apart_views_per_hour.groupBy("view_hour").agg(concat_ws(", ", collect_list("apartment_id")).alias("top_apartments"))

In [40]:
df_hour_kpi = df_hour_kpi.join(top3_apart_views_per_hour, on="view_hour", how="left")

In [41]:
engagement_rate_per_hour = df_full.groupBy("view_hour").agg(
    round((sum(when(col("call_to_action") == "contact_agent", 1)) / count("*") * 100),2).alias("engagement_rate")
)

In [42]:
df_hour_kpi = df_hour_kpi.join(engagement_rate_per_hour, on="view_hour", how="left")

In [43]:
df_hour_kpi.orderBy("view_hour").show(30)

+---------+-----------+---------------+-------------------+--------------------+--------------------+---------------+
|view_hour|total_views|total_wishlists|total_contact_agent|most_viewed_platform|      top_apartments|engagement_rate|
+---------+-----------+---------------+-------------------+--------------------+--------------------+---------------+
|        0|        200|             97|                 66|           RentLingo|5509190861, 56598...|           33.0|
|        1|        202|            109|                 64|           RentLingo|5668615387, 56645...|          31.68|
|        2|        210|             97|                 76|           RentLingo|5668609614, 55087...|          36.19|
|        3|        195|             95|                 61|           RentLingo|5668631291, 55091...|          31.28|
|        4|        194|             92|                 64|           RentLingo|5668612366, 56686...|          32.99|
|        5|        204|             95|                 

### Hourly (Should be View instead)

In [44]:
df_hourly_kpi = df_full.groupBy("view_year","view_month","view_day","view_hour").agg(
    count("*").alias("total_views"),
    count(when(col("is_wishlisted") == True, 1)).alias("total_wishlists"),
    count(when(col("call_to_action") == "contact_agent", 1)).alias("total_contact_agent")
).orderBy("view_year","view_month","view_day","view_hour", ascending=([False, False, True, True]))

In [45]:
agg_views = df_full.groupBy("view_year","view_month","view_day", "view_hour", "source").agg(
    count("*").alias("total_views")
).orderBy("view_year","view_month","view_day", "view_hour", "total_views", ascending=[False, False, True, True, False])

window_spec = Window.partitionBy(["view_year","view_month","view_day", "view_hour"]).orderBy(col("total_views").desc())
ranked_views = agg_views.withColumn("row_num", row_number().over(window_spec))
most_viewed_platform_hourly = ranked_views.filter(col("row_num") == 1).drop("row_num")

In [46]:
df_hourly_kpi = df_hourly_kpi.join(most_viewed_platform_hourly.select("view_year","view_month","view_day", "view_hour","source"), on=["view_year","view_month","view_day", "view_hour"], how="left")
df_hourly_kpi = df_hourly_kpi.withColumnRenamed("source", "most_viewed_platform")

In [47]:
engagement_rate_per_hour = df_full.groupBy("view_year","view_month","view_day", "view_hour").agg(
    round((sum(when(col("call_to_action") == "contact_agent", 1)) / count("*") * 100),2).alias("engagement_rate")
)

In [48]:
df_hourly_kpi = df_hourly_kpi.join(engagement_rate_per_hour, on=["view_year","view_month","view_day", "view_hour"], how="left")

In [49]:
df_hourly_kpi.orderBy('view_day', 'view_hour', ascending=[True, True]).show()

+---------+----------+--------+---------+-----------+---------------+-------------------+--------------------+---------------+
|view_year|view_month|view_day|view_hour|total_views|total_wishlists|total_contact_agent|most_viewed_platform|engagement_rate|
+---------+----------+--------+---------+-----------+---------------+-------------------+--------------------+---------------+
|     2024|         5|      16|        3|          2|              1|                  0|        RentDigs.com|           NULL|
|     2024|         5|      16|        5|          3|              0|                  0|           RentLingo|           NULL|
|     2024|         5|      16|        7|          3|              2|                  0|           RentLingo|           NULL|
|     2024|         5|      16|        8|          4|              2|                  0|           RentLingo|           NULL|
|     2024|         5|      16|        9|          2|              0|                  0|           RentLingo| 

### Dayofmonth KPI Summary

In [50]:
df_full = df_full.withColumn("is_weekend", when((dayofweek(col("viewed_at")) == 1) | (dayofweek(col("viewed_at")) == 7), True).otherwise(False))

In [51]:
df_dom_kpi = df_full.groupBy("view_day").agg(
    count("*").alias("total_views"),
    count(when(col("is_wishlisted") == True, 1)).alias("total_wishlists"),
    count(when(col("call_to_action") == "contact_agent", 1)).alias("total_contact_agent")
)

In [52]:
agg_views = df_full.groupBy("view_day", "source").agg(
    count("*").alias("total_views")
).orderBy("view_day", "total_views", ascending=[True, False])

window_spec = Window.partitionBy("view_day").orderBy(col("total_views").desc())
ranked_views = agg_views.withColumn("row_num", row_number().over(window_spec))
most_viewed_platform_per_dom = ranked_views.filter(col("row_num") == 1).drop("row_num")

In [53]:
df_dom_kpi = df_dom_kpi.join(most_viewed_platform_per_dom.select("view_day","source"), on="view_day", how="left")
df_dom_kpi = df_dom_kpi.withColumnRenamed("source", "most_viewed_platform")

In [54]:
agg_apart_views = df_full.groupBy("view_day", "apartment_id").agg(
    count("*").alias("total_views")
).orderBy("view_day", "total_views", ascending=[True, False])

top3_apart_views_per_dom = agg_apart_views.withColumn("rank", rank().over(window_spec)).withColumn("row_num", row_number().over(window_spec)).filter(col("row_num") <= 3)
top3_apart_views_per_dom = top3_apart_views_per_dom.groupBy("view_day").agg(concat_ws(", ", collect_list("apartment_id")).alias("top_apartments"))

In [55]:
df_dom_kpi = df_dom_kpi.join(top3_apart_views_per_dom, on="view_day", how="left")

In [56]:
engagement_rate_per_dom = df_full.groupBy("view_day").agg(
    round((sum(when(col("call_to_action") == "contact_agent", 1)) / count("*") * 100),2).alias("engagement_rate")
)

df_dom_kpi = df_dom_kpi.join(engagement_rate_per_dom, on="view_day", how="left")

### Daily KPI (View)

### State KPI

In [57]:
df_state_kpi = df_full.groupBy("state", "cityname").agg(
    count("*").alias("total_views"),
    count(when(col("is_wishlisted") == True, 1)).alias("total_wishlists"),
    count(when(col("call_to_action") == "contact_agent", 1)).alias("total_contact_agent"),
    min(col("price")).alias("min_price"),
    max(col("price")).alias("max_price"),
    count(when(col("is_active") == True, 1)).alias("total_active_apartment"),
    round(avg(col("price")),2).alias("avg_price"),
).withColumn("wishlist_rate", round((col("total_wishlists")/col("total_views")),2)) \
.withColumn("contact_rate", round((col("total_contact_agent")/col("total_views")),2)) \
.orderBy("state", "total_views", ascending=[True, False])

In [58]:
window_spec_state = Window.partitionBy("state").orderBy(col("total_views").desc())
df_state_kpi = df_state_kpi.withColumn("row_num", row_number().over(window_spec_state)).filter(col("row_num") <= 5).withColumnRenamed("row_num", "rank_city")

In [59]:
window_spec = Window.partitionBy("state", "cityname").orderBy(col("total_views").desc())
agg_platform_views = df_full.groupBy("state", "cityname", "source").agg(
    count("*").alias("total_views")
).orderBy("total_views", ascending=[False])
most_platform_views_per_city = agg_platform_views.withColumn("row_num", row_number().over(window_spec)).filter(col("row_num") == 1)

In [60]:
df_state_kpi = df_state_kpi.join(most_platform_views_per_city.select("state","cityname","source"), on=["state","cityname"], how="left")

### Platform / Source KPI

In [61]:
df_platform_kpi = df_full.groupBy("source").agg(
    count("*").alias("total_views"),
    count(when(col("is_wishlisted") == True, 1)).alias("total_wishlists"),
    count(when(col("call_to_action") == "contact_agent", 1)).alias("total_contact_agent"),
    min(col("price")).alias("min_price"),
    max(col("price")).alias("max_price"),
    count(when(col("is_active") == True, 1)).alias("total_active_apartment"),
    count(when(col("is_active") == False, 1)).alias("total_non-active_apartment"),
    round(avg(col("price")),2).alias("avg_price"),
).withColumn("wishlist_rate", round((col("total_wishlists")/col("total_views")),2)) \
.withColumn("contact_rate", round((col("total_contact_agent")/col("total_views")),2))

In [62]:
agg_views = df_full.groupBy("source", "apartment_id").agg(
    count("*").alias("total_views")
).orderBy("total_views", ascending=[False])

window_spec = Window.partitionBy(["source"]).orderBy(col("total_views").desc())
ranked_views = agg_views.withColumn("row_num", row_number().over(window_spec))
top3_viewed_apartment_source = ranked_views.filter(col("row_num") <= 3).drop("row_num")
top3_viewed_apartment_source = top3_viewed_apartment_source.groupBy("source").agg(concat_ws(", ", collect_list("apartment_id")).alias("top_apartments"))

In [63]:
df_platform_kpi = df_platform_kpi.join(top3_viewed_apartment_source.select("source", "top_apartments"), on='source', how="left")

In [64]:
df_platform_kpi.show()

+-----------------+-----------+---------------+-------------------+---------+---------+----------------------+--------------------------+---------+-------------+------------+--------------------+
|           source|total_views|total_wishlists|total_contact_agent|min_price|max_price|total_active_apartment|total_non-active_apartment|avg_price|wishlist_rate|contact_rate|      top_apartments|
+-----------------+-----------+---------------+-------------------+---------+---------+----------------------+--------------------------+---------+-------------+------------+--------------------+
|     RentDigs.com|       1356|            667|                476|      499|    52500|                  1098|                       258|  1592.07|         0.49|        0.35|5509074094, 55090...|
|        RentLingo|       3493|           1712|               1162|      300|    11000|                  2868|                       625|   1462.6|         0.49|        0.33|5668628934, 56686...|
|      RealRentals| 

### Last

In [65]:
df_full.unpersist()
spark.stop()

fact_userview, dim_apart, dim_att, df_full, df_apartment_perf, df_hour_kpi, df_hourly_kpi (view), df_dom_kpi, df_state_kpi, df_platform_kpi df_daily_kpi (view)