In [3]:
import os
import glob
import torch
import torchvision.transforms.functional as TF
import numpy as np
import cv2

from torchvision import transforms
from torch.utils.data import Dataset, DataLoader
from PIL import Image

# 데이터 불러오기 및 데이터 전처리

## 이미지 리사이즈

In [2]:
#이미지 경로
fake_image_path = "Dataset\fake"
nfake_image_path = "Dataset\nfake"

In [3]:
def resize_image(input_path, output_path, new_size):
    """
    이미지를 불러와서 새로운 크기로 리사이즈하는 함수
    :param input_path: 원본 이미지 파일 경로
    :param output_path: 리사이즈된 이미지를 저장할 파일 경로
    :param new_size: 새로운 크기 (너비, 높이) 튜플 형태로 전달
    """
    try:
        # 이미지 열기
        with Image.open(input_path) as img:
            # 리사이즈
            resized_img = img.resize(new_size)
            # 리사이즈된 이미지 저장
            resized_img.save(output_path)
    except Exception as e:
        print(f"오류 발생: {e}")

In [4]:
def resize_images_in_directory(directory, output_directory, new_size):
    """
    디렉토리 내에 있는 이미지들을 리사이즈하는 함수
    :param directory: 원본 이미지 파일들이 있는 디렉토리 경로
    :param output_directory: 리사이즈된 이미지를 저장할 디렉토리 경로
    :param new_size: 새로운 크기 (너비, 높이) 튜플 형태로 전달
    """
    # 디렉토리 내의 모든 이미지 파일들을 가져옴
    image_files = glob.glob(os.path.join(directory, "*.jpg")) + glob.glob(os.path.join(directory, "*.jpeg")) + glob.glob(os.path.join(directory, "*.png"))

    # 출력 디렉토리가 없으면 생성
    if not os.path.exists(output_directory):
        os.makedirs(output_directory)

    # 이미지들을 순회하면서 리사이즈 수행
    for image_file in image_files:
        filename = os.path.basename(image_file)
        output_path = os.path.join(output_directory, filename)
        resize_image(image_file, output_path, new_size)

In [5]:
# 사용 예시
input_directory = r"Dataset\fake"
output_directory = r"Dataset\r_fake"
new_size = (224, 224)  # 새로운 크기 (너비, 높이)
resize_images_in_directory(input_directory, output_directory, new_size)

In [6]:
# 사용 예시
input_directory = r"Dataset\nfake"
output_directory = r"Dataset\r_nfake"
new_size = (224, 224)  # 새로운 크기 (너비, 높이)
resize_images_in_directory(input_directory, output_directory, new_size)

## 이미지 라벨링

In [5]:
def adjust_hls(hls_image, lightness_scale=1.0, saturation_scale=1.0):
    h, l, s = cv2.split(hls_image)  # HLS 이미지를 각 채널로 분리
    l = np.clip(l * lightness_scale, 0, 255).astype(np.uint8)  # 밝기 조절
    s = np.clip(s * saturation_scale, 0, 255).astype(np.uint8)  # 채도 조절
    adjusted_hls = cv2.merge([h, l, s])  # 조정된 채널을 다시 합침
    return adjusted_hls

In [7]:
def labeling(folder_paths):
    images = []
    labels = []
    for folder_path in folder_paths:
        if not os.path.exists(folder_path):
            print(f"경로가 존재하지 않습니다: {folder_path}")
            continue
        label = 1 if 'r_fake' in folder_path.lower() else 0
        for filename in os.listdir(folder_path):
            image_path = os.path.join(folder_path, filename)
            if image_path.endswith('.jpg') or image_path.endswith('.png'):
                image = Image.open(image_path).convert('RGB')
                image_array = np.array(image)[:, :, ::-1]  # RGB to BGR
                hls_image = cv2.cvtColor(image_array, cv2.COLOR_BGR2HLS)
                adjusted_hls_image = adjust_hls(hls_image, lightness_scale=1.2, saturation_scale=0.9)
                # 이미지 정규화 (픽셀 값의 범위를 0 ~ 1 사이로 조정)
                normalized_image = adjusted_hls_image / 255.0
                images.append(normalized_image)
                labels.append(label)
    return images, labels

In [8]:
# 스크립트가 PS3 폴더 내에 있으므로 상대 경로를 사용합니다.
folder_paths = ["Dataset/r_fake", "Dataset/r_nfake"]
images, labels = labeling(folder_paths)

# 결과 확인
print("이미지 개수:", len(images))
print("라벨 개수:", len(labels))

KeyboardInterrupt: 

In [9]:
class CustomDataset(Dataset):
    def __init__(self, folder_paths, transform=None):
        self.images, self.labels = labeling(folder_paths)
        self.transform = transform

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        image = self.images[idx]
        label = self.labels[idx]
        
        if self.transform:
            # numpy 이미지를 PIL 이미지로 변환합니다. 이 작업이 필요한 이유는 torchvision의 transforms는 PIL 이미지를 기대하기 때문입니다.
            image = Image.fromarray((image * 255).astype(np.uint8))
            image = self.transform(image)
        
        return image, label

In [10]:
# 데이터셋을 사용하기 위한 transform 정의 , RESNET-50(224*224를 입력으로 받음)
transform = transforms.Compose([
    transforms.Resize((224, 224)),  # 이미지 크기 조정
    transforms.ToTensor(),  # PIL 이미지를 PyTorch Tensor로 변환
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),  # 이미지 정규화
])

In [18]:
import pickle

# 데이터셋을 파일로 저장하는 함수
def save_dataset(dataset, filename):
    with open(filename, 'wb') as f:
        pickle.dump(dataset, f)

# 파일에서 데이터셋을 로드하는 함수
def load_dataset(filename):
    with open(filename, 'rb') as f:
        dataset = pickle.load(f)
    return dataset

In [19]:
# 전체 데이터셋 로드
full_dataset = CustomDataset(["Dataset/r_fake", "Dataset/r_nfake"], transform=transform)
# 데이터셋을 파일로 저장
# 저장할 파일명 설정
filename = "dataset.pkl"
save_dataset(full_dataset, filename)

In [20]:
# 저장된 파일을 로드하여 데이터셋 사용
loaded_dataset = load_dataset(filename)

In [22]:
full_dataset = CustomDataset(["Dataset/r_fake", "Dataset/r_nfake"], transform=transform)

In [23]:
from torch.utils.data import random_split

# 데이터셋 로드
dataset = full_dataset

# 데이터셋 분할
train_size = int(len(dataset) * 0.7)
test_size = int(len(dataset) * 0.15)
val_size = len(dataset) - train_size - test_size
train_dataset, val_dataset, test_dataset = random_split(dataset, [train_size, val_size, test_size])


In [24]:
# DataLoader 설정
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# 모델

In [39]:
from torchvision.models import resnet50
from tqdm import tqdm
import torch.nn as nn
from torch.utils.tensorboard import SummaryWriter

In [33]:
# ResNet50 모델 로드
model = resnet50(pretrained=True)
# 모델의 마지막 레이어를 이진 분류에 맞게 조정
num_ftrs = model.fc.in_features
model.fc = nn.Linear(num_ftrs, 1)  # 클래스 수를 1로 설정

Downloading: "https://download.pytorch.org/models/resnet50-0676ba61.pth" to C:\Users\kwonh/.cache\torch\hub\checkpoints\resnet50-0676ba61.pth
100%|██████████| 97.8M/97.8M [00:01<00:00, 68.0MB/s]


In [37]:
# 모델 학습 설정
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
loss_fn = torch.nn.BCEWithLogitsLoss()
num_epochs = 100

