In [None]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
import uuid
from pyspark.sql.types import StringType

# Add UUID generation function
uuid_udf = F.udf(lambda: str(uuid.uuid4()), StringType())

# Load Silver Layer and Fundamental Data
silver_df = spark.table("silver_data")
fundamentals_df = spark.table("company_data")

# Clean and Standardize Column Names
for col_name in fundamentals_df.columns:
    clean_name = (col_name.replace(",", "")
                      .replace(" ", "_")
                      .replace(".", "")
                      .replace("/", "_")
                      .replace("-", "_")
                      .replace("'", "")
                      .replace("&", "and")
                      .lower())
    fundamentals_df = fundamentals_df.withColumnRenamed(col_name, clean_name)

# Select and Alias Relevant Columns
fundamentals_df = fundamentals_df.select(
    F.col("ticker_symbol").alias("ticker"),
    F.col("period_ending").alias("period_ending"),
    F.col("depreciation").alias("depreciation_amortization"),
    F.col("capital_expenditures"),
    F.col("net_income"),
    F.col("total_equity"),
    F.col("long_term_debt"),
    F.col("earnings_before_interest_and_tax").alias("ebit"),
    F.col("total_revenue"),
    F.col("gross_profit"),
    F.col("cash_and_cash_equivalents").alias("cash"),
    F.col("total_assets"),
    F.col("accounts_receivable"),
    F.col("inventory"),
    F.col("accounts_payable"),
    F.col("estimated_shares_outstanding").alias("shares_outstanding"),
    F.col("retained_earnings"),
    F.col("research_and_development").alias("rd_expense"),
    F.col("sales_general_and_admin").alias("sga_expense")
)

# Derived Metrics
fundamentals_df = fundamentals_df.withColumns({
    "free_cash_flow": F.col("ebit") + F.col("depreciation_amortization") - F.col("capital_expenditures"),
    "return_on_equity": F.col("net_income") / F.col("total_equity"),
    "debt_to_equity": F.col("long_term_debt") / F.col("total_equity"),
    "price_to_earnings": F.lit(None).cast("double")
})

# Valuation Parameters
valuation_params = {
    "discount_rate": 0.09,
    "growth_rate": 0.03,
    "margin_of_safety": 0.25,
    "pe_threshold": 20,
    "debt_to_equity_threshold": 0.5,
    "min_roe": 0.15
}

# Define Windows
window_short = Window.partitionBy("ticker").orderBy("date").rowsBetween(-13, 0)
window_medium = Window.partitionBy("ticker").orderBy("date").rowsBetween(-49, 0)
window_long = Window.partitionBy("ticker").orderBy("date").rowsBetween(-199, 0)
window_30d = Window.partitionBy("ticker").orderBy("date").rowsBetween(-29, 0)
window_price = Window.partitionBy("ticker").orderBy("date")

