In [1]:
import numpy as np 
import os
import tensorflow  as tf
from keras.models import Sequential
from keras.layers import Dense,Conv2D,Flatten,MaxPooling2D,UpSampling2D,InputLayer,Reshape
from keras.utils import image_dataset_from_directory
from keras.layers import Dropout,Activation,BatchNormalization
from keras.layers import LeakyReLU

import tensorflow as tf
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.utils import load_img
from tensorflow.keras.utils import img_to_array
from tensorflow.keras.utils import array_to_img
from tensorflow.keras import regularizers
from tensorflow.keras.applications import VGG16, InceptionV3, ResNet50
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from tensorflow.keras.layers import Dense, Conv2D, Flatten, MaxPooling2D, GlobalAveragePooling2D
import seaborn as sns
from sklearn.metrics import confusion_matrix, classification_report
import itertools
import matplotlib.pyplot as plt

from pathlib import Path
from skimage.io import imread
from skimage.transform import resize
from sklearn.utils import Bunch
import math
import pandas as pd
import datetime
import time
import csv
import numpy as np

from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, classification_report

import time
import datetime
import os
from PIL import Image
import sys

In [2]:
# 設定圖片大小和路徑
img_width, img_height = 224, 224  # ResNet50的標準輸入大小
train_data_dir = "C:/Users/user/Desktop/MLwork2/train"
test_data_dir = "C:/Users/user/Desktop/MLwork2/test"

In [3]:
epochs_list = [20, 40, 60]
batch_size_list = [8, 16]

In [4]:
# 設定結果儲存目錄
results_dir = 'C:/Users/user/Desktop/MLWORK/ResNet50'
os.makedirs(results_dir, exist_ok=True)

