#專題資料結構
```
Project: DeepX
└─ original_dataset
   ├─ abnormal
   │   └─ abnormal_label
   │   └─ abnormal(.dcm)
   │   └─ abnormal(.jpg)
   ├─ normal
   │   └─ normal_label
   │   └─ normal(.dcm)
   │   └─ normal(.jpg)
└─ yolov8_custom
└─ processed_dataset
   ├─ abnormal
   ├─ normal
└─ all_data
└─ splitted_dataset
   ├─ test
   │   └─ abnormal
   │   └─ normal
   ├─ train
   │   └─ abnormal
   │   └─ normal
   ├─ train_argumentation
   │   └─ abnormal
   │   └─ normal
└─ tensor
   ├─ test
   ├─ train
└─ result
```

#連結Google drive

In [1]:
# Mount Google Drive at /content/drive so later cells can reach the project
# folder through the Colab file system.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
import os
# Make the project folder on Drive the working directory for the following cells.
os.chdir('/content/drive/My Drive/Deep_X_torch') # change directory
os.listdir() # confirm the directory contents

['SEResNet.py',
 'splitted_dataset',
 '__pycache__',
 'tensor',
 'result',
 'processed_dataset',
 'original_dataset',
 'yolov8_custom',
 'all_data',
 'Deep_X_torch_main.ipynb']

#將DCM檔轉成JPG檔

In [None]:
!pip install pydicom
!pip install opencv-python



In [None]:
from itertools import count
import pydicom
import cv2
import os

def dcm_to_png(folder_path,output_path,output_name):
    """Convert every ``.dcm`` file in a folder to an 8-bit JPEG image.

    Despite the historical name, the images are written as ``.jpg`` files
    named ``<output_name>_<n>.jpg`` (``n`` starts at 1) inside
    ``<output_path>/<output_name>(.jpg)``. The folder is created when the
    first image is written.

    Files are processed in sorted name order so the numbering is reproducible
    between runs. Files that pydicom rejects as invalid DICOM are skipped.

    Args:
        folder_path: Directory that contains the ``.dcm`` files.
        output_path: Parent directory of the JPEG output folder.
        output_name: Class name used for the output folder and file prefix.

    Returns:
        None. A short summary (class name, number of ``.dcm`` files seen and
        number tagged as ``ELBOW``) is printed.
    """
    saved_count = 0
    file_count = 0
    elbow_count = 0
    folder_name = os.path.join(os.getcwd(), output_path, output_name + "(.jpg)")

    for file_name in sorted(os.listdir(folder_path)):
        if not file_name.endswith(".dcm"):
            continue
        file_count += 1

        try:
            ds = pydicom.dcmread(os.path.join(folder_path, file_name))
        except pydicom.errors.InvalidDicomError:
            continue

        # BodyPartExamined is read defensively: a file without that element
        # must not abort the whole conversion, it simply is not counted.
        if getattr(ds, "BodyPartExamined", None) == "ELBOW":
            elbow_count += 1

        # Rescale the raw pixel data to the 0-255 range of an 8-bit image.
        data = ds.pixel_array
        image = cv2.normalize(data, None, 0, 255, cv2.NORM_MINMAX, dtype=cv2.CV_8U)

        # Rotation hook: the angle is 0 and the scale 1.0, so this is
        # currently a pass-through kept for easy tuning.
        rotation_matrix = cv2.getRotationMatrix2D((image.shape[1]/2, image.shape[0]/2), 0, 1.0)
        rotated = cv2.warpAffine(image, rotation_matrix, (image.shape[1], image.shape[0]))

        # Brightness hook: alpha=1 and beta=0 leave the intensities as they are.
        bright_image = cv2.convertScaleAbs(rotated, alpha=1, beta=0)

        os.makedirs(folder_name, exist_ok=True)
        saved_count += 1
        cv2.imwrite(os.path.join(folder_name, f"{output_name}_{saved_count}.jpg"), bright_image)

    print(output_name)
    print("File numbers:" + str(file_count))
    print("Elbow numbers: " + str(elbow_count))

# Set the paths to the folders containing the DICOM images and the parent
# folders that receive the converted JPEG files, one pair per class.
dcm_path_abnormal = '/content/drive/My Drive/Deep_X_torch/original_dataset/abnormal/abnormal(.dcm)'
output_path_abnormal = '/content/drive/My Drive/Deep_X_torch/original_dataset/abnormal'
dcm_path_normal = '/content/drive/My Drive/Deep_X_torch/original_dataset/normal/normal(.dcm)'
output_path_normal = '/content/drive/My Drive/Deep_X_torch/original_dataset/normal'

# Convert both classes; the class name is used as output folder and file prefix.
dcm_to_png(dcm_path_abnormal, output_path_abnormal, 'abnormal')
dcm_to_png(dcm_path_normal, output_path_normal, 'normal')

# Optional conversion of a separate test folder (disabled).
# dcm_path_test = '/content/drive/My Drive/Deep_X_torch/original_dataset/test/test(.dcm)'
# output_path_test = '/content/drive/My Drive/Deep_X_torch/original_dataset/test'

# dcm_to_png(dcm_path_test, output_path_test, 'test')

abnormal
File numbers:32
Elbow numbers: 32
normal
File numbers:16
Elbow numbers: 16


#資料前處理

In [None]:
import cv2
import os

def remove_noise(img, kernel_size=(9, 9)):
    """Return ``img`` smoothed with a Gaussian blur.

    Args:
        img: Image array accepted by ``cv2.GaussianBlur``.
        kernel_size: Blur kernel size passed to OpenCV; the sigma argument
            is given as 0.

    Returns:
        The blurred image.
    """
    return cv2.GaussianBlur(img, kernel_size, 0)

def gray_scale(img):
    """Convert a BGR image to a single-channel grayscale image.

    Args:
        img: Image array in BGR channel order.

    Returns:
        The grayscale image produced by ``cv2.cvtColor``.
    """
    return cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

def process_images(input_dir, output_dir):
    """Apply grayscale conversion and CLAHE to every ``.jpg`` in a folder.

    Each image is read, converted to grayscale, contrast-enhanced with CLAHE
    (``clipLimit=3``) and written to ``output_dir`` under its original file
    name. ``output_dir`` is created only when there is at least one ``.jpg``
    to process. Images that OpenCV cannot read are reported and skipped
    instead of crashing the whole run.

    Args:
        input_dir: Directory containing the source ``.jpg`` files.
        output_dir: Directory that receives the processed images.

    Returns:
        None.
    """
    jpg_names = [name for name in os.listdir(input_dir) if name.endswith('.jpg')]
    if not jpg_names:
        return

    os.makedirs(output_dir, exist_ok=True)
    # One CLAHE object is enough for the whole batch.
    clahe = cv2.createCLAHE(clipLimit=3)

    for filename in jpg_names:
        input_path = os.path.join(input_dir, filename)
        img = cv2.imread(input_path)
        if img is None:
            # cv2.imread signals an unreadable file by returning None.
            print("Skipping unreadable image: " + input_path)
            continue

        clahe_img = clahe.apply(gray_scale(img))
        cv2.imwrite(os.path.join(output_dir, filename), clahe_img)

# Source JPEG folders (output of the DICOM conversion) and the per-class
# destination folders for the preprocessed images.
normal_input_dir = '/content/drive/My Drive/Deep_X_torch/original_dataset/normal/normal(.jpg)'
normal_output_dir = '/content/drive/My Drive/Deep_X_torch/processed_dataset/normal'

abnormal_input_dir = '/content/drive/My Drive/Deep_X_torch/original_dataset/abnormal/abnormal(.jpg)'
abnormal_output_dir = '/content/drive/My Drive/Deep_X_torch/processed_dataset/abnormal'

# Single folder that collects the processed images of both classes.
yolo_output_dir = '/content/drive/My Drive/Deep_X_torch/all_data'

# Write the processed images into the per-class folders.
process_images(normal_input_dir, normal_output_dir)
process_images(abnormal_input_dir, abnormal_output_dir)

# Process the same inputs again, this time into the shared all_data folder.
process_images(normal_input_dir, yolo_output_dir)
process_images(abnormal_input_dir, yolo_output_dir)

#區分訓練集(train)和測試集(test)

In [None]:
# Split the processed dataset into a train set and a test set.
import os
import random
import shutil

TRAIN_SET_RATIO = 0.7
TEST_SET_RATIO = 0.3

class SplitDataset():
    """Randomly split a class-per-folder image dataset into train and test sets.

    ``dataset_dir`` must contain one sub-directory per label. Calling
    ``start_splitting`` shuffles the files of every label and copies them to
    ``<saved_dataset_dir>/train/<label>/`` and
    ``<saved_dataset_dir>/test/<label>/``.

    Args:
        dataset_dir: Root folder holding one sub-folder per label.
        saved_dataset_dir: Root folder that receives the ``train`` and
            ``test`` trees; it is created if missing.
        train_ratio: Fraction of each label's files copied to ``train``.
        test_ratio: Fraction of each label's files copied to ``test``.
        show_progress: When true, print one line per copied file.
    """

    def __init__(self, dataset_dir, saved_dataset_dir, train_ratio=TRAIN_SET_RATIO, test_ratio=TEST_SET_RATIO, show_progress=False):
        # Joining with '' guarantees a trailing separator, which keeps the
        # stored paths in the same "dir/" form the class has always used.
        self.dataset_dir = os.path.join(dataset_dir, '')
        self.saved_dataset_dir = os.path.join(saved_dataset_dir, '')
        self.saved_train_dir = os.path.join(self.saved_dataset_dir, 'train', '')
        self.saved_test_dir = os.path.join(self.saved_dataset_dir, 'test', '')

        self.train_ratio = train_ratio
        self.test_ratio = test_ratio
        # Misspelled attribute kept so existing code reading it still works.
        self.test_radio = test_ratio

        # Each entry is [label, list_of_source_paths].
        self.train_file_path = []
        self.test_file_path = []

        # Maps the position of a label in the scan order to its name.
        self.index_label_dict = {}

        self.show_progress = show_progress

        # makedirs also creates a missing saved_dataset_dir.
        os.makedirs(self.saved_train_dir, exist_ok=True)
        os.makedirs(self.saved_test_dir, exist_ok=True)


    def __get_label_names(self):
        """Return the names of all sub-directories of ``dataset_dir``."""
        return [
            item for item in os.listdir(self.dataset_dir)
            if os.path.isdir(os.path.join(self.dataset_dir, item))
        ]

    def __get_all_file_path(self):
        """Return one list of file paths per label and fill ``index_label_dict``."""
        all_file_path = []
        for index, file_type in enumerate(self.__get_label_names()):
            self.index_label_dict[index] = file_type
            type_file_path = os.path.join(self.dataset_dir, file_type)
            candidates = (os.path.join(type_file_path, file) for file in os.listdir(type_file_path))
            # Nested directories cannot be copied with shutil.copy, so only
            # regular files take part in the split.
            all_file_path.append([path for path in candidates if os.path.isfile(path)])
        return all_file_path

    def __copy_files(self, type_path, type_saved_dir):
        """Copy every ``[label, paths]`` entry into ``type_saved_dir/<label>/``."""
        for label, src_path_list in type_path:
            dst_path = os.path.join(type_saved_dir, label, '')
            os.makedirs(dst_path, exist_ok=True)
            for src_path in src_path_list:
                shutil.copy(src_path, dst_path)
                if self.show_progress:
                    print("Copying file "+src_path+" to "+dst_path)

    def __split_dataset(self):
        """Shuffle each label's files and record the train/test partitions."""
        # Start from empty partitions so a repeated call does not duplicate entries.
        self.train_file_path = []
        self.test_file_path = []

        for index, file_path_list in enumerate(self.__get_all_file_path()):
            random.shuffle(file_path_list)
            total = len(file_path_list)

            train_num = int(total * self.train_ratio)
            # int() truncates, so one extra slot is requested to avoid
            # dropping a file; the slice below cannot run past the list end.
            test_num = int(total * self.test_ratio) + 1

            label = self.index_label_dict[index]
            self.train_file_path.append([label, file_path_list[:train_num]])
            self.test_file_path.append([label, file_path_list[train_num:train_num + test_num]])

    def start_splitting(self):
        """Compute the split and copy the files into the train and test trees."""
        self.__split_dataset()
        self.__copy_files(type_path=self.train_file_path, type_saved_dir=self.saved_train_dir)
        self.__copy_files(type_path=self.test_file_path, type_saved_dir=self.saved_test_dir)


# Run the split on the processed dataset and print every copied file.
if __name__ == '__main__':
    split_dataset = SplitDataset(dataset_dir='/content/drive/My Drive/Deep_X_torch/processed_dataset/',
                                 saved_dataset_dir='/content/drive/My Drive/Deep_X_torch/splitted_dataset/',
                                 show_progress=True)
    split_dataset.start_splitting()

Copying file /content/drive/My Drive/Deep_X_torch/processed_dataset/normal/normal_1.jpg to /content/drive/My Drive/Deep_X_torch/splitted_dataset/train/normal/
Copying file /content/drive/My Drive/Deep_X_torch/processed_dataset/normal/normal_5.jpg to /content/drive/My Drive/Deep_X_torch/splitted_dataset/train/normal/
Copying file /content/drive/My Drive/Deep_X_torch/processed_dataset/normal/normal_16.jpg to /content/drive/My Drive/Deep_X_torch/splitted_dataset/train/normal/
Copying file /content/drive/My Drive/Deep_X_torch/processed_dataset/normal/normal_4.jpg to /content/drive/My Drive/Deep_X_torch/splitted_dataset/train/normal/
Copying file /content/drive/My Drive/Deep_X_torch/processed_dataset/normal/normal_6.jpg to /content/drive/My Drive/Deep_X_torch/splitted_dataset/train/normal/
Copying file /content/drive/My Drive/Deep_X_torch/processed_dataset/normal/normal_11.jpg to /content/drive/My Drive/Deep_X_torch/splitted_dataset/train/normal/
Copying file /content/drive/My Drive/Deep_X_

#訓練集資料增強


In [None]:
!pip install imagecorruptions

