In [1]:
import datetime
import os
import sys

sys.path.append("../../")

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm.notebook import tqdm

from experiments.data.linear import LinearDGP
from src.hgic.method import HGIC
from src.ours.method import FalsificationAlgorithm
from src.transportability.method import TransportabilityTest

In [None]:
# Placeholder mapping of parameter names to None. Nothing in this cell reads it.
tmp = dict.fromkeys(
    (
        "alpha_0",
        "alpha_X",
        "alpha_U",
        "beta_0",
        "beta_X",
        "beta_U",
        "gamma_T",
        "gamma_X",
        "gamma_U",
        "mu_U",
        "sigma_U",
        "mu_X",
        "sigma_X",
        "sigma_T",
        "sigma_Y",
    )
)

# Build a generator with an empty param_dist and call sample(500) on it.
# NOTE(review): the three positional arguments presumably correspond to n_envs
# and the two confounding strengths that are passed by keyword elsewhere in
# this notebook — confirm against the LinearDGP signature.
dgp = LinearDGP(25, 1.0, 1.0, param_dist={}, binary_t=False)
data = dgp.sample(500)["data"]

# Visual sanity check on the first element of the sampled data:
# X against Y, coloured by T.
dataset = data[0]
dataset.plot.scatter(x="X", y="Y", c="T")

In [3]:
# Argument passed to every dgp.sample(...) call inside experiment_icm.
nbr_samples = 1000
# Passed as n_envs to LinearDGP inside experiment_icm.
nbr_env = 250


def experiment_icm(iterations, param_dist, method, conf_strengths=(0, 1.0), alpha=0.05):
    """Repeatedly run ``method`` on data sampled from a ``LinearDGP``.

    For every value in ``conf_strengths`` a ``LinearDGP`` is created with
    ``n_envs=nbr_env`` (module-level setting), the value used for both
    ``conf_strength_non_interaction`` and ``conf_strength_interaction``, and
    ``binary_t=False``. The generator is then sampled ``iterations`` times with
    ``nbr_samples`` (module-level setting) and ``method.test`` is run on each
    draw with ``observed_covariates=["X"]``.

    Parameters
    ----------
    iterations : int
        Number of draws per confounding strength.
    param_dist : dict
        Forwarded unchanged as ``LinearDGP(param_dist=...)``.
    method : object
        Must provide ``test(data, observed_covariates=...)`` returning a
        mapping with a ``"pval"`` entry and, optionally, an ``"ate"`` entry
        that can be indexed with the same keys as the sampled ``"ate"``.
    conf_strengths : iterable of float, optional
        Confounding strengths to evaluate. Defaults to ``(0, 1.0)``.
    alpha : float, optional
        A run counts as a detection when ``pval < alpha``. Defaults to 0.05.

    Returns
    -------
    pandas.DataFrame
        One row per run with columns ``c`` (confounding strength),
        ``detection`` (``pval < alpha``) and ``bias`` (mean absolute
        difference between the sampled and the estimated ATE; ``0`` when the
        method returns no ``"ate"`` entry).
    """
    columns = ["c", "detection", "bias"]

    # Rows are collected in a plain list and turned into a frame once at the
    # end; this avoids repeated frame enlargement and gives stable column
    # dtypes independent of how an empty frame would be typed.
    records = []
    for c in conf_strengths:
        dgp = LinearDGP(
            n_envs=nbr_env,
            conf_strength_non_interaction=c,
            conf_strength_interaction=c,
            binary_t=False,
            param_dist=param_dist,
        )

        for _ in range(iterations):
            sampled = dgp.sample(nbr_samples)
            true_ate = sampled["ate"]

            out = method.test(sampled["data"], observed_covariates=["X"])

            # Methods that do not estimate an ATE are recorded with zero bias.
            if "ate" in out:
                errors = [true_ate[env] - out["ate"][env] for env in true_ate]
            else:
                errors = 0

            records.append(
                {
                    "c": c,
                    "detection": out["pval"] < alpha,
                    "bias": np.mean(np.abs(errors)),
                }
            )

    # Passing `columns` keeps the expected layout even when no run was made.
    return pd.DataFrame(records, columns=columns)

In [None]:
from scipy.stats import uniform, rv_discrete

iterations = 250

