In [0]:
# ===================================================================
# PARAMETERS - Create Widgets for Job Configuration
# ===================================================================

# One row per widget: (kind, name, default value, dropdown choices or None, label).
# Rows are registered in the order listed.
_JOB_WIDGET_SPECS = (
    ("text", "catalog_name", "fintech_analytics", None, "1. Catalog Name"),
    ("dropdown", "environment", "prod", ["dev", "staging", "prod"], "2. Environment"),
    ("dropdown", "write_mode", "overwrite", ["overwrite", "append"], "3. Write Mode"),
    ("text", "data_date", "2025-02-17", None, "4. Data Date"),
    ("dropdown", "enable_dedup", "true", ["true", "false"], "5. Enable Deduplication"),
)

for _kind, _widget_name, _widget_default, _widget_choices, _widget_label in _JOB_WIDGET_SPECS:
    if _kind == "dropdown":
        dbutils.widgets.dropdown(_widget_name, _widget_default, _widget_choices, _widget_label)
    else:
        dbutils.widgets.text(_widget_name, _widget_default, _widget_label)

In [0]:
# COMMAND ----------
# ===================================================================
# SEC SMART MONEY - SILVER LAYER
# Production-Grade Data Transformation & Normalization
# ===================================================================

from datetime import datetime, timedelta
import traceback

# Startup banner printed at the top of the silver-layer run output.
_SILVER_BANNER = """
╔═════════════════════════════════════════════════════════════════════════════╗
║                                                                             ║
║                  🥈 SEC SMART MONEY - SILVER LAYER                         ║
║                                                                             ║
║          Transforming raw bronze data into clean, normalized tables         ║
║                                                                             ║
╚═════════════════════════════════════════════════════════════════════════════╝
"""
print(_SILVER_BANNER)

# COMMAND ----------
# ===================================================================
# PARAMETERS - Production Configuration
# ===================================================================
# These parameters allow flexible execution:
# - From Databricks UI: Manual testing with custom values
# - From Master Orchestration: Programmatic execution with parameters
# - Standalone: Using sensible defaults

# Register the four pipeline widgets (two free-text, two dropdowns).
_ENVIRONMENT_CHOICES = ["dev", "staging", "prod"]
_BOOLEAN_CHOICES = ["true", "false"]

dbutils.widgets.text("catalog_name", "fintech_analytics", "1. Catalog Name (e.g., fintech_analytics)")
dbutils.widgets.dropdown("environment", "prod", _ENVIRONMENT_CHOICES, "2. Environment (dev/staging/prod)")
dbutils.widgets.text("lookback_days", "365", "3. Lookback Days for Historical Data")
dbutils.widgets.dropdown("incremental_mode", "true", _BOOLEAN_CHOICES, "4. Incremental Mode (true/false)")

# COMMAND ----------
# ===================================================================
# GET PARAMETER VALUES (Safe Retrieval with Fallbacks)
# ===================================================================
# All parameters are read through one helper so the fallback rules are uniform:
# - Widget readable and non-blank (e.g. set by the Master job): use its value
# - Widget unreadable or blank (standalone mode): use the default
# - Every fallback is logged instead of being silently swallowed

_DEFAULT_LOOKBACK_DAYS = 365


def _widget_or_default(name, default):
    """Return widget ``name`` as a stripped string, or ``default`` when unusable.

    Args:
        name: Widget name passed to ``dbutils.widgets.get``.
        default: String returned when the widget cannot be read or is blank.

    Returns:
        The widget value with surrounding whitespace removed, or ``default``.
    """
    try:
        raw_value = dbutils.widgets.get(name)
    except Exception as exc:
        # Catch `Exception` rather than using a bare `except:` so that
        # KeyboardInterrupt / SystemExit still propagate.
        print(f"⚠️  Widget '{name}' unavailable ({type(exc).__name__}); using default '{default}'")
        return default

    cleaned = (raw_value or "").strip()
    if not cleaned:
        print(f"⚠️  Widget '{name}' is blank; using default '{default}'")
        return default
    return cleaned


catalog_name = _widget_or_default("catalog_name", "fintech_analytics")
environment = _widget_or_default("environment", "prod")

