# Housing Data Workflow Notebook

End-to-end workflow runner for the housing data pipeline with dataframe views at each step.

This notebook wires together the individual pipeline stages, which are grouped into four notebook steps below:
1. Verify and fetch HPD data from NYC Open Data
2. Classify projects by financing type (LL44 funding)
3. Search DOB NB/New Building filings (with optional BBL fallback)
4. Search Certificate of Occupancy filings
5. Create timeline joins
6. Generate charts

## Configuration

Set your workflow options here:

In [None]:
# Workflow switches (equivalent to command line arguments).
# Key order is kept stable because the summary below prints in insertion order.
CONFIG = dict(
    refresh_hpd=False,           # True: fetch fresh HPD data
    skip_ll44=False,             # True: skip LL44 funding lookup
    skip_dob=False,              # True: skip DOB queries
    skip_co=False,               # True: skip CO queries
    skip_join=False,             # True: skip timeline joins
    skip_charts=False,           # True: skip chart generation
    disable_bbl_fallback=False,  # True: disable BBL fallback
    hpd_csv=None,                # CSV used as the DOB search input (None = Step 2 output)
    financing_output=None,       # Custom financing output path
    dob_output=None,             # Custom DOB output path
    co_output=None,              # Custom CO output path
)

# Echo the active settings, one indented "key: value" line per option.
print("Configuration set:")
print("\n".join(f"  {option}: {setting}" for option, setting in CONFIG.items()))

## Setup and Imports

Import required modules and set up the environment.

In [None]:
import sys
from pathlib import Path
from typing import Optional
import time

import pandas as pd

# Add current directory to path for local imports
sys.path.append(".")

# Import our workflow modules
from fetch_affordable_housing_data import update_local_data, verify_and_fetch_hpd_data
from query_ll44_funding import query_and_add_financing
from query_dob_filings import query_dob_filings
from query_co_filings import query_co_filings
from HPD_DOB_Join_On_BIN import create_separate_timelines
from create_timeline_chart import create_timeline_chart, create_financing_charts
from data_quality import quality_tracker

print("✅ All imports successful")

## Helper Functions

Utility functions needed by the workflow.

In [None]:
def _default_hpd_csv() -> Path:
    """Return the most preferred HPD CSV that exists on disk.

    Candidates are tried in order: the processed file with financing, the raw
    file with financing, then the plain raw dataset. When none of them exist
    the last candidate (the plain raw dataset path) is returned anyway.
    """
    search_order = (
        Path("data/processed/Affordable_Housing_Production_by_Building_with_financing.csv"),
        Path("data/raw/Affordable_Housing_Production_by_Building_with_financing.csv"),
        Path("data/raw/Affordable_Housing_Production_by_Building.csv"),
    )
    existing = (option for option in search_order if option.exists())
    return next(existing, search_order[-1])


def _normalize_bin(bin_value) -> Optional[str]:
    """Normalize a BIN to a clean string.

    Args:
        bin_value: Raw BIN value (number, numeric string, or other text).

    Returns:
        The integer form as a string when the value is numeric (``1234567.0``
        becomes ``"1234567"``), the stripped text when it is not numeric, or
        ``None`` when the value is missing, blank, or a non-finite number.
    """
    if pd.isna(bin_value):
        return None
    try:
        number = float(bin_value)
    except (TypeError, ValueError):
        # Not numeric: keep the trimmed text, treating blanks as missing.
        text = str(bin_value).strip()
        return text or None
    try:
        return str(int(number))
    except (OverflowError, ValueError):
        # int() rejects infinities (OverflowError) and NaN (ValueError), which
        # float() happily parses from strings such as "inf" or "nan".
        return None


def _write_bin_file(source_csv: Path, output_txt: Path) -> Path:
    """Write the unique, sorted BINs found in a CSV to a text file.

    The first column named ``bin`` or ``bin_normalized`` (case-insensitive) is
    used. BINs are written one per line for use by the CO search.

    Raises:
        SystemExit: If the CSV has no BIN column.

    Returns:
        ``output_txt``, after it has been written.
    """
    frame = pd.read_csv(source_csv)
    bin_columns = [name for name in frame.columns if name.lower() in ("bin", "bin_normalized")]
    if len(bin_columns) == 0:
        raise SystemExit(f"Could not find a BIN column in {source_csv}")

    # Collect normalized values in a set to drop duplicates and empty results.
    unique_bins = set()
    for raw_value in frame[bin_columns[0]].dropna():
        cleaned = _normalize_bin(raw_value)
        if cleaned:
            unique_bins.add(cleaned)
    ordered_bins = sorted(unique_bins)

    output_txt.parent.mkdir(parents=True, exist_ok=True)
    output_txt.write_text("\n".join(ordered_bins))
    print(f"Wrote {len(ordered_bins)} BINs to {output_txt}")
    return output_txt


