In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from linearmodels.panel import PanelOLS


In [None]:
# Global settings
DIST_COL = "d_net_min_km"            # distance column used to assign treatment/control
ID_COL = "lsoa21cd"
DATE_COL = "date"
COHORT_COL = "announcement_year"
T1 = 1.0                             # treated when distance <= T1
T2 = 2.0                             # control when distance >= T2
BASE_CTRLS = [
    "share_detached",
    "share_semi_detached",
    "share_terraced",
    "share_flat",
    "total_sales",
    "pop_density",
]


# Load the panel (ID column kept as text) and parse the date column
panel = pd.read_csv("output/Final_panel_data.csv", dtype={ID_COL: str})
panel[DATE_COL] = pd.to_datetime(panel[DATE_COL])

# Population density needs an area column; accept the alternative spelling
if "area_sq_km" not in panel.columns:
    if "Area Sq Km" not in panel.columns:
        raise ValueError("Can't find the area")
    panel["area_sq_km"] = panel["Area Sq Km"]

# Non-finite densities (e.g. division by a zero area) become NaN
density = panel["population"] / panel["area_sq_km"]
panel["pop_density"] = density.where(np.isfinite(density))

# Treatment / control flags from the distance thresholds
panel["treat"] = panel[DIST_COL].le(T1).astype(int)
panel["control"] = panel[DIST_COL].ge(T2).astype(int)



def cohort_size_table(df):
    """Count the distinct treated LSOAs in each announcement-year cohort.

    Only rows with ``treat == 1`` are considered.  Returns a DataFrame with the
    columns ``cohort`` and ``treated_LSOA``, sorted by ``cohort``.
    """
    treated_pairs = df.loc[df["treat"] == 1, [ID_COL, COHORT_COL]].drop_duplicates()
    counts = (
        treated_pairs.groupby(COHORT_COL)[ID_COL]
        .nunique()
        .rename("treated_LSOA")
        .rename_axis("cohort")
        .reset_index()
    )
    return counts.sort_values("cohort")

def plot_cohort_size(tab, cutoff=50, title="Cohort Treated LSOA Size"):
    """Draw a bar chart of treated-LSOA counts per cohort.

    ``tab`` must provide the columns ``cohort`` and ``treated_LSOA``.  A dashed
    red horizontal line marks ``cutoff``; the figure is shown immediately.
    """
    fig = plt.figure(figsize=(8, 5))
    ax = fig.gca()
    ax.bar(tab["cohort"], tab["treated_LSOA"], color="skyblue")
    ax.axhline(cutoff, color="red", linestyle="--", label=f"threshold={cutoff}")
    ax.set_xlabel("Announcement year (cohort)")
    ax.set_ylabel("Treated LSOA (unique)")
    ax.set_title(title)
    ax.legend()
    fig.tight_layout()
    plt.show()

def build_cohort_panel(df, cohort, window=8):
    """Build the event-window sample for a single announcement cohort.

    Rows whose ``COHORT_COL`` equals ``cohort`` are kept when flagged as
    ``treat`` or ``control``.  The event date is fixed at 1 March of the cohort
    year, and ``k`` is the number of quarters between an observation and the
    event quarter.  Only rows with ``-window <= k <= window`` and a positive
    ``median_price`` are returned, with ``log_price`` added.  An empty
    DataFrame is returned when the cohort has no rows.
    """
    cohort_rows = df[df[COHORT_COL] == cohort].copy()
    if cohort_rows.empty:
        return pd.DataFrame()

    in_sample = cohort_rows["treat"].eq(1) | cohort_rows["control"].eq(1)
    sample = cohort_rows.loc[in_sample].copy()

    sample["event_time"] = pd.Timestamp(f"{int(cohort)}-03-01")

    # Relative event time in quarters (difference of quarterly period ordinals)
    sample["date_q"] = pd.PeriodIndex(sample[DATE_COL], freq="Q")
    sample["event_q"] = pd.PeriodIndex(sample["event_time"], freq="Q")
    sample["k"] = sample["date_q"].astype(int) - sample["event_q"].astype(int)
    sample = sample[sample["k"].between(-window, window)].copy()

    # The log is only defined for strictly positive prices
    sample = sample[sample["median_price"].gt(0)].copy()
    sample["log_price"] = np.log(sample["median_price"])

    return sample

def stacked_event_study(df, window=8, cohorts_filter=None, plot_title_suffix="", do_wald=True):
    """Fit a stacked two-way fixed-effects event study and plot the coefficient path.

    One sub-panel per cohort is built with ``build_cohort_panel`` and the pieces
    are concatenated.  ``log_price`` is regressed on event-time dummies ``ev_k``
    (set for treated rows only, ``k = -1`` omitted as the reference period) plus
    the usable ``BASE_CTRLS``, with entity and time effects and entity-clustered
    standard errors.

    Parameters
    ----------
    df : DataFrame
        Panel holding ``COHORT_COL``, ``ID_COL``, ``DATE_COL`` and the columns
        needed by ``build_cohort_panel``.
    window : int
        Number of quarters kept on each side of the event quarter.
    cohorts_filter : iterable or None
        When given, only cohorts whose integer value is in it are stacked.
    plot_title_suffix : str
        Text appended to the plot title.
    do_wald : bool
        Run the joint Wald test on the pre-period coefficients (k <= -2).

    Returns
    -------
    tuple
        ``(res, wald, fig)``.  ``(None, None, None)`` when nothing can be
        estimated; ``fig`` is ``None`` when no event coefficient survives.
        ``wald`` is ``None`` when the test is skipped, unavailable or fails.
    """
    # 1) Select the cohorts to stack
    cohorts = [int(c) for c in sorted(df[COHORT_COL].dropna().unique())]
    if cohorts_filter is not None:
        allow = {int(c) for c in cohorts_filter}
        cohorts = [c for c in cohorts if c in allow]

    # 2) Build one sub-sample per cohort and stack them
    pieces = []
    for c in cohorts:
        sub = build_cohort_panel(df, c, window=window)
        if not sub.empty:
            pieces.append(sub)
    if not pieces:
        print("There is no stackable cohort sample.")
        return None, None, None

    stacked = pd.concat(pieces, ignore_index=True)

    # 3) Event-time dummies (treated rows only); k = -1 is the omitted base period
    k_min, k_max = -window, window
    ev_cols = [f"ev_{kk}" for kk in range(k_min, k_max + 1) if kk != -1]
    for kk in range(k_min, k_max + 1):
        if kk == -1:
            continue
        stacked[f"ev_{kk}"] = ((stacked["k"] == kk) & (stacked["treat"] == 1)).astype(int)

    # 4) Entity/time MultiIndex as required by PanelOLS
    stacked = stacked.set_index([ID_COL, DATE_COL]).sort_index()

    # Controls: keep only those that exist and are not constant
    use_ctrls = [c for c in BASE_CTRLS if (c in stacked.columns) and (stacked[c].nunique(dropna=True) > 1)]
    use_exog_cols = [c for c in ev_cols if c in stacked.columns] + use_ctrls
    if not use_exog_cols:
        print(" No independent variables available (event terms may be fully absorbed).")
        return None, None, None

    Y = stacked["log_price"]
    X = stacked[use_exog_cols]

    # 5) Panel regression
    mod = PanelOLS(Y, X, entity_effects=True, time_effects=True, drop_absorbed=True)
    res = mod.fit(cov_type="clustered", cluster_entity=True)

    # Event coefficients that were actually estimated (absorbed ones are dropped)
    actual_ev = [v for v in ev_cols if v in res.params.index]

    # 6) Wald test of the pre-period coefficients (k <= -2).
    #    Everything that reads ``pre_vars`` lives inside ``if do_wald`` so that
    #    do_wald=False cannot hit an undefined name, and numpy is taken from the
    #    module-level import: a function-local import would shadow ``np`` for the
    #    whole function and break the plotting code when the test is skipped.
    wald = None
    if do_wald:
        pre_vars = [v for v in actual_ev if int(v.split("_")[1]) <= -2]
        if pre_vars:
            try:
                idx = res.params.index
                # One unit row vector per tested coefficient
                R = np.vstack([np.eye(len(idx))[idx.get_loc(v)] for v in pre_vars])
                q = np.zeros(len(pre_vars))
                wald = res.wald_test(R, q)
                print(f"[Wald] Parrallel：chi2={wald.stat:.2f}, p={wald.pval:.3g}")
            except Exception as e:
                # Best effort: a failed test must not discard the fitted model
                print(f" Wald failed：{e}")
        else:
            print(" There are no prior coefficients available for parallel trend testing.")

    # 7) Plot
    if not actual_ev:
        print(" The event period coefficients are all absorbed and cannot be plotted.")
        return res, wald, None

    betas = res.params.loc[actual_ev]
    ci_df = res.conf_int().loc[actual_ev]

    # Fall back to positional columns if the interval columns are not named
    if "lower" in ci_df.columns:
        lower, upper = ci_df["lower"].values, ci_df["upper"].values
    else:
        lower, upper = ci_df.iloc[:, 0].values, ci_df.iloc[:, 1].values

    ks = np.array([int(v.split("_")[1]) for v in actual_ev])
    order = np.argsort(ks)
    x, y = ks[order], betas.values[order]
    l, u = lower[order], upper[order]

    fig = plt.figure(figsize=(8, 5.2))
    plt.plot(x, y, marker="o", label="Estimate")
    plt.fill_between(x, l, u, alpha=0.25, label="95% CI")
    plt.axhline(0, color="black", lw=1)
    plt.axvline(0, color="red", ls="--", label="Event (k=0)")
    ttl = "Stacked Event Study"
    if plot_title_suffix:
        ttl += f" — {plot_title_suffix}"
    plt.title(ttl)
    plt.xlabel("Quarters relative to announcement")
    plt.ylabel("Effect on log(price)")
    plt.legend()
    plt.tight_layout()
    plt.show()

    return res, wald, fig


