In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import cv2
from torch.utils.data import DataLoader, Dataset
import numpy as np
import os
import torchvision.transforms as transforms
import pandas as pd

In [2]:
class VideoDataset(Dataset):
    def __init__(self, csv_file, root_dir,output_folder):
        self.csv_file = pd.read_csv(csv_file)
        self.root_dir = root_dir
        self.file_suffix = '.avi'
        self.video_files = os.listdir(self.root_dir)
        # 读取 root_dir 下的所有视频文件，获取他们的文件名
        self.video_files_name = [i.split('.')[0] for i in self.video_files]
        # 将获得的文件名到 csv 文件中查找对应的标签，csv 文件中的 FileName 是文件名，health 是标签
        self.labels = []
        self.video_useful_name = []
        for i in self.video_files_name:
            self.labels.append(self.csv_file.loc[self.csv_file['FileName'] == i, ' health'].values[0])
            self.video_useful_name.append(i)
        #将self.video_useful_name里面所有的文件都添加上.avi后缀
        self.video_files = [i + self.file_suffix for i in self.video_useful_name]
        self.labels = np.array(self.labels)
        self.output_folder = output_folder
    
    def get_frames(self):
        frames = []
        # 遍历所有视频文件
        for video_file in self.video_files:
            video_path = os.path.join(self.root_dir, video_file)
            video_output_dir = os.path.join(self.output_folder, video_file.split('.')[0])
            cap = cv2.VideoCapture(video_path)
            frame_count = 0
            extracted_frames = []
            ventricular_sizes = []
            while cap.isOpened():
            # 获取视频的第一帧和第二帧，以及视频的宽和高
                ret, frame = cap.read()
                if not ret:
                    break
                frame_path = os.path.join(video_output_dir, f'frame_{frame_count:04d}.png')
                cv2.imwrite(frame_path, frame)
                extracted_frames.append(frame)

                gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

                blurred = cv2.GaussianBlur(gray, (5, 5), 0)

                _, thresh = cv2.threshold(blurred, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)

                contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

                largest_contour = max(contours, key=cv2.contourArea) if contours else None

                if largest_contour is not None:
                    ventricular_area = cv2.contourArea(largest_contour)
                else:
                    ventricular_area = 0

                ventricular_sizes.append(ventricular_area)
                frame_count += 1
            cap.release()   
            diastole_frame_index = np.argmax(ventricular_sizes)
            systole_frame_index = np.argmin(ventricular_sizes)
            # print(diastole_frame_index)
            # print(systole_frame_index)
            # # 将 frame1 和 frame2 合并成一个两通道的图片，他们本身就是灰度图
            # print(extracted_frames[diastole_frame_index].shape)
            # print(extracted_frames[systole_frame_index].shape)
            frame = np.concatenate([extracted_frames[diastole_frame_index],extracted_frames[systole_frame_index]], axis=2)
            frame = frame.transpose(2, 0, 1)
            frames.append(frame)
        frames = torch.tensor(frames)
        labels = torch.tensor(self.labels).float()
        print(frames.shape)
        print(labels)
        return frames, labels
testdataset = VideoDataset('/home/curry/code/curry_code_summay/AI/for_dc/FileList_new.csv', '/home/curry/code/视频划分/TEST','/home/curry/code/视频划分/output_folder')
test_frames,test_labels = testdataset.get_frames()
traindataset = VideoDataset('/home/curry/code/curry_code_summay/AI/for_dc/FileList_new.csv', '/home/curry/code/视频划分/TRAIN','/home/curry/code/视频划分/output_folder')
train_frames,train_labels = traindataset.get_frames()
valdataset = VideoDataset('/home/curry/code/curry_code_summay/AI/for_dc/FileList_new.csv', '/home/curry/code/视频划分/VAL','/home/curry/code/视频划分/output_folder')
val_frames,val_labels = valdataset.get_frames()

#将他们转换成 pytorch 的 tensor，作为神经网络的输入
# frames = torch.tensor(frames)
# labels = torch.tensor(labels)

  frames = torch.tensor(frames)


