In [0]:
%run ./utilityFunctions

In [0]:
# Mount Storage
# The mount point stays in its own module-level name: later cells build the
# output paths from it and pass it to unmountStorage.
mountPoint = "/mnt/weathernowFiles"

# Storage location plus the secret scope and the key names handed to mountStorage.
# NOTE(review): the parameter clientIDKeyName receives the key name "ClientSecret";
# confirm this matches what mountStorage expects.
mountSettings = dict(
    accountName="weathernowstorage",
    containerName="weathernowdatastore",
    folderName="weatherNowData",
    scopeName="weatherNowKeyVault",
    applicationIDKeyName="ApplicationID",
    clientIDKeyName="ClientSecret",
    tenantIDKeyName="TenantID",
)
mountStorage(mountPoint=mountPoint, **mountSettings)

In [0]:
# Imports and helper functions for the bronze -> silver transformations
from pyspark.sql.functions import col, explode, from_unixtime
import re

def addLocalTimeToCurrentDf(dataFrame):
    """Add ``*_local`` time columns to a "current" weather DataFrame.

    For each of ``dt``, ``sunrise`` and ``sunset`` a ``<name>_local`` column is
    added holding the original value plus ``timezone_offset``; the
    ``timezone_offset`` column is then dropped.

    Args:
        dataFrame: DataFrame containing ``dt``, ``sunrise``, ``sunset`` and
            ``timezone_offset`` (presumably epoch seconds and an offset in
            seconds; confirm against the bronze schema).

    Returns:
        The DataFrame with the three ``*_local`` columns added and
        ``timezone_offset`` removed.

    Raises:
        AssertionError: if a required column is missing; the message lists
            the missing columns.
    """
    requiredColumns = ["dt", "sunrise", "sunset", "timezone_offset"]
    presentColumns = dataFrame.columns
    missingColumns = [name for name in requiredColumns if name not in presentColumns]
    assert not missingColumns, (
        "addLocalTimeToCurrentDf: missing required columns: " + ", ".join(missingColumns)
    )

    offset = col("timezone_offset")
    # Same column order as before: dt_local, sunrise_local, sunset_local.
    for name in ("dt", "sunrise", "sunset"):
        dataFrame = dataFrame.withColumn(name + "_local", col(name) + offset)
    return dataFrame.drop(offset)


def addLocalTimeToHourlyDf(dataFrame):
    """Add a ``dt_local`` column to an "hourly" weather DataFrame.

    ``dt_local`` is ``dt`` plus ``timezone_offset``; the ``timezone_offset``
    column is dropped afterwards.

    Args:
        dataFrame: DataFrame containing ``dt`` and ``timezone_offset``
            (presumably epoch seconds and an offset in seconds; confirm
            against the bronze schema).

    Returns:
        The DataFrame with ``dt_local`` added and ``timezone_offset`` removed.

    Raises:
        AssertionError: if a required column is missing; the message lists
            the missing columns.
    """
    requiredColumns = ["dt", "timezone_offset"]
    presentColumns = dataFrame.columns
    missingColumns = [name for name in requiredColumns if name not in presentColumns]
    assert not missingColumns, (
        "addLocalTimeToHourlyDf: missing required columns: " + ", ".join(missingColumns)
    )

    offset = col("timezone_offset")
    return dataFrame.withColumn("dt_local", col("dt") + offset).drop(offset)


