In [None]:


import numpy as np
import pandas as pd
import scipy.stats as stats
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LinearRegression

###################### Load dataset (replace with actual dataset loading) #################################
def load_target_data():
    data = pd.read_csv('Data_File.csv')  # Ensure your file is named correctly
    x = data['Age'].values
    y = data['Target'].values
    return x, y

####################### HSIC (Hilbert-Schmidt Independence Criterion) computation with optional sigma tuning#############################
def HSIC(X, Y, sigma_X=None, sigma_Y=None, epsilon=1e-8):
    n = X.shape[0]

    # If no custom sigma is provided, use the median heuristic
    if sigma_X is None:
        sigma_X = np.median(np.abs(X[:, None] - X[None, :]))
    if sigma_Y is None:
        sigma_Y = np.median(np.abs(Y[:, None] - Y[None, :]))

    # Compute RBF kernel matrices for X and Y
    K = np.exp(-np.square(X[:, None] - X[None, :]) / (2 * sigma_X ** 2 + epsilon))
    L = np.exp(-np.square(Y[:, None] - Y[None, :]) / (2 * sigma_Y ** 2 + epsilon))

    # Centering matrix
    H = np.eye(n) - (1.0 / n) * np.ones((n, n))

    # HSIC value calculation
    HSIC_value = np.trace(np.dot(np.dot(K, H), np.dot(L, H))) / (n - 1) ** 2
    return HSIC_value

# Additive Noise Model (ANM) using Linear Regression and HSIC
def ANM_LR(x, y):
    # Split the data into training and test sets
    x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.2, random_state=0)

    # Standardize the data
    scaler_x = StandardScaler()
    scaler_y = StandardScaler()
    x_train_scaled = scaler_x.fit_transform(x_train.reshape(-1, 1))
    y_train_scaled = scaler_y.fit_transform(y_train.reshape(-1, 1))
    x_test_scaled = scaler_x.transform(x_test.reshape(-1, 1))
    y_test_scaled = scaler_y.transform(y_test.reshape(-1, 1))

    # Linear Regression model for Age -> Target
    model_xy = LinearRegression()
    model_xy.fit(x_train_scaled, y_train_scaled.ravel())
    y_pred_xy = model_xy.predict(x_test_scaled)
    residuals_xy = y_test_scaled.ravel() - y_pred_xy

    # Linear Regression model for Target -> Age
    model_yx = LinearRegression()
    model_yx.fit(y_train_scaled, x_train_scaled.ravel())
    x_pred_yx = model_yx.predict(y_test_scaled)
    residuals_yx = x_test_scaled.ravel() - x_pred_yx

    # Compute HSIC for both directions with manual sigma tuning (you can adjust sigma if needed)
    HSIC_xy = HSIC(x_test_scaled.ravel(), residuals_xy)
    HSIC_yx = HSIC(y_test_scaled.ravel(), residuals_yx)

    # Compute MSE for both directions
    mse_xy = mean_squared_error(y_test_scaled, y_pred_xy)
    mse_yx = mean_squared_error(x_test_scaled, x_pred_yx)

    return {'HSIC_xy': HSIC_xy, 'HSIC_yx': HSIC_yx, 'MSE_xy': mse_xy, 'MSE_yx': mse_yx}

# Main function to infer causal direction and calculate MSE
def infer_causal_direction():
    # Load your dataset
    x, y = load_target_data()

    # Apply ANM_LR to infer causal direction using Linear Regression
    result = ANM_LR(x, y)

    # Determine causal direction based on HSIC
    if result['HSIC_xy'] < result['HSIC_yx']:
        causal_direction = "Age -> Target"
    else:
        causal_direction = "Target -> Age"

    # Output the results
    print(f'Causal direction: {causal_direction}')
    print(f'HSIC (Age -> Target): {result["HSIC_xy"]:.4f}')
    print(f'HSIC (Target -> Age): {result["HSIC_yx"]:.4f}')
    print(f'MSE (Age -> Target): {result["MSE_xy"]:.4f}')
    print(f'MSE (Target -> Age): {result["MSE_yx"]:.4f}')

# Run the main function
if __name__ == "__main__":
    infer_causal_direction()

######################### Apply IGCI ########################################

