In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import (
    col, row_number, current_timestamp, lit, to_json, struct, max
)

### Tables Creation

In [0]:
%sql
CREATE TABLE IF NOT EXISTS capstone.default.silver_data_quality_rejects (
    table_name STRING,
    reject_reason STRING,
    record STRING,
    rejected_at TIMESTAMP
) USING DELTA;

CREATE TABLE IF NOT EXISTS capstone.default.silver_fk_rejects (
    table_name STRING,
    reject_reason STRING,
    record STRING,
    rejected_at TIMESTAMP
) USING DELTA;

### Helper Functions

In [0]:
# Write rejects to quarantine

def write_rejects(df, table_name, reason, reject_table):

    business_cols = df.columns

    rejects_df = (
        df
        .withColumn("record", to_json(struct(*business_cols)))
        .withColumn("table_name", lit(table_name))
        .withColumn("reject_reason", lit(reason))
        .withColumn("rejected_at", current_timestamp())
        .select("table_name", "reject_reason", "record", "rejected_at")
    )

    rejects_df.write \
        .format("delta") \
        .mode("append") \
        .saveAsTable(reject_table)


In [0]:
# Function to get the last processed timestamp

def get_last_processed_timestamp(table_name):
    last_ts=(
        spark.table("capstone.default.silver_watermark")
        .filter(col("table_name") == table_name)
        .select("last_processed_timestamp")
        .collect()[0][0]
    )
    return last_ts

In [0]:
# Function to update watermark

def update_watermark(table_name, new_max_ts):
    spark.sql(f"""
    MERGE INTO capstone.default.silver_watermark tgt
    USING (SELECT '{table_name}' AS table_name,
                TIMESTAMP('{new_max_ts}') AS last_processed_timestamp) src
    ON tgt.table_name = src.table_name
    WHEN MATCHED THEN UPDATE SET last_processed_timestamp = src.last_processed_timestamp
    WHEN NOT MATCHED THEN INSERT *
    """)

In [0]:
from functools import reduce

# Function to check if any column is null

def any_null_condition(dataframe):
    return reduce(
        lambda x, y: x | y,
        (col(c).isNull() for c in dataframe.columns)
    )


In [0]:
from pyspark.sql.functions import lit, current_timestamp
import uuid

def write_log(table_name, source_system, log_table):
    spark.createDataFrame(
        [(str(uuid.uuid4()), table_name, source_system)],
        ["log_id", "table_name", "source_system"]
    ) \
    .withColumn("ingestion_timestamp", current_timestamp()) \
    .withColumn("created_at", current_timestamp()) \
    .write \
    .format("delta") \
    .mode("append") \
    .saveAsTable(log_table)

### Loading the companies data

In [0]:
last_ts=get_last_processed_timestamp("capstone.default.silver_companies")

print(last_ts)

companies_bronze_df=spark.table("capstone.default.bronze_companies").filter(col("ingestion_timestamp")>last_ts)

2026-01-05 09:00:49.108720


In [0]:

# Get invalid rows
companies_bronze_invalid=companies_bronze_df.filter(any_null_condition(companies_bronze_df))

print("Invalid rows: ", companies_bronze_invalid.count())

# Quarantine the invalid columns
write_rejects(
    companies_bronze_invalid,
    "companies",
    "ticker is null",
    "capstone.default.silver_data_quality_rejects"
)

# Get valid rows
companies_bronze_valid=companies_bronze_df.filter(~any_null_condition(companies_bronze_df))

Invalid rows:  0


In [0]:
# Delete duplicates

w=Window.partitionBy("ticker").orderBy(col("ingestion_timestamp").desc())

companies_dedup=(
    companies_bronze_valid
    .withColumn("row_number",row_number().over(w))
    .filter(col("row_number")==1)
    .drop("row_number")
    .withColumn("silver_processed_timestamp",current_timestamp())
)

In [0]:
# Create staging table

