Let $X\in \{0,1\}$, $Y\in \{0,1\}$, $Z\in \{0,1,2\}$.
Suppose the distribution is Markov to $ X \to Y \to Z$.
Create a joint distribution $f(x,y,z)$ that is Markov to this DAG.
Generate 1000 random vectors from this distribution.
Estimate the distribution from the data using maximum likelihood.
Compare the estimated distribution to the true distribution.
Let $\theta = \left(\theta_{000}, \theta_{001}, \dots, \theta_{112}\right)$
where $\theta_{rst} = \mathbb{P}(X=r, Y=s, Z=t)$.
Use the bootstrap to get standard errors and
95 percent confidence intervals for these 12 parameters.

In [1]:
from collections import namedtuple

import numpy as np
import random
import scipy.stats

## Generate the data.

In [2]:
Parameter = namedtuple('Parameter', ['qx', 'qy0', 'qy1', 'qz0', 'qz1'])

def generate_data(parameter, n):

    # Generate X
    x = scipy.stats.bernoulli.rvs(p=parameter.qx, size=n)

    # We generate both Y and its couterfactual given X
    # (i.e., even if X = 0, we generate Y for both the actual case X = 0
    # and the counterfactual X = 1.)
    y_counterfactual = np.stack((
        scipy.stats.bernoulli.rvs(p=parameter.qy0, size=n),
        scipy.stats.bernoulli.rvs(p=parameter.qy1, size=n),
    ), axis=1)
    y = y_counterfactual[np.arange(n), x]

    # The random variable W has codomain {e0, e1, e2}
    # where (e0, e1, e2) denotes the standard ordered basis of R^3.
    # In other words W is a Categorical random variable.
    # As above, we generate both W and its counterfactual given Y.
    # We then obtain the random variable Z as a transformation of W.
    w_counterfactual = np.stack((
        scipy.stats.multinomial.rvs(n=1, p=parameter.qz0, size=n),
        scipy.stats.multinomial.rvs(n=1, p=parameter.qz1, size=n)
    ), axis=1)
    w = np.array([wi[yi] for yi, wi in zip(y, w_counterfactual)])
    w = w_counterfactual[np.arange(n), y]
    _, z = np.where(w==1)

    # Combine the data into a single array
    data = np.stack((x, y, z), axis=1)
    
    return data

## Compute the point estimators and report them.

In [3]:
Estimate = namedtuple('Estimate', ['theta', 'parameter'])

def estimator(data):

    # Compute the counts N and theta
    counts = np.array([[[
        (data == np.array([r, s, t])).all(axis=1).sum()
    for t in (0, 1, 2)] for s in (0, 1)] for r in (0, 1)])
    theta = counts/n

    # Compute the various estimates
    qx_est = counts.sum(axis=(1, 2))[1]/n
    qy0_est = counts.sum(axis=2)[0, 1]/counts.sum(axis=(1,2))[0]
    qy1_est = counts.sum(axis=2)[1, 1]/counts.sum(axis=(1, 2))[1]
    qz0_est = tuple(counts.sum(axis=0)[0, i] for i in (0, 1, 2))/counts.sum(axis=(0,2))[0]
    qz1_est = tuple(counts.sum(axis=0)[1, i] for i in (0, 1, 2))/counts.sum(axis=(0,2))[1]
    
    return Estimate(theta, Parameter(qx_est, qy0_est, qy1_est, qz0_est, qz1_est))

def report_parameter(parameter):

    # Report the results
    print(
        f"qX:   {parameter.qx:.2f}\n"
        + f"qY0:  {parameter.qy0:.2f}\n"
        + f"qY1:  {parameter.qy1:.2f}\n"
        + "qZ0: (" + ", ".join([f"{est:.2f}" for est in parameter.qz0]) + ")\n"
        + "qZ1: (" + ", ".join([f"{est:.2f}" for est in parameter.qz1]) + ")\n"
    )

In [4]:
# Parameter of the distribution
true_parameter = Parameter(
    qx = 0.3, # Bernoulli parameter for X
    qy0 = 0.6, # Bernoulli parameter for Y given X = 0
    qy1 = 0.8, # Bernoulli parameter for Y given X = 1
    qz0 = (0.5, 0.1, 0.3), # Categorical parameter for Z given Y = 0
    qz1 = (0.25, 0.65, 0.1), # Categorical parameter for Z given Y = 1
)

# Number of samples
n = int(1e3)

# Generate the data
current_data = generate_data(true_parameter, n)

# Compute the estimates
estimate = estimator(current_data)

