In [None]:
import pandas as pd
import numpy as np
import xgboost as xgb
from sklearn.model_selection import KFold
from sklearn.metrics import mean_absolute_error
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error
from sklearn.metrics import r2_score
import matplotlib.pyplot as plt
import seaborn as sns


In [None]:
# fake_name = "fake20230912.npy"
train_data_o = np.array(pd.read_excel("../data/original_cohort_data.xlsx", sheet_name=1))
test_data_o = np.array(pd.read_excel("../data/original_cohort_data.xlsx", sheet_name=2))
fake_data_o = np.array(pd.read_excel("../data/synthetic_augmented_data.xlsx", sheet_name=0))

# 如果不用假数据 就把下面两行注释掉
# fake_data_o = np.load("../data/%s" % fake_name)
train_data_o = np.concatenate((train_data_o,fake_data_o))
train_x_o = train_data_o[:, 0: -1]
train_y_o = train_data_o[:, -1]
test_x_o = test_data_o[:, 0: -1]
test_y_o = test_data_o[:, -1]

train_x_o.shape, train_y_o.shape, test_x_o.shape, test_y_o.shape
mms_x = MinMaxScaler()
train_x = mms_x.fit_transform(train_x_o)
test_x = mms_x.transform(test_x_o)

mms_y = MinMaxScaler()
train_y = mms_y.fit_transform(train_y_o[:,np.newaxis])
test_y = mms_y.fit_transform(test_y_o[:,np.newaxis])
print(train_data_o.shape,train_x_o.shape,  train_y_o.shape, test_x_o.shape,test_y_o.shape )

In [None]:
def print_error_statistics(y_true, y_pred, label, scaler):
    # Normalized error statistics
    mean_error_norm = np.mean(y_true - y_pred)
    std_error_norm = np.std(y_true - y_pred)
    mae_norm = mean_absolute_error(y_true, y_pred)
    mse_norm = np.mean((y_true - y_pred)**2)
    rmse_norm = np.sqrt(mean_squared_error(y_true, y_pred))
    r2_norm = r2_score(y_true, y_pred)
    
    print(f"{label}\n归一化：        mean error:{mean_error_norm:.2f}  std error:{std_error_norm:.2f}  mae:{mae_norm:.2f}  mse:{mse_norm:.2f}  rmse:{rmse_norm:.2f}  r2:{r2_norm:.2f}")

    # Denormalized error statistics
    y_true_denorm = scaler.inverse_transform(y_true)
    y_pred_denorm = scaler.inverse_transform(y_pred)
    mean_error_denorm = np.mean(y_true_denorm - y_pred_denorm)
    std_error_denorm = np.std(y_true_denorm - y_pred_denorm)
    mae_denorm = mean_absolute_error(y_true_denorm, y_pred_denorm)
    mse_denorm = np.mean((y_true_denorm - y_pred_denorm)**2)
    rmse_denorm = np.sqrt(mean_squared_error(y_true_denorm, y_pred_denorm))
    r2_denorm = r2_score(y_true_denorm, y_pred_denorm)
    print(f"反归一化：      mean error:{mean_error_denorm:.2f}  std error:{std_error_denorm:.2f}  mae:{mae_denorm:.2f}  mse:{mse_denorm:.2f}  rmse:{rmse_denorm:.2f}  r2:{r2_denorm:.2f}")

# Placeholder for the best model

# Placeholder for the best learning rate
    # Placeholder for the best learning rate and best number of estimators
# Placeholder for the best learning rate and best number of estimators
best_learning_rate = None
best_n_estimators = None  # Initialize here
# Initialize best_val_loss with a large number
best_val_loss = float('inf')


# 设置筛选迭代次数与学习率
learning_rates = [0.01, 0.01, 0.001, 0.0001]
n_estimators_list = [100, 200, 300, 400, 500]  # Add more values as needed
# Initialize best_val_loss with a large number