companies_dedup.createOrReplaceTempView("silver_companies_staging")

In [0]:
%sql
-- Merge the staged rows into the silver tables

MERGE INTO capstone.default.silver_companies tgt
USING silver_companies_staging src
ON tgt.ticker = src.ticker
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
0,0,0,0


In [0]:
write_log("capstone.default.silver_companies","capstone", "capstone.default.silver_log")

In [0]:
new_max_ts=companies_dedup.agg(max("ingestion_timestamp")).collect()[0][0]

if new_max_ts is not None: 
    update_watermark("capstone.default.silver_companies",new_max_ts)


### Loading daily prices data

In [0]:
last_ts=get_last_processed_timestamp("capstone.default.silver_daily_prices")

print(last_ts)

prices_bronze_df=spark.table("capstone.default.bronze_daily_prices").filter(col("ingestion_timestamp")>last_ts)

2026-01-05 09:36:21.936777


In [0]:
# Type casting

prices_cast_df=(
    prices_bronze_df
    .withColumn("trade_date",col("trade_date").cast("date"))
    .withColumn("open_price",col("open_price").cast("double"))
    .withColumn("high_price",col("high_price").cast("double"))
    .withColumn("low_price",col("low_price").cast("double"))
    .withColumn("close_price",col("close_price").cast("double"))
    .withColumn("volume",col("volume").cast("long"))
)

In [0]:
# Removing invalid rows

invalid_prices_df=prices_cast_df.filter(
    (any_null_condition(prices_cast_df)) |
    (col("open_price")<0) |
    (col("high_price")<0) |
    (col("low_price")<0)
)

# Quarantine the invalid columns

write_rejects(
    invalid_prices_df,
    "capstone.default.silver_daily_prices",
    "invalid data",
    "capstone.default.silver_data_quality_rejects"
)

# Get valid rows

prices_bronze_valid=prices_bronze_df.subtract(invalid_prices_df)

prices_bronze_valid.display()

ticker,trade_date,open_price,high_price,low_price,close_price,adjusted_close,volume,ingestion_timestamp,source_file,load_type


In [0]:
# Get rows with valid tickers

valid_tickers=spark.table("capstone.default.silver_companies").select("ticker")

invalid_tickers_df=prices_bronze_valid.join(valid_tickers, on="ticker", how="leftanti")
write_rejects(
    invalid_tickers_df,
    "capstone.default.silver_daily_prices",
    "ticker not in silver_companies",
    "capstone.default.silver_fk_rejects"
)

prices_bronze_valid=prices_bronze_valid.join(valid_tickers, on="ticker", how="inner")\
                        .withColumn("silver_processed_timestamp",current_timestamp())

In [0]:
# Delete duplicates

w=Window.partitionBy("ticker","trade_date").orderBy(col("ingestion_timestamp").desc())

prices_dedup=(
    prices_bronze_valid
    .withColumn("row_number",row_number().over(w))
    .filter(col("row_number")==1)
    .drop("row_number")
)

prices_dedup.createOrReplaceTempView("silver_prices_staging")

In [0]:
%sql
-- Merge the staged rows into the silver table

MERGE INTO capstone.default.silver_daily_prices tgt
USING silver_prices_staging src
ON tgt.ticker = src.ticker AND tgt.trade_date = src.trade_date
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
0,0,0,0


In [0]:
write_log("capstone.default.silver_daily_prices","capstone", "capstone.default.silver_log")

In [0]:
new_max_ts = prices_dedup.agg(
    {"ingestion_timestamp": "max"}
).collect()[0][0]

if new_max_ts is not None:
    update_watermark("capstone.default.silver_daily_prices", new_max_ts)

### Loading traders data

In [0]:
last_ts=get_last_processed_timestamp("capstone.default.silver_traders")

print(last_ts)

traders_bronze_df=spark.table("capstone.default.bronze_traders").filter(col("ingestion_timestamp")>last_ts)

2026-01-05 09:03:45.186493


