In [32]:
import torch
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
import torchvision.models as models
from torch import optim
import torch.nn as nn
import torch.nn.functional as F
import json
import numpy as np
import os
import cv2
from tqdm import tqdm
import random
from matplotlib import pyplot as plt
import pandas as pd
from collections import namedtuple
from sklearn.metrics import accuracy_score
from copy import deepcopy

import albumentations as A
from albumentations.pytorch import ToTensorV2
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

In [33]:
from PIL import Image, ImageFile
ImageFile.LOAD_TRUNCATED_IMAGES = True

### SEED

In [34]:
def seed_everything(seed):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = True

# Dataset

In [56]:
import cv2
from torch.utils.data import Dataset
from PIL import Image

class CustomDataset(Dataset):
    def __init__(self, imgs_path: list, labels_path: list=None, transform = None, mode='train'):
        self.imgs_path = imgs_path
        self.labels_path = labels_path
        self.transform = transform
        self.mode = mode

    def __len__(self):
        return len(self.imgs_path)
        
    def __getitem__(self, idx):
        img_path = self.imgs_path[idx]
        try:
            img = np.asarray(Image.open(img_path).convert('RGB'))  # OpenCV 대신 Pillow 사용
            if self.transform:
                img = self.transform(image = img)['image']
        except Exception as e:
            print(f"Error loading image at index {idx}: {e}, {img_path}")
            return

        if self.mode == 'train' or self.mode == 'valid':
            label_path = self.labels_path[idx]
            with open(label_path, 'r') as file:
                lines = file.readlines()
            labels = [float(x) for x in lines[0].strip().split()]
            lab = labels[0]
            return img, torch.tensor(lab)
            
        else: # test일 때
            return img

# Train

### Data load

In [57]:
BASE = '/mnt/d/Jupyter-Goodyoung/DiabetesSolution-AI'
SAVE_PATH = f"{BASE}/save"
MODEL_SAVE = f'{SAVE_PATH}/EfficientNetB0-3.pth' #
WORKERS = 2
EPOCHS = 50 # 훈련 epoch 지정
BATCH_SIZE = 8# batch size 지정
# IMAGE_SIZE = (256, 256) # 이미지 크기 지정
NUM_CLASSES = 722
RANDOM_STATE = 42 # seed 고정
DEVICE = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

if not os.path.exists(SAVE_PATH):
    os.makedirs(SAVE_PATH)

seed_everything(RANDOM_STATE) # SEED 고정

In [58]:
# 파일에서 문자열을 읽어와 리스트에 저장하는 함수
def read_strings_from_file(filename):
    strings = []
    with open(filename, 'r') as f:
        if filename == "TL_files.txt" or filename == "VL_files.txt" :
            for line in f:
                line = line.strip()
                line = line[:-4] +"txt"
                # line = line[:-4] +"json"
                strings.append(line)  # 줄바꿈 문자 제거            
        else:    
            for line in f:
                strings.append(line.strip())  # 줄바꿈 문자 제거
    return strings
TL_files = read_strings_from_file('TL_files.txt')
TS_files = read_strings_from_file('TS_files.txt')
VL_files = read_strings_from_file('VL_files.txt')
VS_files = read_strings_from_file('VS_files.txt')

# train : val = 8 : 2 나누기
combined_imgs = TS_files + VS_files
combined_label = TL_files + VL_files

divide_num =1000
if divide_num:
    real_imgs, real_label = combined_imgs[:divide_num], combined_label[:divide_num]
else:
    real_imgs, real_label = combined_imgs, combined_label
    
x_tr, x_val = train_test_split(real_imgs, test_size=0.2, random_state=RANDOM_STATE)
y_tr, y_val = train_test_split(real_label, test_size=0.2, random_state=RANDOM_STATE) # 둘 다 같은 비율로 나뉘어 진다

transform = A.Compose([
        A.Resize(480,480),
        # A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225), max_pixel_value=255.0, always_apply=False, p=1.0),
        A.Normalize(0.5),
        ToTensorV2()
])
train_dataset = CustomDataset(x_tr, y_tr, transform = transform, mode = 'train')
valid_dataset = CustomDataset(x_val, y_val, transform = transform, mode = 'valid')