In [None]:
for lr in learning_rates:
    for n_estimators in n_estimators_list:
        print(f"Evaluating model with learning rate: {lr}, number of estimators: {n_estimators}")

    # 10-fold cross-validation
    kf = KFold(n_splits=10)
    fold_num = 1
    val_losses = []

    for train_index, val_index in kf.split(train_x):
        # XGBoost model with specified learning rate, create a new instance for each fold
        xgb_model = xgb.XGBRegressor(learning_rate=lr, n_estimators=n_estimators, tree_method='gpu_hist')

        X_train, X_val = train_x[train_index], train_x[val_index]
        y_train, y_val = train_y[train_index], train_y[val_index]

        # Training the model
        xgb_model.fit(X_train, y_train.ravel())

        # Predictions
        val_pred = xgb_model.predict(X_val)

        # Mean Absolute Error for validation set
        val_loss = mean_absolute_error(y_val, val_pred)
        val_losses.append(val_loss)

        print(f"FOLD {fold_num}")
        print(f"Validation loss: {val_loss}")
        fold_num += 1

    # Average validation loss for this learning rate
    avg_val_loss = np.mean(val_losses)
    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        best_learning_rate = lr
        best_n_estimators = n_estimators  # Update here
print(f"Best learning rate: {best_learning_rate}, Best number of estimators: {best_n_estimators}, validation loss: {best_val_loss}")



In [None]:
import os
import matplotlib.pyplot as plt
import seaborn as sns
import xgboost as xgb
import pydot
from sklearn.model_selection import train_test_split

# --- 路径设置 (相对路径) ---
# 假设当前脚本在 modeling 文件夹下，回退一级找到 output
output_dir = '../output'
if not os.path.exists(output_dir):
    os.makedirs(output_dir)

# Split the original training data
train_x, valid_x, train_y, valid_y = train_test_split(train_x, train_y, test_size=0.2, random_state=42)

# Create DMatrix objects
dtrain = xgb.DMatrix(train_x, label=train_y)
dvalid = xgb.DMatrix(valid_x, label=valid_y)
dtest = xgb.DMatrix(test_x, label=test_y)

# Training parameters
test_losses = []
evals_result = {}
params = {
    'booster': 'gbtree',
    'objective': 'reg:squarederror',
    'eta': best_learning_rate,
    # add other parameters here as needed
}

# Training
final_xgb_model = xgb.train(
    params,
    dtrain,
    num_boost_round=best_n_estimators,
    evals=[(dtrain, 'train'), (dvalid, 'valid'), (dtest, 'test')],
    evals_result=evals_result,
    verbose_eval=False
)

test_losses = evals_result['test']['rmse']

# Prepare Loss DataFrame
loss_df = pd.DataFrame({
    'Iteration': list(range(1, len(test_losses) + 1)),
    'Loss': test_losses,
    'Type': ['Test Loss'] * len(test_losses)
})

# --- 绘图 1: Test Loss (保存并显示) ---
plt.figure(figsize=(10, 8))
sns.lineplot(data=loss_df, x='Iteration', y='Loss', hue='Type')
plt.title('Test Loss vs Iteration')

# 保存图片到 output
loss_plot_path = os.path.join(output_dir, 'test_loss_curve.png')
plt.savefig(loss_plot_path, dpi=300, bbox_inches='tight')
print(f"Loss curve saved to: {loss_plot_path}")

plt.show()

# Predictions
train_pred_final = final_xgb_model.predict(dtrain)
test_pred_final = final_xgb_model.predict(xgb.DMatrix(test_x))

# --- 绘图 2: 决策树 (保存到 output) ---
dot_data = xgb.to_graphviz(final_xgb_model, num_trees=0)

# 定义保存路径 (使用相对路径)
dot_file_path = os.path.join(output_dir, 'tree.dot')
resized_dot_file_path = os.path.join(output_dir, 'tree_resized.dot')
resized_png_file_path = os.path.join(output_dir, 'tree_visualization.png')

