<a href="https://colab.research.google.com/github/Haesong-0622/Medical_AI/blob/main/Untitled5.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Import necessary libraries
import random
import pandas as pd
import numpy as np
import os
import cv2
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import albumentations as A
from albumentations.pytorch.transforms import ToTensorV2
import torchvision.models as models
from tqdm import tqdm

# Colab only: mount Google Drive at /content/drive. The CSV, image and
# checkpoint paths used further down all live under /content/drive/MyDrive/open.
from google.colab import drive
drive.mount('/content/drive')

# Pick the compute device once: the GPU when CUDA is usable, the CPU otherwise.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Seed everything
def seed_everything(seed):
    """Seed every random number generator this notebook relies on.

    Seeds Python's ``random``, NumPy and PyTorch (CPU and all CUDA devices),
    exports ``PYTHONHASHSEED`` and configures cuDNN for deterministic runs.

    Args:
        seed: Integer seed applied to all generators.
    """
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    # Seed every visible GPU, not just the current one. Silently ignored
    # when CUDA is unavailable.
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    # benchmark=True lets cuDNN auto-tune and pick different kernels from run
    # to run, which defeats the deterministic flag above; it must be off.
    torch.backends.cudnn.benchmark = False

# Run-wide hyperparameters and paths. Code further down reads these keys and
# also stores 'label_size' in this dict once the label matrix is known.
CFG = dict(
    IMG_SIZE=224,
    EPOCHS=5,
    LEARNING_RATE=3e-4,
    BATCH_SIZE=32,
    SEED=41,
    MODEL_PATH='/content/drive/MyDrive/open/best_model.pth',
)

# Seed all RNGs once, before the data split, data loaders and model are created.
seed_everything(CFG['SEED'])

# Load data
df = pd.read_csv('/content/drive/MyDrive/open/train.csv')

# Sequential (unshuffled) 80/20 split. `.copy()` gives each split its own
# frame, so the 'path' assignment below writes to a real DataFrame instead of
# a slice of `df` (which raised SettingWithCopyWarning).
train_len = int(len(df) * 0.8)
train_df = df.iloc[:train_len].copy()
val_df = df.iloc[train_len:].copy()

# Every column from position 2 onward is used as a float32 regression target.
train_label_vec = train_df.iloc[:, 2:].values.astype(np.float32)
val_label_vec = val_df.iloc[:, 2:].values.astype(np.float32)
CFG['label_size'] = train_label_vec.shape[1]

# Update paths: swap the CSV's './train/' prefix for the Drive location.
# regex=False makes the '.' a literal dot on every pandas version (older
# versions default to regex=True, where '.' matches any character).
base_path = '/content/drive/MyDrive/open/train/'
train_df['path'] = base_path + train_df['path'].str.replace('./train/', '', regex=False)
val_df['path'] = base_path + val_df['path'].str.replace('./train/', '', regex=False)

class CustomDataset(Dataset):
    """Dataset that reads images from disk with OpenCV and optionally pairs them with labels.

    Args:
        img_path_list: Indexable collection of image file paths.
        label_list: Indexable collection of labels aligned with
            ``img_path_list``, or None when no labels exist (e.g. test data).
        transforms: Optional callable invoked as ``transforms(image=img)``
            whose result is indexed with ``['image']``.
    """

    def __init__(self, img_path_list, label_list=None, transforms=None):
        self.img_path_list = img_path_list
        self.label_list = label_list  # may be None: labels are optional
        self.transforms = transforms

    def __len__(self):
        """Return the number of image paths."""
        return len(self.img_path_list)

    def __getitem__(self, index):
        """Load one sample.

        Returns:
            ``(image, label)`` when labels were supplied, otherwise just ``image``.

        Raises:
            FileNotFoundError: If OpenCV could not read the file at the stored path.
        """
        path = self.img_path_list[index]
        pixels = cv2.imread(path)

        # cv2.imread signals an unreadable file by returning None instead of raising.
        if pixels is None:
            print(f"Warning: Image not found at path: {path}")
            raise FileNotFoundError(f"Image not found at path: {path}")

        # OpenCV decodes to BGR; convert to RGB before the transforms run.
        pixels = cv2.cvtColor(pixels, cv2.COLOR_BGR2RGB)

        if self.transforms:
            pixels = self.transforms(image=pixels)['image']

        # Unlabeled datasets (test set) yield the image alone.
        if self.label_list is None:
            return pixels
        return pixels, self.label_list[index]