train_dataloader = DataLoader(
    dataset=train_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=WORKERS,
    pin_memory=True
)
val_dataloader = DataLoader(
    dataset=valid_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=WORKERS,
    pin_memory=True
        )

### Model train

#### model selection

In [59]:
# torch init cache
import torch, gc
gc.collect()
torch.cuda.empty_cache()

In [60]:
import timm
model = timm.create_model('efficientnet_b0', pretrained=True)
model.classifier = nn.Linear(1280,NUM_CLASSES)

In [61]:
optimizer = optim.AdamW(model.parameters(), lr=0.001)#(params=model.parameters(), lr=0.0001)
loss_fn = nn.CrossEntropyLoss().to(DEVICE)
lr_scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=5, eta_min=0.000001)

In [None]:
from collections import defaultdict

if os.path.exists(MODEL_SAVE): # Model Save point가 있을 경우
    checkpoint = torch.load(MODEL_SAVE)
    
    model.load_state_dict(checkpoint["model_state_dict"])
    model.to(DEVICE) # optimizer 전에 선언
    optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
    
    START_EPOCH, best_val_loss= checkpoint["start_epoch"], checkpoint["best_val_loss"]
else:
    checkpoint = defaultdict()
    START_EPOCH, best_val_acc = 0, 0 # 가장 큰 수
    model.to(DEVICE) # optimizer 전에 선언
    
early_stop = 0

for epoch in range(START_EPOCH+1, EPOCHS+1):
    print('Epoch {}/{}'.format(epoch, EPOCHS))
    print('-'*20)
    #### train ####
    model.train()
    train_losses = []
    for imgs, label in tqdm(train_dataloader):
        imgs = imgs.to(DEVICE)
        label = label.to(DEVICE)
        # 초기화
        optimizer.zero_grad() 
        # 예측
        output = model(imgs) 
        # 순전파
        one_hot_label = torch.eye(NUM_CLASSES, device=label.device)[label.long()] # long -> int 같은 형식
        loss = loss_fn(output, one_hot_label).to(DEVICE) 
        # 역전파
        loss.backward() 
        # 학습
        optimizer.step() 
        train_losses.append(loss.item())
        
    #### valid ####
    model.eval()
    val_accs, val_losses = [], []
    with torch.no_grad():
        for imgs, label in tqdm(val_dataloader):
            imgs = imgs.to(DEVICE)
            label = label.to(DEVICE)
            # 예측
            output = model(imgs)
            # 순전파
            one_hot_label = torch.eye(NUM_CLASSES, device=label.device)[label.long()] # long -> int 같은 형식
            loss = loss_fn(output, one_hot_label).to(DEVICE)
            
            probabilities = F.softmax(output, dim=1)
            predicted_classes = torch.argmax(probabilities, dim=1)
            predicted_classes = predicted_classes.cpu().detach().numpy()
            label = label.cpu().detach().numpy()
            batch_acc = accuracy_score(label, predicted_classes)
            
            val_accs.append(batch_acc)
            val_losses.append(loss.item())

    
    train_loss, val_loss, val_acc = np.mean(train_losses), np.mean(val_losses), np.mean(val_accs)
    print(f"EPOCH: {epoch}, TRAIN LOSS: {train_loss:.6f},  VAL LOSS: {val_loss:.6f}, VAL ACC: {val_acc:.6f}")
    
    if lr_scheduler is not None:
        lr_scheduler.step()
        
    if best_val_acc <= val_acc:
        print("Model Save")
        early_stop = 0
        best_val_acc = val_acc
        checkpoint["start_epoch"] = epoch
        checkpoint["best_val_acc"] = best_val_acc
        checkpoint["model_state_dict"] = model.state_dict()
        checkpoint["optimizer_state_dict"] = optimizer.state_dict()
        
        torch.save(checkpoint, MODEL_SAVE) # checkpoint 저장
        
        file_path = f'{SAVE_PATH}/result_all.txt' # 결과 debugging용
        with open(file_path, 'a') as file:
            file.write(f"[BEST]: EPOCH: {epoch}, TRAIN LOSS: {train_loss:.6f},  VAL LOSS: {val_loss:.6f}, VAL ACC: {val_acc:.6f}\n")
    else:
        early_stop += 1

    # early stop
    if early_stop > 4:
        print("Early Stop")
        break

