In [1]:
import duckdb

# Nothing is connected yet: this cell only confirms the duckdb package imports.
# The actual database connection is opened further below with duckdb.connect().
print(f"duckdb {duckdb.__version__} imported")

Connected!


In [2]:
import os
# Show the working directory: the database path used when connecting
# ("../database/...") is relative, so it is resolved against this directory.
# NOTE(review): the rendered output is an absolute local path; clear it before sharing.
os.getcwd()

'/home/hoan/VSCode/projects/local_elt_pipeline/notebooks'

In [3]:
from pathlib import Path

# Warehouse file produced by the ELT pipeline, relative to the notebooks/ directory.
DB_PATH = Path("../database/cfpb_complaints.duckdb")

# duckdb.connect() silently creates a brand-new, empty database when the file
# does not exist (e.g. the kernel was started from another working directory),
# which would only surface later as confusing "table not found" errors.
if not DB_PATH.exists():
    raise FileNotFoundError(f"DuckDB database not found: {DB_PATH.resolve()}")

con = duckdb.connect(str(DB_PATH))
con

<_duckdb.DuckDBPyConnection at 0x70743c32edf0>

In [4]:
# Every (catalog, schema) pair visible through this connection.
schemata_sql = """
    SELECT catalog_name, schema_name
    FROM information_schema.schemata;
"""
result = con.execute(schemata_sql).df()
result

Unnamed: 0,catalog_name,schema_name
0,cfpb_complaints,intermediate
1,cfpb_complaints,main
2,cfpb_complaints,marts
3,cfpb_complaints,raw
4,cfpb_complaints,raw_staging
5,cfpb_complaints,staging
6,system,information_schema
7,system,main
8,system,pg_catalog
9,temp,main


## Only the `cfpb_complaints` catalog holds our data (`system` and `temp` are DuckDB's built-in catalogs), so let's explore everything in it

### Data layers: raw → raw_staging → staging → intermediate → marts
1. raw - Raw data exactly as loaded
2. raw_staging - Lightly cleaned and typed versions of raw
3. staging - Business-ready standardized tables
4. intermediate - Transformations combining multiple staging models
5. marts - Final analytics tables for BI dashboards

# 1. Raw

In [5]:
# List every table that lives in the `raw` schema, alphabetically.
raw_tables = (
    con.sql(
        "SELECT table_name "
        "FROM information_schema.tables "
        "WHERE table_schema = 'raw' "
        "ORDER BY table_name;"
    )
    .df()
)
raw_tables

Unnamed: 0,table_name
0,_dlt_loads
1,_dlt_pipeline_state
2,_dlt_version
3,cfpb_complaints


In [6]:
# Preview the first 3 rows of each table in the raw schema.
# LIMIT without ORDER BY returns an arbitrary sample of rows.
cfpb_complaints = con.sql("SELECT * FROM raw.cfpb_complaints LIMIT 3").df()
dlt_loads = con.sql("SELECT * FROM raw._dlt_loads LIMIT 3").df()
dlt_version = con.sql("SELECT * FROM raw._dlt_version LIMIT 3").df()
dlt_pipeline_state = con.sql("SELECT * FROM raw._dlt_pipeline_state LIMIT 3").df()

# Only a cell's last expression is rendered automatically, so the _dlt_* previews
# must be shown explicitly with display(); otherwise they are queried but never seen.
display(dlt_loads, dlt_version, dlt_pipeline_state)
cfpb_complaints

Unnamed: 0,product,complaint_what_happened,date_sent_to_company,issue,sub_product,zip_code,complaint_id,timely,company_response,submitted_via,...,date_received,state,consumer_disputed,sub_issue,_dlt_extracted_at,_dlt_load_id,_dlt_id,consumer_consent_provided,tags,company_public_response
0,Credit reporting or other personal consumer re...,,2025-11-05 12:00:00-05:00,Incorrect information on your report,Credit reporting,30501,17054567,Yes,In progress,Web,...,2025-11-05 12:00:00-05:00,GA,,Account status incorrect,2025-11-29 20:41:26.161918-05:00,1764484856.0932565,teTjViH620pbXQ,,,
1,Credit card,,2025-10-02 13:00:00-04:00,Incorrect information on your report,General-purpose credit card or charge card,13021,16285565,Yes,In progress,Web,...,2025-10-02 13:00:00-04:00,NY,,Information belongs to someone else,2025-11-29 20:41:50.988020-05:00,1764484898.746703,DCm8e3jj9OzUOA,,,
2,Credit card,,2024-09-03 13:00:00-04:00,Getting a credit card,General-purpose credit card or charge card,2631,10000688,Yes,Closed with non-monetary relief,Postal mail,...,2024-09-03 13:00:00-04:00,MA,,Card opened without my consent or knowledge,2025-12-04 15:00:26.649658-05:00,1764896401.92204,Ld/Hxv4D+ZxWwQ,,,


In [7]:
# Column names, types and nullability of the raw complaints table.
con.execute("DESCRIBE raw.cfpb_complaints").df()

