In [0]:
def _register_view(source_table, view_name):
    """Read a catalog table and expose it to Spark SQL as a temp view.

    Args:
        source_table: Fully qualified name of the table to read.
        view_name: Name under which the temp view is (re)created.

    Returns:
        The DataFrame that was read, so the caller can keep a handle to it.
    """
    frame = spark.read.table(source_table)
    frame.createOrReplaceTempView(view_name)
    return frame


# Bookings are exposed as "temp_bookings"; every other view reuses its table name.
bookings_df = _register_view("samples.wanderbricks.bookings", "temp_bookings")
booking_upd_df = _register_view("samples.wanderbricks.booking_updates", "booking_updates")
click_st_df = _register_view("samples.wanderbricks.clickstream", "clickstream")
countries_df = _register_view("samples.wanderbricks.countries", "countries")
customer_sp_df = _register_view("samples.wanderbricks.customer_support_logs", "customer_support_logs")
destinations_df = _register_view("samples.wanderbricks.destinations", "destinations")
employees_df = _register_view("samples.wanderbricks.employees", "employees")
hosts_df = _register_view("samples.wanderbricks.hosts", "hosts")
page_views_df = _register_view("samples.wanderbricks.page_views", "page_views")
payments_df = _register_view("samples.wanderbricks.payments", "payments")
properties_df = _register_view("samples.wanderbricks.properties", "properties")
property_amenities_df = _register_view("samples.wanderbricks.property_amenities", "property_amenities")
reviews_df = _register_view("samples.wanderbricks.reviews", "reviews")
users_df = _register_view("samples.wanderbricks.users", "users")

In [0]:
# Temp views whose schemas are printed, in display order.
table_list = [
    'temp_bookings',
    'booking_updates',
    'clickstream',
    'countries',
    'customer_support_logs',
    'destinations',
    'employees',
    'hosts',
    'page_views',
    'payments',
    'properties',
    'property_amenities',
    'reviews',
    'users',
]

for view_name in table_list:
    # Build the DESCRIBE result first so a missing view fails before its header is printed.
    schema_df = spark.sql(f"DESCRIBE {view_name}").select('col_name', 'data_type')
    print(f"\n--- Overlook of {view_name} table---")
    schema_df.show(truncate=False)

## -> Question1: What are the top 5 destinations with the highest average revenue per reservation?

In [0]:
# Question 1: rank destinations by average revenue per booking.
# The query joins bookings -> properties -> destinations, then per destination:
#   * TOTAL_REVENUE  = SUM(TOTAL_AMOUNT), rounded to 2 decimals
#   * TOTAL_BOOKINGS = number of distinct BOOKING_IDs
#   * AVG_REVENUE    = TOTAL_REVENUE / TOTAL_BOOKINGS (the CASE returns 0 instead of
#                      dividing when the distinct booking count is 0)
# HAVING keeps only destinations with more than 5 distinct bookings, and the
# ORDER BY / LIMIT returns the 5 rows with the highest AVG_REVENUE.
# NOTE(review): both joins are LEFT JOINs, so bookings whose property or destination
# does not match are grouped under a NULL destination and could appear in the
# ranking - confirm whether INNER JOIN was intended.
# NOTE(review): no STATUS filter is applied here, whereas the Question 3 query
# excludes 'cancelled' bookings - confirm cancelled bookings should count as revenue.
temp_query = """
                SELECT
                        DS.DESTINATION_ID ,
                        DS.DESTINATION,
                        ROUND( SUM(TB.TOTAL_AMOUNT),2 )  AS TOTAL_REVENUE,
                        ROUND( COUNT(DISTINCT TB.BOOKING_ID),0) AS TOTAL_BOOKINGS,
                        ROUND((CASE 
                            COUNT(DISTINCT TB.BOOKING_ID) WHEN 0 THEN 0 
                                                        ELSE SUM(TB.TOTAL_AMOUNT) / COUNT(DISTINCT TB.BOOKING_ID) END),2) AVG_REVENUE
                        
                        FROM temp_bookings TB
                        LEFT JOIN properties PR ON PR.property_id = TB.property_id
                        LEFT JOIN destinations DS ON DS.destination_id = PR.destination_id
                        GROUP BY DS.DESTINATION_ID ,DS.DESTINATION
                        HAVING ROUND( COUNT(DISTINCT TB.BOOKING_ID),0) > 5
                        ORDER BY AVG_REVENUE DESC
                        LIMIT 5

            """
# Run the query and print the (at most 5) result rows.
spark.sql(temp_query).show()

# Remove the query string from the namespace so later cells cannot reuse it by accident.
del temp_query

## -> Question2: What is the difference between the average ratings and average stay lengths (in nights) for houses and apartments?

