In [1]:
import sys
import glob
import os
import re
import pandas as pd
import statsmodels.api as sm
import matplotlib.pyplot as plt
from matplotlib.lines import Line2D
from matplotlib.ticker import FuncFormatter, MaxNLocator
import numpy as np
from datetime import datetime
from typing import Dict, Optional, List
from google.colab import drive
# Mount Google Drive so that paths under /content/drive (used when loading
# gen_lookup.csv) resolve inside the Colab runtime.
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
# Generation lookup table, read once and used as a module-level global by enrich_df.
# enrich_df reads the columns 'make', 'model', 'year_start', 'year_end' and 'gen',
# so the CSV must provide all five.
# NOTE(review): enrich_df lower-cases make/model and strips non-alphanumerics before
# comparing; presumably the CSV values are already in that form — confirm.
gen_lookup = pd.read_csv("/content/drive/Shareddrives/market_analysis_v2/gen_lookup.csv")

In [4]:
import re
import pandas as pd
from typing import Dict, Optional, List

# ---------- Constants for CS (Carsales/General Scrapes) ----------
# Inclusive bounds applied to the year extracted by YEAR_RE.
YEAR_MIN = 1980
YEAR_MAX = 2035

# Canonical Carsales/General column names, in their standard order.
ORDER: List[str] = [
    'href',
    'year_make_model',
    'trim',
    "listed_price",
    'transmission',
    'odometer',
    'seller_type',
]

# Content patterns used to recognise a column from its cell values.
# YEAR_RE:  a standalone four-digit number in 1980-2039.
# PRICE_RE: optional whitespace, "$", then digits/commas with optional two-digit cents.
# ODOM_RE:  a number with at least one ",ddd" group followed by "km" (whole cell).
# URL_RE:   text starting with http://, https:// or www.
YEAR_RE = r'\b(19[89]\d|20[0-3]\d)\b'
PRICE_RE = r'^\s*\$\s*[\d,]+(?:\.\d{2})?\b'
ODOM_RE = r'^\s*\d{1,3}(?:,\d{3})+\s*km\s*$'
URL_RE = r'^(?:https?://|www\.)'

# Accepted values (compared after strip + lower-case) for the categorical columns.
TX = {'automatic', 'manual'}
SELLER = {'private', 'dealer used'}

# Minimum share of matching cells required before a column is accepted as a field.
THRESH: Dict[str, float] = {
    'year_make_model': 0.50,
    "listed_price": 0.60,
    'transmission': 0.80,
    'odometer': 0.60,
    'seller_type': 0.70,
}

# ---------- Constants for FB (Facebook Marketplace Scrapes) ----------
# Canonical Facebook Marketplace column names, in their standard order.
FB_ORDER: List[str] = [
    'href',
    'year_make_model',
    'listed_price',
    'odometer',
    'location',
]

# Minimum share of matching cells required before identify_fb_columns accepts a
# column as 'href', 'year_make_model', 'listed_price' or 'odometer'.
# 'location' has no content predicate in PRED_FB: identify_fb_columns assigns it
# by the column name 'c' or as the single leftover column, so the 'location'
# entry below is not consulted by that function.
THRESH_FB: Dict[str, float] = dict(
    href=0.80,
    year_make_model=0.50,
    listed_price=0.60,
    odometer=0.60,
    location=0.40,
)

# ---------- Predicates ----------
def _ratio(mask: pd.Series) -> float:
    return float(mask.mean()) if len(mask) else 0.0

def _yr_ok(s: pd.Series) -> pd.Series:
    """Flag entries whose first YEAR_RE match lies within [YEAR_MIN, YEAR_MAX].

    Each value is converted to text; entries without a YEAR_RE match yield False.
    """
    as_text = s.astype(str)
    first_match = as_text.str.extract(YEAR_RE, expand=False)
    year_number = pd.to_numeric(first_match, errors='coerce')
    return year_number.between(YEAR_MIN, YEAR_MAX)

