# Step 2 which is after ingesting the files we will standardize types and keys from raw.clients in pgadmin.

setup (imports + DB connection + helpers)

In [3]:
import os
import pandas as pd
from sqlalchemy import create_engine, text
from datetime import datetime
from pathlib import Path

# 1) DB connection (adjust only this line if needed)
PG_URL = os.getenv("PG_URL") or "postgresql+psycopg2://etl_user:Capstone2025@localhost:5432/Wisdomindex_db"
engine = create_engine(PG_URL)

def run_sql(sql: str):
    with engine.begin() as conn:
        conn.execute(text(sql))

def q(sql: str) -> pd.DataFrame:
    with engine.connect() as conn:
        return pd.read_sql(text(sql), conn)

print("Connected OK.")

Connected OK.


ensure schemas exist

In [5]:
run_sql("""
CREATE SCHEMA IF NOT EXISTS raw  AUTHORIZATION etl_user;
CREATE SCHEMA IF NOT EXISTS stg   AUTHORIZATION etl_user;
CREATE SCHEMA IF NOT EXISTS mart  AUTHORIZATION etl_user;
""")
print("Schemas ensured.")

Schemas ensured.


configuration (what to treat as numeric, what to skip)

In [7]:
# Columns to drop from views if present (lineage/noise columns)
SKIP_COLS = {"_src_file", "_src_sheet", "_load_ts"}

# Column-name patterns that are likely numeric even if stored as text
LIKELY_NUMERIC = {
    "value","values","units","marketprice","market_price","costbasis","cost_basis",
    "totalvalue","total_value","cashvalue","facevalue","principal","balance",
    "totaldue","amount","purchaseamount","salary","annual_income","annual_expense",
    "gross_income","currentvalue","marketvalue","interest","premium"
}

# Optional: per-table overrides (force treatment for specific columns)
# e.g. {"holdings": {"numeric": {"mycol1","mycol2"}, "text": {"some_text"}}}
TABLE_OVERRIDES = {
    # "holdings": {"numeric": {"value","units","marketprice","costbasis"}},
}

auto-generate & create all stg.* views from raw.*

In [9]:
# This cell inspects every raw table. If it has client_id, it builds a stg view:
# - client_id is normalized as text (lower/trim)
# - numeric-like columns are cast to numeric
# - text columns are trimmed
# - adds a derived total_estimated_value if (value, units, marketprice) exist

generated_sql_chunks = []
built, skipped = [], []

raw_tables = q("""
    SELECT table_name
    FROM information_schema.tables
    WHERE table_schema='raw'
    ORDER BY table_name;
""")["table_name"].tolist()

for tbl in raw_tables:
    cols = q(f"""
        SELECT column_name, data_type
        FROM information_schema.columns
        WHERE table_schema='raw' AND table_name='{tbl}'
        ORDER BY ordinal_position;
    """)
    if cols.empty:
        skipped.append((tbl, "no columns"))
        continue

    colnames = [c.lower() for c in cols["column_name"]]
    if "client_id" not in colnames:
        skipped.append((tbl, "no client_id"))
        continue

    # Build SELECT list
    select_parts = ["  LOWER(TRIM(client_id::text)) AS client_id"]

    # Overrides for this table (if any)
    ovr = TABLE_OVERRIDES.get(tbl, {})
    ovr_num = set(map(str.lower, ovr.get("numeric", set())))
    ovr_txt = set(map(str.lower, ovr.get("text", set())))

    for _, row in cols.iterrows():
        c = row["column_name"]
        lc = c.lower()
        if lc == "client_id" or lc in SKIP_COLS:
            continue

        dt = str(row["data_type"]).lower()
        is_numeric_db = any(k in dt for k in ["integer","numeric","double","real","smallint","bigint","decimal"])

        # decide numeric vs text
        treat_numeric = (
            lc in ovr_num
            or is_numeric_db
            or (lc in LIKELY_NUMERIC and lc not in ovr_txt)
        )

        if treat_numeric:
            expr = f"(NULLIF(TRIM({c}::text),''))::numeric AS {lc}"
        else:
            expr = f"NULLIF(TRIM({c}::text),'') AS {lc}"
        select_parts.append("  " + expr)

    # derived total if we have value+units+marketprice
    if {"value","units","marketprice"}.issubset(set(colnames)):
        select_parts.append("""
  COALESCE(
    ((NULLIF(TRIM(value::text),''))::numeric
      * COALESCE((NULLIF(TRIM(units::text),''))::numeric, 1)
      * COALESCE((NULLIF(TRIM(marketprice::text),''))::numeric, 1)),
    (NULLIF(TRIM(value::text),''))::numeric,
    ((NULLIF(TRIM(units::text),''))::numeric
      * COALESCE((NULLIF(TRIM(marketprice::text),''))::numeric, 1))
  ) AS total_estimated_value
""".strip())

    select_sql = ",\n".join(select_parts)
    view_sql = f"""CREATE OR REPLACE VIEW stg.{tbl} AS
SELECT
{select_sql}
FROM raw.{tbl};"""

    try:
        run_sql(view_sql)
        built.append(tbl)
        generated_sql_chunks.append(f"-- stg.{tbl}\n{view_sql}\n")
    except Exception as e:
        skipped.append((tbl, str(e)))

