In [2]:
import os
import pandas as pd
import numpy as np
import xml.etree.ElementTree as ET
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.utils import to_categorical


# Dataset locations.
# The root folder can be overridden with the THERMAL_DATA_ROOT environment
# variable so the notebook is not tied to one machine; when the variable is
# unset the original local path is used.
DATA_ROOT = os.environ.get(
    'THERMAL_DATA_ROOT',
    r'C:\Users\xiaog\Desktop\Project\Datasets\dataset\dataset 4\Data Files',
)
train_dir = os.path.join(DATA_ROOT, 'train')
valid_dir = os.path.join(DATA_ROOT, 'valid')
test_dir = os.path.join(DATA_ROOT, 'test')
# Parse an XML annotation file and derive the image label.
def get_label_from_xml(xml_file):
    """Return the class label encoded in an XML annotation file.

    Parameters:
        xml_file: Path (or file object) accepted by ``ET.parse``.

    Returns:
        str: "1" if any ``<object>`` element has a ``<name>`` equal to
        'thermal_defect', otherwise "0". The label is a string because the
        callers in this notebook compare it against "1" and feed it to
        ``flow_from_dataframe`` with ``class_mode="categorical"``.

    Raises:
        xml.etree.ElementTree.ParseError: If the file is not well-formed XML.
    """
    root = ET.parse(xml_file).getroot()

    for obj in root.findall('object'):
        # findtext() returns None when <name> is missing, so malformed
        # objects are skipped instead of raising AttributeError.
        name = obj.findtext('name')
        if name is not None and name.strip() == 'thermal_defect':
            return "1"

    return "0"


def count_defects_in_directory(data_dir):
    """Count annotation files in ``data_dir`` by label.

    Every entry of ``data_dir`` whose name ends with '.xml' is labelled with
    ``get_label_from_xml``; other entries are ignored.

    Returns:
        tuple[int, int]: ``(with_defect, without_defect)`` - the number of
        annotations labelled "1" and the number with any other label.
    """
    labels = [
        get_label_from_xml(os.path.join(data_dir, entry))
        for entry in os.listdir(data_dir)
        if entry.endswith('.xml')
    ]
    defect_total = labels.count("1")
    # Anything that is not "1" counts as "without defect".
    return defect_total, len(labels) - defect_total

# Count images with and without a defect annotation in each split.
(
    (train_with_defect, train_without_defect),
    (valid_with_defect, valid_without_defect),
    (test_with_defect, test_without_defect),
) = [
    count_defects_in_directory(split_dir)
    for split_dir in (train_dir, valid_dir, test_dir)
]


# Build a DataFrame of (image path, label) pairs for one split.
def create_dataframe(data_dir):
    """Collect the labelled images found in ``data_dir``.

    For every '.xml' annotation in ``data_dir`` the image with the same stem
    and a '.jpg' extension is looked up; annotations without a matching image
    are skipped.

    Parameters:
        data_dir (str): Directory holding the annotation/image pairs.

    Returns:
        pandas.DataFrame: Columns ``filepath`` (image path) and ``label``
        (string returned by ``get_label_from_xml``), one row per image,
        ordered by file name.
    """
    records = []

    # Sort so the row order does not depend on the arbitrary order of
    # os.listdir(); this matters for generators created with shuffle=False.
    for xml_file in sorted(os.listdir(data_dir)):
        if not xml_file.endswith('.xml'):
            continue

        xml_path = os.path.join(data_dir, xml_file)
        # Swap only the trailing extension. str.replace('.xml', '.jpg') would
        # also rewrite any '.xml' occurring earlier in the path.
        img_path = os.path.splitext(xml_path)[0] + '.jpg'

        if os.path.exists(img_path):  # keep only annotations that have an image
            records.append([img_path, get_label_from_xml(xml_path)])

    return pd.DataFrame(records, columns=['filepath', 'label'])

# Build one DataFrame per split.
train_df, valid_df, test_df = (
    create_dataframe(split_dir)
    for split_dir in (train_dir, valid_dir, test_dir)
)

# Report how many images each split contains.
print("\n".join(
    f"{split_name} set: {len(split_frame)} images"
    for split_name, split_frame in (
        ("Train", train_df),
        ("Valid", valid_df),
        ("Test", test_df),
    )
))

# Augmentation pipeline, used for the training split only.
train_datagen = ImageDataGenerator(
    rescale=1.0 / 255,        # normalisation: multiply pixel values by 1/255
    horizontal_flip=True,     # random horizontal flips
    rotation_range=30,        # random rotations
    zoom_range=0.2,           # random zoom
    shear_range=0.2,          # random shear
    width_shift_range=0.1,    # random horizontal shift
    height_shift_range=0.1,   # random vertical shift
    fill_mode='nearest',
)

# Validation and test images are only rescaled, never augmented.
valid_test_datagen = ImageDataGenerator(rescale=1.0 / 255)



