<a href="https://colab.research.google.com/github/211iey/DScover-main-project-D-/blob/main/GAN_CIRCLE.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Mount Google Drive at /content/drive; the dataset paths used by the loading
# cells below live under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
pip install tensorflow keras numpy opencv-python



In [None]:
import tensorflow as tf
from tensorflow.keras.layers import Conv2D, Conv2DTranspose, Input, LeakyReLU, Activation, Add
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam

# 데이터 로드

In [None]:
import os
import cv2
import numpy as np

In [None]:
# Lookup from the long class-folder names (class + location + staging suffix)
# to the short class names. Presumably LDCT folder name -> FDCT folder name,
# as the variable name suggests; verify against the dataset layout.
ldct_to_fdct_map = dict([
    ("adenocarcinoma_left.lower.lobe_T2_N0_M0_Ib", "adenocarcinoma"),
    ("squamous.cell.carcinoma_left.hilum_T1_N2_M0_IIIa", "squamous.cell.carcinoma"),
    ("large.cell.carcinoma_left.hilum_T2_N2_M0_IIIa", "large.cell.carcinoma"),
    ("normal", "normal"),
])

In [None]:
def load_images(base_dir, target_size=(512, 512)):
    """Load every image under ``base_dir/<class_folder>/`` as grayscale.

    Args:
        base_dir: Directory whose sub-directories each contain image files.
            Entries of ``base_dir`` that are not directories are ignored.
        target_size: ``(width, height)`` handed to ``cv2.resize``.

    Returns:
        A ``float32`` array of shape ``(N, height, width, 1)`` with pixel
        values scaled to ``[0, 1]``. ``N`` is 0 when no readable image exists.
    """
    width, height = target_size
    data = []
    # os.listdir returns entries in arbitrary order; sorting makes the sample
    # order reproducible from run to run.
    for folder in sorted(os.listdir(base_dir)):
        class_dir = os.path.join(base_dir, folder)
        if not os.path.isdir(class_dir):
            continue  # stray file sitting next to the class folders
        for filename in sorted(os.listdir(class_dir)):
            if not filename.lower().endswith(('.png', '.jpg', '.jpeg')):
                continue
            img_path = os.path.join(class_dir, filename)
            img = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)
            if img is None:
                # cv2.imread signals an unreadable file by returning None
                # instead of raising, which would crash cv2.resize below.
                print(f"Skipping unreadable image: {img_path}")
                continue
            # Normalise to [0, 1]; float32 halves memory versus the float64
            # that a plain `/ 255.0` on uint8 data would produce.
            img = cv2.resize(img, target_size).astype(np.float32) / 255.0
            data.append(img)
    if not data:
        return np.empty((0, height, width, 1), dtype=np.float32)
    # cv2.resize yields (height, width) arrays; append the channel axis last.
    return np.stack(data)[..., np.newaxis]

fdct 이미지 로드

In [None]:
# Load the three FDCT splits (train / valid / test) in that order.
fdct_train, fdct_valid, fdct_test = (
    load_images(split_dir)
    for split_dir in (
        '/content/drive/MyDrive/DScover_main_project/fdct_data512/train',
        '/content/drive/MyDrive/DScover_main_project/fdct_data512/valid',
        '/content/drive/MyDrive/DScover_main_project/fdct_data512/test',
    )
)

In [None]:
# Load the three LDCT splits (train / valid / test) in that order.
ldct_train, ldct_valid, ldct_test = (
    load_images(split_dir)
    for split_dir in (
        '/content/drive/MyDrive/DScover_main_project/ldct_data512/train',
        '/content/drive/MyDrive/DScover_main_project/ldct_data512/valid',
        '/content/drive/MyDrive/DScover_main_project/ldct_data512/test',
    )
)

In [None]:
# Sanity check: show the shape of both training arrays so a mismatch in sample
# count or image size between the two domains is visible right after loading.
print("ldct_train shape:", ldct_train.shape)
print("fdct_train shape:", fdct_train.shape)

ldct_train shape: (613, 512, 512, 1)
fdct_train shape: (613, 512, 512, 1)


# GAN-CIRCLE 구현
데이터가 unpaired이기 때문에 cycle-consistency 기반의 GAN-CIRCLE을 사용한다. (필요하다면 paired 데이터로 구성하는 것도 가능해 보인다.)

In [None]:
from tensorflow.keras.layers import Input, Conv2D, Conv2DTranspose, LeakyReLU
from tensorflow.keras.models import Model

## generator