# lookback_days must be a positive integer; anything else falls back to the default.
try:
    lookback_days = int(_widget_or_default("lookback_days", str(_DEFAULT_LOOKBACK_DAYS)))
except ValueError:
    print(f"⚠️  Widget 'lookback_days' is not an integer; using default {_DEFAULT_LOOKBACK_DAYS}")
    lookback_days = _DEFAULT_LOOKBACK_DAYS
if lookback_days <= 0:
    print(f"⚠️  Widget 'lookback_days' must be positive; using default {_DEFAULT_LOOKBACK_DAYS}")
    lookback_days = _DEFAULT_LOOKBACK_DAYS

# Only the literal "true" (case-insensitive) enables incremental mode.
# A missing or blank widget falls back to the default, which is incremental.
incremental_mode = _widget_or_default("incremental_mode", "true").lower() == "true"

# Construct schema names
silver_schema = f"{catalog_name}.silver"
bronze_schema = f"{catalog_name}.bronze"

# Calculate date ranges from a single clock reading so run_date and
# lookback_start cannot disagree when the cell runs across midnight.
_run_started_at = datetime.now()
run_date = _run_started_at.strftime("%Y-%m-%d")
lookback_start = (_run_started_at - timedelta(days=lookback_days)).strftime("%Y-%m-%d")

# Print configuration
print(f"""
📋 EXECUTION PARAMETERS:
  Catalog Name:        {catalog_name}
  Bronze Schema:       {bronze_schema}
  Silver Schema:       {silver_schema}
  Environment:         {environment}
  Lookback Days:       {lookback_days}
  Lookback Start:      {lookback_start}
  Incremental Mode:    {incremental_mode}
  Run Date:            {run_date}
  Execution Mode:      {'INCREMENTAL' if incremental_mode else 'FULL RELOAD'}
""")

# COMMAND ----------
# ===================================================================
# STEP 1: Load Silver Data (Insider Transactions)
# ===================================================================

_step1_rule = "=" * 80
print("\n" + _step1_rule, "STEP 1: Loading Silver Data", _step1_rule, sep="\n")

try:
    # Read the silver fact table, keeping only filings on or after lookback_start.
    silver_df = spark.sql(f"""
    SELECT *
    FROM {silver_schema}.silver_fact_insider_transactions
    WHERE filing_date >= '{lookback_start}'
    """)

    # Counting runs the query, so read failures surface inside this try block.
    _loaded_rows = silver_df.count()
    print(f"✅ Loaded {_loaded_rows:,} rows from silver layer")

except Exception as load_error:
    # Log the failure, then re-raise so the run stops here.
    print(f"❌ Error loading silver data: {load_error}")
    raise

# COMMAND ----------
# ===================================================================
# STEP 2: Transform & Normalize Data (Silver Schema)
# ===================================================================

print("\n" + "="*80)
print("STEP 2: Transforming to Silver Layer")
print("="*80)

try:
    # Keep only the silver fact columns and drop rows missing either company key.
    silver_transformed_df = silver_df.select(
        "cik",
        "company_name",
        "filing_date",
        "insider_name",
        "security_title",
        "transaction_date",
        "shares",
        "price_per_share",
        "transaction_code",
        "acquired_disposed",
        "shares_after_transaction",
        "confidence_score"
    ).where(
        "cik IS NOT NULL AND company_name IS NOT NULL"
    )

    silver_count = silver_transformed_df.count()
    print(f"✅ Transformed {silver_count:,} rows for silver layer")

    _fact_table = f"{silver_schema}.silver_fact_insider_transactions"
    _fact_writer = silver_transformed_df.write.format("delta").mode("overwrite")

    if incremental_mode:
        # silver_df only contains filings on/after lookback_start, so a plain
        # overwrite would delete every older row in the target table.
        # replaceWhere restricts the overwrite to the lookback window; rows
        # outside the window are left untouched.
        print("   Using INCREMENTAL write mode (replaceWhere on lookback window)")
        _fact_writer = (
            _fact_writer
            .option("mergeSchema", "true")
            .option("replaceWhere", f"filing_date >= '{lookback_start}'")
        )
    else:
        # Full reload: the whole table is replaced by the transformed rows.
        print("   Using FULL RELOAD write mode (OVERWRITE)")

    _fact_writer.saveAsTable(_fact_table)

    print(f"✅ Silver fact table updated: {silver_count:,} rows")

