In [None]:
!pip install segmentation-models-pytorch
!pip install albumentations


Collecting segmentation-models-pytorch
  Downloading segmentation_models_pytorch-0.3.4-py3-none-any.whl.metadata (30 kB)
Collecting efficientnet-pytorch==0.7.1 (from segmentation-models-pytorch)
  Downloading efficientnet_pytorch-0.7.1.tar.gz (21 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting pretrainedmodels==0.7.4 (from segmentation-models-pytorch)
  Downloading pretrainedmodels-0.7.4.tar.gz (58 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m58.8/58.8 kB[0m [31m3.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting timm==0.9.7 (from segmentation-models-pytorch)
  Downloading timm-0.9.7-py3-none-any.whl.metadata (58 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m58.8/58.8 kB[0m [31m2.9 MB/s[0m eta [36m0:00:00[0m
Collecting munch (from pretrainedmodels==0.7.4->segmentation-models-pytorch)
  Downloading munch-4.0.0-py2.py3-none-any.whl.metadata (5.9 kB)
Downloading segm

In [None]:
!pip install segmentation-models-pytorch pycocotools




In [None]:
import os
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
import segmentation_models_pytorch as smp
from pycocotools.coco import COCO
from torchvision import transforms
import matplotlib.pyplot as plt
from PIL import Image


In [None]:
import json
import os

def merge_coco_json(json_files, output_file):
    merged_data = {
        "images": [],
        "annotations": [],
        "categories": []
    }

    # 用于确保每个类别只添加一次
    category_ids = set()

    for json_file in json_files:
        with open(json_file, 'r') as f:
            coco_data = json.load(f)
            merged_data['images'].extend(coco_data['images'])
            merged_data['annotations'].extend(coco_data['annotations'])

            for category in coco_data['categories']:
                if category['id'] not in category_ids:
                    merged_data['categories'].append(category)
                    category_ids.add(category['id'])

    # 将合并后的数据写入到一个新的 JSON 文件中
    with open(output_file, 'w') as f:
        json.dump(merged_data, f)

# 示例用法
json_files = ['/content/drive/MyDrive/onlyvirus/labels/train/EPSON008 (0_0).json', '/content/drive/MyDrive/onlyvirus/labels/train/EPSON008 (0_1).json', '/content/drive/MyDrive/onlyvirus/labels/train/EPSON008 (1_0).json','/content/drive/MyDrive/onlyvirus/labels/train/EPSON008 (1_1).json','/content/drive/MyDrive/onlyvirus/labels/train/EPSON008 (2_0).json','/content/drive/MyDrive/onlyvirus/labels/train/EPSON008 (2_1).json']
merge_coco_json(json_files, 'only_merged_coco_annotations.json')



In [None]:
import os
import json
import cv2
import numpy as np
import torch
from torch.utils.data import Dataset
from torchvision import transforms

class CocoSegmentationDataset(Dataset):
    def __init__(self, annotation_file, images_dir, transform=None):
        with open(annotation_file, 'r') as f:
            self.coco_data = json.load(f)

        self.images_dir = images_dir
        self.transform = transform
        self.image_ids = [image['id'] for image in self.coco_data['images']]
        self.masks = self.generate_masks()

    def generate_masks(self):
        masks = []
        for image in self.coco_data['images']:
            mask = np.zeros((image['height'], image['width']), dtype=np.uint8)
            for annotation in self.coco_data['annotations']:
                if annotation['image_id'] == image['id']:
                    cv2.fillPoly(mask, [np.array(annotation['segmentation'][0]).reshape(-1, 2).astype(np.int32)], 1)
            masks.append(mask)
        return masks

    def __len__(self):
        return len(self.image_ids)

    def __getitem__(self, idx):
        image_id = self.image_ids[idx]
        image_info = next(item for item in self.coco_data['images'] if item['id'] == image_id)
        img_path = os.path.join(self.images_dir, image_info['file_name'])

        image = cv2.imread(img_path)
        mask = self.masks[idx]

        if self.transform:
            augmented = self.transform(image=image, mask=mask)
            image = augmented['image']
            mask = augmented['mask']

        return image, mask
from torch.utils.data import DataLoader
import albumentations as A

# 定义数据增强
transform = A.Compose([
    A.Resize(256, 256),
    A.HorizontalFlip(),
    A.VerticalFlip(),
    A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
    A.pytorch.transforms.ToTensorV2()
])

# 加载数据集
train_dataset = CocoSegmentationDataset(
    annotation_file='merged_coco_annotations.json',
    images_dir='/content/drive/MyDrive/onlyvirus/images/train',
    transform=transform
)

val_dataset = CocoSegmentationDataset(
    annotation_file='/content/drive/MyDrive/virusdata/labels/val/EPSON008 (3_0).json',
    images_dir='/content/drive/MyDrive/virusdata/images/val',
    transform=transform
)
train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=4, shuffle=False)

import segmentation_models_pytorch as smp
import torch.optim as optim

# 初始化模型
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = smp.DeepLabV3(encoder_name='resnet50', encoder_weights='imagenet', classes=1, activation='sigmoid')

model.to(device)



# 设置损失函数和优化器
criterion= torch.nn.BCEWithLogitsLoss()
optimizer = optim.AdamW(model.parameters(), lr=0.001, weight_decay=1e-5)  # 添加L2正则化


from sklearn.metrics import precision_score, recall_score, f1_score

# 定义函数来计算精确度、召回率和F1分数
def calculate_metrics(y_true, y_pred):
    # 将Tensor转换为NumPy数组并展平
    y_true_flat = y_true.view(-1).cpu().numpy()
    y_pred_flat = y_pred.view(-1).cpu().numpy()

    precision = precision_score(y_true_flat, y_pred_flat)
    recall = recall_score(y_true_flat, y_pred_flat)
    f1 = f1_score(y_true_flat, y_pred_flat)

    return precision, recall, f1
def visualize_results(image, mask, prediction):
    """可视化原图、真实掩模和预测掩模"""
    plt.figure(figsize=(12, 4))

    # 转换图像以便可视化
    image = image.permute(1, 2, 0).cpu().numpy()  # 从 (C, H, W) 转为 (H, W, C)
    if image.shape[-1] == 1:  # 如果是单通道图像，转换为灰度图
        image = image[..., 0]

    # 处理掩模和预测
    mask = mask.squeeze().cpu().numpy()  # 去掉多余的通道维度
    prediction = prediction.squeeze().cpu().numpy()  # 去掉多余的通道维度

    plt.subplot(1, 3, 1)
    plt.imshow(image, cmap='gray' if image.ndim == 2 else None)  # 如果是灰度图，使用 gray
    plt.title('Original Image')
    plt.axis('off')

    plt.subplot(1, 3, 2)
    plt.imshow(mask, cmap='gray')
    plt.title('Ground Truth Mask')
    plt.axis('off')

    plt.subplot(1, 3, 3)
    plt.imshow(prediction, cmap='gray')
    plt.title('Predicted Mask')
    plt.axis('off')

    plt.show()

# 在验证阶段调用可视化函数


# 训练循环
num_epochs = 100
for epoch in range(num_epochs):
    model.train()
    for images, masks in train_loader:
        images = images.to(device)
        masks = masks.to(device).float()

        optimizer.zero_grad()
        outputs = model(images)

        loss = criterion(outputs, masks.unsqueeze(1))  # 对于二分类
        loss.backward()
        optimizer.step()

    print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {loss.item():.4f}")

    # 验证阶段
    model.eval()
    with torch.no_grad():
        total_precision = 0
        total_recall = 0
        total_f1 = 0
        count = 0

        for val_images, val_masks in val_loader:
            val_images = val_images.to(device)
            val_masks = val_masks.to(device).float()

            val_outputs = model(val_images)
            val_predictions = (val_outputs > 0.5).float()  # 二值化输出

            # 计算精确度、召回率和F1分数
            precision = precision_score(val_masks.view(-1).cpu().numpy(), val_predictions.view(-1).cpu().numpy())
            recall = recall_score(val_masks.view(-1).cpu().numpy(), val_predictions.view(-1).cpu().numpy())
            f1 = f1_score(val_masks.view(-1).cpu().numpy(), val_predictions.view(-1).cpu().numpy())

            total_precision += precision
            total_recall += recall
            total_f1 += f1
            count += 1

            # 可视化结果
            # if count == 1:  # 只可视化一次
            #     visualize_results(val_images[0], val_masks[0], val_predictions[0])
        avg_precision = total_precision / count
        avg_recall = total_recall / count
        avg_f1 = total_f1 / count

        print(f"Validation - Precision: {avg_precision:.4f}, Recall: {avg_recall:.4f}, F1 Score: {avg_f1:.4f}")













Epoch [1/100], Loss: 0.7886
Validation - Precision: 0.1458, Recall: 0.8814, F1 Score: 0.2503
Epoch [2/100], Loss: 0.7167
Validation - Precision: 0.1524, Recall: 1.0000, F1 Score: 0.2645
Epoch [3/100], Loss: 0.7171
Validation - Precision: 0.2368, Recall: 0.8424, F1 Score: 0.3697
Epoch [4/100], Loss: 0.7224
Validation - Precision: 0.2083, Recall: 0.6163, F1 Score: 0.3114
Epoch [5/100], Loss: 0.7078
Validation - Precision: 0.2303, Recall: 0.4272, F1 Score: 0.2993
Epoch [6/100], Loss: 0.8079
Validation - Precision: 0.1600, Recall: 0.1328, F1 Score: 0.1451
Epoch [7/100], Loss: 0.7807
Validation - Precision: 0.2760, Recall: 0.4712, F1 Score: 0.3481
Epoch [8/100], Loss: 0.7733
Validation - Precision: 0.4300, Recall: 0.4638, F1 Score: 0.4463
Epoch [9/100], Loss: 0.7699
Validation - Precision: 0.0000, Recall: 0.0000, F1 Score: 0.0000


  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


Epoch [10/100], Loss: 0.7494
Validation - Precision: 0.4938, Recall: 0.2803, F1 Score: 0.3576
Epoch [11/100], Loss: 0.6652
Validation - Precision: 0.0000, Recall: 0.0000, F1 Score: 0.0000
Epoch [12/100], Loss: 0.7328
Validation - Precision: 0.0000, Recall: 0.0000, F1 Score: 0.0000


  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


Epoch [13/100], Loss: 0.6446
Validation - Precision: 0.7620, Recall: 0.0564, F1 Score: 0.1050
Epoch [14/100], Loss: 0.6497
Validation - Precision: 0.5824, Recall: 0.4880, F1 Score: 0.5310
Epoch [15/100], Loss: 0.7079
Validation - Precision: 0.5722, Recall: 0.4397, F1 Score: 0.4973
Epoch [16/100], Loss: 0.6287
Validation - Precision: 0.6226, Recall: 0.8167, F1 Score: 0.7066
Epoch [17/100], Loss: 0.7046
Validation - Precision: 0.5404, Recall: 0.7961, F1 Score: 0.6438
Epoch [18/100], Loss: 0.6912
Validation - Precision: 0.5339, Recall: 0.8344, F1 Score: 0.6512
Epoch [19/100], Loss: 0.7052
Validation - Precision: 0.5936, Recall: 0.9316, F1 Score: 0.7252
Epoch [20/100], Loss: 0.6770
Validation - Precision: 0.6280, Recall: 0.8864, F1 Score: 0.7352
Epoch [21/100], Loss: 0.6953
Validation - Precision: 0.5414, Recall: 0.5438, F1 Score: 0.5426
Epoch [22/100], Loss: 0.6744
Validation - Precision: 0.7889, Recall: 0.3261, F1 Score: 0.4615
Epoch [23/100], Loss: 0.6702
Validation - Precision: 0.5273,

In [None]:
from sklearn.metrics import accuracy_score, jaccard_score, f1_score

model.eval()  # 设置模型为评估模式
all_preds = []
all_targets = []

with torch.no_grad():  # 禁用梯度计算
    for images, targets in test_loader:
        images = images.to(device)
        targets = targets.to(device)

        # 前向传播
        outputs = model(images)
        preds = (torch.sigmoid(outputs) > 0.5).float()  # 将输出转换为二进制掩码

        all_preds.append(preds.cpu().numpy())
        all_targets.append(targets.cpu().numpy())

# 将所有结果拼接
all_preds = np.concatenate(all_preds)
all_targets = np.concatenate(all_targets)

# 计算性能指标
accuracy = accuracy_score(all_targets.flatten(), all_preds.flatten())
jaccard = jaccard_score(all_targets.flatten(), all_preds.flatten())
dice_coefficient = f1_score(all_targets.flatten(), all_preds.flatten())

print(f'Accuracy: {accuracy:.4f}')
print(f'Jaccard Index: {jaccard:.4f}')
print(f'Dice Coefficient: {dice_coefficient:.4f}')


In [None]:
!pip install scikit-learn

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from sklearn.linear_model import LogisticRegression
import numpy as np

# 定义简单的FCN网络
class SimpleFCN(nn.Module):
    def __init__(self):
        super(SimpleFCN, self).__init__()
        # 第一层卷积，输入尺寸为 (128, 128, 1)，输出尺寸为 (122, 122, 64)
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=64, kernel_size=7, padding=1)
        # 第二层卷积，输入尺寸为 (122, 122, 64)，输出尺寸为 (120, 120, 16)
        self.conv2 = nn.Conv2d(in_channels=64, out_channels=16, kernel_size=3, padding=1)

    def forward(self, x):
        x = F.relu(self.conv1(x))  # 应用 ReLU 激活函数
        x = F.relu(self.conv2(x))
        return x  # 输出形状为 (batch_size, 16, 120, 120)

# 初始化模型
model = SimpleFCN()




In [None]:
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader
import cv2

# 自定义数据集类
class CustomDataset(Dataset):
    def __init__(self, img_paths, labels, transform=None):
        self.img_paths = img_paths
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.img_paths)

    def __getitem__(self, idx):
        img_path = self.img_paths[idx]
        image = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)  # 读取灰度图
        label = self.labels[idx]  # 对应的标签
        if self.transform:
            image = self.transform(image)
        return image, label

# 定义图像变换
transform = transforms.Compose([
    transforms.ToTensor(),  # 转换为Tensor
    transforms.Normalize(mean=[0.5], std=[0.5])  # 标准化
])

# 假设我们有训练集图像路径和标签列表
train_img_paths = ['path_to_img1', 'path_to_img2', ...]  # 训练集图像路径
train_labels = ['path_to_mask1', 'path_to_mask2', ...]   # 训练集标签

# 创建训练数据集和数据加载器
train_dataset = CustomDataset(train_img_paths, train_labels, transform=transform)
train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True)


In [None]:
import os
import json

def merge_json_files(json_dir):
    merged_data = {"images": [], "annotations": [], "categories": []}

    for filename in os.listdir(json_dir):
        if filename.endswith('.json'):
            with open(os.path.join(json_dir, filename), 'r') as f:
                data = json.load(f)

                # 仅添加唯一类别（假设所有文件的类别相同）
                for category in data['categories']:
                    if category not in merged_data['categories']:
                        merged_data['categories'].append(category)

                merged_data['images'].extend(data['images'])
                merged_data['annotations'].extend(data['annotations'])

    return merged_data

# 设置 JSON 文件夹路径
json_directory = '/content/drive/MyDrive/fcnvirus/labels/'  # 修改为您的路径
merged_json = merge_json_files(json_directory)

# 将合并后的数据写入新的 JSON 文件
with open('merged_annotations.json', 'w') as outfile:
    json.dump(merged_json, outfile)


In [None]:
import os
import json
from PIL import Image
import torch
from torch.utils.data import Dataset
from torchvision import transforms
from PIL import Image, ImageDraw
import torch.nn as nn
import torch.nn.functional as F
class COCOVirusDataset(Dataset):
    def __init__(self, image_dir, json_file, transform=None):
        self.image_dir = image_dir
        self.transform = transform
        self.images = []
        self.annotations = []

        # 加载单个 JSON 文件中的数据
        if os.path.isfile(json_file):
            with open(json_file, 'r') as f:
                data = json.load(f)
                self.images = data['images']
                self.annotations = data['annotations']
        else:
            raise FileNotFoundError(f"无法找到指定的 JSON 文件: {json_file}")

    def __len__(self):
        return len(self.images)

    def fill_mask(self, mask, segmentation):
        draw = ImageDraw.Draw(mask)
        if isinstance(segmentation, list):
            for segment in segmentation:
                polygon = [(segment[i], segment[i + 1]) for i in range(0, len(segment), 2)]
                draw.polygon(polygon, outline=1, fill=1)
        return mask

    def __getitem__(self, idx):
        image_info = self.images[idx]
        img_path = os.path.join(self.image_dir, image_info['file_name'])

        if not os.path.exists(img_path):
            raise FileNotFoundError(f"图像文件 {img_path} 未找到")

        image = Image.open(img_path).convert("L")

        # 创建一个空白的掩码
        mask = Image.new("L", (image_info['width'], image_info['height']), 0)

        for ann in self.annotations:
            if ann['image_id'] == image_info['id']:
                segmentation = ann.get('segmentation', [])
                if segmentation:
                    mask = self.fill_mask(mask, segmentation)

        if self.transform:
            image = self.transform(image)
            mask = self.transform(mask)

        # 确保掩码的形状是 [1, height, width]
        mask = mask.squeeze(0)  # 去掉不必要的维度
        return image, mask.unsqueeze(0)





image_dir = '/content/drive/MyDrive/onlyvirus35/images'
json_files ='/content/drive/MyDrive/onlyvirus35/json/onlyvirus-2.json'
transform = transforms.Compose([
    transforms.Resize((128, 128)),  # 将图像和掩码调整为相同大小
    transforms.ToTensor()  # 转换为张量
])

dataset = COCOVirusDataset(image_dir, json_files, transform)
train_size = int(0.6 * len(dataset))  # 80% 作为训练集
val_size = len(dataset) - train_size  # 20% 作为验证集
train_dataset, val_dataset = torch.utils.data.random_split(dataset, [train_size, val_size])
train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=4, shuffle=False)

from torch.utils.data import DataLoader

data_loader = DataLoader(dataset, batch_size=4, shuffle=True)


# 定义简单 FCN 模型（与之前相同）
class SimpleFCN(nn.Module):
    def __init__(self):
        super(SimpleFCN, self).__init__()
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=64, kernel_size=7, padding=3)
        self.conv2 = nn.Conv2d(in_channels=64, out_channels=16, kernel_size=3, padding=1)
        self.conv3 = nn.Conv2d(in_channels=16, out_channels=1, kernel_size=1)  # 输出通道为1

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = self.conv3(x)
          # 输出形状为 [batch_size, 1, height, width]
         # 去掉通道维度，输出形状为 [batch_size, height, width]

        return torch.sigmoid(x)  # 使用sigmoid函数生成概率图

# 训练模型
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = SimpleFCN().to(device)
criterion = nn.BCELoss()
optimizer = torch.optim.RMSprop(model.parameters(), lr=0.001)


import torch
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

import torch
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

from sklearn.metrics import precision_score, recall_score

def calculate_metrics(predictions, targets):
    """
    计算精度和召回率
    predictions: 模型的输出（概率）
    targets: 真实标签
    """
    preds = (predictions > 0.5).float()  # 将概率转换为二进制标签
    targets = targets.cpu().numpy()
    preds = preds.cpu().numpy()

    precision = precision_score(targets.flatten(), preds.flatten(), average='binary')
    recall = recall_score(targets.flatten(), preds.flatten(), average='binary')

    return precision, recall

def train_model(model, train_loader, val_loader, optimizer, criterion, num_epochs=10, device='cuda'):
    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0

        # 存储所有的真实标签和预测结果
        all_targets = []
        all_predictions = []

        for images, masks in train_loader:
            images, masks = images.to(device), masks.to(device)
            optimizer.zero_grad()
            outputs = model(images)

            # 确保输出和掩码的形状一致
            if outputs.size() != masks.size():
                continue  # 跳过形状不匹配的批次

            # 计算损失
            loss = criterion(outputs, masks)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

            # 保存真实标签和预测结果
            all_targets.append(masks)
            all_predictions.append(outputs)

        avg_loss = running_loss / len(train_loader)

        # 确保至少有一个批次的输出被保存
        if len(all_targets) == 0:
            print(f"Warning: No targets collected during training epoch {epoch + 1}.")
            continue

        # 将所有的真实标签和预测结果拼接在一起
        all_targets = torch.cat(all_targets, dim=0)
        all_predictions = torch.cat(all_predictions, dim=0)

        # 计算训练集的精度和召回率
        train_precision, train_recall = calculate_metrics(all_predictions, all_targets)

        # 验证模型
        model.eval()
        val_loss = 0.0
        val_targets = []
        val_predictions = []

        with torch.no_grad():
            for images, masks in val_loader:
                images, masks = images.to(device), masks.to(device)

                outputs = model(images)

                # 确保输出和掩码的形状一致
                if outputs.size() != masks.size():
                    continue  # 跳过形状不匹配的批次

                val_loss += criterion(outputs, masks).item()

                # 保存真实标签和预测结果
                val_targets.append(masks)
                val_predictions.append(outputs)

        avg_val_loss = val_loss / len(val_loader)

        # 确保至少有一个批次的输出被保存
        if len(val_targets) == 0:
            print(f"Warning: No targets collected during validation epoch {epoch + 1}.")
            continue

        # 拼接所有的验证集真实标签和预测结果
        val_targets = torch.cat(val_targets, dim=0)
        val_predictions = torch.cat(val_predictions, dim=0)

        # 计算验证集的精度和召回率
        val_precision, val_recall = calculate_metrics(val_predictions, val_targets)

        print(f"Epoch [{epoch + 1}/{num_epochs}], Train Loss: {avg_loss:.4f}, "
              f"Val Loss: {avg_val_loss:.4f}, "
              f"Train Precision: {train_precision:.4f}, Train Recall: {train_recall:.4f}, "
              f"Val Precision: {val_precision:.4f}, Val Recall: {val_recall:.4f}")

# 开始训练
train_model(model, train_loader, val_loader, optimizer, criterion, num_epochs=100, device=device)




# 开始训练




# 使用示例














ValueError: Classification metrics can't handle a mix of continuous and binary targets

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import numpy as np
import os
import cv2
from pycocotools.coco import COCO
from keras.models import Sequential
from keras.layers import Conv2D
from keras.optimizers import RMSprop
from tensorflow.keras import layers, models
import tensorflow as tf
from sklearn.model_selection import train_test_split
from tensorflow.keras.callbacks import TensorBoard
import numpy as np
import random
import matplotlib.pyplot as plt
# 创建 TensorBoard 回调
tensorboard_callback = TensorBoard(log_dir="logs")
# COCO 数据集路径
coco_annotation_file = '/content/drive/MyDrive/onlyvirus35/json/onlyvirus-2.json'  # 注释文件路径
images_dir = '/content/drive/MyDrive/onlyvirus35/images'  # 图像文件夹路径

# 初始化 COCO API
coco = COCO(coco_annotation_file)

# 获取图像 ID 列表
image_ids = coco.getImgIds()

# 预处理数据
# def load_data(image_ids):
#     X = []  # 用于存储图像
#     y = []  # 用于存储掩膜图
#     for img_id in image_ids:
#         # 加载图像
#         img_info = coco.imgs[img_id]
#         img = cv2.imread(os.path.join(images_dir, img_info['file_name']))
#         img = cv2.resize(img, (128, 128))  # 调整图像大小
#         img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)  # 转为灰度图
#         X.append(img)

#         # 创建掩膜图（基于标注）
#         mask = np.zeros((128, 128), dtype=np.uint8)  # 创建一个空的概率图
#         ann_ids = coco.getAnnIds(imgIds=img_id)
#         anns = coco.loadAnns(ann_ids)

#         # 将分割区域填入掩膜
#         for ann in anns:
#             if ann['area'] > 0:  # 只处理有效的标注
#                 segmentation = coco.annToMask(ann)  # 将标注转换为掩膜
#                 # 将分割掩膜调整到合适的大小
#                 segmentation_resized = cv2.resize(segmentation, (128, 128), interpolation=cv2.INTER_NEAREST)  # 将分割掩膜调整为 128×128
#                 # 合并掩膜
#                 mask = np.maximum(mask, segmentation_resized)


#                 # 对每张图像生成多次裁剪
#         for _ in range(num_crops_per_image):
#             # 随机生成裁剪的高度和宽度
#             crop_size = np.random.randint(min_crop_size, max_crop_size + 1)

#             # 随机生成裁剪的起始坐标
#             x = np.random.randint(0, 128 - crop_size + 1)
#             y_coord = np.random.randint(0, 128 - crop_size + 1)

#             # 裁剪图像和掩膜
#             cropped_img = img[y_coord:y_coord + crop_size, x:x + crop_size]
#             cropped_mask = mask[y_coord:y_coord + crop_size, x:x + crop_size]

#             # 调整裁剪后的图像和掩膜为固定大小（例如128x128）
#             cropped_img_resized = cv2.resize(cropped_img, (128, 128), interpolation=cv2.INTER_LINEAR)
#             cropped_mask_resized = cv2.resize(cropped_mask, (128, 128), interpolation=cv2.INTER_NEAREST)

#             # 将生成的裁剪图像和掩膜添加到列表中
#             X.append(cropped_img_resized)
#             y.append(cropped_mask_resized)

#     return np.array(X), np.array(y)
# 预处理数据
# def random_crop(image, mask, min_size=64, max_size=128):
#     """
#     随机裁剪图像和掩膜
#     :param image: 输入图像
#     :param mask: 输入掩膜
#     :param min_size: 最小裁剪尺寸
#     :param max_size: 最大裁剪尺寸
#     :return: 裁剪后的图像和掩膜
#     """
#     h, w = image.shape[:2]

#     # 随机选择裁剪的宽和高
#     crop_height = np.random.randint(min_size, max_size)
#     crop_width = np.random.randint(min_size, max_size)

#     # 确保裁剪区域不超出原图像的边界
#     top = np.random.randint(0, h - crop_height + 1)
#     left = np.random.randint(0, w - crop_width + 1)

#     # 进行裁剪
#     cropped_img = image[top:top + crop_height, left:left + crop_width]
#     cropped_mask = mask[top:top + crop_height, left:left + crop_width]

#     return cropped_img, cropped_mask



def load_data(image_ids,target_size=(114, 114)):
    X = []  # 用于存储图像
    y = []  # 用于存储掩膜图
    for img_id in image_ids:
        # 加载图像
        img_info = coco.imgs[img_id]
        img_path = os.path.join(images_dir, img_info['file_name'])
        img = cv2.imread(img_path)

        if img is None:
            print(f"Warning: Image at {img_path} could not be loaded.")
            continue  # 如果图像加载失败，跳过该图像

        # 检查通道数，如果是彩色图像则转换为灰度图
        if img.ndim == 3 and img.shape[2] == 3:
            img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)  # 转为灰度图
        img_resized = cv2.resize(img, target_size)
        X.append(img_resized)

        # 创建掩膜图
        mask = np.zeros(img.shape[:2], dtype=np.uint8)
        ann_ids = coco.getAnnIds(imgIds=img_id)
        anns = coco.loadAnns(ann_ids)
        for ann in anns:
            if ann['area'] > 0:
                segmentation = coco.annToMask(ann)
                mask = np.maximum(mask, segmentation)
                # 调整掩膜大小
        mask_resized = cv2.resize(mask, target_size)
        y.append(mask_resized)

    X_array = np.array(X)
    y_array = np.array(y)
    print(f"Loaded{len(X)} images and {len(y)} masks")


    return X,y




X, y = load_data(image_ids)




print(f"X length: {len(X)}, y length: {len(y)}")


X, y = load_data(image_ids)
assert len(X) == len(y), "X and y must have the same length."


import random

# 假设你有一个列表 image_ids，包含所有图像的 ID
random.seed(42)  # 设置随机种子以确保结果可重复
all_image_ids = list(image_ids)  # 将 image_ids 转换为列表

# 随机选择 18 张作为训练集
train_image_ids = random.sample(all_image_ids, 18)

# 从剩余图像中选择 17 张作为测试集
remaining_ids = list(set(all_image_ids) - set(train_image_ids))  # 获取剩余图像的 ID
test_image_ids = random.sample(remaining_ids, 17)

# 确保选择后的 ID 数量正确
assert len(train_image_ids) == 18, "训练集应该有 18 张图片"
assert len(test_image_ids) == 17, "测试集应该有 17 张图片"
# 加载训练数据
X_train, y_train = load_data(train_image_ids)

# 加载测试数据
X_test, y_test = load_data(test_image_ids)

# 确保训练集和测试集的长度一致
assert len(X_train) == len(y_train), "训练集的 X 和 y 长度必须一致"
assert len(X_test) == len(y_test), "测试集的 X 和 y 长度必须一致"
# 训练模型


import tensorflow as tf
from tensorflow.keras import layers, models

def create_model(input_shape):
    model = models.Sequential()

    model.add(layers.Input(shape=input_shape))

    # 第一个卷积层，使用7x7滤波器，输出122x122
    model.add(layers.Conv2D(64, (7, 7), padding='same'))  # 128x128 -> 128x128

    # 使用边缘填充，使输出大小为122x122
    model.add(layers.Lambda(lambda x: tf.image.resize(x, (122, 122))))

    # 第二个卷积层，使用3x3滤波器，输出120x120
    model.add(layers.Conv2D(16, (3, 3), padding='same'))  # 122x122 -> 122x122

    # 使用边缘填充，使输出大小为120x120
    model.add(layers.Lambda(lambda x: tf.image.resize(x, (120, 120))))

    # 第三个卷积层，使用7x7滤波器，输出114x114
    model.add(layers.Conv2D(1, (7, 7), padding='same', activation='sigmoid'))  # 120x120 -> 120x120

    # 使用边缘填充，使输出大小为114x114
    model.add(layers.Lambda(lambda x: tf.image.resize(x, (114, 114))))

    model.compile(optimizer=tf.keras.optimizers.RMSprop(learning_rate=0.001, rho=0.9, epsilon=1e-8),
                  loss='binary_crossentropy',
                  metrics=['accuracy',
                            tf.keras.metrics.Precision(),
                            tf.keras.metrics.Recall(),
                            tf.keras.metrics.AUC()])

    return model

# 假设输入图像的形状为 (128, 128, 1)
input_shape = (128, 128, 1)
model = create_model(input_shape)

# 确保掩膜的形状符合模型输入要求
X_train = np.expand_dims(X_train, axis=-1)  # (num_samples, 128, 128, 1)
y_train = np.expand_dims(y_train, axis=-1)  # (num_samples, 128, 128, 1)
X_test = np.expand_dims(X_test, axis=-1)
y_test = np.expand_dims(y_test, axis=-1)




# 编译模型
input_shape = (128, 128, 1)  # 输入图像的形状
model = create_model(input_shape)
model.fit(X_train, y_train, batch_size=8, epochs=100, validation_data=(X_test, y_test), callbacks=[tensorboard_callback])
model.summary()


# 训练模型
# 评估模型


results = model.evaluate(X_test, y_test)

# 解包所有返回的值
loss = results[0]
accuracy = results[1]
precision = results[2]
recall = results[3]
auc_score = results[4]

# 输出评估结果
print(f'Loss: {loss:.4f}, Accuracy: {accuracy:.4f}, Precision: {precision:.4f}, Recall: {recall:.4f}, AUC: {auc_score:.4f}')

from sklearn.metrics import classification_report

# 进行预测
y_pred = model.predict(X_test)
y_pred_classes = (y_pred > 0.5).astype(int)  # 将概率转换为二进制标签
y_true = y_test.flatten()      # 将真实标签展平
y_pred_classes = y_pred_classes.flatten()  # 将预测标签展平

# 打印分类报告
print(classification_report(y_true, y_pred_classes, target_names=['Background', 'Foreground']))
# # 打印分类报告
# print(classification_report(y_test.flatten(), y_pred_classes.flatten(), target_names=['Background', 'Foreground']))



loading annotations into memory...
Done (t=0.01s)
creating index...
index created!
Loaded35 images and 35 masks
X length: 35, y length: 35
Loaded35 images and 35 masks
Loaded18 images and 18 masks
Loaded17 images and 17 masks
Epoch 1/100
[1m3/3[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 4s/step - accuracy: 0.6703 - auc_11: 0.6205 - loss: 4.6830 - precision_11: 0.1564 - recall_11: 0.5621 - val_accuracy: 0.9462 - val_auc_11: 0.4994 - val_loss: 0.8646 - val_precision_11: 0.0000e+00 - val_recall_11: 0.0000e+00
Epoch 2/100
[1m3/3[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 47ms/step - accuracy: 0.8814 - auc_11: 0.4978 - loss: 1.9059 - precision_11: 0.0000e+00 - recall_11: 0.0000e+00 - val_accuracy: 0.9463 - val_auc_11: 0.4996 - val_loss: 0.8644 - val_precision_11: 0.0000e+00 - val_recall_11: 0.0000e+00
Epoch 3/100
[1m3/3[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 46ms/step - accuracy: 0.8704 - auc_11: 0.4988 - loss: 2.0867 - precision_11: 0.0000e+00 - rec

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 1s/step - accuracy: 0.9464 - auc_11: 0.5000 - loss: 0.8642 - precision_11: 0.0000e+00 - recall_11: 0.0000e+00
Loss: 0.8642, Accuracy: 0.9464, Precision: 0.0000, Recall: 0.0000, AUC: 0.5000
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 239ms/step
              precision    recall  f1-score   support

  Background       0.95      1.00      0.97    209086
  Foreground       0.00      0.00      0.00     11846

    accuracy                           0.95    220932
   macro avg       0.47      0.50      0.49    220932
weighted avg       0.90      0.95      0.92    220932



  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


In [None]:
# 使用逻辑回归进行像素分类
def logistic_regression_per_pixel(features, labels):
    # 展平特征和标签以用于逻辑回归
    features = features.reshape(-1, features.shape[-1])  # (num_pixels, num_features)
    labels = labels.flatten()  # (num_pixels,)

    # 使用逻辑回归进行训练
    log_reg = LogisticRegression(max_iter=100)
    log_reg.fit(features, labels)

    return log_reg

# 模型训练流程
def train_model(model, train_loader):
    model.eval()  # 模型进入评估模式
    for images, masks in train_loader:
        features = model(images)  # 提取特征
        features = features.permute(0, 2, 3, 1)  # 转换为 (batch_size, height, width, num_features)

        # 对每个图像的每个像素应用逻辑回归
        for i in range(features.shape[0]):
            log_reg = logistic_regression_per_pixel(features[i].detach().numpy(), masks[i].numpy())

            # 可以进一步使用log_reg来预测和评估


In [None]:
def sigmoid(x):
    return 1 / (1 + np.exp(-x))

def predict_pixel_probabilities(log_reg, features):
    # 使用逻辑回归计算后验概率
    features = features.reshape(-1, features.shape[-1])
    probabilities = sigmoid(log_reg.decision_function(features))
    return probabilities.reshape(120, 120)  # 恢复为原始形状


In [None]:
import cv2
from skimage.morphology import remove_small_objects

def post_process(prob_map, threshold=0.5, min_size=2.5):
    # 阈值化
    binary_map = prob_map > threshold

    # 移除小于2.5个像素的小区域
    processed_map = remove_small_objects(binary_map, min_size=min_size)

    return processed_map

# 例子：对一个概率图进行后处理
prob_map = np.random.rand(120, 120)  # 假设生成了一个概率图
processed_map = post_process(prob_map)


In [None]:
from sklearn.metrics import accuracy_score, roc_auc_score

def evaluate_model(model, log_reg, val_loader):
    model.eval()
    all_preds = []
    all_labels = []

    for images, masks in val_loader:
        features = model(images)
        features = features.permute(0, 2, 3, 1)

        for i in range(features.shape[0]):
            probs = predict_pixel_probabilities(log_reg, features[i].detach().numpy())
            preds = probs > 0.5
            all_preds.append(preds.flatten())
            all_labels.append(masks[i].flatten())

    accuracy = accuracy_score(np.concatenate(all_labels), np.concatenate(all_preds))
    auc = roc_auc_score(np.concatenate(all_labels), np.concatenate(all_preds))

    print(f'Accuracy: {accuracy}, AUC: {auc}')
