# Airflow DAG Testing Notebook

This notebook allows end-to-end testing of the weekly_listings_etl DAG by executing each task sequentially.

**Testing Strategy:**
- Isolate each task function and execute independently
- Use only 3 search locations to reduce runtime
- Store intermediate outputs in `test_data/` folder
- Verify each step before proceeding to the next
- Follow Airflow best practices: test DAG structure, task execution, and data flow

## Setup: Imports and Configuration

In [1]:
import sys
import os
from datetime import datetime, timezone
from pathlib import Path
import pandas as pd

# Load environment variables first
from dotenv import load_dotenv
load_dotenv('/home/james/PDS/client_data_feeds/realestate/airflow/.env')

# Add airflow directory and parent directory to path (for pyRealtor)
sys.path.insert(0, '/home/james/PDS/client_data_feeds/realestate/airflow')
sys.path.insert(0, '/home/james/PDS/client_data_feeds/realestate')

# Import task modules (database connections imported in next cell)
from include.extract import listing_query, search_locations
from include.transform.base_cleaning import clean_extracted_listings
from include.transform.address_cleaning import clean_addresses
from include.transform.address_correction import correct_addresses
from include.transform.pool_inference import add_pool_inference_columns
from include.transform.listing_filters import remove_non_home_values
from include.load.load_listings import load_listings_to_db
from include.inform.send_reports import send_weekly_reports

# Scratch folder for this notebook's intermediate artifacts. It is created on
# first use; the parent `airflow` directory is expected to exist already.
TEST_DATA_DIR = Path("/home/james/PDS/client_data_feeds/realestate/airflow") / "test_data"
TEST_DATA_DIR.mkdir(parents=False, exist_ok=True)

# Confirmation banner so a reader can see that setup finished and where
# outputs will land.
print(
    "‚úì Imports successful",
    f"‚úì Test data directory: {TEST_DATA_DIR}",
    sep="\n",
)

‚úì Imports successful
‚úì Test data directory: /home/james/PDS/client_data_feeds/realestate/airflow/test_data


  import pkg_resources


## Task 1: Get Search Areas (Limited to 3)

In [2]:
# Fetch every configured search location, then keep only the first three so
# the end-to-end test stays short.
all_locations = search_locations.get_all_search_locations()
test_search_areas = all_locations[:3]

# Handed to the extract step below.
search_areas_result = test_search_areas

print(f"Total available locations: {len(all_locations)}")
print(f"Using {len(test_search_areas)} locations for testing:\n")
for area in test_search_areas:
    country = area['country_code']
    region = area['province_state']
    name = area['search_area']
    print(f"  - {country}, {region}, {name}")

Total available locations: 71
Using 3 locations for testing:

  - CA, BC, Abbotsford
  - CA, BC, Aldergrove
  - CA, BC, Burnaby


In [3]:
# Extract listings for the test locations into a fresh, timestamped run folder.
#
# The folder name is built once and reused for both the extractor's
# `run_subdir` argument and this notebook's `workdir`, instead of being
# recovered afterwards by splitting the path string on "/".
run_subdir = f"test_run_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
workdir_path = TEST_DATA_DIR / run_subdir
workdir = str(workdir_path)

parquet_path, failed_locations = listing_query.query_listing_data_to_parquet(
    locations=search_areas_result,
    sleep_s=2.0,          # presumably the pause between location queries, in seconds -- TODO confirm
    max_locations=None,   # the location list is already limited by the previous cell
    base_output_dir=str(TEST_DATA_DIR),
    run_subdir=run_subdir,
)

# Later cells write their own parquet files into `workdir`. Create it after the
# call (so the extractor's own directory handling is undisturbed) to guarantee
# it exists even if the extractor stored its output elsewhere.
workdir_path.mkdir(parents=True, exist_ok=True)

# Load and inspect
df_extracted = pd.read_parquet(parquet_path)