# Content tests for Carsales/General columns. Each entry takes a raw column and
# returns a boolean Series marking the cells that look like the named field.
# identify_columns iterates this dict, so key order is the order in which
# fields get to claim a column.
PRED = {
    # Contains a plausible year and at least one ASCII letter.
    'year_make_model': lambda col: (
        _yr_ok(col.astype(str))
        & col.astype(str).str.contains(r'[A-Za-z]', na=False)
    ),
    # Starts like a dollar amount.
    "listed_price": lambda col: col.astype(str).str.match(PRICE_RE, na=False),
    # Exactly one of the known transmission labels (case/whitespace-insensitive).
    'transmission': lambda col: col.astype(str).str.strip().str.lower().isin(TX),
    # Comma-grouped number followed by "km", case-insensitive.
    'odometer': lambda col: col.astype(str).str.match(ODOM_RE, flags=re.I, na=False),
    # Exactly one of the known seller labels (case/whitespace-insensitive).
    'seller_type': lambda col: col.astype(str).str.strip().str.lower().isin(SELLER),
}

# Content tests for Facebook Marketplace columns. Each entry takes a raw column
# and returns a boolean Series marking the cells that look like the named field.
# There is deliberately no entry for 'location'.
PRED_FB = {
    # Begins with http://, https:// or www. (case-insensitive).
    'href': lambda col: col.astype(str).str.contains(URL_RE, case=False, na=False),
    # Contains a plausible year and at least one ASCII letter.
    'year_make_model': lambda col: (
        _yr_ok(col.astype(str))
        & col.astype(str).str.contains(r'[A-Za-z]', na=False)
    ),
    # Starts like a dollar amount.
    'listed_price': lambda col: col.astype(str).str.match(PRICE_RE, na=False),
    # Comma-grouped number followed by "km", case-insensitive.
    'odometer': lambda col: col.astype(str).str.match(ODOM_RE, flags=re.I, na=False),
}

# ---------- Core Identification Functions ----------
def identify_columns(df: pd.DataFrame) -> Dict[str, Optional[str]]:
    """Identify and map each canonical Carsales/General column.

    Parameters
    ----------
    df : pd.DataFrame
        Raw scrape whose column names are arbitrary.

    Returns
    -------
    Dict[str, Optional[str]]
        Maps 'href', every key of PRED, and 'trim' to the source column name,
        or to None when no column qualified. For a frame without columns every
        key of ORDER maps to None.

    Notes
    -----
    * The first column is always taken as 'href' without inspecting its content.
    * Columns in which at least half the cells look like URLs are excluded from
      every other field.
    * Fields are resolved in PRED order; each claims the best-scoring unclaimed
      column if its score reaches THRESH. Ties go to the earliest column.
    * 'trim' is the column immediately after 'year_make_model', but only when
      that column is still unclaimed. Previously a neighbour that had already
      been picked as e.g. 'listed_price' (or that was URL-like) was mapped a
      second time as 'trim'.
    """
    cols = list(df.columns)
    if not cols:
        return {k: None for k in ORDER}

    href_col = cols[0]

    # Exclude URL-like columns from the content-based detection below.
    url_ratio = {
        c: _ratio(df[c].astype(str).str.contains(URL_RE, case=False, na=False))
        for c in cols
    }
    urlish = {c for c, r in url_ratio.items() if r >= 0.50}
    blocked = {href_col} | urlish

    remaining = [c for c in cols if c not in blocked]
    picks: Dict[str, Optional[str]] = {t: None for t in PRED}

    for t in PRED:
        if not remaining:
            break
        scores = {c: _ratio(PRED[t](df[c])) for c in remaining}
        best_col, best_score = max(scores.items(), key=lambda kv: kv[1])
        if best_score >= THRESH[t]:
            picks[t] = best_col
            remaining.remove(best_col)

    # 'trim' has no content signature, so it is inferred by position. At this
    # point `remaining` holds exactly the columns that are neither blocked nor
    # claimed, which is the set a trim column may come from.
    trim_col = None
    ymm = picks.get('year_make_model')
    if ymm is not None:
        neighbour = cols.index(ymm) + 1
        if neighbour < len(cols) and cols[neighbour] in remaining:
            trim_col = cols[neighbour]

    return {'href': href_col, **picks, 'trim': trim_col}

