# Task 02 — Data Modeling & Transformation (dbt + Postgres)

**Project workspace**
- dbt project root: `D:/Python/Week 8/Shipping-a-Data-Product/medical_warehouse`
- models:
  - staging: `medical_warehouse/models/staging/stg_telegram_messages.sql`
  - marts: `medical_warehouse/models/marts/dim_channels.sql`, `dim_dates.sql`, `fct_messages.sql`
- schema docs/tests:
  - `medical_warehouse/models/staging/schema.yml`
  - `medical_warehouse/models/marts/schema.yml`
- singular tests: `medical_warehouse/tests/`
- upstream scripts (reference): `src/scraper.py`, `src/load_raw_to_postgres.py`, `src/login_test.py`

## What this notebook demonstrates (Task 2 requirements)
1. dbt project runs successfully (deps/debug/build/test)
2. Transformation layer implemented: staging + dimensional marts (star schema)
3. Data quality tests executed and passing
4. Documentation artifacts generated (manifest/catalog)
5. Direct Postgres SQL validation: relation existence, row counts, referential integrity

> Important: If your Postgres shows schemas like `analytics_staging` / `analytics_analytics` instead of `staging` / `analytics`, that's expected: by default dbt names a custom schema `<target schema>_<custom schema>`. This notebook reads `target/manifest.json` to discover the **actual schema names** and queries those.
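
The naming rule behind those prefixed schemas can be sketched in a few lines. This is a simplified Python imitation of dbt's default `generate_schema_name` macro, not dbt's own code; the function name `default_schema_name` is made up for illustration, and a project that overrides the macro will behave differently.

```python
from typing import Optional


def default_schema_name(target_schema: str, custom_schema: Optional[str]) -> str:
    """Imitate dbt's default schema naming (simplified, illustrative only)."""
    # No custom schema configured on the model: use the target schema as-is.
    if custom_schema is None:
        return target_schema
    # Custom schema configured: dbt prefixes it with the target schema.
    return f"{target_schema}_{custom_schema.strip()}"


print(default_schema_name("analytics", "staging"))    # analytics_staging
print(default_schema_name("analytics", "analytics"))  # analytics_analytics
print(default_schema_name("analytics", None))         # analytics
```

With a target schema of `analytics`, a model configured with `schema: staging` therefore lands in `analytics_staging`, which matches the names mentioned above.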


## 0) Setup notes

If dbt cannot find `profiles.yml`, set `DBT_PROFILES_DIR`:

```powershell
$env:DBT_PROFILES_DIR = 'D:\Python\Week 8\Shipping-a-Data-Product\medical_warehouse'
# optionally:
$env:PGPASSWORD = 'your_password'
```

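Set these in the PowerShell session that launches Jupyter, because the kernel inherits its environment from the process that started it. If Jupyter was already running, restart it from that session; restarting only the kernel does not pick up variables set in another shell.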
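
To confirm what the kernel actually sees, a quick check from Python can help. The sketch below is a simplification: it only models the `DBT_PROFILES_DIR` override and the `~/.dbt` fallback, while dbt itself also accepts a `--profiles-dir` flag and newer versions may look in the current directory. The helper name `resolve_profiles_dir` is hypothetical.

```python
import os
from pathlib import Path
from typing import Mapping, Optional


def resolve_profiles_dir(env: Mapping[str, str], home: Optional[Path] = None) -> Path:
    """Return the directory where profiles.yml is expected (simplified lookup)."""
    override = env.get("DBT_PROFILES_DIR")
    if override:
        # The environment variable wins when it is set and non-empty.
        return Path(override)
    # Fallback assumed here: the .dbt folder in the user's home directory.
    return (home or Path.home()) / ".dbt"


print("DBT_PROFILES_DIR seen by this process:", os.environ.get("DBT_PROFILES_DIR"))
print("profiles.yml expected in:", resolve_profiles_dir(os.environ))
```

If the first line prints `None` after you set the variable, the kernel was most likely started from a different session.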


In [None]:
from __future__ import annotations

import os
import json
import subprocess
from pathlib import Path
from dataclasses import dataclass
from typing import Any, Dict, Optional, List

import pandas as pd

try:
    import yaml
except ImportError as e:
    raise RuntimeError('Missing dependency: PyYAML. Install with: pip install pyyaml') from e

try:
    from sqlalchemy import create_engine, text
except ImportError as e:
    raise RuntimeError('Missing dependency: SQLAlchemy (and psycopg2). Install with: pip install sqlalchemy psycopg2-binary') from e

# --- Paths (based on your workspace) ---
REPO_ROOT = Path(r'D:\Python\Week 8\Shipping-a-Data-Product')
PROJECT_DIR = REPO_ROOT / 'medical_warehouse'

DBT_PROJECT_YML = PROJECT_DIR / 'dbt_project.yml'
PROFILES_YML = PROJECT_DIR / 'profiles.yml'
TARGET_DIR = PROJECT_DIR / 'target'
MANIFEST_JSON = TARGET_DIR / 'manifest.json'

print('REPO_ROOT:', REPO_ROOT)
print('PROJECT_DIR:', PROJECT_DIR)
print('dbt_project.yml exists:', DBT_PROJECT_YML.exists())
print('profiles.yml exists:', PROFILES_YML.exists())


## 1) Helper functions
These helpers:
- run dbt CLI commands
- read `profiles.yml` (Postgres connection)
- parse `target/manifest.json` to find actual schemas/aliases
- run SQL queries and return results as DataFrames
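
The manifest-parsing step can be tried on a toy input before pointing it at a real `target/manifest.json`. The fragment below is hand-written and hypothetical (real manifests carry many more keys, and the test node name is invented); the helper `project_models` is a pandas-free stand-in for the filtering logic, not the notebook's own function.

```python
from typing import Any, Dict, List

# Hypothetical, hand-written manifest fragment for illustration only.
toy_manifest: Dict[str, Any] = {
    "nodes": {
        "model.medical_warehouse.fct_messages": {
            "resource_type": "model",
            "package_name": "medical_warehouse",
            "name": "fct_messages",
            "alias": "fct_messages",
            "schema": "analytics_analytics",
        },
        "test.medical_warehouse.some_generic_test": {
            "resource_type": "test",
            "package_name": "medical_warehouse",
            "name": "some_generic_test",
            "schema": "analytics_dbt_test__audit",
        },
    }
}


def project_models(manifest: Dict[str, Any], project_name: str) -> List[Dict[str, Any]]:
    """Keep only model nodes that belong to the given dbt package."""
    rows = []
    for node in (manifest.get("nodes") or {}).values():
        if node.get("resource_type") != "model":
            continue  # skip tests, seeds, snapshots, ...
        if node.get("package_name") != project_name:
            continue  # skip models that come from installed packages
        rows.append({
            "name": node["name"],
            "alias": node.get("alias") or node["name"],
            "schema": node.get("schema"),
        })
    return rows


print(project_models(toy_manifest, "medical_warehouse"))
```

Only the `fct_messages` entry survives the filter, since the second node is a test rather than a model.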


In [None]:
def run_cmd(cmd: List[str], cwd: Path = PROJECT_DIR, check: bool = True):
    """Run a command and print stdout/stderr."""
    print('\n$ ' + ' '.join(cmd))
    proc = subprocess.run(cmd, cwd=str(cwd), text=True, capture_output=True)
    if proc.stdout:
        print(proc.stdout)
    if proc.stderr:
        print(proc.stderr)
    if check and proc.returncode != 0:
        raise RuntimeError(f'Command failed ({proc.returncode}): {cmd}')
    return proc

def read_yaml(path: Path) -> Dict[str, Any]:
    with open(path, 'r', encoding='utf-8') as f:
        return yaml.safe_load(f)

@dataclass
class DbtProfileConn:
    host: str
    port: int
    user: str
    password: Optional[str]
    dbname: str
    schema: str

def load_dbt_profile_conn(profiles_yml: Path = PROFILES_YML) -> DbtProfileConn:
    """Load target connection info from profiles.yml.

    Note: This assumes the first top-level profile in profiles.yml is the one used.
    If you have multiple profiles, adjust this function to choose by name.
    """
    data = read_yaml(profiles_yml)
    if not isinstance(data, dict) or not data:
        raise ValueError('profiles.yml is empty/invalid')

    profile_name = next(iter(data.keys()))
    profile = data[profile_name]
    target_name = profile.get('target')
    outputs = profile.get('outputs', {})

    if not target_name or target_name not in outputs:
        raise ValueError(f'Cannot resolve target from profiles.yml (profile={profile_name}, target={target_name})')

    out = outputs[target_name]
    host = out.get('host', 'localhost')
    port = int(out.get('port', 5432))
    user = out.get('user')
    password = out.get('password') or os.environ.get('PGPASSWORD')
    dbname = out.get('dbname') or out.get('database')
    schema = out.get('schema', 'public')

    if not user or not dbname:
        raise ValueError('profiles.yml missing required fields user/dbname')

    return DbtProfileConn(host=host, port=port, user=user, password=password, dbname=dbname, schema=schema)

def make_engine(conn: DbtProfileConn):
    # If password is None, you likely need to set $env:PGPASSWORD or fill profiles.yml
    # URL.create (SQLAlchemy 1.4+) escapes special characters such as '@' or ':' in credentials.
    from sqlalchemy.engine import URL
    url = URL.create('postgresql+psycopg2', username=conn.user, password=conn.password,
                     host=conn.host, port=conn.port, database=conn.dbname)
    return create_engine(url)

def df_sql(engine, sql: str, params: Optional[dict] = None) -> pd.DataFrame:
    with engine.connect() as c:
        return pd.read_sql(text(sql), c, params=params)

def load_manifest_models(manifest_path: Path = MANIFEST_JSON, project_name: str = 'medical_warehouse') -> pd.DataFrame:
    """Return a DataFrame of project models and the actual schemas/aliases dbt compiled."""
    if not manifest_path.exists():
        raise FileNotFoundError(f'manifest.json not found at {manifest_path}. Run `dbt docs generate` or `dbt build` first.')
    manifest = json.loads(manifest_path.read_text(encoding='utf-8'))
    rows = []
    for unique_id, node in (manifest.get('nodes') or {}).items():
        if node.get('resource_type') != 'model':
            continue
        if node.get('package_name') != project_name:
            continue
        rows.append({
            'unique_id': unique_id,
            'name': node.get('name'),
            'alias': node.get('alias') or node.get('name'),
            'schema': node.get('schema'),
            'database': node.get('database'),
            'materialized': (node.get('config') or {}).get('materialized'),
            'original_file_path': node.get('original_file_path'),
        })
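    if not rows:
        # Without this guard, sort_values raises a confusing KeyError on an empty frame.
        raise ValueError(f'No models found for package {project_name!r} in {manifest_path}')
    return pd.DataFrame(rows).sort_values(['schema', 'name']).reset_index(drop=True)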


## 2) dbt dependencies + debug (Task 2 evidence)
- `dbt deps`: installs packages (e.g., dbt_utils)
- `dbt debug`: validates profiles + connection
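
Both commands assume the `dbt` executable is reachable from the kernel's `PATH`. A small pre-flight check, sketched here with the standard library only, gives a clearer message than a failed subprocess call; the helper name `find_executable` is made up for this example.

```python
import shutil
from typing import Optional


def find_executable(name: str = "dbt") -> Optional[str]:
    """Return the full path of an executable on PATH, or None if it is missing."""
    return shutil.which(name)


path = find_executable()
if path:
    print("dbt found at:", path)
else:
    print("dbt is not on PATH; activate the environment where dbt is installed")
```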


In [None]:
run_cmd(['dbt', 'deps'])
run_cmd(['dbt', 'debug'])


## 3) Build models + run tests (Task 2 core)
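`dbt build` runs seeds, snapshots, models, and tests together in dependency (DAG) order, so a model's tests run before anything downstream of it is built.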
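
After a build, dbt also writes `target/run_results.json`, which can be summarized for the report. The sketch below counts result statuses; the sample payload is a hand-written, hypothetical fragment (the test node id is invented), and only the `results` list with its `status` field is assumed.

```python
import json
from collections import Counter
from pathlib import Path
from typing import Any, Dict


def summarize_run_results(run_results: Dict[str, Any]) -> Counter:
    """Count how many nodes ended in each status (success, pass, fail, ...)."""
    return Counter(r.get("status", "unknown") for r in run_results.get("results", []))


def load_run_results(path: Path) -> Dict[str, Any]:
    """Read a run_results.json file from disk."""
    return json.loads(path.read_text(encoding="utf-8"))


# Hypothetical sample shaped like run_results.json, for illustration only.
sample = {
    "results": [
        {"unique_id": "model.medical_warehouse.dim_channels", "status": "success"},
        {"unique_id": "model.medical_warehouse.fct_messages", "status": "success"},
        {"unique_id": "test.medical_warehouse.example_test", "status": "pass"},
    ]
}

for status, n in sorted(summarize_run_results(sample).items()):
    print(f"{status}: {n}")
```

In the notebook, something like `summarize_run_results(load_run_results(TARGET_DIR / 'run_results.json'))` would give the same summary for the real run.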


In [None]:
run_cmd(['dbt', 'build'])


## 4) Generate dbt docs artifacts
Creates `target/manifest.json` (lineage + compiled config) and `target/catalog.json` (column metadata read from the warehouse).
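
The catalog can be inspected the same way as the manifest. The sketch below lists columns per node from a hand-written fragment; the column names and types in `toy_catalog` are illustrative assumptions, not taken from this project, and `catalog_columns` is a hypothetical helper.

```python
from typing import Any, Dict, List, Tuple

# Hypothetical fragment shaped like target/catalog.json; values are illustrative.
toy_catalog: Dict[str, Any] = {
    "nodes": {
        "model.medical_warehouse.dim_channels": {
            "metadata": {"schema": "analytics_analytics", "name": "dim_channels"},
            "columns": {
                "channel_name": {"name": "channel_name", "type": "text", "index": 2},
                "channel_key": {"name": "channel_key", "type": "text", "index": 1},
            },
        }
    }
}


def catalog_columns(catalog: Dict[str, Any]) -> Dict[str, List[Tuple[str, str]]]:
    """Map each node id to its (column name, column type) pairs in table order."""
    out: Dict[str, List[Tuple[str, str]]] = {}
    for unique_id, node in (catalog.get("nodes") or {}).items():
        cols = sorted((node.get("columns") or {}).values(), key=lambda c: c.get("index", 0))
        out[unique_id] = [(c["name"], c["type"]) for c in cols]
    return out


for node_id, cols in catalog_columns(toy_catalog).items():
    print(node_id, cols)
```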


In [None]:
run_cmd(['dbt', 'docs', 'generate'])
print('manifest exists:', MANIFEST_JSON.exists())


## 5) Inspect actual schemas/relations from the manifest
This is the authoritative way to know where dbt built relations.


In [None]:
models_df = load_manifest_models()
models_df


## 6) Connect to Postgres using profiles.yml
This connects to the same database target dbt uses, provided dbt reads the same `profiles.yml` that this notebook loads.


In [None]:
conn = load_dbt_profile_conn()
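# Print connection details without the password so it does not end up in notebook output
print(f'{conn.user}@{conn.host}:{conn.port}/{conn.dbname} (schema={conn.schema})')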
engine = make_engine(conn)

# List schemas in the connected DB
df_sql(engine, """
select schema_name
from information_schema.schemata
order by schema_name
""")


## 7) Verify dbt relations exist in the DB
We query tables/views in the schemas that the manifest reports.
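
Listing the relations is easier to turn into evidence when the comparison is explicit. A minimal sketch, assuming relations are represented as `(schema, name)` pairs; the helper `missing_relations` and the sample pairs are illustrative.

```python
from typing import Iterable, Set, Tuple

Relation = Tuple[str, str]  # (schema, relation name)


def missing_relations(expected: Iterable[Relation], found: Iterable[Relation]) -> Set[Relation]:
    """Return relations the manifest expects but the database did not report."""
    return set(expected) - set(found)


# Illustrative values; in practice these come from the manifest and information_schema.
expected = [("analytics_staging", "stg_telegram_messages"), ("analytics_analytics", "fct_messages")]
found = [("analytics_staging", "stg_telegram_messages")]

print("missing:", sorted(missing_relations(expected, found)))
```

In the notebook, `expected` could be built from the `schema` and `alias` columns of `models_df`, and `found` from the `table_schema` and `table_name` columns of the query result.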


In [None]:
expected_schemas = sorted(set(models_df['schema'].dropna().tolist()))
print('Schemas from manifest:', expected_schemas)

df_sql(engine, """
select table_schema, table_name, table_type
from information_schema.tables
where table_schema = any(:schemas)
order by table_schema, table_name
""", params={'schemas': expected_schemas})


## 8) Row counts
Shows row counts for staging + marts models.
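
Because schema and relation names are interpolated into the SQL text, quoting them guards against names with uppercase letters, spaces, or reserved words. A minimal sketch of standard SQL identifier quoting follows; `quote_ident` and `count_sql` are hypothetical helpers, not part of SQLAlchemy or psycopg2.

```python
def quote_ident(identifier: str) -> str:
    """Wrap an identifier in double quotes, doubling any embedded double quote."""
    return '"' + identifier.replace('"', '""') + '"'


def count_sql(schema: str, relation: str) -> str:
    """Build a row-count query with both identifiers quoted."""
    return f"select count(*) as n from {quote_ident(schema)}.{quote_ident(relation)}"


print(count_sql("analytics_staging", "stg_telegram_messages"))
```

Note that quoted identifiers are case-sensitive in Postgres, which is fine here because the names come straight from the manifest.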


In [None]:
rels = {r['name']: (r['schema'], r['alias']) for _, r in models_df.iterrows()}
rels


In [None]:
from sqlalchemy import text


def count_relation(schema: str, relation: str) -> int:
    sql = text(f"select count(*) as n from {schema}.{relation}")
    with engine.connect() as conn:
        return int(conn.execute(sql).scalar_one())


for model_name, (schema, relname) in rels.items():
    try:
        n = count_relation(schema, relname)
        print(f"{schema}.{relname}: {n:,}")
    except Exception as e:
        print(f"FAILED counting {schema}.{relname}: {e}")

## 9) Referential integrity: fact → dimensions
These SQL checks complement dbt relationship tests.
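
The orphan check is a left join that keeps only fact rows with no matching dimension row. The pattern can be tried without a database server; the sketch below uses an in-memory SQLite database as a stand-in for Postgres, with toy tables and a made-up `message_id` column.

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.executescript("""
create table dim_channels (channel_key integer primary key);
create table fct_messages (message_id integer, channel_key integer);
insert into dim_channels values (1), (2);
-- channel_key 3 has no row in dim_channels, so message 12 is an orphan
insert into fct_messages values (10, 1), (11, 2), (12, 3);
""")

# Anti-join: unmatched fact rows come back with NULL dimension columns.
orphans = con.execute("""
select count(*)
from fct_messages f
left join dim_channels d
  on f.channel_key = d.channel_key
where d.channel_key is null
""").fetchone()[0]

print("orphan_channels:", orphans)  # orphan_channels: 1
con.close()
```

A result of 0 against the real tables is what the submission checklist asks for.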


In [None]:
schema_fct, fct = rels['fct_messages']
schema_ch, dim_ch = rels['dim_channels']
schema_dt, dim_dt = rels['dim_dates']

df_sql(engine, f"""
select count(*) as orphan_channels
from {schema_fct}.{fct} f
left join {schema_ch}.{dim_ch} d
  on f.channel_key = d.channel_key
where d.channel_key is null
""")


In [None]:
df_sql(engine, f"""
select count(*) as orphan_dates
from {schema_fct}.{fct} f
left join {schema_dt}.{dim_dt} d
  on f.date_key = d.date_key
where d.date_key is null
""")


## 10) Business-rule checks (SQL evidence)
These mirror common singular tests: no message should carry a timestamp in the future.
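
The two queries in this section differ in granularity: one compares full timestamps, the other compares calendar dates only. The sketch below shows the difference in plain Python with made-up timestamps; in Postgres the `::date` cast of a `timestamptz` additionally depends on the session time zone, which this example does not model.

```python
from datetime import datetime, timedelta, timezone


def is_future_ts(message_ts: datetime, now: datetime) -> bool:
    """Timestamp-level check, like `message_ts > now()`."""
    return message_ts > now


def is_future_date(message_ts: datetime, now: datetime) -> bool:
    """Date-level check, like `message_ts::date > current_date`."""
    return message_ts.date() > now.date()


# Fixed reference time so the example is reproducible.
now = datetime(2024, 1, 15, 12, 0, tzinfo=timezone.utc)
later_today = now + timedelta(hours=3)
tomorrow = now + timedelta(days=1)

print("later today:", is_future_ts(later_today, now), is_future_date(later_today, now))
print("tomorrow:   ", is_future_ts(tomorrow, now), is_future_date(tomorrow, now))
```

A message stamped a few hours ahead is caught by the timestamp check but not by the date check, so the date-level query is the more tolerant of the two (for example toward small clock skew).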


In [None]:
schema_stg, stg = rels['stg_telegram_messages']

df_sql(engine, f"""
select count(*) as future_messages
from {schema_stg}.{stg}
where message_ts > now()
""")

In [None]:
df_sql(engine, f"""
select count(*) as future_dated_messages
from {schema_stg}.{stg}
where message_ts::date > current_date
""")

## 11) Run dbt tests explicitly
Even if you ran `dbt build`, it’s useful to show `dbt test` output separately for Task 2.


In [None]:
run_cmd(['dbt', 'test'])


## 12) Optional: Serve dbt docs (interactive)
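This starts a local web server on port 8081 to view the DAG.
The cell blocks while the server runs, and because `run_cmd` captures output, nothing is printed in the meantime. Interrupt the cell or restart the kernel to stop it.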
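
If a blocking cell is inconvenient, one alternative is to start the server in the background and stop it explicitly. This is a sketch of that approach, not the notebook's method; `start_background` and `stop_background` are hypothetical helpers, and a short-lived Python process stands in for the dbt command so the example runs anywhere.

```python
import subprocess
import sys
from typing import List, Optional


def start_background(cmd: List[str], cwd: Optional[str] = None) -> subprocess.Popen:
    """Start a process without waiting for it to finish."""
    return subprocess.Popen(cmd, cwd=cwd)


def stop_background(proc: subprocess.Popen, timeout: float = 10.0) -> int:
    """Ask the process to terminate, and kill it if it does not exit in time."""
    proc.terminate()
    try:
        return proc.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        proc.kill()
        return proc.wait()


# Stand-in for: start_background(['dbt', 'docs', 'serve', '--port', '8081'], cwd=str(PROJECT_DIR))
proc = start_background([sys.executable, "-c", "import time; time.sleep(60)"])
print("running:", proc.poll() is None)

stop_background(proc)
print("stopped:", proc.poll() is not None)
```

Keeping the `Popen` handle in a notebook variable lets a later cell shut the server down without restarting the kernel.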


In [None]:
run_cmd(['dbt', 'docs', 'serve', '--port', '8081'], check=False)


## Task 2 submission checklist
For your report/screenshots:
1. `dbt debug` success
2. `dbt build` success
3. `dbt test` success (0 failures)
4. `models_df` table showing model → schema mapping
5. Postgres evidence: tables/views exist in those schemas
6. Row counts for stg/dim/fct
7. Orphan checks show 0 orphans
8. Optional: dbt docs DAG screenshot