# 1) Cohort sample size (unique treated LSOAs per cohort)
tab_full = cohort_size_table(panel)
print("Cohort sizes (Full sample):\n", tab_full)

# Plot the distribution
plot_cohort_size(tab_full, title="Cohort Treated LSOA Size — Full sample", cutoff=50)


# 2) Main result: all cohorts, window ±8
res_full, wald_full, _ = stacked_event_study(
    panel,
    window=8,
    cohorts_filter=None,
    plot_title_suffix="All cohorts (no filter)",
    do_wald=True,
)


# 3) Robustness A: keep only cohorts with at least 50 treated LSOAs, window ±8
meets_cutoff = tab_full["treated_LSOA"].ge(50)
allowed = set(tab_full.loc[meets_cutoff, "cohort"])
tab_flt = tab_full.loc[tab_full["cohort"].isin(allowed)].copy()
print("Cohort sizes (Filtered):\n", tab_flt)

plot_cohort_size(tab_flt, title="Cohort Treated LSOA Size — Filtered (≥50)", cutoff=50)

res_flt, wald_flt, _ = stacked_event_study(
    panel,
    window=8,
    cohorts_filter=allowed,
    plot_title_suffix="Cohort treated≥50",
    do_wald=True,
)

# Overlay main results vs filter results
def overlay_two_curves(resA, labelA, resB, labelB, title="Robustness: cohort size filter"):
    """Plot the event-study paths of two fitted results on one set of axes.

    Parameters
    ----------
    resA, resB : fitted result or None
        Objects exposing ``params`` and ``conf_int()``; coefficients whose name
        starts with ``ev_`` are plotted against the integer that follows the
        prefix.  A result that is ``None`` (for example when an estimation
        returned no model) is skipped instead of raising.
    labelA, labelB : str
        Legend labels for the two curves.
    title : str
        Text appended to the figure title.

    Returns
    -------
    None
        The figure is shown; nothing is drawn when both results are ``None``.
    """
    def extract_xy(rr):
        # Event coefficients sorted by relative period, with their interval bounds
        evs = [v for v in rr.params.index if v.startswith("ev_")]
        ks = np.array([int(v.split("_")[1]) for v in evs])
        order = np.argsort(ks)
        y = rr.params.loc[evs].values[order]
        ci = rr.conf_int().loc[evs]
        if "lower" in ci.columns:
            l, u = ci["lower"].values[order], ci["upper"].values[order]
        else:
            l, u = ci.iloc[:, 0].values[order], ci.iloc[:, 1].values[order]
        return ks[order], y, l, u

    # Keep the original marker per position ("o" for A, "s" for B)
    curves = [
        (rr, label, marker)
        for rr, label, marker in ((resA, labelA, "o"), (resB, labelB, "s"))
        if rr is not None
    ]
    if not curves:
        print("No fitted results available to overlay.")
        return None

    plt.figure(figsize=(8, 5.2))
    for rr, label, marker in curves:
        x, y, l, u = extract_xy(rr)
        plt.plot(x, y, marker=marker, label=label, alpha=0.9)
        plt.fill_between(x, l, u, alpha=0.15)
    plt.axhline(0, color="black", lw=1)
    plt.axvline(0, color="red", ls="--", label="Event (k=0)")
    plt.xlabel("Quarters relative to announcement")
    plt.ylabel("Effect on log(price)")
    plt.title(f"Stacked Event Study — {title}")
    plt.legend()
    plt.tight_layout()
    plt.show()
    return None

# Compare the full-sample path with the size-filtered path
overlay_two_curves(
    resA=res_full,
    labelA="Full sample",
    resB=res_flt,
    labelB="Filtered (treated≥50)",
    title="Robustness: Cohort size filter",
)


# 4) Robustness B: wider window ±12 (full sample)
res_full_w12, wald_full_w12, _ = stacked_event_study(
    panel,
    window=12,
    cohorts_filter=None,
    plot_title_suffix="All cohorts, window ±12",
    do_wald=True,
)


In [None]:

# Settings for the outer-control vs ring-control comparison
DIST_COL   = "d_net_min_km"     # distance column used to assign treatment/control
ID_COL     = "lsoa21cd"
DATE_COL   = "date"
COHORT_COL = "announcement_year"
T1, T2     = 1.0, 2.0           # treated: distance <= T1; outer control: distance >= T2
BASE_CTRLS = ["share_detached","share_semi_detached","share_terraced",
              "share_flat","total_sales","pop_density"]


# Load the panel (ID column kept as text) and parse the date column
panel = pd.read_csv("output/Final_panel_data.csv", dtype={ID_COL: str})
panel[DATE_COL] = pd.to_datetime(panel[DATE_COL])


# Population density needs an area column; fail with an explicit message when
# neither spelling is present instead of a bare KeyError on the division below.
if "area_sq_km" not in panel.columns:
    if "Area Sq Km" not in panel.columns:
        raise ValueError("Can't find the area")
    panel["area_sq_km"] = panel["Area Sq Km"]
panel["pop_density"] = panel["population"] / panel["area_sq_km"]
# Non-finite densities (e.g. division by a zero area) become NaN
panel.loc[~np.isfinite(panel["pop_density"]), "pop_density"] = np.nan

panel["treat"] = (panel[DIST_COL] <= T1).astype(int)

# A) Control = same cohort and distance >= T2
def build_cohort_panel_outer(df, cohort, window=8):
    """Build one cohort's event-window sample with the outer band as control.

    Rows whose ``COHORT_COL`` equals ``cohort`` are kept when they are treated
    (``treat == 1``) or lie at ``DIST_COL >= T2`` (flagged in the new
    ``control_outer`` column).  The event date is 1 March of the cohort year;
    ``k`` is the number of quarters between an observation and the event
    quarter.  Only rows with ``-window <= k <= window`` and a positive
    ``median_price`` are returned, with ``log_price`` added.  An empty
    DataFrame is returned when the cohort has no rows.
    """
    cohort_rows = df[df[COHORT_COL] == cohort].copy()
    if cohort_rows.empty:
        return pd.DataFrame()

    cohort_rows["control_outer"] = cohort_rows[DIST_COL].ge(T2).astype(int)
    in_sample = cohort_rows["treat"].eq(1) | cohort_rows["control_outer"].eq(1)
    sample = cohort_rows.loc[in_sample].copy()

    sample["event_time"] = pd.Timestamp(f"{int(cohort)}-03-01")
    sample["date_q"] = pd.PeriodIndex(sample[DATE_COL], freq="Q")
    sample["event_q"] = pd.PeriodIndex(sample["event_time"], freq="Q")
    sample["k"] = sample["date_q"].astype(int) - sample["event_q"].astype(int)

    inside_window = sample["k"].between(-window, window)
    positive_price = sample["median_price"] > 0
    sample = sample[inside_window & positive_price].copy()
    sample["log_price"] = np.log(sample["median_price"])
    return sample

def run_stacked_es_outer(df, window=8):
    """Stacked event study using the outer band (distance >= T2) as control.

    Cohort samples come from ``build_cohort_panel_outer``; a cohort is stacked
    only when it has at least one treated and one control row.  ``log_price``
    is regressed on treated-only event dummies (``k = -1`` omitted) and the
    usable ``BASE_CTRLS`` with entity and time effects and entity-clustered
    standard errors.

    Returns ``(ks, beta, lo, hi, res)``: the sorted relative periods, their
    coefficients, the confidence-interval bounds, and the fitted result.
    """
    usable = []
    for cohort in sorted(df[COHORT_COL].dropna().astype(int).unique()):
        piece = build_cohort_panel_outer(df, cohort, window)
        if piece.empty:
            continue
        if piece["treat"].sum() > 0 and piece["control_outer"].sum() > 0:
            usable.append(piece)
    stacked = pd.concat(usable, ignore_index=True)

    # Event dummies for treated rows only; k = -1 is the omitted base period
    event_ks = [kk for kk in range(-window, window + 1) if kk != -1]
    ev_cols = [f"ev_{kk}" for kk in event_ks]
    is_treated = stacked["treat"] == 1
    for kk, name in zip(event_ks, ev_cols):
        stacked[name] = ((stacked["k"] == kk) & is_treated).astype(int)

    stacked = stacked.set_index([ID_COL, DATE_COL]).sort_index()
    use_ctrls = [c for c in BASE_CTRLS if c in stacked.columns and stacked[c].nunique(dropna=True) > 1]
    Y = stacked["log_price"]
    X = stacked[ev_cols + use_ctrls]

    mod = PanelOLS(Y, X, entity_effects=True, time_effects=True, drop_absorbed=True)
    res = mod.fit(cov_type="clustered", cluster_entity=True)

    # Arrays for plotting, restricted to coefficients that were estimated
    ev_keep = [name for name in ev_cols if name in res.params.index]
    ks = np.array([int(name.split("_")[1]) for name in ev_keep])
    order = np.argsort(ks)
    beta = res.params.loc[ev_keep].values[order]
    ci = res.conf_int().loc[ev_keep]
    lower = ci["lower"] if "lower" in ci.columns else ci.iloc[:, 0]
    upper = ci["upper"] if "upper" in ci.columns else ci.iloc[:, 1]
    return ks[order], beta, lower.values[order], upper.values[order], res