def _require_file(path: Path, description: str) -> None:
    """Stop the workflow with a readable message unless *path* exists.

    Raises:
        SystemExit: If nothing exists at *path*; the message names the
            missing item using *description*.
    """
    if path.exists():
        return
    raise SystemExit(f"{description} not found at {path}")

print("✅ Helper functions defined")

## Step 1: Verify and Fetch HPD Data

Load or refresh the HPD affordable housing dataset from NYC Open Data.

In [None]:
# Step banner.
print("=" * 70, "STEP 1: VERIFY AND FETCH HPD DATA", "=" * 70, sep="\n")

# Begin data-quality bookkeeping for this run.
quality_tracker.start_processing()

# Either force a refresh or let the fetcher verify the local copy first.
if CONFIG["refresh_hpd"]:
    print("Force refresh requested - fetching fresh HPD data...")
    hpd_df = update_local_data()
else:
    print("Verifying local HPD data against API...")
    hpd_df = verify_and_fetch_hpd_data()

# Both branches use the same raw CSV path; confirm it is actually on disk.
hpd_csv = Path("data/raw/Affordable_Housing_Production_by_Building.csv")
_require_file(hpd_csv, "HPD dataset")

# Record the initial dataset size with the quality tracker.
quality_tracker.analyze_hpd_data(hpd_df, "Full_HPD_Dataset")
quality_tracker.record_pipeline_stage("raw_hpd_data", len(hpd_df), "Raw HPD affordable housing dataset")

print(f"✅ Step 1 complete: {len(hpd_df):,} records loaded")
print(f"📁 Data saved to: {hpd_csv}")

# Overview of the loaded dataframe: shape, then one line per column.
print("\n🔍 HPD Dataset Overview:", f"Shape: {hpd_df.shape}", "\nColumns:", sep="\n")
for column_name in hpd_df.columns:
    print(f"  - {column_name}")

print("\n📊 Sample Data:")
display(hpd_df.head())

print("\n📈 Basic Statistics:")
display(hpd_df.describe(include="all"))

## Step 2: Add Financing Classification

Classify projects by financing type using LL44 funding data.

In [None]:
# Step banner (leading blank line separates it from the previous step's output).
print("\n" + "=" * 70, "STEP 2: ADD FINANCING CLASSIFICATION", "=" * 70, sep="\n")

if not CONFIG["skip_ll44"]:
    # Use the configured output path, or the default processed location.
    financing_output = Path(
        CONFIG["financing_output"]
        or "data/processed/Affordable_Housing_Production_by_Building_with_financing.csv"
    )
    financing_output.parent.mkdir(parents=True, exist_ok=True)

    print(f"Classifying financing types -> {financing_output}")
    financing_df = query_and_add_financing(str(hpd_csv), output_path=str(financing_output))
    building_csv = financing_output

    # Record the dataset after financing classification.
    quality_tracker.analyze_hpd_data(financing_df, "Filtered_HPD")
    quality_tracker.record_pipeline_stage("after_financing", len(financing_df), "Added LL44 financing classification")
else:
    print("Skipping LL44 financing classification as requested.")
    # Later steps still need a dataframe and a CSV, so pass the HPD data through
    # and record the stage for tracking.
    financing_df = hpd_df.copy()
    quality_tracker.record_pipeline_stage("after_financing_skip", len(financing_df), "Financing classification skipped")
    building_csv = hpd_csv

print(f"✅ Step 2 complete: {len(financing_df):,} records with financing classification")

# Show the dataframe together with any financing information.
print("\n🔍 Financing Classification Results:")
print(f"Shape: {financing_df.shape}")

# Columns whose name mentions financing or LL44 (case-insensitive).
financing_cols = [
    name
    for name in financing_df.columns
    if any(token in name.lower() for token in ("financ", "ll44"))
]
print(f"\nFinancing-related columns: {financing_cols}")

print("\n📊 Sample Data with Financing:")
display(financing_df.head())

# Value distribution (including missing values) for each financing column.
for financing_col in financing_cols:
    print(f"\n📈 Distribution of {financing_col}:")
    display(financing_df[financing_col].value_counts(dropna=False))

## Step 3: Enrich with DOB and CO Data

Query DOB APIs for New Building filings and Certificate of Occupancy data.

In [None]:
# Step 3: query (or reuse) DOB New Building filings and Certificate of
# Occupancy filings, display them, and record the stage with the tracker.
# Leaves dob_df/dob_output and co_df/co_output set to None when no data exists.
print("\n" + "=" * 70)
print("STEP 3: ENRICH WITH DOB AND CO DATA")
print("=" * 70)

# Prepare inputs for DOB/CO queries
dob_search_source = Path(CONFIG["hpd_csv"]) if CONFIG["hpd_csv"] else building_csv
_require_file(dob_search_source, "DOB search input")

