In [None]:
# %%sh
# /databricks/python/bin/pip install --upgrade pip
# /databricks/python/bin/pip install --upgrade pymongo
# /databricks/python/bin/pip install --upgrade pyspark
# /databricks/python/bin/pip install --upgrade databricks-spark-xml
# /databricks/python/bin/pip install --upgrade pymongo-spark

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_date

In [None]:
# Create a SparkSession

spark = SparkSession.builder \
    .appName("Read from Cosmos DB") \
    .config('spark.jars.packages','org.mongodb.spark:mongo-spark-connector_2.12:3.0.1') \
    .config("spark.executor.instances", "3") \
    .getOrCreate()

uri = 'mongodb://project-db:gGMORZyxSMjSQ2cWRhVbOJSOkYOFvauXA6YoYxr1MQrJKVm6WH7MU6QSCQ1D0zxODAtnYhb1vzwVACDbJ524Sg==@project-db.mongo.cosmos.azure.com:10255/?ssl=true&retrywrites=false&replicaSet=globaldb&maxIdleTimeMS=120000&appName=@project-db@'
database_name = 'project-database'



# Load the MongoDB collections into PySpark DataFrames
listing_df = spark.read.format("com.mongodb.spark.sql.DefaultSource").option('database',database_name).option("collection", "listings_main").load()
property_df = spark.read.format("com.mongodb.spark.sql.DefaultSource").option('database',database_name).option("collection", "property_collection").load()
reviews_df = spark.read.format("com.mongodb.spark.sql.DefaultSource").option('database',database_name).option("collection", "reviews").load()
rsummary_df = spark.read.format("com.mongodb.spark.sql.DefaultSource").option('database',database_name).option("collection", "previews_summary_collection").load()

cal_df = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("database", database_name).option("collection", "cal").load()
neighbourhoods_df = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("database", database_name).option("collection", "neighbourhoods").load()
host_collection_df = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("database", database_name).option("collection", "host_collection").load()
availiability_collection_df = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("database", database_name).option("collection", "availiability_collection").load()


In [None]:
# Register the DataFrames as temporary views
listing_df.createOrReplaceTempView("listings_main")
property_df.createOrReplaceTempView("property_collection")
reviews_df.createOrReplaceTempView("reviews")
rsummary_df.createOrReplaceTempView("previews_summary_collection")
cal_df.createOrReplaceTempView("cal")
neighbourhoods_df.createOrReplaceTempView("neighbourhoods")
host_collection_df.createOrReplaceTempView("host_collection")
availiability_collection_df.createOrReplaceTempView("availiability_collection")



In [None]:
# QUERY 1: Display list of stays in Portland, OR with details: name, neighbourhood, room type, how many guests it accommodates, property type and amenities, per night’s cost and is available for the next two days in descending order of rating.

In [None]:
## QUESTION 1 - Not able to execute due to resource constraints

# Define the SQL query
query = """
    SELECT l.name, l.neighbourhood, p.room_type, p.accommodates,
           p.property_type, p.amenities, r.review_scores_rating, c.price
    FROM listings_main l
    LEFT JOIN property_collection p ON l.id = p.id
    LEFT JOIN previews_summary_collection r ON l.id = r.id
    LEFT JOIN cal c ON c.listing_id = l.id
    WHERE l.city_number = 2 
      AND c.available = 't'
      AND (c.date = DATE_ADD(CURRENT_DATE(), 1) OR c.date = DATE_ADD(CURRENT_DATE(), 2))
    ORDER BY r.review_scores_rating DESC
"""

# Execute the SQL query
result = spark.sql(query)

# Display the result
result.show()