# B) Ring control = same cohort and T1 < distance < T2
def build_cohort_panel_ring(df, cohort, window=8, t1=T1, t2=T2):
    """Build one cohort's event-window sample with the ring band as control.

    Rows whose ``COHORT_COL`` equals ``cohort`` are kept when they are treated
    (``treat == 1``) or satisfy ``t1 < DIST_COL < t2`` (flagged in the new
    ``control_ring`` column).  The event date is 1 March of the cohort year;
    ``k`` is the number of quarters between an observation and the event
    quarter.  Only rows with ``-window <= k <= window`` and a positive
    ``median_price`` are returned, with ``log_price`` added.  An empty
    DataFrame is returned when the cohort has no rows.
    """
    cohort_rows = df[df[COHORT_COL] == cohort].copy()
    if cohort_rows.empty:
        return pd.DataFrame()

    distance = cohort_rows[DIST_COL]
    cohort_rows["control_ring"] = ((distance > t1) & (distance < t2)).astype(int)
    in_sample = cohort_rows["treat"].eq(1) | cohort_rows["control_ring"].eq(1)
    sample = cohort_rows.loc[in_sample].copy()

    sample["event_time"] = pd.Timestamp(f"{int(cohort)}-03-01")
    sample["date_q"] = pd.PeriodIndex(sample[DATE_COL], freq="Q")
    sample["event_q"] = pd.PeriodIndex(sample["event_time"], freq="Q")
    sample["k"] = sample["date_q"].astype(int) - sample["event_q"].astype(int)

    inside_window = sample["k"].between(-window, window)
    positive_price = sample["median_price"] > 0
    sample = sample[inside_window & positive_price].copy()
    sample["log_price"] = np.log(sample["median_price"])
    return sample

def run_stacked_es_ring(df, window=8, t1=T1, t2=T2):
    """Stacked event study using the ring band (t1 < distance < t2) as control.

    Cohort samples come from ``build_cohort_panel_ring``; a cohort is stacked
    only when it has at least one treated and one ring-control row.
    ``log_price`` is regressed on treated-only event dummies (``k = -1``
    omitted) and the usable ``BASE_CTRLS`` with entity and time effects and
    entity-clustered standard errors.

    Returns ``(ks, beta, lo, hi, res, stacked)``: the sorted relative periods,
    their coefficients, the confidence-interval bounds, the fitted result and
    the indexed estimation sample.
    """
    usable = []
    for cohort in sorted(df[COHORT_COL].dropna().astype(int).unique()):
        piece = build_cohort_panel_ring(df, cohort, window, t1, t2)
        if piece.empty:
            continue
        if piece["treat"].sum() > 0 and piece["control_ring"].sum() > 0:
            usable.append(piece)
    stacked = pd.concat(usable, ignore_index=True)

    # Event dummies for treated rows only; k = -1 is the omitted base period
    event_ks = [kk for kk in range(-window, window + 1) if kk != -1]
    ev_cols = [f"ev_{kk}" for kk in event_ks]
    is_treated = stacked["treat"] == 1
    for kk, name in zip(event_ks, ev_cols):
        stacked[name] = ((stacked["k"] == kk) & is_treated).astype(int)

    stacked = stacked.set_index([ID_COL, DATE_COL]).sort_index()
    use_ctrls = [c for c in BASE_CTRLS if c in stacked.columns and stacked[c].nunique(dropna=True) > 1]
    Y = stacked["log_price"]
    X = stacked[ev_cols + use_ctrls]

    mod = PanelOLS(Y, X, entity_effects=True, time_effects=True, drop_absorbed=True)
    res = mod.fit(cov_type="clustered", cluster_entity=True)

    # Arrays for plotting, restricted to coefficients that were estimated
    ev_keep = [name for name in ev_cols if name in res.params.index]
    ks = np.array([int(name.split("_")[1]) for name in ev_keep])
    order = np.argsort(ks)
    beta = res.params.loc[ev_keep].values[order]
    ci = res.conf_int().loc[ev_keep]
    lower = ci["lower"] if "lower" in ci.columns else ci.iloc[:, 0]
    upper = ci["upper"] if "upper" in ci.columns else ci.iloc[:, 1]
    return ks[order], beta, lower.values[order], upper.values[order], res, stacked

# C) Wald test of the pre-period coefficients
def wald_pretrend(res, ev_prefix="ev_", pre_max=-2):
    """Jointly test that all pre-period event coefficients are zero.

    Coefficients of ``res`` whose name starts with ``ev_prefix`` and whose
    relative period (the integer after the first underscore) is ``<= pre_max``
    are tested with ``res.wald_test``.  The statistic and p-value are printed.
    Returns the test result, or ``None`` when no such coefficient exists.
    """
    names = res.params.index
    pre_vars = [
        name
        for name in names
        if name.startswith(ev_prefix) and int(name.split("_")[1]) <= pre_max
    ]
    if not pre_vars:
        print("There are no prior coefficients available for parallel trend testing.")
        return None

    # Restriction matrix: one unit row per tested coefficient
    R = np.zeros((len(pre_vars), len(names)))
    for row, name in enumerate(pre_vars):
        R[row, names.get_loc(name)] = 1.0
    q = np.zeros(len(pre_vars))

    w = res.wald_test(R, q)
    print(f"[Wald pretrend] chi2={w.stat:.2f}, p={w.pval:.3g}  (H0: all pre-k<=-2 = 0)")
    return w


# Run both specifications
ks_out, b_out, l_out, u_out, res_out = run_stacked_es_outer(panel, window=8)
ks_ring, b_ring, l_ring, u_ring, res_ring, data_ring = run_stacked_es_ring(
    panel, window=8, t1=T1, t2=T2
)

# Joint pre-trend tests (the helper prints the statistics)
for fitted in (res_out, res_ring):
    wald_pretrend(fitted)

# Shared y-range for both panels, padded by 0.01 on each side
ymin = min(l_out.min(), l_ring.min()) - 0.01
ymax = max(u_out.max(), u_ring.max()) + 0.01

fig, axes = plt.subplots(1, 2, figsize=(13,5), sharey=True)

# Left panel: baseline (outer band >= T2 as control); right panel: ring control.
# Only the left panel carries the y-axis label.
panel_specs = [
    (ks_out, b_out, l_out, u_out, "Baseline — Control: same cohort, dist ≥ T2", True),
    (ks_ring, b_ring, l_ring, u_ring, "Ring Control — Control: same cohort, T1 < dist < T2", False),
]
for ax, (px, py, p_lo, p_hi, heading, with_ylabel) in zip(axes, panel_specs):
    ax.plot(px, py, marker="o", label="Estimate")
    ax.fill_between(px, p_lo, p_hi, alpha=0.25, label="95% CI")
    ax.axhline(0, color="black", lw=1)
    ax.axvline(0, color="red", ls="--", label="Event (k=0)")
    ax.set_title(heading)
    ax.set_xlabel("Quarters relative to announcement")
    if with_ylabel:
        ax.set_ylabel("Effect on log(price)")
    ax.set_ylim(ymin, ymax)
    ax.legend()

plt.tight_layout()
plt.show()



In [None]:

# Settings for the near / ring / outer distance-band analysis
DIST_COL   = "d_net_min_km"     # distance column used to assign the bands
ID_COL     = "lsoa21cd"
DATE_COL   = "date"
COHORT_COL = "announcement_year"
T1, T2     = 1.0, 2.0           # near: <= T1, ring: (T1, T2), outer: >= T2
WINDOW     = 8                  # quarters kept on each side of the event quarter
POST_RANGE = range(0, WINDOW+1) # relative periods averaged as the "post" effect
BASE_CTRLS = ["share_detached","share_semi_detached","share_terraced",
              "share_flat","total_sales","pop_density"]

# Load the panel (ID column kept as text) and parse the date column
panel = pd.read_csv("output/Final_panel_data.csv", dtype={ID_COL: str})
panel[DATE_COL] = pd.to_datetime(panel[DATE_COL])