Collecting imagecorruptions
  Downloading imagecorruptions-1.1.2-py3-none-any.whl (2.1 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.1/2.1 MB[0m [31m22.1 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: imagecorruptions
Successfully installed imagecorruptions-1.1.2


In [None]:
import glob
import cv2
import imgaug.augmenters as iaa
import os
from tqdm import trange

def data_augmentation(input_path, output_path, yolo_output_path, times):
    """Write ``times`` randomly augmented copies of every ``.jpg`` in a folder.

    Each augmented image is saved twice under the same name
    ``<original stem>_aug<k>.jpg`` (``k`` from 1 to ``times``): once in
    ``output_path`` and once in ``yolo_output_path``. Both folders are
    created if needed. Images that OpenCV cannot read are reported and
    skipped.

    Args:
        input_path: Directory containing the source ``.jpg`` files.
        output_path: Directory for the augmented training images.
        yolo_output_path: Second directory receiving the same images.
        times: Number of augmented copies generated per source image.

    Returns:
        None. The number of source images and generated images is printed.
    """
    # Augmentation pipeline: between 0 and 5 of the operations below are
    # picked per image and applied in random order.
    seq = iaa.Sequential([
        iaa.SomeOf((0, 5), [
            iaa.Fliplr(0.5),  # horizontal flip with 50% probability
            iaa.Flipud(0),  # vertical flip with probability 0, i.e. disabled
            iaa.Affine(rotate=(-20, 20)),  # rotate by a random angle in [-20, 20] degrees
            iaa.OneOf([
                iaa.GaussianBlur((0, 1.5)),  # Gaussian blur, sigma between 0 and 1.5
                iaa.AverageBlur(k=(2, 5)),   # average blur, kernel size between 2 and 5
                iaa.MedianBlur(k=(3, 7)),   # median blur, kernel size between 3 and 7
            ]),
            iaa.Sharpen(alpha=(0, 1.0), lightness=(0.75, 1.5)),  # sharpening
            # iaa.Emboss(alpha=(0, 1.0), strength=(0, 0.5)),  # emboss effect (disabled)
            # iaa.Add((-5, 5), per_channel=0.5),  # additive brightness in [-5, 5] (disabled)
            # iaa.Multiply((0.5, 1.5)),  # multiplicative brightness in [0.5, 1.5] (disabled)
            iaa.contrast.LinearContrast((0.7, 1.2)),  # linear contrast change
            iaa.imgcorruptlike.Saturate(severity=3),  # saturation corruption, severity 3
        ], random_order=True)
    ], random_order=True)

    os.makedirs(output_path, exist_ok=True)
    os.makedirs(yolo_output_path, exist_ok=True)

    file_count = 0
    for jpg_file in glob.glob(os.path.join(input_path, '*.jpg')):
        img = cv2.imread(jpg_file)
        if img is None:
            # cv2.imread signals an unreadable file by returning None.
            print("Skipping unreadable image: " + jpg_file)
            continue

        # File name without extension, shared by all copies of this image.
        filename = os.path.splitext(os.path.basename(jpg_file))[0]

        for count in trange(times):
            augmented_image = seq.augment_images([img])[0]
            aug_name = f"{filename}_aug{count + 1}.jpg"
            cv2.imwrite(os.path.join(output_path, aug_name), augmented_image)
            cv2.imwrite(os.path.join(yolo_output_path, aug_name), augmented_image)
        file_count += 1

    # Print statistics about the run.
    print("增強前圖片數量：" + str(file_count))
    print("增強後圖片數量：" + str(file_count * times))
    print("數據增強完成")

# Example usage:
# Shared folder that also receives every augmented image.
yolo_output_dir = '/content/drive/My Drive/Deep_X_torch/all_data'
# Training images per class (input) and the folders for their augmented copies (output).
normal_input_dir = '/content/drive/My Drive/Deep_X_torch/splitted_dataset/train/normal'
normal_output_dir = '/content/drive/My Drive/Deep_X_torch/splitted_dataset/train_argumentation/normal'
abnormal_input_dir = '/content/drive/My Drive/Deep_X_torch/splitted_dataset/train/abnormal'
abnormal_output_dir = '/content/drive/My Drive/Deep_X_torch/splitted_dataset/train_argumentation/abnormal'

# Generate 20 augmented copies of every training image of each class.
data_augmentation(normal_input_dir, normal_output_dir, yolo_output_dir, 20)
data_augmentation(abnormal_input_dir, abnormal_output_dir, yolo_output_dir, 20)

# Disabled: augmentation of a separate test folder.
# NOTE(review): the call below passes three arguments while data_augmentation
# takes four; it needs a yolo_output_path before it can be re-enabled.
# test_dir = '/content/drive/My Drive/Deep_X_torch/original_dataset/test/test(.jpg)'

# data_augmentation(test_dir, test_dir, 20)


#YoloV8自動化標記

##YoloV8資料結構
```
└── YOLOv8
    └── yolov8m
         └── args.yaml
└── classes.txt
└── data_custom.yaml
└── result
    └── source
└── runs
    └── detect
└── train
    └── images
    └── labels
└── val
    └── images
    └── labels
└── yolov8m.pt
```

##下載函式庫

In [3]:
#!pip install simple_image_download

In [4]:
# Install the Ultralytics package in the notebook runtime, then run its
# environment check.
!pip install ultralytics
from ultralytics import YOLO
import ultralytics
ultralytics.checks()

Ultralytics YOLOv8.0.200 🚀 Python-3.10.12 torch-2.0.1+cu118 CUDA:0 (Tesla T4, 15102MiB)
Setup complete ✅ (2 CPUs, 12.7 GB RAM, 26.9/78.2 GB disk)


In [5]:
import os
# The yolo commands in the following cells use paths relative to this folder.
os.chdir('/content/drive/My Drive/Deep_X_torch/yolov8_custom') # change directory
os.listdir() # confirm the directory contents

['YOLOv8',
 'train',
 'val',
 'yolov8x.pt',
 'yolov8n.pt',
 'runs',
 'data_custom.yaml',
 'classes.txt',
 'yolov8m.pt']

##訓練(train)Yolo模型

In [6]:
!yolo task=detect mode=train epochs=300 data=data_custom.yaml model=yolov8m.pt imgsz=640 batch=15 patience=0

Ultralytics YOLOv8.0.200 🚀 Python-3.10.12 torch-2.0.1+cu118 CUDA:0 (Tesla T4, 15102MiB)
[34m[1mengine/trainer: [0mtask=detect, mode=train, model=yolov8m.pt, data=data_custom.yaml, epochs=300, patience=0, batch=15, imgsz=640, save=True, save_period=-1, cache=False, device=None, workers=8, project=None, name=train, exist_ok=False, pretrained=True, optimizer=auto, verbose=True, seed=0, deterministic=True, single_cls=False, rect=False, cos_lr=False, close_mosaic=10, resume=False, amp=True, fraction=1.0, profile=False, freeze=None, overlap_mask=True, mask_ratio=4, dropout=0.0, val=True, split=val, save_json=False, save_hybrid=False, conf=None, iou=0.7, max_det=300, half=False, dnn=False, plots=True, source=None, show=False, save_txt=False, save_conf=False, save_crop=False, show_labels=True, show_conf=True, vid_stride=1, stream_buffer=False, line_width=None, visualize=False, augment=False, agnostic_nms=False, classes=None, retina_masks=False, boxes=True, format=torchscript, keras=False, o

##驗證(predict)Yolo模型

In [None]:
!yolo task=detect mode=predict save=True model=/content/drive/MyDrive/Deep_X_torch/yolov8_custom/runs/detect/train2/weights/best.pt conf=0.1 source=/content/drive/MyDrive/Deep_X_torch/all_data save_txt=True save_crop=False max_det=1

#根據標記的label切割JPG檔案

In [None]:
import cv2
import os

def crop_images(img_folder, label_folder, save_folder, crop_size=(224, 224)):
    """Crop each ``.jpg`` to the box in its label file and resize the crop.

    For every ``.jpg`` in ``img_folder`` the label file with the same stem
    and a ``.txt`` extension is looked up in ``label_folder``. Its first line
    is read as ``class x_center y_center width height`` with coordinates
    expressed as fractions of the image size. The box is clamped to the image,
    cropped, resized to ``crop_size`` and written to ``save_folder`` under the
    image's file name.

    Images are skipped (left untouched) when the label file is missing, the
    image cannot be read, the first label line has fewer than five values, or
    the clamped box is empty.

    Args:
        img_folder: Directory containing the ``.jpg`` images.
        label_folder: Directory containing the ``.txt`` label files.
        save_folder: Directory receiving the crops. When it equals
            ``img_folder`` the source images are overwritten.
        crop_size: ``(width, height)`` passed to ``cv2.resize``.

    Returns:
        None.
    """
    # Create the folder for the cropped images.
    os.makedirs(save_folder, exist_ok=True)

    for img_file in os.listdir(img_folder):
        if not img_file.endswith('.jpg'):
            continue

        # splitext keeps dots inside the stem ("scan.v2.jpg" -> "scan.v2.txt").
        txt_file = os.path.splitext(img_file)[0] + '.txt'
        txt_path = os.path.join(label_folder, txt_file)
        print(txt_file)
        if not os.path.exists(txt_path):
            continue

        img = cv2.imread(os.path.join(img_folder, img_file))
        if img is None:
            # cv2.imread signals an unreadable file by returning None.
            continue

        # Only the first box of the label file is used.
        with open(txt_path, 'r') as f:
            fields = f.readline().split()
        if len(fields) < 5:
            continue
        # Values after the fifth (if any) are ignored.
        x_center, y_center, width, height = (float(value) for value in fields[1:5])

        # Convert the relative box to pixel corners, clamped to the image so
        # that a slightly out-of-range box never becomes a negative slice index.
        img_h, img_w = img.shape[:2]
        x_min = max(int((x_center - width / 2) * img_w), 0)
        y_min = max(int((y_center - height / 2) * img_h), 0)
        x_max = min(int((x_center + width / 2) * img_w), img_w)
        y_max = min(int((y_center + height / 2) * img_h), img_h)
        if x_max <= x_min or y_max <= y_min:
            # An empty crop would make cv2.resize fail.
            continue

        cropped_img = cv2.resize(img[y_min:y_max, x_min:x_max], crop_size)
        cv2.imwrite(os.path.join(save_folder, img_file), cropped_img)

# Label folders stored with the original dataset (used only by the disabled calls below).
normal_label_folder = '/content/drive/My Drive/Deep_X_torch/original_dataset/normal/normal_label'
abnormal_label_folder = '/content/drive/My Drive/Deep_X_torch/original_dataset/abnormal/abnormal_label'

# Label files written by the YOLO predict run.
yolo_label_folder = '/content/drive/MyDrive/Deep_X_torch/yolov8_custom/runs/detect/predict/labels'

# Image folders of the split dataset, one per subset and class.
train_normal_dir = '/content/drive/My Drive/Deep_X_torch/splitted_dataset/train/normal'
train_abnormal_dir = '/content/drive/My Drive/Deep_X_torch/splitted_dataset/train/abnormal'
train_argumentation_normal_dir = '/content/drive/My Drive/Deep_X_torch/splitted_dataset/train_argumentation/normal'
train_argumentation_abnormal_dir = '/content/drive/My Drive/Deep_X_torch/splitted_dataset/train_argumentation/abnormal'
test_normal_dir = '/content/drive/My Drive/Deep_X_torch/splitted_dataset/test/normal'
test_abnormal_dir = '/content/drive/My Drive/Deep_X_torch/splitted_dataset/test/abnormal'

# Disabled: cropping with the labels stored in the original dataset.
# crop_images(train_normal_dir, normal_label_folder, train_normal_dir)
# crop_images(train_abnormal_dir, abnormal_label_folder, train_abnormal_dir)
# crop_images(train_argumentation_normal_dir, normal_label_folder, train_argumentation_normal_dir)
# crop_images(train_argumentation_abnormal_dir, abnormal_label_folder, train_argumentation_abnormal_dir)
# crop_images(test_normal_dir, normal_label_folder, test_normal_dir)
# crop_images(test_abnormal_dir, abnormal_label_folder, test_abnormal_dir)

# Crop with the YOLO-predicted labels. The save folder equals the image
# folder, so each image is replaced by its crop; running this cell twice
# would crop the already-cropped images again.
crop_images(train_normal_dir, yolo_label_folder, train_normal_dir)
crop_images(train_abnormal_dir, yolo_label_folder, train_abnormal_dir)
crop_images(train_argumentation_normal_dir, yolo_label_folder, train_argumentation_normal_dir)
crop_images(train_argumentation_abnormal_dir, yolo_label_folder, train_argumentation_abnormal_dir)
crop_images(test_normal_dir, yolo_label_folder, test_normal_dir)
crop_images(test_abnormal_dir, yolo_label_folder, test_abnormal_dir)

#將圖像轉換為張量

In [None]:
import os
from PIL import Image
import torch
from torchvision import transforms

def transform(image_path,tensor_path):
    """Convert every image file in a folder to a tensor saved as ``.pt``.

    Each file in ``image_path`` is opened with PIL, converted with
    ``transforms.ToTensor()`` and saved with ``torch.save`` as
    ``<tensor_path>/<original stem>.pt``. ``tensor_path`` is created if it
    does not exist. Sub-directories of ``image_path`` are ignored.

    Args:
        image_path: Directory containing the image files.
        tensor_path: Directory receiving the ``.pt`` files.

    Returns:
        None.
    """
    # Conversion applied to every image.
    to_tensor = transforms.Compose([
        transforms.ToTensor()
    ])

    # torch.save does not create missing parent directories.
    os.makedirs(tensor_path, exist_ok=True)

    for filename in os.listdir(image_path):
        img_path = os.path.join(image_path, filename)
        if not os.path.isfile(img_path):
            continue

        file_name, _ = os.path.splitext(filename)

        # The context manager closes the image file once it has been converted.
        with Image.open(img_path) as img:
            tensor_img = to_tensor(img)

        torch.save(tensor_img, os.path.join(tensor_path, file_name + ".pt"))

# Training tensors are built from the augmented training images.
train_normal_splitted_path = '/content/drive/My Drive/Deep_X_torch/splitted_dataset/train_argumentation/normal'
train_abnormal_splitted_path = '/content/drive/My Drive/Deep_X_torch/splitted_dataset/train_argumentation/abnormal'
train_normal_tensor_path = '/content/drive/My Drive/Deep_X_torch/tensor/train/normal'
train_abnormal_tensor_path = '/content/drive/My Drive/Deep_X_torch/tensor/train/abnormal'
# Test tensors are built from the test images.
test_normal_splitted_path = '/content/drive/My Drive/Deep_X_torch/splitted_dataset/test/normal'
test_abnormal_splitted_path = '/content/drive/My Drive/Deep_X_torch/splitted_dataset/test/abnormal'
test_normal_tensor_path = '/content/drive/My Drive/Deep_X_torch/tensor/test/normal'
test_abnormal_tensor_path = '/content/drive/My Drive/Deep_X_torch/tensor/test/abnormal'

# Convert each image folder into its tensor folder.
transform(train_normal_splitted_path,train_normal_tensor_path)
transform(train_abnormal_splitted_path,train_abnormal_tensor_path)
transform(test_normal_splitted_path,test_normal_tensor_path)
transform(test_abnormal_splitted_path,test_abnormal_tensor_path)

#訓練及驗證模型(5-Fold)

In [None]:
import torch
import torch.nn as nn
import os
import numpy as np
import torch.optim as optim
import torchvision.models as models
import matplotlib.pyplot as plt
import csv
import random
from torch.utils.data import DataLoader, Dataset
from torch.utils.data import ConcatDataset
from sklearn import datasets
from sklearn.model_selection import KFold
from sklearn.metrics import confusion_matrix
from tqdm import tqdm
from torch.utils.tensorboard import SummaryWriter
from datetime import datetime

class MyDataset(Dataset):
    """In-memory dataset of the ``.pt`` tensors found in one directory.

    All tensors are loaded when the dataset is constructed. The label comes
    from the file name: the text before the first underscore gives label 0
    when it is ``normal`` and label 1 otherwise. Every item is a
    ``(tensor, label, filename)`` tuple.
    """

    def __init__(self, data_path):
        self.data_path = data_path
        # Class-name to class-index mapping (kept for reference; the labels
        # below are derived from the file names).
        self.class_to_idx = {'abnormal': 1, 'normal': 0}
        self.data = []
        self.filenames = []  # not filled here; file names are stored in self.data
        for entry in os.listdir(data_path):
            if not entry.endswith('.pt'):
                continue
            loaded = torch.load(os.path.join(data_path, entry))
            prefix = entry.partition('_')[0]
            label_idx = 0 if prefix == 'normal' else 1
            self.data.append((loaded, label_idx, entry))

    def __len__(self):
        """Return the number of loaded tensors."""
        return len(self.data)

    def __getitem__(self, index):
        """Return the ``(tensor, label, filename)`` triple stored at ``index``."""
        sample, target, name = self.data[index]
        return sample, target, name

class CombinedDataset(ConcatDataset):
    """Concatenation of exactly two datasets, indexed one after the other."""

    def __init__(self, dataset1, dataset2):
        pair = [dataset1, dataset2]
        super().__init__(pair)

    def __getitem__(self, index):
        """Return the item at ``index`` of the concatenated sequence."""
        item = super().__getitem__(index)
        return item

    def __len__(self):
        """Return the combined number of items."""
        total = super().__len__()
        return total


# 建立資料夾顯示訓練結果
def mkdir_outcome(result_path):
    """Create the folder for the next training run and return its train sub-folder.

    Existing entries named ``result_<n>`` in ``result_path`` are scanned for
    the highest ``n``. The new run uses ``n + 1`` (1 when no numbered entry
    exists) and the folders ``result_<n+1>`` and ``result_<n+1>/train_<n+1>``
    are created. Entries whose suffix is not an integer, such as
    ``result_final``, are ignored. ``result_path`` itself is created if it
    does not exist.

    Args:
        result_path: Directory that holds one ``result_<n>`` folder per run.

    Returns:
        Path of the newly created ``result_<n+1>/train_<n+1>`` folder.
    """
    os.makedirs(result_path, exist_ok=True)

    num_max = 0
    for file_name in os.listdir(result_path):
        if not file_name.startswith("result_"):
            continue
        try:
            num = int(file_name.split("_")[1])
        except ValueError:
            # Not a numbered run folder.
            continue
        num_max = max(num_max, num)

    # Make the folders for this training run.
    result_path = os.path.join(result_path,"result_{}".format(num_max + 1))
    result_path_train = os.path.join(result_path,"train_{}".format(num_max + 1))
    os.makedirs(result_path,exist_ok=True)
    os.makedirs(result_path_train,exist_ok=True)
    return result_path_train


# 模型評估指標
def validation_index(conf_matrix):
    """Compute precision, recall, F1 score, TPR and FPR from a 2x2 matrix.

    The formulas read the matrix as ``[[TP, FP], [FN, TN]]``:

    * precision = ``m[0][0] / (m[0][0] + m[0][1])``
    * recall    = ``m[0][0] / (m[0][0] + m[1][0])``
    * FPR       = ``m[0][1] / (m[0][1] + m[1][1])``

    NOTE(review): the caller must fill the matrix in this layout - confirm
    against the code that accumulates it.

    Every ratio whose denominator is zero is reported as 0.0 instead of
    raising or producing NaN.

    Args:
        conf_matrix: 2x2 indexable of counts.

    Returns:
        Tuple ``(precision, recall, f1_score, TPR, FPR)``; TPR equals recall.
    """
    precision = 0.0
    recall = 0.0
    f1_score = 0.0
    FPR = 0.0

    predicted_positive = conf_matrix[0][0] + conf_matrix[0][1]
    if predicted_positive != 0:
        precision = conf_matrix[0][0] / predicted_positive

    actual_positive = conf_matrix[0][0] + conf_matrix[1][0]
    if actual_positive != 0:
        recall = conf_matrix[0][0] / actual_positive

    if (precision + recall) != 0:
        f1_score = 2*precision*recall / (precision + recall)

    TPR = recall

    # Guarded like the other ratios: there may be no actual negatives at all.
    actual_negative = conf_matrix[0][1] + conf_matrix[1][1]
    if actual_negative != 0:
        FPR = conf_matrix[0][1] / actual_negative

    print("\t      Precision: {:<.4f}  -  Recall: {:<.4f}  -  F1 Score: {:<.4f}".format(precision,recall,f1_score))
    return precision,recall,f1_score,TPR,FPR


# 混淆矩陣
def Confusion_Matrix(result_path,conf_matrix,fold_nums):
    """Draw a 2x2 confusion matrix and save it as a PNG for one fold.

    The figure is written to
    ``<result_path>/confusion_matrix_record/confusion_matrix_fold<fold_nums + 1>.png``;
    the record folder is created if missing. The matrix is also printed.

    Args:
        result_path: Folder of the current training run.
        conf_matrix: 2x2 indexable of counts.
        fold_nums: Zero-based fold index.
    """
    record_dir = os.path.join(result_path, 'confusion_matrix_record')
    plt.clf()  # start from an empty figure
    if not os.path.exists(record_dir):
        os.makedirs(record_dir)

    cells = np.array([[conf_matrix[row][col] for col in range(2)] for row in range(2)])
    print("Confusion matrix:")
    print(conf_matrix)
    plt.imshow(cells, cmap=plt.cm.Blues, interpolation='nearest')
    plt.colorbar()

    # Write each count into its cell; dark cells get white text.
    for row, col in np.ndindex(2, 2):
        value = cells[row][col]
        if value < 0.5 * cells.max():
            text_color = 'black'
        else:
            text_color = 'white'
        plt.annotate(str(value), xy=(col, row), ha='center', va='center', color=text_color)

    positions = np.arange(2)
    class_names = ['Positive', 'Negative']
    plt.xticks(positions, class_names)
    plt.yticks(positions, class_names)
    plt.ylabel('Predicted Label')
    plt.xlabel('True Label')
    plt.title('Confusion Matrix')

    png_name = 'confusion_matrix_fold'+str(fold_nums + 1)+'.png'
    plt.savefig(os.path.join(result_path + '/confusion_matrix_record', png_name))


# ROC曲線
def ROC_Curve(result_path,tpr_list,fpr_list,fold_nums):
    """Plot TPR against FPR with its trapezoidal AUC and save the figure.

    The figure is written to
    ``<result_path>/roc_curve_record/Roc_curve<fold_nums + 1>.png``; the
    record folder is created if missing.

    NOTE(review): the area is integrated over the points in the order given,
    so the caller presumably has to pass them ordered by FPR - confirm.

    Args:
        result_path: Folder of the current training run.
        tpr_list: True-positive-rate values (y axis).
        fpr_list: False-positive-rate values (x axis).
        fold_nums: Zero-based fold index.
    """
    record_dir = os.path.join(result_path, 'roc_curve_record')
    if not os.path.exists(record_dir):
        os.makedirs(record_dir)

    # Area under the curve by the trapezoidal rule.
    roc_auc = np.trapz(tpr_list, fpr_list)

    plt.clf()
    plt.plot(fpr_list, tpr_list, lw=1, label='ROC (AUC = %0.2f)' % roc_auc)
    # Diagonal reference line of a random classifier.
    plt.plot([0, 1], [0, 1], '--', color='gray', label='Random Guessing')

    axis_limits = [-0.05, 1.05]
    plt.xlim(axis_limits)
    plt.ylim(axis_limits)
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('Receiver Operating Characteristic (ROC) Curve')
    plt.legend()

    png_name = 'Roc_curve'+str(fold_nums + 1)+'.png'
    plt.savefig(os.path.join(result_path + '/roc_curve_record', png_name))


# 輸出每一次 epoch 的結果
def CSV_Output(result_path,parameter,num_epochs,train_loss_list,train_acc_list,val_loss_list,val_acc_list,precision_list,recall_list,TPR_list,FPR_list,f1_score_list,fold_wrong_predict,fold_nums):
    """Write the per-epoch metrics of one fold to a CSV file.

    The file ``<result_path>/csv_record/epoch_fold<fold_nums + 1>.csv``
    contains a hyper-parameter header and value row, an empty row, one row per
    epoch with every metric rounded to four decimals, and a final row with the
    wrongly predicted items.

    Args:
        result_path: Folder of the current training run.
        parameter: Row of hyper-parameter values matching the first header.
        num_epochs: Number of epoch rows to write.
        train_loss_list, val_loss_list, precision_list, recall_list,
        TPR_list, FPR_list, f1_score_list: Per-epoch numbers.
        train_acc_list, val_acc_list: Per-epoch values exposing ``.item()``.
        fold_wrong_predict: Value written after the ``wrong_predict:`` tag.
        fold_nums: Zero-based fold index.
    """
    record_dir = os.path.join(result_path, 'csv_record')
    if not os.path.exists(record_dir):
        os.makedirs(record_dir)

    csv_path = f"{result_path}/csv_record/epoch_fold{fold_nums + 1}.csv"
    parameter_header = ['num_epochs','batch_size','learning_rate','num_classes','device','start_time','end_time','num_fold']
    epoch_header = ['Epoch','train_loss','train_acc','val_loss','val_acc','precision','recall','TPR','FPR','F1 score']

    with open(csv_path,'w',newline='') as file:
        writer = csv.writer(file)
        writer.writerow(parameter_header)
        writer.writerow(parameter)
        writer.writerow('')  # empty separator row
        writer.writerow(epoch_header)
        for idx in range(num_epochs):
            metrics = (
                train_loss_list[idx],
                train_acc_list[idx].item(),
                val_loss_list[idx],
                val_acc_list[idx].item(),
                precision_list[idx],
                recall_list[idx],
                TPR_list[idx],
                FPR_list[idx],
                f1_score_list[idx],
            )
            writer.writerow([idx + 1] + [round(value, 4) for value in metrics])
        writer.writerow(['wrong_predict:',fold_wrong_predict])
    print('CSV output Sucessfully')

def print_filename_in_txt(train_loader, valid_loader, result_path, fold_num):
    """Record which files were used for training and validation in one fold.

    Both loaders are iterated once; every batch must be a
    ``(data, target, filenames)`` triple. The collected names are written one
    per line to ``train_filenames_fold_<fold_num + 1>.txt`` and
    ``valid_filenames_fold_<fold_num + 1>.txt`` inside
    ``<result_path>/file_name_record`` (created if missing).

    Args:
        train_loader: Iterable of training batches.
        valid_loader: Iterable of validation batches.
        result_path: Folder of the current training run.
        fold_num: Zero-based fold index.
    """
    train_filenames = []
    valid_filenames = []

    # Only the file names of each batch are needed.
    for _, _, filename in train_loader:
        train_filenames.extend(filename)

    for _, _, filename in valid_loader:
        valid_filenames.extend(filename)

    # Create the 'file_name_record' directory if it doesn't exist.
    file_record_dir = os.path.join(result_path, 'file_name_record')
    os.makedirs(file_record_dir, exist_ok=True)

    train_file = result_path + '/file_name_record/train_filenames_fold_'+str(fold_num + 1)+'.txt'
    valid_file = result_path + '/file_name_record/valid_filenames_fold_'+str(fold_num + 1)+'.txt'

    with open(train_file, 'w') as file:
        for filename in train_filenames:
            file.write(filename + '\n')

    with open(valid_file, 'w') as file:
        for filename in valid_filenames:
            file.write(filename + '\n')

    # Report the paths that were actually written.
    print(f"Filenames saved in {train_file} and {valid_file}")

def calculate_average(avg_train_acc_list, avg_val_acc_list, avg_recall_list, avg_precision_list, avg_f1_score_list, avg_TPR_list, avg_FPR_list, fold_nums, result_path):
    """Average each metric over the folds and write the result to ``Average.txt``.

    Every list is summed and divided by ``fold_nums``. The seven averages are
    written, one labelled line each, to ``<result_path>/Average.txt``.

    Args:
        avg_train_acc_list, avg_val_acc_list, avg_recall_list,
        avg_precision_list, avg_f1_score_list, avg_TPR_list, avg_FPR_list:
            One value per fold for the respective metric.
        fold_nums: Divisor used for every average.
        result_path: Folder that receives ``Average.txt``.
    """
    labelled_values = [
        ("Avg Train Accuracy", avg_train_acc_list),
        ("Avg Validation Accuracy", avg_val_acc_list),
        ("Avg Recall", avg_recall_list),
        ("Avg Precision", avg_precision_list),
        ("Avg F1 Score", avg_f1_score_list),
        ("Avg TPR", avg_TPR_list),
        ("Avg FPR", avg_FPR_list),
    ]

    # All averages are computed before the file is opened.
    report_lines = [f"{label}: {sum(values) / fold_nums}\n" for label, values in labelled_values]

    with open(result_path + '/Average.txt', 'w') as file:
        for line in report_lines:
            file.write(line)

    print(f"Averages saved to {result_path}")

def train(normal_data_dir,abnormal_data_dir,train_normal_tensor_path,train_abnormal_tensor_path,result_path,num_fold):
    """Fine-tune a pretrained ResNet-152 with k-fold cross-validation and record the results.

    The source-image names found in ``normal_data_dir`` and
    ``abnormal_data_dir`` are shuffled and split into ``num_fold`` folds.  For
    every fold, the matching augmented tensors
    (``<label>_<index>_aug<k>.pt``) are selected from the tensor datasets, the
    model is reset to the saved initial weights, trained for a fixed number of
    epochs, and the metrics, confusion matrix, ROC plot and CSV report are
    written under the run directory returned by ``mkdir_outcome``.

    Args:
        normal_data_dir: Directory of normal source images; only the file
            names (extension stripped) are used.
        abnormal_data_dir: Same, for abnormal source images.
        train_normal_tensor_path: Directory of ``.pt`` tensors of the normal
            class, loaded through ``MyDataset``.
        train_abnormal_tensor_path: Same, for the abnormal class.
        result_path: Base results directory handed to ``mkdir_outcome``.
        num_fold: Number of cross-validation folds.

    Raises:
        ValueError: If a source file name does not contain exactly one ``_``.
    """
    # Hyper-parameters
    batch_size = 52
    learning_rate = 0.001
    num_epochs = 30
    num_classes = 2
    num_folds = num_fold
    start_time = datetime.now()
    end_time = 0
    num_argumentation = 20  # augmented copies looked up per source image (aug1..aug20)
    conf_matrix = np.zeros((num_classes, num_classes), dtype=np.int32)

    # Load both classes and expose them as one dataset
    train_normal_dataset = MyDataset(train_normal_tensor_path)
    train_abnormal_dataset = MyDataset(train_abnormal_tensor_path)
    train_datasets = CombinedDataset(train_normal_dataset,train_abnormal_dataset)

    # Create the k-fold cross-validation object
    kfold = KFold(n_splits=num_folds)

    # Print the dataset size
    print("train dataset's size : " + str(len(train_datasets)))

    # Build the model: pretrained ResNet-152 with a new num_classes-way head
    model = models.resnet152(pretrained=True)
    model.fc = nn.Linear(model.fc.in_features, num_classes)

    # Move the model to the GPU when one is available
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)
    model.fc.to(device)
    print("Device used : " + str(device))

    # Make the directory that receives this run's outputs
    result_path = mkdir_outcome(result_path)

    # Last-epoch metrics of every fold, averaged after the final fold
    avg_train_acc_list = []
    avg_val_acc_list = []
    avg_recall_list = []
    avg_precision_list = []
    avg_f1_score_list = []
    avg_TPR_list = []
    avg_FPR_list = []

    # Source-image names (extension stripped): normal first, then abnormal
    files = []
    for data_dir in (normal_data_dir, abnormal_data_dir):
        for file_name in os.listdir(data_dir):
            files.append(os.path.splitext(file_name)[0])

    random.shuffle(files)

    # Snapshot of the initial weights; every fold restarts from this file
    pretrain_ckpt = os.path.join(result_path,"resnet152_pre_train.pt")
    torch.save(model.state_dict(),pretrain_ckpt)

    for fold, (train_indices, valid_indices) in enumerate(kfold.split(files)):
        print(f"Fold: {fold+1}")
        fold_ckpt = os.path.join(result_path,"train_fold_"+f'{fold+1}'+".pt")

        # Loss function, optimizer and learning-rate schedule
        m = nn.Sigmoid()
        criterion = nn.BCELoss()
        optimizer = optim.Adam(model.parameters(), lr=learning_rate)
        scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.1)

        # Per-epoch history of this fold
        train_loss_list = []
        train_acc_list = []
        val_loss_list = []
        val_acc_list = []
        recall_list = []
        precision_list = []
        f1_score_list = []
        TPR_list = []
        FPR_list = []
        fold_wrong_predict = []

        train_files = [files[i] for i in train_indices]
        valid_files = [files[i] for i in valid_indices]

        # Names of the augmented tensors that belong to each side of the split.
        # Sets give O(1) membership tests in the per-sample scan below.
        train_data_files = set()
        valid_data_files = set()

        # Expand every training source image into its augmented file names
        for file in train_files:
            label_name, image_id = file.split('_')
            for num in range(1,num_argumentation + 1):
                train_data_files.add(f"{label_name}_{image_id}_aug{num}.pt")

        # Expand every validation source image into its augmented file names
        for file in valid_files:
            label_name, image_id = file.split('_')
            for num in range(1,num_argumentation + 1):
                valid_data_files.add(f"{label_name}_{image_id}_aug{num}.pt")

        # Dataset indices of the samples on each side of the split
        # (named so they do not shadow this function's own name)
        train_sample_indices = []
        valid_sample_indices = []

        for index, (data, labels, filename) in enumerate(train_datasets):
            if filename in train_data_files:
                train_sample_indices.append(index)
            elif filename in valid_data_files:
                valid_sample_indices.append(index)

        # Create the train and validation datasets for this fold
        train_dataset = torch.utils.data.Subset(train_datasets, train_sample_indices)
        valid_dataset = torch.utils.data.Subset(train_datasets, valid_sample_indices)

        print("train_dataset : " + str(len(train_dataset)),",valid_dataset : " + str(len(valid_dataset)))

        # Create the data loaders for this fold
        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=4)
        valid_loader = DataLoader(valid_dataset, batch_size=1, shuffle=True, num_workers=4)
        print_filename_in_txt(train_loader,valid_loader,result_path,fold)

        # Train the model
        for epoch in range(num_epochs):

            train_loss = 0
            train_correct = 0
            train_acc = 0
            val_loss = 0
            val_corrects = 0
            val_acc = 0

            print("result_path:" + str(result_path))

            # First epoch of a fold starts from the initial snapshot; later
            # epochs reload the checkpoint written at the end of the previous one.
            if epoch == 0:
                model.load_state_dict(torch.load(pretrain_ckpt))
                print("Model initialized")
            else:
                model.load_state_dict(torch.load(fold_ckpt))
                print("Model loaded")

            # Reset the confusion matrix for this epoch
            conf_matrix = np.zeros((num_classes, num_classes), dtype=np.int32)

            print("[Training Progress]: ")
            model.train()
            for inputs, labels, filename in tqdm(train_loader):
                # One-hot targets for BCELoss
                targets=torch.eye(2)[labels.long(), :]
                inputs = inputs.to(device)
                labels = labels.to(device)
                targets = targets.to(device)

                optimizer.zero_grad()
                outputs = model(inputs)
                loss = criterion(m(outputs),targets.float())
                _, preds = torch.max(outputs, 1)
                train_correct += torch.sum(preds == labels.data)
                loss.backward()
                optimizer.step()
                train_loss += loss.item() * inputs.size(0)
            train_loss = train_loss / len(train_loader.dataset)
            train_acc = train_correct.double() / len(train_loader.dataset)
            train_loss_list.append(train_loss)

            model.eval()

            print("[Validating Progress]: ")
            wrong_predict = []
            for inputs, labels, filename in tqdm(valid_loader):
                targets=torch.eye(2)[labels.long(), :]
                inputs = inputs.to(device)
                labels = labels.to(device)
                targets = targets.to(device)
                with torch.set_grad_enabled(False):
                    outputs = model(inputs)
                    loss = criterion(m(outputs),targets.float())
                val_loss += loss.item() * inputs.size(0)
                _, preds = torch.max(outputs, 1)
                val_corrects += torch.sum(preds == labels.data)

                # Confusion matrix: row = predicted class, column = true class
                for pred_cls, true_cls in zip(preds.view(-1), labels.view(-1)):
                    conf_matrix[pred_cls.long(), true_cls.long()] += 1
                    if pred_cls != true_cls:
                        # filename is the batch's name collection (valid_loader uses batch_size=1)
                        wrong_predict.append(filename)
            val_loss = val_loss / len(valid_loader.dataset)
            val_acc = val_corrects.double() / len(valid_loader.dataset)
            val_loss_list.append(val_loss)

            scheduler.step()
            end_time = datetime.now()
            print('\nEpoch: [{}/{}]  train_loss: {:<.4f}  -  train_accuracy: {:<.4f} -  val_loss: {:<.4f}  -  val_accuracy: {:<.4f}  -  val_correct: {:<10}'.format(
                epoch+1, num_epochs, train_loss, train_acc, val_loss, val_acc, val_corrects))
            print('wrong predict : ' + str(wrong_predict))
            torch.save(model.state_dict(),fold_ckpt)
            print("Model saved")

            # Evaluation metrics derived from this epoch's confusion matrix
            precision,recall,f1_score,TPR,FPR = validation_index(conf_matrix)

            # Record this epoch's outcomes
            val_acc_list.append(val_acc)
            train_acc_list.append(train_acc)
            precision_list.append(precision)
            recall_list.append(recall)
            f1_score_list.append(f1_score)
            TPR_list.append(TPR)
            FPR_list.append(FPR)

            # The last epoch's values represent the fold in the final average
            if epoch + 1 == num_epochs:
                avg_train_acc_list.append(train_acc)
                avg_val_acc_list.append(val_acc)
                avg_recall_list.append(recall)
                avg_precision_list.append(precision)
                avg_f1_score_list.append(f1_score)
                avg_TPR_list.append(TPR)
                avg_FPR_list.append(FPR)
                fold_wrong_predict.append(wrong_predict)

        # Plot the confusion matrix of the fold's last epoch
        Confusion_Matrix(result_path,conf_matrix,fold)
        # Plot the per-epoch (FPR, TPR) points of this fold
        ROC_Curve(result_path,TPR_list,FPR_list,fold)
        # Write the per-epoch CSV report of this fold
        param = [num_epochs,batch_size,learning_rate,num_classes,device,start_time,end_time,fold]
        CSV_Output(result_path,param,num_epochs,train_loss_list,train_acc_list,val_loss_list,val_acc_list,precision_list,recall_list,TPR_list,FPR_list,f1_score_list,fold_wrong_predict,fold)

    # Average the last-epoch metrics over all folds
    calculate_average(avg_train_acc_list, avg_val_acc_list, avg_recall_list, avg_precision_list, avg_f1_score_list, avg_TPR_list, avg_FPR_list, num_folds, result_path)

    torch.cuda.empty_cache()


# Source-image folders: train() only uses their file names to build the folds.
normal_data_dir = '/content/drive/My Drive/Deep_X_torch/splitted_dataset/train/normal'
abnormal_data_dir = '/content/drive/My Drive/Deep_X_torch/splitted_dataset/train/abnormal'
# Folders holding the .pt tensors that are loaded for training.
train_normal_tensor_path = '/content/drive/My Drive/Deep_X_torch/tensor/train/normal'
train_abnormal_tensor_path = '/content/drive/My Drive/Deep_X_torch/tensor/train/abnormal'
# Base folder under which the run directory is created.
result_path = '/content/drive/My Drive/Deep_X_torch/result'

# Run 5-fold cross-validated training.
train(
    normal_data_dir=normal_data_dir,
    abnormal_data_dir=abnormal_data_dir,
    train_normal_tensor_path=train_normal_tensor_path,
    train_abnormal_tensor_path=train_abnormal_tensor_path,
    result_path=result_path,
    num_fold=5,
)

#訓練及驗證模型(SEResNet)

In [None]:
!pip install timm

In [None]:
import torch
import torch.nn as nn
import os
import numpy as np
import torch.optim as optim
import torchvision.models as models
import matplotlib.pyplot as plt
import csv
import random
import timm
from torch.utils.data import DataLoader, Dataset
from torch.utils.data import ConcatDataset
from sklearn import datasets
from sklearn.model_selection import KFold
from sklearn.metrics import confusion_matrix
from tqdm import tqdm
from torch.utils.tensorboard import SummaryWriter
from datetime import datetime

class MyDataset(Dataset):
    """In-memory dataset of tensors stored as ``.pt`` files in one directory.

    Every ``.pt`` file in ``data_path`` is loaded at construction time.  The
    label is taken from the file-name prefix before the first underscore:
    ``normal`` maps to 0 and any other prefix maps to 1.

    Each item is a ``(tensor, label_idx, filename)`` tuple.
    """

    def __init__(self, data_path):
        """Load all ``.pt`` files found directly inside ``data_path``."""
        self.data_path = data_path
        self.class_to_idx = {'abnormal': 1, 'normal': 0}  # class name -> class index
        self.data = []
        self.filenames = []  # file names, parallel to self.data
        # Sorted so the sample order does not depend on the directory listing order
        for filename in sorted(os.listdir(data_path)):
            if not filename.endswith('.pt'):
                continue
            tensor = torch.load(os.path.join(data_path, filename))
            prefix = filename.split('_')[0]
            # Any prefix other than 'normal' is treated as abnormal
            label_idx = self.class_to_idx.get(prefix, self.class_to_idx['abnormal'])
            self.data.append((tensor, label_idx, filename))
            self.filenames.append(filename)

    def __len__(self):
        """Return the number of loaded samples."""
        return len(self.data)

    def __getitem__(self, index):
        """Return the ``(tensor, label_idx, filename)`` tuple at ``index``."""
        tensor, labels, filename = self.data[index]
        return tensor, labels, filename

class CombinedDataset(ConcatDataset):
    """Concatenation of exactly two datasets, indexed as one.

    Item access and length are the inherited ``ConcatDataset`` behaviour:
    indices run through ``dataset1`` first and then through ``dataset2``.
    """

    def __init__(self, dataset1, dataset2):
        """Combine ``dataset1`` followed by ``dataset2``."""
        members = [dataset1, dataset2]
        super().__init__(members)


# 建立資料夾顯示訓練結果
def mkdir_outcome(result_path):
    """Create the next numbered run directory and return its training sub-directory.

    Entries of ``result_path`` named ``result_<n>`` are scanned for the
    largest integer ``n`` (0 when there is none).  The directories
    ``result_<n+1>`` and ``result_<n+1>/train_<n+1>`` are then created.

    Entries whose suffix is not an integer (for example ``result_backup`` or
    ``result_1.zip``) are ignored instead of aborting the run.

    Args:
        result_path: Existing base directory that holds the ``result_<n>`` folders.

    Returns:
        Path of the newly created ``result_<n+1>/train_<n+1>`` directory.
    """
    num_max = 0
    for file_name in os.listdir(result_path):
        if not file_name.startswith("result_"):
            continue
        try:
            num = int(file_name.split("_")[1])
        except ValueError:
            # Not a numbered run folder; skip it
            continue
        num_max = max(num_max, num)

    # Make the folders for this training run
    result_path = os.path.join(result_path,"result_{}".format(num_max + 1))
    result_path_train = os.path.join(result_path,"train_{}".format(num_max + 1))
    os.makedirs(result_path,exist_ok=True)
    os.makedirs(result_path_train,exist_ok=True)
    return result_path_train


# 模型評估指標
def validation_index(conf_matrix):
    """Compute precision, recall, F1, TPR and FPR from a 2x2 confusion matrix.

    The formulas treat index 0 as the positive class and read the matrix as
    ``conf_matrix[predicted][true]`` (the layout filled in by ``train``).
    Every ratio whose denominator is zero is reported as 0.0, including FPR.

    Args:
        conf_matrix: 2x2 indexable of counts.

    Returns:
        Tuple ``(precision, recall, f1_score, TPR, FPR)``; ``TPR`` equals ``recall``.
    """
    hit = conf_matrix[0][0]          # predicted 0, true 0
    false_alarm = conf_matrix[0][1]  # predicted 0, true 1
    miss = conf_matrix[1][0]         # predicted 1, true 0
    reject = conf_matrix[1][1]       # predicted 1, true 1

    precision = 0.0
    recall = 0.0
    f1_score = 0.0
    FPR = 0.0
    if (hit + false_alarm) != 0:
        precision = hit / (hit + false_alarm)
    if (hit + miss) != 0:
        recall = hit / (hit + miss)
    if (precision + recall) != 0:
        f1_score = 2*precision*recall / (precision + recall)
    TPR = recall
    # Guarded like the other ratios: an empty second column would otherwise
    # produce nan (NumPy) or raise ZeroDivisionError (plain ints)
    if (false_alarm + reject) != 0:
        FPR = false_alarm / (false_alarm + reject)
    print("\t      Precision: {:<.4f}  -  Recall: {:<.4f}  -  F1 Score: {:<.4f}".format(precision,recall,f1_score))
    return precision,recall,f1_score,TPR,FPR


# 混淆矩陣
def Confusion_Matrix(result_path,conf_matrix,fold_nums):
    """Plot a 2x2 confusion matrix and save it as a PNG for one fold.

    The image is written to
    ``<result_path>/confusion_matrix_record/confusion_matrix_fold<fold_nums+1>.png``;
    the directory is created when it does not exist.

    Args:
        result_path: Run directory that receives the record folder.
        conf_matrix: 2x2 indexable of counts.
        fold_nums: Zero-based fold index.
    """
    record_dir = os.path.join(result_path, 'confusion_matrix_record')
    plt.clf()  # start from an empty current figure
    if not os.path.exists(record_dir):
        os.makedirs(record_dir)

    cells = [[conf_matrix[row][col] for col in range(2)] for row in range(2)]
    matrix = np.array(cells)
    print("Confusion matrix:")
    print(conf_matrix)
    plt.imshow(matrix, cmap=plt.cm.Blues, interpolation='nearest')
    plt.colorbar()

    # Write each count into its cell; dark cells get white text
    for row in range(2):
        for col in range(2):
            if matrix[row][col] < 0.5 * matrix.max():
                text_color = 'black'
            else:
                text_color = 'white'
            plt.annotate(str(matrix[row][col]), xy=(col, row), ha='center', va='center', color=text_color)

    class_names = ['Positive', 'Negative']
    tick_marks = np.arange(2)
    plt.xticks(tick_marks, class_names)
    plt.yticks(tick_marks, class_names)
    plt.ylabel('Predicted Label')
    plt.xlabel('True Label')
    plt.title('Confusion Matrix')

    image_name = f'confusion_matrix_fold{fold_nums + 1}.png'
    plt.savefig(os.path.join(result_path + '/confusion_matrix_record', image_name))


# ROC曲線
def ROC_Curve(result_path,tpr_list,fpr_list,fold_nums):
    """Plot the given (FPR, TPR) points as an ROC-style curve and save it as a PNG.

    The area shown in the legend is the trapezoidal integral of ``tpr_list``
    over ``fpr_list``, taken in the order the points are given.  The image is
    written to ``<result_path>/roc_curve_record/Roc_curve<fold_nums+1>.png``;
    the directory is created when it does not exist.

    Args:
        result_path: Run directory that receives the record folder.
        tpr_list: True-positive rates (y values).
        fpr_list: False-positive rates (x values).
        fold_nums: Zero-based fold index.
    """
    record_dir = os.path.join(result_path, 'roc_curve_record')
    if not os.path.exists(record_dir):
        os.makedirs(record_dir)

    # Area under the plotted curve
    area = np.trapz(tpr_list, fpr_list)
    curve_label = 'ROC (AUC = %0.2f)' % area

    # Draw the curve together with the diagonal reference line
    plt.clf()
    plt.plot(fpr_list, tpr_list, lw=1, label=curve_label)
    plt.plot([0, 1], [0, 1], '--', color='gray', label='Random Guessing')
    axis_range = [-0.05, 1.05]
    plt.xlim(axis_range)
    plt.ylim(axis_range)
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('Receiver Operating Characteristic (ROC) Curve')
    plt.legend()

    image_name = f'Roc_curve{fold_nums + 1}.png'
    plt.savefig(os.path.join(result_path + '/roc_curve_record', image_name))


# 輸出每一次 epoch 的結果
def CSV_Output(result_path,parameter,num_epochs,train_loss_list,train_acc_list,val_loss_list,val_acc_list,precision_list,recall_list,TPR_list,FPR_list,f1_score_list,fold_wrong_predict,fold_nums):
    """Write one fold's run parameters and per-epoch metrics to a CSV file.

    The file ``<result_path>/csv_record/epoch_fold<fold_nums+1>.csv`` contains
    a parameter header and row, a blank row, a metric header, one row per
    epoch with values rounded to 4 decimals, and a final ``wrong_predict:`` row.

    Args:
        result_path: Run directory that receives the ``csv_record`` folder.
        parameter: Row written under the parameter header.
        num_epochs: Number of epoch rows to write.
        train_loss_list: Per-epoch training loss.
        train_acc_list: Per-epoch training accuracy; items must provide ``.item()``.
        val_loss_list: Per-epoch validation loss.
        val_acc_list: Per-epoch validation accuracy; items must provide ``.item()``.
        precision_list: Per-epoch precision.
        recall_list: Per-epoch recall.
        TPR_list: Per-epoch true-positive rate.
        FPR_list: Per-epoch false-positive rate.
        f1_score_list: Per-epoch F1 score.
        fold_wrong_predict: Object written as the second cell of the last row.
        fold_nums: Zero-based fold index.
    """
    csv_record_dir = os.path.join(result_path, 'csv_record')
    if not os.path.exists(csv_record_dir):
        os.makedirs(csv_record_dir)

    parameter_header = ['num_epochs','batch_size','learning_rate','num_classes','device','start_time','end_time','num_fold']
    metric_header = ['Epoch','train_loss','train_acc','val_loss','val_acc','precision','recall','TPR','FPR','F1 score']
    csv_file_path = f"{result_path}/csv_record/epoch_fold{fold_nums + 1}.csv"

    with open(csv_file_path, 'w', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(parameter_header)
        writer.writerow(parameter)
        writer.writerow('')  # blank separator row
        writer.writerow(metric_header)
        for idx in range(num_epochs):
            metrics = (
                train_loss_list[idx],
                train_acc_list[idx].item(),
                val_loss_list[idx],
                val_acc_list[idx].item(),
                precision_list[idx],
                recall_list[idx],
                TPR_list[idx],
                FPR_list[idx],
                f1_score_list[idx],
            )
            writer.writerow([idx + 1] + [round(value, 4) for value in metrics])
        writer.writerow(['wrong_predict:', fold_wrong_predict])
    print('CSV output Sucessfully')

def print_filename_in_txt(train_loader, valid_loader, result_path, fold_num):
    """Record which files each loader yields for one fold.

    Both loaders are iterated once in full.  Every batch must unpack to
    ``(data, target, filenames)``; the file names are written one per line to
    ``<result_path>/file_name_record/train_filenames_fold_<fold_num+1>.txt``
    and ``.../valid_filenames_fold_<fold_num+1>.txt``.

    Args:
        train_loader: Iterable of training batches.
        valid_loader: Iterable of validation batches.
        result_path: Run directory that receives the ``file_name_record`` folder.
        fold_num: Zero-based fold index.
    """
    # Flatten the per-batch name collections, in iteration order
    train_filenames = [name for _, _, names in train_loader for name in names]
    valid_filenames = [name for _, _, names in valid_loader for name in names]

    # Create the 'file_name_record' directory if it doesn't exist
    file_record_dir = os.path.join(result_path, 'file_name_record')
    if not os.path.exists(file_record_dir):
        os.makedirs(file_record_dir)

    train_txt = result_path + '/file_name_record/train_filenames_fold_'+str(fold_num + 1)+'.txt'
    valid_txt = result_path + '/file_name_record/valid_filenames_fold_'+str(fold_num + 1)+'.txt'

    with open(train_txt, 'w') as file:
        file.writelines(name + '\n' for name in train_filenames)

    with open(valid_txt, 'w') as file:
        file.writelines(name + '\n' for name in valid_filenames)

    # Report the files that were actually written
    print(f"Filenames saved in {train_txt} and {valid_txt}")

def calculate_average(avg_train_acc_list, avg_val_acc_list, avg_recall_list, avg_precision_list, avg_f1_score_list, avg_TPR_list, avg_FPR_list, fold_nums, result_path):
    """Average the per-fold metrics and write them to ``Average.txt``.

    Each list holds one value per fold.  Every list is summed and divided by
    ``fold_nums`` (not by the list length), and the seven results are written
    one per line to ``<result_path>/Average.txt``.

    Args:
        avg_train_acc_list: Per-fold training accuracy.
        avg_val_acc_list: Per-fold validation accuracy.
        avg_recall_list: Per-fold recall.
        avg_precision_list: Per-fold precision.
        avg_f1_score_list: Per-fold F1 score.
        avg_TPR_list: Per-fold true-positive rate.
        avg_FPR_list: Per-fold false-positive rate.
        fold_nums: Divisor used for every average.
        result_path: Existing directory that receives ``Average.txt``.
    """
    # (label written to the file, values to average), in output order
    labelled_metrics = (
        ("Avg Train Accuracy", avg_train_acc_list),
        ("Avg Validation Accuracy", avg_val_acc_list),
        ("Avg Recall", avg_recall_list),
        ("Avg Precision", avg_precision_list),
        ("Avg F1 Score", avg_f1_score_list),
        ("Avg TPR", avg_TPR_list),
        ("Avg FPR", avg_FPR_list),
    )

    # All averages are computed before the file is opened
    report_lines = [f"{label}: {sum(values) / fold_nums}\n" for label, values in labelled_metrics]

    with open(result_path + '/Average.txt', 'w') as file:
        for line in report_lines:
            file.write(line)

    print(f"Averages saved to {result_path}")

def train(normal_data_dir,abnormal_data_dir,train_normal_tensor_path,train_abnormal_tensor_path,result_path,num_fold):
    """Fine-tune a pretrained SE-ResNet-152-D with k-fold cross-validation and record the results.

    The source-image names found in ``normal_data_dir`` and
    ``abnormal_data_dir`` are shuffled and split into ``num_fold`` folds.  For
    every fold, the matching augmented tensors
    (``<label>_<index>_aug<k>.pt``) are selected from the tensor datasets, the
    model is reset to the saved initial weights, trained for a fixed number of
    epochs, and the metrics, confusion matrix, ROC plot and CSV report are
    written under the run directory returned by ``mkdir_outcome``.

    Args:
        normal_data_dir: Directory of normal source images; only the file
            names (extension stripped) are used.
        abnormal_data_dir: Same, for abnormal source images.
        train_normal_tensor_path: Directory of ``.pt`` tensors of the normal
            class, loaded through ``MyDataset``.
        train_abnormal_tensor_path: Same, for the abnormal class.
        result_path: Base results directory handed to ``mkdir_outcome``.
        num_fold: Number of cross-validation folds.

    Raises:
        ValueError: If a source file name does not contain exactly one ``_``.
    """
    # Hyper-parameters
    batch_size = 52
    learning_rate = 0.001
    num_epochs = 50
    num_classes = 2
    num_folds = num_fold
    start_time = datetime.now()
    end_time = 0
    num_argumentation = 20  # augmented copies looked up per source image (aug1..aug20)
    conf_matrix = np.zeros((num_classes, num_classes), dtype=np.int32)

    # Load both classes and expose them as one dataset
    train_normal_dataset = MyDataset(train_normal_tensor_path)
    train_abnormal_dataset = MyDataset(train_abnormal_tensor_path)
    train_datasets = CombinedDataset(train_normal_dataset,train_abnormal_dataset)

    # Create the k-fold cross-validation object
    kfold = KFold(n_splits=num_folds)

    # Print the dataset size
    print("train dataset's size : " + str(len(train_datasets)))

    # Build the model: pretrained timm 'seresnet152d' with a new num_classes-way head
    model = timm.create_model('seresnet152d', pretrained=True)
    model.fc = nn.Linear(model.fc.in_features, num_classes)

    # Move the model to the GPU when one is available
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)
    model.fc.to(device)
    print("Device used : " + str(device))

    # Make the directory that receives this run's outputs
    result_path = mkdir_outcome(result_path)

    # Last-epoch metrics of every fold, averaged after the final fold
    avg_train_acc_list = []
    avg_val_acc_list = []
    avg_recall_list = []
    avg_precision_list = []
    avg_f1_score_list = []
    avg_TPR_list = []
    avg_FPR_list = []

    # Source-image names (extension stripped): normal first, then abnormal
    files = []
    for data_dir in (normal_data_dir, abnormal_data_dir):
        for file_name in os.listdir(data_dir):
            files.append(os.path.splitext(file_name)[0])

    random.shuffle(files)

    # Snapshot of the initial weights; every fold restarts from this file.
    # One shared path keeps the save and the later load on the same name
    # (the load previously looked for "pre_train.pt", which is never written).
    pretrain_ckpt = os.path.join(result_path,"seresnet152_pre_train.pt")
    torch.save(model.state_dict(),pretrain_ckpt)

    for fold, (train_indices, valid_indices) in enumerate(kfold.split(files)):
        print(f"Fold: {fold+1}")
        fold_ckpt = os.path.join(result_path,"train"+f'_fold_{fold+1}'+".pt")

        # Loss function, optimizer and learning-rate schedule
        m = nn.Sigmoid()
        criterion = nn.BCELoss()
        optimizer = optim.Adam(model.parameters(), lr=learning_rate)
        scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.1)

        # Per-epoch history of this fold
        train_loss_list = []
        train_acc_list = []
        val_loss_list = []
        val_acc_list = []
        recall_list = []
        precision_list = []
        f1_score_list = []
        TPR_list = []
        FPR_list = []
        fold_wrong_predict = []

        train_files = [files[i] for i in train_indices]
        valid_files = [files[i] for i in valid_indices]

        # Names of the augmented tensors that belong to each side of the split.
        # Sets give O(1) membership tests in the per-sample scan below.
        train_data_files = set()
        valid_data_files = set()

        # Expand every training source image into its augmented file names
        for file in train_files:
            label_name, image_id = file.split('_')
            for num in range(1,num_argumentation + 1):
                train_data_files.add(f"{label_name}_{image_id}_aug{num}.pt")

        # Expand every validation source image into its augmented file names
        for file in valid_files:
            label_name, image_id = file.split('_')
            for num in range(1,num_argumentation + 1):
                valid_data_files.add(f"{label_name}_{image_id}_aug{num}.pt")

        # Dataset indices of the samples on each side of the split
        # (named so they do not shadow this function's own name)
        train_sample_indices = []
        valid_sample_indices = []

        for index, (data, labels, filename) in enumerate(train_datasets):
            if filename in train_data_files:
                train_sample_indices.append(index)
            elif filename in valid_data_files:
                valid_sample_indices.append(index)

        # Create the train and validation datasets for this fold
        train_dataset = torch.utils.data.Subset(train_datasets, train_sample_indices)
        valid_dataset = torch.utils.data.Subset(train_datasets, valid_sample_indices)

        print("train_dataset : " + str(len(train_dataset)),",valid_dataset : " + str(len(valid_dataset)))

        # Create the data loaders for this fold
        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=4)
        valid_loader = DataLoader(valid_dataset, batch_size=1, shuffle=True, num_workers=4)
        print_filename_in_txt(train_loader,valid_loader,result_path,fold)

        # Train the model
        for epoch in range(num_epochs):
            print("epoch:"+f"{epoch+1}")
            train_loss = 0
            train_correct = 0
            train_acc = 0
            val_loss = 0
            val_corrects = 0
            val_acc = 0

            print("result_path:" + str(result_path))

            # First epoch of a fold starts from the initial snapshot; later
            # epochs reload the checkpoint written at the end of the previous one.
            if epoch == 0:
                model.load_state_dict(torch.load(pretrain_ckpt))
                print("Model initialized")
            else:
                model.load_state_dict(torch.load(fold_ckpt))
                print("Model loaded")

            # Reset the confusion matrix for this epoch
            conf_matrix = np.zeros((num_classes, num_classes), dtype=np.int32)

            print("[Training Progress]: ")
            model.train()
            for inputs, labels, filename in tqdm(train_loader):
                # One-hot targets for BCELoss
                targets=torch.eye(2)[labels.long(), :]
                inputs = inputs.to(device)
                labels = labels.to(device)
                targets = targets.to(device)

                optimizer.zero_grad()
                outputs = model(inputs)
                loss = criterion(m(outputs),targets.float())
                _, preds = torch.max(outputs, 1)
                train_correct += torch.sum(preds == labels.data)
                loss.backward()
                optimizer.step()
                train_loss += loss.item() * inputs.size(0)
            train_loss = train_loss / len(train_loader.dataset)
            train_acc = train_correct.double() / len(train_loader.dataset)
            train_loss_list.append(train_loss)

            model.eval()

            print("[Validating Progress]: ")
            wrong_predict = []
            for inputs, labels, filename in tqdm(valid_loader):
                targets=torch.eye(2)[labels.long(), :]
                inputs = inputs.to(device)
                labels = labels.to(device)
                targets = targets.to(device)
                with torch.set_grad_enabled(False):
                    outputs = model(inputs)
                    loss = criterion(m(outputs),targets.float())
                val_loss += loss.item() * inputs.size(0)
                _, preds = torch.max(outputs, 1)
                val_corrects += torch.sum(preds == labels.data)

                # Confusion matrix: row = predicted class, column = true class
                for pred_cls, true_cls in zip(preds.view(-1), labels.view(-1)):
                    conf_matrix[pred_cls.long(), true_cls.long()] += 1
                    if pred_cls != true_cls:
                        # filename is the batch's name collection (valid_loader uses batch_size=1)
                        wrong_predict.append(filename)
            val_loss = val_loss / len(valid_loader.dataset)
            val_acc = val_corrects.double() / len(valid_loader.dataset)
            val_loss_list.append(val_loss)

            scheduler.step()
            end_time = datetime.now()
            print('\nEpoch: [{}/{}]  train_loss: {:<.4f}  -  train_accuracy: {:<.4f} -  val_loss: {:<.4f}  -  val_accuracy: {:<.4f}  -  val_correct: {:<10}'.format(
                epoch+1, num_epochs, train_loss, train_acc, val_loss, val_acc, val_corrects))
            print('wrong predict : ' + str(wrong_predict))
            torch.save(model.state_dict(),fold_ckpt)
            print("Model saved")

            # Evaluation metrics derived from this epoch's confusion matrix
            precision,recall,f1_score,TPR,FPR = validation_index(conf_matrix)

            # Record this epoch's outcomes
            val_acc_list.append(val_acc)
            train_acc_list.append(train_acc)
            precision_list.append(precision)
            recall_list.append(recall)
            f1_score_list.append(f1_score)
            TPR_list.append(TPR)
            FPR_list.append(FPR)

            # The last epoch's values represent the fold in the final average
            if epoch + 1 == num_epochs:
                torch.cuda.empty_cache()
                avg_train_acc_list.append(train_acc)
                avg_val_acc_list.append(val_acc)
                avg_recall_list.append(recall)
                avg_precision_list.append(precision)
                avg_f1_score_list.append(f1_score)
                avg_TPR_list.append(TPR)
                avg_FPR_list.append(FPR)
                fold_wrong_predict.append(wrong_predict)

        # Plot the confusion matrix of the fold's last epoch
        Confusion_Matrix(result_path,conf_matrix,fold)
        # Plot the per-epoch (FPR, TPR) points of this fold
        ROC_Curve(result_path,TPR_list,FPR_list,fold)
        # Write the per-epoch CSV report of this fold
        param = [num_epochs,batch_size,learning_rate,num_classes,device,start_time,end_time,fold]
        CSV_Output(result_path,param,num_epochs,train_loss_list,train_acc_list,val_loss_list,val_acc_list,precision_list,recall_list,TPR_list,FPR_list,f1_score_list,fold_wrong_predict,fold)

    # Average the last-epoch metrics over all folds
    calculate_average(avg_train_acc_list, avg_val_acc_list, avg_recall_list, avg_precision_list, avg_f1_score_list, avg_TPR_list, avg_FPR_list, num_folds, result_path)

    torch.cuda.empty_cache()


# Source-image folders: train() only uses their file names to build the folds.
normal_data_dir = '/content/drive/My Drive/Deep_X_torch/splitted_dataset/train/normal'
abnormal_data_dir = '/content/drive/My Drive/Deep_X_torch/splitted_dataset/train/abnormal'
# Folders holding the .pt tensors that are loaded for training.
train_normal_tensor_path = '/content/drive/My Drive/Deep_X_torch/tensor/train/normal'
train_abnormal_tensor_path = '/content/drive/My Drive/Deep_X_torch/tensor/train/abnormal'
# Base folder under which the run directory is created.
result_path = '/content/drive/My Drive/Deep_X_torch/result'

# Run 5-fold cross-validated training.
train(
    normal_data_dir=normal_data_dir,
    abnormal_data_dir=abnormal_data_dir,
    train_normal_tensor_path=train_normal_tensor_path,
    train_abnormal_tensor_path=train_abnormal_tensor_path,
    result_path=result_path,
    num_fold=5,
)

#測試模型

In [None]:
import torch
import torch.nn as nn
import os
import numpy as np
import torch.optim as optim
import torchvision.models as models
import matplotlib.pyplot as plt
import csv
from torch.utils.data import DataLoader, Dataset
from torch.utils.data import ConcatDataset
from sklearn import datasets
from sklearn.model_selection import KFold
from sklearn.metrics import confusion_matrix
from tqdm import tqdm
from torch.utils.tensorboard import SummaryWriter
from datetime import datetime

class MyDataset(Dataset):
    """In-memory dataset of tensors stored as ``.pt`` files in one directory.

    Every ``.pt`` file in ``data_path`` is loaded at construction time.  The
    label is taken from the file-name prefix before the first underscore:
    ``normal`` maps to 0 and any other prefix maps to 1.

    Each item is a ``(tensor, label_idx, filename)`` tuple.
    """

    def __init__(self, data_path):
        """Load all ``.pt`` files found directly inside ``data_path``."""
        self.data_path = data_path
        self.class_to_idx = {'abnormal': 1, 'normal': 0}  # class name -> class index
        self.data = []
        self.filenames = []  # file names, parallel to self.data
        # Sorted so the sample order does not depend on the directory listing order
        for filename in sorted(os.listdir(data_path)):
            if not filename.endswith('.pt'):
                continue
            tensor = torch.load(os.path.join(data_path, filename))
            prefix = filename.split('_')[0]
            # Any prefix other than 'normal' is treated as abnormal
            label_idx = self.class_to_idx.get(prefix, self.class_to_idx['abnormal'])
            self.data.append((tensor, label_idx, filename))
            self.filenames.append(filename)

    def __len__(self):
        """Return the number of loaded samples."""
        return len(self.data)

    def __getitem__(self, index):
        """Return the ``(tensor, label_idx, filename)`` tuple at ``index``."""
        tensor, labels, filename = self.data[index]
        return tensor, labels, filename

class CombinedDataset(ConcatDataset):
    """Concatenation of exactly two datasets, indexed as one.

    Item access and length are the inherited ``ConcatDataset`` behaviour:
    indices run through ``dataset1`` first and then through ``dataset2``.
    """

    def __init__(self, dataset1, dataset2):
        """Combine ``dataset1`` followed by ``dataset2``."""
        members = [dataset1, dataset2]
        super().__init__(members)

# 找尋最新的訓練結果
def find_train_result_path(result_path):
    """Return the training sub-directory of the highest-numbered run.

    Entries of ``result_path`` named ``result_<n>`` are scanned for the
    largest integer ``n`` (0 when there is none).  Entries whose suffix is not
    an integer (for example ``result_backup``) are ignored.

    The returned path is only computed; it is not checked for existence.

    Args:
        result_path: Existing base directory that holds the ``result_<n>`` folders.

    Returns:
        The path ``<result_path>/result_<n>/train_<n>``.
    """
    num_max = 0
    for file_name in os.listdir(result_path):
        if not file_name.startswith("result_"):
            continue
        try:
            num = int(file_name.split("_")[1])
        except ValueError:
            # Not a numbered run folder; skip it
            continue
        num_max = max(num_max, num)

    latest_run_dir = os.path.join(result_path,"result_{}".format(num_max))
    return os.path.join(latest_run_dir,"train_{}".format(num_max))


# 建立資料夾顯示訓練結果
def mkdir_outcome(result_path):
    """Create and return the test sub-directory of the highest-numbered run.

    Entries of ``result_path`` named ``result_<n>`` are scanned for the
    largest integer ``n`` (0 when there is none).  The directories
    ``result_<n>`` and ``result_<n>/test_<n>`` are created when missing.

    Entries whose suffix is not an integer (for example ``result_backup`` or
    ``result_1.zip``) are ignored instead of aborting.

    Args:
        result_path: Existing base directory that holds the ``result_<n>`` folders.

    Returns:
        Path of the ``result_<n>/test_<n>`` directory.
    """
    num_max = 0
    for file_name in os.listdir(result_path):
        if not file_name.startswith("result_"):
            continue
        try:
            num = int(file_name.split("_")[1])
        except ValueError:
            # Not a numbered run folder; skip it
            continue
        num_max = max(num_max, num)

    # Make the folders for the test results of that run
    result_path = os.path.join(result_path,"result_{}".format(num_max))
    result_path_test = os.path.join(result_path,"test_{}".format(num_max))
    os.makedirs(result_path,exist_ok=True)
    os.makedirs(result_path_test,exist_ok=True)
    return result_path_test


# 模型評估指標
def test_index(conf_matrix):
    # Confusion Matrix to calculate [accuracy,precision,recall]
    precision = 0.0
    recall = 0.0
    f1_score = 0.0
    if((conf_matrix[0][0] + conf_matrix[0][1]) != 0):
        precision = conf_matrix[0][0] / (conf_matrix[0][0] + conf_matrix[0][1])
    if((conf_matrix[0][0] + conf_matrix[1][0]) != 0):
        recall = conf_matrix[0][0] / (conf_matrix[0][0] + conf_matrix[1][0])
    if((precision + recall) != 0):
        f1_score = 2*precision*recall / (precision + recall)
    TPR = recall
    FPR = conf_matrix[0][1] / (conf_matrix[0][1] + conf_matrix[1][1])
    print("\t      Precision: {:<.4f}  -  Recall: {:<.4f}  -  F1 Score: {:<.4f}".format(precision,recall,f1_score))
    return precision,recall,f1_score,TPR,FPR


# 混淆矩陣
def Confusion_Matrix(result_path,conf_matrix,fold):
    """Draw a 2x2 confusion matrix and save it as a PNG for one fold.

    The image is written to
    ``<result_path>/confusion_matrix_record<fold>/test_confusion_matrix_<fold>.png``.
    The folder that is created is the same one the figure is saved into, so
    the save no longer fails on a missing directory.

    Args:
        result_path: Folder below which the per-fold record folder is created.
        conf_matrix: 2x2 indexable of counts; it is printed and plotted.
        fold: Fold identifier used in the folder and file name.
    """
    # Create the folder the figure is actually written to.
    output_dir = result_path + '/confusion_matrix_record' + f'{fold}'
    os.makedirs(output_dir, exist_ok=True)

    plt.clf()
    confusion_matrix = np.array([[conf_matrix[0][0], conf_matrix[0][1]], [conf_matrix[1][0], conf_matrix[1][1]]])
    print("Confusion matrix:")
    print(conf_matrix)
    plt.imshow(confusion_matrix, cmap=plt.cm.Blues, interpolation='nearest')
    plt.colorbar()

    # Write each count into its cell; use white text on the darker cells.
    for i in range(2):
        for j in range(2):
            text_color = 'black' if confusion_matrix[i][j] < 0.5 * confusion_matrix.max() else 'white'
            plt.annotate(str(confusion_matrix[i][j]), xy=(j, i), ha='center', va='center', color=text_color)
    tick_marks = np.arange(2)
    plt.xticks(tick_marks, ['Positive', 'Negative'])
    plt.yticks(tick_marks, ['Positive', 'Negative'])
    plt.ylabel('Predicted Label')
    plt.xlabel('True Label')
    plt.title('Confusion Matrix')
    plt.savefig(os.path.join(output_dir,'test_confusion_matrix_'+f'{fold}'+'.png'))


# ROC曲線
def ROC_Curve(result_path,tpr_list,fpr_list,fold):
    """Plot TPR against FPR with the trapezoidal AUC and save the figure.

    The PNG is written to
    ``<result_path>/roc_curve_record/test_Roc_curve_<fold>.png``; the
    ``roc_curve_record`` folder is created when it does not exist.

    Args:
        result_path: Folder below which ``roc_curve_record`` lives.
        tpr_list: True-positive rates (y values).
        fpr_list: False-positive rates (x values).
        fold: Fold identifier used in the file name.
    """
    record_dir = os.path.join(result_path, 'roc_curve_record')
    if not os.path.exists(record_dir):
        os.makedirs(record_dir)

    # Area under the plotted points (x = FPR, y = TPR), trapezoidal rule.
    area = np.trapz(tpr_list, fpr_list)

    # Start from an empty figure, then draw the curve and the diagonal.
    plt.clf()
    plt.plot(fpr_list, tpr_list, lw=1, label='ROC (AUC = %0.2f)' % area)
    plt.plot([0, 1], [0, 1], '--', color='gray', label='Random Guessing')

    axis_range = [-0.05, 1.05]
    plt.xlim(axis_range)
    plt.ylim(axis_range)
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('Receiver Operating Characteristic (ROC) Curve')
    plt.legend()

    save_dir = result_path + '/roc_curve_record'
    file_name = 'test_Roc_curve_' + f'{fold}' + '.png'
    plt.savefig(os.path.join(save_dir, file_name))


# 輸出每一次 epoch 的結果
def CSV_Output(test_result_path,param,num_epochs,test_loss_list,test_acc_list,precision_list,recall_list,TPR_list,FPR_list,f1_score_list,final_wrong_predict,fold):
    """Write the run parameters and the per-epoch metrics of one fold to CSV.

    The file is ``<test_result_path>/csv_record<fold>/test_epoch.csv``. The
    folder that is created is the same one the file is opened in, so the
    write no longer fails on a missing directory.

    Layout: a parameter header row, the ``param`` row, an empty row, a metric
    header row, one row per epoch (values rounded to 4 decimals) and a final
    ``wrong predict:`` row.

    Args:
        test_result_path: Folder below which the per-fold CSV folder is created.
        param: Row of run parameters written under the first header.
        num_epochs: Number of epoch rows; each list is read at ``0..num_epochs-1``.
        test_loss_list: Per-epoch loss values.
        test_acc_list: Per-epoch accuracies; each entry must provide ``.item()``.
        precision_list: Per-epoch precision values.
        recall_list: Per-epoch recall values.
        TPR_list: Per-epoch true-positive rates.
        FPR_list: Per-epoch false-positive rates.
        f1_score_list: Per-epoch F1 scores.
        final_wrong_predict: Object written (stringified) in the last row.
        fold: Fold identifier used in the folder name.
    """
    # Create the folder the CSV file is actually written to.
    csv_record_dir = test_result_path + '/csv_record' + f'{fold}'
    os.makedirs(csv_record_dir, exist_ok=True)

    with open(csv_record_dir + '/test_epoch.csv','w',newline='') as file:
        writer = csv.writer(file)
        writer.writerow(['num_epochs','batch_size','learning_rate','num_classes','device','start_time','end_time'])
        writer.writerow(param)
        # An empty string has no fields, so this emits a blank separator line.
        writer.writerow('')
        writer.writerow(['Epoch','test_loss','test_acc','precision','recall','TPR','FPR','F1 score'])
        for epoch in range(num_epochs):
            writer.writerow([epoch + 1,
                            round(test_loss_list[epoch], 4),
                            round(test_acc_list[epoch].item(), 4),
                            round(precision_list[epoch], 4),
                            round(recall_list[epoch], 4),
                            round(TPR_list[epoch], 4),
                            round(FPR_list[epoch], 4),
                            round(f1_score_list[epoch], 4)])
        writer.writerow(['wrong predict:',final_wrong_predict])
    print('CSV output Sucessfully')

def test(test_normal_tensor_path,test_abnormal_tensor_path,result_path,fold):
    """Evaluate each fold's saved ResNet-152 checkpoint on the test tensors.

    For every fold ``k`` in ``1..fold`` the weights ``train_fold_<k>.pt`` are
    loaded from the folder returned by ``find_train_result_path``, the
    combined normal + abnormal test set is evaluated ``num_epochs`` times,
    and that fold's confusion-matrix image, ROC plot and CSV summary are
    written below the folder returned by ``mkdir_outcome``. A confusion
    matrix is also shown on screen after each fold.

    Design notes:
      * The metric lists are reset for every fold, so the CSV rows and ROC
        points written for fold ``k`` describe fold ``k`` only.
      * The wrong-prediction list is rebuilt every epoch and a copy of the
        last epoch's list is stored per fold, so stored entries are never
        cleared or shared between folds.
      * The checkpoint is loaded once per fold with ``map_location`` set to
        the active device, so GPU-saved weights also load on a CPU machine.
      * No optimizer or scheduler is created: nothing is trained here.

    Args:
        test_normal_tensor_path: Folder with the normal test tensors.
        test_abnormal_tensor_path: Folder with the abnormal test tensors.
        result_path: Root folder that holds the ``result_<N>`` runs.
        fold: Number of folds; checkpoints ``1..fold`` are evaluated.

    Returns:
        None. Results are printed, plotted and written to disk.
    """
    # Hyperparameters (learning_rate is only recorded in the CSV parameter row)
    batch_size = 1
    learning_rate = 0.001
    num_epochs = 5
    num_classes = 2
    start_time = datetime.now()
    end_time = 0
    conf_matrix = np.zeros((num_classes, num_classes), dtype=np.int32)

    # Load both test sets and combine them into one dataset
    test_normal_dataset = MyDataset(test_normal_tensor_path)
    test_abnormal_dataset = MyDataset(test_abnormal_tensor_path)
    test_dataset = CombinedDataset(test_normal_dataset,test_abnormal_dataset)

    # Print the dataset size
    print("dataset's size : " + str(len(test_dataset)))

    # Build the model with a num_classes-way output layer
    model = models.resnet152(pretrained=False)
    model.fc = nn.Linear(model.fc.in_features, num_classes)

    # Move the model to the GPU when one is available
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)
    print("Device used : " + str(device))

    # Loss: sigmoid over the outputs followed by binary cross entropy on one-hot targets
    m = nn.Sigmoid()
    criterion = nn.BCELoss()

    # One entry per evaluated fold: the wrong predictions of its last epoch
    final_wrong_predict = []

    # Folder that receives the test outputs
    test_result_path = mkdir_outcome(result_path)
    print('test_result_path : ' + str(test_result_path))

    # Folder that holds the trained checkpoints
    train_result_path = find_train_result_path(result_path)
    print('train_result_path : ' + str(train_result_path))

    for folds in range(1,fold + 1):
        checkpoint_name = "train_fold_"+f'{folds}'+".pt"
        print("train model:"+checkpoint_name)

        # Load the trained weights once per fold; they do not change between epochs
        state_dict = torch.load(os.path.join(train_result_path, checkpoint_name), map_location=device)
        model.load_state_dict(state_dict)
        model.eval()
        test_loader = DataLoader(test_dataset, batch_size, shuffle=True, num_workers=4)

        # Per-fold metric history (index = epoch)
        test_loss_list = []
        test_acc_list = []
        recall_list = []
        precision_list = []
        f1_score_list = []
        TPR_list = []
        FPR_list = []
        wrong_predict = []

        # Evaluate the model
        for epoch in range(num_epochs):
            test_loss = 0
            test_corrects = 0
            test_acc = 0

            # Fresh counters for this epoch
            conf_matrix = np.zeros((num_classes, num_classes), dtype=np.int32)
            wrong_predict = []

            print("[Test Progress]: ")
            for inputs, labels, filename in tqdm(test_loader):
                # One-hot targets for the BCE loss
                targets=torch.eye(num_classes)[labels.long(), :]
                inputs = inputs.to(device)
                labels = labels.to(device)
                targets = targets.to(device)
                with torch.no_grad():
                    outputs = model(inputs)
                    loss = criterion(m(outputs),targets.float())
                test_loss += loss.item() * inputs.size(0)
                _, preds = torch.max(outputs, 1)
                test_corrects += torch.sum(preds == labels.data)

                # Count the confusion matrix as [predicted, actual] and remember
                # the file name of every misclassified sample
                for predicted, actual, name in zip(preds.view(-1).tolist(), labels.view(-1).tolist(), filename):
                    conf_matrix[predicted, actual] += 1
                    if predicted != actual:
                        wrong_predict.append(name)
            test_loss =  test_loss / len(test_loader.dataset)
            test_acc =  test_corrects.double() / len(test_loader.dataset)
            test_loss_list.append(test_loss)

            end_time = datetime.now()
            print('\nEpoch: [{}/{}]  test_loss: {:<.4f}  -  test_accuracy: {:<.4f}  -  test_correct: {:<10}'.format(
                epoch+1, num_epochs, test_loss, test_acc, test_corrects))
            print('wrong predicts : ' + str(wrong_predict))

            # Evaluation metrics
            precision,recall,f1_score,TPR,FPR = test_index(conf_matrix)

            # Record the outcomes
            test_acc_list.append(test_acc)
            precision_list.append(precision)
            recall_list.append(recall)
            f1_score_list.append(f1_score)
            TPR_list.append(TPR)
            FPR_list.append(FPR)

        # Keep a copy of the last epoch's wrong predictions for this fold
        final_wrong_predict.append(list(wrong_predict))

        # Save the confusion matrix of the last epoch
        Confusion_Matrix(test_result_path,conf_matrix,folds)
        # Save the ROC plot
        ROC_Curve(test_result_path,TPR_list,FPR_list,folds)
        # Save the CSV summary
        param = [num_epochs,batch_size,learning_rate,num_classes,device,start_time,end_time]
        CSV_Output(test_result_path,param,num_epochs,test_loss_list,test_acc_list,precision_list,recall_list,TPR_list,FPR_list,f1_score_list,final_wrong_predict,folds)

        # Show the confusion matrix on screen
        confusion_matrix = np.array([[conf_matrix[0][0], conf_matrix[0][1]], [conf_matrix[1][0], conf_matrix[1][1]]])
        print("Confusion matrix:")
        print(conf_matrix)
        plt.clf()
        plt.imshow(confusion_matrix, cmap=plt.cm.Blues, interpolation='nearest')
        plt.colorbar()
        tick_marks = np.arange(2)
        plt.xticks(tick_marks, ['Positive', 'Negative'])
        plt.yticks(tick_marks, ['Positive', 'Negative'])
        plt.ylabel('Predicted Label')
        plt.xlabel('True Label')
        plt.title('Confusion Matrix({})'.format(len(test_dataset)))
        plt.show()

        torch.cuda.empty_cache()