# Timestamp used in the names of the result files written by later cells.
current_datetime = datetime.datetime.now()
date_stamp_string = current_datetime.strftime("%Y-%m-%d_%H%M%S")
print(date_stamp_string)

# Equal-weight three-point distribution; not used by the loop in this cell.
xk = np.array([1 / 2, 1.0, 1.5])
pk = np.ones(xk.shape) / len(xk)  # Equal probability for all points
atomic_dist = rv_discrete(name="custom", values=(xk, pk))()
# scipy's uniform takes (loc, scale), so this is the uniform law on [0.1, 3.0].
uniform_dist = uniform(0.1, 2.9)

methods = {
    "Transportability test": TransportabilityTest(
        independence_test_args={"method": "fisherz"}, max_sample_size_test=50_000
    ),
    "HGIC": HGIC(
        independence_test_args={
            "method": "fisherz",
        },
        max_tests=1,
    ),
    "Ours": FalsificationAlgorithm(
        feature_representation="poly",
        feature_representation_params={
            "degree": 2,
            "use_sklearn": True,
            "interaction_only": False,
        },
    ),
}

# list_params = LinearDGP.param_names
list_params = [
    "alpha_0",
    "alpha_X",
    "alpha_U",
    "beta_0",
    "beta_X",
    "beta_U",
    "gamma_X",
    "gamma_U",
    "gamma_T",
    "mu_X",
    "mu_U",
]

# NOTE(review): no random seed is set in the visible cells, so unless the
# generator seeds itself the numbers differ between runs.

# Per-configuration results are collected here and concatenated once below.
# If the loop is interrupted, the partial results are still in `frames`.
result_columns = ["method", "c", "detection", "bias", "p"]
frames = []
for m, method in methods.items():
    # Baseline: param_dist is empty for this run.
    tmp = experiment_icm(iterations, param_dist={}, method=method)
    tmp["method"] = m
    tmp["p"] = "None"
    frames.append(tmp)

    # One run per parameter, with that parameter given the uniform distribution.
    for p in tqdm(list_params):
        tmp = experiment_icm(
            iterations,
            param_dist={p: uniform_dist},
            method=method,
        )
        tmp["method"] = m
        tmp["p"] = p
        frames.append(tmp)

# A single concat avoids quadratic copying and the dtype ambiguity of
# concatenating onto an empty frame; ignore_index gives unique row labels.
if frames:
    df = pd.concat(frames, ignore_index=True)[result_columns]
else:
    df = pd.DataFrame(columns=result_columns)

In [5]:
# Set read_ts to the timestamp string of a previously saved run to reload that
# run; leave it as None to save the frame produced by the experiment cell.
read_ts = None
results_dir = "results-necessary_mechanism_changes"
if read_ts:
    date_stamp_string = read_ts
    # The baseline rows carry the literal string "None" in column "p". Recent
    # pandas versions treat "None" as a missing-value marker by default, which
    # would turn that label into NaN and drop the baseline from the plot, so
    # only empty fields are parsed as missing here.
    df = pd.read_csv(
        f"{results_dir}/{date_stamp_string}.csv",
        keep_default_na=False,
        na_values=[""],
    )
else:
    # Create the output directory first so the write cannot fail on a missing
    # folder after the experiments have finished.
    os.makedirs(results_dir, exist_ok=True)
    df.to_csv(f"{results_dir}/{date_stamp_string}.csv")

In [None]:
from experiments.notebooks.utils import names, name_colors

# Data preparation and filtering. Every filtered frame is copied explicitly so
# that the column assignments below operate on an independent frame rather
# than on a slice of `df`.
plot_df = df.copy()

# Values of "p" that are dropped from the c == 0 panel (the baseline and the
# *_U parameters).
unmeasured_confounder_params = ["None", "alpha_U", "beta_U", "gamma_U", "mu_U"]

# Filter out unmeasured confounder parameters when c == 0
hidden_when_unconfounded = (plot_df["c"] == 0.0) & plot_df["p"].isin(
    unmeasured_confounder_params
)
plot_df = plot_df.loc[~hidden_when_unconfounded].copy()

# Only a categorical "p" column needs its now-unused categories removed.
if plot_df["p"].dtype.name == "category":
    plot_df["p"] = plot_df["p"].cat.remove_unused_categories()