# Population density needs an area column; fail with an explicit message when
# neither spelling is present instead of a bare KeyError on the division below.
if "area_sq_km" not in panel.columns:
    if "Area Sq Km" not in panel.columns:
        raise ValueError("Can't find the area")
    panel["area_sq_km"] = panel["Area Sq Km"]
panel["pop_density"] = panel["population"] / panel["area_sq_km"]
# Non-finite densities (e.g. division by a zero area) become NaN
panel.loc[~np.isfinite(panel["pop_density"]), "pop_density"] = np.nan

# The log is only defined for strictly positive prices
panel = panel[panel["median_price"]>0].copy()
panel["log_price"] = np.log(panel["median_price"])

# Distance bands: near, ring, outer (the outer band is the control group)
panel["near"]  = (panel[DIST_COL] <= T1).astype(int)
panel["ring"]  = ((panel[DIST_COL] > T1) & (panel[DIST_COL] < T2)).astype(int)
panel["outer"] = (panel[DIST_COL] >= T2).astype(int)

# Build the stacked sample for one cohort
def build_stacked_same_cohort(df, cohort, window=WINDOW):
    """Return one cohort's rows inside the event window.

    Rows whose ``COHORT_COL`` equals ``cohort`` and that belong to the
    ``near``, ``ring`` or ``outer`` band are kept.  The event date is 1 March
    of the cohort year; ``k`` is the number of quarters between an observation
    and the event quarter, and only ``-window <= k <= window`` is returned.
    An empty DataFrame is returned when the cohort has no rows.
    """
    cohort_rows = df[df[COHORT_COL] == cohort].copy()
    if cohort_rows.empty:
        return pd.DataFrame()

    in_any_band = (
        cohort_rows["near"].eq(1)
        | cohort_rows["ring"].eq(1)
        | cohort_rows["outer"].eq(1)
    )
    sample = cohort_rows.loc[in_any_band].copy()

    sample["event_time"] = pd.Timestamp(f"{int(cohort)}-03-01")
    sample["date_q"] = pd.PeriodIndex(sample[DATE_COL], freq="Q")
    sample["event_q"] = pd.PeriodIndex(sample["event_time"], freq="Q")
    sample["k"] = sample["date_q"].astype(int) - sample["event_q"].astype(int)
    return sample[sample["k"].between(-window, window)]

# Build every cohort's sub-panel exactly once and stack the non-empty ones
# (building inside both the comprehension body and its filter doubles the work).
cohorts = sorted(panel[COHORT_COL].dropna().astype(int).unique())
cohort_frames = [build_stacked_same_cohort(panel, c, WINDOW) for c in cohorts]
stacked = pd.concat([frame for frame in cohort_frames if not frame.empty],
                    ignore_index=True)



# Event dummy × distance band (treated bands only); k = -1 is the omitted base period
k_min, k_max = -WINDOW, WINDOW
for kk in range(k_min, k_max+1):
    if kk == -1:
        continue
    # Separate interaction terms for the near and the ring band
    stacked[f"ev_{kk}_near"] = ((stacked["k"]==kk) & (stacked["near"]==1)).astype(int)
    stacked[f"ev_{kk}_ring"] = ((stacked["k"]==kk) & (stacked["ring"]==1)).astype(int)

# Regression: two-way fixed effects with entity-clustered standard errors
stacked = stacked.set_index([ID_COL, DATE_COL]).sort_index()
# Controls: keep only those that exist and are not constant
use_ctrls = [c for c in BASE_CTRLS if c in stacked.columns and stacked[c].nunique(dropna=True)>1]

ev_near = [f"ev_{kk}_near" for kk in range(k_min, k_max+1) if kk != -1]
ev_ring = [f"ev_{kk}_ring" for kk in range(k_min, k_max+1) if kk != -1]
X_cols  = ev_near + ev_ring + use_ctrls

mod = PanelOLS(stacked["log_price"], stacked[X_cols],
               entity_effects=True, time_effects=True, drop_absorbed=True)
res = mod.fit(cov_type="clustered", cluster_entity=True)

print(res.summary)

# "Mean" the coefficients in the post-announcement window and give the 95%CI (linear combination)
def avg_post_effect(res, name_prefix, post_ks=POST_RANGE):
    """Equal-weight average of a band's post-period event coefficients.

    Coefficients named ``ev_{k}_{name_prefix}`` with ``k`` in ``post_ks``
    (within ``k_min..k_max``, excluding ``k = -1``) that are present in
    ``res.params`` are averaged.  The standard error of the average is taken
    from the matching block of ``res.cov``.

    Returns ``(estimate, (lower, upper))`` using estimate ± 1.96·se, or
    ``(nan, (nan, nan))`` when none of the coefficients was estimated.
    """
    wanted = (
        f"ev_{k}_{name_prefix}"
        for k in range(k_min, k_max+1)
        if k != -1 and k in post_ks
    )
    names = [n for n in wanted if n in res.params.index]
    if not names:
        return np.nan, (np.nan, np.nan)

    coefs = res.params.loc[names].values
    vcov = res.cov.loc[names, names].values
    weights = np.full(len(names), 1.0 / len(names))   # equal-weight average

    est = float(weights @ coefs)
    se = float(np.sqrt(weights @ vcov @ weights))
    half_width = 1.96*se
    return est, (est - half_width, est + half_width)

# Average post-announcement effect for each treated band
band_effects = {band: avg_post_effect(res, band, POST_RANGE) for band in ("near", "ring")}
near_est, (near_lo, near_hi) = band_effects["near"]
ring_est, (ring_lo, ring_hi) = band_effects["ring"]

# Distance decay curve; each band is placed at the x position 0.5 / 1.5
xs  = np.array([0.5, 1.5])
ys  = np.array([near_est, ring_est])
los = np.array([near_lo, ring_lo])
his = np.array([near_hi, ring_hi])

plt.figure(figsize=(6,4.2))
plt.errorbar(
    xs,
    ys,
    yerr=[ys-los, his-ys],      # distances below / above the point estimate
    fmt="o-",
    capsize=4,
    linewidth=2,
    markersize=6,
    label="Avg post (k≥0)",
)
plt.axhline(0, color="black", lw=1)
plt.xlabel("Distance to nearest station (km)")
plt.ylabel("Average post-announcement effect on log(price)")
plt.title("Distance Decay of Announcement Effect (vs. ≥2km control)")
plt.xticks(xs, ["≤1 km (0.5)", "1–2 km (1.5)"])
plt.legend()
plt.tight_layout()
plt.show()


def extract_path(res, prefix):
    """Return the estimated event-study path of one distance band.

    Looks up the coefficients ``ev_{k}_{prefix}`` (``k`` in ``k_min..k_max``,
    excluding ``k = -1``) that exist in ``res.params``.

    Returns ``(ks, coefficients, lower, upper)`` sorted by relative period.
    """
    candidates = (f"ev_{k}_{prefix}" for k in range(k_min, k_max+1) if k != -1)
    names = [n for n in candidates if n in res.params.index]

    ks = np.array([int(n.split("_")[1]) for n in names])
    order = np.argsort(ks)

    coefs = res.params.loc[names].values[order]
    ci = res.conf_int().loc[names]
    # Fall back to positional columns if the interval columns are not named
    lower = ci["lower"] if "lower" in ci.columns else ci.iloc[:, 0]
    upper = ci["upper"] if "upper" in ci.columns else ci.iloc[:, 1]
    return ks[order], coefs, lower.values[order], upper.values[order]

# Event-study paths of the two treated bands, side by side
ksN, bN, lN, uN = extract_path(res, "near")
ksR, bR, lR, uR = extract_path(res, "ring")

fig, axes = plt.subplots(1,2, figsize=(12,4.2), sharey=True)
band_panels = [
    (ksN, bN, lN, uN, "Near ≤1km"),
    (ksR, bR, lR, uR, "Ring 1–2km"),
]
for ax, (ks, b, l, u, ttl) in zip(axes, band_panels):
    ax.plot(ks, b, marker="o", label="Estimate")
    ax.fill_between(ks, l, u, alpha=0.25, label="95% CI")
    ax.axhline(0, color="black", lw=1)
    ax.axvline(0, color="red", ls="--", label="Event (k=0)")
    ax.set_title(ttl)
    ax.set_xlabel("Quarters relative to announcement")

# Axis label and legend only on the left panel
axes[0].set_ylabel("Effect on log(price)")
axes[0].legend()
plt.tight_layout()
plt.show()


In [None]:
# Settings for the 0.5 km distance-band analysis
DIST_COL   = "d_net_min_km"     # distance column used to assign the bands
ID_COL     = "lsoa21cd"
DATE_COL   = "date"
COHORT_COL = "announcement_year"

WINDOW     = 8                  # quarters kept on each side of the event quarter
CTRL_MIN   = 2.0                # control group: distance strictly greater than this
BANDS = [(0.0,0.5), (0.5,1.0), (1.0,1.5), (1.5,2.0)]
BASE_CTRLS = ["share_detached","share_semi_detached","share_terraced",
              "share_flat","total_sales","pop_density"]

