# Weather & Airbnb Data

## Weather Data Section

### Data Ingestion (Bronze Layer)

### Importing the necessary libraries

In [0]:
import json
import os
from datetime import datetime

import requests
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import Window



In [0]:
# Configuration for the weather ingestion.
# The OpenWeatherMap key is read from the environment instead of being hardcoded, so it is
# not leaked when the notebook is shared or committed.
# NOTE(review): the key previously committed here should be treated as compromised and rotated.
API_KEY = os.environ.get("OPENWEATHER_API_KEY")
if not API_KEY:
    print("WARNING: OPENWEATHER_API_KEY is not set; OpenWeatherMap requests will be rejected.")

CITIES = ["Johannesburg", "Cape Town", "Durban"]

# Accumulates one dict per forecast slot; filled by the fetch cell below.
records = []

### Reading the weather forecast data from the OpenWeatherMap API

In [0]:
# Fetch the 3-hourly forecast for every city and flatten each forecast slot into one
# record dict (city, timestamp, temperature, humidity, pressure, wind_speed, weather).
# Reset the buffer first so re-running this cell does not append duplicate rows.
records = []
FORECAST_ENDPOINT = "https://api.openweathermap.org/data/2.5/forecast"
REQUEST_TIMEOUT_SECONDS = 30

for CITY in CITIES:
    # Passing the query via `params` lets requests URL-encode names such as "Cape Town";
    # HTTPS keeps the API key out of plain-text traffic; the timeout stops a hung
    # request from blocking the notebook indefinitely.
    response = requests.get(
        FORECAST_ENDPOINT,
        params={"q": CITY, "appid": API_KEY, "units": "metric"},
        timeout=REQUEST_TIMEOUT_SECONDS,
    )

    if response.status_code == 200:
        weather_data = response.json()
        forecast_entries = weather_data.get("list")
        if forecast_entries:
            for entry in forecast_entries:
                records.append({
                    "city": CITY,
                    "timestamp": entry["dt_txt"],
                    "temperature": entry["main"]["temp"],
                    "humidity": entry["main"]["humidity"],
                    "pressure": entry["main"]["pressure"],
                    "wind_speed": entry["wind"]["speed"],
                    "weather": entry["weather"][0]["description"],
                })
        else:
            print(f"No forecast data found for {CITY}")
    else:
        print(f"Failed to fetch data for {CITY}: {response.status_code}")

## Schema Definition

In [0]:
# Bronze schema for the flattened forecast records.
# temperature and wind_speed are numeric readings, so they are typed as doubles rather
# than strings; the bronze table then stores real numbers and downstream arithmetic
# no longer depends on implicit string casts.
schema = StructType([
    StructField("city", StringType(), True),
    StructField("timestamp", StringType(), True),
    StructField("temperature", DoubleType(), True),
    StructField("humidity", IntegerType(), True),
    StructField("pressure", IntegerType(), True),
    StructField("wind_speed", DoubleType(), True),
    StructField("weather", StringType(), True),
])

# JSON numbers without a fractional part decode as Python ints (e.g. 15 rather than 15.0);
# cast the double-typed fields so every record satisfies DoubleType.
typed_records = [
    {**rec, "temperature": float(rec["temperature"]), "wind_speed": float(rec["wind_speed"])}
    for rec in records
]

df_bronze = spark.createDataFrame(typed_records, schema=schema)

In [0]:
# Add a dense city_id (1, 2, 3, ... following alphabetical city order) and a per-row id.
# NOTE(review): the window has no partitionBy, so Spark evaluates it in a single partition;
# acceptable for this small forecast payload.
# monotonically_increasing_id() yields unique ids that are not guaranteed to be consecutive.
city_window = Window.orderBy("city")
df_bronze = (
    df_bronze
    .withColumn("city_id", dense_rank().over(city_window))
    .withColumn("row_id", monotonically_increasing_id())
)


In [0]:
# Preview the bronze weather DataFrame (Databricks `display`).
df_bronze.display()

