In [0]:
%run /Users/075bei014.gokarna@pcampus.edu.np/MaintainingLog

In [0]:
import uuid
from pyspark.sql.functions import col, substring, regexp_replace, concat, avg, lit
from pyspark.sql.types import IntegerType

In [0]:
df_clean = spark.table("CLEAN.clean_weather_data")
df_log_table = spark.table("FACT.log_table")
df_dim_date = spark.table("dim_date")
df_dim_time = spark.table("dim_time")
df_dim_city = spark.table("dim_city")

# Create a Fact Hourly Table and Fact Daily Table

In [0]:
%sql
-- DROP TABLE fact_hourly_table;
CREATE TABLE IF NOT EXISTS fact_hourly_table
(
  p_key_hourly string,
  date_key int,
  time_key int,
  CityId int,
  timezone int,
  temp double,
  temp_min double,
  temp_max double,
  pressure double,
  humidity double,
  visibility double,
  speed double,
  deg double,
  gust double,
  is_forecasted_data boolean
);

-- DROP TABLE fact_daily_table;

CREATE TABLE IF NOT EXISTS fact_daily_table
(
  date_key int,
  CityId int,
  timezone int,
  temp double,
  temp_min double,
  temp_max double,
  pressure double,
  humidity double,
  visibility double,
  speed double,
  deg double,
  gust double
);

In [0]:
df_to_load = df_clean.select(col("id").alias("CityID"), 'timezone', 'temp', 'temp_min', 'temp_max', 'pressure', 'humidity', 'visibility', 'speed', 'deg', 'gust', 'created_on')

df_to_load = df_to_load.withColumn("p_key_daily", regexp_replace(substring(col("created_on"), 0, 10), "-", ""))
df_to_load = df_to_load.withColumn("time", regexp_replace(substring(col("created_on"), 11, 3), ":", ""))
df_to_load = df_to_load.withColumn("p_key_hourly", concat(col("p_key_daily"), col("time")))

df_to_load = df_to_load.withColumn("date_key", col("p_key_daily").cast("integer"))
df_to_load = df_to_load.withColumn("time_key", col("time").cast("integer"))

df_to_load = df_to_load.withColumn("p_key_hourly", concat(col("p_key_hourly"), lit("-"), col("CityID")))
# df_to_load = df_to_load.withColumn("p_key_hourly", concat(col("p_key_hourly"), col("CityID")))


df_to_load = df_to_load.drop("p_key_daily")
df_to_load = df_to_load.drop("time")
df_to_load = df_to_load.drop("created_on")


df_to_load = df_to_load.withColumn("is_forecasted_data", lit(False))



df_to_load = df_to_load.drop("created_on")

# df_to_load.show()
# df_clean = df_clean.withColumn("created_time", to_timestamp(col("created_on")).cast("time"))

In [0]:
fact_hourly_table_cols = ["p_key_hourly", "date_key", "time_key", "CityId", "timezone", "temp", "temp_min", "temp_max", "pressure", "humidity", "visibility", "speed", "deg", "gust", "is_forecasted_data"]
fact_daily_table_cols = ["date_key", "CityId", "timezone", "temp", "temp_min", "temp_max", "pressure", "humidity", "visibility", "speed", "deg", "gust"]

# Upsert Function

In [0]:
def load_hourly():
    LogTable.load('FACT', 'STARTED', 'fact_hourly_table')
    df_test = spark.table('fact_hourly_table')
    dateTimeInTable = df_test.select('p_key_hourly').rdd.flatMap(lambda x: x).collect()
    
    # Upserting
    
    df_for_hourly = df_to_load.select(*fact_hourly_table_cols).filter(col("is_forecasted_data") | ~col("p_key_hourly").isin(dateTimeInTable))
    
    
    
    df_for_hourly.distinct().write.format('delta').mode('append').option("mergeSchema", "true").saveAsTable('fact_hourly_table')
    LogTable.load('FACT', 'COMPLETED', 'fact_hourly_table')
    

In [0]:
def load_daily():
    LogTable.load('FACT', 'STARTED', 'fact_daily_table')
    
    df_for_daily = df_to_load.select(*fact_daily_table_cols)
    column_list = ["timezone", "temp", "temp_min", "temp_max", "pressure", "humidity", "visibility", "speed", "deg", "gust"]

    df_test = spark.table('fact_daily_table')
    key = df_test.select('date_key').rdd.flatMap(lambda x: x).collect()

    grouped_df = df_for_daily.groupBy("date_key", "cityId").agg(
            *[ avg(col(column)).alias(column) for column in column_list]
        ).filter(~col('date_key').isin(key))
    
    grouped_df = grouped_df.withColumn("timezone", col("timezone").cast(IntegerType()))
    
#     display(grouped_df)
#     grouped_df.printSchema()
    grouped_df.write.format('delta').mode('append').option("mergeSchema", "true").saveAsTable('fact_daily_table')
    LogTable.load('FACT', 'COMPLETED', 'fact_daily_table')
    
# This is fact data to load in daily table
# load_daily()