In [0]:
# Type casting

traders_cast_df=(
    traders_bronze_df
    .withColumn("initial_cash",col("initial_cash").cast("double"))
)

In [0]:
# Remove invalid roows

invalid_traders=traders_cast_df.filter(
    (any_null_condition(traders_cast_df)) |
    (col("initial_cash")<0)
)

write_rejects(
    invalid_traders,
    "capstone.default.silver_traders",
    "invalid data",
    "capstone.default.silver_data_quality_rejects"
)

# Get valid rows

traders_bronze_valid=traders_bronze_df.subtract(invalid_traders)

In [0]:
traders_bronze_valid.display()

trader_id,trader_type,initial_cash,base_currency,created_at,ingestion_timestamp,source_file,load_type


In [0]:
# Delete duplicates

w=Window.partitionBy("trader_id").orderBy(col("ingestion_timestamp").desc())

traders_dedup=(
    traders_bronze_valid
    .withColumn("row_number",row_number().over(w))
    .filter(col("row_number")==1)
    .drop("row_number")
    .withColumn("silver_processed_timestamp",current_timestamp())
)

In [0]:
# Create staging table

traders_dedup.createOrReplaceTempView("silver_traders_staging")

In [0]:
%sql
-- Merge rows into silver_traders

MERGE INTO capstone.default.silver_traders tgt
USING silver_traders_staging src
ON tgt.trader_id = src.trader_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
0,0,0,0


In [0]:
write_log("capstone.default.silver_traders","capstone", "capstone.default.silver_log")

In [0]:
new_max_ts = traders_dedup.agg(
    {"ingestion_timestamp": "max"}
).collect()[0][0]

if new_max_ts is not None:
    update_watermark("capstone.default.silver_traders", new_max_ts)

### Loading of Trades

In [0]:
last_ts=get_last_processed_timestamp("capstone.default.silver_trades")

trades_bronze_df=spark.table("capstone.default.bronze_trades").filter(col("ingestion_timestamp")>last_ts)

In [0]:
# Type casting

trades_cast_df=(
    trades_bronze_df
    .withColumn("trade_date",col("trade_date").cast("date"))
    .withColumn("trade_time",col("trade_time").cast("timestamp"))
    .withColumn("price",col("price").cast("double"))
    .withColumn("quantity",col("quantity").cast("int"))
)

In [0]:
# Remove invalid rows

invalid_trades=trades_cast_df.filter(
    (any_null_condition(trades_cast_df)) |
    (col("price")<0)|
    (col("quantity")<0)
)

write_rejects(
    invalid_trades,
    "capstone.default.silver_trades",
    "invalid data",
    "capstone.default.silver_data_quality_rejects"
)

trades_bronze_valid=trades_cast_df.subtract(invalid_trades)

invalid_traders.display()

trader_id,trader_type,initial_cash,base_currency,created_at,ingestion_timestamp,source_file,load_type


In [0]:
# Checking for valid tickers and traders

valid_traders = spark.table("capstone.default.silver_traders").select("trader_id")
valid_companies = spark.table("capstone.default.silver_companies").select("ticker")

invalid_trades = (
    trades_bronze_valid
    .join(valid_traders, "trader_id", "left_anti")
    .unionByName(
        trades_bronze_valid.join(valid_companies, "ticker", "left_anti")
    )
)

write_rejects(
    invalid_trades,
    "trades",
    "FK violation (trader or ticker)",
    "capstone.default.silver_fk_rejects"
)

trades_bronze_valid = (
    trades_bronze_valid
    .join(valid_traders, "trader_id")
    .join(valid_companies, "ticker")
    .withColumn("silver_processed_timestamp", current_timestamp())
)


In [0]:
# Delete duplicates

w=Window.partitionBy("trade_id").orderBy(col("ingestion_timestamp").desc())

