### Function to create and load data into fact table for hourly weather

In [0]:
@logger
def load_hourly_weather_data_with_forecast(clean_weather_df):
    """Build hourly weather fact rows for one batch, plus a short forecast.

    The batch hour and date are derived from ``created_on`` of the first row
    of ``clean_weather_df``. Rows already stored in ``fact_hourly_weather``
    from that (date, hour) onward are deleted so the batch and its forecast
    can be written again. The batch is joined to ``dim_date_table`` to obtain
    ``date_key``, combined with the rows stored for the four preceding hours,
    and extended with four forecast hours. Each forecast hour is a per-city
    aggregate (mean, or min/max for the temperature bounds) of the hours
    currently in the window; after each step the oldest hour is filtered out,
    so the preceding hours are not part of the returned rows.

    Args:
        clean_weather_df: Spark DataFrame providing ``created_on``, ``dt``,
            ``city_id`` and the weather measure columns selected below.

    Returns:
        A tuple ``(fact_weather_df, start, end)`` where ``fact_weather_df``
        holds the batch rows (``Is_Forecast`` False) and the forecast rows
        (``Is_Forecast`` True), and ``start`` / ``end`` are
        ``datetime.fromtimestamp`` of the smallest / largest ``dt`` in the
        batch.

    Raises:
        ValueError: if the batch is empty or its first row has no
            ``created_on`` value, so no batch hour can be derived.
    """
    import logging
    from datetime import datetime, timedelta

    # Aliased so Spark's min/max do not shadow the Python builtins.
    from pyspark.sql import functions as F

    log = logging.getLogger(__name__)
    window_hours = 4  # hours of history read back, and hours of forecast produced

    clean_weather_df = (
        clean_weather_df
        .withColumn("Date", F.col("created_on").cast('Date'))
        .withColumn("timeID", F.hour('created_on'))
    )

    first_row = clean_weather_df.select('timeID', 'Date').first()
    if first_row is None or first_row[0] is None or first_row[1] is None:
        # Previously this surfaced later as a NameError/TypeError.
        raise ValueError(
            "clean_weather_df has no usable 'created_on' value to derive the batch hour from"
        )
    time_id, batch_date = first_row

    def slot(hour_offset):
        """Return (hour of day, YYYYMMDD date key) ``hour_offset`` hours from the batch hour."""
        # divmod floors, so negative offsets roll back to the previous day.
        day_shift, hour_of_day = divmod(time_id + hour_offset, 24)
        shifted = batch_date + timedelta(days=day_shift)
        return hour_of_day, int(shifted.isoformat().replace('-', ''))

    _, batch_date_key = slot(0)

    # Clear everything from the batch (date, hour) onward. The comparison is
    # on the (date, hour) pair so that earlier forecasts which rolled over
    # into the next day (smaller hour, later date) are removed as well.
    try:
        spark.sql(
            "delete from fact_hourly_weather "
            f"where date_key > {batch_date_key} "
            f"or (date_key = {batch_date_key} and timeID >= {time_id});"
        )
    except Exception as exc:
        # Best effort: the table may not exist yet on the first load.
        log.warning("Could not clear fact_hourly_weather before reload: %s", exc)

    measure_columns = [
        "city_id",
        "temperature",
        "minimum_temperature",
        "maximum_temperature",
        "pressure",
        "humidity",
        "visibility",
        "wind_speed",
        "wind_degree",
        "wind_gust",
        "clouds_all",
    ]

    date_df = spark.table("dim_date_table")
    fact_weather_df = (
        clean_weather_df
        .join(date_df, date_df.full_date == clean_weather_df.Date)
        .select(
            clean_weather_df.timeID,
            date_df.date_key,
            *[clean_weather_df[name] for name in measure_columns],
        )
        .withColumn('Is_Forecast', F.lit(False))
    )

    # Seed the averaging window with the rows stored for the preceding hours.
    for hours_back in range(1, window_hours + 1):
        past_hour, past_date_key = slot(-hours_back)
        try:
            past_df = spark.sql(
                f"SELECT * FROM fact_hourly_weather WHERE timeID = {past_hour} and date_key = {past_date_key}"
            ).drop('load_run_id', 'created_on', 'created_by')
            # union() matches columns by position, so this relies on the table's
            # remaining columns being in the same order as fact_weather_df.
            fact_weather_df = fact_weather_df.union(past_df)
        except Exception as exc:
            # Best effort: missing history only shortens the averaging window.
            log.warning(
                "Skipping history for hour %s on %s: %s", past_hour, past_date_key, exc
            )

    for hours_ahead in range(1, window_hours + 1):
        next_hour, next_date_key = slot(hours_ahead)

        forecast_weather_df = (
            fact_weather_df
            .groupby(F.col('city_id'))
            .agg(
                F.mean("temperature").alias('temperature'),
                F.min('minimum_temperature').alias('minimum_temperature'),
                F.max('maximum_temperature').alias('maximum_temperature'),
                F.mean('pressure').cast('int').alias('pressure'),
                F.mean('humidity').cast('int').alias('humidity'),
                F.mean('visibility').cast('int').alias('visibility'),
                F.mean('wind_speed').cast('int').alias('wind_speed'),
                F.mean('wind_degree').cast('int').alias('wind_degree'),
                F.mean('wind_gust').alias('wind_gust'),
                F.mean('clouds_all').alias('clouds_all'),
            )
            .withColumn('timeID', F.lit(next_hour))
            .withColumn('date_key', F.lit(next_date_key))
            .withColumn('Is_Forecast', F.lit(True))
            # Same column order as fact_weather_df, because union() is positional.
            .select("timeID", "date_key", *measure_columns, "Is_Forecast")
        )
        fact_weather_df = fact_weather_df.union(forecast_weather_df)

        # Slide the window forward: drop the hour that is now the oldest.
        oldest_hour = (next_hour - (window_hours + 1)) % 24
        fact_weather_df = fact_weather_df.filter(F.col('timeID') != oldest_hour)

    # One Spark job for both bounds of the batch's dt column.
    first_dt, last_dt = clean_weather_df.selectExpr("min(dt)", "max(dt)").first()
    start = datetime.fromtimestamp(first_dt)
    end = datetime.fromtimestamp(last_dt)

    return fact_weather_df, start, end
    