except Exception as e:
    # Log the error with its traceback, then re-raise so the run fails.
    print(f"❌ Error transforming to silver: {str(e)}")
    print(f"   Traceback: {traceback.format_exc()}")
    raise

# COMMAND ----------
# ===================================================================
# STEP 3: Create Dimension Tables (if needed)
# ===================================================================

print("\n" + "="*80)
print("STEP 3: Creating Dimension Tables")
print("="*80)

try:
    # Companies dimension: one row per distinct (cik, company_name) pair.
    companies_df = silver_transformed_df.select("cik", "company_name").distinct()
    companies_df.write.mode("overwrite").format("delta")\
        .saveAsTable(f"{silver_schema}.silver_dim_companies")
    print(f"✅ Created companies dimension: {companies_df.count():,} rows")

    # Insiders dimension: keep the (insider_name, company_cik) shape that the
    # dedicated silver_dim_insiders build cell in this notebook produces, so
    # overwriting the table here does not drop the company link.
    insiders_df = (
        silver_transformed_df
        .selectExpr("insider_name", "cik AS company_cik")
        .dropna(subset=["insider_name"])
        .dropDuplicates(["insider_name", "company_cik"])
    )
    insiders_df.write.mode("overwrite").format("delta")\
        .saveAsTable(f"{silver_schema}.silver_dim_insiders")
    print(f"✅ Created insiders dimension: {insiders_df.count():,} rows")

except Exception as e:
    # Dimension refresh is best-effort: log with traceback and keep going.
    print(f"⚠️  Warning creating dimensions: {str(e)}")
    print(f"   Traceback: {traceback.format_exc()}")
    # Non-critical, continue

# COMMAND ----------
# ===================================================================
# STEP 4: Data Quality Checks
# ===================================================================

_step4_rule = "=" * 80
print("\n" + _step4_rule, "STEP 4: Data Quality Validation", _step4_rule, sep="\n")

try:
    # Check 1: count fact rows whose cik is NULL.
    _null_result = spark.sql(f"""
    SELECT COUNT(*) as null_count
    FROM {silver_schema}.silver_fact_insider_transactions
    WHERE cik IS NULL
    """).collect()
    null_check = _null_result[0]['null_count']

    if null_check != 0:
        print(f"⚠️  Found {null_check} null values in cik column")
    else:
        print("✅ Primary key validation: PASSED (no nulls in cik)")

    # Check 2: report the total number of rows in the fact table.
    _count_result = spark.sql(f"""
    SELECT COUNT(*) as count
    FROM {silver_schema}.silver_fact_insider_transactions
    """).collect()
    total_rows = _count_result[0]['count']

    print(f"✅ Row count validation: {total_rows:,} rows")

except Exception as quality_error:
    # Quality checks are advisory: a failure is logged and the run continues.
    print(f"⚠️  Warning in quality checks: {quality_error}")

# COMMAND ----------
# ===================================================================
# STEP 5: Summary & Completion
# ===================================================================

_step5_rule = "=" * 80
print("\n" + _step5_rule, "🎉 SILVER LAYER TRANSFORMATION COMPLETE", _step5_rule, sep="\n")

# Run summary; silver_count is the row count computed in the transform step.
_summary_text = f"""
Summary:
  ✅ Loaded {silver_count:,} rows from silver
  ✅ Transformed to silver schema
  ✅ Created dimension tables
  ✅ Passed quality validations
  ✅ Environment: {environment}
  ✅ Run date: {run_date}
"""
print(_summary_text)

print("\n✅ Silver layer ready for gold analytics!")

# Hand "SUCCESS" back as the notebook's exit value.
# NOTE(review): dbutils.notebook.exit appears to stop the notebook run at this
# point, so cells placed after this one would not execute in a Run All / job
# run — confirm that is intended.
dbutils.notebook.exit("SUCCESS")

In [0]:
# CELL 1: Set context
# Use the catalog chosen through the catalog_name parameter (default
# "fintech_analytics") so the unqualified table names in the cells below
# resolve to the same catalog as the parameterised writes elsewhere.
spark.sql(f"USE CATALOG {catalog_name}")
spark.sql("USE SCHEMA silver")

