In [0]:
%sql
-- Landing volume for raw yfinance extracts; the Python cells write under /Volumes/lakehouse/raw_public/yfinance/ (see RAW_BASE_PATH).
CREATE VOLUME IF NOT EXISTS lakehouse.raw_public.yfinance

In [0]:
%python
%pip install yfinance
# Third-party Yahoo Finance client imported as `yf` and used by get_commodities_df().
# NOTE(review): unpinned install; consider `yfinance==<version>` for reproducible runs.

In [0]:
%python
# Restart the Python process so the yfinance package installed by the previous %pip cell can be imported.
dbutils.library.restartPython()

In [0]:
import yfinance as yf
import pandas as pd
from datetime import datetime, UTC
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

# Raw landing directory for the latest commodity prices, inside the
# lakehouse.raw_public.yfinance volume created by the %sql cell.
RAW_BASE_PATH = (
    "/Volumes/lakehouse/raw_public/yfinance"  # volume: catalog / schema / volume
    "/commodities/latest_prices"              # dataset sub-directory
)

def get_commodities_df()-> pd.DataFrame:
    """ Function to extract latest commodities prices from yfinance API
    Returns:
    - pd.DataFrame: commodities dataframe
    """
    commodities = ['CL=F', 'GC=F', 'SI=F'] # Gold, oil and silver
    dfs=[]

    for com in commodities:
        # download the lastest price (1 min)
        try:
            last_df = yf.Ticker(com).history(period="1d", interval="1m")[["Close"]].tail(1)
            if last_df.empty:
                continue
            last_df = last_df.rename(columns={"Close": "price"})
            last_df["commodity"] = com
            last_df["coin"] = "USD"
            last_df["timestamp"] = datetime.now(UTC)

            dfs.append(last_df[["commodity", "price", "coin", "timestamp"]])
        except Exception as e:
            print(f"Error downloading {com}: {e}")
    if not dfs:
        raise ValueError("No commodities data downloaded")
    df = pd.concat(dfs, ignore_index=True)
    df["source_system"] = "yfinance"
    df["source_endpoint"] = "https://finance.yahoo.com"
    df["ingestion_ts_utc"] = datetime.now(UTC)

    return df

In [0]:
# Pandas load path: land one newline-delimited JSON file per run in the volume.
# NOTE(review): the Spark cell below appends `ingestion_date=` partitions to the
# same RAW_BASE_PATH; consider separate sub-directories for the two layouts.
commodities_pdf = get_commodities_df()

file_name = f"{RAW_BASE_PATH}/yfinance_commodities_{datetime.now(UTC).strftime('%Y%m%d_%H%M%S')}.json"
# date_format="iso" writes timestamps as ISO-8601 strings with a UTC marker;
# pandas' default for orient="records" is bare epoch milliseconds, which drops
# the timezone and differs from the ISO-style strings Spark's JSON writer emits.
commodities_pdf.to_json(
    file_name,
    orient="records",
    lines=True,
    force_ascii=False,
    date_format="iso",
)

print(f"Data saved to {file_name}")

In [0]:
# Spark load path: type the extract with an explicit schema, re-derive the
# provenance columns in Spark, and append it as JSON partitioned by ingestion date.
# NOTE(review): this calls the API again, so it lands a second snapshot in the
# same RAW_BASE_PATH the previous cell writes its single file into; consider
# keeping only one load path.
pdf = get_commodities_df()

schema = StructType([
    StructField("commodity", StringType(), False),
    StructField("price", DoubleType(), False),
    StructField("coin", StringType(), False),
    StructField("timestamp", TimestampType(), False)
])

df = (
    # Select exactly the schema's columns, by name and in schema order: the pandas
    # frame also carries source_system / source_endpoint / ingestion_ts_utc, which
    # would not line up with this 4-field schema and are re-created below anyway.
    spark.createDataFrame(pdf[schema.fieldNames()], schema=schema)
    .withColumn("ingestion_ts_utc", F.current_timestamp())
    .withColumn("source_system", F.lit("yfinance"))
    .withColumn("source_endpoint", F.lit("https://finance.yahoo.com"))
    .withColumn("ingestion_date", F.to_date(F.col("ingestion_ts_utc")))
)

(df.write
    .mode("append")
    .partitionBy("ingestion_date")
    .json(RAW_BASE_PATH)
)

