In [0]:
%run "./weather_ETL_logger"

In [0]:
# Creating a fact table for inserting hourly data
def create_fact_table():
    """Create the ``fact_weather_data_hourly`` table if it does not exist yet.

    The table carries the date and time keys, the city id and name, the
    coordinates, and the weather measurements (temperature, pressure,
    humidity, visibility, wind and cloud cover).

    ``if not exists`` makes the call idempotent, so re-running the notebook
    against a workspace where the table is already present does not raise.
    """
    spark.sql("""
        create table if not exists fact_weather_data_hourly(
          date_alt_key DATE,
          time_alt_key TIMESTAMP,
          id INT,
          name VARCHAR(100),
          lat FLOAT,
          lon FLOAT,
          temp FLOAT,
          temp_min FLOAT,
          temp_max FLOAT,
          pressure INT,
          humidity INT,
          visibility INT,
          wind_speed FLOAT,
          wind_deg FLOAT,
          wind_gust FLOAT,
          clouds_all INT
        )
    """)

In [0]:
# Function to insert into hourly data
@update_log
def insert_into_weather_hourly(df):
    """Select the processed readings that are not yet in the hourly fact table.

    The incoming DataFrame is exposed to Spark SQL as the temp view
    ``new_processed_data`` and joined to ``dim_date``, ``dim_time`` and
    ``dim_city`` to resolve the date key, time key and city id/name.

    Args:
        df: Spark DataFrame of processed readings. The query reads the
            columns ``date``, ``time``, ``city_name``, ``lat``, ``lon``,
            ``temp``, ``temp_min``, ``temp_max``, ``pressure``, ``humidity``,
            ``visibility``, ``wind_speed``, ``wind_deg``, ``wind_gust`` and
            ``clouds_all`` from it.

    Returns:
        A 3-tuple ``(rows, ts_1, ts_2)``: the DataFrame of candidate rows and
        two ``current_timestamp()`` values, each fetched with its own query.
        This function does not write to any table itself; the tuple is handed
        to the ``update_log`` decorator (defined outside this cell —
        presumably it performs the load and logging; verify).
    """
    # Register the DataFrame passed by the caller. The previous version
    # registered a notebook-level `df_processed` here, silently ignoring the
    # argument (and failing on a fresh kernel where that global is absent).
    df.createOrReplaceTempView('new_processed_data')

    # NOTE(review): the `not in` filter keys on (date, hour) only, so once any
    # city has a row for a given date/hour, rows for every other city in that
    # slot are skipped as well — confirm whether the city id should be part of
    # the de-duplication key.
    new_rows = spark.sql("""
        select
          dd.date_alt_key,
          dt.time_alt_key,
          dc.id,
          dc.name,
          pd.lat,
          pd.lon,
          pd.temp,
          pd.temp_min,
          pd.temp_max,
          pd.pressure,
          pd.humidity,
          pd.visibility,
          pd.wind_speed,
          pd.wind_deg,
          pd.wind_gust,
          pd.clouds_all
        from
          new_processed_data as pd
          inner join dim_date as dd on pd.date = dd.date_alt_key
          inner join dim_time as dt on hour(pd.time) = hour(dt.time_alt_key)
          inner join dim_city as dc on pd.city_name = dc.name
        where
          (pd.date, hour(pd.time)) not in (select distinct date_alt_key, hour(time_alt_key) from fact_weather_data_hourly)
    """)

    # Two independent timestamp lookups, kept as separate queries so the
    # tuple handed to the decorator has the same shape and values as before.
    first_ts = spark.sql("select current_timestamp() as ts").collect()[0].ts
    second_ts = spark.sql("select current_timestamp() as ts").collect()[0].ts
    return new_rows, first_ts, second_ts

In [0]:
# Function to insert into daily data
@update_log
def insert_into_weather_daily(data):
    """Aggregate hourly facts into one daily row per city and date.

    Reads ``fact_weather_data_hourly`` and, for every date not already present
    in ``fact_weather_daily``, computes per (name, date_alt_key) the average
    temperature, minimum of ``temp_min``, maximum of ``temp_max`` and the
    averages of pressure, humidity, visibility, wind speed/direction/gust and
    cloud cover.

    Args:
        data: Not used by the query; kept so the call signature stays the
            same for existing callers and for the ``update_log`` decorator.

    Returns:
        A 3-tuple ``(rows, ts_1, ts_2)``: the DataFrame of daily aggregates
        and two ``current_timestamp()`` values, each fetched with its own
        query. This function does not write to any table itself; the tuple is
        handed to ``update_log`` (defined outside this cell — presumably it
        performs the load and logging; verify).
    """
    # Two fixes relative to the previous query:
    #  * `select distinct` collapses the window-function output, which
    #    otherwise repeats the same daily aggregate once per hourly row.
    #  * The source table is `fact_weather_data_hourly`, the name used when
    #    the hourly table is created and filtered elsewhere in this notebook;
    #    the old `fact_weather_hourly` did not match it.
    # NOTE(review): a date is skipped as soon as it appears in
    # fact_weather_daily, so a day aggregated from partial hourly data is not
    # recomputed later — confirm this is intended.
    daily_rows = spark.sql("""
        select distinct
            date_alt_key,
            id,
            name,
            lat,
            lon,
            AVG(temp) OVER (PARTITION BY name,date_alt_key) as avg_temp,
            MIN(temp_min) OVER (PARTITION BY name,date_alt_key) as min_temp,
            MAX(temp_max) OVER (PARTITION BY name,date_alt_key) as max_temp,
            AVG(pressure) OVER (PARTITION BY name,date_alt_key) as avg_pressure,
            AVG(humidity) OVER (PARTITION BY name,date_alt_key) as avg_humidity,
            AVG(visibility) OVER (PARTITION BY name,date_alt_key) as avg_visibility,
            AVG(wind_speed) OVER (PARTITION BY name,date_alt_key) as avg_wind_speed,
            AVG(wind_deg) OVER (PARTITION BY name,date_alt_key) as avg_wind_deg,
            AVG(wind_gust) OVER (PARTITION BY name,date_alt_key) as avg_wind_gust,
            AVG(clouds_all) OVER (PARTITION BY name,date_alt_key) as avg_clouds_all
        from
            fact_weather_data_hourly
        where
            date_alt_key not in (select distinct date_alt_key from fact_weather_daily)
    """)

    # Two independent timestamp lookups, kept as separate queries so the
    # tuple handed to the decorator has the same shape and values as before.
    first_ts = spark.sql("select current_timestamp() as ts").collect()[0].ts
    second_ts = spark.sql("select current_timestamp() as ts").collect()[0].ts
    return daily_rows, first_ts, second_ts