In [None]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import datetime
import uuid


In [None]:
from pyspark.sql import SparkSession
session = SparkSession.builder.appName('sparksession').getOrCreate()

In [None]:
%run /Users/user/actionLogger

In [None]:

log_schema = StructType([
    StructField("id",StringType()),
    StructField("load_type", StringType()),
    StructField("table_name", StringType()),
    StructField("process_start_time", TimestampType()),
    StructField("process_end_time", TimestampType()),
    StructField("status", StringType()),
    StructField("comments", StringType()),
    StructField("start_date_time", TimestampType()),
    StructField("end_date_time", TimestampType()),
    StructField("created_on", TimestampType()),
    StructField("created_by", StringType())

])

In [None]:

IDudf = udf(lambda: str(uuid.uuid4()), StringType())

In [None]:
def lookup_data():
    
    data = session.read.format('delta').load('dbfs:/databases/weather/cleaned_weather_table')
    dim_location = session.read.format('delta').load('dbfs:/databases/weather/dim_location')
    dim_time = session.read.format('delta').load('dbfs:/databases/weather/dim_time')
    dim_date = session.read.format('delta').load('dbfs:/databases/weather/dim_date')


    dim_time = dim_time.withColumn('hr',hour(col('Hour')))

    hr_data = data.withColumn('hr',hour(col('datetime')))
    hr_data = hr_data.withColumn('Date',to_date(col('datetime')))

    hr_data = hr_data.join(dim_time,how='inner', on ='hr')
    hr_data = hr_data.join(dim_date, how = 'inner', on = 'Date')

    hr_data = hr_data.withColumn("Fact_HourID", IDudf())

    hr_data = hr_data.select(
                         col('Fact_HourID'),
                         col('TimeID').alias('HourID'),
                         col('Date_id').alias('DateID'),
                         col('city_id').alias('CityID'),

                         col('temp').cast('double').alias('Temperature'),
                         col('pressure').cast('double').alias('Pressure'),
                         col('humidity').cast('double').alias('Humidity'),
                         col('clouds').alias('Clouds'),
                         col('visibility').cast('double').alias('Visibility'),
                         col('wind_speed').cast('double').alias('Wind_speed'),
                         col('wind_deg').cast('double').alias('Wind_degree'),
                         col('wind_gust').cast('double').alias('Wind_gust'),
                         col('created_on'),
                         
        
                            )
    return hr_data

In [None]:
# hr_data = lookup_data()
# fact_hourly = session.read.format('delta').load('dbfs:/databases/weather/fact_hourly')



In [None]:
from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window

def write_to_fact_hourly(hr_data):
    try:
        fact_hourly = session.read.format('delta').load('dbfs:/databases/weather/fact_hourly')   
         
        fct = fact_hourly.drop('Fact_HourID')
        fct = facct_hourly.drop('is_forecasted')
        hrly_data = hr_data.drop('Fact_HourID')
        
        comparison_columns = ['HourID','DateID','CityID']

        new = fct.unionAll(hrly_data.subtract(fct))

        window_spec = Window.partitionBy(comparison_columns)\
                    .orderBy(col('created_on').desc())
        new = new.withColumn('row_number',row_number().over(window_spec))

        new = new.filter(col('row_number') == 1)
        new = new.drop('row_number')
        new = new.withColumn("Fact_HourID", IDudf())
        new = new.withColumn("is_forecasted", lit("N"))
        
        new.write.option("overwriteSchema", "true")\
            .format('delta')\
            .mode('overwrite')\
            .save('dbfs:/databases/weather/fact_hourly')   
    except: 
        hr_data.write.option("overwriteSchema", "true")\
            .format('delta')\
            .mode('overwrite')\
            .save('dbfs:/databases/weather/fact_hourly')   
       

In [None]:
def load_fact_hourly():
    
    process_start_time =  session.sql("SELECT current_timestamp()").collect()[0][0]
    log_dict = {'id': str(uuid.uuid4().hex),
            'load_type': 'fact_load_hourly',
            'table_name': 'fact_hourly',
            'process_start_time':  session.sql("SELECT current_timestamp()").collect()[0][0],
            'process_end_time': session.sql("SELECT current_timestamp()").collect()[0][0],
            'start_date_time':  session.sql("SELECT current_timestamp()").collect()[0][0],
            'end_date_time':  session.sql("SELECT current_timestamp()").collect()[0][0]}
    
    log = action_logger(log_dict)
        
    
    try:
        hr_data = lookup_data()
        data = session.read.format('delta').load('dbfs:/databases/weather/cleaned_weather_table')
        
        log_dict = {
               'process_start_time' : process_start_time,
               'process_end_time' : session.sql("SELECT current_timestamp()").collect()[0][0],
               'status' : 'extracting',
               'start_date_time' : data.select(min('created_on')).first()[0],
               'end_date_time' : data.select(max('created_on')).first()[0]}
            
        log.action(log_dict)
        
        
        write_to_fact_hourly(hr_data)
        
        
            
        log_dict = {
               'process_start_time' : process_start_time,
               'process_end_time' : session.sql("SELECT current_timestamp()").collect()[0][0],
               'status' : 'completed',
               'start_date_time' : data.select(min('created_on')).first()[0],
               'end_date_time' : data.select(max('created_on')).first()[0]}
            
        log.action(log_dict)    
            
            
    except Exception as e:
        error_dict = {
               'process_start_time' : process_start_time,
               'process_end_time' : session.sql("SELECT current_timestamp()").collect()[0][0],
               'status' : 'error',
               'error_data' : e,
               'start_date_time' : session.sql("SELECT current_timestamp()").collect()[0][0],
               'end_date_time' : session.sql("SELECT current_timestamp()").collect()[0][0]
        }
        log.action(error_dict)
            