In [None]:
import tensorflow opencv-python matplotlib

In [None]:
#构建分割网络
import tensorflow as tf
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Conv2DTranspose, concatenate

def unet(input_size=(512, 512, 1)):
    inputs = tf.keras.Input(input_size)

    # 下采样路径
    conv1 = Conv2D(64, 3, activation='relu', padding='same')(inputs)
    conv1 = Conv2D(64, 3, activation='relu', padding='same')(conv1)
    pool1 = MaxPooling2D(pool_size=(2, 2))(conv1)

    conv2 = Conv2D(128, 3, activation='relu', padding='same')(pool1)
    conv2 = Conv2D(128, 3, activation='relu', padding='same')(conv2)
    pool2 = MaxPooling2D(pool_size=(2, 2))(conv2)

    # 底部
    conv3 = Conv2D(256, 3, activation='relu', padding='same')(pool2)
    conv3 = Conv2D(256, 3, activation='relu', padding='same')(conv3)

    # 上采样路径
    up4 = Conv2DTranspose(128, 2, strides=(2, 2), padding='same')(conv3)
    up4 = concatenate([up4, conv2])
    conv4 = Conv2D(128, 3, activation='relu', padding='same')(up4)
    conv4 = Conv2D(128, 3, activation='relu', padding='same')(conv4)

    up5 = Conv2DTranspose(64, 2, strides=(2, 2), padding='same')(conv4)
    up5 = concatenate([up5, conv1])
    conv5 = Conv2D(64, 3, activation='relu', padding='same')(up5)
    conv5 = Conv2D(64, 3, activation='relu', padding='same')(conv5)

    # 输出层
    outputs = Conv2D(1, 1, activation='sigmoid')(conv5)

    model = tf.keras.Model(inputs=[inputs], outputs=[outputs])
    return model

# 创建模型
segmentation_model = unet(input_size=(512, 512, 1))
segmentation_model.summary()

In [None]:
#构建决策网络
def decision_network(input_size=(64, 64, 1025)):
    inputs = tf.keras.Input(input_size)

    # 卷积层
    x = Conv2D(8, 5, activation='relu', padding='same')(inputs)
    x = MaxPooling2D(pool_size=(2, 2))(x)

    x = Conv2D(16, 5, activation='relu', padding='same')(x)
    x = MaxPooling2D(pool_size=(2, 2))(x)

    x = Conv2D(32, 5, activation='relu', padding='same')(x)
    x = MaxPooling2D(pool_size=(2, 2))(x)

    # 全局池化
    x = tf.keras.layers.GlobalAveragePooling2D()(x)

    # 输出层
    outputs = tf.keras.layers.Dense(1, activation='sigmoid')(x)

    model = tf.keras.Model(inputs=[inputs], outputs=[outputs])
    return model

# 创建模型
decision_model = decision_network(input_size=(64, 64, 1025))
decision_model.summary()

In [2]:
'''

'''
import os
import cv2
import numpy as np

def center_crop(image, target_size):
    """
    对图像进行中心裁剪（裁剪高度和宽度）。
    :param image: 输入图像
    :param target_size: 目标分辨率 (height, width)
    :return: 裁剪后的图像
    """
    height, width = image.shape[:2]
    target_height, target_width = target_size

    # 计算裁剪区域的起始位置
    start_y = (height - target_height) // 2
    start_x = (width - target_width) // 2

    # 裁剪图像
    cropped_image = image[start_y:start_y + target_height, start_x:start_x + target_width]
    return cropped_image

def pad_image(image, target_size, pad_value=0):
    """
    对图像进行填充（填充高度和宽度）。
    :param image: 输入图像
    :param target_size: 目标分辨率 (height, width)
    :param pad_value: 填充值（默认为 0）
    :return: 填充后的图像
    """
    height, width = image.shape[:2]
    target_height, target_width = target_size

    # 计算需要填充的高度和宽度
    pad_height = max(target_height - height, 0)
    pad_width = max(target_width - width, 0)

    # 计算填充区域
    pad_top = pad_height // 2
    pad_bottom = pad_height - pad_top
    pad_left = pad_width // 2
    pad_right = pad_width - pad_left

    # 填充图像
    padded_image = cv2.copyMakeBorder(image, pad_top, pad_bottom, pad_left, pad_right, cv2.BORDER_CONSTANT, value=pad_value)
    return padded_image

def resize_image(image, target_size):
    """
    动态调整图像大小（调整高度和宽度）。
    :param image: 输入图像
    :param target_size: 目标分辨率 (height, width)
    :return: 调整后的图像
    """
    height, width = image.shape[:2]
    target_height, target_width = target_size

    if height > target_height or width > target_width:
        # 裁剪
        return center_crop(image, target_size)
    elif height < target_height or width < target_width:
        # 填充
        return pad_image(image, target_size)
    else:
        # 无需调整
        return image