print("Built views:", built)
print("Skipped:", skipped)

Built views: ['account_history', 'businesses', 'charities', 'disabilityltcinsuranceaccounts', 'entityinterests', 'expenses', 'facts', 'flows', 'incomes', 'investmentdepositaccounts', 'liabilitynoteaccounts', 'lifeinsuranceannuityaccounts', 'medicalinsuranceaccounts', 'personalpropertyaccounts', 'propertycasualtyinsuranceaccoun', 'realestateassets', 'values']
Skipped: [('_smoketest', 'no client_id'), ('calculations', 'no client_id'), ('clients', '(psycopg2.errors.InvalidTableDefinition) cannot change name of view column "client_name" to "clientname"\nHINT:  Use ALTER VIEW ... RENAME COLUMN ... to change name of view column instead.\n\n[SQL: CREATE OR REPLACE VIEW stg.clients AS\nSELECT\n  LOWER(TRIM(client_id::text)) AS client_id,\n  NULLIF(TRIM(clientname::text),\'\') AS clientname,\n  NULLIF(TRIM(firstname::text),\'\') AS firstname,\n  NULLIF(TRIM(lastname::text),\'\') AS lastname,\n  NULLIF(TRIM(hh_dateofbirth::text),\'\') AS hh_dateofbirth,\n  NULLIF(TRIM(gender::text),\'\') AS gend

quick sanity previews

In [11]:
# Show first few rows of the two views you care most about
for t in ["clients", "holdings"]:
    try:
        display(q(f"SELECT * FROM stg.{t} LIMIT 5;"))
    except Exception as e:
        print(f"stg.{t} preview error:", e)

Unnamed: 0,client_id,client_name,first_name,last_name,gender,marital_status,date_of_birth,address1,city,state_or_province,postal_code,employer_name,job_title
0,1,Tom And Tammy Cruise,Tom,Cruise,Male,Married,1977-01-20,101 Nice House,Rockwall,TX,75087,Maverick Enterprises,Pilot
1,2,Val And Victoria Kilmer,Val,Kilmer,Male,Married,1974-05-05,202 Nice House,Prosper,TX,75078,Kilmer Construction,CEO
2,3,Tom And Jane Skeritt,Tom,Skeritt,Male,Married,1946-02-07,303 Nice House,Dallas,TX,75252,Skerrit Airways,CFO
3,4,Tim And Terri Robbins,Tim,Robbins,Male,Married,1968-03-02,404 Nice House,Plano,TX,75024,Robins Roofing,Accounting
4,5,Michael And Mica Ironside,Michael,Ironside,Male,Married,1963-07-12,505 Nice House,Arlington,TX,76001,Ironside Iron,Head of Sales


Unnamed: 0,client_id,account_id,holdings_id,ticker,description,asset_class,holding_type,value,units,market_price,cost_basis,total_estimated_value
0,1,e5d2,0be0,DFIV,Dimensional International Value ETF,internat,Cash,322.97,7.36,43.88,276.33,104305.357696
1,1,e5d2,77b0,DFLV,Dimensional US Large Cap Value ETF,largevalue,Cash,575.1,18.498,31.09,546.25,330741.631782
2,1,e5d2,9089,DFSD,Dimensional Short Duration Fixed Income ETF,shortermbond,Cash,362.18,7.517,48.18,352.49,131170.390151
3,1,e5d2,f240,DFSV,Dimensional US Small Cap Value ETF,smallvalue,Cash,273.74,9.198,29.76,272.79,74931.529075
4,1,e5d2,e00d,DISV,Dimensional International Small Cap Value ETF,internat,Cash,174.75,5.188,33.68,146.64,30534.38904