trades_dedup=(
    trades_bronze_valid
    .withColumn("row_number",row_number().over(w))
    .filter(col("row_number")==1)
    .drop("row_number")
    .withColumn("silver_processed_timestamp",current_timestamp())
)

In [0]:
trades_dedup.createOrReplaceTempView("silver_trades_staging")

In [0]:
%sql
MERGE INTO capstone.default.silver_trades tgt
USING silver_trades_staging src
ON tgt.trade_id = src.trade_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
0,0,0,0


In [0]:
write_log("capstone.default.silver_trades","capstone", "capstone.default.silver_log")

In [0]:
new_max_ts = trades_dedup.agg(
    {"ingestion_timestamp": "max"}
).collect()[0][0]

if new_max_ts is not None:
    update_watermark("capstone.default.silver_trades", new_max_ts)

### Loading of fx rates

In [0]:
new_max_ts = trades_dedup.agg(
    {"ingestion_timestamp": "max"}
).collect()[0][0]

if new_max_ts is not None:
    update_watermark("capstone.default.silver_trades", new_max_ts)

In [0]:
last_ts=get_last_processed_timestamp("capstone.default.silver_fx_rates")

fx_rates_bronze_df=spark.table("capstone.default.bronze_fx_rates").filter(col("ingestion_timestamp")>last_ts)

In [0]:
# Type casting

fx_rates_cast=(
    fx_rates_bronze_df
    .withColumn("fx_date",col("fx_date").cast("date"))
    .withColumn("fx_rate",col("fx_rate").cast("double"))
)
invalid_fx_rates=fx_rates_cast.filter(
    (any_null_condition(fx_rates_cast)) |
    (col("fx_rate")<0)
)
write_rejects(
    invalid_fx_rates,
    "capstone.default.silver_fx_rates",
    "invalid data",
    "capstone.default.silver_data_quality_rejects"
)
fx_rates_bronze_valid=fx_rates_cast.subtract(invalid_fx_rates)

In [0]:
# Deleting duplicates

w=Window.partitionBy("fx_date","from_currency","to_currency").orderBy(col("ingestion_timestamp").desc())

fx_rates_dedup=(
    fx_rates_bronze_valid
    .withColumn("row_number",row_number().over(w))
    .filter(col("row_number")==1)
    .drop("row_number")
    .withColumn("silver_processed_timestamp",current_timestamp())
)

fx_rates_dedup.createOrReplaceTempView("silver_fx_rates_staging")

In [0]:
%sql
-- Merge the rows into the silver table

MERGE INTO capstone.default.silver_fx_rates tgt
USING silver_fx_rates_staging src
ON tgt.fx_date = src.fx_date AND tgt.from_currency = src.from_currency AND tgt.to_currency = src.to_currency
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;


num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
0,0,0,0


In [0]:
write_log("capstone.default.silver_fx_rates","capstone", "capstone.default.silver_log")

In [0]:
new_max_ts = fx_rates_dedup.agg(
    {"ingestion_timestamp": "max"}
).collect()[0][0]

if new_max_ts is not None:
    update_watermark("capstone.default.silver_fx_rates", new_max_ts)

### Optimization

In [0]:
%sql
OPTIMIZE capstone.default.silver_daily_prices
ZORDER BY (ticker);

path,metrics
,"List(0, 0, List(null, null, 0.0, 0, 0), List(null, null, 0.0, 0, 0), 0, List(minCubeSize(107374182400), List(1, 200896), List(0, 0), 1, List(0, 0), 0, null), null, 0, 0, 1, 1, false, 0, 0, 1767606654778, 1767606655149, 8, 0, null, List(0, 0), null, 9, 9, 0, 0, null)"


In [0]:
%sql
OPTIMIZE capstone.default.silver_trades
ZORDER BY (trade_id, trader_id);

path,metrics
,"List(0, 0, List(null, null, 0.0, 0, 0), List(null, null, 0.0, 0, 0), 0, List(minCubeSize(107374182400), List(1, 393230), List(0, 0), 1, List(0, 0), 0, null), null, 0, 0, 1, 1, false, 0, 0, 1767606656443, 1767606656958, 8, 0, null, List(0, 0), null, 10, 10, 0, 0, null)"