In [5]:
# 創建CSV檔案記錄結果
def create_result_csv_files():
    csv_files = {}
    
    # 創建基本模型結果CSV
    base_csv_path = os.path.join(results_dir, "ResNet50_base_model_results.csv")
    with open(base_csv_path, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(['Model', 'Epochs', 'Train Accuracy', 'Train Loss', 'Val Accuracy', 'Val Loss', 'Test Accuracy', 'Test Loss'])
    csv_files['base'] = base_csv_path
    
    # 創建微調模型結果CSV
    tuned_csv_path = os.path.join(results_dir, "ResNet50_fine_tuned_model_results.csv")
    with open(tuned_csv_path, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(['Model', 'Epochs', 'Train Accuracy', 'Train Loss', 'Val Accuracy', 'Val Loss', 'Test Accuracy', 'Test Loss'])
    csv_files['tuned'] = tuned_csv_path
    
    # 創建模型比較CSV
    comparison_csv_path = os.path.join(results_dir, "ResNet50_tuned_or_not_comparison.csv")
    with open(comparison_csv_path, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(['Model', 'Epochs', 'Base Accuracy', 'Base Loss', 'Tuned Accuracy', 'Tuned Loss', 'Improvement'])
    csv_files['comparison'] = comparison_csv_path
    
    return csv_files


In [6]:
# 資料生成器 (無資料增強)
def create_data_generators(batch_size):
    # 僅做標準化處理，不增強數據
    train_datagen = ImageDataGenerator(
        rescale=1.0/255,
        validation_split=0.2  # 保留驗證分割
    )
    
    test_datagen = ImageDataGenerator(
        rescale=1.0/255
    )
    
    train_generator = train_datagen.flow_from_directory(
        train_data_dir,
        target_size=(img_width, img_height),
        batch_size=batch_size,
        class_mode='binary',
        subset='training'
    )
    
    val_generator = train_datagen.flow_from_directory(
        train_data_dir,
        target_size=(img_width, img_height),
        batch_size=batch_size,
        class_mode='binary',
        subset='validation'
    )
    
    test_generator = test_datagen.flow_from_directory(
        test_data_dir,
        target_size=(img_width, img_height),
        batch_size=batch_size,
        class_mode='binary',
        shuffle=False
    )
    
    return train_generator, val_generator, test_generator

In [7]:
# 創建ResNet50模型
def create_resnet_model():
    base_model = ResNet50(weights='imagenet', include_top=False, input_shape=(img_width, img_height, 3))
    
    # 凍結基礎模型的層
    for layer in base_model.layers:
        layer.trainable = False
    
    # 添加新的分類層
    model = Sequential([
        base_model,
        GlobalAveragePooling2D(),  # ResNet50通常使用GlobalAveragePooling而非Flatten
        Dense(256, activation='relu'),
        Dropout(0.5),
        Dense(1, activation='sigmoid')  # 二元分類
    ])
    
    model.compile(
        optimizer=Adam(learning_rate=0.0001),
        loss='binary_crossentropy',
        metrics=['accuracy']
    )
    
    return model


In [8]:
# 微調模型：解凍部分基礎模型的層
def fine_tune_model(model):
    # 計算要解凍的層數（所有模型使用相同的比例：頂部20%的層）
    # 確保 model 是一個 Sequential 模型且第一個層是 base_model
    if isinstance(model, Sequential) and len(model.layers) > 0:
        base_model = model.layers[0] # <-- 正確地獲取 ResNet50 基礎模型
    else:
        print("Warning: model structure unexpected, could not identify base_model for fine-tuning.")
        return model

    if not hasattr(base_model, 'layers'):
        print("Warning: base_model does not have layers to unfreeze.")
        return model

    total_layers = len(base_model.layers) # <-- 計算 ResNet50 基礎模型的層數
    if total_layers == 0:
         print("Warning: base_model has no layers to unfreeze.")
         return model

    unfreeze_layers = max(0, int(total_layers * 0.2))  # 解凍頂部20%的層

    # 先凍結所有基礎模型的層
    for layer in base_model.layers:
        layer.trainable = False

    # 然後解凍 ResNet50 基礎模型頂部的指定層數
    for layer in base_model.layers[-unfreeze_layers:]: # <-- 解凍 ResNet50 基礎模型的頂部層
        layer.trainable = True

    print(f"Fine-tuning model: Unfrozen {unfreeze_layers} layers out of {total_layers} total layers in the base model.")

    # 重新編譯模型以應用更低的學習率
    model.compile(
        optimizer=Adam(learning_rate=1e-5),  # 更低的學習率用於微調
        loss='binary_crossentropy',
        metrics=['accuracy']
    )

    return model

In [9]:
# 訓練和評估模型
def train_and_evaluate_model(model, train_generator, val_generator, test_generator, 
                           model_name, epochs, batch_size, is_fine_tuned=False):
    model_prefix = "fine_tuned_" if is_fine_tuned else "base_"
    callbacks = [
        EarlyStopping(monitor='val_loss', patience=3, restore_best_weights=True),
        ModelCheckpoint(
            os.path.join(results_dir, f"{model_prefix}{model_name}_e{epochs}.h5"),
            save_best_only=True,
            monitor='val_accuracy'
        )
    ]
    
    # 訓練模型
    print(f"Training {'fine-tuned' if is_fine_tuned else 'base'} {model_name} with epochs={epochs}, batch_size={batch_size}")
    history = model.fit(
        train_generator,
        steps_per_epoch=train_generator.samples // batch_size,
        epochs=epochs,
        validation_data=val_generator,
        validation_steps=val_generator.samples // batch_size,
        callbacks=callbacks
    )
    
    # 評估訓練集
    train_eval = model.evaluate(train_generator)
    train_loss, train_acc = train_eval
    
    # 評估驗證集
    val_eval = model.evaluate(val_generator)
    val_loss, val_acc = val_eval
    
    # 評估測試集
    test_eval = model.evaluate(test_generator)
    test_loss, test_acc = test_eval
    
    # 繪製訓練歷史
    plt.figure(figsize=(12, 5))
    
    # 準確率圖
    plt.subplot(1, 2, 1)
    plt.plot(history.history['accuracy'], 'b', label='Training Accuracy')
    plt.plot(history.history['val_accuracy'], 'r', label='Validation Accuracy')
    plt.title(f'{model_prefix.capitalize()} {model_name} - Accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.legend()
    
    # 損失函數圖
    plt.subplot(1, 2, 2)
    plt.plot(history.history['loss'], 'b', label='Training Loss')
    plt.plot(history.history['val_loss'], 'r', label='Validation Loss')
    plt.title(f'{model_prefix.capitalize()} {model_name} - Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    
    plt.tight_layout()
    plt.savefig(os.path.join(results_dir, f"{model_prefix.lower()}_{model_name.lower()}_e{epochs}_b{batch_size}_history.png"))
    plt.close()
    
    return {
        'history': history.history,
        'train_accuracy': train_acc,
        'train_loss': train_loss,
        'val_accuracy': val_acc,
        'val_loss': val_loss,
        'test_accuracy': test_acc,
        'test_loss': test_loss
    }


In [10]:
# 將結果保存到CSV
def save_results_to_csv(results, csv_path, model_name, epochs, is_fine_tuned=False):
    with open(csv_path, 'a', newline='') as f:
        writer = csv.writer(f)
        writer.writerow([
            model_name,
            epochs,
            results['train_accuracy'],
            results['train_loss'],
            results['val_accuracy'],
            results['val_loss'],
            results['test_accuracy'],
            results['test_loss']
        ])

In [11]:
# 保存比較結果到CSV
def save_comparison_to_csv(base_results, tuned_results, csv_path, model_name, epochs):
    improvement = tuned_results['test_accuracy'] - base_results['test_accuracy']
    
    with open(csv_path, 'a', newline='') as f:
        writer = csv.writer(f)
        writer.writerow([
            model_name,
            epochs,
            base_results['test_accuracy'],
            base_results['test_loss'],
            tuned_results['test_accuracy'],
            tuned_results['test_loss'],
            improvement
        ])


In [12]:
# 主函數
def main():
    # 創建CSV結果文件
    csv_files = create_result_csv_files()
    
    model_name = "ResNet50"
    all_results = {}  # 儲存所有結果以供後續比較
    
    # 第1階段：訓練和評估基礎模型
    print("======== Stage 1: Training and Evaluating Base Models ========")
    
    for epochs in epochs_list:
        for batch_size in batch_size_list:
            print(f"\n===== Training Base {model_name} - Epochs: {epochs}, Batch Size: {batch_size} =====")
            
            # 創建資料生成器
            train_gen, val_gen, test_gen = create_data_generators(batch_size)
            
            # 創建基礎模型
            base_model = create_resnet_model()
            
            # 訓練和評估基礎模型
            base_results = train_and_evaluate_model(
                base_model, train_gen, val_gen, test_gen,
                model_name, epochs, batch_size, is_fine_tuned=False
            )
            
            # 保存結果到CSV
            save_results_to_csv(
                base_results, csv_files['base'], 
                model_name, epochs, is_fine_tuned=False
            )
            
            # 保存模型結果以供後續比較
            result_key = f"{model_name}_e{epochs}_b{batch_size}"
            all_results[result_key] = {'base': base_results}
            
            # 清理內存
            tf.keras.backend.clear_session()
    
    # 第2階段：微調模型
    print("\n======== Stage 2: Fine-tuning Models ========")
    
    for epochs in epochs_list:
        for batch_size in batch_size_list:
            print(f"\n===== Fine-tuning {model_name} - Epochs: {epochs}, Batch Size: {batch_size} =====")
            
            # 創建資料生成器 (重新創建是為了確保數據的一致性)
            train_gen, val_gen, test_gen = create_data_generators(batch_size)
            
            # 創建模型
            tuned_model = create_resnet_model()
            
            # 微調模型
            tuned_model = fine_tune_model(tuned_model)
            
            # 訓練和評估微調後的模型
            tuned_results = train_and_evaluate_model(
                tuned_model, train_gen, val_gen, test_gen,
                model_name, epochs, batch_size, is_fine_tuned=True
            )
            
            # 保存結果到CSV
            save_results_to_csv(
                tuned_results, csv_files['tuned'], 
                model_name, epochs, is_fine_tuned=True
            )
            
            # 添加到結果字典
            result_key = f"{model_name}_e{epochs}_b{batch_size}"
            if result_key in all_results:
                all_results[result_key]['tuned'] = tuned_results
                
                # 保存比較結果
                base_results = all_results[result_key]['base']
                save_comparison_to_csv(
                    base_results, tuned_results, csv_files['comparison'],
                    model_name, epochs
                )
            
            # 清理內存
            tf.keras.backend.clear_session()
    
    print("\nAll processes completed successfully!")
    print(f"Results saved to: {results_dir}")

if __name__ == "__main__":
    main()


===== Training Base ResNet50 - Epochs: 20, Batch Size: 8 =====
Found 176 images belonging to 2 classes.
Found 44 images belonging to 2 classes.
Found 80 images belonging to 2 classes.
Training base ResNet50 with epochs=20, batch_size=8
Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20

===== Training Base ResNet50 - Epochs: 20, Batch Size: 16 =====
Found 176 images belonging to 2 classes.
Found 44 images belonging to 2 classes.
Found 80 images belonging to 2 classes.
Training base ResNet50 with epochs=20, batch_size=16
Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20

===== Training Base ResNet50 - Epochs: 40, Batch Size: 8 =====
Found 176 images belonging to 2 classes.
Found 44 images belonging to 2 classes.
Found 80 images belonging to 2 classes.
Training base ResNet50 with epochs=40, batch_size=8
Epoch 1/40
Epoch 2/40
Epoch 3/40
Epoch 4/40
Epoch 5/40
Epoch 6/40
Epoch 7/40
Epoch 8/40
Epoch 9/40
Epoch 10/40

===== Training Base ResNet50 - Epochs: 40, Batch Size: