In [1]:
# Husayn El Sharif
comment = """
This script demonstrates basic operations using PySpark.
Using AirBnB dataset as an example.
use environment: spark4_env001"""

In [2]:
# imports

from pyspark.sql import SparkSession # Import SparkSession from PySpark. SQL module

import requests

In [7]:
# Download data from insideairbnb.com for Broward County - Florida - USA and download to folder "airbnb_data"
urls = [
    "https://data.insideairbnb.com/united-states/fl/broward-county/2025-09-26/data/listings.csv.gz",
    "https://data.insideairbnb.com/united-states/fl/broward-county/2025-09-26/data/calendar.csv.gz",
    "https://data.insideairbnb.com/united-states/fl/broward-county/2025-09-26/data/reviews.csv.gz",
    ]

# loop through the URLs and download each file, but check if file already exists
for url in urls:
    filename = url.split("/")[-1]
    try:
        with open(f"airbnb_data/{filename}", "rb") as f:
            print(f"{filename} already exists. Skipping download.")
    except FileNotFoundError:
        print(f"Downloading {filename}...")
        response = requests.get(url)
        with open(f"airbnb_data/{filename}", "wb") as f:
            f.write(response.content)
        print(f"Downloaded {filename}.")
        

listings.csv.gz already exists. Skipping download.
calendar.csv.gz already exists. Skipping download.
reviews.csv.gz already exists. Skipping download.


In [4]:
# Read Data in Spark Session
spark = SparkSession.builder.appName("AirBnB_Data_Example").getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/12/25 10:18:14 WARN Utils: Your hostname, Husayn-SLS2, resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/12/25 10:18:14 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/25 10:18:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
# Read Listings Data
listings_df = spark.read.csv("airbnb_data/listings.csv.gz", 
                             header=True, 
                             inferSchema=True, 
                             sep=",", 
                             quote='"', 
                             escape='"',  # Handle escaped quotes
                             multiLine=True, 
                             mode="PERMISSIVE")

In [8]:
# Select the review_scores_location column
review_locations = listings_df.select(listings_df.review_scores_location)
review_locations.show(20)

+----------------------+
|review_scores_location|
+----------------------+
|                  4.88|
|                   5.0|
|                  4.82|
|                  4.56|
|                  4.87|
|                  4.77|
|                  4.88|
|                   4.5|
|                  4.68|
|                  4.88|
|                  4.72|
|                  4.75|
|                   5.0|
|                  4.92|
|                  4.88|
|                  4.93|
|                   3.5|
|                  4.64|
|                   5.0|
|                   4.7|
+----------------------+
only showing top 20 rows


In [23]:
# chain of method calls
# drop rows with null values
review_locations_chained = (
    listings_df
    .filter(listings_df.review_scores_location > 4.5)
    .select('id', 'name', 'price','review_scores_location',)
    .dropna() # drop rows with null values
    .show(20, truncate=False)
                            )

+-------+--------------------------------------------------+-------+----------------------+
|id     |name                                              |price  |review_scores_location|
+-------+--------------------------------------------------+-------+----------------------+
|69824  |2 bd/2ba Oceanfront Condo                         |$140.00|5.0                   |
|129099 |MIAMI - AMAZING APARTMENT OCEANVIEW               |$234.00|4.56                  |
|191160 |Tropical Beach Paradise Awaits You                |$119.00|4.87                  |
|216046 |Dual Masters! - Adult Media/Game Rm - 1GB Internet|$247.00|4.77                  |
|217990 |COSY APT OVER THE BEACH                           |$250.00|4.88                  |
|256386 |Country Living for Four                           |$79.00 |4.68                  |
|308839 |Fabulous & Private 475 sq ft Suite                |$75.00 |4.88                  |
|325734 |Hidden Gem for two                                |$43.00 |4.72        

In [24]:
# Find places with low price by high rating
listings_df.schema['price'] # price is currently a string, need to convert to float

from pyspark.sql.functions import regexp_replace, col
price_numeric = (
    listings_df
    .withColumn('price_numeric', regexp_replace(col('price'), '[$,]', '').cast('float'))
                )

price_numeric.schema['price_numeric'] # now price is float

StructField('price_numeric', FloatType(), True)

In [25]:
price_numeric.show()

25/12/25 10:32:00 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+------+--------------------+--------------+------------+---------------+--------------------+--------------------+---------------------+--------------------+-------+--------------------+---------+----------+--------------------+--------------------+------------------+------------------+--------------------+-----------------+--------------------+--------------------+--------------------+-------------------+-------------------------+--------------------+--------------------+----------------------+--------------------+----------------------+----------------------------+----------+-----------+--------------------+---------------+------------+---------+--------------+--------+----+--------------------+-------+--------------+--------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------+----------------+---------------+---------------+---------------+----------------+------------------

In [31]:
# Find "hidden gems" - high rating and low price
hidden_gems = (
    price_numeric
    .filter((price_numeric.review_scores_location >= 4.5) & (price_numeric.price_numeric <= 100))
    .select('id', 'name', 'price_numeric', 'review_scores_location')
    .distinct()
    .sort(col('price_numeric').asc(), col('review_scores_location').desc())
    .show(20, truncate=False)
)

+-------------------+--------------------------------------------------+-------------+----------------------+
|id                 |name                                              |price_numeric|review_scores_location|
+-------------------+--------------------------------------------------+-------------+----------------------+
|1407434945159519189|Fuego Flame Shared Room | Cozy Vibes + Wi-Fi      |17.0         |5.0                   |
|1411601831397814339|Cactus Sunset Stay | Snug Shared Room + Wi-Fi     |20.0         |5.0                   |
|1080465942036472771|Marriott's BeachPlace Towers - 2BR Waterview Villa|22.0         |5.0                   |
|1082128443548644880|Marriott's BeachPlace Towers - 2BR Waterview Villa|22.0         |5.0                   |
|1085532924382363878|Marriott's BeachPlace Towers - Waterview Studio   |22.0         |5.0                   |
|1313333889280898230|Cozy Shared Room near Beach                       |23.0         |4.78                  |
|148572420

In [37]:
# write the ouptut file to csv
hidden_gems = (
    price_numeric
    .filter((price_numeric.review_scores_location >= 4.5) & (price_numeric.price_numeric <= 100))
    .select('id', 'name', 'price_numeric', 'review_scores_location')
    .distinct()
    .sort(col('price_numeric').asc(), col('review_scores_location').desc())
)

hidden_gems.write.csv("airbnb_data/hidden_gems", header=True)

In [38]:
spark.stop()  # Stop the Spark session when done