In [None]:
import matplotlib.pyplot as plt
import pandas as pd
import tensorflow as tf
import keras
from keras.models import Sequential, Model
from keras.layers import Dense
import os

In [None]:
def plot_loss_accuracy(history):
    """
    Plot the loss and sparse_categorical_accuracy curves recorded during training.

    Args:
        history: History-like object (e.g. tf.keras.callbacks.History). Only its
            ``history`` dict (metric name -> per-epoch values) and its ``epoch``
            list are read.

    Raises:
        KeyError: if ``history.history`` has no 'loss' or no
            'sparse_categorical_accuracy' entry (needed for the title).
    """
    historydf = pd.DataFrame(history.history, index=history.epoch)

    # Only plot the curves that were actually recorded: the val_* columns are
    # absent when the model was fitted without validation data.
    wanted = ["loss", "val_loss", "sparse_categorical_accuracy", "val_sparse_categorical_accuracy"]
    plotted = historydf[[col for col in wanted if col in historydf.columns]]

    # Create the axes explicitly and hand them to pandas. Calling plt.figure()
    # and then DataFrame.plot() without `ax` opens a second figure and leaves
    # the first one blank.
    fig, ax = plt.subplots(figsize=(10, 6))
    plotted.plot(
        ax=ax,
        # Upper limit comes from the plotted columns only, so unrelated logged
        # values in the history do not influence the scale.
        ylim=(0, max(1.0, float(plotted.max().max()))),
        grid=True,
    )

    final_loss = history.history['loss'][-1]
    final_acc = history.history['sparse_categorical_accuracy'][-1]
    ax.set_title(f'Final Loss: {final_loss:.3f}, Final Accuracy: {final_acc:.3f}')
    ax.set_xlabel("Epoch")
    ax.set_ylabel("Metric Value")
    fig.tight_layout()
    plt.show()


def plot_multiple_histories_with_annotations(histories, labels=None, metric="sparse_categorical_accuracy",
                                             figsize=(14, 6), save_path=None, dpi=300, file_format="png"):
    """
    Plot training and validation curves for several History objects side by side.

    The left panel shows the training metric (solid lines), the right panel the
    validation metric (dashed lines). Each model gets its own colour, and the
    best validation point of every model is marked and annotated.

    Args:
        histories: list of History-like objects exposing ``history`` (dict) and
            ``epoch`` (list).
        labels: list of str naming each model; defaults to "Model 1", "Model 2", ...
        metric: name of the training metric, e.g. "sparse_categorical_accuracy"
            or "loss". The validation curve is read from ``val_<metric>``.
        figsize: figure size passed to matplotlib.
        save_path: output path WITHOUT extension; nothing is saved when falsy.
        dpi: resolution of the exported image.
        file_format: image format / extension, e.g. 'png' or 'svg'.

    Raises:
        ValueError: if ``histories`` is empty or ``labels`` has a different length.
        KeyError: if a history lacks ``metric`` or ``val_<metric>``.
    """
    histories = list(histories)
    if not histories:
        raise ValueError("histories must contain at least one History object")

    if labels is None:
        labels = [f"Model {i+1}" for i in range(len(histories))]
    elif len(labels) != len(histories):
        # zip() would silently drop the surplus curves otherwise.
        raise ValueError(
            f"labels has {len(labels)} entries but histories has {len(histories)}"
        )

    # plt.cm.get_cmap was removed in Matplotlib 3.9; pyplot.get_cmap keeps the
    # same (name, lut) signature.
    colors = plt.get_cmap('tab10', len(histories))  # one colour per model

    # For loss/error-like metrics the best epoch is the minimum, not the maximum.
    lower_is_better = any(word in metric.lower() for word in ("loss", "error"))
    val_metric = f"val_{metric}"

    fig, (train_ax, val_ax) = plt.subplots(1, 2, figsize=figsize)

    # === Panel 1: training curves (solid lines)
    for i, (hist, label) in enumerate(zip(histories, labels)):
        train_ax.plot(hist.epoch, hist.history[metric], linestyle='-', color=colors(i), label=f"{label} (train)")
    train_ax.set_title(f"Training {metric}")
    train_ax.set_xlabel("Epoch")
    train_ax.set_ylabel(metric)
    train_ax.grid(True)
    train_ax.legend()

    # === Panel 2: validation curves (dashed lines + best-point annotation)
    for i, (hist, label) in enumerate(zip(histories, labels)):
        val_values = hist.history[val_metric]
        epochs = hist.epoch
        val_ax.plot(epochs, val_values, linestyle='--', color=colors(i), label=f"{label} (val)")

        # Locate the best validation value. best_idx is a position in the list;
        # the x coordinate must be the matching epoch number, which differs
        # from the position when the epoch list does not start at 0.
        series = pd.Series(val_values)
        best_idx = int(series.idxmin() if lower_is_better else series.idxmax())
        best_epoch = epochs[best_idx]
        best_value = val_values[best_idx]
        val_ax.scatter(best_epoch, best_value, color=colors(i), marker='o')
        val_ax.text(best_epoch, best_value + 0.01, f"{best_value:.3f}", fontsize=9, ha='center', color=colors(i))

    val_ax.set_title(f"Validation {metric}")
    val_ax.set_xlabel("Epoch")
    val_ax.set_ylabel(metric)
    val_ax.grid(True)
    val_ax.legend()

    fig.tight_layout()

    # Export the figure before showing it
    if save_path:
        full_path = f"{save_path}.{file_format}"
        fig.savefig(full_path, dpi=dpi, format=file_format)
        print(f"✅ 图像已保存为 {full_path}")

    plt.show()