# Test-set tensor folders (normal / abnormal) and the root folder of the result_<N> runs.
test_normal_tensor_path = '/content/drive/My Drive/Deep_X_torch/tensor/test/normal'
test_abnormal_tensor_path = '/content/drive/My Drive/Deep_X_torch/tensor/test/abnormal'
result_path = '/content/drive/My Drive/Deep_X_torch/result'

# Evaluate the checkpoints of folds 1..5 (train_fold_1.pt ... train_fold_5.pt).
test(test_normal_tensor_path,test_abnormal_tensor_path,result_path,5)



#測試模型(SEResNet)

In [None]:
!pip install timm

In [None]:
import torch
import torch.nn as nn
import os
import numpy as np
import torch.optim as optim
import torchvision.models as models
import matplotlib.pyplot as plt
import csv
import timm
from torch.utils.data import DataLoader, Dataset
from torch.utils.data import ConcatDataset
from sklearn import datasets
from sklearn.model_selection import KFold
from sklearn.metrics import confusion_matrix
from tqdm import tqdm
from torch.utils.tensorboard import SummaryWriter
from datetime import datetime

class MyDataset(Dataset):
    """In-memory dataset of the ``.pt`` tensors found in one folder.

    Every ``*.pt`` file directly inside ``data_path`` is loaded with
    ``torch.load`` when the dataset is constructed. A file whose name, split
    on ``_``, starts with ``normal`` gets the ``normal`` label; every other
    file gets the ``abnormal`` label. Files are read in sorted name order so
    that sample indexes are reproducible.

    Attributes are documented inline in ``__init__``.
    """

    def __init__(self, data_path):
        """Load all ``.pt`` files of ``data_path`` into memory.

        Args:
            data_path: Folder that contains the tensor files.
        """
        self.data_path = data_path
        # Mapping from class name to class index
        self.class_to_idx = {'abnormal': 1, 'normal': 0}
        # (tensor, label index, file name) triples
        self.data = []
        # File names in the same order as self.data
        self.filenames = []
        # os.listdir returns entries in arbitrary order; sort for a stable order
        for filename in sorted(os.listdir(data_path)):
            if not filename.endswith('.pt'):
                continue
            tensor = torch.load(os.path.join(data_path, filename))
            class_name = 'normal' if filename.split('_')[0] == 'normal' else 'abnormal'
            self.data.append((tensor, self.class_to_idx[class_name], filename))
            self.filenames.append(filename)

    def __len__(self):
        """Return the number of loaded samples."""
        return len(self.data)

    def __getitem__(self, index):
        """Return the ``(tensor, label index, file name)`` triple at ``index``."""
        tensor, labels, filename = self.data[index]
        return tensor, labels, filename