[0;31m---------------------------------------------------------------------------[0m
[0;31mPy4JJavaError[0m                             Traceback (most recent call last)
File [0;32m<command-2580591174032057>:41[0m
[1;32m     38[0m result [38;5;241m=[39m spark[38;5;241m.[39msql(query)
[1;32m     40[0m [38;5;66;03m# Display the result[39;00m
[0;32m---> 41[0m result[38;5;241m.[39mshow()

File [0;32m/databricks/spark/python/pyspark/instrumentation_utils.py:48[0m, in [0;36m_wrap_function.<locals>.wrapper[0;34m(*args, **kwargs)[0m
[1;32m     46[0m start [38;5;241m=[39m time[38;5;241m.[39mperf_counter()
[1;32m     47[0m [38;5;28;01mtry[39;00m:
[0;32m---> 48[0m     res [38;5;241m=[39m [43mfunc[49m[43m([49m[38;5;241;43m*[39;49m[43margs[49m[43m,[49m[43m [49m[38;5;241;43m*[39;49m[38;5;241;43m*[39;49m[43mkwargs[49m[43m)[49m
[1;32m     49[0m     logger[38;5;241m.[39mlog_success(
[1;32m     50[0m         module_name, class_name, functi

In [None]:
## Q1 multiple queries
query1 = """
    SELECT l.id, l.name, l.neighbourhood, p.room_type, p.accommodates, p.property_type, p.amenities
    FROM listings_main l
    LEFT JOIN property_collection p ON l.id = p.id
    WHERE l.city_number = 2
"""
result1 = spark.sql(query1)

# Create temporary view for result1
result1.createOrReplaceTempView("result1")

In [None]:
query2 = """
    SELECT r.id, r.name, r.neighbourhood, r.room_type, r.accommodates, r.property_type, r.amenities,
           s.review_scores_rating
    FROM result1 r
    LEFT JOIN previews_summary_collection s ON r.id = s.id
"""
result2 = spark.sql(query2)

# Create temporary view for result2
result2.createOrReplaceTempView("result2")



In [None]:
from datetime import date, timedelta

tomorrow = date.today() + timedelta(days=1)
day_after_tomorrow = date.today() + timedelta(days=2)

query3 = f"""
    SELECT c.id, c.name, c.neighbourhood, c.room_type, c.accommodates, c.property_type, c.amenities,
           c.review_scores_rating, d.price
    FROM result2 c
    LEFT JOIN cal d ON c.id = d.listing_id
    WHERE d.available = 't'
      AND (d.date = '{tomorrow}' OR d.date = '{day_after_tomorrow}')
"""
result3 = spark.sql(query3)

# Create temporary view for result3
result3.createOrReplaceTempView("result3")


In [None]:
query4 = """
    SELECT *
    FROM result3
    ORDER BY review_scores_rating DESC
"""
result4 = spark.sql(query4)

# Display the result
result4.show()

+------------------+--------------------+--------------------+---------------+------------+--------------------+--------------------+--------------------+-------+
|                id|                name|       neighbourhood|      room_type|accommodates|       property_type|           amenities|review_scores_rating|  price|
+------------------+--------------------+--------------------+---------------+------------+--------------------+--------------------+--------------------+-------+
|          43595828|New NE PDX Garden...|Portland, Oregon,...|Entire home/apt|           3|     Entire bungalow|["Carbon monoxide...|                 5.0|$500.00|
|          39643910|Simple convenient...|                    |   Private room|           1|Private room in home|["Carbon monoxide...|                 5.0| $32.00|
|          27031951|The Goddess Room ...|Portland, Oregon,...|   Private room|           2|Private room in home|["Carbon monoxide...|                 5.0|$200.00|
|563954648920832614|Ex

In [None]:
# QUERY 2: Are there any neighbourhoods in any of the cities that don’t have any listings?

In [None]:

query = """
        SELECT Distinct neighbourhood FROM neighbourhoods WHERE neighbourhood NOT IN (SELECT neighbourhood from listings_main)
"""

# Execute the SQL query
result = spark.sql(query)

# Display the result
result.show()




+--------------------+
|       neighbourhood|
+--------------------+
|     Harvard Heights|
|           Mar Vista|
|              Sunbow|
| Chesterfield Square|
|          West Hills|
|       Angeles Crest|
|Northwest Antelop...|
|       Glassell Park|
|           Hollywood|
|Sellwood-Moreland...|
|            Bradbury|
|             Maywood|
|     Tujunga Canyons|
|          Mount Hope|
|            Oak Park|
|  Rancho Penasquitos|
|             Compton|
|         Culver City|
|              Lomita|
|              Lennox|
+--------------------+
only showing top 20 rows



In [None]:
# QUERY 3: For “Entire home/apt” type listings in Portland provide it’s availability estimate for each month of Spring and Winter this year.

In [None]:

query = """
        SELECT MONTH(c.date), 
       SUM(CASE WHEN c.available = 't' THEN 1 ELSE 0 END) AS count_t,
       SUM(CASE WHEN c.available = 'f' THEN 1 ELSE 0 END) AS count_f
        FROM listings_main l
        LEFT JOIN property_collection p ON l.id = p.id
        LEFT JOIN cal c on c.listing_id = l.id
        WHERE p.room_type = 'Entire home/apt' AND l.city_number = '2' AND MONTH(date) IN (12,1,2,3,4,5) AND YEAR(date) = 2023
        GROUP BY MONTH(c.date)
"""

# Execute the SQL query
result = spark.sql(query)

# Display the result
result.show()





+-----------+-------+-------+
|month(date)|count_t|count_f|
+-----------+-------+-------+
|         12|  53428|  65147|
|          3|   9520|  24905|
|          5|  68983|  49592|
|          4|  53080|  61670|
+-----------+-------+-------+



In [None]:
# QUERY 4: For each city, how many reviews are received for December of each year?

In [None]:
query = """
        SELECT l.city_number, YEAR(r.date), count(*) FROM listings_main l
        LEFT JOIN reviews r ON r.listing_id = l.id
        WHERE MONTH(r.date) = 12
        GROUP BY l.city_number, YEAR(r.date)
"""

# Execute the SQL query
result = spark.sql(query)

# Display the result
result.show()


+-----------+----------+--------+
|city_number|year(date)|count(1)|
+-----------+----------+--------+
|          1|      2011|      86|
|          1|      2021|   21755|
|          1|      2014|    1374|
|          1|      2019|   17182|
|          1|      2016|    4868|
|          1|      2010|      13|
|          1|      2015|    3112|
|          1|      2022|   27064|
|          1|      2017|    8214|
|          1|      2018|   12900|
|          1|      2012|     233|
|          1|      2020|    9250|
|          2|      2014|     354|
|          3|      2017|      71|
|          3|      2018|     132|
|          4|      2021|   11145|
|          2|      2016|    1450|
|          4|      2019|    6910|
|          3|      2015|       8|
|          2|      2022|    5269|
+-----------+----------+--------+
only showing top 20 rows



In [None]:
# QUERY 5: Retrieve host-related information and calculate the percentage of five-star listings for each host, based on the total number of listings. Provides insights into host performance and the proportion of highly rated listings they have.

In [None]:

query = """
    SELECT hc.host_id, hc.host_acceptance_rate, hc.host_response_rate, hc.host_response_time, hc.host_total_listings_count, hc.host_is_superhost, hc.host_since,
    fiveStars.NumberOfFiveStarListings/hc.host_total_listings_count AS FiveStarListingsPercent 
    FROM host_collection hc, (
    SELECT h.host_id, COUNT(*) as NumberOfFiveStarListings FROM listings_main l
    LEFT JOIN host_collection h ON h.id =  l.id
    LEFT JOIN previews_summary_collection r ON l.id = r.id
    WHERE r.review_scores_rating = 5
    GROUP BY host_id) fiveStars
    Where fiveStars.host_id = hc.host_id
    ORDER BY FiveStarListingsPercent DESC
"""

# Execute the SQL query
result = spark.sql(query)

# Display the result
result.show()


+---------+--------------------+------------------+------------------+-------------------------+-----------------+----------+-----------------------+
|  host_id|host_acceptance_rate|host_response_rate|host_response_time|host_total_listings_count|host_is_superhost|host_since|FiveStarListingsPercent|
+---------+--------------------+------------------+------------------+-------------------------+-----------------+----------+-----------------------+
| 33451922|                  0%|              100%|within a few hours|                        1|                f|2015-05-15|                    1.0|
|172746749|                 N/A|               N/A|               N/A|                        1|                f|2018-02-10|                    1.0|
|486200100|                100%|               80%|      within a day|                        1|                f|2022-11-03|                    1.0|
| 50241674|                 N/A|               N/A|               N/A|                        1|    

In [None]:
## Scratchpad

In [None]:
# # Define the SQL query
# query = """
#     SELECT l.name, l.neighbourhood, p.room_type, p.accommodates,
#            p.property_type, p.amenities, r.review_scores_rating
#     FROM listings_main l
#     LEFT JOIN property_collection p ON l.id = p.id
#     LEFT JOIN previews_summary_collection r ON l.id = r.id
#     WHERE l.city_number = 2
#     ORDER BY r.review_scores_rating DESC
# """

# # Execute the SQL query
# result = spark.sql(query)

# # Display the result
# result.show()

+--------------------+--------------------+---------------+------------+--------------------+--------------------+--------------------+
|                name|       neighbourhood|      room_type|accommodates|       property_type|           amenities|review_scores_rating|
+--------------------+--------------------+---------------+------------+--------------------+--------------------+--------------------+
|Doug Fir Room/Pet...|Portland, Oregon,...|     Hotel room|           2|Room in boutique ...|["Carbon monoxide...|                 5.0|
|Willamette River ...|                    |Entire home/apt|           2|        Entire condo|["Shampoo", "Esse...|                 5.0|
|Bright and Welcom...|Portland, Oregon,...|Entire home/apt|           2|  Entire guest suite|["Carbon monoxide...|                 5.0|
|Jojo’s Blue House...|Portland, Oregon,...|Entire home/apt|          10|         Entire home|["Drying rack for...|                 5.0|
|Alberta Arts and ...|                    |Entir