# Data Preparation

In [None]:
# ImageNet normalization stats (per-channel mean / std for images scaled to [0, 1])
IMAGENET_MEAN = tf.constant([0.485, 0.456, 0.406], dtype=tf.float32)
IMAGENET_STD = tf.constant([0.229, 0.224, 0.225], dtype=tf.float32)

# Custom preprocessing function
def preprocess_train(image, label):
    """Augment and normalise one training image.

    Args:
        image: a single image tensor with 3 channels and pixel values on the
            0-255 scale (assumed from the /255 scaling below).
        label: passed through unchanged.

    Returns:
        (image, label) where image is a 224x224x3 float32 tensor normalised
        with IMAGENET_MEAN / IMAGENET_STD.
    """
    image = tf.image.resize_with_pad(image, 256, 256)  # fit into 256x256, padding the short side
    image = tf.image.random_crop(image, size=(224, 224, 3))  # random 224x224 crop
    image = tf.image.random_flip_left_right(image)  # random horizontal flip
    image = tf.cast(image, tf.float32) / 255.0  # scale to [0, 1]
    # Brightness jitter must run AFTER scaling to [0, 1]: the delta is added to
    # the pixel values as-is, so +/-0.1 on the 0-255 scale was effectively a no-op.
    image = tf.image.random_brightness(image, max_delta=0.1)
    image = tf.clip_by_value(image, 0.0, 1.0)  # keep jittered pixels in the valid range
    image = (image - IMAGENET_MEAN) / IMAGENET_STD  # standardise with ImageNet statistics
    return image, label

def preprocess_val(image, label):
    """Deterministic preprocessing for one validation image.

    Pads/resizes to 256x256, keeps the central 87.5% (224x224 for a 256x256
    input), scales pixel values by 1/255 and normalises with
    IMAGENET_MEAN / IMAGENET_STD. The label is returned unchanged.
    """
    padded = tf.image.resize_with_pad(image, 256, 256)
    center = tf.image.central_crop(padded, central_fraction=0.875)  # 256 * 0.875 = 224
    scaled = tf.cast(center, tf.float32) / 255.0
    normalized = (scaled - IMAGENET_MEAN) / IMAGENET_STD
    return normalized, label

# Train/Val split sizes
total_size = 11788  # total number of images in CUB-200-2011
train_size = 5994   # same SIZE as the official training split
val_size = total_size - train_size
# NOTE(review): this is a seeded random split that only matches the official
# split in size; it does not read the dataset's official train/test assignment.

# Load the raw dataset, split at the FILE level.
# The previous version loaded one shuffled dataset and applied take()/skip().
# A dataset loaded with shuffle=True is reshuffled on every iteration, so
# take()/skip() selected different images each epoch and training images leaked
# into validation. validation_split/subset fixes the assignment once.
train_raw_ds, val_raw_ds = tf.keras.utils.image_dataset_from_directory(
    "../data/CUB_200_2011/CUB_200_2011/images",
    labels='inferred',
    label_mode='int',
    # Every image is resized to 256x256. NOTE(review): without
    # crop_to_aspect_ratio / pad_to_aspect_ratio this resize does not preserve
    # the aspect ratio - confirm this is acceptable.
    image_size=(256, 256),
    batch_size=None,  # yield single (image, label) pairs so map() sees one image at a time
    shuffle=True,
    seed=888,
    # The loader keeps int(validation_split * n) files for validation; the extra
    # +0.5 guards against float rounding yielding val_size - 1.
    validation_split=(val_size + 0.5) / total_size,
    subset="both",
)