# Load the panel (ID column kept as text) and parse the date column
panel = pd.read_csv("data/Final_panel_data3.csv", dtype={ID_COL: str})
panel[DATE_COL] = pd.to_datetime(panel[DATE_COL])

# Population density needs an area column; fail with an explicit message when
# neither spelling is present instead of a bare KeyError on the division below.
if "area_sq_km" not in panel.columns:
    if "Area Sq Km" not in panel.columns:
        raise ValueError("Can't find the area")
    panel["area_sq_km"] = panel["Area Sq Km"]
panel["pop_density"] = panel["population"] / panel["area_sq_km"]
# Non-finite densities (e.g. division by a zero area) become NaN
panel.loc[~np.isfinite(panel["pop_density"]), "pop_density"] = np.nan

# The log is only defined for strictly positive prices
panel = panel[panel["median_price"]>0].copy()
panel["log_price"] = np.log(panel["median_price"])
panel["dist_km"] = panel[DIST_COL]

# Band labels
def band_label(lo, hi):
    """Return the display label of the distance band ``(lo, hi]``.

    A band starting at 0 is labelled ``≤hi``; any other band ``lo–hi``.
    Both bounds are printed with one decimal place.
    """
    return f"≤{hi:.1f}" if lo == 0.0 else f"{lo:.1f}–{hi:.1f}"

def in_band(d, lo, hi):
    """Tell whether distance ``d`` lies in the band ``(lo, hi]``.

    When ``lo`` is 0 the lower bound is not checked, so a distance of 0 falls
    into the first band.  Works element-wise on arrays and Series.
    """
    within_upper = d <= hi
    if lo == 0.0:
        return within_upper
    return (d > lo) & within_upper

# One 0/1 indicator column per distance band
distance = panel["dist_km"]
for (lo, hi) in BANDS:
    col = f"band_{lo}_{hi}"
    panel[col] = in_band(distance, lo, hi).astype(int)

# Control group: strictly beyond CTRL_MIN
panel["outer2"] = distance.gt(CTRL_MIN).astype(int)

# Stacked sample for one cohort
def build_stacked(df, cohort, window=WINDOW):
    """Return one cohort's rows inside the event window.

    Rows whose ``COHORT_COL`` equals ``cohort`` are kept when they are in the
    ``outer2`` control group or in any of the ``BANDS`` indicator columns.
    The event date is 1 March of the cohort year; ``k`` is the number of
    quarters between an observation and the event quarter, and only
    ``-window <= k <= window`` is returned.  An empty DataFrame is returned
    when the cohort has no rows.
    """
    cohort_rows = df[df[COHORT_COL] == cohort].copy()
    if cohort_rows.empty:
        return pd.DataFrame()

    band_cols = [f"band_{lo}_{hi}" for (lo, hi) in BANDS]
    in_some_band = cohort_rows[band_cols].eq(1).any(axis=1)
    sample = cohort_rows.loc[cohort_rows["outer2"].eq(1) | in_some_band].copy()

    sample["event_time"] = pd.Timestamp(f"{int(cohort)}-03-01")
    sample["date_q"] = pd.PeriodIndex(sample[DATE_COL], freq="Q")
    sample["event_q"] = pd.PeriodIndex(sample["event_time"], freq="Q")
    sample["k"] = sample["date_q"].astype(int) - sample["event_q"].astype(int)
    return sample[sample["k"].between(-window, window)]

# Stack the cohorts that contain both control rows and rows in at least one band
cohorts = sorted(panel[COHORT_COL].dropna().astype(int).unique())
pieces = []
for c in cohorts:
    sub = build_stacked(panel, c, WINDOW)
    if sub.empty:
        continue
    has_control = sub["outer2"].sum() > 0
    has_treated = any(sub[f"band_{lo}_{hi}"].sum() > 0 for (lo, hi) in BANDS)
    if has_control and has_treated:
        pieces.append(sub)
stacked = pd.concat(pieces, ignore_index=True)

# Event dummy × band; k = -1 is the omitted base period
k_min, k_max = -WINDOW, WINDOW
event_ks = [kk for kk in range(k_min, k_max + 1) if kk != -1]
ev_cols = []
for (lo, hi) in BANDS:
    bandcol = f"band_{lo}_{hi}"
    for kk in event_ks:
        col = f"ev_{kk}_b{lo}_{hi}"
        stacked[col] = ((stacked["k"] == kk) & (stacked[bandcol] == 1)).astype(int)
        ev_cols.append(col)

# Regression: two-way fixed effects
df_idx = stacked.set_index([ID_COL, DATE_COL]).sort_index()
# Controls: keep only those that exist and are not constant
use_ctrls = [c for c in BASE_CTRLS if c in df_idx.columns and df_idx[c].nunique(dropna=True) > 1]

X = df_idx[ev_cols + use_ctrls].copy()
Y = df_idx["log_price"]

# Drop regressors with zero variance (constant / all-zero) to avoid rank deficiency
keep_cols = [name for name in X.columns if X[name].std(skipna=True) > 0]
X = X[keep_cols]

mod = PanelOLS(Y, X, entity_effects=True, time_effects=True, drop_absorbed=True)
res = mod.fit(cov_type="clustered", cluster_entity=True)
print(res.summary)

# Helpers: coefficient path, post-period mean with CI, pre-trend Wald test
def extract_path(res, lo, hi):
    """Return the estimated event-study path of the band ``(lo, hi)``.

    Looks up the coefficients ``ev_{k}_b{lo}_{hi}`` (``k`` in
    ``k_min..k_max``, excluding ``k = -1``) that exist in ``res.params``.

    Returns ``(ks, coefficients, lower, upper)`` sorted by relative period.
    """
    candidates = (f"ev_{k}_b{lo}_{hi}" for k in range(k_min, k_max+1) if k != -1)
    names = [n for n in candidates if n in res.params.index]

    ks = np.array([int(n.split("_")[1]) for n in names])
    order = np.argsort(ks)

    coefs = res.params.loc[names].values[order]
    ci = res.conf_int().loc[names]
    # Fall back to positional columns if the interval columns are not named
    lower = ci["lower"] if "lower" in ci.columns else ci.iloc[:, 0]
    upper = ci["upper"] if "upper" in ci.columns else ci.iloc[:, 1]
    return ks[order], coefs, lower.values[order], upper.values[order]

def avg_post(res, lo, hi, post_range=range(0, WINDOW+1)):
    """Equal-weight average of a band's post-period event coefficients.

    Coefficients named ``ev_{k}_b{lo}_{hi}`` for ``k`` in ``post_range``
    (excluding ``k = -1``) that are present in ``res.params`` are averaged; the
    standard error of the average comes from the matching block of ``res.cov``.

    Returns ``(estimate, (lower, upper))`` using estimate ± 1.96·se, or
    ``(nan, (nan, nan))`` when none of the coefficients was estimated.
    """
    wanted = (f"ev_{k}_b{lo}_{hi}" for k in post_range if k != -1)
    names = [n for n in wanted if n in res.params.index]
    if not names:
        return np.nan, (np.nan, np.nan)

    coefs = res.params.loc[names].values
    vcov = res.cov.loc[names, names].values
    weights = np.full(len(names), 1.0 / len(names))

    est = float(weights @ coefs)
    se = float(np.sqrt(weights @ vcov @ weights))
    half_width = 1.96*se
    return est, (est - half_width, est + half_width)

def wald_pretrend_by_band(res, lo, hi, pre_max=-2):
    """Jointly test that one band's pre-period event coefficients are zero.

    Tests the coefficients ``ev_{k}_b{lo}_{hi}`` for ``k`` from ``k_min`` to
    ``pre_max`` that are present in ``res.params`` with ``res.wald_test`` and
    prints the statistic and p-value.  Returns the test result, or ``None``
    when the band has no such coefficient.
    """
    names = res.params.index
    tested = [
        name
        for name in (f"ev_{k}_b{lo}_{hi}" for k in range(k_min, pre_max+1))
        if name in names
    ]
    if not tested:
        print(f"Band({lo}-{hi}): no pretrend terms.")
        return None

    # Restriction matrix: one unit row per tested coefficient
    R = np.zeros((len(tested), len(names)))
    for row, name in enumerate(tested):
        R[row, names.get_loc(name)] = 1.0
    q = np.zeros(len(tested))

    w = res.wald_test(R, q)
    print(f"[Pretrend] Band({lo}-{hi}) chi2={w.stat:.2f}, p={w.pval:.4f}")
    return w

# Figure 1: distance decay curve (average post effect per band, plotted at the band midpoint)
band_stats = []
for (lo, hi) in BANDS:
    est, (lo95, hi95) = avg_post(res, lo, hi, post_range=range(0, WINDOW+1))
    mid = (lo+hi)/2.0 if lo>0 else hi/2.0
    band_stats.append((mid, est, lo95, hi95))

midpoints = np.array([row[0] for row in band_stats])
order = np.argsort(midpoints)
midpoints = midpoints[order]
ests = np.array([row[1] for row in band_stats])[order]
los = np.array([row[2] for row in band_stats])[order]
his = np.array([row[3] for row in band_stats])[order]

