In [None]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.metrics import mean_squared_error
from scipy.stats import entropy
from xgboost import XGBRegressor
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import torch
from torch import nn, optim
from sklearn.gaussian_process.kernels import RBF

# Load and normalize stock market data
def load_stock_market_data(file_path='Stock_Market.csv'):
    data = pd.read_csv(file_path)
    hutchison = data['Hutchison'].values
    sun_hung_kai_prop = data['Sun_Hung_Kai_Prop'].values

    # Normalize variables
    X = (hutchison - np.mean(hutchison)) / np.std(hutchison)
    Y = (sun_hung_kai_prop - np.mean(sun_hung_kai_prop)) / np.std(sun_hung_kai_prop)
    return X, Y

# CANM Model
def CANM(X, Y):
    X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=0.3, random_state=0)

    # Model X -> Y
    model_xy = GradientBoostingRegressor().fit(X_train.reshape(-1, 1), Y_train)
    pred_y = model_xy.predict(X_test.reshape(-1, 1))
    residual_xy = Y_test - pred_y
    score_xy = np.var(residual_xy)

    # Model Y -> X
    model_yx = GradientBoostingRegressor().fit(Y_train.reshape(-1, 1), X_train)
    pred_x = model_yx.predict(Y_test.reshape(-1, 1))
    residual_yx = X_test - pred_x
    score_yx = np.var(residual_yx)

    return score_xy, score_yx

# IGCI Model
def IGCI(X, Y):
    def calculate_entropy(data):
        hist, _ = np.histogram(data, bins=50, density=True)
        return entropy(hist)

    hx = calculate_entropy(X)
    hy = calculate_entropy(Y)
    score_xy = hx - hy  # X -> Y
    score_yx = hy - hx  # Y -> X

    return score_xy, score_yx

# ANM Using XGBoost
def ANM_XGB(X, Y):
    model_xy = XGBRegressor(n_estimators=100, learning_rate=0.1).fit(X.reshape(-1, 1), Y)
    pred_y = model_xy.predict(X.reshape(-1, 1))
    residual_xy = Y - pred_y
    hsic_xy = np.var(residual_xy)

    model_yx = XGBRegressor(n_estimators=100, learning_rate=0.1).fit(Y.reshape(-1, 1), X)
    pred_x = model_yx.predict(Y.reshape(-1, 1))
    residual_yx = X - pred_x
    hsic_yx = np.var(residual_yx)

    return hsic_xy, hsic_yx

# Main execution
if __name__ == "__main__":
    X, Y = load_stock_market_data()

    # Apply CANM
    print("Running CANM...")
    score_xy, score_yx = CANM(X, Y)
    print(f"CANM Scores: X->Y: {score_xy:.4f}, Y->X: {score_yx:.4f}")
    print("CANM Inference:", "X -> Y" if score_xy < score_yx else "Y -> X")

    # Apply IGCI
    print("\nRunning IGCI...")
    score_xy, score_yx = IGCI(X, Y)
    print(f"IGCI Scores: X->Y: {score_xy:.4f}, Y->X: {score_yx:.4f}")
    print("IGCI Inference:", "X -> Y" if score_xy < score_yx else "Y -> X")

    # Apply ANM
    print("\nRunning ANM (XGBoost)...")
    hsic_xy, hsic_yx = ANM_XGB(X, Y)
    print(f"ANM Scores: X->Y: {hsic_xy:.4f}, Y->X: {hsic_yx:.4f}")
    print("ANM Inference:", "X -> Y" if hsic_xy < hsic_yx else "Y -> X")


# Load and normalize stock market data
def load_stock_market_data(file_path='Stock_Market.csv'):
    data = pd.read_csv(file_path)
    hutchison = data['Hutchison'].values
    sun_hung_kai_prop = data['Sun_Hung_Kai_Prop'].values

    # Normalize variables
    X = (hutchison - np.mean(hutchison)) / np.std(hutchison)
    Y = (sun_hung_kai_prop - np.mean(sun_hung_kai_prop)) / np.std(sun_hung_kai_prop)
    return X, Y

# Post-Nonlinear Model (PNL)
class PNLModel(nn.Module):
    def __init__(self):
        super(PNLModel, self).__init__()
        self.nonlinear = nn.Sequential(
            nn.Linear(1, 20), nn.ReLU(),
            nn.Linear(20, 20), nn.ReLU(),
            nn.Linear(20, 1)
        )
        self.linear = nn.Linear(1, 1)

    def forward(self, x):
        x_nonlinear = self.nonlinear(x)
        return self.linear(x) + x_nonlinear

