In [1]:
import sys
from pathlib import Path

ROOT = Path().resolve().parents[1]
sys.path.append(str(ROOT / "src"))

import re
import numpy as np
import pandas as pd
from config import BANK_PANEL_PARQUET, OIL_PANEL_PARQUET
import matplotlib.pyplot as plt
from matplotlib.lines import Line2D
from linearmodels.panel import PanelOLS
import statsmodels.api as sm
from scipy.stats import norm

In [2]:
df_bank_panel = (
    pd.read_parquet(BANK_PANEL_PARQUET)
      .set_index(["firm", "date"])
      .sort_index()
)

df_oil_panel = (
    pd.read_parquet(OIL_PANEL_PARQUET)
      .set_index(["firm", "date"])
      .sort_index()
)

In [6]:
df_bank_panel.head()

Unnamed: 0_level_0,Unnamed: 1_level_0,rendite_1_quartal,volatility 260 day calc,return on common equity,price to book ratio,book to market ratio,bloomberg_esg,refinitiv_esg,rendite_2_quartal,sector,bloomberg_esg_z,refinitiv_esg_z
firm,date,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1
ABN AMRO Bank NV,2010-03-31,,,,,,,,,banks,,
ABN AMRO Bank NV,2010-06-30,,,,,,,,,banks,,
ABN AMRO Bank NV,2010-09-30,,,,,,,,,banks,,
ABN AMRO Bank NV,2010-12-30,,,,,,,,,banks,,
ABN AMRO Bank NV,2011-03-31,,,,,,,,,banks,,


In [3]:
def save_esg_quantile_bands_grid(
    df_bank_panel: pd.DataFrame,
    df_oil_panel: pd.DataFrame,
    use_zscores: bool = False,
    filename: str = "esg_quantile_bands.png",
    dpi: int = 300,
    q_lo: float = 0.25,
    q_hi: float = 0.75,
    debug: bool = False,
) -> str:
    """
    Saves a 2x2 grid of time-quantile-band plots (IQR band + median + mean) for:
      - Banks vs Energy (rows)
      - Bloomberg vs LSEG/Refinitiv (cols)

    Functionality matches the original notebook code, with:
      - no plt.show()
      - prints only the saved file path
      - optional switch between raw ESG columns and z-score columns via use_zscores

    Parameters
    ----------
    use_zscores : bool
        If True, uses 'bloomberg_esg_z' and 'refinitiv_esg_z'.
        If False, uses 'bloomberg_esg' and 'refinitiv_esg'.
    out_path : str
        File path where the figure is saved (PNG recommended).
    """
    # Select columns
    bbg_col = "bloomberg_esg_z" if use_zscores else "bloomberg_esg"
    lseg_col = "refinitiv_esg_z" if use_zscores else "refinitiv_esg"

    def _first_valid_date(panel: pd.DataFrame, col: str):
        df = panel.reset_index()[["date", col]].copy()
        df["date"] = pd.to_datetime(df["date"], errors="coerce")
        df[col] = pd.to_numeric(df[col], errors="coerce")
        df = df.dropna(subset=["date", col])
        if len(df) == 0:
            return pd.NaT, pd.NaT
        return df["date"].min(), df["date"].max()

    def _plot_time_quantile_bands_ax(
        ax,
        df_panel: pd.DataFrame,
        col: str,
        title: str,
        q_lo: float,
        q_hi: float,
        debug: bool,
    ):
        df = df_panel.reset_index().copy()

        if "date" not in df.columns:
            raise KeyError("Column 'date' not found after reset_index(). Check your index levels.")

        df["date"] = pd.to_datetime(df["date"], errors="coerce")

        if col not in df.columns:
            raise KeyError(f"Column '{col}' not found. Available columns: {list(df.columns)}")

        df[col] = pd.to_numeric(df[col], errors="coerce")
        df = df.dropna(subset=["date", col])

        if debug:
            print(f"[{title}] rows after cleaning: {len(df):,}")
            if len(df) == 0:
                print(f"[{title}] EMPTY after cleaning. Likely wrong column name or all-NaN values.")
                return
            print(f"[{title}] date range: {df['date'].min()} .. {df['date'].max()}")
            print(f"[{title}] non-null '{col}': {df[col].notna().sum():,}")

        grp = df.groupby("date")[col]
        out = pd.DataFrame(
            {
                "q_lo": grp.quantile(q_lo),
                "median": grp.quantile(0.50),
                "q_hi": grp.quantile(q_hi),
                "mean": grp.mean(),
            }
        ).dropna()

        out = out.sort_index()

        if debug:
            print(f"[{title}] grouped dates: {len(out):,}")
            if len(out) > 0:
                print(out.head())

        if len(out) == 0:
            if debug:
                print(f"[{title}] 'out' is empty after grouping. Often happens if each date has <2 observations.")
            return

        ax.fill_between(
            out.index, out["q_lo"], out["q_hi"],
            alpha=0.25,
            label=f"IQR ({int(q_lo*100)}–{int(q_hi*100)}%)"
        )
        ax.plot(out.index, out["median"], linewidth=2, label="Median")
        ax.plot(out.index, out["mean"], linestyle="--", linewidth=1.5, label="Mean")

        ax.set_title(title)
        ax.grid(True, alpha=0.3)

    # Provider-specific ranges (banks + energy; earliest start / latest end per provider)
    bbg_b_start, bbg_b_end = _first_valid_date(df_bank_panel, bbg_col)
    bbg_e_start, bbg_e_end = _first_valid_date(df_oil_panel,  bbg_col)
    lseg_b_start, lseg_b_end = _first_valid_date(df_bank_panel, lseg_col)
    lseg_e_start, lseg_e_end = _first_valid_date(df_oil_panel,  lseg_col)

    # Match original behavior: min/max across sectors (note: if a panel is empty, min/max may be NaT)
    bbg_start = min(bbg_b_start, bbg_e_start)
    bbg_end   = max(bbg_b_end,   bbg_e_end)
    lseg_start = min(lseg_b_start, lseg_e_start)
    lseg_end   = max(lseg_b_end,   lseg_e_end)

    if debug:
        print("Bloomberg range:", bbg_start, "to", bbg_end)
        print("LSEG range:", lseg_start, "to", lseg_end)

    fig, axes = plt.subplots(2, 2, figsize=(12, 7), sharex=False)

    _plot_time_quantile_bands_ax(axes[0, 0], df_bank_panel, bbg_col,  "Banks — Bloomberg ESG", q_lo, q_hi, debug)
    _plot_time_quantile_bands_ax(axes[0, 1], df_bank_panel, lseg_col, "Banks — LSEG ESG",      q_lo, q_hi, debug)
    _plot_time_quantile_bands_ax(axes[1, 0], df_oil_panel,  bbg_col,  "Energy — Bloomberg ESG", q_lo, q_hi, debug)
    _plot_time_quantile_bands_ax(axes[1, 1], df_oil_panel,  lseg_col, "Energy — LSEG ESG",      q_lo, q_hi, debug)

    # Panel-specific x-limits (only set if we have valid dates)
    if pd.notna(bbg_start) and pd.notna(bbg_end):
        axes[0, 0].set_xlim(bbg_start, bbg_end)
        axes[1, 0].set_xlim(bbg_start, bbg_end)

    if pd.notna(lseg_start) and pd.notna(lseg_end):
        axes[0, 1].set_xlim(lseg_start, lseg_end)
        axes[1, 1].set_xlim(lseg_start, lseg_end)

    # Labels (minimal)
    axes[0, 0].set_ylabel("ESG score")
    axes[1, 0].set_ylabel("ESG score")
    axes[1, 0].set_xlabel("Date")
    axes[1, 1].set_xlabel("Date")

    # One legend (keep original behavior: pull from axes[0,0])
    handles, labels = axes[0, 0].get_legend_handles_labels()
    if handles:
        fig.legend(handles, labels, loc="lower center", ncol=3, frameon=False)

    fig.tight_layout(rect=[0, 0.05, 1, 1])
    
    from config import FIGURES_DIR
    from pathlib import Path
    
    out_path = FIGURES_DIR / filename
    out_path.parent.mkdir(parents=True, exist_ok=True)
    fig.savefig(out_path, dpi=dpi, bbox_inches="tight")
    plt.close(fig)

    return out_path

In [4]:
save_esg_quantile_bands_grid(
    df_bank_panel=df_bank_panel,
    df_oil_panel=df_oil_panel,
    use_zscores=False,
    filename="esg_quantile_bands_raw.png",
)

save_esg_quantile_bands_grid(
    df_bank_panel,
    df_oil_panel,
    use_zscores=True,
    filename="esg_quantile_bands_zscores.png",
)

PosixPath('/home/dragon99999/Downloads/BA BWL/BachelorProjectSrc/output/figures/esg_quantile_bands_zscores.png')

