In [0]:
%pip install yfinance
%restart_python

In [0]:
import yfinance as yf
import pandas as pd
import numpy as np
from pyspark.sql import functions as F
from pyspark.sql.window import Window
import mlflow
import mlflow.spark
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression,RandomForestRegressor, GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from sklearn.preprocessing import MinMaxScaler
from pyspark.sql.types import DoubleType

In [0]:
%sql
-- Bootstrap the Unity Catalog namespace used by this notebook:
-- one catalog with one schema per medallion layer (bronze -> silver -> gold).
create catalog if not exists uc_finance;

-- raw ingested fundamentals
create schema if not exists uc_finance.bronze;
-- cleaned fundamentals
create schema if not exists uc_finance.silver;
-- scored, consumer-facing output
create schema if not exists uc_finance.gold;

In [0]:
%sql
-- Sanity check: the bronze, silver and gold schemas should be listed.
show schemas in uc_finance;

In [0]:
# Universe of tickers to score. The literal below contains a few repeats
# (e.g. "PSX" and "QCOM" appear twice); dict.fromkeys() removes duplicates
# while preserving the original order, so no company is ingested, normalised
# against its sector or placed in the train/test split twice.
# NOTE(review): some symbols may no longer trade (possibly ATVI, DISCA/DISCK,
# PXD after corporate actions) — verify, and expect their lookups to fail.
tickers = list(dict.fromkeys([
    "AAPL", "MSFT", "GOOGL", "AMZN", "META", "NVDA", "TSLA", "ORCL", "ADBE", "CRM", "INTC", "AMD", "QCOM", "CSCO", "IBM", "JPM", "BAC", "WFC", "C", "GS", "MS", "AXP", "V", "MA", "BLK", "WMT", "COST", "TGT", "HD", "LOW", "PG", "KO", "PEP", "MCD", "NKE", "JNJ", "PFE", "MRK", "ABBV", "LLY", "UNH", "CVS", "TMO", "ABT", "BMY", "XOM", "CVX", "COP", "SLB", "EOG", "PSX", "VLO", "CAT", "DE", "BA", "GE", "HON", "MMM", "UPS", "FDX", "RTX", "NFLX", "DIS", "CMCSA", "T", "VZ", "TMUS", "SNOW", "SHOP", "SQ", "UBER", "LYFT", "ADSK", "TXN", "MU", "LRCX", "AVGO", "SPGI", "SCHW", "COF", "CME", "MSCI", "BK", "TFC", "PGR", "SBUX", "DG", "F", "GM", "MGM", "CL", "KMB", "EL", "GIS", "REGN", "BIIB", "VRTX", "MRNA", "CRSP", "ISRG", "MDT", "SYK", "HAL", "MPC", "PSX", "PXD", "XEL", "NEE", "D", "SO", "LMT", "NOC", "EMR", "ROP", "PH", "ITW", "CSX", "NSC", "ATVI", "EA", "TTWO", "CHTR", "DISCA", "DISCK", "ROKU", "ZM", "PLTR", "PANW", "NET", "CRWD", "DDOG", "TWLO", "ADP", "NOW", "SNPS", "CDNS", "INTU", "QCOM", "AMAT", "KLAC", "ON", "ADI", "MRVL", "ARM", "MDB", "OKTA", "ZS", "BILL", "TEAM", "DOCU", "HUBS", "ALLY", "AON", "MET", "PRU", "CB", "MMC", "AMP", "PYPL", "AFRM", "HOOD", "FIS", "AMGN", "BABA", "JD", "PDD", "BBY", "ROST", "TJX", "BBWI", "GILD", "AZN", "SNY", "ZTS", "IDXX", "EW", "ETN", "IR", "PCAR", "WM", "LHX", "GD", "HEI", "OXY", "KMI", "WMB", "BKR", "AEP", "DUK", "PCG", "SRE", "AMT", "PLD", "EQIX", "CCI", "DAL", "UAL", "ALK", "EXPD", "CHRW", "WBD", "SPOT", "PINS", "ATRO"
]))