In [0]:
# Question 2: compare 'Apartment' and 'House' properties on average review rating
# and average stay length.
# The PROPERTY_FEATURE CTE joins bookings to their property and to reviews, and labels
# each row from keywords in the property title (ILIKE = case-insensitive match):
#   'apartment' or 'flat' -> 'Apartment'; 'house' or 'villa' -> 'House'; else 'Other'.
# A title matching both keyword groups is labelled 'Apartment' because that WHEN comes first.
# STAY_LENGTH is DATEDIFF(CHECK_OUT, CHECK_IN); rows with a non-positive stay length
# or without a rating are filtered out.
# The outer query keeps only the two categories of interest and reports, per category,
# the average rating, the average stay length and the number of rows analysed.
# NOTE(review): the filter RV.RATING IS NOT NULL makes the LEFT JOIN to reviews behave
# like an inner join, so AVG_STAY is computed over reviewed bookings only - confirm
# this is intended.
# NOTE(review): if a booking can have several reviews, it is counted once per review
# in TOTAL_ANALYZED_BOOKINGS - verify against the reviews table.
temp_query = """
                WITH 
                  PROPERTY_FEATURE AS (SELECT
                                                PR.PROPERTY_ID,
                                                TB.BOOKING_ID,
                                                PR.TITLE,
                                                (CASE
                                                      WHEN PR.TITLE ILIKE '%apartment%' OR PR.title ILIKE '%flat%' THEN 'Apartment'
                                                      WHEN PR.TITLE ILIKE '%house%' OR PR.title ILIKE '%villa%' THEN 'House'
                                                      ELSE 'Other' 
                                                END) AS PROPERTY_CAT,
                                                RV.RATING,
                                                DATEDIFF(TB.CHECK_OUT, TB.CHECK_IN) AS STAY_LENGTH
                                          FROM temp_bookings TB
                                                LEFT JOIN properties PR ON PR.property_id = TB.property_id
                                                LEFT JOIN reviews RV ON RV.booking_id = TB.booking_id
                                          WHERE DATEDIFF(TB.CHECK_OUT, TB.CHECK_IN) > 0 
                                                AND RV.RATING IS NOT NULL 
                                          )
                  SELECT
                        PF.PROPERTY_CAT,
                        ROUND(AVG(PF.RATING),2) AVG_RATE,
                        ROUND(AVG(PF.STAY_LENGTH),2) AVG_STAY,
                        COUNT(BOOKING_ID) TOTAL_ANALYZED_BOOKINGS
                  FROM PROPERTY_FEATURE PF
                  WHERE PF.PROPERTY_CAT IN ('Apartment', 'House')
                  GROUP BY PF.PROPERTY_CAT
                  ORDER BY PROPERTY_CAT 

              """
# Run the query and print up to 10 rows (at most two categories are expected).
spark.sql(temp_query).show(10)

# Remove the query string from the namespace so later cells cannot reuse it by accident.
del temp_query

## -> Question3: Sort each user's (user_id) reservations by date and calculate the price difference compared to the user's previous reservation.

In [0]:
# Question 3: for the 10 users with the most non-cancelled bookings, list their
# bookings in check-in order together with the price change versus the previous booking.
#
# The query is staged so the LAG-based difference is computed exactly once:
#   COMMON_USER       - the 10 users with the most distinct non-cancelled bookings
#                       (at least 2, otherwise there is no "previous" booking).
#   BOOKING_WITH_PREV - each of their non-cancelled bookings plus the previous
#                       booking's TOTAL_AMOUNT (NULL for the user's first booking).
#   BOOKING_DIFF      - adds PRICE_DIFFERENCE = current - previous, rounded to 2 decimals.
#   final SELECT      - adds the per-user max / min / average absolute difference.
#
# USER_ID and BOOKING_ID are used as tie-breakers so that the LIMIT 10 cut and the
# LAG ordering are deterministic when users share a frequency or a user has two
# bookings with the same CHECK_IN. The output columns and their order are unchanged:
# all booking columns, PREV_BOOKING_PRICE, PRICE_DIFFERENCE, MAX_DIFFERENCE,
# MIN_DIFFERENCE, AVG_DIFFERENCE.
temp_query = """
                WITH
                  COMMON_USER AS (SELECT
                                          TB.USER_ID,
                                          COUNT(DISTINCT TB.BOOKING_ID) AS FRQ
                                    FROM temp_bookings TB
                                    WHERE TB.STATUS <> 'cancelled'
                                    GROUP BY TB.USER_ID
                                    HAVING COUNT(DISTINCT TB.BOOKING_ID) > 1
                                    ORDER BY FRQ DESC, TB.USER_ID ASC
                                    LIMIT 10
                                    ),

                  BOOKING_WITH_PREV AS (SELECT
                                          TB.*,
                                          LAG(TB.TOTAL_AMOUNT, 1) OVER (
                                                PARTITION BY TB.USER_ID
                                                ORDER BY TB.CHECK_IN ASC, TB.BOOKING_ID ASC
                                          ) AS PREV_BOOKING_PRICE
                                    FROM temp_bookings TB
                                    WHERE TB.USER_ID IN (SELECT USER_ID FROM COMMON_USER)
                                          AND TB.STATUS <> 'cancelled'
                                    ),

                  BOOKING_DIFF AS (SELECT
                                          BP.*,
                                          ROUND(BP.TOTAL_AMOUNT - BP.PREV_BOOKING_PRICE, 2) AS PRICE_DIFFERENCE
                                    FROM BOOKING_WITH_PREV BP
                                    )

                  SELECT
                        BD.*,
                        MAX(ABS(BD.PRICE_DIFFERENCE)) OVER (PARTITION BY BD.USER_ID) AS MAX_DIFFERENCE,
                        MIN(ABS(BD.PRICE_DIFFERENCE)) OVER (PARTITION BY BD.USER_ID) AS MIN_DIFFERENCE,
                        ROUND(AVG(ABS(BD.PRICE_DIFFERENCE)) OVER (PARTITION BY BD.USER_ID), 2) AS AVG_DIFFERENCE
                  FROM BOOKING_DIFF BD
                  ORDER BY BD.USER_ID ASC, BD.CHECK_IN ASC, BD.BOOKING_ID ASC
            """

