In [0]:
# Schema names: bronze inputs are read from, and silver outputs written to, the same schema.
bronze_schema = "workspace.stock_project"
silver_schema = "workspace.stock_project"

# Stock price history
df_stock = spark.read.table(f"{bronze_schema}.bronze_stock_prices")
# Portfolio transactions
df_port = spark.read.table(f"{bronze_schema}.bronze_portfolio_transactions")
# Company sector metadata
df_company = spark.read.table(f"{bronze_schema}.bronze_company_sector")
# Benchmark index data
df_bench = spark.read.table(f"{bronze_schema}.bronze_benchmark_index")

## Stock price cleaning

In [0]:
from pyspark.sql.functions import when, col

# Normalize missing values and give every stock-price column an explicit type.
# Each pair is (column name, target Spark type); casts are applied in this order.
stock_casts = [
    ("open", "double"),
    ("high", "double"),
    ("low", "double"),
    ("close", "double"),
    ("volume", "long"),
    ("date", "date"),
]

# Turn the literal string "NaN" into a real null before casting
df_stock1 = df_stock.replace("NaN", None)
for column_name, target_type in stock_casts:
    df_stock1 = df_stock1.withColumn(column_name, col(column_name).cast(target_type))

In [0]:
from pyspark.sql.functions import greatest, least

# Validation rules: one boolean flag column per rule; a TRUE flag marks the row as invalid.
#
# rule_close_outside compares 'close' against the *ordered* high/low pair
# (least/greatest) rather than the raw columns. With the raw comparison, every row
# whose high and low are swapped (high < low) necessarily satisfies
# "close < low OR close > high", so such rows were always rejected and the
# swapped-high/low repair applied when building the clean table could never run.
df_stock2 = (
    df_stock1
    .withColumn("rule_null_price",       col("open").isNull() | col("close").isNull())
    .withColumn("rule_null_volume",      col("volume").isNull())
    .withColumn("rule_null_high_low",    col("high").isNull() | col("low").isNull())
    .withColumn("rule_negative_volume",  col("volume") < 0)
    .withColumn("rule_zero_close",       col("close") == 0)
    .withColumn(
        "rule_close_outside",
        (col("close") < least(col("high"), col("low")))
        | (col("close") > greatest(col("high"), col("low")))
    )
)


In [0]:
# A stock-price row is invalid as soon as any single rule flag is set.
invalid_stock_rule_expr = (
    "rule_null_price OR rule_null_volume OR rule_negative_volume OR rule_zero_close OR rule_close_outside OR rule_null_high_low"
)
df_stock_invalid = df_stock2.where(invalid_stock_rule_expr)

# Persist the rejected rows (rule flags included) as a Delta table
(
    df_stock_invalid.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable(f"{silver_schema}.silver_invalid_stock_prices")
)

In [0]:
# Build the clean stock-price table: keep rows that pass every rule, put high/low
# in the right order, de-duplicate, then remove the helper rule columns.
stock_rule_columns = [
    "rule_null_price",
    "rule_null_volume",
    "rule_negative_volume",
    "rule_zero_close",
    "rule_null_high_low",
    "rule_close_outside",
]

# A row is kept only when none of the rule flags is set
passes_all_rules = (
    ~col("rule_null_price")
    & ~col("rule_null_volume")
    & ~col("rule_negative_volume")
    & ~col("rule_zero_close")
    & ~col("rule_close_outside")
    & ~col("rule_null_high_low")
)

# True when the two columns are swapped
high_below_low = col("high") < col("low")

df_stock_clean = (
    df_stock2
    .where(passes_all_rules)
    # Swap the values when 'high' is less than 'low'; otherwise keep them as they are
    .withColumn("fixed_high", when(high_below_low, col("low")).otherwise(col("high")))
    .withColumn("fixed_low", when(high_below_low, col("high")).otherwise(col("low")))
    .drop("high", "low")
    .withColumnRenamed("fixed_high", "high")
    .withColumnRenamed("fixed_low", "low")
    .dropDuplicates()
    # The rule flags are only needed for filtering, not in the silver table
    .drop(*stock_rule_columns)
)

# Persist the cleaned stock prices as a Delta table partitioned by ticker
(
    df_stock_clean.write
    .mode("overwrite")
    .partitionBy("Ticker")
    .format("delta")
    .saveAsTable(f"{silver_schema}.silver_stock_prices")
)

## Company sector cleaning