# Drop "gamma_0" rows: a nonsense parameter change that was included by mistake.
plot_df = plot_df.loc[plot_df["p"] != "gamma_0"].copy()

# Normalise an alternative method label to the short name used in the plot.
plot_df["method"] = plot_df["method"].replace(
    {"Our proposed falsification strategy": "Ours"}
)
# Map values in column 'c' for readability
plot_df["c"] = plot_df["c"].replace(
    {0.0: "No unmeasured confounder", 1.0: "Unmeasured confounder present"}
)

# Format mechanism labels for LaTeX rendering. Note that the gamma_* parameters
# are displayed with beta^{(A...)} labels. The key order of this dict also
# defines the order of the bars on the x-axis.
latex_labels = {
    "None": "None",
    "alpha_0": "$\\alpha^{(0)}$",
    "alpha_X": "$\\alpha^{(X)}$",
    "alpha_U": "$\\alpha^{(U)}$",
    "beta_0": "$\\beta^{(0)}$",
    "beta_X": "$\\beta^{(X)}$",
    "gamma_T": "$\\beta^{(A)}$",
    "gamma_X": "$\\beta^{(AX)}$",
    "beta_U": "$\\beta^{(U)}$",
    "gamma_U": "$\\beta^{(AU)}$",
    "mu_X": "$\\mu^{(X)}$",
    "mu_U": "$\\mu^{(U)}$",
}
parameter_order_no_unmeasured = [
    label
    for key, label in latex_labels.items()
    if key not in unmeasured_confounder_params
]
parameter_order_unmeasured = list(latex_labels.values())
plot_df["p"] = plot_df["p"].replace(latex_labels)
method_order = ["Ours", "Transportability test", "HGIC"]

# Apply the seaborn theme BEFORE creating the figure: style and context are
# rcParams that only affect axes created afterwards, so setting them later
# leaves the first run of this cell with matplotlib's default look.
sns.set_theme(style="whitegrid", context="talk", font_scale=1.1)

# Create subplots
fig, axes = plt.subplots(1, 2, figsize=(16, 6), sharey=True)

color_dict = {
    "Ours": name_colors[names["FalsificationAlgorithm_linear_bs"]],
    "Transportability test": name_colors[names["FisherZTransportability"]],
    "HGIC": name_colors[names["FisherZHGIC_fisher"]],
}

# One panel per confounding scenario
plot_df_c0 = plot_df[plot_df["c"] == "No unmeasured confounder"]
plot_df_c1 = plot_df[plot_df["c"] == "Unmeasured confounder present"]

print(plot_df_c0.p.unique())

panels = [
    (axes[0], plot_df_c0, parameter_order_no_unmeasured, "No unmeasured confounder"),
    (axes[1], plot_df_c1, parameter_order_unmeasured, "Unmeasured confounder present"),
]
for ax, panel_df, panel_order, panel_title in panels:
    sns.barplot(
        data=panel_df,
        x="p",
        y="detection",
        hue="method",
        hue_order=method_order,
        order=panel_order,
        palette=color_dict,
        dodge=True,
        width=0.7,
        errorbar=None,
        ax=ax,
    )
    # Dashed reference line at the 0.05 level
    ax.axhline(0.05, linestyle="--", color="black", alpha=0.6)
    # Set x-axis labels with smaller font size
    ax.set_xlabel("Parameter that changes across environments", fontsize="small")
    ax.set_ylabel("Falsification rate")
    ax.set_title(panel_title)

# Grab the legend entries first, then drop the per-axes legends of BOTH panels
# in favour of one shared legend.
handles, labels = axes[0].get_legend_handles_labels()
for ax in axes:
    panel_legend = ax.get_legend()
    if panel_legend is not None:
        panel_legend.remove()

# Add a common legend below the plots
fig.legend(
    handles,
    labels,
    loc="center",
    bbox_to_anchor=(0.5, -0.0),
    ncol=3,
    fontsize="small",
    title_fontsize="small",
)

# Adjust layout to prevent overlap
plt.tight_layout(rect=[0, 0, 1, 0.95])

# Save and show the combined plot
plt.savefig(
    f"results-necessary_mechanism_changes/necessary_mechanism_changes-{date_stamp_string}.pdf",
    bbox_inches="tight",
)
plt.show()