# Data Augmentation
# Both pipelines resize to a square of CFG['IMG_SIZE'] and normalize with the
# same mean/std before converting to a tensor; only the training pipeline adds
# random flips and brightness/contrast jitter.
_img_side = CFG['IMG_SIZE']
_normalize_kwargs = dict(
    mean=(0.485, 0.456, 0.406),
    std=(0.229, 0.224, 0.225),
    max_pixel_value=255.0,
    p=1.0,
)

train_transform = A.Compose([
    A.Resize(_img_side, _img_side),
    A.HorizontalFlip(p=0.5),
    A.VerticalFlip(p=0.5),
    A.RandomBrightnessContrast(p=0.2),
    A.Normalize(**_normalize_kwargs),
    ToTensorV2(),
])

test_transform = A.Compose([
    A.Resize(_img_side, _img_side),
    A.Normalize(**_normalize_kwargs),
    ToTensorV2(),
])

# Dataset and DataLoader
# Training batches are reshuffled every epoch and use the augmenting pipeline.
train_dataset = CustomDataset(
    img_path_list=train_df['path'].values,
    label_list=train_label_vec,
    transforms=train_transform,
)
train_loader = DataLoader(dataset=train_dataset, batch_size=CFG['BATCH_SIZE'], shuffle=True)

# Validation keeps file order and uses the non-augmenting pipeline.
val_dataset = CustomDataset(
    img_path_list=val_df['path'].values,
    label_list=val_label_vec,
    transforms=test_transform,
)
val_loader = DataLoader(dataset=val_dataset, batch_size=CFG['BATCH_SIZE'], shuffle=False)

# Model Definition
class BaseModel(nn.Module):
    """DenseNet-121 feature extractor followed by one linear regression head.

    The backbone's own classifier is replaced with ``nn.Identity`` so the
    backbone emits its pooled features, which ``regressor`` maps to
    ``gene_size`` outputs.

    Args:
        gene_size: Number of regression outputs. Defaults to
            ``CFG['label_size']`` as evaluated when the class is defined.
        pretrained: When True (default, same as before) the backbone starts
            from torchvision's default pretrained DenseNet-121 weights. Pass
            False to skip loading them, e.g. when a checkpoint is loaded
            immediately afterwards.
    """

    def __init__(self, gene_size=CFG['label_size'], pretrained=True):
        super().__init__()
        # `pretrained=True` is deprecated in torchvision; the weights enum is
        # the supported way to request the same ImageNet weights.
        weights = models.DenseNet121_Weights.DEFAULT if pretrained else None
        self.backbone = models.densenet121(weights=weights)
        # Size the head from the original classifier before it is discarded.
        self.regressor = nn.Linear(self.backbone.classifier.in_features, gene_size)
        self.backbone.classifier = nn.Identity()

    def forward(self, x):
        """Return regression outputs for an image batch ``x``."""
        features = self.backbone(x)
        return self.regressor(features)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  train_df['path'] = base_path + train_df['path'].str.replace('./train/', '')
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  val_df['path'] = base_path + val_df['path'].str.replace('./train/', '')


In [None]:
# Save and Load Model Functions
def save_model(model, optimizer, scheduler, epoch, path=CFG['MODEL_PATH']):
    """Write a training checkpoint to ``path`` with ``torch.save``.

    The checkpoint is a dict holding the state dicts of ``model``,
    ``optimizer`` and ``scheduler`` plus the ``epoch`` value passed in.

    Args:
        model: Module whose ``state_dict()`` is stored.
        optimizer: Optimizer whose ``state_dict()`` is stored.
        scheduler: LR scheduler whose ``state_dict()`` is stored.
        epoch: Epoch index recorded under the 'epoch' key.
        path: Destination file. Defaults to ``CFG['MODEL_PATH']``.
    """
    checkpoint = dict(
        model_state_dict=model.state_dict(),
        optimizer_state_dict=optimizer.state_dict(),
        scheduler_state_dict=scheduler.state_dict(),
        epoch=epoch,
    )
    torch.save(checkpoint, path)