# Arguments shared by the three generators, kept in one place so the splits
# cannot drift apart.
# - class_mode="categorical" yields one-hot labels, matching the two-unit
#   softmax output of the model.
# - classes pins the label -> index mapping ("0" -> 0, "1" -> 1) instead of
#   inferring it separately from each DataFrame, so every split uses the same
#   encoding even if one of them happened to lack a class.
_flow_kwargs = dict(
    x_col="filepath",
    y_col="label",
    target_size=(128, 128),
    batch_size=16,
    class_mode="categorical",
    classes=["0", "1"],
)

# Training data (augmented).
train_generator = train_datagen.flow_from_dataframe(
    dataframe=train_df,
    **_flow_kwargs
)

# Validation data.
valid_generator = valid_test_datagen.flow_from_dataframe(
    dataframe=valid_df,
    **_flow_kwargs
)

# Test data: keep the file order so predictions line up with test_df rows.
test_generator = valid_test_datagen.flow_from_dataframe(
    dataframe=test_df,
    shuffle=False,
    **_flow_kwargs
)

# Number of samples each generator will serve.
print("\n".join(
    f"{split_name} generator samples: {generator.samples}"
    for split_name, generator in (
        ("Train", train_generator),
        ("Valid", valid_generator),
        ("Test", test_generator),
    )
))

# Class balance of each split, as counted from the XML annotations.
print("\n".join(
    f"{split_name} set - defect: {n_defect}, no_defect: {n_clean}"
    for split_name, n_defect, n_clean in (
        ("Train", train_with_defect, train_without_defect),
        ("Valid", valid_with_defect, valid_without_defect),
        ("Test", test_with_defect, test_without_defect),
    )
))


Train set: 3748 images
Valid set: 1071 images
Test set: 533 images
Found 3748 validated image filenames belonging to 2 classes.
Found 1071 validated image filenames belonging to 2 classes.
Found 533 validated image filenames belonging to 2 classes.
Train generator samples: 3748
Valid generator samples: 1071
Test generator samples: 533
Train set - defect: 3321, no_defect: 427
Valid set - defect: 942, no_defect: 129
Test set - defect: 470, no_defect: 63


In [4]:
from tensorflow.keras.layers import *
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.layers import *
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras import backend as K
import tensorflow as tf
# # 启用混合精度
# policy = tf.keras.mixed_precision.Policy('mixed_float16')
# tf.keras.mixed_precision.set_global_policy(policy)
# gpus = tf.config.experimental.list_physical_devices('GPU')
# if gpus:
#     try:
#         tf.config.experimental.set_virtual_device_configuration(
#             gpus[0],
#             [tf.config.experimental.VirtualDeviceConfiguration(memory_limit=4096)]  # 限制为 4GB
#         )
#         print("GPU 内存限制已设置！")
#     except RuntimeError as e:
#         print(e)

def inception_module(x, filters):
    """Apply four parallel branches to ``x`` and concatenate them on the last axis.

    The branches are a 1x1, a 3x3 and a 5x5 convolution, plus a 3x3 max-pool
    (stride 1) followed by a 1x1 convolution. Every convolution uses
    ``filters`` output channels, 'same' padding and ReLU, so the result has
    ``4 * filters`` channels.

    Parameters:
        x: Input tensor fed to every branch.
        filters (int): Number of filters per branch.

    Returns:
        The concatenated branch outputs.
    """
    # Convolutional branches, built in the order 1x1, 3x3, 5x5.
    conv_branches = [
        Conv2D(filters, (size, size), padding='same', activation='relu')(x)
        for size in (1, 3, 5)
    ]

    # Pooling branch: stride-1 max-pool, then a 1x1 projection.
    pooled = MaxPooling2D((3, 3), strides=(1, 1), padding='same')(x)
    pooled = Conv2D(filters, (1, 1), padding='same', activation='relu')(pooled)

    return concatenate(conv_branches + [pooled], axis=-1)

def attention_block(inputs):
    """Re-weight each step of a sequence by a learned softmax score.

    A single-unit tanh ``Dense`` layer scores every step, the scores are
    flattened and passed through a softmax, reshaped back to one weight per
    step, and multiplied into ``inputs``.

    Parameters:
        inputs: Tensor expected to be (batch_size, seq_length, features);
            ``inputs.shape[1]`` is used as the sequence length.

    Returns:
        Tensor with the same layout as ``inputs``, each step scaled by its
        attention weight.
    """
    seq_length = inputs.shape[1]

    scores = Dense(1, activation='tanh')(inputs)        # (batch_size, seq_length, 1)
    scores = Flatten()(scores)                          # (batch_size, seq_length)
    weights = Activation('softmax')(scores)             # (batch_size, seq_length)
    weights = Reshape((seq_length, 1))(weights)         # (batch_size, seq_length, 1)

    # The trailing singleton axis broadcasts across the feature dimension.
    return Multiply()([inputs, weights])