# Generate BIN file for CO searches
# NOTE(review): BINs always come from building_csv, even when CONFIG["hpd_csv"]
# points the DOB search at a different file - confirm this is intended.
bin_output = Path("data/processed/workflow_bins.txt")
bin_file = _write_bin_file(building_csv, bin_output)

print(f"\n📋 BIN file created: {bin_file}")
print(f"Contains {len(bin_file.read_text().split())} BINs")

# DOB filings
dob_output = Path(CONFIG["dob_output"]) if CONFIG["dob_output"] else Path(
    f"data/processed/{dob_search_source.stem}_dob_filings.csv"
)
dob_output.parent.mkdir(parents=True, exist_ok=True)

# Check for existing DOB files when skipping
if CONFIG["skip_dob"]:
    print("⏭️  Skipping DOB queries as requested")
    # Look for existing files in processed folder or external folder
    alt_dob_path = Path(f"data/external/{dob_search_source.stem}_dob_filings.csv")
    if dob_output.exists():
        print(f"📁 Using existing DOB data at {dob_output}")
        dob_df = pd.read_csv(dob_output)
    elif alt_dob_path.exists():
        print(f"📁 Using existing DOB data from external folder: {alt_dob_path}")
        dob_output = alt_dob_path
        dob_df = pd.read_csv(dob_output)
    else:
        print("⚠️  No existing DOB data found; timeline will omit DOB entries.")
        dob_df = None
        dob_output = None
else:
    print(f"🔍 Querying DOB APIs using {dob_search_source} -> {dob_output}")
    print("   This may take several minutes...")
    query_dob_filings(
        str(dob_search_source),
        output_path=str(dob_output),
        use_bbl_fallback=not CONFIG["disable_bbl_fallback"],
    )
    if dob_output.exists():
        print(f"✅ DOB query completed: {dob_output}")
        dob_df = pd.read_csv(dob_output)
    else:
        # The query left no file behind; treat it like the "no data" case above
        # instead of failing in read_csv, so later steps can skip DOB cleanly.
        print(f"⚠️  DOB query produced no file at {dob_output}; timeline will omit DOB entries.")
        dob_df = None
        dob_output = None

# Display DOB data if available
if dob_df is not None:
    print("\n🔍 DOB Filings Data:")
    print(f"Shape: {dob_df.shape}")
    print("Columns:")
    for col in dob_df.columns:
        print(f"  - {col}")

    print("\n📊 Sample DOB Data:")
    display(dob_df.head())

    # Show some statistics
    if "filing_date" in dob_df.columns:
        print("\n📈 DOB Filing Date Statistics:")
        display(dob_df["filing_date"].describe())

# Certificate of Occupancy filings
co_output = Path(CONFIG["co_output"]) if CONFIG["co_output"] else Path(
    f"data/processed/{bin_file.stem}_co_filings.csv"
)
co_output.parent.mkdir(parents=True, exist_ok=True)

if CONFIG["skip_co"]:
    # Look for existing CO files in multiple locations
    alt_co_path = Path(f"data/external/{bin_file.stem}_co_filings.csv")
    if co_output.exists():
        print(f"Using existing CO data at {co_output}")
        co_df = pd.read_csv(co_output)
    elif alt_co_path.exists():
        print(f"Using existing CO data from external folder: {alt_co_path}")
        co_output = alt_co_path
        co_df = pd.read_csv(co_output)
    else:
        print("No CO data supplied; timeline will omit CO entries.")
        co_df = None
        co_output = None
else:
    print(f"🏛️  Querying CO APIs using {bin_file} -> {co_output}")
    query_co_filings(str(bin_file), output_path=str(co_output))
    if co_output.exists():
        co_df = pd.read_csv(co_output)
    else:
        # Same fallback as the skip branch: no file means no CO entries.
        print(f"⚠️  CO query produced no file at {co_output}; timeline will omit CO entries.")
        co_df = None
        co_output = None

# Display CO data if available
if co_df is not None:
    print("\n🔍 Certificate of Occupancy Data:")
    print(f"Shape: {co_df.shape}")
    print("Columns:")
    for col in co_df.columns:
        print(f"  - {col}")

    print("\n📊 Sample CO Data:")
    display(co_df.head())

    # Show some statistics
    if "issue_date" in co_df.columns:
        print("\n📈 CO Issue Date Statistics:")
        display(co_df["issue_date"].describe())

# Record final enriched dataset (row count is taken from the building CSV)
enriched_df = pd.read_csv(building_csv)
if dob_output is not None and dob_output.exists():
    quality_tracker.record_pipeline_stage("after_dob_enrichment", len(enriched_df), "Enriched with DOB and CO data")
