

For this analysis, we're modeling a **4:1 split-rate tax** where land is taxed at four times the rate of buildings.

## Policy Definition for Spokane


For this analysis, we're modeling:
- **Revenue-neutral** property tax split for the South Bend School Corporation
- **4:1 land-to-building tax ratio** (partial LVT shift)
- **Existing exemptions and abatements** continue to apply
- **Focus on school corporation taxes** (which bypass Indiana's property tax caps)

Let's begin by importing the necessary libraries and utility functions.


In [None]:
# Import utility functions
import sys
import pandas as pd
import geopandas as gpd
sys.path.append('..')  # Add parent directory to path

# Import from local utility modules
from lvt_utils import ensure_geodataframe

print("✅ Utility functions imported from lvt_utils")


In [None]:
# Import all required modules and functions
from cloud_utils import get_feature_data, get_feature_data_with_geometry
from lvt_utils import (model_split_rate_tax, calculate_current_tax, model_full_building_abatement, 
                       model_stacking_improvement_exemption, categorize_property_type, extract_date_from_filename)
from census_utils import (get_census_data, get_census_blockgroups_shapefile, get_census_data_with_boundaries, 
                          match_to_census_blockgroups, calculate_median_percentage_by_quintile, 
                          match_parcels_to_demographics, create_demographic_summary)
from viz import (create_scatter_plot, plot_comparison, calculate_correlations, weighted_median, 
                 create_quintile_summary, plot_quintile_analysis, create_property_category_chart, 
                 create_map_visualization, calculate_block_group_summary, filter_data_for_analysis)

# Scrape toggle: the cells below compare this flag to 1 to decide whether to pull
# fresh data from the remote services (1) or to load the most recent local
# parquet snapshot instead (any other value).
scrape_data = 0

## Step 1: Getting the Data


https://gismo.spokanecounty.org/arcgis/rest/services/SCOUT/PropertyLookup/MapServer/0/query?f=json&where=PID_NUM%20%3D%20%2725011.1709%27&returnGeometry=false&spatialRel=esriSpatialRelIntersects&outFields=site_address%2Csite_city%2Cowner_name%2Cdocument_date%2Cgross_sale_price%2Cexcise_nbr%2Ctransfer_type%2Cprop_use_desc%2Ctax_code_area%2Cacreage%2CInspectionYear

In [None]:

import os
from datetime import datetime
import glob

# Directory to save/load data
data_dir = "data/spokane"
os.makedirs(data_dir, exist_ok=True)


def _parcel_scrape_date(path):
    """Return the date encoded in a spokane_parcels_YYYY_MM_DD.parquet name, or None.

    Returning None (instead of letting strptime raise) keeps one oddly named
    file in the data directory from aborting the whole load.
    """
    stem = os.path.basename(path).replace("spokane_parcels_", "").replace(".parquet", "")
    try:
        return datetime.strptime(stem, "%Y_%m_%d")
    except ValueError:
        return None


if scrape_data == 1:
    # Base URL for the ArcGIS services
    base_url = "https://services1.arcgis.com/ozNll27nt9ZtPWOn/ArcGIS/rest/services/"
    # Fetch the main parcel dataset with tax info
    parcel_civic_df = get_feature_data_with_geometry('Parcels', base_url)
    # Save with geometry to parquet, stamped with the current date
    today_str = datetime.now().strftime("%Y_%m_%d")
    out_path = os.path.join(data_dir, f"spokane_parcels_{today_str}.parquet")
    parcel_civic_df.to_parquet(out_path, index=False)
    print(f"Saved new scrape to {out_path}")
else:
    # Find the most recent parquet file in the data_dir
    files = glob.glob(os.path.join(data_dir, "spokane_parcels_*.parquet"))
    if not files:
        raise FileNotFoundError("No previously scraped parcel files found in data/spokane/")
    # Prefer the newest date encoded in a filename; files whose suffix is not a
    # date are ignored here rather than crashing the sort.
    dated_files = []
    for candidate in files:
        scrape_date = _parcel_scrape_date(candidate)
        if scrape_date is not None:
            dated_files.append((scrape_date, candidate))
    if dated_files:
        latest_file = max(dated_files, key=lambda pair: pair[0])[1]
    else:
        # Fallback (same strategy as the millage loader): newest modification time
        latest_file = max(files, key=os.path.getmtime)
    print(f"Loading most recent scrape: {latest_file}")
    # NOTE(review): pd.read_parquet returns a plain DataFrame; the
    # ensure_geodataframe call below is presumably what restores the geometry — confirm.
    parcel_civic_df = pd.read_parquet(latest_file)

# Ensure parcel_civic_df is a proper GeoDataFrame
parcel_civic_df = ensure_geodataframe(parcel_civic_df)
print(f"✅ Parcel data loaded as {type(parcel_civic_df).__name__} with CRS: {parcel_civic_df.crs}")


Charge data comes from: https://gisdatacatalog-spokanecounty.opendata.arcgis.com/pages/parcel-data-file-downloads


In [None]:

# Show which columns the parcel dataset provides before the cells below rely on them.
print(parcel_civic_df.columns)


In [None]:
# Read both charge-info workbooks from data/spokane/ and stack them into millage_df
charge_info_1 = pd.read_excel(os.path.join(data_dir, "charge_info_1.xlsx"))
charge_info_2 = pd.read_excel(os.path.join(data_dir, "charge_info_2.xlsx"))

# Stacking is only done when the two sheets share an identical column layout;
# otherwise millage_df is left as None and the mismatch is reported.
columns_match = list(charge_info_1.columns) == list(charge_info_2.columns)
if not columns_match:
    print("❌ charge_info_1 and charge_info_2 do not have the same column names.")
    millage_df = None
else:
    millage_df = pd.concat([charge_info_1, charge_info_2], ignore_index=True)

# Preview the combined frame with every column visible
if millage_df is not None:
    pd.set_option('display.max_columns', None)
    display(millage_df.head(5))


print("Counts of charge_type in millage_df:")
charge_type_report = (
    millage_df['charge_type'].value_counts(dropna=False).to_string()
    if millage_df is not None
    else "millage_df is None, cannot print charge_type counts."
)
print(charge_type_report)


In [None]:
# Show the parcel schema, then a preview with every column visible
pd.set_option('display.max_columns', None)
print("Column names in parcel_civic_df:")
print(list(parcel_civic_df.columns))
display(parcel_civic_df.head())



In [None]:

# Flag parcels whose taxable amount is zero or negative (1 = flagged, 0 = not);
# a missing taxable_amt compares False and therefore yields 0.
parcel_civic_df['full_exmp'] = parcel_civic_df['taxable_amt'].le(0).astype(int)


In [None]:
# Tabulate exemption codes, counting missing codes as their own category
exmp_code_counts = parcel_civic_df['exmp_code'].value_counts(dropna=False)
print("Counts of exmp_code in parcel_civic_df:")
print(exmp_code_counts.to_string())


In [None]:
import requests
from bs4 import BeautifulSoup
import pandas as pd

def load_levy_table(pid, timeout=30):
    """
    Load the levy table for a given PID from the Spokane County property information site.

    Args:
        pid: Parcel ID interpolated into the ``PID`` query parameter of the summary page URL.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout a stalled connection would block the scrape indefinitely.

    Returns:
        DataFrame with columns levy_name, levy_rate_24, levy_rate_25, levy_type.
        Every value is the whitespace-stripped cell text (no numeric conversion).
        Rows with both rate cells empty are dropped, except rows whose name
        starts with "totals" (case-insensitive), which are always kept.
        The four columns are present even when no row qualifies.

    Raises:
        requests.HTTPError: The server answered with an error status.
        requests.Timeout: No response arrived within ``timeout`` seconds.
        ValueError: The page does not contain the levy table.
    """
    columns = ["levy_name", "levy_rate_24", "levy_rate_25", "levy_type"]

    url = f"https://cp.spokanecounty.org/SCOUT/propertyinformation/Summary.aspx?PID={pid}"
    resp = requests.get(url, timeout=timeout)
    resp.raise_for_status()

    soup = BeautifulSoup(resp.text, "html.parser")
    table = soup.find("table", id="MainContent_Levy_GridView1")
    if table is None:
        raise ValueError("Could not find levy table in the page.")

    records = []
    for tr in table.find_all("tr")[1:]:  # skip header row
        tds = tr.find_all("td")
        if len(tds) < 4:
            continue  # not a data row
        record = dict(zip(columns, (td.get_text(strip=True) for td in tds[:4])))
        is_totals_row = record["levy_name"].lower().startswith("totals")
        # Only keep rows with at least one non-empty rate, or the Totals row
        if is_totals_row or record["levy_rate_24"] or record["levy_rate_25"]:
            records.append(record)

    # Passing columns explicitly keeps the schema stable for an empty result
    return pd.DataFrame(records, columns=columns)

# Smoke-test the scraper with a known PID. load_levy_table performs a live HTTP
# request, so only run it when scraping is enabled; with scrape_data != 1 the
# notebook is meant to work from the cached parquet files without network access.
test_pid = "35182.4601"
if scrape_data == 1:
    levy_df = load_levy_table(test_pid)
    print(levy_df)


In [None]:
import os
import glob
from datetime import datetime
from tqdm import tqdm

# Directory where dated millage_scrape_*.parquet snapshots are written and read;
# created up front so the save step below cannot fail on a missing folder.
millage_dir = "data/spokane/"
os.makedirs(millage_dir, exist_ok=True)

def extract_date_from_filename(path):
    """
    Extract the date from a filename like millage_scrape_YYYY_MM_DD.parquet.

    NOTE: this definition replaces the ``extract_date_from_filename`` imported
    from lvt_utils earlier in the notebook for every cell that runs after it.

    Args:
        path: File path or bare filename; only the basename is inspected.

    Returns:
        A datetime built from the last three underscore-separated parts of the
        name (".parquet" removed), or None when the name has fewer than four
        parts or those parts do not form a valid YYYY_MM_DD date.
    """
    stem = os.path.basename(path).replace(".parquet", "")
    parts = stem.split("_")
    if len(parts) < 4:
        return None
    # The last three parts should be YYYY, MM, DD
    date_str = "_".join(parts[-3:])
    try:
        return datetime.strptime(date_str, "%Y_%m_%d")
    except ValueError:
        # strptime signals a malformed or impossible date with ValueError;
        # anything else would be a genuine bug and should surface.
        return None

def _fetch_levy_table_for_tca(tax_code_area, primary_pid, max_attempts=2):
    """Fetch one levy table for a tax code area, falling back to other parcels.

    The primary PID is tried up to ``max_attempts`` times. If every attempt
    fails, each other PID of the same tax_code_area in parcel_civic_df is tried
    once, in order, until one succeeds.

    Returns:
        The DataFrame from load_levy_table, or None when every PID failed.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return load_levy_table(primary_pid)
        except Exception as e:  # best-effort scrape: report the failure and keep going
            print(f"Attempt {attempt} failed to load levy table for tax_code_area {tax_code_area} (PID {primary_pid}): {e}")

    # The primary PID failed every time: try the other parcels in this area
    in_same_area = parcel_civic_df['tax_code_area'] == tax_code_area
    is_other_parcel = parcel_civic_df['PID_NUM'] != primary_pid
    alternate_pids = (
        parcel_civic_df.loc[in_same_area & is_other_parcel, 'PID_NUM']
        .dropna()
        .unique()
    )
    for alt_pid in alternate_pids:
        try:
            table = load_levy_table(alt_pid)
        except Exception as e2:
            print(f"Alternate PID {alt_pid} also failed for tax_code_area {tax_code_area}: {e2}")
            continue
        print(f"Success with alternate PID {alt_pid} for tax_code_area {tax_code_area}")
        return table

    print(f"All attempts failed for tax_code_area {tax_code_area}")
    return None


if scrape_data == 1:
    # Get the first PID (in sorted order) for each unique tax_code_area
    tax_code_area_pid = (
        parcel_civic_df.dropna(subset=['tax_code_area', 'PID_NUM'])
        .sort_values('PID_NUM')
        .groupby('tax_code_area')['PID_NUM']
        .first()
        .reset_index()
    )

    levy_records = []
    for _, row in tqdm(tax_code_area_pid.iterrows(), total=len(tax_code_area_pid), desc="Pulling levy tables"):
        tax_code_area = row['tax_code_area']
        levy_table = _fetch_levy_table_for_tca(tax_code_area, row['PID_NUM'])
        if levy_table is None:
            continue  # already reported; leave this area out of the result
        levy_table['tax_code_area'] = tax_code_area
        levy_records.append(levy_table)

    if not levy_records:
        # pd.concat([]) would raise a generic "No objects to concatenate" ValueError;
        # keep the exception type but say what actually went wrong.
        raise ValueError("No levy tables could be scraped for any tax_code_area.")

    # Concatenate all levy records into a single DataFrame
    all_levies_df = pd.concat(levy_records, ignore_index=True)

    # Put tax_code_area first, followed by the scraped columns
    all_levies_df = all_levies_df[['tax_code_area', 'levy_name', 'levy_rate_24', 'levy_rate_25', 'levy_type']]

    # Save to file with current date
    today_str = datetime.now().strftime("%Y_%m_%d")
    millage_path = os.path.join(millage_dir, f"millage_scrape_{today_str}.parquet")
    all_levies_df.to_parquet(millage_path, index=False)
    print(f"Saved millage rates to {millage_path}")

else:
    # Find the most recent millage_scrape_*.parquet file
    files = glob.glob(os.path.join(millage_dir, "millage_scrape_*.parquet"))
    if not files:
        raise FileNotFoundError("No millage_scrape_*.parquet files found in data/spokane/. Set scrape_data=1 to scrape.")
    # Prefer the newest date encoded in a filename; names without a parseable date are skipped
    dated_files = []
    for f in files:
        dt = extract_date_from_filename(f)
        if dt is not None:
            dated_files.append((dt, f))
    if dated_files:
        # Pick the file with the latest date
        most_recent = max(dated_files, key=lambda tup: tup[0])[1]
    else:
        # Fallback: newest modification time
        most_recent = max(files, key=os.path.getmtime)
    all_levies_df = pd.read_parquet(most_recent)
    print(f"Loaded millage rates from {most_recent}")

# Coverage check: ideally every parcel tax_code_area has a scraped levy table
print(f"Unique tax_code_areas in parcel_civic_df: {parcel_civic_df['tax_code_area'].nunique()}")
print(f"Unique tax_code_areas in all_levies_df: {all_levies_df['tax_code_area'].nunique()}")


In [None]:
def _print_levy_rate_summary(levies, keyword):
    """Print min/median/max of both rate columns for each levy name in ``levies``.

    ``keyword`` is only used in the printed headings.
    """
    if levies.empty:
        print(f"No levies found including '{keyword}'.")
        return

    print(f"Summary statistics for levies including '{keyword}':")
    for levy in levies['levy_name'].unique():
        levy_rows = levies[levies['levy_name'] == levy]
        for col in ('levy_rate_24', 'levy_rate_25'):
            if col not in levy_rows.columns:
                continue
            # Rates may be stored as text; anything unparseable is dropped
            vals = pd.to_numeric(levy_rows[col], errors='coerce').dropna()
            if vals.empty:
                print(f"Levy: {levy} | {col} - No data")
                continue
            print(f"Levy: {levy} | {col} - min: {vals.min():.6f}, median: {vals.median():.6f}, max: {vals.max():.6f}")


# Levies whose name includes 'SD081' (case-insensitive)
sd081_levies = all_levies_df[all_levies_df['levy_name'].str.contains('SD081', case=False, na=False)]
_print_levy_rate_summary(sd081_levies, 'SD081')

# Same report for any levy whose name includes 'county'
county_levies = all_levies_df[all_levies_df['levy_name'].str.contains('county', case=False, na=False)]
_print_levy_rate_summary(county_levies, 'county')


## Step 2: Filtering to Spokane

In [None]:
# NOTE(review): despite the step title, no filter is applied here — df is bound
# to the very same object as parcel_civic_df, so the two counts printed below are
# always identical. TODO: confirm whether a city-boundary filter was intended.
df = parcel_civic_df

print(f"Filtered to {len(df):,} Spokane properties")
print(f"Original dataset had {len(parcel_civic_df):,} total properties")


## Step 2.5: Adding Millage Rates by District

Spokane would add an abatement to buildings which would apply across taxing districts. 

In [None]:
# Inspect a single tax code area ("0011"): its levy names first, then the full rows
levies_0011_df = all_levies_df.loc[all_levies_df['tax_code_area'] == "0011"]
levy_names_0011 = levies_0011_df['levy_name'].unique()

print("Levy names for tax_code_area 0011:")
for name in levy_names_0011:
    print(name)

print("Full table for tax_code_area 0011:")
print(levies_0011_df)





In [None]:
# Remove the "Totals:" row that the scraper keeps from each levy table.
# .copy() makes the result an independent frame: later cells assign new columns
# to all_levies_df, which on a filtered slice triggers SettingWithCopyWarning.
all_levies_df = all_levies_df[all_levies_df['levy_name'] != "Totals:"].copy()
print(f"Removed 'Totals:' entries. Remaining records: {len(all_levies_df):,}")


In [None]:
print(f"Total count of all millages in all_levies_df: {len(all_levies_df):,}")

# Filter out voted levies before calculating medians
non_voted_levies = all_levies_df[all_levies_df['levy_type'] != "Voted"]

# Actually print the medians the heading announces. Rates may still be stored as
# text at this point, so convert on the fly without modifying all_levies_df.
print("Median levy_rate_25 by levy name (excluding voted levies):")
median_rate_by_levy = (
    pd.to_numeric(non_voted_levies['levy_rate_25'], errors='coerce')
    .groupby(non_voted_levies['levy_name'])
    .median()
)
print(median_rate_by_levy.to_string())

In [None]:
import pandas as pd

# Coerce the rate column to float; unparseable or missing values become 0.0
all_levies_df['levy_rate_25'] = pd.to_numeric(all_levies_df['levy_rate_25'], errors='coerce').fillna(0.0)

# --- Tax code areas (TCAs) whose levy list contains "Spokane General" ---
spokane_name = 'spokane general'
spokane_mask = all_levies_df['levy_name'].str.casefold().eq(spokane_name)
spokane_tcas = set(all_levies_df.loc[spokane_mask, 'tax_code_area'].dropna().unique())
if not spokane_tcas:
    raise ValueError("No tax_code_area contains 'Spokane General' in levy_name. Check your data/labels.")

# --- levy_name -> set of TCAs in which that levy appears (all rows considered) ---
levy_to_tcas = all_levies_df.groupby('levy_name')['tax_code_area'].apply(
    lambda s: set(s.dropna().unique())
)

# True for levies that never appear outside the Spokane-General TCAs
only_in_spokane = levy_to_tcas.apply(lambda s: len(s) > 0 and s.issubset(spokane_tcas))

# Bucket 1: levy covers every Spokane-General TCA (and nowhere else).
# Bucket 2: levy covers some, but not all, Spokane-General TCAs (and nowhere else).
# "Spokane General" itself belongs to neither bucket.
bucket1_names = []
bucket2_names = []
for levy_name, tcas in levy_to_tcas[only_in_spokane].items():
    if levy_name.strip().lower() == spokane_name:
        continue
    target_bucket = bucket1_names if tcas == spokane_tcas else bucket2_names
    target_bucket.append(levy_name)

# --- Check SD081 Spokane General specifically ---
sd081_name = 'SD081 Spokane General'
if sd081_name not in levy_to_tcas.index:
    print("SD081 Spokane General not found in levy data")
else:
    sd081_tcas = levy_to_tcas[sd081_name]
    sd081_outside_spokane = sd081_tcas - spokane_tcas
    print(f"SD081 Spokane General appears in {len(sd081_tcas)} total TCAs")
    print(f"SD081 Spokane General appears in {len(sd081_outside_spokane)} TCAs outside of Spokane General")
    if sd081_outside_spokane:
        print(f"Extra TCAs: {sorted(sd081_outside_spokane)}")

# --- Helper to summarize levy "millages" within Spokane-General TCAs ---
def summarize_levies(names):
    """Summarize levy_rate_25 per levy name, restricted to the Spokane-General TCAs.

    Reads the notebook-level ``all_levies_df`` and ``spokane_tcas``.

    Args:
        names: Levy names to include; a falsy value yields an empty frame.

    Returns:
        DataFrame with one row per levy name, sorted by name, with columns
        levy_name, levy_type (most common type for that name), tca_count,
        mean_millage, min_millage, max_millage and total_millage.
    """
    columns = ['levy_name', 'levy_type', 'tca_count', 'mean_millage', 'min_millage', 'max_millage', 'total_millage']
    if not names:
        return pd.DataFrame(columns=columns)

    wanted_levy = all_levies_df['levy_name'].isin(names)
    inside_spokane = all_levies_df['tax_code_area'].isin(spokane_tcas)
    by_levy = all_levies_df[wanted_levy & inside_spokane].groupby('levy_name')

    def _dominant_type(types):
        # Most common levy_type for the name; fall back to the first value
        # (or None) when no mode exists.
        modes = types.mode()
        if not modes.empty:
            return modes.iloc[0]
        return types.iloc[0] if len(types) else None

    type_by_levy = by_levy['levy_type'].agg(_dominant_type)

    stats = by_levy.agg(
        tca_count=('tax_code_area', 'nunique'),
        mean_millage=('levy_rate_25', 'mean'),
        min_millage=('levy_rate_25', 'min'),
        max_millage=('levy_rate_25', 'max'),
        total_millage=('levy_rate_25', 'sum'),
    )
    stats = stats.sort_values('levy_name').reset_index()
    stats['levy_type'] = stats['levy_name'].map(type_by_levy)

    # Selecting by the canonical column list places levy_type right after levy_name
    return stats[columns]

# --- Helper to compute per-TCA total millage sums for a bucket (i.e., "total millages" by TCA) ---
def per_tca_totals(names):
    """Sum levy_rate_25 per tax code area over the given levies, within Spokane-General TCAs.

    Reads the notebook-level ``all_levies_df`` and ``spokane_tcas``.

    Args:
        names: Levy names to include; a falsy value yields an empty frame.

    Returns:
        DataFrame with columns tax_code_area and bucket_total_millage, sorted
        by tax_code_area.
    """
    if not names:
        return pd.DataFrame(columns=['tax_code_area', 'bucket_total_millage'])

    wanted_levy = all_levies_df['levy_name'].isin(names)
    inside_spokane = all_levies_df['tax_code_area'].isin(spokane_tcas)
    totals = (
        all_levies_df[wanted_levy & inside_spokane]
        .groupby('tax_code_area', as_index=False)['levy_rate_25']
        .sum()
    )
    totals = totals.rename(columns={'levy_rate_25': 'bucket_total_millage'})
    return totals.sort_values('tax_code_area')

# --- Compute outputs ---
bucket1_levy_summary = summarize_levies(bucket1_names)
bucket2_levy_summary = summarize_levies(bucket2_names)

bucket1_per_tca = per_tca_totals(bucket1_names)
bucket2_per_tca = per_tca_totals(bucket2_names)


def _print_bucket_report(heading, bucket_no, names, levy_summary, per_tca):
    """Print the levy count, per-levy summary and per-TCA totals for one bucket."""
    print(heading)
    print(f"Levy count: {len(names)}")
    if levy_summary.empty:
        print("No levies in this bucket.")
    else:
        print("Levy summary (including whether voted or not):")
        print(levy_summary.to_string(index=False))
    print(f"\nPer–TCA totals (Bucket {bucket_no}):")
    print(per_tca.to_string(index=False))


# --- Print results ---
_print_bucket_report(
    "Bucket 1 — In every Spokane-General TCA and nowhere else",
    1, bucket1_names, bucket1_levy_summary, bucket1_per_tca,
)

print("\n" + "="*80 + "\n")

_print_bucket_report(
    "Bucket 2 — In some Spokane-General TCAs and nowhere else",
    2, bucket2_names, bucket2_levy_summary, bucket2_per_tca,
)


In [None]:
import re

# Levy names with missing values replaced by "" so the regex tests below are NaN-safe
name = all_levies_df['levy_name'].fillna("")

# 1% (constitutional) flag:
# - regular (non-voted) levies count
# - except Port/PUD regular levies (excluded by constitution)
is_non_voted = all_levies_df['levy_type'].ne("Voted")
# Non-capturing groups (?:...) throughout: str.contains only tests for a match,
# and pandas emits a UserWarning when the pattern contains capturing groups.
is_port_or_pud = name.str.contains(r'\b(?:PUD|Public Utility District|Port)\b', flags=re.I, na=False)

all_levies_df['one_cap_flag'] = is_non_voted & ~is_port_or_pud

# $5.90 (statutory) bucket flag:
# Start from non-voted, then exclude categories that sit outside $5.90:
# - State School (both lines)
# - EMS regular levies
# - Conservation Futures
# - Port/PUD regular levies
is_state_school = name.str.contains(r'\bState\s+School(?:\s+Levy\s*2)?\b', flags=re.I, na=False)

is_ems = name.str.contains(r'\bEMS\b|\bEmergency\s+Medical', flags=re.I, na=False)

is_conservation_futures = name.str.contains(r'\bCons(?:ervation)?\s+Futures\b', flags=re.I, na=False)

all_levies_df['in_590_bucket'] = (
    is_non_voted
    & ~is_state_school
    & ~is_ems
    & ~is_conservation_futures
    & ~is_port_or_pud
)

# (Optional) Quick sanity check summaries:
print("Counts → one_cap_flag:")
print(all_levies_df.groupby('one_cap_flag').size())
print("\nCounts → in_590_bucket (subset of non-voted):")
print(all_levies_df[is_non_voted].groupby('in_590_bucket').size())


In [None]:
# Look up every levy whose name mentions "Spokane Bond" and report whether any
# of those rows is typed as voted (case-insensitive comparison)
bond_name_match = all_levies_df['levy_name'].str.contains('spokane bond', case=False, na=False)
spokane_bond_rows = all_levies_df[bond_name_match]

if spokane_bond_rows.empty:
    print("No rows found for 'Spokane Bond' in all_levies_df.")
else:
    print("Spokane Bond 'levy_type' values:")
    print(spokane_bond_rows['levy_type'].value_counts())
    is_voted = spokane_bond_rows['levy_type'].str.lower().eq('voted').any()
    answer = 'Yes' if is_voted else 'No'
    print(f"\nIs Spokane Bond voted? {answer}")


In [None]:
# Rows whose levy_name contains 'SD081' (case-insensitive)
sd081_name_match = all_levies_df['levy_name'].str.contains('SD081', case=False, na=False)
sd081_rows = all_levies_df[sd081_name_match]

# Distinct (name, rate, type) combinations, ordered by name and then rate,
# so each rate can be read alongside its voted/non-voted type
sd081_rate_columns = ['levy_name', 'levy_rate_25', 'levy_type']
unique_sd081_millages = sd081_rows[sd081_rate_columns].drop_duplicates()
unique_sd081_millages = unique_sd081_millages.sort_values(['levy_name', 'levy_rate_25'])

print("Unique millages containing 'SD081' and whether they are voted or not:")
print(unique_sd081_millages)


In [None]:
from IPython.display import display

# Set pandas display option to ensure all rows are visible when displaying in notebook
import pandas as pd
# Lift the row limit so the complete value_counts table renders
pd.set_option('display.max_rows', None)

levy_name_counts = all_levies_df['levy_name'].value_counts()
print("Levy name counts:")
display(levy_name_counts)


In [None]:
import numpy as np
import pandas as pd
import re

# --- Construct per-row levy flags on all_levies_df ---
all_levies_df['levy_rate_25'] = pd.to_numeric(all_levies_df['levy_rate_25'], errors='coerce').fillna(0.0)
nm = all_levies_df['levy_name'].fillna('')


def _levy_name_matches(pattern):
    """Case-insensitive regex search over the NaN-safe levy names in ``nm``."""
    return nm.str.contains(pattern, flags=re.I, na=False)


is_non_voted = all_levies_df['levy_type'].ne("Voted")
is_port_or_pud = _levy_name_matches(r'\b(?:PUD|Public Utility District|Port)\b')
is_state_school = _levy_name_matches(r'\bState\s+School(?:\s+Levy\s*2)?\b')
is_ems = _levy_name_matches(r'\bEMS\b|\bEmergency\s+Medical')
is_consfut = _levy_name_matches(r'\bCons(?:ervation)?\s+Futures\b')

# one_cap_flag: True for regular (non-voted) levies except Port/PUD
all_levies_df['one_cap_flag'] = is_non_voted & ~is_port_or_pud

# statute_flag: non-voted levies that are none of state school, EMS,
# conservation futures, or port/pud
is_excluded_category = is_state_school | is_ems | is_consfut | is_port_or_pud
all_levies_df['statute_flag'] = is_non_voted & ~is_excluded_category

# statute_360_flag: the statute_flag levies named as city general, county
# general or county road; statute_590_flag is simply statute_flag
is_city_general = _levy_name_matches(r'\bspokane\s+general\b')
is_county_general = _levy_name_matches(r'\bcounty\s+general\b')
is_county_road = _levy_name_matches(r'\bcounty\s+road\b')
is_general_or_road = is_city_general | is_county_general | is_county_road
all_levies_df['statute_360_flag'] = all_levies_df['statute_flag'] & is_general_or_road
all_levies_df['statute_590_flag'] = all_levies_df['statute_flag']

# --- Determine which levies count as inside the Spokane boundaries ---
_spokane_name = "spokane general"
_spokane_mask = all_levies_df['levy_name'].str.casefold().eq(_spokane_name)
_spokane_tcas = set(all_levies_df.loc[_spokane_mask, 'tax_code_area'].dropna().unique())
if not _spokane_tcas:
    raise ValueError("No tax_code_area contains 'Spokane General' in levy_name. Check labels/spelling.")

# levy_name -> set of tax code areas where that levy appears
_levy_to_tcas = all_levies_df.groupby('levy_name')['tax_code_area'].apply(
    lambda s: set(s.dropna().unique())
)

# Levies found exclusively in TCAs that carry "Spokane General"
only_in_spokane_levy_mask = _levy_to_tcas.apply(lambda s: len(s) > 0 and s.issubset(_spokane_tcas))
_spokane_only_levies = _levy_to_tcas[only_in_spokane_levy_mask].index.tolist()

# The two SD081 levies are added explicitly (when present in the data) even if
# they also appear outside the Spokane-General TCAs
_sd081_extra_levies = ["SD081 Spokane B&I", "SD081 Spokane General"]
_known_levy_names = set(all_levies_df['levy_name'].unique())
_missing_extras = [
    extra_levy
    for extra_levy in _sd081_extra_levies
    if extra_levy not in _spokane_only_levies and extra_levy in _known_levy_names
]
_spokane_only_levies.extend(_missing_extras)
spokane_boundaries_set = set(_spokane_only_levies)

# Summarize per levy_name: a representative rate, whether any row of that name
# carries each flag, and the set of tax code areas in which the levy appears.
# dropna=False keeps rows with a missing levy_name as their own group.
# NOTE(review): total_levy_rate_25 uses 'first' — the rate from the first row
# seen for each levy_name, not a sum across tax code areas. Confirm that this
# is intended given the column name.
levy_df = (
    all_levies_df
    .groupby('levy_name', dropna=False)
    .agg(
        total_levy_rate_25=('levy_rate_25', 'first'),
        any_one_cap_flag=('one_cap_flag', 'any'),
        any_statute_flag=('statute_flag', 'any'),
        any_statute_590_flag=('statute_590_flag', 'any'),
        any_statute_360_flag=('statute_360_flag', 'any'),
        tca_set=('tax_code_area', lambda s: set(s.dropna().unique()))
    )
    .reset_index()
)
# Mark levies that belong to spokane_boundaries_set (built earlier in this cell)
levy_df['in_spokane_boundaries'] = levy_df['levy_name'].isin(spokane_boundaries_set)
# Written relative to the current working directory
levy_df.to_csv("levy_df_flags_spokane.csv", index=False)

# --- 2. Create summary_by_tca: per-TCA sums of the total rate and of each flagged subset ---
# Helper columns hold the rate where the flag is set and 0.0 elsewhere, so a
# plain per-TCA sum yields the flagged total.
_flag_for_rate_column = {
    '_rate_one_cap': 'one_cap_flag',
    '_rate_statute': 'statute_flag',
    '_rate_statute_590': 'statute_590_flag',
    '_rate_statute_360': 'statute_360_flag',
}
all_levies_df['_rate_all'] = all_levies_df['levy_rate_25']
for _rate_column, _flag_column in _flag_for_rate_column.items():
    all_levies_df[_rate_column] = np.where(
        all_levies_df[_flag_column], all_levies_df['levy_rate_25'], 0.0
    )

summary_by_tca = all_levies_df.groupby('tax_code_area', as_index=False).agg(
    total_millage=('_rate_all', 'sum'),
    one_cap_millage=('_rate_one_cap', 'sum'),
    statute_millage=('_rate_statute', 'sum'),
    statute_590_millage=('_rate_statute_590', 'sum'),
    statute_360_millage=('_rate_statute_360', 'sum'),
)

# Add Spokane General millage per TCA (0.0 for TCAs without that levy)
_is_spokane_general = all_levies_df['levy_name'].str.casefold() == 'spokane general'
spokane_general = all_levies_df.loc[_is_spokane_general].groupby(
    'tax_code_area', as_index=False
)['levy_rate_25'].sum()
spokane_general = spokane_general.rename(columns={'levy_rate_25': 'spokane_general_millage'})

summary_by_tca = summary_by_tca.merge(spokane_general, on='tax_code_area', how='left')
summary_by_tca['spokane_general_millage'] = summary_by_tca['spokane_general_millage'].fillna(0.0)

# Add the per-TCA total over levies in spokane_boundaries_set, counting only
# rows that sit in a Spokane-General TCA (0.0 for every other TCA)
_in_boundary_set = all_levies_df['levy_name'].isin(spokane_boundaries_set)
_in_spokane_tca = all_levies_df['tax_code_area'].isin(_spokane_tcas)
_spokane_only_rows = all_levies_df[_in_boundary_set & _in_spokane_tca].copy()

_spokane_boundaries_per_tca = _spokane_only_rows.groupby(
    'tax_code_area', as_index=False
)['levy_rate_25'].sum()
_spokane_boundaries_per_tca = _spokane_boundaries_per_tca.rename(
    columns={'levy_rate_25': 'total_spokane_boundaries_millage'}
)

summary_by_tca = summary_by_tca.merge(_spokane_boundaries_per_tca, on='tax_code_area', how='left')
summary_by_tca['total_spokane_boundaries_millage'] = summary_by_tca['total_spokane_boundaries_millage'].fillna(0.0)

# --- 3. Make per-levy millage columns in df for every unique levy ---

# Create a tax_code_area x levy_name pivot of millages (fill missing with 0)
tca_levy_millage = all_levies_df.pivot_table(
    index='tax_code_area',
    columns='levy_name',
    values='levy_rate_25',
    aggfunc='sum',
    fill_value=0.0
)

# For each parcel, add a column for each levy: levy_millage_(levyname)
df = df.copy()
levyname_list = list(tca_levy_millage.columns)
millage_colnames = {
    levyname: "levy_millage_" + str(levyname).strip().lower().replace(' ', '_').replace('&', 'and').replace('/', '_').replace('-', '_').replace('.', '_')
    for levyname in levyname_list
}

# Two different levy names that normalize to the same column name would silently
# clash in the merge; fail loudly with the offending names instead.
if len(set(millage_colnames.values())) != len(millage_colnames):
    seen_colnames = {}
    for levyname, colname in millage_colnames.items():
        seen_colnames.setdefault(colname, []).append(levyname)
    collisions = {c: names for c, names in seen_colnames.items() if len(names) > 1}
    raise ValueError(f"Levy names normalize to the same column name: {collisions}")

millage_cols = list(millage_colnames.values())

# Make the cell safe to re-run: drop millage columns left over from a previous
# run so the merge below cannot produce _x/_y suffixed duplicates.
stale_cols = [c for c in millage_cols if c in df.columns]
if stale_cols:
    df = df.drop(columns=stale_cols)

# One merge for all levies (instead of one merge per levy, each of which copies
# the entire parcel frame). A left merge keeps every parcel and its row order.
millage_by_tca = (
    tca_levy_millage
    .rename(columns=millage_colnames)
    .rename_axis(columns=None)
    .reset_index()
)
df = df.merge(millage_by_tca, on='tax_code_area', how='left')

# Parcels whose tax_code_area has no levy data get 0.0 for every levy
if millage_cols:
    df[millage_cols] = df[millage_cols].fillna(0.0)

# -- Preview the three outputs built above --
colprint = [c for c in df.columns if c.startswith('levy_millage_')]
parcel_preview_columns = ['tax_code_area'] + colprint

print("levy_df (first 10 rows):")
print(levy_df.head(10))

print("\nsummary_by_tca (first 10 rows):")
print(summary_by_tca.head(10))

print("\nParcel levy millage columns (first 5 rows):")
print(df[parcel_preview_columns].head(5))


In [None]:
# Report min, max, and median of statute_590_millage and statute_360_millage
# across tax code areas, read from summary_by_tca
for col in ['statute_590_millage', 'statute_360_millage']:
    if col in summary_by_tca.columns:
        col_min = summary_by_tca[col].min()
        col_max = summary_by_tca[col].max()
        col_median = summary_by_tca[col].median()
        print(f"{col}: min={col_min}, max={col_max}, median={col_median}")
    else:
        # Name the frame that was actually checked (the old text said levy_summary)
        print(f"Column '{col}' not found in summary_by_tca.")


In [None]:
# Add a column to levy_df indicating the levy_millage_* column name used in df
# Single-pass character substitutions used to turn a levy name into a
# column-name-safe slug.
_LEVY_NAME_TRANSLATION = str.maketrans({
    ' ': '_',
    '&': 'and',
    '/': '_',
    '-': '_',
    '.': '_',
})


def normalize_levy_name(name: str) -> str:
    """Return a lowercase slug for *name*.

    The value is converted to ``str``, stripped of surrounding whitespace and
    lowercased; spaces, '/', '-' and '.' become underscores and '&' becomes
    'and'.
    """
    cleaned = str(name).strip().lower()
    return cleaned.translate(_LEVY_NAME_TRANSLATION)

# Record, for every levy row, the name of the levy_millage_* column built from its levy_name.
levy_df['millage_col'] = [
    f"levy_millage_{normalize_levy_name(levy_name)}"
    for levy_name in levy_df['levy_name']
]


In [None]:
# List every tax code area in which a levy named 'Spokane General' appears
# (the name comparison ignores case).

spokane_general_levy_mask = all_levies_df['levy_name'].str.casefold().eq('spokane general')
spokane_general_tcas = all_levies_df.loc[spokane_general_levy_mask, 'tax_code_area'].unique()

print("Tax code areas with 'Spokane General' levy:")
for tca in sorted(spokane_general_tcas):
    print(tca)


In [None]:

# One row per levy_name: the statute flags from the first row of that name and
# the median levy_rate_25 across its rows. The table is also written to CSV.
_statute_aggregations = {
    'statute_flag': ('statute_flag', 'first'),
    'statute_360_flag': ('statute_360_flag', 'first'),
    'statute_590_flag': ('statute_590_flag', 'first'),
    'median_levy_rate_25': ('levy_rate_25', 'median'),
}
levy_statute_summary = (
    all_levies_df
    .groupby('levy_name', dropna=False)
    .agg(**_statute_aggregations)
    .reset_index()
)
levy_statute_summary.to_csv("levy_statute_summary_spokane.csv", index=False)

# Count the distinct levy_rate_25 values seen for each levy_name
levyname_rate_varies = (
    all_levies_df
    .groupby('levy_name')['levy_rate_25']
    .nunique()
    .rename('n_unique_rates')
    .reset_index()
)

# is_same_rate is True when a levy_name has exactly one distinct rate
levyname_rate_varies['is_same_rate'] = levyname_rate_varies['n_unique_rates'].eq(1)

# Share of levy_names whose rows all carry the same rate
num_levy_names = len(levyname_rate_varies)
num_same_rate = levyname_rate_varies['is_same_rate'].sum()
if num_levy_names > 0:
    percent_same_rate = (num_same_rate / num_levy_names) * 100
else:
    percent_same_rate = 0

_same_rate_message = (
    f"{percent_same_rate:.2f}% of levy_names have the same levy_rate_25 for every row of that name "
    f"({num_same_rate} of {num_levy_names})"
)
print(_same_rate_message)

# Show the first levy_name (if any) whose rate is not constant
varying = levyname_rate_varies.loc[~levyname_rate_varies['is_same_rate']]
if len(varying) > 0:
    print("\nlevy_names where levy_rate_25 is NOT the same for every row:")
    print(varying.head(1))


## Step 3: Recreating Current Property Tax Revenue

Before we can model an LVT shift, we need to accurately recreate the current property tax system. This validation step ensures our dataset correctly reflects the real-world tax landscape.


In [None]:
# Preview the first rows of the parcel dataframe, including the levy_millage_* columns added above.
display(df.head())



In [None]:
# Report how many parcels have exemptions that cover their full assessed value
# and how many have missing exemption / assessed amounts, then keep only parcels
# that are subject to the Spokane General levy and have a positive taxable base.
total = len(df)
# Computed before missing exemptions are filled, so NaN exemptions are not counted here.
num_exmp_ge_assessed = (df['exmp_amt'] >= df['assessed_amt']).sum()
percent_exmp_ge_assessed = (num_exmp_ge_assessed / total) * 100 if total > 0 else 0
print(f"Percent of rows with exmp_amt >= assessed_amt: {percent_exmp_ge_assessed:.2f}% ({num_exmp_ge_assessed} of {total})")

# Calculate and print percent of exmp_amt and assessed_amt that are NA
num_exmp_amt_na = df['exmp_amt'].isna().sum()
num_assessed_amt_na = df['assessed_amt'].isna().sum()
percent_exmp_amt_na = (num_exmp_amt_na / total) * 100 if total > 0 else 0
percent_assessed_amt_na = (num_assessed_amt_na / total) * 100 if total > 0 else 0
print(f"Percent of rows with exmp_amt NA: {percent_exmp_amt_na:.2f}% ({num_exmp_amt_na} of {total})")
print(f"Percent of rows with assessed_amt NA: {percent_assessed_amt_na:.2f}% ({num_assessed_amt_na} of {total})")

# A missing exemption is treated as no exemption.
df['exmp_amt'] = df['exmp_amt'].fillna(0)

# Restrict to rows with a non-NA, non-zero levy_millage_spokane_general and
# assessed_amt > exmp_amt (rows with a missing assessed_amt fail the comparison
# and are dropped as well), then print the percent removed.
num_rows_before = len(df)
has_spokane_general = df['levy_millage_spokane_general'].notna() & (df['levy_millage_spokane_general'] != 0)
has_taxable_base = df['assessed_amt'] > df['exmp_amt']
# .copy() so later column assignments operate on an independent frame.
df = df[has_spokane_general & has_taxable_base].copy()
num_rows_after = len(df)
percent_removed = ((num_rows_before - num_rows_after) / num_rows_before) * 100 if num_rows_before > 0 else 0
print(f"Removed {num_rows_before - num_rows_after} rows ({percent_removed:.2f}%) where levy_millage_spokane_general was NA or 0, or assessed_amt <= exmp_amt.")


In [None]:
# Drop every levy_millage_* column that carries no information for the
# remaining parcels, i.e. whose values are all NA or all zero.
levy_millage_cols = [name for name in df.columns if name.startswith('levy_millage_')]

cols_to_drop = []
for col in levy_millage_cols:
    values = df[col]
    if values.isna().all() or values.fillna(0).eq(0).all():
        cols_to_drop.append(col)

df = df.drop(columns=cols_to_drop)
print(f"Removed {len(cols_to_drop)} levy_millage_ columns that were all NA or 0: {cols_to_drop}")


In [None]:
# Preview the parcel dataframe after the row filter and the removal of empty levy columns.
display(df.head())

In [None]:
# --- Calculate current tax for the Spokane General levy and save as current_tax ---
# NOTE: the millage used here is levy_millage_spokane_general, not
# total_spokane_boundaries_millage.
df['millage_rate'] = pd.to_numeric(df['levy_millage_spokane_general'], errors='coerce')
print(f"Median millage_rate: {df['millage_rate'].median()}")

# Set exmp_flag = 1 if exmp_amt > assessed_amt or taxable_amt <= 0.05, else 0
# Flagged (fully exempt) parcels are then removed from df altogether.
df['exmp_flag'] = ((df['exmp_amt'] > df['assessed_amt']) | (df['taxable_amt'] <= 0.05)).astype(int)
df = df[df['exmp_flag'] != 1].copy()
# Calculate current revenue for the Spokane General millage using calculate_current_tax
# This will also save the per-row current_tax to df['current_tax']
# (second_revenue is returned by the helper but not used in this cell.)
current_revenue, second_revenue, df = calculate_current_tax(
    df=df, 
    tax_value_col='assessed_amt',
    millage_rate_col='millage_rate',
    exemption_col='exmp_amt',
    exemption_flag_col='exmp_flag'
)

print(f"Total number of properties: {len(df):,}")
print(f"Current annual revenue with millage rate: ${current_revenue:,.2f}")
print(f"Total land value: ${df['land_value'].sum():,.2f}")
print(f"Total overall value: ${df['assessed_amt'].sum():,.2f}")
print(f"Total taxable value: ${df['taxable_amt'].sum():,.2f}")

# Taxable base as modelled here: assessed value net of exemptions, summed over parcels.
total_assessed_minus_exempt = (df['assessed_amt'] - df['exmp_amt']).sum()
print("Sum of assessed_amt minus exmp_amt:", total_assessed_minus_exempt)

# Calculate millage_rate as (current_revenue / total_assessed_minus_exempt) * 1000
# This is a sanity check: the effective rate implied by the modelled revenue.
# NOTE(review): no guard for total_assessed_minus_exempt == 0 (empty df).
millage_rate = (current_revenue / total_assessed_minus_exempt) * 1000
print("Calculated millage_rate:", millage_rate)



In [None]:

# Calculate current tax for every levy millage column (those starting with 'levy_millage_')

# Columns derived from a levy millage share the 'levy_millage_' prefix. Without
# excluding them, re-running this cell (or running it after the split-rate
# cell) would treat e.g. levy_millage_x_current_tax as a millage rate and add a
# bogus ..._current_tax_current_tax column into total_current_tax.
_DERIVED_LEVY_SUFFIXES = ('_current_tax', '_new_tax', '_new_millage')
levy_millage_cols = [
    col for col in df.columns
    if col.startswith('levy_millage_') and not col.endswith(_DERIVED_LEVY_SUFFIXES)
]

levy_millage_revenues = {}   # millage column -> total current revenue
current_tax_cols = []        # per-parcel current-tax columns created below

for millage_col in levy_millage_cols:
    # Output column: the millage column name with '_current_tax' appended
    tax_col = f"{millage_col}_current_tax"
    current_tax_cols.append(tax_col)

    # Convert millage column to numeric (unparseable values become NaN)
    df[millage_col] = pd.to_numeric(df[millage_col], errors='coerce')

    # Calculate current tax for this levy on a copy, so the helper's
    # 'current_tax' output does not overwrite columns on the main df
    revenue, _, df_mill = calculate_current_tax(
        df=df.copy(),
        tax_value_col='assessed_amt',
        millage_rate_col=millage_col,
        exemption_col='exmp_amt',
        exemption_flag_col='exmp_flag'
    )
    # Save the per-row current_tax as the appropriate column in the main df
    df[tax_col] = df_mill['current_tax']
    levy_millage_revenues[millage_col] = revenue
    print(f"Current annual revenue for {millage_col}: ${revenue:,.2f}")

# Optionally, print the sum for all the levy millage columns
total_levy_millage_revenue = sum(levy_millage_revenues.values())
print(f"Total current revenue from all levy millage columns: ${total_levy_millage_revenue:,.2f}")

# Ensure total_current_tax is the sum of all current tax columns
df['total_current_tax'] = df[current_tax_cols].fillna(0).sum(axis=1)



In [None]:
# Preview the parcel dataframe with the per-levy *_current_tax columns and total_current_tax.
display(df.head())

In [None]:
# Property category -> the exact prop_use_desc values that belong to it.
_PROPERTY_CATEGORY_MAPPING = {
    "Single Family": ["Single Unit"],
    "Small Multi-Family (2-4 units)": ["Two-to-Four Unit"],
    "Large Multi-Family (5+ units)": ["Five-Plus Unit"],
    "Other Residential": ["Other Residential", "Vacation Home"],
    "Mobile Home Park": ["Mobile Home Park"],
    "Vacant Land": ["Vacant Land"],
    "Agricultural": ["Cur - Use - Ag", "Agricultural Not Classified", "Agricultural"],
    "Retail/Service/Commercial": [
        "Retail - General Mrchds", "Retail - Other", "Retail - Hardware", "Retail - Food", "Retail - Eating",
        "Retail - Auto", "Retail - Furniture", "Service - Finance", "Service - Professional", "Service - Repair",
        "Service - Education", "Service - Governmental", "Service - Construction", "Service - Personal",
        "Service - Business", "Wholesale", "Hotel/Motel", "Hotel/Condo", "Inst Lodging", "Recreational",
        "Resort - Camping", "Public Assembly", "Churches", "Park", "Other Cultural"
    ],
    "Manufacturing/Industrial": [
        "Manf - Other", "Manf - Fabricated Material", "Manf - Petroleum", "Manf - Printed Material",
        "Manf - Stone/Glass", "Manf - Printing", "Manf - Instrumentation", "Manf - Leather", "Manf - Paper",
        "Manufacturing - Food", "Manufacturing - Lumber", "Mining", "Utilities", "Communication"
    ],
    "Transportation - Parking": ["Trans - Parking"],
    "Transportation/Other": [
        "Trans - Highway", "Trans - Railroad", "Trans - Aircraft", "Trans - Motor", "Trans - Other"
    ],
    "Designated Forest": ["Designated Forest Lnd"],
    "Water Areas": ["Water Area"],
    "Marijuana": ["Marijuana Growing"],
    "Current Use Open": ["Cur - Use - Open"]
}

# Reverse lookup (prop_use_desc -> category), built once instead of on every
# call. setdefault keeps the first category if a description were ever listed
# twice, which matches a top-to-bottom scan of the mapping.
_PROPERTY_CATEGORY_BY_USE = {}
for _category, _descriptions in _PROPERTY_CATEGORY_MAPPING.items():
    for _description in _descriptions:
        _PROPERTY_CATEGORY_BY_USE.setdefault(_description, _category)


def categorize_property_type(prop_use_desc):
    """Map a parcel's ``prop_use_desc`` to a broad property category.

    Matching is exact and case-sensitive. Any value without a match,
    including ``None``, NaN and unhashable values, is categorised as
    ``"Other"``.

    NOTE: this definition replaces the ``categorize_property_type`` imported
    from ``lvt_utils`` at the top of the notebook.
    """
    try:
        return _PROPERTY_CATEGORY_BY_USE.get(prop_use_desc, "Other")
    except TypeError:
        # Unhashable input (e.g. a list) can never equal a description string.
        return "Other"

# Derive each parcel's PROPERTY_CATEGORY from its prop_use_desc
df['PROPERTY_CATEGORY'] = df['prop_use_desc'].map(categorize_property_type)

## Step 4: Modeling the Split-Rate Land Value Tax

Now for the exciting part - modeling the LVT shift! We'll create a revenue-neutral policy that taxes land at 4 times the rate of buildings.

### The Split-Rate Formula

Under our proposed system:
- **Buildings** are taxed at a lower rate (Building Millage)  
- **Land** is taxed at 4x that rate (4 × Building Millage)
- **Total revenue** remains the same as current system

The formula to solve for the building millage rate is:
```
Current Revenue = (Building Millage × Total Taxable Buildings) + (4 × Building Millage × Total Taxable Land)
```

### Handling Exemptions in Split-Rate System

Since we want to maintain existing exemptions, we need to:
1. Apply exemptions to building value first
2. If exemptions exceed building value, apply remainder to land value
3. Calculate separate taxable values for land and buildings

This ensures properties don't over-benefit from exemptions and maintains the intent of existing tax policy.


In [None]:
# Find properties where assessed_amt < land_value
mask = df['assessed_amt'] < df['land_value']

# Print value counts of prop_use_desc for those properties
if 'prop_use_desc' in df.columns:
    print("prop_use_desc counts where assessed_amt < land_value:")
    print(df.loc[mask, 'prop_use_desc'].value_counts())
else:
    print("Column 'prop_use_desc' not found in dataframe.")

# Cap land_value at assessed_amt for those properties, so the improvement value
# (assessed_amt - land_value) derived later cannot be negative for them
df.loc[mask, 'land_value'] = df.loc[mask, 'assessed_amt']

# Check whether any land_value is negative after the cap. The check is on
# land_value, so the messages name that column.
num_negative_land_value = (df['land_value'] < 0).sum()
num_negative_assessed = num_negative_land_value  # original variable name, kept for later cells
if num_negative_land_value > 0:
    print(f"Warning: There are {num_negative_land_value} properties with negative land_value values.")
else:
    print("No negative land_value values found.")


# Option B

In [None]:
# ===============================
# Calculate split-rate (LVT) _new_tax for every levy millage column individually
# with different parameters based on 590/360 flags
# ===============================

# Find all source levy millage columns. The derived *_current_tax, *_new_tax and
# *_new_millage columns share the 'levy_millage_' prefix; they are excluded so
# they are not treated as levies (each one used to print a spurious
# "Skipping ..." line).
_DERIVED_LEVY_SUFFIXES = ('_current_tax', '_new_tax', '_new_millage')
levy_millage_cols = [
    col for col in df.columns
    if col.startswith('levy_millage_') and not col.endswith(_DERIVED_LEVY_SUFFIXES)
]

# Calculate improvement_value as assessed_amt - land_value (never negative)
df['improvement_value'] = (df['assessed_amt'] - df['land_value']).clip(lower=0)

# For results
levy_millage_rates = {}      # split-rate per levy
levy_new_revenues = {}       # modeled revenue per levy
levy_current_revenues = {}   # current revenue per levy

# ---- Build lookup from millage_col -> flags from levy_df ----
levy_flag_lookup = (
    levy_df[['millage_col', 'any_statute_590_flag', 'any_statute_360_flag']]
    .fillna(False)
    .set_index('millage_col')
    .to_dict(orient='index')
)
# Example of levy_flag_lookup:
# {
#   'levy_millage_spokane_general': {'any_statute_590_flag': True, 'any_statute_360_flag': True},
#   'levy_millage_sd081_spokane_bi': {'any_statute_590_flag': False, 'any_statute_360_flag': False},
#   ...
# }

# Add _new_tax column for each levy
for millage_col in levy_millage_cols:
    # The corresponding _current_tax column (already calculated earlier)
    tax_col = f"{millage_col}_current_tax"
    new_tax_col = f"{millage_col}_new_tax"
    new_millage_col = f"{millage_col}_new_millage"

    # Require the _current_tax column to exist for current revenue
    if tax_col not in df.columns:
        print(f"Skipping {millage_col}: missing {tax_col} column for current revenue summing.")
        continue

    # --- Choose parameters based on 590/360 flags ---
    # Levies missing from levy_df are treated as having neither flag.
    flags = levy_flag_lookup.get(
        millage_col,
        {'any_statute_590_flag': False, 'any_statute_360_flag': False}
    )
    has_590_or_360 = flags['any_statute_590_flag'] or flags['any_statute_360_flag']

    # NOTE(review): earlier comments here said "25% / $10k base" and
    # "90% / $100k base", which did not match the values in the code. The
    # comments below describe what the code does; confirm the intended policy.
    if has_590_or_360:
        # For 590 / 360 levies: 20% improvement exemption, no abatement floor
        improvement_exemption_pct = 0.20
        building_abatement_floor = 0
    else:
        # All other levies: 60% improvement exemption, $100k abatement floor
        improvement_exemption_pct = 0.6
        building_abatement_floor = 100000

    # Revenue target for this levy: what it raises under the current system
    current_revenue = df[tax_col].sum()
    levy_current_revenues[millage_col] = current_revenue

    # Model split-rate for this levy with its chosen parameters
    # NOTE(review): the model receives every parcel in df, including parcels
    # whose current millage for this levy is 0 -- confirm that is intended.
    millage_rate, modeled_revenue, df_result = model_stacking_improvement_exemption(
        df,
        land_value_col='land_value',
        improvement_value_col='improvement_value',
        current_revenue=current_revenue,
        building_abatement_floor=building_abatement_floor,
        improvement_exemption_percentage=improvement_exemption_pct,
        exemption_col='exmp_amt',
        exemption_flag_col='exmp_flag'
    )

    df[new_tax_col] = df_result["new_tax"]
    df[new_millage_col] = millage_rate   # same scalar on every row
    levy_millage_rates[millage_col] = millage_rate
    levy_new_revenues[millage_col] = modeled_revenue

    print(
        f"{millage_col}: flags(590={flags['any_statute_590_flag']}, "
        f"360={flags['any_statute_360_flag']}), "
        f"improv_exempt={improvement_exemption_pct:.2%}, "
        f"floor=${building_abatement_floor:,.0f}, "
        f"millage={millage_rate:.5f}, "
        f"revenue=${modeled_revenue:,.2f}"
    )

# Calculate and print totals
total_new_splitrate_revenue = sum(levy_new_revenues.values())
total_current_levy_revenue = sum(levy_current_revenues.values())
print(f"Total modeled split-rate revenue (sum of _new_tax): ${total_new_splitrate_revenue:,.2f}")
print(f"Total current revenue (sum of _current_tax): ${total_current_levy_revenue:,.2f}")
print(f"Difference: ${total_new_splitrate_revenue - total_current_levy_revenue:,.2f}")

# Build df_b with all _new_tax columns and a total
df_b = df.copy()
all_new_tax_cols = [f"{col}_new_tax" for col in levy_millage_cols if f"{col}_new_tax" in df.columns]
df_b["total_new_tax"] = df_b[all_new_tax_cols].sum(axis=1)


In [None]:

# We'll compare the sum of current taxes to the sum of new modeled taxes for each parcel

# numpy is used below (np.inf / np.nan) but is not imported in any earlier
# cell visible here, so import it where it is needed.
import numpy as np

# Calculate and print the summary table for total tax impact (using lvt_utils)
from lvt_utils import calculate_category_tax_summary, print_category_tax_summary

# User manually selects calculation mode
USE_ONLY_SPOKANE_BOUNDARIES = False # Set to True to use only Spokane boundaries levies for tax calculations

if USE_ONLY_SPOKANE_BOUNDARIES:
    # Find millage columns for levies in Spokane boundaries
    in_spokane_levies = levy_df.loc[levy_df['in_spokane_boundaries'] == 1, 'millage_col'].dropna().tolist()
    # Filter out any levies where the millage_col includes "sd081" (case insensitive)
    in_spokane_levies = [col for col in in_spokane_levies if "sd081" not in col.lower()]
    print("Using only Spokane boundary millages:", in_spokane_levies)

    new_tax_cols_in_spokane = [f"{col}_new_tax" for col in in_spokane_levies if f"{col}_new_tax" in df_b.columns]
    current_tax_cols_in_spokane = [f"{col}_current_tax" for col in in_spokane_levies if f"{col}_current_tax" in df_b.columns]

    # With no matching columns the tax is set to 0 for every parcel
    df_b['new_tax'] = df_b[new_tax_cols_in_spokane].sum(axis=1) if new_tax_cols_in_spokane else 0
    df_b['current_tax'] = df_b[current_tax_cols_in_spokane].sum(axis=1) if current_tax_cols_in_spokane else 0
else:
    # Default: use sum of all total levy columns
    df_b['new_tax'] = df_b['total_new_tax']
    df_b['current_tax'] = df_b['total_current_tax']

df_b['tax_change'] = df_b['new_tax'] - df_b['current_tax']
df_b['tax_change_pct'] = df_b['tax_change'] / df_b['current_tax'] * 100
# Parcels with zero current tax divide to +/-inf or NaN; report them as 0% change
df_b['tax_change_pct'] = df_b['tax_change_pct'].replace([np.inf, -np.inf], np.nan).fillna(0)
# Sum new_tax and current_tax, and output their values and difference
total_new_tax = df_b['new_tax'].sum()
total_current_tax = df_b['current_tax'].sum()
print(f"Total new_tax: ${total_new_tax:,.2f}")
print(f"Total current_tax: ${total_current_tax:,.2f}")
print(f"Difference (new - current): ${total_new_tax - total_current_tax:,.2f}")

# Save the full results table for further analysis
output_full_results = df_b.copy()

# This will use the default 'PROPERTY_CATEGORY' column if present
output_summary = calculate_category_tax_summary(
    output_full_results,
    category_col='PROPERTY_CATEGORY' if 'PROPERTY_CATEGORY' in output_full_results.columns else output_full_results.columns[0],  # fallback to first col if not present
    current_tax_col='current_tax',
    new_tax_col='new_tax'
)
print_category_tax_summary(output_summary, "Total Tax Impact by Property Category (All sp_ Levies)")



In [None]:
# Measure the gross amount of tax that moves between parcels: the sum of
# |current_tax - new_tax| over all parcels, also expressed as a share of the
# total current tax.

# Per-parcel absolute difference
df_b['abs_tax_diff'] = df_b['current_tax'].sub(df_b['new_tax']).abs()

# Total across parcels
total_abs_tax_diff = df_b['abs_tax_diff'].sum()

# Share of total current tax (NaN when there is no current tax at all)
if total_current_tax != 0:
    percent_of_current = (total_abs_tax_diff / total_current_tax) * 100
else:
    percent_of_current = np.nan

print(f"Sum of absolute value of current_tax minus new_tax: ${total_abs_tax_diff:,.2f}")
print(f"That is {percent_of_current:.2f}% of the sum of current_tax.")


In [None]:
# For every '*_new_millage' column in df_b, print the levy name it belongs to
# and the distinct new millage values it holds.

_NEW_MILLAGE_SUFFIX = '_new_millage'
_LEVY_PREFIX = 'levy_millage_'

millage_cols = [name for name in df_b.columns if name.endswith(_NEW_MILLAGE_SUFFIX)]

for col in millage_cols:
    # Column names look like 'levy_millage_{NAME}_new_millage' (or, possibly,
    # '{NAME}_new_millage'); strip the suffix and, when present, the prefix.
    name_start = len(_LEVY_PREFIX) if col.startswith(_LEVY_PREFIX) else 0
    millage_name = col[name_start:-len(_NEW_MILLAGE_SUFFIX)]
    print(f"\nMillage name: '{millage_name}'")
    print("Unique new millage values:")
    print(df_b[col].unique())


In [None]:
# ============================================================
#  Sum NEW millage by tax_code_area, grouped by levy flags
#  Using levy_df['millage_col'] instead of recomputing names
# ============================================================

_FLAG_COLUMNS = ("any_statute_590_flag", "any_statute_360_flag")

# One long-format frame per levy: (tax_code_area, new_millage, levy_name, flags)
records = []

for _, levy_row in levy_df.iterrows():
    new_millage_col = f"{levy_row['millage_col']}_new_millage"

    # Levies that were never modeled have no *_new_millage column; skip them
    if new_millage_col not in df_b.columns:
        continue

    levy_frame = (
        df_b[['tax_code_area', new_millage_col]]
        .rename(columns={new_millage_col: 'new_millage'})
        .assign(
            levy_name=levy_row['levy_name'],
            **{flag_col: levy_row[flag_col] for flag_col in _FLAG_COLUMNS}
        )
    )
    records.append(levy_frame)

if not records:
    raise ValueError("No *_new_millage levy columns found in df_b.")

long_df = pd.concat(records, ignore_index=True)

# ---- Aggregate by tax_code_area ----
summary_output = {}

for flag in _FLAG_COLUMNS:
    flagged = long_df.loc[long_df[flag] == True]

    # Collapse to one new millage per (tax_code_area, levy_name) pair
    tca_levy = flagged.groupby(['tax_code_area', 'levy_name'], as_index=False)['new_millage'].first()

    # Total the levies' millages within each tax code area
    tca_sum = tca_levy.groupby('tax_code_area')['new_millage'].sum()

    summary_output[flag] = dict(
        zip(
            ("min", "median", "max", "count_tcas"),
            (tca_sum.min(), tca_sum.median(), tca_sum.max(), len(tca_sum)),
        )
    )

# ---- Print results ----
_rule = "============================================================"
print("\n" + _rule)
print(" NEW Millage Sums by Tax Code Area (using levy_df.millage_col)")
print(_rule)
for flag, stats in summary_output.items():
    print(f"\nFlag: {flag}")
    print(f"  TCA count: {stats['count_tcas']}")
    for label, key in (("Min:   ", "min"), ("Median:", "median"), ("Max:   ", "max")):
        print(f"  {label} {stats[key]:.6f}")


In [None]:
import matplotlib.pyplot as plt
import numpy as np

# Horizontal bar chart of the median percent tax change per property category,
# with a "Net Change" column on the right showing each category's total change.

# Only include categories with property_count > 50
filtered = output_summary.loc[output_summary['property_count'] > 50].copy()

# Net change per category: use total_tax_change when the summary provides it,
# otherwise approximate it as mean change x parcel count
if 'total_tax_change' in filtered.columns:
    total_tax_change = filtered['total_tax_change'].tolist()
else:
    total_tax_change = (filtered['mean_tax_change'] * filtered['property_count']).tolist()

categories = filtered['PROPERTY_CATEGORY'].tolist()
counts = filtered['property_count'].tolist()
median_pct_change = filtered['median_tax_change_pct'].tolist()
median_dollar_change = filtered['median_tax_change'].tolist()

# Sort everything by median_pct_change ascending
sorted_idx = np.argsort(median_pct_change)


def _reordered(values):
    """Return *values* rearranged into the sorted_idx order."""
    return [values[i] for i in sorted_idx]


def _signed_dollars(amount):
    """Format *amount* as whole dollars with the minus sign before the '$'."""
    if amount >= 0:
        return f"${amount:,.0f}"
    return f"-${abs(amount):,.0f}"


categories = _reordered(categories)
counts = _reordered(counts)
median_pct_change = _reordered(median_pct_change)
median_dollar_change = _reordered(median_dollar_change)
total_tax_change = _reordered(total_tax_change)

# Increases are dark red; decreases (and zero) are green
bar_colors = ["#8B0000" if val > 0 else "#228B22" for val in median_pct_change]

# Bar settings
bar_height = 0.75
fig_height = len(categories) * 0.8 + 1.2
right_col_pad = 120  # more padding for right column
fig, ax = plt.subplots(figsize=(17, fig_height))  # wider for right column

y = np.arange(len(categories))

# Draw bars
ax.barh(
    y,
    median_pct_change,
    height=bar_height,
    color=bar_colors,
    edgecolor='none',
    linewidth=0,
    alpha=0.92,
    zorder=2,
)

# Clean look: no spines, no ticks, no tick labels
for spine in ax.spines.values():
    spine.set_visible(False)
ax.tick_params(left=False, bottom=False, labelleft=False, labelbottom=False)

# Vertical offsets of the three text rows drawn next to each bar
cat_offset = 0.18   # less space between category and median
med_offset = -0.03  # median just below category
count_offset = -0.23  # more space below median for parcels

# x position of the right-hand "Net Change" column
max_abs = max(abs(extreme) for extreme in (min(median_pct_change), max(median_pct_change)))
right_col_x = max_abs + right_col_pad

# Header of the right-hand column
ax.text(
    right_col_x, len(categories) - 0.5, "Net Change",
    va='bottom', ha='left',
    fontsize=15, fontweight='bold', color='black', fontname='Arial'
)

_bold_arial = dict(va='center', fontweight='bold', fontname='Arial')

for i, (cat, val, count, med_dol, tot_change) in enumerate(
    zip(categories, median_pct_change, counts, median_dollar_change, total_tax_change)
):
    # Median dollar and percent change share one line
    median_combo = f"Median: {_signed_dollars(med_dol)}, {val:+.1f}%"

    # Labels sit to the left of negative bars and to the right of the others
    if val < 0:
        xpos, ha = val - 2.5, 'right'
    else:
        xpos, ha = val + 2.5, 'left'

    # Category name (bold, bigger)
    ax.text(xpos, y[i] + cat_offset, cat, ha=ha, fontsize=14, color='#222', **_bold_arial)
    # Median (dollar + percent), just below the category
    ax.text(xpos, y[i] + med_offset, median_combo, ha=ha, fontsize=12, color='black', **_bold_arial)
    # Parcel count, below the median
    ax.text(xpos, y[i] + count_offset, f"{count:,} parcels", ha=ha, fontsize=11, color='#888', **_bold_arial)
    # Net change, in its own column on the right
    ax.text(
        right_col_x, y[i], _signed_dollars(tot_change),
        ha='left', fontsize=13, color='black', **_bold_arial
    )

# Symmetric x limits with extra room for the right column
ax.set_xlim(-right_col_x, right_col_x + 60)

# Remove axis labels/ticks
ax.set_yticks([])
ax.set_xticks([])

plt.tight_layout()
plt.show()


In [None]:
import matplotlib.pyplot as plt
import numpy as np

# Diverging bar chart per property category: share of parcels whose tax
# decreases beyond the threshold (green, left) versus increases beyond it
# (red, right). Only categories with more than 50 parcels are shown.

# Filter to property_count > 50
summary_filtered = output_summary.loc[output_summary['property_count'] > 50].copy()

# Sort by pct_increase_gt_threshold ascending (smallest percent increase first)
summary_sorted = summary_filtered.sort_values('pct_increase_gt_threshold', ascending=True)

categories_sorted = summary_sorted['PROPERTY_CATEGORY'].tolist()
pct_increase_sorted = summary_sorted['pct_increase_gt_threshold'].tolist()
pct_decrease_sorted = summary_sorted['pct_decrease_gt_threshold'].tolist()

# Whole-number versions used for the labels
pct_increase_int_sorted = [int(round(value)) for value in pct_increase_sorted]
pct_decrease_int_sorted = [int(round(value)) for value in pct_decrease_sorted]

y = np.arange(len(categories_sorted))

fig, ax = plt.subplots(figsize=(8, 6))

# Use specified colors
color_increase = "#8B0000"  # dark red
color_decrease = "#228B22"  # professional green

# Decrease bars extend to the left (negated), increase bars to the right
_bar_series = (
    ([-value for value in pct_decrease_sorted], color_decrease),
    (pct_increase_sorted, color_increase),
)
for _widths, _color in _bar_series:
    ax.barh(y, _widths, color=_color, edgecolor='none', height=0.7)

# Percent labels (integers) just beyond the end of each bar; zero is not labelled
_label_font = dict(va='center', fontsize=8, fontweight='normal', fontname='Arial')
for i, (inc, dec) in enumerate(zip(pct_increase_int_sorted, pct_decrease_int_sorted)):
    if dec > 0:
        ax.text(-dec - 2, y[i], f"{dec}%", ha='right', color=color_decrease, **_label_font)
    if inc > 0:
        ax.text(inc + 2, y[i], f"{inc}%", ha='left', color=color_increase, **_label_font)

# Category name to the right of the increase bar, past the percent label
for i, (cat, inc) in enumerate(zip(categories_sorted, pct_increase_sorted)):
    if inc > 0:
        xpos = inc + 18
    else:
        xpos = 18
    ax.text(
        xpos, y[i], cat,
        va='center', ha='left',
        fontsize=9, fontweight='bold', color='#222', fontname='Arial'
    )

# Minimalist look: no spines, ticks, axis labels or title
for spine in ax.spines.values():
    spine.set_visible(False)
ax.tick_params(left=False, bottom=False, labelleft=False, labelbottom=False)
ax.set_yticks([])
ax.set_xticks([])
ax.set_ylabel('')
ax.set_xlabel('')
ax.set_title('')

# x limits based on the largest bar, with extra room on the right for names
max_val = max(max(pct_increase_sorted), max(pct_decrease_sorted))
ax.set_xlim(-max_val-20, max_val+48)

# --- Custom titles above the left and right bars ---
title_fontsize = 10  # increased from 8
title_color = 'black'
title_fontweight = 'normal'
title_fontname = 'Arial'

# Titles sit just above the topmost bar, each offset from the centre line
title_y = len(categories_sorted) - 0.2
left_title_x = -max_val * 0.45
right_title_x = max_val * 0.45

_titles = (
    (left_title_x, "Percent of parcels\ndecreasing >10%"),
    (right_title_x, "Percent of parcels\nincreasing >10%"),
)
for _title_x, _title_text in _titles:
    ax.text(
        _title_x, title_y,
        _title_text,
        ha='center', va='bottom',
        fontsize=title_fontsize, fontweight=title_fontweight,
        color=title_color, fontname=title_fontname,
        bbox=dict(facecolor='white', edgecolor='none', boxstyle='round,pad=0.15')
    )

plt.tight_layout()
plt.show()


## Step 5: Understanding Property Types and Impacts

With our split-rate tax calculated, we can now analyze which property types are most affected. Understanding the distribution of tax impacts across different property categories is crucial for policy makers and stakeholders.

### Property Type Analysis

We'll examine how the tax burden shifts across:
- **Residential properties** (single-family, multi-family, condos)
- **Commercial properties** (retail, office, industrial)  
- **Vacant land** (often sees largest increases under LVT)
- **Exempt properties** (government, religious, charitable)

### Key Metrics to Track:
- **Count**: Number of properties in each category
- **Median tax change**: Typical impact (less affected by outliers)
- **Average percentage change**: Overall magnitude of impact
- **Percentage with increases**: How many properties see tax increases

This analysis helps identify which sectors benefit from the LVT shift (typically developed properties) and which see increased burden (typically land-intensive properties with low improvement ratios).


In [None]:
# Continue the analysis on an independent copy of the modeled results (df_b),
# so later cells that modify df do not change df_b.
df = df_b.copy()

### Creating Detailed Property Categories

To better understand impacts, we'll create a detailed property categorization system that groups similar property types together. This makes the analysis more meaningful and interpretable.

The function below categorizes properties into groups like:
- **Single Family** (with subcategories by lot size)
- **Multi-Family** (small vs. large)
- **Commercial** (by type: retail, office, industrial)
- **Exempt** (by type: government, religious, charitable)

This categorization helps us understand not just that "residential" properties are affected, but specifically which types of residential properties see the biggest changes.


In [None]:
# Preview the first rows of the modeled results now held in df.
display(df.head())


### Summary of Tax Impacts by Property Category

Now we can see the clear patterns of how different property types are affected by the LVT shift. This table will show us:

- **Which property types benefit** (negative changes = tax decreases)
- **Which property types pay more** (positive changes = tax increases)  
- **How concentrated the impacts are** (median vs. average differences)
- **What percentage of each type sees increases**

Generally, we expect:
- **Developed properties** (houses, commercial buildings) to see tax **decreases**
- **Vacant land** to see the **largest increases** 
- **Properties with high improvement-to-land ratios** to benefit most


## Step 6: Adding Geographic Context

To make our analysis spatially-aware, we need to add geographic boundaries to our parcel data. This enables us to:

- **Create maps** showing tax changes across the city
- **Analyze patterns by neighborhood** or district  
- **Combine with demographic data** for equity analysis
- **Present results visually** to stakeholders

We'll fetch the parcel boundary data from the same ArcGIS service that contains the geometric information for each property.


### Merging Tax Analysis with Geographic Data

Here we combine our tax analysis results with the geographic boundaries. This creates a spatially-enabled dataset that allows us to:

1. **Map tax changes** across Spokane
2. **Identify spatial patterns** in tax impacts
3. **Prepare for demographic analysis** by having geographic context

The merge should give us the same number of records as our original analysis, now with geographic coordinates for each parcel.


In [None]:
# Get census data for Spokane County (FIPS code: 53063)
# The helper returns two objects: the census data and the matching boundaries.
census_data, census_boundaries = get_census_data_with_boundaries(
    fips_code='53063',  # Washington (53) + Spokane County (063)
    year=2022
)
# Set CRS for census boundaries before merging
# NOTE(review): set_crs only labels the data with a CRS; it does not reproject.
# Confirm both layers really are in EPSG:4326.
census_boundaries = census_boundaries.set_crs(epsg=4326)  # Assuming WGS84 coordinate system
# NOTE(review): df was last assigned from df_b.copy(); set_crs is a GeoDataFrame
# method, so this presumably requires df to carry parcel geometry -- confirm.
boundary_gdf = df.set_crs(epsg=4326)  # Set same CRS for boundary data

# Merge census data with our parcel boundaries
# join_type="left" is passed to the helper; presumably unmatched parcels are kept -- confirm.
df = match_to_census_blockgroups(
    gdf=boundary_gdf,
    census_gdf=census_boundaries,
    join_type="left"
)

# Row counts of the boundary layer, the census table and the joined parcel table
print(f"Number of census blocks: {len(census_boundaries)}")
print(f"Number of census data: {len(census_data)}")
print(f"Number of parcels with census data: {len(df)}")

## Step 7: Demographic and Equity Analysis

One of the most important aspects of LVT analysis is understanding the **equity implications** - how does the tax shift affect different income levels and demographic groups?

### Adding Census Data

We'll match each property to its Census Block Group and pull demographic data including:
- **Median household income** 
- **Racial/ethnic composition**
- **Population characteristics**

### Why This Matters

Policy makers need to understand:
- Does the LVT shift disproportionately burden low-income neighborhoods?
- Are there racial equity implications?  
- Does the policy align with broader equity goals?

**Note**: You'll need a Census API key for this section. Get one free at: https://api.census.gov/data/key_signup.html


In [None]:
# List every column now present in df after the census join.
print("DataFrame columns:")
print(df.columns.tolist())


### Exploring the Enhanced Dataset

With census data merged in, our dataset now contains both property tax information and demographic context. Let's explore what variables we now have available for analysis.

This enhanced dataset allows us to examine relationships between:
- Property characteristics and demographics
- Tax impacts and neighborhood income levels
- Geographic patterns in tax burden shifts


In [None]:
# Display all columns with maximum width
# These pandas display options stay in effect for later cells as well.
pd.set_option('display.max_columns', None)
pd.set_option('display.width', None)
display(df.head())


### Viewing the Complete Dataset

Let's examine our enhanced dataset with all the variables we've created and merged. This gives us a comprehensive view of each property with:

- **Property characteristics** (type, value, location)
- **Current tax calculations** 
- **New LVT calculations**
- **Tax change impacts**
- **Demographic context** (income, race/ethnicity)

This rich dataset forms the foundation for sophisticated equity and impact analysis.


In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np

def filter_data(df):
    """Split parcels into the two subsets used by the equity analysis.

    Args:
        df: Parcel frame with ``median_income`` and ``PROPERTY_CATEGORY`` columns.

    Returns:
        Tuple ``(df_filtered, non_vacant_df)`` of independent copies:
        rows with positive ``median_income``, and the same rows minus those
        whose ``PROPERTY_CATEGORY`` equals ``'Vacant Land'``.
    """
    has_income = df['median_income'] > 0
    not_vacant = df['PROPERTY_CATEGORY'] != 'Vacant Land'
    positive_income_rows = df.loc[has_income].copy()
    developed_rows = df.loc[not_vacant & has_income].copy()
    return positive_income_rows, developed_rows

def calculate_block_group_summary(df):
    """Aggregate parcel-level tax results to one row per census block group.

    Only parcels with a positive ``median_income`` are aggregated.

    Args:
        df: Parcel frame with columns ``std_geoid``, ``median_income``,
            ``minority_pct``, ``black_pct``, ``current_tax``, ``new_tax``,
            ``tax_change``, ``tax_change_pct`` and ``PROPERTY_CATEGORY``.

    Returns:
        DataFrame with one row per ``std_geoid`` holding the first demographic
        values seen for the group, summed current/new tax, mean and median
        tax change, the parcel count, a ``has_vacant_land`` flag, and
        ``mean_tax_change_pct`` (percent change of the group's total tax).
        ``mean_tax_change_pct`` is NaN when the group's total current tax is
        zero.
    """
    parcels = df[df['median_income'] > 0].copy()
    summary = parcels.groupby('std_geoid').agg(
        median_income=('median_income', 'first'),
        minority_pct=('minority_pct', 'first'),
        black_pct=('black_pct', 'first'),
        total_current_tax=('current_tax', 'sum'),
        total_new_tax=('new_tax', 'sum'),
        mean_tax_change=('tax_change', 'mean'),
        median_tax_change=('tax_change', 'median'),
        median_tax_change_pct=('tax_change_pct', 'median'),
        parcel_count=('tax_change', 'count'),
        has_vacant_land=('PROPERTY_CATEGORY', lambda cats: 'Vacant Land' in cats.values),
    ).reset_index()
    # Defensive re-check; the parcel filter above should already guarantee this.
    summary = summary[summary['median_income'] > 0].copy()

    # A zero baseline would yield +/-inf, which turns every downstream
    # correlation into NaN and breaks the trend-line fit. Use NaN instead so
    # those groups are simply skipped by NaN-aware statistics.
    baseline_tax = summary['total_current_tax'].replace(0, np.nan)
    summary['mean_tax_change_pct'] = (
        (summary['total_new_tax'] - summary['total_current_tax']) / baseline_tax
    ) * 100
    return summary

def create_scatter_plot(data, x_col, y_col, ax, title, xlabel, ylabel):
    """Draw a sized scatter plot with a zero line and a linear trend line.

    Args:
        data: Frame containing ``x_col``, ``y_col`` and ``parcel_count``
            (``parcel_count`` drives the marker size).
        x_col: Column plotted on the x-axis.
        y_col: Column plotted on the y-axis.
        ax: Matplotlib axes to draw on.
        title: Axes title.
        xlabel: x-axis label.
        ylabel: y-axis label.

    Rows whose ``x_col`` value is not strictly positive are dropped before
    plotting.
    NOTE(review): this filter applies to whatever ``x_col`` is passed, so a
    percentage column loses its exact-zero rows too — confirm that is intended.
    """
    data = data[data[x_col] > 0].copy()
    sns.scatterplot(
        data=data,
        x=x_col,
        y=y_col,
        size='parcel_count',
        sizes=(20, 200),
        alpha=0.7,
        ax=ax
    )

    ax.axhline(y=0, color='r', linestyle='--')

    # Build ONE mask over paired observations. Dropping NaNs from x and y
    # separately leaves two differently-indexed series, and +/-inf values
    # would make the least-squares fit fail or return NaN coefficients.
    x = data[x_col].to_numpy(dtype=float)
    y = data[y_col].to_numpy(dtype=float)
    paired = np.isfinite(x) & np.isfinite(y)

    if paired.sum() > 1:
        x_fit = x[paired]
        trend = np.poly1d(np.polyfit(x_fit, y[paired], 1))
        ax.plot(x_fit, trend(x_fit), "r--")

    ax.set_xlabel(xlabel)
    ax.set_ylabel(ylabel)
    ax.set_title(title)

def plot_comparison(data1, data2, x_col, y_col, title_prefix, xlabel, ylabel=None):
    """Draw two side-by-side scatter plots: all properties vs. excluding vacant land.

    Args:
        data1: Block-group summary for all properties (left panel).
        data2: Block-group summary excluding vacant land (right panel).
        x_col: Column plotted on the x-axis of both panels.
        y_col: Column plotted on the y-axis of both panels.
        title_prefix: Text placed before the panel-specific title suffix.
        xlabel: x-axis label for both panels.
        ylabel: y-axis label for both panels. When omitted it is derived from
            ``y_col``: columns whose name starts with ``'median'`` are labelled
            "Median Tax Change (%)", everything else "Mean Tax Change (%)".
    """
    if ylabel is None:
        # The label used to be fixed to "Mean", which mislabelled plots of
        # median_tax_change_pct.
        ylabel = 'Median Tax Change (%)' if str(y_col).startswith('median') else 'Mean Tax Change (%)'

    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(18, 8))

    create_scatter_plot(data1, x_col, y_col, ax1,
                       f'{title_prefix} - All Properties', xlabel, ylabel)
    create_scatter_plot(data2, x_col, y_col, ax2,
                       f'{title_prefix} - Excluding Vacant Land', xlabel, ylabel)

    plt.tight_layout()
    plt.show()

def calculate_correlations(data1, data2):
    """Correlate demographics with tax-change percentages for two summaries.

    Args:
        data1: Block-group summary for all properties (keys end in ``_all``).
        data2: Block-group summary excluding vacant land (keys end in
            ``_non_vacant``).

    Returns:
        Dict mapping ``income_mean_*``, ``income_median_*``,
        ``minority_mean_*`` and ``black_mean_*`` to the correlation between
        the named demographic column and the named tax-change column, using
        only rows with positive ``median_income``.
    """
    # (key prefix, demographic column, tax-change column)
    column_pairs = (
        ('income_mean', 'median_income', 'mean_tax_change_pct'),
        ('income_median', 'median_income', 'median_tax_change_pct'),
        ('minority_mean', 'minority_pct', 'mean_tax_change_pct'),
        ('black_mean', 'black_pct', 'mean_tax_change_pct'),
    )
    results = {}
    for frame, suffix in ((data1, 'all'), (data2, 'non_vacant')):
        usable = frame.loc[frame['median_income'] > 0].copy()
        for prefix, demographic_col, change_col in column_pairs:
            pair_matrix = usable[[demographic_col, change_col]].corr()
            results[f'{prefix}_{suffix}'] = pair_matrix.iloc[0, 1]
    return results

def weighted_median(values, weights):
    """Return the weighted median of ``values``.

    Pairs in which either the value or the weight is NaN are ignored. The
    result is the first value (in ascending order) whose cumulative weight
    reaches half of the total weight; no interpolation is performed.

    Args:
        values: Numeric sequence of observations.
        weights: Numeric sequence of weights, same length as ``values``.

    Returns:
        The selected element of ``values``, or ``np.nan`` when no valid pair
        remains.
    """
    keep = ~(np.isnan(values) | np.isnan(weights))
    kept_values = np.array(values)[keep]
    kept_weights = np.array(weights)[keep]
    if kept_values.size == 0:
        return np.nan

    order = np.argsort(kept_values)
    ordered_values = kept_values[order]
    ordered_weights = kept_weights[order]
    running_weight = np.cumsum(ordered_weights)
    half_weight = ordered_weights.sum() / 2.0
    position = np.searchsorted(running_weight, half_weight)
    return ordered_values[position]

def create_quintile_summary(df, group_col, value_col):
    """Summarise tax-change percentages by quintile of ``group_col``.

    Rows are binned into five equal-frequency groups with ``pd.qcut``. When
    ``group_col`` is ``'median_income'``, rows with non-positive income are
    excluded first. The input frame is never modified: the quintile column is
    added to a private copy.

    Args:
        df: Frame with ``group_col``, ``value_col``, ``tax_change`` and
            ``tax_change_pct`` columns.
        group_col: Column whose quintiles define the groups.
        value_col: Column averaged into ``mean_value``.

    Returns:
        DataFrame with one row per quintile and the columns
        ``{group_col}_quintile``, ``count``, ``mean_tax_change_pct``,
        ``median_tax_change_pct`` (equal-weight ``weighted_median``) and
        ``mean_value``.
    """
    quintile_col = f'{group_col}_quintile'

    if group_col == 'median_income':
        df = df[df['median_income'] > 0]
    # Always work on a copy; previously only the income branch copied, so
    # every other grouping wrote the quintile column into the caller's frame.
    df = df.copy()

    df[quintile_col] = pd.qcut(
        df[group_col], 5,
        labels=["Q1 (Lowest)", "Q2", "Q3", "Q4", "Q5 (Highest)"],
    )

    def summarize(group):
        # Each row is weighted equally.
        return pd.Series({
            'count': group['tax_change'].count(),
            'mean_tax_change_pct': group['tax_change_pct'].mean(),
            'median_tax_change_pct': weighted_median(group['tax_change_pct'], np.ones(len(group))),
            'mean_value': group[value_col].mean()
        })

    summary = df.groupby(quintile_col).apply(summarize).reset_index()

    return summary

# Main execution
gdf_filtered, non_vacant_gdf = filter_data(df)
print(f"Number of rows in gdf_filtered: {len(gdf_filtered)}")
print(f"Number of rows in non_vacant_gdf: {len(non_vacant_gdf)}")

# Block-group summaries for both subsets (positive median_income only)
census_block_groups = calculate_block_group_summary(gdf_filtered)
non_vacant_block_summary = calculate_block_group_summary(non_vacant_gdf)

# Side-by-side scatter plots of mean tax change against each demographic column
for demographic_col, title_prefix, axis_label in (
    ('median_income',
     'Mean Tax Change vs. Median Income',
     'Median Income by Census Block Group ($)'),
    ('minority_pct',
     'Mean Tax Change vs. Minority Percentage',
     'Minority Population Percentage by Census Block Group'),
    ('black_pct',
     'Mean Tax Change vs. Black Percentage',
     'Black Population Percentage by Census Block Group'),
):
    plot_comparison(census_block_groups, non_vacant_block_summary,
                    demographic_col, 'mean_tax_change_pct',
                    title_prefix, axis_label)

# Correlations between demographics and tax change
correlations = calculate_correlations(census_block_groups, non_vacant_block_summary)
for name, coefficient in correlations.items():
    print(f"Correlation {name}: {coefficient:.4f}")

# Parcel-level quintile summaries by income and by minority share
income_quintile_summary = create_quintile_summary(gdf_filtered, 'median_income', 'median_income')
non_vacant_income_quintile_summary = create_quintile_summary(non_vacant_gdf, 'median_income', 'median_income')
minority_quintile_summary = create_quintile_summary(gdf_filtered, 'minority_pct', 'minority_pct')
non_vacant_minority_quintile_summary = create_quintile_summary(non_vacant_gdf, 'minority_pct', 'minority_pct')

for heading, table in (
    ("\nTax impact by income quintile (all properties):", income_quintile_summary),
    ("\nTax impact by income quintile (excluding vacant land):", non_vacant_income_quintile_summary),
    ("\nTax impact by minority percentage quintile (all properties):", minority_quintile_summary),
    ("\nTax impact by minority percentage quintile (excluding vacant land):", non_vacant_minority_quintile_summary),
):
    print(heading)
    display(table)


In [None]:
# Line charts of mean tax change by quintile, comparing all properties with
# the subset that excludes vacant land.
# Plot 1: median-income quintiles. Plot 2: minority-percentage quintiles.
# The plotted column is mean_tax_change_pct, so the y-axis is labelled in
# percent (not dollars).
mean_change_charts = (
    (income_quintile_summary, non_vacant_income_quintile_summary,
     'median_income_quintile', 'Median Income Quintile',
     'Mean Tax Change by Median Income Quintile (Census Block Groups)'),
    (minority_quintile_summary, non_vacant_minority_quintile_summary,
     'minority_pct_quintile', 'Minority Percentage Quintile',
     'Mean Tax Change by Minority Percentage Quintile (Census Block Groups)'),
)

for all_summary, non_vacant_summary, quintile_col, x_label, chart_title in mean_change_charts:
    plt.figure(figsize=(10, 6))
    for summary, series_label in (
        (all_summary, 'All Properties'),
        (non_vacant_summary, 'Excluding Vacant Land'),
    ):
        plt.plot(
            summary[quintile_col],
            summary['mean_tax_change_pct'],
            marker='o',
            label=series_label
        )
    plt.xlabel(x_label)
    plt.ylabel('Mean Tax Change (%)')
    plt.title(chart_title)
    plt.legend()

    # Draw a zero reference line only when the data straddles zero.
    lowest = min(
        all_summary['mean_tax_change_pct'].min(),
        non_vacant_summary['mean_tax_change_pct'].min()
    )
    highest = max(
        all_summary['mean_tax_change_pct'].max(),
        non_vacant_summary['mean_tax_change_pct'].max()
    )
    if lowest < 0 < highest:
        plt.axhline(0, color='black', linewidth=1, linestyle='dotted')
    plt.tight_layout()
    plt.show()


In [None]:
# Line charts of the median tax change by quintile for the subset that
# excludes vacant land: first by neighborhood median income, then by minority
# percentage. The plotted column is median_tax_change_pct, so the y-axis is
# labelled in percent (not dollars).
median_change_charts = (
    (non_vacant_income_quintile_summary, 'median_income_quintile',
     'Median Income Quintile',
     'Median Tax Change by Neighborhood Median Income Excluding Vacant Land'),
    (non_vacant_minority_quintile_summary, 'minority_pct_quintile',
     'Minority Percentage Quintile',
     'Median Tax Change by Minority Percentage Quintile Excluding Vacant Land'),
)

for summary, quintile_col, x_label, chart_title in median_change_charts:
    plt.figure(figsize=(10, 6))
    plt.plot(
        summary[quintile_col],
        summary['median_tax_change_pct'],
        marker='o',
        label='Excluding Vacant Land'
    )
    plt.xlabel(x_label)
    plt.ylabel('Median Tax Change (%)')
    plt.title(chart_title)

    lowest = summary['median_tax_change_pct'].min()
    highest = summary['median_tax_change_pct'].max()
    # Keep 0 on the y-axis: the lower limit never rises above 0, and the
    # upper limit is 0 for all-negative data or 5% above the maximum otherwise.
    plt.ylim(min(lowest, 0), max(0, 1.05 * highest))
    plt.axhline(0, color='black', linewidth=1, linestyle='dotted')
    plt.tight_layout()
    plt.show()


In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np

# Set a modern style
sns.set_theme(style="whitegrid", font_scale=1.15)

# Two "upside-down" bar charts of the median tax change (excluding vacant
# land): one per income quintile, one per minority-percentage quintile.
inverted_bar_charts = (
    (non_vacant_income_quintile_summary, 'median_income_quintile',
     'Median Tax Change by Neighborhood Median Income (Excl. Vacant Land)'),
    (non_vacant_minority_quintile_summary, 'minority_pct_quintile',
     'Median Tax Change by Minority Percentage Quintile (Excl. Vacant Land)'),
)

for summary, quintile_col, chart_title in inverted_bar_charts:
    fig, ax = plt.subplots(figsize=(10, 6))

    pct_changes = summary['median_tax_change_pct']
    quintile_labels = summary[quintile_col]

    # Rank the bars so the most negative change gets the darkest green.
    palette = sns.color_palette("Greens", n_colors=len(pct_changes))
    shade_rank = np.argsort(np.argsort(-pct_changes))
    bar_colors = [palette[i] for i in shade_rank]

    # Bars are drawn with their magnitude; the inverted y-axis makes them hang
    # from the top. NOTE: a positive change also hangs downward — only its
    # text label carries the sign.
    bars = ax.bar(
        quintile_labels,
        np.abs(pct_changes),
        color=bar_colors,
        edgecolor='black',
        width=0.7
    )
    ax.invert_yaxis()

    # Strip the y-axis, axis labels and every spine.
    ax.yaxis.set_visible(False)
    ax.set_ylabel("")
    ax.set_xlabel("")
    ax.set_title(chart_title, weight='bold', pad=30)
    sns.despine(left=True, right=True, top=True, bottom=True)

    # Signed percentage label centred inside each bar.
    for bar, pct in zip(bars, pct_changes):
        ax.annotate(
            f"{pct:.1f}%",
            xy=(bar.get_x() + bar.get_width() / 2, bar.get_height() / 2),
            xytext=(0, 0),
            textcoords="offset points",
            ha='center', va='center',
            fontsize=13, color='black', fontweight='bold'
        )

    # Quintile labels go along the top edge.
    ax.xaxis.set_ticks_position('top')
    ax.xaxis.set_label_position('top')
    plt.xticks(fontweight='bold')

    # Limits run from 110% of the tallest bar down to 0 (inverted axis).
    ax.set_ylim(np.abs(pct_changes).max() * 1.1, 0)

    plt.tight_layout()
    plt.show()


In [None]:
# Restrict df to only residential property categories
residential_categories = ["Single Family", "Small Multi-Family (2-4 units)"]
df_residential = df.loc[df['PROPERTY_CATEGORY'].isin(residential_categories)].copy()

# --- Repeat the block group summary and quintile analysis for residential only ---

# NOTE: neither residential category is 'Vacant Land', so the two subsets
# returned by this notebook's filter_data contain the same rows here.
gdf_residential_filtered, non_vacant_residential_gdf = filter_data(df_residential)

# Block-group summaries (positive median_income only, residential only)
census_block_groups_res = calculate_block_group_summary(gdf_residential_filtered)
non_vacant_block_summary_res = calculate_block_group_summary(non_vacant_residential_gdf)

# Side-by-side scatter plots of the median tax change against each demographic column
for demographic_col, title_prefix, axis_label in (
    ('median_income',
     'Median Tax Change vs. Median Income (Residential Only)',
     'Median Income by Census Block Group ($)'),
    ('minority_pct',
     'Median Tax Change vs. Minority Percentage (Residential Only)',
     'Minority Population Percentage by Census Block Group'),
    ('black_pct',
     'Median Tax Change vs. Black Percentage (Residential Only)',
     'Black Population Percentage by Census Block Group'),
):
    plot_comparison(
        census_block_groups_res, non_vacant_block_summary_res,
        demographic_col, 'median_tax_change_pct',
        title_prefix,
        axis_label
    )

# Correlations (positive median_income only, residential only)
correlations_res = calculate_correlations(census_block_groups_res, non_vacant_block_summary_res)
for name, coefficient in correlations_res.items():
    print(f"[Residential] Correlation {name}: {coefficient:.4f}")

# Parcel-level quintile summaries by income and by minority share (residential only)
income_quintile_summary_res = create_quintile_summary(gdf_residential_filtered, 'median_income', 'median_income')
non_vacant_income_quintile_summary_res = create_quintile_summary(non_vacant_residential_gdf, 'median_income', 'median_income')
minority_quintile_summary_res = create_quintile_summary(gdf_residential_filtered, 'minority_pct', 'minority_pct')
non_vacant_minority_quintile_summary_res = create_quintile_summary(non_vacant_residential_gdf, 'minority_pct', 'minority_pct')

for heading, table in (
    ("\n[Residential] Tax impact by income quintile (all properties):", income_quintile_summary_res),
    ("\n[Residential] Tax impact by income quintile (excluding vacant land):", non_vacant_income_quintile_summary_res),
    ("\n[Residential] Tax impact by minority percentage quintile (all properties):", minority_quintile_summary_res),
    ("\n[Residential] Tax impact by minority percentage quintile (excluding vacant land):", non_vacant_minority_quintile_summary_res),
):
    print(heading)
    display(table)


In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np

# Set a modern style
sns.set_theme(style="whitegrid", font_scale=1.15)

# Two "upside-down" bar charts of the median tax change for residential
# parcels (excluding vacant land): one per income quintile, one per
# minority-percentage quintile.
residential_bar_charts = (
    (non_vacant_income_quintile_summary_res, 'median_income_quintile',
     'Median Tax Change by Neighborhood Median Income (Excl. Vacant Land, Residential Only)'),
    (non_vacant_minority_quintile_summary_res, 'minority_pct_quintile',
     'Median Tax Change by Minority Percentage Quintile (Excl. Vacant Land, Residential Only)'),
)

for summary, quintile_col, chart_title in residential_bar_charts:
    fig, ax = plt.subplots(figsize=(10, 6))

    pct_changes = summary['median_tax_change_pct']
    quintile_labels = summary[quintile_col]

    # Rank the bars so the most negative change gets the darkest green.
    palette = sns.color_palette("Greens", n_colors=len(pct_changes))
    shade_rank = np.argsort(np.argsort(-pct_changes))
    bar_colors = [palette[i] for i in shade_rank]

    # Bars are drawn with their magnitude; the inverted y-axis makes them hang
    # from the top. NOTE: a positive change also hangs downward — only its
    # text label carries the sign.
    bars = ax.bar(
        quintile_labels,
        np.abs(pct_changes),
        color=bar_colors,
        edgecolor='black',
        width=0.7
    )
    ax.invert_yaxis()

    # Strip the y-axis, axis labels and every spine.
    ax.yaxis.set_visible(False)
    ax.set_ylabel("")
    ax.set_xlabel("")
    ax.set_title(chart_title, weight='bold', pad=30)
    sns.despine(left=True, right=True, top=True, bottom=True)

    # Signed percentage label centred inside each bar.
    for bar, pct in zip(bars, pct_changes):
        ax.annotate(
            f"{pct:.1f}%",
            xy=(bar.get_x() + bar.get_width() / 2, bar.get_height() / 2),
            xytext=(0, 0),
            textcoords="offset points",
            ha='center', va='center',
            fontsize=13, color='black', fontweight='bold'
        )

    # Quintile labels go along the top edge.
    ax.xaxis.set_ticks_position('top')
    ax.xaxis.set_label_position('top')
    plt.xticks(fontweight='bold')

    # Limits run from 110% of the tallest bar down to 0 (inverted axis).
    ax.set_ylim(np.abs(pct_changes).max() * 1.1, 0)

    plt.tight_layout()
    plt.show()


In [None]:
# Import our new policy analysis functions
import sys
sys.path.append('..')  # Add parent directory to path
from policy_analysis import (
    analyze_vacant_land, 
    analyze_parking_lots, 
    calculate_development_tax_penalty,
    print_vacant_land_summary,
    print_parking_analysis_summary, 
    print_development_penalty_summary
)


In [None]:
## Policy Analysis: Vacant Land Speculation

# Surface any column whose name hints at ownership information.
print("Columns that might contain owner information:")
owner_cols = [
    col for col in df.columns
    if any(token in col.lower() for token in ('owner', 'name'))
]
print(owner_cols)

# Keep only parcels whose full-exemption flag is 0, and report how many
# parcels carry a flag of 1.
exemption_flag = df['full_exmp']
df_non_exempt = df[exemption_flag == 0].copy()
print(f"Excluding {(exemption_flag == 1).sum():,} fully exempt properties from analysis")
print(f"Analysis dataset: {len(df_non_exempt):,} properties (down from {len(df):,})")

# Column mapping and identifier handed to the vacant-land analysis.
vacant_land_settings = {
    'land_value_col': 'land_value',
    'property_type_col': 'prop_use_desc',
    'neighborhood_col': 'nbhd_name',
    'vacant_identifier': 'Vacant Land',
    'improvement_value_col': 'improvement_value',
    'exemption_col': 'exmp_amt',
    'exemption_flag_col': 'full_exmp',
}
vacant_analysis = analyze_vacant_land(df=df_non_exempt, **vacant_land_settings)

# Print formatted summary
print_vacant_land_summary(vacant_analysis)


In [None]:
## Policy Analysis: Parking Lot Efficiency

# Column mapping and thresholds handed to the parking-lot analysis
# (run on the non-exempt parcels only).
parking_settings = {
    'land_value_col': 'land_value',
    'improvement_value_col': 'improvement_value',
    'property_type_col': 'prop_use_desc',
    'parking_identifier': 'Trans - Parking',
    'min_land_value_threshold': 50000,  # Focus on land worth $50k+
    'max_improvement_ratio': 0.1,  # Improvement value <= 10% of land value
    'exemption_col': 'exmp_amt',
    'exemption_flag_col': 'full_exmp',
}
parking_analysis = analyze_parking_lots(df=df_non_exempt, **parking_settings)

# Print formatted summary
print_parking_analysis_summary(parking_analysis)

# The tier breakdown is printed only when the analysis result includes it.
if 'by_land_value_tier' in parking_analysis:
    tier_breakdown = parking_analysis['by_land_value_tier']
    print("\nDetailed breakdown by land value tier:")
    print(tier_breakdown.to_string())


In [None]:
# Import new property analysis functions
from policy_analysis import (
    analyze_property_values_by_category, 
    print_property_values_summary
)


In [None]:
## Property Value Analysis by Category

# Column mapping for the per-category value analysis, including the
# exemption amount and full-exemption flag columns.
property_value_columns = {
    'category_col': 'PROPERTY_CATEGORY',
    'land_value_col': 'land_value',
    'improvement_value_col': 'improvement_value',
    'exemption_col': 'exmp_amt',
    'exemption_flag_col': 'full_exmp',
}
property_values = analyze_property_values_by_category(df=df, **property_value_columns)

# Print formatted summary
print_property_values_summary(property_values, "Spokane Property Values by Category")


In [None]:
## Policy Analysis: Development Tax Penalty

# Building tax rate expressed as a decimal fraction (1.1238 per thousand).
building_millage = 1.1238 / 1000

# Settings shared by every penalty calculation below; only the rate varies.
penalty_settings = {
    'df': df,
    'improvement_value_col': 'improvement_value',
    'years': 30,  # 30-year analysis horizon
    'discount_rate': 0.05,  # 5% discount rate
    'typical_construction_cost_per_sqft': 200,  # Spokane construction costs
    'typical_unit_size_sqft': 1200,  # Typical housing unit size
}

# Baseline: penalty at the building rate above.
penalty_analysis = calculate_development_tax_penalty(
    millage_rate=building_millage, **penalty_settings
)

# Print formatted summary
print_development_penalty_summary(penalty_analysis)

# Compare with LVT scenario (zero building tax)
banner = "=" * 60
print("\n" + banner)
print("COMPARISON: LVT ELIMINATES DEVELOPMENT PENALTY")
print(banner)

lvt_penalty_analysis = calculate_development_tax_penalty(
    millage_rate=0.0, **penalty_settings  # No building tax under pure LVT
)

current_lost_units = penalty_analysis['equivalent_lost_units']
lvt_lost_units = lvt_penalty_analysis['equivalent_lost_units']
print(f"Current system development penalty: {current_lost_units:,.0f} housing units")
print(f"LVT system development penalty: {lvt_lost_units:,.0f} housing units")
print(f"Net housing units enabled by LVT: {current_lost_units - lvt_lost_units:,.0f}")

# Show different scenarios with varying building tax rates
print("\n" + banner)
print("SCENARIO ANALYSIS: VARYING BUILDING TAX RATES")
print(banner)

tax_rates = [0.005, 0.01, 0.015, 0.02, 0.025]  # 0.5% to 2.5%
scenarios = []

for rate in tax_rates:
    outcome = calculate_development_tax_penalty(millage_rate=rate, **penalty_settings)
    scenarios.append({
        'Tax Rate (%)': rate * 100,
        'NPV as % of Construction': outcome['npv_as_pct_of_construction_cost'],
        'Equivalent Lost Units': outcome['equivalent_lost_units'],
        'Lost Units as % of Stock': outcome['units_lost_percentage']
    })

scenario_df = pd.DataFrame(scenarios)
print(scenario_df.round(1).to_string(index=False))


In [None]:
# Land value by improvement share categories (non-exempt basis)
from policy_analysis import analyze_land_by_improvement_share

# Column mapping shared by both runs below.
share_columns = {
    'land_value_col': 'land_value',
    'improvement_value_col': 'improvement_value',
    'exemption_col': 'exmp_amt',
    'exemption_flag_col': 'full_exmp',
}

# Run for all parcels
share_summary = analyze_land_by_improvement_share(df=df, **share_columns)

print(f"Adjusted total land value (non-exempt): ${share_summary['total_adjusted_land_value']:,.0f}")
for row in share_summary['categories']:
    print(
        f"- {row['category']}: {row['parcel_count']:,} parcels | "
        f"Adjusted land: ${row['adjusted_land_value']:,.0f} "
        f"({row['share_of_total_land_value_pct']:.1f}%)"
    )

# Second run: drop vacant land and parking lots, identified by PROPERTY_CATEGORY
exclude_categories = ["Vacant Land", "Transportation - Parking"]
is_excluded = df["PROPERTY_CATEGORY"].isin(exclude_categories)
df_non_vacant = df[~is_excluded]

share_summary_non_vacant = analyze_land_by_improvement_share(df=df_non_vacant, **share_columns)

print("\n(Excluding Vacant Land and Transportation - Parking)")
print(f"Adjusted total land value (non-exempt, non-vacant): ${share_summary_non_vacant['total_adjusted_land_value']:,.0f}")
for row in share_summary_non_vacant['categories']:
    print(
        f"- {row['category']}: {row['parcel_count']:,} parcels | "
        f"Adjusted land: ${row['adjusted_land_value']:,.0f} "
        f"({row['share_of_total_land_value_pct']:.1f}%)"
    )


In [None]:
# NOTE(review): `output_summary` is not assigned in the nearby cells — confirm
# it is created earlier in the notebook, otherwise this raises NameError.
print(output_summary)

In [None]:
import pandas as pd

# Parcel area in square feet.
# NOTE(review): the 10.7639 factor assumes Shape__Area / geometry.area are in
# square meters. If the layer's CRS is in feet the areas are already square
# feet and this factor overstates them — confirm the source units.
SQM_TO_SQFT = 10.7639
if 'Shape__Area' in df.columns:
    df['sqft'] = df['Shape__Area'] * SQM_TO_SQFT
else:
    # Fall back to the geometry's own area when the attribute is missing.
    df['sqft'] = df.geometry.area * SQM_TO_SQFT

# Zero-area parcels become NaN so the per-sqft ratios are NaN rather than inf.
# A float NaN keeps the column numeric; replacing with pd.NA can upcast it to
# object dtype, which then propagates to every derived per-sqft column.
df['sqft'] = df['sqft'].replace(0, float('nan'))

# Calculate improvement value (assessed minus land, clipped at zero)
df['improvement_value'] = (df['assessed_amt'] - df['land_value']).clip(lower=0)

# Per-square-foot version of each value and tax column.
for source_col in (
    'land_value', 'assessed_amt', 'improvement_value',
    'tax_change', 'new_tax', 'current_tax',
):
    df[f'{source_col}_per_sqft'] = df[source_col] / df['sqft']

# Columns written to the restricted export (geometry included).
columns_to_save = [
    'tax_change_pct', 'tax_change', 'new_tax', 'current_tax', 'PID_NUM',
    'PROPERTY_CATEGORY', 'nbhd_code', 'prop_use_desc', 'site_address',
    'assessed_amt', 'land_value', 'improvement_value',  'geometry',
    'sqft', 'land_value_per_sqft', 'assessed_amt_per_sqft', 'improvement_value_per_sqft',
    'tax_change_per_sqft', 'new_tax_per_sqft', 'current_tax_per_sqft'
]

# Save the DataFrame with geometry as a parquet file (restricted columns)
df[columns_to_save].to_parquet('spokane_full_geom.parquet', index=False)

# Also save the full DataFrame with geometry as a parquet file (all columns)
df.to_parquet('spokane_full_geom_fullcols.parquet', index=False)


In [None]:
# Report whether every geometry is a Polygon or MultiPolygon. Rows with any
# other geometry type are dropped (they are not converted).
polygon_mask = df.geometry.geom_type.isin(['Polygon', 'MultiPolygon'])
is_all_polygons = polygon_mask.all()
print(f"All geometries are Polygon or MultiPolygon: {is_all_polygons}")
if not is_all_polygons:
    df = df[polygon_mask].copy()


In [None]:

# Compare tax impacts on "vacant or underdeveloped" parcels with all others.
#
# Vacant: PROPERTY_CATEGORY contains "vacant" (case-insensitive).
# Underdeveloped: improvement_value is less than
# UNDERDEVELOPED_MAX_IMPROVEMENT_SHARE of assessed_amt.
# NOTE(review): the cutoff applied here is 0.75, whereas
# categorize_property_refined in the export cell uses 0.5 of
# (land + improvement). No exemption filter is applied in this cell.
# Confirm which definition is intended.
UNDERDEVELOPED_MAX_IMPROVEMENT_SHARE = 0.75

vacant_mask = df['PROPERTY_CATEGORY'].str.lower().str.contains('vacant', na=False)
improvement_share = df['improvement_value'] / df['assessed_amt']
underdeveloped_mask = (improvement_share < UNDERDEVELOPED_MAX_IMPROVEMENT_SHARE)

# Report the number of parcels that are vacant and number that are underdeveloped
n_vacant = vacant_mask.sum()
n_underdeveloped = underdeveloped_mask.sum()
print(f"Number of vacant parcels: {n_vacant}")
print(f"Number of underdeveloped parcels: {n_underdeveloped}")

# Target group: vacant OR underdeveloped; everything else is the comparison group.
target_mask = vacant_mask | underdeveloped_mask
target_df = df[target_mask]
other_df = df[~target_mask]

for heading, parcels in (
    ("Vacant or Underdeveloped Parcels:", target_df),
    ("\nAll Other Parcels:", other_df),
):
    n_parcels = len(parcels)
    n_increase = (parcels['tax_change'] > 0).sum()
    n_decrease = (parcels['tax_change'] < 0).sum()
    # Shares are NaN for an empty group instead of dividing by zero.
    pct_increase = 100 * n_increase / n_parcels if n_parcels > 0 else float('nan')
    pct_decrease = 100 * n_decrease / n_parcels if n_parcels > 0 else float('nan')

    print(heading)
    print(f"  Total Tax Change ($): {parcels['tax_change'].sum():,.2f}")
    print(f"  Median Tax Change (%): {parcels['tax_change_pct'].median():.2f}")
    print(f"  Parcels with Tax Increase: {pct_increase:.1f}%")
    print(f"  Parcels with Tax Decrease: {pct_decrease:.1f}%")


In [None]:
# Show the full rows for two specific parcels, without row truncation.
parcel_ids = ['35081.2213', '35081.2312']
with pd.option_context('display.max_rows', None):
    display(df.loc[df['PID_NUM'].isin(parcel_ids)])



In [None]:

# --- Export spokane.parquet with same columns as Syracuse ---
import os
import numpy as np

# Work on a copy so the export-only columns below are not added to df itself
export_gdf = df.copy()

# Binary exemption flag: 1 where full_exmp equals True, 0 otherwise
export_gdf['exemption_flag'] = export_gdf['full_exmp'].eq(True).astype(int)

# Expose the existing PROPERTY_CATEGORY values under the export column name
export_gdf['property_land_use_category'] = export_gdf['PROPERTY_CATEGORY']

# Create refined property/land use category with three options: Vacant, Parking Lot, Underdeveloped
def categorize_property_refined(row):
    """Return the refined land-use label for a single parcel row.

    The checks run in priority order:
      1. 'Vacant'         - PROPERTY_CATEGORY (as text) contains "Vacant"
      2. 'Parking Lot'    - PROPERTY_CATEGORY (as text) contains "Parking"
      3. 'Underdeveloped' - improvement_value is less than half of
                            land_value + improvement_value
      4. None             - everything else

    `row` may be any mapping-like object (e.g. a DataFrame row) exposing the
    keys 'PROPERTY_CATEGORY', 'improvement_value' and 'land_value'.
    """
    category_text = str(row['PROPERTY_CATEGORY'])
    if 'Vacant' in category_text:
        return 'Vacant'
    if 'Parking' in category_text:
        return 'Parking Lot'

    improvement = row['improvement_value']
    total_value = row['land_value'] + improvement
    # Strict comparison: a parcel whose improvements are exactly half of the total stays unlabeled
    return 'Underdeveloped' if improvement < 0.5 * total_value else None

# Label every parcel as Vacant / Parking Lot / Underdeveloped / None
export_gdf['property_land_use_refined'] = export_gdf.apply(categorize_property_refined, axis=1)

# Parcel area in square feet.
# NOTE(review): both branches multiply by 10.7639 (square meters -> square feet), i.e. they
# assume the source area is expressed in square meters. Confirm the units of Shape__Area and
# of the layer CRS; if either is already in feet (or the CRS is geographic) this is wrong.
if 'Shape__Area' in export_gdf.columns:
    export_gdf['area_sqft'] = export_gdf['Shape__Area'] * 10.7639
else:
    # Fallback: derive the area from the geometry (units follow the CRS of the geometry)
    export_gdf['area_sqft'] = export_gdf.geometry.area * 10.7639

# Per-square-foot versions of current tax, land value and improvement value.
# Parcels without a positive area get 0 instead of a division result.
has_area = export_gdf['area_sqft'] > 0
for value_col in ('current_tax', 'land_value', 'improvement_value'):
    export_gdf[f'{value_col}_per_sqft'] = np.where(
        has_area,
        export_gdf[value_col] / export_gdf['area_sqft'],
        0
    )

# Select columns for export, matching Syracuse structure
columns_to_export = [
    'geometry',
    'exemption_flag',
    'property_land_use_category',
    'property_land_use_refined',
    'current_tax',
    'current_tax_per_sqft',
    'land_value',
    'land_value_per_sqft',
    'improvement_value',
    'improvement_value_per_sqft'
]

# Rename columns to match Syracuse naming convention
export_final = export_gdf[columns_to_export].rename(columns={
    'land_value': 'current_full_land_value'
})

# Repair invalid geometries with a zero-width buffer; missing geometries are left as None
export_final['geometry'] = export_final['geometry'].apply(lambda geom: geom if geom is None or geom.is_valid else geom.buffer(0))

# Ensure output is in WGS84 (EPSG:4326) before saving.
# to_crs() cannot reproject geometries that carry no CRS, so fail early with an
# actionable message instead of sending that case into to_crs().
if export_final.crs is None:
    raise ValueError(
        "export_final has no CRS; set the source CRS on df (e.g. df = df.set_crs(...)) "
        "before exporting so it can be converted to EPSG:4326"
    )
if export_final.crs.to_epsg() != 4326:
    export_final = export_final.to_crs("EPSG:4326")
    print("Converted to EPSG:4326")

# Save as Parquet
output_filename = os.path.expanduser("~/Downloads/spokane.parquet")
export_final.to_parquet(output_filename, index=False)

print("\n✅ Saved spokane.parquet to Downloads")
print("Saved columns:", export_final.columns.tolist())
print("Property refined category counts:")
print(export_final['property_land_use_refined'].value_counts(dropna=False))
print("Property category counts:")
print(export_final['property_land_use_category'].value_counts().head(10))

# Display first few rows
print("\n👀 First 5 rows of exported data:")
display(export_final.head())

In [None]:
import requests
from bs4 import BeautifulSoup

def load_owner_info(pid, timeout=30):
    """
    Loads the owner information for a given PID from the Spokane County property information site.

    Args:
        pid: Parcel ID inserted into the Summary.aspx query string (e.g. "35182.4601").
        timeout: Seconds to wait for the server before giving up. Without a timeout
            a single stalled connection would block the caller (or a scraping
            worker thread) indefinitely.

    Returns:
        A dictionary with keys: owner_name, owner_address. Either value is None
        when the corresponding span is not present in the owner table.

    Raises:
        requests.exceptions.RequestException: on connection errors, timeouts, or a
            non-success HTTP status (via raise_for_status).
        ValueError: if the owner info table is not found in the page.
    """
    url = f"https://cp.spokanecounty.org/SCOUT/propertyinformation/Summary.aspx?PID={pid}"
    resp = requests.get(url, timeout=timeout)
    resp.raise_for_status()
    soup = BeautifulSoup(resp.text, "html.parser")
    table = soup.find("table", id="MainContent_OwnerName_dlOwner")
    if table is None:
        raise ValueError("Could not find owner info table in the page.")

    # Only the first owner entry (element ids ending in "_0") is read
    owner_name_span = table.find("span", id="MainContent_OwnerName_dlOwner_txtNameLabel_0")
    owner_address_span = table.find("span", id="MainContent_OwnerName_dlOwner_addressLabel_0")

    owner_name = owner_name_span.get_text(strip=True) if owner_name_span else None
    owner_address = owner_address_span.get_text(strip=True) if owner_address_span else None

    return {
        "owner_name": owner_name,
        "owner_address": owner_address
    }

# Smoke-test the function with a single hard-coded PID.
# Note: this performs a live HTTP request every time the cell is run.
test_pid = "35182.4601"
owner_info = load_owner_info(test_pid)
print(owner_info)


In [None]:
if scrape_data == 1:
    import pandas as pd
    from concurrent.futures import ThreadPoolExecutor, as_completed
    from tqdm import tqdm
    import os
    import time

    # Assume df is already loaded in the notebook and contains a 'PID_NUM' column.
    # Scrape each distinct, non-null parcel ID exactly once.
    pids = df['PID_NUM'].dropna().unique().tolist()

    results = []
    output_path = os.path.join("data", "spokane", "owner_info.csv")
    # Create the output directory up front: discovering that it is missing only when
    # to_csv() runs would fail after the entire scrape has already been done.
    os.makedirs(os.path.dirname(output_path), exist_ok=True)

    def safe_load_owner_info(pid):
        """Fetch owner info for one PID; never raises.

        On failure the row keeps the pid, sets both owner fields to None and
        records the exception text under "error", so one bad parcel cannot
        abort the whole run.
        """
        # Pause inside the worker so the delay actually spaces out requests.
        # (Sleeping in the result-collection loop does not throttle anything,
        # because every task has already been submitted to the pool by then.)
        time.sleep(0.01)
        try:
            info = load_owner_info(pid)
            return {"pid": pid, "owner_name": info["owner_name"], "owner_address": info["owner_address"]}
        except Exception as e:
            return {"pid": pid, "owner_name": None, "owner_address": None, "error": str(e)}

    # The work is network-bound, so use more threads than cores (capped at 32)
    max_workers = min(32, (os.cpu_count() or 1) * 5)

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_pid = {executor.submit(safe_load_owner_info, pid): pid for pid in pids}
        # Collect results in completion order; tqdm shows progress over all submitted tasks
        for future in tqdm(as_completed(future_to_pid), total=len(future_to_pid), desc="Fetching owner info"):
            results.append(future.result())

    owner_info_df = pd.DataFrame(results)
    owner_info_df.to_csv(output_path, index=False)
    print(f"Saved owner info for {len(owner_info_df)} parcels to {output_path}")


In [None]:
# If you have run the owner info scraping above, load the resulting CSV and join it to your main dataframe.
import pandas as pd
import os

owner_info_path = os.path.join("data", "spokane", "owner_info.csv")
if os.path.exists(owner_info_path):
    # Read 'pid' as text. Parcel IDs such as "25011.1700" look numeric, so letting
    # pandas infer the dtype would parse them as floats, drop trailing zeros
    # ("25011.17") and silently leave those parcels unmatched in the join below.
    owner_info_df = pd.read_csv(owner_info_path, dtype={'pid': str})
    # If 'df' is your main dataframe, join on PID_NUM (or 'pid' if that's the column in owner_info_df)
    # Make sure both columns are string type for a clean join
    df['PID_NUM'] = df['PID_NUM'].astype(str)
    owner_info_df['pid'] = owner_info_df['pid'].astype(str)
    # Left join keeps every parcel in df, including those without scraped owner info
    df = df.merge(owner_info_df, left_on='PID_NUM', right_on='pid', how='left')
    print("Owner info joined. Example rows:")
    display(df[['PID_NUM', 'owner_name', 'owner_address']].head())
else:
    print(f"Owner info file not found at {owner_info_path}. Run the scraping cell above first.")


In [None]:
# Count the scraped rows that are missing an owner name / owner address
_missing_counts = owner_info_df[['owner_name', 'owner_address']].isna().sum()
num_missing_owner_name = _missing_counts['owner_name']
num_missing_owner_address = _missing_counts['owner_address']
print(f"Number of missing owner_name: {num_missing_owner_name}")
print(f"Number of missing owner_address: {num_missing_owner_address}")


In [None]:
# For most addresses (3 commas), split into: street, city, state, zip
def split_address(row):
    """Split row['owner_address'] into street, city, state and zip.

    Supported layouts (pieces are separated by commas and stripped of
    surrounding whitespace):
      * 3 commas: "street, city, state, zip"
      * 4 commas: "street part 1, street part 2, city, state, zip"; the first
        two pieces are re-joined with ", " to form the street.

    Any other comma count (including a missing address, whose text form has no
    commas) yields None for all four fields.

    Args:
        row: Mapping-like object (e.g. a DataFrame row) with an
            'owner_address' entry.

    Returns:
        pd.Series indexed by 'street', 'city', 'state', 'zip'.
    """
    parts = [piece.strip() for piece in str(row['owner_address']).split(',')]

    if len(parts) == 4:
        street, city, state, zip_code = parts
    elif len(parts) == 5:
        # Extra comma inside the street address: fold the first two pieces back together
        street = parts[0] + ', ' + parts[1]
        city, state, zip_code = parts[2:]
    else:
        street = city = state = zip_code = None

    return pd.Series({'street': street, 'city': city, 'state': state, 'zip': zip_code})

# Apply the function to all rows
address_split = owner_info_df.apply(split_address, axis=1)
# Drop address columns left over from a previous run of this cell before attaching the
# new ones; otherwise re-running would create duplicate 'street'/'city'/'state'/'zip'
# columns and owner_info_df['state'] would no longer be a single column.
owner_info_df = pd.concat(
    [owner_info_df.drop(columns=['street', 'city', 'state', 'zip'], errors='ignore'), address_split],
    axis=1
)

# Print the count of states (missing values included)
print("State value counts:")
print(owner_info_df['state'].value_counts(dropna=False))





In [None]:
# Fixed Chart: Calculate actual percentage values for the income quintile chart

# First, let's calculate the actual median percentage change for each quintile
def calculate_median_percentage_by_quintile(df, group_col, value_col):
    """Summarize tax changes by quintile of `group_col`.

    Note: this definition replaces the function of the same name imported from
    census_utils at the top of the notebook.

    Args:
        df: DataFrame with 'tax_change', 'tax_change_pct', `group_col` and
            `value_col` columns. It is never modified; all work happens on a copy.
        group_col: Column whose values are cut into five equal-frequency bins.
            When it is 'median_income', rows with non-positive income are
            excluded first.
        value_col: Column averaged per quintile and reported as 'mean_value'.

    Returns:
        DataFrame with one row per quintile and the columns
        '<group_col>_quintile', 'count', 'mean_tax_change',
        'median_tax_change', 'median_tax_change_pct' and 'mean_value'.
    """
    # Always work on a copy so the quintile column is never written into the
    # caller's dataframe (previously only the income path made a copy).
    if group_col == 'median_income':
        df = df[df['median_income'] > 0].copy()
    else:
        df = df.copy()

    quintile_col = f'{group_col}_quintile'
    df[quintile_col] = pd.qcut(
        df[group_col], 5,
        labels=["Q1 (Lowest)", "Q2", "Q3", "Q4", "Q5 (Highest)"]
    )

    # observed=False is spelled out to keep reporting every quintile label and to
    # avoid depending on the changing pandas default for categorical groupers.
    summary = df.groupby(quintile_col, observed=False).agg(
        count=('tax_change', 'count'),
        mean_tax_change=('tax_change', 'mean'),
        median_tax_change=('tax_change', 'median'),
        median_tax_change_pct=('tax_change_pct', 'median'),  # median of the percentage change
        mean_value=(value_col, 'mean')
    ).reset_index()

    return summary

# Recalculate the income quintile summary with percentage data
# (non_vacant_gdf is defined earlier in the notebook)
non_vacant_income_quintile_summary_pct = calculate_median_percentage_by_quintile(
    non_vacant_gdf, 'median_income', 'median_income'
)

print("Corrected Income Quintile Summary (with percentages):")
display(non_vacant_income_quintile_summary_pct)

# CORRECTED Chart: Median Tax Change PERCENTAGE by Income Quintile
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np

# Set a modern style
sns.set_theme(style="whitegrid", font_scale=1.15)

# Corrected Bar Graph: Median Tax Change PERCENTAGE by Neighborhood Median Income Excluding Vacant Land
fig, ax = plt.subplots(figsize=(10, 6))

# Use the ACTUAL PERCENTAGE values, not dollar amounts with % signs
vals = non_vacant_income_quintile_summary_pct['median_tax_change_pct']
labels = non_vacant_income_quintile_summary_pct['median_income_quintile']

# Color mapping: dark green (more negative) to light green (less negative)
colors = sns.color_palette("Greens", n_colors=len(vals))
# Sort so that the most negative (largest magnitude) is darkest:
# the double argsort turns -vals into ranks, so each bar picks the palette entry at its rank
color_map = [colors[i] for i in np.argsort(np.argsort(-vals))]

# To make bars start at the top and go down, invert the y-axis and plot positive heights
# (bar heights are absolute values, so the sign of each value is only visible in its text label)
bars = ax.bar(
    labels,
    np.abs(vals),
    color=color_map,
    edgecolor='black',
    width=0.7
)

# Invert the y-axis so bars start at the top and go down
ax.invert_yaxis()

# Remove y-axis
ax.yaxis.set_visible(False)
ax.set_ylabel("")
ax.set_xlabel("")
ax.set_title('CORRECTED: Median Tax Change PERCENTAGE by Income Quintile (Excl. Vacant Land)', 
             weight='bold', pad=30)

# Remove all spines (including bottom)
sns.despine(left=True, right=True, top=True, bottom=True)

# Add value labels (bold, % sign) centered inside each bar - NOW WITH ACTUAL PERCENTAGES
# The label text uses the signed value; its position uses the (absolute) bar height
for bar, val in zip(bars, vals):
    ax.annotate(
        f"{val:.1f}%",
        xy=(bar.get_x() + bar.get_width() / 2, bar.get_height() / 2),
        xytext=(0, 0),
        textcoords="offset points",
        ha='center', va='center',
        fontsize=13, color='black', fontweight='bold'
    )

# Move x-tick labels to the top
ax.xaxis.set_ticks_position('top')
ax.xaxis.set_label_position('top')
plt.xticks(fontweight='bold')

# Set y-limits to show bars going down from the top
# (limits are passed as (max, 0), which keeps the axis inverted; 10% headroom below the longest bar)
ymax = np.abs(vals).max() * 1.1
ax.set_ylim(ymax, 0)

plt.tight_layout()
plt.show()

# Explain why the chart was redone. The leading "\n" is a real newline escape so a blank
# line separates this text from the chart output (it was previously written as "\\n",
# which printed a literal backslash-n).
print("\nExplanation of the fix:")
print("- Previous chart showed DOLLAR amounts ($) with percentage signs (%) - mathematically impossible")
print("- This corrected chart shows actual PERCENTAGE changes")
print("- Values like -109% would mean impossible negative taxes; real percentages are much smaller")
print("- The tax change percentages are calculated as: (new_tax - current_tax) / current_tax * 100")


In [None]:

# Tally how many owner addresses fall in each city (missing cities included)
_city_counts = owner_info_df['city'].value_counts(dropna=False)
print("\nUnique cities:")
print(_city_counts)

In [None]:
# List the columns currently present on owner_info_df
print(owner_info_df.columns)
