In [0]:
from datetime import datetime, timedelta

import pandas as pd
import yfinance as yf
from pyspark.sql.functions import lit, col
from pyspark.sql.functions import max as spark_max

In [0]:
# Get the parameter's value
ticker_symbol = dbutils.widgets.get("ticker")

# The ticker is interpolated verbatim into single-quoted SQL string literals by
# the cells below, so refuse values that are empty or that could terminate or
# escape such a literal before any statement is built from them.
if not ticker_symbol.strip():
    raise ValueError("The 'ticker' widget is empty; a ticker symbol is required.")
if "'" in ticker_symbol or "\\" in ticker_symbol:
    raise ValueError(
        f"Invalid ticker symbol {ticker_symbol!r}: quotes and backslashes are not allowed."
    )

In [0]:

# Housekeeping before a new run: every 'load_stock_data' log row for this
# ticker whose status is still 'Running' is switched to 'Failed'.
# NOTE(review): presumably such rows are leftovers from a run that stopped
# before the audit step could close it — confirm.
# NOTE(review): ticker_symbol is spliced directly into the SQL text; it must
# not contain quote characters.
query = f"""
UPDATE yahoo_finance.processrunlogs.processrunlog
SET status = 'Failed'
WHERE ticker = '{ticker_symbol}'
  AND processname = 'load_stock_data'
  AND status = 'Running'
  """
spark.sql(query)

In [0]:
# Timestamp of this run, used as the start date of the new log row.
current_datetime = datetime.now()
current_datetime_str = current_datetime.strftime('%Y-%m-%d %H:%M:%S')

# Register the run as 'Running'; last_loaded_date stays null until the audit
# step at the end of the notebook fills it in.
spark.sql(
    f"""
    INSERT INTO yahoo_finance.processrunlogs.processrunlog
    (ticker, processname, last_loaded_date, startdate, status)
    VALUES ('{ticker_symbol}', 'load_stock_data', null, '{current_datetime_str}', 'Running')
    """
)

# The download window ends at tomorrow's date.
# NOTE(review): presumably because yfinance treats `end` as exclusive — confirm.
end_date = (current_datetime.date() + timedelta(days=1)).strftime('%Y-%m-%d')

# Watermark: the newest last_loaded_date among completed runs for this ticker,
# falling back to 2025-01-01 when there is no completed run yet.
last_loaded_datetime = spark.sql(
    f"""
    SELECT NVL(MAX(last_loaded_date), CAST('2025-01-01' AS timestamp))
    FROM yahoo_finance.processrunlogs.processrunlog
    WHERE ticker = '{ticker_symbol}'
    AND processname = 'load_stock_data'
    AND status = 'Completed'
    AND last_loaded_date IS NOT NULL
    """
).collect()[0][0]

# If the watermark comes back as text, turn it into a datetime. The audit step
# writes it as 'YYYY-MM-DD HH:MM:SS', which a date-only '%Y-%m-%d' pattern
# cannot parse; fromisoformat accepts both the date-only and date-time forms.
# Normalising here also means the duplicate filter further down always compares
# against a datetime rather than a string.
if isinstance(last_loaded_datetime, str):
    last_loaded_datetime = datetime.fromisoformat(last_loaded_datetime)

# yfinance is given whole days, so the download restarts at the watermark's
# calendar day; rows at or before the watermark are filtered out after download.
start_date = last_loaded_datetime.strftime('%Y-%m-%d')

# Download data for the given ticker symbol from Yahoo Finance
bronze_pd_data = yf.download(
    ticker_symbol,
    start=start_date,
    end=end_date,
    interval="60m",
    auto_adjust=True
)
# Reset the index of the pandas DataFrame so that Datetime becomes an ordinary column
bronze_pd_data = bronze_pd_data.reset_index()


In [0]:
# Fields expected from the download, in the order they are written to bronze.
price_fields = ["Datetime", "Close", "High", "Low", "Open", "Volume"]

# Flatten the pandas column labels BEFORE handing the frame to Spark.
# When the labels are (field, ticker) tuples, Spark stringifies them into names
# such as "('Close', 'AAPL')". Resolving those through col() breaks for any
# ticker that contains a dot (e.g. '0700.HK'), because Spark reads an unquoted
# dot as nested-field access. Keeping only the field part of each label avoids
# that, and also works when the labels are already plain strings.
bronze_pd_flat = bronze_pd_data.copy()
bronze_pd_flat.columns = [
    label[0] if isinstance(label, tuple) else label
    for label in bronze_pd_flat.columns
]

# Convert loaded data to Spark DataFrame: add the ticker column and cast every
# field to string. Resulting column order: ticker, Datetime, Close, High, Low,
# Open, Volume.
bronze_spark_data = spark.createDataFrame(bronze_pd_flat[price_fields]).select(
    lit(ticker_symbol).alias("ticker"),
    *[col(field).cast("string").alias(field) for field in price_fields],
)

# Keep only the rows with Datetime greater than the last_loaded_date to avoid duplicates
bronze_spark_data_filtered = bronze_spark_data.filter(
    col("Datetime").cast("timestamp") > last_loaded_datetime
)

# Save the filtered data to the bronze layer
bronze_spark_data_filtered.write.format("delta").mode("append").save("abfss://bronze@yahoofinancestorage.dfs.core.windows.net/finance_bronze")

In [0]:
# Audit the process: close this run's 'Running' row in the process run log.
# `datetime` and `spark_max` come from the notebook's import cell.
current_datetime = datetime.now()
current_datetime_str = current_datetime.strftime('%Y-%m-%d %H:%M:%S')

# Find the latest loaded datetime in the newly loaded data. The aggregate
# returns None when no row passed the duplicate filter.
last_loaded_date_row = bronze_spark_data_filtered.agg(
    spark_max("Datetime").alias("max_datetime")
).collect()[0]
max_datetime_val = last_loaded_date_row["max_datetime"]

# Normalise the maximum to a datetime (or None when there is no new data).
# A non-datetime value is parsed strictly as 'YYYY-MM-DD HH:MM:SS', so an
# unexpected format raises here instead of being written to the log.
if max_datetime_val is None:
    last_loaded_date = None
elif isinstance(max_datetime_val, datetime):
    last_loaded_date = max_datetime_val
else:
    last_loaded_date = datetime.strptime(str(max_datetime_val), '%Y-%m-%d %H:%M:%S')

# Get string representation of the latest loaded datetime to use in SQL scripts
last_loaded_date_str = (
    last_loaded_date.strftime('%Y-%m-%d %H:%M:%S')
    if last_loaded_date is not None
    else None
)

# Build the SET clause once. last_loaded_date is only overwritten when this run
# actually loaded rows; otherwise the column is left untouched.
set_assignments = [f"enddate = '{current_datetime_str}'", "status = 'Completed'"]
if last_loaded_date_str is not None:
    set_assignments.insert(0, f"last_loaded_date = '{last_loaded_date_str}'")
set_clause = ",\n        ".join(set_assignments)

# Complete the most recent 'Running' row for this ticker and process.
spark.sql(
    f"""
    UPDATE yahoo_finance.processrunlogs.processrunlog
    SET {set_clause}
    WHERE id = (
        SELECT MAX(id) FROM yahoo_finance.processrunlogs.processrunlog
        WHERE ticker = '{ticker_symbol}'
        AND processname = 'load_stock_data'
        AND status = 'Running'
    )
    """
)