print(f"\n{'='*60}")
print("EXTRACT RESULTS")
print(f"{'='*60}")
print(f"Output file: {parquet_path}")
print(f"Total rows extracted: {len(df_extracted)}")
print(f"Failed locations: {len(failed_locations)}")
print(f"\nColumns: {list(df_extracted.columns)}")
print("\nFirst 3 rows:")
print(df_extracted.head(3))

# Store for next step
extract_result = {
    "parquet_path": parquet_path,
    "failed_locations": failed_locations,
    "workdir": workdir,
}

Querying listings for CA, BC, Abbotsford
          MLS                                        Description Bedrooms  \
0    R3084262  Boating, Swimming, Fishing! Own a piece of par...     None   
1    R3072182  Very well-kept & Clean! Situated in a quiet ar...        2   
2    R3014819  Local Lake Resort! Boating, Swimming, Fishing!...     None   
3    R3078591  Discover your new home in this beautiful modul...        2   
4    R2978465  Waterfront Lifestyle at Hatzic Lake! "Everglad...     None   
..        ...                                                ...      ...   
595  R2994423  This Beautifully updated rancher Bungalow in t...        3   
596  R3082180  Step into refined living with this exquisite t...        3   
597  R3083065  Discover East Ridge, a townhome community in P...        3   
598  R3074783  Experience the perfect balance of heritage cha...        4   
599  R3076593  Welcome to Highstreet Village 2! This modern 3...        3   

    Bathrooms       Size Stories  

In [4]:
# Schema sanity check: list the extracted columns, the frame's shape, and the
# first record (or a placeholder when nothing was extracted).
extracted_columns = list(df_extracted.columns)

print("Columns in extracted DataFrame:")
print(extracted_columns)
print(f"\nDataFrame shape: {df_extracted.shape}")
print("\nFirst row sample:")
if len(df_extracted.index) == 0:
    print("No data")
else:
    print(df_extracted.iloc[0])

Columns in extracted DataFrame:
['MLS', 'Description', 'Bedrooms', 'Bathrooms', 'Size', 'Stories', 'House Category', 'Ammenities', 'Price', 'Address', 'Latitude', 'Longitude', 'Ownership Category', 'Nearby Ammenities', 'Open House', 'Website', 'country_code', 'province_state', 'search_area', 'search_location', 'run_ts']

DataFrame shape: (1292, 21)

First row sample:
MLS                                                            R3084262
Description           Boating, Swimming, Fishing! Own a piece of par...
Bedrooms                                                           None
Bathrooms                                                          None
Size                                                               None
Stories                                                                
House Category                                             Recreational
Ammenities                                                         None
Price                                                 

## Task 2: Extract Listings — Validate Required Columns

In [5]:
# Guard: stop the test run early if the extract is missing any column that the
# downstream steps rely on.
required_columns = ['MLS', 'Address', 'Description', 'Price', 'Bedrooms', 'Bathrooms']
present_columns = set(df_extracted.columns)
missing_columns = [name for name in required_columns if name not in present_columns]

if not missing_columns:
    print(f"‚úì All required columns present: {required_columns}")
else:
    # Show what actually came back before failing, to make diagnosis easier.
    print(f"‚ùå ERROR: Missing required columns: {missing_columns}")
    print(f"\nActual columns present: {list(df_extracted.columns)}")
    print("\nSample data:")
    print(df_extracted.head())
    raise ValueError(f"Extract failed: Missing required columns {missing_columns}")

‚úì All required columns present: ['MLS', 'Address', 'Description', 'Price', 'Bedrooms', 'Bathrooms']


## Task 3: Base Cleaning

In [6]:
# Base cleaning: dedupe, parse numerics, Size -> Size_sqft
run_dir = extract_result["workdir"]
extracted_path = extract_result["parquet_path"]

# Re-read the extract from disk so this step consumes exactly what the
# previous step persisted.
df_extract_in = pd.read_parquet(extracted_path)
rows_in = len(df_extract_in)

