In [None]:
from pathlib import Path
import os
from dotenv import load_dotenv

from pyspark.sql import SparkSession

# Parent of the current working directory; used later as the root for output files.
# NOTE(review): this assumes the kernel's working directory is the notebook's own
# folder - confirm before running from elsewhere.
dir_path = Path().resolve().parent

# Load variables from a .env file (if one is found) into the process environment.
load_dotenv()

# PostgreSQL connection settings. Host and port have defaults; the others do not.
user = os.getenv('PG_USER')
password = os.getenv('PG_PASSWORD')
host = os.getenv('PG_HOST', 'localhost')
port = os.getenv('PG_PORT', '5432')
dbname = os.getenv('POSTGRES_DB')

# Fail fast with a readable message: an unset variable would otherwise reach the
# JDBC layer as None and only surface later as an opaque connection error.
_missing_env = [
    name
    for name, value in (
        ('PG_USER', user),
        ('PG_PASSWORD', password),
        ('POSTGRES_DB', dbname),
    )
    if value is None
]
if _missing_env:
    raise RuntimeError(
        "Missing required environment variable(s): "
        + ", ".join(_missing_env)
        + ". Define them in the environment or in a .env file."
    )

In [None]:
# Create (or reuse) the Spark session. The PostgreSQL JDBC driver is requested
# through spark.jars.packages so it is available to spark.read.jdbc.
spark = (
    SparkSession.builder
    .appName("Weather Study")
    .config("spark.jars.packages", "org.postgresql:postgresql:42.2.23")
    .getOrCreate()
)

# Connection properties and URL shared by every JDBC read in this notebook.
properties = {
    "user": user,
    "password": password,
    "driver": "org.postgresql.Driver"
}
url = f"jdbc:postgresql://{host}:{port}/{dbname}"

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat_ws, lit, lag, when
from pyspark.sql.window import Window

In [None]:
# Query pushed down to PostgreSQL: one row per daily_weather record joined to its
# city. "Date" is built by concatenating the year, month and day parts, so it is
# an unpadded string such as "2023-1-5".
daily_weather_query = """
SELECT 
    EXTRACT(YEAR FROM dw.date) || '-' || EXTRACT(MONTH FROM dw.date) || '-' || EXTRACT(DAY FROM dw.date) AS "Date",
    c.name AS "Location",
    dw.min_temp AS "MinTemp",
    dw.max_temp AS "MaxTemp",
    dw.rainfall AS "Rainfall",
    dw.wind_gust_speed AS "WindGustSpeed",
    dw.wind_gust_dir AS "WindGustDir"
FROM 
    daily_weather dw
JOIN 
    city c ON dw.city_id = c.id
ORDER BY 
    c.name, dw.date
"""

df_daily_raw = spark.read.jdbc(
    url=url,
    table=f"({daily_weather_query}) as daily_weather",
    properties=properties,
)

# Order each location's rows chronologically. "Date" is an unpadded string, so
# sorting it as text would put "2023-10-1" before "2023-2-1" and pair days with
# the wrong "tomorrow"; casting to DATE gives a true calendar ordering while the
# "Date" column itself stays unchanged (it is still used as a join key later).
window_spec = Window.partitionBy("Location").orderBy(col("Date").cast("date"))

df_daily = (
    df_daily_raw
    # Keep a single row per location-day.
    .dropDuplicates(['Date', 'Location'])
    # These two measurements are not selected from daily_weather; -1 is used as
    # a placeholder so the columns exist for the later union.
    .withColumn("Evaporation", lit(-1))
    .withColumn("Sunshine", lit(-1))
    # A day counts as rainy when Rainfall >= 1; a NULL Rainfall falls through
    # to "No".
    .withColumn("RainToday", when(col("Rainfall") >= 1, "Yes").otherwise("No"))
    # lag with a negative offset looks ahead: take RainToday from the next row
    # of the same location.
    .withColumn("RainTomorrow", lag("RainToday", -1).over(window_spec))
    # The last row of each location has no following row (NULL); it is mapped
    # to "No" here.
    # NOTE(review): "next row" is not necessarily the next calendar day if the
    # data has gaps - confirm this is acceptable.
    .withColumn("RainTomorrow", when(col("RainTomorrow") == "Yes", "Yes").otherwise("No"))
)

In [None]:
# Hour-of-day values (as stored in weather.date) that are used as the "9am" and
# "3pm" observations.
# NOTE(review): presumably the timestamps are stored in a different time zone
# than the local one, which is why 17 and 23 are used - confirm against the
# ingestion code.
HOUR_FOR_9AM = 17
HOUR_FOR_3PM = 23


