In [1]:
import torch 
import random
import numpy as np
import os
from torchvision.datasets import ImageFolder
from PIL import Image
import cv2  # OpenCV 라이브러리
from sklearn.decomposition import PCA

In [2]:
# 시드값 고정

seed = 42
os.environ['PYTHONHASHSEED'] = str(seed)
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.enabled = False

In [3]:
#GPU 장비 설정

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [4]:
from torchvision.transforms import functional as F
import torchvision.transforms as transforms
import numpy as np
from PIL import Image
import scipy.fftpack

# 훈련, 검증, 테스트 데이터 경로 설정
train_path = 'C:/Users/user/Desktop/Workspace/Thesis/generative_image_detection/Personal Research/FakeImageDetection_Dataset/train'
valid_path = 'C:/Users/user/Desktop/Workspace/Thesis/generative_image_detection/Personal Research/FakeImageDetection_Dataset/valid'

class PCATransform:
    def __init__(self, n_components):
        self.n_components = n_components

    def __call__(self, image):
        # 이미지를 numpy 배열로 변환
        image_np = np.array(image)

        # 차원을 변경하여 (height * width, num_channels) 형태로 만듭니다.
        reshaped_image = image_np.reshape(-1, image_np.shape[2])

        # PCA를 계산합니다.
        pca = PCA(n_components=self.n_components)
        transformed_image = pca.fit_transform(reshaped_image)

        # 가중치를 2배로 조정합니다.
        transformed_image *= 1.2

        # 원래의 형태로 이미지를 변환합니다.
        restored_image = pca.inverse_transform(transformed_image)
        restored_image = restored_image.reshape(image_np.shape)

        # 이미지를 다시 PIL 형태로 변환합니다.
        transformed_image_pil = F.to_pil_image(restored_image.astype(np.uint8))

        return transformed_image_pil
    
class FourierTransform:
    def __call__(self, image):
        # 이미지를 numpy 배열로 변환 (여기서 image는 PIL 이미지가 될 것으로 예상됩니다.)
        image_np = np.array(image)

        # 컬러 채널 분리
        channel_results = []
        for channel in range(image_np.shape[2]):  # RGB 채널을 가정합니다.
            # 해당 채널에 대해 푸리에 변환 적용
            f_transform = cv2.dft(np.float32(image_np[:, :, channel]), flags=cv2.DFT_COMPLEX_OUTPUT)
            f_transform_shifted = np.fft.fftshift(f_transform)
            f_transform_magnitude = 12 * np.log(cv2.magnitude(f_transform_shifted[:, :, 0], f_transform_shifted[:, :, 1]) + 1)
            channel_results.append(f_transform_magnitude)

        # 채널 결과를 쌓아 하나의 이미지로 복원합니다. (스택으로 복원)
        magnitude_image = np.stack(channel_results, axis=2)

        # 결과 이미지의 스케일을 조정합니다.
        magnitude_image -= magnitude_image.min()  # 최소값을 0으로 설정
        magnitude_image = magnitude_image / magnitude_image.max() * 255.0  # 최대값을 255로 조정
        magnitude_image = magnitude_image.astype(np.uint8)

        # 다시 PIL 이미지로 변환합니다.
        transformed_image_pil = F.to_pil_image(magnitude_image)
        
        return transformed_image_pil
    
class ResizeIfNeeded:
    def __call__(self, image):
        # 이미지의 가로, 세로 길이를 확인
        width, height = image.size
        
        # 가로 또는 세로 길이가 224 미만인 경우를 확인하여 크기를 늘림
        new_width = width if width >= 224 else 224
        new_height = height if height >= 224 else 224
        
        # 이미지 크기를 변경
        resized_image = image.resize((new_width, new_height), Image.BICUBIC)
        return resized_image
    
transform = transforms.Compose([
    FourierTransform(),
    PCATransform(n_components=3),
    transforms.Lambda(ResizeIfNeeded()),
    transforms.RandomCrop((224, 224)),
    transforms.ToTensor(),
])

# 훈련 데이터셋
datasets_train = ImageFolder(train_path, transform=transform)
# 검증 데이터셋
datasets_valid = ImageFolder(valid_path, transform=transform)