def build_model(input_shape=(128, 128, 3), num_classes=2):
    """Assemble the Inception -> BiLSTM -> attention classifier.

    Parameters:
        input_shape (tuple): Shape of one input image, without the batch axis.
        num_classes (int): Number of units in the final softmax layer.

    Returns:
        tensorflow.keras.Model: The uncompiled model.
    """
    inputs = Input(shape=input_shape)

    # Convolutional feature extractor: two inception stages each followed by
    # 2x2 max-pooling, then a third inception stage and global pooling.
    features = inputs
    for stage_filters in (32, 64):
        features = inception_module(features, stage_filters)
        features = MaxPooling2D(2)(features)
    features = inception_module(features, 128)
    features = GlobalAveragePooling2D()(features)

    # Split the pooled feature vector into chunks of 128 values and treat the
    # chunks as a sequence for the recurrent layer.
    sequence = Reshape((-1, 128))(features)
    sequence = Bidirectional(LSTM(64, return_sequences=True))(sequence)
    sequence = Dropout(0.3)(sequence)
    sequence = attention_block(sequence)
    summary_vector = GlobalAveragePooling1D()(sequence)

    # Classification head.
    hidden = Dense(64, activation='relu')(summary_vector)
    hidden = Dropout(0.5)(hidden)
    outputs = Dense(num_classes, activation='softmax')(hidden)

    return Model(inputs=inputs, outputs=outputs)
# Build the model
model = build_model((128, 128, 3), 2)

# Display the layer-by-layer architecture.
model.summary()


Model: "model_1"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_2 (InputLayer)           [(None, 128, 128, 3  0           []                               
                                )]                                                                
                                                                                                  
 max_pooling2d_5 (MaxPooling2D)  (None, 128, 128, 3)  0          ['input_2[0][0]']                
                                                                                                  
 conv2d_12 (Conv2D)             (None, 128, 128, 32  128         ['input_2[0][0]']                
                                )                                                                 
                                                                                            

In [5]:
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
import tensorflow as tf
from tensorflow.keras import backend as K
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.metrics import Precision, Recall

# # 启用混合精度
# policy = tf.keras.mixed_precision.Policy('mixed_float16')
# tf.keras.mixed_precision.set_global_policy(policy)

# Focal loss definition
def focal_loss(gamma=2., alpha=0.75):
    """
    Create a focal-loss function for one-hot targets.

    For every class entry the returned loss computes
    ``alpha * (1 - p) ** gamma * (-y_true * log(p))`` and sums the entries
    over the last axis. With one-hot ``y_true`` only the true-class entry is
    non-zero.

    Parameters:
        gamma (float): Focusing exponent applied to ``1 - p``. Default is 2.
        alpha (float): Constant factor multiplied into the loss. It is applied
            identically to every class, so it scales the loss as a whole
            rather than weighting classes against each other. Default is 0.75.

    Returns:
        loss (function): ``focal_loss_fixed(y_true, y_pred)``, which returns
        one loss value per sample.
    """

    def focal_loss_fixed(y_true, y_pred):
        # Keep probabilities strictly inside (0, 1) so log() stays finite.
        eps = K.epsilon()
        probs = K.clip(y_pred, eps, 1. - eps)

        # Per-class cross-entropy; zero wherever y_true is zero.
        ce_terms = -y_true * K.log(probs)

        # Down-weighting factor (1 - p) ** gamma.
        focus = K.pow(1 - probs, gamma)

        return K.sum(alpha * focus * ce_terms, axis=-1)

    return focal_loss_fixed


In [6]:
# Precision and Recall are restricted to output column 1 via class_id.
# Without it, a two-unit softmax with one-hot labels has both columns pooled
# into a single confusion matrix, and at the default 0.5 threshold both
# metrics collapse to the accuracy value.
# NOTE(review): column 1 is assumed to be label "1" (thermal_defect) - confirm
# with train_generator.class_indices.
model.compile(
    optimizer=Adam(learning_rate=1e-4),
    loss=focal_loss(gamma=2., alpha=0.75),
    metrics=[
        'accuracy',
        Precision(name='precision', class_id=1),
        Recall(name='recall', class_id=1),
        'AUC',
    ],
)

In [None]:
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint

# Training length.
epochs = 50

# Take the batch size from the generator rather than hard-coding it. The
# previous hard-coded 32 did not match the generators' batches of 16, so
# `samples // 32` steps covered only about half of the data per epoch.
batch_size = train_generator.batch_size

# len() of a Keras data iterator is the number of batches in one full pass,
# including the final partial batch.
steps_per_epoch = len(train_generator)
validation_steps = len(valid_generator)

from tensorflow.keras.callbacks import ReduceLROnPlateau

callbacks = [
    # Stop once val_loss has not improved for 20 epochs and roll back to the
    # best weights seen.
    EarlyStopping(monitor='val_loss', patience=20, restore_best_weights=True),
    # ModelCheckpoint('best_model.h5', monitor='val_loss', save_best_only=True),
    # Multiply the learning rate by 0.2 after 10 stagnant epochs, down to 1e-6.
    ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=10, min_lr=1e-6)
]


# model_checkpoint = ModelCheckpoint(
#     # filepath='best_model.h5',  # where to save the best model
#     monitor='val_loss',
#     save_best_only=True,
#     mode='min',
#     verbose=1
# )

history = model.fit(
    train_generator,
    steps_per_epoch=steps_per_epoch,
    validation_data=valid_generator,
    validation_steps=validation_steps,
    epochs=epochs,
    callbacks=callbacks
)


Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50