In [1]:
import math
import os

import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import numpy as np
import pandas as pd
import seaborn as sns
import statsmodels.api as sm
import statsmodels.formula.api as smf
from sklearn.preprocessing import StandardScaler

# Input location of the merged data and output location for the assumption checks
MERGED_DATA_DIR = "../../data/merged data"
OUTPUT_DIR = "../../output/assumption"

# Read the merged TNP data, 2020 first and then 2019
tnp_20, tnp_19 = (
    pd.read_csv(os.path.join(MERGED_DATA_DIR, year, "merged_tnp_data.csv"))
    for year in ("2020", "2019")
)

# Standardise the listed columns; the scaler is re-fitted on each year's frame,
# so after this cell `scaler` holds the fit for the 2019 data.
cols_to_scale = ["daily_bus_rides", "rides"]
scaler = StandardScaler()
for _yearly_frame in (tnp_20, tnp_19):
    _yearly_frame[cols_to_scale] = scaler.fit_transform(_yearly_frame[cols_to_scale])

In [2]:
def run_local_smoothness_tests(
    df: pd.DataFrame,
    vars_to_test: list,
    time_var: str,
    cutoff_date,
    bandwidth: int = 15,
    trend_order: int = 1,
    covariates: list = None,
    cov_type: str = "HC1",
) -> list[dict]:
    """
    Run a local smoothness (continuity) test for each variable in `vars_to_test`
    and return one result dict per variable, ready for LaTeX tabulation.

    Model estimated for every outcome Y:
        Y = β0 + β1·Post + f(days) + f(days)·Post + γ'X + u
    where `days` is the signed distance (in days) from the cutoff, `Post` is 1 on
    and after the cutoff, f is a polynomial of order `trend_order`, and X are the
    optional linear `covariates`.

    Parameters
    ----------
    df : input data; it is copied, never modified.
    vars_to_test : column names used as outcomes, one regression each.
    time_var : column holding the observation date (anything `pd.to_datetime` parses).
    cutoff_date : cutoff, parsed with `pd.to_datetime`.
    bandwidth : keep rows with -bandwidth <= days_from_cutoff <= bandwidth.
    trend_order : polynomial order of the running-variable trend.
    covariates : extra right-hand-side column names entered linearly (optional).
    cov_type : covariance estimator passed to statsmodels `fit`. Defaults to the
        heteroskedasticity-robust "HC1"; pass "nonrobust" for classical OLS
        standard errors.

    Returns
    -------
    list of dicts with keys 'label', 'coef', 'stderr', 'p_value', 'mean', 'obs'.
    'coef' / 'stderr' / 'p_value' refer to the `post_cutoff` term, 'mean' is the
    outcome mean over the bandwidth window, and 'obs' is the number of rows the
    regression actually used.

    Raises
    ------
    ValueError : if no row falls inside the bandwidth window.
    """
    cutoff = pd.to_datetime(cutoff_date)

    window = df.copy()
    window[time_var] = pd.to_datetime(window[time_var])
    window["days_from_cutoff"] = (window[time_var] - cutoff).dt.days

    # Explicit copy: columns are added below, and writing into a filtered slice
    # would trigger pandas' SettingWithCopy behaviour.
    in_window = window["days_from_cutoff"].between(-bandwidth, bandwidth)
    window = window.loc[in_window].copy()
    if window.empty:
        raise ValueError(
            f"No observations within {bandwidth} days of cutoff {cutoff_date!r}"
        )

    window["post_cutoff"] = (window["days_from_cutoff"] >= 0).astype(int)

    # The trend terms do not depend on the outcome, so build them once.
    trend_terms = []
    inter_terms = []
    for power in range(1, trend_order + 1):
        base = f"days_from_cutoff_pow{power}"
        inter = f"{base}_x_post"
        window[base] = window["days_from_cutoff"] ** power
        window[inter] = window[base] * window["post_cutoff"]
        trend_terms.append(base)
        inter_terms.append(inter)

    rhs_terms = ["post_cutoff"] + trend_terms + inter_terms
    if covariates:
        rhs_terms += list(covariates)  # control variables enter linearly
    rhs = " + ".join(rhs_terms)

    results = []
    for var in vars_to_test:
        model = smf.ols(formula=f"{var} ~ {rhs}", data=window).fit(cov_type=cov_type)

        results.append({
            "label": var,
            "coef": model.params.get("post_cutoff", np.nan),
            "stderr": model.bse.get("post_cutoff", np.nan),
            "p_value": model.pvalues.get("post_cutoff", np.nan),
            "mean": window[var].mean(),
            # The formula interface drops rows with missing values, so report
            # the estimation sample size rather than the window size.
            "obs": int(model.nobs),
        })

    return results

