In [0]:
from pyspark.sql.functions import *

#### 1. Creating the city dimension table

In [0]:
%sql

-- City dimension table. IF NOT EXISTS makes this cell a no-op when the table is already there.
CREATE TABLE IF NOT EXISTS dim_city (
    city_id   INT,
    city_name STRING,
    country   STRING,
    latitude  FLOAT,
    longitude FLOAT
)

#### Inserting data into the dim_city table from the weather_processed table

In [0]:
def load_city():
    """Append the distinct cities found in ``weather_processed`` to ``dim_city``.

    The source columns ``lat`` / ``lon`` are cast to FLOAT and renamed to
    ``latitude`` / ``longitude`` so they line up with the columns declared in
    the ``dim_city`` DDL. Selecting them under their source names would, with
    schema merging enabled on the write, add separate ``lat`` / ``lon``
    columns to the table instead of filling the declared coordinate columns.

    Side effects:
        * Sets ``spark.databricks.delta.schema.autoMerge.enabled`` to ``true``
          on the current Spark session.
        * Appends rows to the ``dim_city`` Delta table.

    NOTE(review): the write uses append mode, so calling this function more
    than once inserts the same cities again -- confirm whether the load
    should be made idempotent.
    """
    data = spark.sql("""
                        SELECT DISTINCT
                            city_id,
                            city_name,
                            country,
                            CAST(lat AS FLOAT) AS latitude,
                            CAST(lon AS FLOAT) AS longitude
                        FROM weather_processed
                     """)

    # Enable schema migration (session-wide setting, kept for backward compatibility)
    spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

    # Insert the selected data into the dim_city table
    data.write.format("delta").option("mergeSchema", "true").mode("append").saveAsTable("dim_city")


#### 2. Creating the date dimension table

#### Inserting data into the dim_date table

In [0]:
def load_date(start_date='2023-01-01', end_date='2023-12-31'):
    """Rebuild the ``dim_date`` Delta table for the days from start_date to end_date.

    Args:
        start_date: Start of the range; passed unchanged to
            ``pyspark.pandas.date_range`` as ``start``.
        end_date: End of the range; passed unchanged to
            ``pyspark.pandas.date_range`` as ``end``.

    Side effects:
        Overwrites the ``dim_date`` table with one row per generated date,
        carrying the calendar attributes derived below.
    """
    # Local imports, as in the rest of this notebook's loader functions.
    from pyspark.sql import functions as F
    from pyspark.sql.types import IntegerType
    from pyspark.pandas import date_range

    # Generate the calendar days and lift them into a one-column DataFrame.
    calendar_days = date_range(start=start_date, end=end_date).to_numpy()
    days_df = spark.createDataFrame(calendar_days, ['Date'])

    # Every attribute is derived from the helper 'Date' column.
    attributes = [
        # Surrogate key in yyyyMMdd form, stored as an integer.
        F.date_format('Date', 'yyyyMMdd').cast(IntegerType()).alias('dateID'),
        F.date_format('Date', 'yyyy-MM-dd').cast('DATE').alias('fullDate'),
        F.date_format('Date', 'MMMM').alias('monthName'),
        F.month('Date').alias('monthNumOfYear'),
        F.date_format('Date', 'EEEE').alias('dayNameOfWeek'),
        # Spark's dayofweek numbers Sunday as 1 and Saturday as 7.
        F.dayofweek('Date').alias('dayNumOfWeek'),
        F.dayofmonth('Date').alias('dayNumOfMonth'),
        F.dayofyear('Date').alias('dayNumOfYear'),
        # Spark's weekofyear follows ISO 8601 week numbering.
        F.weekofyear('Date').alias('weekNumOfYear'),
        F.concat(F.lit('Q').cast('string'), F.quarter('Date')).alias('quarterName'),
        F.quarter('Date').alias('calenderQuarter'),
        F.year('Date').alias('calenderYear'),
    ]

    # Selecting only the derived attributes leaves the helper 'Date' column out.
    dim_date_df = days_df.select(*attributes)

    # loading into dim_date table
    table_writer = dim_date_df.write.format('delta').mode('overwrite')
    table_writer.saveAsTable('dim_date')


#### 3. Creating the time dimension table and generating its hourly time ranges

In [0]:
def load_time():
    """Rebuild the ``dim_time`` Delta table with one row per hour of the day.

    Each row holds ``timeID`` (the hour number 0-23, cast to int),
    ``startTime`` (``HH:00``) and ``endTime`` (``HH:59``).

    Side effects:
        Overwrites the ``dim_time`` table on every call.
    """
    from pyspark.sql.functions import col

    # One record per hour: the hour number plus its first and last minute as HH:MM text.
    hourly_rows = [
        {"timeID": hr, "startTime": f"{hr:02d}:00", "endTime": f"{hr:02d}:59"}
        for hr in range(24)
    ]

    # Cast timeID explicitly so the stored column is an int.
    hours_df = spark.createDataFrame(hourly_rows).withColumn('timeID', col('timeID').cast('int'))
    hours_df.write.format('delta').mode('overwrite').saveAsTable('dim_time')