# Kept so that any cell still referencing `raw_dataset` keeps working.
raw_dataset = train_raw_ds.concatenate(val_raw_ds)

# Shuffle individual samples BEFORE batching. batch().shuffle() only reorders
# whole batches and buffers full batches in memory.
train_ds = (
    train_raw_ds
    .map(preprocess_train, num_parallel_calls=tf.data.AUTOTUNE)
    .shuffle(1000)
    .batch(256)
    .prefetch(tf.data.AUTOTUNE)
)
val_ds = (
    val_raw_ds
    .map(preprocess_val, num_parallel_calls=tf.data.AUTOTUNE)
    .batch(256)
    .prefetch(tf.data.AUTOTUNE)
)


In [None]:
# Sanity check: pull one batch from the training pipeline and inspect it
first_batch = train_ds.take(1)
for images, labels in first_batch:
    # Expected image batch shape: (256, 224, 224, 3)
    print(images.shape, labels, sep="\n")

# Model Creation

## Model Build

In [None]:
from tensorflow.keras.applications import ResNet101V2
from tensorflow.keras import layers, Model, Input

def build_model(num_classes=200, input_shape=(224, 224, 3), show_summary=True):
    """Build a ResNet101V2 classifier for full fine-tuning.

    An ImageNet-pretrained ResNet101V2 backbone (without its top) is followed
    by global average pooling and a softmax Dense layer named "Predictions".

    Args:
        num_classes: number of output classes (default 200, as used for
            CUB-200-2011 in this notebook).
        input_shape: shape of one input image (default (224, 224, 3)).
        show_summary: print ``model.summary(show_trainable=True)`` when True
            (default, matching the previous behaviour). Pass False to keep the
            output short when many models are built in a loop.

    Returns:
        The uncompiled keras Model.

    NOTE(review): the Keras docs pair ResNetV2 ImageNet weights with
    ``resnet_v2.preprocess_input`` (pixels scaled to [-1, 1]), while this
    notebook normalises with ImageNet mean/std - confirm that is intended.
    """
    inputs = Input(shape=input_shape)
    base_model = ResNet101V2(weights="imagenet", include_top=False, input_tensor=inputs)
    base_model.trainable = True  # Full fine-tuning

    x = base_model.output
    x = layers.GlobalAveragePooling2D()(x)
    # x = layers.Dropout(0.2)(x)  # optional

    outputs = layers.Dense(num_classes, activation="softmax", name="Predictions")(x)

    model = Model(inputs=inputs, outputs=outputs)
    if show_summary:
        model.summary(show_trainable=True)

    return model



## Model Compile

In [None]:
def compile_model(model, lr=0.01, m=0.9, wd=0.0001):
    """Compile ``model`` with SGD and sparse categorical cross-entropy.

    Args:
        model: the model to compile (compiled in place).
        lr: SGD learning rate.
        m: SGD momentum.
        wd: weight decay passed to the optimizer. This relies on the optimizer
            accepting ``weight_decay``; the original comment states the
            notebook runs on tf 2.16.

    Returns:
        The same ``model`` object, now compiled and tracking
        "sparse_categorical_accuracy".
    """
    sgd = tf.keras.optimizers.SGD(learning_rate=lr, momentum=m, weight_decay=wd)
    compile_kwargs = {
        "optimizer": sgd,
        "loss": "sparse_categorical_crossentropy",
        "metrics": ["sparse_categorical_accuracy"],
    }
    model.compile(**compile_kwargs)
    return model



## Model Fitting

### Step Decay Learning-Rate Schedule

In [None]:
# Step-decay schedule for LearningRateScheduler (Li et al., 2020)
def step_decay(epoch):
    """Return the learning rate for a 0-based ``epoch``.

    0.01 before epoch 150, 0.001 for epochs 150-249, and 0.0001 from epoch 250 on.
    """
    boundaries = (150, 250)
    rates = (0.01, 0.001, 0.0001)
    # The number of boundaries already reached selects the rate.
    stage = sum(1 for boundary in boundaries if epoch >= boundary)
    return rates[stage]

# Keras callback that applies the step_decay schedule during fit()
lr_callback = tf.keras.callbacks.LearningRateScheduler(schedule=step_decay)




### Callbacks

In [None]:
import os
from tensorflow import keras

# Output locations for a single (non-grid) training run
checkpoint_path = "checkpoints/best_model.keras"
log_dir = "logs/single_run"

# Save the model only when val_loss improves
_checkpoint_cb = keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_path,
    monitor="val_loss",
    save_best_only=True,
    mode="min",
    verbose=1,
)