class CombinedDataset(ConcatDataset):
    """Two datasets concatenated and indexed as a single dataset."""

    def __init__(self, dataset1, dataset2):
        """Concatenate ``dataset1`` followed by ``dataset2``."""
        members = [dataset1, dataset2]
        super(CombinedDataset, self).__init__(members)

    def __len__(self):
        """Return the combined length as computed by ``ConcatDataset``."""
        return ConcatDataset.__len__(self)

    def __getitem__(self, index):
        """Return the item at ``index`` as resolved by ``ConcatDataset``."""
        return ConcatDataset.__getitem__(self, index)

# 找尋最新的訓練結果
def find_train_result_path(result_path):
    """Return the training-output folder of the newest ``result_<N>`` run.

    Scans ``result_path`` for entries named ``result_<N>...`` and picks the
    largest ``N``. Entries whose suffix is not an integer (for example
    ``result_backup`` or ``result_1.zip``) are skipped instead of aborting
    the scan with ``ValueError``.

    Args:
        result_path: Directory that holds the ``result_<N>`` folders.

    Returns:
        The path ``<result_path>/result_<N>/train_<N>`` for the largest ``N``
        found, or for ``N = 0`` when no numbered entry exists. The path is
        only composed here; it is not checked for existence.

    Raises:
        OSError: If ``result_path`` cannot be listed.
    """
    latest = 0
    for entry in os.listdir(result_path):
        if not entry.startswith("result_"):
            continue
        try:
            run_number = int(entry.split("_")[1])
        except ValueError:
            # Not a numbered run folder; ignore it.
            continue
        latest = max(latest, run_number)

    run_dir = os.path.join(result_path, "result_{}".format(latest))
    return os.path.join(run_dir, "train_{}".format(latest))