In [None]:
def build_generator(input_shape=(512, 512, 1)):
    """Assemble the convolutional encoder-decoder used as a generator.

    Layout: a 7x7 stem convolution, two stride-2 convolutions (downsampling),
    two stride-2 transposed convolutions (upsampling) and a 7x7 single-channel
    output convolution with ``tanh`` activation.

    Args:
        input_shape: Shape of one input image, ``(512, 512, 1)`` by default.

    Returns:
        A Keras ``Model`` named ``"Generator"``.
    """
    image_in = Input(shape=input_shape)

    # Stem: wide receptive field at full resolution.
    features = Conv2D(64, (7, 7), padding='same', activation='relu')(image_in)

    # Encoder: each stride-2 convolution halves the spatial size.
    for n_filters in (128, 256):
        features = Conv2D(n_filters, (3, 3), strides=2, padding='same', activation='relu')(features)

    # Decoder: each stride-2 transposed convolution doubles it again.
    for n_filters in (128, 64):
        features = Conv2DTranspose(n_filters, (3, 3), strides=2, padding='same', activation='relu')(features)

    # tanh bounds the output to [-1, 1].
    image_out = Conv2D(1, (7, 7), padding='same', activation='tanh')(features)
    return Model(image_in, image_out, name="Generator")

## discriminator

In [None]:
def build_discriminator(input_shape=(512, 512, 1)):
    """Assemble the fully convolutional discriminator.

    Four 4x4 stride-2 convolutions (64, 128, 256, 512 filters), each followed
    by LeakyReLU(0.2), then a 4x4 convolution producing a one-channel map of
    raw scores (no output activation).

    Args:
        input_shape: Shape of one input image, ``(512, 512, 1)`` by default.

    Returns:
        A Keras ``Model`` named ``"Discriminator"``.
    """
    image_in = Input(shape=input_shape)

    features = image_in
    for n_filters in (64, 128, 256, 512):
        features = Conv2D(n_filters, (4, 4), strides=2, padding='same')(features)
        features = LeakyReLU(alpha=0.2)(features)

    score_map = Conv2D(1, (4, 4), padding='same')(features)
    return Model(image_in, score_map, name="Discriminator")

## 모델 초기화

In [None]:
# One generator per translation direction: g maps LDCT -> FDCT, f maps FDCT -> LDCT.
generator_g, generator_f = build_generator(), build_generator()
# One discriminator per image domain: x judges FDCT images, y judges LDCT images.
discriminator_x, discriminator_y = build_discriminator(), build_discriminator()



In [None]:
# Adam with learning rate 1e-4 and beta_1 = 0.5: one optimizer for the
# generators and one for the discriminators.
gen_optimizer = Adam(learning_rate=1e-4, beta_1=0.5)
disc_optimizer = Adam(learning_rate=1e-4, beta_1=0.5)

## 손실함수
1. adversarial loss
2. cycle consistency loss
3. identity loss

In [None]:
# Adversarial loss (least-squares form)
def adversarial_loss(real, fake):
    """Least-squares adversarial loss over a pair of discriminator outputs.

    Args:
        real: Discriminator scores for real images (pushed towards 1).
        fake: Discriminator scores for generated images (pushed towards 0).

    Returns:
        Scalar tensor: mean((real - 1)^2) + mean(fake^2).
    """
    real_term = tf.reduce_mean(tf.square(real - 1))
    fake_term = tf.reduce_mean(tf.square(fake))
    return real_term + fake_term

In [None]:
# Cycle-consistency loss (L1)
def cycle_consistency_loss(real, cycled):
    """Mean absolute difference between an image and its cycled version.

    Args:
        real: Original images.
        cycled: The same images after passing through both generators.

    Returns:
        Scalar tensor holding the mean of |real - cycled|.
    """
    abs_gap = tf.abs(real - cycled)
    return tf.reduce_mean(abs_gap)

In [None]:
# Identity loss (L1)
def identity_loss(real, same):
    """Mean absolute difference between an image and a generator's output for it.

    Args:
        real: Reference images.
        same: Generator outputs that are expected to match ``real``.

    Returns:
        Scalar tensor holding the mean of |real - same|.
    """
    abs_gap = tf.abs(real - same)
    return tf.reduce_mean(abs_gap)

## 학습 루프

In [None]:
# Training schedule: number of passes over the data and images per step.
epochs, batch_size = 50, 4

In [None]:
# Unpaired CycleGAN-style training of both generators and both discriminators.
#
# Fixes relative to the previous version of this loop:
#   * Generators are trained to make the discriminator score their fakes as 1
#     (previously they reused the discriminator objective, which drives fake
#     scores to 0 and so works against the generators).
#   * The identity loss compares G(fdct) with fdct and F(ldct) with ldct rather
#     than comparing a translated image with an unrelated image of the other domain.
#   * Both cycle terms reach both generators, since each generator takes part in both cycles.
#   * Each optimizer is applied once per step to one fixed variable list; newer
#     Keras optimizers reject variables they were not built with.
#   * `epochs` is honoured, batches are float32, and the persistent tape is released.

# Use only as many samples as the smaller domain provides so no batch is empty.
num_samples = min(len(ldct_train), len(fdct_train))

generator_variables = generator_g.trainable_variables + generator_f.trainable_variables
discriminator_variables = discriminator_x.trainable_variables + discriminator_y.trainable_variables