def addLocalTimeToDailyDf(dataFrame):
    """Add ``*_local`` time columns to a "daily" weather DataFrame.

    For each of ``dt``, ``moonrise``, ``moonset``, ``sunrise`` and ``sunset`` a
    ``<name>_local`` column is added holding the original value plus
    ``timezone_offset``; the ``timezone_offset`` column is then dropped.

    Args:
        dataFrame: DataFrame containing the five time columns and
            ``timezone_offset`` (presumably epoch seconds and an offset in
            seconds; confirm against the bronze schema).

    Returns:
        The DataFrame with the five ``*_local`` columns added and
        ``timezone_offset`` removed.

    Raises:
        AssertionError: if a required column is missing; the message lists
            the missing columns.
    """
    timeColumns = ["dt", "moonrise", "moonset", "sunrise", "sunset"]
    requiredColumns = timeColumns + ["timezone_offset"]
    presentColumns = dataFrame.columns
    missingColumns = [name for name in requiredColumns if name not in presentColumns]
    assert not missingColumns, (
        "addLocalTimeToDailyDf: missing required columns: " + ", ".join(missingColumns)
    )

    offset = col("timezone_offset")
    # Columns are appended in the same order as the original chained calls.
    for name in timeColumns:
        dataFrame = dataFrame.withColumn(name + "_local", col(name) + offset)
    return dataFrame.drop(offset)


def convertTimeUTCToHuman(dataFrame, columnName):
    """Replace ``columnName`` with its ``from_unixtime`` string rendering.

    Args:
        dataFrame: DataFrame holding the column to convert.
        columnName: name of the column passed to ``from_unixtime``; the
            column is overwritten in place under the same name.

    Returns:
        The DataFrame with ``columnName`` formatted as ``yyyy-MM-dd HH:00``.
    """
    # NOTE(review): the ":00" in the pattern is literal text, so the minute
    # part of every timestamp (including sunrise/sunset) is always "00".
    # NOTE(review): from_unixtime renders in the Spark session time zone;
    # confirm the session is configured as UTC.
    humanReadable = from_unixtime(columnName, "yyyy-MM-dd HH:00")
    converted = dataFrame.withColumn(columnName, humanReadable)
    return converted


def convertTimeDfUTCToHumanCurrent(dataFrame):
    """Convert every time column of a "current" weather DataFrame to text.

    Each of ``dt``, ``sunrise``, ``sunset`` and their ``*_local`` counterparts
    is passed through ``convertTimeUTCToHuman``.

    Args:
        dataFrame: DataFrame that already contains the ``*_local`` columns.

    Returns:
        The DataFrame with the six time columns reformatted.

    Raises:
        AssertionError: if a required column is missing; the message lists
            the missing columns.
    """
    timeColumns = ["dt", "dt_local", "sunrise", "sunrise_local", "sunset", "sunset_local"]
    presentColumns = dataFrame.columns
    missingColumns = [name for name in timeColumns if name not in presentColumns]
    assert not missingColumns, (
        "convertTimeDfUTCToHumanCurrent: missing required columns: " + ", ".join(missingColumns)
    )

    # One list drives both the validation and the conversion so they cannot drift apart.
    for name in timeColumns:
        dataFrame = convertTimeUTCToHuman(dataFrame, name)
    return dataFrame


def convertTimeDfUTCToHumanHourly(dataFrame):
    """Convert the time columns of an "hourly" weather DataFrame to text.

    ``dt`` and ``dt_local`` are each passed through ``convertTimeUTCToHuman``.

    Args:
        dataFrame: DataFrame that already contains ``dt_local``.

    Returns:
        The DataFrame with both time columns reformatted.

    Raises:
        AssertionError: if a required column is missing; the message lists
            the missing columns.
    """
    timeColumns = ["dt", "dt_local"]
    presentColumns = dataFrame.columns
    missingColumns = [name for name in timeColumns if name not in presentColumns]
    assert not missingColumns, (
        "convertTimeDfUTCToHumanHourly: missing required columns: " + ", ".join(missingColumns)
    )

    for name in timeColumns:
        dataFrame = convertTimeUTCToHuman(dataFrame, name)
    return dataFrame