plt.figure(figsize=(6,4.2))
plt.errorbar(
    midpoints,
    ests,
    yerr=[ests-los, his-ests],   # distances below / above the point estimate
    fmt="o-",
    capsize=4,
    linewidth=2,
    markersize=6,
    label="Avg post (k≥0)",
)
plt.axhline(0, color="black", lw=1)
plt.xlabel("Distance to nearest station (km)")
plt.ylabel("Average post-announcement effect on log(price)\n(vs. >2 km control)")
plt.title("Distance Decay — 0.5 km Bands")
plt.xticks(midpoints, [band_label(lo,hi) for (lo,hi) in np.array(BANDS)[order]])
plt.legend()
plt.tight_layout()
plt.show()

# Figure 2: one event-study panel per band
n = len(BANDS)
fig, axes = plt.subplots(1, n, figsize=(4.2*n, 4.0), sharey=True)
if n == 1:
    # With a single panel, subplots returns a bare Axes; wrap it so indexing works
    axes = [axes]
for position, (lo, hi) in enumerate(BANDS):
    ax = axes[position]
    ks, b, l, u = extract_path(res, lo, hi)
    ax.plot(ks, b, marker="o", label="Estimate")
    ax.fill_between(ks, l, u, alpha=0.25, label="95% CI")
    ax.axhline(0, color="black", lw=1)
    ax.axvline(0, color="red", ls="--", label="Event (k=0)")
    ax.set_title(f"{band_label(lo,hi)} km")
    ax.set_xlabel("Quarters relative to announcement")

# Axis label and legend only on the first panel
axes[0].set_ylabel("Effect on log(price)")
axes[0].legend()
plt.tight_layout()
plt.show()

# Pre-trend Wald test for every band
for (lo, hi) in BANDS:
    wald_pretrend_by_band(res, lo, hi, pre_max=-2)


In [None]:
# Settings for the continuous distance-gradient analysis
DIST_COL   = "d_net_min_km"   # distance column used for grouping and as the gradient
ID_COL     = "lsoa21cd"
DATE_COL   = "date"
COHORT_COL = "announcement_year"

T_NEAR_MAX = 2.0              # treated group: distance <= T_NEAR_MAX
CONTROL_MIN = 2.0             # control group: distance > CONTROL_MIN
WINDOW     = 8                # quarters kept on each side of the event quarter
BASE_CTRLS = ["share_detached","share_semi_detached","share_terraced",
              "share_flat","total_sales","pop_density"]

# Load the panel (ID column kept as text) and parse the date column
panel = pd.read_csv("data/Final_panel_data3.csv", dtype={ID_COL: str})
panel[DATE_COL] = pd.to_datetime(panel[DATE_COL])

# Population density needs an area column; fail with an explicit message when
# neither spelling is present instead of a bare KeyError on the division below.
if "area_sq_km" not in panel.columns:
    if "Area Sq Km" not in panel.columns:
        raise ValueError("Can't find the area")
    panel["area_sq_km"] = panel["Area Sq Km"]
panel["pop_density"] = panel["population"] / panel["area_sq_km"]
# Non-finite densities (e.g. division by a zero area) become NaN
panel.loc[~np.isfinite(panel["pop_density"]), "pop_density"] = np.nan

# The log is only defined for strictly positive prices
panel = panel[panel["median_price"]>0].copy()
panel["log_price"] = np.log(panel["median_price"])

# Distance and grouping
panel["dist_km"]   = panel[DIST_COL]
panel["near2"]     = (panel["dist_km"] <= T_NEAR_MAX).astype(int)       # the continuous gradient is only used within T_NEAR_MAX
panel["outer2"]    = (panel["dist_km"] > CONTROL_MIN).astype(int)       # control group
panel["dist_in2"]  = np.clip(panel["dist_km"], 0, T_NEAR_MAX)           # distance clipped to [0, T_NEAR_MAX]
panel["dist_in2_c"] = panel["dist_in2"]                                 # uncentred copy used by the linear model

# Stacked sample for one cohort
def build_stacked(df, cohort, window=WINDOW):
    """Return one cohort's rows inside the event window.

    Rows whose ``COHORT_COL`` equals ``cohort`` are kept when flagged as
    ``near2`` or ``outer2``.  The event date is 1 March of the cohort year;
    ``k`` is the number of quarters between an observation and the event
    quarter, and only ``-window <= k <= window`` is returned.  An empty
    DataFrame is returned when the cohort has no rows.
    """
    cohort_rows = df[df[COHORT_COL] == cohort].copy()
    if cohort_rows.empty:
        return pd.DataFrame()

    in_sample = cohort_rows["near2"].eq(1) | cohort_rows["outer2"].eq(1)
    sample = cohort_rows.loc[in_sample].copy()

    sample["event_time"] = pd.Timestamp(f"{int(cohort)}-03-01")
    sample["date_q"] = pd.PeriodIndex(sample[DATE_COL], freq="Q")
    sample["event_q"] = pd.PeriodIndex(sample["event_time"], freq="Q")
    sample["k"] = sample["date_q"].astype(int) - sample["event_q"].astype(int)
    return sample[sample["k"].between(-window, window)]

# Build every cohort's sub-panel exactly once and stack the non-empty ones
# (building inside both the comprehension body and its filter doubles the work).
cohorts = sorted(panel[COHORT_COL].dropna().astype(int).unique())
cohort_frames = [build_stacked(panel, c, WINDOW) for c in cohorts]
stacked = pd.concat([frame for frame in cohort_frames if not frame.empty],
                    ignore_index=True)

# Post-announcement indicator (event quarter and later)
stacked["post"] = (stacked["k"]>=0).astype(int)

# Fixed-effects regression with controls
def run_panel_ols(df, Xcols):
    """Fit ``log_price`` on ``Xcols`` plus the usable base controls.

    ``df`` is indexed by ``(ID_COL, DATE_COL)``; controls from ``BASE_CTRLS``
    are added when they exist and are not constant.  The model includes entity
    and time effects and uses entity-clustered standard errors.  Returns the
    fitted result.
    """
    indexed = df.set_index([ID_COL, DATE_COL]).sort_index()

    controls = []
    for name in BASE_CTRLS:
        if name in indexed.columns and indexed[name].nunique(dropna=True) > 1:
            controls.append(name)

    model = PanelOLS(
        indexed["log_price"],
        indexed[Xcols + controls],
        entity_effects=True,
        time_effects=True,
        drop_absorbed=True,
    )
    return model.fit(cov_type="clustered", cluster_entity=True)


# A) Average post effect (k >= 0) as a continuous function of distance

# Linear form: effect(d) = b0 + b1 * d, with d in [0, 2]
stacked["G0"] = stacked["near2"].mul(stacked["post"])
stacked["G1"] = stacked["near2"].mul(stacked["dist_in2_c"]).mul(stacked["post"])


X_cont = ["G0", "G1"]
res_cont = run_panel_ols(stacked, X_cont)
print("\n=== A Continuous distance (average post-effect) - Linear model ===")
print(res_cont.summary)

# Prediction: effect and 95% CI along a grid of distances
def pred_effect_continuous(res, d_grid, quadratic=False):
    """Evaluate the fitted distance gradient on ``d_grid``.

    Uses the coefficients ``G0`` and ``G1`` of ``res`` (and ``G2`` when
    ``quadratic`` is true and ``G2`` was estimated) together with the matching
    block of ``res.cov``.  For each distance ``d`` the effect is
    ``b0 + b1*d (+ b2*d*d)`` and the interval is effect ± 1.96·se.

    Returns three arrays: ``(effect, lower, upper)``.
    """
    use_quadratic = quadratic and ("G2" in res.params.index)
    terms = ["G0", "G1", "G2"] if use_quadratic else ["G0", "G1"]

    coefs = res.params.loc[terms].values
    vcov = res.cov.loc[terms, terms].values

    effects, lowers, uppers = [], [], []
    for d in d_grid:
        # Design vector for this distance: [1, d] or [1, d, d^2]
        design = np.array([1.0, d, d*d]) if use_quadratic else np.array([1.0, d])
        point = float(design @ coefs)
        se = float(np.sqrt(design @ vcov @ design))
        effects.append(point)
        lowers.append(point - 1.96*se)
        uppers.append(point + 1.96*se)
    return np.array(effects), np.array(lowers), np.array(uppers)

# Evaluate the linear gradient on 41 evenly spaced distances from 0 to 2.
d_grid = np.linspace(0, 2, 41)
eff, lo, hi = pred_effect_continuous(res_cont, d_grid, quadratic=False)

# Point estimate as a line, 95% band as a shaded area, zero line for reference.
plt.figure(figsize=(6,4.2))
plt.plot(d_grid, eff, label="Avg post effect (≤2km)")
plt.fill_between(d_grid, lo, hi, alpha=0.25, label="95% CI")
plt.axhline(0, color="black", lw=1)
plt.xlabel("Distance within 2 km (km)")
plt.ylabel("Effect on log(price) relative to >2 km")
plt.title("Continuous Distance Gradient (Avg post, ≤2km vs >2km control)")
plt.legend(); plt.tight_layout(); plt.show()