In [5]:
def plot_provider_comparison_twfe_onefigure(
    df_bank_panel: pd.DataFrame,
    df_oil_panel: pd.DataFrame,
    outcome_blocks: list[tuple[str, str]],
    controls: list[str] | None = None,
    controls_by_outcome: list[list[str]] | None = None,
    providers: list[tuple[str, str]] | None = None,
    entity_col: str = "firm",
    time_col: str = "date",
    min_obs: int = 25,
    block_gap: float = 0.9,
    filename_prefix: str = "provider_compare_twfe_ONEFIG",
    dpi: int = 300,
    add_title: bool = False,

    # scaling: percentage points (pp)
    scale: float = 100.0,
    scale_unit: str = "percentage points",

    # display windows (in scaled units, i.e. pp). Different left bounds per sector.
    xlim_left_banks: float = -4.0,
    xlim_left_energy: float = -6.0,
    xlim_right: float | None = None,

    # CI settings
    ci_outer: float = 0.95,
    ci_inner: float = 0.68,

    # choose raw vs z-score ESG columns
    use_zscores: bool = False,

    # output filenames (no directories) — directories come from config.py
    filename_png: str | None = None,
    filename_pdf: str | None = None,
):
    """
    One figure with 2 subplots (Banks | Energy). Each subplot stacks outcomes and compares
    two ESG providers (default: LSEG/Refinitiv vs Bloomberg) using TWFE (firm + time FE),
    SE clustered by firm.

    Enhancements
    ------------
    - Adds an additional outcome block: "Valuation: book to market ratio" (pass it in outcome_blocks).
    - Supports outcome-specific controls via `controls_by_outcome` (list of lists aligned to outcome_blocks).
      If provided, `controls` is ignored for that outcome.

    Notes
    -----
    - Output paths are taken from config.py (FIGURES_DIR); caller provides filenames only.
    - `use_zscores=True` uses '<provider_col>_z' columns.
    - Provider label "Refinitiv" is displayed as "LSEG" in the figure/legend.
    """

    # --- config-driven figure directory (centralized paths) ---
    from pathlib import Path
    from config import FIGURES_DIR  # must exist in src/config.py

    FIGURES_DIR.mkdir(parents=True, exist_ok=True)

    if providers is None:
        providers = [("Refinitiv", "refinitiv_esg"), ("Bloomberg", "bloomberg_esg")]
    if len(providers) != 2:
        raise ValueError("This layout expects exactly 2 providers (e.g., Refinitiv and Bloomberg).")

    # Controls logic: either global controls OR per-outcome controls
    if controls_by_outcome is not None:
        if len(controls_by_outcome) != len(outcome_blocks):
            raise ValueError(
                f"controls_by_outcome must have same length as outcome_blocks "
                f"({len(controls_by_outcome)} != {len(outcome_blocks)})."
            )
        controls_by_outcome = [list(c) for c in controls_by_outcome]
    else:
        controls = [] if controls is None else list(controls)

    # Plot label mapping
    def _provider_plot_label(name: str) -> str:
        return "LSEG" if name.strip().lower() == "refinitiv" else name

    def _provider_col(base_col: str) -> str:
        return f"{base_col}_z" if use_zscores else base_col

    providers_used = [(pname, _provider_col(col)) for pname, col in providers]
    prov_names_internal = [p[0] for p in providers_used]
    prov_names_plot = [_provider_plot_label(p[0]) for p in providers_used]

    # --- z-values for confidence intervals ---
    from scipy.stats import norm
    z_outer = float(norm.ppf(0.5 + ci_outer / 2))
    z_inner = float(norm.ppf(0.5 + ci_inner / 2))

    def _slug(s: str) -> str:
        s = s.strip().lower()
        s = re.sub(r"[^a-z0-9]+", "_", s)
        return re.sub(r"_+", "_", s).strip("_")

    def _validate_panel(df_panel: pd.DataFrame):
        if not isinstance(df_panel.index, pd.MultiIndex):
            raise ValueError("Panel df must have MultiIndex (firm, date).")
        if entity_col not in df_panel.index.names or time_col not in df_panel.index.names:
            raise ValueError(f"Index must include levels '{entity_col}' and '{time_col}'.")

    def _prep_df(df_panel: pd.DataFrame, y: str, x: str, ctrls: list[str]) -> pd.DataFrame:
        _validate_panel(df_panel)
        needed = [y, x] + list(ctrls)
        missing = [c for c in needed if c not in df_panel.columns]
        if missing:
            raise ValueError(f"Missing columns: {missing}")

        df = df_panel[needed].copy()
        for c in needed:
            df[c] = pd.to_numeric(df[c], errors="coerce")
        df = df.dropna(subset=needed).copy()

        if len(df) < min_obs:
            raise ValueError(f"Not enough obs for {y} ~ {x}: {len(df)} < {min_obs}")
        return df

    def _fit_twfe(df: pd.DataFrame, y: str, x: str, ctrls: list[str]):
        X = df[[x] + list(ctrls)]
        mod = PanelOLS(
            df[y], X,
            entity_effects=True,
            time_effects=True,
            drop_absorbed=True
        ).fit(cov_type="clustered", cluster_entity=True)

        beta = float(mod.params.get(x, np.nan))
        se = float(mod.std_errors.get(x, np.nan))
        pval = float(mod.pvalues.get(x, np.nan))
        nobs = int(mod.nobs)
        return beta, se, pval, nobs

    def _controls_for_outcome_idx(i: int) -> list[str]:
        if controls_by_outcome is not None:
            return controls_by_outcome[i]
        return controls

    # --- y positions: two rows per outcome, then a gap ---
    y_positions: dict[tuple[str, str], float] = {}
    current_y = 0.0
    for outcome_name, _ in outcome_blocks:
        for prov_internal in prov_names_internal:
            y_positions[(outcome_name, prov_internal)] = current_y
            current_y += 1.0
        current_y += block_gap

    # centered outcome labels (one per block)
    center_ticks, center_labels = [], []
    for outcome_name, _ in outcome_blocks:
        y_top = y_positions[(outcome_name, prov_names_internal[0])]
        y_bot = y_positions[(outcome_name, prov_names_internal[1])]
        center_ticks.append((y_top + y_bot) / 2.0)
        center_labels.append(outcome_name)

    # --- run regressions for all combos ---
    rows = []
    for sector_name, df_sector in [("Banks", df_bank_panel), ("Energy", df_oil_panel)]:
        for i, (outcome_name, y) in enumerate(outcome_blocks):
            ctrls = _controls_for_outcome_idx(i)
            for prov_internal, x in providers_used:
                df = _prep_df(df_sector, y, x, ctrls)
                beta, se, pval, nobs = _fit_twfe(df, y, x, ctrls)

                beta_s = scale * beta
                se_s = scale * se

                rows.append({
                    "sector": sector_name,
                    "outcome": outcome_name,
                    "provider_internal": prov_internal,
                    "provider_plot": _provider_plot_label(prov_internal),
                    "x": x,
                    "y": y,
                    "beta": beta_s,
                    "se": se_s,
                    "p_value": pval,
                    "n_obs": nobs,
                    "ci95_low": beta_s - z_outer * se_s,
                    "ci95_high": beta_s + z_outer * se_s,
                    "ci68_low": beta_s - z_inner * se_s,
                    "ci68_high": beta_s + z_inner * se_s,
                    "y_plot": y_positions[(outcome_name, prov_internal)],
                    "controls": "|".join(ctrls) if ctrls else "",
                    "k_controls": len(ctrls),
                    "model": "TWFE (clustered by firm)",
                    "uses_zscores": bool(use_zscores),
                })

    results_df = pd.DataFrame(rows)

    # --- axis limits per sector ---
    limits = {}
    for sector_name in ["Banks", "Energy"]:
        sub = results_df[results_df["sector"] == sector_name].copy()
        lo = float(np.nanmin(np.minimum(sub["beta"], sub["ci68_low"])))
        hi = float(np.nanmax(np.maximum(sub["beta"], sub["ci68_high"])))
        pad = 0.12 * (hi - lo) if hi > lo else 1.0

        left_cap = xlim_left_banks if sector_name == "Banks" else xlim_left_energy
        x_left = min(left_cap, lo - pad)
        x_right = hi + pad
        if xlim_right is not None:
            x_right = min(x_right, float(xlim_right))

        limits[sector_name] = (x_left, x_right)

    # --- provider colors (kept stable) ---
    prov_color = {
        prov_names_internal[0]: "#1f77b4",
        prov_names_internal[1]: "#d62728",
    }

    legend_handles = [
        Line2D([0], [0], marker="o", linestyle="-", color=prov_color[prov_names_internal[0]],
               label=prov_names_plot[0]),
        Line2D([0], [0], marker="o", linestyle="-", color=prov_color[prov_names_internal[1]],
               label=prov_names_plot[1]),
        Line2D([0], [0], linestyle="-", color="black", linewidth=2.2, label=f"{int(ci_inner*100)}% CI"),
        Line2D([0], [0], linestyle="-", color="black", linewidth=1.2, alpha=0.55, label=f"{int(ci_outer*100)}% CI"),
    ]

    def _draw_ci_with_arrows(
        ax, y, lo, hi, color, x_left, x_right,
        lw=1.6, alpha=1.0, cap_height=0.18, arrow_inset_frac=0.015,
    ):
        lo_c = max(lo, x_left)
        hi_c = min(hi, x_right)

        ax.hlines(y, lo_c, hi_c, linewidth=lw, color=color, alpha=alpha)
        x_span = x_right - x_left
        eps = arrow_inset_frac * x_span

        if lo >= x_left:
            ax.vlines(lo, y - cap_height, y + cap_height, linewidth=lw, color=color, alpha=alpha)
        else:
            ax.plot(x_left + eps, y, marker="<", color=color, alpha=alpha, markersize=7)

        if hi <= x_right:
            ax.vlines(hi, y - cap_height, y + cap_height, linewidth=lw, color=color, alpha=alpha)
        else:
            ax.plot(x_right - eps, y, marker=">", color=color, alpha=alpha, markersize=7)

    # --- plot ---
    fig, axes = plt.subplots(1, 2, figsize=(12.4, 6.0), sharey=True)

    for ax, sector_name in zip(axes, ["Banks", "Energy"]):
        sub = results_df[results_df["sector"] == sector_name].copy()

        ax.axvline(0.0, linewidth=1.0)

        # separators between blocks
        for bi in range(len(outcome_blocks) - 1):
            out_i = outcome_blocks[bi][0]
            out_next = outcome_blocks[bi + 1][0]
            y_last = y_positions[(out_i, prov_names_internal[-1])]
            y_first_next = y_positions[(out_next, prov_names_internal[0])]
            ax.axhline((y_last + y_first_next) / 2.0, linewidth=0.8, alpha=0.35)

        x_left, x_right = limits[sector_name]
        ax.set_xlim(x_left, x_right)

        for _, r in sub.iterrows():
            prov_internal = r["provider_internal"]
            c = prov_color[prov_internal]
            y0 = float(r["y_plot"])
            beta = float(r["beta"])

            _draw_ci_with_arrows(
                ax, y0,
                float(r["ci95_low"]), float(r["ci95_high"]),
                color=c, x_left=x_left, x_right=x_right,
                lw=1.2, alpha=0.55
            )
            _draw_ci_with_arrows(
                ax, y0,
                float(r["ci68_low"]), float(r["ci68_high"]),
                color=c, x_left=x_left, x_right=x_right,
                lw=2.2, alpha=1.0
            )
            ax.plot(beta, y0, marker="o", color=c, markersize=6)

        ax.set_title(sector_name)
        ax.set_yticks(center_ticks)
        ax.set_yticklabels(center_labels)
        ax.grid(True, axis="x", linewidth=0.4, alpha=0.35)
        ax.spines["top"].set_visible(False)
        ax.spines["right"].set_visible(False)

    fig.supxlabel(
        f"ESG coefficient estimate (TWFE), in {scale_unit} (pp); "
        f"{int(ci_inner*100)}% and {int(ci_outer*100)}% CIs"
    )

    if add_title:
        z_tag = " (z-scores)" if use_zscores else ""
        fig.suptitle(f"ESG provider comparison (TWFE, outcome-specific controls){z_tag}", y=0.98)

    fig.legend(
        handles=legend_handles,
        loc="upper center",
        ncol=4,
        frameon=True,
        bbox_to_anchor=(0.5, 0.995),
        borderaxespad=0.0
    )

    fig.tight_layout(rect=(0, 0.04, 1, 0.94))

    # --- filenames (caller provides filename only; directory inferred from config) ---
    outcomes_tag = "_".join(_slug(b) for b, _ in outcome_blocks)
    controls_tag = "varying" if controls_by_outcome is not None else f"k{len(controls)}"
    z_tag = "z" if use_zscores else "raw"
    base = f"{filename_prefix}_{outcomes_tag}_{controls_tag}_{z_tag}_pp"

    if filename_png is None:
        filename_png = base + ".png"
    if filename_pdf is None:
        filename_pdf = base + ".pdf"

    png_path = FIGURES_DIR / filename_png
    pdf_path = FIGURES_DIR / filename_pdf
    png_path.parent.mkdir(parents=True, exist_ok=True)

    fig.savefig(png_path, dpi=dpi, bbox_inches="tight")
    fig.savefig(pdf_path, bbox_inches="tight")
    plt.close(fig)

    return results_df, {"png_path": str(png_path), "pdf_path": str(pdf_path)}


