#### Simulate the asymptotic variance of different identification strategies

In [None]:
import sys
import random
import time
import re

sys.path.append("../src")

from identification_strategy_finder import AdjustmentSetFinder

#### Helpers

In [None]:
def calculate_sandwich_covariance(X: np.ndarray, residuals: np.ndarray, I: np.ndarray, W: np.ndarray) -> np.ndarray:
    n, k = X.shape
    
    Q_XI = np.dot(X.T, I) / n
    Omega = np.zeros((I.shape[1], I.shape[1]))
    for i in range(n):
        Ii = I[i, :].reshape(-1, 1)
        Omega += np.dot(Ii, Ii.T) * (residuals[i] ** 2)
    Omega /= n
    
    Q_XI_W_Q_XI_T_inv = np.linalg.inv(np.dot(np.dot(Q_XI, W), Q_XI.T))
    filling = np.dot(np.dot(Q_XI, W), np.dot(Omega, np.dot(W, Q_XI.T)))
    
    sandwich_cov = np.dot(Q_XI_W_Q_XI_T_inv, np.dot(filling, Q_XI_W_Q_XI_T_inv)) / n
    return sandwich_cov

def estimate_results(X, Y, G, n_samples, type_effect, hidden_nodes, one_suffices):
    results_dict = {}
    data, coefficients = simulate_linear_SCM(G, n_samples)
    
    adjustment_set_finder = AdjustmentSetFinder(G, X, Y, hidden_nodes, type_effect, one_suffices)
    adjustment_sets = adjustment_set_finder.find_adjustment_sets_nuisance()

    for i, item in enumerate(adjustment_sets):
        I = list(item['Instrument Set']) if item['Instrument Set'] else None
        B = list(item['Conditional Set']) if item['Conditional Set'] else None
        if item['Nuisance Set']:
            N = list(item['Nuisance Set'])
        else:
            N = None
    
        N_data = np.array(data.loc[:, N]) if N is not None else None
        B_data = np.array(data.loc[:, B]) if B is not None else None
        I_data = np.array(data.loc[:, I]) if I is not None else None
        
        X_data = np.array(data.loc[:, list(X)])
        Y_data = np.array(data.loc[:, Y])
        
        result = civ(Y=Y_data, X=X_data, N=N_data, B=B_data, I=I_data)
        
        result_key = f"N:{N}, B:{B}, I:{I}"
        results_dict[result_key] = {
            'coefficients': result,
            'adjustment_set': result_key
        }
        
        if N_data is not None:
            XN_data = np.hstack((X_data, N_data))
        else:
            XN_data = X_data
        
        residuals = Y_data.copy()
        for j in range(XN_data.shape[1]):
            residuals -= result[j] * XN_data[:, j]
            
        I_dim = I_data.shape[1]
        
        sandwich_cov = calculate_sandwich_covariance(XN_data, residuals, I_data, W = np.eye(I_dim))
        standard_errors = np.sqrt(np.diag(sandwich_cov))
        
        results_dict[result_key]['standard_errors'] = standard_errors

    if type_effect == "total":
        total_causal_effect = 0
        for node_x in X: 
            causal_paths_from_x_to_Y = list(nx.all_simple_paths(G, node_x, Y))
            for path in causal_paths_from_x_to_Y:
                path_coefficients = 1 

                for i in range(len(path) - 1):
                    edge = (path[i], path[i+1])
                    edge_coefficient = coefficients[edge[1]][edge[0]] 
                    path_coefficients *= edge_coefficient    
                total_causal_effect += path_coefficients
            
    best_results = {}
    for i in range(len(list(X))):
        min_se = float('inf')
        best_key = None
        for key, value in results_dict.items():
            if value['standard_errors'][i] < min_se:
                min_se = value['standard_errors'][i]
                best_key = key
        if best_key:
            best_results[f'Best result for X_{i+1}'] = {
                'coefficient': results_dict[best_key]['coefficients'][i],
                'standard_error': results_dict[best_key]['standard_errors'][i],
                'adjustment_set': results_dict[best_key]['adjustment_set']
            }
        
    for key, value in results_dict.items():
        print(f"Adjustment Set: {key}")
        if type_effect == "direct":
            print(f"Population Coefficients: {coefficients[Y][list(X)[0]]}")
            
        elif type_effect == "total":
            print(f"Population Coefficients: {total_causal_effect}")
            
        else:
            raise ValueError("Invalid type_effect: must either be direct or total")

        if 'N:None' not in key:
            n_number = key.split(":")[1].split(",")[0].strip("[]")
            if n_number != 'None':
                if n_number.isdigit():
                    n_column = int(n_number)
                else:
                    n_column = n_number.strip("'") 
                    
                if n_column in coefficients[Y]:
                    print(f"Nuisance Population Coefficients: {coefficients[Y][n_column]}")
                else:
                    print(f"Nuisance Population Coefficients: Column '{n_column}' has no direct effect on Y")
                
        for i, coef in enumerate(value['coefficients'].flatten()):
            print(f"Coefficient {i+1}: {coef}")
            print(f"Standard Error {i+1}: {value['standard_errors'][i]}")
        print()
        
    if not results_dict:
        print("The effect is not identifiable, I am sorry!")

    return results_dict, coefficients, best_results