In [41]:
# 얼리 스톱핑을 위한 초기 설정
patience = 3  # 성능 향상이 없는 경우, 몇 에폭 동안 기다릴 것인지
val_loss_min = np.Inf  # 가능한 무한대 값으로 초기화
patience_counter = 0  # 현재 기다리고 있는 에폭 수
writer = SummaryWriter()
for epoch in range(num_epochs):
    model.train()
    train_loss = 0.0
    train_corrects = 0
    
    for inputs, labels in tqdm(train_loader, desc=f"Training Epoch {epoch+1}"):
        inputs = inputs.to(device)
        labels = labels.to(device).float().view(-1, 1)

        optimizer.zero_grad()
        
        outputs = model(inputs)
        loss = loss_fn(outputs, labels)
        loss.backward()
        optimizer.step()
        
        train_loss += loss.item() * inputs.size(0)
        preds = torch.sigmoid(outputs) >= 0.5
        train_corrects += torch.sum(preds == labels.data)

    train_loss = train_loss / len(train_loader.dataset)
    train_acc = train_corrects.double() / len(train_loader.dataset)
    
    model.eval()
    val_loss = 0.0
    val_corrects = 0
    
    with torch.no_grad():
        for inputs, labels in tqdm(val_loader, desc="Validating"):
            inputs = inputs.to(device)
            labels = labels.to(device).float().view(-1, 1)
            
            outputs = model(inputs)
            loss = loss_fn(outputs, labels)
            
            val_loss += loss.item() * inputs.size(0)
            preds = torch.sigmoid(outputs) >= 0.5
            val_corrects += torch.sum(preds == labels.data)

    val_loss = val_loss / len(val_loader.dataset)
    val_acc = val_corrects.double() / len(val_loader.dataset)
    
    writer.add_scalar("Loss/train", train_loss, epoch)
    writer.add_scalar("Loss/val", val_loss, epoch)
    writer.add_scalar("Accuracy/train", train_acc, epoch)
    writer.add_scalar("Accuracy/val", val_acc, epoch)
    
    print(f"Epoch {epoch+1}: Train Loss={train_loss:.4f}, Train Acc={train_acc:.4f}, Val Loss={val_loss:.4f}, Val Acc={val_acc:.4f}")
    
    # 얼리 스톱핑 조건 검사
    if val_loss < val_loss_min:
        print(f"Validation loss decreased ({val_loss_min:.6f} --> {val_loss:.6f}). Saving model ...")
        torch.save(model.state_dict(), "resnet50_binary_classification_model.pth")
        val_loss_min = val_loss
        patience_counter = 0  # 리셋
    else:
        patience_counter += 1
        print(f"EarlyStopping counter: {patience_counter} out of {patience}")
        if patience_counter >= patience:
            print("Early stopping initiated. Training stopped.")
            break

writer.close()

Training Epoch 1: 100%|██████████| 268/268 [18:00<00:00,  4.03s/it]
Validating: 100%|██████████| 58/58 [00:49<00:00,  1.17it/s]


Epoch 1: Train Loss=0.0046, Train Acc=0.9991, Val Loss=0.0095, Val Acc=0.9989
Validation loss decreased (inf --> 0.009451). Saving model ...


Training Epoch 2: 100%|██████████| 268/268 [17:20<00:00,  3.88s/it]
Validating: 100%|██████████| 58/58 [00:49<00:00,  1.17it/s]


Epoch 2: Train Loss=0.0051, Train Acc=0.9987, Val Loss=0.0032, Val Acc=0.9995
Validation loss decreased (0.009451 --> 0.003196). Saving model ...


Training Epoch 3: 100%|██████████| 268/268 [17:29<00:00,  3.92s/it]
Validating: 100%|██████████| 58/58 [00:50<00:00,  1.16it/s]


Epoch 3: Train Loss=0.0017, Train Acc=0.9994, Val Loss=0.0023, Val Acc=0.9995
Validation loss decreased (0.003196 --> 0.002259). Saving model ...


Training Epoch 4: 100%|██████████| 268/268 [17:29<00:00,  3.91s/it]
Validating: 100%|██████████| 58/58 [00:51<00:00,  1.13it/s]


Epoch 4: Train Loss=0.0042, Train Acc=0.9986, Val Loss=0.0007, Val Acc=1.0000
Validation loss decreased (0.002259 --> 0.000663). Saving model ...


Training Epoch 5: 100%|██████████| 268/268 [17:31<00:00,  3.93s/it]
Validating: 100%|██████████| 58/58 [00:50<00:00,  1.14it/s]


Epoch 5: Train Loss=0.0041, Train Acc=0.9986, Val Loss=0.0116, Val Acc=0.9962
EarlyStopping counter: 1 out of 3


Training Epoch 6: 100%|██████████| 268/268 [1:05:27<00:00, 14.66s/it]  
Validating: 100%|██████████| 58/58 [00:38<00:00,  1.51it/s]


Epoch 6: Train Loss=0.0014, Train Acc=0.9992, Val Loss=0.0006, Val Acc=0.9995
Validation loss decreased (0.000663 --> 0.000642). Saving model ...


Training Epoch 7: 100%|██████████| 268/268 [17:14<00:00,  3.86s/it]
Validating: 100%|██████████| 58/58 [00:39<00:00,  1.48it/s]


Epoch 7: Train Loss=0.0031, Train Acc=0.9988, Val Loss=0.0052, Val Acc=0.9978
EarlyStopping counter: 1 out of 3


Training Epoch 8: 100%|██████████| 268/268 [17:24<00:00,  3.90s/it]
Validating: 100%|██████████| 58/58 [00:40<00:00,  1.45it/s]