torch.Size([1277, 6, 112, 112])
tensor([1., 1., 0.,  ..., 0., 1., 0.])
torch.Size([7464, 6, 112, 112])
tensor([1., 1., 0.,  ..., 0., 0., 1.])
torch.Size([1288, 6, 112, 112])
tensor([1., 1., 1.,  ..., 1., 1., 1.])


In [3]:
from torch.utils.data import TensorDataset
#对trainframe进行数据增强,对testframe不进行数据增强
# train_transforms = transforms.Compose([
#     transforms.RandomHorizontalFlip(),
#     transforms.RandomVerticalFlip(),
#     transforms.RandomRotation(30),
# ])
# train_frames = train_transforms(train_frames)

# 创建 TensorDataset
train_dataset = TensorDataset(train_frames, train_labels)
test_dataset = TensorDataset(test_frames, test_labels)
val_dataset = TensorDataset(val_frames, val_labels)

# 创建 DataLoader
batch_size = 32  # 你可以根据需要调整批次大小
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# 检查 DataLoader 中的数据
for i, (inputs, labels) in enumerate(train_loader):
    print(f"Batch {i}: inputs shape = {inputs.shape}, labels shape = {labels.shape}")
    break  # 仅检查第一个批次

Batch 0: inputs shape = torch.Size([32, 6, 112, 112]), labels shape = torch.Size([32])


In [4]:
#定义残差块
class ResidualBlock(nn.Module):
    def __init__(self, inchannel, outchannel, stride=1):
        super(ResidualBlock, self).__init__()
        self.left = nn.Sequential(
            nn.Conv2d(inchannel, outchannel, kernel_size=3, stride=stride, padding=1, bias=False),
            nn.BatchNorm2d(outchannel),
            nn.ReLU(inplace=True),
            nn.Conv2d(outchannel, outchannel, kernel_size=3, stride=1, padding=1, bias=False),
            nn.BatchNorm2d(outchannel)
        )
        self.shortcut = nn.Sequential()
        if stride != 1 or inchannel != outchannel:
            self.shortcut = nn.Sequential(
                nn.Conv2d(inchannel, outchannel, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(outchannel)
            )
    def forward(self, x):
        out = self.left(x)
        out += self.shortcut(x)
        out = F.relu(out)
        return out

In [5]:
#定义ResNet18模型
class ResNet(nn.Module):
    def __init__(self, ResidualBlock, num_classes=1000):
        super(ResNet, self).__init__()
        self.inchannel = 64
        self.conv1 = nn.Sequential(
            nn.Conv2d(6, 64, kernel_size=7, stride=2, padding=3, bias=False),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True)
        )
        self.layer1 = self.make_layer(ResidualBlock, 64, 2, stride=1)
        self.layer2 = self.make_layer(ResidualBlock, 128, 2, stride=2)
        self.layer3 = self.make_layer(ResidualBlock, 256, 2, stride=2)
        self.layer4 = self.make_layer(ResidualBlock, 512, 2, stride=2)
        self.dropout = nn.Dropout(0.3)
        self.fc = nn.Linear(25088, num_classes)
    def make_layer(self, block, channels, num_blocks, stride):
        strides = [stride] + [1] * (num_blocks - 1)
        layers = []
        for stride in strides:
            layers.append(block(self.inchannel, channels, stride))
            self.inchannel = channels
        return nn.Sequential(*layers)
    def forward(self, x):
        out = self.conv1(x)
        #print(out.shape)
        out = self.layer1(out)
        #print(out.shape)
        out = self.layer2(out)
        #print(out.shape)
        out = self.layer3(out)
        #print(out.shape)
        out = self.layer4(out)
        #print(out.shape)
        out = F.avg_pool2d(out, 1)
        #print(out.shape)
        out = out.view(out.size(0), -1)
        # if self.training:
        #     out = self.dropout(out)
        #print(out.shape)
        out = self.fc(out)
        return out

