In [None]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import KFold

# 1. 安裝 pysr 庫
# !pip install -q pysr

# 2. 導入並執行 julia 安裝 (這步在 Kaggle 需要一點時間，大約 2-3 分鐘)
# import pysr
# pysr.install()

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Device:", device)

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

DATA_DIR = "/kaggle/input/function-approximation-using-multilayer-perceptron"
train_path = os.path.join(DATA_DIR, "train.csv")
test_path  = os.path.join(DATA_DIR, "test.csv")
sample_path = os.path.join(DATA_DIR, "sample_submission.csv")

train_df = pd.read_csv(train_path)
test_df  = pd.read_csv(test_path)
sample_df = pd.read_csv(sample_path)

In [None]:
def get_pysr_feature(df):
    # 這是根據你的 Hall of Fame 第 12 行轉換出來的 Python 代碼
    # 注意：在 PySR 中 x0 是 x1，x1 是 x2
    x1 = df['x1']
    x2 = df['x2']
    
    # 公式：square(cos((((0.014435811 - square(x0)) - square(x1)) * 3.0000703) - -0.7421232)) + 0.50067115
    inner = ((0.014435811 - x1**2 - x2**2) * 3.0000703) + 0.7421232
    feature = np.cos(inner)**2 + 0.50067115
    
    return feature

In [None]:
def apply_all_features(df):
    # 1. 原有的 14 個多項式與三角函數特徵
    df['x1_2'] = df['x1'] ** 2
    df['x2_2'] = df['x2'] ** 2
    df['x1x2'] = df['x1'] * df['x2']
    df['sin_x1'], df['cos_x1'] = np.sin(df['x1']), np.cos(df['x1'])
    df['sin_x2'], df['cos_x2'] = np.sin(df['x2']), np.cos(df['x2'])
    df['dist'] = np.sqrt(df['x1']**2 + df['x2']**2)
    df['abs_diff'] = np.abs(df['x1'] - df['x2'])
    df['sin_x1x2'] = np.sin(df['x1'] * df['x2'])
    df['x1_cos_x2'] = df['x1'] * np.cos(df['x2'])
    df['x2_cos_x1'] = df['x2'] * np.cos(df['x1'])
    df['x1_2_x2'] = (df['x1']**2) * df['x2']
    df['x2_2_x1'] = (df['x2']**2) * df['x1']

    # 2. 呼叫剛才定義的公式函數，產生第 17 個特徵
    df['sr_feature'] = get_pysr_feature(df)
    
    return df

# 應用至資料集
train_df = apply_all_features(train_df)

test_df = apply_all_features(test_df)

# 更新最終特徵清單，確保後續模型輸入維度為 17
# 根據你的相關係數表，只保留這 7 個最強特徵
features_final = ['sr_feature', 'dist', 'cos_x2', 'x2_2', 'cos_x1', 'x1_2', 'abs_diff']

In [None]:
class XYDataset(Dataset):
    def __init__(self, X, y=None):
        self.X = torch.tensor(X, dtype=torch.float32)
        self.y = None if y is None else torch.tensor(y, dtype=torch.float32)

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        if self.y is None:
            return self.X[idx]
        return self.X[idx], self.y[idx]

X = train_df[features_final].values.astype(np.float32)
y = train_df[["y"]].values.astype(np.float32)
X_test = test_df[features_final].values.astype(np.float32)

X_tr, X_va, y_tr, y_va = train_test_split(
    X, y, test_size=0.2, random_state=42, shuffle=True
)

scaler_x = StandardScaler()
scaler_y = StandardScaler()

X_tr = scaler_x.fit_transform(X_tr)
X_va = scaler_x.transform(X_va)
X_test_s = scaler_x.transform(X_test)

y_tr = scaler_y.fit_transform(y_tr)
y_va = scaler_y.transform(y_va)

train_loader = DataLoader(XYDataset(X_tr, y_tr), batch_size=128, shuffle=True)
val_loader   = DataLoader(XYDataset(X_va, y_va), batch_size=256, shuffle=False)
test_loader  = DataLoader(XYDataset(X_test_s, None), batch_size=256, shuffle=False)

In [None]:
class ResidualBlock(nn.Module):
    def __init__(self, dim):
        super().__init__()
        self.block = nn.Sequential(
            nn.Linear(dim, dim),
            nn.BatchNorm1d(dim),
            nn.SiLU(), 
        )
    def forward(self, x):
        return x + self.block(x)