In [7]:
outcome_blocks = [
    ("Return", "rendite_1_quartal"),
    ("Volatility", "volatility 260 day calc"),
    ("Profitability", "return on common equity"),
    ("Valuation", "book to market ratio"),
]

controls_by_outcome = [
    ["book to market ratio", "volatility 260 day calc"],# controls for Return regression
    ["book to market ratio", "return on common equity"], # controls for Volatility regression
    ["book to market ratio", "volatility 260 day calc"], # controls for Profitability regression
    ["volatility 260 day calc", "return on common equity"],# controls for Valuation regression (example)
]

results_df, paths = plot_provider_comparison_twfe_onefigure(
    df_bank_panel,
    df_oil_panel,
    outcome_blocks=outcome_blocks,
    controls_by_outcome=controls_by_outcome,   # varies per outcome
    use_zscores=False,                          # False for raw ESG
    filename_prefix="twfe_provider_compare",
)

results_df, paths = plot_provider_comparison_twfe_onefigure(
    df_bank_panel,
    df_oil_panel,
    outcome_blocks=outcome_blocks,
    controls_by_outcome=controls_by_outcome,   # varies per outcome
    use_zscores=True,                          # False for raw ESG
    filename_prefix="twfe_provider_compare",
)

In [8]:
def coef_path_dotwhisker(
    df_bank_panel: pd.DataFrame,
    df_oil_panel: pd.DataFrame,
    y_var: str,
    controls: list[str] | None = None,
    providers: dict[str, str] | None = None,   # {"Bloomberg": "bloomberg_esg", "Refinitiv": "refinitiv_esg"}
    entity_col: str = "firm",
    time_col: str = "date",
    min_obs: int = 25,
    add_title: bool = False,
    dpi: int = 300,

    # CI levels and styling
    ci_outer: float = 0.95,
    ci_inner: float = 0.68,
    lw_outer: float = 1.1,     # thin (95%)
    lw_inner: float = 2.2,     # thick (68%)
    cap_height: float = 0.18,  # y-units; works with y positions 0..3

    # x-limits per panel (use None for auto)
    # keys must match panel titles: ("Banks","Bloomberg"), ("Banks","Refinitiv"), ("Energy","Bloomberg"), ("Energy","Refinitiv")
    panel_xlim: dict[tuple[str, str], tuple[float | None, float | None]] | None = None,

    # column-wise x-limits (applies to both Banks/Energy in that provider column)
    col_xlim: dict[str, tuple[float | None, float | None]] | None = None,

    # global x-limits for all panels (overridden by panel_xlim/col_xlim if provided)
    xlim_global: tuple[float | None, float | None] | None = None,

    # NEW: choose raw vs z-score ESG columns
    use_zscores: bool = False,

    # NEW: filenames only; directory inferred from config.py (FIGURES_DIR)
    filename_prefix: str = "dotwhisker_path",
    filename_png: str | None = None,
    filename_pdf: str | None = None,
    filename_table_pdf: str | None = None,
):
    """
    Runs pooled OLS + FE models for Banks vs Energy and Bloomberg vs Refinitiv/LSEG, then plots coefficient
    dot-and-whisker in a 2x2 grid. Includes BOTH 68% and 95% confidence intervals:
      - 68%: thicker whisker
      - 95%: thinner whisker
    If an interval exceeds the x-range, arrows are drawn slightly inside the axis so they are not clipped.

    Output (same directory)
    -----------------------
    - PNG figure
    - PDF figure
    - PDF table with regression outputs (beta, se, p-value, nobs, CIs)

    Returns
    -------
    results_df : pd.DataFrame (long format)
    out_paths : dict with png/pdf/table_pdf paths
    """

    # --- config-driven figure directory ---
    from pathlib import Path
    from config import FIGURES_DIR  # ensure this exists in src/config.py
    FIGURES_DIR.mkdir(parents=True, exist_ok=True)

    if controls is None:
        controls = []
    if providers is None:
        providers = {"Bloomberg": "bloomberg_esg", "Refinitiv": "refinitiv_esg"}

    # provider display label mapping (Refinitiv -> LSEG in titles)
    def _prov_label(name: str) -> str:
        return "LSEG" if name.strip().lower() == "refinitiv" else name

    # apply z-score suffix if requested
    def _xcol(col: str) -> str:
        return f"{col}_z" if use_zscores else col

    providers_used = {pname: _xcol(xcol) for pname, xcol in providers.items()}

    z_outer = float(norm.ppf(0.5 + ci_outer / 2))
    z_inner = float(norm.ppf(0.5 + ci_inner / 2))

    models = [
        ("Pooled OLS (HC3)", "pooled"),
        ("Firm FE (clustered)", "firm_fe"),
        ("Time FE (clustered)", "time_fe"),
        ("Two-way FE (clustered)", "twfe"),
    ]

    sectors = [
        ("Banks", df_bank_panel),
        ("Energy", df_oil_panel),
    ]

    def _slug(s: str) -> str:
        s = s.strip().lower()
        s = re.sub(r"[^a-z0-9]+", "_", s)
        return re.sub(r"_+", "_", s).strip("_")

    def _prep_df(df_panel: pd.DataFrame, x_var: str) -> pd.DataFrame:
        if not isinstance(df_panel.index, pd.MultiIndex):
            raise ValueError("Panel df must have MultiIndex (firm, date).")
        if entity_col not in df_panel.index.names or time_col not in df_panel.index.names:
            raise ValueError(f"Index must include levels '{entity_col}' and '{time_col}'.")

        needed = [y_var, x_var] + list(controls)
        missing = [c for c in needed if c not in df_panel.columns]
        if missing:
            raise ValueError(f"Missing columns in panel df: {missing}")

        df = df_panel[needed].copy()
        for c in needed:
            df[c] = pd.to_numeric(df[c], errors="coerce")

        df = df.dropna(subset=needed).copy()
        if len(df) < min_obs:
            raise ValueError(f"Not enough usable obs for {y_var}~{x_var}: {len(df)} < {min_obs}")
        return df

    def _fit_all_models(df: pd.DataFrame, x_var: str):
        out = {}

        # pooled
        dfr = df.reset_index()
        X_cols = [x_var] + list(controls)
        X = sm.add_constant(dfr[X_cols], has_constant="add")
        y = dfr[y_var]
        pooled = sm.OLS(y, X).fit(cov_type="HC3")
        out["pooled"] = (
            float(pooled.params.get(x_var, np.nan)),
            float(pooled.bse.get(x_var, np.nan)),
            float(pooled.pvalues.get(x_var, np.nan)),
            int(pooled.nobs),
        )

        # FE models
        Xp = df[[x_var] + list(controls)]
        fe_firm = PanelOLS(df[y_var], Xp, entity_effects=True, drop_absorbed=True) \
            .fit(cov_type="clustered", cluster_entity=True)
        out["firm_fe"] = (
            float(fe_firm.params.get(x_var, np.nan)),
            float(fe_firm.std_errors.get(x_var, np.nan)),
            float(fe_firm.pvalues.get(x_var, np.nan)),
            int(fe_firm.nobs),
        )

        fe_time = PanelOLS(df[y_var], Xp, time_effects=True, drop_absorbed=True) \
            .fit(cov_type="clustered", cluster_entity=True)
        out["time_fe"] = (
            float(fe_time.params.get(x_var, np.nan)),
            float(fe_time.std_errors.get(x_var, np.nan)),
            float(fe_time.pvalues.get(x_var, np.nan)),
            int(fe_time.nobs),
        )

        fe_tw = PanelOLS(df[y_var], Xp, entity_effects=True, time_effects=True, drop_absorbed=True) \
            .fit(cov_type="clustered", cluster_entity=True)
        out["twfe"] = (
            float(fe_tw.params.get(x_var, np.nan)),
            float(fe_tw.std_errors.get(x_var, np.nan)),
            float(fe_tw.pvalues.get(x_var, np.nan)),
            int(fe_tw.nobs),
        )

        return out

    # ---- run suite ----
    rows = []
    for sector_name, df_panel in sectors:
        for provider_name, x_var in providers_used.items():
            df = _prep_df(df_panel, x_var)
            fits = _fit_all_models(df, x_var)

            for model_label, model_key in models:
                beta, se, pval, nobs = fits[model_key]
                rows.append({
                    "sector": sector_name,
                    "provider": provider_name,
                    "provider_plot": _prov_label(provider_name),
                    "x": x_var,
                    "y": y_var,
                    "model": model_label,
                    "model_key": model_key,
                    "beta": beta,
                    "se": se,
                    "p_value": pval,
                    "ci95_low": beta - z_outer * se,
                    "ci95_high": beta + z_outer * se,
                    "ci68_low": beta - z_inner * se,
                    "ci68_high": beta + z_inner * se,
                    "n_obs": nobs,
                    "k_controls": len(controls),
                    "controls": "|".join(controls) if controls else "",
                    "uses_zscores": bool(use_zscores),
                })

    results_df = pd.DataFrame(rows)

    # ---- helper: whiskers with caps + inward arrows if truncated ----
    def _draw_ci(ax, y, lo, hi, color, x_left, x_right, lw, alpha, cap_h, arrow_inset_frac=0.015):
        lo_c = max(lo, x_left)
        hi_c = min(hi, x_right)
        ax.hlines(y, lo_c, hi_c, linewidth=lw, color=color, alpha=alpha)

        eps = arrow_inset_frac * (x_right - x_left)

        if lo >= x_left:
            ax.vlines(lo, y - cap_h, y + cap_h, linewidth=lw, color=color, alpha=alpha)
        else:
            ax.plot(x_left + eps, y, marker="<", color=color, alpha=alpha, markersize=7)

        if hi <= x_right:
            ax.vlines(hi, y - cap_h, y + cap_h, linewidth=lw, color=color, alpha=alpha)
        else:
            ax.plot(x_right - eps, y, marker=">", color=color, alpha=alpha, markersize=7)

    # ---- plot: 2x2 grid ----
    fig, axes = plt.subplots(2, 2, figsize=(10.8, 7.4), sharey=True)
    axes = axes.ravel()

    model_order = [m[0] for m in models]
    y_pos = np.arange(len(model_order))

    # keep panel mapping keyed by *provider keys* (as provided)
    # NOTE: if you keep "Refinitiv" in providers dict key, panel keys remain ("Banks","Refinitiv") etc.
    provider_keys = list(providers_used.keys())
    if len(provider_keys) != 2:
        raise ValueError("This function expects exactly 2 providers in the providers dict.")

    p0, p1 = provider_keys[0], provider_keys[1]

    panel_map = {
        ("Banks", p0): 0,
        ("Banks", p1): 1,
        ("Energy", p0): 2,
        ("Energy", p1): 3,
    }

    def _auto_xlim_for_panel(sub: pd.DataFrame):
        lo = float(np.nanmin(np.minimum(sub["beta"], sub["ci68_low"])))
        hi = float(np.nanmax(np.maximum(sub["beta"], sub["ci68_high"])))
        pad = 0.12 * (hi - lo) if hi > lo else 1.0
        return lo - pad, hi + pad

    for (sector_name, provider_name), ax_idx in panel_map.items():
        ax = axes[ax_idx]
        sub = results_df[(results_df["sector"] == sector_name) & (results_df["provider"] == provider_name)].copy()
        sub["model"] = pd.Categorical(sub["model"], categories=model_order, ordered=True)
        sub = sub.sort_values("model")

        x_left, x_right = None, None
        if xlim_global is not None:
            x_left, x_right = xlim_global
        if col_xlim is not None and provider_name in col_xlim:
            x_left, x_right = col_xlim[provider_name]
        if panel_xlim is not None and (sector_name, provider_name) in panel_xlim:
            x_left, x_right = panel_xlim[(sector_name, provider_name)]

        if x_left is None or x_right is None:
            a_left, a_right = _auto_xlim_for_panel(sub)
            if x_left is None:
                x_left = a_left
            if x_right is None:
                x_right = a_right

        ax.set_xlim(float(x_left), float(x_right))
        ax.axvline(0.0, linewidth=1.0)

        # plot each model row
        for i, mlabel in enumerate(model_order):
            r = sub[sub["model"] == mlabel].iloc[0]
            beta = float(r["beta"])

            # consistent color per model row (based on axis color cycle)
            tmp_line = ax.plot([], [])[0]
            c = tmp_line.get_color()
            tmp_line.remove()

            _draw_ci(ax, i, float(r["ci95_low"]), float(r["ci95_high"]),
                     color=c, x_left=float(x_left), x_right=float(x_right),
                     lw=lw_outer, alpha=0.55, cap_h=cap_height)
            _draw_ci(ax, i, float(r["ci68_low"]), float(r["ci68_high"]),
                     color=c, x_left=float(x_left), x_right=float(x_right),
                     lw=lw_inner, alpha=1.0, cap_h=cap_height)

            # point estimate (clip-safe)
            span = float(x_right) - float(x_left)
            inset = 0.015 * span
            if beta < float(x_left):
                ax.plot(float(x_left) + inset, i, marker="<", color=c, markersize=8)
            elif beta > float(x_right):
                ax.plot(float(x_right) - inset, i, marker=">", color=c, markersize=8)
            else:
                ax.plot(beta, i, marker="o", color=c, markersize=6, zorder=4)

        ax.set_title(f"{sector_name} — {_prov_label(provider_name)}")
        ax.set_yticks(y_pos)
        ax.set_yticklabels(model_order)
        ax.spines["top"].set_visible(False)
        ax.spines["right"].set_visible(False)
        ax.grid(True, axis="x", linewidth=0.4, alpha=0.35)

    fig.supxlabel("ESG coefficient estimate (dot) with 68% (thick) and 95% (thin) CIs")
    fig.supylabel("Model specification")

    if add_title:
        z_tag = " (z-scores)" if use_zscores else ""
        fig.suptitle(f"Coefficient path across specifications: y={y_var}{z_tag}", y=0.995)

    legend_handles = [
        Line2D([0], [0], color="black", linewidth=lw_inner, label=f"{int(ci_inner*100)}% CI (thick)"),
        Line2D([0], [0], color="black", linewidth=lw_outer, alpha=0.55, label=f"{int(ci_outer*100)}% CI (thin)"),
        Line2D([0], [0], marker="o", linestyle="None", color="black", label="Point estimate"),
        Line2D([0], [0], marker="<", linestyle="None", color="black", label="CI truncated (left)"),
        Line2D([0], [0], marker=">", linestyle="None", color="black", label="CI truncated (right)"),
    ]

    fig.legend(
        handles=legend_handles,
        loc="upper center",
        ncol=3,
        frameon=True,
        bbox_to_anchor=(0.5, 0.995),
        borderaxespad=0.0
    )

    fig.tight_layout(rect=(0, 0.04, 1, 0.93))

    # ---- save figure + table (same directory via config.FIGURES_DIR) ----
    y_tag = _slug(y_var)
    prov_tag = _slug("_".join([_prov_label(k) for k in providers_used.keys()]))
    ctrl_tag = f"k{len(controls)}"
    z_tag = "z" if use_zscores else "raw"
    base = f"{filename_prefix}_{y_tag}_{prov_tag}_{ctrl_tag}_{z_tag}"

    if filename_png is None:
        filename_png = base + ".png"
    if filename_pdf is None:
        filename_pdf = base + ".pdf"
    if filename_table_pdf is None:
        filename_table_pdf = base + "_table.pdf"

    png_path = FIGURES_DIR / filename_png
    pdf_path = FIGURES_DIR / filename_pdf
    table_pdf_path = FIGURES_DIR / filename_table_pdf
    png_path.parent.mkdir(parents=True, exist_ok=True)

    fig.savefig(png_path, dpi=dpi, bbox_inches="tight")
    fig.savefig(pdf_path, bbox_inches="tight")
    plt.close(fig)

    # ---- table PDF (regression outputs) ----
    # Format a readable table
    table_df = results_df.copy()
    table_df["provider"] = table_df["provider_plot"]
    table_df = table_df.drop(columns=["provider_plot"], errors="ignore")

    # Order rows in a nice way
    table_df["sector"] = pd.Categorical(table_df["sector"], categories=["Banks", "Energy"], ordered=True)
    table_df["provider"] = pd.Categorical(table_df["provider"], categories=[_prov_label(p0), _prov_label(p1)], ordered=True)
    table_df["model"] = pd.Categorical(table_df["model"], categories=model_order, ordered=True)
    table_df = table_df.sort_values(["sector", "provider", "model"])

    # Select/format columns for the PDF
    cols = ["sector", "provider", "model", "beta", "se", "p_value", "ci68_low", "ci68_high", "ci95_low", "ci95_high", "n_obs"]
    table_df = table_df[cols].copy()

    # numeric formatting
    def _fmt(x):
        try:
            if pd.isna(x):
                return ""
            return f"{float(x):.4f}"
        except Exception:
            return str(x)

    display_df = table_df.copy()
    for c in ["beta", "se", "p_value", "ci68_low", "ci68_high", "ci95_low", "ci95_high"]:
        display_df[c] = display_df[c].map(_fmt)
    display_df["n_obs"] = display_df["n_obs"].astype(int).astype(str)

    # Render table to PDF using matplotlib
    nrows = len(display_df)
    fig_h = max(3.0, 0.35 * nrows + 1.6)
    fig_w = 16.0

    fig_t, ax_t = plt.subplots(figsize=(fig_w, fig_h))
    ax_t.axis("off")

    title = f"Regression outputs for y={y_var} ({'z-scores' if use_zscores else 'raw ESG'})"
    subtitle = f"Controls: {', '.join(controls) if controls else '(none)'} | CI: {int(ci_inner*100)}% / {int(ci_outer*100)}%"
    ax_t.text(0.0, 1.02, title, transform=ax_t.transAxes, fontsize=12, fontweight="bold", va="bottom")
    ax_t.text(0.0, 0.985, subtitle, transform=ax_t.transAxes, fontsize=10, va="bottom")

    table = ax_t.table(
        cellText=display_df.values,
        colLabels=display_df.columns.tolist(),
        cellLoc="center",
        colLoc="center",
        loc="upper left",
        bbox=[0.0, 0.0, 1.0, 0.94],
    )
    table.auto_set_font_size(False)
    table.set_fontsize(8)

    # Slightly emphasize header row
    for (row, col), cell in table.get_celld().items():
        if row == 0:
            cell.set_text_props(weight="bold")

    fig_t.savefig(table_pdf_path, bbox_inches="tight")
    plt.close(fig_t)

    out_paths = {"png_path": str(png_path), "pdf_path": str(pdf_path), "table_pdf_path": str(table_pdf_path)}
    print(out_paths["png_path"])
    print(out_paths["pdf_path"])
    print(out_paths["table_pdf_path"])
    return results_df, out_paths