# 建立資料夾顯示訓練結果
def mkdir_outcome(result_path):
    """Create and return the test-output folder of the newest ``result_<N>`` run.

    Scans ``result_path`` for entries named ``result_<N>...`` and picks the
    largest ``N`` (``0`` when none exists). Entries whose suffix is not an
    integer (for example ``result_notes.txt``) are skipped instead of
    aborting the scan with ``ValueError``.

    Args:
        result_path: Directory that holds the ``result_<N>`` folders.

    Returns:
        The path ``<result_path>/result_<N>/test_<N>``; the folder (and its
        parent) exists when the function returns.

    Raises:
        OSError: If ``result_path`` cannot be listed or the folder cannot be
            created.
    """
    latest = 0
    for entry in os.listdir(result_path):
        if not entry.startswith("result_"):
            continue
        try:
            run_number = int(entry.split("_")[1])
        except ValueError:
            # Not a numbered run folder; ignore it.
            continue
        latest = max(latest, run_number)

    run_dir = os.path.join(result_path, "result_{}".format(latest))
    result_path_test = os.path.join(run_dir, "test_{}".format(latest))
    # makedirs also creates run_dir when it is missing.
    os.makedirs(result_path_test, exist_ok=True)
    return result_path_test


# 模型評估指標
def test_index(conf_matrix):
    """Derive precision, recall, F1, TPR and FPR from a 2x2 confusion matrix.

    Index 0 is treated as the positive class. With the matrix filled as
    ``conf_matrix[predicted][actual]`` (which is how ``test`` in this file
    fills it), the values are:

    * precision = m[0][0] / (m[0][0] + m[0][1])
    * recall    = m[0][0] / (m[0][0] + m[1][0])
    * F1        = 2 * precision * recall / (precision + recall)
    * TPR       = recall
    * FPR       = m[0][1] / (m[0][1] + m[1][1])

    Every ratio whose denominator is zero is reported as ``0.0``; this
    includes FPR, so a matrix without any column-1 samples no longer fails.
    The precision, recall and F1 values are also printed.

    Args:
        conf_matrix: 2x2 indexable of counts (nested list or NumPy array).

    Returns:
        Tuple ``(precision, recall, f1_score, TPR, FPR)``.
    """
    true_pos = conf_matrix[0][0]
    false_pos = conf_matrix[0][1]
    false_neg = conf_matrix[1][0]
    true_neg = conf_matrix[1][1]

    precision = 0.0
    recall = 0.0
    f1_score = 0.0
    FPR = 0.0
    if (true_pos + false_pos) != 0:
        precision = true_pos / (true_pos + false_pos)
    if (true_pos + false_neg) != 0:
        recall = true_pos / (true_pos + false_neg)
    if (precision + recall) != 0:
        f1_score = 2 * precision * recall / (precision + recall)
    TPR = recall
    if (false_pos + true_neg) != 0:
        FPR = false_pos / (false_pos + true_neg)
    print("\t      Precision: {:<.4f}  -  Recall: {:<.4f}  -  F1 Score: {:<.4f}".format(precision,recall,f1_score))
    return precision,recall,f1_score,TPR,FPR