In [8]:
def latex_escape(s):
    """
    Escape LaTeX special characters in plain text.

    Escapes `_`, `%`, `&`, `$` and `#` by prefixing a backslash, in a single pass
    over the string. Backslashes, braces, `~` and `^` are deliberately left
    untouched so that strings already containing LaTeX markup pass through.
    """
    escapes = str.maketrans({
        "_": "\\_",
        "%": "\\%",
        "&": "\\&",
        "$": "\\$",
        "#": "\\#",  # a bare '#' is a macro-parameter character and breaks compilation
    })
    return s.translate(escapes)

def export_smoothness_table_to_tex(
    panel_a_results: list[dict],
    panel_b_results: list[dict],
    file_path: str = "smoothness_table.tex",
    caption: str = "Regression Discontinuity Estimates for the Effect of Exceeding Cutoffs on Predetermined Characteristics",
    label: str = "tab:smoothness_test",
    sig_levels: list = [0.1, 0.05, 0.01],
    custom_labels: dict = None,
    rotate: bool = False,
    controls_used: bool = False,
    note: str = "Robust standard errors in parentheses. * $p<0.1$, ** $p<0.05$, *** $p<0.01$"
):
    """
    Write an AEJ-style local smoothness test table to a .tex file.

    Parameters
    ----------
    panel_a_results : one dict per column with keys 'label', 'coef', 'stderr',
        'mean', 'obs' and optionally 'p_value' (treatment year).
    panel_b_results : same structure (placebo year).
    file_path : output path; its directory is created when it does not exist.
    caption : LaTeX table caption.
    label : LaTeX table label.
    sig_levels : significance thresholds; a coefficient gets one star per
        threshold its p-value is below. The list is only read, never modified.
    custom_labels : optional mapping from a result's 'label' to the column header.
    rotate : use `sidewaystable` (rotated 90 degrees) instead of `table`.
    controls_used : whether the "Controls" row shows "Yes" or "No".
    note : text of the table notes.

    When a result has no 'p_value', a two-sided p-value is derived from
    coef / stderr under a standard normal approximation.
    """

    def format_row(values):
        return " & ".join(str(v) for v in values) + " \\\\"

    def stars_for(result):
        if 'p_value' in result:
            p = result['p_value']
        elif 'coef' in result and 'stderr' in result and result['stderr'] != 0:
            z = abs(result['coef'] / result['stderr'])
            # Two-sided normal p-value; erfc stays accurate in the far tail,
            # where 1 - cdf would round to zero.
            p = math.erfc(z / math.sqrt(2))
        else:
            p = 1.0  # conservative fallback
        # One star per threshold passed; a NaN p-value passes none.
        return '*' * sum(1 for level in sig_levels if p < level)

    def header_for(result):
        key = result['label']
        if custom_labels and key in custom_labels:
            return custom_labels[key].replace('_', '\\_')
        return key.replace('_', '\\_')

    def panel_cells(results):
        """Format the coefficient, std. error, mean and N cells of one panel."""
        return (
            [f"{d['coef']:.3f}{stars_for(d)}" for d in results],
            [f"({d['stderr']:.3f})" for d in results],
            [f"{d['mean']:.3f}" for d in results],
            [f"{int(d['obs'])}" for d in results],
        )

    # A bare file name has an empty dirname, and os.makedirs("") raises.
    out_dir = os.path.dirname(file_path)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    headers = [header_for(d) for d in panel_a_results]
    coef_a, stderr_a, mean_a, obs_a = panel_cells(panel_a_results)
    coef_b, stderr_b, mean_b, obs_b = panel_cells(panel_b_results)

    # Pick the table environment dynamically
    table_env = "sidewaystable" if rotate else "table"
    controls_text = "Yes" if controls_used else "No"
    controls_row = format_row(["Controls"] + [controls_text] * len(headers))

    tex = [
        f"\\begin{{{table_env}}}[H]\\centering",
        f"\\caption{{{caption}}}",
        f"\\label{{{label}}}",
        f"\\begin{{tabular}}{{l{'c' * len(headers)}}}",
        "\\toprule",
        format_row([" "] + headers),
        "\\midrule",
        "\\textbf{Treatment Year} \\\\",
        format_row(["Treatment"] + coef_a),
        format_row([" "] + stderr_a),
        format_row(["Mean at cutoff"] + mean_a),
        controls_row,
        format_row(["Observations"] + obs_a),
        "\\addlinespace",
        "\\textbf{Placebo Year} \\\\",
        format_row(["Placebo Treat"] + coef_b),
        format_row([" "] + stderr_b),
        format_row(["Mean at cutoff"] + mean_b),
        controls_row,
        format_row(["Observations"] + obs_b),
        "\\bottomrule",
        "\\end{tabular}",
        f"\\vspace{{0.2em}}\\begin{{minipage}}{{0.95\\textwidth}}\\footnotesize\\textit{{Notes:}} {note}\\end{{minipage}}",
        f"\\end{{{table_env}}}",
    ]

    with open(file_path, "w", encoding="utf-8") as f:
        f.write("\n".join(tex))

    print(f"Saved to: {file_path}")