In [0]:
# Size of the ticker universe (displayed as the cell output).
n_tickers = len(tickers)
n_tickers

In [0]:
def calculate_peg(pe, growth):
    """
    Compute a PEG ratio from a P/E ratio and an earnings-growth rate.

    pe: trailing or forward P/E
    growth: earnings growth as decimal (e.g. 0.15 for 15%)

    Returns pe / (growth expressed in percent), or None when the ratio is not
    meaningful:
      * either input is missing or not numeric,
      * either input is NaN or infinite,
      * growth is zero or negative,
      * P/E is zero or negative (e.g. a loss-making company).
    """
    import math

    if pe is None or growth is None:
        return None

    try:
        pe = float(pe)
        growth = float(growth)
    except (TypeError, ValueError):
        return None  # non-numeric payload from the data source

    if not (math.isfinite(pe) and math.isfinite(growth)):
        return None

    if growth <= 0:
        return None  # PEG meaningless for negative/zero growth

    if pe <= 0:
        # A non-positive P/E would give a non-positive PEG, which any
        # "lower is better" scoring would rank as the cheapest stock.
        return None

    return pe / (growth * 100)

In [0]:
def fetch_fundamentals(ticker):
    """
    Fetch one ticker's fundamentals from yfinance as a flat record.

    ticker: exchange symbol, e.g. "AAPL".

    Returns a dict with a fixed set of keys, so records for many tickers can be
    stacked into one DataFrame. If the yfinance lookup raises (network error,
    unknown or delisted symbol), a warning is printed and every field except
    "ticker" is None instead of aborting the whole batch. Such rows have a null
    market_cap.
    """
    try:
        info = yf.Ticker(ticker).info or {}
    except Exception as exc:  # yfinance surfaces many HTTP/parsing error types
        print(f"WARNING: could not fetch fundamentals for {ticker}: {exc}")
        info = {}

    pe = info.get("trailingPE")
    earnings_growth = info.get("earningsGrowth")  # decimal

    # Prefer Yahoo's own PEG, then fall back to computing it from P/E and growth.
    # NOTE(review): newer yfinance versions may expose the PEG as
    # "trailingPegRatio" rather than "pegRatio" — confirm for the pinned version.
    peg = info.get("pegRatio")
    if peg is None:
        peg = info.get("trailingPegRatio")
    if peg is None:
        peg = calculate_peg(pe, earnings_growth)

    return {
        "ticker": ticker,
        "sector": info.get("sector"),
        "industry": info.get("industry"),
        "market_cap": info.get("marketCap"),
        "pe_ratio": pe,
        "pb_ratio": info.get("priceToBook"),
        "peg_ratio": peg,
        "roe": info.get("returnOnEquity"),
        "roa": info.get("returnOnAssets"),
        "debt_to_equity": info.get("debtToEquity"),
        "revenue_growth": info.get("revenueGrowth"),
        "eps_growth": earnings_growth
    }

In [0]:
# One yfinance lookup per symbol; the records are staged in pandas and then
# handed to Spark.
raw_data = list(map(fetch_fundamentals, tickers))
bronze_df = spark.createDataFrame(pd.DataFrame(data=raw_data))

In [0]:
# Inspect the schema Spark inferred from the pandas frame. A field that was
# None for every ticker may appear here as void (NullType), which is why the
# Silver step checks for void columns.
bronze_df.printSchema()

In [0]:
# Land the Bronze layer as a Delta table, stamping every row with the time it
# was ingested. The table is fully replaced (schema included) on each run.
bronze_df = bronze_df.withColumn("ingestion_ts", F.current_timestamp())

(
    bronze_df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("uc_finance.bronze.stock_fundamentals_raw")
)

In [0]:
%sql
-- Preview the raw Bronze snapshot.
SELECT * FROM uc_finance.bronze.stock_fundamentals_raw;