# The name starts with "test" but this is a metric helper, not a test case;
# opt out of pytest collection so importing it into a test module is safe.
test_index.__test__ = False


# 混淆矩陣
def Confusion_Matrix(result_path,conf_matrix,folds):
    """Draw a 2x2 confusion matrix and save it as a PNG for one fold.

    The image is written to
    ``<result_path>/confusion_matrix_record_fold<folds>/test_confusion_matrix.png``;
    the folder is created when it does not exist.

    Args:
        result_path: Folder below which the per-fold record folder lives.
        conf_matrix: 2x2 indexable of counts; it is printed and plotted.
        folds: Fold identifier used in the folder name.
    """
    record_dir = os.path.join(result_path, f'confusion_matrix_record_fold{folds}')
    plt.clf()
    if not os.path.exists(record_dir):
        os.makedirs(record_dir)

    top_row = [conf_matrix[0][0], conf_matrix[0][1]]
    bottom_row = [conf_matrix[1][0], conf_matrix[1][1]]
    grid = np.array([top_row, bottom_row])
    print("Confusion matrix:")
    print(conf_matrix)
    plt.imshow(grid, cmap=plt.cm.Blues, interpolation='nearest')
    plt.colorbar()

    # Write each count into its cell; cells at or above half the maximum get white text.
    for (row, col), count in np.ndenumerate(grid):
        if count < 0.5 * grid.max():
            text_color = 'black'
        else:
            text_color = 'white'
        plt.annotate(str(count), xy=(col, row), ha='center', va='center', color=text_color)

    class_names = ['Positive', 'Negative']
    positions = np.arange(2)
    plt.xticks(positions, class_names)
    plt.yticks(positions, class_names)
    plt.ylabel('Predicted Label')
    plt.xlabel('True Label')
    plt.title('Confusion Matrix')

    save_dir = result_path + f'/confusion_matrix_record_fold{folds}'
    plt.savefig(os.path.join(save_dir, 'test_confusion_matrix.png'))