# Print the first 10 rows of the ordered result.
spark.sql(temp_query).show(10)

# Remove the query string from the namespace so later cells cannot reuse it by accident.
del temp_query

## -> Question4: Find the top 10 property owners (by number of properties) and display each one's average bathroom/bedroom ratio and average rating in a single result set.

In [0]:
# Question 4: for the 10 hosts owning the most properties, report the average
# bathroom/bedroom ratio and the average rating of their properties.
#
#   WEALTHY_HOSTS         - the 10 hosts with the most distinct properties
#                           (HOST_ID breaks ties so the cut is deterministic).
#   MOST_RATED_PR         - average review rating per property.
#   HOST_PROPERTY_METRICS - one row per property of those hosts with its
#                           bathroom/bedroom ratio and its average rating.
#
# Missing values are deliberately left NULL instead of being replaced by 0:
# a property without bedrooms has no defined ratio and a property without reviews
# has no rating. AVG skips NULLs, so such properties no longer drag the host
# averages towards zero. TOTAL_PROPERTIES_ANALYZED still counts every property of
# the host; a host with no reviewed property gets a NULL AVG_HOST_RATING.
# WEALTHY_HOSTS already carries HOST_ID and NAME, so hosts is not joined again.
temp_query = """
                WITH WEALTHY_HOSTS AS (SELECT
                                                HS.HOST_ID,
                                                HS.NAME,
                                                COUNT(DISTINCT PR.PROPERTY_ID) AS TOTAL_PROPERTIES
                                          FROM hosts HS
                                                LEFT JOIN properties PR ON PR.HOST_ID = HS.HOST_ID
                                          GROUP BY HS.HOST_ID, HS.NAME
                                          ORDER BY TOTAL_PROPERTIES DESC, HS.HOST_ID ASC
                                          LIMIT 10
                                          ),

                  MOST_RATED_PR AS (SELECT property_id,
                                          ROUND(AVG(rating), 2) AS avg_property_rating
                                    FROM reviews
                                    GROUP BY property_id
                                    ),

                  HOST_PROPERTY_METRICS AS (
                                                SELECT WH.HOST_ID,
                                                      WH.NAME,
                                                      CASE
                                                            WHEN PR.bedrooms > 0 THEN ROUND( (CAST(PR.bathrooms AS FLOAT) / PR.bedrooms),2)
                                                      END AS BATHROOM_TO_BEDROOM_RATIO,
                                                      MRP.avg_property_rating AS PROPERTY_RATING
                                                FROM WEALTHY_HOSTS WH
                                                INNER JOIN properties PR ON PR.HOST_ID = WH.HOST_ID
                                                LEFT JOIN MOST_RATED_PR MRP ON MRP.PROPERTY_ID = PR.PROPERTY_ID
                                          )

                  SELECT
                        HPM.HOST_ID,
                        HPM.NAME,
                        COUNT(*) AS TOTAL_PROPERTIES_ANALYZED,
                        ROUND(AVG( HPM.BATHROOM_TO_BEDROOM_RATIO ),2) AS AVG_HOST_RATIO,
                        ROUND(AVG( HPM.PROPERTY_RATING ),2) AS AVG_HOST_RATING
                  FROM HOST_PROPERTY_METRICS HPM
                  GROUP BY HPM.HOST_ID, HPM.NAME
                  ORDER BY TOTAL_PROPERTIES_ANALYZED DESC, HPM.HOST_ID ASC
                  LIMIT 10

            """