In [6]:
import time
from sklearn.metrics import roc_auc_score,f1_score,mean_absolute_error
# 实例化模型
model = ResNet(ResidualBlock, 2)  # ResNet18
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# 定义损失函数和优化器
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# 训练模型
num_epochs = 100
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        inputs = inputs.float()
        labels = labels.long()
        # 前向传播
        outputs = model(inputs)
        loss = criterion(outputs, labels)

        # 反向传播和优化
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    

    #评估模型
    model.eval()
    all_probabilities = []
    all_labels = []
    all_predictions = []
    all_outputs = []
    correct = 0
    total = 0
    time_start = time.time()
    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            inputs = inputs.float()
            labels = labels.long()
            outputs = model(inputs)
            # 计算 AUC
            probabilities = torch.softmax(outputs, dim=1)[:, 1].cpu().detach().numpy()
            all_probabilities.extend(probabilities)
            all_labels.extend(labels.cpu().detach().numpy())
            
            # 计算 Dice 相似系数
            predictions = torch.argmax(outputs, dim=1).cpu().detach().numpy()
            all_predictions.extend(predictions)
            
            # 计算 MAE
            all_outputs.extend(outputs.cpu().detach().numpy())
            
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    time_end = time.time()
    print(f"Time cost: {(time_end - time_start)/1277}ms")
    print(f"Accuracy of the model on the test images: {100 * correct / total:.2f}%")


    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader):.4f}")
    # 转换为 numpy 数组
    all_labels = np.array(all_labels)
    all_probabilities = np.array(all_probabilities)
    all_predictions = np.array(all_predictions)
    all_outputs = np.array(all_outputs)

    # 计算 AUC
    auc = roc_auc_score(all_labels, all_probabilities)
    print(f"AUC: {auc:.4f}")

    # 计算 Dice 相似系数
    dice = f1_score(all_labels, all_predictions, average='binary')
    print(f"Dice Similarity Coefficient: {dice:.4f}")

    # 计算 MAE
    # mae = mean_absolute_error(all_labels, all_outputs)
    # print(f"Mean Absolute Error: {mae:.4f}")

# 评估模型
# model.eval()
# correct = 0
# total = 0
# time_start = time.time()
# with torch.no_grad():
#     for inputs, labels in test_loader:
#         inputs, labels = inputs.to(device), labels.to(device)
#         inputs = inputs.float()
#         labels = labels.long()
#         outputs = model(inputs)
#         _, predicted = torch.max(outputs.data, 1)
#         total += labels.size(0)
#         correct += (predicted == labels).sum().item()

# time_end = time.time()
# print(f"Time cost: {time_end - time_start:.2f}s")
# print(f"Accuracy of the model on the test images: {100 * correct / total:.2f}%")

Time cost: 0.00022635575655203082ms
Accuracy of the model on the test images: 78.49%
Epoch [1/100], Loss: 0.9102
AUC: 0.6781
Dice Similarity Coefficient: 0.8748
Time cost: 0.00022739438032301124ms
Accuracy of the model on the test images: 79.58%
Epoch [2/100], Loss: 0.6120
AUC: 0.7244
Dice Similarity Coefficient: 0.8803
Time cost: 0.00022298148953550574ms
Accuracy of the model on the test images: 78.18%
Epoch [3/100], Loss: 0.4983
AUC: 0.7395
Dice Similarity Coefficient: 0.8651
Time cost: 0.00022178211525817728ms
Accuracy of the model on the test images: 80.67%
Epoch [4/100], Loss: 0.4712
AUC: 0.7428
Dice Similarity Coefficient: 0.8872
Time cost: 0.0002368459873304016ms
Accuracy of the model on the test images: 80.98%
Epoch [5/100], Loss: 0.4526
AUC: 0.7525
Dice Similarity Coefficient: 0.8873
Time cost: 0.0002222208651883955ms
Accuracy of the model on the test images: 80.51%
Epoch [6/100], Loss: 0.4345
AUC: 0.7608
Dice Similarity Coefficient: 0.8841
Time cost: 0.0002193734714923282ms
A