In [0]:
# Read Bronze Table
silver_df = spark.table("uc_finance.bronze.stock_fundamentals_raw")

# Numeric fundamentals that the fill step below and the later scoring/feature
# cells reference by name, so they must exist (and be numeric) in Silver.
numeric_cols = [
    "pe_ratio", "pb_ratio", "peg_ratio",
    "roe", "roa",
    "debt_to_equity",
    "revenue_growth", "eps_growth",
]

# A field that was None for every ticker is inferred as VOID (NullType) and may
# be void in, or missing from, the Bronze table. Dropping such a column would
# make fillna({...}) below fail on an unresolvable column name and break the
# scoring/feature cells. Instead, guarantee each numeric column exists as a double.
existing_cols = set(silver_df.columns)
for col_name in numeric_cols:
    if col_name in existing_cols:
        silver_df = silver_df.withColumn(col_name, F.col(col_name).cast(DoubleType()))
    else:
        silver_df = silver_df.withColumn(col_name, F.lit(None).cast(DoubleType()))

# Any remaining VOID column is given a concrete type so it can be written to
# Delta and still selected downstream.
for col_name, dtype in silver_df.dtypes:
    if dtype == "void":
        target_type = DoubleType() if col_name == "market_cap" else "string"
        silver_df = silver_df.withColumn(col_name, F.col(col_name).cast(target_type))

# Fill nulls
# NOTE(review): 0 usually sits at the favourable end for the "lower is better"
# metrics (pe/pb/peg/debt_to_equity are inverted during scoring), so a missing
# value is effectively rewarded. Consider a sector median or neutral fill.
silver_df = silver_df.fillna({name: 0 for name in numeric_cols})

# Growth rates arrive as decimals (0.15 == 15%); store them as percentages.
silver_df = silver_df.withColumn(
    "revenue_growth", F.col("revenue_growth") * 100
).withColumn(
    "eps_growth", F.col("eps_growth") * 100
)

# Filter invalid data (a null market_cap also fails this predicate)
silver_df = silver_df.filter(F.col("market_cap") > 0)

# Write Silver table
silver_df.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("uc_finance.silver.stock_fundamentals_clean")

In [0]:
%sql
-- Preview the cleaned Silver table.
SELECT * FROM uc_finance.silver.stock_fundamentals_clean;

In [0]:
# Restart the scoring stage from the persisted Silver table rather than the
# in-memory frame built by the cleaning cell.
silver_df = spark.read.table("uc_finance.silver.stock_fundamentals_clean")

In [0]:
# Unordered window over all rows sharing a sector: the peer group for
# sector-relative min/max scaling.
sector_w = Window.partitionBy(F.col("sector"))

In [0]:
def min_max(col_name, invert=False, window=None):
    """
    Min-max scale a column to [0, 1] relative to its peers in a window.

    col_name: name of the numeric column to scale.
    invert:   when True return 1 - score, for metrics where lower is better
              (used for P/E, P/B, PEG and debt-to-equity in this notebook).
    window:   window defining the peer group. Defaults to ``sector_w``,
              looked up at call time, i.e. scaling within each sector.

    Returns a Column expression. When every value in the peer group is
    identical (max == min), the score is a neutral 0.5 instead of dividing by zero.
    """
    peer_w = sector_w if window is None else window

    min_col = F.min(col_name).over(peer_w)
    max_col = F.max(col_name).over(peer_w)

    range_col = max_col - min_col

    score = F.when(
        range_col == 0,        # prevent divide-by-zero
        F.lit(0.5)             # neutral score
    ).otherwise(
        (F.col(col_name) - min_col) / range_col
    )

    return (1 - score) if invert else score

In [0]:
# (output column, source column, lower_is_better). Lower-is-better metrics are
# inverted so every *_s sub-score reads "higher is better".
score_specs = [
    ("pe_s", "pe_ratio", True),
    ("pb_s", "pb_ratio", True),
    ("peg_s", "peg_ratio", True),
    ("roe_s", "roe", False),
    ("roa_s", "roa", False),
    ("debt_s", "debt_to_equity", True),
    ("rev_s", "revenue_growth", False),
    ("eps_s", "eps_growth", False),
]