def _hourly_weather_query(suffix: str, hour: int) -> str:
    """Build the SQL that selects one hour-of-day snapshot per city.

    Args:
        suffix: Text appended to every measurement alias (e.g. "9am" gives
            "Temp9am", "Cloud9am", ...).
        hour: Value that EXTRACT(HOUR FROM w.date) must equal.

    Returns:
        The SQL text, with "Date" formatted as an unpadded year-month-day
        string and "Location" taken from the joined city name.
    """
    return f"""
SELECT 
    EXTRACT(YEAR FROM w.date) || '-' || EXTRACT(MONTH FROM w.date) || '-' || EXTRACT(DAY FROM w.date) AS "Date",
    c.name AS "Location",
    w.temp AS "Temp{suffix}",
    w.cloudiness AS "Cloud{suffix}",
    w.pressure AS "Pressure{suffix}",
    w.humidity AS "Humidity{suffix}",
    w.wind_gust_speed AS "WindSpeed{suffix}",
    w.wind_gust_dir AS "WindDir{suffix}"
FROM 
    weather w
JOIN 
    city c ON w.city_id = c.id
WHERE 
    EXTRACT(HOUR FROM w.date) = {int(hour)}
ORDER BY 
    c.name, w.date
"""


# The two snapshots differ only in the alias suffix and the hour filter.
weather_9am_query = _hourly_weather_query("9am", HOUR_FOR_9AM)

weather_3pm_query = _hourly_weather_query("3pm", HOUR_FOR_3PM)

# Run each snapshot query in PostgreSQL (wrapped as an aliased subquery) and keep
# one row per location-day.
df_9am = (
    spark.read
    .jdbc(url=url, table=f"({weather_9am_query}) as weather_9am", properties=properties)
    .dropDuplicates(['Date', 'Location'])
)

df_3pm = (
    spark.read
    .jdbc(url=url, table=f"({weather_3pm_query}) as weather_3pm", properties=properties)
    .dropDuplicates(['Date', 'Location'])
)

In [None]:
# Put the morning and afternoon snapshots of the same location-day side by side;
# the inner join keeps only days that have both.
df_3_9 = df_9am.join(
    other=df_3pm,
    on=['Date', 'Location'],
    how='inner',
)

In [None]:
# Attach the 9am/3pm snapshot columns to the daily summary; location-days that
# lack a snapshot pair are dropped by the inner join.
df_open = df_daily.join(
    other=df_3_9,
    on=['Date', 'Location'],
    how='inner',
)

In [None]:
# Query pushed down to PostgreSQL: the australian_meteorology_weather table with
# its columns renamed to the same names used by the other frames in this
# notebook. "Date" is an unpadded year-month-day string such as "2023-1-5".
aus_weather_query = """
SELECT 
    EXTRACT(YEAR FROM date) || '-' || EXTRACT(MONTH FROM date) || '-' || EXTRACT(DAY FROM date) AS "Date",
    location AS "Location",
    min_temp AS "MinTemp",
    max_temp AS "MaxTemp",
    rainfall AS "Rainfall",
    evaporation AS "Evaporation",
    sunshine AS "Sunshine",
    wind_gust_dir AS "WindGustDir",
    wind_gust_speed AS "WindGustSpeed",
    temp_9am AS "Temp9am",
    humidity_9am AS "Humidity9am",
    cloud_9am AS "Cloud9am",
    wind_dir_9am AS "WindDir9am",
    wind_speed_9am AS "WindSpeed9am",
    pressure_9am AS "Pressure9am",
    temp_3pm AS "Temp3pm",
    humidity_3pm AS "Humidity3pm",
    cloud_3pm AS "Cloud3pm",
    wind_dir_3pm AS "WindDir3pm",
    wind_speed_3pm AS "WindSpeed3pm",
    pressure_3pm AS "Pressure3pm"
FROM 
    australian_meteorology_weather
"""

df_aus_raw = spark.read.jdbc(
    url=url,
    table=f"({aus_weather_query}) as australian_weather",
    properties=properties,
)

# Build the window here rather than relying on one defined in another cell, and
# order by the parsed calendar date: "Date" is an unpadded string, so a text sort
# would place "2023-10-1" before "2023-2-1" and mislabel RainTomorrow.
aus_window_spec = Window.partitionBy("Location").orderBy(col("Date").cast("date"))

df_aus = (
    df_aus_raw
    # A day counts as rainy when Rainfall >= 1; a NULL Rainfall falls through
    # to "No".
    .withColumn("RainToday", when(col("Rainfall") >= 1, "Yes").otherwise("No"))
    # lag with a negative offset looks ahead: take RainToday from the next row
    # of the same location.
    .withColumn("RainTomorrow", lag("RainToday", -1).over(aus_window_spec))
    # The last row of each location has no following row (NULL); it is mapped
    # to "No" here.
    .withColumn("RainTomorrow", when(col("RainTomorrow") == "Yes", "Yes").otherwise("No"))
)

In [None]:
# Stack the two sources into one frame; columns are matched by name, so their
# differing column order does not matter.
df_final = df_open.unionByName(other=df_aus)

In [None]:
# Output location: <dir_path>/dataParquet/weather_study.parquet
parquet_path = os.path.join(dir_path, 'dataParquet', 'weather_study.parquet')

# Overwrite any previous export so the notebook can be re-run from a fresh
# kernel; the default save mode raises an error when the path already exists.
df_final.write.mode("overwrite").parquet(parquet_path)