def train_pnl_model(X, Y, epochs=1000, lr=0.001):
    X_tensor = torch.FloatTensor(X).view(-1, 1)
    Y_tensor = torch.FloatTensor(Y).view(-1, 1)
    model = PNLModel()
    optimizer = optim.Adam(model.parameters(), lr=lr)
    criterion = nn.MSELoss()
    for epoch in range(epochs):
        model.train()
        optimizer.zero_grad()
        output = model(X_tensor)
        loss = criterion(output, Y_tensor)
        loss.backward()
        optimizer.step()
    return model, loss.item()

# Loss-based Causal Inference
class LossBasedModel(nn.Module):
    def __init__(self, input_dim=1):
        super(LossBasedModel, self).__init__()
        self.fc = nn.Sequential(
            nn.Linear(input_dim, 20), nn.ReLU(),
            nn.Linear(20, 10), nn.ReLU(),
            nn.Linear(10, 1)
        )

    def forward(self, x):
        return self.fc(x)

def train_loss_based_model(X, Y, epochs=1000, lr=0.001):
    X_tensor = torch.FloatTensor(X).view(-1, 1)
    Y_tensor = torch.FloatTensor(Y).view(-1, 1)
    model = LossBasedModel()
    optimizer = optim.Adam(model.parameters(), lr=lr)
    criterion = nn.MSELoss()
    for epoch in range(epochs):
        model.train()
        optimizer.zero_grad()
        output = model(X_tensor)
        loss = criterion(output, Y_tensor)
        loss.backward()
        optimizer.step()
    return model, loss.item()

# Kernel-based Causal Inference
def kernel_entropy_score(X, Y, gamma=1.0):
    """
    Compute entropy-based causal direction score using RBF kernels.
    """
    def compute_rbf_kernel(X, gamma):
        pairwise_sq_dists = np.square(X[:, None] - X[None, :])
        return np.exp(-gamma * pairwise_sq_dists)

    X = (X - np.mean(X)) / np.std(X)
    Y = (Y - np.mean(Y)) / np.std(Y)

    K_X = compute_rbf_kernel(X, gamma)
    K_Y = compute_rbf_kernel(Y, gamma)

    entropy_X = np.array([entropy(K_X[i]) for i in range(K_X.shape[0])])
    entropy_Y = np.array([entropy(K_Y[i]) for i in range(K_Y.shape[0])])

    return np.mean(entropy_X) - np.mean(entropy_Y)

def kernel_causal_inference(X, Y, gamma=1.0):
    """
    Perform causal inference using kernel entropy scores.
    """
    score_X_to_Y = kernel_entropy_score(X, Y, gamma)
    score_Y_to_X = kernel_entropy_score(Y, X, gamma)

    if score_X_to_Y > score_Y_to_X:
        direction = "X -> Y"
    else:
        direction = "Y -> X"

    return direction, score_X_to_Y, score_Y_to_X

# Main script
if __name__ == "__main__":
    # Load data
    X, Y = load_stock_market_data()

    # Post-Nonlinear Model (PNL)
    print("\nRunning PNL Model...")
    _, loss_X_to_Y_pnl = train_pnl_model(X, Y)
    _, loss_Y_to_X_pnl = train_pnl_model(Y, X)
    pnl_direction = "X -> Y" if loss_X_to_Y_pnl < loss_Y_to_X_pnl else "Y -> X"
    print(f"PNL Loss (X -> Y): {loss_X_to_Y_pnl:.4f}")
    print(f"PNL Loss (Y -> X): {loss_Y_to_X_pnl:.4f}")
    print(f"PNL Inference: {pnl_direction}")

    # Loss-based Causal Inference
    print("\nRunning Loss-based Model...")
    _, loss_X_to_Y = train_loss_based_model(X, Y)
    _, loss_Y_to_X = train_loss_based_model(Y, X)
    loss_direction = "X -> Y" if loss_X_to_Y < loss_Y_to_X else "Y -> X"
    print(f"Loss-based Loss (X -> Y): {loss_X_to_Y:.4f}")
    print(f"Loss-based Loss (Y -> X): {loss_Y_to_X:.4f}")
    print(f"Loss-based Inference: {loss_direction}")

    # Kernel-based Causal Inference
    print("\nRunning Kernel-based Causal Inference...")
    kernel_direction, kernel_score_X_to_Y, kernel_score_Y_to_X = kernel_causal_inference(X, Y, gamma=0.5)
    print(f"Kernel Entropy Score (X -> Y): {kernel_score_X_to_Y:.4f}")
    print(f"Kernel Entropy Score (Y -> X): {kernel_score_Y_to_X:.4f}")
    print(f"Kernel-based Inference: {kernel_direction}")