In [9]:
results_df, paths = coef_path_dotwhisker(
    df_bank_panel,
    df_oil_panel,
    y_var="rendite_1_quartal",
    controls=["volatility 260 day calc", "book to market ratio"],
    providers={"Bloomberg": "bloomberg_esg", "Refinitiv": "refinitiv_esg"},
    use_zscores=True,
    filename_prefix="coefpath",
)

results_df, paths = coef_path_dotwhisker(
    df_bank_panel,
    df_oil_panel,
    y_var="book to market ratio",
    controls=["volatility 260 day calc", "return on common equity"],
    providers={"Bloomberg": "bloomberg_esg", "Refinitiv": "refinitiv_esg"},
    use_zscores=True,
    filename_prefix="coefpath",
)

results_df, paths = coef_path_dotwhisker(
    df_bank_panel,
    df_oil_panel,
    y_var="volatility 260 day calc",
    controls=["return on common equity", "book to market ratio"],
    providers={"Bloomberg": "bloomberg_esg", "Refinitiv": "refinitiv_esg"},
    use_zscores=True,
    filename_prefix="coefpath",
)

results_df, paths = coef_path_dotwhisker(
    df_bank_panel,
    df_oil_panel,
    y_var="return on common equity",
    controls=["volatility 260 day calc", "book to market ratio"],
    providers={"Bloomberg": "bloomberg_esg", "Refinitiv": "refinitiv_esg"},
    use_zscores=True,
    filename_prefix="coefpath",
)