scored_df = silver_df
for score_col, source_col, lower_is_better in score_specs:
    scored_df = scored_df.withColumn(score_col, min_max(source_col, lower_is_better))

In [0]:
# Weighted composite on a 0-100 scale. Each pillar blends its sub-scores
# first, then the pillars are weighted: valuation 30%, profitability 25%,
# balance sheet 20%, growth 25%.
valuation_expr = 0.4*F.col("pe_s") + 0.3*F.col("pb_s") + 0.3*F.col("peg_s")
profitability_expr = 0.6*F.col("roe_s") + 0.4*F.col("roa_s")
leverage_expr = F.col("debt_s")
growth_expr = 0.5*F.col("rev_s") + 0.5*F.col("eps_s")

scored_df = scored_df.withColumn(
    "rule_score",
    100 * (
        0.30 * valuation_expr +
        0.25 * profitability_expr +
        0.20 * leverage_expr +
        0.25 * growth_expr
    )
)

In [0]:
# Peek at the first few scored rows.
scored_df.show(n=5)

In [0]:
# Raw fundamentals fed to the regressors, grouped by pillar. The concatenation
# order defines the slot layout of the assembled feature vector.
valuation_features = ["pe_ratio", "pb_ratio", "peg_ratio"]
profitability_features = ["roe", "roa"]
leverage_features = ["debt_to_equity"]
growth_features = ["revenue_growth", "eps_growth"]

feature_cols = (
    valuation_features
    + profitability_features
    + leverage_features
    + growth_features
)

# Packs the raw fundamentals into a single "features" vector column.
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

In [0]:
# Seeded 80/20 train/test split.
train_df, test_df = scored_df.randomSplit(weights=[0.8, 0.2], seed=42)

In [0]:
# Candidate regressors, each trained to reproduce rule_score from the raw
# fundamentals. The tree ensembles get an explicit seed. PySpark's default seed
# is derived from Python's hash() of the class name, which can differ between
# Python processes unless PYTHONHASHSEED is fixed, so results would otherwise
# not be reproducible across sessions.
MODEL_SEED = 42  # same value as the train/test split seed

models = {
    "LinearRegression": LinearRegression(
        labelCol="rule_score",
        featuresCol="features"
    ),

    "RandomForest": RandomForestRegressor(
        labelCol="rule_score",
        featuresCol="features",
        numTrees=50,
        maxDepth=6,
        seed=MODEL_SEED
    ),

    "GradientBoostedTrees": GBTRegressor(
        labelCol="rule_score",
        featuresCol="features",
        maxIter=50,
        maxDepth=5,
        seed=MODEL_SEED
    )
}

In [0]:
# All training runs of this notebook are grouped under one shared experiment.
mlflow.set_experiment(experiment_name="/Shared/Fundamental_Scoring_Gold")

In [0]:
# RMSE of the model's prediction against the rule-based label.
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rule_score",
    predictionCol="prediction",
)

# Collected (model name, rmse, r2) tuples, one per candidate.
results = list()

In [0]:
%sql
-- Scratch volume passed as dfs_tmpdir when saving/loading the Spark ML models.
create volume if not exists uc_finance.gold.mlflow_tmp;

In [0]:
%sql
-- Schema that receives the registered Unity Catalog models.
create schema if not exists uc_finance.ml_models;

In [0]:
from mlflow.models.signature import infer_signature

# Start from an empty list so re-running this cell does not append duplicate
# rows to the model comparison.
results = []

# The r2 evaluator does not change between candidates; build it once.
r2_evaluator = RegressionEvaluator(
    labelCol="rule_score",
    predictionCol="prediction",
    metricName="r2"
)

