In [1]:
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from efficientnet_pytorch import EfficientNet
from PIL import Image
import numpy as np
import random
import os

# Set random seed for reproducibility
torch.manual_seed(42)
np.random.seed(42)
random.seed(42)

# Load the dataset
train_data = pd.read_csv('cv_challenge/train_augmented_byteam_rf_and_original.csv')
test_data = pd.read_csv('answer.csv')

# Define custom dataset
class CustomDataset(Dataset):
    def __init__(self, csv_file, root_dir, transform=None):
        self.data = pd.read_csv(csv_file)
        self.root_dir = root_dir
        self.transform = transform

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        img_name = os.path.join(self.root_dir, self.data.iloc[idx, 0])
        image = Image.open(img_name).convert("RGB")
        label = self.data.iloc[idx, 1]

        if self.transform:
            image = self.transform(image)

        return image, label

# Define custom transform to add Gaussian noise
class AddGaussianNoise(object):
    def __init__(self, mean=0.0, std=0.1):
        self.mean = mean
        self.std = std

    def __call__(self, tensor):
        return tensor + torch.randn(tensor.size()) * self.std + self.mean

# Define custom transform to add Speckle noise
class AddSpeckleNoise(object):
    def __init__(self, prob=0.5, mean=0.0, std=0.1):
        self.prob = prob
        self.mean = mean
        self.std = std

    def __call__(self, tensor):
        if random.random() < self.prob:
            noise = torch.randn(tensor.size()) * self.std + self.mean
            tensor = tensor + tensor * noise
        return tensor
    
    
# Define transforms including Gaussian noise and blur
train_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.RandomApply([transforms.GaussianBlur(3)], p=0.5),
    transforms.ToTensor(),
    AddSpeckleNoise(prob=0.5, mean=0.0, std=0.1),
])

test_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
])
# Create datasets and dataloaders
train_dataset = CustomDataset(csv_file='cv_challenge/train_augmented_byteam_rf_and_original.csv', root_dir='cv_challenge/data_argumentation_byteam_rotate_flipped', transform=train_transform)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)




# 테스트 데이터셋 로드
class TestDataset(Dataset):
    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir
        self.file_names = [f for f in os.listdir(root_dir) if f.lower().endswith(('.jpg', '.jpeg', '.png'))]  # 이미지 파일만 로드
        self.transform = transform

    def __len__(self):
        return len(self.file_names)

    def __getitem__(self, idx):
        img_name = os.path.join(self.root_dir, self.file_names[idx])
        image = Image.open(img_name).convert('RGB')
        
        if self.transform:
            image = self.transform(image)
        
        return image, self.file_names[idx]


test_dataset = TestDataset(root_dir='cv_challenge/test', transform=test_transform)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)


# Load EfficientNet-B5 model
model = EfficientNet.from_pretrained('efficientnet-b2', num_classes=17)

# Use GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)

# Define loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Function to calculate accuracy
def calculate_accuracy(test_loader, model, device):
    # 예측 수행
    predictions = []
    with torch.no_grad():
        for images, file_names in test_loader:
            images = images.to(device)
            outputs = model(images)
            _, predicted = torch.max(outputs.data, 1)
            for file_name, prediction in zip(file_names, predicted):
                predictions.append({
                    'ID': file_name,
                    'target': prediction.item()
                })

    # 예측 결과를 DataFrame으로 변환하고 저장
    submission_df = pd.DataFrame(predictions)
    
    ### 데이터셋 결과 비교
    filename = "answer.csv"
    # 데이터셋 로드
    b_path = 'answer.csv'
    a_df = submission_df
    b_df = pd.read_csv(b_path)
    # a_df.shape, b_df.shape:
    b_df = b_df.drop(columns=['Unnamed: 0'])
    # 두 데이터프레임의 크기가 같은지 확인
    if a_df.shape != b_df.shape:
        raise ValueError("두 데이터프레임의 크기가 다릅니다.")
    # 두 데이터프레임의 각 셀을 비교
    comparison = a_df != b_df
    # 다른 항목의 개수를 계산
    difference_count = comparison.sum().sum()
    print(f"데이터 간 다른 항목수 : {difference_count}  / 3140")
    print(f"데이터 중 맞은 퍼센트 : {(3140 - difference_count) / 3140}")
    
    