/home/dragon99999/Downloads/BA BWL/BachelorProjectSrc/output/figures/coefpath_rendite_1_quartal_bloomberg_lseg_k2_z.png
/home/dragon99999/Downloads/BA BWL/BachelorProjectSrc/output/figures/coefpath_rendite_1_quartal_bloomberg_lseg_k2_z.pdf
/home/dragon99999/Downloads/BA BWL/BachelorProjectSrc/output/figures/coefpath_rendite_1_quartal_bloomberg_lseg_k2_z_table.pdf
/home/dragon99999/Downloads/BA BWL/BachelorProjectSrc/output/figures/coefpath_book_to_market_ratio_bloomberg_lseg_k2_z.png
/home/dragon99999/Downloads/BA BWL/BachelorProjectSrc/output/figures/coefpath_book_to_market_ratio_bloomberg_lseg_k2_z.pdf
/home/dragon99999/Downloads/BA BWL/BachelorProjectSrc/output/figures/coefpath_book_to_market_ratio_bloomberg_lseg_k2_z_table.pdf
/home/dragon99999/Downloads/BA BWL/BachelorProjectSrc/output/figures/coefpath_volatility_260_day_calc_bloomberg_lseg_k2_z.png
/home/dragon99999/Downloads/BA BWL/BachelorProjectSrc/output/figures/coefpath_volatility_260_day_calc_bloomberg_lseg_k2_z.pdf
/home/d

In [11]:
results_df, paths = coef_path_dotwhisker(
    df_bank_panel,
    df_oil_panel,
    y_var="rendite_1_quartal",
    controls=["volatility 260 day calc", "book to market ratio"],
    providers={"Bloomberg": "bloomberg_esg", "Refinitiv": "refinitiv_esg"},
    use_zscores=False,
    filename_prefix="coefpath",
)

results_df, paths = coef_path_dotwhisker(
    df_bank_panel,
    df_oil_panel,
    y_var="book to market ratio",
    controls=["volatility 260 day calc", "return on common equity"],
    providers={"Bloomberg": "bloomberg_esg", "Refinitiv": "refinitiv_esg"},
    use_zscores=False,
    filename_prefix="coefpath",
)

