In [0]:
from pyspark.sql.functions import *

In [0]:
# Notebook/job parameters. Every widget defaults to '' so the notebook can be opened
# interactively; presumably the calling job supplies the real values.
dbutils.widgets.text('p_catalog_name','')
dbutils.widgets.text('p_source_schema_name','')
dbutils.widgets.text('p_schema_name','')
dbutils.widgets.text('p_source_table_name','')
dbutils.widgets.text('p_table_name','')
dbutils.widgets.text('p_storage_account_baseURL','')
dbutils.widgets.text('p_container','')
dbutils.widgets.text('p_last_loaded_date','')
# TODO: planned (currently disabled) parameters for control-table driven loads:
#   p_control_schema, p_control_table (text widgets) and
#   p_load_type (dropdown: 'full' | 'incremental', default 'incremental').

catalog_name = dbutils.widgets.get('p_catalog_name')
source_schema_name = dbutils.widgets.get('p_source_schema_name')
schema_name = dbutils.widgets.get('p_schema_name')
source_table_name = dbutils.widgets.get('p_source_table_name')
table_name = dbutils.widgets.get('p_table_name')
storage_account = dbutils.widgets.get('p_storage_account_baseURL')
container = dbutils.widgets.get('p_container')
# NOTE(review): read here but not used by any transformation in this notebook section;
# no incremental filter is applied to the source data — confirm whether one is intended.
last_loaded_date = dbutils.widgets.get('p_last_loaded_date')

# Fail fast with a readable error: the source table is read as
# '{catalog}.{schema}.{table}', which cannot resolve if any of these parts is blank.
_required_params = {
    'p_catalog_name': catalog_name,
    'p_source_schema_name': source_schema_name,
    'p_source_table_name': source_table_name,
}
_missing_params = [name for name, value in _required_params.items() if not value.strip()]
if _missing_params:
    raise ValueError(f"Missing required notebook parameter(s): {', '.join(_missing_params)}")

In [0]:
# Show the p_last_loaded_date parameter value as this cell's output (presumably a
# debugging aid). NOTE(review): the value is not used by any visible transformation —
# if an incremental filter is intended, it is not implemented here.
last_loaded_date

In [0]:
# Three-part (catalog.schema.table) name of the source table, assembled from the job parameters.
source_table_fqn = f'{catalog_name}.{source_schema_name}.{source_table_name}'

# Lazily reference the source table; nothing is read until an action runs.
df_ingested = spark.read.table(source_table_fqn)

In [0]:
# Register the raw source rows as a session-scoped temp view so later cells can
# query them by the name 'v_df_bronze' (replacing any earlier view of that name).
df_ingested.createOrReplaceTempView(name='v_df_bronze')

In [0]:

# DDL schema of the chart JSON payload stored as a string in the `value` column.
CHART_JSON_SCHEMA = 'STRUCT<chart: STRUCT<error: STRING, result: ARRAY<STRUCT<indicators: STRUCT<adjclose: ARRAY<STRUCT<adjclose: ARRAY<DOUBLE>>>, quote: ARRAY<STRUCT<close: ARRAY<DOUBLE>, high: ARRAY<DOUBLE>, low: ARRAY<DOUBLE>, open: ARRAY<DOUBLE>, volume: ARRAY<BIGINT>>>>, meta: STRUCT<chartPreviousClose: DOUBLE, currency: STRING, currentTradingPeriod: STRUCT<post: STRUCT<end: BIGINT, gmtoffset: BIGINT, start: BIGINT, timezone: STRING>, pre: STRUCT<end: BIGINT, gmtoffset: BIGINT, start: BIGINT, timezone: STRING>, regular: STRUCT<end: BIGINT, gmtoffset: BIGINT, start: BIGINT, timezone: STRING>>, dataGranularity: STRING, exchangeName: STRING, exchangeTimezoneName: STRING, fiftyTwoWeekHigh: DOUBLE, fiftyTwoWeekLow: DOUBLE, firstTradeDate: BIGINT, fullExchangeName: STRING, gmtoffset: BIGINT, hasPrePostMarketData: BOOLEAN, instrumentType: STRING, longName: STRING, priceHint: BIGINT, range: STRING, regularMarketDayHigh: DOUBLE, regularMarketDayLow: DOUBLE, regularMarketPrice: DOUBLE, regularMarketTime: BIGINT, regularMarketVolume: BIGINT, shortName: STRING, symbol: STRING, timezone: STRING, validRanges: ARRAY<STRING>>, timestamp: ARRAY<BIGINT>>>>>'