In [0]:
# Fix the known sector misspelling and default missing sectors to 'Other'.
# The replacement mapping is passed without a column subset.
sector_typo_fixes = {"Technlogy": "Technology"}

# Keep the existing sector when present, otherwise fall back to 'Other'
sector_or_other = when(col("sector").isNotNull(), col("sector")).otherwise("Other")

df_company1 = df_company.replace(sector_typo_fixes).withColumn("sector", sector_or_other)

In [0]:
# Distinct tickers that survive stock-price cleaning
valid_tickers = df_stock_clean.select("ticker").distinct()

# Companies whose ticker has cleaned stock prices (inner join keeps matches only)
df_company_valid = df_company1.join(valid_tickers, on="ticker", how="inner")

# Companies whose ticker is absent from the cleaned stock prices (anti join keeps non-matches)
df_company_invalid = df_company1.join(valid_tickers, on="ticker", how="left_anti")

In [0]:
# Persist companies without matching cleaned stock prices to the invalid table
(
    df_company_invalid.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable(f"{silver_schema}.silver_invalid_company_sector")
)

# Persist companies with a valid ticker to the silver company table
(
    df_company_valid.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable(f"{silver_schema}.silver_company_sector")
)

## Portfolio transactions cleaning

In [0]:
# Standardize portfolio transactions in two steps:
# 1) turn the literal string "NaN" into a real null
port_nulls_normalized = df_port.replace("NaN", None)

# 2) give price, quantity and date their proper types
df_port1 = (
    port_nulls_normalized
    .withColumn("price", col("price").astype("double"))
    .withColumn("quantity", col("quantity").astype("int"))
    .withColumn("date", col("date").astype("date"))
)

In [0]:
from pyspark.sql.functions import when, col

# Normalize the misspelled action 'BUYY' to 'BUY'; every other value is left untouched
action_is_typo = col("action") == "BUYY"
corrected_action = when(action_is_typo, "BUY").otherwise(col("action"))

df_port2 = df_port1.withColumn("action", corrected_action)

In [0]:
# Identify invalid portfolio transaction rows based on nulls, non-positive values,
# invalid actions, or future dates.
from pyspark.sql.functions import current_date

# The predicate must evaluate to TRUE or FALSE for every row, never NULL:
# filter(cond) and filter(~cond) both drop rows where cond is NULL, so a row with a
# NULL 'action' or NULL 'date' (e.g. a date that failed to cast) would otherwise be
# missing from BOTH the invalid table and the clean table. The explicit isNull()
# checks on every column used below guarantee such rows are routed to the invalid table.
invalid_condition = (
    (col("price").isNull()) |                # price is null
    (col("quantity").isNull()) |             # quantity is null
    (col("action").isNull()) |               # action is null
    (col("date").isNull()) |                 # date is null
    (col("quantity") <= 0) |                 # quantity is non-positive
    (col("price") <= 0) |                    # price is non-positive
    (~col("action").isin("BUY", "SELL")) |   # action is not BUY or SELL
    (col("date") > current_date())           # date is in the future
)

df_port_invalid = df_port2.filter(invalid_condition)

# Save invalid portfolio transactions to Delta table
df_port_invalid.write.mode("overwrite").format("delta").saveAsTable(f"{silver_schema}.silver_invalid_portfolio")

In [0]:
# Valid transactions are the rows for which the invalid predicate is false
valid_condition = ~invalid_condition
df_port_clean = df_port2.where(valid_condition)

# Persist the valid portfolio transactions as a Delta table
(
    df_port_clean.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable(f"{silver_schema}.silver_portfolio")
)

## Benchmark index cleaning

In [0]:
# Standardize the benchmark index in two steps:
# 1) turn the literal string "NaN" into a real null
bench_nulls_normalized = df_bench.replace("NaN", None)

# 2) type the columns used for ordering and return calculation
# NOTE(review): 'open' is compared numerically in a later cell but is not cast here —
# confirm its bronze type before adding a cast (a type change may affect the overwrite).
df_bench1 = (
    bench_nulls_normalized
    .withColumn("date", col("date").astype("date"))
    .withColumn("close", col("close").astype("double"))
)

In [0]:
import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import col, when

# Every row gets the same key so the whole index forms one window partition
df_bench1_keyed = df_bench1.withColumn("key", F.lit(1))

# Window over the full index, ordered by date, used to look one row back
w = Window.partitionBy("key").orderBy("date")