In [4]:
# Build day-of-week dummy variables; drop_first=True avoids the dummy-variable trap.
# Dummy columns left over from an earlier run of this cell are dropped before the
# concat, so re-running the cell does not append duplicate `dow_*` columns.
day_dummies = pd.get_dummies(tnp_20['day_of_week'], prefix='dow', drop_first=True)
tnp_20 = pd.concat(
    [tnp_20.drop(columns=day_dummies.columns, errors='ignore'), day_dummies],
    axis=1,
)

day_dummies = pd.get_dummies(tnp_19['day_of_week'], prefix='dow', drop_first=True)
tnp_19 = pd.concat(
    [tnp_19.drop(columns=day_dummies.columns, errors='ignore'), day_dummies],
    axis=1,
)

In [13]:
# ---------------------------------------------------------------------------
# Shared settings for both smoothness tables
# ---------------------------------------------------------------------------
TREATMENT_CUTOFF = "2020-01-06"   # cutoff date used for the 2020 (treatment year) data
PLACEBO_CUTOFF = "2019-01-07"     # cutoff date used for the 2019 (placebo year) data

weather_control_vars = [
    "Avg_Temp_C", "Precipitation_mm",
    "Snowfall_mm", "Avg_Wind_Speed_mps"
]
substitutes_control_vars = [
    "rides", "daily_bus_rides", "taxi",
]
day_of_week_controls = ["dow_1", "dow_2", "dow_3", "dow_4"]

# Columns kept in the two aggregated views (order fixes the output column order)
_daily_columns = (
    weather_control_vars
    + ["total_rides", "rides", "daily_bus_rides"]
    + day_of_week_controls
)
_tract_columns = ["trip_start_date", "taxi"] + day_of_week_controls
_tract_keys = ["trip_start_timestamp", "pickup_census_tract"]


def _first_per_group(frame, group_keys, value_columns):
    """Collapse `frame` to one row per group using pandas' 'first' aggregation for every column."""
    return (
        frame.groupby(group_keys)
        .agg({column: "first" for column in value_columns})
        .reset_index()
    )