# Parse each raw JSON string from the temp view into a typed struct, keeping the
# single output column named `value`.
bronze_view = spark.table('v_df_bronze')
df_bronze = bronze_view.select(
    from_json(col('value'), CHART_JSON_SCHEMA).alias('value')
)

In [0]:
# Flatten the parsed chart payload into one row per bar (one per trading day for daily data).
# Only element [0] of `chart.result` and of the quote / adjclose indicator arrays is read.
# NOTE(review): assumes each bronze row holds a single-symbol response — confirm.
# `round` below is pyspark.sql.functions.round (the star import shadows Python's builtin).
df_silver = (df_bronze
    .selectExpr("value.chart.result[0].meta.symbol as symbol",
                "value.chart.result[0].meta.longName as longName",
                "value.chart.result[0].meta.fiftyTwoWeekHigh as fiftyTwoWeekHigh",
                "value.chart.result[0].meta.fiftyTwoWeekLow as fiftyTwoWeekLow",
                # Presumably the exchange's UTC offset in seconds (e.g. -18000 for US Eastern) — confirm.
                "value.chart.result[0].meta.gmtoffset as gmtoffset",
                "value.chart.result[0].timestamp as timestamp",
                "value.chart.result[0].indicators.quote[0].open as open",
                "value.chart.result[0].indicators.quote[0].high as high",
                "value.chart.result[0].indicators.quote[0].low as low",
                "value.chart.result[0].indicators.quote[0].close as close",
                "value.chart.result[0].indicators.quote[0].volume as volume",
                "value.chart.result[0].indicators.adjclose[0].adjclose as adjclose")
    # Zip arrays together so each index aligns
    .withColumn("zipped", arrays_zip("timestamp","open","high","low","close","volume","adjclose"))
    # Explode zipped array → one row per bar
    .withColumn("zipped", explode(col("zipped")))
    # trade_date = exchange-local calendar date of the bar's epoch-second timestamp.
    # Deliberately NOT cast(timestamp) + date_format(...): those render the date in
    # spark.sql.session.timeZone, so a bar stamped late in the UTC day (e.g. an exchange
    # opening at 10:00 UTC+11 = 23:00 UTC the previous day) lands on the wrong date.
    # Shifting by gmtoffset and converting whole days since 1970-01-01 straight to DATE
    # is independent of the session time zone; floor() keeps pre-1970 (negative)
    # timestamps on the correct day. A null gmtoffset falls back to 0, i.e. the UTC date.
    .withColumn(
        "trade_date",
        expr("date_from_unix_date(CAST(floor((zipped.`timestamp` + coalesce(gmtoffset, 0)) / 86400) AS INT))"),
    )
    # Flatten zipped struct into columns (gmtoffset is only an intermediate and is dropped here)
    .select(
        col("symbol"),
        col("longName"),
        col("trade_date"),
        round(col("zipped.open"),2).alias("open"),
        round(col("zipped.high"),2).alias("high"),
        round(col("zipped.low"),2).alias("low"),
        round(col("zipped.close"),2).alias("close"),
        col("zipped.volume").alias("volume"),
        round(col("zipped.adjclose"),2).alias("adjclose"),
        col("fiftyTwoWeekHigh"),
        col("fiftyTwoWeekLow")
    )
)

In [0]:
# Render the flattened silver rows for inspection with the Databricks `display` helper.
# NOTE(review): no visible cell persists df_silver (schema_name / table_name are read
# from widgets but unused here) — confirm whether a write step is expected after this.
display(df_silver)