In [None]:
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType


In [None]:
# Target date for this run, read from the "stock_price_date" notebook widget.
# The value is used further down as the `date` predicate when reading the
# bronze table and when clearing existing rows from the silver table.
# NOTE(review): presumably an ISO `YYYY-MM-DD` string — confirm against the job
# configuration that sets this widget.
stock_price_date = dbutils.widgets.get("stock_price_date")


To use databricks widgets interactively in your notebook, please install databricks sdk using:
	pip install 'databricks-sdk[notebook]'
Falling back to default_value_only implementation for databricks widgets.


In [None]:
# Load the bronze rows for the requested date.
# The widget value is bound as a column literal instead of being formatted
# into a SQL predicate string, so a malformed or hostile value is always
# treated as data and cannot change the shape of the filter. The comparison
# itself (column `date` = string literal) is the same as before.
stock_price_df = (
    spark.read.table("`yfinance-dev`.bronze.stock_price")
    .filter(F.col("date") == F.lit(stock_price_date))
)


In [None]:
# Project the bronze rows onto the silver layout in a single select:
#   code          - the piece of `ticker` before the first "." (null when ticker is null)
#   date          - `Date` parsed with to_date
#   open..volume  - each cast to IntegerType
#   metadata      - string-to-string map of lineage fields
# NOTE(review): casting to IntegerType drops any fractional part of the prices
# and may not fit very large volumes — confirm this is intended.
integer_columns = ["open", "high", "low", "close", "volume"]
metadata_keys = ["pipeline_id", "run_id", "task_id", "processed_timestamp"]

ticker_prefix = F.when(
    F.col("ticker").isNotNull(),
    F.split(F.col("ticker"), "\\.")[0],
)

# NOTE(review): every metadata value is the literal key name itself (e.g.
# "run_id" -> "run_id"), which looks like a placeholder — confirm where the
# real ids and timestamp should come from.
lineage_map = F.create_map(
    *[F.lit(text) for key in metadata_keys for text in (key, key)]
)

stock_price_df = stock_price_df.select(
    ticker_prefix.alias("code"),
    F.to_date(F.col("Date")).alias("date"),
    *[F.col(name).cast(IntegerType()).alias(name) for name in integer_columns],
    lineage_map.alias("metadata"),
)


In [None]:
# Keep the load re-runnable: remove any rows already stored for this date
# before the append in the next cell.
# The date is passed through a named parameter marker (`args=`, available
# from Spark 3.4) rather than formatted into the statement, so the widget
# value is always bound as a string literal and cannot alter the DELETE.
if spark.catalog.tableExists("`yfinance-dev`.silver.stock_price"):
    spark.sql(
        "DELETE FROM `yfinance-dev`.silver.stock_price WHERE date = :stock_price_date",
        args={"stock_price_date": stock_price_date},
    )


In [None]:
# Append this run's transformed rows to the silver stock_price table.
stock_price_df.write.saveAsTable("`yfinance-dev`.silver.stock_price", mode="append")