df_cleaned = clean_extracted_listings(df_extract_in)
rows_out = len(df_cleaned)

cleaned_path = f"{run_dir}/listings_cleaned.parquet"
df_cleaned.to_parquet(cleaned_path, index=False)

rule = "=" * 60
print(
    f"\n{rule}",
    "BASE CLEANING RESULTS",
    rule,
    f"Rows before: {rows_in}",
    f"Rows after: {rows_out}",
    f"Rows removed: {rows_in - rows_out}",
    f"Output file: {cleaned_path}",
    "\nSample data:",
    sep="\n",
)
sample_columns = ['MLS', 'Price', 'Size_sqft', 'Bedrooms', 'Bathrooms']
print(df_cleaned[sample_columns].head(3))

# Store for next step
cleaned_result = {
    "parquet_path": cleaned_path,
    "rows_before": rows_in,
    "rows_after": rows_out,
    "workdir": run_dir,
    "failed_locations": extract_result.get("failed_locations", []),
}


BASE CLEANING RESULTS
Rows before: 1292
Rows after: 1290
Rows removed: 2
Output file: /home/james/PDS/client_data_feeds/realestate/airflow/test_data/test_run_20260217_210334/listings_cleaned.parquet

Sample data:
        MLS     Price  Size_sqft  Bedrooms Bathrooms
0  R3084262  132000.0        NaN       NaN      None
1  R3072182  149999.0     1054.0       2.0         2
2  R3014819  165000.0        NaN       NaN      None


## Task 4: Filter Non-Homes

In [7]:
# Drop listings that are not homes, persist the result, and report how many
# rows the filter removed.
cleaned_path = cleaned_result["parquet_path"]
df_all_listings = pd.read_parquet(cleaned_path)
rows_in = len(df_all_listings)

df_filtered = remove_non_home_values(df_all_listings)
rows_out = len(df_filtered)

filtered_path = f"{cleaned_result['workdir']}/listings_filtered.parquet"
df_filtered.to_parquet(filtered_path, index=False)

rule = "=" * 60
print(
    f"\n{rule}",
    "FILTER RESULTS",
    rule,
    f"Rows before: {rows_in}",
    f"Rows after: {rows_out}",
    f"Filtered out: {rows_in - rows_out}",
    f"Output file: {filtered_path}",
    sep="\n",
)

# Store for next step
filtered_result = {
    "parquet_path": filtered_path,
    "rows_before": rows_in,
    "rows_after": rows_out,
    "workdir": cleaned_result["workdir"],
    "failed_locations": cleaned_result.get("failed_locations", []),
}


FILTER RESULTS
Rows before: 1290
Rows after: 357
Filtered out: 933
Output file: /home/james/PDS/client_data_feeds/realestate/airflow/test_data/test_run_20260217_210334/listings_filtered.parquet


## Task 5: Pool Inference

In [8]:
# Run pool inference on the filtered listings, persist the result, and
# summarise the resulting `pool_flag` / `pool_type` columns.
filtered_path = filtered_result["parquet_path"]
df_homes = pd.read_parquet(filtered_path)
rows_in = len(df_homes)

df_pool = add_pool_inference_columns(df_homes)
rows_out = len(df_pool)

pool_path = f"{filtered_result['workdir']}/listings_pool_inferred.parquet"
df_pool.to_parquet(pool_path, index=False)

# Value counts are kept as notebook-level names because the final summary
# cell reads them.
pool_stats = df_pool['pool_flag'].value_counts()
pool_type_stats = df_pool['pool_type'].value_counts()

rule = "=" * 60
print(f"\n{rule}")
print("POOL INFERENCE RESULTS")
print(rule)
print(f"Total rows: {rows_out}")
print("\nPool flag distribution:")
print(pool_stats)
print("\nPool type distribution:")
print(pool_type_stats)
print(f"\nOutput file: {pool_path}")
print("\nSample with pools:")
has_pool = df_pool['pool_flag'] == True
print(df_pool[has_pool][['MLS', 'pool_flag', 'pool_type', 'Description']].head(3))