# Stop after 15 epochs without val_loss improvement and restore the best weights
_early_stopping_cb = keras.callbacks.EarlyStopping(
    monitor="val_loss",
    patience=15,
    restore_best_weights=True,
    verbose=1,
)

# TensorBoard logging with per-epoch histograms, the graph and weight images
_tensorboard_cb = keras.callbacks.TensorBoard(
    log_dir=log_dir,
    histogram_freq=1,
    write_graph=True,
    write_images=True,
)

callbacks = [_checkpoint_cb, _early_stopping_cb, _tensorboard_cb, lr_callback]


# Output Settings

In [None]:
import datetime

# Output directories, stamped with the current time (minute resolution)
timestamp = f"{datetime.datetime.now():%Y%m%d-%H%M}"
base_ckpt_dir = "checkpoints_grid_" + timestamp
base_log_dir = "logs_grid_" + timestamp
for _out_dir in (base_ckpt_dir, base_log_dir):
    os.makedirs(_out_dir, exist_ok=True)

# Accumulator for the experiment results
results = list()



# Model Fitting: Grid Search over Learning Rate and Momentum

In [None]:
def run_experiment(lr=0.01, m=0.9, wd=0.0001, tag=None, epochs=1, use_early_stopping=True):
    """
    Build, compile and train one model with the given hyper-parameters.

    Args:
        lr: base learning rate. The step-decay schedule is rescaled so that it
            starts at ``lr`` and keeps the decay ratios of ``step_decay``.
        m: SGD momentum.
        wd: weight decay.
        tag: sub-directory name for checkpoints and logs; defaults to
            "lr_{lr}_m_{m}".
        epochs: maximum number of training epochs.
        use_early_stopping: add an EarlyStopping callback on val_loss
            (patience 15, restoring the best weights) when True.

    Returns:
        dict with keys "learning_rate", "momentum", "best_val_loss",
        "best_val_accuracy" (maximum over all epochs), "epoch" (0-based index
        of the epoch with the lowest val_loss) and "history" (the History
        object returned by fit()).
    """
    # Drop Keras state left over from earlier runs so that building many large
    # models in a loop does not keep accumulating memory.
    keras.backend.clear_session()

    model = build_model()
    model = compile_model(model, lr=lr, m=m, wd=wd)

    if tag is None:
        tag = f"lr_{lr}_m_{m}"

    ckpt_dir = os.path.join(base_ckpt_dir, tag)
    log_dir = os.path.join(base_log_dir, tag)
    os.makedirs(ckpt_dir, exist_ok=True)
    os.makedirs(log_dir, exist_ok=True)

    # The shared lr_callback sets the optimizer's learning rate from
    # step_decay() at the start of every epoch, which overrides `lr` with the
    # fixed 0.01-based schedule and makes every learning-rate setting train
    # identically. Use a per-run schedule scaled to this run's `lr` instead.
    # With the default lr=0.01 the scale is 1.0, i.e. the original schedule.
    lr_scale = lr / step_decay(0)

    def scaled_step_decay(epoch, current_lr=None):
        # current_lr is accepted because Keras may call schedule(epoch, lr);
        # the schedule depends only on the epoch.
        return step_decay(epoch) * lr_scale

    callbacks = [
        keras.callbacks.ModelCheckpoint(
            filepath=os.path.join(ckpt_dir, "model_best.keras"),
            monitor="val_loss",
            save_best_only=True,
            mode="min",
            verbose=1
        ),
        keras.callbacks.TensorBoard(
            log_dir=log_dir,
            histogram_freq=1,
            write_graph=True,
            write_images=True
        ),
        keras.callbacks.LearningRateScheduler(scaled_step_decay)
    ]

    if use_early_stopping:
        callbacks.append(
            keras.callbacks.EarlyStopping(
                monitor="val_loss",
                patience=15,
                restore_best_weights=True,
                verbose=1
            )
        )

    history = model.fit(
        train_ds,
        validation_data=val_ds,
        epochs=epochs,
        callbacks=callbacks,
        verbose=2
    )

    val_losses = history.history["val_loss"]
    best_val_loss = min(val_losses)
    best_val_acc = max(history.history["val_sparse_categorical_accuracy"])
    best_epoch = val_losses.index(best_val_loss)

    return {
        "learning_rate": lr,
        "momentum": m,
        "best_val_loss": best_val_loss,
        "best_val_accuracy": best_val_acc,
        "epoch": best_epoch,
        "history": history
    }


In [None]:
# Grid-search parameters
learning_rates = [0.0001, 0.0005, 0.001, 0.005, 0.01, 0.05, 0.1]
momentums = [0.0, 0.8, 0.9, 0.95, 0.99]
results = []