# 写入 dot 文件
with open(dot_file_path, 'w') as f:
    f.write(dot_data.source)

# Pydot 处理
(graph,) = pydot.graph_from_dot_file(dot_file_path)
graph.set('dpi', '300')
graph.set_rankdir('LR')    # 保持从左到右
graph.set_size('"20,10!"') # 保持之前的尺寸设置

# 保存调整后的文件和图片
graph.write_dot(resized_dot_file_path)
graph.write_png(resized_png_file_path)

print(f"Decision tree saved to: {resized_png_file_path}")

In [None]:
import sys
import contextlib

# --- 路径设置 ---
output_dir = '../output'
model_save_path = os.path.join(output_dir, 'final_xgb_model.json')
performance_txt_path = os.path.join(output_dir, 'model_performance_metrics.txt')
importance_plot_path = os.path.join(output_dir, 'feature_importance_plot.png')

# Predictions
train_pred_final = final_xgb_model.predict(dtrain)
valid_pred_final = final_xgb_model.predict(dvalid)
test_pred_final = final_xgb_model.predict(dtest)

# 1. 保存模型到 output
final_xgb_model.save_model(model_save_path)
print(f"Model saved to: {model_save_path}")

# 2. 将误差统计输出到 txt 文件 (同时在屏幕打印可选)
# 打开文件准备写入
with open(performance_txt_path, 'w') as f:
    # 使用 redirect_stdout 将 print 的内容捕获到文件 f 中
    with contextlib.redirect_stdout(f):
        print("=== Model Performance Metrics ===\n")
        print_error_statistics(train_y, train_pred_final[:, np.newaxis], 'train', mms_y)
        print("\n" + "-"*30 + "\n")
        print_error_statistics(valid_y, valid_pred_final[:, np.newaxis], 'valid', mms_y)
        print("\n" + "-"*30 + "\n")
        print_error_statistics(test_y, test_pred_final[:, np.newaxis], 'test', mms_y)

print(f"Performance metrics saved to: {performance_txt_path}")

# 3. 绘制并保存特征重要性图
plt.figure(figsize=(15, 10))
xgb.plot_importance(final_xgb_model, title='Feature importance')
plt.title('Feature importance', fontsize=16)
plt.xlabel('F score', fontsize=14)

# 保存图片
plt.savefig(importance_plot_path, dpi=300, bbox_inches='tight')
print(f"Feature importance plot saved to: {importance_plot_path}")

plt.show()

In [None]:
# --- 路径设置 ---
output_dir = '../output'
importance_data_path = os.path.join(output_dir, 'feature_importance_data.txt')

# 获取数据
importances = final_xgb_model.get_score(importance_type='gain')
importance_df = pd.DataFrame(list(importances.items()), columns=['feature', 'importance'])
importance_df = importance_df.sort_values(by='importance', ascending=False)

# 打印到控制台
print(importance_df)

# 保存到 output 文件夹下的 txt
# 使用 to_string() 可以保持表格对齐的格式
with open(importance_data_path, 'w') as f:
    f.write("=== Feature Importance (Gain) ===\n\n")
    f.write(importance_df.to_string(index=False))

print(f"Feature importance data saved to: {importance_data_path}")

In [None]:
import os
import pandas as pd
import numpy as np
import xgboost as xgb

# --- 1. 路径设置 ---
input_file_path = "../input/new_subject_input_template.xlsx"
model_load_path = "../output/final_xgb_model.json"
result_output_path = "../output/optimal_recommendation_result.txt"

# --- 2. 加载数据 ---
if not os.path.exists(input_file_path):
    print(f"Error: Input file not found at {input_file_path}")