class ResDeepMLP(nn.Module):
    def __init__(self, in_dim=7, hidden_dim=128): 
        super().__init__()
        self.input_layer = nn.Sequential(nn.Linear(in_dim, hidden_dim), nn.SiLU())
        self.res1 = ResidualBlock(hidden_dim)
        self.res2 = ResidualBlock(hidden_dim)
        self.output_layer = nn.Sequential(
            nn.Linear(hidden_dim, 32),
            nn.SiLU(),
            nn.Linear(32, 1)
        )
    def forward(self, x):
        x = self.input_layer(x)
        x = self.res1(x)
        x = self.res2(x)
        return self.output_layer(x)

In [None]:
n_splits = 5
kf = KFold(n_splits=n_splits, shuffle=True, random_state=42)
test_preds_all = []

for fold, (train_idx, val_idx) in enumerate(kf.split(X)):
    print(f"\n===== Fold {fold + 1} / {n_splits} =====")
    
    X_tr_f, X_va_f = X[train_idx], X[val_idx]
    y_tr_f, y_va_f = y[train_idx], y[val_idx]
    
    sc_x, sc_y = StandardScaler(), StandardScaler()
    X_tr_s = sc_x.fit_transform(X_tr_f)
    X_va_s = sc_x.transform(X_va_f)
    X_te_s = sc_x.transform(X_test)
    y_tr_s = sc_y.fit_transform(y_tr_f)
    y_va_s = sc_y.transform(y_va_f)
    
    train_loader = DataLoader(XYDataset(X_tr_s, y_tr_s), batch_size=32, shuffle=True, pin_memory=True, num_workers=2)
    val_loader   = DataLoader(XYDataset(X_va_s, y_va_s), batch_size=256, shuffle=False, pin_memory=True, num_workers=2)
    
    best_val_fold = float("inf")
    best_state_fold = None
    wait = 0
    PATIENCE = 100
    EPOCHS = 500
    
    model = ResDeepMLP(in_dim=len(features_final), hidden_dim=256).to(device)
    optimizer = torch.optim.AdamW(model.parameters(), lr=1e-3, weight_decay=1e-5)
    scheduler = torch.optim.lr_scheduler.CosineAnnealingWarmRestarts(
        optimizer, T_0=40, T_mult=2, eta_min=1e-7
    )
    criterion = nn.MSELoss()

    for epoch in range(1, EPOCHS + 1):
        model.train()
        for xb, yb in train_loader:
            xb, yb = xb.to(device), yb.to(device)
            optimizer.zero_grad()
            loss = criterion(model(xb), yb)
            loss.backward()
            optimizer.step()

        model.eval()
        val_losses = []
        with torch.no_grad():
            for xb, yb in val_loader:
                xb, yb = xb.to(device), yb.to(device)
                val_losses.append(criterion(model(xb), yb).item())
        
        va_loss = np.mean(val_losses)
        scheduler.step()

        if va_loss < best_val_fold - 1e-6:
            best_val_fold = va_loss
            best_state_fold = {k: v.detach().cpu().clone() for k, v in model.state_dict().items()}
            wait = 0
        else:
            wait += 1
            if wait >= PATIENCE:
                print(f"Fold {fold+1} Early Stopping at Epoch {epoch}, Best Val: {best_val_fold:.6f}")
                break
    
    model.load_state_dict(best_state_fold)
    model.eval()
    with torch.no_grad():
        X_te_tensor = torch.tensor(X_te_s).to(device)
        fold_preds = model(X_te_tensor).cpu().numpy()
        fold_preds_orig = sc_y.inverse_transform(fold_preds)
        test_preds_all.append(fold_preds_orig)

final_preds = np.mean(test_preds_all, axis=0).flatten()

In [None]:
# 修改 Cell 75
sub = sample_df.copy()
sub["y"] = final_preds # 使用平均後的結果
sub.to_csv("submission.csv", index=False)
print("Saved K-Fold Ensemble result!")

In [None]:
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D

fig = plt.figure(figsize=(10, 8))
ax = fig.add_subplot(111, projection='3d')

# 畫出 x1, x2 與 y 的關係
ax.scatter(train_df['x1'], train_df['x2'], train_df['y'], c=train_df['y'], cmap='viridis', s=2)

ax.set_xlabel('x1')
ax.set_ylabel('x2')
ax.set_zlabel('y')
ax.set_title('3D Surface Approximation Visualization')
plt.show()

In [None]:
# 看看哪個特徵跟 y 最像
correlations = train_df.drop(columns=['id']).corr()['y'].abs().sort_values(ascending=False)
print(correlations)