# Q1 Analysis Builder
Creates a gold table at `data/gold/zillow/q1_analysis.parquet` with columns:
`[Date, Region, RegionType, MedianSalePrice, MedianListingPrice, SaleToListRatio, PctSoldAboveList, PctSoldBelowList, DaysOnMarket]`.

In [13]:
from pathlib import Path
import polars as pl

def log(msg: str) -> None:
    print(f'[Q1] {msg}')

def find_repo_root(start: Path | None = None) -> Path:
    start = (start or Path.cwd()).resolve()
    for p in (start, *start.parents):
        if (p / 'ETL').is_dir() and (p / 'scripts').is_dir():
            return p
    return start  # fallback

ROOT = find_repo_root()
SILVER = ROOT / 'data' / 'silver' / 'zillow'
GOLD = ROOT / 'data' / 'gold' / 'zillow'

sales_path = SILVER / 'sales' / 'wide.parquet'
listings_path = SILVER / 'for_sale_listings' / 'wide.parquet'
dom_path = SILVER / 'days_on_market' / 'wide.parquet'

for p in (sales_path, listings_path, dom_path):
    if not p.exists():
        raise FileNotFoundError(f'Missing required silver file: {p}')

log('Input silver files located.')

[Q1] Input silver files located.


In [14]:
# Inspect schemas and confirm required columns exist
sales_cols = list(pl.read_parquet(str(sales_path), n_rows=0).schema.keys())
listings_cols = list(pl.read_parquet(str(listings_path), n_rows=0).schema.keys())
dom_cols = list(pl.read_parquet(str(dom_path), n_rows=0).schema.keys())

print('sales columns:', sales_cols)
print('listings columns:', listings_cols)
print('dom columns:', dom_cols)

needed_sales = ['State', 'date', 'Median Sale Price', 'Median Sale to List Ratio', '% Sold Above List', '% Sold Below List']
needed_listings = ['State', 'date', 'Median Listing Price']
needed_dom = ['State', 'date', 'Median Days on Pending']

missing = {
    'sales': [c for c in needed_sales if c not in sales_cols],
    'listings': [c for c in needed_listings if c not in listings_cols],
    'days_on_market': [c for c in needed_dom if c not in dom_cols],
}
missing