[0;31m---------------------------------------------------------------------------[0m
[0;31mNameError[0m                                 Traceback (most recent call last)
File [0;32m<command-2772749447538956>:1[0m
[0;32m----> 1[0m [38;5;129m@logger[39m
[1;32m      2[0m [38;5;28;01mdef[39;00m [38;5;21mload_hourly_weather_data_with_forecast[39m(clean_weather_df):
[1;32m      3[0m     
[1;32m      4[0m     [38;5;66;03m# importing required libraries[39;00m
[1;32m      5[0m     [38;5;28;01mfrom[39;00m [38;5;21;01mpyspark[39;00m[38;5;21;01m.[39;00m[38;5;21;01msql[39;00m[38;5;21;01m.[39;00m[38;5;21;01mfunctions[39;00m [38;5;28;01mimport[39;00m col, hour, lit, mean, [38;5;28mmax[39m, [38;5;28mmin[39m
[1;32m      6[0m     [38;5;28;01mfrom[39;00m [38;5;21;01mdatetime[39;00m [38;5;28;01mimport[39;00m datetime, timedelta

[0;31mNameError[0m: name 'logger' is not defined

### Function to create and load data into fact table for daily weather

In [0]:
@logger
def load_daily_weather_data():
    """Aggregate the latest day of observed hourly weather into daily fact rows.

    Reads ``fact_hourly_weather``, keeps only rows with ``Is_Forecast`` False,
    takes the largest ``date_key`` among them and aggregates that day's rows
    per ``city_id`` / ``date_key``. Any row already stored for that day in
    ``fact_daily_weather`` is deleted first so the day can be written again.

    Returns:
        A tuple ``(daily_weather_df, start, end)``. ``start`` and ``end`` are
        both the aggregated day as a ``datetime`` at midnight, since a single
        ``date_key`` is processed.

    Raises:
        ValueError: if ``fact_hourly_weather`` has no observed rows, or if the
            ``date_key`` is not in YYYYMMDD form.
    """
    import logging
    from datetime import datetime

    # Aliased so Spark's min/max do not shadow the Python builtins.
    from pyspark.sql import functions as F

    log = logging.getLogger(__name__)

    observed_df = spark.table('fact_hourly_weather').filter(F.col('Is_Forecast') == False)
    max_date = observed_df.select(F.max(F.col("date_key"))).first()[0]
    if max_date is None:
        # Stop before the delete below: there is nothing to replace it with.
        raise ValueError("fact_hourly_weather contains no observed (non-forecast) rows")

    # date_key is a YYYYMMDD key, not epoch seconds, so it is parsed as a
    # calendar date rather than passed to datetime.fromtimestamp().
    day = datetime.strptime(str(max_date), '%Y%m%d')

    hourly_weather_df = observed_df.filter(F.col('date_key') == max_date).drop(
        'load_run_id', 'created_on', 'created_by', 'timeID', 'Is_Forecast'
    )

    # Remove only the day being recomputed. Deleting the daily table's own
    # latest date would discard the previous day's row once a new day starts.
    try:
        spark.sql(f"delete from fact_daily_weather where date_key = {max_date};")
    except Exception as exc:
        # Best effort: the table may not exist yet on the first load.
        log.warning("Could not clear fact_daily_weather for %s: %s", max_date, exc)

    daily_weather_df = hourly_weather_df.groupby(
        F.col('city_id'), F.col('date_key')
    ).agg(
        F.mean("temperature").alias('temperature'),
        F.min('minimum_temperature').alias('minimum_temperature'),
        F.max('maximum_temperature').alias('maximum_temperature'),
        F.mean('pressure').cast('int').alias('pressure'),
        F.mean('humidity').cast('int').alias('humidity'),
        F.mean('visibility').cast('int').alias('visibility'),
        F.mean('wind_speed').cast('int').alias('wind_speed'),
        F.mean('wind_degree').cast('int').alias('wind_degree'),
        F.mean('wind_gust').cast('float').alias('wind_gust'),
        F.mean('clouds_all').alias('clouds_all'),
    )

    # The hourly rows were filtered to one date_key, so both bounds are that day.
    start = day
    end = day

    return daily_weather_df, start, end