# ROC曲線
def ROC_Curve(result_path,tpr_list,fpr_list,folds):
    """Plot TPR against FPR with the trapezoidal AUC and save the figure.

    The PNG is written to
    ``<result_path>/roc_curve_record_fold<folds>/test_Roc_curve.png``; the
    folder is created when it does not exist.

    Args:
        result_path: Folder below which the per-fold record folder lives.
        tpr_list: True-positive rates (y values).
        fpr_list: False-positive rates (x values).
        folds: Fold identifier used in the folder name.
    """
    record_dir = os.path.join(result_path, f'roc_curve_record_fold{folds}')
    if not os.path.exists(record_dir):
        os.makedirs(record_dir)

    # Area under the plotted points (x = FPR, y = TPR), trapezoidal rule.
    area = np.trapz(tpr_list, fpr_list)

    # Start from an empty figure, then draw the curve and the diagonal.
    plt.clf()
    plt.plot(fpr_list, tpr_list, lw=1, label='ROC (AUC = %0.2f)' % area)
    plt.plot([0, 1], [0, 1], '--', color='gray', label='Random Guessing')

    axis_range = [-0.05, 1.05]
    plt.xlim(axis_range)
    plt.ylim(axis_range)
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('Receiver Operating Characteristic (ROC) Curve')
    plt.legend()

    save_dir = result_path + f'/roc_curve_record_fold{folds}'
    plt.savefig(os.path.join(save_dir, 'test_Roc_curve.png'))