### Logging

In [0]:
%sql
SELECT * FROM capstone.default.silver_log;

log_id,table_name,source_system,ingestion_timestamp,created_at
9cdc6a2d-257f-4a1f-9414-7da9a77c8787,capstone.default.silver_daily_prices,capstone,2026-01-05T09:47:11.747Z,2026-01-05T09:47:11.747Z
237e514c-4fa3-41c0-9038-e7cf4fc55fe4,capstone.default.silver_daily_prices,capstone,2026-01-05T09:50:26.492Z,2026-01-05T09:50:26.492Z
217eadc4-7520-4f4b-886b-32789c50e301,capstone.default.silver_companies,capstone,2026-01-05T09:50:17.328Z,2026-01-05T09:50:17.328Z
3955d513-4ef6-4e1e-b605-6e5254097b18,capstone.default.silver_companies,capstone,2026-01-05T09:46:56.234Z,2026-01-05T09:46:56.234Z
b1664e3f-c09c-4b46-82f2-e9f8e57bbecb,capstone.default.silver_fx_rates,capstone,2026-01-05T09:50:52.689Z,2026-01-05T09:50:52.689Z
6714329b-ea3c-46e2-82e5-73a8beabfc04,capstone.default.silver_traders,capstone,2026-01-05T09:50:34.131Z,2026-01-05T09:50:34.131Z
5ab47c7e-abac-425e-a0a5-494eaab77b77,capstone.default.silver_traders,capstone,2026-01-05T09:47:31.418Z,2026-01-05T09:47:31.418Z
302f2592-e234-4102-85dc-60846e0f52d8,capstone.default.silver_trades,capstone,2026-01-05T09:50:43.378Z,2026-01-05T09:50:43.378Z


In [0]:
%sql
SELECT * FROM capstone.default.silver_watermark;

table_name,last_processed_timestamp
capstone.default.silver_daily_companies,2026-01-03T05:49:24.142Z
capstone.default.silver_companies,2026-01-05T09:00:49.108Z
capstone.default.silver_traders,2026-01-05T09:03:45.186Z
capstone.default.silver_daily_prices,2026-01-05T09:36:21.936Z
capstone.default.silver_trades,2026-01-05T09:36:28.822Z
capstone.default.silver_fx_rates,2026-01-05T09:36:33.484Z


In [0]:
%sql
SELECT * FROM capstone.default.silver_data_quality_rejects
ORDER BY rejected_at DESC;

