### 변수 설정

In [None]:
# Side length (in pixels) of the square Resize used by the `transform` pipeline.
image_wh = 224

### GPU 9번 설정

In [None]:
import os

# Enumerate CUDA devices in PCI bus order and restrict the devices visible to
# this process to index "9". Both variables are set in a single update.
os.environ.update({
    "CUDA_DEVICE_ORDER": "PCI_BUS_ID",
    "CUDA_VISIBLE_DEVICES": "9",
})

In [None]:
import torch

# Report whether PyTorch can see a CUDA device.
print(torch.cuda.is_available())

In [None]:
# Pick the compute device once: CUDA when PyTorch reports it as available,
# otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(device)

### 필요 라이브러리 불러오기

In [None]:
import os.path
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from PIL import Image

import copy
import sys, time

import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, TensorDataset
from torch.autograd import Variable
from torchvision.models import resnet50, ResNet50_Weights

import torchvision
from torchvision import datasets, transforms

from sklearn.model_selection import train_test_split

In [None]:
# Root directory under which the label CSV files and image datasets are located.
base_directory = "/home/j-j9s006"

# Data 불러오기

### DF 생성

###### K-FASHION

In [None]:
# Load the K-Fashion material labels (CSV is cp949-encoded).
df = pd.read_csv(
    os.path.join(base_directory, "material-classification", "k-fashion-label-material.csv"),
    encoding='cp949',
)

# Keep only the file name and class id, and drop unlabeled rows.
# This is done as one chained expression ending in .copy() so that the result
# is an independent frame: mutating a column subset in place (inplace dropna,
# column assignment) can trigger pandas' SettingWithCopyWarning.
df = df[['file', 'type_id']].dropna(subset=['type_id']).copy()
df['type_id'] = df['type_id'].astype(int)

# Full path of each image on disk.
df['Filepath'] = base_directory + "/datasets/k-fashion/" + df['file']

df.head()

###### Deep Fashion In shop

In [None]:
# Load the DeepFashion In-shop material labels (CSV is cp949-encoded).
add_df = pd.read_csv(
    os.path.join(base_directory, "material-classification", "deep-inshop-label-material.csv"),
    encoding='cp949',
)

# Keep only the image id and class id, and drop unlabeled rows.
# Chained and copied so later column assignments operate on an independent
# frame; mutating a column subset in place can trigger pandas'
# SettingWithCopyWarning.
add_df = add_df[['image_id', 'type_id']].dropna(subset=['type_id']).copy()
add_df['type_id'] = add_df['type_id'].astype(int)

# Images are the segmented PNG results, named after the image id.
add_df['Filepath'] = (
    base_directory + "/datasets/deep-fashion/inshop/segment-results/" + add_df['image_id'] + ".png"
)

# Align the column name with the K-Fashion frame so the two can be concatenated.
add_df = add_df.rename(columns={'image_id': 'file'})
add_df.head()

In [None]:
# Stack both label sources into a single frame (rows renumbered), then discard
# any row that still lacks a label.
combined = pd.concat([df, add_df], ignore_index=True)
df = combined.dropna(subset=['type_id'])

# Store type_id as str.
df['type_id'] = df['type_id'].astype(str)

### Custom data 정제 완료

In [None]:
# Show the distinct type_id values present in the combined frame.
print(df['type_id'].unique())

In [None]:
# Check how many samples there are for each type id.
print(df['type_id'].value_counts())

### 이미지 데이터셋 총 개수 및 label 종류 수 확인

In [None]:
labels = df['type_id'].values.tolist()

# Distinct labels in order of first appearance. dict.fromkeys removes
# duplicates in a single pass while preserving insertion order, replacing the
# manual "append if not already in list" loop.
label_list = list(dict.fromkeys(labels))

print('The number of pictures:', df.shape[0])
print('The number of labels:', len(label_list))
print('Labels:', label_list)

### 이미지 데이터 확인하기