sales columns: ['Region ID', 'Size Rank', 'Region', 'Region Type', 'State', 'Home Type', 'Date', 'Mean Sale to List Ratio (Smoothed)', 'Median Sale to List Ratio', 'Median Sale Price', 'Median Sale Price (Smoothed) (Seasonally Adjusted)', 'Median Sale Price (Smoothed)', 'Median Sale to List Ratio (Smoothed)', '% Sold Below List', '% Sold Below List (Smoothed)', '% Sold Above List', '% Sold Above List (Smoothed)', 'Mean Sale to List Ratio', 'date', 'Region Type Name', 'Home Type Name', 'year', 'month', 'quarter']
listings columns: ['Region ID', 'Size Rank', 'Region', 'Region Type', 'State', 'Home Type', 'Date', 'Median Listing Price', 'Median Listing Price (Smoothed)', 'New Listings', 'New Listings (Smoothed)', 'New Pending (Smoothed)', 'New Pending', 'date', 'Region Type Name', 'Home Type Name', 'year', 'month', 'quarter']
dom columns: ['Region ID', 'Size Rank', 'Region', 'Region Type', 'State', 'Home Type', 'Date', 'Mean Listings Price Cut Amount (Smoothed)', 'Percent Listings Price C

{'sales': [], 'listings': [], 'days_on_market': []}

In [15]:
# Build per-dataset state-level tables and write debug outputs
DEBUG_DIR = GOLD / 'debug'
DEBUG_DIR.mkdir(parents=True, exist_ok=True)

# Normalize date to Date for key consistency
to_date = pl.col('date').cast(pl.Date).alias('Date')

df_sales_state = (
    pl.scan_parquet(str(sales_path))
    .select(
        'State',
        'date',
        pl.col('Median Sale Price').alias('MedianSalePrice'),
        pl.col('Median Sale to List Ratio').alias('SaleToListRatio'),
        pl.col('% Sold Above List').alias('PctSoldAboveList'),
        pl.col('% Sold Below List').alias('PctSoldBelowList'),
    )
    .with_columns(to_date)
    .group_by(['State', 'Date'])
    .agg([
        pl.col('MedianSalePrice').median().alias('MedianSalePrice'),
        pl.col('SaleToListRatio').median().alias('SaleToListRatio'),
        pl.col('PctSoldAboveList').mean().alias('PctSoldAboveList'),
        pl.col('PctSoldBelowList').mean().alias('PctSoldBelowList'),
    ])
).collect()
df_sales_state.write_parquet(str(DEBUG_DIR / 'sales_state.parquet'))
df_sales_state.shape, df_sales_state.head()


((34776, 6),
 shape: (5, 6)
 ┌───────┬────────────┬─────────────────┬─────────────────┬──────────────────┬──────────────────┐
 │ State ┆ Date       ┆ MedianSalePrice ┆ SaleToListRatio ┆ PctSoldAboveList ┆ PctSoldBelowList │
 │ ---   ┆ ---        ┆ ---             ┆ ---             ┆ ---              ┆ ---              │
 │ str   ┆ date       ┆ f32             ┆ f32             ┆ f32              ┆ f32              │
 ╞═══════╪════════════╪═════════════════╪═════════════════╪══════════════════╪══════════════════╡
 │ IA    ┆ 2010-11-13 ┆ 118500.0        ┆ null            ┆ null             ┆ null             │
 │ NH    ┆ 2023-09-09 ┆ null            ┆ null            ┆ null             ┆ null             │
 │ DE    ┆ 2015-03-14 ┆ 161137.5        ┆ null            ┆ null             ┆ null             │
 │ CT    ┆ 2011-05-07 ┆ 217625.0        ┆ null            ┆ null             ┆ null             │
 │ PA    ┆ 2018-11-03 ┆ 166150.0        ┆ 0.982659        ┆ 0.173631         ┆ 0.598239   

In [16]:
df_listings_state = (
    pl.scan_parquet(str(listings_path))
    .select(
        'State',
        'date',
        pl.col('Median Listing Price').alias('MedianListingPrice'),
    )
    .with_columns(to_date)
    .group_by(['State', 'Date'])
    .agg(pl.col('MedianListingPrice').median().alias('MedianListingPrice'))
).collect()
df_listings_state.write_parquet(str(DEBUG_DIR / 'listings_state.parquet'))
df_listings_state.shape, df_listings_state.head()


((16012, 3),
 shape: (5, 3)
 ┌───────┬────────────┬────────────────────┐
 │ State ┆ Date       ┆ MedianListingPrice │
 │ ---   ┆ ---        ┆ ---                │
 │ str   ┆ date       ┆ f32                │
 ╞═══════╪════════════╪════════════════════╡
 │ NC    ┆ 2018-02-03 ┆ 199900.0           │
 │ LA    ┆ 2019-11-23 ┆ 198250.0           │
 │ PA    ┆ 2018-07-21 ┆ 163950.0           │
 │ WI    ┆ 2019-07-27 ┆ 219000.0           │
 │ SC    ┆ 2020-08-22 ┆ 248281.0           │
 └───────┴────────────┴────────────────────┘)

In [17]:
df_dom_state = (
    pl.scan_parquet(str(dom_path))
    .select(
        'State',
        'date',
        pl.col('Median Days on Pending').alias('DaysOnMarket'),
    )
    .with_columns(to_date)
    .group_by(['State', 'Date'])
    .agg(pl.col('DaysOnMarket').median().alias('DaysOnMarket'))
).collect()
df_dom_state.write_parquet(str(DEBUG_DIR / 'dom_state.parquet'))
df_dom_state.shape, df_dom_state.head()


((16217, 3),
 shape: (5, 3)
 ┌───────┬────────────┬──────────────┐
 │ State ┆ Date       ┆ DaysOnMarket │
 │ ---   ┆ ---        ┆ ---          │
 │ str   ┆ date       ┆ f32          │
 ╞═══════╪════════════╪══════════════╡
 │ HI    ┆ 2021-08-28 ┆ 8.0          │
 │ NE    ┆ 2023-07-08 ┆ 7.5          │
 │ NC    ┆ 2023-05-13 ┆ 5.5          │
 │ VT    ┆ 2023-09-02 ┆ null         │
 │ UT    ┆ 2022-08-20 ┆ 25.0         │
 └───────┴────────────┴──────────────┘)

In [18]:
# Merge the debug tables (inner join on State+Date)
lf = (
    df_sales_state.lazy()
    .join(df_listings_state.lazy(), on=['State', 'Date'], how='inner')
    .join(df_dom_state.lazy(), on=['State', 'Date'], how='inner')
    .with_columns([
        pl.col('State').alias('Region'),
        pl.lit('State').alias('RegionType'),
    ])
    .select([
        'Date',
        'Region',
        'RegionType',
        'MedianSalePrice',
        'MedianListingPrice',
        'SaleToListRatio',
        'PctSoldAboveList',
        'PctSoldBelowList',
        'DaysOnMarket',
    ])
    .sort(['Date', 'Region'])
)
df = lf.collect()
df.shape, df.head()

((12710, 9),
 shape: (5, 9)
 ┌────────────┬────────┬────────────┬───────────┬───┬───────────┬───────────┬───────────┬───────────┐
 │ Date       ┆ Region ┆ RegionType ┆ MedianSal ┆ … ┆ SaleToLis ┆ PctSoldAb ┆ PctSoldBe ┆ DaysOnMar │
 │ ---        ┆ ---    ┆ ---        ┆ ePrice    ┆   ┆ tRatio    ┆ oveList   ┆ lowList   ┆ ket       │
 │ date       ┆ str    ┆ str        ┆ ---       ┆   ┆ ---       ┆ ---       ┆ ---       ┆ ---       │
 │            ┆        ┆            ┆ f32       ┆   ┆ f32       ┆ f32       ┆ f32       ┆ f32       │
 ╞════════════╪════════╪════════════╪═══════════╪═══╪═══════════╪═══════════╪═══════════╪═══════════╡
 │ 2018-01-06 ┆ AL     ┆ State      ┆ 185419.0  ┆ … ┆ 0.973613  ┆ 0.096774  ┆ 0.725806  ┆ 77.0      │
 │ 2018-01-06 ┆ AR     ┆ State      ┆ 152130.0  ┆ … ┆ null      ┆ null      ┆ null      ┆ null      │
 │ 2018-01-06 ┆ AZ     ┆ State      ┆ 240100.0  ┆ … ┆ 0.980786  ┆ 0.161491  ┆ 0.668097  ┆ 41.0      │
 │ 2018-01-06 ┆ CA     ┆ State      ┆ 530500.0  ┆ … ┆ 

In [19]:
# Write to gold layer
GOLD.mkdir(parents=True, exist_ok=True)
out_path = GOLD / 'q1_analysis.parquet'
df.write_parquet(str(out_path))
log(f'Wrote {out_path} with {df.height} rows and {len(df.columns)} columns.')
out_path.as_posix(), df.shape

[Q1] Wrote D:\code\SEG\data\gold\zillow\q1_analysis.parquet with 12710 rows and 9 columns.


('D:/code/SEG/data/gold/zillow/q1_analysis.parquet', (12710, 9))

In [24]:
metrics_lf = (
    df.lazy()
    .with_columns(
        (
            (pl.col('MedianSalePrice') - pl.col('MedianListingPrice')) 
            / pl.col('MedianListingPrice') * 100
        ).alias('SaleListGapPct')
    )
    .group_by('Region')
    .agg([
        pl.col('SaleListGapPct').std().alias('Volatility_SaleListGapPct'),
        pl.col('SaleListGapPct').mean().alias('Avg_SaleListGapPct'),
        pl.col('PctSoldAboveList').mean().alias('Avg_PctSoldAboveList'),
        pl.col('DaysOnMarket').mean().alias('Avg_DaysOnMarket'),
        pl.len().alias('Observations'),
    ])
    .with_columns(
        pl.lit('State').alias('RegionType')
    )
    .select([
        'Region',
        'RegionType',
        'Avg_SaleListGapPct',
        'Volatility_SaleListGapPct',
        'Avg_PctSoldAboveList',
        'Avg_DaysOnMarket',
        'Observations'
    ])
    .sort('Region')
)

metrics_df = metrics_lf.collect()

# Write metrics to gold
metrics_path = GOLD / 'demand_consistency.parquet'
metrics_df.write_parquet(str(metrics_path))

log(f'Wrote {metrics_path} with {metrics_df.height} rows and {len(metrics_df.columns)} columns.')

metrics_df.shape, metrics_df.head()

[Q1] Wrote D:\code\SEG\data\gold\zillow\demand_consistency.parquet with 41 rows and 7 columns.


((41, 7),
 shape: (5, 7)
 ┌────────┬────────────┬───────────────┬───────────────┬──────────────┬──────────────┬──────────────┐
 │ Region ┆ RegionType ┆ Avg_SaleListG ┆ Volatility_Sa ┆ Avg_PctSoldA ┆ Avg_DaysOnMa ┆ Observations │
 │ ---    ┆ ---        ┆ apPct         ┆ leListGapPct  ┆ boveList     ┆ rket         ┆ ---          │
 │ str    ┆ str        ┆ ---           ┆ ---           ┆ ---          ┆ ---          ┆ u32          │
 │        ┆            ┆ f32           ┆ f32           ┆ f32          ┆ f32          ┆              │
 ╞════════╪════════════╪═══════════════╪═══════════════╪══════════════╪══════════════╪══════════════╡
 │ AL     ┆ State      ┆ 11.125981     ┆ 11.138463     ┆ 0.318655     ┆ 13.962783    ┆ 310          │
 │ AR     ┆ State      ┆ 20.125114     ┆ 12.842976     ┆ null         ┆ 17.0         ┆ 310          │
 │ AZ     ┆ State      ┆ 1.81364       ┆ 10.011738     ┆ 0.227766     ┆ 20.396774    ┆ 310          │
 │ CA     ┆ State      ┆ 39.647392     ┆ 6.671294      ┆ 