for name, model in models.items():
    with mlflow.start_run(run_name=name):

        pipeline = Pipeline(stages=[assembler, model])
        fitted_model = pipeline.fit(train_df)

        preds = fitted_model.transform(test_df)

        rmse = evaluator.evaluate(preds)
        r2 = r2_evaluator.evaluate(preds)

        mlflow.log_metric("rmse", rmse)
        mlflow.log_metric("r2", r2)

        # Input example (Spark DF -> small pandas sample), restricted to the
        # columns the pipeline actually consumes. The assembler reads only
        # feature_cols; passing the full row would record rule_score (the label)
        # and the *_s sub-scores as required inputs in the model signature.
        input_example_sdf = train_df.select(*feature_cols).limit(5)
        input_example_pdf = input_example_sdf.toPandas()

        # Generate output example
        output_example_pdf = (
            fitted_model.transform(input_example_sdf)
            .select("prediction")
            .toPandas()
        )

        # Infer signature
        signature = infer_signature(input_example_pdf, output_example_pdf)

        # Log model WITH signature
        mlflow.spark.log_model(
            fitted_model,
            artifact_path="model",
            registered_model_name=f"uc_finance.ml_models.fundamental_{name}",
            dfs_tmpdir="/Volumes/uc_finance/gold/mlflow_tmp",
            input_example=input_example_pdf,
            signature=signature
        )

        results.append((name, rmse, r2))

In [0]:
# Tabulate the candidates, best (lowest RMSE) first.
results_df = spark.createDataFrame(results, schema=["model", "rmse", "r2"])
results_df.orderBy(F.col("rmse").asc()).show()

In [0]:
# The winner is the candidate with the lowest test RMSE.
best_row = results_df.orderBy("rmse").first()
best_model_name = best_row["model"]

In [0]:
from mlflow.tracking import MlflowClient

client = MlflowClient()

# Full 3-level UC name
model_name = f"uc_finance.ml_models.fundamental_{best_model_name}"

versions = client.search_model_versions(f"name='{model_name}'")

# Fail with an actionable message rather than max()'s bare
# "arg is an empty sequence" when nothing has been registered yet.
if not versions:
    raise ValueError(
        f"No registered versions found for model '{model_name}'; "
        "run the training/registration cell first."
    )

# Highest version number = newest registration.
latest_version = max(int(v.version) for v in versions)

In [0]:
# Load the newest registered version of the winning pipeline from Unity Catalog.
best_model_uri = (
    f"models:/uc_finance.ml_models.fundamental_{best_model_name}/{latest_version}"
)
best_model = mlflow.spark.load_model(
    best_model_uri,
    dfs_tmpdir="/Volumes/uc_finance/gold/mlflow_tmp",
)

In [0]:
# Score every company (training and test rows alike) with the selected model.
scored_predictions_df = best_model.transform(scored_df)
gold_scored_df = scored_predictions_df.withColumnRenamed("prediction", "final_score")

In [0]:
# Map the model score onto a discrete recommendation; the first matching
# threshold wins, and anything below 50 (or a null score) becomes "Avoid".
final_score_col = F.col("final_score")
signal_expr = (
    F.when(final_score_col >= 80, "Strong Buy")
    .when(final_score_col >= 65, "Buy")
    .when(final_score_col >= 50, "Hold")
    .otherwise("Avoid")
)

gold_scored_df = (
    gold_scored_df
    .withColumn("signal", signal_expr)
    .withColumn("as_of_date", F.current_date())
)

In [0]:
# Publish only the consumer-facing columns to the Gold layer, replacing the
# table (and its schema) on every run.
gold_columns = [
    "ticker", "sector", "industry", "market_cap",
    "final_score", "signal", "as_of_date",
]

(
    gold_scored_df
    .select(*gold_columns)
    .write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("uc_finance.gold.stock_fundamental_scores_ml")
)

In [0]:
%sql
-- Final Gold output.
SELECT *
FROM uc_finance.gold.stock_fundamental_scores_ml