def transform_strategy(strategy):
    """
    Rename the strategies to make them fit for latex
    """
    n_match = re.search(r'N:(\[[^\]]*\]|None)', strategy)
    b_match = re.search(r'B:\[([^\]]*)\]', strategy)
    i_match = re.search(r'I:\[([^\]]*)\]', strategy)
    
    N = n_match.group(1) if n_match else 'None'
    if N != 'None':
        N = N.strip('[]').split(', ') if N.strip('[]') else []
    else:
        N = []
    
    B = b_match.group(1).split(', ') if b_match and b_match.group(1) else []
    I = i_match.group(1).split(', ') if i_match and i_match.group(1) else []
    
    if not B:
        B.append(r"\emptyset")
    if N:
        civ = r"$\operatorname{CIV}(" + ', '.join(I) + r" \mid X, " + ', '.join(N) + r" \rightarrow Y \mid " + ', '.join(B) + r")$"
    else:
        civ = r"$\operatorname{CIV}(" + ', '.join(I) + r" \mid X \rightarrow Y \mid " + ', '.join(B) + r")$"
    return civ


def format_number(num):
    """
    Add an apostroph to numbers to make them easy to read
    """
    return f"{int(num):,}".replace(",", "'")

#### Run the experiment for the specified graph
Can easily be parallelized

In [None]:
G = nx.DiGraph()
G.add_edges_from([(1,2), (2,3), (4,2), (4,3), (4,5), (5,3),
                  (6,1), (6,5), (6,2), (7,1), (8,3)])
pos = {
    1: (0,0),
    2: (1,0),
    3: (2,0),
    4: (1.5,1),
    5: (1,-0.5),
    6: (0,-0.5),
    7: (0,1),
    8: (2,1)
}
nx.draw(G, pos = pos, with_labels = True)
X = {2}
Y = 3
hidden_nodes = {4}

sample_sizes = [20, 50, 200, 1000, 10000, 50000, 200000]
num_seeds = 200

coefficients = []
standard_errors = []
keys = []
observations = []
population_coefficients = []

for seed in range(num_seeds):
    np.random.seed(seed)  
    for n in sample_sizes:
        res, coefs, best = estimate_results(X=X, Y=Y, G=G, n_samples=n,
                                            type_effect="total",
                                            hidden_nodes=hidden_nodes,
                                            one_suffices=False)
        for key, value in res.items():
            keys.append(key)
            coefficients.append(value["coefficients"][0][0])
            standard_errors.append(value["standard_errors"][0])
            observations.append(n)
            population_coefficients.append(coefs[3][2])
            
