In [0]:
from pyspark.sql.functions import col, lag, when
from pyspark.sql.window import Window

In [0]:
# Per-company window ordered by trading date; lag() in the silver transform reads the previous row of it.
window = (
    Window
    .partitionBy("company")
    .orderBy("date")
)

# Raw (bronze) daily price rows that the silver layer is derived from.
bronze_df = spark.table("bronze_stock_prices")

In [0]:
# Silver transform: attach the previous row's close within each company (ordered by date)
# and the simple daily return (close - prev_close) / prev_close.
silver_df = (
    bronze_df
    # The first row of each company has no predecessor in `window`, so prev_close is NULL there.
    .withColumn("prev_close", lag("close").over(window))
    .withColumn(
        "daily_return",
        # Guard against prev_close == 0: with ANSI SQL mode enabled, division by zero raises
        # instead of returning NULL. when() without otherwise() yields NULL for zero (and for
        # NULL prev_close), matching the non-ANSI result; dropna() below then removes the row.
        when(
            col("prev_close") != 0,
            (col("close") - col("prev_close")) / col("prev_close"),
        ),
    )
    # Removes the NULL prev_close/daily_return rows, but also any row with a NULL in ANY column.
    # NOTE(review): use dropna(subset=[...]) if columns such as volume/sector may be legitimately sparse.
    .dropna()
)

In [0]:
# Persist the silver frame as the Delta table `silver_stock_prices`, replacing its previous contents.
(
    silver_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("silver_stock_prices")
)

In [0]:
%sql
-- Average simple daily return per company, scaled to a percentage (x100) and rounded to 4 decimals.
-- Individual daily returns are small fractions, and gains and losses on different days partly
-- offset each other, so the per-company average is a very small percentage.
SELECT
    company,
    ROUND(AVG(daily_return) * 100, 4) AS avg_daily_return_pct
FROM silver_stock_prices
GROUP BY company
-- Without ORDER BY the row order of a GROUP BY result is not guaranteed between runs.
ORDER BY company;


company,avg_daily_return_pct
AAPL,0.0053
GOOGL,0.0111
JPM,0.0082
MSFT,0.001


In [0]:
# Monitoring: fail the run if any key analytical column of the silver table contains nulls.

# Import the functions module under an alias. `from pyspark.sql.functions import sum` would
# shadow Python's built-in sum() for every later cell in this notebook.
from pyspark.sql import functions as F

table_name = "silver_stock_prices"

critical_columns = ["date", "close", "daily_return"]

df = spark.table(table_name)

# One aggregation pass producing a null count per column. count(when(...)) counts only
# matching rows and returns 0 (not NULL, as sum() would) when the table is empty.
null_check = df.select([
    F.count(F.when(F.col(c).isNull(), 1)).alias(c) for c in critical_columns
])

display(null_check)

# Turn the report into an actual check so downstream cells never run on bad data.
null_counts = null_check.first().asDict()
offending = {c: n for c, n in null_counts.items() if n}
if offending:
    raise ValueError(f"Null values found in {table_name}: {offending}")


date,close,daily_return
0,0,0


In [0]:
%sql
-- Quick look at a handful of silver price rows (no ORDER BY, so which rows appear may vary).
SELECT *
FROM silver_stock_prices
LIMIT 5;

date,open,high,low,close,volume,company,sector,prev_close,daily_return
2005-01-04,150.49671415301123,151.00071059629937,149.54312370521046,150.8190959524751,3747663,AAPL,Technology,150.07211270865417,0.004977495354324
2005-01-05,150.35844985184005,152.1999627539495,150.0932854914777,150.7654200728332,1230096,AAPL,Technology,150.8190959524751,-0.00035589577899879934
2005-01-06,151.00613838994073,152.16931606564023,150.21598573916435,150.2796484371675,5116952,AAPL,Technology,150.7654200728332,-0.0032220361634054
2005-01-07,152.52916824634875,153.5961286638383,152.17489733676274,153.155063650238,8720093,AAPL,Technology,150.2796484371675,0.0191337632405541
2005-01-10,152.2950148716254,153.48780207101032,151.5677593740831,152.65209893120692,7751214,AAPL,Technology,153.155063650238,-0.0032840227873868


In [0]:
%sql
-- Silver portfolio transactions: copy bronze rows that have a date and strictly positive
-- price and quantity (rows where price or quantity is NULL also fail these comparisons).
-- NOTE(review): stock and transaction_type are passed through unvalidated; confirm that is intended.
CREATE OR REPLACE TABLE silver_portfolio_transactions AS
SELECT stock, transaction_type, quantity, price, transaction_date
FROM bronze_portfolio_transactions
WHERE transaction_date IS NOT NULL
  AND price > 0
  AND quantity > 0;


num_affected_rows,num_inserted_rows


In [0]:
%sql
-- Sample a few cleaned transactions (row selection is not ordered).
SELECT *
FROM silver_portfolio_transactions
LIMIT 5;

stock,transaction_type,quantity,price,transaction_date
MSFT,BUY,20.0,104.16,2016-01-26
JPM,BUY,28.0,103.65,2009-09-24
JPM,BUY,26.0,129.91,2020-08-14
MSFT,SELL,47.0,140.51,2016-02-29
GOOGL,SELL,10.0,242.83,2011-04-07
