In [1]:
# Cell 1: Install required libraries for our project
!pip install --quiet langchain langgraph langchain-groq "langchain_core>=0.1.28" python-dotenv pandas

# Import necessary modules
import os
import pandas as pd
from google.colab import drive
from dotenv import load_dotenv
from typing import TypedDict, Annotated, Sequence
import operator

# LangChain and LangGraph components
from langchain_core.tools import tool
from langchain_core.messages import BaseMessage, HumanMessage, ToolMessage
from langchain_groq import ChatGroq
from langgraph.graph import StateGraph, END

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m43.7/43.7 kB[0m [31m1.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m153.3/153.3 kB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m134.9/134.9 kB[0m [31m4.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m43.9/43.9 kB[0m [31m2.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m54.6/54.6 kB[0m [31m2.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m216.7/216.7 kB[0m [31m7.9 MB/s[0m eta [36m0:00:00[0m
[?25h

In [2]:
import json, math

In [3]:
import io
import re
import time
import requests
import pandas as pd
from bs4 import BeautifulSoup
from textwrap import dedent
from langchain_core.tools import tool
from typing import Optional, List
from groq import Groq

# Cell 2: Mount Google Drive to access files
# `drive` is google.colab.drive (imported in cell 1), so this cell only runs inside Colab.
drive.mount('/content/drive')

# Load environment variables from the .env file in your Drive.
# The API keys read from os.environ in the following cell (GROQ_API_KEY, Geoapify_API_Key)
# are expected to be defined in this file.
env_path = '/content/drive/My Drive/env1.env'
load_dotenv(dotenv_path=env_path)

Mounted at /content/drive


True

In [4]:
# Read the API keys from the process environment and report whether each one is present.
groq_api_key = os.environ.get("GROQ_API_KEY")
print(
    "✅ Groq API Key loaded successfully."
    if groq_api_key
    else "❌ Groq API Key not found. Please check your env1.env file."
)

# Note the mixed-case variable name: it must match the entry in the .env file exactly.
geoapify_api_key = os.environ.get("Geoapify_API_Key")
print("Geoapify API Key loaded:", "Yes" if geoapify_api_key else "No")

✅ Groq API Key loaded successfully.
Geoapify API Key loaded: Yes


In [5]:


# --- Load and Prepare Data ---
# Load the Redfin dataset from your Drive.
# On any handled failure `data_subset` is set to None, which is what the tools check
# before querying the dataset.
file_path = '/content/drive/My Drive/redfin.csv'
try:
    redfindf = pd.read_csv(file_path)
    print(f"✅ Redfin data loaded successfully. Shape: {redfindf.shape}")

    # Keep only the columns we need for analysis
    columns_to_keep = [
        'region_type_id', 'table_id', 'period_end', 'region', 'property_type', 'median_sale_price',
        'median_sale_price_mom', 'median_sale_price_yoy', 'median_list_price',
        'median_list_price_mom', 'median_list_price_yoy', 'median_ppsf',
        'median_ppsf_mom', 'median_ppsf_yoy', 'homes_sold', 'homes_sold_mom',
        'homes_sold_yoy', 'median_dom', 'median_dom_mom', 'median_dom_yoy', 'avg_sale_to_list',
    ]
    data_subset = redfindf[columns_to_keep].copy()  # .copy() avoids SettingWithCopyWarning

    # --- STEP 2: Clean and Rename columns ---
    # Strip the literal "Zip Code: " prefix BEFORE renaming the column.
    # regex=False: the prefix is plain text, not a pattern.
    data_subset['region'] = data_subset['region'].str.replace('Zip Code: ', '', regex=False)

    # Define the rename mapping
    rename_dict = {
        'table_id': 'region_id',
        'region_type_id': 'region_type',
        'region': 'zip_code'
    }

    # Rename the columns to their final, standardized names
    data_subset = data_subset.rename(columns=rename_dict)

    # --- STEP 3: Perform final data type conversions ---
    data_subset['period_end'] = pd.to_datetime(data_subset['period_end'])

    print("✅ Data preparation complete. Columns have been selected, cleaned, and renamed.")
    print("Final available columns:", list(data_subset.columns))


except FileNotFoundError:
    print(f"❌ Error: The file was not found at {file_path}. Please check the path.")
    data_subset = None
except (KeyError, ValueError) as e:
    # KeyError: an expected column is missing from the CSV (previously uncaught, which left
    # `data_subset` undefined). ValueError: e.g. malformed CSV content or unparseable dates.
    print(f"❌ Data Error: {e}")
    data_subset = None

✅ Redfin data loaded successfully. Shape: (8873849, 21)
✅ Data preparation complete. Columns have been selected, cleaned, and renamed.
Final available columns: ['region_type', 'region_id', 'period_end', 'zip_code', 'property_type', 'median_sale_price', 'median_sale_price_mom', 'median_sale_price_yoy', 'median_list_price', 'median_list_price_mom', 'median_list_price_yoy', 'median_ppsf', 'median_ppsf_mom', 'median_ppsf_yoy', 'homes_sold', 'homes_sold_mom', 'homes_sold_yoy', 'median_dom', 'median_dom_mom', 'median_dom_yoy', 'avg_sale_to_list']


In [6]:
'''
# Cell 2.1: Initialize the Groq language model
model = ChatGroq(temperature=0.2, model_name="openai/gpt-oss-120b", groq_api_key=groq_api_key) #llama-3.3-70b-versatile ; llama3-70b-8192
'''

'\n# Cell 2.1: Initialize the Groq language model\nmodel = ChatGroq(temperature=0.2, model_name="openai/gpt-oss-120b", groq_api_key=groq_api_key) #llama-3.3-70b-versatile ; llama3-70b-8192\n'

In [7]:
# Cell 2.2: define globals
# NOTE: re-running this cell resets every cache below to empty.
# NOTE(review): CMA_RESULTS is not written or read in the cells shown here — presumably
# the CMA tool appends its results to it; confirm against that tool.
CMA_RESULTS: list[dict] = []

# ---- Scraping caches ----
# Filled by scrape_redfin_to_df:
# (mode, zip_code, min_price, max_price, max_pages_to_scrape) -> (DataFrame, time.time() timestamp)
_listings_cache: dict[tuple, tuple[pd.DataFrame, float]] = {}

# ---- Geocode caches ----
# Subject geocode cache: address -> (lat, lon, formatted, postcode, ts)
# The key is the address stripped and lower-cased (see _get_subject_geocode_cached).
_subject_geocode_cache: dict[str, tuple[float, float, str | None, str | None, float]] = {}

# Comps geocode cache: (mode, zip, min_k, max_k, pages) -> (DataFrame_with_lat_lon, ts)
# The key is supplied by the caller of _geocode_df_addresses_cached.
_geocoded_comps_cache: dict[tuple, tuple[pd.DataFrame, float]] = {}



In [8]:
# Cell 3.0: helper functions
# Helper Functions for use throughout

# Matches the first contiguous run of lines that each contain a tab, skipping any
# preamble text the LLM may have written before the TSV block.
_TSV_BLOCK_PATTERN = re.compile(
    r'(?:(?:[^\n]*\n)*?)^([^\n]+\t[^\n]*(?:\n[^\n]+\t[^\n]*)*)$', re.MULTILINE
)


def _parse_listing_tsv(text_blob: str, amount_col: str) -> pd.DataFrame:
    """
    Shared parser behind extract_tsv_data / extract_tsv_data_rent.

    Args:
        text_blob: Raw LLM response expected to contain a 5-column TSV block.
        amount_col: Name given to the second column ("price" or "rent").

    Returns:
        DataFrame with columns ["address", amount_col, "sq ft", "bedrooms", "bathrooms"],
        numeric columns converted to numbers and incomplete rows dropped. An empty
        DataFrame with those columns is returned when no usable TSV is found.
    """
    columns = ["address", amount_col, "sq ft", "bedrooms", "bathrooms"]

    # `or ""` tolerates a None response (message.content can be None).
    tsv_match = _TSV_BLOCK_PATTERN.search(text_blob or "")
    if not tsv_match:
        return pd.DataFrame(columns=columns)

    try:
        # on_bad_lines='skip' drops rows with too many fields instead of raising;
        # rows with too few fields are padded with NaN and removed by dropna() below.
        df = pd.read_csv(
            io.StringIO(tsv_match.group(1)), sep='\t', header=None, on_bad_lines='skip'
        )
    except (pd.errors.ParserError, pd.errors.EmptyDataError):
        return pd.DataFrame(columns=columns)

    # Anything other than exactly 5 columns is treated as malformed output.
    if df.shape[1] != 5:
        return pd.DataFrame(columns=columns)

    df.columns = columns

    # --- Data cleaning: strip units / currency decoration before numeric conversion ---
    df['sq ft'] = (
        df['sq ft'].astype(str)
        .str.replace(r'[, ]*(sq ft|sqft|SF|ft²)$', '', regex=True, case=False)
        .str.replace(',', '', regex=False)
    )
    df[amount_col] = df[amount_col].astype(str).str.replace(r'[\$,]', '', regex=True)
    # `s?` also accepts the singular "1 bed" / "1 bath", which would otherwise be dropped.
    df['bedrooms'] = df['bedrooms'].astype(str).str.replace(r'\s*beds?$', '', regex=True, case=False)
    df['bathrooms'] = df['bathrooms'].astype(str).str.replace(r'\s*baths?$', '', regex=True, case=False)

    # Convert to numeric, coercing anything unparseable (including header rows) to NaN
    for col in ['sq ft', amount_col, 'bedrooms', 'bathrooms']:
        df[col] = pd.to_numeric(df[col], errors='coerce')

    # Drop any row where a conversion failed or data was missing
    return df.dropna()


def extract_tsv_data(text_blob: str) -> pd.DataFrame:
    """
    Parses a string containing TSV (tab-separated values) for-sale listings into a DataFrame.
    This function is designed to handle the specific output format from the Groq LLM.

    Returns a DataFrame with columns "address", "price", "sq ft", "bedrooms", "bathrooms"
    (empty if no valid TSV block is found).
    """
    return _parse_listing_tsv(text_blob, "price")



def extract_tsv_data_rent(text_blob: str) -> pd.DataFrame:
    """
    Parses a string containing TSV (tab-separated values) rental listings into a DataFrame.

    Returns a DataFrame with columns "address", "rent", "sq ft", "bedrooms", "bathrooms"
    (empty if no valid TSV block is found).
    """
    return _parse_listing_tsv(text_blob, "rent")


# ===== Shared Redfin scraper helper =====
from typing import Callable, Optional
import time, requests, pandas as pd
from bs4 import BeautifulSoup
from textwrap import dedent

from groq import Groq

# One cache for everything: key -> (DataFrame, timestamp)
#_listings_cache: dict[tuple, tuple[pd.DataFrame, float]] = {}

def _default_prompt(mode: str, zip_code: str, min_price: Optional[int], max_price: Optional[int]) -> str:
    if mode == "sale":
        return dedent(f"""
            Review this report for homes in {zip_code} with prices between ${min_price}k and ${max_price}k.
            Your job is to list all the homes in the report in a TSV (tab-separated) with columns:
            "address", "price", "sq ft", "bedrooms", "bathrooms".
            The report format repeats the address at the end of each block; always use that final address.
            Ignore any orphan records. Provide ONLY the TSV data.
        """)
    else:  # rent
        return dedent(f"""
            Review this report for homes for rent in {zip_code}.
            Output TSV with columns: "address", "rent", "sq ft", "bedrooms", "bathrooms".
            The address appears at the end of each block; use that final address.
            If a stat is blank, put 0. Ignore orphan records. Provide ONLY the TSV data.
        """)

def _default_extract_fn(mode: str):
    # Use your existing extractors
    return extract_tsv_data if mode == "sale" else extract_tsv_data_rent

def _page_url_fn(mode: str, zip_code: str, min_price: Optional[int], max_price: Optional[int]) -> Callable[[int], str]:
    if mode == "sale":
        base = f"https://www.redfin.com/zipcode/{zip_code}/filter/min-price={min_price}k,max-price={max_price}k"
        return lambda page: base if page == 1 else f"{base}/page-{page}"
    else:  # rent
        base = f"https://www.redfin.com/zipcode/{zip_code}/apartments-for-rent/filter/property-type=house+condo+townhouse"
        return lambda page: base if page == 1 else f"{base}/page-{page}"

def scrape_redfin_to_df(
    *,
    mode: str,                     # "sale" | "rent"
    zip_code: str,
    max_pages_to_scrape: int,
    min_price: Optional[int] = None,   # thousands (sale only)
    max_price: Optional[int] = None,   # thousands (sale only)
    cache_ttl_seconds: Optional[int] = None,  # None = never expires
    llm_client: Optional[Groq] = None,
    llm_model_name: str = "llama-3.3-70b-versatile",
    prompt: Optional[str] = None,
    extract_fn: Optional[Callable[[str], pd.DataFrame]] = None,
) -> Optional[pd.DataFrame]:
    """
    Scrape Redfin listing pages, parse each page with an LLM into a DataFrame, and cache the result.

    Args:
        mode: "sale" or "rent".
        zip_code: ZIP code to scrape.
        max_pages_to_scrape: Maximum number of result pages to fetch.
        min_price / max_price: Price bounds in thousands (sale only).
        cache_ttl_seconds: Maximum age of a cache entry; None means entries never expire.
        llm_client: Groq client to use; a new one is created from `groq_api_key` if omitted.
        llm_model_name: Model used to convert scraped text into TSV.
        prompt: System prompt override; defaults to _default_prompt(...).
        extract_fn: Parser for the LLM reply; defaults to _default_extract_fn(mode).

    Returns:
        De-duplicated DataFrame of all parsed listings, or None if nothing was extracted.
        The returned frame is independent of the cached copy, so callers may modify it.
    """
    assert mode in ("sale", "rent")
    cache_key = (mode, zip_code, min_price, max_price, max_pages_to_scrape)

    # cache
    if cache_key in _listings_cache:
        df_cached, ts = _listings_cache[cache_key]
        if cache_ttl_seconds is None or (time.time() - ts) < cache_ttl_seconds:
            print(f"[CACHE HIT] {cache_key} -> {len(df_cached)} rows")
            # Return a copy so callers adding or changing columns cannot corrupt the cache.
            return df_cached.copy()
        print(f"[CACHE EXPIRED] {cache_key}")

    # defaults
    if llm_client is None:
        llm_client = Groq(api_key=groq_api_key)
    if prompt is None:
        prompt = _default_prompt(mode, zip_code, min_price, max_price)
    if extract_fn is None:
        extract_fn = _default_extract_fn(mode)
    url_for_page = _page_url_fn(mode, zip_code, min_price, max_price)

    all_dfs: list[pd.DataFrame] = []
    for page in range(1, max_pages_to_scrape + 1):
        url = url_for_page(page)
        print(f"Scraping page {page}: {url}")
        try:
            resp = requests.get(url, headers={"User-Agent":"Mozilla/5.0"}, timeout=15)
            resp.raise_for_status()
        except requests.exceptions.RequestException as e:
            print(f"Error fetching page {page}: {e}")
            break

        soup = BeautifulSoup(resp.text, "html.parser")
        cards = soup.select("div.HomeCardContainer")
        if not cards:
            print(f"No home cards found on page {page}. Stopping scrape.")
            break

        scraped_text = "\n\n".join(c.get_text(separator=" ", strip=True) for c in cards)

        # LLM parse → TSV. A failure here (rate limit, API error, unparseable reply) stops
        # the scrape but keeps the pages already collected instead of discarding them.
        try:
            llm_response = llm_client.chat.completions.create(
                model=llm_model_name,
                messages=[
                    {"role":"system","content":prompt},
                    {"role":"user",  "content":scraped_text},
                ],
                temperature=0.1,
                max_tokens=4000,
            )
            extracted = llm_response.choices[0].message.content
            page_df = extract_fn(extracted)
        except Exception as e:
            print(f"Error parsing page {page} with the LLM: {e}")
            break

        if not page_df.empty:
            print(f"Successfully extracted {len(page_df)} items from page {page}.")
            all_dfs.append(page_df)
        else:
            print(f"Could not extract valid data from page {page}.")

        # Pause between page requests; there is nothing to wait for after the last page.
        if page < max_pages_to_scrape:
            time.sleep(2)

    if not all_dfs:
        return None

    final_df = pd.concat(all_dfs, ignore_index=True).drop_duplicates()
    # Store a private copy so later changes to the returned frame do not leak into the cache.
    _listings_cache[cache_key] = (final_df.copy(), time.time())
    print(f"[SCRAPED & CACHED] {len(final_df)} rows for {cache_key}")
    return final_df


# === CMA INCREMENTS ONLY: geo helpers, prompts, and the CMA tool ===
import os, re, io, time, json, math, requests
import pandas as pd
from bs4 import BeautifulSoup
from textwrap import dedent
from groq import Groq
from langchain_core.tools import tool

# ---------- Geo helpers (new) ----------

def _haversine_km(lat1, lon1, lat2, lon2) -> float:
    R = 6371.0
    p1, p2 = math.radians(lat1), math.radians(lat2)
    dlat = math.radians(lat2 - lat1)
    dlon = math.radians(lon2 - lon1)
    a = math.sin(dlat/2)**2 + math.cos(p1)*math.cos(p2)*math.sin(dlon/2)**2
    return R * (2 * math.asin(math.sqrt(a)))

def _geoapify_geocode(address: str, api_key: str, pause: float = 0.12):
    """Return (lat, lon, formatted, postcode) or (None, None, None, None)."""
    if not address:
        return None, None, None, None
    url = (
        "https://api.geoapify.com/v1/geocode/search"
        f"?text={requests.utils.quote(address)}&apiKey={api_key}"
    )
    try:
        resp = requests.get(url, timeout=12)
        resp.raise_for_status()
        data = resp.json()
        feats = data.get("features", [])
        if not feats:
            return None, None, None, None
        props = feats[0].get("properties", {})
        lat, lon = props.get("lat"), props.get("lon")
        fmt = props.get("formatted")
        pc  = props.get("postcode")
        time.sleep(pause)
        return lat, lon, fmt, pc
    except Exception:
        return None, None, None, None

def _zip_from_geocode(formatted: str | None, postcode: str | None) -> str | None:
    if postcode and len(postcode) >= 5:
        return postcode[:5]
    if formatted:
        m = re.search(r"\b(\d{5})(?:-\d{4})?\b", formatted)
        if m:
            return m.group(1)
    return None

def _geocode_df_addresses(df: pd.DataFrame, address_col: str, zip_hint: str,
                          api_key: str, max_to_geocode: int = 250, sleep_sec: float = 0.12) -> pd.DataFrame:
    """Adds latitude/longitude columns by geocoding up to max_to_geocode rows."""
    work = df.copy()
    if "latitude" not in work.columns:
        work["latitude"] = pd.NA
    if "longitude" not in work.columns:
        work["longitude"] = pd.NA
    work["full_address_for_geo"] = work[address_col].astype(str) + f", {zip_hint}"
    done = 0
    for idx, row in work.iterrows():
        if done >= max_to_geocode:
            break
        if pd.notna(row["latitude"]) and pd.notna(row["longitude"]):
            continue
        addr = row["full_address_for_geo"]
        lat, lon, _, _ = _geoapify_geocode(addr, api_key, pause=sleep_sec)
        if lat is not None and lon is not None:
            work.at[idx, "latitude"] = lat
            work.at[idx, "longitude"] = lon
            done += 1
    return work


def _select_nearest_by_distance_only(
    df: pd.DataFrame,
    max_comps: int,
    kind: str,                  # "rent" or "sale"
    min_distance_m: float = 0  # exclude anything within ~0 meters of subject
) -> pd.DataFrame:
    """
    Keep the nearest N rows by distance_km, excluding the subject itself
    by removing rows whose distance is < min_distance_m.
    """
    if df is None or df.empty:
        return df

    work = df.copy()
    if "distance_km" not in work.columns:
        print("[CMA] distance_km not found; returning input as-is.")
        return work.head(max_comps)

    # Exclude the subject (or duplicate geocodes) by distance threshold
    work = work[(work["distance_km"] * 1000.0) >= min_distance_m]

    # Pick nearest N
    chosen = work.sort_values("distance_km").head(max_comps)

    # Quick peek so you can verify what goes to the LLM
    cols = ["address", "distance_km", "bedrooms", "bathrooms", "sq ft"]
    if kind == "rent" and "rent" in chosen.columns:
        cols.insert(1, "rent")
    if kind == "sale" and "price" in chosen.columns:
        cols.insert(1, "price")

    print(f"[CMA] {kind.title()} comps (nearest-by-distance, top 5):")
    try:
        print(chosen[cols].head(5).to_string(index=False))
    except Exception:
        print(chosen.head(5).to_string(index=False))

    return chosen


# ---------- Lightweight prompt renderer + default prompts (new) ----------

def _render_prompt(template: str, ctx: dict) -> str:
    try:
        return template.format(**ctx)
    except Exception:
        # Fallback: append JSON blobs if formatting fails
        suffix = ""
        if "closest_rental_neighbors_str" in ctx:
            suffix += "\n\nNearest rental comps (JSON):\n" + ctx["closest_rental_neighbors_str"]
        if "closest_forsale_neighbors_str" in ctx:
            suffix += "\n\nNearest for-sale comps (JSON):\n" + ctx["closest_forsale_neighbors_str"]
        return template + suffix

# Default instructions for the rent-estimate LLM call.
# The text contains no {placeholders}, so the f-prefix interpolates nothing; the result is
# dedented and then stripped of leading/trailing whitespace.
# NOTE(review): the subject property's details and the comps JSON are not embedded here —
# presumably the caller supplies them separately; confirm against the CMA tool.
prompt_rent_estimate = dedent(f""" You are an excellent real estate agent, with a strong analytical mind.
You have a client who want you to estimate the rent of a target property. You must use the target property's data on  bedrooms,  bathrooms and is area in sqft.
Your research team has given you rental property data in JSON format of 15 nearby addresses. Each object has keys:
address, rent, sq ft, bedrooms, bathrooms, latitude, longitude, distance_km.

***Important:*** If any neighbor’s address exactly matches the target property address, ignore that entry—do not include it in your analysis or counts.
***Important:*** Use brevity in your chain of thought.

Also - these 15 properties are nearest to target property, therefore try and focus on characteristics of the target property and not the distance. Dont obsess only on bedroom or bathroom numbers. Similarly dont obsess only on sq ft. Have a balanced approach.
Properties that are more similar to the target property will be the more immediate competition for the target property.
Given this information and guidance estimate the rent of the target address. You can provide a range of rent estimates. But ensure that your range is not too broad. No more than plus or minus 5% of the target price.
Finally,**clearly explain** in narrative form:
- List comparables and Why you chose each comparable ***Important:*** you must try and find at least 4 closest comparable properties.
- How the property features informed your final number

""").strip()

# Default instructions for the sale-price-estimate LLM call.
# The opening lines previously said "estimate the rent" / "rental property data" (copied from
# the rent prompt), contradicting the rest of this prompt, which asks for a selling price
# and describes objects carrying a `price` key. They now describe the for-sale task.
prompt_4sale_estimate = dedent(""" You are an excellent real estate agent, with a strong analytical mind.
You have a client who want you to estimate the sale price of a target property. You must use the target property's data on  bedrooms,  bathrooms and is area in sqft.
Your research team has given you for-sale property data in JSON format of 15 nearby addresses. Each object has keys:
address, price, sq ft, bedrooms, bathrooms, latitude, longitude, distance_km.

***Important:*** If any neighbor’s address exactly matches the target property address, ignore that entry—do not include it in your analysis.
***Important:*** Use brevity in your chain of thought.

Also - these properties are nearest to target property, therefore try and focus on characteristics of the target property and not the distance. Dont obsess only on bedroom or bathroom numbers. Similarly dont obsess only on sq ft. Have a balanced approach.
For example if the target property has 2 bathrooms, and a nearby competiting property has 3 bathrooms with same bedrooms and nearly same sq ft, they should be comparable.
Properties that are more similar to the target property will be the more immediate competition for the target property.
Given this information and guidance estimate the selling price of the target address. You can provide a range of sale price estimates. But ensure that your range is not too broad. No more than plus or minus 5% of the target price.
Finally, **clearly explain** in narrative form:
- List comparables and Why you chose each comparable. ***Important:*** you must try and find at least 4 closest comparable properties.
- How the property features informed your final number

""").strip()

def _infer_sale_band_k_from_llm(
    address: str,
    groq_key: str,
    model: str = "compound-beta",
    spread_k: int = 200,     # +$200k above floored min
    floor_to_k: int = 100    # floor to nearest $100k
) -> Optional[tuple[int, int]]:
    """
    Ask an LLM for a single-point price (USD) and convert it to a [min_k, max_k] band:
      min_k = floor(price_k to nearest floor_to_k)
      max_k = min_k + spread_k
    Returns (min_k, max_k) in thousands, or None if parsing fails.
    """
    client = Groq(api_key=groq_key)

    system = (
        "You are a valuation assistant. "
        "Return ONLY valid JSON like {\"price_usd\": 320000} representing your best single-point "
        "estimate of the current market sale price of the given residential address. No prose."
    )
    user = f"Address: {address}\nReturn only JSON with 'price_usd'."

    resp = client.chat.completions.create(
        model=model,
        messages=[{"role":"system","content":system},{"role":"user","content":user}],
        temperature=0.1,
        max_tokens=100,
    )
    txt = (resp.choices[0].message.content or "").strip()

    price = None
    # Try strict JSON first
    try:
        data = json.loads(txt)
        price = data.get("price_usd")
    except Exception:
        pass

    # Fallback: regex the largest dollar-like number
    if not price:
        m = re.search(r'(\$?\s?)(\d{2,3}(?:[,\s]\d{3})+|\d{5,})', txt)
        if m:
            s = m.group(2).replace(",", "").replace(" ", "")
            try:
                price = int(float(s))
            except Exception:
                price = None

    if not price or price <= 0:
        return None

    k = int(price // 1000)           # convert to thousands
    k_floor = (k // floor_to_k) * floor_to_k
    min_k = max(k_floor, 50)         # guardrail: at least $50k
    max_k = min_k + spread_k
    return (min_k, max_k)

# --- Helper: prefer superset cache; fall back to partial (upper-bound) reuse ---

def _get_superset_cached_df(
    *,
    mode: str,
    zip_code: str,
    min_price_k: int,
    max_price_k: int,
    max_pages_to_scrape: int,
    cache_ttl_seconds: int | None = None,
):
    """
    Tries to reuse a *superset* cached scrape (kmin <= min_price_k and kmax >= max_price_k).
    If not found, falls back to a *partial* reuse: any cached band that at least covers the
    requested upper bound (kmax >= max_price_k). In that case, we filter to the overlap.

    Returns a filtered DataFrame or None.
    """
    try:
        cache = _listings_cache  # from scrape_redfin_to_df
    except NameError:
        return None

    now = time.time()
    best_superset = None
    best_superset_ts = -1

    best_partial = None
    best_partial_ts = -1
    best_partial_key = None
    best_partial_kmin = None
    best_partial_kmax = None

    for key, (df, ts) in cache.items():
        try:
            m, z, kmin, kmax, pages = key
        except Exception:
            continue
        if m != mode or z != zip_code or pages != max_pages_to_scrape:
            continue
        if cache_ttl_seconds is not None and (now - ts) >= cache_ttl_seconds:
            continue

        # SUPerset: covers full requested range
        min_ok = (kmin is None) or (kmin <= min_price_k)
        max_ok = (kmax is None) or (kmax >= max_price_k)
        if min_ok and max_ok and ts > best_superset_ts:
            best_superset, best_superset_ts = (key, df), ts
            continue

        # PARTIAL: at least covers the requested upper bound (<= X)
        # e.g., cached 300–500k and user asks 100–450k; we can still reuse 300–450k slice
        if (kmax is None or kmax >= max_price_k) and ts > best_partial_ts:
            best_partial = df
            best_partial_ts = ts
            best_partial_key = key
            best_partial_kmin = kmin
            best_partial_kmax = kmax

    # Prefer strict superset
    if best_superset is not None:
        key, df = best_superset
        lo = min_price_k * 1000
        hi = max_price_k * 1000
        out = df.copy()
        out["price"] = pd.to_numeric(out["price"], errors="coerce")
        out = out[(out["price"] >= lo) & (out["price"] <= hi)]
        print(f"[CACHE SUBSET HIT] superset {key} → subset {len(out)} rows")
        return out

    # Otherwise, reuse partial coverage if available
    if best_partial is not None:
        df = best_partial.copy()
        df["price"] = pd.to_numeric(df["price"], errors="coerce")
        lo = min_price_k * 1000
        hi = max_price_k * 1000

        # Only confident coverage where cached band exists.
        # If cached min (kmin) is above requested min, we still filter from that kmin.
        if best_partial_kmin is not None:
            lo = max(lo, best_partial_kmin * 1000)
        if best_partial_kmax is not None:
            hi = min(hi, best_partial_kmax * 1000)

        out = df[(df["price"] >= lo) & (df["price"] <= hi)]
        print(f"[CACHE PARTIAL HIT] using {best_partial_key} → subset {len(out)} rows")
        return out

    # No cache suitable
    return None


def _get_subject_geocode_cached(address: str, api_key: str, ttl_seconds: int | None = 864000):
    """Return (lat, lon, formatted, postcode) using a small cache."""
    key = (address or "").strip().lower()
    now = time.time()
    if key in _subject_geocode_cache:
        lat, lon, fmt, pc, ts = _subject_geocode_cache[key]
        if ttl_seconds is None or (now - ts) < ttl_seconds:
            print(f"[GEO CACHE HIT] subject '{address}'")
            return lat, lon, fmt, pc
        else:
            print(f"[GEO CACHE EXPIRED] subject '{address}'")

    lat, lon, fmt, pc = _geoapify_geocode(address, api_key, pause=0.0)
    if lat is not None and lon is not None:
        _subject_geocode_cache[key] = (lat, lon, fmt, pc, now)
        print(f"[GEO CACHED] subject '{address}' -> ({lat:.6f},{lon:.6f})")
    return lat, lon, fmt, pc

def _geocode_df_addresses_cached(
    base_df: pd.DataFrame,
    address_col: str,
    zip_hint: str,
    api_key: str,
    cache_key: tuple,
    ttl_seconds: int | None = 86400,
    max_to_geocode: int = 250,
    sleep_sec: float = 0.12,
) -> pd.DataFrame:
    """Reuse a cached geocoded DF for comps; otherwise geocode once and cache."""
    now = time.time()
    if cache_key in _geocoded_comps_cache:
        cached_df, ts = _geocoded_comps_cache[cache_key]
        if ttl_seconds is None or (now - ts) < ttl_seconds:
            print(f"[GEO CACHE HIT] {cache_key} -> {len(cached_df)} rows")
            return cached_df.copy()
        else:
            print(f"[GEO CACHE EXPIRED] {cache_key}")

    geocoded = _geocode_df_addresses(
        base_df, address_col, zip_hint, api_key,
        max_to_geocode=max_to_geocode, sleep_sec=sleep_sec
    )
    _geocoded_comps_cache[cache_key] = (geocoded.copy(), now)
    print(f"[GEO CACHED] {cache_key} -> {len(geocoded)} rows")
    return geocoded


In [9]:
# Cell 3.1: Define tool 1: TOOL FOR CALLING REAL ESTATE STATS
from typing import Optional
import requests
import io

@tool
def get_zipcode_stats(zip_code: str, property_type: str = 'All Residential') -> str:
    """
    Retrieves the latest real estate market statistics for a given US ZIP code and property type.
    Use this when a user asks for general questions about a specific zipcode or city OR when the user asks for real estate statistics for a zipcode or city.

    Args:
        zip_code (str): The 5-digit US ZIP code to query.
        property_type (str): The type of property. Must be one of:
                             'Condo/Co-op', 'Townhouse', 'Single Family Residential',
                             'All Residential', 'Multi-Family (2-4 Unit)'.
                             Defaults to 'All Residential' if not specified.

    Returns:
        str: Returns all stats in the output followed by a summary of the latest market stats, or an error message if the
             ZIP code or property type is not found.
    """
    # The docstring above is left exactly as written: @tool exposes it as the tool description.
    print(f"🔧 TOOL get_zipcode_stats called with zip_code={zip_code}, property_type={property_type}")
    if data_subset is None:
        return "Error: The real estate dataset is not available."

    # Left-pad short inputs such as "9021" to five characters
    zip_code = zip_code.zfill(5)

    # Rows for the requested ZIP code and property type
    zip_matches = data_subset['zip_code'] == zip_code
    type_matches = data_subset['property_type'] == property_type
    matching_rows = data_subset[zip_matches & type_matches]

    if matching_rows.empty:
        return f"No data found for ZIP code {zip_code} with property type '{property_type}'."

    # Row with the most recent period_end
    newest = matching_rows.sort_values(by='period_end', ascending=False).iloc[0]

    # One report line per statistic; *_mom / *_yoy ratios are shown as percentages,
    # days-on-market changes are shown as plain numbers.
    report_lines = [
        f"The following are the stats for zip code '{zip_code}' as of '{newest['period_end'].strftime('%Y-%m-%d')}':\n",
        f"The Median Sale Price = '${newest['median_sale_price']:,.0f}'\n",
        f"The Month over Month change in sale Price (%) = '{newest['median_sale_price_mom'] * 100:.2f}'\n",
        f"The Year over Year change in sale Price (%) = '{newest['median_sale_price_yoy'] * 100:.2f}'\n",
        f"The Median List price = '${newest['median_list_price']:,.2f}'\n",
        f"The Month over Month change in list price (%) = '{newest['median_list_price_mom'] * 100:.2f}'\n",
        f"The Year over Year change in list price (%) = '{newest['median_list_price_yoy'] * 100:.2f}'\n",
        f"The Avg Sale to List Price (%) = '{newest['avg_sale_to_list'] * 100:.2f}'\n",
        f"Homes sold in the last 90 days = '{newest['homes_sold']:.0f}'\n",
        f"The Month over Month change in homes sold (%) = '{newest['homes_sold_mom'] * 100:.2f}'\n",
        f"The Year over Year change in homes sold (%) = '{newest['homes_sold_yoy'] * 100:.2f}'\n",
        f"Days on Market = '{newest['median_dom']:.0f}'\n",
        f"The Month over Month change in Days on Market = '{newest['median_dom_mom'] * 1:.0f}'\n",
        f"The Year over Year change in Days on Market = '{newest['median_dom_yoy'] * 1:.0f}'\n",
    ]
    return "".join(report_lines)


In [10]:
# Cell 3.2: TOOL FOR ENDING THE CONVERSATION
@tool
def end_conversation() -> str:
    """
    Call this specific tool when the user indicates they are finished with the conversation.
    Use this for phrases like 'thank you', 'that's all', 'I'm done', 'thanks for the help', etc.
    This tool signals that the conversation should end.

    Returns:
        The fixed sentinel string "Conversation ended by user."
    """
    # Takes no arguments and touches no state: the constant return value is the entire signal.
    # NOTE(review): presumably the graph's routing logic watches for this tool call (or this
    # exact string) to transition to END — confirm against the router before changing the text.
    return "Conversation ended by user."

In [11]:
# Cell 3.3: TOOL FOR CALLING RECENTLY SOLD HOMES
from typing import Optional, Union
import requests, io
import pandas as pd

@tool
def list_recently_sold_homes(
    zip_code: str,
    property_type: Optional[str] = None,
    min_price: Optional[int] = None,
    max_price: Optional[int] = None,
    num_results: Union[int, str] = 5
) -> str:
    """
    Fetches a DataFrame of homes sold in the last 90 days for a given ZIP code,
    applies optional filters (property_type, min_price, max_price), and returns
    a formatted string listing up to `num_results` homes.
    If num_results <= 0, returns *all* matching homes.
    Use this when the user asks for a list of recently sold homes or for any stats around homes sold.

    Args:
        zip_code: ZIP code to look up. It is stringified and zero-padded to 5 characters
            before being matched against the preloaded `data_subset` frame.
        property_type: Optional filter. Recognized values (case-insensitive) are
            'Single Family Residential', 'Condo/Co-op' and 'Townhouse'. Any other value
            is not applied as a filter and a note saying so is added to the result.
        min_price: Optional inclusive lower bound, compared directly against the CSV `PRICE` column.
        max_price: Optional inclusive upper bound, compared directly against the CSV `PRICE` column.
        num_results: Maximum number of homes to list (int or numeric string); <= 0 lists everything.

    Returns:
        A human-readable listing, or a plain-text message explaining why nothing could be listed.
        Errors are reported as strings rather than raised.
    """
    print(f"🔧 TOOL list_recently_sold_homes called with zip_code={zip_code}, num_results={num_results}")
    # 1) Normalize num_results
    try:
        num_results = int(num_results)
    except (TypeError, ValueError):
        return "⚠️ Error: `num_results` must be an integer."
    # If negative or zero → show everything
    show_all = num_results <= 0

    # 2) Look up region identifiers from the preloaded redfin subset
    if data_subset is None:
        return "Error: real-estate dataset unavailable."
    # Zero-pad so ZIPs with leading zeros still match the zero-padded
    # strings stored in data_subset['zip_code'] (same normalization as the stats tool).
    zip_code = str(zip_code).strip().zfill(5)
    df_meta = data_subset[data_subset["zip_code"] == zip_code]
    if df_meta.empty:
        return f"No region data found for ZIP code {zip_code}."
    region_id = str(df_meta["region_id"].iloc[0])
    region_type = int(df_meta["region_type"].iloc[0])

    # 3) Hit Redfin’s CSV API
    params = {
        "region_id": region_id,
        "region_type": region_type,
        "sold_within_days": 90,
        "al": 1
    }
    headers = {"User-Agent": "Mozilla/5.0"}
    try:
        resp = requests.get("https://www.redfin.com/stingray/api/gis-csv",
                            params=params, headers=headers, timeout=10)
        # Fail loudly on 4xx/5xx instead of trying to parse an error page as CSV.
        resp.raise_for_status()
        sold_df = pd.read_csv(io.BytesIO(resp.content))
    except Exception as e:
        return f"Could not fetch sales data: {e}"

    # Debug: raw row count
    print(f"🔧 Retrieved {len(sold_df)} rows of data")

    # 4) Clean & filter
    text_cols = ['PROPERTY TYPE', 'ADDRESS']
    int_cols = ['PRICE', 'BEDS', 'SQUARE FEET', 'DAYS ON MARKET']
    float_cols = ['BATHS']  # kept as float so half-baths (e.g. 2.5) are not truncated
    required_cols = text_cols + int_cols + float_cols
    missing_cols = [c for c in required_cols if c not in sold_df.columns]
    if missing_cols:
        # An unexpected payload (e.g. an HTML block page that still parsed) lacks these headers.
        return f"Could not fetch sales data: response is missing expected columns {missing_cols}."

    sold_df = sold_df[required_cols].copy()
    sold_df[text_cols] = sold_df[text_cols].fillna('Unknown')
    # Coerce instead of astype(int) directly: stray non-numeric cells become 0 rather than crashing.
    for c in int_cols:
        sold_df[c] = pd.to_numeric(sold_df[c], errors='coerce').fillna(0).astype(int)
    for c in float_cols:
        sold_df[c] = pd.to_numeric(sold_df[c], errors='coerce').fillna(0.0)
    # Rows without a usable price (including ones zero-filled above) are dropped.
    sold_df = sold_df[sold_df['PRICE'] > 1]

    # 5) Apply user filters
    type_note = ""
    if property_type:
        # map user‐friendly names to API values
        type_map = {
            'single family residential': 'Single Family Residential',
            'condo/co-op': 'Condo/Co-op',
            'townhouse': 'Townhouse'
        }
        target = type_map.get(property_type.strip().lower())
        if target:
            sold_df = sold_df[sold_df['PROPERTY TYPE'] == target]
        else:
            # Results stay unfiltered (as before), but the caller is now told about it.
            type_note = (
                f"(Note: property type '{property_type}' was not recognized, "
                "so no property-type filter was applied.)\n"
            )
    if min_price is not None:
        sold_df = sold_df[sold_df['PRICE'] >= min_price]
    if max_price is not None:
        sold_df = sold_df[sold_df['PRICE'] <= max_price]

    # Debug: filtered row count
    print(f"🔧 Filtered to {len(sold_df)} rows of data")

    if sold_df.empty:
        return "No homes match your criteria. Try adjusting your filters."

    # 6) Build the output
    total_available = len(sold_df)
    to_show = total_available if show_all else min(num_results, total_available)
    header = (
        f"Here {'are all' if show_all else 'are'} "
        f"{to_show} of {total_available} homes sold in {zip_code}:\n"
    )

    lines = []
    for _, row in sold_df.head(to_show).iterrows():
        # Explicit casts: iterrows() yields object-dtype rows, so do not rely on column dtypes here.
        lines.append(
            f"- {row['ADDRESS']} ({row['PROPERTY TYPE']})\n"
            f"  Price: ${int(row['PRICE']):,} | Beds: {int(row['BEDS'])} | "
            f"Baths: {float(row['BATHS']):g} | SqFt: {int(row['SQUARE FEET'])}\n"
        )

    return header + type_note + "".join(lines)



In [12]:
# Cell 3.4: Tool for Finding Homes for Sale with cache (random sample ≤10, reuses superset cache, supports bed/bath/sqft filters)

@tool
def list_homes_for_sale(
    zip_code: str,
    max_price: int,
    min_price: int = 100,
    max_pages_to_scrape: int = 3,
    min_beds: int | None = None,
    min_baths: float | None = None,
    min_sqft: int | None = None,
    cache_ttl_seconds: int | None = 36000,
) -> str:
    """
    Scrapes Redfin.com for homes for sale within a ZIP & price range,
    but first tries to reuse a cached *superset* scrape to avoid re-scraping.
    Returns a random sample of up to 10 homes. Supports optional min_beds/min_baths/min_sqft filters.

    Price inputs are in *thousands* (e.g., 300 → $300,000).

    Args:
        zip_code: ZIP code to search.
        max_price: Upper price bound, in thousands of dollars.
        min_price: Lower price bound, in thousands of dollars.
        max_pages_to_scrape: Forwarded to the cache lookup and to the scraper.
        min_beds: Optional minimum bedroom count.
        min_baths: Optional minimum bathroom count.
        min_sqft: Optional minimum square footage.
        cache_ttl_seconds: Forwarded to the cache lookup and to the scraper.

    Returns:
        A formatted listing of the sampled homes, or a message explaining that nothing
        was found / nothing matched the filters.
    """

    def _fmt_number(value, convert, spec: str = "", prefix: str = "") -> str:
        """Render one numeric cell; missing values become 'N/A', unconvertible ones fall back to str()."""
        if pd.isna(value):
            return "N/A"
        try:
            return prefix + format(convert(value), spec)
        except (TypeError, ValueError, OverflowError):
            return str(value)

    print(f"--- TOOL: `list_homes_for_sale` STARTED for ZIP {zip_code} (${min_price}k–${max_price}k) ---")

    # 1) Try to reuse a superset cache to avoid scraping again
    final_df = _get_superset_cached_df(
        mode="sale",
        zip_code=zip_code,
        min_price_k=min_price,
        max_price_k=max_price,
        max_pages_to_scrape=max_pages_to_scrape,
        cache_ttl_seconds=cache_ttl_seconds,
    )

    # 2) If no suitable superset cache, scrape (scrape_redfin_to_df has its own cache too)
    if final_df is None:
        final_df = scrape_redfin_to_df(
            mode="sale",
            zip_code=zip_code,
            min_price=min_price,
            max_price=max_price,
            max_pages_to_scrape=max_pages_to_scrape,
            cache_ttl_seconds=cache_ttl_seconds,
        )

    if final_df is None or final_df.empty:
        print("--- TOOL: `list_homes_for_sale` FINISHED ---")
        return (
            f"I was unable to find or extract any homes for sale in ZIP code {zip_code} "
            f"between ${min_price}k and ${max_price}k."
        )

    # 3) De-dupe by address
    df = final_df.copy()
    if "address" in df.columns:
        df = df.drop_duplicates(subset=["address"])
    # Count AFTER de-duplication so the "found N" figure agrees with what can actually be shown.
    total_listings = len(df)

    # 4) Normalize numerics; support both "sq ft" and "sqft" column names
    for col in ["bedrooms", "bathrooms", "price", "sq ft", "sqft"]:
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors="coerce")
    # Ensure we have a unified "sq ft" column for filtering/printing
    if "sq ft" not in df.columns and "sqft" in df.columns:
        df["sq ft"] = df["sqft"]

    # Apply optional bed/bath/sqft filters (treat as minimums).
    # A filter is skipped when its column is absent from the scrape.
    if min_beds is not None and "bedrooms" in df.columns:
        df = df[df["bedrooms"] >= min_beds]

    if min_baths is not None and "bathrooms" in df.columns:
        df = df[df["bathrooms"] >= float(min_baths)]

    if min_sqft is not None and "sq ft" in df.columns:
        df = df[df["sq ft"] >= int(min_sqft)]

    matched = len(df)

    # Human-readable description of the requested filters, reused by both messages below.
    filt_bits = []
    if min_beds is not None:
        filt_bits.append(f"≥{min_beds} beds")
    if min_baths is not None:
        filt_bits.append(f"≥{min_baths} baths")
    if min_sqft is not None:
        filt_bits.append(f"≥{min_sqft} sq ft")

    if matched == 0:
        msg = (
            f"I found {total_listings} homes for sale in {zip_code} between "
            f"${min_price}k and ${max_price}k, but none match your filters"
        )
        msg += "".join(f" ({bit})" for bit in filt_bits)
        msg += ". Try adjusting the criteria."
        print("--- TOOL: `list_homes_for_sale` FINISHED ---")
        return msg

    # 5) Random sample ≤ 10 (unseeded on purpose: a repeat call yields a different set)
    sample_n = min(10, matched)
    sample_df = df.sample(n=sample_n, random_state=None)

    # 6) Sort sampled set by price for readability (optional)
    if "price" in sample_df.columns:
        sample_df = sample_df.sort_values("price", kind="stable")

    # 7) Build output
    filt_str = f" (filtered: {', '.join(filt_bits)})" if filt_bits else ""

    header = (
        f"I found {matched} matching homes in {zip_code} between "
        f"${min_price}k and ${max_price}k{filt_str}. Showing {sample_n} randomly selected:\n"
    )

    lines = [header]
    for _, row in sample_df.iterrows():
        address = str(row.get("address", "N/A"))
        sqft_val = row.get("sq ft") if "sq ft" in row else row.get("sqft")

        price_str = _fmt_number(row.get("price"), int, ",", prefix="$")
        beds_str = _fmt_number(row.get("bedrooms"), int)
        baths_str = _fmt_number(row.get("bathrooms"), float, ".1f")
        sqft_str = _fmt_number(sqft_val, lambda v: int(float(v)))

        lines.append(
            f"- Address: {address}\n"
            f"  Price: {price_str} | Beds: {beds_str} | Baths: {baths_str} | SqFt: {sqft_str}\n"
        )

    lines.append("\n(Ask for “more” to see another random set, or refine beds/baths/price/sqft.)")

    print("--- TOOL: `list_homes_for_sale` FINISHED ---")
    return "".join(lines)




In [13]:
# Cell 3.5: Tool for Finding Homes for Rent (random sample + filters + cache TTL)

@tool
def list_homes_for_rent(
    zip_code: str,
    max_pages_to_scrape: int = 3,
    # --- optional filters ---
    min_beds: int | None = None,
    min_baths: float | None = None,
    min_sqft: int | None = None,
    min_rent: int | None = None,   # dollars/month
    max_rent: int | None = None,   # dollars/month
    # sampling
    limit: int = 10,
    seed: int | None = None,
    # cache TTL (seconds) so follow-ups reuse scrape
    cache_ttl_seconds: int = 36000,
) -> str:
    """
    Scrapes Redfin.com for rentals in a ZIP, then applies optional filters and returns
    a random sample (default 10). Uses cache TTL to avoid re-scraping on follow-ups.

    Filters (all optional):
      - min_beds, min_baths, min_sqft
      - min_rent, max_rent (in dollars/month)

    Other arguments:
      - limit: maximum number of rentals to show; a non-positive or unparseable value
        falls back to 10.
      - seed: optional random_state for the sample, for reproducible output.
      - max_pages_to_scrape, cache_ttl_seconds: forwarded to the scraper.

    Returns a formatted listing, or a plain-text message when nothing usable was found.
    """

    print(f"--- TOOL: `list_homes_for_rent` STARTED for ZIP {zip_code} ---")

    final_df = scrape_redfin_to_df(
        mode="rent",
        zip_code=zip_code,
        max_pages_to_scrape=max_pages_to_scrape,
        cache_ttl_seconds=cache_ttl_seconds,
    )

    # An empty frame is as unusable as no frame at all.
    if final_df is None or final_df.empty:
        return f"I was unable to find or extract any homes for rent in ZIP code {zip_code}."

    # Normalize + clean
    df = final_df.copy()
    # The quality filter below indexes these columns directly, so bail out with a
    # message (instead of a KeyError) when the scrape did not produce them.
    if any(col not in df.columns for col in ("address", "rent", "sq ft")):
        return f"Rental data was scraped for ZIP {zip_code}, but could not be cleaned into a valid list."
    # Bed/bath columns are optional: fill them with NaN so filtering and formatting still work.
    for col in ("bedrooms", "bathrooms"):
        if col not in df.columns:
            df[col] = float("nan")
    for col in ["rent", "sq ft", "bedrooms", "bathrooms"]:
        df[col] = pd.to_numeric(df[col], errors="coerce")

    # Basic quality filters: plausible address text, positive rent, positive square footage
    df = df[
        df["address"].astype(str).str.len().ge(10)
        & df["rent"].gt(0)
        & df["sq ft"].gt(0)
    ].copy()

    if df.empty:
        return f"Rental data was scraped for ZIP {zip_code}, but could not be cleaned into a valid list."

    # Apply user filters (NaN never satisfies >= / <=, so rows with unknown values drop out)
    filters_applied = []
    if min_beds is not None:
        df = df[df["bedrooms"].ge(min_beds)]
        filters_applied.append(f"≥{min_beds} beds")
    if min_baths is not None:
        df = df[df["bathrooms"].ge(min_baths)]
        filters_applied.append(f"≥{min_baths} baths")
    if min_sqft is not None:
        df = df[df["sq ft"].ge(min_sqft)]
        filters_applied.append(f"≥{min_sqft} sq ft")
    if min_rent is not None:
        df = df[df["rent"].ge(min_rent)]
        filters_applied.append(f"≥${min_rent:,}/mo")
    if max_rent is not None:
        df = df[df["rent"].le(max_rent)]
        filters_applied.append(f"≤${max_rent:,}/mo")

    total_matches = len(df)
    if total_matches == 0:
        filters_note = f" with filters ({', '.join(filters_applied)})" if filters_applied else ""
        return f"I didn’t find any rentals in {zip_code}{filters_note}. Try loosening the filters."

    # Random sample. DataFrame.sample rejects negative n and returns nothing for 0,
    # so a non-positive or unparseable limit falls back to the default of 10.
    try:
        requested = int(limit)
    except (TypeError, ValueError):
        requested = 10
    if requested <= 0:
        requested = 10
    n = min(requested, total_matches)
    sample_kwargs = {"n": n, "replace": False}
    if seed is not None:
        sample_kwargs["random_state"] = seed
    sample_df = df.sample(**sample_kwargs)

    # Format output
    filters_note = f" (filters: {', '.join(filters_applied)})" if filters_applied else ""
    header = f"I found {total_matches} homes for rent in {zip_code}{filters_note}. Showing {n} randomly selected:\n"

    lines = [header]
    for _, row in sample_df.iterrows():
        beds = int(row["bedrooms"]) if pd.notna(row["bedrooms"]) else "—"
        baths = f"{float(row['bathrooms']):.1f}" if pd.notna(row["bathrooms"]) else "—"
        sqft = int(row["sq ft"]) if pd.notna(row["sq ft"]) else "—"
        rent_val = int(row["rent"]) if pd.notna(row["rent"]) else 0
        lines.append(
            f"- Address: {row['address']}\n"
            f"  Rent: ${rent_val:,.0f}/mo | Beds: {beds} | Baths: {baths} | SqFt: {sqft}\n"
        )

    lines.append("\n(Ask for “more” to see another random set, or refine beds/baths/sqft/rent.)")
    print("--- TOOL: `list_homes_for_rent` FINISHED ---")
    return "".join(lines)


In [15]:
# Cell 3.6: Tool for running competitive market assessment (now includes RECENTLY SOLD comps)

from typing import Optional
import io, json, time, requests
import pandas as pd
from groq import Groq
from langchain_core.tools import tool

@tool
def competitive_market_assessment(
    address: str,
    subject_bedrooms: int,
    subject_bathrooms: float,
    subject_sqft: int,
    radius_miles: float = 3.0,
    max_comps: int = 15,
    max_pages_to_scrape: int = 3,
    sale_min_price_k: Optional[int] = None,
    sale_max_price_k: Optional[int] = None,
    cache_ttl_seconds: int = 36000,
    refresh_scrape: bool = False,
    groq_model: str = "openai/gpt-oss-120b"  # deepseek-r1-distill-llama-70b
) -> str:
    """
    Geocode the subject; if sale band missing, infer it via Groq (compound-beta);
    then fetch cached rentals, active for-sale listings, and RECENTLY SOLD comps.
    Compute distance, pick nearest N; estimate rent & sale price (active + sold).

    Args:
        address: Free-text address of the subject property.
        subject_bedrooms / subject_bathrooms / subject_sqft: Subject attributes; they are
            only interpolated into the LLM prompts and the report header.
        radius_miles: Preferred comp radius. If no comp falls inside it, the full
            distance-sorted set is used instead.
        max_comps: Maximum comps kept per category (rent / active sale / sold).
        max_pages_to_scrape: Forwarded to `scrape_redfin_to_df`.
        sale_min_price_k / sale_max_price_k: Sale price band in thousands. When either is
            missing, both are replaced by the LLM-inferred band.
        cache_ttl_seconds: Forwarded to the geocode and scrape helpers.
        refresh_scrape: Accepted but not read anywhere in this function.
        groq_model: Model used for the two estimation calls (not for band inference).

    Returns:
        A plain-text report (header plus rent and/or sale sections), or a short message
        when geocoding, ZIP detection, band inference, or comp retrieval fails.

    Side effects:
        Appends a snapshot dict to the module-level `CMA_RESULTS` list and prints
        "[CMA] ..." progress lines.
    """

    # Keys
    # Both are module-level globals defined outside this cell.
    geo_key = geoapify_api_key
    groq_key = groq_api_key

    # 1) Geocode subject (cached)
    lat, lon, formatted, postcode = _get_subject_geocode_cached(
        address, geo_key, ttl_seconds=cache_ttl_seconds
    )
    if lat is None or lon is None:
        return f"Could not geocode the target address: {address}"
    zip_code = _zip_from_geocode(formatted, postcode)
    if not zip_code:
        return f"Could not determine ZIP code for: {formatted or address}"
    print(f"[CMA] Subject: {formatted or address} @ ({lat:.6f},{lon:.6f}) ZIP={zip_code}")

    # 1b) If sale band is missing, infer via LLM and continue
    # Note: a caller-supplied bound is discarded when only one of the two is given,
    # because the inferred pair overwrites both.
    if (sale_min_price_k is None) or (sale_max_price_k is None):
        print("[CMA] No sale band provided; probing approximate price via LLM…")
        # The inference model is fixed to "compound-beta"; `groq_model` is not used here.
        inferred = _infer_sale_band_k_from_llm(formatted or address, groq_key, model="compound-beta")
        if inferred is None:
            return (
                f"I couldn’t infer a sale price band for {formatted or address} (ZIP {zip_code}). "
                "Please provide `sale_min_price_k` and `sale_max_price_k` in thousands, e.g., 300 and 500."
            )
        sale_min_price_k, sale_max_price_k = inferred
        print(f"[CMA] Inferred sale band: {sale_min_price_k}k–{sale_max_price_k}k")

    # Normalize band
    # Values above 5000 are divided by 1000 — presumably they were given in dollars
    # rather than thousands. Swapped bounds are put back in order.
    if sale_min_price_k is not None and sale_min_price_k > 5000:
        sale_min_price_k = int(round(sale_min_price_k / 1000))
    if sale_max_price_k is not None and sale_max_price_k > 5000:
        sale_max_price_k = int(round(sale_max_price_k / 1000))
    if (sale_min_price_k is not None and sale_max_price_k is not None
            and sale_min_price_k > sale_max_price_k):
        sale_min_price_k, sale_max_price_k = sale_max_price_k, sale_min_price_k

    # 2) Pull comps (uses your scrape cache for rent/active)
    rent_df = scrape_redfin_to_df(
        mode="rent",
        zip_code=zip_code,
        max_pages_to_scrape=max_pages_to_scrape,
        cache_ttl_seconds=cache_ttl_seconds
    )
    sale_df = scrape_redfin_to_df(
        mode="sale",
        zip_code=zip_code,
        max_pages_to_scrape=max_pages_to_scrape,
        min_price=sale_min_price_k,
        max_price=sale_max_price_k,
        cache_ttl_seconds=cache_ttl_seconds
    )

    # NEW 2b) Pull RECENTLY SOLD comps via Redfin CSV (uses built-in lat/long; no geocode)
    # Best-effort: any failure here is logged and the report continues without sold comps.
    # NOTE(review): `sold_df` is assigned before cleaning finishes, so if the dropna below
    # raises (e.g. the CSV lacks LATITUDE/LONGITUDE), a partially processed frame survives
    # the except and reaches step 3, where r["latitude"] could raise — confirm and guard.
    # NOTE(review): the HTTP status is not checked before the body is parsed as CSV.
    sold_df = None
    try:
        _ = data_subset  # ensure global exists
        df_meta = data_subset[data_subset["zip_code"] == str(zip_code)]
        if not df_meta.empty:
            region_id = str(df_meta["region_id"].iloc[0])
            region_type = int(df_meta["region_type"].iloc[0])
            params = {
                "region_id": region_id,
                "region_type": region_type,
                "sold_within_days": 90,
                "al": 1
            }
            headers = {"User-Agent": "Mozilla/5.0"}
            resp = requests.get("https://www.redfin.com/stingray/api/gis-csv",
                                params=params, headers=headers, timeout=12)
            raw = pd.read_csv(io.BytesIO(resp.content))
            if not raw.empty:
                cols = ['ADDRESS','PRICE','BEDS','BATHS','SQUARE FEET','LATITUDE','LONGITUDE','SOLD DATE']
                # Keep only the expected columns that the CSV actually contains.
                present = [c for c in cols if c in raw.columns]
                sold_df = raw[present].copy()
                # Rename to the lowercase schema used for rent/sale comps in _records below.
                sold_df = sold_df.rename(columns={
                    'ADDRESS':'address',
                    'PRICE':'price',
                    'BEDS':'bedrooms',
                    'BATHS':'bathrooms',
                    'SQUARE FEET':'sq ft',
                    'LATITUDE':'latitude',
                    'LONGITUDE':'longitude',
                    'SOLD DATE':'sold_date'
                })
                # numeric normalize
                for c in ['price','bedrooms','bathrooms','sq ft','latitude','longitude']:
                    if c in sold_df.columns:
                        sold_df[c] = pd.to_numeric(sold_df[c], errors='coerce')
                # drop invalid coords/prices
                sold_df = sold_df.dropna(subset=['latitude','longitude','price'])
                # Restrict sold comps to the sale band (band is in thousands, CSV price is
                # multiplied up by 1000 here to compare).
                if (sale_min_price_k is not None) or (sale_max_price_k is not None):
                  lo = sale_min_price_k * 1000 if sale_min_price_k is not None else -float("inf")
                  hi = sale_max_price_k * 1000 if sale_max_price_k is not None else  float("inf")
                  pre_rows = len(sold_df)
                  sold_df = sold_df[(sold_df["price"] >= lo) & (sold_df["price"] <= hi)]
                  print(f"[CMA] Sold comps price-band filter: {pre_rows} → {len(sold_df)} rows "
                  f"(band {sale_min_price_k}k–{sale_max_price_k}k)")
        else:
            print(f"[CMA] No region meta for ZIP {zip_code}; skipping sold comps.")
    except NameError:
        print("[CMA] data_subset not available; skipping sold comps.")
    except Exception as e:
        print(f"[CMA] Sold comps fetch error: {e}")

    # Give up only when all three comp sources came back empty.
    if (rent_df is None or rent_df.empty) and (sale_df is None or sale_df.empty) and (sold_df is None or sold_df.empty):
        return f"No comps available for ZIP {zip_code}. Try a different area or widen search."

    # 3) Geocode/measure distance + pick nearest
    # The same pattern is applied to each category: compute distance to the subject,
    # drop rows without coordinates, sort by distance, prefer rows inside the radius,
    # and fall back to the whole sorted set if none are inside it.
    radius_km = radius_miles * 1.60934
    comps = {}

    # RENT (geocode cached)
    if rent_df is not None and not rent_df.empty:
        rent_key = ("rent", zip_code, None, None, max_pages_to_scrape)
        rent_geo = _geocode_df_addresses_cached(
            rent_df, "address", zip_code, geo_key,
            cache_key=rent_key, ttl_seconds=cache_ttl_seconds
        )
        # NOTE(review): this adds a column to whatever frame the cached geocoder returned;
        # if that is the cached object itself, the cache entry is mutated — confirm.
        rent_geo["distance_km"] = rent_geo.apply(
            lambda r: _haversine_km(lat, lon, r["latitude"], r["longitude"])
            if pd.notna(r["latitude"]) and pd.notna(r["longitude"]) else float("inf"),
            axis=1
        )
        rent_geo = rent_geo[pd.to_numeric(rent_geo["distance_km"], errors="coerce") < float("inf")]
        rent_geo = rent_geo.sort_values("distance_km")
        within = rent_geo[rent_geo["distance_km"] <= radius_km]
        pool = within if not within.empty else rent_geo
        comps["rent"] = _select_nearest_by_distance_only(pool, max_comps, kind="rent", min_distance_m=0).copy()
        print(f"[CMA] Rentals picked: {len(comps['rent'])}")

    # ACTIVE FOR-SALE (geocode cached)
    if sale_df is not None and not sale_df.empty:
        sale_key = ("sale", zip_code, sale_min_price_k, sale_max_price_k, max_pages_to_scrape)
        sale_geo = _geocode_df_addresses_cached(
            sale_df, "address", zip_code, geo_key,
            cache_key=sale_key, ttl_seconds=cache_ttl_seconds
        )
        sale_geo["distance_km"] = sale_geo.apply(
            lambda r: _haversine_km(lat, lon, r["latitude"], r["longitude"])
            if pd.notna(r["latitude"]) and pd.notna(r["longitude"]) else float("inf"),
            axis=1
        )
        sale_geo = sale_geo[pd.to_numeric(sale_geo["distance_km"], errors="coerce") < float("inf")]
        sale_geo = sale_geo.sort_values("distance_km")
        within = sale_geo[sale_geo["distance_km"] <= radius_km]
        pool = within if not within.empty else sale_geo
        comps["sale"] = _select_nearest_by_distance_only(pool, max_comps, kind="sale", min_distance_m=0).copy()
        print(f"[CMA] For-sale picked: {len(comps['sale'])}")

    # NEW: RECENTLY SOLD (no geocode; distance from CSV lat/lon)
    # Unlike rent/sale, selection here is a plain head(max_comps) on the sorted pool.
    if sold_df is not None and not sold_df.empty:
        sold_geo = sold_df.copy()
        sold_geo["distance_km"] = sold_geo.apply(
            lambda r: _haversine_km(lat, lon, r["latitude"], r["longitude"])
            if pd.notna(r["latitude"]) and pd.notna(r["longitude"]) else float("inf"),
            axis=1
        )
        sold_geo = sold_geo[pd.to_numeric(sold_geo["distance_km"], errors="coerce") < float("inf")]
        sold_geo = sold_geo.sort_values("distance_km")
        within = sold_geo[sold_geo["distance_km"] <= radius_km]
        pool = within if not within.empty else sold_geo
        comps["sold"] = pool.head(max_comps).copy()
        print(f"[CMA] Recently sold picked: {len(comps['sold'])}")

    # 4) Build JSON comps for prompts
    def _records(df: pd.DataFrame, kind: str):
        """Serialize a comps frame to a JSON array string; kind 'rent' emits `rent`, anything else `price`."""
        recs = []
        for _, r in df.iterrows():
            # Missing numeric fields default to 0 (or None for coordinates/distance).
            # NOTE(review): rent/sale frames are not numeric-coerced in this function, so
            # int(...) assumes the scraper already yields int-convertible values — confirm.
            base = {
                "address": str(r.get("address","")),
                "sq ft": int(r.get("sq ft", 0)) if pd.notna(r.get("sq ft")) else 0,
                "bedrooms": int(r.get("bedrooms", 0)) if pd.notna(r.get("bedrooms")) else 0,
                "bathrooms": float(r.get("bathrooms", 0)) if pd.notna(r.get("bathrooms")) else 0.0,
                "latitude": float(r.get("latitude")) if pd.notna(r.get("latitude")) else None,
                "longitude": float(r.get("longitude")) if pd.notna(r.get("longitude")) else None,
                "distance_km": float(r.get("distance_km")) if pd.notna(r.get("distance_km")) else None
            }
            if kind == "rent":
                base["rent"] = int(r.get("rent", 0)) if pd.notna(r.get("rent")) else 0
            else:
                base["price"] = int(r.get("price", 0)) if pd.notna(r.get("price")) else 0
            # If this is a sold comp and date exists, include it
            if "sold_date" in r.index and pd.notna(r.get("sold_date")):
                base["sold_date"] = str(r.get("sold_date"))
            recs.append(base)
        return json.dumps(recs, indent=2)

    rent_json = _records(comps["rent"], "rent") if "rent" in comps else None
    sale_json = _records(comps["sale"], "sale") if "sale" in comps else None
    sold_json = _records(comps["sold"], "sold") if "sold" in comps else None  # NEW

    # 5) Ask Groq to estimate
    client = Groq(api_key=groq_key)

    # Rent estimate: only requested when at least one rental comp was serialized.
    rent_section = ""
    if rent_json and len(json.loads(rent_json)) > 0:
        system_rent = (
            "You are an excellent real estate agent with a strong analytical mind. "
            "Estimate monthly rent for the target using nearby comps. "
            "Return a comprehensive rationale but dont share any inner chain-of-thought."
        )
        rent_msgs = [
            {"role": "system", "content": system_rent},
            {"role": "user", "content":
                f"Target: {formatted or address}\n"
                f"Beds: {subject_bedrooms}  Baths: {subject_bathrooms}  SqFt: {subject_sqft}\n\n"
                "Nearby rental comps (JSON):\n"
                f"{rent_json}\n\n"
                "Requirements:\n"
                "- Ignore any comp whose address exactly matches the target.\n"
                "- Prioritize similarity (beds/baths/sqft) over raw distance.\n"
                "- Provide an estimate with a narrow range (±5%).\n"
                "- List at least 4 comparable properties if available and explain why each was chosen.\n"
                "- Clearly articulate how these comparable properties influenced your estimate"
            }
        ]
        # NOTE(review): this call passes reasoning_effort="medium" but the sale call below
        # does not — confirm whether the difference is intentional and whether every
        # permitted `groq_model` accepts the parameter.
        r = client.chat.completions.create(
            model=groq_model, messages=rent_msgs, temperature=0.2, top_p=0.9, max_tokens=10000, reasoning_effort="medium",
        )
        rent_section = (r.choices[0].message.content or "").strip()

    # Sale estimate: requested when either active listings or sold comps exist.
    sale_section = ""
    if (sale_json and len(json.loads(sale_json)) > 0) or (sold_json and len(json.loads(sold_json)) > 0):
        system_sale = (
            "You are an excellent real estate agent with a strong analytical mind. "
            "Estimate the selling price for the target using BOTH nearby active listings and recently SOLD comps. "
            "Analyze SOLD and ACTIVE separately, then reconcile. "
            "Return a comprehensive rationale but do not reveal chain-of-thought."
        )

        # You can tweak these weights as you like
        # They are only interpolated into the prompt text; the blending itself is done by the model.
        sold_weight = 0.3
        active_weight = 0.7

        sale_user = f"""
    Target: {formatted or address}
    Beds: {subject_bedrooms}  Baths: {subject_bathrooms}  SqFt: {subject_sqft}

    ACTIVE_LISTINGS_JSON:
    {sale_json or '[]'}

    SOLD_COMPS_JSON:
    {sold_json or '[]'}

    Instructions (follow exactly):
    1) Create two distinct sections. Do NOT mix rows:
      - "SOLD ANALYSIS": choose the 3–6 strongest SOLD comps (closest + most similar beds/baths/sqft). For each, show address, beds, baths, sqft, price, $/sf, distance.
        - Compute SOLD price-per-sf stats: median, low, high.
        - Brief bullets on why each sold comp was chosen.
      - "ACTIVE ANALYSIS": choose the 3–6 strongest ACTIVE listings. Same columns, separate table.
        - Compute ACTIVE price-per-sf stats: median, low, high.
        - Brief bullets on why each active comp was chosen.

    2) Reconciliation (separate section titled "RECONCILIATION & FINAL ESTIMATE"):
      - If BOTH sets exist: compute a blended $/sf = (SOLD_median * {sold_weight:.2f}) + (ACTIVE_median * {active_weight:.2f}).
      - If only one set exists: use that set’s median $/sf (explain).
      - Multiply blended $/sf by subject sqft to get a single point estimate.
      - Provide a +/- 5% band around that number.
      - 2–4 bullets explaining the key adjustments (beds/baths/sqft, recency, finish/condition signals).

    3) Formatting rules:
      - Two separate tables: one for SOLD, one for ACTIVE (omit a table if that set is empty and explicitly say so).
      - Then a short "RECONCILIATION & FINAL ESTIMATE" section with the final dollar estimate and band.
      - Keep it crisp; no chain-of-thought, just conclusions and brief justifications.
    """

        sale_msgs = [
            {"role": "system", "content": system_sale},
            {"role": "user", "content": sale_user}
        ]

        s = client.chat.completions.create(
            model=groq_model,
            messages=sale_msgs,
            temperature=0.2,
            top_p=0.9,
            max_tokens=10000,
        )
        sale_section = (s.choices[0].message.content or "").strip()


    # 6) Final text
    header = (
        f"🏠 Competitive Market Assessment\n"
        f"Subject: {formatted or address}\n"
        f"Beds: {subject_bedrooms} • Baths: {subject_bathrooms} • SqFt: {subject_sqft}\n"
        f"Radius: {radius_miles} mi • Max comps: {max_comps}\n"
        f"ZIP: {zip_code}\n"
        f"(Sale band used: {sale_min_price_k}k–{sale_max_price_k}k)\n"
    )
    parts = [header]
    if rent_section:
        parts.append("\n=== Rent Estimate ===\n")
        parts.append(rent_section)
    if sale_section:
        parts.append("\n=== Sale Estimate (Actives + Sold) ===\n")
        parts.append(sale_section)
    if not rent_section and not sale_section:
        parts.append("\nNo comps could be evaluated.")

    # Save minimal snapshot for later cross-referencing
    # CMA_RESULTS is a module-level list created in a different cell; it must exist
    # by the time this tool is invoked or the append raises NameError.
    CMA_RESULTS.append({
        "address": formatted or address,
        "zip_code": zip_code,
        "beds": subject_bedrooms,
        "baths": subject_bathrooms,
        "sqft": subject_sqft,
        "latitude": lat,
        "longitude": lon,
        "rent_section": rent_section,
        "sale_section": sale_section,
        "timestamp": pd.Timestamp.utcnow().isoformat()
    })
    print(f"[CMA] Saved CMA results for {formatted or address} (ZIP {zip_code})")

    return "".join(parts)

# Confirmation banner: printed once each time this cell is executed, i.e. after the
# CMA tool above has been (re)defined.
print("✅ CMA increments loaded. (Actives + RECENTLY SOLD comps included.)")


✅ CMA increments loaded. (Actives + RECENTLY SOLD comps included.)


In [16]:
# Cell 3.7: Web Intelligence via compound-beta (simple pass-through)

from typing import Optional, List, Tuple
import time
from textwrap import dedent
from groq import Groq
from langchain_core.tools import tool

# ---- lightweight cache (keyed by query+addr+zip+domains+freshness) ----
# Maps the normalized key tuple to (response_text, time.time() captured at the start of
# the lookup). Held in module memory only; re-running this cell resets it to empty.
_webintel_cache: dict[tuple, tuple[str, float]] = {}

def _norm(s: Optional[str]) -> Optional[str]:
    return s.strip().lower() if isinstance(s, str) else None

def _norm_domains(domains: Optional[List[str]]) -> Optional[Tuple[str, ...]]:
    if not domains:
        return None
    return tuple(sorted({d.strip().lower() for d in domains if d and d.strip()}))

@tool
def web_intelligence_real_estate(
    query: str,
    address: Optional[str] = None,
    zip_code: Optional[str] = None,
    domains: Optional[List[str]] = None,
    freshness_days: int = 7,
    tokens_max: int = 1500,
    cache_ttl_seconds: Optional[int] = 6 * 36000
) -> str:
    """
    Simple pass-through real-estate search using Groq 'compound-beta'.
    Returns EXACTLY the model's text content (no reformatting, no JSON coercion).

    Usage:
      raw_text = web_intelligence_real_estate.invoke({...})
      # If you want reasoning/tools, print them yourself from the raw client response.

    Args:
      query: The question to ask.
      address / zip_code / domains: Optional hints appended to the user message.
      freshness_days: Recency preference stated in the system prompt.
      tokens_max: max_tokens passed to the completion call.
      cache_ttl_seconds: Max age of a reusable cached answer; None skips the cache
        lookup. The default evaluates to 6 * 36000 = 216,000 seconds (60 hours).

    Returns:
      The model's stripped text, a cached copy of it, or "Search failed: <error>".
      Empty model responses are returned but never cached, so the next identical
      call retries instead of replaying an empty answer for the whole TTL.
    """
    # cache hit?
    key = (_norm(query), _norm(address), _norm(zip_code), _norm_domains(domains), int(freshness_days))
    now = time.time()
    if cache_ttl_seconds is not None and key in _webintel_cache:
        payload, ts = _webintel_cache[key]
        if (now - ts) < cache_ttl_seconds:
            return payload  # return exact cached text

    # minimal system prompt (ask for concise, plain-text answer)
    # Named `system_prompt` rather than `sys` to avoid shadowing the stdlib module name.
    system_prompt = dedent(f"""
    You are a real-estate web intelligence assistant.
    Answer in clear, plain English with concise facts only.
    Prefer information published within the last {freshness_days} days when applicable.
    Do NOT include extra sections, headers, or JSON. Just answer the user's question directly.
    """).strip()

    # optional guidance for the model (not shown to the user)
    guidance_lines = []
    if address:
        guidance_lines.append(f"Address focus: {address}")
    if zip_code:
        guidance_lines.append(f"ZIP focus: {zip_code}")
    if domains:
        guidance_lines.append("Prefer domains: " + ", ".join(domains))
    guidance = ("\n" + "\n".join(guidance_lines)) if guidance_lines else ""

    user_prompt = f"{query.strip()}{guidance}"

    try:
        client = Groq(api_key=groq_api_key)
        resp = client.chat.completions.create(
            model="compound-beta",
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt}
            ],
            temperature=0.0,
            max_tokens=tokens_max,
        )
        content = (resp.choices[0].message.content or "").strip()
    except Exception as e:
        # On any error, return a plain string (keeps the contract simple); failures are not cached.
        return f"Search failed: {e}"

    # Cache only real answers; the entry is timestamped with the lookup start time.
    if content:
        _webintel_cache[key] = (content, now)
    return content


In [17]:
# Cell 3.8: NPV tool (hardened JSON parse + address canonicalization + inputs source echo)

from typing import Optional, Dict, Any, Tuple, List
import math, json, re, time
import pandas as pd
from groq import Groq
from langchain_core.tools import tool
from difflib import SequenceMatcher

# -------------------------------
# Globals / caches
# -------------------------------
# Re-running this cell must not wipe results accumulated earlier in the
# session, so each cache is only created when it does not exist yet.

# CMA entries that `_pick_cma_by_address` searches.
if "CMA_RESULTS" not in globals():
    CMA_RESULTS = []

# NPV snapshots appended by `investment_npv`.
if "NPV_RESULTS" not in globals():
    NPV_RESULTS: List[Dict[str, Any]] = []

# -------------------------------
# Small helpers
# -------------------------------
def _extract_json_block(text: str) -> str:
    """Extract a JSON object from plain or ```json fenced``` text."""
    if not text:
        return "{}"
    t = text.strip()
    m = re.search(r"```(?:json)?\s*(\{.*?\})\s*```", t, flags=re.S | re.I)
    if m:
        return m.group(1)
    if t.startswith("{") and t.endswith("}"):
        return t
    i, j = t.find("{"), t.rfind("}")
    if i != -1 and j > i:
        return t[i:j+1]
    return "{}"

# -------------------------------
# Address picking (best address match only)
# -------------------------------
def _norm_addr(s: str) -> str:
    if not s: return ""
    s = s.lower()
    s = re.sub(r'[^a-z0-9 ]+', ' ', s)
    s = re.sub(r'\s+', ' ', s).strip()
    s = s.replace(" street", " st").replace(" avenue", " ave").replace(" boulevard", " blvd")
    s = s.replace(" drive", " dr").replace(" road", " rd").replace(" lane", " ln").replace(" trail", " trl")
    return s

def _addr_similarity(a: str, b: str) -> float:
    """Return the difflib SequenceMatcher ratio between the two addresses after `_norm_addr` normalization."""
    left = _norm_addr(a)
    right = _norm_addr(b)
    matcher = SequenceMatcher(None, left, right)
    return matcher.ratio()

def _has_sections(c):
    return int(bool(c.get("rent_section"))) + int(bool(c.get("sale_section")))

def _pick_cma_by_address(target_address: str, min_sim: float = 0.60):
    try:
        rows = list(CMA_RESULTS)
    except NameError:
        return None
    if not rows:
        return None
    scored = []
    for c in rows:
        sim = _addr_similarity(target_address or "", c.get("address", ""))
        secs = _has_sections(c)
        ts = 0.0
        try: ts = pd.to_datetime(c.get("timestamp")).timestamp()
        except Exception: pass
        scored.append((-sim, -secs, -ts, c, sim))
    scored.sort()
    best = scored[0][3]
    best_sim = scored[0][4]
    if best_sim < min_sim:
        best = max(rows, key=lambda c: pd.to_datetime(c.get("timestamp", "1970-01-01")), default=None)
    return best

# -------------------------------
# LLM extractors (small/cheap model, NO regex fallback)
# -------------------------------
def _extract_sale_price_llm(sale_section_text: str, ctx: Dict[str, Any]) -> Optional[float]:
    if not sale_section_text:
        return None
    sys = (
        "You extract ONE number as the best single-point estimate of market sale price "
        "from the provided CMA narrative. Output ONLY JSON: {\"price_usd\": <number>} with no currency symbols."
    )
    hint = f"Address: {ctx.get('address','')} | Beds: {ctx.get('beds')} | Baths: {ctx.get('baths')} | Sqft: {ctx.get('sqft')}"
    user = f"{hint}\n\nCMA Sale Section:\n{sale_section_text}"
    try:
        client = Groq(api_key=groq_api_key)
        resp = client.chat.completions.create(
            model="openai/gpt-oss-20b",
            messages=[{"role":"system","content":sys},{"role":"user","content":user}],
            temperature=0.0,
            max_tokens=80,
        )
        txt = (resp.choices[0].message.content or "").strip()
        data = json.loads(txt)
        val = data.get("price_usd")
        return float(val) if val is not None else None
    except Exception:
        return None

def _extract_rent_llm(rent_section_text: str, ctx: Dict[str, Any]) -> Optional[float]:
    if not rent_section_text:
        return None
    sys = (
        "You extract ONE number as the best single-point estimate of MONTHLY RENT "
        "from the provided CMA narrative. Output ONLY JSON: {\"rent_monthly_usd\": <number>} with no currency symbols."
    )
    hint = f"Address: {ctx.get('address','')} | Beds: {ctx.get('beds')} | Baths: {ctx.get('baths')} | Sqft: {ctx.get('sqft')}"
    user = f"{hint}\n\nCMA Rent Section:\n{rent_section_text}"
    try:
        client = Groq(api_key=groq_api_key)
        resp = client.chat_completions.create(  # for some SDKs this is chat.completions; keep your original if needed
            model="openai/gpt-oss-20b",
            messages=[{"role":"system","content":sys},{"role":"user","content":user}],
            temperature=0.0,
            max_tokens=80,
        )
        txt = (resp.choices[0].message.content or "").strip()
        data = json.loads(txt)
        val = data.get("rent_monthly_usd")
        return float(val) if val is not None else None
    except Exception:
        # If your SDK uses client.chat.completions, revert to that call signature.
        try:
            client = Groq(api_key=groq_api_key)
            resp = client.chat.completions.create(
                model="openai/gpt-oss-20b",
                messages=[{"role":"system","content":sys},{"role":"user","content":user}],
                temperature=0.0,
                max_tokens=80,
            )
            txt = (resp.choices[0].message.content or "").strip()
            data = json.loads(txt)
            val = data.get("rent_monthly_usd")
            return float(val) if val is not None else None
        except Exception:
            return None

# -------------------------------
# One-shot web intel (compound-beta) with robust JSON extraction
# -------------------------------
def _fetch_inputs_compound_beta(address: str, zip_code: Optional[str], state_hint: Optional[str], price_usd: float) -> Dict[str, Any]:
    """
    Single compound-beta call. Returns numeric inputs + a __meta flag telling you
    if the data came from compound-beta or from defaults.
    """
    sys = (
        "You are a real-estate investment inputs assistant. "
        "Return ONLY strict JSON with numeric fields in decimals (not percents) and booleans. "
        "Assume investor-occupied (non-owner) conventional 30-year fixed mortgage."
    )
    user = f"""
Address: {address}
ZIP: {zip_code or ""}
State: {state_hint or ""}

Target purchase price (USD): {price_usd:.2f}

Return JSON with keys exactly:
- mortgage_rate_30y: decimal (e.g., 0.063 for 6.3%), for investment property in the given state.
- property_tax_rate: decimal of value (e.g., 0.018 = 1.8%).
- insurance_annual_usd: number, estimated homeowner's insurance cost per year for a home near this price in this area.
- hoa_monthly_usd: number (0 if none known).
- inflation_rate: decimal (national CPI y/y if unsure).
- appreciation_rate: decimal local home price appreciation trend (city/county best effort).
- condition_ok: boolean; true if public listings suggest typical good condition; else false.

Output ONLY compact JSON, no explanations.
""".strip()

    try:
        client = Groq(api_key=groq_api_key)
        resp = client.chat.completions.create(
            model="compound-beta",
            messages=[{"role":"system","content":sys},{"role":"user","content":user}],
            temperature=0.0,
            max_tokens=300,
        )
        raw = (resp.choices[0].message.content or "").strip()
        txt = _extract_json_block(raw)
        data = json.loads(txt)

        out = {
            "mortgage_rate_30y": float(data.get("mortgage_rate_30y", 0.065)),
            "property_tax_rate": float(data.get("property_tax_rate", 0.018)),
            "insurance_annual_usd": float(data.get("insurance_annual_usd", 1500.0)),
            "hoa_monthly_usd": float(data.get("hoa_monthly_usd", 0.0)),
            "inflation_rate": float(data.get("inflation_rate", 0.03)),
            "appreciation_rate": float(data.get("appreciation_rate", 0.02)),
            "condition_ok": bool(data.get("condition_ok", True)),
            "__meta": {"ok": True, "source": "compound-beta", "raw_len": len(raw)}
        }
        return out
    except Exception as e:
        out = {
            "mortgage_rate_30y": 0.065,
            "property_tax_rate": 0.018,
            "insurance_annual_usd": 1500.0,
            "hoa_monthly_usd": 0.0,
            "inflation_rate": 0.03,
            "appreciation_rate": 0.02,
            "condition_ok": True,
            "__meta": {"ok": False, "source": "defaults", "error": str(e)}
        }
        return out

# -------------------------------
# Finance helpers
# -------------------------------
def _pmt(principal: float, annual_rate: float, years: int) -> float:
    r = annual_rate / 12.0
    n = years * 12
    if r == 0:
        return principal / n
    return principal * (r * (1 + r)**n) / ((1 + r)**n - 1)

def _balance_after(principal: float, annual_rate: float, years: int, months_paid: int) -> float:
    r = annual_rate / 12.0
    N = years * 12
    n = months_paid
    if r == 0:
        return max(principal * (1 - n / N), 0)
    return principal * (((1 + r)**N - (1 + r)**n) / ((1 + r)**N - 1))

def _npv(discount_rate_annual: float, cashflows_by_year: List[float]) -> float:
    r = discount_rate_annual
    total = 0.0
    for t, cf in enumerate(cashflows_by_year, start=1):
        total += cf / ((1 + r) ** t)
    return total

# -------------------------------
# NPV Tool
# -------------------------------
@tool
def investment_npv(
    address: str,
    analysis_years: int = 30,
    # optional overrides for what-if
    override_price_usd: Optional[float] = None,
    override_rent_monthly_usd: Optional[float] = None,
    override_mortgage_rate_30y: Optional[float] = None,   # decimal
    override_property_tax_rate: Optional[float] = None,   # decimal of value
    override_insurance_annual_usd: Optional[float] = None,
    override_hoa_monthly_usd: Optional[float] = None,
    override_inflation_rate: Optional[float] = None,      # decimal
    override_appreciation_rate: Optional[float] = None,
    override_discount_rate_annual: Optional[float] = None
) -> str:
    """
    Compute 30-year NPV and Year 0/1/5/10 cashflow + net return (incl. Y0 & cumulative CF).
    Pull purchase price & rent from the most relevant CMA (address match), then fetch market inputs
    via a single compound-beta call. Assumptions:
      - Down payment: 25%
      - Closing costs (Y0): 2.5% of price
      - Selling costs: 7% of market value at sale
      - Vacancy: 8.33% (1 month/yr)
      - Mgmt: 8% EGI; Maint: 10% EGI; Misc: 5% EGI
      - Taxes: Year-1 = rate × price; then grow by min(CPI, 0.5 × effective appreciation)
      - Insurance (Y1): max(web, 0.5% × price); then grows with CPI
      - HOA: grows at 0.5 × CPI
      - Rent: grows with CPI
      - Discount rate: safe rate by default (3%) unless overridden
      - Improvements (Y0): 0 if condition_ok else 2% of price
      - If (appreciation − inflation) > 2%, after Year 5 set appreciation = (CPI + 1%)
      - Loan: 30-year fixed; no debt service is charged after payoff when analysis_years > 30

    Args:
      address: subject property address; matched against stored CMA results.
      analysis_years: holding period in whole years (1 or more); sale assumed at the end of it.
      override_*: optional what-if values replacing the CMA/web-derived inputs (rates as decimals).

    Returns:
      A text summary. The full snapshot is also appended to NPV_RESULTS. When required
      inputs are missing or analysis_years is invalid, a short explanatory message is returned.
    """

    # --- 0) Validate the horizon before spending any LLM/web calls ---
    # A horizon below one year would leave the yearly series empty and crash
    # the terminal-value step.
    try:
        years = int(analysis_years)
    except (TypeError, ValueError):
        years = 0
    if years < 1:
        return "`analysis_years` must be a whole number of years (1 or more)."

    # --- 1) Pick CMA entry by address (allows override-only mode) ---
    cma = _pick_cma_by_address(address)

    # Minimal ctx usable even without CMA
    cma_ctx = {
        "address": address if not cma else cma.get("address",""),
        "zip_code": None if not cma else cma.get("zip_code"),
        "beds": None if not cma else cma.get("beds"),
        "baths": None if not cma else cma.get("baths"),
        "sqft": None if not cma else cma.get("sqft"),
    }
    sale_text = "" if not cma else (cma.get("sale_section","") or "")
    rent_text = "" if not cma else (cma.get("rent_section","") or "")

    # --- 1a) Canonicalize address via geocoding (preferred over LLM "correction") ---
    geo_key = globals().get("geoapify_api_key")
    canonical_addr, canonical_zip, state_hint = None, None, None
    if geo_key:
        try:
            lat0, lon0, formatted0, postcode0 = _get_subject_geocode_cached(address, geo_key, ttl_seconds=3600)
            if formatted0:
                canonical_addr = formatted0
            if postcode0:
                mzip0 = re.search(r'\b(\d{5})(?:-\d{4})?\b', str(postcode0))
                if mzip0:
                    canonical_zip = mzip0.group(1)
            mstate0 = re.search(r',\s*([A-Z]{2})\b', formatted0 or "")
            if mstate0:
                state_hint = mstate0.group(1)
        except Exception:
            # Geocoding is optional; fall back to the CMA/user address below.
            pass

    # Preferred address/ZIP for lookups
    addr_for_parsing = canonical_addr or (cma_ctx.get("address") or address or "")
    if canonical_zip:
        cma_ctx["zip_code"] = canonical_zip
    if not state_hint:
        m = re.search(r',\s*([A-Z]{2})\b', addr_for_parsing)
        if m:
            state_hint = m.group(1)

    # --- 2) Extract purchase price & monthly rent (allow overrides + partials) ---
    price_usd = None
    rent_monthly = None
    if override_price_usd is not None:
        price_usd = float(override_price_usd)
    if override_rent_monthly_usd is not None:
        rent_monthly = float(override_rent_monthly_usd)
    if price_usd is None and sale_text:
        price_usd = _extract_sale_price_llm(sale_text, cma_ctx)
    if rent_monthly is None and rent_text:
        rent_monthly = _extract_rent_llm(rent_text, cma_ctx)

    if price_usd is None or rent_monthly is None:
        if cma is None:
            return ("To run NPV without a CMA, please provide BOTH "
                    "`override_price_usd` and `override_rent_monthly_usd`.")
        missing = []
        if price_usd is None: missing.append("price")
        if rent_monthly is None: missing.append("rent")
        return (f"I couldn’t extract {', '.join(missing)} from the CMA. "
                f"Please pass override(s) for the missing value(s).")

    # --- 3) Market inputs (compound-beta) + overrides ---
    wi = _fetch_inputs_compound_beta(addr_for_parsing, cma_ctx.get("zip_code"), state_hint, price_usd)
    mortgage_rate = override_mortgage_rate_30y if override_mortgage_rate_30y is not None else wi["mortgage_rate_30y"]
    tax_rate      = override_property_tax_rate   if override_property_tax_rate   is not None else wi["property_tax_rate"]
    ins_annual    = override_insurance_annual_usd if override_insurance_annual_usd is not None else wi["insurance_annual_usd"]
    hoa_monthly   = override_hoa_monthly_usd    if override_hoa_monthly_usd    is not None else wi["hoa_monthly_usd"]
    inflation     = override_inflation_rate     if override_inflation_rate     is not None else wi["inflation_rate"]
    appreciation  = override_appreciation_rate  if override_appreciation_rate  is not None else wi["appreciation_rate"]
    condition_ok  = wi["condition_ok"]
    web_meta      = wi.get("__meta", {"ok": False, "source": "unknown"})

    # Discount rate: safe rate default (3%) unless overridden
    discount_overridden = override_discount_rate_annual is not None
    discount_rate = float(override_discount_rate_annual) if discount_overridden else 0.03

    # ---- (4) Assumptions & growth rules ----
    down_pct = 0.25
    closing_costs_pct = 0.025
    selling_costs_pct = 0.07
    vacancy_rate = 1.0/12.0
    mgmt_pct = 0.08
    maint_pct = 0.10
    misc_pct = 0.05
    loan_years = 30  # fixed-rate term, independent of the analysis horizon

    # Insurance min rule (Year 1 baseline)
    ins_annual = max(ins_annual, 0.005 * price_usd)
    insurance_growth = inflation

    # HOA growth
    hoa_growth = 0.5 * inflation

    # Temper appreciation after Year 5 if spread > 2%
    use_tempered_after5 = (appreciation - inflation) > 0.02
    late_appreciation = (inflation + 0.01) if use_tempered_after5 else appreciation

    # Taxes growth: per-year min(CPI, 0.5 × effective_appreciation_t)
    tax_base_year1 = tax_rate * price_usd

    # Improvements at Y0
    improve_cost = 0.0 if condition_ok else 0.02 * price_usd

    # ---- (5) Loan setup ----
    down_payment = down_pct * price_usd
    closing_costs_y0 = closing_costs_pct * price_usd
    loan_principal = price_usd - down_payment
    monthly_pmt = _pmt(loan_principal, mortgage_rate, loan_years)
    annual_debt_service = monthly_pmt * 12

    # ---- (6) Year 0 cashflow (initial outlay) ----
    y0_cashflow = -(down_payment + closing_costs_y0 + improve_cost)

    # ---- (7) Annual projections ----
    cf_by_year, mv_by_year, bal_by_year = [], [], []
    egi_by_year, noi_by_year = [], []
    equity_by_year, cum_cf_by_year, net_return_incl_y0_by_year = [], [], []

    rent_annual = rent_monthly * 12.0
    hoa_annual = hoa_monthly * 12.0
    insurance = ins_annual
    value = price_usd
    taxes_prev = tax_base_year1
    cumulative_cf = 0.0

    for t in range(1, years+1):
        app_t = appreciation if t <= 5 else late_appreciation
        value = value * (1 + app_t)
        mv_by_year.append(value)

        egi = rent_annual * (1 - vacancy_rate)
        egi_by_year.append(egi)

        tax_growth_t = min(inflation, 0.5 * app_t)
        if t == 1:
            taxes = taxes_prev
        else:
            taxes = taxes_prev * (1 + tax_growth_t)
        taxes_prev = taxes

        if t > 1:
            insurance = insurance * (1 + insurance_growth)
        hoa = hoa_annual * ((1 + hoa_growth) ** (t-1))

        mgmt = mgmt_pct * egi
        maint = maint_pct * egi
        misc = misc_pct * egi

        op_ex = taxes + insurance + hoa + mgmt + maint + misc
        noi = egi - op_ex
        noi_by_year.append(noi)

        # Once the loan term has elapsed there is nothing left to pay.
        debt_service_t = annual_debt_service if t <= loan_years else 0.0
        cf = noi - debt_service_t
        cf_by_year.append(cf)

        rent_annual = rent_annual * (1 + inflation)

        # Cap the months paid at the term so the balance never goes negative.
        bal = _balance_after(loan_principal, mortgage_rate, loan_years, min(t, loan_years) * 12)
        bal_by_year.append(bal)
        equity = value - bal
        equity_by_year.append(equity)

        cumulative_cf += cf
        cum_cf_by_year.append(cumulative_cf)
        net_return_incl_y0_by_year.append(equity + cumulative_cf + y0_cashflow)

    # ---- (8) Terminal proceeds if sold end of final year ----
    terminal_mv = mv_by_year[-1]
    terminal_sell_costs = selling_costs_pct * terminal_mv
    terminal_balance = bal_by_year[-1]
    terminal_net_proceeds = terminal_mv - terminal_sell_costs - terminal_balance

    # ---- (9) NPV (discount @ chosen safe rate by default) ----
    npv_stream = cf_by_year[:-1] + [cf_by_year[-1] + terminal_net_proceeds]
    npv_30y = y0_cashflow + _npv(discount_rate, npv_stream)

    # ---- (10) Point-in-time summaries ----
    def _pt(year_idx: int) -> Dict[str, float]:
        """Metrics for one year; all None when the year is beyond the horizon."""
        i = year_idx - 1
        if i < 0 or i >= years:
            return {"cashflow": None, "equity": None, "paper_total": None,
                    "net_return_incl_y0": None, "net_proceeds_if_sold": None}
        cashflow = cf_by_year[i]
        equity = equity_by_year[i]
        paper_total = cashflow + equity
        net_ret_y0 = net_return_incl_y0_by_year[i]
        net_proceeds = mv_by_year[i] * (1 - selling_costs_pct) - bal_by_year[i]
        return {
            "cashflow": cashflow,
            "equity": equity,
            "paper_total": paper_total,
            "net_return_incl_y0": net_ret_y0,
            "net_proceeds_if_sold": net_proceeds
        }

    y1  = _pt(1)
    y5  = _pt(5)
    y10 = _pt(10)

    # ---- (11) Persist full snapshot for what-ifs ----
    snapshot = {
        "address": addr_for_parsing,  # use canonicalized address if available
        "zip_code": cma_ctx.get("zip_code"),
        "beds": cma_ctx.get("beds"),
        "baths": cma_ctx.get("baths"),
        "sqft": cma_ctx.get("sqft"),
        "inputs": {
            "price_usd": price_usd,
            "rent_monthly_usd": rent_monthly,
            "mortgage_rate_30y": mortgage_rate,
            "property_tax_rate": tax_rate,
            "insurance_annual_usd": ins_annual,   # saved baseline (pre-growth)
            "hoa_monthly_usd": hoa_monthly,
            "inflation_rate": inflation,
            "appreciation_rate": appreciation,
            # Recorded from the variables actually used above so the snapshot
            # cannot drift from the computation.
            "down_payment_pct": down_pct,
            "closing_costs_pct_y0": closing_costs_pct,
            "selling_costs_pct": selling_costs_pct,
            "vacancy_rate": vacancy_rate,
            "mgmt_pct_of_egi": mgmt_pct,
            "maint_pct_of_egi": maint_pct,
            "misc_pct_of_egi": misc_pct,
            "hoa_growth": hoa_growth,
            "insurance_growth": insurance_growth,
            "condition_ok": condition_ok,
            "improvement_cost_y0": improve_cost,
            "tempered_after_year5": use_tempered_after5,
            "late_appreciation_used": late_appreciation,
            "discount_rate_annual": discount_rate,
            "web_meta": web_meta
        },
        "derived": {
            "down_payment_usd": down_payment,
            "closing_costs_y0_usd": closing_costs_y0,
            "loan_principal_usd": loan_principal,
            "monthly_pmt_usd": monthly_pmt,
            "annual_debt_service_usd": annual_debt_service,
            "y0_cashflow_usd": y0_cashflow,
            "npv_30y_usd": npv_30y   # key name kept for compatibility; horizon is `years`
        },
        "series": {
            "year": list(range(1, years+1)),
            "egi": egi_by_year,
            "noi": noi_by_year,
            "cashflow": cf_by_year,
            "cum_cashflow": cum_cf_by_year,
            "market_value": mv_by_year,
            "loan_balance": bal_by_year,
            "equity": equity_by_year,
            "net_return_incl_y0": net_return_incl_y0_by_year
        },
        "summaries": {
            "year1": y1,
            "year5": y5,
            "year10": y10,
            "terminal_net_proceeds_yearN": terminal_net_proceeds
        },
        # Timestamp.utcnow() is deprecated in recent pandas; this is the tz-aware equivalent.
        "timestamp": pd.Timestamp.now(tz="UTC").isoformat()
    }
    NPV_RESULTS.append(snapshot)
    print(f"[NPV] Saved NPV snapshot for {addr_for_parsing}")

    # ---- (12) Human-readable summary ----
    def _fmt(n):
        return f"${n:,.0f}" if n is not None else "n/a"
    pct = lambda x: f"{x*100:.2f}%"

    # Note if we standardized the address
    addr_line = addr_for_parsing
    if _norm_addr(addr_line) != _norm_addr(address):
        addr_line += "  _(standardized from your input)_"

    # Surface where inputs came from
    source_line = f"Inputs source: {web_meta.get('source','unknown')}"
    if not web_meta.get("ok", False):
        source_line += " (fell back to defaults)"

    # Only call the rate "safe" when the 3% default was actually used.
    discount_label = "user override" if discount_overridden else "safe rate"

    assumptions = (
        f"- Down payment: 25%  •  Closing costs (Y0): 2.5%  •  Selling costs: 7%\n"
        f"- Vacancy: 8.33%  •  Mgmt: 8% EGI  •  Maint: 10% EGI  •  Misc: 5% EGI\n"
        f"- Taxes: Year-1 = rate × price; thereafter grow by min(CPI, ½×effective appreciation)\n"
        f"- Insurance (Y1) = max(web, 0.5%×price); Insurance ↑ with CPI; HOA ↑ at 0.5×CPI; Rent ↑ with CPI\n"
        f"- If (appreciation − inflation) > 2%: after Year 5, appreciation set to (CPI + 1%)\n"
        f"- Discount rate = {pct(discount_rate)} ({discount_label}); "
        f"Improvements at Y0: {_fmt(improve_cost)} ({'0%' if improve_cost == 0 else '2% of price'})"
    )

    summary = [
        f"📍 **NPV Analysis for** {addr_line}",
        f"{source_line}",
        f"• Price: {_fmt(price_usd)}  • Rent (mo): {_fmt(rent_monthly)}",
        f"• Mortgage rate (30y): {pct(mortgage_rate)}  • Inflation: {pct(inflation)}  • Appreciation: {pct(appreciation)}",
        # ins_annual already carries the 0.5%-of-price floor applied in step 4.
        f"• Tax rate on value: {pct(tax_rate)}  • Insurance (yr1): {_fmt(ins_annual)}  • HOA (mo, yr1): {_fmt(hoa_monthly)}",
        "",
        f"**Year 0 cash outlay** = Downpayment + Closing + Improvements = {_fmt(-y0_cashflow)}",
        "",
        f"**Cashflow (Income − Expenses incl. debt)**",
        f"• Year 1:  {_fmt(y1['cashflow'])}",
        f"• Year 5:  {_fmt(y5['cashflow'])}",
        f"• Year 10: {_fmt(y10['cashflow'])}",
        "",
        f"**Net Return (incl. Y0 outlay & cumulative cashflows)**",
        f"• Year 1:  {_fmt(y1['net_return_incl_y0'])}",
        f"• Year 5:  {_fmt(y5['net_return_incl_y0'])}",
        f"• Year 10: {_fmt(y10['net_return_incl_y0'])}",
        f"_Paper view (equity + this year's cashflow): Y1 {_fmt(y1['paper_total'])}, Y5 {_fmt(y5['paper_total'])}, Y10 {_fmt(y10['paper_total'])}_",
        "",
        f"• Discount rate used for NPV calculation: {pct(discount_rate)}",
        f"**NPV over {years} years** (terminal sale net of 7% costs at Year {years}): {_fmt(npv_30y)}",
        "",
        "**Assumptions**",
        assumptions
    ]

    return "\n".join(summary)

print("✅ Tool 3.8 `investment_npv` loaded (robust JSON parse, address canonicalization, inputs source echo, safe-rate discount).")


✅ Tool 3.8 `investment_npv` loaded (robust JSON parse, address canonicalization, inputs source echo, safe-rate discount).


In [18]:
# Cell 3.9: Quick Property Discovery (tool) — scrape → kNN → cash ROI
# Depends on your existing UDFs:
#  - scrape_redfin_to_df(...)
#  - _get_superset_cached_df(...)   (optional; falls back if missing)
#
# Notes:
# - LLM parsing inside scrape_redfin_to_df should use llama-3.3-70b-versatile (your default).
# - We only need price & basic specs (sq ft, beds, baths). HOA/taxes not used here.
# - Exposes globals: dfmain (rent comps), homes_for_sale_df_final (sale set).

from typing import List, Dict, Any, Optional
import numpy as np
import pandas as pd
from langchain_core.tools import tool

# sklearn imports
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.preprocessing import StandardScaler
from sklearn.compose import ColumnTransformer
from sklearn.neighbors import KNeighborsRegressor
from sklearn.metrics import mean_squared_error, r2_score

# -----------------------------------------------------------------------------
# Globals / caches (safe init)
# -----------------------------------------------------------------------------
# Keep snapshots from earlier runs when this cell is re-executed; only
# create the list (appended to by `discover_candidates_knn`) if it is missing.
if "DISCOVERY_RESULTS" not in globals():
    DISCOVERY_RESULTS: List[Dict[str, Any]] = []

def _fmt_money(x):
    try: return f"${float(x):,.0f}"
    except: return str(x)

def _normalize_listing_df(df: pd.DataFrame, mode: str) -> pd.DataFrame:
    """Unify column names & numerics; drop dupes by address; basic sanity filters."""
    if df is None or df.empty:
        return pd.DataFrame()

    df = df.copy()

    # unify sqft column
    if "sq ft" not in df.columns and "sqft" in df.columns:
        df["sq ft"] = df["sqft"]

    # numeric coercion
    numeric_cols = ["sq ft", "bedrooms", "bathrooms"] + (["rent"] if mode == "rent" else ["price"])
    for col in numeric_cols:
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors="coerce")

    # drop rows with missing core fields
    need = ["sq ft", "bedrooms", "bathrooms"] + (["rent"] if mode == "rent" else ["price"])
    df = df.dropna(subset=[c for c in need if c in df.columns])

    # positive sanity
    if "sq ft" in df.columns: df = df[df["sq ft"] > 0]
    if "rent" in df.columns:  df = df[df["rent"] > 0]
    if "price" in df.columns: df = df[df["price"] > 0]

    # de-dupe by address if present
    if "address" in df.columns:
        df = df.drop_duplicates(subset=["address"])

    return df

def _scrape_sale_with_superset(
    zip_code: str,
    min_price_k: int,
    max_price_k: int,
    max_pages_sale: int,
    cache_ttl_seconds: Optional[int],
):
    """
    Fetch for-sale listings, preferring the superset cache helper.

    `_get_superset_cached_df` is tried first. If calling it raises NameError
    (e.g. the helper's cell was never run) or it returns None, the listings
    are scraped directly with `scrape_redfin_to_df`. Price bounds are passed
    through unchanged to whichever function is used.
    """
    try:
        cached_df = _get_superset_cached_df(
            mode="sale",
            zip_code=zip_code,
            min_price_k=min_price_k,
            max_price_k=max_price_k,
            max_pages_to_scrape=max_pages_sale,
            cache_ttl_seconds=cache_ttl_seconds,
        )
    except NameError:
        cached_df = None

    if cached_df is not None:
        return cached_df

    # Direct scrape; note this function names the bounds min_price/max_price.
    return scrape_redfin_to_df(
        mode="sale",
        zip_code=zip_code,
        min_price=min_price_k,
        max_price=max_price_k,
        max_pages_to_scrape=max_pages_sale,
        cache_ttl_seconds=cache_ttl_seconds,
    )

@tool
def discover_candidates_knn(
    zip_code: str,
    min_price_k: int,
    max_price_k: int,
    max_pages_sale: int = 4,
    max_pages_rent: int = 4,
    top_n: int = 10,
    # optional coarse filters for the SALE pool (big-picture prefilter)
    min_beds: int | None = None,
    min_baths: float | None = None,
    min_sqft: int | None = None,
    cache_ttl_seconds: Optional[int] = 60 * 3600
) -> str:
    """
    Big-picture discovery: find 5–10 interesting for-sale properties in a ZIP.
    Steps:
      1) Scrape rentals (train KNN on rent = f(sqft, beds, baths)).
      2) Scrape for-sale homes (within price bounds).
      3) Predict rent for each for-sale home; compute monthly cash ROI = rent / price.
      4) Return top-N by cash ROI (desc). Also saves a snapshot in DISCOVERY_RESULTS.

    Inputs:
      - zip_code: e.g., "78613"
      - min_price_k, max_price_k: integers in thousands (e.g., 300, 700)
      - Optional: min_beds, min_baths, min_sqft to prefilter the SALE pool.

    Returns a ranked text list followed by a ```json block with the same rows,
    or a short explanatory message when the scraped data is unusable.
    """
    feature_cols = ["sq ft", "bedrooms", "bathrooms"]

    # -------------------------------
    # 0) Scrape data (rent & sale)
    # -------------------------------
    print(f"[3.9] RENT scrape for {zip_code} (pages={max_pages_rent}) …")
    rent_df_raw = scrape_redfin_to_df(
        mode="rent",
        zip_code=zip_code,
        max_pages_to_scrape=max_pages_rent,
        cache_ttl_seconds=cache_ttl_seconds,
    )
    rent_df = _normalize_listing_df(rent_df_raw, mode="rent")
    # The normalizer tolerates missing columns; the model cannot, so treat a
    # table without the features/target as unusable instead of raising KeyError.
    if rent_df.empty or any(c not in rent_df.columns for c in feature_cols + ["rent"]):
        return f"Couldn’t get usable rental comps for {zip_code}. Try increasing pages or a different ZIP."

    print(f"[3.9] SALE scrape for {zip_code} ${min_price_k}k–${max_price_k}k (pages={max_pages_sale}) …")
    sale_df_raw = _scrape_sale_with_superset(
        zip_code, min_price_k, max_price_k, max_pages_sale, cache_ttl_seconds
    )
    sale_df = _normalize_listing_df(sale_df_raw, mode="sale")
    if sale_df.empty or any(c not in sale_df.columns for c in feature_cols + ["price"]):
        return f"Couldn’t get for-sale listings in {zip_code} between ${min_price_k}k and ${max_price_k}k."

    # Optional coarse filters on SALE pool
    if min_beds is not None and "bedrooms" in sale_df.columns:
        sale_df = sale_df[sale_df["bedrooms"] >= int(min_beds)]
    if min_baths is not None and "bathrooms" in sale_df.columns:
        sale_df = sale_df[sale_df["bathrooms"] >= float(min_baths)]
    if min_sqft is not None and "sq ft" in sale_df.columns:
        sale_df = sale_df[sale_df["sq ft"] >= int(min_sqft)]

    if sale_df.empty:
        return "After applying the sale filters, there are no homes left to score. Try relaxing filters."

    # Own the data before adding columns below; the filters above leave a
    # slice, and assigning onto a slice triggers SettingWithCopyWarning.
    sale_df = sale_df.copy()

    # Expose cleaned tables globally for follow-on use (per your UX)
    globals()["dfmain"] = rent_df.copy()
    globals()["homes_for_sale_df_final"] = sale_df.copy()

    # -------------------------------
    # 1) Prepare training data (rent comps)
    # -------------------------------
    X = rent_df[feature_cols]
    y = rent_df["rent"].astype(float)

    if len(X) < 20:
        print(f"[3.9] Warning: only {len(X)} rental rows; KNN may be noisy.")

    # train_test_split raises on a single row (empty train set); report the
    # same "too small" message instead.
    if len(X) < 2:
        return "Training set too small for KNN/CV. Need more rental comps."

    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.30, random_state=42
    )
    if len(X_train) < 3:
        return "Training set too small for KNN/CV. Need more rental comps."

    preprocessor = ColumnTransformer(
        transformers=[("num", StandardScaler(), feature_cols)]
    )
    X_train_scaled = preprocessor.fit_transform(X_train)
    X_test_scaled  = preprocessor.transform(X_test)

    # -------------------------------
    # 2) Hyperparameter search (KNN)
    # -------------------------------
    cv_folds = max(2, min(10, len(X_train)))
    # cap K by fold size & a hard upper bound
    k_by_fold  = max(1, len(X_train) // cv_folds)
    k_upper    = max(1, min(30, k_by_fold + 1))

    param_grid = {
        "n_neighbors": list(range(1, k_upper + 1)),
        "weights": ["uniform", "distance"],
        "metric": ["euclidean", "manhattan", "chebyshev"],
    }

    grid = GridSearchCV(
        estimator=KNeighborsRegressor(),
        param_grid=param_grid,
        scoring="neg_mean_squared_error",
        cv=cv_folds,
        n_jobs=None
    )
    grid.fit(X_train_scaled, y_train)
    best_knn = grid.best_estimator_

    # Evaluate
    y_pred = best_knn.predict(X_test_scaled)
    mse = float(mean_squared_error(y_test, y_pred))
    r2  = float(r2_score(y_test, y_pred))
    print(f"[3.9] Best KNN: {best_knn.get_params()} | MSE: {mse:,.2f} | R²: {r2:,.3f} "
          f"| Train n={len(X_train)}, Test n={len(X_test)}, CV={cv_folds}")

    # -------------------------------
    # 3) Inference on for-sale homes
    # -------------------------------
    X_new = sale_df[feature_cols]
    X_new_scaled = preprocessor.transform(X_new)
    rent_pred = best_knn.predict(X_new_scaled)
    sale_df["predicted_rent"] = np.maximum(0, rent_pred).astype(float)

    # Cash ROI (monthly gross yield, %)
    sale_df["cash_roi"] = (sale_df["predicted_rent"] / sale_df["price"]) * 100.0

    # Rank & pick top candidates ("address" is optional in the scraped table)
    cols_out = [
        c for c in ["address", "price", "sq ft", "bedrooms", "bathrooms", "predicted_rent", "cash_roi"]
        if c in sale_df.columns
    ]
    df_ranked = (
        sale_df.loc[:, cols_out]
        .sort_values("cash_roi", ascending=False)
        .head(int(top_n))
        .reset_index(drop=True)
    )

    # -------------------------------
    # 4) Snapshot & return
    # -------------------------------
    result_records = df_ranked.to_dict(orient="records")
    snapshot = {
        "tool": "3.9_property_discovery_knn",
        "zip_code": zip_code,
        "price_bounds_k": [int(min_price_k), int(max_price_k)],
        "filters": {"min_beds": min_beds, "min_baths": min_baths, "min_sqft": min_sqft},
        "model": {
            "estimator": "KNeighborsRegressor",
            "best_params": best_knn.get_params(),
            "cv_folds": cv_folds,
            "test_mse": mse,
            "test_r2": r2,
            "train_rows": int(len(X_train)),
            "test_rows": int(len(X_test))
        },
        "inputs": {
            "rent_comps_rows": int(len(rent_df)),
            "sale_candidates_rows": int(len(sale_df))
        },
        "results": result_records,
        # Timestamp.utcnow() is deprecated in recent pandas; this is the tz-aware equivalent.
        "timestamp": pd.Timestamp.now(tz="UTC").isoformat()
    }
    DISCOVERY_RESULTS.append(snapshot)
    print(f"[3.9] Saved discovery snapshot. Total runs cached: {len(DISCOVERY_RESULTS)}")

    # Human-friendly + machine-friendly output
    lines = []
    lines.append(f"🔎 **Top {len(df_ranked)} candidates in {zip_code} by monthly cash ROI** "
                 f"(predicted_rent / price × 100) — price range ${min_price_k}k–${max_price_k}k")
    lines.append(f"_KNN test MSE: {mse:,.0f}  •  R²: {r2:,.3f}  •  CV folds: {cv_folds}_\n")
    for i, r in enumerate(result_records, 1):
        # Bathrooms are often fractional (2.5); ":g" shows 2.5 as "2.5" and
        # 2.0 as "2" instead of truncating half baths away.
        lines.append(
            f"{i:>2}. {r.get('address', 'n/a')}\n"
            f"    Price: {_fmt_money(r['price'])} | {int(r['sq ft'])} sq ft | "
            f"{int(r['bedrooms'])} bd / {float(r['bathrooms']):g} ba\n"
            f"    Pred Rent: {_fmt_money(r['predicted_rent'])} | Cash ROI: {r['cash_roi']:.2f}%"
        )

    # JSON payload for programmatic follow-ups
    lines.append("\n```json")
    lines.append(pd.Series({"results": result_records}).to_json())
    lines.append("```")

    return "\n".join(lines)

print("✅ Tool 3.9 `discover_candidates_knn` loaded (scrape→KNN→ROI, tool-callable).")


✅ Tool 3.9 `discover_candidates_knn` loaded (scrape→KNN→ROI, tool-callable).


In [19]:
# Cell 3.FINAL: Create a list of all the tools our agent can use.
tools = [get_zipcode_stats, end_conversation, list_recently_sold_homes, list_homes_for_sale, list_homes_for_rent, competitive_market_assessment, web_intelligence_real_estate, investment_npv, discover_candidates_knn]

In [20]:
# Cell 4: Define agent persona

from langchain_core.messages import SystemMessage

# System prompt for the GRACIE agent. It is plain text sent to the LLM, so every
# character below is runtime behavior: keep the tool examples in sync with the
# actual tool signatures and make sure every rule is a complete sentence.
agent_persona = """
You are GRACIE, an AI Realtor Assistant.
You provide market stats, list properties, and explain real-estate trends in a friendly, conversational tone.
You work best with zip codes. If a user provides a city and State, confirm with user the precise zip code. If user unsure then use your knowledge to convert city and state to the most dense zip code.
CRITICAL: If the user needs to do a Competitive market assessment (CMA) and you are unsure of the price band from previous chats, you MUST ask the user for the target competitive price band BEFORE calling the CMA tool.
Use web_intelligence_real_estate only when the user’s question cannot be answered by the other tools (stats, listings, rents, CMA) or when the user asks for definitions, policies, local facts, or citations not available from those tools (e.g., HOA details, school assignments, crime/livability, taxes/insurance, permitting/zoning, “what is a 1031 exchange”). Do not guess—if unsure which tool applies, ask one clarifying question, then call the tool.
CRITICAL: You must use investment_npv EITHER after a CMA is complete OR YOU MUST ASK THE USER TO PROVIDE YOU WITH COMPLETE ADDRESS, ESTIMATED RENT and ESTIMATED PURCHASE PRICE.
If the user asks for good investment candidates in a city, you must ask the user for a target zip code before calling the discover_candidates_knn tool. CRITICAL - in addition to the zip code you MUST ASK THE USER FOR INVESTMENT PRICE RANGE such as between 400K - 600K.
You have access to the following tools:
1. get_zipcode_stats: for general questions about a specific zipcode or city
  Example:
  User: “What is the median sale price in 75034?”
  Assistant: <function=get_zipcode_stats>{"zip_code":"75034", "property_type":"All Residential"}</function>
2. list_recently_sold_homes: for a list of recently sold homes or for any stats around homes sold.
  Example:
  User: “List some homes sold in 75034 between price 400K and 500K.”
  Assistant: <function=list_recently_sold_homes>{"zip_code":"75034","min_price":400000,"max_price":500000}</function>
3. list_homes_for_sale: Scrapes Redfin.com for a list of homes for sale within a given ZIP code and price range. When using this tool, you must provide the full address including city and state and all other relevant information. CRITICAL: before calling this tool, ask the user for the price band.
  Example:
  User: “List homes for sale in 75034 between $400k and $500k.”
  Assistant: <function=list_homes_for_sale>{"zip_code":"75034","min_price":400,"max_price":500}</function>
4. list_homes_for_rent: Scrapes Redfin.com for a list of single-family homes, condos, and townhouses for rent in a given ZIP code. When using this tool, you must provide the full address including city and state and all other relevant information.
  Example:
  User: “List homes for rent in 75034.”
  Assistant: <function=list_homes_for_rent>{"zip_code":"75034"}</function>
5. competitive_market_assessment: Geocode the subject; scrape cached rentals + for-sale; geocode comps; pick nearest N; estimate rent & sale price with Groq using compact prompts. When using this tool, you must use the full address including bedrooms, bathrooms, square footage, estimated price range. If beds/baths/sqft missing/price range → ask once for each missing field. Do NOT proceed if the user does not provide with you the information you need.
  Example:
  User: "should I invest in 123 Main Street, Frisco 75035. It has 3 bedrooms, 2.5 baths, and 1500 sq ft."
  Assistant: <function=competitive_market_assessment>{"address":"123 Main Street, Frisco, TX 75035","subject_bedrooms":3,"subject_bathrooms":2.5,"subject_sqft":1500}</function>
6. web_intelligence_real_estate: Uses Groq 'compound-beta' to answer *real-estate* questions with cited sources.
  Example:
  User: "what is a 1031 exchange?"
  Assistant: <function=web_intelligence_real_estate>{"query":"what is a 1031 exchange?"}</function>
7. investment_npv: Compute 30-year NPV and Year 0/1/5/10 cashflow + total return (cashflow + equity).
  Example:
  User: "How profitable is 123 Main Street, Frisco 75035. It has 3 bedrooms, 2.5 baths, and 1500 sq ft?"
  Assistant: <function=investment_npv>{"address":"123 Main Street, Frisco,75035"}</function>
8. discover_candidates_knn: Provides potential investment opportunities in a target zip code by Scraping Redfin.com for a list of homes for sale within a given ZIP code and price range. When using this tool, you must provide target zip code and investment range (e.g., between 400K - 600K)
  Example:
  User: "Can you find me attractive investment opportunities in 75035?"
  Assistant: <function=discover_candidates_knn>{"zip_code":"75035","min_price_k":400,"max_price_k":600}</function>
9. end_conversation: Call this specific tool when the user indicates they are finished with the conversation. Use this for phrases like 'thank you', 'that's all', 'I'm done', 'thanks for the help', etc.
  Example:
  User: “I'm done.”
  Assistant: <function=end_conversation>{}</function>


When interacting:
- CRITICAL RULE: YOU MUST ALWAYS USE THE TOOL WHEN REQUIRED. IF YOU CANNOT USE THE TOOL DO NOT MAKE UP FACTS. DO NOT HALLUCINATE.
- CRITICAL RULE: YOU MUST ALWAYS PROVIDE SUPPORTING DATA IN ADDITION TO YOUR RESPONSE. This builds trust with the user.
- CRITICAL RULE: After you answer a question, you must have a follow up question. Your follow up question must be limited to the capabilities the tools provide.
- Acceptable Follow up question example 1: if the user asks for stats on zipcode 75024, acceptable follow up questions are "would you like to see some homes for sale in 75024?" or "would you like to see some homes for rent in 75024?"
- Acceptable Follow up question example 2: after you have provided a list of homes for sale to a user, acceptable follow up questions are "would you like to run some competitive assessment for any property you like?"
- Unacceptable Follow up question example: After you have provided competitive assessment do not say "would you like to see pictures?", since you dont have access to any tool that can fetch pictures.
- Ask for clarification on ambiguious requests.
- End gracefully when the user signals end of conversation with no follow up question.
- Keep everything seamless—no mentions of “tools” or “APIs.”
- CRITICAL: do not answer any questions unrelated to real estate.
"""

# Wrap the persona text in a SystemMessage; the interactive chat loop seeds
# each new conversation history with this object.
system_message = SystemMessage(content=agent_persona)

print("✅ Agent persona defined.")


✅ Agent persona defined.


In [21]:
# NOTE: everything below is an inert triple-quoted string (an earlier draft of Cell 5),
# not executed code. The live AgentState / model definitions are in the following cell.
'''
# Cell 5: Define the state for our graph. It will be a list of messages.
class AgentState(TypedDict):
    messages: Annotated[Sequence[BaseMessage], operator.add]

# Initialize the Groq language model
model = ChatGroq(temperature=0.6, model_name="openai/gpt-oss-120b", reasoning_effort="high",groq_api_key=groq_api_key) #llama-3.3-70b-versatile ; llama3-70b-8192; reasoning_effort="medium"

# Bind the tools to the model. This allows the model to "see" the tools and know how to call them.
model_with_tools = model.bind_tools(tools)
'''

'\n# Cell 5: Define the state for our graph. It will be a list of messages.\nclass AgentState(TypedDict):\n    messages: Annotated[Sequence[BaseMessage], operator.add]\n\n# Initialize the Groq language model\nmodel = ChatGroq(temperature=0.6, model_name="openai/gpt-oss-120b", reasoning_effort="high",groq_api_key=groq_api_key) #llama-3.3-70b-versatile ; llama3-70b-8192; reasoning_effort="medium"\n\n# Bind the tools to the model. This allows the model to "see" the tools and know how to call them.\nmodel_with_tools = model.bind_tools(tools)\n'

In [22]:
# Cell 5: Define the state & model
class AgentState(TypedDict):
    """Graph state shared by every node: the running list of chat messages.

    The ``operator.add`` annotation acts as the reducer for this key, so the
    list a node returns under ``"messages"`` is concatenated onto the existing
    history rather than replacing it.
    """
    messages: Annotated[Sequence[BaseMessage], operator.add]

# Initialize the Groq language model.
# `groq_api_key` is read from the environment in an earlier cell; that cell only
# prints a warning when the key is missing, so it may be None here.
model = ChatGroq(
    model="openai/gpt-oss-120b",
    temperature=0.6,                  # Groq’s recommended balanced default in 0–2 range (author's note; not verified here)
    reasoning_effort = "high",  # turn on deeper reasoning
    groq_api_key=groq_api_key,
)

# Bind tools once so every model call advertises the same tool schemas;
# the agent node invokes `model_with_tools`, not `model`.
model_with_tools = model.bind_tools(tools)


In [23]:
# Cell 6: Build the graph with smart continuation logic

import json
from langchain_core.messages import SystemMessage, AIMessage, HumanMessage, ToolMessage
from langgraph.graph import StateGraph, END

def _compact_history(messages, keep_turns: int = 10, keep_tool_messages: int = 3):
    """
    Keep:
      - the latest SystemMessage (persona),
      - the last `keep_tool_messages` ToolMessage(s),
      - the last `keep_turns` conversation turns (user/assistant).
    This preserves recent context and fresh tool outputs without ballooning tokens.
    """
    # Latest persona
    system_kept = [m for m in messages if isinstance(m, SystemMessage)][-1:]

    # Most recent tool results
    tool_msgs = [m for m in messages if isinstance(m, ToolMessage)]
    tool_kept = tool_msgs[-keep_tool_messages:] if keep_tool_messages > 0 else []

    # Recent conversation messages (exclude system/tool)
    convo = [m for m in messages if not isinstance(m, (SystemMessage, ToolMessage))]
    convo_tail = convo[-keep_turns:]

    return system_kept + tool_kept + convo_tail


# 1) Node that calls the LLM (and may emit a tool call)
def call_model(state: AgentState):
    """Agent node: surface fresh tool output verbatim, otherwise query the LLM.

    When the newest message is a ToolMessage, the tool output is wrapped in an
    AIMessage tagged ``from_tool_surface`` and returned WITHOUT calling the
    model (single-bounce design); the router ends the turn on that tag.
    If the assistant requested several tools at once, every ToolMessage at the
    tail of the history is surfaced (in order, separated by a blank line) so
    that earlier results are not silently dropped.

    Otherwise the history is compacted and sent to ``model_with_tools``.

    Args:
        state: graph state; ``state["messages"]`` must be non-empty.

    Returns:
        dict: ``{"messages": [AIMessage]}`` to be appended to the graph state.
    """
    history = state["messages"]

    # After a tool runs, surface its output once and STOP (single bounce design).
    if isinstance(history[-1], ToolMessage):
        # Collect the unbroken run of ToolMessages at the end of the history;
        # these are exactly the results produced for the latest tool request.
        fresh_results = []
        for msg in reversed(history):
            if not isinstance(msg, ToolMessage):
                break
            fresh_results.append(msg)
        fresh_results.reverse()

        parts = [str(msg.content or "").strip() for msg in fresh_results]
        txt = "\n\n".join(part for part in parts if part)
        return {"messages": [AIMessage(
            content=txt,
            additional_kwargs={"from_tool_surface": True}  # tag it so router ends the turn
        )]}

    # Otherwise proceed normally (use your wider window)
    messages = _compact_history(history, keep_turns=10, keep_tool_messages=3)
    response = model_with_tools.invoke(messages)
    return {"messages": [response]}


# 2) Node that executes a tool if the LLM requested one
def call_tool_node(state: AgentState):
    """Tool node: run every tool call attached to the latest message.

    Each requested call yields exactly one ToolMessage, whether it succeeded,
    failed, named an unknown tool, or carried no tool name at all. Tool
    exceptions are caught and reported as text so the graph keeps running.

    Args:
        state: graph state; the last entry of ``state["messages"]`` is inspected.

    Returns:
        dict: ``{}`` when the latest message carries no tool calls, otherwise
        ``{"messages": [ToolMessage, ...]}`` in request order.
    """
    requested = getattr(state["messages"][-1], "tool_calls", None)
    if not requested:
        # Nothing to execute.
        return {}

    def _reply(text, call_id, label):
        # The name is set both as a field and in additional_kwargs
        # (helps certain backends route correctly).
        return ToolMessage(
            content=text,
            tool_call_id=call_id,
            name=label,
            additional_kwargs={"name": label},
        )

    outputs = []
    for call in requested:
        # Two layouts are accepted: {"name","args"} or {"function":{"name","arguments"}}.
        nested = call.get("function") or {}
        tool_name = call.get("name") or nested.get("name")

        payload = call.get("args")
        if payload is None:
            payload = nested.get("arguments")

        # Arguments may arrive as a dict or as a JSON string; anything else
        # (or unparseable / blank JSON) falls back to no arguments.
        if isinstance(payload, dict):
            tool_args = payload
        elif isinstance(payload, str) and payload.strip():
            try:
                tool_args = json.loads(payload)
            except Exception:
                tool_args = {}
        else:
            tool_args = {}

        call_id = (
            call.get("id")
            or call.get("tool_call_id")
            or f"{tool_name or 'unknown'}_call"
        )

        if not tool_name:
            outputs.append(
                _reply("ERROR: Tool name missing in tool call.", call_id, "__tool_error__")
            )
            continue

        # Resolve the tool by name from the global `tools` registry (first match wins).
        chosen = None
        for candidate in tools:
            if getattr(candidate, "name", None) == tool_name:
                chosen = candidate
                break

        if chosen is None:
            outputs.append(_reply(f"ERROR: Tool '{tool_name}' not found.", call_id, tool_name))
            continue

        # Run the tool; any failure becomes the message content instead of propagating.
        try:
            content = str(chosen.invoke(tool_args or {}))
        except Exception as e:
            content = f"ERROR while running '{tool_name}': {e}"

        outputs.append(_reply(content, call_id, tool_name))

    return {"messages": outputs}


# 3) Smart continuation function (single bounce after tool result)
def should_continue(state: AgentState) -> str:
    """Router after the agent node: decide between running tools and ending the turn.

    Returns:
        "end" when the latest message is a surfaced tool result (tagged
        ``from_tool_surface``) or a plain reply with nothing pending;
        "continue" when the latest message carries tool calls or is itself a
        ToolMessage.
    """
    latest = state["messages"][-1]

    # A surfaced tool result closes the turn immediately.
    already_surfaced = (
        isinstance(latest, AIMessage)
        and getattr(latest, "additional_kwargs", {}).get("from_tool_surface")
    )
    if already_surfaced:
        return "end"

    # Pending tool request, or a raw tool result that has not been surfaced yet.
    wants_tool = hasattr(latest, "tool_calls") and latest.tool_calls
    if wants_tool or isinstance(latest, ToolMessage):
        return "continue"

    # Plain assistant reply: hand control back to the user.
    return "end"


# ---- Build & compile the graph ----
# Two nodes: "agent" (LLM call / tool-output surfacing) and "tool" (tool execution).
workflow = StateGraph(AgentState)
workflow.add_node("agent", call_model)
workflow.add_node("tool", call_tool_node)

# Every invocation starts at the agent node; `should_continue` then routes
# its "continue" result to the tool node and its "end" result to END.
workflow.set_entry_point("agent")
workflow.add_conditional_edges(
    "agent",
    should_continue,
    {"continue": "tool", "end": END},
)

# Always loop back after tool → agent (so the agent can surface the tool output once)
workflow.add_edge("tool", "agent")

# `app` is the compiled, invokable graph used by the chat loop.
app = workflow.compile()



In [24]:
# Cell 7: run interactive chat

def run_interactive_chat():
    """
    Initiates a stateful, interactive chat session with the realtor agent.

    Prints the disclaimer and greeting, then loops: read a line from the user,
    append it to the history, invoke the compiled graph (`app`), and print the
    last AI message of the result. The full result history (including tool
    messages) is carried into the next turn.

    The loop ends when:
      - any message in the result carries an `end_conversation` tool call, or
      - input is closed or interrupted (EOFError / KeyboardInterrupt).

    Blank input lines are ignored instead of being sent to the model.

    Relies on the module-level `system_message`, `app` and `HumanMessage`.
    """
    disclaimer = (
        "⚠️  Important disclaimer\n"
        "GRACIE provides information for educational purposes only and does not constitute\n"
        "legal, tax, financial, or real estate advice. Always consult licensed professionals —\n"
        "a realtor/agent, attorney, CPA/tax advisor, lender, and/or insurance professional —\n"
        "before making decisions. Data and estimates may be incomplete or out of date.\n"
        "By continuing to use GRACIE, you explicitly consent to this disclaimer."
    )
    print(disclaimer)
    print("-" * 50)
    print("🤖 Hello! I am GRACIE your AI Realtor Assistant.")
    print("I can help with real estate stats and investment opportunities.")
    print("-" * 50)

    # Initialize conversation history with the system prompt
    conversation_history = [system_message]

    while True:
        try:
            user_input = input("You: ")
        except (EOFError, KeyboardInterrupt):
            # Closed stdin or an interrupted kernel: leave quietly, no traceback.
            print("\nEnding Conversation")
            break

        # Don't spend a model call (or pollute the history) on an empty line.
        if not user_input.strip():
            continue

        conversation_history.append(HumanMessage(content=user_input))

        inputs = {"messages": conversation_history}
        # Cap the graph at 10 steps per turn (LangGraph's default limit is 25);
        # a normal turn is agent → tool → agent.
        result = app.invoke(inputs, {"recursion_limit": 10})

        # Keep the entire updated message state (includes ToolMessage(s)).
        # Copy into a list so the next turn can append to it.
        conversation_history = list(result["messages"])

        # The conversation is over as soon as any message requested `end_conversation`.
        conversation_over = any(
            tool_call["name"] == "end_conversation"
            for message in conversation_history
            for tool_call in (getattr(message, "tool_calls", None) or [])
        )
        if conversation_over:
            print("Ending Conversation")

        # The final response from the AI is the last AIMessage in the result (fallback: last message)
        final_response_message = next(
            (
                msg for msg in reversed(conversation_history)
                if getattr(msg, "type", None) == "ai" or msg.__class__.__name__ == "AIMessage"
            ),
            conversation_history[-1],
        )

        if getattr(final_response_message, "content", None):
            print(f"GRACIE: {final_response_message.content}")

        print("-" * 50)

        # Exit the loop if the end_conversation tool was called
        if conversation_over:
            break


In [25]:
# --- Start the interactive chat session ---
# Blocking call: control returns to the notebook only after the chat loop exits.
run_interactive_chat()

⚠️  Important disclaimer
GRACIE provides information for educational purposes only and does not constitute
legal, tax, financial, or real estate advice. Always consult licensed professionals —
a realtor/agent, attorney, CPA/tax advisor, lender, and/or insurance professional —
before making decisions. Data and estimates may be incomplete or out of date.
By continuing to use GRACIE, you explicitly consent to this disclaimer.
--------------------------------------------------
🤖 Hello! I am GRACIE your AI Realtor Assistant.
I can help with real estate stats and investment opportunities.
--------------------------------------------------
You: bye
Ending Conversation
GRACIE: Conversation ended by user.
--------------------------------------------------


In [30]:
'''
Next steps
1 - adjust system prompt to create good follow up question (**done**)
2 - from CMA, return (address, zipcode, latitude, longitude, beds, baths, sqft, rent, sell_price) tuple (**done**)
3 - guard rail tool to limit chat to real estate convos - else hard close chat. 3 strikes and you are out (**done**)
4 - model="compound-beta" to find insurance and property tax information. Prophet for predicting growth. Use this for vargas analysis tool (**done**)
5 - tool to create trend graphs (tool)
6 - caching geocode results (**done**)
7 - general chat engine (**done**)
8 - find best investment opportunities (**done**)
'''

'\nNext steps\n1 - adjust system prompt to create good follow up question (**done**)\n2 - from CMA, return (address, zipcode, latitude, longitude, beds, baths, sqft, rent, sell_price) tuple (**done**)\n3 - guard rail tool to limit chat to real estate convos - else hard close chat. 3 strikes and you are out (tool)\n4 - model="compound-beta" to find insurance and property tax information. Prophet for predicting growth. Use this for vargas analysis tool (tool)\n5 - tool to create trend graphs (tool)\n6 - caching geocode results (**done**)\n7 - general chat engine (**done**)\n8 - find best investment opportunities (tool)\n'