table_name,reject_reason,record,rejected_at
companies,ticker is null,"{""company_name"":""Bad Corp"",""sector"":""Tech"",""industry"":""Software"",""country"":""US"",""exchange"":""NASDAQ"",""currency"":""USD"",""is_active"":true,""ingestion_timestamp"":""2026-01-04T09:53:02.480Z"",""source_file"":""/Volumes/capstone/default/raw_data/companies/historical/companies_inconsistent.csv"",""load_type"":""historical""}",2026-01-05T09:05:41.443Z
companies,ticker is null,"{""company_name"":""Bad Corp"",""sector"":""Tech"",""industry"":""Software"",""country"":""US"",""exchange"":""NASDAQ"",""currency"":""USD"",""is_active"":true,""ingestion_timestamp"":""2026-01-04T09:53:02.480Z"",""source_file"":""/Volumes/capstone/default/raw_data/companies/historical/companies_inconsistent.csv"",""load_type"":""historical""}",2026-01-05T08:58:54.798Z
companies,ticker is null,"{""company_name"":""Bad Corp"",""sector"":""Tech"",""industry"":""Software"",""country"":""US"",""exchange"":""NASDAQ"",""currency"":""USD"",""is_active"":true,""ingestion_timestamp"":""2026-01-04T09:53:02.480Z"",""source_file"":""/Volumes/capstone/default/raw_data/companies/historical/companies_inconsistent.csv"",""load_type"":""historical""}",2026-01-05T07:21:52.281Z
companies,ticker is null,"{""company_name"":""Bad Corp"",""sector"":""Tech"",""industry"":""Software"",""country"":""US"",""exchange"":""NASDAQ"",""currency"":""USD"",""is_active"":true,""ingestion_timestamp"":""2026-01-04T09:53:02.480Z"",""source_file"":""/Volumes/capstone/default/raw_data/companies/historical/companies_inconsistent.csv"",""load_type"":""historical""}",2026-01-05T07:16:59.541Z
companies,ticker is null,"{""company_name"":""Bad Corp"",""sector"":""Tech"",""industry"":""Software"",""country"":""US"",""exchange"":""NASDAQ"",""currency"":""USD"",""is_active"":true,""ingestion_timestamp"":""2026-01-04T09:53:02.480Z"",""source_file"":""/Volumes/capstone/default/raw_data/companies/historical/companies_inconsistent.csv"",""load_type"":""historical""}",2026-01-05T03:54:42.808Z
capstone.default.silver_trades,invalid data,"{""trade_id"":""T005"",""trader_id"":""TRD001"",""ticker"":""AAPL"",""trade_date"":""2025-12-25"",""trade_time"":""2025-12-25T10:20:00.000Z"",""side"":""BUY"",""quantity"":5,""price"":-100.0,""ingestion_timestamp"":""2026-01-04T11:49:49.308Z"",""source_file"":""/Volumes/capstone/default/raw_data/trades/historical/trades_inconsistent.csv"",""load_type"":""historical""}",2026-01-05T02:53:23.126Z
capstone.default.silver_trades,invalid data,"{""trade_id"":""T003"",""trader_id"":""TRD001"",""ticker"":""INVALID"",""trade_date"":""2025-12-25"",""trade_time"":""2025-12-25T10:10:00.000Z"",""side"":""SELL"",""quantity"":10,""price"":180.0,""ingestion_timestamp"":""2026-01-04T11:49:49.308Z"",""source_file"":""/Volumes/capstone/default/raw_data/trades/historical/trades_inconsistent.csv"",""load_type"":""historical""}",2026-01-05T02:53:23.126Z
capstone.default.silver_trades,invalid data,"{""trade_id"":""T004"",""trader_id"":""TRD001"",""ticker"":""AAPL"",""trade_date"":""2025-12-25"",""trade_time"":""2025-12-25T10:15:00.000Z"",""side"":""SELL"",""quantity"":-5,""price"":180.0,""ingestion_timestamp"":""2026-01-04T11:49:49.308Z"",""source_file"":""/Volumes/capstone/default/raw_data/trades/historical/trades_inconsistent.csv"",""load_type"":""historical""}",2026-01-05T02:53:23.126Z
capstone.default.silver_traders,invalid data,"{""trader_type"":""institutional"",""initial_cash"":1000000.0,""base_currency"":""USD"",""ingestion_timestamp"":""2026-01-04T11:22:22.882Z"",""source_file"":""/Volumes/capstone/default/raw_data/traders/historical/traders_inconsistent.csv"",""load_type"":""historical""}",2026-01-05T02:53:05.072Z
capstone.default.silver_traders,invalid data,"{""trader_id"":""TRD003"",""trader_type"":""algo"",""initial_cash"":-5000.0,""base_currency"":""USD"",""ingestion_timestamp"":""2026-01-04T11:22:22.882Z"",""source_file"":""/Volumes/capstone/default/raw_data/traders/historical/traders_inconsistent.csv"",""load_type"":""historical""}",2026-01-05T02:53:05.072Z


In [0]:
%sql
SELECT * FROM capstone.default.silver_fk_rejects
ORDER BY rejected_at DESC;

table_name,reject_reason,record,rejected_at