# Building blocks for the derived columns
previous_close = F.lag("close").over(w)
return_vs_previous = (F.col("close") - F.col("prev_close")) / F.col("prev_close")
open_is_invalid = F.col("open").isNull() | (F.col("open") <= 0)
close_is_invalid = F.col("close").isNull() | (F.col("close") <= 0)

# when(...).otherwise(False) makes each flag strictly True/False, never null
df_bench2 = (
    df_bench1_keyed
    # Close of the preceding row in date order
    .withColumn("prev_close", previous_close)
    # Relative change of close against the preceding close
    .withColumn("daily_return", return_vs_previous)
    # Flag drops of more than 20%
    .withColumn(
        "extreme_return_flag",
        F.when(F.col("daily_return") < -0.20, True).otherwise(False),
    )
    # Flag missing or non-positive open prices
    .withColumn("invalid_open_flag", F.when(open_is_invalid, True).otherwise(False))
    # Flag missing or non-positive close prices
    .withColumn("invalid_close_flag", F.when(close_is_invalid, True).otherwise(False))
)

# Rows with an invalid open/close or without a computable daily return.
# NOTE(review): rows whose daily_return is null are written here, but the clean-table
# filter in the next cell does not exclude them — confirm whether that overlap is intended.
bench_row_is_invalid = (
    F.col("invalid_open_flag")
    | F.col("invalid_close_flag")
    | F.col("daily_return").isNull()
)
df_bench_invalid = df_bench2.where(bench_row_is_invalid)

# Persist the rejected benchmark rows as a Delta table
(
    df_bench_invalid.write
    .mode("overwrite")
    .format("delta")
    .saveAsTable(f"{silver_schema}.silver_invalid_benchmark")
)

In [0]:
# Keep benchmark rows whose open and close flags are both clear, then remove the
# helper columns that were only needed for the window calculation.
bench_flags_clear = (~col("invalid_open_flag")) & (~col("invalid_close_flag"))

df_bench_clean = df_bench2.where(bench_flags_clear).drop("prev_close", "key")

# Persist the cleaned benchmark index as a Delta table
(
    df_bench_clean.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable(f"{silver_schema}.silver_benchmark_index")
)

## Silver data quality summary log

In [0]:
# Data quality summary function: returns a DataFrame with table name, row count, unique row count, and total null count
def dq(df, name):
    """Build a one-row data quality summary for a DataFrame.

    Args:
        df: Spark DataFrame to profile.
        name: Label stored in the ``table`` column of the summary.

    Returns:
        A single-row Spark DataFrame with columns ``table``, ``row_count``,
        ``unique_rows`` (row count after ``dropDuplicates()``) and ``null_count``
        (number of null cells summed over all columns).
    """
    # Local import keeps the function self-contained regardless of which cells ran before it
    from pyspark.sql.functions import col, count, when

    row_count = df.count()
    unique_rows = df.dropDuplicates().count()

    if df.columns:
        # Count nulls for every column in ONE aggregation instead of launching a
        # separate filter/count job per column. count() ignores nulls, so counting
        # a value that is only produced when the cell is null yields the null total.
        # Positional aliases avoid clashes with unusual column names.
        per_column_nulls = df.select([
            count(when(col(c).isNull(), 1)).alias(f"c{i}")
            for i, c in enumerate(df.columns)
        ]).first()
        null_count = sum(per_column_nulls)
    else:
        null_count = 0

    return spark.createDataFrame(
        [(name, row_count, unique_rows, null_count)],
        ["table", "row_count", "unique_rows", "null_count"],
    )

# One summary row per silver table, computed in the same order they are stacked
stock_dq = dq(df_stock_clean, "silver_stock_prices")
port_dq = dq(df_port_clean, "silver_portfolio")
company_dq = dq(df_company_valid, "silver_company_sector")
bench_dq = dq(df_bench_clean, "silver_benchmark_index")

# Stack the per-table summaries into a single log
dq_log = stock_dq.union(port_dq).union(company_dq).union(bench_dq)

# Show the summary in the notebook
display(dq_log)

# Persist the data quality log as a Delta table
(
    dq_log.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable(f"{silver_schema}.silver_dq_log")
)

table,row_count,unique_rows,null_count
silver_stock_prices,62640,62640,0
silver_portfolio,29796,29796,0
silver_company_sector,30,30,0
silver_benchmark_index,2025,2025,22


In [0]:
# End the notebook run and hand this status string back to the caller
# (for example, an orchestrating workflow).
exit_status = "SUCCESS"
dbutils.notebook.exit(exit_status)