else:
    # 读取数据
    # 这一步非常重要：确保你的Excel里的列顺序，和训练时完全一致！
    try:
        # 尝试删除无关列 (序号 和 结果列)
        data_predicting = pd.read_excel(input_file_path).drop(["No.", "Rowing distance (m)"], axis=1)
    except KeyError:
        # 容错：防止列名有细微差别
        print("Warning: Standard columns not found, checking raw columns...")
        data_predicting = pd.read_excel(input_file_path)
        if "序号" in data_predicting.columns:
            data_predicting = data_predicting.drop(["序号"], axis=1)
        if "y" in data_predicting.columns:
            data_predicting = data_predicting.drop(["y"], axis=1)

    # --- 3. 加载模型 ---
    # 使用原生 Booster 加载
    loaded_xgb_model = xgb.Booster() 
    try:
        loaded_xgb_model.load_model(model_load_path)
        print(f"Successfully loaded model from: {model_load_path}")
    except Exception as e:
        print(f"Error loading model: {e}")

    # --- 4. 构造枚举法预测 ---
    data_list = []
    
    # 提取体重数据，用于计算摄入量的绝对值
    # 这里我们使用英文列名来定位数据，进行计算
    weight_col = data_predicting["Weight (kg)"] 

    # 循环枚举 CHO 摄入率 (0.5 - 1.2)
    for i in np.arange(0.5, 1.21, 0.01):
        
        # 1. 更新 CHO 摄入速率
        data_predicting["CHO intake rate (g/kg/h)"] = i
        
        # 2. 联动更新其他营养素 (基于公式)
        # 这里的英文列名必须和你Excel里的一致
        data_predicting["Fat intake (g)"] = 0.024 * i * weight_col
        data_predicting["Sodium intake (mg)"] = 0.0026643 * i * weight_col
        data_predicting["Magnesium intake (mg)"] = 0.00262 * i * weight_col
        data_predicting["Calcium intake (mg)"] = 0.0038383 * i * weight_col

        # 3. 转换为 Numpy 数组
        # 关键修改：直接取 values，不查 feature_names
        # 只要 input excel 的列顺序是对的，这就没问题
        tmp = data_predicting.values 
        data_list.append(tmp)

    data_list = np.concatenate(data_list)
    
    # --- 进行归一化 ---
    # 假设 mms_x 和 mms_y 还在内存中 (如果你重启了内核，需要重新定义它们)
    x = mms_x.transform(data_list)

    # --- 利用模型预测 ---
    # 关键修改：因为模型里没有存名字，这里只给数据 (DMatrix(x))
    dtest_input = xgb.DMatrix(x)
    prediction = loaded_xgb_model.predict(dtest_input)

    # --- 5. 找到最优解 ---
    max_indice = np.argmax(prediction)
    max_y_raw = prediction[max_indice]
    max_x_raw = x[max_indice]
    
    # 反归一化获取真实值
    max_x_original_scale = mms_x.inverse_transform(max_x_raw.reshape(1, -1))[0]
    
    # 获取最佳 CHO 摄入速率
    # 假设 CHO 在你的 45 个指标中排在第 2 个 (索引为 1)
    # 因为我们现在没法用名字查，只能按位置查。
    # 根据你提供的列表：Previous meal (0), CHO intake rate (1)... 
    max_x2 = max_x_original_scale[1] 
    
    # 获取最佳预测成绩 (y)
    max_y = mms_y.inverse_transform(np.array([max_y_raw]).reshape(1, -1))[0][0]

    # --- 6. 计算 PRO 摄入速率 ---
    max_pro = max_x2 / 4.0

    # --- 7. 格式化输出与保存 ---
    output_content = (
        "=== Personalized Recommendation Result ===\n"
        f"Predicted Max Performance (Rowing Distance): {max_y:.2f} m\n"
        f"Optimal CHO Intake Rate: {max_x2:.2f} g/kg/h\n"
        f"Optimal PRO Intake Rate: {max_pro:.2f} g/kg/h\n"
    )

    print(output_content)

    with open(result_output_path, "w") as f:
        f.write(output_content)
    
    print(f"Result saved to: {result_output_path}")