results_df, paths = coef_path_dotwhisker(
    df_bank_panel,
    df_oil_panel,
    y_var="volatility 260 day calc",
    controls=["return on common equity", "book to market ratio"],
    providers={"Bloomberg": "bloomberg_esg", "Refinitiv": "refinitiv_esg"},
    use_zscores=False,
    filename_prefix="coefpath",
)

results_df, paths = coef_path_dotwhisker(
    df_bank_panel,
    df_oil_panel,
    y_var="return on common equity",
    controls=["volatility 260 day calc", "book to market ratio"],
    providers={"Bloomberg": "bloomberg_esg", "Refinitiv": "refinitiv_esg"},
    use_zscores=False,
    filename_prefix="coefpath",
)

/home/dragon99999/Downloads/BA BWL/BachelorProjectSrc/output/figures/coefpath_rendite_1_quartal_bloomberg_lseg_k2_raw.png
/home/dragon99999/Downloads/BA BWL/BachelorProjectSrc/output/figures/coefpath_rendite_1_quartal_bloomberg_lseg_k2_raw.pdf
/home/dragon99999/Downloads/BA BWL/BachelorProjectSrc/output/figures/coefpath_rendite_1_quartal_bloomberg_lseg_k2_raw_table.pdf
/home/dragon99999/Downloads/BA BWL/BachelorProjectSrc/output/figures/coefpath_book_to_market_ratio_bloomberg_lseg_k2_raw.png
/home/dragon99999/Downloads/BA BWL/BachelorProjectSrc/output/figures/coefpath_book_to_market_ratio_bloomberg_lseg_k2_raw.pdf
/home/dragon99999/Downloads/BA BWL/BachelorProjectSrc/output/figures/coefpath_book_to_market_ratio_bloomberg_lseg_k2_raw_table.pdf
/home/dragon99999/Downloads/BA BWL/BachelorProjectSrc/output/figures/coefpath_volatility_260_day_calc_bloomberg_lseg_k2_raw.png
/home/dragon99999/Downloads/BA BWL/BachelorProjectSrc/output/figures/coefpath_volatility_260_day_calc_bloomberg_lseg_k2

In [12]:
def plot_esg_regression(
    df_panel: pd.DataFrame,
    y_var: str,
    provider: str = "bloomberg",              # "bloomberg" or "refinitiv"
    controls: list[str] | None = None,
    model: str = "pooled",                    # used when compare_models=False
    plot_space: str = "partial",              # "levels" or "partial"
    compare_models: bool = False,             # if True -> 2x2 pooled/firm/time/twfe
    compare_plot_space: str = "partial",      # recommended for FE comparison
    entity_col: str = "firm",
    time_col: str = "date",
    min_obs: int = 25,
    alpha: float = 0.35,
    point_size: float = 18.0,
    add_title: bool = False,
    dpi: int = 300,
    verbose: bool = False,

    # NEW: raw vs z-score ESG column
    use_zscores: bool = False,

    # NEW: config-driven output directory; caller provides filenames only
    filename_prefix: str = "scatter_reg",
    filename_png: str | None = None,
    filename_pdf: str | None = None,
) -> dict:
    """
    Produces:
      - single scatter+fit plot (compare_models=False), OR
      - 2x2 subplot comparison (compare_models=True) for:
          pooled, firm_fe, time_fe, twfe
    with one shared legend. Saves PNG+PDF into config.FIGURES_DIR.

    Conventions applied
    -------------------
    - Output directory is taken from config.py (FIGURES_DIR).
    - Caller provides filenames only (no directory paths).
    - Optional ESG z-scores via `use_zscores=True` (uses *_esg_z columns).
    - Provider label "Refinitiv" is displayed as "LSEG" in titles/filenames, while column name remains refinitiv_esg[_z].
    """

    # --- config-driven figure directory ---
    from config import FIGURES_DIR
    FIGURES_DIR.mkdir(parents=True, exist_ok=True)

    if controls is None:
        controls = []

    if not isinstance(df_panel.index, pd.MultiIndex):
        raise ValueError("df_panel must have MultiIndex (firm, date).")
    if entity_col not in df_panel.index.names or time_col not in df_panel.index.names:
        raise ValueError(f"Index levels must include '{entity_col}' and '{time_col}'.")

    def _slug(s: str) -> str:
        s = s.strip().lower()
        s = re.sub(r"[^a-z0-9]+", "_", s)
        return re.sub(r"_+", "_", s).strip("_")

    def _prov_label(p: str) -> str:
        return "LSEG" if p.strip().lower().startswith("r") else "Bloomberg"

    # Select ESG column (raw or z)
    if provider.lower().startswith("b"):
        base_esg = "bloomberg_esg"
        prov_tag = "Bloomberg"
    else:
        base_esg = "refinitiv_esg"
        prov_tag = "LSEG"  # display label
    esg_var = f"{base_esg}_z" if use_zscores else base_esg

    needed = [y_var, esg_var] + list(controls)
    missing = [c for c in needed if c not in df_panel.columns]
    if missing:
        raise ValueError(f"Missing columns: {missing}")

    dfr = df_panel[needed].reset_index().copy()

    # numeric coercion
    for c in [y_var, esg_var] + list(controls):
        dfr[c] = pd.to_numeric(dfr[c], errors="coerce")
    dfr = dfr.dropna(subset=[y_var, esg_var] + list(controls)).copy()

    if len(dfr) < min_obs:
        raise ValueError(f"Not enough observations after dropna: {len(dfr)} < {min_obs}")

    # time label for dummies (partial plot FE)
    if np.issubdtype(dfr[time_col].dtype, np.datetime64):
        time_label = dfr[time_col].dt.to_period("Q").astype(str)
    else:
        time_label = dfr[time_col].astype(str)
    firm_label = dfr[entity_col].astype(str)

    def _residualize(endog: pd.Series, exog: pd.DataFrame) -> np.ndarray:
        Xr = sm.add_constant(exog, has_constant="add").astype(float)
        yv = endog.astype(float)
        rr = sm.OLS(yv, Xr).fit()
        return rr.resid.to_numpy()

    def _compute_plot_data(one_model: str, one_space: str):
        """
        Returns: x_plot, y_plot, line_x, line_y, line_label, beta, se, pval
        """
        if one_model not in {"pooled", "firm_fe", "time_fe", "twfe"}:
            raise ValueError("model must be pooled/firm_fe/time_fe/twfe")
        if one_space not in {"levels", "partial"}:
            raise ValueError("plot_space must be levels/partial")

        # --- levels: pooled OLS in levels (controls optional) ---
        if one_space == "levels":
            X_cols = [esg_var] + list(controls)
            X = sm.add_constant(dfr[X_cols], has_constant="add")
            y = dfr[y_var]
            res = sm.OLS(y, X).fit(cov_type="HC3")

            x_plot = dfr[esg_var].to_numpy()
            y_plot = dfr[y_var].to_numpy()

            beta = float(res.params[esg_var])
            se = float(res.bse[esg_var])
            pval = float(res.pvalues[esg_var])

            line_label = f"Fit: β={beta:.4g}, p={pval:.3g}"

        # --- partial: residualize y and x on controls + requested FE, then regress residual y on residual x ---
        else:
            Z = pd.DataFrame(index=dfr.index)
            for c in controls:
                Z[c] = dfr[c].astype(float)

            if one_model in {"firm_fe", "twfe"}:
                Z = pd.concat([Z, pd.get_dummies(firm_label, prefix="firm", drop_first=True)], axis=1)

            if one_model in {"time_fe", "twfe"}:
                Z = pd.concat([Z, pd.get_dummies(time_label, prefix="t", drop_first=True)], axis=1)

            if Z.shape[1] > 0:
                Z = Z.apply(pd.to_numeric, errors="coerce").fillna(0.0)
                y_resid = _residualize(dfr[y_var], Z)
                x_resid = _residualize(dfr[esg_var], Z)
            else:
                y_resid = (dfr[y_var] - dfr[y_var].mean()).to_numpy()
                x_resid = (dfr[esg_var] - dfr[esg_var].mean()).to_numpy()

            res = sm.OLS(
                y_resid.astype(float),
                sm.add_constant(x_resid.astype(float), has_constant="add")
            ).fit(cov_type="HC3")

            beta = float(res.params[1])
            se = float(res.bse[1])
            pval = float(res.pvalues[1])

            x_plot = x_resid
            y_plot = y_resid

            fe_name = {"pooled": "Pooled", "firm_fe": "Firm FE", "time_fe": "Time FE", "twfe": "Two-way FE"}[one_model]
            line_label = f"{fe_name}: β={beta:.4g}, p={pval:.3g}"

        # fitted line in the plotted space
        x_min, x_max = np.nanpercentile(x_plot, [1, 99])
        xx = np.linspace(x_min, x_max, 200)
        if one_space == "levels":
            a = float(res.params["const"])
            b = float(res.params[esg_var])
        else:
            a = float(res.params[0])
            b = float(res.params[1])
        yy = a + b * xx

        return x_plot, y_plot, xx, yy, line_label, beta, se, pval

    # ---------- output naming ----------
    controls_tag = "none" if len(controls) == 0 else f"k{len(controls)}"
    z_tag = "z" if use_zscores else "raw"
    x_tag = _slug(prov_tag)  # Bloomberg or LSEG (display tag)
    y_tag = _slug(y_var)

    # ---------- plotting ----------
    if not compare_models:
        x_plot, y_plot, xx, yy, line_label, beta, se, pval = _compute_plot_data(model, plot_space)

        fig, ax = plt.subplots(figsize=(7.2, 4.8))
        ax.scatter(x_plot, y_plot, s=point_size, alpha=alpha, label="Observations")
        ax.plot(xx, yy, linewidth=2.0, color="#8B0000", label=line_label)

        if plot_space == "levels":
            ax.set_xlabel(esg_var)
            ax.set_ylabel(y_var)
        else:
            ax.set_xlabel(f"{esg_var} (residualized)")
            ax.set_ylabel(f"{y_var} (residualized)")

        if add_title:
            ax.set_title(f"{y_var} vs {prov_tag} ({model}, {plot_space})")

        ax.legend(
            loc="upper center",
            bbox_to_anchor=(0.5, 1.02),
            ncol=2,
            frameon=True,
            borderaxespad=0.0
        )
        ax.spines["top"].set_visible(False)
        ax.spines["right"].set_visible(False)
        ax.grid(True, linewidth=0.4, alpha=0.35)

        base = f"{filename_prefix}_{y_tag}_x_{x_tag}_{model}_{plot_space}_{controls_tag}_{z_tag}_n{len(dfr)}"
        if filename_png is None:
            filename_png = base + ".png"
        if filename_pdf is None:
            filename_pdf = base + ".pdf"

        png_path = FIGURES_DIR / filename_png
        pdf_path = FIGURES_DIR / filename_pdf

        fig.tight_layout(rect=(0, 0, 1, 0.95))
        fig.savefig(png_path, dpi=dpi, bbox_inches="tight")
        fig.savefig(pdf_path, bbox_inches="tight")
        plt.close(fig)

        out = {
            "png_path": str(png_path),
            "pdf_path": str(pdf_path),
            "n_obs": int(len(dfr)),
            "beta": beta,
            "se": se,
            "p_value": pval,
            "model": model,
            "plot_space": plot_space,
            "y": y_var,
            "x": esg_var,
            "provider_label": prov_tag,
            "controls": controls,
            "use_zscores": bool(use_zscores),
        }
        print(out["png_path"])
        print(out["pdf_path"])
        return out

    # ---------- 2x2 comparison ----------
    models = ["pooled", "firm_fe", "time_fe", "twfe"]
    titles = ["Pooled OLS", "Firm FE", "Time FE", "Firm + Time FE"]

    fig, axes = plt.subplots(2, 2, figsize=(10.5, 7.2), sharex=True, sharey=True)
    axes = axes.ravel()

    legend_handles = None
    legend_labels = None
    stats_out = {}

    for i, (m, ttl) in enumerate(zip(models, titles)):
        ax = axes[i]
        x_plot, y_plot, xx, yy, line_label, beta, se, pval = _compute_plot_data(m, compare_plot_space)

        ax.scatter(x_plot, y_plot, s=point_size, alpha=alpha, label="Observations")
        ax.plot(xx, yy, linewidth=2.0, color="#8B0000", label=line_label)

        ax.set_title(ttl)
        ax.spines["top"].set_visible(False)
        ax.spines["right"].set_visible(False)
        ax.grid(True, linewidth=0.4, alpha=0.35)

        if legend_handles is None:
            legend_handles, legend_labels = ax.get_legend_handles_labels()

        stats_out[m] = {"beta": beta, "se": se, "p_value": pval}

    if compare_plot_space == "levels":
        fig.supxlabel(esg_var)
        fig.supylabel(y_var)
    else:
        fig.supxlabel(f"{esg_var} (residualized)")
        fig.supylabel(f"{y_var} (residualized)")

    fig.legend(
        legend_handles, legend_labels,
        loc="upper center",
        ncol=2,
        frameon=True,
        bbox_to_anchor=(0.5, 0.995),
        borderaxespad=0.0
    )

    fig.tight_layout(rect=(0, 0.04, 1, 0.94))

    base = f"{filename_prefix}_COMPARE4_{y_tag}_x_{x_tag}_{compare_plot_space}_{controls_tag}_{z_tag}_n{len(dfr)}"
    if filename_png is None:
        filename_png = base + ".png"
    if filename_pdf is None:
        filename_pdf = base + ".pdf"

    png_path = FIGURES_DIR / filename_png
    pdf_path = FIGURES_DIR / filename_pdf

    fig.savefig(png_path, dpi=dpi, bbox_inches="tight")
    fig.savefig(pdf_path, bbox_inches="tight")
    plt.close(fig)

    out = {
        "png_path": str(png_path),
        "pdf_path": str(pdf_path),
        "n_obs": int(len(dfr)),
        "model": "COMPARE4",
        "plot_space": compare_plot_space,
        "y": y_var,
        "x": esg_var,
        "provider_label": prov_tag,
        "controls": controls,
        "use_zscores": bool(use_zscores),
        "stats_by_model": stats_out,
    }
    
    return out

