In [1]:
import torch
# Use the first CUDA device when one is visible to PyTorch; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda:0')
else:
    device = torch.device('cpu')

print(device)

cuda:0


In [8]:
import pandas as pd
import numpy as np
import torch
from torchvision import models, transforms
from torch.utils.data import DataLoader, Dataset
from PIL import Image
from sklearn.ensemble import IsolationForest
from tqdm import tqdm
import torch.optim as optim


# Select the compute device: the GPU when CUDA is available, the CPU otherwise.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

import torch.nn as nn

In [47]:
# Dataset class that loads the images listed in a CSV file
class CustomDataset(Dataset):
    """Image dataset driven by a CSV file.

    Row ``idx`` of the CSV names an image through its ``img_path`` column. Each
    item is that image decoded as 3-channel RGB, passed through ``transform``
    when one was supplied. Only the image is returned (no label).
    """

    def __init__(self, csv_file, transform=None):
        """
        Args:
            csv_file (string): Path to the CSV file; image paths are read from
                its ``img_path`` column when items are accessed.
            transform (callable, optional): Optional transform applied to each
                loaded image.
        """
        self.df = pd.read_csv(csv_file)
        self.transform = transform

    def __len__(self):
        """Return the number of rows in the CSV."""
        return len(self.df)

    def __getitem__(self, idx):
        """Load the image at positional row ``idx`` and return it (transformed if configured)."""
        img_path = self.df['img_path'].iloc[idx]
        # Always hand back an RGB image: grayscale, RGBA or palette files would
        # otherwise produce a channel count that a 3-channel pipeline rejects.
        # ``convert`` returns a new, fully decoded image, so the source file can
        # be closed as soon as the ``with`` block exits.
        with Image.open(img_path) as img:
            image = img.convert('RGB')
        if self.transform is not None:
            image = self.transform(image)
        return image

# Image preprocessing: resize to 224x224, convert to a tensor, then standardize each
# channel with the given mean / std.
# NOTE(review): for pixel values in [0, 1] these statistics produce inputs in roughly
# [-2.1, 2.6], while the Autoencoder ends in Tanh and can only emit [-1, 1]. Confirm that
# this range mismatch between reconstruction target and output is intended.
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

train_data = CustomDataset(csv_file='../data/train.csv', transform=transform)
# Reshuffle the training set every epoch so batches are not always the same rows in CSV order.
train_loader = DataLoader(train_data, batch_size=16, shuffle=True)

In [48]:
class Autoencoder(nn.Module):
    """Convolutional autoencoder for 3-channel images.

    The encoder downsamples with two stride-2 convolutions followed by a 7x7
    convolution; the decoder upsamples with transposed convolutions and ends in
    ``Tanh``, so every reconstructed value lies in [-1, 1]. The decoder's output
    is larger than the input, so ``forward`` resizes it back to the input's
    spatial size.

    The shape comments below are for a 224x224 input.
    """

    def __init__(self):
        super().__init__()
        self.encoder = nn.Sequential(
            nn.Conv2d(3, 16, 3, stride=2, padding=1),  # [batch_size, 16, 112, 112]
            nn.ReLU(True),
            nn.Conv2d(16, 32, 3, stride=2, padding=1), # [batch_size, 32, 56, 56]
            nn.ReLU(True),
            nn.Conv2d(32, 64, 7) # [batch_size, 64, 50, 50]
        )

        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(64, 32, 7),             # [batch_size, 32, 56, 56]
            nn.ReLU(True),
            nn.ConvTranspose2d(32, 16, 5, stride=2, padding=1, output_padding=1),  # [batch_size, 16, 114, 114]
            nn.ReLU(True),
            nn.ConvTranspose2d(16, 8, 5, stride=2, padding=1, output_padding=1),   # [batch_size, 8, 230, 230]
            nn.ReLU(True),
            nn.ConvTranspose2d(8, 3, 2, stride=2, padding=1, output_padding=1),    # [batch_size, 3, 459, 459]
            nn.ReLU(True),
            nn.Conv2d(3, 3, 3, padding=1),                                       # [batch_size, 3, 459, 459]
            nn.Tanh()
        )

    def forward(self, x):
        """Encode and decode ``x``, returning a reconstruction with the same shape as ``x``.

        Args:
            x: Image batch laid out as [batch_size, 3, height, width].

        Returns:
            Tensor of the same shape as ``x`` with values in [-1, 1].
        """
        # Remember the input resolution so the reconstruction can be compared
        # element-wise with the input whatever size was fed in (224x224 inputs
        # are resized to 224x224 exactly as before).
        target_size = tuple(x.shape[-2:])
        latent = self.encoder(x)
        reconstruction = self.decoder(latent)
        return nn.functional.interpolate(
            reconstruction, size=target_size, mode='bilinear', align_corners=False
        )


In [53]:
from torch.optim.lr_scheduler import StepLR

# Build the network together with its reconstruction loss, optimizer and LR schedule.
model = Autoencoder()
criterion = nn.MSELoss()
optimizer = optim.Adam(params=model.parameters(), lr=0.01)
# With step_size=1 the learning rate is multiplied by 0.88 on every scheduler step.
scheduler = StepLR(optimizer=optimizer, step_size=1, gamma=0.88)