In [5]:
def seed_worker(worker_id):
    worker_seed = torch.initial_seed() % 2**32
    np.random.seed(worker_seed)
    random.seed(worker_seed)

# 제너레이터 시드값 고정
g = torch.Generator()
g.manual_seed(0)

<torch._C.Generator at 0x21649e0f710>

In [6]:
from torch.utils.data import DataLoader

batch_size = 64

loader_train = DataLoader(dataset=datasets_train, batch_size=batch_size, 
                          shuffle=True, worker_init_fn=seed_worker,
                          generator=g, num_workers=0)
loader_valid = DataLoader(dataset=datasets_valid, batch_size=batch_size, 
                          shuffle=False, worker_init_fn=seed_worker,
                          generator=g, num_workers=0)

In [7]:
from efficientnet_pytorch import EfficientNet
# 모델 생성
model = EfficientNet.from_pretrained('efficientnet-b0', num_classes=2) 
# 장비 할당
model = model.to(device)

Loaded pretrained weights for efficientnet-b0


In [8]:
import torch.nn as nn

criterion = nn.CrossEntropyLoss()

In [9]:
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

In [10]:
import wandb

# wandb 초기화: 새로운 실험을 생성하거나 기존 실험에 연결
wandb.init(project="FakeImageDetection_Research", name="Research2_3")