def convertTimeDfUTCToHumanDaily(dataFrame):
    """Convert every time column of a "daily" weather DataFrame to text.

    Each of ``dt``, ``moonrise``, ``moonset``, ``sunrise``, ``sunset`` and
    their ``*_local`` counterparts is passed through ``convertTimeUTCToHuman``.

    Args:
        dataFrame: DataFrame that already contains the ``*_local`` columns.

    Returns:
        The DataFrame with the ten time columns reformatted.

    Raises:
        AssertionError: if a required column is missing; the message lists
            the missing columns.
    """
    timeColumns = [
        "dt", "dt_local",
        "moonrise", "moonrise_local",
        "moonset", "moonset_local",
        "sunrise", "sunrise_local",
        "sunset", "sunset_local",
    ]
    presentColumns = dataFrame.columns
    missingColumns = [name for name in timeColumns if name not in presentColumns]
    assert not missingColumns, (
        "convertTimeDfUTCToHumanDaily: missing required columns: " + ", ".join(missingColumns)
    )

    # One list drives both the validation and the conversion so they cannot drift apart.
    for name in timeColumns:
        dataFrame = convertTimeUTCToHuman(dataFrame, name)
    return dataFrame


def createRainTable(dataFrame):
    """Build the rain table from a fact DataFrame.

    Rows whose ``rain`` value is null are dropped, the fields of ``rain`` are
    expanded next to the identifying columns, and the ``1h`` field is renamed
    to ``rain_last_hour``.

    Args:
        dataFrame: DataFrame with the identifying columns and a ``rain`` column.

    Returns:
        A DataFrame with the identifying columns plus the expanded rain fields.
    """
    keyColumns = ("capital", "continent", "country", "time_data_queried_utc", "dt", "dt_local")
    rainyRows = dataFrame.na.drop(subset=["rain"])
    rainTable = rainyRows.select(*keyColumns, "rain.*")
    return rainTable.withColumnRenamed("1h", "rain_last_hour")

  
def createSnowTable(dataFrame):
    """Build the snow table from a fact DataFrame.

    Rows whose ``snow`` value is null are dropped, the fields of ``snow`` are
    expanded next to the identifying columns, and the ``1h`` field is renamed
    to ``snow_last_hour``.

    Args:
        dataFrame: DataFrame with the identifying columns and a ``snow`` column.

    Returns:
        A DataFrame with the identifying columns plus the expanded snow fields.
    """
    keyColumns = ("capital", "continent", "country", "time_data_queried_utc", "dt", "dt_local")
    snowyRows = dataFrame.na.drop(subset=["snow"])
    snowTable = snowyRows.select(*keyColumns, "snow.*")
    return snowTable.withColumnRenamed("1h", "snow_last_hour")
    
    
def createWeatherTable(dataFrame):
    """Build the weather-description table from a fact DataFrame.

    The ``weather`` column is exploded into one row per element, the element's
    fields are expanded next to the identifying columns, and ``icon``, ``id``
    and ``main`` are renamed to ``weather_icon``, ``weather_description_id``
    and ``description_short``.

    Args:
        dataFrame: DataFrame with the identifying columns and a ``weather``
            column that ``explode`` can expand.

    Returns:
        A DataFrame with one row per weather entry.
    """
    # Unlike the rain/snow/temp tables, "dt" is not carried over here.
    keyColumns = ("capital", "continent", "country", "time_data_queried_utc", "dt_local")
    renames = (
        ("icon", "weather_icon"),
        ("id", "weather_description_id"),
        ("main", "description_short"),
    )

    oneRowPerEntry = dataFrame.select(*keyColumns, explode("weather").alias("weather"))
    weatherTable = oneRowPerEntry.select(*keyColumns, "weather.*")
    for oldName, newName in renames:
        weatherTable = weatherTable.withColumnRenamed(oldName, newName)
    return weatherTable


