# **INDEPENDENT NORMAL**

In [17]:
#############

import numpy as np

# Fixed random seed for reproducibility
np.random.seed(12345)

# Number of samples
N = 3_000_000

# Means, stdevs
mu_x, mu_y = 1.2, 1.1
sigma_x, sigma_y = 1.0, 1.0

# Proposed solution
x_star = 1.00236
y_star = 1.04249
c_0 = 2

# Slope of line D that passes through (x_star, y_star) and (2,2)
#   slope = (2 - y_star)/(2 - x_star)
slope_D = (c_0 - y_star) / (c_0 - x_star)

def yD_of_x(x):
    return y_star + slope_D * (x - x_star)
def xD_of_y(y):
    # slope = dy/dx => dx/dy = 1/slope
    return x_star + (1./slope_D) * (y - y_star)

# 1) Generate random samples from the independent normals
X = np.random.normal(mu_x, sigma_x, N)
Y = np.random.normal(mu_y, sigma_y, N)

# 2) Define region B: 
#    "above line F" => y >= y_star if x <= x_star,
#    "above line D" => y >= yD_of_x(x) if x >= x_star
#    Combined logically:
left_mask_B  = (X <= x_star) & (Y >= y_star)
right_mask_B = (X >= x_star) & (Y >= yD_of_x(X))
B_mask = left_mask_B | right_mask_B

# 3) Define region A:
#    "right of line E" => x >= x_star if y <= y_star,
#    "right of line D" => x >= xD_of_y(y) if y >= y_star
#    Combined logically:
down_mask_A  = (Y <= y_star) & (X >= x_star)
up_mask_A    = (Y >= y_star) & (X >= xD_of_y(Y))
A_mask = down_mask_A | up_mask_A

# 4) Estimate E[Y | B]
Y_in_B = Y[B_mask]
meanY_in_B = np.mean(Y_in_B) if len(Y_in_B) > 0 else float('nan')

# 5) Estimate E[X | A]
X_in_A = X[A_mask]
meanX_in_A = np.mean(X_in_A) if len(X_in_A) > 0 else float('nan')

print("Share of X in A =", len(X_in_A)/N)
print("Share of Y in B =", len(Y_in_B)/N)
print("x* =", x_star)
print("y* =", y_star)
print("E(x) in A =", meanX_in_A)
print("E(y) in B =", meanY_in_B)



Share of X in A = 0.4305116666666667
Share of Y in B = 0.3681163333333333
x* = 1.00236
y* = 1.04249
E(x) in A = 2.000001467830328
E(y) in B = 2.00000768162508


In [12]:
# Initialize x_star and y_star
x_star = 1.1
y_star = 1.1

# Define the target mean value
target_mean = 2.0

# Define a tolerance for convergence
tolerance = 1e-6

# Define a maximum number of iterations to prevent infinite loops
max_iterations = 1000

# Function to update x_star and y_star
def update_stars(x_star, y_star, meanX_in_A, meanY_in_B, learning_rate=0.02):
    x_star += learning_rate * (target_mean - meanX_in_A)
    y_star += learning_rate * (target_mean - meanY_in_B)
    return x_star, y_star

# Iteratively update x_star and y_star until convergence
for iteration in range(max_iterations):
    # Update slope_D based on current x_star and y_star
    slope_D = (c_0 - y_star) / (c_0 - x_star)
    
    def yD_of_x(x):
        return y_star + slope_D * (x - x_star)
    
    def xD_of_y(y):
        return x_star + (1./slope_D) * (y - y_star)
    
    # Define region B
    left_mask_B  = (X <= x_star) & (Y >= y_star)
    right_mask_B = (X >= x_star) & (Y >= yD_of_x(X))
    B_mask = left_mask_B | right_mask_B
    
    # Define region A
    down_mask_A  = (Y <= y_star) & (X >= x_star)
    up_mask_A    = (Y >= y_star) & (X >= xD_of_y(Y))
    A_mask = down_mask_A | up_mask_A
    
    # Estimate E[Y | B]
    Y_in_B = Y[B_mask]
    meanY_in_B = np.mean(Y_in_B) if len(Y_in_B) > 0 else float('nan')
    
    # Estimate E[X | A]
    X_in_A = X[A_mask]
    meanX_in_A = np.mean(X_in_A) if len(X_in_A) > 0 else float('nan')
    
    # Check for convergence
    if abs(meanY_in_B - target_mean) < tolerance and abs(meanX_in_A - target_mean) < tolerance:
        break
    
    # Update x_star and y_star
    x_star, y_star = update_stars(x_star, y_star, meanX_in_A, meanY_in_B)

print("Converged after", iteration + 1, "iterations")
print("x* =", x_star)
print("y* =", y_star)
print("E(x) in A =", meanX_in_A)
print("E(y) in B =", meanY_in_B)

Converged after 1000 iterations
x* = 1.0023598531589843
y* = 1.0424880330393973
E(x) in A = 2.000003781092225
E(y) in B = 2.0000050850862245


# **CORRELATED NORMAL**

In [2]:
############# MILD POSITIVE CORRELATION #############

import numpy as np

# Fixed random seed for reproducibility
np.random.seed(12345)

# Number of samples
N = 3_000_000