def load_model(model, optimizer, scheduler, path=CFG['MODEL_PATH']):
    """Restore training state from the checkpoint at ``path`` if it exists.

    Args:
        model: Module that receives the saved weights. When a checkpoint is
            found the module is also moved to the global ``device``.
        optimizer: Optimizer to restore, or None to skip restoring it
            (useful for inference-only loads).
        scheduler: LR scheduler to restore, or None to skip restoring it.
        path: Checkpoint file. Defaults to ``CFG['MODEL_PATH']``.

    Returns:
        ``(model, optimizer, scheduler, start_epoch)`` where ``start_epoch``
        is the saved epoch + 1, or 0 when no checkpoint file exists.
    """
    if not os.path.isfile(path):
        print("No saved model found. Starting from scratch.")
        return model, optimizer, scheduler, 0

    # The checkpoint holds only state dicts and an int, so the restricted
    # unpickler is sufficient. It also silences torch.load's FutureWarning and
    # avoids executing arbitrary pickled code from the file.
    checkpoint = torch.load(path, map_location=device, weights_only=True)
    model.load_state_dict(checkpoint['model_state_dict'])

    # Optimizer.load_state_dict casts restored state to the device of the
    # parameters it tracks. Move the model first so that state lands on
    # `device`; otherwise a later model.to(device) leaves the optimizer state
    # on CPU and the first optimizer.step() fails on GPU.
    model.to(device)

    if optimizer is not None:
        optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
    if scheduler is not None:
        scheduler.load_state_dict(checkpoint['scheduler_state_dict'])

    start_epoch = checkpoint['epoch'] + 1
    print(f"Resuming from epoch {start_epoch}")
    return model, optimizer, scheduler, start_epoch

In [None]:
# Initialize model, optimizer, and scheduler
# The model is moved to `device` BEFORE the optimizer is built and restored:
# restored optimizer state is placed on the device of the parameters, so
# loading it while the model is still on CPU would leave AdamW's buffers on
# CPU after the model is later moved to the GPU.
model = BaseModel().to(device)
optimizer = optim.AdamW(model.parameters(), lr=CFG['LEARNING_RATE'])
scheduler = torch.optim.lr_scheduler.CosineAnnealingWarmRestarts(optimizer, T_0=10, T_mult=1)

# Load model if exists, otherwise start from scratch.
# start_epoch is 0 for a fresh run, or (saved epoch + 1) when resuming.
model, optimizer, scheduler, start_epoch = load_model(model, optimizer, scheduler)

  checkpoint = torch.load(path, map_location=device)


Resuming from epoch 5


In [None]:
# Training and Validation Functions
def train(model, optimizer, train_loader, val_loader, scheduler, device, start_epoch=0):
    """Train ``model`` with MSE loss and checkpoint the best validation epoch.

    Runs epochs ``start_epoch .. CFG['EPOCHS'] - 1``. Each epoch trains on
    ``train_loader``, evaluates with ``validate`` on ``val_loader``, prints
    both mean losses and steps ``scheduler`` once. A checkpoint is written
    with ``save_model`` only when the validation loss improves on the best
    value seen during this call, so the checkpoint file holds the best epoch
    rather than simply the last one.

    Args:
        model: Module to optimize; moved to ``device`` in place.
        optimizer: Optimizer over ``model``'s parameters.
        train_loader: Iterable yielding ``(imgs, labels)`` tensor batches.
        val_loader: Iterable passed to ``validate``.
        scheduler: LR scheduler stepped once per epoch.
        device: Device for the model and every batch.
        start_epoch: First epoch index to run. If it is >= ``CFG['EPOCHS']``
            the loop body never executes.

    Returns:
        None.
    """
    model.to(device)
    criterion = nn.MSELoss().to(device)
    # Best validation loss seen in this call. It restarts at +inf on resume,
    # so the first epoch after a resume is always checkpointed.
    best_loss = float('inf')

    for epoch in range(start_epoch, CFG['EPOCHS']):
        model.train()
        train_loss = []
        for imgs, labels in tqdm(train_loader):
            imgs, labels = imgs.to(device), labels.to(device)

            optimizer.zero_grad()
            output = model(imgs)
            loss = criterion(output, labels)
            loss.backward()
            optimizer.step()
            train_loss.append(loss.item())

        val_loss = validate(model, val_loader, criterion, device)
        print(f'Epoch {epoch+1}, Train Loss: {np.mean(train_loss):.5f}, Val Loss: {val_loss:.5f}')
        scheduler.step()

        # Checkpoint after scheduler.step() so a resumed run continues with
        # the learning-rate schedule of the following epoch.
        if val_loss < best_loss:
            best_loss = val_loss
            save_model(model, optimizer, scheduler, epoch)

def validate(model, val_loader, criterion, device):
    """Return the mean per-batch loss of ``model`` over ``val_loader``.

    The model is switched to eval mode and gradients are disabled for the
    whole pass.

    Args:
        model: Module to evaluate (expected to already be on ``device``).
        val_loader: Iterable yielding ``(imgs, labels)`` tensor batches.
        criterion: Callable ``criterion(output, labels)`` returning a scalar tensor.
        device: Device each batch is moved to.

    Returns:
        ``np.mean`` of the per-batch loss values.
    """
    model.eval()
    batch_losses = []
    with torch.no_grad():
        for batch_imgs, batch_labels in val_loader:
            batch_imgs = batch_imgs.to(device)
            batch_labels = batch_labels.to(device)
            batch_loss = criterion(model(batch_imgs), batch_labels)
            batch_losses.append(batch_loss.item())
    return np.mean(batch_losses)

