In [0]:
import dlt
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Location that nvda_bronze() passes to spark.read...load() as its CSV input.
raw_path = "/Volumes/main/stocks/nvda_raw"


Bronze Table

In [0]:
@dlt.table(
    name="nvda_bronze",
    comment="Raw NVDA OHLCV data from CSV files in a UC volume."
)
def nvda_bronze():
    """Load the raw CSV input and stamp each row with an ingestion time.

    Performs a batch read of whatever ``raw_path`` points at, treating the
    first line of each file as the header. No schema and no ``inferSchema``
    option are supplied here, so column typing is left to the reader's
    defaults.

    Returns:
        DataFrame: the CSV columns as read, plus ``ingest_ts`` set to
        ``current_timestamp()``.
    """
    csv_reader = spark.read.format("csv").option("header", "true")
    raw_rows = csv_reader.load(raw_path)
    return raw_rows.withColumn("ingest_ts", F.current_timestamp())


Silver Table

In [0]:
@dlt.table(
    name="nvda_silver",
    comment="Cleaned NVDA prices with daily returns."
)
@dlt.expect_or_drop("valid_close", "close IS NOT NULL")
def nvda_silver():
    """Build the silver table: typed OHLCV columns plus a day-over-day return.

    Reads ``nvda_bronze``, parses ``date`` with ``to_date``, casts the five
    price columns to ``double`` and ``volume`` to ``long``, and keeps only
    those seven columns (anything else in bronze, e.g. ``ingest_ts``, is
    dropped). It then adds ``daily_return`` as
    ``adj_close / previous row's adj_close - 1``, where "previous" is the
    preceding row in ``date`` order. The result is a fraction, not a percent.

    Returns:
        DataFrame: ``date, open, high, low, close, adj_close, volume,
        daily_return``.

    Notes:
        * No explicit ``orderBy`` is applied before the window: the window
          specification orders by ``date`` itself, so a separate global sort
          is unnecessary work.
        * The window has no ``partitionBy``, so the lag runs over the whole
          dataset as one ordered sequence. This presumably relies on the
          input holding a single ticker with one row per date; confirm
          against the source files.
        * NOTE(review): the ``valid_close`` expectation is presumably
          evaluated on the rows returned here, meaning a row with NULL
          ``close`` still takes part in the lag before it is dropped.
          Confirm this is the intended behaviour.
    """
    price_columns = ("open", "high", "low", "close", "adj_close")

    # One projection does the parsing, casting and column pruning together.
    typed = dlt.read("nvda_bronze").select(
        F.to_date("date").alias("date"),
        *[F.col(name).cast("double").alias(name) for name in price_columns],
        F.col("volume").cast("long").alias("volume"),
    )

    by_date = Window.orderBy("date")
    previous_adj_close = F.lag("adj_close", 1).over(by_date)

    return typed.withColumn(
        "daily_return",
        F.col("adj_close") / previous_adj_close - 1,
    )

Gold Table

In [0]:
@dlt.table(
    name="nvda_gold_features",
    comment="NVDA ML features with rolling stats and 30-day forward return label."
)
def nvda_gold_features():
    """Derive rolling features and a forward-return label from ``nvda_silver``.

    Columns added, all computed over rows ordered by ``date``:

    * ``ret_5d``: mean of ``daily_return`` over the current row and the
      4 rows before it.
    * ``ret_20d``: mean of ``daily_return`` over the current row and the
      19 rows before it.
    * ``vol_20d``: ``stddev`` of ``daily_return`` over that same 20-row frame.
    * ``adj_close_30d_ahead``: ``adj_close`` taken 30 rows later. The offset
      counts rows, not calendar days.
    * ``fwd_30d_return``: ``(adj_close_30d_ahead / adj_close - 1) * 100``,
      i.e. expressed in percent.

    Rows whose ``fwd_30d_return`` is NULL are removed before returning.

    Returns:
        DataFrame: the silver columns plus the five columns above. The
        lookahead column ``adj_close_30d_ahead`` is kept in the output
        alongside the label.
    """
    by_date = Window.orderBy("date")
    last_5_rows = by_date.rowsBetween(-4, 0)
    last_20_rows = by_date.rowsBetween(-19, 0)

    rolling = (
        dlt.read("nvda_silver")
        .withColumn("ret_5d", F.avg("daily_return").over(last_5_rows))
        .withColumn("ret_20d", F.avg("daily_return").over(last_20_rows))
        .withColumn("vol_20d", F.stddev("daily_return").over(last_20_rows))
    )

    future_adj_close = F.lead("adj_close", 30).over(by_date)
    pct_change = (F.col("adj_close_30d_ahead") / F.col("adj_close") - 1.0) * 100.0

    labelled = (
        rolling
        .withColumn("adj_close_30d_ahead", future_adj_close)
        .withColumn("fwd_30d_return", pct_change)
    )

    return labelled.where(F.col("fwd_30d_return").isNotNull())