# Means, stdevs
mu_x, mu_y = 1.2, 1.1
sigma_x, sigma_y = 1.0, 1.0
rho=0.5

# Proposed solution
x_star = 1.0466
y_star = 1.0701
c_0 = 2

# Slope of line D that passes through (x_star, y_star) and (2,2)
#   slope = (2 - y_star)/(2 - x_star)
slope_D = (c_0 - y_star) / (c_0 - x_star)

def yD_of_x(x):
    return y_star + slope_D * (x - x_star)
def xD_of_y(y):
    # slope = dy/dx => dx/dy = 1/slope
    return x_star + (1./slope_D) * (y - y_star)

# 1) Generate random sample from the correlated normals

cov = np.array([[sigma_x**2, rho*sigma_x*sigma_y], [rho*sigma_x*sigma_y, sigma_y**2]])
mean = np.array([mu_x, mu_y])
X, Y = np.random.multivariate_normal(mean, cov, N).T




# 2) Define region B: 
#    "above line F" => y >= y_star if x <= x_star,
#    "above line D" => y >= yD_of_x(x) if x >= x_star
#    Combined logically:
left_mask_B  = (X <= x_star) & (Y >= y_star)
right_mask_B = (X >= x_star) & (Y >= yD_of_x(X))
B_mask = left_mask_B | right_mask_B

# 3) Define region A:
#    "right of line E" => x >= x_star if y <= y_star,
#    "right of line D" => x >= xD_of_y(y) if y >= y_star
#    Combined logically:
down_mask_A  = (Y <= y_star) & (X >= x_star)
up_mask_A    = (Y >= y_star) & (X >= xD_of_y(Y))
A_mask = down_mask_A | up_mask_A

# 4) Estimate E[Y | B]
Y_in_B = Y[B_mask]
meanY_in_B = np.mean(Y_in_B) if len(Y_in_B) > 0 else float('nan')

# 5) Estimate E[X | A]
X_in_A = X[A_mask]
meanX_in_A = np.mean(X_in_A) if len(X_in_A) > 0 else float('nan')

print("Share of X in A =", len(X_in_A)/N)
print("Share of Y in B =", len(Y_in_B)/N)
print("x* =", x_star)
print("y* =", y_star)
print("E(x) in A =", meanX_in_A)
print("E(y) in B =", meanY_in_B)



Share of X in A = 0.386033
Share of Y in B = 0.31746566666666665
x* = 1.0466
y* = 1.0701
E(x) in A = 2.0005940451964515
E(y) in B = 2.0005725014958866


In [3]:
############# MILD NEGATIVE CORRELATION #############

import numpy as np

# Fixed random seed for reproducibility
np.random.seed(12345)

# Number of samples
N = 3_000_000

# Means, stdevs
mu_x, mu_y = 1.2, 1.1
sigma_x, sigma_y = 1.0, 1.0
rho=-0.5

# Proposed solution
x_star = 1.015
y_star = 1.065
c_0 = 2

# Slope of line D that passes through (x_star, y_star) and (2,2)
#   slope = (2 - y_star)/(2 - x_star)
slope_D = (c_0 - y_star) / (c_0 - x_star)

def yD_of_x(x):
    return y_star + slope_D * (x - x_star)
def xD_of_y(y):
    # slope = dy/dx => dx/dy = 1/slope
    return x_star + (1./slope_D) * (y - y_star)

# 1) Generate random sample from the correlated normals

cov = np.array([[sigma_x**2, rho*sigma_x*sigma_y], [rho*sigma_x*sigma_y, sigma_y**2]])
mean = np.array([mu_x, mu_y])
X, Y = np.random.multivariate_normal(mean, cov, N).T




# 2) Define region B: 
#    "above line F" => y >= y_star if x <= x_star,
#    "above line D" => y >= yD_of_x(x) if x >= x_star
#    Combined logically:
left_mask_B  = (X <= x_star) & (Y >= y_star)
right_mask_B = (X >= x_star) & (Y >= yD_of_x(X))
B_mask = left_mask_B | right_mask_B

# 3) Define region A:
#    "right of line E" => x >= x_star if y <= y_star,
#    "right of line D" => x >= xD_of_y(y) if y >= y_star
#    Combined logically:
down_mask_A  = (Y <= y_star) & (X >= x_star)
up_mask_A    = (Y >= y_star) & (X >= xD_of_y(Y))
A_mask = down_mask_A | up_mask_A

# 4) Estimate E[Y | B]
Y_in_B = Y[B_mask]
meanY_in_B = np.mean(Y_in_B) if len(Y_in_B) > 0 else float('nan')

# 5) Estimate E[X | A]
X_in_A = X[A_mask]
meanX_in_A = np.mean(X_in_A) if len(X_in_A) > 0 else float('nan')

print("Share of X in A =", len(X_in_A)/N)
print("Share of Y in B =", len(Y_in_B)/N)
print("x* =", x_star)
print("y* =", y_star)
print("E(x) in A =", meanX_in_A)
print("E(y) in B =", meanY_in_B)



Share of X in A = 0.4671976666666667
Share of Y in B = 0.40716
x* = 1.015
y* = 1.065
E(x) in A = 2.0005809604376696
E(y) in B = 1.9992622009234258