results_df = pd.DataFrame({
    "strategy": keys,
    "coefficient": coefficients,
    "standard_error": standard_errors,
    "n_samples": observations,
    "population_coefficient": population_coefficients})

results_df.to_pickle("./results/asymptotic_variance_analysis.pkl")

#### Process the data for plotting

In [None]:
strategies_to_filter = [
    "N:[5], B:[1], I:[6, 7]",
    "N:[5], B:[8, 1], I:[6, 7]",
    "N:[5], B:None, I:[1, 7]",
    "N:[5], B:[6], I:[1, 7]",
    "N:[5], B:[8, 6], I:[1, 7]",
    "N:[5], B:[8], I:[1, 7]",
    "N:[8], B:[5, 6], I:[1, 7]",
    "N:[8], B:[6], I:[1, 7]",
    "N:[5, 8], B:None, I:[1, 6, 7]"] # they violate the rank condition 
results_df = results_df[~results_df['strategy'].isin(strategies_to_filter)]

average_se_df = results_df.groupby(["strategy", "n_samples"])["standard_error"].mean().reset_index()
results_df["coverage"] = results_df.apply(lambda row: 1 if (row["population_coefficient"] > row["coefficient"] - 1.96 * row["standard_error"] or row["population_coefficient"] < row["coefficient"] + 1.96 * row["standard_error"] < row["population_coefficient"]) else 0,axis=1)
average_coverage_df = results_df.groupby(["strategy", "n_samples"])["coverage"].mean().reset_index()

average_se_df["n_samples"] = average_se_df['n_samples'].apply(format_number)
average_coverage_df["n_samples"] = average_coverage_df["n_samples"].apply(format_number)

average_se_df["strategy"] = average_se_df["strategy"].apply(transform_strategy)
average_coverage_df["strategy"] = average_coverage_df["strategy"].apply(transform_strategy)

#### Plot

In [None]:
plt.rcParams.update({
    "text.usetex": True,
    "font.family": "serif",
    "font.serif": ["Computer Modern Roman"],
    "text.latex.preamble": r"\usepackage{amsmath}\usepackage{amssymb}",
    "axes.labelsize": 22,     
    "xtick.labelsize": 16,    
    "ytick.labelsize": 18,    
    "legend.fontsize": 14,    
    "figure.titlesize": 22,   
    "axes.titlesize": 2,
    "legend.title_fontsize": 18})

fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 12), sharex=True)

palette = sns.color_palette("coolwarm", len(average_se_df["n_samples"].unique()))
sns.scatterplot(
    data=average_se_df,
    x="strategy",
    y="standard_error",
    hue="n_samples",
    palette=palette,
    s=100,
    ax=ax1
)

ax1.set_xlabel("")
ax1.set_ylabel("Average standard error (log)")
ax1.set_yscale("log")
ax1.tick_params(labelbottom=False)  


handles, labels = ax1.get_legend_handles_labels()
ax1.legend(handles=handles, labels=labels, title="Sample size", loc='upper center',
           bbox_to_anchor=(0.25, 1.01), ncol=4, handletextpad=0.01, columnspacing=0.25)
ax1.grid(which="major", linestyle="--", alpha=0.5, zorder=-1.0)
plt.xticks(rotation=90, ha="center")

sns.scatterplot(
    data=average_coverage_df,
    x="strategy",
    y="coverage",
    hue="n_samples",
    palette=palette,
    s=100,
    ax=ax2
)

ax2.set_xlabel("")
ax2.set_ylabel(r"$\widehat{\mathbb{P}}(\beta \in \operatorname{CI}_{\alpha}(\widehat{\beta}))$")
ax2.legend(title="Sample size", loc=3)
ax2.grid(which="major", linestyle="--", alpha=0.5, zorder=-1.0)
ax2.tick_params(labelbottom=True)  

plt.tight_layout(rect=[0, 0, 1, 0.95])

plt.savefig("./plots/combined_figure.svg", format="svg")
plt.show()