In [None]:
# Preview the first images of the dataset in a 10x5 grid, each titled with its
# type_id.
fig, axes = plt.subplots(nrows=10, ncols=5, figsize=(8, 15),
                        subplot_kw={'xticks': [], 'yticks': []})

# Walk the columns positionally instead of using label lookups
# (df.Filepath[i]): a label lookup raises KeyError as soon as the index has a
# gap, and zip() simply stops early when there are fewer rows than grid cells.
for ax, filepath, type_id in zip(axes.flat, df['Filepath'], df['type_id']):
    ax.imshow(plt.imread(filepath))
    ax.set_title(type_id, fontsize=12)
plt.tight_layout(pad=0.5)
plt.show()

### train test valid 데이터 나누기

In [None]:
# Hold out 10% of all samples as the test set (fixed seed for repeatability).
train_valid_df, test_df = train_test_split(df, test_size=0.1, random_state=1234)

# Split the remaining samples 9:1 again into training and validation sets.
train_df, valid_df = train_test_split(train_valid_df, test_size=0.1, random_state=1234)

### PYTORCH에서 쓰는 방식으로 변경

In [None]:
# Image preprocessing pipeline: square resize, tensor conversion, then
# per-channel normalization so every image is brought to the same conditions.
# Further steps (e.g. RandomHorizontalFlip) can be appended to the list if needed.
_transform_steps = [
    transforms.Resize((image_wh, image_wh)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
]
transform = transforms.Compose(_transform_steps)

def load_paths_from_dataframe(df):
    """Return ``(image_paths, labels)`` as two plain lists.

    The lists are taken from the ``Filepath`` and ``type_id`` columns of
    ``df`` respectively, in row order.
    """
    image_paths, labels = (df[column].tolist() for column in ('Filepath', 'type_id'))
    return image_paths, labels

# sample_image_path = "/home/j-j9s006/datasets/deep-fashion/inshop/segment-results/SE0000001.png"
# image = Image.open(sample_image_path)
# transformed_image = transform(image)
# print(transformed_image.size())

In [None]:
# Load the image paths and labels of every split in one pass.
(X_train, y_train), (X_test, y_test), (X_valid, y_valid) = map(
    load_paths_from_dataframe, (train_df, test_df, valid_df)
)

### 비율 확인

In [None]:
class compare_train_test_valid_dataset():
    """Bar chart comparing the sizes of the train / valid / test splits.

    Construct with a sequence ``[train, valid, test]`` of sized containers
    (in that order) and call the instance to draw the chart.
    """

    def __init__(self, li):
        self.li = li
        # Only the sizes are needed for plotting.
        self.len_train, self.len_valid, self.len_test = [len(li[k]) for k in (0, 1, 2)]

    def __call__(self):
        """Draw one bar per split and show the figure."""
        split_names = ['train', 'valid', 'test']
        split_sizes = [self.len_train, self.len_valid, self.len_test]
        positions = np.arange(len(split_names))

        # Note: this changes the global matplotlib font size.
        plt.rcParams["font.size"] = 12
        plt.figure(figsize=(12, 8))

        plt.bar(positions, split_sizes, label='data', width=0.3, color='#FFFF00')
        plt.legend()
        plt.xticks(positions, split_names)
        plt.ylabel('Number of data')
        plt.title('Compare DATASETS')
        plt.show()

# Plot how many samples ended up in each split (order: train, valid, test).
show =compare_train_test_valid_dataset([X_train,X_valid,X_test])
show()

# Aug Compose

In [None]:
# ImageNet channel statistics. The network trained below is initialised from
# ResNet50_Weights.IMAGENET1K_V1, so inputs are normalised with the statistics
# those weights were trained with (the same values the baseline `transform`
# uses) instead of the CIFAR-10 statistics used previously.
IMAGENET_MEAN = (0.485, 0.456, 0.406)
IMAGENET_STD = (0.229, 0.224, 0.225)

# Training pipeline: resize, random 224x224 crop (with 3px padding) and random
# horizontal flip as augmentation, then tensor conversion and normalization.
train_transform = transforms.Compose([
    transforms.Resize(224),
    transforms.RandomCrop(224, padding=3),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(IMAGENET_MEAN, IMAGENET_STD),
])

# Evaluation pipeline: deterministic resize + center crop, same normalization.
test_transform = transforms.Compose([
    transforms.Resize(224),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(IMAGENET_MEAN, IMAGENET_STD),
])


# DataSet

In [None]:
def rgba_to_rgb(image, background_color=(255, 255, 255)):
    """Flatten an RGBA image onto a solid background.

    Images whose mode is not ``'RGBA'`` are returned unchanged (the same
    object). For RGBA input, a new RGB image filled with ``background_color``
    is created and the input is pasted onto it using its alpha band as mask.
    """
    if image.mode != 'RGBA':
        return image

    canvas = Image.new('RGB', image.size, background_color)
    alpha_band = image.split()[3]  # bands are R, G, B, A -> index 3 is alpha
    canvas.paste(image, mask=alpha_band)
    return canvas

class clothes_Dataset(Dataset):
    """Dataset yielding ``(image, label)`` pairs from image paths and labels.

    Args:
        image_paths: sequence of image file paths.
        y: sequence of labels, parallel to ``image_paths``; each entry must be
            convertible with ``int()``.
        transform: optional callable applied to the loaded PIL image.
    """

    def __init__(self, image_paths, y, transform=None):
        self.image_paths = image_paths
        self.label_list = y
        self.transform = transform

    def __len__(self):
        return len(self.label_list)

    def __getitem__(self, idx):
        img_path = self.image_paths[idx]
        # Composite transparency BEFORE converting to RGB. Previously the image
        # was converted first, so rgba_to_rgb never saw an RGBA image and the
        # alpha channel was simply dropped instead of being flattened onto the
        # background colour. The context manager closes the file once the
        # pixel data has been copied into `img`.
        with Image.open(img_path) as raw:
            img = rgba_to_rgb(raw).convert('RGB')
        label = int(self.label_list[idx])

        if self.transform is not None:
            img = self.transform(img)

        return img, label

In [None]:
# Image paths and labels for each split, in the order train, test, valid.
(image_paths_train, y_train), (image_paths_test, y_test), (image_paths_valid, y_valid) = [
    load_paths_from_dataframe(split_df) for split_df in (train_df, test_df, valid_df)
]

In [None]:
# Only the training set uses the augmenting transform; validation and test
# share the deterministic one.
Trainset = clothes_Dataset(image_paths_train, y_train, transform=train_transform)
Valset = clothes_Dataset(image_paths_valid, y_valid, transform=test_transform)
Testset = clothes_Dataset(image_paths_test, y_test, transform=test_transform)

In [None]:
# Settings shared by all three loaders.
_loader_options = {"batch_size": 32, "num_workers": 1}

trainloader = DataLoader(Trainset, shuffle=True, **_loader_options)
Valloader = DataLoader(Valset, shuffle=True, **_loader_options)
# The test loader is not shuffled: test() maps batch positions back to
# dataframe rows, which relies on the original sample order.
testloader = DataLoader(Testset, shuffle=False, **_loader_options)

# Model 생성

In [None]:
# Print the PyTorch version and the CUDA version reported by torch.version.cuda.
print("PyTorch Version:", torch.__version__)
print("CUDA Version:", torch.version.cuda)

In [None]:
# Load the ResNet50 provided by torchvision with ImageNet-pretrained weights.
model = resnet50(weights=ResNet50_Weights.IMAGENET1K_V1)

# Replace the final fully connected layer to change the number of classes.
# The head is sized from the largest label id rather than from the number of
# distinct labels: CrossEntropyLoss needs every target index to be smaller than
# the number of outputs, so len(label_list) is too small whenever the ids
# present in the data are not exactly 0..k-1. For contiguous ids both give the
# same value.
num_classes = max(int(label) for label in label_list) + 1
model.fc = nn.Linear(model.fc.in_features, num_classes)

# Move the model to the selected device.
model = model.to(device)

# Train & Test

In [None]:
# Loss and optimiser used by train(), validate() and test():
# cross-entropy over the class logits, Adam over all model parameters.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=model.parameters(), lr=1e-3)

### 함수 선언

In [None]:
def progress_bar(current, total, loss, acc, length=50):
    """Redraw a one-line text progress bar on stdout.

    The line starts with a carriage return so successive calls overwrite each
    other. ``acc`` is a fraction and is printed as a percentage.
    """
    # Number of '=' characters preceding the '>' head (may be negative, which
    # yields an empty run).
    filled = int(round(current / total * length) - 1)
    head = '=' * filled + '>'
    padding = ' ' * (length - len(head))

    line = '\r[%s%s] %d/%d | Loss: %.3f | Accuracy: %.2f%%' % (head, padding, current, total, loss, acc*100)
    sys.stdout.write(line)
    sys.stdout.flush()

# Training function
def train(epoch, model, trainloader):
    """Run one training epoch and print running and final metrics.

    Uses the module-level ``device``, ``optimizer``, ``criterion`` and
    ``progress_bar``.

    Args:
        epoch: epoch number, used only in the printed summary.
        model: network to train; switched to train mode.
        trainloader: iterable of ``(inputs, targets)`` batches supporting ``len()``.

    Returns:
        ``(mean_batch_loss, accuracy)`` for the epoch, with accuracy as a
        fraction. Both are 0.0 when the loader yields no batches.
    """
    model.train()
    train_loss = 0.0
    correct = 0
    total = 0
    num_batches = len(trainloader)

    for batch_idx, (inputs, targets) in enumerate(trainloader):
        inputs, targets = inputs.to(device), targets.to(device)

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()

        train_loss += loss.item()
        _, predicted = outputs.max(1)
        total += targets.size(0)
        correct += predicted.eq(targets).sum().item()

        # Report the number of batches completed (1-based) so the bar reaches
        # num_batches/num_batches on the last batch instead of stopping one short.
        batches_done = batch_idx + 1
        progress_bar(batches_done, num_batches, loss=train_loss/batches_done, acc=correct/total)

    # Guard the averages so an empty loader does not raise ZeroDivisionError.
    epoch_loss = train_loss / num_batches if num_batches else 0.0
    epoch_acc = correct / total if total else 0.0
    print(f"\nTrain Epoch: {epoch}, Loss: {epoch_loss}, Accuracy: {100.*epoch_acc:.2f}%")
    return epoch_loss, epoch_acc

# Validation function
def validate(epoch, model, valloader):
    """Evaluate ``model`` on ``valloader`` without gradients and print metrics.

    Uses the module-level ``device``, ``criterion`` and ``progress_bar``.

    Args:
        epoch: epoch number, used only in the printed summary.
        model: network to evaluate; switched to eval mode.
        valloader: iterable of ``(inputs, targets)`` batches supporting ``len()``.

    Returns:
        ``(mean_batch_loss, accuracy)`` for the pass, with accuracy as a
        fraction. Both are 0.0 when the loader yields no batches.
    """
    model.eval()
    val_loss = 0.0
    correct = 0
    total = 0
    num_batches = len(valloader)

    with torch.no_grad():
        for batch_idx, (inputs, targets) in enumerate(valloader):
            inputs, targets = inputs.to(device), targets.to(device)

            outputs = model(inputs)
            loss = criterion(outputs, targets)

            val_loss += loss.item()
            _, predicted = outputs.max(1)
            total += targets.size(0)
            correct += predicted.eq(targets).sum().item()

            # Report the number of batches completed (1-based) so the bar
            # reaches num_batches/num_batches on the last batch.
            batches_done = batch_idx + 1
            progress_bar(batches_done, num_batches, loss=val_loss/batches_done, acc=correct/total)

    # Guard the averages so an empty loader does not raise ZeroDivisionError.
    epoch_loss = val_loss / num_batches if num_batches else 0.0
    epoch_acc = correct / total if total else 0.0
    print(f"\nValidation Epoch: {epoch}, Loss: {epoch_loss}, Accuracy: {100.*epoch_acc:.2f}%")
    return epoch_loss, epoch_acc

# Test function
def test(model, dataframe, testloader, criterion):
    """Evaluate ``model`` on ``testloader`` and display every sample with its prediction.

    Image paths are looked up by position in ``dataframe``: the i-th sample
    yielded by the loader is matched with ``dataframe.iloc[i]``. The dataframe
    must therefore be the one the loader's dataset was built from, and the
    loader must not shuffle.

    Uses the module-level ``progress_bar`` and ``cloth_label``.

    Args:
        model: network to evaluate; switched to eval mode.
        dataframe: frame with a ``Filepath`` column, row-aligned with the loader.
        testloader: iterable of ``(inputs, targets)`` batches supporting ``len()``.
        criterion: loss function called as ``criterion(outputs, targets)``.

    Returns:
        List of predicted class indices (ints), in loader order.
    """
    model.eval()
    test_loss = 0.0
    correct = 0
    total = 0
    predictions = []
    samples_seen = 0  # position of the current sample within `dataframe`
    num_batches = len(testloader)
    # Send the inputs to wherever the model actually lives instead of guessing
    # from CUDA availability.
    device = next(model.parameters()).device

    with torch.no_grad():
        for batch_idx, (inputs, targets) in enumerate(testloader):
            inputs, targets = inputs.to(device), targets.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, targets.long())

            test_loss += loss.item()
            _, predicted = outputs.max(1)
            total += targets.size(0)
            correct += predicted.eq(targets).sum().item()

            # 1-based batch count so the bar completes on the last batch.
            batches_done = batch_idx + 1
            progress_bar(batches_done, num_batches, loss=test_loss/batches_done, acc=correct/total)

            # Visualize each prediction next to its original image.
            for pred, actual in zip(predicted.tolist(), targets.tolist()):
                original_image_path = dataframe.iloc[samples_seen]['Filepath']
                print(f"\nDataframe Path: {original_image_path}, Actual Label: {actual}")
                # Close the image file as soon as it has been drawn.
                with Image.open(original_image_path) as img:
                    plt.imshow(img)
                plt.title(f"Predicted Label: {cloth_label[pred]} ({pred})")
                plt.show()

                predictions.append(pred)
                samples_seen += 1

    # Guard the averages so an empty loader does not raise ZeroDivisionError.
    epoch_loss = test_loss / num_batches if num_batches else 0.0
    epoch_acc = correct / total if total else 0.0
    print('\ntest | Loss: {:.4f} Acc: {:.4f}'.format(epoch_loss, epoch_acc))

    return predictions


### Train 

In [None]:
num_epochs = 10

# Every epoch runs a training pass followed by a validation pass.
_phases = ((train, trainloader), (validate, Valloader))
for epoch in range(num_epochs):
    for run_phase, phase_loader in _phases:
        run_phase(epoch, model, phase_loader)

### Test

In [None]:
# Human-readable material name for each class index.
cloth_label = {
    0: "Fur",
    1: "Cotton/Polyester",
    2: "Knit",
    3: "Denim",
    4: "Chiffon",
    5: "Padding",
    6: "Tweed",
    7: "Fleece",
    8: "Leather",
    9: "Corduroy",
}

# Run the test pass.
# Pass test_df, not df: testloader is built from test_df, and test() looks up
# each image path by position in the dataframe it is given. With the full df
# the displayed images would not correspond to the evaluated samples.
predictions = test(model, test_df, testloader, criterion)