<a href="https://colab.research.google.com/github/Guo-bot-1998/Appendicitis/blob/master/Appendicitis_colab_notebook.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# import

In [None]:
!pip install timm

In [None]:
import nibabel as nib
import numpy as np
import os
import pandas as pd
import nibabel as nib
import torch
import torch.nn as nn
import torch.optim as optim
import timm

In [None]:

def read_data(directory, shift=0, termi=10):
    """
    Traverse directories starting from 'directory', find .nil files,
    and read them into NumPy arrays.
    """
    data_dict = {}
    k = 0
    ncut = 0
    for root, dirs, files in os.walk(directory):
        for file in files:
            if file.endswith('.nii'):
                if (k<shift)       :
                  k += 1
                  continue
                file_path = os.path.join(root, file)
                nii_file =  nib.load(file_path)
                data = nii_file.get_fdata()
                data_dict[file.strip('.nii')] = data
                ncut +=  1
                if ncut == termi:
                  return data_dict
    return data_dict

def read_label(excel_path):
  with open(excel_path, 'r') as f:
    df = pd.read_csv(f)
    return df


def isgpu():
    """檢查是否有 CUDA 支持的 GPU"""
    if torch.cuda.is_available():
        device = torch.device("cuda")
        print("GPU is available")
    else:
        device = torch.device("cpu")
        raise("GPU not available")
    return device

In [None]:
device = isgpu()

In [None]:
from google.colab import drive
drive.mount('/content/drive')

# 路徑


In [None]:
cd /content/drive/MyDrive/AOCR2024

In [None]:
!pwd

# 處理資料

In [None]:
data = read_data("TrainValid_Image/train_data")
df = read_label("TrainValid_ground_truth.csv")
df.set_index('id', inplace=True)
# df
processed = []
for key, value in data.items():
  scan  = df.loc[df.index.str.startswith(key+'_')]
  labels = np.array(scan['label'])
  processed.append((value, labels))
len(processed)

In [22]:
len(data.keys())

10

In [40]:
# 把每個scan第三個維度的資訊取出來,訓練完好分辨哪個cut屬於哪個scan(訓練不改變cut的順序)
nslices = [datatuple[0].shape[2] for datatuple in processed]
keys = data.keys()
assert(len(keys) == len(nslices))

In [41]:

# 轉換每個掃描中的圖像和標籤
images_list = []
labels_list = []

for value, label in processed:
    # value.shape 為 (512, 512, n)，label.shape 為 (n,)
    for i in range(value.shape[2]):
        image = value[:, :, i]
        image_tensor = torch.from_numpy(image).float().unsqueeze(0)  # 添加通道維度
        label_tensor = torch.tensor(label[i], dtype=torch.float32)
        images_list.append(image_tensor)
        labels_list.append(label_tensor)

# 合併成批次數據
images = torch.stack(images_list)
labels = torch.stack(labels_list)


# 訓練 (Unet with only encoder)

In [45]:
class FC(nn.Module):
    def __init__(self, in_channels):
        super(FC, self).__init__()

        # Encoder部分
        self.encoder = nn.Sequential(
            nn.Conv2d(in_channels, 64, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(64, 64, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(64, 128, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(128, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )

        self.fc = nn.Linear(128 * 128 * 256, 1)  # x 和 y 是經過 encoder 處理後的特徵圖尺寸


    def forward(self, x):
        x = self.encoder(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)
        x = torch.sigmoid(x)
        return x

## 訓練 (EfficiencyNetV2_m)

In [None]:
model_name = "tf_efficientnetv2_m"
pretrained_model = timm.create_model(model_name, pretrained=True)

# 修改輸入通道
pretrained_model.conv_stem = nn.Conv2d(1, 32, kernel_size=3, stride=2, padding=1, bias=False)

# 修改輸出類別
num_classes = 2
pretrained_model.classifier = nn.Linear(pretrained_model.classifier.in_features, num_classes)

# 添加 Sigmoid 激活函數
pretrained_model = nn.Sequential(
    pretrained_model,
    nn.Sigmoid()
)

# 檢查模型結構
# print(pretrained_model)

In [None]:
num_epochs = 3
batch_size = 16
lr = 0.001
num_batches = len(images) // batch_size
# 初始化模型、損失函數和優化器
model = FC(in_channels=1).to(device)
# model = pretrained_model

if images.dtype != model.encoder[0].weight.dtype:
    images = images.type(model.encoder[0].weight.dtype)

criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr)


# 訓練循環
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0

    for i in range(num_batches):
        batch_images = images[i*batch_size:(i+1)*batch_size].to(device)
        batch_labels = labels[i*batch_size:(i+1)*batch_size].to(device)

        optimizer.zero_grad()
        outputs = model(batch_images)
        outputs = outputs.squeeze()
        loss = criterion(outputs, batch_labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    print(f"Epoch {epoch+1}, Loss: {running_loss/num_batches}")


# 儲存模型參數



In [None]:
cd /content/drive/MyDrive/AOCR2024

In [None]:
import json
appendix = input("請輸入要儲存的編號:")
modelname = model.__class__.__name__
filename = f"params/{modelname}/{modelname}{appendix}"
if os.path.isfile(filename+'.pth'):
    print(f"{filename}.pth exist.")
else:
    torch.save(model.state_dict(), f'{filename}.pth')

params = {
    'num_epochs': num_epochs,
    'batch_size': batch_size,
    'learning_rate': lr
}

if os.path.isfile(filename+'.json'):
    print(f"{filename}.json exist.")
else:
    with open(f'{filename}.json', 'w') as f:
        json.dump(params, f)

# 讀取模型參數

In [52]:
import json
remove_digits = str.maketrans('', '', '0123456789')
modelname = input("請輸入要獲取模型檔名:")
filename = f"params/{modelname.translate(remove_digits)}/{modelname}"

if 'model' not in globals() or model.__class__.__name__ != modelname.translate(remove_digits):
  exec(f'model = {modelname.translate(remove_digits)}(in_channels=1).to(device)')

if not os.path.isfile(filename+'.pth'):
    print(f"{filename}.pth not exist.")
else:
    print(model.load_state_dict(torch.load(filename+'.pth')))
with open(f'{filename}.json', 'r') as f:
    params = json.load(f)

請輸入要獲取模型檔名:FC1
<All keys matched successfully>


# 評估


In [53]:

# 評估設置
num_epochs = params['num_epochs']
batch_size = params['batch_size']
num_batches = len(images) / batch_size

In [None]:
model.eval()
predict_list = []
with torch.no_grad():  # 不更新梯度
    correct = 0
    total = 0
    for i in range(num_batches):
        batch_images = images[i*batch_size:(i+1)*batch_size].to(device)
        batch_labels = labels[i*batch_size:(i+1)*batch_size].to(device)

        outputs = model(batch_images)
        predicted = (outputs.squeeze() > 0.5).int()
        predict_list.append([predicted.cpu()])


predict_list

# 輸出至submission.csv

In [76]:
submissions = pd.Series(predict_list)
for i, key in enumerate(keys):
  nslice = nslices[i]
  for appendix in range(-1,len(nslice))
    data = {key:predict_list}

0    ff
dtype: object

Measure Recall