def createFeelsLikeTable(dataFrame):
    """Build the feels-like table from a daily fact DataFrame.

    The fields of ``feels_like`` are expanded next to the identifying columns
    and ``day``, ``eve``, ``morn`` and ``night`` get a ``_temp`` suffix.

    Args:
        dataFrame: DataFrame with the identifying columns and a ``feels_like``
            column.

    Returns:
        A DataFrame with the identifying columns plus the renamed fields.
    """
    keyColumns = ("capital", "continent", "country", "time_data_queried_utc", "dt", "dt_local")
    renames = (
        ("day", "day_temp"),
        ("eve", "eve_temp"),
        ("morn", "morn_temp"),
        ("night", "night_temp"),
    )

    feelsLikeTable = dataFrame.select(*keyColumns, "feels_like.*")
    for oldName, newName in renames:
        feelsLikeTable = feelsLikeTable.withColumnRenamed(oldName, newName)
    return feelsLikeTable


def createTempTable(dataFrame):
    """Build the temperature table from a daily fact DataFrame.

    The fields of ``temp`` are expanded next to the identifying columns and
    ``day``, ``eve``, ``morn``, ``night``, ``max`` and ``min`` get a ``_temp``
    suffix.

    Args:
        dataFrame: DataFrame with the identifying columns and a ``temp`` column.

    Returns:
        A DataFrame with the identifying columns plus the renamed fields.
    """
    keyColumns = ("capital", "continent", "country", "time_data_queried_utc", "dt", "dt_local")
    renames = (
        ("day", "day_temp"),
        ("eve", "eve_temp"),
        ("morn", "morn_temp"),
        ("night", "night_temp"),
        ("max", "max_temp"),
        ("min", "min_temp"),
    )

    tempTable = dataFrame.select(*keyColumns, "temp.*")
    for oldName, newName in renames:
        tempTable = tempTable.withColumnRenamed(oldName, newName)
    return tempTable


In [0]:
# Bronze -> silver: for each feed (current / hourly / daily) load every matching
# bronze table, add local times, format the time columns, derive the side
# tables, union everything per feed and append the results to the silver database.
listBronzeTables = spark.catalog.listTables("bronze")


def _bronzeTableNames(prefix):
    """Return the names of the listed bronze tables that start with ``prefix``."""
    # A plain prefix test replaces re.match("current*", ...): read as a regex,
    # that pattern means "curren" followed by any number of "t", which is not
    # the intended "current<anything>" glob.
    return [table.name for table in listBronzeTables if table.name.startswith(prefix)]


def _withSortedColumns(dataFrame):
    """Select the columns alphabetically so the positional unions below line up."""
    return dataFrame.select(sorted(dataFrame.columns))


def _buildSilverFrames(tableNames, feedLabel, addLocalTime, convertTime, tableBuilders):
    """Load, normalise and union all bronze tables of one feed.

    Args:
        tableNames: names of the bronze tables belonging to the feed.
        feedLabel: feed name, used only in the error message.
        addLocalTime: function adding the ``*_local`` columns for this feed.
        convertTime: function formatting the feed's time columns.
        tableBuilders: functions that each derive a side table from the
            normalised fact DataFrame.

    Returns:
        ``(factFrame, derivedFrames)`` where ``derivedFrames`` holds one
        unioned DataFrame per entry of ``tableBuilders``, in the same order.

    Raises:
        ValueError: if ``tableNames`` is empty.
    """
    if not tableNames:
        # Previously this surfaced as an opaque IndexError on names[0].
        raise ValueError(
            "No bronze tables found for the '" + feedLabel + "' feed; nothing to load into silver."
        )

    factFrame = None
    derivedFrames = []
    for tableName in tableNames:
        table = _withSortedColumns(convertTime(addLocalTime(spark.table("bronze." + tableName))))
        derived = [_withSortedColumns(build(table)) for build in tableBuilders]
        if factFrame is None:
            factFrame, derivedFrames = table, derived
        else:
            factFrame = factFrame.union(table)
            derivedFrames = [
                accumulated.union(new) for accumulated, new in zip(derivedFrames, derived)
            ]
    return factFrame, derivedFrames