def igci_score(x, y, refMeasure=2, estimator=1, epsilon=1e-8):
    # Ensure data is sorted for proper evaluation
    idx = np.argsort(x)
    x = x[idx]
    y = y[idx]

    if refMeasure == 1:
        # Ref Measure: Uniform
        delta_x = np.diff(x)
        delta_y = np.diff(y)
    elif refMeasure == 2:
        # Ref Measure: Gaussian
        delta_x = np.diff(stats.norm.cdf(x))
        delta_y = np.diff(stats.norm.cdf(y))
    else:
        raise ValueError("refMeasure should be 1 (Uniform) or 2 (Gaussian)")

    # Add a small epsilon to avoid division by zero
    delta_x = np.clip(delta_x, epsilon, None)
    delta_y = np.clip(delta_y, epsilon, None)

    # Compute the IGCI score based on the estimator chosen
    if estimator == 1:
        score = np.sum(np.log(np.abs(delta_y)) - np.log(np.abs(delta_x)))
    elif estimator == 2:
        score = np.mean(np.log(np.abs(delta_y / delta_x)))
    else:
        raise ValueError("estimator should be 1 or 2")

    return score

# Apply IGCI method to determine causal direction and get scores
def apply_igci(x, y):
    # Apply IGCI score in both directions
    score_xy = igci_score(x, y, refMeasure=2, estimator=1)
    score_yx = igci_score(y, x, refMeasure=2, estimator=1)

    # Determine causal direction based on the scores
    if score_xy < score_yx:
        causal_direction = "Age -> Target"
    else:
        causal_direction = "Target -> Age"

    return causal_direction, score_xy, score_yx

######## Load your dataset
def load_height_data():
    data = pd.read_csv('Data_File.csv')  # Ensure your file is named correctly
    age = data['Age'].values
    target = data['Target'].values
    return age, target

# Main function to apply IGCI and display average scores
if __name__ == "__main__":
    age, target = load_target_data()

    # Apply IGCI method
    causal_direction, score_xy, score_yx = apply_igci(age, target)

    # Display the results
    print(f'Causal direction: {causal_direction}')
    print(f'Average IGCI score (Age -> Target): {score_xy:.4f}')
    print(f'Average IGCI score (Height -> Target): {score_yx:.4f}')


############################## Apply CAM model ##############################################
# Load and preprocess data
data = pd.read_csv("Data_File.csv")
data['Age'] = (data['Age'] - data['Age'].mean()) / data['Age'].std()
data['Target'] = (data['Target'] - data['Target'].mean()) / data['Target'].std()

# Prepare data for both directions
X_age_to_target = data[['Age']].values
y_age_to_target = data['Target'].values
X_target_to_age = data[['Target']].values
y_target_to_age = data['Age'].values

# Fit CAM model and track loss scores in both directions
_, loss_age_to_target = fit_cam_and_track_loss(X_age_to_target, y_age_to_target)
_, loss_target_to_age = fit_cam_and_track_loss(X_target_to_age, y_target_to_age)

# Calculate average loss over the last 10 epochs for each direction
avg_loss_age_to_target = np.mean(loss_age_to_target[-10:])
avg_loss_target_to_age = np.mean(loss_target_to_age[-10:])

# Determine inferred direction
direction = "Age -> Target" if avg_loss_age_to_target < avg_loss_target_to_age else "Target -> Age"

# Print results
print("\nCAM Model Results Based on Average Loss:")
print(f"  Avg Loss Age -> Target: {avg_loss_age_to_target}")
print(f"  Avg Loss target -> Age: {avg_loss_target_to_age}")
print(f"Inferred Causal Direction: {direction}")

# Plotting loss convergence for visualization
plt.figure(figsize=(10, 6))
plt.plot(loss_age_to_target, label='CAM Age -> Target', color='blue')
plt.plot(loss_target_to_age, label='CAM Target -> Age', color='orange', linestyle='--')
plt.xlabel('Epoch')
plt.ylabel('Mean Squared Error')
plt.title('Causal Direction Convergence for CAM')
plt.legend()
plt.show()

######################## Apply PNL and Loss-based causal inference ##############################
import numpy as np
import pandas as pd
import torch
from torch import nn, optim
from sklearn.preprocessing import StandardScaler
from sklearn.metrics.pairwise import rbf_kernel
from sklearn.feature_selection import mutual_info_regression

# Load and preprocess data
data = pd.read_csv("Data_File.csv")
data['Age'] = (data['Age'] - data['Age'].mean()) / data['Age'].std()
data['Target'] = (data['Target'] - data['Target'].mean()) / data['Target'].std()

