In [10]:
import pyspark.sql.functions as F

In [11]:
# Storage locations used by this notebook; raw and silver containers share one ADLS account.
STORAGE_ACCOUNT = "dobbinsdatalake"
raw_weatherbit_dir = f"abfss://raw@{STORAGE_ACCOUNT}.dfs.core.windows.net/weatherbit"

# Glob over every raw weatherbit JSON drop; read as a file stream further down.
rawfolder = f"{raw_weatherbit_dir}/*.json"
# A single known-good raw file, used only to infer the JSON schema for the stream.
schema_file = f"{raw_weatherbit_dir}/2023_07_21_16_00_23_UTC.json"
# Delta location of the flattened (silver) weather table.
silver_folder = f"abfss://silver@{STORAGE_ACCOUNT}.dfs.core.windows.net/delta/weather_silver/weather"

In [12]:
# Infer the schema once, from one sample file, and hand it to the file stream below
# (file-based streaming sources expect an explicit schema by default).
# The JSON reader always infers a schema when none is supplied; "inferSchema" is a
# CSV option and was silently ignored here, so it is dropped.
# NOTE(review): fields missing from this particular sample file will not be in the
# schema, and will therefore be dropped from every streamed file — re-check if the
# upstream payload changes.
df_schema = spark.read.json(schema_file).schema

In [13]:
# Stream every raw JSON file matching rawfolder using the fixed schema inferred above,
# and tag each row with the URI of the file it was read from.
df = (
    spark.readStream
    .schema(df_schema)
    .json(rawfolder)
    .withColumn("SourceFileName", F.input_file_name())
)

In [14]:
# Paths inside each element of the `data` array, in output-column order.
# Nested paths are flattened by replacing '.' with '_' (weather.code -> weather_code).
WEATHER_FIELDS = (
    'pres', 'snow', 'uv', 'aqi', 'gust', 'precip', 'rh', 'app_temp', 'timezone',
    'wind_spd', 'ts', 'datetime', 'vis', 'solar_rad', 'pod', 'country_code',
    'clouds', 'elev_angle', 'dni', 'temp', 'ob_time', 'sunrise', 'wind_cdir_full',
    'state_code', 'lon', 'city_name', 'sunset', 'slp',
    'weather.code', 'weather.description', 'weather.icon',
    'h_angle', 'ghi', 'wind_dir', 'dewpt', 'dhi', 'lat', 'wind_cdir', 'station',
)


def flatten_weather(raw_df, fields=WEATHER_FIELDS):
    """Explode the `data` array into one row per element and flatten its fields.

    Args:
        raw_df: DataFrame read from the raw weatherbit JSON. It must have a `data`
            array column and a `SourceFileName` column holding the source file URI.
        fields: Paths inside each `data` element to extract. Each becomes a column
            named with '.' replaced by '_'.

    Returns:
        A DataFrame with every original column except `data` and `count` (kept in
        their original order, with `SourceFileName` reduced to the bare file name),
        followed by one column per entry in `fields`. Rows whose `data` is null or
        empty produce no output (explode semantics).

    NOTE(review): assumes no top-level column shares a name with a flattened field.
    """
    dropped = {'data', 'count'}
    kept = [c for c in raw_df.columns if c not in dropped]

    exploded = raw_df.select(*kept, F.explode('data').alias('exploded'))

    passthrough = [
        # Keep only the text after the last '/', i.e. the file name. This avoids
        # re-typing the raw folder URI and treating it as a regex (its '.' would be
        # wildcards).
        F.element_at(F.split('SourceFileName', '/'), -1).alias('SourceFileName')
        if c == 'SourceFileName' else F.col(c)
        for c in kept
    ]
    flattened = [
        F.col(f'exploded.{path}').alias(path.replace('.', '_')) for path in fields
    ]
    # One select instead of ~40 chained withColumn calls. Each withColumn adds a
    # projection, and the Spark docs warn that this can produce very large plans.
    return exploded.select(*passthrough, *flattened)


df = flatten_weather(df)

In [15]:
# # Simpler Spark SQL alternative to the explode cell above (needs `df` registered as a temp view first); kept commented out for demo purposes

# display(spark.sql(f""" 
#     with cte as 
#     (
#     select explode(data) as exploded, * 
#     from df
#     )
#     select exploded.weather.*, exploded.* from cte
# """))


In [16]:
# Append the flattened rows to the silver Delta table. The checkpoint records which
# raw files were already processed, so each run only picks up new files.
# NOTE(review): the checkpoint lives inside the table directory. Delta VACUUM skips
# only directories starting with '_' or '.', so a plain "checkpoint" folder may be
# removed. Moving it (e.g. to "_checkpoint") restarts the stream and re-appends every
# file, so plan a migration first.
# NOTE(review): trigger(once=True) is deprecated in newer Spark releases in favour of
# trigger(availableNow=True); confirm the runtime version before switching.
checkpoint_path = f"{silver_folder}/checkpoint"

stream_writer = (
    df.writeStream
    .queryName("weather_stream")
    .format("delta")
    .outputMode("append")
    .option("mergeSchema", True)
    .option("checkpointLocation", checkpoint_path)
    .trigger(once=True)
)
streamingQuery = stream_writer.start(silver_folder)

In [17]:
# Block until this notebook's trigger-once run has finished, then report on it.
# Awaiting the query handle directly, rather than scanning spark.streams.active:
#   * surfaces a failure even if the query already stopped. awaitTermination()
#     re-raises it, whereas a terminated query no longer appears in
#     spark.streams.active and its error would go unnoticed;
#   * does not block on unrelated streams that may share this Spark session.
# Reporting after the wait means lastProgress describes the completed batch.
streamingQuery.awaitTermination()
print(streamingQuery.name)
print(streamingQuery.status)
print(streamingQuery.lastProgress)

In [18]:
%%sql

-- Create the schema that holds the silver weather table; no-op on later runs.
CREATE DATABASE IF NOT EXISTS weather_silver;

In [19]:
%%sql

-- Make weather_silver the session's current database for unqualified table names.
USE DATABASE weather_silver;

In [20]:
# Register the silver Delta folder as the metastore table weather_silver.weather.
# The location comes from silver_folder, the same path the streaming query writes
# to, so the table and the sink cannot drift apart if the path is changed.
spark.sql(f"""
create table if not exists weather_silver.weather
using delta
location '{silver_folder}'
""")

#### Time travel and change data feed
###### The cells below are for clone-and-demo purposes only; keep them commented out

In [21]:
# %%sql

#-- ALTER TABLE weather_silver.weather SET TBLPROPERTIES (delta.enableChangeDataFeed = true)

In [22]:
# %%sql

# DESCRIBE extended weather_silver.weather
# DESCRIBE history weather_silver.weather


In [23]:
# startingVersion / endingVersion are Delta table version numbers (ints or longs)

# display(spark.read.format("delta") \
#   .option("readChangeFeed", "true") \
#   .option("startingVersion", 24) \
#   .option("endingVersion", 25) \
#   .table("weather_silver.weather"))

In [24]:
# df4 = spark.read\
#   .format('delta')\
#   .option('versionAsOf', 20)\
#   .load(silver_folder)

In [25]:
# This runs the "Weather silver to gold" notebook in the same Spark session (second argument: timeout in seconds)

# mssparkutils.notebook.run("Weather silver to gold", 90)

In [26]:
# End the notebook and return a status string to whatever invoked it
# (presumably a pipeline activity or a parent notebook.run call).
exit_value = 'Success'
mssparkutils.notebook.exit(exit_value)