city,timestamp,temperature,humidity,pressure,wind_speed,weather,city_id,row_id
Cape Town,2025-08-27 15:00:00,17.68,60,1022,4.91,clear sky,1,0
Cape Town,2025-08-27 18:00:00,15.62,71,1022,3.65,few clouds,1,1
Cape Town,2025-08-27 21:00:00,13.96,81,1023,3.17,few clouds,1,2
Cape Town,2025-08-28 00:00:00,13.71,84,1021,2.71,scattered clouds,1,3
Cape Town,2025-08-28 03:00:00,13.31,85,1021,0.78,broken clouds,1,4
Cape Town,2025-08-28 06:00:00,13.76,89,1022,1.7,scattered clouds,1,5
Cape Town,2025-08-28 09:00:00,16.94,78,1022,2.59,scattered clouds,1,6
Cape Town,2025-08-28 12:00:00,18.35,73,1020,3.05,scattered clouds,1,7
Cape Town,2025-08-28 15:00:00,16.76,79,1019,4.31,broken clouds,1,8
Cape Town,2025-08-28 18:00:00,15.63,83,1019,2.94,broken clouds,1,9


### Connect and transfer Bronze Layer data to PostgreSQL

In [0]:
jdbc_url_bronze = "jdbc:postgresql://ep-purple-hat-a86fiso7-pooler.eastus2.azure.neon.tech:5432/neondb?sslmode=require"

# Persist the bronze weather rows to the Neon PostgreSQL database, replacing the table on
# every run (mode "overwrite").
# The password is read from the environment rather than hardcoded; the one previously
# committed in this notebook should be rotated.
(
    df_bronze.write
    .format("jdbc")
    .option("url", jdbc_url_bronze)
    .option("dbtable", "WeatherDataBronze")
    .option("user", "neondb_owner")
    .option("password", os.environ["NEON_DB_PASSWORD"])
    .option("driver", "org.postgresql.Driver")
    .mode("overwrite")
    .save()
)

### Data Cleaning (Silver Layer)

In [0]:
# Silver layer: parse the timestamp, derive calendar parts, and convert wind speed from
# m/s to km/h. The converted wind_speed column is moved to the end of the column list.
KMH_PER_MS = 3.6

df_silver_base = (
    df_bronze
    .withColumn("timestamp", to_timestamp("timestamp"))
    .withColumn("month", month(col("timestamp")))
    .withColumn("year", year("timestamp"))
)
passthrough_cols = [c for c in df_silver_base.columns if c != "wind_speed"]
df_silver = df_silver_base.select(
    *passthrough_cols,
    (col("wind_speed") * KMH_PER_MS).alias("wind_speed"),
)

In [0]:
# Preview the silver weather DataFrame (Databricks `display`).
df_silver.display()

city,timestamp,temperature,humidity,pressure,weather,city_id,row_id,month,year,wind_speed
Cape Town,2025-08-27T15:00:00.000+0000,17.68,60,1022,clear sky,1,0,8,2025,17.676000000000002
Cape Town,2025-08-27T18:00:00.000+0000,15.62,71,1022,few clouds,1,1,8,2025,13.14
Cape Town,2025-08-27T21:00:00.000+0000,13.96,81,1023,few clouds,1,2,8,2025,11.412
Cape Town,2025-08-28T00:00:00.000+0000,13.71,84,1021,scattered clouds,1,3,8,2025,9.756
Cape Town,2025-08-28T03:00:00.000+0000,13.31,85,1021,broken clouds,1,4,8,2025,2.8080000000000003
Cape Town,2025-08-28T06:00:00.000+0000,13.76,89,1022,scattered clouds,1,5,8,2025,6.12
Cape Town,2025-08-28T09:00:00.000+0000,16.94,78,1022,scattered clouds,1,6,8,2025,9.324
Cape Town,2025-08-28T12:00:00.000+0000,18.35,73,1020,scattered clouds,1,7,8,2025,10.98
Cape Town,2025-08-28T15:00:00.000+0000,16.76,79,1019,broken clouds,1,8,8,2025,15.515999999999998
Cape Town,2025-08-28T18:00:00.000+0000,15.63,83,1019,broken clouds,1,9,8,2025,10.584


### Connect and transfer Silver Layer data to PostgreSQL

In [0]:
jdbc_url_silver = "jdbc:postgresql://ep-purple-hat-a86fiso7-pooler.eastus2.azure.neon.tech:5432/neondb?sslmode=require"