# Train the model
# start_epoch comes from load_model. train() loops over
# range(start_epoch, CFG['EPOCHS']), so when a checkpoint already covers all
# configured epochs this call runs no epochs at all.
train(model, optimizer, train_loader, val_loader, scheduler, device, start_epoch)

In [None]:
# Inference Function
def inference(model, test_loader, device):
    """Run ``model`` over ``test_loader`` and return all predictions as one array.

    Args:
        model: Module to evaluate; moved to ``device`` in place.
        test_loader: Iterable yielding image tensor batches (no labels).
        device: Device used for the model and every batch.

    Returns:
        NumPy array with the per-batch outputs concatenated along axis 0, in
        loader order.
    """
    # Batches are moved to `device` below, so the model has to live there as
    # well; without this a freshly constructed (CPU) model fails on GPU runs.
    model.to(device)
    model.eval()
    preds = []
    with torch.no_grad():
        for imgs in tqdm(test_loader):
            imgs = imgs.to(device).float()
            pred = model(imgs)
            # Collect on CPU so GPU memory is not held for the whole test set.
            preds.append(pred.detach().cpu())
    return torch.cat(preds).numpy()

In [None]:
# Initialize Test Dataset and DataLoader
# `test` has to be defined before it is used: load it here so this cell also
# works on a fresh kernel (Restart & Run All) instead of relying on a variable
# left over from an out-of-order run.
test = pd.read_csv('/content/drive/MyDrive/open/test.csv')
# regex=False: treat './test/' literally ('.' is a wildcard in regex mode).
test['path'] = test['path'].str.replace('./test/', '/content/drive/MyDrive/open/test/', regex=False)

test_dataset = CustomDataset(test['path'].values, None, test_transform)
test_loader = DataLoader(test_dataset, batch_size=CFG['BATCH_SIZE'], shuffle=False)

# Load the trained model from the checkpoint
infer_model, _, _, _ = load_model(BaseModel(), optimizer, scheduler)

# Perform Inference
preds = inference(infer_model, test_loader, device)

# Create Submission File
# Column 0 of the sample submission is kept; all remaining columns are
# overwritten with the predictions.
submit = pd.read_csv('/content/drive/MyDrive/open/sample_submission.csv')
submit.iloc[:, 1:] = preds.astype(np.float32)
submit.to_csv('/content/drive/MyDrive/open/chan_submit.csv', index=False)

print("Inference 완료 및 제출 파일 생성됨: chan_submit.csv")

  checkpoint = torch.load(path, map_location=device)


Resuming from epoch 5


100%|██████████| 72/72 [23:18<00:00, 19.42s/it]


Inference 완료 및 제출 파일 생성됨: chan_submit.csv


In [None]:
# Load Test Data and Perform Inference
test = pd.read_csv('/content/drive/MyDrive/open/test.csv')
# Swap the CSV's './test/' prefix for the Drive location. regex=False makes
# the '.' a literal dot on every pandas version (in regex mode it would match
# any character).
test['path'] = test['path'].str.replace('./test/', '/content/drive/MyDrive/open/test/', regex=False)

# No labels for the test set, so the dataset yields images only.
test_dataset = CustomDataset(test['path'].values, None, test_transform)
test_loader = DataLoader(test_dataset, batch_size=CFG['BATCH_SIZE'], shuffle=False)

# Fresh model instance whose weights are replaced by the saved checkpoint.
infer_model, _, _, _ = load_model(BaseModel(), optimizer, scheduler)
preds = inference(infer_model, test_loader, device)

# Create Submission File
# Column 0 of the sample submission is kept; all remaining columns are
# overwritten with the predictions.
submit = pd.read_csv('/content/drive/MyDrive/open/sample_submission.csv')
submit.iloc[:, 1:] = preds.astype(np.float32)
submit.to_csv('/content/drive/MyDrive/open/chan_submit.csv', index=False)

print("Inference 완료 및 제출 파일 생성됨: chan_submit.csv")

  checkpoint = torch.load(path, map_location=device)


Resuming from epoch 5


  0%|          | 0/72 [00:07<?, ?it/s]


TypeError: 'NoneType' object is not subscriptable