Unnamed: 0,column_name,column_type,null,key,default,extra
0,product,VARCHAR,YES,,,
1,complaint_what_happened,VARCHAR,YES,,,
2,date_sent_to_company,TIMESTAMP WITH TIME ZONE,YES,,,
3,issue,VARCHAR,YES,,,
4,sub_product,VARCHAR,YES,,,
5,zip_code,VARCHAR,YES,,,
6,complaint_id,VARCHAR,NO,,,
7,timely,VARCHAR,YES,,,
8,company_response,VARCHAR,YES,,,
9,submitted_via,VARCHAR,YES,,,


In [8]:
# Number of complaints filed against each company, largest first.
complaints_per_company_sql = """
    SELECT company, COUNT(*) AS total
    FROM raw.cfpb_complaints
    GROUP BY company
    ORDER BY total DESC
"""
con.sql(complaints_per_company_sql).df()

Unnamed: 0,company,total
0,CAPITAL ONE FINANCIAL CORPORATION,68053
1,JPMORGAN CHASE & CO.,56003
2,WELLS FARGO & COMPANY,52658
3,"BANK OF AMERICA, NATIONAL ASSOCIATION",45185
4,"CITIBANK, N.A.",41131
...,...,...
63,UnBanked,2
64,Secure One Capital Corporation,2
65,"HUNTINGTON NATIONAL BANK, THE - FIRSTMERIT BAN...",1
66,TCF NATIONAL BANK,1


In [9]:
# Keep only these 10 companies and compare their complaint counts.
companies = [
    "CAPITAL ONE FINANCIAL CORPORATION",
    "JPMORGAN CHASE & CO.",
    "WELLS FARGO & COMPANY",
    "BANK OF AMERICA, NATIONAL ASSOCIATION",
    "CITIBANK, N.A.",
    "U.S. BANCORP",
    "GOLDMAN SACHS BANK USA",
    "PNC Bank N.A.",
    "TRUIST FINANCIAL CORPORATION",
    "STATE STREET BANK AND TRUST COMPANY"
]

# Render each name as a SQL string literal, doubling any embedded single quote
# (standard SQL escaping) so a name containing ' cannot break or alter the query.
company_list_sql = ", ".join("'" + c.replace("'", "''") + "'" for c in companies)

query = f"""
    SELECT company, COUNT(*) AS total
    FROM raw.cfpb_complaints
    WHERE company IN ({company_list_sql})
    GROUP BY company
    ORDER BY total DESC
"""

con.sql(query).df()

Unnamed: 0,company,total
0,CAPITAL ONE FINANCIAL CORPORATION,68053
1,JPMORGAN CHASE & CO.,56003
2,WELLS FARGO & COMPANY,52658
3,"BANK OF AMERICA, NATIONAL ASSOCIATION",45185
4,"CITIBANK, N.A.",41131
5,U.S. BANCORP,13519
6,GOLDMAN SACHS BANK USA,11972
7,TRUIST FINANCIAL CORPORATION,11936
8,PNC Bank N.A.,7690
9,STATE STREET BANK AND TRUST COMPANY,8


## Keep only 9 companies (drop STATE STREET BANK AND TRUST COMPANY, which has just 8 complaints) and save them to a new table

In [10]:
# Final company list: the 10 above minus STATE STREET BANK AND TRUST COMPANY,
# which has only 8 complaints, far fewer than every other company.
companies = [
    "CAPITAL ONE FINANCIAL CORPORATION",
    "JPMORGAN CHASE & CO.",
    "WELLS FARGO & COMPANY",
    "BANK OF AMERICA, NATIONAL ASSOCIATION",
    "CITIBANK, N.A.",
    "U.S. BANCORP",
    "GOLDMAN SACHS BANK USA",
    "PNC Bank N.A.",
    "TRUIST FINANCIAL CORPORATION",
]

# Render each name as a SQL string literal, doubling any embedded single quote
# (standard SQL escaping) so a name containing ' cannot break or alter the query.
company_list_sql = ", ".join("'" + c.replace("'", "''") + "'" for c in companies)

# Materialize the filtered rows into eda_complaints (unqualified name, so it is
# created in the connection's default schema). CREATE OR REPLACE keeps the cell
# safe to re-run.
query = f"""
    CREATE OR REPLACE TABLE eda_complaints AS
    SELECT *
    FROM raw.cfpb_complaints
    WHERE company IN ({company_list_sql})
"""

con.sql(query)

# Sanity check: every listed company must appear in the new table; a typo in a
# name would otherwise silently drop that company from the analysis.
n_companies = con.sql("SELECT COUNT(DISTINCT company) FROM eda_complaints").fetchone()[0]
assert n_companies == len(companies), (
    f"expected {len(companies)} companies in eda_complaints, found {n_companies}"
)

In [16]:
# Close the connection so the DuckDB file is released for other processes
# (DuckDB allows only one process to hold a read-write connection to a file).
con.close()