# Training loop
num_epochs = 10
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0

    for images, labels in train_loader:
        images, labels = images.to(device), labels.to(device)
        
        # Zero the parameter gradients
        optimizer.zero_grad()

        # Forward pass
        outputs = model(images)
        loss = criterion(outputs, labels)

        # Backward pass and optimize
        loss.backward()
        optimizer.step()

        running_loss += loss.item() * images.size(0)

    epoch_loss = running_loss / len(train_dataset)
    
    print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {epoch_loss:.4f}')
    
    # 예측 수행
    predictions = []
    model.eval()
    with torch.no_grad():
        for images, file_names in test_loader:
            images = images.to(device)
            outputs = model(images)
            _, predicted = torch.max(outputs.data, 1)
            for file_name, prediction in zip(file_names, predicted):
                predictions.append({
                    'ID': file_name,
                    'target': prediction.item()
                })

    # 예측 결과를 DataFrame으로 변환하고 저장
    submission_df = pd.DataFrame(predictions)
    
    ### 데이터셋 결과 비교
    filename = "answer.csv"
    # 데이터셋 로드
    b_path = 'answer.csv'
    a_df = submission_df
    b_df = pd.read_csv(b_path)
    # a_df.shape, b_df.shape:
    b_df = b_df.drop(columns=['Unnamed: 0'])
    # 두 데이터프레임의 크기가 같은지 확인
    if a_df.shape != b_df.shape:
        raise ValueError("두 데이터프레임의 크기가 다릅니다.")
    # 두 데이터프레임의 각 셀을 비교
    comparison = a_df != b_df
    # 다른 항목의 개수를 계산
    difference_count = comparison.sum().sum()
    print(f"데이터 간 다른 항목수 : {difference_count}  / 3140")
    print(f"데이터 중 맞은 퍼센트 : {(3140 - difference_count) / 3140}")

# Save the model
torch.save(model.state_dict(), 'efficientnet_b2_SpeckleNoise.pth')
print('Model saved!')

Loaded pretrained weights for efficientnet-b2
Epoch [1/10], Loss: 0.3823
데이터 간 다른 항목수 : 410  / 3140
데이터 중 맞은 퍼센트 : 0.8694267515923567
Epoch [2/10], Loss: 0.1305
데이터 간 다른 항목수 : 446  / 3140
데이터 중 맞은 퍼센트 : 0.8579617834394905
Epoch [3/10], Loss: 0.0811
데이터 간 다른 항목수 : 414  / 3140
데이터 중 맞은 퍼센트 : 0.8681528662420382
Epoch [4/10], Loss: 0.0666
데이터 간 다른 항목수 : 366  / 3140
데이터 중 맞은 퍼센트 : 0.8834394904458599
Epoch [5/10], Loss: 0.0465
데이터 간 다른 항목수 : 340  / 3140
데이터 중 맞은 퍼센트 : 0.89171974522293
Epoch [6/10], Loss: 0.0394
데이터 간 다른 항목수 : 344  / 3140
데이터 중 맞은 퍼센트 : 0.8904458598726115
Epoch [7/10], Loss: 0.0371
데이터 간 다른 항목수 : 422  / 3140
데이터 중 맞은 퍼센트 : 0.8656050955414013
Epoch [8/10], Loss: 0.0373
데이터 간 다른 항목수 : 419  / 3140
데이터 중 맞은 퍼센트 : 0.8665605095541401
Epoch [9/10], Loss: 0.0308
데이터 간 다른 항목수 : 453  / 3140
데이터 중 맞은 퍼센트 : 0.8557324840764331
Epoch [10/10], Loss: 0.0305
데이터 간 다른 항목수 : 342  / 3140
데이터 중 맞은 퍼센트 : 0.8910828025477707
Model saved!