# Persist the silver weather rows to the Neon PostgreSQL database, replacing the table on
# every run (mode "overwrite").
# The password is read from the environment rather than hardcoded; the one previously
# committed in this notebook should be rotated.
(
    df_silver.write
    .format("jdbc")
    .option("url", jdbc_url_silver)
    .option("dbtable", "WeatherDataSilver")
    .option("user", "neondb_owner")
    .option("password", os.environ["NEON_DB_PASSWORD"])
    .option("driver", "org.postgresql.Driver")
    .mode("overwrite")
    .save()
)

## Aggregate data by city and month (Gold Layer)

In [0]:
# Gold layer: one row per (city, year, month) with the mean of each reading plus the
# standard deviation of temperature (pyspark `stddev` is the sample standard deviation).
gold_metrics = ["temperature", "humidity", "pressure", "wind_speed"]
gold_aggregations = [mean(metric).alias(f"avg_{metric}") for metric in gold_metrics]
gold_aggregations.append(stddev("temperature").alias("stddev_temperature"))

df_gold = df_silver.groupBy("city_id", "city", "year", "month").agg(*gold_aggregations)

In [0]:
# Preview the gold (aggregated) weather DataFrame (Databricks `display`).
df_gold.display()

city_id,city,year,month,avg_temperature,avg_humidity,avg_pressure,avg_wind_speed,stddev_temperature
1,Cape Town,2025,8,15.16285714285714,80.91428571428571,1021.1142857142858,9.072000000000005,1.6888293394582905
1,Cape Town,2025,9,14.906,71.4,1022.6,8.459999999999999,3.1084288635900936
2,Durban,2025,8,21.077999999999992,79.05714285714286,1019.0285714285716,15.182742857142856,1.7051527445812418
2,Durban,2025,9,20.642000000000003,70.8,1024.8,29.311200000000003,0.4978152267659155
3,Johannesburg,2025,8,21.00857142857143,19.085714285714285,1020.5142857142856,12.283199999999995,3.2924031477622777
3,Johannesburg,2025,9,21.108,20.0,1021.6,7.927200000000001,4.369212743733133


### Connect and transfer Gold Layer data to PostgreSQL

In [0]:
jdbc_url_gold = "jdbc:postgresql://ep-purple-hat-a86fiso7-pooler.eastus2.azure.neon.tech:5432/neondb?sslmode=require"

# Persist the gold weather aggregates to the Neon PostgreSQL database, replacing the table
# on every run (mode "overwrite").
# The password is read from the environment rather than hardcoded; the one previously
# committed in this notebook should be rotated.
(
    df_gold.write
    .format("jdbc")
    .option("url", jdbc_url_gold)
    .option("dbtable", "WeatherDataGold")
    .option("user", "neondb_owner")
    .option("password", os.environ["NEON_DB_PASSWORD"])
    .option("driver", "org.postgresql.Driver")
    .mode("overwrite")
    .save()
)

# Airbnb Data Section

## Data Ingestion (Bronze Layer)

### Reading and loading the uploaded Airbnb listings CSV file

In [0]:
# First pass over the uploaded listings CSV: let Spark infer the column types so they can
# be inspected with printSchema before an explicit schema is pinned.
df_bronze2 = spark.read.csv(
    '/FileStore/tables/mock_airbnb_listings_august_2025__v2_.csv',
    header=True,
    inferSchema=True,
)

In [0]:
# Preview the listings read with an inferred schema (Databricks `display`).
df_bronze2.display()