def _smoothness_panels(treatment_frame, placebo_frame, outcomes, covariates=None):
    """Run the smoothness tests on the treatment-year and placebo-year frames; return (panel_a, panel_b)."""
    shared = dict(vars_to_test=outcomes, time_var="trip_start_date", covariates=covariates)
    return (
        run_local_smoothness_tests(df=treatment_frame, cutoff_date=TREATMENT_CUTOFF, **shared),
        run_local_smoothness_tests(df=placebo_frame, cutoff_date=PLACEBO_CUTOFF, **shared),
    )


# ---------------------------------------------------------------------------
# Aggregated views: *_1 has one row per date, *_2 one row per timestamp x pickup tract
# ---------------------------------------------------------------------------
tnp_20_1 = _first_per_group(tnp_20, "trip_start_date", _daily_columns)
tnp_20_2 = _first_per_group(tnp_20, _tract_keys, _tract_columns)
tnp_19_1 = _first_per_group(tnp_19, "trip_start_date", _daily_columns)
tnp_19_2 = _first_per_group(tnp_19, _tract_keys, _tract_columns)

# ---------------------------------------------------------------------------
# Table 1: weather variables (no controls)
# ---------------------------------------------------------------------------
panel_a, panel_b = _smoothness_panels(tnp_20_1, tnp_19_1, weather_control_vars)

export_smoothness_table_to_tex(
    panel_a_results=panel_a,
    panel_b_results=panel_b,
    file_path=os.path.join(OUTPUT_DIR, "weather_smoothness_table.tex"),
    custom_labels={
        "Avg_Temp_C": "Average Temperature (°C)",
        "Precipitation_mm": "Precipitation (mm)",
        "Snowfall_mm": "Snowfall (mm)",
        "Avg_Wind_Speed_mps": "Wind Speed (m/s)"
    },
    caption="Smoothness Test of Weather Variables",
    label="tab:custom_smoothness",
    note=(
        "This table reports the smoothness tests for weather variables around the policy cutoff. "
        "``Treatment'' refers to the coefficient for the treatment indicator in the treatment year, "
        "while ``Placebo Treat'' refers to the same coefficient in the placebo year. "
        "No additional control variables are included in these regressions. "
        "Robust standard errors are reported in parentheses. "
        "* $p<0.1$, ** $p<0.05$, *** $p<0.01$."
    )
)

# ---------------------------------------------------------------------------
# Table 2: substitute transportation modes (day-of-week controls)
# Rail and bus come from the daily view, taxi from the timestamp x tract view.
# ---------------------------------------------------------------------------
panel_a_1, panel_b_1 = _smoothness_panels(
    tnp_20_1, tnp_19_1, ["rides", "daily_bus_rides"], covariates=day_of_week_controls
)
panel_a_2, panel_b_2 = _smoothness_panels(
    tnp_20_2, tnp_19_2, ["taxi"], covariates=day_of_week_controls
)

panel_a = panel_a_1 + panel_a_2
panel_b = panel_b_1 + panel_b_2

export_smoothness_table_to_tex(
    panel_a_results=panel_a,
    panel_b_results=panel_b,
    file_path=os.path.join(OUTPUT_DIR, "substitutes_smoothness_table.tex"),
    custom_labels={
        "rides": "L Rail",
        "daily_bus_rides": "Bus",
        "taxi": "Taxi"
    },
    caption="Smoothness Test of Substitute Transportation Modes",
    controls_used=True,
    note=(
        "This table reports the smoothness tests for potential substitute transportation modes around the policy cutoff. "
        "Each column corresponds to a different mode (``L'' Rail System, Bus, Taxi). "
        "``Treatment'' refers to the coefficient for the treatment indicator in the treatment year, "
        "while ``Placebo Treat'' refers to the same coefficient in the placebo year. "
        "Robust standard errors are reported in parentheses. "
        "* $p<0.1$, ** $p<0.05$, *** $p<0.01$."
    )
)


Saved to: ../../output/assumption/weather_smoothness_table.tex
Saved to: ../../output/assumption/substitutes_smoothness_table.tex