else:
    quality_tracker.record_pipeline_stage("after_dob_enrichment", len(enriched_df), "DOB/CO enrichment skipped - no data available")

print("\n✅ Step 3 complete: Dataset enriched with DOB/CO data")

## Step 4: Generate Timeline Charts

Join the building data with the DOB and CO filings into HPD-financed and privately financed timelines, then create charts from those timelines.

In [None]:
# Step 4: build the financing-specific timelines, then chart them.
# The join is governed only by CONFIG["skip_join"] and chart generation only by
# CONFIG["skip_charts"], matching how the two options are described in CONFIG.
print("\n" + "=" * 70)
print("STEP 4: GENERATE TIMELINE CHARTS")
print("=" * 70)

# Timeline CSV paths are derived from the building CSV name.
hpd_timeline = Path(str(building_csv).replace(".csv", "_hpd_financed_timeline.csv"))
private_timeline = Path(str(building_csv).replace(".csv", "_privately_financed_timeline.csv"))

# Timeline join
if CONFIG["skip_join"]:
    print("⏭️  Skipping timeline join step.")
elif dob_output is None:
    print("⚠️  No DOB data available; skipping timeline creation.")
else:
    _require_file(dob_output, "DOB filings CSV")
    print("🔗 Building timelines...")
    create_separate_timelines(
        str(building_csv),
        str(dob_output),
        str(co_output) if co_output else None,
    )

    # Load and display timeline data
    if hpd_timeline.exists():
        hpd_timeline_df = pd.read_csv(hpd_timeline)
        print(f"\n📊 HPD Financed Timeline Data ({hpd_timeline_df.shape[0]} records):")
        display(hpd_timeline_df.head())

        # Show event type distribution
        if "event_type" in hpd_timeline_df.columns:
            print("\n📈 Event Types in HPD Timeline:")
            display(hpd_timeline_df["event_type"].value_counts())

    if private_timeline.exists():
        private_timeline_df = pd.read_csv(private_timeline)
        print(f"\n📊 Privately Financed Timeline Data ({private_timeline_df.shape[0]} records):")
        display(private_timeline_df.head())

        # Show event type distribution
        if "event_type" in private_timeline_df.columns:
            print("\n📈 Event Types in Private Timeline:")
            display(private_timeline_df["event_type"].value_counts())

# Charts
if CONFIG["skip_charts"]:
    print("Skipping chart generation as requested.")
    # Report the skip honestly rather than claiming charts were generated.
    print("\n✅ Step 4 complete: Chart generation skipped")
else:
    print("\n📈 Generating charts...")
    default_timeline_stem = "Affordable_Housing_Production_by_Building_with_financing"
    if Path(building_csv).name == f"{default_timeline_stem}.csv":
        # Default dataset name: use the financing-specific chart entry point.
        create_financing_charts()
        print("✅ Created financing-specific charts")
    else:
        if hpd_timeline.exists():
            create_timeline_chart(str(hpd_timeline))
            print("✅ Created HPD financed timeline chart")
        else:
            print(f"⚠️  No HPD financed timeline found at {hpd_timeline}; skipping.")

        if private_timeline.exists():
            create_timeline_chart(str(private_timeline))
            print("✅ Created privately financed timeline chart")
        else:
            print(f"⚠️  No privately financed timeline found at {private_timeline}; skipping.")

    print("\n✅ Step 4 complete: Charts generated")

## Final Data Quality Report

Generate the final data quality report and workflow summary.

In [None]:
# Banner for the closing report.
print("\n" + "=" * 70, "📊 GENERATING FINAL DATA QUALITY REPORT", "=" * 70, sep="\n")

# Close out tracking, then save the report, build the Sankey diagram, and
# print the report, in that order.
quality_tracker.end_processing()
report_filename = quality_tracker.save_report_to_file("notebook_workflow")
sankey_filename = quality_tracker.generate_sankey_diagram()
quality_tracker.print_report()

print("\n🎉 WORKFLOW COMPLETED SUCCESSFULLY!")
print(f"📊 Data quality report: {report_filename}")
if sankey_filename:
    # Only mention the diagram when a filename was returned.
    print(f"📊 Sankey diagram: {sankey_filename}")

# Recap of the record counts gathered along the way.
print("\n📋 WORKFLOW SUMMARY:")
print(f"• HPD Records Processed: {len(hpd_df):,}")
print(f"• Records with Financing: {len(financing_df):,}")
# dob_df / co_df may be undefined (Step 3 not run) or None (no data found);
# .get() covers both cases with one check.
if locals().get("dob_df") is not None:
    print(f"• DOB Filings Found: {len(dob_df):,}")
if locals().get("co_df") is not None:
    print(f"• CO Filings Found: {len(co_df):,}")

print("\n✅ Notebook workflow complete! All dataframes have been displayed for inspection.")