# Store for next step
pool_result = {
    "parquet_path": pool_path,
    "rows_before": rows_in,
    "rows_after": rows_out,
    "workdir": filtered_result["workdir"],
}


POOL INFERENCE RESULTS
Total rows: 357

Pool flag distribution:
pool_flag
False    357
Name: count, dtype: int64

Pool type distribution:
pool_type
none    357
Name: count, dtype: int64

Output file: /home/james/PDS/client_data_feeds/realestate/airflow/test_data/test_run_20260217_210334/listings_pool_inferred.parquet

Sample with pools:
Empty DataFrame
Columns: [MLS, pool_flag, pool_type, Description]
Index: []


## Task 6: Address Cleaning and Correction

In [10]:
# Address parsing + validation + Google correction
#
# Flow: parse addresses -> optionally correct the flagged rows through the
# Google geocoding helper -> drop rows that are still flagged -> persist.
in_path = pool_result["parquet_path"]
df = pd.read_parquet(in_path)
before_rows = len(df)

# Parse addresses; `issue_inds` holds the index labels of rows flagged as bad.
df_parsed, issue_inds = clean_addresses(df)
initial_bad = len(issue_inds)

# Correct bad addresses with Google API (if key available)
api_key = os.getenv("GOOGLE_GEOCODE_API_KEY")
still_bad = issue_inds
df_fixed = df_parsed

# Report the real reason for skipping: previously "no API key" was printed even
# when the key was set and there was simply nothing to correct.
if not issue_inds:
    print("Skipping Google correction (no bad addresses to correct)")
elif not api_key:
    print("Skipping Google correction (no API key)")
else:
    print(f"Correcting {len(issue_inds)} addresses with Google API...")
    df_fixed, still_bad = correct_addresses(
        df=df_parsed,
        issue_inds=issue_inds,
        api_key=api_key,
        max_fix=250,    # presumably an upper bound on corrected rows -- TODO confirm
        sleep_s=0.05,   # presumably the delay between API calls, in seconds -- TODO confirm
    )

# Remove rows with bad addresses. Only labels still present in the frame are
# dropped, so a stale label cannot make `drop` raise.
still_bad = [i for i in still_bad if i in df_fixed.index] if still_bad else []
if still_bad:
    df_fixed = df_fixed.drop(index=still_bad).reset_index(drop=True)

# Fix data types for parquet compatibility. `assign` returns a new frame, so
# `df_parsed` is not modified through the `df_fixed` alias.
df_fixed = df_fixed.assign(
    Latitude=pd.to_numeric(df_fixed['Latitude'], errors='coerce'),
    Longitude=pd.to_numeric(df_fixed['Longitude'], errors='coerce'),
)

out_path = f"{pool_result['workdir']}/listings_addr_fixed.parquet"
df_fixed.to_parquet(out_path, index=False)

print(f"\n{'='*60}")
print("ADDRESS CLEANING RESULTS")
print(f"{'='*60}")
print(f"Rows before: {before_rows}")
print(f"Rows after: {len(df_fixed)}")
print(f"Initial bad addresses: {initial_bad}")
print(f"Still bad after correction: {len(still_bad)}")
print(f"Dropped rows: {len(still_bad)}")
print(f"\nOutput file: {out_path}")
print("\nSample parsed addresses:")
print(df_fixed[['MLS', 'address_number', 'street_address', 'city', 'postal_code', 'province_state']].head(3))

# Store for next step
addr_result = {
    "parquet_path": out_path,
    "rows_before": before_rows,
    "rows_after": len(df_fixed),
    "initial_bad_rows": initial_bad,
    "still_bad_rows": len(still_bad),
    "workdir": pool_result["workdir"],
}

Correcting 10 addresses with Google API...

ADDRESS CLEANING RESULTS
Rows before: 367
Rows after: 365
Initial bad addresses: 10
Still bad after correction: 2
Dropped rows: 2