# Train one model per (learning rate, momentum) pair.
# The string literals below previously carried escaped quotes (\" and \\n),
# which is a SyntaxError in Python; they are now plain quotes.
for lr in learning_rates:
    for m in momentums:
        print(f"\n🚀 Training: lr={lr}, m={m}")
        res = run_experiment(lr=lr, m=m, epochs=300, use_early_stopping=False)
        # Keep only the scalar summary; the History object is dropped.
        results.append({k: res[k] for k in res if k != "history"})


In [None]:
# Plotting libraries used by this cell
import seaborn as sns
import matplotlib.pyplot as plt


def _plot_accuracy_curves(df, x_col, group_col, label_prefix, title, xlabel, legend_title, out_path, log_x=False):
    """Draw best_val_accuracy against ``x_col``, one line per ``group_col`` value, and save it."""
    fig, ax = plt.subplots(figsize=(10, 6))
    for group_value in sorted(df[group_col].unique()):
        # Sort by x so every line is drawn left to right regardless of row order.
        subset = df[df[group_col] == group_value].sort_values(x_col)
        ax.plot(subset[x_col], subset['best_val_accuracy'], marker='o', label=f'{label_prefix}={group_value}')
    ax.set_title(title)
    ax.set_xlabel(xlabel)
    ax.set_ylabel("Best Validation Accuracy")
    if log_x:
        ax.set_xscale('log')
    ax.legend(title=legend_title)
    ax.grid(True)
    fig.tight_layout()
    fig.savefig(out_path, dpi=300)
    plt.show()


def _plot_heatmap(matrix, title, cmap, out_path):
    """Draw an annotated momentum x learning-rate heatmap of ``matrix`` and save it."""
    fig, ax = plt.subplots(figsize=(10, 6))
    sns.heatmap(matrix, annot=True, fmt=".3f", cmap=cmap, ax=ax)
    ax.set_title(title)
    ax.set_xlabel("Learning Rate")
    ax.set_ylabel("Momentum")
    fig.tight_layout()
    fig.savefig(out_path, dpi=300)
    plt.show()


# Collect the results and write the CSV once (it was previously built and
# written twice in this cell).
results_df = pd.DataFrame(results)
if results_df.empty:
    raise ValueError("No grid-search results to report; run the grid-search cell first.")
csv_path = f"grid_search_results_{timestamp}.csv"
results_df.to_csv(csv_path, index=False)
print(f"\n✅ 所有实验完成，结果已保存：{csv_path}")

# Curve plot 1: validation accuracy vs learning rate, one line per momentum
acc_curve_path = f"val_accuracy_curve_{timestamp}.png"
_plot_accuracy_curves(
    results_df,
    x_col='learning_rate',
    group_col='momentum',
    label_prefix='m',
    title="Validation Accuracy vs Learning Rate",
    xlabel="Learning Rate",
    legend_title="Momentum",
    out_path=acc_curve_path,
    log_x=True,  # a log axis reads better for learning rates
)
print(f"✅ 曲线图（Accuracy）已保存为：{acc_curve_path}")

# Curve plot 2: validation accuracy vs momentum, one line per learning rate
momentum_curve_path = f"val_accuracy_vs_momentum_curve_{timestamp}.png"
_plot_accuracy_curves(
    results_df,
    x_col='momentum',
    group_col='learning_rate',
    label_prefix='lr',
    title="Validation Accuracy vs Momentum",
    xlabel="Momentum",
    legend_title="Learning Rate",
    out_path=momentum_curve_path,
)
print(f"✅ 曲线图（vs Momentum）已保存为：{momentum_curve_path}")

# === Heatmaps ===

# Pivot the results into momentum x learning-rate matrices
acc_matrix = results_df.pivot(index="momentum", columns="learning_rate", values="best_val_accuracy")
loss_matrix = results_df.pivot(index="momentum", columns="learning_rate", values="best_val_loss")

# Accuracy heatmap
acc_fig_path = f"val_accuracy_heatmap_{timestamp}.png"
_plot_heatmap(acc_matrix, "Validation Accuracy Heatmap", "YlGnBu", acc_fig_path)
print(f"✅ 准确率图像已保存为：{acc_fig_path}")

# (Optional) loss heatmap
loss_fig_path = f"val_loss_heatmap_{timestamp}.png"
_plot_heatmap(loss_matrix, "Validation Loss Heatmap", "Reds_r", loss_fig_path)
print(f"✅ 损失图像已保存为：{loss_fig_path}")