In [13]:
plot_esg_regression(
    df_panel=df_bank_panel,
    y_var="rendite_1_quartal",
    provider="refinitiv",
    controls=["price to book ratio", "book to market ratio"],
    model="twfe",
    plot_space="partial",
    compare_models=False,
    use_zscores=True,
)

/home/dragon99999/Downloads/BA BWL/BachelorProjectSrc/output/figures/scatter_reg_rendite_1_quartal_x_lseg_twfe_partial_k2_z_n1475.png
/home/dragon99999/Downloads/BA BWL/BachelorProjectSrc/output/figures/scatter_reg_rendite_1_quartal_x_lseg_twfe_partial_k2_z_n1475.pdf


{'png_path': '/home/dragon99999/Downloads/BA BWL/BachelorProjectSrc/output/figures/scatter_reg_rendite_1_quartal_x_lseg_twfe_partial_k2_z_n1475.png',
 'pdf_path': '/home/dragon99999/Downloads/BA BWL/BachelorProjectSrc/output/figures/scatter_reg_rendite_1_quartal_x_lseg_twfe_partial_k2_z_n1475.pdf',
 'n_obs': 1475,
 'beta': 0.011850910576179912,
 'se': 0.008390304190395263,
 'p_value': 0.15781661146972825,
 'model': 'twfe',
 'plot_space': 'partial',
 'y': 'rendite_1_quartal',
 'x': 'refinitiv_esg_z',
 'provider_label': 'LSEG',
 'controls': ['price to book ratio', 'book to market ratio'],
 'use_zscores': True}