Epoch 8: Train Loss=0.0025, Train Acc=0.9993, Val Loss=0.0010, Val Acc=1.0000
EarlyStopping counter: 2 out of 3


Training Epoch 9: 100%|██████████| 268/268 [16:56<00:00,  3.79s/it]
Validating: 100%|██████████| 58/58 [00:39<00:00,  1.48it/s]


Epoch 9: Train Loss=0.0018, Train Acc=0.9994, Val Loss=0.0003, Val Acc=1.0000
Validation loss decreased (0.000642 --> 0.000289). Saving model ...


Training Epoch 10: 100%|██████████| 268/268 [16:53<00:00,  3.78s/it]
Validating: 100%|██████████| 58/58 [00:38<00:00,  1.51it/s]


Epoch 10: Train Loss=0.0003, Train Acc=1.0000, Val Loss=0.0001, Val Acc=1.0000
Validation loss decreased (0.000289 --> 0.000148). Saving model ...


Training Epoch 11: 100%|██████████| 268/268 [16:57<00:00,  3.80s/it]
Validating: 100%|██████████| 58/58 [00:41<00:00,  1.41it/s]


Epoch 11: Train Loss=0.0000, Train Acc=1.0000, Val Loss=0.0001, Val Acc=1.0000
Validation loss decreased (0.000148 --> 0.000114). Saving model ...


Training Epoch 12: 100%|██████████| 268/268 [19:09<00:00,  4.29s/it]
Validating: 100%|██████████| 58/58 [00:39<00:00,  1.47it/s]


Epoch 12: Train Loss=0.0000, Train Acc=1.0000, Val Loss=0.0002, Val Acc=1.0000
EarlyStopping counter: 1 out of 3


Training Epoch 13: 100%|██████████| 268/268 [17:25<00:00,  3.90s/it]
Validating: 100%|██████████| 58/58 [00:38<00:00,  1.50it/s]


Epoch 13: Train Loss=0.0025, Train Acc=0.9993, Val Loss=0.0131, Val Acc=0.9951
EarlyStopping counter: 2 out of 3


Training Epoch 14: 100%|██████████| 268/268 [17:08<00:00,  3.84s/it]
Validating: 100%|██████████| 58/58 [00:39<00:00,  1.46it/s]


Epoch 14: Train Loss=0.0073, Train Acc=0.9978, Val Loss=0.0022, Val Acc=0.9995
EarlyStopping counter: 3 out of 3
Early stopping initiated. Training stopped.


# 테스트

In [1]:
import torch
import torchvision.models as models
import torch
from torchvision import models
from PIL import Image

  from .autonotebook import tqdm as notebook_tqdm


In [19]:
image_path = r"C:\Users\kwonh\Desktop\ps3\Dataset\test\5.jpg"


image = Image.open(image_path).convert('RGB')
image_array = np.array(image)[:, :, ::-1]  # RGB to BGR
hls_image = cv2.cvtColor(image_array, cv2.COLOR_BGR2HLS)
adjusted_hls_image = adjust_hls(hls_image, lightness_scale=1.2, saturation_scale=0.9)
# 이미지 정규화 (픽셀 값의 범위를 0 ~ 1 사이로 조정)
normalized_image = adjusted_hls_image / 255.0
image = Image.fromarray((normalized_image * 255).astype(np.uint8))
image = transform(image)

In [12]:
# 모델 구조 정의
model = models.resnet50(pretrained=False)  # 사전 학습된 가중치는 불러오지 않음

# 모델의 마지막 레이어를 이진 분류에 맞게 조정 (이진 분류 예시)
num_ftrs = model.fc.in_features
model.fc = torch.nn.Linear(num_ftrs, 1)  



In [20]:
# 모델 상태 로드
model.load_state_dict(torch.load("resnet50_binary_classification_model.pth"))

input_tensor = image.unsqueeze(0)  # 배치 차원 추가
# 모델을 평가 모드로 설정 (예측할 때는 evaluation 모드로 설정)
model.eval()
with torch.no_grad():  # 추론할 때는 그라디언트를 계산할 필요가 없으므로 no_grad() 컨텍스트 관리자를 사용하여 그라디언트를 비활성화합니다.
    outputs = model(input_tensor)
    preds = torch.sigmoid(outputs) >= 0.5

if preds == True:
    print("합성입니다.")
else:
    print("합성이 아닙니다.")


합성입니다.
