# Load BAR Corps Table Notebook

## Purpose
This notebook automates the population of the `bar_corps` table in the COLIN Extract database.
It pulls data from:
1. **BAR Database** - Business AR filing data
2. **Auth Database** - Account mailing address information

## Process Overview
1. Truncate the `bar_corps` table in COLIN Extract
2. Query BAR database for business filing records
3. Query Auth database for mailing address status by org/business
4. Merge data and derive `bar_account_has_mailing_address` flag
5. Load merged data into `bar_corps` table
6. Optionally refresh the `mv_legacy_corps_data` materialized view

## Prerequisites
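- `.env` file (or pre-set environment variables) providing `DATABASE_<DB>_USERNAME`, `DATABASE_<DB>_PASSWORD`, `DATABASE_<DB>_HOST`, `DATABASE_<DB>_PORT` and `DATABASE_<DB>_NAME` for each `<DB>` in `COLIN_EXTRACT`, `BAR`, `AUTH`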
- Network access to BAR, Auth, and COLIN Extract databases
- `bar_corps` table must have the `bar_account_has_mailing_address` column

---

## Install Required Packages

In [None]:
%pip install pandas
%pip install sqlalchemy>=2.0
%pip install python-dotenv
%pip install psycopg2-binary
%pip install openpyxl

## Imports and Environment Setup

In [None]:
import os
from datetime import datetime
from typing import Optional

import pandas as pd
from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError, OperationalError
from sqlalchemy.engine import Engine
from dotenv import load_dotenv

# Load environment variables from a .env file, if one is found.
# load_dotenv() returns False when it loaded nothing; in that case the
# DATABASE_* settings must already be present in the process environment.
if load_dotenv():
    print("Environment variables loaded successfully.")
else:
    print("No .env file loaded; relying on variables already set in the environment.")

## Database Configuration

Configure connections to:
- **colin_extract**: Target database for `bar_corps` table
- **bar**: Source database for business AR filing data
- **auth**: Source database for account mailing address data

In [None]:
DATABASE_CONFIG = {
    'colin_extract': {
        'username': os.getenv("DATABASE_COLIN_EXTRACT_USERNAME"),
        'password': os.getenv("DATABASE_COLIN_EXTRACT_PASSWORD"),
        'host': os.getenv("DATABASE_COLIN_EXTRACT_HOST"),
        'port': os.getenv("DATABASE_COLIN_EXTRACT_PORT"),
        'name': os.getenv("DATABASE_COLIN_EXTRACT_NAME")
    },
    'bar': {
        'username': os.getenv("DATABASE_BAR_USERNAME"),
        'password': os.getenv("DATABASE_BAR_PASSWORD"),
        'host': os.getenv("DATABASE_BAR_HOST"),
        'port': os.getenv("DATABASE_BAR_PORT"),
        'name': os.getenv("DATABASE_BAR_NAME")
    },
    'auth': {
        'username': os.getenv("DATABASE_AUTH_USERNAME"),
        'password': os.getenv("DATABASE_AUTH_PASSWORD"),
        'host': os.getenv("DATABASE_AUTH_HOST"),
        'port': os.getenv("DATABASE_AUTH_PORT"),
        'name': os.getenv("DATABASE_AUTH_NAME")
    },
}

# Build connection URIs
for db_key, db_config in DATABASE_CONFIG.items():
    # Validate config
    missing_keys = [k for k, v in db_config.items() if v is None]
    if missing_keys:
        print(f"{db_key.upper()}: Missing environment variables for: {missing_keys}")

    # Build PostgreSQL URI
    uri = f"postgresql://{db_config['username']}:{db_config['password']}@{db_config['host']}:{db_config['port']}/{db_config['name']}"
    DATABASE_CONFIG[db_key] = {'uri': uri}

print("Database configurations built successfully.")

## Create and Test Database Engines

In [None]:
def _open_and_ping(uri):
    """Create a SQLAlchemy engine for ``uri`` and prove it works with ``SELECT 1``."""
    candidate = create_engine(uri)
    with candidate.connect() as probe:
        probe.execute(text("SELECT 1"))
    return candidate


engines = {}

for name, cfg in DATABASE_CONFIG.items():
    label = name.upper()
    try:
        print(f"Creating engine for {label}...")
        engines[name] = _open_and_ping(cfg['uri'])
        print(f"✓ {label} database engine created and tested successfully.")
    # Most specific first: OperationalError is a subclass of SQLAlchemyError.
    except OperationalError as err:
        print(f"✗ {label} database connection failed: {err}")
        raise
    except SQLAlchemyError as err:
        print(f"✗ {label} database engine creation failed: {err}")
        raise
    except Exception as err:
        print(f"✗ {label} unexpected error: {err}")
        raise

divider = "=" * 50
print(divider)
print("All database engines ready for use.")
print(divider)

---
# Section 1: Data Extraction
---

## Query BAR Database

Extract business filing data from the BAR database.

In [None]:
# One row per business: DISTINCT ON keeps the first row for each b.identifier
# under the ORDER BY, i.e. the COMPLETED filing with the highest fiscal_year
# (ties broken by the latest filing_date). That row supplies sub, idp_userid and
# payment_account. The window MAX()es report the latest year/date across all of
# the business's completed filings. NULLS LAST is needed because PostgreSQL
# sorts NULLs first under DESC, which would otherwise let a NULL fiscal_year
# filing win. Purely 7-digit identifiers are reported with a 'BC' prefix.
BAR_QUERY = """
SELECT DISTINCT ON (b.identifier)
    CASE
        WHEN b.identifier ~ '^\\d{7}$' THEN 'BC' || b.identifier
        ELSE b.identifier
    END AS identifier,
    MAX(f.fiscal_year) OVER (PARTITION BY b.identifier) AS latest_fiscal_year,
    MAX(f.filing_date) OVER (PARTITION BY b.identifier) AS last_ar_filing_date,
    u.sub,
    u.idp_userid,
    f.payment_account
FROM business b
    JOIN filing f ON f.business_id = b.id
    LEFT JOIN users u ON u.id = f.submitter_id
WHERE f.status = 'COMPLETED'
ORDER BY b.identifier, f.fiscal_year DESC NULLS LAST, f.filing_date DESC NULLS LAST;
"""

def query_bar_db(engine: Engine) -> pd.DataFrame:
    """Run ``BAR_QUERY`` on the BAR database and return the resulting rows.

    Timing and summary counts for ``payment_account`` are printed along the way.
    """
    print("Starting BAR database query...")
    started = datetime.now()

    with engine.connect() as connection:
        result = pd.read_sql(text(BAR_QUERY), connection)

    elapsed = (datetime.now() - started).total_seconds()

    print(f"BAR query completed in {elapsed:.2f} seconds")
    print(f"Total records retrieved: {len(result):,}")

    accounts = result['payment_account']
    print(f"Records with null payment_account: {accounts.isna().sum():,}")
    print(f"Unique payment_accounts: {accounts.nunique():,}")

    return result


# Execute BAR query
bar_df = query_bar_db(engines['bar'])

# Preview data
print("\nSample BAR data:")
display(bar_df.head())
dtype_report = f"\nColumn dtypes:\n{bar_df.dtypes}"
print(dtype_report)

## Query Auth Database for Mailing Address Status

Determine which org/business combinations have complete mailing addresses.

A mailing address is considered complete if ALL of the following fields are non-null:
- `street`
- `city`
- `region`
- `country`
- `postal_code`

In [None]:
# For each (org, affiliated business) pair among the requested org IDs, flag
# whether a contact linked to the org has street, city, region, country and
# postal_code all set. :org_ids is bound to a Python list, which psycopg2 adapts
# to a PostgreSQL array for `= ANY(...)`.
AUTH_QUERY = """
SELECT DISTINCT
    o.id AS org_id,
    e.business_identifier,
    CASE
        WHEN c.city IS NOT NULL
         AND c.country IS NOT NULL
         AND c.street IS NOT NULL
         AND c.region IS NOT NULL
         AND c.postal_code IS NOT NULL
        THEN TRUE
        ELSE FALSE
    END AS has_mailing_address
FROM affiliations a
JOIN orgs o ON a.org_id = o.id
LEFT JOIN entities e ON a.entity_id = e.id
LEFT JOIN contact_links cl ON o.id = cl.org_id
LEFT JOIN contacts c ON cl.contact_id = c.id
WHERE o.id = ANY(:org_ids)
;
"""

def query_auth_mailing_addresses(engine: Engine, org_ids: list, chunk_size: int = 1000) -> pd.DataFrame:
    """
    Query Auth database for mailing address status.

    Args:
        engine: SQLAlchemy engine for auth database
        org_ids: List of org/account IDs to check. Null and non-numeric values
            are skipped; numeric values (including numeric strings) are
            converted to int and de-duplicated.
        chunk_size: Number of org_ids per query batch (must be >= 1)

    Returns:
        DataFrame with org_id, business_identifier, has_mailing_address.
        The same org/business pair can appear with both True and False when
        the org is linked to several contacts.

    Raises:
        ValueError: if chunk_size is less than 1
    """
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be >= 1, got {chunk_size}")

    print(f"Starting Auth database query for {len(org_ids):,} org IDs...")
    start_time = datetime.now()

    # Coerce with pd.to_numeric(errors='coerce') -- the same conversion the
    # merge step applies to payment_account -- so a stray non-numeric value is
    # skipped instead of aborting the whole run with ValueError from int().
    numeric_ids = pd.to_numeric(pd.Series(list(org_ids), dtype=object), errors='coerce').dropna()
    # Remove duplicates; sort so chunk contents are deterministic between runs.
    org_ids = sorted({int(x) for x in numeric_ids})
    print(f"Unique non-null org IDs to query: {len(org_ids):,}")

    if not org_ids:
        print("No valid org IDs to query!")
        return pd.DataFrame(columns=['org_id', 'business_identifier', 'has_mailing_address'])

    # Process in chunks to keep each bound array a manageable size
    all_results = []
    total_chunks = (len(org_ids) + chunk_size - 1) // chunk_size

    with engine.connect() as conn:
        for i in range(0, len(org_ids), chunk_size):
            chunk = org_ids[i:i + chunk_size]
            chunk_num = (i // chunk_size) + 1
            print(f"Processing chunk {chunk_num}/{total_chunks} ({len(chunk)} org IDs)...")

            result = pd.read_sql(
                text(AUTH_QUERY),
                conn,
                params={'org_ids': chunk}
            )
            all_results.append(result)

    # Combine all chunks
    if all_results:
        df = pd.concat(all_results, ignore_index=True)
    else:
        df = pd.DataFrame(columns=['org_id', 'business_identifier', 'has_mailing_address'])

    duration = (datetime.now() - start_time).total_seconds()

    print(f"Auth query completed in {duration:.2f} seconds")
    print(f"Total records retrieved: {len(df):,}")

    if len(df) > 0:
        has_address_count = int(df['has_mailing_address'].sum())
        print(f"Records with mailing address: {has_address_count:,} ({100*has_address_count/len(df):.1f}%)")
        print(f"Records without mailing address: {len(df) - has_address_count:,}")

    return df

# Collect the distinct, non-null payment accounts referenced by BAR filings;
# they are looked up as org IDs in Auth.
distinct_accounts = bar_df['payment_account'].dropna().unique()
org_ids_to_query = distinct_accounts.tolist()
print(f"Extracted {len(org_ids_to_query):,} unique payment_account values from BAR data")

# Fetch mailing-address completeness for those orgs
auth_df = query_auth_mailing_addresses(engines['auth'], org_ids_to_query)

# Show the first few rows
print("\nSample Auth data:")
display(auth_df.head())

---
# Section 2: Data Transformation
---

## Merge BAR and Auth Data

Merge the BAR filing data with Auth mailing address status.

**Join logic:**
- BAR `payment_account` = Auth `org_id`
- BAR `identifier` (without 'BC' prefix) = Auth `business_identifier`

**Note:** Records without a match in Auth will have `bar_account_has_mailing_address = False`

In [None]:
def _strip_bc_prefix(identifiers: pd.Series) -> pd.Series:
    """Return identifiers as pandas strings with one leading 'BC' removed (nulls kept)."""
    return identifiers.astype('string').str.replace(r'^BC', '', regex=True)


def _pct(part, whole) -> float:
    """Return part as a percentage of whole, or 0.0 when whole is zero."""
    return 100 * part / whole if whole else 0.0


def merge_bar_auth_data(bar_df: pd.DataFrame, auth_df: pd.DataFrame) -> pd.DataFrame:
    """
    Merge BAR and Auth data to create final bar_corps dataset.

    Each BAR row is left-joined to the Auth rows on
    ``payment_account == org_id`` and on the business identifier. The
    identifier is compared with a leading 'BC' removed on BOTH sides, so the
    join matches whether Auth stores 'BC1234567' or '1234567'. If any Auth row
    for a pair has a mailing address, the pair counts as having one. BAR rows
    with no Auth match get ``bar_account_has_mailing_address = False``.

    Args:
        bar_df: DataFrame from BAR database (identifier, latest_fiscal_year,
            last_ar_filing_date, sub, idp_userid, payment_account)
        auth_df: DataFrame from Auth database (org_id, business_identifier,
            has_mailing_address)

    Returns:
        Merged DataFrame ready for loading into bar_corps table, one row per
        BAR row and in the same order
    """
    print("Starting data merge...")
    print(f"BAR records: {len(bar_df):,}")
    print(f"Auth records: {len(auth_df):,}")

    # Work on copies so the caller's frames are not modified
    bar_merged = bar_df.copy()
    bar_merged['identifier_for_match'] = _strip_bc_prefix(bar_merged['identifier'])

    # Ensure payment_account is numeric for joining (non-numeric -> NaN)
    bar_merged['payment_account'] = pd.to_numeric(bar_merged['payment_account'], errors='coerce')

    auth_for_merge = auth_df.copy()
    auth_for_merge['org_id'] = pd.to_numeric(auth_for_merge['org_id'], errors='coerce')
    auth_for_merge['auth_identifier_for_match'] = _strip_bc_prefix(auth_for_merge['business_identifier'])

    # Collapse to one row per org/business: True if ANY row (e.g. one of several
    # contacts) has a complete mailing address. groupby drops null keys, so
    # affiliations without an entity cannot match anything.
    auth_agg = (
        auth_for_merge
        .groupby(['org_id', 'auth_identifier_for_match'])['has_mailing_address']
        .max()
        .reset_index()
    )

    print(f"Auth records after aggregation: {len(auth_agg):,}")

    # Left join keeps every BAR row
    merged_df = bar_merged.merge(
        auth_agg,
        left_on=['payment_account', 'identifier_for_match'],
        right_on=['org_id', 'auth_identifier_for_match'],
        how='left',
    )

    # Unmatched rows carry NaN; eq(True) maps NaN/None/False to False and yields
    # a real bool column without pandas' object-dtype fillna downcasting path.
    merged_df['bar_account_has_mailing_address'] = merged_df['has_mailing_address'].eq(True)

    # Select final columns matching bar_corps table schema
    final_columns = [
        'identifier',
        'latest_fiscal_year',
        'last_ar_filing_date',
        'sub',
        'idp_userid',
        'payment_account',
        'bar_account_has_mailing_address'
    ]

    result_df = merged_df[final_columns].copy()

    # Log merge statistics (percentages guarded against an empty result)
    total = len(result_df)
    matched_count = int(merged_df['org_id'].notna().sum())
    unmatched_count = total - matched_count
    has_address_count = int(result_df['bar_account_has_mailing_address'].sum())

    print("")
    print("=" * 50)
    print("MERGE STATISTICS")
    print("=" * 50)
    print(f"Total records: {total:,}")
    print(f"Matched with Auth: {matched_count:,} ({_pct(matched_count, total):.1f}%)")
    print(f"Unmatched (no Auth record): {unmatched_count:,} ({_pct(unmatched_count, total):.1f}%)")
    print(f"With mailing address: {has_address_count:,} ({_pct(has_address_count, total):.1f}%)")
    print(f"Without mailing address: {total - has_address_count:,}")
    print("=" * 50)

    return result_df

# Join BAR filings to Auth mailing-address flags
final_df = merge_bar_auth_data(bar_df, auth_df)

# Show the first ten merged rows, then the frame's shape and dtypes
print("\nSample merged data:")
display(final_df.head(10))

shape_line = f"\nFinal DataFrame shape: {final_df.shape}"
dtypes_line = f"\nColumn dtypes:\n{final_df.dtypes}"
print(shape_line)
print(dtypes_line)

## Data Validation

Validate the merged data before loading.

In [None]:
def _report_validation(issues: list) -> bool:
    """Print the validation outcome for ``issues``; return True when there are none."""
    if issues:
        print("Validation FAILED with the following issues:")
        for issue in issues:
            print(f"  - {issue}")
        return False
    print("✓ Validation PASSED")
    return True


def validate_data(df: pd.DataFrame) -> bool:
    """
    Validate the merged DataFrame before loading.

    Blocking issues (cause a False return): missing required columns,
    duplicate identifiers, null identifiers. Identifiers that do not match
    ``BC`` + 7 digits or uppercase letters followed by digits, and a
    non-bool ``bar_account_has_mailing_address`` dtype, are only printed as
    warnings.

    Returns:
        True if validation passes, False otherwise
    """
    print("Running data validation...")
    issues = []

    # Check for required columns
    required_columns = [
        'identifier', 'latest_fiscal_year', 'last_ar_filing_date',
        'sub', 'idp_userid', 'payment_account', 'bar_account_has_mailing_address'
    ]
    missing_cols = set(required_columns) - set(df.columns)
    if missing_cols:
        issues.append(f"Missing required columns: {missing_cols}")
        # The checks below index these columns directly; report and stop
        # here instead of crashing with KeyError.
        return _report_validation(issues)

    # Check identifier format (warning only)
    invalid_identifiers = df[~df['identifier'].str.match(r'^(BC\d{7}|[A-Z]+\d+)$', na=False)]
    if len(invalid_identifiers) > 0:
        print(f"Records with potentially invalid identifier format: {len(invalid_identifiers)}")
        print(f"Sample invalid identifiers: {invalid_identifiers['identifier'].head().tolist()}")

    # Check for duplicates
    duplicate_count = df['identifier'].duplicated().sum()
    if duplicate_count > 0:
        issues.append(f"Duplicate identifiers found: {duplicate_count}")

    # Check null counts in critical fields
    null_identifiers = df['identifier'].isna().sum()
    if null_identifiers > 0:
        issues.append(f"Null identifiers: {null_identifiers}")

    # Check boolean column (warning only)
    if df['bar_account_has_mailing_address'].dtype != bool:
        print(f"bar_account_has_mailing_address is {df['bar_account_has_mailing_address'].dtype}, expected bool")

    return _report_validation(issues)

# Run validation
validation_passed = validate_data(final_df)

# Stop here on failure: the next section TRUNCATEs bar_corps and reloads it,
# so continuing (e.g. via "Run All") would replace the table with bad data.
if not validation_passed:
    raise RuntimeError("Validation failed - review issues before proceeding to load")

---
# Section 3: Data Loading
---

## Truncate and Load bar_corps Table

⚠️ **WARNING**: This will truncate the existing `bar_corps` table!

Operations:
1. Truncate `bar_corps` table
2. Insert merged data
3. Verify row counts

In [None]:
def load_bar_corps(engine: Engine, df: pd.DataFrame, chunk_size: int = 1000) -> int:
    """
    Truncate and load data into bar_corps table.

    The truncate, the insert and the row-count check all run inside one
    transaction (``engine.begin()``). If the post-insert row count differs from
    ``len(df)``, RuntimeError is raised and the transaction is rolled back,
    so bar_corps keeps its previous contents.

    Args:
        engine: SQLAlchemy engine for colin_extract database
        df: DataFrame to load
        chunk_size: Rows per insert batch

    Returns:
        Number of rows in bar_corps after the load (equal to len(df))

    Raises:
        RuntimeError: if the row count after the insert does not match len(df)
    """
    print("="*50)
    print("STARTING DATA LOAD TO bar_corps")
    print("="*50)
    start_time = datetime.now()

    with engine.begin() as conn:  # commits on success, rolls back on exception
        # Step 1: Get current row count
        current_count = conn.execute(text("SELECT COUNT(*) FROM bar_corps")).scalar()
        print(f"Current bar_corps row count: {current_count:,}")

        # Step 2: Truncate table
        print("Truncating bar_corps table...")
        conn.execute(text("TRUNCATE TABLE bar_corps"))
        print("✓ Table truncated")

        # Step 3: Insert data
        print(f"Inserting {len(df):,} records (chunk_size={chunk_size})...")

        # Use pandas to_sql for batched multi-row inserts on the same connection
        df.to_sql(
            name='bar_corps',
            con=conn,
            if_exists='append',
            index=False,
            method='multi',
            chunksize=chunk_size
        )
        print("✓ Data inserted")

        # Step 4: Verify row count
        new_count = conn.execute(text("SELECT COUNT(*) FROM bar_corps")).scalar()
        print(f"New bar_corps row count: {new_count:,}")

        if new_count != len(df):
            # Raising inside engine.begin() rolls back the TRUNCATE and the
            # INSERT, instead of committing a partially loaded table.
            raise RuntimeError(f"Row count mismatch! Expected {len(df):,}, got {new_count:,}")
        print("✓ Row count verified")

    duration = (datetime.now() - start_time).total_seconds()
    print(f"Load completed in {duration:.2f} seconds")
    print("="*50)

    return new_count

# Truncate bar_corps and load the merged rows into colin_extract
target_engine = engines['colin_extract']
rows_loaded = load_bar_corps(engine=target_engine, df=final_df)
print(f"\nTotal rows loaded: {rows_loaded:,}")

## Verify Loaded Data

Run verification queries against the loaded data.

In [None]:
def verify_loaded_data(engine: Engine) -> None:
    """
    Run verification queries on the loaded bar_corps data.

    Prints the total row count, the count and share of rows flagged with a
    mailing address, the five most recent fiscal years by row count, and the
    five most recently filed rows. Safe to run on an empty table.
    """
    print("Running verification queries...")

    with engine.connect() as conn:
        # Total count
        total = conn.execute(text("SELECT COUNT(*) FROM bar_corps")).scalar()
        print(f"Total records: {total:,}")

        # Count with mailing address (percentage guarded for an empty table)
        with_address = conn.execute(text(
            "SELECT COUNT(*) FROM bar_corps WHERE bar_account_has_mailing_address = TRUE"
        )).scalar()
        with_address_pct = 100 * with_address / total if total else 0.0
        print(f"Records with mailing address: {with_address:,} ({with_address_pct:.1f}%)")

        # Count by fiscal year
        fiscal_year_df = pd.read_sql(text("""
            SELECT latest_fiscal_year, COUNT(*) as count
            FROM bar_corps
            WHERE latest_fiscal_year IS NOT NULL
            GROUP BY latest_fiscal_year
            ORDER BY latest_fiscal_year DESC
            LIMIT 5
        """), conn)
        print(f"\nTop 5 fiscal years:\n{fiscal_year_df.to_string(index=False)}")

        # Sample records
        sample_df = pd.read_sql(text("""
            SELECT * FROM bar_corps
            ORDER BY last_ar_filing_date DESC NULLS LAST
            LIMIT 5
        """), conn)
        print("\nSample records (most recent filings):")
        display(sample_df)

# Run verification
verify_loaded_data(engines['colin_extract'])

---
# Section 4: Materialized View Refresh (Optional)
---

## Refresh mv_legacy_corps_data

⚠️ **NOTE**: This may take several minutes depending on data volume.

Only run this cell if you want to refresh the materialized view immediately.

In [None]:
def refresh_materialized_view(engine: Engine, view_name: str = 'mv_legacy_corps_data') -> None:
    """
    Run ``REFRESH MATERIALIZED VIEW`` for ``view_name`` in its own transaction.

    Args:
        engine: SQLAlchemy engine for the colin_extract database
        view_name: Materialized view to refresh. It is interpolated directly
            into the SQL text, so only pass trusted identifiers.

    Prints the elapsed time once the refresh has committed.
    """
    for line in (f"Starting refresh of {view_name}...", "This may take several minutes..."):
        print(line)
    began = datetime.now()

    refresh_sql = text(f"REFRESH MATERIALIZED VIEW {view_name}")
    with engine.begin() as connection:
        connection.execute(refresh_sql)

    seconds = (datetime.now() - began).total_seconds()
    minutes = seconds / 60
    print(f"✓ Materialized view refreshed in {seconds:.2f} seconds ({minutes:.1f} minutes)")

refresh_materialized_view(engines['colin_extract'])

## Verify Materialized View (After Refresh)

Run this after refreshing the materialized view to verify that the `has_bar_filing` and new `bar_account_has_mailing_address` columns are populated.

In [None]:
def verify_materialized_view(engine: Engine) -> None:
    """
    Verify the materialized view has the data.

    Prints the total row count of mv_legacy_corps_data and how many rows have
    ``has_bar_filing`` / ``bar_account_has_mailing_address`` true. COUNT(*)
    FILTER returns 0 (not NULL, as SUM would) for an empty view, so the
    thousands-separator formatting below cannot fail on None.
    """
    print("Verifying mv_legacy_corps_data...")

    with engine.connect() as conn:
        # Check data
        stats = pd.read_sql(text("""
            SELECT
                COUNT(*) AS total,
                COUNT(*) FILTER (WHERE has_bar_filing) AS with_bar_filing,
                COUNT(*) FILTER (WHERE bar_account_has_mailing_address) AS with_mailing_address
            FROM mv_legacy_corps_data
        """), conn)

        total = int(stats['total'].iloc[0])
        with_bar_filing = int(stats['with_bar_filing'].iloc[0])
        with_mailing_address = int(stats['with_mailing_address'].iloc[0])

        print("\nMaterialized view statistics:")
        print(f"  Total records: {total:,}")
        print(f"  With BAR filing: {with_bar_filing:,}")
        print(f"  With mailing address: {with_mailing_address:,}")

verify_materialized_view(engines['colin_extract'])

---
# Summary
---

In [None]:
# Assemble the end-of-run summary and emit it with a single print call
divider = "=" * 60
summary_lines = [
    "BAR CORPS DATA LOAD COMPLETE",
    divider,
    f"BAR records processed: {len(bar_df):,}",
    f"Auth records retrieved: {len(auth_df):,}",
    f"Final records loaded: {len(final_df):,}",
    f"Records with mailing address: {final_df['bar_account_has_mailing_address'].sum():,}",
    "Refresh the materialized view",
    "Verify the materialized view",
    divider,
]
print("\n".join(summary_lines))