listing_id,listing_name,host_name,city,date,rating,price,room_type,number_of_reviews,minimum_nights
CA_1,Cape Town Retreat 1,Host_CA_1,Cape Town,2025-08-01,4.44,79.08,Shared room,14,1
CA_2,Cape Town Retreat 2,Host_CA_2,Cape Town,2025-08-01,3.92,176.59,Entire home/apt,158,5
CA_3,Cape Town Retreat 3,Host_CA_3,Cape Town,2025-08-01,4.11,175.5,Private room,54,4
CA_4,Cape Town Retreat 4,Host_CA_4,Cape Town,2025-08-01,4.9,214.1,Shared room,6,4
CA_5,Cape Town Retreat 5,Host_CA_5,Cape Town,2025-08-01,4.9,154.27,Shared room,55,4
CA_6,Cape Town Retreat 6,Host_CA_6,Cape Town,2025-08-01,4.31,238.61,Private room,155,3
CA_7,Cape Town Retreat 7,Host_CA_7,Cape Town,2025-08-01,4.96,176.71,Private room,195,1
CA_8,Cape Town Retreat 8,Host_CA_8,Cape Town,2025-08-01,4.13,159.44,Entire home/apt,134,2
CA_9,Cape Town Retreat 9,Host_CA_9,Cape Town,2025-08-01,4.46,100.67,Shared room,96,1
CA_10,Cape Town Retreat 10,Host_CA_10,Cape Town,2025-08-01,4.85,162.98,Entire home/apt,67,5


### Schema Definition

In [0]:
# Print the schema Spark inferred from the CSV; schema2 below mirrors these types.
df_bronze2.printSchema()

root
 |-- listing_id: string (nullable = true)
 |-- listing_name: string (nullable = true)
 |-- host_name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- date: date (nullable = true)
 |-- rating: double (nullable = true)
 |-- price: double (nullable = true)
 |-- room_type: string (nullable = true)
 |-- number_of_reviews: integer (nullable = true)
 |-- minimum_nights: integer (nullable = true)



In [0]:
# Explicit schema for the Airbnb listings CSV, matching the types printSchema reported
# for the inferred read above. Every field is nullable.
airbnb_columns = [
    ("listing_id", StringType()),
    ("listing_name", StringType()),
    ("host_name", StringType()),
    ("city", StringType()),
    ("date", DateType()),
    ("rating", DoubleType()),
    ("price", DoubleType()),
    ("room_type", StringType()),
    ("number_of_reviews", IntegerType()),
    ("minimum_nights", IntegerType()),
]
schema2 = StructType([StructField(name, dtype, True) for name, dtype in airbnb_columns])

In [0]:
# Re-read the listings CSV with the explicit schema2 instead of type inference.
df_bronze2 = spark.read.csv(
    '/FileStore/tables/mock_airbnb_listings_august_2025__v2_.csv',
    schema=schema2,
    header=True,
)

In [0]:
# Preview the bronze listings read with the explicit schema (Databricks `display`).
df_bronze2.display()

listing_id,listing_name,host_name,city,date,rating,price,room_type,number_of_reviews,minimum_nights
CA_1,Cape Town Retreat 1,Host_CA_1,Cape Town,2025-08-01,4.44,79.08,Shared room,14,1
CA_2,Cape Town Retreat 2,Host_CA_2,Cape Town,2025-08-01,3.92,176.59,Entire home/apt,158,5
CA_3,Cape Town Retreat 3,Host_CA_3,Cape Town,2025-08-01,4.11,175.5,Private room,54,4
CA_4,Cape Town Retreat 4,Host_CA_4,Cape Town,2025-08-01,4.9,214.1,Shared room,6,4
CA_5,Cape Town Retreat 5,Host_CA_5,Cape Town,2025-08-01,4.9,154.27,Shared room,55,4
CA_6,Cape Town Retreat 6,Host_CA_6,Cape Town,2025-08-01,4.31,238.61,Private room,155,3
CA_7,Cape Town Retreat 7,Host_CA_7,Cape Town,2025-08-01,4.96,176.71,Private room,195,1
CA_8,Cape Town Retreat 8,Host_CA_8,Cape Town,2025-08-01,4.13,159.44,Entire home/apt,134,2
CA_9,Cape Town Retreat 9,Host_CA_9,Cape Town,2025-08-01,4.46,100.67,Shared room,96,1
CA_10,Cape Town Retreat 10,Host_CA_10,Cape Town,2025-08-01,4.85,162.98,Entire home/apt,67,5


### Connect and transfer Bronze Layer data to PostgreSQL

In [0]:
jdbc_url_bronze2 = "jdbc:postgresql://ep-purple-hat-a86fiso7-pooler.eastus2.azure.neon.tech:5432/neondb?sslmode=require"

