##### Description:
Transform the weather-data in the Bronze layer by converting the complex-type (struct and array) values in the source into a tabular format, and load the result into the Silver layer. Store the final output as a Delta Lake table in the silver schema.

Source File Path: `abfss://bronze@<datalakestorageaccountname>.dfs.core.windows.net/weather-data/`

##### Input Columns:
- daily: struct
- daily_units: struct
- latitude: double
- longitude: double
- marketName: string

##### Output Columns:
marketName, weatherDate, unitOfTemparature, maximumTemparature, minimumTemparature, unitOfRainFall, rainFall, latitude, longitude

##### Output table name : weather_data_silver

##### Transformation Rules:
Extract weatherDate values from the source column daily.time

Extract maximumTemparature values from the source column daily.temperature_2m_max

Extract minimumTemparature values from the source column daily.temperature_2m_min

Extract rainFall values from the source column daily.rain_sum

Extract unitOfTemparature and unitOfRainFall values from the source columns daily_units.temperature_2m_max and daily_units.rain_sum, respectively
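Taken together, these rules pair up the parallel arrays inside `daily` by position. The scalar fields (market, coordinates, units) are then repeated on every resulting row.

The plain-Python sketch below models that mapping on one record. The record values and the helper `flatten_weather_record` are hypothetical, and the record shape is assumed from the Input Columns above.

```python
# Hypothetical bronze record shaped like the Input Columns (values are invented)
sample_record = {
    "marketName": "SampleMarket",
    "latitude": 12.5,
    "longitude": 77.25,
    "daily": {
        "time": ["2024-01-01", "2024-01-02"],
        "temperature_2m_max": [31.0, 30.4],
        "temperature_2m_min": [19.2, 18.9],
        "rain_sum": [0.0, 2.3],
    },
    "daily_units": {"temperature_2m_max": "°C", "rain_sum": "mm"},
}


def flatten_weather_record(record):
    """Turn one record of parallel daily arrays into one row per day."""
    daily = record["daily"]
    units = record["daily_units"]
    rows = []
    # zip pairs the i-th element of every array, i.e. the values for the same day
    for day, t_max, t_min, rain in zip(
        daily["time"],
        daily["temperature_2m_max"],
        daily["temperature_2m_min"],
        daily["rain_sum"],
    ):
        # Keys follow the documented Output Columns order
        rows.append({
            "marketName": record["marketName"],
            "weatherDate": day,
            "unitOfTemparature": units["temperature_2m_max"],
            "maximumTemparature": t_max,
            "minimumTemparature": t_min,
            "unitOfRainFall": units["rain_sum"],
            "rainFall": rain,
            "latitude": record["latitude"],
            "longitude": record["longitude"],
        })
    return rows


for row in flatten_weather_record(sample_record):
    print(row)
```

Note that `zip` stops at the shortest array, so a record whose arrays differ in length would silently lose days. In Spark, `arrays_zip` followed by a single `explode` is one way to express the same position-wise pairing without separate joins.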

- <a href="https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.join.html#pyspark.sql.DataFrame.join" target="_blank">**DataFrame Joins** </a>

##### Step 1: Define the Variables to Read the weather-data Ingested in the Bronze Layer

1. Replace `<datalakestorageaccountname>` with the name of the ADLS account created in your subscription


In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
weatherDataSourceLayerName = 'bronze'
weatherDataSourceStorageAccountName = 'madhupavanadls'
weatherDataSourceFolderName = 'weather-data'

weatherDataSourceFolderPath = f"abfss://{weatherDataSourceLayerName}@{weatherDataSourceStorageAccountName}.dfs.core.windows.net/{weatherDataSourceFolderName}"
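The variables above follow the ADLS Gen2 URI pattern `abfss://<container>@<account>.dfs.core.windows.net/<path>`.

The small sketch below wraps the same f-string composition in a reusable form, for example to build a silver-container path the same way. The helper `build_abfss_path` is hypothetical.

```python
def build_abfss_path(container, storage_account, folder):
    """Compose an ADLS Gen2 URI: abfss://<container>@<account>.dfs.core.windows.net/<folder>."""
    return f"abfss://{container}@{storage_account}.dfs.core.windows.net/{folder}"


# '<datalakestorageaccountname>' is the Step 1 placeholder; substitute your account name
print(build_abfss_path("bronze", "<datalakestorageaccountname>", "weather-data"))
```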

##### Step 2: Create a Spark DataFrame for the weather-data Stored as JSON in the Bronze Layer

1. Name the Spark DataFrame variable weatherDataBronzeDF
1. Use the spark.read.json method to read the source data path held in the variable weatherDataSourceFolderPath (the cell below reads it with spark.read.format("delta").load instead, which assumes the bronze ingestion wrote Delta files)
1. Include display for the converted Spark DataFrame variables to view the DataFrame columns and data for further processing
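Selecting `col('daily.time')` reaches into the `daily` struct, much as a dotted path walks nested JSON objects.

The sketch below models that lookup in plain Python on one hypothetical bronze line. `get_nested` and `raw_line` are illustrative names, not notebook variables.

By default `spark.read.json` expects JSON Lines (one object per line). Files holding a single multi-line JSON document need `.option("multiLine", True)`.

```python
import json

# One hypothetical line of the bronze JSON (values invented for illustration)
raw_line = '{"marketName": "SampleMarket", "latitude": 12.5, "longitude": 77.25, "daily": {"time": ["2024-01-01", "2024-01-02"], "rain_sum": [0.0, 2.3]}, "daily_units": {"rain_sum": "mm"}}'


def get_nested(record, dotted_path):
    """Follow a dotted path such as 'daily.time' through nested dicts,
    similar to how col('daily.time') selects a field inside a struct column."""
    value = record
    for key in dotted_path.split("."):
        value = value[key]
    return value


record = json.loads(raw_line)
print(get_nested(record, "daily.time"))
print(get_nested(record, "daily_units.rain_sum"))
```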


In [0]:
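# checkpoint() needs a checkpoint directory, so set it before the first call
spark.sparkContext.setCheckpointDir("/tmp/spark_checkpoints")
weatherDataBronzeDF = spark.read.format("delta").load(weatherDataSourceFolderPath)
# Keep only records that have a market name
weatherDataBronzeDF_main = weatherDataBronzeDF.filter(length(col('market_name')) > 0)
# Materialize the data so every DataFrame derived below sees the same partitions (monotonically_increasing_id() depends on them)
original_df = weatherDataBronzeDF_main.checkpoint()

# One row per day, with market attributes and units repeated on every row
first_split = original_df.select(
    col('market_name'), col('latitude'), col('longitude'),
    explode(col('daily.time')).alias('weather_date'),
    col('daily_units.rain_sum').alias('unit_of_rainfall'),
    col('daily_units.temperature_2m_max').alias('unit_of_temperature'),
    monotonically_increasing_id().alias('sequenceId'),
).cache()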

                           
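The join later in the notebook pairs rows only by these generated IDs, so it is worth seeing what they encode. The PySpark documentation describes the current implementation of `monotonically_increasing_id()` this way:

- the partition ID goes in the upper 31 bits;
- the record number within the partition goes in the lower 33 bits.

The plain-Python model below assumes that layout; the helpers `monotonic_ids` and `explode_partitions` are hypothetical. It shows why IDs from separately exploded DataFrames match only when two conditions hold:

- they share the same partitioning;
- per-record array lengths are equal.

Checkpointing `original_df` is meant to secure the first condition.

```python
def monotonic_ids(partitions):
    """Model monotonically_increasing_id(): partition index in the upper bits,
    row position within the partition in the lower 33 bits."""
    ids = []
    for partition_id, rows in enumerate(partitions):
        for row_index, _ in enumerate(rows):
            ids.append((partition_id << 33) + row_index)
    return ids


def explode_partitions(partitions, field):
    """Model explode(): each input row yields one output row per array element,
    staying in the same partition."""
    return [[value for record in part for value in record[field]] for part in partitions]


# Hypothetical checkpointed input: two partitions, one market record each
partitions = [
    [{"time": ["d1", "d2"], "t_max": [30.0, 31.0]}],
    [{"time": ["d1", "d2"], "t_max": [25.0, 26.0]}],
]

date_ids = monotonic_ids(explode_partitions(partitions, "time"))
tmax_ids = monotonic_ids(explode_partitions(partitions, "t_max"))
print(date_ids)              # [0, 1, 8589934592, 8589934593]
print(date_ids == tmax_ids)  # True: same partitioning and array lengths

# With the same records laid out in a single partition, the IDs come out differently
repartitioned = [partitions[0] + partitions[1]]
print(monotonic_ids(explode_partitions(repartitioned, "t_max")))  # [0, 1, 2, 3]
```

The documentation also notes that the IDs are unique and increasing but not consecutive, so they should not be treated as row numbers.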

In [0]:
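# Explode daily maximum temperatures; IDs line up with first_split's sequenceId only because both read the same checkpointed original_df
temperature_2m_max_second_split = original_df.select(explode(col('daily.temperature_2m_max')).alias('maximumTemparature'), monotonically_increasing_id().alias('temperature_2m_max_sequenceId')).cache()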

In [0]:
# Explode daily minimum temperatures with a matching generated ID
temperature_2m_min_second_split = original_df.select(explode(col('daily.temperature_2m_min')).alias('minimumTemparature'), monotonically_increasing_id().alias('temperature_2m_min_sequenceId')).cache()

##### Step 3: Convert Weather Date Values in ARRAY Format to ROWS Using Explode

1. Import all functions from the pyspark.sql.functions package
1. Name the new Spark DataFrame variable weatherDataDailyDateTransDF
1. Use the DataFrame select method to select the columns given below from the source Spark DataFrame variable weatherDataBronzeDF
1. The first selected column is "daily.time": apply the explode function to it and alias the exploded values column as "weatherDate"
1. Alongside the exploded column, select the columns "marketName", "latitude" and "longitude" from the source Spark DataFrame
1. The last column in the select is a running sequence id generated by the Spark function monotonically_increasing_id(), aliased as 'sequenceId'
1. Include display for the converted Spark DataFrame variables to view the DataFrame columns and data for further processing
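A sturdier alternative to a global `sequenceId` is each element's position inside its own array. This is a swapped-in technique, not the approach the steps prescribe.

`pyspark.sql.functions.posexplode` returns that position in a `pos` column next to the exploded `col` value. Joining on the market columns plus `pos` then pairs the same day across arrays regardless of partitioning.

The plain-Python model below mimics those semantics with `enumerate`. The helper `posexplode_rows` and the sample records are hypothetical.

```python
def posexplode_rows(records, key_fields, array_field, alias):
    """Model posexplode(): one row per array element, carrying its position 'pos'."""
    rows = []
    for record in records:
        for pos, value in enumerate(record[array_field]):
            row = {k: record[k] for k in key_fields}
            row["pos"] = pos
            row[alias] = value
            rows.append(row)
    return rows


records = [
    {"marketName": "A", "time": ["2024-01-01", "2024-01-02"], "rain_sum": [0.0, 1.5]},
    {"marketName": "B", "time": ["2024-01-01", "2024-01-02"], "rain_sum": [4.0, 0.2]},
]

dates = posexplode_rows(records, ["marketName"], "time", "weatherDate")
rain = posexplode_rows(records, ["marketName"], "rain_sum", "rainFall")

# Join on (marketName, pos): the position within the array, not a global row id
rain_by_key = {(r["marketName"], r["pos"]): r["rainFall"] for r in rain}
joined = [dict(d, rainFall=rain_by_key[(d["marketName"], d["pos"])]) for d in dates]
for row in joined:
    print(row)
```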


In [0]:
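# Explode daily rainfall (rain_sum); despite the variable name, this DataFrame holds rainfall, not minimum temperature
temperature_2m_min_third_split = original_df.select(explode(col('daily.rain_sum')).alias('rainfall'), monotonically_increasing_id().alias('rainfall_sequenceId')).cache()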


In [0]:
spark.sparkContext.setCheckpointDir("/tmp/spark_checkpoints")

In [0]:
final_df = first_split.join(
    temperature_2m_max_second_split,
    first_split['sequenceId'] == temperature_2m_max_second_split['temperature_2m_max_sequenceId'],
    'inner'
).join(
    temperature_2m_min_second_split,
    first_split['sequenceId'] == temperature_2m_min_second_split['temperature_2m_min_sequenceId'],
    'inner'
).join(
    temperature_2m_min_third_split,
    first_split['sequenceId'] == temperature_2m_min_third_split['rainfall_sequenceId'],
    'inner'
).select(
    col('market_name'), col('latitude'), col('longitude'), col('weather_date'),
    col('unit_of_rainfall'), col('unit_of_temperature'),
    col('maximumTemparature'), col('minimumTemparature'), col('rainfall'),
)

final_data = final_df.checkpoint()



In [0]:
final_data = final_data.withColumn('weather_date', to_date(col('weather_date'),'yyyy-MM-dd'))
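The Spark pattern `'yyyy-MM-dd'` used with `to_date` corresponds to Python's `'%Y-%m-%d'`.

The sketch below models the conversion with a hypothetical helper, `to_date_like`. It returns `None` for unparseable input, mirroring the null that `to_date` typically produces in Spark's default non-ANSI mode. With ANSI mode enabled, Spark may raise an error instead.

```python
from datetime import datetime


def to_date_like(value, fmt="%Y-%m-%d"):
    """Parse 'yyyy-MM-dd' strings into dates; return None on failure,
    mirroring to_date's null result for unparseable input when ANSI mode is off."""
    try:
        return datetime.strptime(value, fmt).date()
    except (TypeError, ValueError):
        return None


print(to_date_like("2024-01-31"))  # 2024-01-31
print(to_date_like("31/01/2024"))  # None
```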

In [0]:
final_data.write.format('delta').mode('overwrite').saveAsTable('uc_prod.silver.weather_data_silver')

##### Step 4: Convert Maximum Temperature Values in ARRAY Format to ROWS Using Explode

1. Name the new Spark DataFrame variable weatherDataMaxTemparatureTransDF
1. Use the DataFrame select method to select the columns given below from the source Spark DataFrame variable weatherDataBronzeDF
1. The first selected column is "daily.temperature_2m_max": apply the explode function to it and alias the exploded values column as "maximumTemparature"
1. Alongside the exploded column, select the columns "marketName", "latitude" and "longitude" from the source Spark DataFrame
1. Add a running sequence id generated by the Spark function monotonically_increasing_id(), aliased as 'sequenceId'
1. Add one more column from the source Spark DataFrame, "daily_units.temperature_2m_max", aliased as "unitOfTemparature"
1. Include display for the converted Spark DataFrame variables to view the DataFrame columns and data for further processing

##### Step 5: Convert Minimum Temperature Values in ARRAY Format to ROWS Using Explode

1. Name the new Spark DataFrame variable weatherDataMinTemparatureTransDF
1. Use the DataFrame select method to select the columns given below from the source Spark DataFrame variable weatherDataBronzeDF
1. The first selected column is "daily.temperature_2m_min": apply the explode function to it and alias the exploded values column as "minimumTemparature"
1. Alongside the exploded column, select the columns "marketName", "latitude" and "longitude" from the source Spark DataFrame
1. The last column in the select is a running sequence id generated by the Spark function monotonically_increasing_id(), aliased as 'sequenceId'
1. Include display for the converted Spark DataFrame variables to view the DataFrame columns and data for further processing

##### Step 6: Convert Rainfall Values in ARRAY Format to ROWS Using Explode

1. Name the new Spark DataFrame variable weatherDataRainFallTransDF
1. Use the DataFrame select method to select the columns given below from the source Spark DataFrame variable weatherDataBronzeDF
1. The first selected column is "daily.rain_sum": apply the explode function to it and alias the exploded values column as "rainFall"
1. Alongside the exploded column, select the columns "marketName", "latitude" and "longitude" from the source Spark DataFrame
1. Add a running sequence id generated by the Spark function monotonically_increasing_id(), aliased as 'sequenceId'
1. Add one more column from the source Spark DataFrame, "daily_units.rain_sum", aliased as "unitOfRainFall"
1. Include display for the converted Spark DataFrame variables to view the DataFrame columns and data for further processing
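Steps 3 to 6 explode four arrays independently and then rely on their rows lining up. That assumes every record's `daily` arrays have the same length, so a quick check before exploding can surface records that would break it.

The plain-Python sketch below reports such records. The helper `mismatched_records` and the sample data are hypothetical. In Spark, the same idea could compare `size(col('daily.time'))` with the sizes of the other array columns.

```python
DAILY_FIELDS = ("time", "temperature_2m_max", "temperature_2m_min", "rain_sum")


def mismatched_records(records, fields=DAILY_FIELDS):
    """Return (marketName, {field: length}) for records whose daily arrays differ in length."""
    problems = []
    for record in records:
        lengths = {f: len(record["daily"][f]) for f in fields}
        if len(set(lengths.values())) > 1:
            problems.append((record["marketName"], lengths))
    return problems


records = [
    {"marketName": "A", "daily": {"time": ["d1", "d2"], "temperature_2m_max": [30, 31],
                                  "temperature_2m_min": [20, 21], "rain_sum": [0, 1]}},
    {"marketName": "B", "daily": {"time": ["d1", "d2"], "temperature_2m_max": [28],
                                  "temperature_2m_min": [18, 19], "rain_sum": [2, 0]}},
]
print(mismatched_records(records))
```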

##### Step 7: Join All Intermediate Dataframes To Merge All Data & Write Into Silver Layer

1. Name the new Spark DataFrame variable weatherDataTransDF
1. Join weatherDataDailyDateTransDF with weatherDataMaxTemparatureTransDF using the join columns ['marketName','latitude','longitude','sequenceId']
1. Join the result with weatherDataMinTemparatureTransDF using the same join columns
1. Join that result with weatherDataRainFallTransDF using the same join columns
1. Select the columns "marketName", "weatherDate", "unitOfTemparature", "maximumTemparature", "minimumTemparature", "unitOfRainFall", "rainFall", "latitude" and "longitude" as the final output columns for the silver layer
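These join steps can be modelled in plain Python to show two properties of the semantics:

- an inner join on a list of key column names keeps one copy of each key column;
- rows without a match on all four keys are dropped.

`inner_join` and the sample rows are hypothetical. In PySpark the corresponding call would be along the lines of `weatherDataDailyDateTransDF.join(weatherDataMaxTemparatureTransDF, ['marketName','latitude','longitude','sequenceId'], 'inner')`.

```python
def inner_join(left, right, keys):
    """Inner-join two lists of dict rows on the given key columns.
    Like joining on a list of column names, key columns appear only once."""
    index = {}
    for row in right:
        index.setdefault(tuple(row[k] for k in keys), []).append(row)
    joined = []
    for row in left:
        for match in index.get(tuple(row[k] for k in keys), []):
            merged = dict(row)
            merged.update({c: v for c, v in match.items() if c not in keys})
            joined.append(merged)
    return joined


keys = ["marketName", "latitude", "longitude", "sequenceId"]
base = {"marketName": "A", "latitude": 1.0, "longitude": 2.0}
dates = [dict(base, sequenceId=0, weatherDate="2024-01-01")]
t_max = [dict(base, sequenceId=0, maximumTemparature=30.0, unitOfTemparature="°C")]
t_min = [dict(base, sequenceId=0, minimumTemparature=20.0)]
rain = [dict(base, sequenceId=0, rainFall=1.2, unitOfRainFall="mm")]

result = inner_join(inner_join(inner_join(dates, t_max, keys), t_min, keys), rain, keys)
output_columns = ["marketName", "weatherDate", "unitOfTemparature", "maximumTemparature",
                  "minimumTemparature", "unitOfRainFall", "rainFall", "latitude", "longitude"]
# Final select: keep only the documented output columns, in order
final_rows = [{c: row[c] for c in output_columns} for row in result]
print(final_rows)
```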

##### Step 8: Write the Final Transformed DataFrame into the Silver Layer as a Delta Table

1. Write the final Spark DataFrame weatherDataTransDF through its DataFrame.write property (weatherDataTransDF.write)
1. Use overwrite as the write mode
1. Write the data into the Delta table "pricing_analytics.silver.weather_data_silver" using the saveAsTable method

##### Step 9: Test the Data Stored in the Transformed Silver Layer Table
1. Write a SELECT query to read the data from the pricing_analytics.silver.weather_data_silver table
1. Check that the data for any one market matches the source data in its complex JSON format
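One way to make the Step 9 check concrete: for a chosen market, the silver rows, ordered by date, should reproduce each source array element for element.

The plain-Python sketch below makes two assumptions:

- the source `time` values are ascending ISO date strings;
- dates are compared as strings. The notebook converts them with `to_date`, so a real comparison would format them first.

`matches_source` and the sample data are hypothetical.

```python
def matches_source(record, rows):
    """True if the silver rows for record's market reproduce its daily arrays in order."""
    daily = record["daily"]
    market_rows = sorted(
        (r for r in rows if r["marketName"] == record["marketName"]),
        key=lambda r: r["weatherDate"],
    )
    # Compare each output column with the source array it was exploded from
    return (
        [r["weatherDate"] for r in market_rows] == daily["time"]
        and [r["maximumTemparature"] for r in market_rows] == daily["temperature_2m_max"]
        and [r["minimumTemparature"] for r in market_rows] == daily["temperature_2m_min"]
        and [r["rainFall"] for r in market_rows] == daily["rain_sum"]
    )


source = {
    "marketName": "A",
    "daily": {
        "time": ["2024-01-01", "2024-01-02"],
        "temperature_2m_max": [30.0, 31.0],
        "temperature_2m_min": [20.0, 21.0],
        "rain_sum": [0.0, 1.5],
    },
}
silver_rows = [
    {"marketName": "A", "weatherDate": "2024-01-02", "maximumTemparature": 31.0,
     "minimumTemparature": 21.0, "rainFall": 1.5},
    {"marketName": "A", "weatherDate": "2024-01-01", "maximumTemparature": 30.0,
     "minimumTemparature": 20.0, "rainFall": 0.0},
    {"marketName": "B", "weatherDate": "2024-01-01", "maximumTemparature": 25.0,
     "minimumTemparature": 15.0, "rainFall": 3.0},
]
print(matches_source(source, silver_rows))  # True
```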