In [5]:
# from google.colab import drive
# drive.mount("/content/drive")

In [6]:
from PIL import Image
import random
def resize_image_and_bboxes(image, bboxes, target_size):
    """
    修改图像的大小并调整对应的边界框。

    :param image: 原始图像 (PIL Image)
    :param bboxes: 原始边界框 (Numpy 或 Tensor，形状为 [N, 4])，格式 [x_min, y_min, x_max, y_max]
    :param target_size: 目标大小 (宽度, 高度)
    :return: 调整大小后的图像和调整后的边界框
    """
    # 获取原始图像的宽度和高度
    orig_width, orig_height = image.size

    # 调整图像的大小
    image_resized = image.resize(target_size, Image.BILINEAR)

    # 计算缩放比例
    target_width, target_height = target_size
    scale_x = target_width / orig_width
    scale_y = target_height / orig_height

    # 调整边界框
    bboxes_resized = bboxes.clone()
    bboxes_resized[:, [0, 2]] *= scale_x  # x_min 和 x_max 按照水平比例缩放
    bboxes_resized[:, [1, 3]] *= scale_y  # y_min 和 y_max 按照垂直比例缩放

    return image_resized, bboxes_resized


def filter_invalid_boxes(boxes):
        """
        过滤无效的边界框（宽度或高度为零的框）
        """
        # 计算宽度和高度
        width = boxes[:, 2] - boxes[:, 0]
        height = boxes[:, 3] - boxes[:, 1]

        # 保留有效的框，宽度和高度大于0
        valid_boxes = boxes[(width > 0) & (height > 0) & (boxes[:, 1] > 0) & (boxes[:, 0] > 0)]

        # 如果没有有效框，返回一个空的框
        if len(valid_boxes) == 0:
            return np.empty((0, 4), dtype=np.float32)  # 空框

        return valid_boxes


def RandomHorizontalFlip(prob, images, targets):
    for image, target in zip(images, targets):
        if random.random() < prob:
            height, width = image.shape[-2:]
            image = image.flip(-1)
            bbox = target["boxes"]
            bbox[:, [0, 2]] = width - bbox[:, [2, 0]]
            target["boxes"] = bbox
    return images, targets

In [7]:
import os
import json
import torch
import torchvision
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
from torchvision.models.detection import fasterrcnn_resnet50_fpn
from torchvision.models.detection import FasterRCNN
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt
from tqdm import tqdm

class CustomDataset(Dataset):
    def __init__(self, img_dir, json_file, transform=None):
        """
        :param img_dir: 存放图像的文件夹路径
        :param json_file: 包含标注信息的 JSON 文件路径
        :param transform: 需要应用于图像的变换（可选）
        """
        self.img_dir = img_dir
        self.transform = transform

        # 读取标注文件
        with open(json_file, 'r') as f:
            self.annotations = json.load(f)

        # 图像的ID列表
        self.img_ids = [anno["id"] for anno in self.annotations]

    def __len__(self):
        return len(self.annotations)

    def __getitem__(self, idx):
        anno = self.annotations[idx]
        img_path = os.path.join(self.img_dir, anno["id"])
        img = Image.open(img_path).convert("RGB")

        # 处理目标信息
        boxes = []
        labels = []

        if anno["region"]:
            boxes = np.array(anno["region"], dtype=np.float32)
            boxes = torch.tensor(boxes, dtype=torch.float32)  # 转换为tensor
            labels = torch.tensor([1], dtype=torch.int64)  # 篡改类为类1

        else:
            boxes = torch.empty((0, 4), dtype=torch.float32)  # 空的2D tensor，形状为 [0, 4]
            boxes = torch.tensor(boxes, dtype=torch.float32)  # 转换为tensor
            labels = torch.tensor([0], dtype=torch.int64) # 未篡改类

        # 检查框的有效性
        if len(boxes) > 0:
            # 过滤无效的框（宽度和高度为零的框）
            boxes = self.filter_invalid_boxes(boxes)

        # 如果没有有效框，跳过该样本
        if len(boxes) == 0:
            boxes = torch.empty((0, 4), dtype=torch.float32)  # 空的2D tensor，形状为 [0, 4]
            boxes = torch.tensor(boxes, dtype=torch.float32)  # 转换为tensor
            labels = torch.tensor([0], dtype=torch.int64) # 未篡改类
            # return self.__getitem__((idx + 1) % len(self))  # 递归调用获取下一个有效样本
        
        img, boxes = resize_image_and_bboxes(img, boxes, (512, 512))
        
        target = {"boxes": boxes, "labels": labels}

        if self.transform:
            img = self.transform(img)

        return img, target

    def filter_invalid_boxes(self, boxes):
        """
        过滤无效的边界框（宽度或高度为零的框）
        """
        # 计算宽度和高度
        width = boxes[:, 2] - boxes[:, 0]
        height = boxes[:, 3] - boxes[:, 1]

        # 保留有效的框，宽度和高度大于0
        valid_boxes = boxes[(width > 0) & (height > 0) & (boxes[:, 1] > 0) & (boxes[:, 0] > 0)]

        # 如果没有有效框，返回一个空的框
        if len(valid_boxes) == 0:
            return np.empty((0, 4), dtype=np.float32)  # 空框

        return valid_boxes