Failed to detect the name of this notebook, you can set it manually with the WANDB_NOTEBOOK_NAME environment variable to enable code saving.
[34m[1mwandb[0m: Currently logged in as: [33mguddnjs2366[0m ([33mseohw[0m). Use [1m`wandb login --relogin`[0m to force relogin


In [11]:
from sklearn.metrics import accuracy_score # 정확도 계산 함수
from sklearn.metrics import recall_score   # 재현율 계산 함수
from sklearn.metrics import f1_score       # F1 점수 계산 함수
from tqdm.notebook import tqdm             # 진행률 표시 막대

def train(model, loader_train, loader_valid, criterion, optimizer, 
          scheduler=None, epochs=10, save_file='Research2_3_.pth'):
    
    valid_loss_min = np.inf # 최소 손실값 초기화 (검증 데이터용) 

    # 총 에폭만큼 반복
    for epoch in range(epochs):
        print(f'에폭 [{epoch+1}/{epochs}] \n-----------------------------')
        
        # == [ 훈련 ] ==============================================
        model.train()        # 모델을 훈련 상태로 설정
        epoch_train_loss = 0 # 에폭별 손실값 초기화 (훈련 데이터용)
        train_preds_list = []  # 훈련 데이터 예측값 저장용 리스트 초기화
        train_true_list = []   # 훈련 데이터 실젯값 저장용 리스트 초기화
        # '반복 횟수'만큼 반복 
        for images, labels in tqdm(loader_train):
            # 이미지, 레이블(타깃값) 데이터 미니배치를 장비에 할당 
            images = images.to(device)
            labels = labels.to(device)
            
            # 옵티마이저 내 기울기 초기화
            optimizer.zero_grad()
            # 순전파 : 이미지 데이터를 신경망 모델의 입력값으로 사용해 출력값 계산
            outputs = model(images)
            # 손실 함수를 활용해 outputs와 labels의 손실값 계산
            loss = criterion(outputs, labels)
            # 현재 배치에서의 손실 추가 (훈련 데이터용)
            epoch_train_loss += loss.item() 
            loss.backward()       # 역전파 수행
            optimizer.step()      # 가중치 갱신
            if scheduler != None: # 스케줄러 학습률 갱신 
                scheduler.step() 
                
            # 예측값 및 실제값 저장 (훈련 데이터용)
            train_preds = torch.max(outputs.cpu(), dim=1)[1].numpy()
            train_true = labels.cpu().numpy()
            train_preds_list.extend(train_preds)
            train_true_list.extend(train_true)
            
        # 훈련 데이터 정확도 계산
        train_accuracy = accuracy_score(train_true_list, train_preds_list)

        # 훈련 데이터 손실값 출력
        print(f'\t훈련 데이터 손실값 : {epoch_train_loss/len(loader_train):.4f}')
        print(f'\t훈련 데이터 정확도 : {train_accuracy:.4f}')
        
        # == [ 검증 ] ==============================================
        model.eval()         # 모델을 평가 상태로 설정 
        epoch_valid_loss = 0 # 에폭별 손실값 초기화 (검증 데이터용)
        preds_list = []      # 예측값 저장용 리스트 초기화
        true_list = []       # 실젯값 저장용 리스트 초기화
        
        with torch.no_grad(): # 기울기 계산 비활성화
            for images, labels in loader_valid:
                images = images.to(device)
                labels = labels.to(device)
                
                outputs = model(images)
                loss = criterion(outputs, labels)
                epoch_valid_loss += loss.item()
                
                # 예측값 및 실제값 
                preds = torch.max(outputs.cpu(), dim=1)[1].numpy() 
                true = labels.cpu().numpy() 
    
                preds_list.extend(preds)
                true_list.extend(true)
                
        # 정확도, 재현율, F1 점수 계산
        val_accuracy = accuracy_score(true_list, preds_list)
        val_recall = recall_score(true_list, preds_list)
        val_f1_score = f1_score(true_list, preds_list)

        wandb.log({"train_loss": epoch_train_loss/len(loader_train),
                   "train_accuracy" : train_accuracy,
                   "val_loss": epoch_valid_loss/len(loader_valid), 
                   "val_accuracy": val_accuracy,
                   "val_recall" : val_recall,
                   "val_f1_score" : val_f1_score})
        
        # 검증 데이터 손실값 및 정확도, 재현율, F1점수 출력
        print(f'\t검증 데이터 손실값 : {epoch_valid_loss/len(loader_valid):.4f}')
        print(f'\t정확도 : {val_accuracy:.4f} / 재현율 : {val_recall:.4f} / F1 점수 : {val_f1_score:.4f}')
        # == [ 최적 모델 가중치 찾기 ] ==============================
        # 현 에폭에서의 손실값이 최소 손실값 이하면 모델 가중치 저장 
        if epoch_valid_loss <= valid_loss_min: 
            print(f'\t### 검증 데이터 손실값 감소 ({valid_loss_min:.4f} --> {epoch_valid_loss:.4f}). 모델 저장')
            # 모델 가중치를 파일로 저장 
            torch.save(model.state_dict(), save_file) 
            valid_loss_min = epoch_valid_loss # 최소 손실값 갱신 
    return torch.load(save_file) # 저장한 모델 가중치를 불러와 반환

In [12]:
# 모델 훈련
model_state_dict = train(model = model,
                         loader_train = loader_train, 
                         loader_valid = loader_valid,
                         criterion = criterion, 
                         optimizer = optimizer)

torch.cuda.empty_cache()
wandb.finish()

에폭 [1/10] 
-----------------------------


  0%|          | 0/250 [00:00<?, ?it/s]

	훈련 데이터 손실값 : 0.1207
	훈련 데이터 정확도 : 0.9531
	검증 데이터 손실값 : 0.8870
	정확도 : 0.5078 / 재현율 : 0.9795 / F1 점수 : 0.6655
	### 검증 데이터 손실값 감소 (inf --> 55.8782). 모델 저장
에폭 [2/10] 
-----------------------------


  0%|          | 0/250 [00:00<?, ?it/s]

	훈련 데이터 손실값 : 0.0599
	훈련 데이터 정확도 : 0.9773
	검증 데이터 손실값 : 0.3964
	정확도 : 0.8635 / 재현율 : 0.8160 / F1 점수 : 0.8567
	### 검증 데이터 손실값 감소 (55.8782 --> 24.9738). 모델 저장
에폭 [3/10] 
-----------------------------


  0%|          | 0/250 [00:00<?, ?it/s]

	훈련 데이터 손실값 : 0.0492
	훈련 데이터 정확도 : 0.9834
	검증 데이터 손실값 : 0.0758
	정확도 : 0.9718 / 재현율 : 0.9980 / F1 점수 : 0.9725
	### 검증 데이터 손실값 감소 (24.9738 --> 4.7758). 모델 저장
에폭 [4/10] 
-----------------------------


  0%|          | 0/250 [00:00<?, ?it/s]

	훈련 데이터 손실값 : 0.0362
	훈련 데이터 정확도 : 0.9859
	검증 데이터 손실값 : 0.0571
	정확도 : 0.9812 / 재현율 : 0.9670 / F1 점수 : 0.9810
	### 검증 데이터 손실값 감소 (4.7758 --> 3.5963). 모델 저장
에폭 [5/10] 
-----------------------------


  0%|          | 0/250 [00:00<?, ?it/s]

	훈련 데이터 손실값 : 0.0324
	훈련 데이터 정확도 : 0.9878
	검증 데이터 손실값 : 0.2162
	정확도 : 0.9230 / 재현율 : 0.9890 / F1 점수 : 0.9278
에폭 [6/10] 
-----------------------------


  0%|          | 0/250 [00:00<?, ?it/s]

	훈련 데이터 손실값 : 0.0314
	훈련 데이터 정확도 : 0.9897
	검증 데이터 손실값 : 0.1030
	정확도 : 0.9647 / 재현율 : 0.9945 / F1 점수 : 0.9658
에폭 [7/10] 
-----------------------------


  0%|          | 0/250 [00:00<?, ?it/s]

	훈련 데이터 손실값 : 0.0281
	훈련 데이터 정확도 : 0.9906
	검증 데이터 손실값 : 0.0243
	정확도 : 0.9925 / 재현율 : 0.9930 / F1 점수 : 0.9925
	### 검증 데이터 손실값 감소 (3.5963 --> 1.5285). 모델 저장
에폭 [8/10] 
-----------------------------


  0%|          | 0/250 [00:00<?, ?it/s]

	훈련 데이터 손실값 : 0.0266
	훈련 데이터 정확도 : 0.9904
	검증 데이터 손실값 : 0.0350
	정확도 : 0.9872 / 재현율 : 0.9965 / F1 점수 : 0.9874
에폭 [9/10] 
-----------------------------


  0%|          | 0/250 [00:00<?, ?it/s]

	훈련 데이터 손실값 : 0.0257
	훈련 데이터 정확도 : 0.9902
	검증 데이터 손실값 : 0.0253
	정확도 : 0.9918 / 재현율 : 0.9915 / F1 점수 : 0.9917
에폭 [10/10] 
-----------------------------


  0%|          | 0/250 [00:00<?, ?it/s]

	훈련 데이터 손실값 : 0.0268
	훈련 데이터 정확도 : 0.9901
	검증 데이터 손실값 : 0.4675
	정확도 : 0.8848 / 재현율 : 0.9780 / F1 점수 : 0.8946


0,1
train_accuracy,▁▆▇▇▇█████
train_loss,█▄▃▂▁▁▁▁▁▁
val_accuracy,▁▆██▇████▆
val_f1_score,▁▅██▇▇███▆
val_loss,█▄▁▁▃▂▁▁▁▅
val_recall,▇▁█▇█████▇

0,1
train_accuracy,0.99006
train_loss,0.02685
val_accuracy,0.88475
val_f1_score,0.89458
val_loss,0.4675
val_recall,0.978


In [13]:
# 최적 가중치 불러오기

<All keys matched successfully>

In [14]:
import pandas as pd

datasets_test = ImageFolder('C:/Users/user/Desktop/Workspace/Thesis/generative_image_detection/Personal Research/test_data', transform=transform)

batch_sise = 256

loader_test = DataLoader(dataset=datasets_test, batch_size=batch_size, 
                         shuffle=False, worker_init_fn=seed_worker,
                         generator=g, num_workers=0)
model.eval()  # 평가 모드로 전환

# 3. 예측하기
predictions = []

for images, _ in loader_test:  # 레이블은 사용하지 않음
    images = images.to(device)  # 장비 할당
    with torch.no_grad():
        outputs = model(images)
        prediction = outputs.argmax(dim=1).cpu().numpy()
        predictions.extend(prediction)

# 4. 결과 저장
df = pd.read_csv('C:/Users/user/Desktop/Workspace/Thesis/generative_image_detection/Personal Research/test_data/sample_submission.csv')
df['answer'] = predictions
df['answer'] = 1 - df['answer']
df.to_csv('C:/Users/user/Desktop/Workspace/Thesis/generative_image_detection/Personal Research/test_data/research2_3.csv', index=False)

In [15]:
torch.cuda.empty_cache()