In [None]:
import tensorflow as tf
import numpy as np
import cv2
import os
from google.colab import drive

# 挂载 Google Drive
drive.mount('/content/drive')

# 设置文件路径
detection_data_dir = '/content/drive/MyDrive/Machine Vision/Minne Apple Count/detection/'
image_folder_detection_trains = detection_data_dir + 'train/images/'
image_folder_detection_masks = detection_data_dir + 'train/masks/'
image_folder_detection_test = detection_data_dir + 'test/images/'

# 数据集参数
batch_size = 64
shuffle_size = 1000
img_height = 180
img_width = 180

# 获取训练文件和掩码文件
train_image_names = os.listdir(image_folder_detection_trains)
mask_image_names = os.listdir(image_folder_detection_masks)
test_image_names = os.listdir(image_folder_detection_test)

# 构建完整路径
train_image_paths = [os.path.join(image_folder_detection_trains, name) for name in train_image_names]
mask_image_paths = [os.path.join(image_folder_detection_masks, name) for name in mask_image_names]
test_image_paths = [os.path.join(image_folder_detection_test, name) for name in test_image_names]

# 将数据集拆分，划分训练集和验证集，80%训练，20%验证
train_size = int(0.8 * len(train_image_paths))
val_size = len(train_image_paths) - train_size
train_image_paths, val_image_paths = train_image_paths[:train_size], train_image_paths[train_size:]
train_mask_paths, val_mask_paths = mask_image_paths[:train_size], mask_image_paths[train_size:]

# 创建数据集
train_dataset = tf.data.Dataset.from_tensor_slices((train_image_paths, train_mask_paths))
val_dataset = tf.data.Dataset.from_tensor_slices((val_image_paths, val_mask_paths))
test_dataset = tf.data.Dataset.from_tensor_slices(test_image_paths)

# 边界框生成函数
def mask_to_bboxes(mask):
    # 将掩码转换为 NumPy 数组，然后转为二值图片（苹果转白色）
    mask_np = mask.numpy().astype(np.uint8)
    _, binary_mask = cv2.threshold(mask_np, 1, 255, cv2.THRESH_BINARY)
    contours, _ = cv2.findContours(binary_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    bboxes = []

    for contour in contours:
        x, y, w, h = cv2.boundingRect(contour)  # 获取外接矩形
        bboxes.append([x, y, x+w, y+h])  # [x_min, y_min, x_max, y_max]

    return bboxes

# 数据预处理，读取文件，解码，重新设置尺寸，归一化
# 预处理函数
def preprocess(image_path, mask_path=None):
    image = tf.io.read_file(image_path)
    image = tf.image.decode_jpeg(image, channels=3)
    image = tf.image.resize(image, [img_height, img_width])
    image = image / 255.0

    if mask_path is not None:
        mask = tf.io.read_file(mask_path)
        mask = tf.image.decode_png(mask, channels=1) #掩码单通道
        mask = tf.image.resize(mask, [img_height, img_width])
        mask = mask / 255.0
        bboxes = mask_to_bboxes(mask)
        return image, bboxes
    else:
        return image

train_dataset = train_dataset.map(preprocess)
val_dataset = val_dataset.map(preprocess)
test_dataset = test_dataset.map(preprocess)

# 设置每批64个
train_dataset.batch(batch_size)
val_dataset.batch(batch_size)
test_dataset.batch(batch_size)

# 打乱数据
train_dataset.shuffle(shuffle_size)
val_dataset.shuffle(shuffle_size)
test_dataset.shuffle(shuffle_size)

# 预取数据来提高性能
train_dataset = train_dataset.prefetch(tf.data.experimental.AUTOTUNE)
val_dataset = val_dataset.prefetch(tf.data.experimental.AUTOTUNE)
test_dataset = test_dataset.prefetch(tf.data.experimental.AUTOTUNE)

# 设置迭代次数
train_dataset.repeat(2)


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


<_RepeatDataset element_spec=(TensorSpec(shape=(180, 180, 3), dtype=tf.float32, name=None), TensorSpec(shape=(180, 180, 1), dtype=tf.float32, name=None))>