def identify_fb_columns(df: pd.DataFrame) -> Dict[str, Optional[str]]:
    """Identify and map each canonical Facebook Marketplace column.

    Parameters
    ----------
    df : pd.DataFrame
        Raw scrape whose column names are arbitrary.

    Returns
    -------
    Dict[str, Optional[str]]
        Maps every key of FB_ORDER to the source column name, or to None when
        no column qualified.

    Notes
    -----
    * Candidate columns are kept in their original order, so ties in the match
      score are always resolved in favour of the earliest column. The earlier
      set-based bookkeeping made tie-breaking depend on hash iteration order.
    * 'href' is resolved first, then 'year_make_model', 'listed_price' and
      'odometer'; each claims the best-scoring unclaimed column if its score
      reaches THRESH_FB.
    * The former name-based fallback for 'x1i10hfl href' required the same
      score threshold that the best-scoring column had just failed, so it could
      never fire and has been dropped.
    * 'location' has no content test: it is the column named 'c' if still
      unclaimed, otherwise the single leftover column, otherwise None.
    """
    cols = list(df.columns)
    if not cols:
        return {k: None for k in FB_ORDER}

    picks: Dict[str, Optional[str]] = {t: None for t in FB_ORDER}
    # Ordered and de-duplicated: deterministic iteration, set-like membership.
    remaining = list(dict.fromkeys(cols))

    # Content-tested fields, in priority order ('href' first).
    for t in ['href', 'year_make_model', 'listed_price', 'odometer']:
        if not remaining:
            break
        scores = {c: _ratio(PRED_FB[t](df[c])) for c in remaining}
        best_col, best_score = max(scores.items(), key=lambda kv: kv[1])
        if best_score >= THRESH_FB[t]:
            picks[t] = best_col
            remaining.remove(best_col)

    # Assign 'location' by name, or by elimination when one column is left.
    if 'c' in remaining:
        picks['location'] = 'c'
        remaining.remove('c')
    elif len(remaining) == 1:
        picks['location'] = remaining.pop()

    return picks


# ---------- Cleaning Functions ----------
def clean_cs(df: pd.DataFrame) -> pd.DataFrame:
    """Detect, rename and clean a raw Carsales/General scrape.

    Parameters
    ----------
    df : pd.DataFrame
        Raw scrape; columns are located with identify_columns.

    Returns
    -------
    pd.DataFrame
        A frame holding whichever of 'href', 'year', 'make', 'model',
        'listed_price', 'trim', 'odometer', 'seller_type' could be derived, in
        that order. 'transmission' is detected but not part of the output.

    Notes
    -----
    * 'year_make_model' is split on whitespace into at most three parts. Parts
      that no row provides become all-NA columns instead of raising.
    * Query strings are removed from 'href'.
    * 'listed_price' and 'odometer' become nullable Int64. A trailing one- or
      two-digit decimal fraction (cents) is dropped before the remaining
      non-digits are stripped, so "$12,345.00" yields 12345 rather than 1234500.
      Cells without digits become NA.
    * 'odometer' is finally floor-divided by 1000 (thousands of km).
    """
    mapping = identify_columns(df)
    out = pd.DataFrame()

    # Map columns
    if mapping['href'] is not None:
        out['href'] = df[mapping['href']]
    for col in ['year_make_model', 'trim', "listed_price", 'transmission', 'odometer', 'seller_type']:
        src = mapping.get(col)
        if src is not None:
            out[col] = df[src]

    # Split "year make model"; tolerate fewer than three resulting parts.
    if 'year_make_model' in out.columns:
        parts = out['year_make_model'].astype(str).str.split(expand=True, n=2)
        for position, name in enumerate(['year', 'make', 'model']):
            out[name] = parts[position] if position in parts.columns else pd.NA

    # Clean hrefs (remove query strings)
    if 'href' in out.columns:
        out['href'] = out['href'].astype(str).str.split('?').str[0]

    # Clean numeric columns
    for col in ["listed_price", 'odometer']:
        if col in out.columns:
            digits = (
                out[col].astype(str)
                # Drop a trailing 1-2 digit decimal fraction so cents are not
                # glued onto the whole amount ("12,345.00" -> "12,345").
                .str.replace(r'\.\d{1,2}(?!\d)', '', regex=True)
                .str.replace(r'[^\d]', '', regex=True)
            )
            # to_numeric(coerce) maps empty strings to NaN, which Int64 stores as NA.
            out[col] = pd.to_numeric(digits, errors='coerce').astype('Int64')

    # Convert odometer to thousands of km
    if 'odometer' in out.columns:
        out['odometer'] = out['odometer'] // 1000

    # Build final tidy table
    final_cols = ['href', 'year', 'make', 'model', "listed_price", 'trim', 'odometer', 'seller_type']
    return out[[c for c in final_cols if c in out.columns]]