In [54]:
# Training loop
def train(model, train_loader, criterion, optimizer, scheduler, num_epochs=30):
    """Train ``model`` to reconstruct its own inputs.

    For every batch the model output is compared against the batch itself with
    ``criterion``. After each epoch the scheduler is stepped and the mean batch
    loss and the new learning rate are printed.

    Args:
        model: Module to optimize; put into training mode.
        train_loader: Iterable yielding input tensors (one batch per item).
        criterion: Loss called as ``criterion(outputs, inputs)``; must return a scalar.
        optimizer: Optimizer over ``model``'s parameters.
        scheduler: Learning-rate scheduler, stepped once per epoch.
        num_epochs: Number of passes over ``train_loader``.

    Returns:
        None. Progress is reported via ``print``.
    """
    model.train()
    # Run each batch on whichever device the model's parameters live on, so the
    # loop works unchanged for a CPU model and for one moved to the GPU.
    first_param = next(model.parameters(), None)
    model_device = first_param.device if first_param is not None else None

    for epoch in range(num_epochs):
        total_loss = 0.0
        # Count batches instead of calling len(): works for loaders without
        # __len__ and avoids dividing by zero when the loader is empty.
        num_batches = 0
        for imgs in train_loader:
            if model_device is not None:
                imgs = imgs.to(model_device)
            optimizer.zero_grad()
            outputs = model(imgs)
            loss = criterion(outputs, imgs)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
            num_batches += 1
        # Step the scheduler once the epoch is finished to update the learning rate
        scheduler.step()
        # Report the learning rate now in effect
        current_lr = scheduler.get_last_lr()[0]
        mean_loss = total_loss / num_batches if num_batches else float('nan')
        print(f'Epoch {epoch+1}, Loss: {mean_loss}, LR: {current_lr}')

In [55]:
# Start training with the objects built in the setup cell (default number of epochs).
train(
    model=model,
    train_loader=train_loader,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
)

Epoch 1, Loss: 0.5061256225619998, LR: 0.0088
Epoch 2, Loss: 0.275914449776922, LR: 0.007744000000000001
Epoch 3, Loss: 0.1987355032137462, LR: 0.006814720000000001
Epoch 4, Loss: 0.172599943620818, LR: 0.0059969536
Epoch 5, Loss: 0.15746333769389562, LR: 0.005277319168
Epoch 6, Loss: 0.1492643037012645, LR: 0.00464404086784
Epoch 7, Loss: 0.14372549312455313, LR: 0.0040867559636991995
Epoch 8, Loss: 0.13963812589645386, LR: 0.0035963452480552954
Epoch 9, Loss: 0.13666887049164092, LR: 0.00316478381828866
Epoch 10, Loss: 0.1346004945891244, LR: 0.002785009760094021
Epoch 11, Loss: 0.1321566967027528, LR: 0.002450808588882738
Epoch 12, Loss: 0.13058671142373765, LR: 0.0021567115582168095
Epoch 13, Loss: 0.12939434977514402, LR: 0.0018979061712307923
Epoch 14, Loss: 0.1281086334160396, LR: 0.0016701574306830973
Epoch 15, Loss: 0.12725058730159486, LR: 0.0014697385390011256
Epoch 16, Loss: 0.126565620303154, LR: 0.0012933699143209906
Epoch 17, Loss: 0.12606502217905863, LR: 0.001138165524

In [70]:
# Test split: reuses the training preprocessing; shuffle stays off so that predictions
# come out in the same order as the rows of the CSV.
test_data = CustomDataset('../data/test.csv', transform=transform)
test_loader = DataLoader(dataset=test_data, batch_size=16, shuffle=False)

In [142]:
# Switch the model to inference mode
model.eval()

# Decision threshold on the per-image reconstruction error
threshold = 0.120  # This value needs to be tuned to a suitable level.

# Collected predictions, one entry per test image
anomaly_labels = []

# Element-wise loss kept under its own name: rebinding the global `criterion` here would
# make a later re-run of the training cell fail, because backward() needs a scalar loss.
recon_criterion = nn.MSELoss(reduction='none')

# Run each batch on the device that holds the model's parameters.
model_device = next(model.parameters()).device

with torch.no_grad():
    # Pull batches from the data loader.
    for imgs in test_loader:
        imgs = imgs.to(model_device)

        # Feed the images through the model to obtain their reconstructions.
        reconstructed_imgs = model(imgs)

        # Per-element reconstruction error.
        loss = recon_criterion(reconstructed_imgs, imgs)

        # Average over everything but the batch axis to get one error per image,
        # then flag the images whose error exceeds the threshold.
        batch_loss = loss.flatten(start_dim=1).mean(dim=1)
        batch_anomalies = batch_loss > threshold

        # 1 for anomalous, 0 for normal; copy to the CPU first because
        # .numpy() is not available on CUDA tensors.
        labels = batch_anomalies.int().cpu().numpy()
        anomaly_labels.extend(labels)

In [143]:
# Turn the collected per-image labels into a single NumPy array.
anomaly_labels = np.asarray(anomaly_labels)

In [144]:
# Number of test images labelled anomalous (label 1).
anomaly_labels.sum()

63

In [145]:
# Load the submission template and fill its `label` column with the predictions.
submission = pd.read_csv("../submission/sample_submission.csv").assign(label=anomaly_labels)

In [146]:
# Display the submission table with the predicted labels filled in.
submission

Unnamed: 0,id,label
0,TEST_000,1
1,TEST_001,0
2,TEST_002,1
3,TEST_003,0
4,TEST_004,1
...,...,...
95,TEST_095,1
96,TEST_096,0
97,TEST_097,1
98,TEST_098,0


In [141]:
# Write the submission to disk without the DataFrame index column.
submission.to_csv("../submission/submission_2.csv", index=False)