### 1. Improved PNL Model without Early Stopping ###
class ImprovedNonlinearTransformation(nn.Module):
    def __init__(self, input_dim):
        super(ImprovedNonlinearTransformation, self).__init__()
        self.fc1 = nn.Linear(input_dim, 20)
        self.bn1 = nn.BatchNorm1d(20)
        self.fc2 = nn.Linear(20, 20)
        self.bn2 = nn.BatchNorm1d(20)
        self.fc3 = nn.Linear(20, 10)
        self.bn3 = nn.BatchNorm1d(10)
        self.fc4 = nn.Linear(10, 1)
        self.relu = nn.ReLU()

    def forward(self, x):
        x = self.relu(self.bn1(self.fc1(x)))
        x = self.relu(self.bn2(self.fc2(x)))
        x = self.relu(self.bn3(self.fc3(x)))
        return self.fc4(x)

class ImprovedPNLModel(nn.Module):
    def __init__(self):
        super(ImprovedPNLModel, self).__init__()
        self.nonlinear_transformation = ImprovedNonlinearTransformation(1)
        self.predictor = nn.Linear(1, 1)

    def forward(self, x):
        x_transformed = self.nonlinear_transformation(x)
        return self.predictor(x_transformed)

# Training function without early stopping
def train_improved_pnl_model(train_x, train_y, epochs=500, lr=1e-3):
    model = ImprovedPNLModel().to('cuda' if torch.cuda.is_available() else 'cpu')
    optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=1e-5)  # L2 regularization
    criterion = nn.MSELoss()
    train_x, train_y = torch.FloatTensor(train_x).view(-1, 1), torch.FloatTensor(train_y).view(-1, 1)

    for epoch in range(epochs):
        model.train()
        optimizer.zero_grad()
        output = model(train_x)
        loss = criterion(output, train_y)
        loss.backward()
        optimizer.step()

    return model, loss.item()

# Prepare data for both directions
train_x_age_to_target, train_y_age_to_target = data['Age'].values, data['Target'].values
train_x_target_to_age, train_y_target_to_age = data['Target'].values, data['Age'].values

# Train improved PNL models for both directions
_, loss_age_to_target = train_improved_pnl_model(train_x_age_to_target, train_y_age_to_target)
_, loss_target_to_age = train_improved_pnl_model(train_x_target_to_age, train_y_target_to_age)

# Determine the inferred causal direction based on lower loss
direction = "Age -> Target" if loss_age_to_target < loss_target_to_age else "Target -> Age"
print(f"  Loss for Age -> Target: {loss_age_to_target}")
print(f"  Loss for Target -> Age: {loss_target_to_age}")
print(f"Inferred Causal Direction: {direction}")


################################### 4. Simple Neural Network Model for Dependency Measurement #############################
class SimpleRegressor(nn.Module):
    def __init__(self, input_dim=1, hidden_dim=10):
        super(SimpleRegressor, self).__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, 1)
        self.relu = nn.ReLU()

    def forward(self, x):
        x = self.relu(self.fc1(x))
        x = self.relu(self.fc2(x))
        return self.fc3(x)

def train_model(train_x, train_y, epochs=200, lr=1e-3):
    model = SimpleRegressor().to('cuda' if torch.cuda.is_available() else 'cpu')
    optimizer = optim.Adam(model.parameters(), lr=lr)
    criterion = nn.MSELoss()
    train_x, train_y = torch.FloatTensor(train_x).view(-1, 1), torch.FloatTensor(train_y).view(-1, 1)

    for epoch in range(epochs):
        model.train()
        optimizer.zero_grad()
        output = model(train_x)
        loss = criterion(output, train_y)
        loss.backward()
        optimizer.step()
    return model, loss.item()

# Train models for both directions
_, loss_age_to_target = train_model(train_x_age_to_target, train_y_age_to_target)
_, loss_target_to_age = train_model(train_x_target_to_age, train_y_target_to_age)

# Determine inferred causal direction based on lower loss
direction = "Age -> Target" if loss_age_to_target < loss_target_to_age else "Target -> Age"
print("\nLoss-Based Causal Inference Results:")
print(f"  Loss for Age -> Target: {loss_age_to_target}")
print(f"  Loss for Target -> Age: {loss_target_to_age}")
print(f"Inferred Causal Direction: {direction}")