DataFrame[]

In [0]:
# ===================================================================
# BUILD: silver_dim_companies
# ===================================================================

from pyspark.sql.functions import col, lit, current_timestamp

# data_date and enable_dedup are widget-backed settings. Read them here so the
# cell does not rely on variables left over from an earlier interactive session.
data_date = dbutils.widgets.get("data_date")
enable_dedup = dbutils.widgets.get("enable_dedup").strip().lower() == "true"


def _read_company_keys(csv_path):
    """Read a headered CSV with schema inference and keep cik / company_name.

    Args:
        csv_path: Path of the CSV file to load.

    Returns:
        A DataFrame with exactly the columns ``cik`` and ``company_name``.
    """
    return (
        spark.read
        .format("csv")
        .option("header", "true")
        .option("inferSchema", "true")
        .load(csv_path)
        .select(col("cik"), col("company_name"))
    )


_sec_files_dir = f"/Volumes/{catalog_name}/raw_data/sec_files"

# Stack the company keys from both source files, drop rows without a cik,
# and stamp every row with the configured load date and the current timestamp.
silver_companies_df = (
    _read_company_keys(f"{_sec_files_dir}/insider_transactions_data.csv")
    .union(_read_company_keys(f"{_sec_files_dir}/institutional_holdings_data.csv"))
    .dropna(subset=["cik"])
    .withColumn("load_date", lit(data_date))
    .withColumn("load_timestamp", current_timestamp())
)

# Apply deduplication if enabled
if enable_dedup:
    silver_companies_df = silver_companies_df.dropDuplicates(["cik"])
    print("✅ Deduplication enabled")

print(f"📊 Records: {silver_companies_df.count()}")

✅ Deduplication enabled
📊 Records: 33


In [0]:
# ===================================================================
# SAVE: silver_dim_companies
# ===================================================================

# write_mode is a widget-backed setting. Read and validate it here so the cell
# does not rely on a variable left over from an earlier interactive session.
write_mode = dbutils.widgets.get("write_mode").strip().lower()
if write_mode not in ("overwrite", "append"):
    raise ValueError(f"Unsupported write_mode '{write_mode}'; expected 'overwrite' or 'append'")

# silver_schema is "<catalog_name>.silver", built alongside the other parameters.
_companies_table = f"{silver_schema}.silver_dim_companies"

(
    silver_companies_df
    .write
    .format("delta")
    .mode(write_mode)
    .option("mergeSchema", "true")
    .saveAsTable(_companies_table)
)

print(f"✅ Saved to {_companies_table}")

✅ Saved to fintech_analytics.silver.silver_dim_companies


In [0]:
%sql
-- CELL 4: Verify silver_dim_companies (total row count, then a 10-row sample)
SELECT COUNT(*)
FROM silver_dim_companies;

SELECT *
FROM silver_dim_companies
LIMIT 10;

cik,company_name
1065280,NETFLIX INC
200406,JOHNSON & JOHNSON
1018724,AMAZON COM INC
789019,MICROSOFT CORP
63908,MCDONALDS CORP
1318605,"TESLA, INC."
1045810,NVIDIA CORP
104169,WALMART INC.
731766,UNITEDHEALTH GROUP INC
1744489,WALT DISNEY CO


In [0]:
# CELL 5: Build silver_dim_insiders
_insider_csv_path = "/Volumes/fintech_analytics/raw_data/sec_files/insider_transactions_data.csv"

_insider_raw_df = (
    spark.read
    .format("csv")
    .options(header="true", inferSchema="true")
    .load(_insider_csv_path)
)

# Per the original note, this file carries the insider names under the
# "transaction_date" header, so that column is exposed as insider_name.
silver_insiders_df = (
    _insider_raw_df
    .select(
        col("transaction_date").alias("insider_name"),
        col("cik").alias("company_cik"),
    )
    .na.drop(subset=["insider_name"])
    .drop_duplicates(["insider_name", "company_cik"])
)

In [0]:
# CELL 6: Save silver_dim_insiders
# Overwrite the Delta table silver_dim_insiders with the frame built above.
silver_insiders_df.write.saveAsTable(
    "silver_dim_insiders",
    format="delta",
    mode="overwrite",
)