# 輸出每一次 epoch 的結果
def CSV_Output(test_result_path,param,num_epochs,test_loss_list,test_acc_list,precision_list,recall_list,TPR_list,FPR_list,f1_score_list,final_wrong_predict,folds):
    """Write the run parameters and the per-epoch metrics of one fold to CSV.

    The file is ``<test_result_path>/csv_record_fold<folds>/test_epoch.csv``;
    the folder is created when it does not exist. Layout: a parameter header
    row, the ``param`` row, an empty row, a metric header row, one row per
    epoch (values rounded to 4 decimals) and a final ``wrong predict:`` row.

    Args:
        test_result_path: Folder below which the per-fold CSV folder lives.
        param: Row of run parameters written under the first header.
        num_epochs: Number of epoch rows; each list is read at ``0..num_epochs-1``.
        test_loss_list: Per-epoch loss values.
        test_acc_list: Per-epoch accuracies; each entry must provide ``.item()``.
        precision_list: Per-epoch precision values.
        recall_list: Per-epoch recall values.
        TPR_list: Per-epoch true-positive rates.
        FPR_list: Per-epoch false-positive rates.
        f1_score_list: Per-epoch F1 scores.
        final_wrong_predict: Object written (stringified) in the last row.
        folds: Fold identifier used in the folder name.
    """
    out_dir = os.path.join(test_result_path, f'csv_record_fold{folds}')
    if not os.path.exists(out_dir):
        os.makedirs(out_dir)

    param_header = ['num_epochs','batch_size','learning_rate','num_classes','device','start_time','end_time']
    metric_header = ['Epoch','test_loss','test_acc','precision','recall','TPR','FPR','F1 score']

    with open(out_dir + '/test_epoch.csv','w',newline='') as handle:
        out = csv.writer(handle)
        out.writerow(param_header)
        out.writerow(param)
        # An empty string has no fields, so this emits a blank separator line.
        out.writerow('')
        out.writerow(metric_header)
        for idx in range(num_epochs):
            values = (
                test_loss_list[idx],
                test_acc_list[idx].item(),
                precision_list[idx],
                recall_list[idx],
                TPR_list[idx],
                FPR_list[idx],
                f1_score_list[idx],
            )
            out.writerow([idx + 1] + [round(value, 4) for value in values])
        out.writerow(['wrong predict:', final_wrong_predict])
    print('CSV output Sucessfully')

def test(test_normal_tensor_path,test_abnormal_tensor_path,result_path,fold):
    """Evaluate each fold's saved SEResNet-152d checkpoint on the test tensors.

    For every fold ``k`` in ``1..fold`` the weights ``train_fold_<k>.pt`` are
    loaded from the folder returned by ``find_train_result_path``, the
    combined normal + abnormal test set is evaluated ``num_epochs`` times,
    and that fold's confusion-matrix image, ROC plot and CSV summary are
    written below the folder returned by ``mkdir_outcome``. A confusion
    matrix is also shown on screen after each fold.

    Design notes:
      * The metric lists are reset for every fold, so the CSV rows and ROC
        points written for fold ``k`` describe fold ``k`` only.
      * The wrong-prediction list is rebuilt every epoch and a copy of the
        last epoch's list is stored per fold, so stored entries are never
        cleared or shared between folds.
      * The checkpoint is loaded once per fold with ``map_location`` set to
        the active device, so GPU-saved weights also load on a CPU machine.
      * No optimizer or scheduler is created: nothing is trained here.

    Args:
        test_normal_tensor_path: Folder with the normal test tensors.
        test_abnormal_tensor_path: Folder with the abnormal test tensors.
        result_path: Root folder that holds the ``result_<N>`` runs.
        fold: Number of folds; checkpoints ``1..fold`` are evaluated.

    Returns:
        None. Results are printed, plotted and written to disk.
    """
    # Hyperparameters (learning_rate is only recorded in the CSV parameter row)
    batch_size = 1
    learning_rate = 0.001
    num_epochs = 5
    num_classes = 2
    start_time = datetime.now()
    end_time = 0
    conf_matrix = np.zeros((num_classes, num_classes), dtype=np.int32)

    # Load both test sets and combine them into one dataset
    test_normal_dataset = MyDataset(test_normal_tensor_path)
    test_abnormal_dataset = MyDataset(test_abnormal_tensor_path)
    test_dataset = CombinedDataset(test_normal_dataset,test_abnormal_dataset)

    # Print the dataset size
    print("dataset's size : " + str(len(test_dataset)))

    # Build the model with a num_classes-way output layer
    model = timm.create_model('seresnet152d', pretrained=False)
    model.fc = nn.Linear(model.fc.in_features, num_classes)

    # Move the model to the GPU when one is available
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)
    print("Device used : " + str(device))

    # Loss: sigmoid over the outputs followed by binary cross entropy on one-hot targets
    m = nn.Sigmoid()
    criterion = nn.BCELoss()

    # One entry per evaluated fold: the wrong predictions of its last epoch
    final_wrong_predict = []

    # Folder that receives the test outputs
    test_result_path = mkdir_outcome(result_path)
    print('test_result_path : ' + str(test_result_path))

    # Folder that holds the trained checkpoints
    train_result_path = find_train_result_path(result_path)
    print('train_result_path : ' + str(train_result_path))

    for folds in range(1,fold + 1):
        checkpoint_name = "train"+f'_fold_{folds}'+".pt"
        print("train model:"+checkpoint_name)

        # Load the trained weights once per fold; they do not change between epochs
        state_dict = torch.load(os.path.join(train_result_path, checkpoint_name), map_location=device)
        model.load_state_dict(state_dict)
        model.eval()
        test_loader = DataLoader(test_dataset, batch_size, shuffle=True, num_workers=4)

        # Per-fold metric history (index = epoch)
        test_loss_list = []
        test_acc_list = []
        recall_list = []
        precision_list = []
        f1_score_list = []
        TPR_list = []
        FPR_list = []
        wrong_predict = []

        # Evaluate the model
        for epoch in range(num_epochs):
            test_loss = 0
            test_corrects = 0
            test_acc = 0

            # Fresh counters for this epoch
            conf_matrix = np.zeros((num_classes, num_classes), dtype=np.int32)
            wrong_predict = []

            print("[Test Progress]: ")
            for inputs, labels, filename in tqdm(test_loader):
                # One-hot targets for the BCE loss
                targets=torch.eye(num_classes)[labels.long(), :]
                inputs = inputs.to(device)
                labels = labels.to(device)
                targets = targets.to(device)
                with torch.no_grad():
                    outputs = model(inputs)
                    loss = criterion(m(outputs),targets.float())
                test_loss += loss.item() * inputs.size(0)
                _, preds = torch.max(outputs, 1)
                test_corrects += torch.sum(preds == labels.data)

                # Count the confusion matrix as [predicted, actual] and remember
                # the file name of every misclassified sample
                for predicted, actual, name in zip(preds.view(-1).tolist(), labels.view(-1).tolist(), filename):
                    conf_matrix[predicted, actual] += 1
                    if predicted != actual:
                        wrong_predict.append(name)
            test_loss =  test_loss / len(test_loader.dataset)
            test_acc =  test_corrects.double() / len(test_loader.dataset)
            test_loss_list.append(test_loss)

            end_time = datetime.now()
            print('\nEpoch: [{}/{}]  test_loss: {:<.4f}  -  test_accuracy: {:<.4f}  -  test_correct: {:<10}'.format(
                epoch+1, num_epochs, test_loss, test_acc, test_corrects))
            print('wrong predicts : ' + str(wrong_predict))

            # Evaluation metrics
            precision,recall,f1_score,TPR,FPR = test_index(conf_matrix)

            # Record the outcomes
            test_acc_list.append(test_acc)
            precision_list.append(precision)
            recall_list.append(recall)
            f1_score_list.append(f1_score)
            TPR_list.append(TPR)
            FPR_list.append(FPR)

        # Keep a copy of the last epoch's wrong predictions for this fold
        final_wrong_predict.append(list(wrong_predict))

        # Save the confusion matrix of the last epoch
        Confusion_Matrix(test_result_path,conf_matrix,folds)
        # Save the ROC plot
        ROC_Curve(test_result_path,TPR_list,FPR_list,folds)
        # Save the CSV summary
        param = [num_epochs,batch_size,learning_rate,num_classes,device,start_time,end_time]
        CSV_Output(test_result_path,param,num_epochs,test_loss_list,test_acc_list,precision_list,recall_list,TPR_list,FPR_list,f1_score_list,final_wrong_predict,folds)

        # Show the confusion matrix on screen
        confusion_matrix = np.array([[conf_matrix[0][0], conf_matrix[0][1]], [conf_matrix[1][0], conf_matrix[1][1]]])
        print("Confusion matrix:")
        print(conf_matrix)
        plt.clf()
        plt.imshow(confusion_matrix, cmap=plt.cm.Blues, interpolation='nearest')
        plt.colorbar()
        tick_marks = np.arange(2)
        plt.xticks(tick_marks, ['Positive', 'Negative'])
        plt.yticks(tick_marks, ['Positive', 'Negative'])
        plt.ylabel('Predicted Label')
        plt.xlabel('True Label')
        plt.title('Confusion Matrix({})'.format(len(test_dataset)))
        plt.show()

        torch.cuda.empty_cache()


# Test-set tensor folders (normal / abnormal) and the root folder of the result_<N> runs.
test_normal_tensor_path = '/content/drive/My Drive/Deep_X_torch/tensor/test/normal'
test_abnormal_tensor_path = '/content/drive/My Drive/Deep_X_torch/tensor/test/abnormal'
result_path = '/content/drive/My Drive/Deep_X_torch/result'

# Evaluate the checkpoints of folds 1..5 (train_fold_1.pt ... train_fold_5.pt).
test(test_normal_tensor_path,test_abnormal_tensor_path,result_path,5)



#清除資料

In [None]:
import os
import torch

def delete_file_under_path(path):
    """Delete every file found anywhere below ``path``; folders are kept."""
    for current_dir, _subdirs, file_names in os.walk(path):
        doomed = (os.path.join(current_dir, name) for name in file_names)
        for file_path in doomed:
            os.remove(file_path)

def delete_folder_under_path(path):
    """Remove every sub-folder below ``path``, deepest first; ``path`` itself is kept.

    Uses ``os.rmdir``, so a folder that still contains files raises ``OSError``.
    """
    # Bottom-up walk: children are removed before their parents.
    for parent, child_dirs, _files in os.walk(path, topdown=False):
        for child in child_dirs:
            os.rmdir(os.path.join(parent, child))

# Folders whose contents can be wiped with the helpers defined above.
normal_jpg_path = '/content/drive/My Drive/Deep_X_torch/original_dataset/normal/normal(.jpg)'
normal_label_path = '/content/drive/My Drive/Deep_X_torch/original_dataset/normal/normal_label'
abnormal_jpg_path = '/content/drive/My Drive/Deep_X_torch/original_dataset/abnormal/abnormal(.jpg)'
abnormal_label_path = '/content/drive/My Drive/Deep_X_torch/original_dataset/abnormal/abnormal_label'
processed_dir = '/content/drive/My Drive/Deep_X_torch/processed_dataset'
splitted_dir = '/content/drive/My Drive/Deep_X_torch/splitted_dataset'
# NOTE(review): "tesor_path" looks like a typo for "tensor_path"; in this cell it is
# only referenced by a commented-out call below - confirm before renaming.
tesor_path = '/content/drive/My Drive/Deep_X_torch/tensor'
result_path = '/content/drive/My Drive/Deep_X_torch/result/'
all_data_dir = '/content/drive/My Drive/Deep_X_torch/all_data'

# Release unused GPU memory cached by PyTorch.
torch.cuda.empty_cache()

# Uncomment a line below to delete all files under the corresponding folder.
# delete_file_under_path(normal_jpg_path)
# delete_file_under_path(abnormal_jpg_path)
# delete_file_under_path(normal_label_path)
# delete_file_under_path(abnormal_label_path)
# delete_file_under_path(processed_dir)
# delete_file_under_path(splitted_dir)
# delete_file_under_path(all_data_dir)
# delete_file_under_path(tesor_path)

# Uncomment both lines to delete the result files first and then the emptied sub-folders.
# delete_file_under_path(result_path)
# delete_folder_under_path(result_path)