# Persist the bronze Airbnb listings to the Neon PostgreSQL database, replacing the table
# on every run (mode "overwrite").
# The password is read from the environment rather than hardcoded; the one previously
# committed in this notebook should be rotated.
(
    df_bronze2.write
    .format("jdbc")
    .option("url", jdbc_url_bronze2)
    .option("dbtable", "AirbnbDataBronze")
    .option("user", "neondb_owner")
    .option("password", os.environ["NEON_DB_PASSWORD"])
    .option("driver", "org.postgresql.Driver")
    .mode("overwrite")
    .save()
)

### Data Cleaning (Silver Layer)

In [0]:
# Silver layer: convert listing prices to South African rand (R) with a fixed rate. The
# converted price column is moved to the end of the column list.
# NOTE(review): 17.75 looks like a USD->ZAR rate; confirm the source currency and consider
# sourcing the rate rather than hardcoding it.
PRICE_TO_ZAR_RATE = 17.75

non_price_cols = [c for c in df_bronze2.columns if c != "price"]
df_silver2 = df_bronze2.select(
    *non_price_cols,
    (col("price") * PRICE_TO_ZAR_RATE).alias("price"),
)

In [0]:
# Add a dense city_id (alphabetical city order) and a per-row id, as done for the weather
# bronze data.
# NOTE(review): city_id only lines up with the weather tables if both datasets contain
# exactly the same set of cities; confirm before joining on city_id.
listing_city_window = Window.orderBy("city")
df_silver2 = (
    df_silver2
    .withColumn("city_id", dense_rank().over(listing_city_window))
    .withColumn("row_id", monotonically_increasing_id())
)

In [0]:
# Preview the silver Airbnb listings (prices in rand, with ids) (Databricks `display`).
df_silver2.display()

listing_id,listing_name,host_name,city,date,rating,room_type,number_of_reviews,minimum_nights,price,city_id,row_id
CA_1,Cape Town Retreat 1,Host_CA_1,Cape Town,2025-08-01,4.44,Shared room,14,1,1403.67,1,0
CA_2,Cape Town Retreat 2,Host_CA_2,Cape Town,2025-08-01,3.92,Entire home/apt,158,5,3134.4725,1,1
CA_3,Cape Town Retreat 3,Host_CA_3,Cape Town,2025-08-01,4.11,Private room,54,4,3115.125,1,2
CA_4,Cape Town Retreat 4,Host_CA_4,Cape Town,2025-08-01,4.9,Shared room,6,4,3800.275,1,3
CA_5,Cape Town Retreat 5,Host_CA_5,Cape Town,2025-08-01,4.9,Shared room,55,4,2738.2925,1,4
CA_6,Cape Town Retreat 6,Host_CA_6,Cape Town,2025-08-01,4.31,Private room,155,3,4235.3275,1,5
CA_7,Cape Town Retreat 7,Host_CA_7,Cape Town,2025-08-01,4.96,Private room,195,1,3136.6025,1,6
CA_8,Cape Town Retreat 8,Host_CA_8,Cape Town,2025-08-01,4.13,Entire home/apt,134,2,2830.06,1,7
CA_9,Cape Town Retreat 9,Host_CA_9,Cape Town,2025-08-01,4.46,Shared room,96,1,1786.8925,1,8
CA_10,Cape Town Retreat 10,Host_CA_10,Cape Town,2025-08-01,4.85,Entire home/apt,67,5,2892.895,1,9


### Connect and transfer Silver Layer data to PostgreSQL

In [0]:
jdbc_url_silver2 = "jdbc:postgresql://ep-purple-hat-a86fiso7-pooler.eastus2.azure.neon.tech:5432/neondb?sslmode=require"

# Persist the silver Airbnb listings to the Neon PostgreSQL database, replacing the table
# on every run (mode "overwrite").
# The password is read from the environment rather than hardcoded; the one previously
# committed in this notebook should be rotated.
(
    df_silver2.write
    .format("jdbc")
    .option("url", jdbc_url_silver2)
    .option("dbtable", "AirbnbDataSilver")
    .option("user", "neondb_owner")
    .option("password", os.environ["NEON_DB_PASSWORD"])
    .option("driver", "org.postgresql.Driver")
    .mode("overwrite")
    .save()
)