# 定义测试集的数据集类（与训练集相同，但不需要标签）
class TestDataset(Dataset):
    def __init__(self, img_dir, transform=None):
        """
        :param img_dir: 存放测试图像的文件夹路径
        :param transform: 需要应用于图像的变换（可选）
        """
        self.img_dir = img_dir
        self.transform = transform
        self.img_ids = os.listdir(img_dir)  # 获取所有图片的文件名
        self.original_sizes = {}

    def __len__(self):
        return len(self.img_ids)

    def __getitem__(self, idx):
        img_id = self.img_ids[idx]
        img_path = os.path.join(self.img_dir, img_id)
        img = Image.open(img_path).convert("RGB")
        self.original_sizes[img_id] = img.size

        if self.transform:
            img = self.transform(img)

        return img, img_id  # 只返回图像和图片ID

# 加载模型
def load_model(model, filepath):
    model.load_state_dict(torch.load(filepath))
    model.eval()  # 切换到评估模式
    print(f"模型已从 {filepath} 加载")
    return model

# 图像转换操作
transform = transforms.Compose([
    transforms.ToTensor(),  # 转换为Tensor
])

# 定义训练数据集和数据加载器
train_dataset = CustomDataset(img_dir="data/image/train", json_file="data/label_train.json", transform=transform)
train_loader = DataLoader(train_dataset, batch_size=2, shuffle=True, collate_fn=lambda x: tuple(zip(*x)))

# 定义测试数据集和数据加载器
test_dataset = TestDataset(img_dir="data/image/val", transform=transform)
test_loader = DataLoader(test_dataset, batch_size=2, shuffle=False)

# 加载预训练的Faster R-CNN模型
model = fasterrcnn_resnet50_fpn(pretrained=True)
in_features = model.roi_heads.box_predictor.cls_score.in_features
model.roi_heads.box_predictor = torchvision.models.detection.faster_rcnn.FastRCNNPredictor(in_features, num_classes=2)

# model = fasterrcnn_mobilenet_v3_large_fpn(pretained=True)
# model = torchvision.models.detection.retinanet_resnet50_fpn_v2(pretrained=True)
# in_features = model.roi_heads.box_predictor.cls_score.in_features
# model.roi_heads.box_predictor = torchvision.models.detection.retinanet.FastRCNNPredictor(in_features, 2)

# 加载训练好的模型
# load_model(model, 'model/model.pth')

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# 定义优化器
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(params, lr=1e-4, momentum=0.9, weight_decay=0.0005)

# 定义学习率调度器
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=1, gamma=0.7)


In [8]:

# 保存模型函数
def save_model(model, filepath):
    torch.save(model.state_dict(), filepath)
    print(f"模型已保存到 {filepath}")

# 定义训练函数
def train_model(model, dataloader, optimizer, lr_scheduler, num_epochs, save_path="model/model.pth"):
    model.train()

    for epoch in range(num_epochs):
        epoch_loss = 0
        for images, targets in tqdm(dataloader):
            images = [image.to(device) for image in images]
            targets = [{k: v.to(device) for k, v in t.items()} for t in targets]
            
            images, targets = RandomHorizontalFlip(0.95, images, targets)

            # 前向传播
            loss_dict = model(images, targets)

            # 计算总损失
            losses = sum(loss for loss in loss_dict.values())
            epoch_loss += losses.item()

            # 反向传播
            optimizer.zero_grad()
            losses.backward()
            optimizer.step()

        lr_scheduler.step()
        print(f"Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss / len(dataloader)}")
        # 每个 epoch 后保存模型
        save_model(model, save_path)

    # # 每个 epoch 后保存模型
    # save_model(model, save_path)

# 训练模型
train_model(model, train_loader, optimizer, lr_scheduler, num_epochs=10)


  boxes = torch.tensor(boxes, dtype=torch.float32)  # 转换为tensor
  boxes = torch.tensor(boxes, dtype=torch.float32)  # 转换为tensor
  2%|▏         | 107/6500 [00:29<29:17,  3.64it/s]


KeyboardInterrupt: 

In [None]:
# 进行推理并生成结果
def generate_predictions(model, dataloader, output_json_path):
    results = []

    with torch.no_grad():  # 禁用梯度计算
        model.eval()  # 切换到评估模式
        for images, img_ids in tqdm(dataloader):
            images = [image.to(device) for image in images]

            # 模型推理
            predictions = model(images)

            for i, img_id in enumerate(img_ids):
                prediction = predictions[i]

                # 获取预测框和标签
                boxes = prediction['boxes'].cpu().numpy()
                labels = prediction['labels'].cpu().numpy()
                scores = prediction['scores'].cpu().numpy()

                # 只保留标签为1的框，即篡改的区域，阈值可根据需要调整
                mask = labels == 1
                boxes = boxes[mask]
                scores = scores[mask]

                if len(boxes) > 0:
                    # 获取置信度最高的框的索引
                    best_idx = np.argmax(scores)
                    best_box = boxes[best_idx]
                    best_score = scores[best_idx]

                    # 将最高置信度的框添加到结果中
                    region = [best_box.tolist()]

                else:
                    region = []

                # 将结果添加到列表中
                results.append({"id": img_id, "region": region})

    # 将结果保存为JSON文件
    with open(output_json_path, 'w') as f:
        json.dump(results, f, indent=4)
    print(f"预测结果已保存到 {output_json_path}")

# 修改模型的分类头部分，将类别数改为2（篡改和未篡改）
load_model(model, 'model/model.pth')
generate_predictions(model, test_loader, "output/label_test.json")