Output file: /home/james/PDS/client_data_feeds/realestate/airflow/test_data/test_run_20260128_090936/listings_addr_fixed.parquet

Sample parsed addresses:
        MLS address_number          street_address        city postal_code  \
0  R3071271            138    138 3300 HORN STREET  Abbotsford      V2S7Y4   
1  R3072182             43  43 2035 MARTENS STREET  Abbotsford      V2T6M4   
2  R3078591             91     91 3300 HORN STREET  Abbotsford      V2S7Y6   

     province_state  
0  British Columbia  
1  British Columbia  
2  British Columbia  


## Task 7: Load to Database

In [11]:
# Load the address-corrected listings into the database and print the
# statistics returned by the loader.
#
# NOTE(review): the recorded output of this cell shows 10483 "removed listings"
# for a run that queried only 3 of the 71 search locations. Presumably removal
# detection compares the staged rows against all existing listings -- confirm
# this cell is pointed at a test database before running it.
in_path = addr_result["parquet_path"]
run_ts = datetime.now(timezone.utc)

stats = load_listings_to_db(in_path, run_ts)

# (label prefix, stats key) pairs, printed in this order.
report_rows = (
    ("Total input rows:     ", "total_input_rows"),
    ("Staged rows:          ", "staging_rows"),
    ("New listings:         ", "new_listings"),
    ("Removed listings:     ", "removals"),
    ("Re-listings:          ", "relistings"),
    ("Skipped rows:         ", "skipped_rows"),
)

rule = "=" * 60
print(f"\n{rule}")
print("DATABASE LOAD RESULTS")
print(rule)
for prefix, stat_key in report_rows:
    print(f"{prefix}{stats[stat_key]}")
print(rule)

# Store for next step
load_result = {
    "stats": stats,
    "parquet_path": in_path,
    "workdir": addr_result["workdir"],
}

Read 365 records from /home/james/PDS/client_data_feeds/realestate/airflow/test_data/test_run_20260128_090936/listings_addr_fixed.parquet
Prepared 365 valid rows for database load
‚úì Loaded 365 rows to listing_staging
‚úì Inserted 11 new listings
‚úì Detected 10483 removed listings
‚úì Detected 0 re-listings
‚úì Transaction committed successfully

DATABASE LOAD RESULTS
Total input rows:     365
Staged rows:          365
New listings:         11
Removed listings:     10483
Re-listings:          0
Skipped rows:         0


## Task 8: Cleanup Temporary Files

In [None]:
# Delete temporary parquet files after successful database load.
#
# Only files that were actually removed are counted: the summary line and
# `cleanup_count` previously reported every matched file, including ones whose
# deletion failed.
import glob

workdir = load_result["workdir"]
# Escape the directory part so glob metacharacters in the path are treated
# literally; sort for a stable, readable log.
parquet_files = sorted(glob.glob(os.path.join(glob.escape(workdir), "*.parquet")))

deleted_files = []
for parquet_file in parquet_files:
    try:
        os.remove(parquet_file)
    except OSError as exc:
        # Best effort: a leftover temp file must not abort the test run.
        print(f"Warning: Could not delete {parquet_file}: {exc}")
    else:
        deleted_files.append(parquet_file)
        print(f"Deleted: {parquet_file}")

print(f"\n‚úì Cleanup complete: Deleted {len(deleted_files)} parquet files from {workdir}")
failed_count = len(parquet_files) - len(deleted_files)
if failed_count:
    print(f"Warning: {failed_count} of {len(parquet_files)} parquet files could not be deleted")

cleanup_result = {
    "stats": load_result["stats"],
    "cleanup_count": len(deleted_files),
}

## Task 9: Send Email Reports (Optional)

In [None]:
# Send email reports to license holders (only if RESEND_API_KEY is set).
#
# Each connection is closed by its own `finally`, so the master connection is
# released even if opening the listing connection fails, and the listing
# connection is released even if closing the master connection raises.
resend_api_key = os.getenv("RESEND_API_KEY")
from include.db.connections import get_master_db_connection, get_listing_db_connection

