#### Defining functions to get hourly and daily fact data

In [0]:
@keep_log
def get_hourly_fact_weather(df):
    """Build hourly weather fact rows for the hour in ``df`` plus 4 forecast hours.

    Steps:
      1. Derive ``timeID`` (hour of ``created_on``) and ``Date`` and take the
         hour/date of the FIRST row as the reference point.
         NOTE(review): this assumes every row of ``df`` belongs to the same
         hour and that ``df`` is non-empty (``first()`` returns ``None`` on an
         empty frame and the unpack below would fail) -- confirm with caller.
      2. Best-effort delete of every row of ``hourly_fact_weather`` at or after
         the reference hour (current-hour rows and earlier forecasts).
      3. Join ``df`` with ``dim_date`` to obtain ``dateID`` and tag the rows
         with ``isForecasted = 'No'``.
      4. Append the rows already stored for the 4 preceding hours.
      5. For each of the next 4 hours, aggregate the current window per city
         (mean / min / max) into an ``isForecasted = 'Yes'`` row, then drop the
         oldest hour from the window so it keeps sliding.

    Args:
        df: Spark DataFrame of raw observations. The code reads ``created_on``,
            ``dt``, ``city_id`` and the measure columns selected below.

    Returns:
        Tuple ``(df_fact, start, end)``: the fact rows for the reference hour
        and the 4 forecast hours, and ``min(dt)`` / ``max(dt)`` converted with
        ``datetime.fromtimestamp``.
        NOTE(review): ``fromtimestamp`` yields naive local time and presumes
        ``dt`` holds epoch seconds -- confirm.
    """
    from datetime import datetime, timedelta
    import logging

    # Namespaced import so Spark's min/max do not shadow the Python builtins.
    from pyspark.sql import functions as F

    log = logging.getLogger(__name__)

    window = 4  # hours of history read back == hours forecast ahead
    measures = ['city_id', 'temp', 'temp_min', 'temp_max', 'pressure', 'humidity',
                'visibility', 'wind_speed', 'wind_deg', 'wind_gust', 'clouds_all']
    fact_columns = ['timeID', 'dateID', *measures, 'isForecasted']

    df = (
        df.withColumn('timeID', F.hour('created_on'))
          .withColumn('Date', F.col('created_on').cast('date'))
    )
    timeID, date = df.select('timeID', 'Date').first()
    dateID = date.strftime('%Y%m%d')

    # Remove everything at or after the reference hour. The date and the hour
    # must be compared together: "timeID >= t AND dateID >= d" would leave
    # next-day forecast rows with a smaller hour (e.g. 00:00 when t = 22) behind.
    try:
        spark.sql(
            "delete from hourly_fact_weather "
            f"where (dateID = '{dateID}' and timeID >= '{timeID}') "
            f"or dateID > '{dateID}';"
        )
    except Exception as exc:
        # Best effort, e.g. the table may not exist yet on the first run.
        log.warning("Could not purge hourly_fact_weather from %s/%s: %s",
                    dateID, timeID, exc)

    df_date = spark.table('dim_date')
    df_fact = (
        df.join(df_date, df.Date == df_date.fullDate)
          .select(df.timeID, df_date.dateID, *[df[name] for name in measures])
          .withColumn('isForecasted', F.lit('No'))
    )

    # Pull in the rows stored for the preceding hours. Floor-division by 24
    # yields -1 when the hour wraps to the previous day.
    for back in range(1, window + 1):
        hours = timeID - back
        tID = hours % 24
        dID = (date + timedelta(days=hours // 24)).strftime('%Y%m%d')

        try:
            df_old = spark.sql(
                f"select * from hourly_fact_weather where timeID='{tID}' and dateID='{dID}'"
            ).drop('load_run_id', 'created_on', 'created_by')
            # Positional union: relies on the table's column order matching
            # fact_columns once the audit columns are dropped.
            df_fact = df_fact.union(df_old)
        except Exception as exc:
            log.warning("Could not read hourly_fact_weather for %s/%s: %s",
                        dID, tID, exc)

    # Roll the window forward one hour at a time.
    for ahead in range(1, window + 1):
        hours = timeID + ahead
        tID = hours % 24
        dID = (date + timedelta(days=hours // 24)).strftime('%Y%m%d')

        df_pred = (
            df_fact.groupBy('city_id')
            .agg(
                F.mean('temp').alias('temp'),
                F.min('temp_min').alias('temp_min'),
                F.max('temp_max').alias('temp_max'),
                F.mean('pressure').cast('int').alias('pressure'),
                F.mean('humidity').cast('int').alias('humidity'),
                F.mean('visibility').cast('int').alias('visibility'),
                F.mean('wind_speed').cast('int').alias('wind_speed'),
                F.mean('wind_deg').cast('int').alias('wind_deg'),
                F.mean('wind_gust').alias('wind_gust'),
                F.mean('clouds_all').alias('clouds_all'),
            )
            .withColumn('timeID', F.lit(tID))
            .withColumn('dateID', F.lit(dID))
            .withColumn('isForecasted', F.lit('Yes'))
            .select(*fact_columns)
        )

        df_fact = df_fact.union(df_pred)
        # Drop the oldest hour; the window spans fewer than 24 hours, so the
        # hour alone identifies it.
        oldest = (tID - (window + 1)) % 24
        df_fact = df_fact.filter(df_fact.timeID != oldest)

    start = datetime.fromtimestamp(df.selectExpr("min(dt)").first()[0])
    end = datetime.fromtimestamp(df.selectExpr("max(dt)").first()[0])

    return df_fact, start, end

In [0]:
@keep_log
def get_daily_fact_weather(df):
    """Aggregate ``df`` into per-city, per-day weather fact rows.

    The new observations are joined with ``dim_date`` to obtain ``dateID``.
    Rows already stored in ``daily_fact_weather`` for the date of the first
    row of ``df`` are read, removed from the table and merged with the new
    observations before aggregating (mean / min / max per ``city_id`` and
    ``dateID``). If the stored rows cannot be read or deleted, only the new
    observations are aggregated.

    NOTE(review): stored rows are themselves aggregates, so the resulting
    mean is an unweighted mean over new observations plus the previous
    aggregate -- confirm this is intended.

    Args:
        df: Spark DataFrame of raw observations. The code reads ``created_on``,
            ``dt``, ``city_id`` and the measure columns selected below.

    Returns:
        Tuple ``(df_fact, start, end)``: the aggregated rows and ``min(dt)`` /
        ``max(dt)`` converted with ``datetime.fromtimestamp``.
        NOTE(review): ``fromtimestamp`` yields naive local time and presumes
        ``dt`` holds epoch seconds -- confirm.
    """
    from datetime import datetime
    import logging

    # Namespaced import so Spark's min/max do not shadow the Python builtins.
    from pyspark.sql import functions as F

    log = logging.getLogger(__name__)

    df = df.withColumn('Date', F.col('created_on').cast('date'))

    df_date = spark.table('dim_date')
    measures = ['city_id', 'temp', 'temp_min', 'temp_max', 'pressure', 'humidity',
                'visibility', 'wind_speed', 'wind_deg', 'wind_gust', 'clouds_all']
    df_fact = (
        df.join(df_date, df.Date == df_date.fullDate)
          .select(df_date.dateID, *[df[name] for name in measures])
    )

    # Default: aggregate the new observations only. (The previous fallback
    # unioned df_fact with itself, which leaves mean/min/max unchanged.)
    combined = df_fact

    try:
        date = df.select('Date').first()[0]
        dateID = date.strftime('%Y%m%d')

        stored = (
            spark.sql(f"select * from daily_fact_weather where dateID='{dateID}';")
            .drop('load_run_id', 'created_on', 'created_by')
            # union() matches columns by position, so align by name first.
            # Assumes the table uses the same column names as df_fact.
            .select(*df_fact.columns)
        )
        # DataFrames are lazy: without materialising here, the rows could be
        # read only AFTER the delete below has removed them. The per-date
        # slice is presumably small (one aggregate row per city).
        stored = spark.createDataFrame(stored.collect(), stored.schema)
        merged = df_fact.union(stored)

        # Everything above validated; only now do the destructive step.
        spark.sql(f"delete from daily_fact_weather where dateID='{dateID}';")
        combined = merged
    except Exception as exc:
        # Best effort, e.g. the table may not exist yet on the first run.
        log.warning("Could not merge existing daily_fact_weather rows: %s", exc)

    df_fact = combined.groupBy('city_id', 'dateID').agg(
        F.mean('temp').alias('temp'),
        F.min('temp_min').alias('temp_min'),
        F.max('temp_max').alias('temp_max'),
        F.mean('pressure').cast('int').alias('pressure'),
        F.mean('humidity').cast('int').alias('humidity'),
        F.mean('visibility').cast('int').alias('visibility'),
        F.mean('wind_speed').cast('int').alias('wind_speed'),
        F.mean('wind_deg').cast('int').alias('wind_deg'),
        F.mean('wind_gust').alias('wind_gust'),
        F.mean('clouds_all').alias('clouds_all'),
    )

    start = datetime.fromtimestamp(df.selectExpr("min(dt)").first()[0])
    end = datetime.fromtimestamp(df.selectExpr("max(dt)").first()[0])

    return df_fact, start, end