In [0]:
%sql
-- CELL 7: Verify silver_dim_insiders (total row count, then a 10-row sample)

SELECT COUNT(*)
FROM silver_dim_insiders;

SELECT *
FROM silver_dim_insiders
LIMIT 10;

insider_name,company_cik
Hourigan Timothy A.,354950
WITTY ANDREW,731766
McSweeney Erin,731766
BOURLA ALBERT,78003
BURGESS ROBERT K,1045810
Hoffman Reid,789019
MURDOCH JAMES R,1318605
Ling Bei,72971
MCINERNEY RYAN,1403161
Hogan Kathleen T,789019


In [0]:
# CELL 8: Build silver_dim_institutions
_holdings_csv_path = "/Volumes/fintech_analytics/raw_data/sec_files/institutional_holdings_data.csv"

_holdings_raw_df = (
    spark.read
    .format("csv")
    .options(header="true", inferSchema="true")
    .load(_holdings_csv_path)
)

# Keep one row per cik, dropping rows where cik is missing.
silver_institutions_df = (
    _holdings_raw_df
    .select("cik", "company_name")
    .na.drop(subset=["cik"])
    .drop_duplicates(["cik"])
)

In [0]:
# CELL 9: Save silver_dim_institutions
# Overwrite the Delta table silver_dim_institutions with the frame built above.
silver_institutions_df.write.saveAsTable(
    "silver_dim_institutions",
    format="delta",
    mode="overwrite",
)

In [0]:
%sql
-- CELL 10: Verify silver_dim_institutions (total row count, then a 10-row sample).
-- The sample is capped with LIMIT, like the other verify cells, so the cell
-- never renders the whole table.
SELECT COUNT(*) FROM silver_dim_institutions;
SELECT * FROM silver_dim_institutions LIMIT 10;

cik,company_name
200406,JOHNSON & JOHNSON
1018724,AMAZON COM INC
1045810,NVIDIA CORP
104169,WALMART INC.
1652044,ALPHABET INC.
72971,WELLS FARGO & COMPANY/MN
19617,JPMORGAN CHASE & CO
1067983,BERKSHIRE HATHAWAY INC
78003,PFIZER INC
50863,INTEL CORP


In [0]:
# CELL 11: Build silver_fact_insider_transactions
from pyspark.sql.functions import col

# (CSV header, silver column name) pairs, in output order.
# Per the original notes, the CSV headers are shifted relative to the values
# they hold, so most columns are re-labelled with what they actually contain.
_INSIDER_TXN_COLUMN_MAP = [
    ("cik", "cik"),
    ("company_name", "company_name"),
    ("filing_date", "filing_date"),
    ("transaction_date", "insider_name"),             # actual insider name
    ("transaction_shares", "security_title"),         # actual security
    ("transaction_price", "transaction_date"),        # actual date
    ("transaction_code", "shares"),                   # actual shares
    ("acquired_disposed_code", "price_per_share"),    # actual price
    ("shares_owned_after", "transaction_code"),       # actual code
    ("ownership_form", "acquired_disposed"),          # A=Acquired, D=Disposed
    ("confidence_score", "shares_after_transaction"),
    ("deemed_execution_date", "confidence_score"),
]

_insider_txn_raw_df = (
    spark.read
    .format("csv")
    .options(header="true", inferSchema="true")
    .load("/Volumes/fintech_analytics/raw_data/sec_files/insider_transactions_data.csv")
)

silver_insider_txn_df = _insider_txn_raw_df.select(
    [col(_source).alias(_target) for _source, _target in _INSIDER_TXN_COLUMN_MAP]
)

In [0]:
# CELL 12: Save silver_fact_insider_transactions
# Overwrite the Delta table silver_fact_insider_transactions with the frame built above.
silver_insider_txn_df.write.saveAsTable(
    "silver_fact_insider_transactions",
    format="delta",
    mode="overwrite",
)

In [0]:
%sql
-- CELL 13: Verify silver_fact_insider_transactions (total row count, then a 5-row sample)
SELECT COUNT(*)
FROM silver_fact_insider_transactions;

SELECT *
FROM silver_fact_insider_transactions
LIMIT 5;