# Report the true parameter and the estimates
print("True parameter")
report_parameter(true_parameter)
print("Parameter estimate")
report_parameter(estimate.parameter)

True parameter
qX:   0.30
qY0:  0.60
qY1:  0.80
qZ0: (0.50, 0.10, 0.30)
qZ1: (0.25, 0.65, 0.10)

Parameter estimate
qX:   0.29
qY0:  0.60
qY1:  0.83
qZ0: (0.43, 0.07, 0.50)
qZ1: (0.28, 0.64, 0.08)



## Bootstrap standard error and confidence intervals for $\theta$

In [5]:
def bootstrap_resample(data):
    return random.choices(population=data, k=len(data))

ThetaConfidenceInterval = namedtuple('ThetaConfidenceInterval', ['theta', 'se', 'lower_bound', 'upper_bound'])

def bootstrap_confidence_intervals(estimate, data, B, alpha=0.05):
    
    theta_replications = [estimator(bootstrap_resample(data)).theta for _ in range(B)]
    theta_se = np.std(theta_replications, axis=0)
    
    z = scipy.stats.norm.isf(alpha/2)
    lower_bounds = estimate.theta - z*theta_se
    upper_bounds = estimate.theta + z*theta_se
    
    return (
        [[[
            ThetaConfidenceInterval(
                estimate.theta[r, s, t],
                theta_se[r, s, t],
                lower_bounds[r, s, t],
                upper_bounds[r, s, t],
            )
        for t in (0, 1, 2)] for s in (0, 1)] for r in (0, 1)]
    )

def true_theta(parameter):
    
    # Define the q_bar versions of the parameter
    # (which make the computation of theta easier)
    q_bar_x = (1 - parameter.qx, parameter.qx)
    q_bar_y = (
        (1 - parameter.qy0, parameter.qy0),
        (1 - parameter.qy1, parameter.qy1),
    )
    q_bar_z = (
        parameter.qz0,
        parameter.qz1
    )
    
    true_theta = [[[
        q_bar_x[r] * q_bar_y[r][s] * q_bar_z[s][t]
    for t in (0, 1, 2)] for s in (0, 1)] for r in (0, 1)]
    
    return true_theta  
    
def report_theta_confidence_intervals(true_theta, theta_confidence_interval_array):
    for r in (0, 1):
        for s in (0, 1):
            for t in (0, 1, 2):
                print(
                    "----------------------------------\n"
                    f"Theta for X = {r}, Y = {s}, and Z = {t}:\n"
                    f"True value: {true_theta[r][s][t]:.3}\n"
                    f"Estimate: {theta_confidence_interval_array[r][s][t].theta:.3}\n"
                    f"Standard error: {theta_confidence_interval_array[r][s][t].se:.3}\n"
                    f"Confidence interval: ({theta_confidence_interval_array[r][s][t].lower_bound:.3},"
                    f"{theta_confidence_interval_array[r][s][t].upper_bound:.3})"
                )

In [6]:
# Compute the true value of theta
true_theta_value = true_theta(true_parameter)

# Compute the theta confidence intervals
theta_confidence_intervals = bootstrap_confidence_intervals(estimate, current_data, B=int(3e2))

# Report out the theta confidence intervals,
# compared to the true value of theta
report_theta_confidence_intervals(true_theta_value, theta_confidence_intervals)

----------------------------------
Theta for X = 0, Y = 0, and Z = 0:
True value: 0.14
Estimate: 0.126
Standard error: 0.00992
Confidence interval: (0.107,0.145)
----------------------------------
Theta for X = 0, Y = 0, and Z = 1:
True value: 0.028
Estimate: 0.019
Standard error: 0.00416
Confidence interval: (0.0108,0.0272)
----------------------------------
Theta for X = 0, Y = 0, and Z = 2:
True value: 0.084
Estimate: 0.138
Standard error: 0.00998
Confidence interval: (0.118,0.158)
----------------------------------
Theta for X = 0, Y = 1, and Z = 0:
True value: 0.105
Estimate: 0.116
Standard error: 0.0104
Confidence interval: (0.0956,0.136)
----------------------------------
Theta for X = 0, Y = 1, and Z = 1:
True value: 0.273
Estimate: 0.276
Standard error: 0.0133
Confidence interval: (0.25,0.302)
----------------------------------
Theta for X = 0, Y = 1, and Z = 2:
True value: 0.042
Estimate: 0.035
Standard error: 0.00561
Confidence interval: (0.024,0.046)
-----------------------