Epoch 1/50
--------------------


100%|█████████████████████████████████████████████████████████████████████████████████| 100/100 [01:01<00:00,  1.63it/s]
100%|███████████████████████████████████████████████████████████████████████████████████| 25/25 [00:15<00:00,  1.59it/s]


EPOCH: 1, TRAIN LOSS: 1.978417,  VAL LOSS: 0.693702, VAL ACC: 0.900000
Model Save
Epoch 2/50
--------------------


100%|█████████████████████████████████████████████████████████████████████████████████| 100/100 [00:34<00:00,  2.86it/s]
100%|███████████████████████████████████████████████████████████████████████████████████| 25/25 [00:08<00:00,  2.96it/s]


EPOCH: 2, TRAIN LOSS: 0.345961,  VAL LOSS: 0.215316, VAL ACC: 0.930000
Model Save
Epoch 3/50
--------------------


  7%|█████▊                                                                             | 7/100 [00:02<00:32,  2.87it/s]

In [31]:

probabilities = F.softmax(output, dim=1)
predicted_classes = torch.argmax(probabilities, dim=1)
output = output.cpu().detach().numpy()
label = label.cpu().detach().numpy()
accuracy_score(label, predicted_classes)

AttributeError: 'numpy.ndarray' object has no attribute 'softmax'

In [17]:
one_hot_label[0]

tensor([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 

In [22]:
output.shape

torch.Size([8, 722])

In [29]:
accuracy_score(label, predicted_classes)

TypeError: can't convert cuda:0 device type tensor to numpy. Use Tensor.cpu() to copy the tensor to host memory first.

In [28]:
label

tensor([671., 177., 443., 246.,  97.,  51., 527.,  49.], device='cuda:0')

In [27]:
predicted_classes

tensor([671, 177, 443, 246,  97,  50, 527,  49], device='cuda:0')

# Inference

In [None]:
import timm
model = timm.create_model('efficientnet_b0', pretrained=True)
model.classifier = nn.Linear(1280,NUM_CLASSES)

In [63]:
# label data load
import json
with open(f"{BASE}/label_mapping_data.json") as f:
    data = json.load(f)
label_data = list(data.keys())
# transform
transform = A.Compose([
        A.Resize(480,480),
        A.Normalize(0.5,0.5),
        ToTensorV2()
])

# load test path
num= 23050
test_dataset = CustomDataset([combined_imgs[num]], transform = transform, mode = 'test')
test_dataloader = DataLoader(
    dataset=test_dataset,
    batch_size=1,
    shuffle=False
)

if os.path.exists(MODEL_SAVE):
    
    model.load_state_dict(torch.load(MODEL_SAVE)["model_state_dict"])
model.to(DEVICE)
model.eval()  # 모델을 평가 모드로 설정

with torch.no_grad():
    for imgs in test_dataloader:  # 테스트 데이터로더 사용
        imgs = imgs.to(DEVICE)
        # 예측
        output = model(imgs)
        
        # 확률값으로 변환
        probabilities = F.softmax(output, dim=1) # 확률로 변환 -> softmax
        # # 가장 높은 확률을 가진 클래스 선택
        predicted_classes = torch.argmax(probabilities, dim=1)
        # # 예측 결과 저장
        # predictions.extend(predicted_classes.cpu().numpy())

# 예측 결과 출력
print(combined_imgs[num])
print(predicted_classes.item())
print(label_data[predicted_classes.item()])

/mnt/d/Jupyter-Goodyoung/diabetes/data/Train/TS/TS10/D/08/D08001/14/측면/D_08_D08001_고구마라떼_14_02.jpg
529
오곡라떼


In [None]:
갈비치킨
치킨