# Print the (at most 10) host rows.
spark.sql(temp_query).show()

# Remove the query string from the namespace so later cells cannot reuse it by accident.
del temp_query

## -> Question5: How many events (view, search, click, filter) have occurred? Who are the users with the most events? Which properties receive the most interactions? 

In [0]:
# Question 5 (part 1): how many clickstream rows exist per event type.
# COUNT(1) counts every row in each event group; the result is ordered from the
# most frequent event type to the least frequent.
temp_query = """
                SELECT
                        event,
                        COUNT(1) AS total_events
                FROM clickstream
                GROUP BY event
                ORDER BY total_events DESC
            """

# Run the query and print the per-event totals.
spark.sql(temp_query).show()

# Remove the query string from the namespace so later cells cannot reuse it by accident.
del temp_query

In [0]:
# Question 5 (part 2): the 10 users with the most clickstream events.
# The INNER JOIN keeps only clickstream rows whose user_id matches a row in users,
# and COUNT(c.event) counts the rows with a non-NULL event for each user.
temp_query = """
                SELECT
                    u.user_id,
                    u.name,
                    COUNT(c.event) AS total_interactions
                FROM clickstream c
                INNER JOIN users U ON c.user_id = U.user_id
                GROUP BY u.user_id,u.name
                ORDER BY total_interactions DESC
                LIMIT 10
                """

# Run the query and print the top-10 users.
spark.sql(temp_query).show()

# Remove the query string from the namespace so later cells cannot reuse it by accident.
del temp_query

In [0]:
# Question 5 (part 2, per-event breakdown): one row per user with one column per
# event type, plus a "Row Summary" column holding the user's total event count.

# Imported here so that F.count / F.col resolve when the notebook is run on a fresh kernel.
from pyspark.sql import functions as F

temp_query = """
                SELECT
                    u.user_id,
                    u.name,
                    c.event
                FROM clickstream c
                INNER JOIN users U ON c.user_id = U.user_id
                """

# One row per (user, clickstream event); users without events are dropped by the INNER JOIN.
df_user = spark.sql(temp_query)

# Turn each distinct event value into its own column holding that user's event count.
df_user_piv = df_user.groupBy("user_id", "name").pivot("event").agg(F.count("event").alias("total_interactions"))

# Event types a user never triggered come out of the pivot as NULL; use 0 so the row total is defined.
df_user_filled = df_user_piv.na.fill(0)

# Every column other than the two grouping keys is an event-count column.
user_event_cols = [column_name for column_name in df_user_filled.columns if column_name not in ("user_id", "name")]

# Built-in sum() starts from 0 and adds the Column objects, producing a single Column expression.
sum_expression_user = sum(F.col(column_name) for column_name in user_event_cols)
df_user_final = df_user_filled.withColumn("Row Summary", sum_expression_user)

# Show the 10 users with the highest total event count.
df_user_final.sort("Row Summary", ascending=False).show(10)

# Drop the intermediates so they cannot leak into later cells.
del temp_query,df_user, df_user_piv,df_user_filled,user_event_cols,sum_expression_user,df_user_final

In [0]:
# Question 5 (part 3): one row per property with one column per event type, plus a
# "Row Summary" column holding the property's total number of interactions.

# Imported here so that F.count / F.col resolve when the notebook is run on a fresh kernel.
from pyspark.sql import functions as F

temp_query_property = """
                        SELECT
                                PR.property_id,
                                PR.title,
                                c.event
                        FROM clickstream c
                            INNER JOIN properties PR ON c.property_id = PR.property_id
                        """

# One row per (property, clickstream event); events without a matching property are dropped.
df_property = spark.sql(temp_query_property)

# Turn each distinct event value into its own column holding that property's event count.
df_property_piv = df_property.groupBy("property_id", "title").pivot("event").agg(F.count("event").alias("total_interactions"))

# Event types a property never received come out of the pivot as NULL; use 0 so the row total is defined.
df_property_filled = df_property_piv.na.fill(0)

# Every column other than the two grouping keys is an event-count column.
property_event_cols = [column_name for column_name in df_property_filled.columns if column_name not in ("property_id", "title")]

# Built-in sum() starts from 0 and adds the Column objects, producing a single Column expression.
sum_expression_property = sum(F.col(column_name) for column_name in property_event_cols)
df_property_final = df_property_filled.withColumn("Row Summary", sum_expression_property)

# Show the 10 properties with the highest total interaction count.
df_property_final.sort("Row Summary", ascending=False).show(10)

# Drop the intermediates so they cannot leak into later cells.
del temp_query_property,df_property, df_property_piv,df_property_filled,property_event_cols,sum_expression_property,df_property_final