nameCurrentBronzeTables = _bronzeTableNames("current")
nameHourlyBronzeTables = _bronzeTableNames("hourly")
nameDailyBronzeTables = _bronzeTableNames("daily")

initCurrentTable, (initCurrentWeather,) = _buildSilverFrames(
    nameCurrentBronzeTables,
    "current",
    addLocalTimeToCurrentDf,
    convertTimeDfUTCToHumanCurrent,
    [createWeatherTable],
)
initHourlyTable, (initHourlyWeather,) = _buildSilverFrames(
    nameHourlyBronzeTables,
    "hourly",
    addLocalTimeToHourlyDf,
    convertTimeDfUTCToHumanHourly,
    [createWeatherTable],
)
initDailyTable, (initDailyTemp, initDailyFeelsLike, initDailyWeather) = _buildSilverFrames(
    nameDailyBronzeTables,
    "daily",
    addLocalTimeToDailyDf,
    convertTimeDfUTCToHumanDaily,
    [createTempTable, createFeelsLikeTable, createWeatherTable],
)

databaseName = "silver"

# Written in the same order as before; every table uses "append" together with
# the "time_data_queried_utc" argument.
silverTables = [
    (initCurrentTable, "currentfacttable"),
    (initCurrentWeather, "currentweatherfacttable"),
    (initHourlyTable, "hourlyfacttable"),
    (initHourlyWeather, "hourlyweatherfacttable"),
    (initDailyTable, "dailyfacttable"),
    (initDailyTemp, "dailytempfacttable"),
    (initDailyFeelsLike, "dailyfeelslikefacttable"),
    (initDailyWeather, "dailyweatherfacttable"),
]
for silverFrame, silverTableName in silverTables:
    writeDfToDeltaTable(silverFrame, databaseName, silverTableName, "append", "time_data_queried_utc")

In [0]:
# Output folder layout: <mountPoint>/<databaseName>/<year>/<month>/<day>/
currentYear = getCurrentYearUTC()
currentMonth = getCurrentMonthUTC()
currentDay = getCurrentDayUTC()
storagePath = "/".join([mountPoint, databaseName, currentYear, currentMonth, currentDay]) + "/"

# One sub-folder per silver table.
currentFactTablePath = storagePath + "currentfacttable"
currentWeatherFactTablePath = storagePath + "currentweatherfacttable"
hourlyFactTablePath = storagePath + "hourlyfacttable"
hourlyWeatherFactTablePath = storagePath + "hourlyweatherfacttable"
dailyFactTablePath = storagePath + "dailyfacttable"
dailyFeelsLikeFactTablePath = storagePath + "dailyfeelslikefacttable"
dailyTempFactTablePath = storagePath + "dailytempfacttable"
dailyWeatherFactTablePath = storagePath + "dailyweatherfacttable"

# The fact tables are written without their nested columns (weather,
# feels_like, temp); those are written through their own derived tables.
storageWrites = [
    (initCurrentTable.drop("weather"), currentFactTablePath),
    (initCurrentWeather, currentWeatherFactTablePath),
    (initHourlyTable.drop("weather"), hourlyFactTablePath),
    (initHourlyWeather, hourlyWeatherFactTablePath),
    (initDailyTable.drop("feels_like", "weather", "temp"), dailyFactTablePath),
    (initDailyFeelsLike, dailyFeelsLikeFactTablePath),
    (initDailyTemp, dailyTempFactTablePath),
    (initDailyWeather, dailyWeatherFactTablePath),
]
for frameToWrite, targetPath in storageWrites:
    writeDfToStorage(frameToWrite, targetPath)

In [0]:
# Release the mount point that was mounted at the top of the notebook.
unmountStorage(mountPoint)

In [0]:
# NOTE(review): presumably removes the bronze tables now that their contents
# have been appended to silver; the helper is not defined in this notebook,
# so confirm its behaviour.
clearBronzeDatabase()