def clean_fb(df: pd.DataFrame, excluded_prices: Optional[List[int]] = None) -> pd.DataFrame:
    """Detect, rename and clean a raw Facebook Marketplace scrape.

    Parameters
    ----------
    df : pd.DataFrame
        Raw scrape; columns are located with identify_fb_columns.
    excluded_prices : list of int, optional
        Listed prices whose rows are removed. Defaults to ``[1234, 12345]``,
        the values that were previously hard-coded.
        NOTE(review): presumably these are placeholder prices typed by
        sellers — confirm the intent.

    Returns
    -------
    pd.DataFrame
        A frame holding whichever of 'href', 'year', 'make', 'model',
        'listed_price', 'odometer', 'location' could be derived, in that order.

    Notes
    -----
    * 'year_make_model' is split on whitespace into at most three parts; when
      the column was not found, 'year', 'make' and 'model' are all NA.
    * Query strings are removed from 'href'.
    * Rows whose price reads "free" (case-insensitive) are removed.
    * 'listed_price' and 'odometer' become nullable Int64. A trailing one- or
      two-digit decimal fraction (cents) is dropped before the remaining
      non-digits are stripped, so "$12,345.00" yields 12345 rather than 1234500.
    * Rows with NA in 'listed_price', 'odometer' or 'year' are dropped. As a
      consequence, a scrape without a detectable 'year_make_model' column
      yields an empty result.
    """
    if excluded_prices is None:
        excluded_prices = [1234, 12345]

    mapping = identify_fb_columns(df)
    out = pd.DataFrame()

    # Only create columns that were successfully mapped and exist in the input.
    for canonical_col, src_col in mapping.items():
        if src_col is not None and src_col in df.columns:
            out[canonical_col] = df[src_col]

    # Split the year_make_model column into 'year', 'make', 'model'.
    if 'year_make_model' in out.columns:
        parts = out['year_make_model'].astype(str).str.split(expand=True, n=2)
        for position, name in enumerate(['year', 'make', 'model']):
            out[name] = parts[position] if position in parts.columns else pd.NA
    else:
        for name in ['year', 'make', 'model']:
            out[name] = pd.NA

    # Clean hrefs (remove query strings)
    if 'href' in out.columns:
        out['href'] = out['href'].astype(str).str.split('?').str[0]

    # Drop "Free" listings. The explicit copy keeps the assignments below from
    # writing into a slice of the previous frame.
    if 'listed_price' in out.columns:
        is_free = out['listed_price'].astype(str).str.strip().str.lower() == "free"
        out = out.loc[~is_free].copy()

    # Clean numeric columns: listed_price, odometer
    for col in ["listed_price", 'odometer']:
        if col in out.columns:
            digits = (
                out[col].astype(str)
                # Drop a trailing 1-2 digit decimal fraction so cents are not
                # glued onto the whole amount ("12,345.00" -> "12,345").
                .str.replace(r'\.\d{1,2}(?!\d)', '', regex=True)
                .str.replace(r'[^\d]', '', regex=True)
            )
            # to_numeric(coerce) maps empty strings to NaN, which Int64 stores as NA.
            out[col] = pd.to_numeric(digits, errors='coerce').astype('Int64')

    # Remove listings with null values in essential columns.
    essential = [c for c in ['listed_price', 'odometer', 'year'] if c in out.columns]
    if essential:
        out = out.dropna(subset=essential)

    # Remove listings carrying one of the excluded prices.
    if 'listed_price' in out.columns and len(excluded_prices):
        out = out[~out["listed_price"].isin(list(excluded_prices))]

    # Select only the required columns in order
    final_columns = ['href', 'year', 'make', 'model', "listed_price", 'odometer', 'location']
    return out[[c for c in final_columns if c in out.columns]]