# GOLD Layer Transformation with ID column
gold_df = (
    silver_df
    .join(fundamentals_df, "ticker", "left")
    
    # Add unique ID column
    .withColumn("id", uuid_udf())
    
    # Buffett Metrics
    .withColumn("price_to_earnings", F.col("price_close") / (F.col("net_income") / F.col("shares_outstanding")))
    .withColumn("fcf_yield", F.col("free_cash_flow") / (F.col("price_close") * F.col("shares_outstanding")))
    .withColumn("owner_earnings", F.col("net_income") + F.col("depreciation_amortization") - F.col("capital_expenditures"))
    .withColumn("intrinsic_value", (F.col("owner_earnings") * (1 + F.lit(valuation_params["growth_rate"]))) / 
                                   (F.lit(valuation_params["discount_rate"]) - F.lit(valuation_params["growth_rate"])))
    .withColumn("target_buy_price", F.col("intrinsic_value") * (1 - F.lit(valuation_params["margin_of_safety"])))
    .withColumn("buffett_approved", 
        (F.col("return_on_equity") > F.lit(valuation_params["min_roe"])) &
        (F.col("debt_to_equity") < F.lit(valuation_params["debt_to_equity_threshold"])) &
        (F.col("price_to_earnings") < F.lit(valuation_params["pe_threshold"])) &
        (F.col("price_close") < F.col("target_buy_price"))
    )
    
    # Technical Indicators
    .withColumn("price_diff", F.col("price_close") - F.lag("price_close").over(window_price))
    .withColumn("gain", F.when(F.col("price_diff") > 0, F.col("price_diff")).otherwise(0))
    .withColumn("loss", F.when(F.col("price_diff") < 0, -F.col("price_diff")).otherwise(0))
    .withColumn("avg_gain", F.avg("gain").over(window_short))
    .withColumn("avg_loss", F.avg("loss").over(window_short))
    .withColumn("rsi", F.when(F.col("avg_loss") == 0, 100)
                        .otherwise(100 - (100 / (1 + F.col("avg_gain") / F.col("avg_loss")))))
    
    # Moving Averages and MACD
    .withColumn("50d_moving_avg", F.avg("price_close").over(window_medium))
    .withColumn("200d_moving_avg", F.avg("price_close").over(window_long))
    .withColumn("12d_ema", F.avg("price_close").over(Window.partitionBy("ticker").orderBy("date").rowsBetween(-11, 0)))
    .withColumn("26d_ema", F.avg("price_close").over(Window.partitionBy("ticker").orderBy("date").rowsBetween(-25, 0)))
    .withColumn("macd", F.col("12d_ema") - F.col("26d_ema"))
    .withColumn("signal_line", F.avg("macd").over(Window.partitionBy("ticker").orderBy("date").rowsBetween(-8, 0)))
    
    # Risk Metrics
    .withColumn("daily_return", F.col("price_close") / F.lag("price_close").over(window_price) - 1)
    .withColumn("30d_volatility", F.stddev("daily_return").over(window_30d))
    .withColumn("var_95", F.col("price_close") * (1 - 1.645 * F.col("30d_volatility")))
    
    # Position Sizing
    .withColumn("position_size", F.when(F.col("buffett_approved"), 0.10).otherwise(0.05))

    # Final Columns - ID first for better visibility
    .select(
        "id",
        "ticker", 
        "date",
        "price_close", "price_open", "price_high", "price_low", "volume",
        "intrinsic_value", "target_buy_price", "owner_earnings", "fcf_yield",
        "return_on_equity", "debt_to_equity", "price_to_earnings", "buffett_approved",
        "rsi", "50d_moving_avg", "200d_moving_avg", "macd", "signal_line",
        "30d_volatility", "var_95", "position_size",
        "total_revenue", "gross_profit", "net_income",
        "free_cash_flow", "shares_outstanding"
    )
)

# Save Gold Layer Table
(
    gold_df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .option("delta.enableChangeDataFeed", "true")
    .saveAsTable("gold_stock_analysis")
)

# Optimize Table for Performance (include ID in ZORDER)
spark.sql("OPTIMIZE gold_stock_analysis ZORDER BY (id, ticker, date)")

# Add Table Properties and Description
spark.sql("""
    ALTER TABLE gold_stock_analysis SET TBLPROPERTIES (
        'description' = 'Gold layer: Buffett valuation, technical indicators, risk metrics',
        'created_by' = 'quant-team',
        'agents_supported' = 'Buffett, Valuation, Technicals, Risk, Portfolio',
        'version' = '1.1',
        'changes' = 'Added unique ID column'
    )
""")

# Display Sample Results with ID
print("✅ Gold layer transformation successfully completed with IDs! 🚀")
display(spark.sql("SELECT id, ticker, date, price_close FROM gold_stock_analysis LIMIT 5"))

StatementMeta(, 3447bff0-9d39-4617-b47e-35c4eca11445, 16, Finished, Available, Finished)

✅ Gold layer transformation successfully completed with IDs! 🚀


SynapseWidget(Synapse.DataFrame, 897a8f70-6158-425a-991f-46170e5bcb1c)