cik,company_name,filing_date,insider_name,security_title,transaction_date,shares,price_per_share,transaction_code,acquired_disposed,shares_after_transaction,confidence_score
1652044,ALPHABET INC.,2025-03-28,WALKER JOHN KENT,Class C Capital Stock,2025-03-28,17782,0.0,G,A,66819.0,70.0
1652044,ALPHABET INC.,2025-03-28,ARNOLD FRANCES,Class C Capital Stock,2025-03-28,111,162.42,S,D,17048.0,35.24597887461058
104169,WALMART INC.,2025-03-27,McMillon C Douglas,Common Stock,2025-03-27,29124,85.6333,S,D,3972517.18,45.0
104169,WALMART INC.,2025-03-26,Walton Family Holdings Trust,Common,2025-03-26,297000,0.0,J,D,581930458.0,70.0
354950,"HOME DEPOT, INC.",2025-03-26,Decker Edward P.,$.05 Common Stock,2025-03-26,9816,0.0,A,A,122323.8763,100.0


In [0]:
# CELL 14: Build silver_fact_institutional_holdings
# Columns carried over unchanged from the holdings CSV, in output order.
_HOLDINGS_FACT_COLUMNS = [
    "cik",
    "company_name",
    "filing_date",
    "issuer_name",
    "security_class",
    "cusip",
    "market_value",
    "shares_amount",
    "shares_type",
    "conviction_score",
]

_holdings_fact_raw_df = (
    spark.read
    .format("csv")
    .options(header="true", inferSchema="true")
    .load("/Volumes/fintech_analytics/raw_data/sec_files/institutional_holdings_data.csv")
)

silver_inst_holdings_df = _holdings_fact_raw_df.select(*_HOLDINGS_FACT_COLUMNS)

In [0]:
# CELL 15: Save silver_fact_institutional_holdings
# Overwrite the Delta table silver_fact_institutional_holdings with the frame built above.
silver_inst_holdings_df.write.saveAsTable(
    "silver_fact_institutional_holdings",
    format="delta",
    mode="overwrite",
)

In [0]:
%sql
-- CELL 16: Verify silver_fact_institutional_holdings (total row count, then a 5-row sample)

SELECT COUNT(*)
FROM silver_fact_institutional_holdings;

SELECT *
FROM silver_fact_institutional_holdings
LIMIT 5;

cik,company_name,filing_date,issuer_name,security_class,cusip,market_value,shares_amount,shares_type,conviction_score
104169,WALMART INC.,2025-02-14,SYMBOTIC INC,CLASS A COM,87151X101,355650000.0,15000000.0,SH,80.0
1045810,NVIDIA CORP,2025-02-14,APPLIED DIGITAL CORP,COM NEW,038169207,58950622.0,7716050.0,SH,67.7
1067983,BERKSHIRE HATHAWAY INC,2025-02-14,ALLY FINL INC,COM,02005N100,458035497.0,12719675.0,SH,80.0
78003,PFIZER INC,2025-02-14,AKERO THERAPEUTICS INC,COM,00973Y108,15652255.0,562626.0,SH,54.7
200406,JOHNSON & JOHNSON,2025-02-13,"Adicet Bio, Inc.",COM,007002108,350622.0,364472.0,SH,50.1


In [0]:
%sql
-- CELL 17: List every table registered in the fintech_analytics.silver schema
SHOW TABLES FROM fintech_analytics.silver;

database,tableName,isTemporary
silver,silver_dim_companies,False
silver,silver_dim_insiders,False
silver,silver_dim_institutions,False
silver,silver_fact_insider_transactions,False
silver,silver_fact_institutional_holdings,False


In [0]:
# COMMAND ----------
# SAFE FINAL EXIT (WORKFLOW COMPATIBLE)

import time
from pyspark.sql import SparkSession

# Pause applied just before the exit call.
_EXIT_DELAY_SECONDS = 2

print("✅ Silver notebook finished execution")

# Run a trivial query and collect its result before exiting.
# NOTE(review): the original comment says this forces pending lazy operations
# to complete — confirm; SELECT 1 does not reference any earlier DataFrame.
spark.sql("SELECT 1").collect()

# Short delay before handing control back (described as a flush delay for workflows).
time.sleep(_EXIT_DELAY_SECONDS)

# Hand "SUCCESS" back as the notebook's exit value.
dbutils.notebook.exit("SUCCESS")