def enrich_df(df: pd.DataFrame) -> pd.DataFrame:
    """Final clean after clean_cs or clean_fb.

    Parameters
    ----------
    df : pd.DataFrame
        Output of clean_cs / clean_fb. It is modified IN PLACE and also
        returned, so callers that ignore the return value still see the
        changes.

    Returns
    -------
    pd.DataFrame
        The same frame with these columns added or rewritten:

        * 'date_scraped' - today's date (midnight timestamp).
        * 'make', 'model' - lower-cased with every non ``[a-z0-9]`` character
          removed. Missing values stay missing instead of turning into the
          literal strings "nan" / "none".
        * 'year' - nullable Int64; unparseable values become NA.
        * 'gen' - nullable Int64 generation taken from the module-level
          ``gen_lookup`` table, matched on make, model and
          year_start <= year <= year_end. Later lookup rows win when several
          match. All NA when 'make', 'model' or 'year' is absent.
        * 'model_gen' - "<model>_<gen>" where 'gen' is known, otherwise None.
    """
    # --- 1. Add date_scraped ---
    df["date_scraped"] = pd.Timestamp.now().normalize()

    # --- 2. Normalise make & model ---
    for col in ["make", "model"]:
        if col in df.columns:
            present = df[col].notna()
            normalised = (
                df[col]
                .astype(str)
                .str.lower()
                .str.replace(r"[^a-z0-9]+", "", regex=True)
            )
            # astype(str) turns missing values into text; restore them as missing.
            df[col] = normalised.where(present)

    # --- 3. Ensure year is numeric ---
    if "year" in df.columns:
        df["year"] = pd.to_numeric(df["year"], errors="coerce").astype("Int64")

    # --- 4. Assign generation manually (no merge, no year_start/year_end contamination) ---
    df["gen"] = pd.NA

    # The columns are optional in steps 2-3, so they must be optional here too.
    if {"make", "model", "year"}.issubset(df.columns):
        for _, row in gen_lookup.iterrows():
            mask = (
                (df["make"] == row["make"]) &
                (df["model"] == row["model"]) &
                (df["year"].between(row["year_start"], row["year_end"], inclusive="both"))
            )
            # A missing year yields NA in the mask; treat it as "no match".
            mask = mask.fillna(False).astype(bool)
            df.loc[mask, "gen"] = row["gen"]

    df["gen"] = df["gen"].astype("Int64")

    # --- 5. Create model_gen ---
    # 'gen' can only be set where 'model' matched a lookup row, so 'model' is a
    # real string wherever 'gen' is known.
    if "model" in df.columns:
        df["model_gen"] = [
            f"{model}_{gen}" if pd.notna(gen) else None
            for model, gen in zip(df["model"], df["gen"])
        ]
    else:
        df["model_gen"] = None

    return df