# Point estimates at d = 0.5, 1.0 and 1.5 km (easy to compare with the segment plots)
for dd in [0.5, 1.0, 1.5]:
    e, l, u = pred_effect_continuous(res_cont, np.array([dd]), quadratic=False)
    # Index the one-element arrays explicitly: calling float() on an ndarray
    # with ndim > 0 is deprecated in recent NumPy releases.
    print(f"d={dd:.1f} km -> est={e[0]:.4f}, 95%CI=({l[0]:.4f},{u[0]:.4f})")

# Center the clipped distance at 1 km and build its square
stacked["dist_c"]  = stacked["dist_in2"] - 1.0     # lies in [-1, +1] when dist_in2 lies in [0, 2]
stacked["dist_c2"] = stacked["dist_c"]**2

# Quadratic form: effect = G0 + G1*d_c + G2*d_c^2
# NOTE: G0 and G1 are overwritten here; G1 now uses the centered distance
# dist_c rather than dist_in2_c as in the linear specification.
stacked["G0"] = stacked["near2"] * stacked["post"]
stacked["G1"] = stacked["near2"] * stacked["dist_c"]  * stacked["post"]
stacked["G2"] = stacked["near2"] * stacked["dist_c2"] * stacked["post"]

def run_panel_ols(df, Xcols, BASE_CTRLS):
    """Fit a PanelOLS of "log_price" on Xcols plus usable controls.

    Parameters
    ----------
    df : DataFrame with "lsoa21cd", "date", "log_price" and the regressors.
    Xcols : list of regressor column names of interest.
    BASE_CTRLS : candidate control names; one is used only if it is present
        and takes more than one distinct non-missing value.

    Any selected column whose standard deviation is not strictly positive is
    removed before estimation. The model includes entity and time effects,
    drops absorbed regressors, and clusters the covariance by entity.

    Returns the fitted results object.
    """
    indexed = df.set_index(["lsoa21cd","date"]).sort_index()

    controls = []
    for name in BASE_CTRLS:
        if name in indexed.columns and indexed[name].nunique(dropna=True) > 1:
            controls.append(name)

    design = indexed[Xcols + controls].copy()
    # Discard regressors with no variation (std not strictly positive).
    varying = [col for col in design.columns if design[col].std(skipna=True) > 0]

    model = PanelOLS(
        indexed["log_price"],
        design[varying],
        entity_effects=True,
        time_effects=True,
        drop_absorbed=True,
    )
    return model.fit(cov_type="clustered", cluster_entity=True)

# Fit the static quadratic specification on G0, G1, G2 plus the baseline controls and print its summary.
res_cont_q = run_panel_ols(stacked, ["G0","G1","G2"], BASE_CTRLS)
print("\n=== A Quadratic（Avg post, ≤2km vs >2km）===")
print(res_cont_q.summary)

# Raw distance d is mapped to the centered distance d_c = d - 1 before evaluation
def pred_effect_continuous_quadratic(res, d_grid):
    """Evaluate the centered quadratic distance gradient with a 95% band.

    Parameters
    ----------
    res : fitted results exposing `params` (Series) and `cov` (DataFrame).
        Whichever of "G0", "G1", "G2" are present are used.
    d_grid : iterable of raw distances; each is centered as d - 1.

    Returns
    -------
    (eff, lo, hi) : arrays holding the point estimate and
        estimate -/+ 1.96 * standard error at every grid point.

    Each coefficient is paired with its own basis term by name
    (G0 -> 1, G1 -> d_c, G2 -> d_c^2), so a missing middle term (for example
    G1 dropped while G2 is kept) cannot shift G2 onto the linear term.
    """
    names = res.params.index
    cols = [c for c in ["G0", "G1", "G2"] if c in names]
    b = res.params.loc[cols].values
    V = res.cov.loc[cols, cols].values
    eff, lo, hi = [], [], []
    for d in d_grid:
        dc = d - 1.0
        basis = {"G0": 1.0, "G1": dc, "G2": dc * dc}
        # Select basis terms by coefficient name rather than by position.
        v = np.array([basis[c] for c in cols])
        m = float(v @ b)
        se = float(np.sqrt(v @ V @ v))
        eff.append(m)
        lo.append(m - 1.96 * se)
        hi.append(m + 1.96 * se)
    return np.array(eff), np.array(lo), np.array(hi)

# Evaluate the quadratic gradient on 41 evenly spaced distances from 0 to 2.
d_grid = np.linspace(0, 2, 41)
eff, lo, hi = pred_effect_continuous_quadratic(res_cont_q, d_grid)

# Point estimate as a line, 95% band as a shaded area, zero line for reference.
plt.figure(figsize=(6.2,4.2))
plt.plot(d_grid, eff, label="Avg post effect (≤2km)")
plt.fill_between(d_grid, lo, hi, alpha=0.25, label="95% CI")
plt.axhline(0, color="black", lw=1)
plt.xlabel("Distance within 2 km (km)")
plt.ylabel("Effect on log(price) relative to >2 km")
plt.title("Continuous Distance Gradient — Quadratic (Avg post)")
plt.legend(); plt.tight_layout(); plt.show()

# Print a few representative points. The one-element result arrays are indexed
# with [0] instead of being passed to float(), which avoids NumPy's
# array-to-scalar DeprecationWarning.
for dd in [0.0, 0.5, 1.0, 1.5, 2.0]:
    e, l, h = pred_effect_continuous_quadratic(res_cont_q, np.array([dd]))
    print(f"d={dd:.1f} km -> est={e[0]:.4f}, 95%CI=({l[0]:.4f},{h[0]:.4f})")


# B Dynamic continuous Distance (Event study)
# Near ≤2km vs >2km，Estimate "intercept + slope" for each k
import numpy as np
import matplotlib.pyplot as plt
from linearmodels.panel import PanelOLS

# 1) For every event quarter k in [-WINDOW, WINDOW] except the omitted k = -1,
#    add two regressors and remember their names:
#    EV{k}_0: 1 when (k == K and near2 == 1) -- intercept gap between the near ring and the control ring
#    EV{k}_1: EV{k}_0 * dist_in2_c           -- slope of the effect with respect to distance
k_min, k_max = -WINDOW, WINDOW
ev_cols_dyn = []
for kk in range(k_min, k_max + 1):
    if kk != -1:
        mask = (stacked["near2"] == 1) & (stacked["k"] == kk)
        stacked[f"EV{kk}_0"] = mask.astype(int)
        stacked[f"EV{kk}_1"] = stacked[f"EV{kk}_0"] * stacked["dist_in2_c"]
        ev_cols_dyn.extend([f"EV{kk}_0", f"EV{kk}_1"])

# 2) Assemble the regression data on the (entity, date) index.
df_idx = stacked.set_index([ID_COL, DATE_COL]).sort_index()

# Baseline controls that are present and take more than one distinct value.
use_ctrls = [
    c
    for c in BASE_CTRLS
    if c in df_idx.columns and df_idx[c].nunique(dropna=True) > 1
]

Ydyn = df_idx["log_price"]
Xdyn_full = df_idx[ev_cols_dyn + use_ctrls].copy()

# Drop regressors without variation (std not strictly positive), e.g. all-zero event dummies.
keep_cols = [c for c in Xdyn_full.columns if Xdyn_full[c].std(skipna=True) > 0]
Xdyn = Xdyn_full[keep_cols]

mod_dyn = PanelOLS(
    Ydyn,
    Xdyn,
    entity_effects=True,
    time_effects=True,
    drop_absorbed=True,
)
res_dyn = mod_dyn.fit(cov_type="clustered", cluster_entity=True)
print("\n=== B Dynamic Continuous Distance (Event Study) - Linear models ===")
print(res_dyn.summary)

# 3) Effect-distance curve for a single event quarter k (plotted below for k = 0, 4, 8)
def extract_k_curve(res, kk, d_grid):
    """Return the linear effect-distance curve for event quarter `kk`.

    Parameters
    ----------
    res : fitted results exposing `params` (Series) and `cov` (DataFrame).
    kk : event quarter; the terms "EV{kk}_0" and "EV{kk}_1" are looked up.
    d_grid : iterable of distances at which to evaluate b0 + b1 * d.

    Returns
    -------
    None if either term is missing from `res.params`; otherwise a tuple
    (eff, lo, hi) of arrays with the estimate and estimate -/+ 1.96 * se.
    """
    terms = [f"EV{kk}_0", f"EV{kk}_1"]
    if any(term not in res.params.index for term in terms):
        return None

    beta = np.array([res.params[terms[0]], res.params[terms[1]]])
    vcov = res.cov.loc[terms, terms].values

    design = [np.array([1.0, d]) for d in d_grid]
    center = [float(x @ beta) for x in design]
    spread = [1.96 * float(np.sqrt(x @ vcov @ x)) for x in design]
    lower = [c - s for c, s in zip(center, spread)]
    upper = [c + s for c, s in zip(center, spread)]
    return np.array(center), np.array(lower), np.array(upper)