if resend_api_key:
    print("Sending email reports...")

    master_conn = get_master_db_connection()
    try:
        listing_conn = get_listing_db_connection()
        try:
            email_stats = send_weekly_reports(master_conn, listing_conn)

            print(f"\n{'='*60}")
            print("EMAIL REPORTS RESULTS")
            print(f"{'='*60}")
            print(f"Licenses processed:    {email_stats['total_licenses']}")
            print(f"Emails sent:           {email_stats['emails_sent']}")
            print(f"Emails failed:         {email_stats['emails_failed']}")
            print(f"New pool listings:     {email_stats['total_new_listings']}")
            print(f"Removed pool listings: {email_stats['total_removed_listings']}")
            print(f"{'='*60}")

            # Read the load statistics from the load step directly (the same
            # object the cleanup cell forwards), so this cell does not depend
            # on the cleanup cell having been run.
            email_result = {
                "email_stats": email_stats,
                "load_stats": load_result["stats"],
            }
        finally:
            listing_conn.close()
    finally:
        master_conn.close()
else:
    print("Skipping email reports (RESEND_API_KEY not set)")
    email_result = {"skipped": True}

## Summary and Verification

In [None]:
# Print end-to-end summary of every stage, using the result dicts and frames
# stored by the cells above.
print(f"\n{'='*70}")
print("END-TO-END TEST SUMMARY")
print(f"{'='*70}")
print(f"\nüìç Search Locations: {len(test_search_areas)} (limited from {len(all_locations)})")
print(f"\nüì• Extract:")
print(f"   - Extracted: {len(df_extracted)} listings")
print(f"   - Failed locations: {len(extract_result['failed_locations'])}")

# Rows removed by each stage = rows in - rows out. The address stage used to
# print `initial_bad_rows` here, which overstates the loss whenever some of the
# flagged addresses were corrected rather than dropped.
cleaning_dropped = cleaned_result['rows_before'] - cleaned_result['rows_after']
filter_dropped = filtered_result['rows_before'] - filtered_result['rows_after']
addr_dropped = addr_result['rows_before'] - addr_result['rows_after']

print(f"\nüßπ Cleaning Pipeline:")
print(f"   - After base cleaning: {cleaned_result['rows_after']} (-{cleaning_dropped})")
print(f"   - After filtering: {filtered_result['rows_after']} (-{filter_dropped})")
print(f"   - After address fix: {addr_result['rows_after']} (-{addr_dropped})")

print(f"\nüèä Pool Inference:")
print(f"   - Total with pools: {pool_stats.get(True, 0)}")
print(f"   - In-ground: {pool_type_stats.get('in-ground', 0)}")
print(f"   - Above-ground: {pool_type_stats.get('above-ground', 0)}")
# NOTE(review): 'none' is reported as "Type unknown", but in the recorded run
# every row with pool_flag False had pool_type 'none' -- it may mean "no pool"
# rather than "pool of unknown type"; confirm against the inference module.
print(f"   - Type unknown: {pool_type_stats.get('none', 0)}")

print(f"\nüíæ Database Load:")
print(f"   - Staged: {load_result['stats']['staging_rows']}")
print(f"   - New listings: {load_result['stats']['new_listings']}")
print(f"   - Removals: {load_result['stats']['removals']}")
print(f"   - Re-listings: {load_result['stats']['relistings']}")

# The email section is only present when the reports cell actually sent mail.
if 'email_stats' in email_result:
    print(f"\nüìß Email Reports:")
    print(f"   - Licenses processed: {email_result['email_stats']['total_licenses']}")
    print(f"   - Emails sent: {email_result['email_stats']['emails_sent']}")

print(f"\n{'='*70}")
print(f"‚úÖ ALL TASKS COMPLETED SUCCESSFULLY")
print(f"{'='*70}")