In [14]:
def run_4model_comparison_and_save_plot(
    df_panel: pd.DataFrame,
    y_var: str,
    x_var: str,
    controls: list[str] | None = None,
    entity_col: str = "firm",
    time_col: str = "date",
    min_obs: int = 25,

    # CI + styling (same convention: 68% thick, 95% thin)
    ci_outer: float = 0.95,
    ci_inner: float = 0.68,
    lw_outer: float = 1.1,
    lw_inner: float = 2.2,
    cap_height: float = 0.18,

    # x-limits (None => auto from 68% CI range)
    xlim: tuple[float | None, float | None] | None = None,

    # output (filenames only; directory from config.FIGURES_DIR)
    filename_prefix: str = "method_compare4",
    filename_png: str | None = None,
    filename_pdf: str | None = None,
    dpi: int = 300,
    add_title: bool = False,
):
    """
    Runs exactly 4 models for one (y, x, controls) spec on ONE panel:
      - Pooled OLS (HC3)
      - Firm FE (clustered by entity)
      - Time FE (clustered by entity)
      - Two-way FE (clustered by entity)

    Saves a dot-and-whisker plot (PNG + PDF) into config.FIGURES_DIR, using:
      - 68% CI thick
      - 95% CI thin
      - arrows if CI truncated by x-limits

    Returns
    -------
    results_df : pd.DataFrame
    out_paths  : dict with png/pdf paths
    """

    # --- config-driven output directory ---
    from config import FIGURES_DIR
    FIGURES_DIR.mkdir(parents=True, exist_ok=True)

    if controls is None:
        controls = []

    if not isinstance(df_panel.index, pd.MultiIndex):
        raise ValueError("df_panel must have MultiIndex (firm, date).")
    if entity_col not in df_panel.index.names or time_col not in df_panel.index.names:
        raise ValueError(f"Index must include levels '{entity_col}' and '{time_col}'.")

    needed = [y_var, x_var] + list(controls)
    missing = [c for c in needed if c not in df_panel.columns]
    if missing:
        raise ValueError(f"Missing columns: {missing}")

    df = df_panel[needed].copy()
    for c in needed:
        df[c] = pd.to_numeric(df[c], errors="coerce")
    df = df.dropna(subset=needed).copy()

    if len(df) < min_obs:
        raise ValueError(f"Not enough observations after dropna: {len(df)} < {min_obs}")

    # z for CIs
    z_outer = float(norm.ppf(0.5 + ci_outer / 2))
    z_inner = float(norm.ppf(0.5 + ci_inner / 2))

    # --------- fit models ----------
    # 1) pooled OLS (HC3)
    dfr = df.reset_index()
    X_cols = [x_var] + list(controls)
    X = sm.add_constant(dfr[X_cols], has_constant="add")
    y = dfr[y_var]
    pooled = sm.OLS(y, X).fit(cov_type="HC3")

    # FE models via PanelOLS
    Xp = df[[x_var] + list(controls)]

    firm_fe = PanelOLS(
        df[y_var], Xp, entity_effects=True, drop_absorbed=True
    ).fit(cov_type="clustered", cluster_entity=True)

    time_fe = PanelOLS(
        df[y_var], Xp, time_effects=True, drop_absorbed=True
    ).fit(cov_type="clustered", cluster_entity=True)

    twfe = PanelOLS(
        df[y_var], Xp, entity_effects=True, time_effects=True, drop_absorbed=True
    ).fit(cov_type="clustered", cluster_entity=True)

    models = [
        ("Pooled OLS (HC3)", pooled, "pooled"),
        ("Firm FE (clustered)", firm_fe, "firm_fe"),
        ("Time FE (clustered)", time_fe, "time_fe"),
        ("Two-way FE (clustered)", twfe, "twfe"),
    ]

    rows = []
    for label, res, key in models:
        if key == "pooled":
            beta = float(res.params.get(x_var, np.nan))
            se = float(res.bse.get(x_var, np.nan))
            pval = float(res.pvalues.get(x_var, np.nan))
            nobs = int(res.nobs)
        else:
            beta = float(res.params.get(x_var, np.nan))
            se = float(res.std_errors.get(x_var, np.nan))
            pval = float(res.pvalues.get(x_var, np.nan))
            nobs = int(res.nobs)

        rows.append({
            "model": label,
            "model_key": key,
            "y": y_var,
            "x": x_var,
            "k_controls": len(controls),
            "controls": "|".join(controls) if controls else "",
            "beta": beta,
            "se": se,
            "p_value": pval,
            "ci95_low": beta - z_outer * se,
            "ci95_high": beta + z_outer * se,
            "ci68_low": beta - z_inner * se,
            "ci68_high": beta + z_inner * se,
            "n_obs": nobs,
        })

    results_df = pd.DataFrame(rows)

    # ---------- plotting helpers ----------
    def _slug(s: str) -> str:
        s = str(s).strip().lower()
        s = re.sub(r"[^a-z0-9]+", "_", s)
        return re.sub(r"_+", "_", s).strip("_")

    def _auto_xlim(sub: pd.DataFrame) -> tuple[float, float]:
        lo = float(np.nanmin(np.minimum(sub["beta"], sub["ci68_low"])))
        hi = float(np.nanmax(np.maximum(sub["beta"], sub["ci68_high"])))
        pad = 0.12 * (hi - lo) if hi > lo else 1.0
        return lo - pad, hi + pad

    def _draw_ci(ax, y0, lo, hi, color, x_left, x_right, lw, alpha, cap_h, arrow_inset_frac=0.015):
        lo_c = max(lo, x_left)
        hi_c = min(hi, x_right)
        ax.hlines(y0, lo_c, hi_c, linewidth=lw, color=color, alpha=alpha)

        eps = arrow_inset_frac * (x_right - x_left)

        if lo >= x_left:
            ax.vlines(lo, y0 - cap_h, y0 + cap_h, linewidth=lw, color=color, alpha=alpha)
        else:
            ax.plot(x_left + eps, y0, marker="<", color=color, alpha=alpha, markersize=7)

        if hi <= x_right:
            ax.vlines(hi, y0 - cap_h, y0 + cap_h, linewidth=lw, color=color, alpha=alpha)
        else:
            ax.plot(x_right - eps, y0, marker=">", color=color, alpha=alpha, markersize=7)

    # x-limits
    x_left, x_right = (None, None)
    if xlim is not None:
        x_left, x_right = xlim
    if x_left is None or x_right is None:
        a_left, a_right = _auto_xlim(results_df)
        if x_left is None:
            x_left = a_left
        if x_right is None:
            x_right = a_right
    x_left = float(x_left)
    x_right = float(x_right)

    # ---------- build figure ----------
    fig, ax = plt.subplots(figsize=(7.6, 4.6))

    model_order = [m[0] for m in models]
    results_df["model"] = pd.Categorical(results_df["model"], categories=model_order, ordered=True)
    results_df = results_df.sort_values("model")

    y_pos = np.arange(len(model_order))

    ax.set_xlim(x_left, x_right)
    ax.axvline(0.0, linewidth=1.0)
    ax.set_yticks(y_pos)
    ax.set_yticklabels(model_order)

    # consistent color per row from axis cycle
    for i, (_, r) in enumerate(results_df.iterrows()):
        tmp_line = ax.plot([], [])[0]
        c = tmp_line.get_color()
        tmp_line.remove()

        beta = float(r["beta"])

        _draw_ci(ax, i, float(r["ci95_low"]), float(r["ci95_high"]),
                 color=c, x_left=x_left, x_right=x_right,
                 lw=lw_outer, alpha=0.55, cap_h=cap_height)

        _draw_ci(ax, i, float(r["ci68_low"]), float(r["ci68_high"]),
                 color=c, x_left=x_left, x_right=x_right,
                 lw=lw_inner, alpha=1.0, cap_h=cap_height)

        span = x_right - x_left
        inset = 0.015 * span
        if beta < x_left:
            ax.plot(x_left + inset, i, marker="<", color=c, markersize=8)
        elif beta > x_right:
            ax.plot(x_right - inset, i, marker=">", color=c, markersize=8)
        else:
            ax.plot(beta, i, marker="o", color=c, markersize=6, zorder=4)

    ax.grid(True, axis="x", linewidth=0.4, alpha=0.35)
    ax.spines["top"].set_visible(False)
    ax.spines["right"].set_visible(False)

    ax.set_xlabel(f"ESG coefficient estimate for {x_var} (68% thick, 95% thin)")

    if add_title:
        ax.set_title(f"Methodology comparison (y={y_var})")

    legend_handles = [
        Line2D([0], [0], color="black", linewidth=lw_inner, label=f"{int(ci_inner*100)}% CI (thick)"),
        Line2D([0], [0], color="black", linewidth=lw_outer, alpha=0.55, label=f"{int(ci_outer*100)}% CI (thin)"),
        Line2D([0], [0], marker="o", linestyle="None", color="black", label="Point estimate"),
        Line2D([0], [0], marker="<", linestyle="None", color="black", label="CI truncated (left)"),
        Line2D([0], [0], marker=">", linestyle="None", color="black", label="CI truncated (right)"),
    ]
    fig.legend(
        handles=legend_handles,
        loc="upper center",
        ncol=3,
        frameon=True,
        bbox_to_anchor=(0.5, 0.995),
        borderaxespad=0.0
    )
    fig.tight_layout(rect=(0, 0.04, 1, 0.90))

    # ---------- save (filenames only; FIGURES_DIR from config) ----------
    controls_tag = "none" if len(controls) == 0 else f"k{len(controls)}"
    base = f"{filename_prefix}_{_slug(y_var)}_x_{_slug(x_var)}_{controls_tag}_n{len(df)}"

    if filename_png is None:
        filename_png = base + ".png"
    if filename_pdf is None:
        filename_pdf = base + ".pdf"

    png_path = FIGURES_DIR / filename_png
    pdf_path = FIGURES_DIR / filename_pdf

    fig.savefig(png_path, dpi=dpi, bbox_inches="tight")
    fig.savefig(pdf_path, bbox_inches="tight")
    plt.close(fig)

    out_paths = {"png_path": str(png_path), "pdf_path": str(pdf_path)}
    print(out_paths["png_path"])
    print(out_paths["pdf_path"])
    return results_df, out_paths


In [17]:
results_df, paths = run_4model_comparison_and_save_plot(
    df_panel=df_bank_panel,
    y_var="rendite_1_quartal",
    x_var="bloomberg_esg_z",  # or bloomberg_esg / refinitiv_esg_z / refinitiv_esg
    controls=["volatility 260 day calc", "book to market ratio"],
    filename_prefix="compare_methods",
)

/home/dragon99999/Downloads/BA BWL/BachelorProjectSrc/output/figures/compare_methods_rendite_1_quartal_x_bloomberg_esg_z_k2_n989.png
/home/dragon99999/Downloads/BA BWL/BachelorProjectSrc/output/figures/compare_methods_rendite_1_quartal_x_bloomberg_esg_z_k2_n989.pdf