for epoch in range(epochs):
    print(f"Epoch {epoch+1}/{epochs}")

    # The domains are unpaired, so shuffle each one independently every epoch.
    ldct_order = np.random.permutation(len(ldct_train))[:num_samples]
    fdct_order = np.random.permutation(len(fdct_train))[:num_samples]

    for i in range(0, num_samples, batch_size):
        # Prepare the batch (cast so numpy inputs match the models' float32 outputs).
        ldct_batch = tf.cast(ldct_train[ldct_order[i:i+batch_size]], tf.float32)
        fdct_batch = tf.cast(fdct_train[fdct_order[i:i+batch_size]], tf.float32)

        with tf.GradientTape(persistent=True) as tape:
            # Generator forward passes: translation, cycle, and identity mappings.
            fake_fdct = generator_g(ldct_batch)
            fake_ldct = generator_f(fdct_batch)
            cycled_ldct = generator_f(fake_fdct)
            cycled_fdct = generator_g(fake_ldct)
            same_fdct = generator_g(fdct_batch)
            same_ldct = generator_f(ldct_batch)

            # Discriminator scores.
            disc_real_fdct = discriminator_x(fdct_batch)
            disc_fake_fdct = discriminator_x(fake_fdct)
            disc_real_ldct = discriminator_y(ldct_batch)
            disc_fake_ldct = discriminator_y(fake_ldct)

            # Generator adversarial terms (least squares): fakes should score 1.
            adv_g_loss = tf.reduce_mean(tf.square(disc_fake_fdct - 1))
            adv_f_loss = tf.reduce_mean(tf.square(disc_fake_ldct - 1))

            total_cycle_loss = cycle_consistency_loss(ldct_batch, cycled_ldct) \
                               + cycle_consistency_loss(fdct_batch, cycled_fdct)

            gen_g_loss = adv_g_loss \
                         + total_cycle_loss \
                         + identity_loss(fdct_batch, same_fdct)

            gen_f_loss = adv_f_loss \
                         + total_cycle_loss \
                         + identity_loss(ldct_batch, same_ldct)

            # Discriminator objectives: real -> 1, fake -> 0.
            disc_x_loss = adversarial_loss(disc_real_fdct, disc_fake_fdct)
            disc_y_loss = adversarial_loss(disc_real_ldct, disc_fake_ldct)

        # Gradients, concatenated in the same order as the variable lists above.
        gen_gradients = tape.gradient(gen_g_loss, generator_g.trainable_variables) \
                        + tape.gradient(gen_f_loss, generator_f.trainable_variables)
        disc_gradients = tape.gradient(disc_x_loss, discriminator_x.trainable_variables) \
                         + tape.gradient(disc_y_loss, discriminator_y.trainable_variables)
        # A persistent tape holds its resources until it is dropped explicitly.
        del tape

        # One optimizer step per network family.
        gen_optimizer.apply_gradients(zip(gen_gradients, generator_variables))
        disc_optimizer.apply_gradients(zip(disc_gradients, discriminator_variables))

Epoch 1/50


In [None]:
# Check performance: print the four loss values left over from the last
# training step (these variables are only defined after the training loop has run).
print(f"Epoch {epoch+1}, Gen_G_Loss: {gen_g_loss.numpy()}, Gen_F_Loss: {gen_f_loss.numpy()}, "
      f"Disc_X_Loss: {disc_x_loss.numpy()}, Disc_Y_Loss: {disc_y_loss.numpy()}")

## 모델 평가 w/ PSNR, SSIM

In [None]:
from skimage.metrics import peak_signal_noise_ratio as psnr
from skimage.metrics import structural_similarity as ssim

In [None]:
# Run the LDCT -> FDCT generator over the whole LDCT test set.
restored_fdct = generator_g.predict(ldct_test)

In [None]:
# PSNR / SSIM between each restored image and the FDCT test image at the same index.
# NOTE(review): this assumes ldct_test[i] and fdct_test[i] depict the same slice — confirm.
num_eval = min(len(fdct_test), len(restored_fdct))

# The reference images were scaled to [0, 1] on load, while the generator ends
# in tanh and can emit values outside that range. Clip to the valid image range
# and use one dtype for both inputs so the metrics compare like with like.
reference_images = np.asarray(fdct_test[:num_eval], dtype=np.float32)
restored_images = np.clip(restored_fdct[:num_eval], 0.0, 1.0).astype(np.float32)

# data_range must be given for float images: otherwise PSNR guesses the peak
# from the dtype and SSIM refuses float input in recent scikit-image versions.
psnr_scores = [psnr(reference_images[i], restored_images[i], data_range=1.0)
               for i in range(num_eval)]
ssim_scores = [ssim(reference_images[i].squeeze(), restored_images[i].squeeze(), data_range=1.0)
               for i in range(num_eval)]

print("Average PSNR:", np.mean(psnr_scores))
print("Average SSIM:", np.mean(ssim_scores))