def load_image_and_mask(image_path, mask_path, target_size=(500, 1260)):
    """
    加载图像和掩码，并调整大小。
    :param image_path: 图像文件路径
    :param mask_path: 掩码文件路径
    :param target_size: 目标分辨率 (height, width)
    :return: 调整后的图像和掩码
    """
    # 加载图像和掩码
    image = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
    mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)

    # 检查图像和掩码是否加载成功
    if image is None:
        print(f"Failed to load image: {image_path}")
        return None, None
    if mask is None:
        print(f"Failed to load mask: {mask_path}")
        return None, None

    # 调整大小
    image = resize_image(image, target_size)
    mask = resize_image(mask, target_size)
    #归一化
    image = image / 255.0
    mask = mask / 255.0
    
    return image, mask

def load_dataset(base_path, folders, target_size=(1254, 500)):
    """
    加载数据集中的所有图像和掩码，并调整大小。
    :param base_path: 数据集的根目录
    :param folders: 包含所有子文件夹名称的列表
    :param target_size: 目标分辨率 (height, width)
    :return: 图像和掩码的 NumPy 数组
    """
    images = []
    masks = []
    for folder in folders:
        folder_path = os.path.join(base_path, folder)
        for file in os.listdir(folder_path):
            if file.endswith('.jpg'):  # 筛选图像文件
                image_path = os.path.join(folder_path, file)
                mask_path = os.path.join(folder_path, file.replace('.jpg', '_label.bmp'))
                if os.path.exists(mask_path):  # 检查掩码文件是否存在
                    image, mask = load_image_and_mask(image_path, mask_path, target_size)  # 加载并调整大小
                    if image is not None and mask is not None:  # 检查是否加载成功
                        images.append(image)
                        masks.append(mask)
    return np.array(images), np.array(masks)  # 转换为 NumPy 数组

# 数据集路径
base_path = r'Defect Dataset Used in Paper Two/KolektorSDD'  # 根据实际路径修改
folders = [f'kos{i:02d}' for i in range(1, 51)]  # 生成 kos01 到 kos50 的文件夹列表

# 加载数据集
images, masks = load_dataset(base_path, folders, target_size)

In [None]:
#可视化
import matplotlib.pyplot as plt

def visualize_images_and_masks(images, masks, num_samples=5):
    """
    可视化图像和掩码。
    :param images: 图像数组
    :param masks: 掩码数组
    :param num_samples: 随机可视化的样本数量
    """
    # 随机选择一些样本
    indices = np.random.choice(len(images), num_samples, replace=False)

    # 创建画布
    plt.figure(figsize=(15, 5 * num_samples))

    for i, idx in enumerate(indices):
        # 显示图像
        plt.subplot(num_samples, 2, 2 * i + 1)
        plt.imshow(images[idx], cmap='gray')
        plt.title(f"Image {idx}")
        plt.axis('off')

        # 显示掩码
        plt.subplot(num_samples, 2, 2 * i + 2)
        plt.imshow(masks[idx], cmap='gray')
        plt.title(f"Mask {idx}")
        plt.axis('off')

    plt.tight_layout()
    plt.show()

In [None]:
##训练分割网络
# 编译模型
segmentation_model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# 假设有一个数据生成器
def data_generator(image_paths, mask_paths, batch_size=8):
    while True:
        indices = np.random.choice(len(image_paths), batch_size)
        batch_images = []
        batch_masks = []
        for i in indices:
            image, mask = load_data(image_paths[i], mask_paths[i])
            batch_images.append(image)
            batch_masks.append(mask)
        yield np.array(batch_images), np.array(batch_masks)

# 训练
history = segmentation_model.fit(
    data_generator(image_paths, mask_paths),
    steps_per_epoch=len(image_paths) // 8,
    epochs=50
)

In [None]:
##训练决策模型
# 冻结分割网络的权重
segmentation_model.trainable = False

# 构建完整模型
inputs = segmentation_model.input
segmentation_output = segmentation_model.output
decision_output = decision_model(segmentation_output)
full_model = tf.keras.Model(inputs=inputs, outputs=decision_output)

# 编译完整模型
full_model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# 训练决策网络
history = full_model.fit(
    data_generator(image_paths, mask_paths),
    steps_per_epoch=len(image_paths) // 8,
    epochs=20
)

In [None]:
##推理与测试
def predict_defect(image_path, model):
    image, _ = load_data(image_path, image_path)  # 加载图像
    image = np.expand_dims(image, axis=0)  # 添加批次维度
    prediction = model.predict(image)  # 预测
    return prediction[0][0]  # 返回缺陷概率

# 示例
image_path = "dataset/images/img1.png"
defect_probability = predict_defect(image_path, full_model)
print(f"Defect Probability: {defect_probability}")