# Overlay the linear effect-distance curves for k = 0, 4 and 8 on one figure.
d_grid = np.linspace(0, 2, 41)
plt.figure(figsize=(9, 4.2))
for kk in [0, 4, 8]:
    curves = extract_k_curve(res_dyn, kk, d_grid)
    # None means the terms for this k are not in the fitted model; skip it.
    if curves is None:
        continue
    eff_k, lo_k, hi_k = curves
    plt.plot(d_grid, eff_k, label=f"k={kk}")
    plt.fill_between(d_grid, lo_k, hi_k, alpha=0.15)
plt.axhline(0, color="black", lw=1)
plt.xlabel("Distance within 2 km (km)")
plt.ylabel("Effect on log(price) relative to >2 km")
plt.title("Event-time Distance Gradient (k=0,4,8)")
plt.legend(); plt.tight_layout(); plt.show()

# 4) Wald Test

def wald_pretrend_dynamic(res, part="intercept", pre_max=-2):
    """Jointly test that the pre-period event-study terms are zero.

    Parameters
    ----------
    res : fitted results exposing `params` and `wald_test`.
    part : "intercept" tests the EV{k}_0 terms; any other value tests the
        EV{k}_1 (slope) terms.
    pre_max : terms with k_min <= k <= pre_max (k != -1) that are present in
        the fitted model are tested together.

    Prints the statistic and p-value and returns the test object, or prints
    a notice and returns None when no matching term is in the model.
    """
    names = res.params.index
    if part == "intercept":
        suffix, label = "0", "Intercept (EV_k_0)"
    else:
        suffix, label = "1", "Slope (EV_k_1)"

    tested = []
    for kk in range(k_min, pre_max + 1):
        term = f"EV{kk}_{suffix}"
        if kk != -1 and term in names:
            tested.append(term)

    if not tested:
        print(f"[Pretrend] {label}: no pre-k terms found.")
        return None

    # One restriction row per tested coefficient: the matching row of the identity matrix.
    identity = np.eye(len(names))
    R = np.vstack([identity[names.get_loc(term)] for term in tested])
    outcome = res.wald_test(R, np.zeros(len(tested)))
    print(f"[Pretrend] {label} chi2={outcome.stat:.2f}, p={outcome.pval:.4f}")
    return outcome


# Joint pre-trend tests on the linear dynamic model for k <= -2: intercept terms, then slope terms.
wald_pretrend_dynamic(res_dyn, part="intercept", pre_max=-2)
wald_pretrend_dynamic(res_dyn, part="slope",     pre_max=-2)

In [None]:
# For every event quarter k in [-WINDOW, WINDOW] except the omitted k = -1,
# build three regressors, EV{k}_0 (indicator), EV{k}_1 (indicator * dist_c)
# and EV{k}_2 (indicator * dist_c2), and collect their names.
# NOTE: EV{k}_0 / EV{k}_1 columns already present in `stacked` are overwritten.
k_min, k_max = -WINDOW, WINDOW
ev_cols_dyn_q = []
for kk in range(k_min, k_max+1):
    if kk != -1:
        mask = (stacked["near2"]==1) & (stacked["k"]==kk)
        stacked[f"EV{kk}_0"] = mask.astype(int)
        stacked[f"EV{kk}_1"] = stacked[f"EV{kk}_0"] * stacked["dist_c"]
        stacked[f"EV{kk}_2"] = stacked[f"EV{kk}_0"] * stacked["dist_c2"]
        ev_cols_dyn_q.extend([f"EV{kk}_0", f"EV{kk}_1", f"EV{kk}_2"])

df_idx = stacked.set_index(["lsoa21cd","date"]).sort_index()

# Baseline controls that are present and take more than one distinct value.
use_ctrls = [
    c
    for c in BASE_CTRLS
    if c in df_idx.columns and df_idx[c].nunique(dropna=True)>1
]
Ydyn = df_idx["log_price"]
Xdyn = df_idx[ev_cols_dyn_q + use_ctrls].copy()

# Drop regressors without variation (std not strictly positive).
keep = [c for c in Xdyn.columns if Xdyn[c].std(skipna=True) > 0]
Xdyn = Xdyn[keep]

mod_dyn_q = PanelOLS(
    Ydyn,
    Xdyn,
    entity_effects=True,
    time_effects=True,
    drop_absorbed=True,
)
res_dyn_q = mod_dyn_q.fit(cov_type="clustered", cluster_entity=True)
print("\n=== B Quadratic（Dynamic, ≤2km vs >2km）===")
print(res_dyn_q.summary)

# Extract the quadratic effect-distance curve for a given event quarter k.
def extract_k_curve_quadratic(res, kk, d_grid):
    """Return the centered quadratic effect-distance curve for quarter `kk`.

    Parameters
    ----------
    res : fitted results exposing `params` (Series) and `cov` (DataFrame).
    kk : event quarter; the terms "EV{kk}_0", "EV{kk}_1", "EV{kk}_2" are
        looked up and whichever are present are used.
    d_grid : iterable of raw distances; each is centered as d - 1.

    Returns
    -------
    None when fewer than two of the three terms are in the fitted model;
    otherwise (eff, lo, hi) arrays with the estimate and
    estimate -/+ 1.96 * standard error.

    Each coefficient is paired with its own basis term by name
    (_0 -> 1, _1 -> d_c, _2 -> d_c^2), so a missing middle term (for example
    _1 dropped while _2 is kept) cannot shift _2 onto the linear term.
    """
    names = res.params.index
    n0, n1, n2 = f"EV{kk}_0", f"EV{kk}_1", f"EV{kk}_2"
    cols = [c for c in [n0, n1, n2] if c in names]
    if len(cols) < 2:   # too few surviving terms to draw a curve
        return None
    b = res.params.loc[cols].values
    V = res.cov.loc[cols, cols].values
    eff, lo, hi = [], [], []
    for d in d_grid:
        dc = d - 1.0
        basis = {n0: 1.0, n1: dc, n2: dc * dc}
        # Select basis terms by coefficient name rather than by position.
        v = np.array([basis[c] for c in cols])
        m = float(v @ b)
        se = float(np.sqrt(v @ V @ v))
        eff.append(m)
        lo.append(m - 1.96 * se)
        hi.append(m + 1.96 * se)
    return np.array(eff), np.array(lo), np.array(hi)

# Draw the quadratic effect-distance curves for k = 0, 4, 8 on one figure
d_grid = np.linspace(0, 2, 41)
plt.figure(figsize=(9,4.2))
for kk, lab in zip([0,4,8], ["k=0","k=4","k=8"]):
    curves = extract_k_curve_quadratic(res_dyn_q, kk, d_grid)
    # None means too few terms for this k are in the fitted model; skip it.
    if curves is None: 
        continue
    e,l,h = curves
    plt.plot(d_grid, e, label=lab)
    plt.fill_between(d_grid, l, h, alpha=0.15)
plt.axhline(0, color="black", lw=1)
plt.xlabel("Distance within 2 km (km)")
plt.ylabel("Effect on log(price) relative to >2 km")
plt.title("Event-time Distance Gradient — Quadratic (k=0,4,8)")
plt.legend(); plt.tight_layout(); plt.show()

# Dynamic pre-trend joint test: are the intercept / linear / quadratic terms with k <= pre_max jointly equal to 0?
def wald_pretrend_dyn_part(res, part="intercept", pre_max=-2):
    """Jointly test that one family of pre-period event-study terms is zero.

    Parameters
    ----------
    res : fitted results exposing `params` and `wald_test`.
    part : "intercept" tests EV{k}_0, "slope" tests EV{k}_1, and any other
        value tests EV{k}_2 (quadratic terms).
    pre_max : terms with k_min <= k <= pre_max (k != -1) that are present in
        the fitted model are tested together.

    Prints the statistic and p-value and returns the test object, or prints
    a notice and returns None when no matching term is in the model.
    """
    names = res.params.index
    if part == "intercept":
        suffix, label = "0", "Intercept (EV_k_0)"
    elif part == "slope":
        suffix, label = "1", "Linear slope (EV_k_1)"
    else:
        suffix, label = "2", "Quadratic term (EV_k_2)"

    tested = []
    for kk in range(k_min, pre_max + 1):
        term = f"EV{kk}_{suffix}"
        if kk != -1 and term in names:
            tested.append(term)

    if not tested:
        print(f"[Pretrend] {label}: no pre-k terms.")
        return None

    # One restriction row per tested coefficient: the matching row of the identity matrix.
    identity = np.eye(len(names))
    R = np.vstack([identity[names.get_loc(term)] for term in tested])
    outcome = res.wald_test(R, np.zeros(len(tested)))
    print(f"[Pretrend] {label} chi2={outcome.stat:.2f}, p={outcome.pval:.4f}")
    return outcome

# Joint pre-trend tests on the quadratic dynamic model for k <= -2.
# "quad" is not matched by name; it falls into the function's final branch (EV{k}_2 terms).
wald_pretrend_dyn_part(res_dyn_q, "intercept", pre_max=-2)
wald_pretrend_dyn_part(res_dyn_q, "slope",     pre_max=-2)
wald_pretrend_dyn_part(res_dyn_q, "quad",      pre_max=-2)
